matrix: revert to the classical syncer when the application sevrice is no longer working
This commit is contained in:
@@ -77,7 +77,6 @@ func (b *Bmatrix) ignoreBridgingEvents(ev *event.Event) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
//nolint:funlen
|
||||
func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) {
|
||||
if ev.Type == event.EphemeralEventReceipt {
|
||||
// we do not support read receipts across servers, considering that
|
||||
@@ -99,8 +98,8 @@ func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) {
|
||||
// This needs to be defined before rejecting bridging events, as we rely on this to cache
|
||||
// avatar URLs sent with appService (otherwise we would upload one avatar per message sent
|
||||
// across the bridge!).
|
||||
// As such, beware! Moving this below the b.ignoreBridgingEvents condiiton would appear to
|
||||
// work, but it would also lead to a high file upload rate, until being eventually
|
||||
// As such, beware! Moving this below the b.ignoreBridgingEvents condition would appear to
|
||||
// work, but it would also lead to a high file upload rate, until the bridge becomes eventually
|
||||
// rate-limited by the homeserver
|
||||
if ev.Type == event.StateMember {
|
||||
b.handleMemberChange(ev)
|
||||
@@ -108,25 +107,57 @@ func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) {
|
||||
return
|
||||
}
|
||||
|
||||
// if we receive messages both via the classical matrix syncer and appserver, prefer appservice and throw away this duplicate event
|
||||
if channel.appService && origin != originAppService {
|
||||
b.Log.Debugf("Dropping event, should receive it via appservice: %s", ev.ID)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if b.ignoreBridgingEvents(ev) {
|
||||
return
|
||||
}
|
||||
|
||||
// if we receive appservice events for this room, there is no need to check them with the classical syncer
|
||||
if !channel.appService && origin == originAppService {
|
||||
// try again later in case the appservice is not working
|
||||
if origin == originClassicSyncer && channel.appService {
|
||||
go b.processMessageIfAppServiceIsNotWorking(ev)
|
||||
|
||||
return
|
||||
} else if origin == originAppService {
|
||||
// if we receive appservice events for this room, there is no need to check them with the classical syncer
|
||||
channel.appService = true
|
||||
channel.lastMessageAppService = time.Now()
|
||||
b.Lock()
|
||||
b.RoomMap[ev.RoomID] = channel
|
||||
b.Unlock()
|
||||
}
|
||||
|
||||
b.handleAcceptedEvent(ev, channel, origin == originAppService)
|
||||
}
|
||||
|
||||
func (b *Bmatrix) processMessageIfAppServiceIsNotWorking(ev *event.Event) {
|
||||
// sleep some time and check whether we received the event via the appService
|
||||
time.Sleep(15 * time.Second)
|
||||
|
||||
b.RLock()
|
||||
channel, ok := b.RoomMap[ev.RoomID]
|
||||
b.RUnlock()
|
||||
if !ok {
|
||||
b.Log.Debugf("Room %s no longer exit, aborting the retry operation", ev.RoomID)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// we receive no appservice message in the latest 30 seconds -> appService must be failing for now, disable it
|
||||
if time.Since(channel.lastMessageAppService) > 30*time.Second {
|
||||
b.Log.Warnf("The AppService no longer receive the events in room %s, reverting to the classical syncer", ev.RoomID)
|
||||
|
||||
channel.appService = false
|
||||
b.Lock()
|
||||
b.RoomMap[ev.RoomID] = channel
|
||||
b.Unlock()
|
||||
|
||||
b.handleAcceptedEvent(ev, channel, false)
|
||||
} else {
|
||||
// if we receive messages both via the classical matrix syncer and appserver, prefer appservice and throw away this duplicate event
|
||||
b.Log.Debugf("Dropping event, should receive it via appservice: %s", ev.ID)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bmatrix) handleAcceptedEvent(ev *event.Event, channel RoomInfo, appservice bool) {
|
||||
if ev.Type == event.EphemeralEventTyping {
|
||||
typing := ev.Content.AsTyping()
|
||||
if len(typing.UserIDs) > 0 {
|
||||
@@ -141,7 +172,7 @@ func (b *Bmatrix) handleEvent(origin EventOrigin, ev *event.Event) {
|
||||
return
|
||||
}
|
||||
|
||||
b.Log.Debugf("== Receiving event: %#v (appService=%t)", ev, origin == originAppService)
|
||||
b.Log.Debugf("== Receiving event: %#v (appService=%t)", ev, appservice)
|
||||
|
||||
defer (func(ev *event.Event) {
|
||||
// not crucial, so no ratelimit check here
|
||||
@@ -343,6 +374,7 @@ func (b *Bmatrix) handleUploadFiles(msg *config.Message, channel id.RoomID) (str
|
||||
}
|
||||
|
||||
// handleUploadFile handles native upload of a file.
|
||||
//
|
||||
//nolint:funlen
|
||||
func (b *Bmatrix) handleUploadFile(msg *config.Message, channel id.RoomID, fi *config.FileInfo) {
|
||||
content := bytes.NewReader(*fi.Data)
|
||||
|
||||
@@ -29,8 +29,9 @@ const (
|
||||
)
|
||||
|
||||
type RoomInfo struct {
|
||||
name string
|
||||
appService bool
|
||||
name string
|
||||
appService bool
|
||||
lastMessageAppService time.Time
|
||||
}
|
||||
|
||||
type Bmatrix struct {
|
||||
|
||||
Reference in New Issue
Block a user