Refactor tests

This commit is contained in:
rcorniere
2020-03-06 16:44:01 +01:00
parent 22ba8d1f4e
commit e59a86c380
26 changed files with 1737 additions and 129 deletions
+121 -40
View File
@@ -6,6 +6,7 @@ import (
"errors"
"io"
"net"
"sync"
"time"
"gosrc.io/xmpp/stanza"
@@ -14,15 +15,36 @@ import (
//=============================================================================
// EventManager
// ConnState represents the current connection state.
// SyncConnState represents the current connection state.
type SyncConnState struct {
sync.RWMutex
// Current state of the client. Please use the dedicated getter and setter for this field as they are thread safe.
state ConnState
}
type ConnState = uint8
// getState is a thread-safe getter for the current state
func (scs *SyncConnState) getState() ConnState {
var res ConnState
scs.RLock()
res = scs.state
scs.RUnlock()
return res
}
// setState is a thread-safe setter for the current
func (scs *SyncConnState) setState(cs ConnState) {
scs.Lock()
scs.state = cs
scs.Unlock()
}
// This is a the list of events happening on the connection that the
// client can be notified about.
const (
InitialPresence = "<presence/>"
StateDisconnected ConnState = iota
StateConnected
StateResuming
StateSessionEstablished
StateStreamError
StatePermanentError
@@ -31,7 +53,7 @@ const (
// Event is a structure use to convey event changes related to client state. This
// is for example used to notify the client when the client get disconnected.
type Event struct {
State ConnState
State SyncConnState
Description string
StreamError string
SMState SMState
@@ -44,7 +66,16 @@ type SMState struct {
Id string
// Inbound stanza count
Inbound uint
// TODO Store location for IP affinity
// IP affinity
preferredReconAddr string
// Error
StreamErrorGroup stanza.StanzaErrorGroup
// Track sent stanzas
*stanza.UnAckQueue
// TODO Store max and timestamp, to check if we should retry resumption or not
}
@@ -53,29 +84,35 @@ type SMState struct {
type EventHandler func(Event) error
type EventManager struct {
// Store current state
CurrentState ConnState
// Store current state. Please use "getState" and "setState" to access and/or modify this.
CurrentState SyncConnState
// Callback used to propagate connection state changes
Handler EventHandler
}
// updateState changes the CurrentState in the event manager. The state read is threadsafe but there is no guarantee
// regarding the triggered callback function.
func (em *EventManager) updateState(state ConnState) {
em.CurrentState = state
em.CurrentState.setState(state)
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState})
}
}
// disconnected changes the CurrentState in the event manager to "disconnected". The state read is threadsafe but there is no guarantee
// regarding the triggered callback function.
func (em *EventManager) disconnected(state SMState) {
em.CurrentState = StateDisconnected
em.CurrentState.setState(StateDisconnected)
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState, SMState: state})
}
}
// streamError changes the CurrentState in the event manager to "streamError". The state read is threadsafe but there is no guarantee
// regarding the triggered callback function.
func (em *EventManager) streamError(error, desc string) {
em.CurrentState = StateStreamError
em.CurrentState.setState(StateStreamError)
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState, StreamError: error, Description: desc})
}
@@ -90,7 +127,7 @@ var ErrCanOnlySendGetOrSetIq = errors.New("SendIQ can only send get and set IQ s
// server.
type Client struct {
// Store user defined options and states
config Config
config *Config
// Session gather data that can be accessed by users of this library
Session *Session
transport Transport
@@ -100,6 +137,12 @@ type Client struct {
EventManager
// Handle errors from client execution
ErrorHandler func(error)
// Post connection hook. This will be executed on first connection
PostFirstConnHook func() error
// Post resume hook. This will be executed after the client resumes a lost connection using StreamManagement (XEP-0198)
PostReconnectHook func() error
}
/*
@@ -107,9 +150,9 @@ Setting up the client / Checking the parameters
*/
// NewClient generates a new XMPP client, based on Config passed as parameters.
// If host is not specified, the DNS SRV should be used to find the host from the domainpart of the Jid.
// If host is not specified, the DNS SRV should be used to find the host from the domain part of the Jid.
// Default the port to 5222.
func NewClient(config Config, r *Router, errorHandler func(error)) (c *Client, err error) {
func NewClient(config *Config, r *Router, errorHandler func(error)) (c *Client, err error) {
if config.KeepaliveInterval == 0 {
config.KeepaliveInterval = time.Second * 30
}
@@ -169,26 +212,45 @@ func NewClient(config Config, r *Router, errorHandler func(error)) (c *Client, e
return
}
// Connect triggers actual TCP connection, based on previously defined parameters.
// Connect simply triggers resumption, with an empty session state.
// Connect establishes a first time connection to a XMPP server.
// It calls the PostFirstConnHook
func (c *Client) Connect() error {
var state SMState
return c.Resume(state)
err := c.connect()
if err != nil {
return err
}
// TODO: Do we always want to send initial presence automatically ?
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
err = c.sendWithWriter(c.transport, []byte(InitialPresence))
// Execute the post first connection hook. Typically this holds "ask for roster" and this type of actions.
if c.PostFirstConnHook != nil {
err = c.PostFirstConnHook()
if err != nil {
return err
}
}
// Start the keepalive go routine
keepaliveQuit := make(chan struct{})
go keepalive(c.transport, c.config.KeepaliveInterval, keepaliveQuit)
// Start the receiver go routine
go c.recv(keepaliveQuit)
return err
}
// Resume attempts resuming a Stream Managed session, based on the provided stream management
// state.
func (c *Client) Resume(state SMState) error {
// connect establishes an actual TCP connection, based on previously defined parameters, as well as a XMPP session
func (c *Client) connect() error {
var state SMState
var err error
// This is the TCP connection
streamId, err := c.transport.Connect()
if err != nil {
return err
}
c.updateState(StateConnected)
// Client is ok, we now open XMPP session
if c.Session, err = NewSession(c.transport, c.config, state); err != nil {
// Client is ok, we now open XMPP session with TLS negotiation if possible and session resume or binding
// depending on state.
if c.Session, err = NewSession(c, state); err != nil {
// Try to get the stream close tag from the server.
go func() {
for {
@@ -212,22 +274,26 @@ func (c *Client) Resume(state SMState) error {
c.Session.StreamId = streamId
c.updateState(StateSessionEstablished)
// Start the keepalive go routine
keepaliveQuit := make(chan struct{})
go keepalive(c.transport, c.config.KeepaliveInterval, keepaliveQuit)
// Start the receiver go routine
state = c.Session.SMState
go c.recv(state, keepaliveQuit)
// We're connected and can now receive and send messages.
//fmt.Fprintf(client.conn, "<presence xml:lang='en'><show>%s</show><status>%s</status></presence>", "chat", "Online")
// TODO: Do we always want to send initial presence automatically ?
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
err = c.sendWithWriter(c.transport, []byte(InitialPresence))
return err
}
// Resume attempts resuming a Stream Managed session, based on the provided stream management
// state. See XEP-0198
func (c *Client) Resume() error {
c.EventManager.updateState(StateResuming)
err := c.connect()
if err != nil {
return err
}
// Execute post reconnect hook. This can be different from the first connection hook, and not trigger roster retrival
// for example.
if c.PostReconnectHook != nil {
err = c.PostReconnectHook()
}
return err
}
// Disconnect disconnects the client from the server, sending a stream close nonza and closing the TCP connection.
func (c *Client) Disconnect() error {
if c.transport != nil {
return c.transport.Close()
@@ -252,6 +318,15 @@ func (c *Client) Send(packet stanza.Packet) error {
return errors.New("cannot marshal packet " + err.Error())
}
// Store stanza as non-acked as part of stream management
// See https://xmpp.org/extensions/xep-0198.html#scenarios
if c.config.StreamManagementEnable {
if _, ok := packet.(stanza.SMRequest); !ok {
toStore := stanza.UnAckedStz{Stz: string(data)}
c.Session.SMState.UnAckQueue.Push(&toStore)
}
}
return c.sendWithWriter(c.transport, data)
}
@@ -284,6 +359,12 @@ func (c *Client) SendRaw(packet string) error {
return errors.New("client is not connected")
}
// Store stanza as non-acked as part of stream management
// See https://xmpp.org/extensions/xep-0198.html#scenarios
if c.config.StreamManagementEnable {
toStore := stanza.UnAckedStz{Stz: packet}
c.Session.SMState.UnAckQueue.Push(&toStore)
}
return c.sendWithWriter(c.transport, []byte(packet))
}
@@ -297,13 +378,13 @@ func (c *Client) sendWithWriter(writer io.Writer, packet []byte) error {
// Go routines
// Loop: Receive data from server
func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) {
func (c *Client) recv(keepaliveQuit chan<- struct{}) {
for {
val, err := stanza.NextPacket(c.transport.GetDecoder())
if err != nil {
c.ErrorHandler(err)
close(keepaliveQuit)
c.disconnected(state)
c.disconnected(c.Session.SMState)
return
}
@@ -321,7 +402,7 @@ func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) {
answer := stanza.SMAnswer{XMLName: xml.Name{
Space: stanza.NSStreamManagement,
Local: "a",
}, H: state.Inbound}
}, H: c.Session.SMState.Inbound}
err = c.Send(answer)
if err != nil {
c.ErrorHandler(err)
@@ -332,7 +413,7 @@ func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) {
c.transport.ReceivedStreamClose()
return
default:
state.Inbound++
c.Session.SMState.Inbound++
}
// Do normal route processing in a go-routine so we can immediately
// start receiving other stanzas. This also allows route handlers to