diff --git a/bridge/slack/users_channels.go b/bridge/slack/users_channels.go index 9a8568bd..8712d268 100644 --- a/bridge/slack/users_channels.go +++ b/bridge/slack/users_channels.go @@ -18,8 +18,9 @@ type users struct { log *logrus.Entry sc *slack.Client - users map[string]*slack.User - usersMutex sync.RWMutex + users map[string]*slack.User + usersMutex sync.RWMutex + usersSyncPoints map[string]chan struct{} refreshInProgress bool earliestRefresh time.Time @@ -31,6 +32,7 @@ func newUserManager(log *logrus.Entry, sc *slack.Client) *users { log: log, sc: sc, users: make(map[string]*slack.User), + usersSyncPoints: make(map[string]chan struct{}), earliestRefresh: time.Now(), } } @@ -68,14 +70,32 @@ func (b *users) getAvatar(id string) string { } func (b *users) populateUser(userID string) { - b.usersMutex.RLock() - _, exists := b.users[userID] - b.usersMutex.RUnlock() - if exists { - // already in cache - return + for { + b.usersMutex.Lock() + _, exists := b.users[userID] + if exists { + // already in cache + b.usersMutex.Unlock() + return + } + + if syncPoint, ok := b.usersSyncPoints[userID]; ok { + // Another goroutine is already populating this user for us so wait on it to finish. + b.usersMutex.Unlock() + <-syncPoint + // We do not return and iterate again to check that the entry does indeed exist + // in case the previous query failed for some reason. + } else { + b.usersSyncPoints[userID] = make(chan struct{}) + b.usersMutex.Unlock() + break + } } + // Do not hold the lock while fetching information from Slack + // as this might take an unbounded amount of time. + b.usersMutex.Unlock() + user, err := b.sc.GetUserInfo(userID) if err != nil { b.log.Debugf("GetUserInfo failed for %v: %v", userID, err) @@ -83,15 +103,20 @@ func (b *users) populateUser(userID string) { } b.usersMutex.Lock() + defer b.usersMutex.Unlock() + + // Register user information. b.users[userID] = user - b.usersMutex.Unlock() + + // Wake up any waiting goroutines and remove the synchronization point. + close(b.usersSyncPoints[userID]) + delete(b.usersSyncPoints, userID) } func (b *users) populateUsers(wait bool) { b.refreshMutex.Lock() if !wait && (time.Now().Before(b.earliestRefresh) || b.refreshInProgress) { - b.log.Debugf("Not refreshing user list as it was done less than %v ago.", - minimumRefreshInterval) + b.log.Debugf("Not refreshing user list as it was done less than %v ago.", minimumRefreshInterval) b.refreshMutex.Unlock() return @@ -230,9 +255,10 @@ func (b *channels) getChannelMembers(users *users) config.ChannelMembers { func (b *channels) registerChannel(channel slack.Channel) { b.channelsMutex.Lock() + defer b.channelsMutex.Unlock() + b.channelsByID[channel.ID] = &channel b.channelsByName[channel.Name] = &channel - b.channelsMutex.Unlock() } func (b *channels) populateChannels(wait bool) {