Add initial support for stream management

For now it support enabling SM, replying to ack requests from server,
and trying resuming the session with existing Stream Management state.
This commit is contained in:
Mickael Remond
2019-07-31 18:47:30 +02:00
parent e531370dc9
commit 3f48672946
6 changed files with 261 additions and 15 deletions

View File

@@ -31,6 +31,18 @@ type Event struct {
State ConnState
Description string
StreamError string
SMState SMState
}
// SMState holds Stream Management information regarding the session that can be
// used to resume session after disconnect
type SMState struct {
// Stream Management ID
Id string
// Inbound stanza count
Inbound uint
// TODO Store location for IP affinity
// TODO Store max and timestamp, to check if we should retry resumption or not
}
// EventHandler is use to pass events about state of the connection to
@@ -52,6 +64,13 @@ func (em EventManager) updateState(state ConnState) {
}
}
func (em EventManager) disconnected(state SMState) {
em.CurrentState = StateDisconnected
if em.Handler != nil {
em.Handler(Event{State: em.CurrentState, SMState: state})
}
}
func (em EventManager) streamError(error, desc string) {
em.CurrentState = StateStreamError
if em.Handler != nil {
@@ -128,7 +147,15 @@ func NewClient(config Config, r *Router) (c *Client, err error) {
}
// Connect triggers actual TCP connection, based on previously defined parameters.
// Connect simply triggers resumption, with an empty session state.
func (c *Client) Connect() error {
var state SMState
return c.Resume(state)
}
// Resume attempts resuming a Stream Managed session, based on the provided stream management
// state.
func (c *Client) Resume(state SMState) error {
var err error
c.conn, err = net.DialTimeout("tcp", c.config.Address, time.Duration(c.config.ConnectTimeout)*time.Second)
@@ -138,23 +165,24 @@ func (c *Client) Connect() error {
c.updateState(StateConnected)
// Client is ok, we now open XMPP session
if c.conn, c.Session, err = NewSession(c.conn, c.config); err != nil {
if c.conn, c.Session, err = NewSession(c.conn, c.config, state); err != nil {
return err
}
c.updateState(StateSessionEstablished)
// Start the keepalive go routine
keepaliveQuit := make(chan struct{})
go keepalive(c.conn, keepaliveQuit)
// Start the receiver go routine
state = c.Session.SMState
go c.recv(state, keepaliveQuit)
// We're connected and can now receive and send messages.
//fmt.Fprintf(client.conn, "<presence xml:lang='en'><show>%s</show><status>%s</status></presence>", "chat", "Online")
// TODO: Do we always want to send initial presence automatically ?
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
fmt.Fprintf(c.Session.streamLogger, "<presence/>")
// Start the keepalive go routine
keepaliveQuit := make(chan struct{})
go keepalive(c.conn, keepaliveQuit)
// Start the receiver go routine
go c.recv(keepaliveQuit)
return err
}
@@ -206,12 +234,12 @@ func (c *Client) sendWithLogger(packet string) error {
// Go routines
// Loop: Receive data from server
func (c *Client) recv(keepaliveQuit chan<- struct{}) (err error) {
func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) (err error) {
for {
val, err := stanza.NextPacket(c.Session.decoder)
if err != nil {
close(keepaliveQuit)
c.updateState(StateDisconnected)
c.disconnected(state)
return err
}
@@ -222,6 +250,17 @@ func (c *Client) recv(keepaliveQuit chan<- struct{}) (err error) {
close(keepaliveQuit)
c.streamError(packet.Error.Local, packet.Text)
return errors.New("stream error: " + packet.Error.Local)
// Process Stream management nonzas
case stanza.SMRequest:
fmt.Println("MREMOND: inbound: ", state.Inbound)
answer := stanza.SMAnswer{XMLName: xml.Name{
Space: stanza.NSStreamManagement,
Local: "a",
}, H: state.Inbound}
c.Send(answer)
default:
fmt.Println(packet)
state.Inbound++
}
c.router.route(c, val)
@@ -243,6 +282,9 @@ func keepalive(conn net.Conn, quit <-chan struct{}) {
_ = conn.Close()
return
}
case <-time.After(3 * time.Second):
_ = conn.Close()
return
case <-quit:
ticker.Stop()
return