Update dependencies (#886)
This commit is contained in:
146
vendor/github.com/nlopes/slack/websocket_managed_conn.go
generated
vendored
146
vendor/github.com/nlopes/slack/websocket_managed_conn.go
generated
vendored
@@ -10,6 +10,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/nlopes/slack/internal/errorsx"
|
||||
"github.com/nlopes/slack/internal/timex"
|
||||
)
|
||||
|
||||
// ManageConnection can be called on a Slack RTM instance returned by the
|
||||
@@ -38,6 +40,7 @@ func (rtm *RTM) ManageConnection() {
|
||||
if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil {
|
||||
// when the connection is unsuccessful its fatal, and we need to bail out.
|
||||
rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err)
|
||||
rtm.disconnect()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -45,7 +48,6 @@ func (rtm *RTM) ManageConnection() {
|
||||
// and conn.
|
||||
rtm.mu.Lock()
|
||||
rtm.conn = conn
|
||||
rtm.isConnected = true
|
||||
rtm.info = info
|
||||
rtm.mu.Unlock()
|
||||
|
||||
@@ -56,20 +58,19 @@ func (rtm *RTM) ManageConnection() {
|
||||
|
||||
rtm.Debugf("RTM connection succeeded on try %d", connectionCount)
|
||||
|
||||
keepRunning := make(chan bool)
|
||||
// we're now connected (or have failed fatally) so we can set up
|
||||
// listeners
|
||||
go rtm.handleIncomingEvents(keepRunning)
|
||||
// we're now connected so we can set up listeners
|
||||
go rtm.handleIncomingEvents()
|
||||
|
||||
// this should be a blocking call until the connection has ended
|
||||
rtm.handleEvents(keepRunning)
|
||||
rtm.handleEvents()
|
||||
|
||||
// after being disconnected we need to check if it was intentional
|
||||
// if not then we should try to reconnect
|
||||
if rtm.wasIntentional {
|
||||
select {
|
||||
case <-rtm.disconnected:
|
||||
// after handle events returns we need to check if we're disconnected
|
||||
return
|
||||
default:
|
||||
// otherwise continue and run the loop again to reconnect
|
||||
}
|
||||
// else continue and run the loop again to connect
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,18 +89,20 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke
|
||||
// used to provide exponential backoff wait time with jitter before trying
|
||||
// to connect to slack again
|
||||
boff := &backoff{
|
||||
Min: 100 * time.Millisecond,
|
||||
Max: 5 * time.Minute,
|
||||
Factor: 2,
|
||||
Jitter: true,
|
||||
Max: 5 * time.Minute,
|
||||
}
|
||||
|
||||
for {
|
||||
var (
|
||||
backoff time.Duration
|
||||
)
|
||||
|
||||
// send connecting event
|
||||
rtm.IncomingEvents <- RTMEvent{"connecting", &ConnectingEvent{
|
||||
Attempt: boff.attempts + 1,
|
||||
ConnectionCount: connectionCount,
|
||||
}}
|
||||
|
||||
// attempt to start the connection
|
||||
info, conn, err := rtm.startRTMAndDial(useRTMStart)
|
||||
if err == nil {
|
||||
@@ -109,32 +112,49 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke
|
||||
// check for fatal errors
|
||||
switch err.Error() {
|
||||
case errInvalidAuth, errInactiveAccount, errMissingAuthToken:
|
||||
rtm.Debugf("Invalid auth when connecting with RTM: %s", err)
|
||||
rtm.Debugf("invalid auth when connecting with RTM: %s", err)
|
||||
rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
|
||||
return nil, nil, err
|
||||
default:
|
||||
}
|
||||
|
||||
switch actual := err.(type) {
|
||||
case statusCodeError:
|
||||
if actual.Code == http.StatusNotFound {
|
||||
rtm.Debugf("invalid auth when connecting with RTM: %s", err)
|
||||
rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
|
||||
return nil, nil, err
|
||||
}
|
||||
case *RateLimitedError:
|
||||
backoff = actual.RetryAfter
|
||||
default:
|
||||
}
|
||||
|
||||
backoff = timex.Max(backoff, boff.Duration())
|
||||
// any other errors are treated as recoverable and we try again after
|
||||
// sending the event along the IncomingEvents channel
|
||||
rtm.IncomingEvents <- RTMEvent{"connection_error", &ConnectionErrorEvent{
|
||||
Attempt: boff.attempts,
|
||||
Backoff: backoff,
|
||||
ErrorObj: err,
|
||||
}}
|
||||
|
||||
// check if Disconnect() has been invoked.
|
||||
select {
|
||||
case <-rtm.disconnected:
|
||||
rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: true}}
|
||||
return nil, nil, fmt.Errorf("disconnect received while trying to connect")
|
||||
default:
|
||||
}
|
||||
|
||||
// get time we should wait before attempting to connect again
|
||||
dur := boff.Duration()
|
||||
rtm.Debugf("reconnection %d failed: %s", boff.attempts+1, err)
|
||||
rtm.Debugln(" -> reconnecting in", dur)
|
||||
time.Sleep(dur)
|
||||
rtm.Debugf("reconnection %d failed: %s reconnecting in %v\n", boff.attempts, err, backoff)
|
||||
|
||||
// wait for one of the following to occur,
|
||||
// backoff duration has elapsed, killChannel is signalled, or
|
||||
// the rtm finishes disconnecting.
|
||||
select {
|
||||
case <-time.After(backoff): // retry after the backoff.
|
||||
case intentional := <-rtm.killChannel:
|
||||
if intentional {
|
||||
rtm.killConnection(intentional, ErrRTMDisconnected)
|
||||
return nil, nil, ErrRTMDisconnected
|
||||
}
|
||||
case <-rtm.disconnected:
|
||||
return nil, nil, ErrRTMDisconnected
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -187,15 +207,19 @@ func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn
|
||||
//
|
||||
// This should not be called directly! Instead a boolean value (true for
|
||||
// intentional, false otherwise) should be sent to the killChannel on the RTM.
|
||||
func (rtm *RTM) killConnection(keepRunning chan bool, intentional bool) error {
|
||||
func (rtm *RTM) killConnection(intentional bool, cause error) (err error) {
|
||||
rtm.Debugln("killing connection")
|
||||
if rtm.isConnected {
|
||||
close(keepRunning)
|
||||
|
||||
if rtm.conn != nil {
|
||||
err = rtm.conn.Close()
|
||||
}
|
||||
rtm.isConnected = false
|
||||
rtm.wasIntentional = intentional
|
||||
err := rtm.conn.Close()
|
||||
rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{intentional}}
|
||||
|
||||
rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: intentional, Cause: cause}}
|
||||
|
||||
if intentional {
|
||||
rtm.disconnect()
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -204,31 +228,28 @@ func (rtm *RTM) killConnection(keepRunning chan bool, intentional bool) error {
|
||||
// interval. This also sends outgoing messages that are received from the RTM's
|
||||
// outgoingMessages channel. This also handles incoming raw events from the RTM
|
||||
// rawEvents channel.
|
||||
func (rtm *RTM) handleEvents(keepRunning chan bool) {
|
||||
func (rtm *RTM) handleEvents() {
|
||||
ticker := time.NewTicker(rtm.pingInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
// catch "stop" signal on channel close
|
||||
case intentional := <-rtm.killChannel:
|
||||
_ = rtm.killConnection(keepRunning, intentional)
|
||||
_ = rtm.killConnection(intentional, errorsx.String("signaled"))
|
||||
return
|
||||
|
||||
// detect when the connection is dead.
|
||||
case <-rtm.pingDeadman.C:
|
||||
rtm.Debugln("deadman switch trigger disconnecting")
|
||||
_ = rtm.killConnection(keepRunning, false)
|
||||
_ = rtm.killConnection(false, errorsx.String("deadman switch triggered"))
|
||||
return
|
||||
// send pings on ticker interval
|
||||
case <-ticker.C:
|
||||
err := rtm.ping()
|
||||
if err != nil {
|
||||
_ = rtm.killConnection(keepRunning, false)
|
||||
if err := rtm.ping(); err != nil {
|
||||
_ = rtm.killConnection(false, err)
|
||||
return
|
||||
}
|
||||
case <-rtm.forcePing:
|
||||
err := rtm.ping()
|
||||
if err != nil {
|
||||
_ = rtm.killConnection(keepRunning, false)
|
||||
if err := rtm.ping(); err != nil {
|
||||
_ = rtm.killConnection(false, err)
|
||||
return
|
||||
}
|
||||
// listen for messages that need to be sent
|
||||
@@ -238,7 +259,8 @@ func (rtm *RTM) handleEvents(keepRunning chan bool) {
|
||||
case rawEvent := <-rtm.rawEvents:
|
||||
switch rtm.handleRawEvent(rawEvent) {
|
||||
case rtmEventTypeGoodbye:
|
||||
_ = rtm.killConnection(keepRunning, false)
|
||||
_ = rtm.killConnection(false, errorsx.String("goodbye detected"))
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
@@ -250,17 +272,10 @@ func (rtm *RTM) handleEvents(keepRunning chan bool) {
|
||||
//
|
||||
// This will stop executing once the RTM's keepRunning channel has been closed
|
||||
// or has anything sent to it.
|
||||
func (rtm *RTM) handleIncomingEvents(keepRunning <-chan bool) {
|
||||
func (rtm *RTM) handleIncomingEvents() {
|
||||
for {
|
||||
// non-blocking listen to see if channel is closed
|
||||
select {
|
||||
// catch "stop" signal on channel close
|
||||
case <-keepRunning:
|
||||
if err := rtm.receiveIncomingEvent(); err != nil {
|
||||
return
|
||||
default:
|
||||
if err := rtm.receiveIncomingEvent(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -296,7 +311,6 @@ func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) {
|
||||
Message: msg,
|
||||
ErrorObj: err,
|
||||
}}
|
||||
// TODO force ping?
|
||||
}
|
||||
}
|
||||
|
||||
@@ -332,20 +346,32 @@ func (rtm *RTM) receiveIncomingEvent() error {
|
||||
// 'PING' message
|
||||
|
||||
// trigger a 'PING' to detect potential websocket disconnect
|
||||
rtm.forcePing <- true
|
||||
select {
|
||||
case rtm.forcePing <- true:
|
||||
case <-rtm.disconnected:
|
||||
}
|
||||
case err != nil:
|
||||
// All other errors from ReadJSON come from NextReader, and should
|
||||
// kill the read loop and force a reconnect.
|
||||
rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{
|
||||
ErrorObj: err,
|
||||
}}
|
||||
rtm.killChannel <- false
|
||||
|
||||
select {
|
||||
case rtm.killChannel <- false:
|
||||
case <-rtm.disconnected:
|
||||
}
|
||||
|
||||
return err
|
||||
case len(event) == 0:
|
||||
rtm.Debugln("Received empty event")
|
||||
default:
|
||||
rtm.Debugln("Incoming Event:", string(event[:]))
|
||||
rtm.rawEvents <- event
|
||||
rtm.Debugln("Incoming Event:", string(event))
|
||||
select {
|
||||
case rtm.rawEvents <- event:
|
||||
case <-rtm.disconnected:
|
||||
rtm.Debugln("disonnected while attempting to send raw event")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user