Matrix: switch to the mautrix-go library

Also: add initial support for [Matrix] application services
This commit is contained in:
Simon THOBY
2022-07-24 20:12:03 +02:00
committed by Simon Thoby
parent 89b0d362d2
commit 625d7cd94c
175 changed files with 31467 additions and 2873 deletions

147
bridge/matrix/appservice.go Normal file
View File

@@ -0,0 +1,147 @@
package bmatrix
import (
"fmt"
"regexp"
"github.com/sirupsen/logrus"
"maunium.net/go/mautrix/appservice"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)
type AppServiceNamespaces struct {
rooms []*regexp.Regexp
usernames []*regexp.Regexp
prefixes []string
}
type AppServiceWrapper struct {
appService *appservice.AppService
namespaces AppServiceNamespaces
stop chan struct{}
stopAck chan struct{}
}
func (w *AppServiceWrapper) ParseNamespaces(logger *logrus.Entry) error {
if w.appService.Registration != nil {
// TODO: handle non-exclusive registrations
for _, v := range w.appService.Registration.Namespaces.RoomIDs {
re, err := regexp.Compile(v.Regex)
if err != nil {
logger.Warnf("couldn't parse the appservice regex '%s'", v.Regex)
continue
}
w.namespaces.rooms = append(w.namespaces.rooms, re)
}
for _, v := range w.appService.Registration.Namespaces.UserIDs {
re, err := regexp.Compile(v.Regex)
if err != nil {
logger.Warnf("couldn't parse the appservice regex '%s'", v.Regex)
continue
}
// we assume that the user regexes will be of the form '@<some prefix>.*'
// where '.*' will be replaced by the username we spoof
prefix, _ := re.LiteralPrefix()
if prefix == "" || prefix == "@" {
logger.Warnf("couldn't find an acceptable prefix in the appservice regex '%s'", v.Regex)
continue
}
if v.Regex != fmt.Sprintf("%s.*", prefix) {
logger.Warnf("complex regexpes are not supported for appServices, the regexp '%s' does not match the format '@<prefix>.*'", v.Regex)
continue
}
w.namespaces.usernames = append(w.namespaces.usernames, re)
// drop the '@' in the prefix
w.namespaces.prefixes = append(w.namespaces.prefixes, prefix[1:])
}
}
return nil
}
func (b *Bmatrix) NewAppService() (*AppServiceWrapper, error) {
w := &AppServiceWrapper{
appService: appservice.Create(),
namespaces: AppServiceNamespaces{
rooms: []*regexp.Regexp{},
usernames: []*regexp.Regexp{},
prefixes: []string{},
},
stop: make(chan struct{}, 1),
stopAck: make(chan struct{}, 1),
}
err := w.appService.SetHomeserverURL(b.mc.HomeserverURL.String())
if err != nil {
return nil, err
}
_, homeServerDomain, _ := b.mc.UserID.Parse()
w.appService.HomeserverDomain = homeServerDomain
//nolint:exhaustruct
w.appService.Host = appservice.HostConfig{
Hostname: b.GetString("AppServiceHost"),
Port: uint16(b.GetInt("AppServicePort")),
}
w.appService.Registration, err = appservice.LoadRegistration(b.GetString("AppServiceConfigPath"))
if err != nil {
return nil, err
}
// forward logs from the appService to the matterbridge logger
w.appService.Log = NewZerologWrapper(b.Log)
if err = w.ParseNamespaces(b.Log); err != nil {
return nil, err
}
return w, nil
}
func (a *AppServiceNamespaces) containsRoom(roomID id.RoomID) bool {
// no room specified: we check all the rooms
if len(a.rooms) == 0 {
return true
}
for _, room := range a.rooms {
if room.MatchString(roomID.String()) {
return true
}
}
return false
}
// nolint: wrapcheck
func (b *Bmatrix) startAppService() error {
wrapper := b.appService
// TODO: detect service completion and rerun automatically
go wrapper.appService.Start()
b.Log.Debug("appservice launched")
processor := appservice.NewEventProcessor(wrapper.appService)
processor.On(event.EventMessage, func(ev *event.Event) {
b.handleEvent(originAppService, ev)
})
go processor.Start()
b.Log.Debug("appservice event dispatcher launched")
// handle service stopping/restarting
go func(appService *appservice.AppService, processor *appservice.EventProcessor) {
<-wrapper.stop
appService.Stop()
processor.Stop()
wrapper.stopAck <- struct{}{}
}(wrapper.appService, processor)
return nil
}

344
bridge/matrix/handlers.go Normal file
View File

@@ -0,0 +1,344 @@
package bmatrix
import (
"bytes"
"fmt"
"mime"
"regexp"
"strings"
matrix "maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
"github.com/42wim/matterbridge/bridge/config"
"github.com/42wim/matterbridge/bridge/helper"
)
// Determines if the event comes from ourselves, in which case we want to ignore it
func (b *Bmatrix) ignoreBridgingEvents(ev *event.Event) bool {
if ev.Sender == b.UserID {
return true
}
// ignore messages we may have sent via the appservice
if b.appService != nil {
if ev.Sender == b.appService.appService.BotClient().UserID {
return true
}
// ignore virtual users messages (we ignore the 'exclusive' field of Namespace for now)
for _, username := range b.appService.namespaces.usernames {
if username.MatchString(ev.Sender.String()) {
return true
}
}
}
return false
}
//nolint: funlen
func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) {
if b.ignoreBridgingEvents(ev) {
return
}
b.RLock()
channel, ok := b.RoomMap[ev.RoomID]
b.RUnlock()
if !ok {
// we don't know that room yet, that could be a room returned by an
// application service, but matterbridge doesn't handle those just yet
b.Log.Debugf("Received event for room %s, not joined yet/not handled", ev.RoomID)
return
}
if ev.Type == event.EphemeralEventReceipt {
// we do not support read receipts across servers, considering that
// multiple services (e.g. Discord) doesn't expose that information)
return
}
if ev.Type == event.StateMember {
b.handleMemberChange(ev)
return
}
// if we receive appservice events for this room, there is no need to check them with the classical syncer
if !channel.appService && origin == originAppService {
channel.appService = true
b.Lock()
b.RoomMap[ev.RoomID] = channel
b.Unlock()
}
// if we receive messages both via the classical matrix syncer and appserver, prefer appservice and throw away this duplicate event
if channel.appService && origin != originAppService {
b.Log.Debugf("Dropping event, should receive it via appservice: %s", ev.ID)
return
}
b.Log.Debugf("== Receiving event: %#v (appService=%t)", ev, origin == originAppService)
if ev.Type == event.EphemeralEventTyping {
typing := ev.Content.AsTyping()
if len(typing.UserIDs) > 0 {
//nolint:exhaustruct
b.Remote <- config.Message{
Event: config.EventUserTyping,
Channel: channel.name,
Account: b.Account,
}
}
return
}
defer (func(ev *event.Event) {
// not crucial, so no ratelimit check here
if err := b.mc.MarkRead(ev.RoomID, ev.ID); err != nil {
b.Log.Errorf("couldn't mark message as read %s", err.Error())
}
})(ev)
// Create our message
//nolint:exhaustruct
rmsg := config.Message{
Username: b.getDisplayName(ev.RoomID, ev.Sender),
Channel: channel.name,
Account: b.Account,
UserID: string(ev.Sender),
ID: string(ev.ID),
}
// Remove homeserver suffix if configured
if b.GetBool("NoHomeServerSuffix") {
re := regexp.MustCompile("(.*?):.*")
rmsg.Username = re.ReplaceAllString(rmsg.Username, `$1`)
}
// Delete event
if ev.Type == event.EventRedaction {
rmsg.Event = config.EventMsgDelete
rmsg.ID = string(ev.Redacts)
rmsg.Text = config.EventMsgDelete
b.Remote <- rmsg
return
}
b.handleMessage(rmsg, ev)
}
func (b *Bmatrix) handleMemberChange(ev *event.Event) {
member := ev.Content.AsMember()
if member == nil {
b.Log.Errorf("Couldn't process a member event:\n%#v", ev)
return
}
// Update the displayname on join messages, according to https://spec.matrix.org/v1.3/client-server-api/#events-on-change-of-profile-information
if member.Membership == event.MembershipJoin {
b.cacheDisplayName(ev.RoomID, ev.Sender, member.Displayname)
} else if member.Membership == event.MembershipLeave || member.Membership == event.MembershipBan {
b.removeDisplayNameFromCache(ev.Sender)
}
}
func (b *Bmatrix) handleMessage(rmsg config.Message, ev *event.Event) {
msg := ev.Content.AsMessage()
if msg == nil {
b.Log.Errorf("matterbridge don't support this event type: %s", ev.Type.Type)
b.Log.Debugf("Full event: %#v", ev)
return
}
rmsg.Text = msg.Body
// TODO: cache the avatars
avatarURL := b.getAvatarURL(ev.Sender)
contentURI, err := id.ParseContentURI(avatarURL)
if err == nil {
avatarURL = b.mc.GetDownloadURL(contentURI)
rmsg.Avatar = avatarURL
}
// Do we have a /me action
//nolint: exhaustive
switch msg.MsgType {
case event.MsgEmote:
rmsg.Event = config.EventUserAction
case event.MsgImage, event.MsgVideo, event.MsgFile:
// Do we have attachments? (we only allow images, videos or files msgtypes)
err := b.handleDownloadFile(&rmsg, *msg)
if err != nil {
b.Log.Errorf("download failed: %#v", err)
}
default:
if msg.RelatesTo == nil {
break
}
if msg.RelatesTo.Type == event.RelReplace && msg.NewContent != nil {
// Is it an edit?
rmsg.ID = string(msg.RelatesTo.EventID)
rmsg.Text = msg.NewContent.Body
} else if msg.RelatesTo.Type == event.RelReference && msg.RelatesTo.InReplyTo != nil {
// Is it a reply?
body := msg.Body
if !b.GetBool("keepquotedreply") {
for strings.HasPrefix(body, "> ") {
lineIdx := strings.Index(body, "\n\n")
if lineIdx == -1 {
break
}
body = body[(lineIdx + 2):]
}
}
rmsg.ParentID = string(msg.RelatesTo.EventID)
rmsg.Text = body
}
}
b.Log.Debugf("<= Sending message from %s on %s to gateway", ev.Sender, b.Account)
b.Remote <- rmsg
}
// handleDownloadFile handles file download
func (b *Bmatrix) handleDownloadFile(rmsg *config.Message, msg event.MessageEventContent) error {
rmsg.Extra = make(map[string][]interface{})
if msg.URL == "" || msg.Info == nil {
b.Log.Error("couldn't download a file with no URL or no file informations (invalid event ?)")
b.Log.Debugf("Full Message content:\n%#v", msg)
}
url := strings.ReplaceAll(string(msg.URL), "mxc://", b.GetString("Server")+"/_matrix/media/v1/download/")
filename := msg.Body
// check if we have an image uploaded without extension
if !strings.Contains(filename, ".") {
mext, _ := mime.ExtensionsByType(msg.Info.MimeType)
if len(mext) > 0 {
filename += mext[0]
} else if msg.MsgType == event.MsgImage {
// just a default .png extension if we don't have mime info
filename += ".png"
}
}
// check if the size is ok
err := helper.HandleDownloadSize(b.Log, rmsg, filename, int64(msg.Info.Size), b.General)
if err != nil {
return err
}
// actually download the file
data, err := helper.DownloadFile(url)
if err != nil {
return fmt.Errorf("download %s failed %#v", url, err)
}
// add the downloaded data to the message
helper.HandleDownloadData(b.Log, rmsg, filename, "", url, data, b.General)
return nil
}
// handleUploadFiles handles native upload of files.
func (b *Bmatrix) handleUploadFiles(msg *config.Message, channel id.RoomID) (string, error) {
for _, f := range msg.Extra["file"] {
if fi, ok := f.(config.FileInfo); ok {
b.handleUploadFile(msg, channel, &fi)
}
}
return "", nil
}
// handleUploadFile handles native upload of a file.
//nolint: funlen
func (b *Bmatrix) handleUploadFile(msg *config.Message, channel id.RoomID, fi *config.FileInfo) {
content := bytes.NewReader(*fi.Data)
sp := strings.Split(fi.Name, ".")
mtype := mime.TypeByExtension("." + sp[len(sp)-1])
// image and video uploads send no username, we have to do this ourself here #715
//nolint:exhaustruct
m := event.MessageEventContent{
MsgType: event.MsgText,
Body: fi.Comment,
FormattedBody: fi.Comment,
}
_, err := b.sendMessageEventWithRetries(channel, m, msg.Username)
if err != nil {
b.Log.Errorf("file comment failed: %#v", err)
}
b.Log.Debugf("uploading file: %s %s", fi.Name, mtype)
var res *matrix.RespMediaUpload
//nolint:exhaustruct
req := matrix.ReqUploadMedia{
Content: content,
ContentType: mtype,
ContentLength: fi.Size,
}
err = b.retry(func() error {
res, err = b.mc.UploadMedia(req)
return err
})
if err != nil {
b.Log.Errorf("file upload failed: %#v", err)
return
}
b.Log.Debugf("result: %#v", res)
//nolint:exhaustruct
m = event.MessageEventContent{
Body: fi.Name,
URL: res.ContentURI.CUString(),
}
switch {
case strings.Contains(mtype, "video"):
b.Log.Debugf("sendVideo %s", res.ContentURI)
m.MsgType = event.MsgVideo
case strings.Contains(mtype, "image"):
b.Log.Debugf("sendImage %s", res.ContentURI)
m.MsgType = event.MsgImage
case strings.Contains(mtype, "audio"):
b.Log.Debugf("sendAudio %s", res.ContentURI)
m.MsgType = event.MsgAudio
//nolint:exhaustruct
m.Info = &event.FileInfo{
MimeType: mtype,
Size: len(*fi.Data),
}
default:
b.Log.Debugf("sendFile %s", res.ContentURI)
m.MsgType = event.MsgFile
//nolint:exhaustruct
m.Info = &event.FileInfo{
MimeType: mtype,
Size: len(*fi.Data),
}
}
_, err = b.sendMessageEventWithRetries(channel, m, msg.Username)
if err != nil {
b.Log.Errorf("sending the message referencing the uploaded file failed: %#v", err)
}
}

View File

@@ -1,16 +1,21 @@
package bmatrix
import (
"encoding/json"
"errors"
"fmt"
"html"
"strings"
"sort"
"sync"
"time"
matrix "github.com/matterbridge/gomatrix"
matrix "maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
)
// arbitrary limit to determine when to cleanup nickname cache entries
const MaxNumberOfUsersInCache = 50_000
func newMatrixUsername(username string) *matrixUsername {
mUsername := new(matrixUsername)
@@ -28,11 +33,11 @@ func newMatrixUsername(username string) *matrixUsername {
}
// getRoomID retrieves a matching room ID from the channel name.
func (b *Bmatrix) getRoomID(channel string) string {
func (b *Bmatrix) getRoomID(channelName string) id.RoomID {
b.RLock()
defer b.RUnlock()
for ID, name := range b.RoomMap {
if name == channel {
for ID, channel := range b.RoomMap {
if channelName == channel.name {
return ID
}
}
@@ -40,31 +45,59 @@ func (b *Bmatrix) getRoomID(channel string) string {
return ""
}
// interface2Struct marshals and immediately unmarshals an interface.
// Useful for converting map[string]interface{} to a struct.
func interface2Struct(in interface{}, out interface{}) error {
jsonObj, err := json.Marshal(in)
if err != nil {
return err //nolint:wrapcheck
}
return json.Unmarshal(jsonObj, out)
type NicknameCacheEntry struct {
displayName string
lastUpdated time.Time
conflictWithOtherUsername bool
}
// getDisplayName retrieves the displayName for mxid, querying the homeserver if the mxid is not in the cache.
func (b *Bmatrix) getDisplayName(mxid string) string {
if b.GetBool("UseUserName") {
return mxid[1:]
type NicknameUserEntry struct {
globalEntry *NicknameCacheEntry
perChannel map[id.RoomID]NicknameCacheEntry
}
type NicknameCache struct {
users map[id.UserID]NicknameUserEntry
sync.RWMutex
}
func NewNicknameCache() *NicknameCache {
return &NicknameCache{
users: make(map[id.UserID]NicknameUserEntry),
RWMutex: sync.RWMutex{},
}
}
// note: cache is not locked here
func (c *NicknameCache) retrieveDisplaynameFromCache(channelID id.RoomID, mxid id.UserID) string {
var cachedEntry *NicknameCacheEntry = nil
c.RLock()
if user, userPresent := c.users[mxid]; userPresent {
// try first the name of the user in the room, then globally
if roomCachedEntry, roomPresent := user.perChannel[channelID]; roomPresent {
cachedEntry = &roomCachedEntry
} else if user.globalEntry != nil {
cachedEntry = user.globalEntry
}
}
c.RUnlock()
if cachedEntry == nil {
return ""
}
b.RLock()
if val, present := b.NicknameMap[mxid]; present {
b.RUnlock()
return val.displayName
if cachedEntry.conflictWithOtherUsername {
// TODO: the current behavior is that only users with clashing usernames and *that have
// spoken since the bridge started* will get their mxids shown, and this doesn't
// feel right
return fmt.Sprintf("%s (%s)", cachedEntry.displayName, mxid)
}
b.RUnlock()
return cachedEntry.displayName
}
func (b *Bmatrix) retrieveGlobalDisplayname(mxid id.UserID) string {
displayName, err := b.mc.GetDisplayName(mxid)
var httpError *matrix.HTTPError
if errors.As(err, &httpError) {
@@ -72,127 +105,198 @@ func (b *Bmatrix) getDisplayName(mxid string) string {
}
if err != nil {
return b.cacheDisplayName(mxid, mxid[1:])
return string(mxid)[1:]
}
return b.cacheDisplayName(mxid, displayName.DisplayName)
return displayName.DisplayName
}
// cacheDisplayName stores the mapping between a mxid and a display name, to be reused later without performing a query to the homserver.
// Note that old entries are cleaned when this function is called.
func (b *Bmatrix) cacheDisplayName(mxid string, displayName string) string {
now := time.Now()
// getDisplayName retrieves the displayName for mxid, querying the homeserver if the mxid is not in the cache.
func (b *Bmatrix) getDisplayName(channelID id.RoomID, mxid id.UserID) string {
if b.GetBool("UseUserName") {
return string(mxid)[1:]
}
// scan to delete old entries, to stop memory usage from becoming too high with old entries.
// In addition, we also detect if another user have the same username, and if so, we append their mxids to their usernames to differentiate them.
toDelete := []string{}
displayname := b.NicknameCache.retrieveDisplaynameFromCache(channelID, mxid)
if displayname != "" {
return displayname
}
// retrieve the global display name
return b.cacheDisplayName("", mxid, b.retrieveGlobalDisplayname(mxid))
}
// scan to delete old entries, to stop memory usage from becoming high with obsolete entries.
// note: assume the cache is already write-locked
// TODO: should we update the timestamp when the entry is used?
func (c *NicknameCache) clearObsoleteEntries(mxid id.UserID) {
// we have a "off-by-one" to account for when the user being added to the
// cache already have obsolete cache entries, as we want to keep it because
// we will be refreshing it in a minute
if len(c.users) <= MaxNumberOfUsersInCache+1 {
return
}
usersLastTimestamp := make(map[id.UserID]int64, len(c.users))
// compute the last updated timestamp entry for each user
for mxidIter, NicknameCacheIter := range c.users {
userLastTimestamp := time.Unix(0, 0)
for _, userInChannelCacheEntry := range NicknameCacheIter.perChannel {
if userInChannelCacheEntry.lastUpdated.After(userLastTimestamp) {
userLastTimestamp = userInChannelCacheEntry.lastUpdated
}
}
if NicknameCacheIter.globalEntry != nil {
if NicknameCacheIter.globalEntry.lastUpdated.After(userLastTimestamp) {
userLastTimestamp = NicknameCacheIter.globalEntry.lastUpdated
}
}
usersLastTimestamp[mxidIter] = userLastTimestamp.UnixNano()
}
// get the limit timestamp before which we must clear entries as obsolete
sortedTimestamps := make([]int64, 0, len(usersLastTimestamp))
for _, value := range usersLastTimestamp {
sortedTimestamps = append(sortedTimestamps, value)
}
sort.Slice(sortedTimestamps, func(i, j int) bool { return sortedTimestamps[i] < sortedTimestamps[j] })
limitTimestamp := sortedTimestamps[len(sortedTimestamps)-MaxNumberOfUsersInCache]
// delete entries older than the limit
for mxidIter, timestamp := range usersLastTimestamp {
// do not clear the user that we are adding to the cache
if timestamp <= limitTimestamp && mxidIter != mxid {
delete(c.users, mxidIter)
}
}
}
// to prevent username reuse across matrix rooms - or even inside the same room, if a user uses multiple servers -
// identify users with naming conflicts
func (c *NicknameCache) detectConflict(mxid id.UserID, displayName string) bool {
conflict := false
b.Lock()
for mxid, v := range b.NicknameMap {
// to prevent username reuse across matrix servers - or even on the same server, append
// the mxid to the username when there is a conflict
if v.displayName == displayName {
for mxidIter, NicknameCacheIter := range c.users {
// skip conflict detection against ourselves, obviously
if mxidIter == mxid {
continue
}
for channelID, userInChannelCacheEntry := range NicknameCacheIter.perChannel {
if userInChannelCacheEntry.displayName == displayName {
userInChannelCacheEntry.conflictWithOtherUsername = true
c.users[mxidIter].perChannel[channelID] = userInChannelCacheEntry
conflict = true
}
}
if NicknameCacheIter.globalEntry != nil && NicknameCacheIter.globalEntry.displayName == displayName {
c.users[mxidIter].globalEntry.conflictWithOtherUsername = true
conflict = true
// TODO: it would be nice to be able to rename previous messages from this user.
// The current behavior is that only users with clashing usernames and *that have spoken since the bridge last started* will get their mxids shown, and I don't know if that's the expected behavior.
v.displayName = fmt.Sprintf("%s (%s)", displayName, mxid)
b.NicknameMap[mxid] = v
}
if now.Sub(v.lastUpdated) > 10*time.Minute {
toDelete = append(toDelete, mxid)
}
}
if conflict {
displayName = fmt.Sprintf("%s (%s)", displayName, mxid)
return conflict
}
// cacheDisplayName stores the mapping between a mxid and a display name, to be reused
// later without performing a query to the homeserver.
// Note that old entries are cleaned when this function is called.
func (b *Bmatrix) cacheDisplayName(channelID id.RoomID, mxid id.UserID, displayName string) string {
now := time.Now()
cache := b.NicknameCache
cache.Lock()
defer cache.Unlock()
conflict := cache.detectConflict(mxid, displayName)
cache.clearObsoleteEntries(mxid)
var newEntry NicknameUserEntry
if user, userPresent := cache.users[mxid]; userPresent {
newEntry = user
} else {
newEntry = NicknameUserEntry{
globalEntry: nil,
perChannel: make(map[id.RoomID]NicknameCacheEntry),
}
}
for _, v := range toDelete {
delete(b.NicknameMap, v)
cacheEntry := NicknameCacheEntry{
displayName: displayName,
lastUpdated: now,
conflictWithOtherUsername: conflict,
}
b.NicknameMap[mxid] = NicknameCacheEntry{
displayName: displayName,
lastUpdated: now,
// this is a local (room-specific) display name, let's cache it as such
if channelID == "" {
newEntry.globalEntry = &cacheEntry
} else {
globalDisplayName := b.retrieveGlobalDisplayname(mxid)
// updating the global display name or resetting the room name to the global name
if globalDisplayName == displayName {
delete(newEntry.perChannel, channelID)
newEntry.globalEntry = &cacheEntry
} else {
newEntry.perChannel[channelID] = cacheEntry
}
}
b.Unlock()
cache.users[mxid] = newEntry
return displayName
}
// handleError converts errors into httpError.
//nolint:exhaustivestruct
func handleError(err error) *httpError {
var mErr matrix.HTTPError
if !errors.As(err, &mErr) {
return &httpError{
Err: "not a HTTPError",
}
}
func (b *Bmatrix) removeDisplayNameFromCache(mxid id.UserID) {
cache := b.NicknameCache
var httpErr httpError
cache.Lock()
defer cache.Unlock()
if err := json.Unmarshal(mErr.Contents, &httpErr); err != nil {
return &httpError{
Err: "unmarshal failed",
}
}
return &httpErr
}
func (b *Bmatrix) containsAttachment(content map[string]interface{}) bool {
// Skip empty messages
if content["msgtype"] == nil {
return false
}
// Only allow image,video or file msgtypes
if !(content["msgtype"].(string) == "m.image" ||
content["msgtype"].(string) == "m.video" ||
content["msgtype"].(string) == "m.file") {
return false
}
return true
delete(cache.users, mxid)
}
// getAvatarURL returns the avatar URL of the specified sender.
func (b *Bmatrix) getAvatarURL(sender string) string {
urlPath := b.mc.BuildURL("profile", sender, "avatar_url")
s := struct {
AvatarURL string `json:"avatar_url"`
}{}
err := b.mc.MakeRequest("GET", urlPath, nil, &s)
func (b *Bmatrix) getAvatarURL(sender id.UserID) string {
url, err := b.mc.GetAvatarURL(sender)
if err != nil {
b.Log.Errorf("getAvatarURL failed: %s", err)
b.Log.Errorf("Couldn't retrieve the URL of the avatar for MXID %s", sender)
return ""
}
url := strings.ReplaceAll(s.AvatarURL, "mxc://", b.GetString("Server")+"/_matrix/media/r0/thumbnail/")
if url != "" {
url += "?width=37&height=37&method=crop"
}
return url
return url.String()
}
// handleRatelimit handles the ratelimit errors and return if we're ratelimited and the amount of time to sleep
func (b *Bmatrix) handleRatelimit(err error) (time.Duration, bool) {
httpErr := handleError(err)
if httpErr.Errcode != "M_LIMIT_EXCEEDED" {
var mErr matrix.HTTPError
if !errors.As(err, &mErr) {
b.Log.Errorf("Received a non-HTTPError, don't know what to make of it:\n%#v", err)
return 0, false
}
b.Log.Debugf("ratelimited: %s", httpErr.Err)
b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", httpErr.RetryAfterMs/1000)
if mErr.RespError.ErrCode != "M_LIMIT_EXCEEDED" {
return 0, false
}
return time.Duration(httpErr.RetryAfterMs) * time.Millisecond, true
b.Log.Debugf("ratelimited: %s", mErr.RespError.Err)
// fallback to a one-second delay
retryDelayMs := 1000
if retryDelayString, present := mErr.RespError.ExtraData["retry_after_ms"]; present {
if retryDelayInt, correct := retryDelayString.(int); correct && retryDelayInt > retryDelayMs {
retryDelayMs = retryDelayInt
}
}
b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", retryDelayMs/1000)
return time.Duration(retryDelayMs) * time.Millisecond, true
}
// retry function will check if we're ratelimited and retries again when backoff time expired
@@ -213,3 +317,66 @@ func (b *Bmatrix) retry(f func() error) error {
}
}
}
type SendMessageEventWrapper struct {
inner *matrix.Client
}
//nolint: wrapcheck
func (w SendMessageEventWrapper) SendMessageEvent(roomID id.RoomID, eventType event.Type, contentJSON interface{}) (*matrix.RespSendEvent, error) {
return w.inner.SendMessageEvent(roomID, eventType, contentJSON)
}
//nolint: wrapcheck
func (b *Bmatrix) sendMessageEventWithRetries(channel id.RoomID, message event.MessageEventContent, username string) (string, error) {
var (
resp *matrix.RespSendEvent
client interface {
SendMessageEvent(roomID id.RoomID, eventType event.Type, contentJSON interface{}) (resp *matrix.RespSendEvent, err error)
}
err error
)
b.RLock()
appservice := b.RoomMap[channel].appService
b.RUnlock()
client = SendMessageEventWrapper{inner: b.mc}
// only try to send messages through the app Service *once* we have received
// events through it (otherwise we don't really know if the appservice works)
// Additionally, even if we're receiving messages in that room via the appService listener,
// let's check that the appservice "covers" that room
if appservice && b.appService.namespaces.containsRoom(channel) && len(b.appService.namespaces.prefixes) > 0 {
b.Log.Debugf("Sending with appService")
// we take the first prefix
bridgeUserID := fmt.Sprintf("@%s%s:%s", b.appService.namespaces.prefixes[0], id.EncodeUserLocalpart(username), b.appService.appService.HomeserverDomain)
intent := b.appService.appService.Intent(id.UserID(bridgeUserID))
// if we can't change the display name it's not great but not the end of the world either, ignore it
// TODO: do not perform this action on every message, with an in-memory cache or something
_ = intent.SetDisplayName(username)
client = intent
} else {
applyUsernametoMessage(&message, username)
}
err = b.retry(func() error {
resp, err = client.SendMessageEvent(channel, event.EventMessage, message)
return err
})
if err != nil {
return "", err
}
return string(resp.EventID), err
}
func applyUsernametoMessage(newMsg *event.MessageEventContent, username string) {
matrixUsername := newMatrixUsername(username)
newMsg.Body = matrixUsername.plain + newMsg.Body
if newMsg.FormattedBody != "" {
newMsg.FormattedBody = matrixUsername.formatted + newMsg.FormattedBody
}
}

View File

@@ -1,18 +1,18 @@
// nolint: exhaustivestruct
package bmatrix
import (
"bytes"
"fmt"
"mime"
"regexp"
"strings"
"sync"
"time"
matrix "maunium.net/go/mautrix"
"maunium.net/go/mautrix/event"
"maunium.net/go/mautrix/id"
"github.com/42wim/matterbridge/bridge"
"github.com/42wim/matterbridge/bridge/config"
"github.com/42wim/matterbridge/bridge/helper"
matrix "github.com/matterbridge/gomatrix"
)
var (
@@ -20,25 +20,30 @@ var (
htmlReplacementTag = regexp.MustCompile("<[^>]*>")
)
type NicknameCacheEntry struct {
displayName string
lastUpdated time.Time
type EventOrigin int
const (
originClassicSyncer EventOrigin = iota
originAppService
)
type RoomInfo struct {
name string
appService bool
}
type Bmatrix struct {
mc *matrix.Client
UserID string
NicknameMap map[string]NicknameCacheEntry
RoomMap map[string]string
rateMutex sync.RWMutex
mc *matrix.Client
UserID id.UserID
appService *AppServiceWrapper
NicknameCache *NicknameCache
RoomMap map[id.RoomID]RoomInfo
rateMutex sync.RWMutex
joinedRooms []id.RoomID
sync.RWMutex
*bridge.Config
}
type httpError struct {
Errcode string `json:"errcode"`
Err string `json:"error"`
RetryAfterMs int `json:"retry_after_ms"`
stopNormalSync chan struct{}
stopNormalSyncAck chan struct{}
}
type matrixUsername struct {
@@ -46,44 +51,12 @@ type matrixUsername struct {
formatted string
}
// SubTextMessage represents the new content of the message in edit messages.
type SubTextMessage struct {
MsgType string `json:"msgtype"`
Body string `json:"body"`
FormattedBody string `json:"formatted_body,omitempty"`
Format string `json:"format,omitempty"`
}
// MessageRelation explains how the current message relates to a previous message.
// Notably used for message edits.
type MessageRelation struct {
EventID string `json:"event_id"`
Type string `json:"rel_type"`
}
type EditedMessage struct {
NewContent SubTextMessage `json:"m.new_content"`
RelatedTo MessageRelation `json:"m.relates_to"`
matrix.TextMessage
}
type InReplyToRelationContent struct {
EventID string `json:"event_id"`
}
type InReplyToRelation struct {
InReplyTo InReplyToRelationContent `json:"m.in_reply_to"`
}
type ReplyMessage struct {
RelatedTo InReplyToRelation `json:"m.relates_to"`
matrix.TextMessage
}
func New(cfg *bridge.Config) bridge.Bridger {
b := &Bmatrix{Config: cfg}
b.RoomMap = make(map[string]string)
b.NicknameMap = make(map[string]NicknameCacheEntry)
b.RoomMap = make(map[id.RoomID]RoomInfo)
b.NicknameCache = NewNicknameCache()
b.stopNormalSync = make(chan struct{}, 1)
b.stopNormalSyncAck = make(chan struct{}, 1)
return b
}
@@ -91,13 +64,13 @@ func (b *Bmatrix) Connect() error {
var err error
b.Log.Infof("Connecting %s", b.GetString("Server"))
if b.GetString("MxID") != "" && b.GetString("Token") != "" {
b.UserID = id.UserID(b.GetString("MxID"))
b.mc, err = matrix.NewClient(
b.GetString("Server"), b.GetString("MxID"), b.GetString("Token"),
b.GetString("Server"), b.UserID, b.GetString("Token"),
)
if err != nil {
return err
}
b.UserID = b.GetString("MxID")
b.Log.Info("Using existing Matrix credentials")
} else {
b.mc, err = matrix.NewClient(b.GetString("Server"), "", "")
@@ -105,102 +78,150 @@ func (b *Bmatrix) Connect() error {
return err
}
resp, err := b.mc.Login(&matrix.ReqLogin{
Type: "m.login.password",
User: b.GetString("Login"),
Password: b.GetString("Password"),
Identifier: matrix.NewUserIdentifier(b.GetString("Login")),
Type: matrix.AuthTypePassword,
Password: b.GetString("Password"),
Identifier: matrix.UserIdentifier{Type: matrix.IdentifierTypeUser, User: b.GetString("Login")}, //nolint: exhaustruct
StoreCredentials: true,
})
if err != nil {
return err
}
b.mc.SetCredentials(resp.UserID, resp.AccessToken)
b.UserID = resp.UserID
b.Log.Info("Connection succeeded")
}
go b.handlematrix()
b.Log.Debug("Retrieving the list of rooms we have already joined")
joinedRooms, err := b.mc.JoinedRooms()
if err != nil {
b.Log.Errorf("couldn't list the joined rooms")
return err
}
b.joinedRooms = joinedRooms.JoinedRooms
for _, roomID := range joinedRooms.JoinedRooms {
// leave the channel name (usually a channel alias - in the matrix sense)
// unresolved for now, it will be completed when JoinChannel() is called
b.RoomMap[roomID] = RoomInfo{name: "", appService: false}
}
return nil
}
func (b *Bmatrix) Disconnect() error {
// tell the Sync() loop to exit
b.stopNormalSync <- struct{}{}
b.mc.StopSync()
// wait for both the syncer and the appservice to terminate
<-b.stopNormalSyncAck
if b.appService != nil {
b.appService.stop <- struct{}{}
<-b.appService.stopAck
}
return nil
}
func (b *Bmatrix) JoinChannel(channel config.ChannelInfo) error {
return b.retry(func() error {
resp, err := b.mc.JoinRoom(channel.Name, "", nil)
resolvedAlias, err := b.mc.ResolveAlias(id.RoomAlias(channel.Name))
if err != nil {
b.Log.Errorf("couldn't retrieve the room ID for the alias '%s'", channel.Name)
return err
}
roomInfo := RoomInfo{name: channel.Name, appService: false}
alreadyJoined := false
for _, roomID := range b.joinedRooms {
// we have already joined this room (e.g. in a previous execution of matterbridge)
// => we only update the room alias, but do not attempt to join it again
if roomID == resolvedAlias.RoomID {
alreadyJoined = true
break
}
}
if !alreadyJoined {
err = b.retry(func() error {
_, innerErr := b.mc.JoinRoom(channel.Name, "", nil)
return innerErr
})
if err != nil {
return err
}
}
b.Lock()
b.RoomMap[resp.RoomID] = channel.Name
b.Unlock()
b.Lock()
b.RoomMap[resolvedAlias.RoomID] = roomInfo
b.Unlock()
return nil
})
return nil
}
func (b *Bmatrix) Start() error {
// at this point, JoinChannel() has been called on all the channels
// declared in the configuration, so we can exit every other joined room
// in order to stop receiving events from rooms we no longer follow
b.RLock()
for _, roomID := range b.joinedRooms {
if _, present := b.RoomMap[roomID]; !present {
// we deliberately ignore the return value,
// because the bridge will still work even if we couln't exit the room
_, _ = b.mc.LeaveRoom(roomID, &matrix.ReqLeave{Reason: "No longer bridged"})
}
}
b.RUnlock()
go b.handlematrix()
if b.GetBool("UseAppService") {
appService, err := b.NewAppService()
if err != nil {
b.Log.Errorf("couldn't load the app service configuration: %#v", err)
return err
}
b.appService = appService
err = b.startAppService()
if err != nil {
b.Log.Errorf("couldn't start the application service: %#v", err)
return err
}
}
return nil
}
func (b *Bmatrix) Send(msg config.Message) (string, error) {
b.Log.Debugf("=> Receiving %#v", msg)
b.Log.Debugf("=> Sending %#v", msg)
channel := b.getRoomID(msg.Channel)
b.Log.Debugf("Channel %s maps to channel id %s", msg.Channel, channel)
if channel == "" {
return "", fmt.Errorf("got message for unknown channel '%s'", msg.Channel)
}
username := newMatrixUsername(msg.Username)
body := username.plain + msg.Text
formattedBody := username.formatted + helper.ParseMarkdown(msg.Text)
if b.GetBool("SpoofUsername") {
// https://spec.matrix.org/v1.3/client-server-api/#mroommember
type stateMember struct {
AvatarURL string `json:"avatar_url,omitempty"`
DisplayName string `json:"displayname"`
Membership string `json:"membership"`
}
// TODO: reset username afterwards with DisplayName: null ?
m := stateMember{
AvatarURL: "",
DisplayName: username.plain,
Membership: "join",
}
_, err := b.mc.SendStateEvent(channel, "m.room.member", b.UserID, m)
if err == nil {
body = msg.Text
formattedBody = helper.ParseMarkdown(msg.Text)
}
if msg.Event == config.EventUserTyping && b.GetBool("ShowUserTyping") {
_, err := b.mc.UserTyping(channel, true, 15000)
return "", err
}
// Make a action /me of the message
if msg.Event == config.EventUserAction {
m := matrix.TextMessage{
MsgType: "m.emote",
Body: body,
FormattedBody: formattedBody,
Format: "org.matrix.custom.html",
//nolint:exhaustruct
m := event.MessageEventContent{
MsgType: event.MsgEmote,
Body: msg.Text,
}
if b.GetBool("HTMLDisable") {
m.Format = ""
m.FormattedBody = ""
if !b.GetBool("HTMLDisable") {
m.FormattedBody = helper.ParseMarkdown(msg.Text)
m.Format = event.FormatHTML
}
msgID := ""
err := b.retry(func() error {
resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)
if err != nil {
return err
}
msgID = resp.EventID
return err
})
return msgID, err
return b.sendMessageEventWithRetries(channel, m, msg.Username)
}
// Delete message
@@ -212,12 +233,10 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
msgID := ""
err := b.retry(func() error {
resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{})
if err != nil {
return err
}
//nolint:exhaustruct
resp, err := b.mc.RedactEvent(channel, id.EventID(msg.ID), matrix.ReqRedact{})
msgID = resp.EventID
msgID = string(resp.EventID)
return err
})
@@ -228,13 +247,13 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
// Upload a file if it exists
if msg.Extra != nil {
for _, rmsg := range helper.HandleExtra(&msg, b.General) {
rmsg := rmsg
//nolint:exhaustruct
m := event.MessageEventContent{
MsgType: event.MsgText,
Body: rmsg.Text,
}
err := b.retry(func() error {
_, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text)
return err
})
_, err := b.sendMessageEventWithRetries(channel, m, msg.Username)
if err != nil {
b.Log.Errorf("sendText failed: %s", err)
}
@@ -247,472 +266,159 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
// Edit message if we have an ID
if msg.ID != "" {
rmsg := EditedMessage{
TextMessage: matrix.TextMessage{
Body: body,
MsgType: "m.text",
Format: "org.matrix.custom.html",
FormattedBody: formattedBody,
},
//nolint:exhaustruct
rmsg := event.MessageEventContent{
MsgType: event.MsgText,
Body: msg.Text,
}
rmsg.NewContent = SubTextMessage{
Body: rmsg.TextMessage.Body,
FormattedBody: rmsg.TextMessage.FormattedBody,
Format: rmsg.TextMessage.Format,
MsgType: "m.text",
//nolint:exhaustruct
rmsg.NewContent = &event.MessageEventContent{
Body: rmsg.Body,
MsgType: event.MsgText,
}
if b.GetBool("HTMLDisable") {
rmsg.TextMessage.Format = ""
rmsg.TextMessage.FormattedBody = ""
rmsg.NewContent.Format = ""
rmsg.NewContent.FormattedBody = ""
rmsg.FormattedBody = "* " + msg.Text
} else {
rmsg.Format = event.FormatHTML
rmsg.FormattedBody = "* " + helper.ParseMarkdown(msg.Text)
rmsg.NewContent.Format = rmsg.Format
rmsg.NewContent.FormattedBody = rmsg.FormattedBody
}
rmsg.RelatedTo = MessageRelation{
EventID: msg.ID,
Type: "m.replace",
//nolint:exhaustruct
rmsg.RelatesTo = &event.RelatesTo{
EventID: id.EventID(msg.ID),
Type: event.RelReplace,
}
err := b.retry(func() error {
_, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg)
return b.sendMessageEventWithRetries(channel, rmsg, msg.Username)
}
return err
})
if err != nil {
return "", err
}
//nolint:exhaustruct
m := event.MessageEventContent{
Body: msg.Text,
}
return msg.ID, nil
if !b.GetBool("HTMLDisable") {
m.Format = event.FormatHTML
m.FormattedBody = msg.Text
}
// Use notices to send join/leave events
if msg.Event == config.EventJoinLeave {
m := matrix.TextMessage{
MsgType: "m.notice",
Body: body,
FormattedBody: formattedBody,
Format: "org.matrix.custom.html",
}
m.MsgType = event.MsgNotice
} else {
m.MsgType = event.MsgText
if b.GetBool("HTMLDisable") {
m.Format = ""
m.FormattedBody = ""
} else {
m.FormattedBody = helper.ParseMarkdown(msg.Text)
}
var (
resp *matrix.RespSendEvent
err error
)
err = b.retry(func() error {
resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m)
return err
})
if err != nil {
return "", err
if msg.ParentValid() {
m.RelatesTo = &event.RelatesTo{
EventID: "",
Type: event.RelReference,
InReplyTo: &event.InReplyTo{
EventID: id.EventID(msg.ParentID),
},
Key: "",
}
}
return resp.EventID, err
}
if msg.ParentValid() {
m := ReplyMessage{
TextMessage: matrix.TextMessage{
MsgType: "m.text",
Body: body,
FormattedBody: formattedBody,
Format: "org.matrix.custom.html",
},
}
return b.sendMessageEventWithRetries(channel, m, msg.Username)
}
if b.GetBool("HTMLDisable") {
m.TextMessage.Format = ""
m.TextMessage.FormattedBody = ""
}
m.RelatedTo = InReplyToRelation{
InReplyTo: InReplyToRelationContent{
EventID: msg.ParentID,
},
}
var (
resp *matrix.RespSendEvent
err error
)
err = b.retry(func() error {
resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m)
return err
})
if err != nil {
return "", err
}
return resp.EventID, err
// DontProcessOldEvents returns true if a sync event should be considered for further processing.
// We use that function to filter out events we have already read.
func (b *Bmatrix) DontProcessOldEvents(resp *matrix.RespSync, since string) bool {
// we only filter old events in the initial sync(), because subsequent sync()
// (where since != "") should only return new events
if since != "" {
return true
}
if b.GetBool("HTMLDisable") {
var (
resp *matrix.RespSendEvent
err error
)
for joinedRoom, roomData := range resp.Rooms.Join {
var readTimestamp int64 = 0
// retrieve the timestamp of the last read receipt
// note: we're not sure some events will not be thrown away in this
// initial sync, as the server may not have received some events yet when
// the read receipt was sent: there is a mix of timestamps between
// the read receipt on the target homeserver and the timestamps when
// events were *created* on the homeserver peers
for _, evt := range roomData.Ephemeral.Events {
if evt.Type != event.EphemeralEventReceipt {
continue
}
err = b.retry(func() error {
resp, err = b.mc.SendText(channel, body)
err := evt.Content.ParseRaw(evt.Type)
if err != nil {
b.Log.Warnf("couldn't parse receipt event %#v", evt.Content)
}
receipts := *evt.Content.AsReceipt()
for _, receiptByType := range receipts {
for _, receiptsByUser := range receiptByType {
for userID, userReceipt := range receiptsByUser {
// ignore read receipts of other users
if userID != b.UserID {
continue
}
return err
})
if err != nil {
return "", err
readTimestamp = userReceipt.Timestamp.UnixNano()
}
}
}
}
return resp.EventID, err
newEventList := make([]*event.Event, 0, len(roomData.Timeline.Events))
for _, evt := range roomData.Timeline.Events {
// remove old event, except for state changes
if evt.Timestamp > readTimestamp || evt.Type.Class == event.StateEventType {
newEventList = append(newEventList, evt)
}
}
roomData.Timeline.Events = newEventList
resp.Rooms.Join[joinedRoom] = roomData
}
// Post normal message with HTML support (eg riot.im)
var (
resp *matrix.RespSendEvent
err error
)
err = b.retry(func() error {
resp, err = b.mc.SendFormattedText(channel, body, formattedBody)
return err
})
if err != nil {
return "", err
}
return resp.EventID, err
return true
}
func (b *Bmatrix) handlematrix() {
syncer := b.mc.Syncer.(*matrix.DefaultSyncer)
syncer.OnEventType("m.room.redaction", b.handleEvent)
syncer.OnEventType("m.room.message", b.handleEvent)
syncer.OnEventType("m.room.member", b.handleMemberChange)
syncer, ok := b.mc.Syncer.(*matrix.DefaultSyncer)
if !ok {
b.Log.Errorf("couldn't convert the Syncer object to a DefaultSyncer structure, the matrix bridge won't work")
return
}
// register our custom filtering function
syncer.OnSync(b.DontProcessOldEvents)
eventsTypes := []event.Type{event.EventRedaction, event.EventMessage, event.StateMember, event.EphemeralEventReceipt}
if b.GetBool("ShowUserTyping") {
eventsTypes = append(eventsTypes, event.EphemeralEventTyping)
}
for _, evType := range eventsTypes {
syncer.OnEventType(evType, func(source matrix.EventSource, ev *event.Event) {
b.handleEvent(originClassicSyncer, ev)
})
}
go func() {
for {
if b == nil {
select {
case <-b.stopNormalSync:
b.stopNormalSyncAck <- struct{}{}
return
}
if err := b.mc.Sync(); err != nil {
b.Log.Println("Sync() returned ", err)
default:
if err := b.mc.Sync(); err != nil {
b.Log.Warningf("Sync() returned %#v", err)
}
}
}
}()
}
func (b *Bmatrix) handleEdit(ev *matrix.Event, rmsg config.Message) bool {
relationInterface, present := ev.Content["m.relates_to"]
newContentInterface, present2 := ev.Content["m.new_content"]
if !(present && present2) {
return false
}
var relation MessageRelation
if err := interface2Struct(relationInterface, &relation); err != nil {
b.Log.Warnf("Couldn't parse 'm.relates_to' object with value %#v", relationInterface)
return false
}
var newContent SubTextMessage
if err := interface2Struct(newContentInterface, &newContent); err != nil {
b.Log.Warnf("Couldn't parse 'm.new_content' object with value %#v", newContentInterface)
return false
}
if relation.Type != "m.replace" {
return false
}
rmsg.ID = relation.EventID
rmsg.Text = newContent.Body
b.Remote <- rmsg
return true
}
func (b *Bmatrix) handleReply(ev *matrix.Event, rmsg config.Message) bool {
relationInterface, present := ev.Content["m.relates_to"]
if !present {
return false
}
var relation InReplyToRelation
if err := interface2Struct(relationInterface, &relation); err != nil {
// probably fine
return false
}
body := rmsg.Text
if !b.GetBool("keepquotedreply") {
for strings.HasPrefix(body, "> ") {
lineIdx := strings.IndexRune(body, '\n')
if lineIdx == -1 {
body = ""
} else {
body = body[(lineIdx + 1):]
}
}
}
rmsg.Text = body
rmsg.ParentID = relation.InReplyTo.EventID
b.Remote <- rmsg
return true
}
func (b *Bmatrix) handleMemberChange(ev *matrix.Event) {
// Update the displayname on join messages, according to https://matrix.org/docs/spec/client_server/r0.6.1#events-on-change-of-profile-information
if ev.Content["membership"] == "join" {
if dn, ok := ev.Content["displayname"].(string); ok {
b.cacheDisplayName(ev.Sender, dn)
}
}
}
func (b *Bmatrix) handleEvent(ev *matrix.Event) {
b.Log.Debugf("== Receiving event: %#v", ev)
if ev.Sender != b.UserID {
b.RLock()
channel, ok := b.RoomMap[ev.RoomID]
b.RUnlock()
if !ok {
b.Log.Debugf("Unknown room %s", ev.RoomID)
return
}
// Create our message
rmsg := config.Message{
Username: b.getDisplayName(ev.Sender),
Channel: channel,
Account: b.Account,
UserID: ev.Sender,
ID: ev.ID,
Avatar: b.getAvatarURL(ev.Sender),
}
// Remove homeserver suffix if configured
if b.GetBool("NoHomeServerSuffix") {
re := regexp.MustCompile("(.*?):.*")
rmsg.Username = re.ReplaceAllString(rmsg.Username, `$1`)
}
// Delete event
if ev.Type == "m.room.redaction" {
rmsg.Event = config.EventMsgDelete
rmsg.ID = ev.Redacts
rmsg.Text = config.EventMsgDelete
b.Remote <- rmsg
return
}
// Text must be a string
if rmsg.Text, ok = ev.Content["body"].(string); !ok {
b.Log.Errorf("Content[body] is not a string: %T\n%#v",
ev.Content["body"], ev.Content)
return
}
// Do we have a /me action
if ev.Content["msgtype"].(string) == "m.emote" {
rmsg.Event = config.EventUserAction
}
// Is it an edit?
if b.handleEdit(ev, rmsg) {
return
}
// Is it a reply?
if b.handleReply(ev, rmsg) {
return
}
// Do we have attachments
if b.containsAttachment(ev.Content) {
err := b.handleDownloadFile(&rmsg, ev.Content)
if err != nil {
b.Log.Errorf("download failed: %#v", err)
}
}
b.Log.Debugf("<= Sending message from %s on %s to gateway", ev.Sender, b.Account)
b.Remote <- rmsg
// not crucial, so no ratelimit check here
if err := b.mc.MarkRead(ev.RoomID, ev.ID); err != nil {
b.Log.Errorf("couldn't mark message as read %s", err.Error())
}
}
}
// handleDownloadFile handles file download
func (b *Bmatrix) handleDownloadFile(rmsg *config.Message, content map[string]interface{}) error {
var (
ok bool
url, name, msgtype, mtype string
info map[string]interface{}
size float64
)
rmsg.Extra = make(map[string][]interface{})
if url, ok = content["url"].(string); !ok {
return fmt.Errorf("url isn't a %T", url)
}
url = strings.Replace(url, "mxc://", b.GetString("Server")+"/_matrix/media/v1/download/", -1)
if info, ok = content["info"].(map[string]interface{}); !ok {
return fmt.Errorf("info isn't a %T", info)
}
if size, ok = info["size"].(float64); !ok {
return fmt.Errorf("size isn't a %T", size)
}
if name, ok = content["body"].(string); !ok {
return fmt.Errorf("name isn't a %T", name)
}
if msgtype, ok = content["msgtype"].(string); !ok {
return fmt.Errorf("msgtype isn't a %T", msgtype)
}
if mtype, ok = info["mimetype"].(string); !ok {
return fmt.Errorf("mtype isn't a %T", mtype)
}
// check if we have an image uploaded without extension
if !strings.Contains(name, ".") {
if msgtype == "m.image" {
mext, _ := mime.ExtensionsByType(mtype)
if len(mext) > 0 {
name += mext[0]
}
} else {
// just a default .png extension if we don't have mime info
name += ".png"
}
}
// check if the size is ok
err := helper.HandleDownloadSize(b.Log, rmsg, name, int64(size), b.General)
if err != nil {
return err
}
// actually download the file
data, err := helper.DownloadFile(url)
if err != nil {
return fmt.Errorf("download %s failed %#v", url, err)
}
// add the downloaded data to the message
helper.HandleDownloadData(b.Log, rmsg, name, "", url, data, b.General)
return nil
}
// handleUploadFiles handles native upload of files.
func (b *Bmatrix) handleUploadFiles(msg *config.Message, channel string) (string, error) {
for _, f := range msg.Extra["file"] {
if fi, ok := f.(config.FileInfo); ok {
b.handleUploadFile(msg, channel, &fi)
}
}
return "", nil
}
// handleUploadFile handles native upload of a file.
func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *config.FileInfo) {
username := newMatrixUsername(msg.Username)
content := bytes.NewReader(*fi.Data)
sp := strings.Split(fi.Name, ".")
mtype := mime.TypeByExtension("." + sp[len(sp)-1])
// image and video uploads send no username, we have to do this ourself here #715
err := b.retry(func() error {
_, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment)
return err
})
if err != nil {
b.Log.Errorf("file comment failed: %#v", err)
}
b.Log.Debugf("uploading file: %s %s", fi.Name, mtype)
var res *matrix.RespMediaUpload
err = b.retry(func() error {
res, err = b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data)))
return err
})
if err != nil {
b.Log.Errorf("file upload failed: %#v", err)
return
}
switch {
case strings.Contains(mtype, "video"):
b.Log.Debugf("sendVideo %s", res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI)
return err
})
if err != nil {
b.Log.Errorf("sendVideo failed: %#v", err)
}
case strings.Contains(mtype, "image"):
b.Log.Debugf("sendImage %s", res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendImage(channel, fi.Name, res.ContentURI)
return err
})
if err != nil {
b.Log.Errorf("sendImage failed: %#v", err)
}
case strings.Contains(mtype, "audio"):
b.Log.Debugf("sendAudio %s", res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{
MsgType: "m.audio",
Body: fi.Name,
URL: res.ContentURI,
Info: matrix.AudioInfo{
Mimetype: mtype,
Size: uint(len(*fi.Data)),
},
})
return err
})
if err != nil {
b.Log.Errorf("sendAudio failed: %#v", err)
}
default:
b.Log.Debugf("sendFile %s", res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{
MsgType: "m.file",
Body: fi.Name,
URL: res.ContentURI,
Info: matrix.FileInfo{
Mimetype: mtype,
Size: uint(len(*fi.Data)),
},
})
return err
})
if err != nil {
b.Log.Errorf("sendFile failed: %#v", err)
}
}
b.Log.Debugf("result: %#v", res)
}

View File

@@ -0,0 +1,43 @@
package bmatrix
import (
"errors"
"github.com/sirupsen/logrus"
"github.com/rs/zerolog"
)
var levels_zerolog2logrus = map[zerolog.Level]logrus.Level{
zerolog.DebugLevel: logrus.DebugLevel,
zerolog.InfoLevel: logrus.InfoLevel,
zerolog.WarnLevel: logrus.WarnLevel,
zerolog.FatalLevel: logrus.FatalLevel,
zerolog.PanicLevel: logrus.PanicLevel,
zerolog.ErrorLevel: logrus.ErrorLevel,
zerolog.TraceLevel: logrus.TraceLevel,
}
// an abstraction for zerolog so we can pipe its output to logrus.Entry, that is used in matterbridge
type zerologWrapper struct {
inner *logrus.Entry
}
func (w zerologWrapper) Write(p []byte) (n int, err error) {
return w.inner.Logger.Writer().Write(p)
}
func (w zerologWrapper) WriteLevel(level zerolog.Level, p []byte) (n int, err error) {
if logrus_level, present := levels_zerolog2logrus[level]; present {
return w.inner.Logger.WriterLevel(logrus_level).Write(p)
}
// drop the message if we haven't a matching level
return 0, errors.New("Unsupported logging level")
}
func NewZerologWrapper(entry *logrus.Entry) zerolog.Logger {
wrapper := zerologWrapper{inner: entry}
log := zerolog.New(wrapper)
return log
}