Use StreamClient interface in StreamManager

This commit is contained in:
Mickael Remond
2019-06-08 18:52:19 +02:00
committed by Mickaël Rémond
parent 021f6d3740
commit 736a60cd1b
5 changed files with 40 additions and 31 deletions

View File

@@ -1,6 +1,7 @@
package xmpp // import "gosrc.io/xmpp"
import (
"errors"
"time"
"golang.org/x/xerrors"
@@ -11,13 +12,13 @@ import (
// stream events and doing the right operations.
//
// It can handle:
// - Connection
// - Client
// - Stream establishment workflow
// - Reconnection strategies, with exponential backoff. It also takes into account
// permanent errors to avoid useless reconnection loops.
// - Metrics processing
type StreamSession interface {
type StreamClient interface {
Connect() error
Disconnect()
SetHandler(handler EventHandler)
@@ -26,66 +27,71 @@ type StreamSession interface {
// StreamManager supervises an XMPP client connection. Its role is to handle connection events and
// apply reconnection strategy.
type StreamManager struct {
Client *Client
Session *Session
client StreamClient
PostConnect PostConnect
// Store low level metrics
Metrics *Metrics
}
type PostConnect func(c *Client)
type PostConnect func(c StreamClient)
// NewStreamManager creates a new StreamManager structure, intended to support
// handling XMPP client state event changes and auto-trigger reconnection
// based on StreamManager configuration.
func NewStreamManager(client *Client, pc PostConnect) *StreamManager {
// TODO: Move parameters to Start and remove factory method
func NewStreamManager(client StreamClient, pc PostConnect) *StreamManager {
return &StreamManager{
Client: client,
client: client,
PostConnect: pc,
}
}
// Start launch the connection loop
func (cm *StreamManager) Start() error {
cm.Client.Handler = func(e Event) {
func (sm *StreamManager) Start() error {
if sm.client == nil {
return errors.New("missing stream client")
}
handler := func(e Event) {
switch e.State {
case StateConnected:
cm.Metrics.setConnectTime()
sm.Metrics.setConnectTime()
case StateSessionEstablished:
cm.Metrics.setLoginTime()
sm.Metrics.setLoginTime()
case StateDisconnected:
// Reconnect on disconnection
cm.connect()
sm.connect()
case StateStreamError:
cm.Client.Disconnect()
sm.client.Disconnect()
// Only try reconnecting if we have not been kicked by another session to avoid connection loop.
if e.StreamError != "conflict" {
cm.connect()
sm.connect()
}
}
}
sm.client.SetHandler(handler)
return cm.connect()
return sm.connect()
}
// Stop cancels pending operations and terminates existing XMPP client.
func (cm *StreamManager) Stop() {
func (sm *StreamManager) Stop() {
// Remove on disconnect handler to avoid triggering reconnect
cm.Client.Handler = nil
cm.Client.Disconnect()
sm.client.SetHandler(nil)
sm.client.Disconnect()
}
// connect manages the reconnection loop and apply the define backoff to avoid overloading the server.
func (cm *StreamManager) connect() error {
func (sm *StreamManager) connect() error {
var backoff Backoff // TODO: Group backoff calculation features with connection manager?
for {
var err error
// TODO: Make it possible to define logger to log disconnect and reconnection attempts
cm.Metrics = initMetrics()
sm.Metrics = initMetrics()
if err = cm.Client.Connect(); err != nil {
if err = sm.client.Connect(); err != nil {
var actualErr ConnError
if xerrors.As(err, &actualErr) {
if actualErr.Permanent {
@@ -98,8 +104,8 @@ func (cm *StreamManager) connect() error {
}
}
if cm.PostConnect != nil {
cm.PostConnect(cm.Client)
if sm.PostConnect != nil {
sm.PostConnect(sm.client)
}
return nil
}