matrix: add support for remote avatars & improve cache handling (again)
This commit is contained in:
@@ -9,8 +9,11 @@ import (
|
||||
)
|
||||
|
||||
type UserInRoomCacheEntry struct {
|
||||
displayName *string
|
||||
avatarURL *string
|
||||
displayName *string
|
||||
avatarURL *string
|
||||
// for bridged messages that we sent, keep the source URL to know when to upgrade the
|
||||
// profile picture (instead of doing it on every message)
|
||||
sourceAvatar *string
|
||||
lastUpdated time.Time
|
||||
conflictWithOtherUsername bool
|
||||
}
|
||||
@@ -32,22 +35,23 @@ func NewUserInfoCache() *UserInfoCache {
|
||||
}
|
||||
}
|
||||
|
||||
// note: cache is locked inside this function
|
||||
func (c *UserInfoCache) retrieveUserInRoomFromCache(channelID id.RoomID, mxid id.UserID) *UserInRoomCacheEntry {
|
||||
var cachedEntry *UserInRoomCacheEntry = nil
|
||||
|
||||
// note: the cache is read-locked inside this function
|
||||
func (c *UserInfoCache) getAttributeFromCache(channelID id.RoomID, mxid id.UserID, attributeIsPresent func(UserInRoomCacheEntry) bool) *UserInRoomCacheEntry {
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
if user, userPresent := c.users[mxid]; userPresent {
|
||||
// try first the name of the user in the room, then globally
|
||||
if roomCachedEntry, roomPresent := user.perChannel[channelID]; roomPresent {
|
||||
cachedEntry = &roomCachedEntry
|
||||
} else if user.globalEntry != nil {
|
||||
cachedEntry = user.globalEntry
|
||||
if roomCachedEntry, roomPresent := user.perChannel[channelID]; roomPresent && attributeIsPresent(roomCachedEntry) {
|
||||
return &roomCachedEntry
|
||||
}
|
||||
|
||||
if user.globalEntry != nil && attributeIsPresent(*user.globalEntry) {
|
||||
return user.globalEntry
|
||||
}
|
||||
}
|
||||
c.RUnlock()
|
||||
|
||||
return cachedEntry
|
||||
return nil
|
||||
}
|
||||
|
||||
// note: cache is locked inside this function
|
||||
|
||||
@@ -40,10 +40,6 @@ func (b *Bmatrix) ignoreBridgingEvents(ev *event.Event) bool {
|
||||
|
||||
//nolint: funlen
|
||||
func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) {
|
||||
if b.ignoreBridgingEvents(ev) {
|
||||
return
|
||||
}
|
||||
|
||||
b.RLock()
|
||||
channel, ok := b.RoomMap[ev.RoomID]
|
||||
b.RUnlock()
|
||||
@@ -55,15 +51,25 @@ func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) {
|
||||
return
|
||||
}
|
||||
|
||||
// This needs to be defined before rejecting bridging events, as we rely on this to cache
|
||||
// avatar URLs sent with appService (otherwise we would upload one avatar per message sent
|
||||
// across the bridge!).
|
||||
// As such, beware! Moving this below the b.ignoreBridgingEvents condiiton would appear to
|
||||
// work, but it would also lead to a high file upload rate, until being eventually
|
||||
// rate-limited by the homeserver
|
||||
if ev.Type == event.StateMember {
|
||||
b.handleMemberChange(ev)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if ev.Type == event.EphemeralEventReceipt {
|
||||
// we do not support read receipts across servers, considering that
|
||||
// multiple services (e.g. Discord) doesn't expose that information)
|
||||
return
|
||||
}
|
||||
|
||||
if ev.Type == event.StateMember {
|
||||
b.handleMemberChange(ev)
|
||||
|
||||
if b.ignoreBridgingEvents(ev) {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -272,7 +278,7 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel id.RoomID, fi *c
|
||||
FormattedBody: fi.Comment,
|
||||
}
|
||||
|
||||
_, err := b.sendMessageEventWithRetries(channel, m, msg.Username)
|
||||
_, err := b.sendMessageEventWithRetries(channel, m, msg.Username, msg.Avatar)
|
||||
if err != nil {
|
||||
b.Log.Errorf("file comment failed: %#v", err)
|
||||
}
|
||||
@@ -335,7 +341,7 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel id.RoomID, fi *c
|
||||
}
|
||||
}
|
||||
|
||||
_, err = b.sendMessageEventWithRetries(channel, m, msg.Username)
|
||||
_, err = b.sendMessageEventWithRetries(channel, m, msg.Username, msg.Avatar)
|
||||
if err != nil {
|
||||
b.Log.Errorf("sending the message referencing the uploaded file failed: %#v", err)
|
||||
}
|
||||
|
||||
@@ -1,14 +1,20 @@
|
||||
package bmatrix
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"html"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
matrix "maunium.net/go/mautrix"
|
||||
matrixAppService "maunium.net/go/mautrix/appservice"
|
||||
"maunium.net/go/mautrix/event"
|
||||
"maunium.net/go/mautrix/id"
|
||||
|
||||
"github.com/42wim/matterbridge/bridge/helper"
|
||||
)
|
||||
|
||||
// arbitrary limit to determine when to cleanup nickname cache entries
|
||||
@@ -63,8 +69,10 @@ func (b *Bmatrix) getDisplayName(channelID id.RoomID, mxid id.UserID) string {
|
||||
return string(mxid)[1:]
|
||||
}
|
||||
|
||||
cachedEntry := b.UserCache.retrieveUserInRoomFromCache(channelID, mxid)
|
||||
if cachedEntry == nil || cachedEntry.displayName == nil {
|
||||
cachedEntry := b.UserCache.getAttributeFromCache(channelID, mxid, func(e UserInRoomCacheEntry) bool {
|
||||
return e.displayName != nil
|
||||
})
|
||||
if cachedEntry == nil {
|
||||
// retrieve the global display name
|
||||
return b.cacheDisplayName("", mxid, b.retrieveGlobalDisplayname(mxid))
|
||||
}
|
||||
@@ -136,8 +144,10 @@ func (b *Bmatrix) retrieveGlobalAvatarURL(mxid id.UserID) id.ContentURIString {
|
||||
|
||||
// getAvatarURL retrieves the avatar URL for mxid, querying the homeserver if the mxid is not in the cache.
|
||||
func (b *Bmatrix) getAvatarURL(channelID id.RoomID, mxid id.UserID) string {
|
||||
cachedEntry := b.UserCache.retrieveUserInRoomFromCache(channelID, mxid)
|
||||
if cachedEntry == nil || cachedEntry.avatarURL == nil {
|
||||
cachedEntry := b.UserCache.getAttributeFromCache(channelID, mxid, func(e UserInRoomCacheEntry) bool {
|
||||
return e.avatarURL != nil
|
||||
})
|
||||
if cachedEntry == nil {
|
||||
// retrieve the global display name
|
||||
return b.cacheAvatarURL("", mxid, b.retrieveGlobalAvatarURL(mxid))
|
||||
}
|
||||
@@ -164,6 +174,28 @@ func (b *Bmatrix) cacheAvatarURL(channelID id.RoomID, mxid id.UserID, avatarURL
|
||||
return fullURL
|
||||
}
|
||||
|
||||
// cacheSourceAvatarURL stores the mapping between a virtual user and the *source* URL of the user avatar,
|
||||
// to be reused later without reuploading the same avatar repeatedly.
|
||||
// Note that old entries are cleaned when this function is called.
|
||||
func (b *Bmatrix) cacheSourceAvatarURL(channelID id.RoomID, mxid id.UserID, avatarURL string) {
|
||||
b.cacheEntry(channelID, mxid, func(entry UserInRoomCacheEntry) UserInRoomCacheEntry {
|
||||
entry.sourceAvatar = &avatarURL
|
||||
return entry
|
||||
})
|
||||
}
|
||||
|
||||
// getSourceAvatarURL retrieves the avatar URL for mxid, querying the homeserver if the mxid is not in the cache.
|
||||
func (b *Bmatrix) getSourceAvatarURL(channelID id.RoomID, mxid id.UserID) string {
|
||||
cachedEntry := b.UserCache.getAttributeFromCache(channelID, mxid, func(e UserInRoomCacheEntry) bool {
|
||||
return e.sourceAvatar != nil
|
||||
})
|
||||
if cachedEntry == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
return *cachedEntry.sourceAvatar
|
||||
}
|
||||
|
||||
// handleRatelimit handles the ratelimit errors and return if we're ratelimited and the amount of time to sleep
|
||||
func (b *Bmatrix) handleRatelimit(err error) (time.Duration, bool) {
|
||||
var mErr matrix.HTTPError
|
||||
@@ -220,8 +252,61 @@ func (w SendMessageEventWrapper) SendMessageEvent(roomID id.RoomID, eventType ev
|
||||
return w.inner.SendMessageEvent(roomID, eventType, contentJSON)
|
||||
}
|
||||
|
||||
func (b *Bmatrix) uploadAvatar(channelID id.RoomID, mxid id.UserID, intent *matrixAppService.IntentAPI, avatarURL string) error {
|
||||
cachedURL := b.getSourceAvatarURL(channelID, mxid)
|
||||
|
||||
// do we need to update the avatar for that user
|
||||
if avatarURL == cachedURL {
|
||||
return nil
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
client := &http.Client{
|
||||
Timeout: time.Second * 5,
|
||||
}
|
||||
req, err := http.NewRequest(http.MethodGet, avatarURL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
_, err = io.Copy(&buf, resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data := buf.Bytes()
|
||||
|
||||
existingData, err := helper.DownloadFile(b.getAvatarURL(channelID, mxid))
|
||||
if err == nil && existingData != nil && bytes.Equal(*existingData, data) {
|
||||
// the existing avatar is already correct, cache the source URL and return
|
||||
b.cacheSourceAvatarURL(channelID, mxid, avatarURL)
|
||||
return nil
|
||||
}
|
||||
|
||||
//nolint: exhaustruct
|
||||
matrixResp, err := b.mc.UploadMedia(matrix.ReqUploadMedia{
|
||||
ContentBytes: data,
|
||||
ContentType: resp.Header.Get("Content-Type"),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = intent.SetAvatarURL(matrixResp.ContentURI)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b.cacheSourceAvatarURL(channelID, mxid, avatarURL)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//nolint: wrapcheck
|
||||
func (b *Bmatrix) sendMessageEventWithRetries(channel id.RoomID, message event.MessageEventContent, username string) (string, error) {
|
||||
func (b *Bmatrix) sendMessageEventWithRetries(channel id.RoomID, message event.MessageEventContent, username string, avatarURL string) (string, error) {
|
||||
var (
|
||||
resp *matrix.RespSendEvent
|
||||
client interface {
|
||||
@@ -248,6 +333,8 @@ func (b *Bmatrix) sendMessageEventWithRetries(channel id.RoomID, message event.M
|
||||
// if we can't change the display name it's not great but not the end of the world either, ignore it
|
||||
// TODO: do not perform this action on every message, with an in-memory cache or something
|
||||
_ = intent.SetDisplayName(username)
|
||||
//nolint: errcheck
|
||||
go b.uploadAvatar(channel, id.UserID(bridgeUserID), intent, avatarURL)
|
||||
client = intent
|
||||
} else {
|
||||
applyUsernametoMessage(&message, username)
|
||||
|
||||
@@ -222,7 +222,7 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
|
||||
m.Format = event.FormatHTML
|
||||
}
|
||||
|
||||
return b.sendMessageEventWithRetries(channel, m, msg.Username)
|
||||
return b.sendMessageEventWithRetries(channel, m, msg.Username, msg.Avatar)
|
||||
}
|
||||
|
||||
// Delete message
|
||||
@@ -236,8 +236,9 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
|
||||
err := b.retry(func() error {
|
||||
//nolint:exhaustruct
|
||||
resp, err := b.mc.RedactEvent(channel, id.EventID(msg.ID), matrix.ReqRedact{})
|
||||
|
||||
msgID = string(resp.EventID)
|
||||
if resp != nil {
|
||||
msgID = string(resp.EventID)
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
@@ -254,7 +255,7 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
|
||||
Body: rmsg.Text,
|
||||
}
|
||||
|
||||
_, err := b.sendMessageEventWithRetries(channel, m, msg.Username)
|
||||
_, err := b.sendMessageEventWithRetries(channel, m, msg.Username, msg.Avatar)
|
||||
if err != nil {
|
||||
b.Log.Errorf("sendText failed: %s", err)
|
||||
}
|
||||
@@ -292,7 +293,7 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
|
||||
Type: event.RelReplace,
|
||||
}
|
||||
|
||||
return b.sendMessageEventWithRetries(channel, rmsg, msg.Username)
|
||||
return b.sendMessageEventWithRetries(channel, rmsg, msg.Username, msg.Avatar)
|
||||
}
|
||||
|
||||
//nolint:exhaustruct
|
||||
@@ -328,7 +329,7 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
return b.sendMessageEventWithRetries(channel, m, msg.Username)
|
||||
return b.sendMessageEventWithRetries(channel, m, msg.Username, msg.Avatar)
|
||||
}
|
||||
|
||||
// DontProcessOldEvents returns true if a sync event should be considered for further processing.
|
||||
|
||||
Reference in New Issue
Block a user