mirror of
https://github.com/FluuxIO/go-xmpp.git
synced 2024-11-26 20:32:02 -08:00
Merge branch 'Error_Handling' of github.com:remicorniere/go-xmpp into Error_Handling
# Conflicts: # client.go # client_test.go # tcp_server_mock.go
This commit is contained in:
commit
1f5591f33a
20
client.go
20
client.go
@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
@ -60,21 +59,21 @@ type EventManager struct {
|
|||||||
Handler EventHandler
|
Handler EventHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em EventManager) updateState(state ConnState) {
|
func (em *EventManager) updateState(state ConnState) {
|
||||||
em.CurrentState = state
|
em.CurrentState = state
|
||||||
if em.Handler != nil {
|
if em.Handler != nil {
|
||||||
em.Handler(Event{State: em.CurrentState})
|
em.Handler(Event{State: em.CurrentState})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em EventManager) disconnected(state SMState) {
|
func (em *EventManager) disconnected(state SMState) {
|
||||||
em.CurrentState = StateDisconnected
|
em.CurrentState = StateDisconnected
|
||||||
if em.Handler != nil {
|
if em.Handler != nil {
|
||||||
em.Handler(Event{State: em.CurrentState, SMState: state})
|
em.Handler(Event{State: em.CurrentState, SMState: state})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em EventManager) streamError(error, desc string) {
|
func (em *EventManager) streamError(error, desc string) {
|
||||||
em.CurrentState = StateStreamError
|
em.CurrentState = StateStreamError
|
||||||
if em.Handler != nil {
|
if em.Handler != nil {
|
||||||
em.Handler(Event{State: em.CurrentState, StreamError: error, Description: desc})
|
em.Handler(Event{State: em.CurrentState, StreamError: error, Description: desc})
|
||||||
@ -110,6 +109,9 @@ Setting up the client / Checking the parameters
|
|||||||
// If host is not specified, the DNS SRV should be used to find the host from the domainpart of the JID.
|
// If host is not specified, the DNS SRV should be used to find the host from the domainpart of the JID.
|
||||||
// Default the port to 5222.
|
// Default the port to 5222.
|
||||||
func NewClient(config Config, r *Router, errorHandler func(error)) (c *Client, err error) {
|
func NewClient(config Config, r *Router, errorHandler func(error)) (c *Client, err error) {
|
||||||
|
if config.KeepaliveInterval == 0 {
|
||||||
|
config.KeepaliveInterval = time.Second * 30
|
||||||
|
}
|
||||||
// Parse JID
|
// Parse JID
|
||||||
if config.parsedJid, err = NewJid(config.Jid); err != nil {
|
if config.parsedJid, err = NewJid(config.Jid); err != nil {
|
||||||
err = errors.New("missing jid")
|
err = errors.New("missing jid")
|
||||||
@ -188,7 +190,7 @@ func (c *Client) Resume(state SMState) error {
|
|||||||
|
|
||||||
// Start the keepalive go routine
|
// Start the keepalive go routine
|
||||||
keepaliveQuit := make(chan struct{})
|
keepaliveQuit := make(chan struct{})
|
||||||
go keepalive(c, keepaliveQuit)
|
go keepalive(c.transport, c.config.KeepaliveInterval, keepaliveQuit)
|
||||||
// Start the receiver go routine
|
// Start the receiver go routine
|
||||||
state = c.Session.SMState
|
state = c.Session.SMState
|
||||||
go c.recv(state, keepaliveQuit)
|
go c.recv(state, keepaliveQuit)
|
||||||
@ -197,7 +199,7 @@ func (c *Client) Resume(state SMState) error {
|
|||||||
//fmt.Fprintf(client.conn, "<presence xml:lang='en'><show>%s</show><status>%s</status></presence>", "chat", "Online")
|
//fmt.Fprintf(client.conn, "<presence xml:lang='en'><show>%s</show><status>%s</status></presence>", "chat", "Online")
|
||||||
// TODO: Do we always want to send initial presence automatically ?
|
// TODO: Do we always want to send initial presence automatically ?
|
||||||
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
|
// Do we need an option to avoid that or do we rely on client to send the presence itself ?
|
||||||
_, err = fmt.Fprintf(c.transport, "<presence/>")
|
err = c.sendWithWriter(c.transport, []byte("<presence/>"))
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -312,10 +314,8 @@ func (c *Client) recv(state SMState, keepaliveQuit chan<- struct{}) {
|
|||||||
// Loop: send whitespace keepalive to server
|
// Loop: send whitespace keepalive to server
|
||||||
// This is use to keep the connection open, but also to detect connection loss
|
// This is use to keep the connection open, but also to detect connection loss
|
||||||
// and trigger proper client connection shutdown.
|
// and trigger proper client connection shutdown.
|
||||||
func keepalive(c *Client, quit <-chan struct{}) {
|
func keepalive(transport Transport, interval time.Duration, quit <-chan struct{}) {
|
||||||
// TODO: Make keepalive interval configurable
|
ticker := time.NewTicker(interval)
|
||||||
transport := c.transport
|
|
||||||
ticker := time.NewTicker(30 * time.Second)
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
@ -19,6 +19,24 @@ const (
|
|||||||
testClientDomain = "localhost"
|
testClientDomain = "localhost"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestEventManager(t *testing.T) {
|
||||||
|
mgr := EventManager{}
|
||||||
|
mgr.updateState(StateConnected)
|
||||||
|
if mgr.CurrentState != StateConnected {
|
||||||
|
t.Fatal("CurrentState not updated by updateState()")
|
||||||
|
}
|
||||||
|
|
||||||
|
mgr.disconnected(SMState{})
|
||||||
|
if mgr.CurrentState != StateDisconnected {
|
||||||
|
t.Fatalf("CurrentState not reset by disconnected()")
|
||||||
|
}
|
||||||
|
|
||||||
|
mgr.streamError(ErrTLSNotSupported.Error(), "")
|
||||||
|
if mgr.CurrentState != StateStreamError {
|
||||||
|
t.Fatalf("CurrentState not set by streamError()")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestClient_Connect(t *testing.T) {
|
func TestClient_Connect(t *testing.T) {
|
||||||
// Setup Mock server
|
// Setup Mock server
|
||||||
mock := ServerMock{}
|
mock := ServerMock{}
|
||||||
|
12
component.go
12
component.go
@ -85,7 +85,7 @@ func (c *Component) Resume(sm SMState) error {
|
|||||||
c.updateState(StateConnected)
|
c.updateState(StateConnected)
|
||||||
|
|
||||||
// Authentication
|
// Authentication
|
||||||
if _, err := fmt.Fprintf(c.transport, "<handshake>%s</handshake>", c.handshake(streamId)); err != nil {
|
if err := c.sendWithWriter(c.transport, []byte(fmt.Sprintf("<handshake>%s</handshake>", c.handshake(streamId)))); err != nil {
|
||||||
c.updateState(StateStreamError)
|
c.updateState(StateStreamError)
|
||||||
|
|
||||||
return NewConnError(errors.New("cannot send handshake "+err.Error()), false)
|
return NewConnError(errors.New("cannot send handshake "+err.Error()), false)
|
||||||
@ -159,12 +159,18 @@ func (c *Component) Send(packet stanza.Packet) error {
|
|||||||
return errors.New("cannot marshal packet " + err.Error())
|
return errors.New("cannot marshal packet " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := fmt.Fprintf(transport, string(data)); err != nil {
|
if err := c.sendWithWriter(transport, data); err != nil {
|
||||||
return errors.New("cannot send packet " + err.Error())
|
return errors.New("cannot send packet " + err.Error())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Component) sendWithWriter(writer io.Writer, packet []byte) error {
|
||||||
|
var err error
|
||||||
|
_, err = writer.Write(packet)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// SendIQ sends an IQ set or get stanza to the server. If a result is received
|
// SendIQ sends an IQ set or get stanza to the server. If a result is received
|
||||||
// the provided handler function will automatically be called.
|
// the provided handler function will automatically be called.
|
||||||
//
|
//
|
||||||
@ -195,7 +201,7 @@ func (c *Component) SendRaw(packet string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
_, err = fmt.Fprintf(transport, packet)
|
err = c.sendWithWriter(transport, []byte(packet))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package xmpp
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config & TransportConfiguration must not be modified after having been passed to NewClient. Any
|
// Config & TransportConfiguration must not be modified after having been passed to NewClient. Any
|
||||||
@ -14,6 +15,7 @@ type Config struct {
|
|||||||
Credential Credential
|
Credential Credential
|
||||||
StreamLogger *os.File // Used for debugging
|
StreamLogger *os.File // Used for debugging
|
||||||
Lang string // TODO: should default to 'en'
|
Lang string // TODO: should default to 'en'
|
||||||
|
KeepaliveInterval time.Duration // Interval between keepalive packets
|
||||||
ConnectTimeout int // Client timeout in seconds. Default to 15
|
ConnectTimeout int // Client timeout in seconds. Default to 15
|
||||||
// Insecure can be set to true to allow to open a session without TLS. If TLS
|
// Insecure can be set to true to allow to open a session without TLS. If TLS
|
||||||
// is supported on the server, we will still try to use it.
|
// is supported on the server, we will still try to use it.
|
||||||
|
@ -117,21 +117,25 @@ func (mock *ServerMock) loop() {
|
|||||||
//======================================================================================================================
|
//======================================================================================================================
|
||||||
|
|
||||||
func respondToIQ(t *testing.T, c net.Conn) {
|
func respondToIQ(t *testing.T, c net.Conn) {
|
||||||
// Decoder to parse the request
|
recvBuf := make([]byte, 1024)
|
||||||
decoder := xml.NewDecoder(c)
|
var iqR stanza.IQ
|
||||||
|
_, err := c.Read(recvBuf[:]) // recv data
|
||||||
iqReq, err := receiveIq(c, decoder)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to receive IQ : %s", err.Error())
|
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||||
|
t.Errorf("read timeout: %s", err)
|
||||||
|
} else {
|
||||||
|
t.Errorf("read error: %s", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
xml.Unmarshal(recvBuf, &iqR)
|
||||||
|
|
||||||
if !iqReq.IsValid() {
|
if !iqR.IsValid() {
|
||||||
mockIQError(c)
|
mockIQError(c)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Crafting response
|
// Crafting response
|
||||||
iqResp := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, From: iqReq.To, To: iqReq.From, Id: iqReq.Id, Lang: "en"})
|
iqResp := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, From: iqR.To, To: iqR.From, Id: iqR.Id, Lang: "en"})
|
||||||
disco := iqResp.DiscoInfo()
|
disco := iqResp.DiscoInfo()
|
||||||
disco.AddFeatures("vcard-temp",
|
disco.AddFeatures("vcard-temp",
|
||||||
`http://jabber.org/protocol/address`)
|
`http://jabber.org/protocol/address`)
|
||||||
|
Loading…
Reference in New Issue
Block a user