forked from jshiffer/go-xmpp
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fe4c366de8 | ||
|
|
0a4acd12c3 | ||
|
|
ef6de6000a | ||
|
|
d4960afc97 | ||
|
|
6e84084bb3 | ||
|
|
64e54134df | ||
|
|
f1331dcebc |
@@ -42,12 +42,12 @@ func (scs *SyncConnState) setState(cs ConnState) {
|
||||
// This is a the list of events happening on the connection that the
|
||||
// client can be notified about.
|
||||
const (
|
||||
InitialPresence = "<presence/>"
|
||||
StateDisconnected ConnState = iota
|
||||
StateResuming
|
||||
StateSessionEstablished
|
||||
StateStreamError
|
||||
StatePermanentError
|
||||
InitialPresence = "<presence/>"
|
||||
)
|
||||
|
||||
// Event is a structure use to convey event changes related to client state. This
|
||||
@@ -285,7 +285,7 @@ func (c *Client) Resume() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Execute post reconnect hook. This can be different from the first connection hook, and not trigger roster retrival
|
||||
// Execute post reconnect hook. This can be different from the first connection hook, and not trigger roster retrieval
|
||||
// for example.
|
||||
if c.PostResumeHook != nil {
|
||||
err = c.PostResumeHook()
|
||||
@@ -379,11 +379,12 @@ func (c *Client) sendWithWriter(writer io.Writer, packet []byte) error {
|
||||
|
||||
// Loop: Receive data from server
|
||||
func (c *Client) recv(keepaliveQuit chan<- struct{}) {
|
||||
defer close(keepaliveQuit)
|
||||
|
||||
for {
|
||||
val, err := stanza.NextPacket(c.transport.GetDecoder())
|
||||
if err != nil {
|
||||
c.ErrorHandler(err)
|
||||
close(keepaliveQuit)
|
||||
c.disconnected(c.Session.SMState)
|
||||
return
|
||||
}
|
||||
@@ -392,7 +393,6 @@ func (c *Client) recv(keepaliveQuit chan<- struct{}) {
|
||||
switch packet := val.(type) {
|
||||
case stanza.StreamError:
|
||||
c.router.route(c, val)
|
||||
close(keepaliveQuit)
|
||||
c.streamError(packet.Error.Local, packet.Text)
|
||||
c.ErrorHandler(errors.New("stream error: " + packet.Error.Local))
|
||||
// We don't return here, because we want to wait for the stream close tag from the server, or timeout.
|
||||
|
||||
@@ -80,13 +80,13 @@ func (sm *StreamManager) Run() error {
|
||||
sm.Metrics.setLoginTime()
|
||||
case StateDisconnected:
|
||||
// Reconnect on disconnection
|
||||
return sm.resume(e.SMState)
|
||||
return sm.resume()
|
||||
case StateStreamError:
|
||||
sm.client.Disconnect()
|
||||
// Only try reconnecting if we have not been kicked by another session to avoid connection loop.
|
||||
// TODO: Make this conflict exception a permanent error
|
||||
if e.StreamError != "conflict" {
|
||||
return sm.connect()
|
||||
return sm.resume()
|
||||
}
|
||||
case StatePermanentError:
|
||||
// Do not attempt to reconnect
|
||||
@@ -113,19 +113,32 @@ func (sm *StreamManager) Stop() {
|
||||
}
|
||||
|
||||
func (sm *StreamManager) connect() error {
|
||||
var state SMState
|
||||
return sm.resume(state)
|
||||
if sm.client != nil {
|
||||
if c, ok := sm.client.(*Client); ok {
|
||||
if c.CurrentState.getState() == StateDisconnected {
|
||||
sm.Metrics = initMetrics()
|
||||
err := c.Connect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if sm.PostConnect != nil {
|
||||
sm.PostConnect(sm.client)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return errors.New("client is not disconnected")
|
||||
}
|
||||
|
||||
// resume manages the reconnection loop and apply the define backoff to avoid overloading the server.
|
||||
func (sm *StreamManager) resume(state SMState) error {
|
||||
func (sm *StreamManager) resume() error {
|
||||
var backoff backoff // TODO: Group backoff calculation features with connection manager?
|
||||
|
||||
for {
|
||||
var err error
|
||||
// TODO: Make it possible to define logger to log disconnect and reconnection attempts
|
||||
sm.Metrics = initMetrics()
|
||||
|
||||
if err = sm.client.Resume(); err != nil {
|
||||
var actualErr ConnError
|
||||
if xerrors.As(err, &actualErr) {
|
||||
|
||||
@@ -93,6 +93,7 @@ func (t *XMPPTransport) StartTLS() error {
|
||||
return err
|
||||
}
|
||||
|
||||
t.isSecure = false
|
||||
t.conn = tlsConn
|
||||
t.readWriter = newStreamLogger(tlsConn, t.logFile)
|
||||
t.decoder = xml.NewDecoder(bufio.NewReaderSize(t.readWriter, maxPacketSize))
|
||||
|
||||
Reference in New Issue
Block a user