forked from jshiffer/matterbridge
Refactor gateway (#648)
* Decrease complexity of handleMessage, handleReceive, handleFiles * Move handlers to handlers.go * Split ignoreMessage up in ignoreTextEmpty, ignoreNicks and IgnoreTexts * Add ignoreEvent * Add testcase for ignoreTextEmpty, ignoreNicks, ignoreTexts and ignoreEvent
This commit is contained in:
parent
bfa9a83d31
commit
ccd55d2a28
@ -1,13 +1,6 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha1" //nolint:gosec
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
@ -50,7 +43,9 @@ func New(cfg config.Gateway, r *Router) *Gateway {
|
||||
Router: r, Bridges: make(map[string]*bridge.Bridge), Config: r.Config}
|
||||
cache, _ := lru.New(5000)
|
||||
gw.Messages = cache
|
||||
gw.AddConfig(&cfg)
|
||||
if err := gw.AddConfig(&cfg); err != nil {
|
||||
flog.Errorf("AddConfig failed: %s", err)
|
||||
}
|
||||
return gw
|
||||
}
|
||||
|
||||
@ -94,7 +89,9 @@ func (gw *Gateway) AddBridge(cfg *config.Bridge) error {
|
||||
func (gw *Gateway) AddConfig(cfg *config.Gateway) error {
|
||||
gw.Name = cfg.Name
|
||||
gw.MyConfig = cfg
|
||||
gw.mapChannels()
|
||||
if err := gw.mapChannels(); err != nil {
|
||||
flog.Errorf("mapChannels() failed: %s", err)
|
||||
}
|
||||
for _, br := range append(gw.MyConfig.In, append(gw.MyConfig.InOut, gw.MyConfig.Out...)...) {
|
||||
br := br //scopelint
|
||||
err := gw.AddBridge(&br)
|
||||
@ -114,7 +111,9 @@ func (gw *Gateway) mapChannelsToBridge(br *bridge.Bridge) {
|
||||
}
|
||||
|
||||
func (gw *Gateway) reconnectBridge(br *bridge.Bridge) {
|
||||
br.Disconnect()
|
||||
if err := br.Disconnect(); err != nil {
|
||||
flog.Errorf("Disconnect() %s failed: %s", br.Account, err)
|
||||
}
|
||||
time.Sleep(time.Second * 5)
|
||||
RECONNECT:
|
||||
flog.Infof("Reconnecting %s", br.Account)
|
||||
@ -125,7 +124,9 @@ RECONNECT:
|
||||
goto RECONNECT
|
||||
}
|
||||
br.Joined = make(map[string]bool)
|
||||
br.JoinChannels()
|
||||
if err := br.JoinChannels(); err != nil {
|
||||
flog.Errorf("JoinChannels() %s failed: %s", br.Account, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (gw *Gateway) mapChannelConfig(cfg []config.Bridge, direction string) {
|
||||
@ -212,105 +213,55 @@ func (gw *Gateway) getDestMsgID(msgID string, dest *bridge.Bridge, channel confi
|
||||
return ""
|
||||
}
|
||||
|
||||
func (gw *Gateway) handleMessage(msg config.Message, dest *bridge.Bridge) []*BrMsgID {
|
||||
var brMsgIDs []*BrMsgID
|
||||
|
||||
// if we have an attached file, or other info
|
||||
if msg.Extra != nil {
|
||||
if len(msg.Extra[config.EventFileFailureSize]) != 0 {
|
||||
if msg.Text == "" {
|
||||
return brMsgIDs
|
||||
}
|
||||
}
|
||||
// ignoreTextEmpty returns true if we need to ignore a message with an empty text.
|
||||
func (gw *Gateway) ignoreTextEmpty(msg *config.Message) bool {
|
||||
if msg.Text != "" {
|
||||
return false
|
||||
}
|
||||
|
||||
// Avatar downloads are only relevant for telegram and mattermost for now
|
||||
if msg.Event == config.EventAvatarDownload {
|
||||
if dest.Protocol != "mattermost" &&
|
||||
dest.Protocol != "telegram" {
|
||||
return brMsgIDs
|
||||
}
|
||||
if msg.Event == config.EventUserTyping {
|
||||
return false
|
||||
}
|
||||
|
||||
// only relay join/part when configured
|
||||
if msg.Event == config.EventJoinLeave && !dest.GetBool("ShowJoinPart") {
|
||||
return brMsgIDs
|
||||
// we have an attachment or actual bytes, do not ignore
|
||||
if msg.Extra != nil &&
|
||||
(msg.Extra["attachments"] != nil ||
|
||||
len(msg.Extra["file"]) > 0 ||
|
||||
len(msg.Extra[config.EventFileFailureSize]) > 0) {
|
||||
return false
|
||||
}
|
||||
flog.Debugf("ignoring empty message %#v from %s", msg, msg.Account)
|
||||
return true
|
||||
}
|
||||
|
||||
// only relay topic change when used in some way on other side
|
||||
if msg.Event == config.EventTopicChange &&
|
||||
dest.GetBool("ShowTopicChange") &&
|
||||
dest.GetBool("SyncTopic") {
|
||||
return brMsgIDs
|
||||
}
|
||||
|
||||
// broadcast to every out channel (irc QUIT)
|
||||
if msg.Channel == "" && msg.Event != config.EventJoinLeave {
|
||||
flog.Debug("empty channel")
|
||||
return brMsgIDs
|
||||
}
|
||||
|
||||
// Get the ID of the parent message in thread
|
||||
var canonicalParentMsgID string
|
||||
if msg.ParentID != "" && dest.GetBool("PreserveThreading") {
|
||||
canonicalParentMsgID = gw.FindCanonicalMsgID(msg.Protocol, msg.ParentID)
|
||||
}
|
||||
|
||||
originchannel := msg.Channel
|
||||
origmsg := msg
|
||||
channels := gw.getDestChannel(&msg, *dest)
|
||||
for _, channel := range channels {
|
||||
// Only send the avatar download event to ourselves.
|
||||
if msg.Event == config.EventAvatarDownload {
|
||||
if channel.ID != getChannelID(origmsg) {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// do not send to ourself for any other event
|
||||
if channel.ID == getChannelID(origmsg) {
|
||||
continue
|
||||
}
|
||||
// ignoreTexts returns true if msg.Text matches any of the input regexes.
|
||||
func (gw *Gateway) ignoreTexts(msg *config.Message, input []string) bool {
|
||||
for _, entry := range input {
|
||||
if entry == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Too noisy to log like other events
|
||||
if msg.Event != config.EventUserTyping {
|
||||
flog.Debugf("=> Sending %#v from %s (%s) to %s (%s)", msg, msg.Account, originchannel, dest.Account, channel.Name)
|
||||
}
|
||||
|
||||
msg.Channel = channel.Name
|
||||
msg.Avatar = gw.modifyAvatar(origmsg, dest)
|
||||
msg.Username = gw.modifyUsername(origmsg, dest)
|
||||
|
||||
msg.ID = gw.getDestMsgID(origmsg.Protocol+" "+origmsg.ID, dest, channel)
|
||||
|
||||
// for api we need originchannel as channel
|
||||
if dest.Protocol == apiProtocol {
|
||||
msg.Channel = originchannel
|
||||
}
|
||||
|
||||
msg.ParentID = gw.getDestMsgID(origmsg.Protocol+" "+canonicalParentMsgID, dest, channel)
|
||||
if msg.ParentID == "" {
|
||||
msg.ParentID = canonicalParentMsgID
|
||||
}
|
||||
|
||||
// if we are using mattermost plugin account, send messages to MattermostPlugin channel
|
||||
// that can be picked up by the mattermost matterbridge plugin
|
||||
if dest.Account == "mattermost.plugin" {
|
||||
gw.Router.MattermostPlugin <- msg
|
||||
}
|
||||
|
||||
mID, err := dest.Send(msg)
|
||||
// TODO do not compile regexps everytime
|
||||
re, err := regexp.Compile(entry)
|
||||
if err != nil {
|
||||
flog.Error(err)
|
||||
flog.Errorf("incorrect regexp %s for %s", entry, msg.Account)
|
||||
continue
|
||||
}
|
||||
|
||||
// append the message ID (mID) from this bridge (dest) to our brMsgIDs slice
|
||||
if mID != "" {
|
||||
flog.Debugf("mID %s: %s", dest.Account, mID)
|
||||
brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + mID, channel.ID})
|
||||
if re.MatchString(msg.Text) {
|
||||
flog.Debugf("matching %s. ignoring %s from %s", entry, msg.Text, msg.Account)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return brMsgIDs
|
||||
return false
|
||||
}
|
||||
|
||||
// ignoreNicks returns true if msg.Username matches any of the input regexes.
|
||||
func (gw *Gateway) ignoreNicks(msg *config.Message, input []string) bool {
|
||||
// is the username in IgnoreNicks field
|
||||
for _, entry := range input {
|
||||
if msg.Username == entry {
|
||||
flog.Debugf("ignoring %s from %s", msg.Username, msg.Account)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (gw *Gateway) ignoreMessage(msg *config.Message) bool {
|
||||
@ -319,45 +270,12 @@ func (gw *Gateway) ignoreMessage(msg *config.Message) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// check if we need to ignore a empty message
|
||||
if msg.Text == "" {
|
||||
if msg.Event == config.EventUserTyping {
|
||||
return false
|
||||
}
|
||||
// we have an attachment or actual bytes, do not ignore
|
||||
if msg.Extra != nil &&
|
||||
(msg.Extra["attachments"] != nil ||
|
||||
len(msg.Extra["file"]) > 0 ||
|
||||
len(msg.Extra[config.EventFileFailureSize]) > 0) {
|
||||
return false
|
||||
}
|
||||
flog.Debugf("ignoring empty message %#v from %s", msg, msg.Account)
|
||||
igNicks := strings.Fields(gw.Bridges[msg.Account].GetString("IgnoreNicks"))
|
||||
igMessages := strings.Fields(gw.Bridges[msg.Account].GetString("IgnoreMessages"))
|
||||
if gw.ignoreTextEmpty(msg) || gw.ignoreNicks(msg, igNicks) || gw.ignoreTexts(msg, igMessages) {
|
||||
return true
|
||||
}
|
||||
|
||||
// is the username in IgnoreNicks field
|
||||
for _, entry := range strings.Fields(gw.Bridges[msg.Account].GetString("IgnoreNicks")) {
|
||||
if msg.Username == entry {
|
||||
flog.Debugf("ignoring %s from %s", msg.Username, msg.Account)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// does the message match regex in IgnoreMessages field
|
||||
// TODO do not compile regexps everytime
|
||||
for _, entry := range strings.Fields(gw.Bridges[msg.Account].GetString("IgnoreMessages")) {
|
||||
if entry != "" {
|
||||
re, err := regexp.Compile(entry)
|
||||
if err != nil {
|
||||
flog.Errorf("incorrect regexp %s for %s", entry, msg.Account)
|
||||
continue
|
||||
}
|
||||
if re.MatchString(msg.Text) {
|
||||
flog.Debugf("matching %s. ignoring %s from %s", entry, msg.Text, msg.Account)
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -438,86 +356,61 @@ func (gw *Gateway) modifyMessage(msg *config.Message) {
|
||||
}
|
||||
}
|
||||
|
||||
// handleFiles uploads or places all files on the given msg to the MediaServer and
|
||||
// adds the new URL of the file on the MediaServer onto the given msg.
|
||||
func (gw *Gateway) handleFiles(msg *config.Message) {
|
||||
reg := regexp.MustCompile("[^a-zA-Z0-9]+")
|
||||
|
||||
// If we don't have a attachfield or we don't have a mediaserver configured return
|
||||
if msg.Extra == nil ||
|
||||
(gw.BridgeValues().General.MediaServerUpload == "" &&
|
||||
gw.BridgeValues().General.MediaDownloadPath == "") {
|
||||
return
|
||||
}
|
||||
|
||||
// If we don't have files, nothing to upload.
|
||||
if len(msg.Extra["file"]) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
client := &http.Client{
|
||||
Timeout: time.Second * 5,
|
||||
}
|
||||
|
||||
for i, f := range msg.Extra["file"] {
|
||||
fi := f.(config.FileInfo)
|
||||
ext := filepath.Ext(fi.Name)
|
||||
fi.Name = fi.Name[0 : len(fi.Name)-len(ext)]
|
||||
fi.Name = reg.ReplaceAllString(fi.Name, "_")
|
||||
fi.Name += ext
|
||||
|
||||
sha1sum := fmt.Sprintf("%x", sha1.Sum(*fi.Data))[:8] //nolint:gosec
|
||||
|
||||
if gw.BridgeValues().General.MediaServerUpload != "" {
|
||||
// Use MediaServerUpload. Upload using a PUT HTTP request and basicauth.
|
||||
|
||||
url := gw.BridgeValues().General.MediaServerUpload + "/" + sha1sum + "/" + fi.Name
|
||||
|
||||
req, err := http.NewRequest("PUT", url, bytes.NewReader(*fi.Data))
|
||||
if err != nil {
|
||||
flog.Errorf("mediaserver upload failed, could not create request: %#v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
flog.Debugf("mediaserver upload url: %s", url)
|
||||
|
||||
req.Header.Set("Content-Type", "binary/octet-stream")
|
||||
_, err = client.Do(req)
|
||||
if err != nil {
|
||||
flog.Errorf("mediaserver upload failed, could not Do request: %#v", err)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// Use MediaServerPath. Place the file on the current filesystem.
|
||||
|
||||
dir := gw.BridgeValues().General.MediaDownloadPath + "/" + sha1sum
|
||||
err := os.Mkdir(dir, os.ModePerm)
|
||||
if err != nil && !os.IsExist(err) {
|
||||
flog.Errorf("mediaserver path failed, could not mkdir: %s %#v", err, err)
|
||||
continue
|
||||
}
|
||||
|
||||
path := dir + "/" + fi.Name
|
||||
flog.Debugf("mediaserver path placing file: %s", path)
|
||||
|
||||
err = ioutil.WriteFile(path, *fi.Data, os.ModePerm)
|
||||
if err != nil {
|
||||
flog.Errorf("mediaserver path failed, could not writefile: %s %#v", err, err)
|
||||
continue
|
||||
}
|
||||
// SendMessage sends a message (with specified parentID) to the channel on the selected destination bridge.
|
||||
// returns a message id and error.
|
||||
func (gw *Gateway) SendMessage(origmsg config.Message, dest *bridge.Bridge, channel config.ChannelInfo, canonicalParentMsgID string) (string, error) {
|
||||
msg := origmsg
|
||||
// Only send the avatar download event to ourselves.
|
||||
if msg.Event == config.EventAvatarDownload {
|
||||
if channel.ID != getChannelID(origmsg) {
|
||||
return "", nil
|
||||
}
|
||||
} else {
|
||||
// do not send to ourself for any other event
|
||||
if channel.ID == getChannelID(origmsg) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// Download URL.
|
||||
durl := gw.BridgeValues().General.MediaServerDownload + "/" + sha1sum + "/" + fi.Name
|
||||
|
||||
flog.Debugf("mediaserver download URL = %s", durl)
|
||||
|
||||
// We uploaded/placed the file successfully. Add the SHA and URL.
|
||||
extra := msg.Extra["file"][i].(config.FileInfo)
|
||||
extra.URL = durl
|
||||
extra.SHA = sha1sum
|
||||
msg.Extra["file"][i] = extra
|
||||
}
|
||||
|
||||
// Too noisy to log like other events
|
||||
if msg.Event != config.EventUserTyping {
|
||||
flog.Debugf("=> Sending %#v from %s (%s) to %s (%s)", msg, msg.Account, origmsg.Channel, dest.Account, channel.Name)
|
||||
}
|
||||
|
||||
msg.Channel = channel.Name
|
||||
msg.Avatar = gw.modifyAvatar(origmsg, dest)
|
||||
msg.Username = gw.modifyUsername(origmsg, dest)
|
||||
|
||||
msg.ID = gw.getDestMsgID(origmsg.Protocol+" "+origmsg.ID, dest, channel)
|
||||
|
||||
// for api we need originchannel as channel
|
||||
if dest.Protocol == apiProtocol {
|
||||
msg.Channel = origmsg.Channel
|
||||
}
|
||||
|
||||
msg.ParentID = gw.getDestMsgID(origmsg.Protocol+" "+canonicalParentMsgID, dest, channel)
|
||||
if msg.ParentID == "" {
|
||||
msg.ParentID = canonicalParentMsgID
|
||||
}
|
||||
|
||||
// if we are using mattermost plugin account, send messages to MattermostPlugin channel
|
||||
// that can be picked up by the mattermost matterbridge plugin
|
||||
if dest.Account == "mattermost.plugin" {
|
||||
gw.Router.MattermostPlugin <- msg
|
||||
}
|
||||
|
||||
mID, err := dest.Send(msg)
|
||||
if err != nil {
|
||||
return mID, err
|
||||
}
|
||||
|
||||
// append the message ID (mID) from this bridge (dest) to our brMsgIDs slice
|
||||
if mID != "" {
|
||||
flog.Debugf("mID %s: %s", dest.Account, mID)
|
||||
return mID, nil
|
||||
//brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + mID, channel.ID})
|
||||
}
|
||||
return "", nil
|
||||
}
|
||||
|
||||
func (gw *Gateway) validGatewayDest(msg *config.Message) bool {
|
||||
|
@ -387,3 +387,116 @@ func TestGetDestChannelAdvanced(t *testing.T) {
|
||||
}
|
||||
assert.Equal(t, map[string]int{"bridge3": 4, "bridge": 9, "announcements": 3, "bridge2": 4}, hits)
|
||||
}
|
||||
|
||||
func TestIgnoreTextEmpty(t *testing.T) {
|
||||
extraFile := make(map[string][]interface{})
|
||||
extraAttach := make(map[string][]interface{})
|
||||
extraFailure := make(map[string][]interface{})
|
||||
extraFile["file"] = append(extraFile["file"], config.FileInfo{})
|
||||
extraAttach["attachments"] = append(extraAttach["attachments"], []string{})
|
||||
extraFailure[config.EventFileFailureSize] = append(extraFailure[config.EventFileFailureSize], config.FileInfo{})
|
||||
|
||||
msgTests := map[string]struct {
|
||||
input *config.Message
|
||||
output bool
|
||||
}{
|
||||
"usertyping": {
|
||||
input: &config.Message{Event: config.EventUserTyping},
|
||||
output: false,
|
||||
},
|
||||
"file attach": {
|
||||
input: &config.Message{Extra: extraFile},
|
||||
output: false,
|
||||
},
|
||||
"attachments": {
|
||||
input: &config.Message{Extra: extraAttach},
|
||||
output: false,
|
||||
},
|
||||
config.EventFileFailureSize: {
|
||||
input: &config.Message{Extra: extraFailure},
|
||||
output: false,
|
||||
},
|
||||
"nil extra": {
|
||||
input: &config.Message{Extra: nil},
|
||||
output: true,
|
||||
},
|
||||
"empty": {
|
||||
input: &config.Message{},
|
||||
output: true,
|
||||
},
|
||||
}
|
||||
gw := &Gateway{}
|
||||
for testname, testcase := range msgTests {
|
||||
output := gw.ignoreTextEmpty(testcase.input)
|
||||
assert.Equalf(t, testcase.output, output, "case '%s' failed", testname)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestIgnoreTexts(t *testing.T) {
|
||||
msgTests := map[string]struct {
|
||||
input *config.Message
|
||||
re []string
|
||||
output bool
|
||||
}{
|
||||
"no regex": {
|
||||
input: &config.Message{Text: "a text message"},
|
||||
re: []string{},
|
||||
output: false,
|
||||
},
|
||||
"simple regex": {
|
||||
input: &config.Message{Text: "a text message"},
|
||||
re: []string{"text"},
|
||||
output: true,
|
||||
},
|
||||
"multiple regex fail": {
|
||||
input: &config.Message{Text: "a text message"},
|
||||
re: []string{"abc", "123$"},
|
||||
output: false,
|
||||
},
|
||||
"multiple regex pass": {
|
||||
input: &config.Message{Text: "a text message"},
|
||||
re: []string{"lala", "sage$"},
|
||||
output: true,
|
||||
},
|
||||
}
|
||||
gw := &Gateway{}
|
||||
for testname, testcase := range msgTests {
|
||||
output := gw.ignoreTexts(testcase.input, testcase.re)
|
||||
assert.Equalf(t, testcase.output, output, "case '%s' failed", testname)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIgnoreNicks(t *testing.T) {
|
||||
msgTests := map[string]struct {
|
||||
input *config.Message
|
||||
re []string
|
||||
output bool
|
||||
}{
|
||||
"no entry": {
|
||||
input: &config.Message{Username: "user", Text: "a text message"},
|
||||
re: []string{},
|
||||
output: false,
|
||||
},
|
||||
"one entry": {
|
||||
input: &config.Message{Username: "user", Text: "a text message"},
|
||||
re: []string{"user"},
|
||||
output: true,
|
||||
},
|
||||
"multiple entries": {
|
||||
input: &config.Message{Username: "user", Text: "a text message"},
|
||||
re: []string{"abc", "user"},
|
||||
output: true,
|
||||
},
|
||||
"multiple entries fail": {
|
||||
input: &config.Message{Username: "user", Text: "a text message"},
|
||||
re: []string{"abc", "def"},
|
||||
output: false,
|
||||
},
|
||||
}
|
||||
gw := &Gateway{}
|
||||
for testname, testcase := range msgTests {
|
||||
output := gw.ignoreNicks(testcase.input, testcase.re)
|
||||
assert.Equalf(t, testcase.output, output, "case '%s' failed", testname)
|
||||
}
|
||||
}
|
||||
|
210
gateway/handlers.go
Normal file
210
gateway/handlers.go
Normal file
@ -0,0 +1,210 @@
|
||||
package gateway
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha1" //nolint:gosec
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"github.com/42wim/matterbridge/bridge"
|
||||
"github.com/42wim/matterbridge/bridge/config"
|
||||
)
|
||||
|
||||
// handleEventFailure handles failures and reconnects bridges.
|
||||
func (r *Router) handleEventFailure(msg *config.Message) {
|
||||
if msg.Event != config.EventFailure {
|
||||
return
|
||||
}
|
||||
for _, gw := range r.Gateways {
|
||||
for _, br := range gw.Bridges {
|
||||
if msg.Account == br.Account {
|
||||
go gw.reconnectBridge(br)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleEventRejoinChannels handles rejoining of channels.
|
||||
func (r *Router) handleEventRejoinChannels(msg *config.Message) {
|
||||
if msg.Event != config.EventRejoinChannels {
|
||||
return
|
||||
}
|
||||
for _, gw := range r.Gateways {
|
||||
for _, br := range gw.Bridges {
|
||||
if msg.Account == br.Account {
|
||||
br.Joined = make(map[string]bool)
|
||||
if err := br.JoinChannels(); err != nil {
|
||||
flog.Errorf("channel join failed for %s: %s", msg.Account, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleFiles uploads or places all files on the given msg to the MediaServer and
|
||||
// adds the new URL of the file on the MediaServer onto the given msg.
|
||||
func (gw *Gateway) handleFiles(msg *config.Message) {
|
||||
reg := regexp.MustCompile("[^a-zA-Z0-9]+")
|
||||
|
||||
// If we don't have a attachfield or we don't have a mediaserver configured return
|
||||
if msg.Extra == nil ||
|
||||
(gw.BridgeValues().General.MediaServerUpload == "" &&
|
||||
gw.BridgeValues().General.MediaDownloadPath == "") {
|
||||
return
|
||||
}
|
||||
|
||||
// If we don't have files, nothing to upload.
|
||||
if len(msg.Extra["file"]) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
for i, f := range msg.Extra["file"] {
|
||||
fi := f.(config.FileInfo)
|
||||
ext := filepath.Ext(fi.Name)
|
||||
fi.Name = fi.Name[0 : len(fi.Name)-len(ext)]
|
||||
fi.Name = reg.ReplaceAllString(fi.Name, "_")
|
||||
fi.Name += ext
|
||||
|
||||
sha1sum := fmt.Sprintf("%x", sha1.Sum(*fi.Data))[:8] //nolint:gosec
|
||||
|
||||
if gw.BridgeValues().General.MediaServerUpload != "" {
|
||||
// Use MediaServerUpload. Upload using a PUT HTTP request and basicauth.
|
||||
if err := gw.handleFilesUpload(&fi); err != nil {
|
||||
flog.Error(err)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// Use MediaServerPath. Place the file on the current filesystem.
|
||||
if err := gw.handleFilesLocal(&fi); err != nil {
|
||||
flog.Error(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Download URL.
|
||||
durl := gw.BridgeValues().General.MediaServerDownload + "/" + sha1sum + "/" + fi.Name
|
||||
|
||||
flog.Debugf("mediaserver download URL = %s", durl)
|
||||
|
||||
// We uploaded/placed the file successfully. Add the SHA and URL.
|
||||
extra := msg.Extra["file"][i].(config.FileInfo)
|
||||
extra.URL = durl
|
||||
extra.SHA = sha1sum
|
||||
msg.Extra["file"][i] = extra
|
||||
}
|
||||
}
|
||||
|
||||
// handleFilesUpload uses MediaServerUpload configuration to upload the file.
|
||||
// Returns error on failure.
|
||||
func (gw *Gateway) handleFilesUpload(fi *config.FileInfo) error {
|
||||
client := &http.Client{
|
||||
Timeout: time.Second * 5,
|
||||
}
|
||||
// Use MediaServerUpload. Upload using a PUT HTTP request and basicauth.
|
||||
sha1sum := fmt.Sprintf("%x", sha1.Sum(*fi.Data))[:8] //nolint:gosec
|
||||
url := gw.BridgeValues().General.MediaServerUpload + "/" + sha1sum + "/" + fi.Name
|
||||
|
||||
req, err := http.NewRequest("PUT", url, bytes.NewReader(*fi.Data))
|
||||
if err != nil {
|
||||
return fmt.Errorf("mediaserver upload failed, could not create request: %#v", err)
|
||||
}
|
||||
|
||||
flog.Debugf("mediaserver upload url: %s", url)
|
||||
|
||||
req.Header.Set("Content-Type", "binary/octet-stream")
|
||||
_, err = client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("mediaserver upload failed, could not Do request: %#v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleFilesLocal use MediaServerPath configuration, places the file on the current filesystem.
|
||||
// Returns error on failure.
|
||||
func (gw *Gateway) handleFilesLocal(fi *config.FileInfo) error {
|
||||
sha1sum := fmt.Sprintf("%x", sha1.Sum(*fi.Data))[:8] //nolint:gosec
|
||||
dir := gw.BridgeValues().General.MediaDownloadPath + "/" + sha1sum
|
||||
err := os.Mkdir(dir, os.ModePerm)
|
||||
if err != nil && !os.IsExist(err) {
|
||||
return fmt.Errorf("mediaserver path failed, could not mkdir: %s %#v", err, err)
|
||||
}
|
||||
|
||||
path := dir + "/" + fi.Name
|
||||
flog.Debugf("mediaserver path placing file: %s", path)
|
||||
|
||||
err = ioutil.WriteFile(path, *fi.Data, os.ModePerm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("mediaserver path failed, could not writefile: %s %#v", err, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ignoreEvent returns true if we need to ignore this event for the specified destination bridge.
|
||||
func (gw *Gateway) ignoreEvent(event string, dest *bridge.Bridge) bool {
|
||||
switch event {
|
||||
case config.EventAvatarDownload:
|
||||
// Avatar downloads are only relevant for telegram and mattermost for now
|
||||
if dest.Protocol != "mattermost" && dest.Protocol != "telegram" {
|
||||
return true
|
||||
}
|
||||
case config.EventJoinLeave:
|
||||
// only relay join/part when configured
|
||||
if !dest.GetBool("ShowJoinPart") {
|
||||
return true
|
||||
}
|
||||
case config.EventTopicChange:
|
||||
// only relay topic change when used in some way on other side
|
||||
if dest.GetBool("ShowTopicChange") && dest.GetBool("SyncTopic") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// handleMessage makes sure the message get sent to the correct bridge/channels.
|
||||
// Returns an array of msg ID's
|
||||
func (gw *Gateway) handleMessage(msg config.Message, dest *bridge.Bridge) []*BrMsgID {
|
||||
var brMsgIDs []*BrMsgID
|
||||
|
||||
// if we have an attached file, or other info
|
||||
if msg.Extra != nil && len(msg.Extra[config.EventFileFailureSize]) != 0 && msg.Text == "" {
|
||||
return brMsgIDs
|
||||
}
|
||||
|
||||
if gw.ignoreEvent(msg.Event, dest) {
|
||||
return brMsgIDs
|
||||
}
|
||||
|
||||
// broadcast to every out channel (irc QUIT)
|
||||
if msg.Channel == "" && msg.Event != config.EventJoinLeave {
|
||||
flog.Debug("empty channel")
|
||||
return brMsgIDs
|
||||
}
|
||||
|
||||
// Get the ID of the parent message in thread
|
||||
var canonicalParentMsgID string
|
||||
if msg.ParentID != "" && dest.GetBool("PreserveThreading") {
|
||||
canonicalParentMsgID = gw.FindCanonicalMsgID(msg.Protocol, msg.ParentID)
|
||||
}
|
||||
|
||||
origmsg := msg
|
||||
channels := gw.getDestChannel(&msg, *dest)
|
||||
for _, channel := range channels {
|
||||
msgID, err := gw.SendMessage(origmsg, dest, channel, canonicalParentMsgID)
|
||||
if err != nil {
|
||||
flog.Errorf("SendMessage failed: %s", err)
|
||||
continue
|
||||
}
|
||||
if msgID == "" {
|
||||
continue
|
||||
}
|
||||
brMsgIDs = append(brMsgIDs, &BrMsgID{dest, dest.Protocol + " " + msgID, channel.ID})
|
||||
}
|
||||
return brMsgIDs
|
||||
}
|
@ -108,41 +108,23 @@ func (r *Router) getBridge(account string) *bridge.Bridge {
|
||||
func (r *Router) handleReceive() {
|
||||
for msg := range r.Message {
|
||||
msg := msg // scopelint
|
||||
if msg.Event == config.EventFailure {
|
||||
Loop:
|
||||
for _, gw := range r.Gateways {
|
||||
for _, br := range gw.Bridges {
|
||||
if msg.Account == br.Account {
|
||||
go gw.reconnectBridge(br)
|
||||
break Loop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if msg.Event == config.EventRejoinChannels {
|
||||
for _, gw := range r.Gateways {
|
||||
for _, br := range gw.Bridges {
|
||||
if msg.Account == br.Account {
|
||||
br.Joined = make(map[string]bool)
|
||||
br.JoinChannels()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
r.handleEventFailure(&msg)
|
||||
r.handleEventRejoinChannels(&msg)
|
||||
for _, gw := range r.Gateways {
|
||||
// record all the message ID's of the different bridges
|
||||
var msgIDs []*BrMsgID
|
||||
if !gw.ignoreMessage(&msg) {
|
||||
msg.Timestamp = time.Now()
|
||||
gw.modifyMessage(&msg)
|
||||
gw.handleFiles(&msg)
|
||||
for _, br := range gw.Bridges {
|
||||
msgIDs = append(msgIDs, gw.handleMessage(msg, br)...)
|
||||
}
|
||||
// only add the message ID if it doesn't already exists
|
||||
if _, ok := gw.Messages.Get(msg.Protocol + " " + msg.ID); !ok && msg.ID != "" {
|
||||
gw.Messages.Add(msg.Protocol+" "+msg.ID, msgIDs)
|
||||
}
|
||||
if gw.ignoreMessage(&msg) {
|
||||
continue
|
||||
}
|
||||
msg.Timestamp = time.Now()
|
||||
gw.modifyMessage(&msg)
|
||||
gw.handleFiles(&msg)
|
||||
for _, br := range gw.Bridges {
|
||||
msgIDs = append(msgIDs, gw.handleMessage(msg, br)...)
|
||||
}
|
||||
// only add the message ID if it doesn't already exists
|
||||
if _, ok := gw.Messages.Get(msg.Protocol + " " + msg.ID); !ok && msg.ID != "" {
|
||||
gw.Messages.Add(msg.Protocol+" "+msg.ID, msgIDs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user