From cddab32419bf67af8ac7acad1d1e006d56b611ba Mon Sep 17 00:00:00 2001 From: Simon Thoby Date: Sun, 9 Apr 2023 14:27:11 +0200 Subject: [PATCH] matrix: revert to the classical syncer when the application sevrice is no longer working --- bridge/matrix/handlers.go | 58 ++++++++++++++++++++++++++++++--------- bridge/matrix/matrix.go | 5 ++-- 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/bridge/matrix/handlers.go b/bridge/matrix/handlers.go index 78ed8470..f2bb6e47 100644 --- a/bridge/matrix/handlers.go +++ b/bridge/matrix/handlers.go @@ -77,7 +77,6 @@ func (b *Bmatrix) ignoreBridgingEvents(ev *event.Event) bool { return false } -//nolint:funlen func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) { if ev.Type == event.EphemeralEventReceipt { // we do not support read receipts across servers, considering that @@ -99,8 +98,8 @@ func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) { // This needs to be defined before rejecting bridging events, as we rely on this to cache // avatar URLs sent with appService (otherwise we would upload one avatar per message sent // across the bridge!). - // As such, beware! Moving this below the b.ignoreBridgingEvents condiiton would appear to - // work, but it would also lead to a high file upload rate, until being eventually + // As such, beware! Moving this below the b.ignoreBridgingEvents condition would appear to + // work, but it would also lead to a high file upload rate, until the bridge becomes eventually // rate-limited by the homeserver if ev.Type == event.StateMember { b.handleMemberChange(ev) @@ -108,25 +107,57 @@ func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) { return } - // if we receive messages both via the classical matrix syncer and appserver, prefer appservice and throw away this duplicate event - if channel.appService && origin != originAppService { - b.Log.Debugf("Dropping event, should receive it via appservice: %s", ev.ID) - - return - } - if b.ignoreBridgingEvents(ev) { return } - // if we receive appservice events for this room, there is no need to check them with the classical syncer - if !channel.appService && origin == originAppService { + // try again later in case the appservice is not working + if origin == originClassicSyncer && channel.appService { + go b.processMessageIfAppServiceIsNotWorking(ev) + + return + } else if origin == originAppService { + // if we receive appservice events for this room, there is no need to check them with the classical syncer channel.appService = true + channel.lastMessageAppService = time.Now() b.Lock() b.RoomMap[ev.RoomID] = channel b.Unlock() } + b.handleAcceptedEvent(ev, channel, origin == originAppService) +} + +func (b *Bmatrix) processMessageIfAppServiceIsNotWorking(ev *event.Event) { + // sleep some time and check whether we received the event via the appService + time.Sleep(15 * time.Second) + + b.RLock() + channel, ok := b.RoomMap[ev.RoomID] + b.RUnlock() + if !ok { + b.Log.Debugf("Room %s no longer exit, aborting the retry operation", ev.RoomID) + + return + } + + // we receive no appservice message in the latest 30 seconds -> appService must be failing for now, disable it + if time.Since(channel.lastMessageAppService) > 30*time.Second { + b.Log.Warnf("The AppService no longer receive the events in room %s, reverting to the classical syncer", ev.RoomID) + + channel.appService = false + b.Lock() + b.RoomMap[ev.RoomID] = channel + b.Unlock() + + b.handleAcceptedEvent(ev, channel, false) + } else { + // if we receive messages both via the classical matrix syncer and appserver, prefer appservice and throw away this duplicate event + b.Log.Debugf("Dropping event, should receive it via appservice: %s", ev.ID) + } +} + +func (b *Bmatrix) handleAcceptedEvent(ev *event.Event, channel RoomInfo, appservice bool) { if ev.Type == event.EphemeralEventTyping { typing := ev.Content.AsTyping() if len(typing.UserIDs) > 0 { @@ -141,7 +172,7 @@ func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) { return } - b.Log.Debugf("== Receiving event: %#v (appService=%t)", ev, origin == originAppService) + b.Log.Debugf("== Receiving event: %#v (appService=%t)", ev, appservice) defer (func(ev *event.Event) { // not crucial, so no ratelimit check here @@ -343,6 +374,7 @@ func (b *Bmatrix) handleUploadFiles(msg *config.Message, channel id.RoomID) (str } // handleUploadFile handles native upload of a file. +// //nolint:funlen func (b *Bmatrix) handleUploadFile(msg *config.Message, channel id.RoomID, fi *config.FileInfo) { content := bytes.NewReader(*fi.Data) diff --git a/bridge/matrix/matrix.go b/bridge/matrix/matrix.go index 0011c45f..c5ba43e7 100644 --- a/bridge/matrix/matrix.go +++ b/bridge/matrix/matrix.go @@ -29,8 +29,9 @@ const ( ) type RoomInfo struct { - name string - appService bool + name string + appService bool + lastMessageAppService time.Time } type Bmatrix struct {