Split parent message resolution into two steps.

This commit is contained in:
Patrick Connolly
2018-12-02 02:22:45 +08:00
parent bb417f19e0
commit 45f9c16e48
2 changed files with 38 additions and 11 deletions

View File

@@ -42,6 +42,17 @@ type Message struct {
Extra map[string][]interface{} Extra map[string][]interface{}
} }
func (msg *Message) CanonicalParentMsgID() string {
if msg.Extra != nil && len(msg.Extra["canonical_parent_mid"]) == 1 {
return msg.Extra["canonical_parent_mid"][0].(string)
}
return ""
}
func (msg *Message) IsThreadable() bool {
return msg.CanonicalParentMsgID() != ""
}
type FileInfo struct { type FileInfo struct {
Name string Name string
Data *[]byte Data *[]byte

View File

@@ -56,6 +56,10 @@ func New(cfg config.Gateway, r *Router) *Gateway {
// Find the canonical ID that the message is keyed under in cache // Find the canonical ID that the message is keyed under in cache
func (gw *Gateway) FindCanonicalMsgID(protocol string, mID string) string { func (gw *Gateway) FindCanonicalMsgID(protocol string, mID string) string {
if mID == "" {
return ""
}
ID := protocol + " " + mID ID := protocol + " " + mID
if gw.Messages.Contains(ID) { if gw.Messages.Contains(ID) {
return mID return mID
@@ -250,13 +254,16 @@ func (gw *Gateway) handleMessage(msg config.Message, dest *bridge.Bridge) []*BrM
return brMsgIDs return brMsgIDs
} }
// Perform single intensive operation to resolve parent msg ID through
// cache, which allows us to rethread messages.
gw.resolveCanonicalParentMsgID(&msg)
originchannel := msg.Channel originchannel := msg.Channel
origmsg := msg origmsg := msg
channels := gw.getDestChannel(&msg, *dest) channels := gw.getDestChannel(&msg, *dest)
for _, channel := range channels { for _, channel := range channels {
if parentID, isThreaded := gw.handleThreading(&msg, dest, channel); isThreaded { // Performs rethreading.
msg.ParentID = parentID msg.ParentID = gw.getParentID(&msg, dest, channel)
}
// Only send the avatar download event to ourselves. // Only send the avatar download event to ourselves.
if msg.Event == config.EventAvatarDownload { if msg.Event == config.EventAvatarDownload {
@@ -431,22 +438,31 @@ func (gw *Gateway) modifyMessage(msg *config.Message) {
} }
} }
func (gw *Gateway) handleThreading(msg *config.Message, dest *bridge.Bridge, channel config.ChannelInfo) (string, bool) { func (gw *Gateway) resolveCanonicalParentMsgID(msg *config.Message) {
if msg.ParentID != "" {
extras := make([]interface{}, 1)
extras[0] = gw.FindCanonicalMsgID(msg.Protocol, msg.ParentID)
msg.Extra["canonical_parent_mid"] = extras
}
}
func (gw *Gateway) getParentID(msg *config.Message, dest *bridge.Bridge, channel config.ChannelInfo) string {
if msg.ParentID == "" { if msg.ParentID == "" {
// Message is not threaded. // Message is not threaded.
return "", false return ""
} }
canonicalParentMsgID := gw.FindCanonicalMsgID(msg.Protocol, msg.ParentID) if !dest.GetBool("PreserveThreading") || !msg.IsThreadable() {
if !dest.GetBool("PreserveThreading") || canonicalParentMsgID == "" {
// Mark message as unthreaded, either because disabled or uncached. // Mark message as unthreaded, either because disabled or uncached.
return "unthreaded", true return "unthreaded"
} }
if parentID := gw.getDestMsgID(msg.Protocol+" "+canonicalParentMsgID, dest, channel); parentID != "" { if downstreamID := gw.getDestMsgID(msg.Protocol+" "+msg.CanonicalParentMsgID(), dest, channel); downstreamID != "" {
return parentID, true return downstreamID
} }
return canonicalParentMsgID, true
// If all else fails, this must be the original parent message.
return msg.CanonicalParentMsgID()
} }
// handleFiles uploads or places all files on the given msg to the MediaServer and // handleFiles uploads or places all files on the given msg to the MediaServer and