Add initial support for getting ChannelMember info of all bridges (#678)

* Add initial support for getting ChannelMember info of all bridges.

Adds an EventGetChannelMembers event, which gets send every x time to
all bridges. Bridges should respond on this event with a Message
containing ChannelMembers in the EventGetChannelMembers key in the
Extra field.

handleEventGetChannelMembers will handle this Message and sets the
contained ChannelMembers to the Bridge struct.

* Add ChannelMembers support to the slack bridge
This commit is contained in:
Wim 2019-01-18 18:35:31 +01:00 committed by GitHub
parent d99eacc2e1
commit fb713ed91b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 189 additions and 21 deletions

View File

@ -5,6 +5,7 @@ import (
"github.com/42wim/matterbridge/bridge/config" "github.com/42wim/matterbridge/bridge/config"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"sync"
) )
type Bridger interface { type Bridger interface {
@ -16,14 +17,16 @@ type Bridger interface {
type Bridge struct { type Bridge struct {
Bridger Bridger
Name string Name string
Account string Account string
Protocol string Protocol string
Channels map[string]config.ChannelInfo Channels map[string]config.ChannelInfo
Joined map[string]bool Joined map[string]bool
Log *logrus.Entry ChannelMembers *config.ChannelMembers
Config config.Config Log *logrus.Entry
General *config.Protocol Config config.Config
General *config.Protocol
*sync.RWMutex
} }
type Config struct { type Config struct {
@ -37,15 +40,17 @@ type Config struct {
type Factory func(*Config) Bridger type Factory func(*Config) Bridger
func New(bridge *config.Bridge) *Bridge { func New(bridge *config.Bridge) *Bridge {
b := new(Bridge) b := &Bridge{
b.Channels = make(map[string]config.ChannelInfo) Channels: make(map[string]config.ChannelInfo),
RWMutex: new(sync.RWMutex),
Joined: make(map[string]bool),
}
accInfo := strings.Split(bridge.Account, ".") accInfo := strings.Split(bridge.Account, ".")
protocol := accInfo[0] protocol := accInfo[0]
name := accInfo[1] name := accInfo[1]
b.Name = name b.Name = name
b.Protocol = protocol b.Protocol = protocol
b.Account = bridge.Account b.Account = bridge.Account
b.Joined = make(map[string]bool)
return b return b
} }
@ -54,6 +59,13 @@ func (b *Bridge) JoinChannels() error {
return err return err
} }
// SetChannelMembers sets the newMembers to the bridge ChannelMembers
func (b *Bridge) SetChannelMembers(newMembers *config.ChannelMembers) {
b.Lock()
b.ChannelMembers = newMembers
b.Unlock()
}
func (b *Bridge) joinChannels(channels map[string]config.ChannelInfo, exists map[string]bool) error { func (b *Bridge) joinChannels(channels map[string]config.ChannelInfo, exists map[string]bool) error {
for ID, channel := range channels { for ID, channel := range channels {
if !exists[ID] { if !exists[ID] {

View File

@ -14,16 +14,17 @@ import (
) )
const ( const (
EventJoinLeave = "join_leave" EventJoinLeave = "join_leave"
EventTopicChange = "topic_change" EventTopicChange = "topic_change"
EventFailure = "failure" EventFailure = "failure"
EventFileFailureSize = "file_failure_size" EventFileFailureSize = "file_failure_size"
EventAvatarDownload = "avatar_download" EventAvatarDownload = "avatar_download"
EventRejoinChannels = "rejoin_channels" EventRejoinChannels = "rejoin_channels"
EventUserAction = "user_action" EventUserAction = "user_action"
EventMsgDelete = "msg_delete" EventMsgDelete = "msg_delete"
EventAPIConnected = "api_connected" EventAPIConnected = "api_connected"
EventUserTyping = "user_typing" EventUserTyping = "user_typing"
EventGetChannelMembers = "get_channel_members"
) )
type Message struct { type Message struct {
@ -61,6 +62,16 @@ type ChannelInfo struct {
Options ChannelOptions Options ChannelOptions
} }
type ChannelMember struct {
Username string
Nick string
UserID string
ChannelID string
ChannelName string
}
type ChannelMembers []ChannelMember
type Protocol struct { type Protocol struct {
AuthCode string // steam AuthCode string // steam
BindAddress string // mattermost, slack // DEPRECATED BindAddress string // mattermost, slack // DEPRECATED

View File

@ -309,6 +309,58 @@ func (b *Bslack) handleDownloadFile(rmsg *config.Message, file *slack.File, retr
return nil return nil
} }
// handleGetChannelMembers handles messages containing the GetChannelMembers event
// Sends a message to the router containing *config.ChannelMembers
func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool {
if rmsg.Event != config.EventGetChannelMembers {
return false
}
cMembers := config.ChannelMembers{}
b.channelMembersMutex.RLock()
for channelID, members := range b.channelMembers {
for _, member := range members {
channelName := ""
userName := ""
userNick := ""
user := b.getUser(member)
if user != nil {
userName = user.Name
userNick = user.Profile.DisplayName
}
channel, _ := b.getChannelByID(channelID)
if channel != nil {
channelName = channel.Name
}
cMember := config.ChannelMember{
Username: userName,
Nick: userNick,
UserID: member,
ChannelID: channelID,
ChannelName: channelName,
}
cMembers = append(cMembers, cMember)
}
}
b.channelMembersMutex.RUnlock()
extra := make(map[string][]interface{})
extra[config.EventGetChannelMembers] = append(extra[config.EventGetChannelMembers], cMembers)
msg := config.Message{
Extra: extra,
Event: config.EventGetChannelMembers,
Account: b.Account,
}
b.Log.Debugf("sending msg to remote %#v", msg)
b.Remote <- msg
return true
}
// fileCached implements Matterbridge's caching logic for files // fileCached implements Matterbridge's caching logic for files
// shared via Slack. // shared via Slack.
// //

View File

@ -93,7 +93,9 @@ func (b *Bslack) populateUsers(wait bool) {
return return
} }
for b.refreshInProgress { for b.refreshInProgress {
b.refreshMutex.Unlock()
time.Sleep(time.Second) time.Sleep(time.Second)
b.refreshMutex.Lock()
} }
b.refreshInProgress = true b.refreshInProgress = true
b.refreshMutex.Unlock() b.refreshMutex.Unlock()
@ -139,13 +141,16 @@ func (b *Bslack) populateChannels(wait bool) {
return return
} }
for b.refreshInProgress { for b.refreshInProgress {
b.refreshMutex.Unlock()
time.Sleep(time.Second) time.Sleep(time.Second)
b.refreshMutex.Lock()
} }
b.refreshInProgress = true b.refreshInProgress = true
b.refreshMutex.Unlock() b.refreshMutex.Unlock()
newChannelsByID := map[string]*slack.Channel{} newChannelsByID := map[string]*slack.Channel{}
newChannelsByName := map[string]*slack.Channel{} newChannelsByName := map[string]*slack.Channel{}
newChannelMembers := make(map[string][]string)
// We only retrieve public and private channels, not IMs // We only retrieve public and private channels, not IMs
// and MPIMs as those do not have a channel name. // and MPIMs as those do not have a channel name.
@ -166,7 +171,18 @@ func (b *Bslack) populateChannels(wait bool) {
for i := range channels { for i := range channels {
newChannelsByID[channels[i].ID] = &channels[i] newChannelsByID[channels[i].ID] = &channels[i]
newChannelsByName[channels[i].Name] = &channels[i] newChannelsByName[channels[i].Name] = &channels[i]
// also find all the members in every channel
members, err := b.getUsersInConversation(channels[i].ID)
if err != nil {
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Could not retrieve channel members: %#v", err)
return
}
continue
}
newChannelMembers[channels[i].ID] = members
} }
if nextCursor == "" { if nextCursor == "" {
break break
} }
@ -178,6 +194,10 @@ func (b *Bslack) populateChannels(wait bool) {
b.channelsByID = newChannelsByID b.channelsByID = newChannelsByID
b.channelsByName = newChannelsByName b.channelsByName = newChannelsByName
b.channelMembersMutex.Lock()
defer b.channelMembersMutex.Unlock()
b.channelMembers = newChannelMembers
b.refreshMutex.Lock() b.refreshMutex.Lock()
defer b.refreshMutex.Unlock() defer b.refreshMutex.Unlock()
b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval) b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval)
@ -367,3 +387,29 @@ func (b *Bslack) handleRateLimit(err error) error {
time.Sleep(rateLimit.RetryAfter) time.Sleep(rateLimit.RetryAfter)
return nil return nil
} }
// getUsersInConversation returns an array of userIDs that are members of channelID
func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
channelMembers := []string{}
for {
queryParams := &slack.GetUsersInConversationParameters{
ChannelID: channelID,
}
members, nextCursor, err := b.sc.GetUsersInConversation(queryParams)
if err != nil {
if err = b.handleRateLimit(err); err != nil {
return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err)
}
continue
}
channelMembers = append(channelMembers, members...)
if nextCursor == "" {
break
}
queryParams.Cursor = nextCursor
}
return channelMembers, nil
}

View File

@ -37,6 +37,9 @@ type Bslack struct {
channelsByName map[string]*slack.Channel channelsByName map[string]*slack.Channel
channelsMutex sync.RWMutex channelsMutex sync.RWMutex
channelMembers map[string][]string
channelMembersMutex sync.RWMutex
refreshInProgress bool refreshInProgress bool
earliestChannelRefresh time.Time earliestChannelRefresh time.Time
earliestUserRefresh time.Time earliestUserRefresh time.Time
@ -267,6 +270,11 @@ func (b *Bslack) sendWebhook(msg config.Message) error {
} }
func (b *Bslack) sendRTM(msg config.Message) (string, error) { func (b *Bslack) sendRTM(msg config.Message) (string, error) {
// Handle channelmember messages.
if handled := b.handleGetChannelMembers(&msg); handled {
return "", nil
}
channelInfo, err := b.getChannel(msg.Channel) channelInfo, err := b.getChannel(msg.Channel)
if err != nil { if err != nil {
return "", fmt.Errorf("could not send message: %v", err) return "", fmt.Errorf("could not send message: %v", err)

View File

@ -30,6 +30,23 @@ func (r *Router) handleEventFailure(msg *config.Message) {
} }
} }
// handleEventGetChannelMembers handles channel members
func (r *Router) handleEventGetChannelMembers(msg *config.Message) {
if msg.Event != config.EventGetChannelMembers {
return
}
for _, gw := range r.Gateways {
for _, br := range gw.Bridges {
if msg.Account == br.Account {
cMembers := msg.Extra[config.EventGetChannelMembers][0].(config.ChannelMembers)
flog.Debugf("Syncing channelmembers from %s", msg.Account)
br.SetChannelMembers(&cMembers)
return
}
}
}
}
// handleEventRejoinChannels handles rejoining of channels. // handleEventRejoinChannels handles rejoining of channels.
func (r *Router) handleEventRejoinChannels(msg *config.Message) { func (r *Router) handleEventRejoinChannels(msg *config.Message) {
if msg.Event != config.EventRejoinChannels { if msg.Event != config.EventRejoinChannels {

View File

@ -2,6 +2,7 @@ package gateway
import ( import (
"fmt" "fmt"
"sync"
"time" "time"
"github.com/42wim/matterbridge/bridge" "github.com/42wim/matterbridge/bridge"
@ -16,6 +17,7 @@ type Router struct {
Gateways map[string]*Gateway Gateways map[string]*Gateway
Message chan config.Message Message chan config.Message
MattermostPlugin chan config.Message MattermostPlugin chan config.Message
sync.RWMutex
} }
func NewRouter(cfg config.Config, bridgeMap map[string]bridge.Factory) (*Router, error) { func NewRouter(cfg config.Config, bridgeMap map[string]bridge.Factory) (*Router, error) {
@ -81,6 +83,7 @@ func (r *Router) Start() error {
} }
} }
go r.handleReceive() go r.handleReceive()
go r.updateChannelMembers()
return nil return nil
} }
@ -108,6 +111,7 @@ func (r *Router) getBridge(account string) *bridge.Bridge {
func (r *Router) handleReceive() { func (r *Router) handleReceive() {
for msg := range r.Message { for msg := range r.Message {
msg := msg // scopelint msg := msg // scopelint
r.handleEventGetChannelMembers(&msg)
r.handleEventFailure(&msg) r.handleEventFailure(&msg)
r.handleEventRejoinChannels(&msg) r.handleEventRejoinChannels(&msg)
for _, gw := range r.Gateways { for _, gw := range r.Gateways {
@ -129,3 +133,21 @@ func (r *Router) handleReceive() {
} }
} }
} }
// updateChannelMembers sends every minute an GetChannelMembers event to all bridges.
func (r *Router) updateChannelMembers() {
// TODO sleep a minute because slack can take a while
// fix this by having actually connectionDone events send to the router
time.Sleep(time.Minute)
for {
for _, gw := range r.Gateways {
for _, br := range gw.Bridges {
flog.Debugf("sending %s to %s", config.EventGetChannelMembers, br.Account)
if _, err := br.Send(config.Message{Event: config.EventGetChannelMembers}); err != nil {
flog.Errorf("updateChannelMembers: %s", err)
}
}
}
time.Sleep(time.Minute)
}
}