Update nlopes/slack vendor
This commit is contained in:
183
vendor/github.com/nlopes/slack/websocket_managed_conn.go
generated
vendored
183
vendor/github.com/nlopes/slack/websocket_managed_conn.go
generated
vendored
@@ -4,10 +4,11 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/websocket"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// ManageConnection can be called on a Slack RTM instance returned by the
|
||||
@@ -24,25 +25,35 @@ import (
|
||||
//
|
||||
// The defined error events are located in websocket_internals.go.
|
||||
func (rtm *RTM) ManageConnection() {
|
||||
var connectionCount int
|
||||
for {
|
||||
connectionCount++
|
||||
var (
|
||||
err error
|
||||
info *Info
|
||||
conn *websocket.Conn
|
||||
)
|
||||
|
||||
for connectionCount := 0; ; connectionCount++ {
|
||||
// start trying to connect
|
||||
// the returned err is already passed onto the IncomingEvents channel
|
||||
info, conn, err := rtm.connect(connectionCount, rtm.useRTMStart)
|
||||
// if err != nil then the connection is sucessful - otherwise it is
|
||||
// fatal
|
||||
if err != nil {
|
||||
if info, conn, err = rtm.connect(connectionCount, rtm.useRTMStart); err != nil {
|
||||
// when the connection is unsuccessful its fatal, and we need to bail out.
|
||||
rtm.Debugf("Failed to connect with RTM on try %d: %s", connectionCount, err)
|
||||
return
|
||||
}
|
||||
|
||||
// lock to prevent data races with Disconnect particularly around isConnected
|
||||
// and conn.
|
||||
rtm.mu.Lock()
|
||||
rtm.conn = conn
|
||||
rtm.isConnected = true
|
||||
rtm.info = info
|
||||
rtm.mu.Unlock()
|
||||
|
||||
rtm.IncomingEvents <- RTMEvent{"connected", &ConnectedEvent{
|
||||
ConnectionCount: connectionCount,
|
||||
Info: info,
|
||||
}}
|
||||
|
||||
rtm.conn = conn
|
||||
rtm.isConnected = true
|
||||
rtm.Debugf("RTM connection succeeded on try %d", connectionCount)
|
||||
|
||||
keepRunning := make(chan bool)
|
||||
// we're now connected (or have failed fatally) so we can set up
|
||||
@@ -50,7 +61,7 @@ func (rtm *RTM) ManageConnection() {
|
||||
go rtm.handleIncomingEvents(keepRunning)
|
||||
|
||||
// this should be a blocking call until the connection has ended
|
||||
rtm.handleEvents(keepRunning, 30*time.Second)
|
||||
rtm.handleEvents(keepRunning)
|
||||
|
||||
// after being disconnected we need to check if it was intentional
|
||||
// if not then we should try to reconnect
|
||||
@@ -67,6 +78,12 @@ func (rtm *RTM) ManageConnection() {
|
||||
// If useRTMStart is false then it uses rtm.connect to create the connection,
|
||||
// otherwise it uses rtm.start.
|
||||
func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocket.Conn, error) {
|
||||
const (
|
||||
errInvalidAuth = "invalid_auth"
|
||||
errInactiveAccount = "account_inactive"
|
||||
errMissingAuthToken = "not_authed"
|
||||
)
|
||||
|
||||
// used to provide exponential backoff wait time with jitter before trying
|
||||
// to connect to slack again
|
||||
boff := &backoff{
|
||||
@@ -87,10 +104,14 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke
|
||||
if err == nil {
|
||||
return info, conn, nil
|
||||
}
|
||||
// check for fatal errors - currently only invalid_auth
|
||||
if sErr, ok := err.(*WebError); ok && (sErr.Error() == "invalid_auth" || sErr.Error() == "account_inactive") {
|
||||
|
||||
// check for fatal errors
|
||||
switch err.Error() {
|
||||
case errInvalidAuth, errInactiveAccount, errMissingAuthToken:
|
||||
rtm.Debugf("Invalid auth when connecting with RTM: %s", err)
|
||||
rtm.IncomingEvents <- RTMEvent{"invalid_auth", &InvalidAuthEvent{}}
|
||||
return nil, nil, sErr
|
||||
return nil, nil, err
|
||||
default:
|
||||
}
|
||||
|
||||
// any other errors are treated as recoverable and we try again after
|
||||
@@ -102,7 +123,7 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke
|
||||
|
||||
// check if Disconnect() has been invoked.
|
||||
select {
|
||||
case _ = <-rtm.disconnected:
|
||||
case <-rtm.disconnected:
|
||||
rtm.IncomingEvents <- RTMEvent{"disconnected", &DisconnectedEvent{Intentional: true}}
|
||||
return nil, nil, fmt.Errorf("disconnect received while trying to connect")
|
||||
default:
|
||||
@@ -119,23 +140,34 @@ func (rtm *RTM) connect(connectionCount int, useRTMStart bool) (*Info, *websocke
|
||||
// startRTMAndDial attempts to connect to the slack websocket. If useRTMStart is true,
|
||||
// then it returns the full information returned by the "rtm.start" method on the
|
||||
// slack API. Else it uses the "rtm.connect" method to connect
|
||||
func (rtm *RTM) startRTMAndDial(useRTMStart bool) (*Info, *websocket.Conn, error) {
|
||||
var info *Info
|
||||
var url string
|
||||
var err error
|
||||
func (rtm *RTM) startRTMAndDial(useRTMStart bool) (info *Info, _ *websocket.Conn, err error) {
|
||||
var (
|
||||
url string
|
||||
)
|
||||
|
||||
if useRTMStart {
|
||||
rtm.Debugf("Starting RTM")
|
||||
info, url, err = rtm.StartRTM()
|
||||
} else {
|
||||
rtm.Debugf("Connecting to RTM")
|
||||
info, url, err = rtm.ConnectRTM()
|
||||
}
|
||||
if err != nil {
|
||||
rtm.Debugf("Failed to start or connect to RTM: %s", err)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
rtm.Debugf("Dialing to websocket on url %s", url)
|
||||
// Only use HTTPS for connections to prevent MITM attacks on the connection.
|
||||
conn, err := websocketProxyDial(url, "https://api.slack.com")
|
||||
upgradeHeader := http.Header{}
|
||||
upgradeHeader.Add("Origin", "https://api.slack.com")
|
||||
dialer := websocket.DefaultDialer
|
||||
if rtm.dialer != nil {
|
||||
dialer = rtm.dialer
|
||||
}
|
||||
conn, _, err := dialer.Dial(url, upgradeHeader)
|
||||
if err != nil {
|
||||
rtm.Debugf("Failed to dial to the websocket: %s", err)
|
||||
return nil, nil, err
|
||||
}
|
||||
return info, conn, err
|
||||
@@ -163,8 +195,8 @@ func (rtm *RTM) killConnection(keepRunning chan bool, intentional bool) error {
|
||||
// interval. This also sends outgoing messages that are received from the RTM's
|
||||
// outgoingMessages channel. This also handles incoming raw events from the RTM
|
||||
// rawEvents channel.
|
||||
func (rtm *RTM) handleEvents(keepRunning chan bool, interval time.Duration) {
|
||||
ticker := time.NewTicker(interval)
|
||||
func (rtm *RTM) handleEvents(keepRunning chan bool) {
|
||||
ticker := time.NewTicker(rtm.pingInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
@@ -172,7 +204,12 @@ func (rtm *RTM) handleEvents(keepRunning chan bool, interval time.Duration) {
|
||||
case intentional := <-rtm.killChannel:
|
||||
_ = rtm.killConnection(keepRunning, intentional)
|
||||
return
|
||||
// send pings on ticker interval
|
||||
|
||||
// detect when the connection is dead.
|
||||
case <-rtm.pingDeadman.C:
|
||||
rtm.Debugln("deadman switch trigger disconnecting")
|
||||
_ = rtm.killConnection(keepRunning, false)
|
||||
// send pings on ticker interval
|
||||
case <-ticker.C:
|
||||
err := rtm.ping()
|
||||
if err != nil {
|
||||
@@ -190,7 +227,11 @@ func (rtm *RTM) handleEvents(keepRunning chan bool, interval time.Duration) {
|
||||
rtm.sendOutgoingMessage(msg)
|
||||
// listen for incoming messages that need to be parsed
|
||||
case rawEvent := <-rtm.rawEvents:
|
||||
rtm.handleRawEvent(rawEvent)
|
||||
switch rtm.handleRawEvent(rawEvent) {
|
||||
case rtmEventTypeGoodbye:
|
||||
_ = rtm.killConnection(keepRunning, false)
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -208,7 +249,9 @@ func (rtm *RTM) handleIncomingEvents(keepRunning <-chan bool) {
|
||||
case <-keepRunning:
|
||||
return
|
||||
default:
|
||||
rtm.receiveIncomingEvent()
|
||||
if err := rtm.receiveIncomingEvent(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -218,7 +261,7 @@ func (rtm *RTM) sendWithDeadline(msg interface{}) error {
|
||||
if err := rtm.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := websocket.JSON.Send(rtm.conn, msg); err != nil {
|
||||
if err := rtm.conn.WriteJSON(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
// remove write deadline
|
||||
@@ -258,9 +301,7 @@ func (rtm *RTM) sendOutgoingMessage(msg OutgoingMessage) {
|
||||
func (rtm *RTM) ping() error {
|
||||
id := rtm.idGen.Next()
|
||||
rtm.Debugln("Sending PING ", id)
|
||||
rtm.pings[id] = time.Now()
|
||||
|
||||
msg := &Ping{ID: id, Type: "ping"}
|
||||
msg := &Ping{ID: id, Type: "ping", Timestamp: time.Now().Unix()}
|
||||
|
||||
if err := rtm.sendWithDeadline(msg); err != nil {
|
||||
rtm.Debugf("RTM Error sending 'PING %d': %s", id, err.Error())
|
||||
@@ -271,52 +312,62 @@ func (rtm *RTM) ping() error {
|
||||
|
||||
// receiveIncomingEvent attempts to receive an event from the RTM's websocket.
|
||||
// This will block until a frame is available from the websocket.
|
||||
func (rtm *RTM) receiveIncomingEvent() {
|
||||
// If the read from the websocket results in a fatal error, this function will return non-nil.
|
||||
func (rtm *RTM) receiveIncomingEvent() error {
|
||||
event := json.RawMessage{}
|
||||
err := websocket.JSON.Receive(rtm.conn, &event)
|
||||
if err == io.EOF {
|
||||
err := rtm.conn.ReadJSON(&event)
|
||||
switch {
|
||||
case err == io.ErrUnexpectedEOF:
|
||||
// EOF's don't seem to signify a failed connection so instead we ignore
|
||||
// them here and detect a failed connection upon attempting to send a
|
||||
// 'PING' message
|
||||
|
||||
// trigger a 'PING' to detect pontential websocket disconnect
|
||||
// trigger a 'PING' to detect potential websocket disconnect
|
||||
rtm.forcePing <- true
|
||||
return
|
||||
} else if err != nil {
|
||||
case err != nil:
|
||||
// All other errors from ReadJSON come from NextReader, and should
|
||||
// kill the read loop and force a reconnect.
|
||||
rtm.IncomingEvents <- RTMEvent{"incoming_error", &IncomingEventError{
|
||||
ErrorObj: err,
|
||||
}}
|
||||
// force a ping here too?
|
||||
return
|
||||
} else if len(event) == 0 {
|
||||
rtm.killChannel <- false
|
||||
return err
|
||||
case len(event) == 0:
|
||||
rtm.Debugln("Received empty event")
|
||||
return
|
||||
default:
|
||||
rtm.Debugln("Incoming Event:", string(event[:]))
|
||||
rtm.rawEvents <- event
|
||||
}
|
||||
rtm.Debugln("Incoming Event:", string(event[:]))
|
||||
rtm.rawEvents <- event
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleRawEvent takes a raw JSON message received from the slack websocket
|
||||
// and handles the encoded event.
|
||||
func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) {
|
||||
// returns the event type of the message.
|
||||
func (rtm *RTM) handleRawEvent(rawEvent json.RawMessage) string {
|
||||
event := &Event{}
|
||||
err := json.Unmarshal(rawEvent, event)
|
||||
if err != nil {
|
||||
rtm.IncomingEvents <- RTMEvent{"unmarshalling_error", &UnmarshallingErrorEvent{err}}
|
||||
return
|
||||
return ""
|
||||
}
|
||||
|
||||
switch event.Type {
|
||||
case "":
|
||||
case rtmEventTypeAck:
|
||||
rtm.handleAck(rawEvent)
|
||||
case "hello":
|
||||
case rtmEventTypeHello:
|
||||
rtm.IncomingEvents <- RTMEvent{"hello", &HelloEvent{}}
|
||||
case "pong":
|
||||
case rtmEventTypePong:
|
||||
rtm.handlePong(rawEvent)
|
||||
case "desktop_notification":
|
||||
case rtmEventTypeGoodbye:
|
||||
// just return the event type up for goodbye, will be handled by caller.
|
||||
case rtmEventTypeDesktopNotification:
|
||||
rtm.Debugln("Received desktop notification, ignoring")
|
||||
default:
|
||||
rtm.handleEvent(event.Type, rawEvent)
|
||||
}
|
||||
|
||||
return event.Type
|
||||
}
|
||||
|
||||
// handleAck handles an incoming 'ACK' message.
|
||||
@@ -331,7 +382,13 @@ func (rtm *RTM) handleAck(event json.RawMessage) {
|
||||
if ack.Ok {
|
||||
rtm.IncomingEvents <- RTMEvent{"ack", ack}
|
||||
} else if ack.RTMResponse.Error != nil {
|
||||
rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}}
|
||||
// As there is no documentation for RTM error-codes, this
|
||||
// identification of a rate-limit warning is very brittle.
|
||||
if ack.RTMResponse.Error.Code == -1 && ack.RTMResponse.Error.Msg == "slow down, too many messages..." {
|
||||
rtm.IncomingEvents <- RTMEvent{"ack_error", &RateLimitEvent{}}
|
||||
} else {
|
||||
rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{ack.Error}}
|
||||
}
|
||||
} else {
|
||||
rtm.IncomingEvents <- RTMEvent{"ack_error", &AckErrorEvent{fmt.Errorf("ack decode failure")}}
|
||||
}
|
||||
@@ -341,19 +398,20 @@ func (rtm *RTM) handleAck(event json.RawMessage) {
|
||||
// a previously sent 'PING' message. This is then used to compute the
|
||||
// connection's latency.
|
||||
func (rtm *RTM) handlePong(event json.RawMessage) {
|
||||
pong := &Pong{}
|
||||
if err := json.Unmarshal(event, pong); err != nil {
|
||||
rtm.Debugln("RTM Error unmarshalling 'pong' event:", err)
|
||||
var (
|
||||
p Pong
|
||||
)
|
||||
|
||||
rtm.resetDeadman()
|
||||
|
||||
if err := json.Unmarshal(event, &p); err != nil {
|
||||
logger.Println("RTM Error unmarshalling 'pong' event:", err)
|
||||
rtm.Debugln(" -> Erroneous 'ping' event:", string(event))
|
||||
return
|
||||
}
|
||||
if pingTime, exists := rtm.pings[pong.ReplyTo]; exists {
|
||||
latency := time.Since(pingTime)
|
||||
rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}}
|
||||
delete(rtm.pings, pong.ReplyTo)
|
||||
} else {
|
||||
rtm.Debugln("RTM Error - unmatched 'pong' event:", string(event))
|
||||
}
|
||||
|
||||
latency := time.Since(time.Unix(p.Timestamp, 0))
|
||||
rtm.IncomingEvents <- RTMEvent{"latency_report", &LatencyReport{Value: latency}}
|
||||
}
|
||||
|
||||
// handleEvent is the "default" response to an event that does not have a
|
||||
@@ -363,7 +421,7 @@ func (rtm *RTM) handlePong(event json.RawMessage) {
|
||||
// correct struct then this sends an UnmarshallingErrorEvent to the
|
||||
// IncomingEvents channel.
|
||||
func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) {
|
||||
v, exists := eventMapping[typeStr]
|
||||
v, exists := EventMapping[typeStr]
|
||||
if !exists {
|
||||
rtm.Debugf("RTM Error, received unmapped event %q: %s\n", typeStr, string(event))
|
||||
err := fmt.Errorf("RTM Error: Received unmapped event %q: %s\n", typeStr, string(event))
|
||||
@@ -382,10 +440,10 @@ func (rtm *RTM) handleEvent(typeStr string, event json.RawMessage) {
|
||||
rtm.IncomingEvents <- RTMEvent{typeStr, recvEvent}
|
||||
}
|
||||
|
||||
// eventMapping holds a mapping of event names to their corresponding struct
|
||||
// EventMapping holds a mapping of event names to their corresponding struct
|
||||
// implementations. The structs should be instances of the unmarshalling
|
||||
// target for the matching event type.
|
||||
var eventMapping = map[string]interface{}{
|
||||
var EventMapping = map[string]interface{}{
|
||||
"message": MessageEvent{},
|
||||
"presence_change": PresenceChangeEvent{},
|
||||
"user_typing": UserTypingEvent{},
|
||||
@@ -463,4 +521,7 @@ var eventMapping = map[string]interface{}{
|
||||
"accounts_changed": AccountsChangedEvent{},
|
||||
|
||||
"reconnect_url": ReconnectUrlEvent{},
|
||||
|
||||
"member_joined_channel": MemberJoinedChannelEvent{},
|
||||
"member_left_channel": MemberLeftChannelEvent{},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user