Refactor channel and user management (slack) (#766)

This commit is contained in:
Duco van Amstel 2019-03-15 20:23:09 +00:00 committed by Wim
parent b638f7037a
commit fb2667631d
4 changed files with 138 additions and 94 deletions

View File

@ -34,7 +34,7 @@ func (b *Bslack) handleSlack() {
message.Text = html.UnescapeString(message.Text) message.Text = html.UnescapeString(message.Text)
// Add the avatar // Add the avatar
message.Avatar = b.getAvatar(message.UserID) message.Avatar = b.users.getAvatar(message.UserID)
b.Log.Debugf("<= Message is %#v", message) b.Log.Debugf("<= Message is %#v", message)
b.Remote <- *message b.Remote <- *message
@ -75,20 +75,17 @@ func (b *Bslack) handleSlackClient(messages chan *config.Message) {
// When we join a channel we update the full list of users as // When we join a channel we update the full list of users as
// well as the information for the channel that we joined as this // well as the information for the channel that we joined as this
// should now tell that we are a member of it. // should now tell that we are a member of it.
b.channelsMutex.Lock() b.channels.registerChannel(ev.Channel)
b.channelsByID[ev.Channel.ID] = &ev.Channel
b.channelsByName[ev.Channel.Name] = &ev.Channel
b.channelsMutex.Unlock()
case *slack.ConnectedEvent: case *slack.ConnectedEvent:
b.si = ev.Info b.si = ev.Info
b.populateChannels(true) b.channels.populateChannels(true)
b.populateUsers(true) b.users.populateUsers(true)
case *slack.InvalidAuthEvent: case *slack.InvalidAuthEvent:
b.Log.Fatalf("Invalid Token %#v", ev) b.Log.Fatalf("Invalid Token %#v", ev)
case *slack.ConnectionErrorEvent: case *slack.ConnectionErrorEvent:
b.Log.Errorf("Connection failed %#v %#v", ev.Error(), ev.ErrorObj) b.Log.Errorf("Connection failed %#v %#v", ev.Error(), ev.ErrorObj)
case *slack.MemberJoinedChannelEvent: case *slack.MemberJoinedChannelEvent:
b.populateUser(ev.User) b.users.populateUser(ev.User)
case *slack.LatencyReport: case *slack.LatencyReport:
continue continue
default: default:
@ -210,7 +207,7 @@ func (b *Bslack) handleStatusEvent(ev *slack.MessageEvent, rmsg *config.Message)
rmsg.Username = sSystemUser rmsg.Username = sSystemUser
rmsg.Event = config.EventJoinLeave rmsg.Event = config.EventJoinLeave
case sChannelTopic, sChannelPurpose: case sChannelTopic, sChannelPurpose:
b.populateChannels(false) b.channels.populateChannels(false)
rmsg.Event = config.EventTopicChange rmsg.Event = config.EventTopicChange
case sMessageChanged: case sMessageChanged:
rmsg.Text = ev.SubMessage.Text rmsg.Text = ev.SubMessage.Text
@ -266,7 +263,7 @@ func (b *Bslack) handleAttachments(ev *slack.MessageEvent, rmsg *config.Message)
} }
func (b *Bslack) handleTypingEvent(ev *slack.UserTypingEvent) (*config.Message, error) { func (b *Bslack) handleTypingEvent(ev *slack.UserTypingEvent) (*config.Message, error) {
channelInfo, err := b.getChannelByID(ev.Channel) channelInfo, err := b.channels.getChannelByID(ev.Channel)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -316,36 +313,7 @@ func (b *Bslack) handleGetChannelMembers(rmsg *config.Message) bool {
return false return false
} }
cMembers := config.ChannelMembers{} cMembers := b.channels.getChannelMembers(b.users)
b.channelMembersMutex.RLock()
for channelID, members := range b.channelMembers {
for _, member := range members {
channelName := ""
userName := ""
userNick := ""
user := b.getUser(member)
if user != nil {
userName = user.Name
userNick = user.Profile.DisplayName
}
channel, _ := b.getChannelByID(channelID)
if channel != nil {
channelName = channel.Name
}
cMember := config.ChannelMember{
Username: userName,
Nick: userNick,
UserID: member,
ChannelID: channelID,
ChannelName: channelName,
}
cMembers = append(cMembers, cMember)
}
}
b.channelMembersMutex.RUnlock()
extra := make(map[string][]interface{}) extra := make(map[string][]interface{})
extra[config.EventGetChannelMembers] = append(extra[config.EventGetChannelMembers], cMembers) extra[config.EventGetChannelMembers] = append(extra[config.EventGetChannelMembers], cMembers)

View File

@ -15,7 +15,7 @@ import (
// router before we apply message-dependent modifications. // router before we apply message-dependent modifications.
func (b *Bslack) populateReceivedMessage(ev *slack.MessageEvent) (*config.Message, error) { func (b *Bslack) populateReceivedMessage(ev *slack.MessageEvent) (*config.Message, error) {
// Use our own func because rtm.GetChannelInfo doesn't work for private channels. // Use our own func because rtm.GetChannelInfo doesn't work for private channels.
channel, err := b.getChannelByID(ev.Channel) channel, err := b.channels.getChannelByID(ev.Channel)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -77,7 +77,7 @@ func (b *Bslack) populateMessageWithUserInfo(ev *slack.MessageEvent, rmsg *confi
return nil return nil
} }
user := b.getUser(userID) user := b.users.getUser(userID)
if user == nil { if user == nil {
return fmt.Errorf("could not find information for user with id %s", ev.User) return fmt.Errorf("could not find information for user with id %s", ev.User)
} }
@ -148,7 +148,7 @@ func (b *Bslack) extractTopicOrPurpose(text string) (string, string) {
func (b *Bslack) replaceMention(text string) string { func (b *Bslack) replaceMention(text string) string {
replaceFunc := func(match string) string { replaceFunc := func(match string) string {
userID := strings.Trim(match, "@<>") userID := strings.Trim(match, "@<>")
if username := b.getUsername(userID); userID != "" { if username := b.users.getUsername(userID); userID != "" {
return "@" + username return "@" + username
} }
return match return match

View File

@ -30,20 +30,8 @@ type Bslack struct {
uuid string uuid string
useChannelID bool useChannelID bool
users map[string]*slack.User channels *channels
usersMutex sync.RWMutex users *users
channelsByID map[string]*slack.Channel
channelsByName map[string]*slack.Channel
channelsMutex sync.RWMutex
channelMembers map[string][]string
channelMembersMutex sync.RWMutex
refreshInProgress bool
earliestChannelRefresh time.Time
earliestUserRefresh time.Time
refreshMutex sync.Mutex
} }
const ( const (
@ -94,14 +82,9 @@ func newBridge(cfg *bridge.Config) *Bslack {
cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err) cfg.Log.Fatalf("Could not create LRU cache for Slack bridge: %v", err)
} }
b := &Bslack{ b := &Bslack{
Config: cfg, Config: cfg,
uuid: xid.New().String(), uuid: xid.New().String(),
cache: newCache, cache: newCache,
users: map[string]*slack.User{},
channelsByID: map[string]*slack.Channel{},
channelsByName: map[string]*slack.Channel{},
earliestChannelRefresh: time.Now(),
earliestUserRefresh: time.Now(),
} }
return b return b
} }
@ -121,7 +104,12 @@ func (b *Bslack) Connect() error {
// If we have a token we use the Slack websocket-based RTM for both sending and receiving. // If we have a token we use the Slack websocket-based RTM for both sending and receiving.
if token := b.GetString(tokenConfig); token != "" { if token := b.GetString(tokenConfig); token != "" {
b.Log.Info("Connecting using token") b.Log.Info("Connecting using token")
b.sc = slack.New(token, slack.OptionDebug(b.GetBool("Debug"))) b.sc = slack.New(token, slack.OptionDebug(b.GetBool("Debug")))
b.channels = newChannelManager(b.Log, b.sc)
b.users = newUserManager(b.Log, b.sc)
b.rtm = b.sc.NewRTM() b.rtm = b.sc.NewRTM()
go b.rtm.ManageConnection() go b.rtm.ManageConnection()
go b.handleSlack() go b.handleSlack()
@ -163,9 +151,9 @@ func (b *Bslack) JoinChannel(channel config.ChannelInfo) error {
return nil return nil
} }
b.populateChannels(false) b.channels.populateChannels(false)
channelInfo, err := b.getChannel(channel.Name) channelInfo, err := b.channels.getChannel(channel.Name)
if err != nil { if err != nil {
return fmt.Errorf("could not join channel: %#v", err) return fmt.Errorf("could not join channel: %#v", err)
} }
@ -275,7 +263,7 @@ func (b *Bslack) sendRTM(msg config.Message) (string, error) {
return "", nil return "", nil
} }
channelInfo, err := b.getChannel(msg.Channel) channelInfo, err := b.channels.getChannel(msg.Channel)
if err != nil { if err != nil {
return "", fmt.Errorf("could not send message: %v", err) return "", fmt.Errorf("could not send message: %v", err)
} }

View File

@ -4,14 +4,38 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"github.com/42wim/matterbridge/bridge/config"
"github.com/nlopes/slack" "github.com/nlopes/slack"
"github.com/sirupsen/logrus"
) )
const minimumRefreshInterval = 10 * time.Second const minimumRefreshInterval = 10 * time.Second
func (b *Bslack) getUser(id string) *slack.User { type users struct {
log *logrus.Entry
sc *slack.Client
users map[string]*slack.User
usersMutex sync.RWMutex
refreshInProgress bool
earliestRefresh time.Time
refreshMutex sync.Mutex
}
func newUserManager(log *logrus.Entry, sc *slack.Client) *users {
return &users{
log: log,
sc: sc,
users: make(map[string]*slack.User),
earliestRefresh: time.Now(),
}
}
func (b *users) getUser(id string) *slack.User {
b.usersMutex.RLock() b.usersMutex.RLock()
user, ok := b.users[id] user, ok := b.users[id]
b.usersMutex.RUnlock() b.usersMutex.RUnlock()
@ -25,25 +49,25 @@ func (b *Bslack) getUser(id string) *slack.User {
return b.users[id] return b.users[id]
} }
func (b *Bslack) getUsername(id string) string { func (b *users) getUsername(id string) string {
if user := b.getUser(id); user != nil { if user := b.getUser(id); user != nil {
if user.Profile.DisplayName != "" { if user.Profile.DisplayName != "" {
return user.Profile.DisplayName return user.Profile.DisplayName
} }
return user.Name return user.Name
} }
b.Log.Warnf("Could not find user with ID '%s'", id) b.log.Warnf("Could not find user with ID '%s'", id)
return "" return ""
} }
func (b *Bslack) getAvatar(id string) string { func (b *users) getAvatar(id string) string {
if user := b.getUser(id); user != nil { if user := b.getUser(id); user != nil {
return user.Profile.Image48 return user.Profile.Image48
} }
return "" return ""
} }
func (b *Bslack) populateUser(userID string) { func (b *users) populateUser(userID string) {
b.usersMutex.RLock() b.usersMutex.RLock()
_, exists := b.users[userID] _, exists := b.users[userID]
b.usersMutex.RUnlock() b.usersMutex.RUnlock()
@ -54,7 +78,7 @@ func (b *Bslack) populateUser(userID string) {
user, err := b.sc.GetUserInfo(userID) user, err := b.sc.GetUserInfo(userID)
if err != nil { if err != nil {
b.Log.Debugf("GetUserInfo failed for %v: %v", userID, err) b.log.Debugf("GetUserInfo failed for %v: %v", userID, err)
return return
} }
@ -63,10 +87,10 @@ func (b *Bslack) populateUser(userID string) {
b.usersMutex.Unlock() b.usersMutex.Unlock()
} }
func (b *Bslack) populateUsers(wait bool) { func (b *users) populateUsers(wait bool) {
b.refreshMutex.Lock() b.refreshMutex.Lock()
if !wait && (time.Now().Before(b.earliestUserRefresh) || b.refreshInProgress) { if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) {
b.Log.Debugf("Not refreshing user list as it was done less than %v ago.", b.log.Debugf("Not refreshing user list as it was done less than %v ago.",
minimumRefreshInterval) minimumRefreshInterval)
b.refreshMutex.Unlock() b.refreshMutex.Unlock()
@ -92,8 +116,8 @@ func (b *Bslack) populateUsers(wait bool) {
break break
} }
if err = handleRateLimit(b.Log, err); err != nil { if err = handleRateLimit(b.log, err); err != nil {
b.Log.Errorf("Could not retrieve users: %#v", err) b.log.Errorf("Could not retrieve users: %#v", err)
return return
} }
continue continue
@ -102,11 +126,11 @@ func (b *Bslack) populateUsers(wait bool) {
for i := range pagination.Users { for i := range pagination.Users {
newUsers[pagination.Users[i].ID] = &pagination.Users[i] newUsers[pagination.Users[i].ID] = &pagination.Users[i]
} }
b.Log.Debugf("getting %d users", len(pagination.Users)) b.log.Debugf("getting %d users", len(pagination.Users))
count++ count++
// more > 2000 users, slack will complain and ratelimit. break // more > 2000 users, slack will complain and ratelimit. break
if count > 10 { if count > 10 {
b.Log.Info("Large slack detected > 2000 users, skipping loading complete userlist.") b.log.Info("Large slack detected > 2000 users, skipping loading complete userlist.")
break break
} }
} }
@ -117,40 +141,104 @@ func (b *Bslack) populateUsers(wait bool) {
b.refreshMutex.Lock() b.refreshMutex.Lock()
defer b.refreshMutex.Unlock() defer b.refreshMutex.Unlock()
b.earliestUserRefresh = time.Now().Add(minimumRefreshInterval) b.earliestRefresh = time.Now().Add(minimumRefreshInterval)
b.refreshInProgress = false b.refreshInProgress = false
} }
func (b *Bslack) getChannel(channel string) (*slack.Channel, error) { type channels struct {
log *logrus.Entry
sc *slack.Client
channelsByID map[string]*slack.Channel
channelsByName map[string]*slack.Channel
channelsMutex sync.RWMutex
channelMembers map[string][]string
channelMembersMutex sync.RWMutex
refreshInProgress bool
earliestRefresh time.Time
refreshMutex sync.Mutex
}
func newChannelManager(log *logrus.Entry, sc *slack.Client) *channels {
return &channels{
log: log,
sc: sc,
channelsByID: make(map[string]*slack.Channel),
channelsByName: make(map[string]*slack.Channel),
earliestRefresh: time.Now(),
}
}
func (b *channels) getChannel(channel string) (*slack.Channel, error) {
if strings.HasPrefix(channel, "ID:") { if strings.HasPrefix(channel, "ID:") {
return b.getChannelByID(strings.TrimPrefix(channel, "ID:")) return b.getChannelByID(strings.TrimPrefix(channel, "ID:"))
} }
return b.getChannelByName(channel) return b.getChannelByName(channel)
} }
func (b *Bslack) getChannelByName(name string) (*slack.Channel, error) { func (b *channels) getChannelByName(name string) (*slack.Channel, error) {
return b.getChannelBy(name, b.channelsByName) return b.getChannelBy(name, b.channelsByName)
} }
func (b *Bslack) getChannelByID(id string) (*slack.Channel, error) { func (b *channels) getChannelByID(id string) (*slack.Channel, error) {
return b.getChannelBy(id, b.channelsByID) return b.getChannelBy(id, b.channelsByID)
} }
func (b *Bslack) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) { func (b *channels) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) {
b.channelsMutex.RLock() b.channelsMutex.RLock()
defer b.channelsMutex.RUnlock() defer b.channelsMutex.RUnlock()
if channel, ok := lookupMap[lookupKey]; ok { if channel, ok := lookupMap[lookupKey]; ok {
return channel, nil return channel, nil
} }
return nil, fmt.Errorf("%s: channel %s not found", b.Account, lookupKey) return nil, fmt.Errorf("channel %s not found", lookupKey)
} }
func (b *Bslack) populateChannels(wait bool) { func (b *channels) getChannelMembers(users *users) config.ChannelMembers {
b.channelMembersMutex.RLock()
defer b.channelMembersMutex.RUnlock()
membersInfo := config.ChannelMembers{}
for channelID, members := range b.channelMembers {
for _, member := range members {
channelName := ""
userName := ""
userNick := ""
user := users.getUser(member)
if user != nil {
userName = user.Name
userNick = user.Profile.DisplayName
}
channel, _ := b.getChannelByID(channelID)
if channel != nil {
channelName = channel.Name
}
memberInfo := config.ChannelMember{
Username: userName,
Nick: userNick,
UserID: member,
ChannelID: channelID,
ChannelName: channelName,
}
membersInfo = append(membersInfo, memberInfo)
}
}
return membersInfo
}
func (b *channels) registerChannel(channel slack.Channel) {
b.channelsMutex.Lock()
b.channelsByID[channel.ID] = &channel
b.channelsByName[channel.Name] = &channel
b.channelsMutex.Unlock()
}
func (b *channels) populateChannels(wait bool) {
b.refreshMutex.Lock() b.refreshMutex.Lock()
if !wait && (time.Now().Before(b.earliestChannelRefresh) || b.refreshInProgress) { if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) {
b.Log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", b.log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.", minimumRefreshInterval)
minimumRefreshInterval)
b.refreshMutex.Unlock() b.refreshMutex.Unlock()
return return
} }
@ -175,8 +263,8 @@ func (b *Bslack) populateChannels(wait bool) {
for { for {
channels, nextCursor, err := b.sc.GetConversations(queryParams) channels, nextCursor, err := b.sc.GetConversations(queryParams)
if err != nil { if err != nil {
if err = handleRateLimit(b.Log, err); err != nil { if err = handleRateLimit(b.log, err); err != nil {
b.Log.Errorf("Could not retrieve channels: %#v", err) b.log.Errorf("Could not retrieve channels: %#v", err)
return return
} }
continue continue
@ -217,6 +305,6 @@ func (b *Bslack) populateChannels(wait bool) {
b.refreshMutex.Lock() b.refreshMutex.Lock()
defer b.refreshMutex.Unlock() defer b.refreshMutex.Unlock()
b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval) b.earliestRefresh = time.Now().Add(minimumRefreshInterval)
b.refreshInProgress = false b.refreshInProgress = false
} }