From 2f391fde8057c1cb2ac7f8e882840ff59840d4a9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Micka=C3=ABl=20R=C3=A9mond?= <mremond@process-one.net>
Date: Thu, 6 Jun 2019 11:58:50 +0200
Subject: [PATCH] Add Client Manager to monitor connection state and trigger
 reconnect (#39)

- Support for exponential backoff on reconnect to be gentle on the server.
- Clean up client by moving metrics and retry strategy to the connection manager.
- Update echo_client to use client manager
- Fix echo client XMPP message matching

Fixes #21
Improvements for #8
---
 backoff.go                       | 101 +++++++++++++++++++++++++++
 backoff_test.go                  |  24 +++++++
 client.go                        | 115 +++++++++++++++++--------------
 client_manager.go                | 106 ++++++++++++++++++++++++++++
 cmd/xmpp_echo/xmpp_echo.go       |  15 ++--
 cmd/xmpp_jukebox/xmpp_jukebox.go |   3 +-
 config.go                        |   1 -
 parser.go                        |   4 +-
 8 files changed, 300 insertions(+), 69 deletions(-)
 create mode 100644 backoff.go
 create mode 100644 backoff_test.go
 create mode 100644 client_manager.go

diff --git a/backoff.go b/backoff.go
new file mode 100644
index 0000000..bbb399c
--- /dev/null
+++ b/backoff.go
@@ -0,0 +1,101 @@
+/*
+Interesting reference on backoff:
+- Exponential Backoff And Jitter (AWS Blog):
+  https://www.awsarchitectureblog.com/2015/03/backoff.html
+
+We use Jitter as a default for exponential backoff, as the goal of
+this module is not to provide precise 'ticks', but good behaviour to
+implement retries that are helping the server to recover faster in
+case of congestion.
+
+It can be used in several ways:
+- Using duration to get next sleep time.
+- Using ticker channel to trigger callback function on tick
+
+The functions for Backoff are not threadsafe, but you can:
+- Keep the attempt counter on your end and use DurationForAttempt(int)
+- Use lock in your own code to protect the Backoff structure.
+
+TODO: Implement Backoff Ticker channel
+TODO: Implement throttler interface. Throttler could be used to implement various reconnect strategies.
+*/
+
+package xmpp // import "gosrc.io/xmpp"
+
+import (
+	"math"
+	"math/rand"
+	"time"
+)
+
+const (
+	defaultBase   int = 20 // Backoff base, in ms
+	defaultFactor int = 2
+	defaultCap    int = 180000 // 3 minutes
+)
+
+// Backoff can provide increasing duration with the number of attempt
+// performed. The structure is used to support exponential backoff on
+// connection attempts to avoid hammering the server we are connecting
+// to.
+type Backoff struct {
+	NoJitter     bool
+	Base         int
+	Factor       int
+	Cap          int
+	lastDuration int
+	attempt      int
+}
+
+// Duration returns the duration to apply to the current attempt.
+func (b *Backoff) Duration() time.Duration {
+	d := b.DurationForAttempt(b.attempt)
+	b.attempt++
+	return d
+}
+
+// Wait sleeps for backoff duration for current attempt.
+func (b *Backoff) Wait() {
+	time.Sleep(b.Duration())
+}
+
+// DurationForAttempt returns a duration for an attempt number, in a stateless way.
+func (b *Backoff) DurationForAttempt(attempt int) time.Duration {
+	b.setDefault()
+	expBackoff := math.Min(float64(b.Cap), float64(b.Base)*math.Pow(float64(b.Factor), float64(b.attempt)))
+	d := int(math.Trunc(expBackoff))
+	if !b.NoJitter {
+		d = rand.Intn(d)
+	}
+	return time.Duration(d) * time.Millisecond
+}
+
+// Reset sets back the number of attempts to 0. This is to be called after a successfull operation has been performed,
+// to reset the exponential backoff interval.
+func (b *Backoff) Reset() {
+	b.attempt = 0
+}
+
+func (b *Backoff) setDefault() {
+	if b.Base == 0 {
+		b.Base = defaultBase
+	}
+
+	if b.Cap == 0 {
+		b.Cap = defaultCap
+	}
+
+	if b.Factor == 0 {
+		b.Factor = defaultFactor
+	}
+}
+
+/*
+We use full jitter as default for now as it seems to provide good behaviour for reconnect.
+
+Base is the default interval between attempts (if backoff Factor was equal to 1)
+
+Attempt is the number of retry for operation. If we start attempt at 0, first sleep equals base.
+
+Cap is the maximum sleep time duration we tolerate between attempts
+*/
diff --git a/backoff_test.go b/backoff_test.go
new file mode 100644
index 0000000..9ef7ce0
--- /dev/null
+++ b/backoff_test.go
@@ -0,0 +1,24 @@
+package xmpp_test
+
+import (
+	"testing"
+	"time"
+
+	"gosrc.io/xmpp"
+)
+
+func TestDurationForAttempt_NoJitter(t *testing.T) {
+	b := xmpp.Backoff{Base: 25, NoJitter: true}
+	bInMS := time.Duration(b.Base) * time.Millisecond
+	if b.DurationForAttempt(0) != bInMS {
+		t.Errorf("incorrect default duration for attempt #0 (%d) = %d", b.DurationForAttempt(0)/time.Millisecond, bInMS/time.Millisecond)
+	}
+	var prevDuration, d time.Duration
+	for i := 0; i < 10; i++ {
+		d = b.DurationForAttempt(i)
+		if !(d >= prevDuration) {
+			t.Errorf("duration should be increasing between attempts. #%d (%d) > %d", i, d, prevDuration)
+		}
+		prevDuration = d
+	}
+}
diff --git a/client.go b/client.go
index f6f5be2..d269d7c 100644
--- a/client.go
+++ b/client.go
@@ -10,53 +10,61 @@ import (
 	"time"
 )
 
-// Client Metrics
-// ============================================================================
+//=============================================================================
 
-type Metrics struct {
-	startTime time.Time
-	// ConnectTime returns the duration between client initiation of the TCP/IP
-	// connection to the server and actual TCP/IP session establishment.
-	// This time includes DNS resolution and can be slightly higher if the DNS
-	// resolution result was not in cache.
-	ConnectTime time.Duration
-	// LoginTime returns the between client initiation of the TCP/IP
-	// connection to the server and the return of the login result.
-	// This includes ConnectTime, but also XMPP level protocol negociation
-	// like starttls.
-	LoginTime time.Duration
+// ConnState represents the current connection state.
+type ConnState = uint8
+
+// This is a the list of events happening on the connection that the
+// client can be notified about.
+const (
+	StateDisconnected ConnState = iota
+	StateConnected
+	StateSessionEstablished
+)
+
+// Event is a structure use to convey event changes related to client state. This
+// is for example used to notify the client when the client get disconnected.
+type Event struct {
+	State       ConnState
+	Description string
 }
 
-// initMetrics set metrics with default value and define the starting point
-// for duration calculation (connect time, login time, etc).
-func initMetrics() *Metrics {
-	return &Metrics{
-		startTime: time.Now(),
+// EventHandler is use to pass events about state of the connection to
+// client implementation.
+type EventHandler func(Event)
+
+type EventManager struct {
+	// Store current state
+	CurrentState ConnState
+
+	// Callback used to propagate connection state changes
+	Handler EventHandler
+}
+
+func (em EventManager) updateState(state ConnState) {
+	em.CurrentState = state
+	if em.Handler != nil {
+		em.Handler(Event{State: em.CurrentState})
 	}
 }
 
-func (m *Metrics) setConnectTime() {
-	m.ConnectTime = time.Since(m.startTime)
-}
-
-func (m *Metrics) setLoginTime() {
-	m.LoginTime = time.Since(m.startTime)
-}
-
 // Client
 // ============================================================================
 
 // Client is the main structure used to connect as a client on an XMPP
 // server.
 type Client struct {
-	// Store user defined options
+	// Store user defined options and states
 	config Config
 	// Session gather data that can be accessed by users of this library
 	Session *Session
 	// TCP level connection / can be replaced by a TLS session after starttls
 	conn net.Conn
-	// store low level metrics
-	Metrics *Metrics
+	// Packet channel
+	RecvChannel chan interface{}
+	// Track and broadcast connection state
+	EventManager
 }
 
 /*
@@ -89,6 +97,10 @@ func NewClient(config Config) (c *Client, err error) {
 	if c.config.ConnectTimeout == 0 {
 		c.config.ConnectTimeout = 15 // 15 second as default
 	}
+
+	// Create a default channel that developer can override
+	c.RecvChannel = make(chan interface{})
+
 	return
 }
 
@@ -112,59 +124,55 @@ func checkAddress(addr string) (string, error) {
 
 // Connect triggers actual TCP connection, based on previously defined parameters.
 func (c *Client) Connect() (*Session, error) {
-	var tcpconn net.Conn
 	var err error
 
-	// TODO: Refactor = abstract retry loop in capped exponential back-off function
-	var try = 0
-	var success bool
-	c.Metrics = initMetrics()
-	for try <= c.config.Retry && !success {
-		if tcpconn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second); err == nil {
-			c.Metrics.setConnectTime()
-			success = true
-		}
-		try++
-	}
+	c.conn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second)
 	if err != nil {
 		return nil, err
 	}
+	c.updateState(StateConnected)
 
 	// Connection is ok, we now open XMPP session
-	c.conn = tcpconn
 	if c.conn, c.Session, err = NewSession(c.conn, c.config); err != nil {
 		return c.Session, err
 	}
+	c.updateState(StateSessionEstablished)
 
-	c.Metrics.setLoginTime()
 	// We're connected and can now receive and send messages.
 	//fmt.Fprintf(client.conn, "<presence xml:lang='en'><show>%s</show><status>%s</status></presence>", "chat", "Online")
 	// TODO: Do we always want to send initial presence automatically ?
 	// Do we need an option to avoid that or do we rely on client to send the presence itself ?
 	fmt.Fprintf(c.Session.socketProxy, "<presence/>")
 
+	// Start the receiver go routine
+	go c.recv()
+
 	return c.Session, err
 }
 
-func (c *Client) recv(receiver chan<- interface{}) (err error) {
+func (c *Client) Disconnect() {
+	_ = c.SendRaw("</stream:stream>")
+	// TODO: Add a way to wait for stream close acknowledgement from the server for clean disconnect
+	_ = c.conn.Close()
+}
+
+func (c *Client) recv() (err error) {
 	for {
 		val, err := next(c.Session.decoder)
 		if err != nil {
-			close(receiver)
+			c.updateState(StateDisconnected)
 			return err
 		}
-		receiver <- val
+		c.RecvChannel <- val
 		val = nil
 	}
-	panic("unreachable")
 }
 
 // Recv abstracts receiving preparsed XMPP packets from a channel.
 // Channel allow client to receive / dispatch packets in for range loop.
+// TODO: Deprecate this function in favor of reading directly from the RecvChannel
 func (c *Client) Recv() <-chan interface{} {
-	ch := make(chan interface{})
-	go c.recv(ch)
-	return ch
+	return c.RecvChannel
 }
 
 // Send marshalls XMPP stanza and sends it to the server.
@@ -185,8 +193,9 @@ func (c *Client) Send(packet Packet) error {
 // disconnect the client. It is up to the user of this method to
 // carefully craft the XML content to produce valid XMPP.
 func (c *Client) SendRaw(packet string) error {
-	fmt.Fprintf(c.Session.socketProxy, packet) // TODO handle errors
-	return nil
+	var err error
+	_, err = fmt.Fprintf(c.Session.socketProxy, packet)
+	return err
 }
 
 func xmlEscape(s string) string {
diff --git a/client_manager.go b/client_manager.go
new file mode 100644
index 0000000..a61d8db
--- /dev/null
+++ b/client_manager.go
@@ -0,0 +1,106 @@
+package xmpp // import "gosrc.io/xmpp"
+
+import (
+	"log"
+	"time"
+)
+
+type PostConnect func(c *Client)
+
+// ClientManager supervises an XMPP client connection. Its role is to handle connection events and
+// apply reconnection strategy.
+type ClientManager struct {
+	Client      *Client
+	Session     *Session
+	PostConnect PostConnect
+
+	// Store low level metrics
+	Metrics *Metrics
+}
+
+// NewClientManager creates a new client manager structure, intended to support
+// handling XMPP client state event changes and auto-trigger reconnection
+// based on ClientManager configuration.
+func NewClientManager(client *Client, pc PostConnect) *ClientManager {
+	return &ClientManager{
+		Client:      client,
+		PostConnect: pc,
+	}
+}
+
+// Start launch the connection loop
+func (cm *ClientManager) Start() {
+	cm.Client.Handler = func(e Event) {
+		switch e.State {
+		case StateConnected:
+			cm.Metrics.setConnectTime()
+		case StateSessionEstablished:
+			cm.Metrics.setLoginTime()
+		case StateDisconnected:
+			// Reconnect on disconnection
+			cm.connect()
+		}
+	}
+	cm.connect()
+}
+
+// Stop cancels pending operations and terminates existing XMPP client.
+func (cm *ClientManager) Stop() {
+	// Remove on disconnect handler to avoid triggering reconnect
+	cm.Client.Handler = nil
+	cm.Client.Disconnect()
+}
+
+// connect manages the reconnection loop and apply the define backoff to avoid overloading the server.
+func (cm *ClientManager) connect() {
+	var backoff Backoff // TODO: Group backoff calculation features with connection manager?
+
+	for {
+		var err error
+		cm.Metrics = initMetrics()
+
+		if cm.Client.Session, err = cm.Client.Connect(); err != nil {
+			log.Printf("Connection error: %v\n", err)
+			backoff.Wait()
+		} else {
+			break
+		}
+	}
+
+	if cm.PostConnect != nil {
+		cm.PostConnect(cm.Client)
+	}
+}
+
+// Client Metrics
+// ============================================================================
+
+type Metrics struct {
+	startTime time.Time
+	// ConnectTime returns the duration between client initiation of the TCP/IP
+	// connection to the server and actual TCP/IP session establishment.
+	// This time includes DNS resolution and can be slightly higher if the DNS
+	// resolution result was not in cache.
+	ConnectTime time.Duration
+	// LoginTime returns the between client initiation of the TCP/IP
+	// connection to the server and the return of the login result.
+	// This includes ConnectTime, but also XMPP level protocol negociation
+	// like starttls.
+	LoginTime time.Duration
+}
+
+// initMetrics set metrics with default value and define the starting point
+// for duration calculation (connect time, login time, etc).
+func initMetrics() *Metrics {
+	return &Metrics{
+		startTime: time.Now(),
+	}
+}
+
+func (m *Metrics) setConnectTime() {
+	m.ConnectTime = time.Since(m.startTime)
+}
+
+func (m *Metrics) setLoginTime() {
+	m.LoginTime = time.Since(m.startTime)
+}
diff --git a/cmd/xmpp_echo/xmpp_echo.go b/cmd/xmpp_echo/xmpp_echo.go
index ab0a367..c132ffa 100644
--- a/cmd/xmpp_echo/xmpp_echo.go
+++ b/cmd/xmpp_echo/xmpp_echo.go
@@ -26,17 +26,14 @@ func main() {
 		log.Fatal("Error: ", err)
 	}
 
-	session, err := client.Connect()
-	if err != nil {
-		log.Fatal("Error: ", err)
-	}
-
-	fmt.Println("Stream opened, we have streamID = ", session.StreamId)
+	cm := xmpp.NewClientManager(client, nil)
+	cm.Start()
+	// connection can be stopped with cm.Stop().
 
 	// Iterator to receive packets coming from our XMPP connection
 	for packet := range client.Recv() {
 		switch packet := packet.(type) {
-		case *xmpp.Message:
+		case xmpp.Message:
 			_, _ = fmt.Fprintf(os.Stdout, "Body = %s - from = %s\n", packet.Body, packet.From)
 			reply := xmpp.Message{PacketAttrs: xmpp.PacketAttrs{To: packet.From}, Body: packet.Body}
 			_ = client.Send(reply)
@@ -47,6 +44,4 @@ func main() {
 }
 
 // TODO create default command line client to send message or to send an arbitrary XMPP sequence from a file,
-// (using templates ?)
-
-// TODO: autoreconnect when connection is lost
+//   (using templates ?)
diff --git a/cmd/xmpp_jukebox/xmpp_jukebox.go b/cmd/xmpp_jukebox/xmpp_jukebox.go
index a60e164..b020399 100644
--- a/cmd/xmpp_jukebox/xmpp_jukebox.go
+++ b/cmd/xmpp_jukebox/xmpp_jukebox.go
@@ -101,8 +101,7 @@ func playSCURL(p *mpg123.Player, rawURL string) {
 
 func connectXmpp(jid string, password string, address string) (client *xmpp.Client, err error) {
 	xmppConfig := xmpp.Config{Address: address,
-		Jid: jid, Password: password, PacketLogger: os.Stdout, Insecure: true,
-		Retry: 10}
+		Jid: jid, Password: password, PacketLogger: os.Stdout, Insecure: true}
 
 	if client, err = xmpp.NewClient(xmppConfig); err != nil {
 		return
diff --git a/config.go b/config.go
index e49ab5b..79770b1 100644
--- a/config.go
+++ b/config.go
@@ -12,7 +12,6 @@ type Config struct {
 	Password       string
 	PacketLogger   *os.File // Used for debugging
 	Lang           string   // TODO: should default to 'en'
-	Retry          int      // Number of retries for connect
 	ConnectTimeout int      // Connection timeout in seconds. Default to 15
 	// Insecure can be set to true to allow to open a session without TLS. If TLS
 	// is supported on the server, we will still try to use it.
diff --git a/parser.go b/parser.go
index 0ad24fc..ddcd526 100644
--- a/parser.go
+++ b/parser.go
@@ -39,7 +39,6 @@ func initDecoder(p *xml.Decoder) (sessionID string, err error) {
 			return
 		}
 	}
-	panic("unreachable")
 }
 
 // Scan XML token stream to find next StartElement.
@@ -47,7 +46,7 @@ func nextStart(p *xml.Decoder) (xml.StartElement, error) {
 	for {
 		t, err := p.Token()
 		if err == io.EOF {
-			return xml.StartElement{}, nil
+			return xml.StartElement{}, errors.New("connection closed")
 		}
 		if err != nil {
 			return xml.StartElement{}, fmt.Errorf("nextStart %s", err)
@@ -57,7 +56,6 @@ func nextStart(p *xml.Decoder) (xml.StartElement, error) {
 			return t, nil
 		}
 	}
-	panic("unreachable")
 }
 
 // next scans XML token stream for next element and then assign a structure to decode