mirror of
https://github.com/FluuxIO/go-xmpp.git
synced 2026-02-19 11:52:57 -08:00
Added callback to process errors after connection.
Added tests and refactored a bit.
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"encoding/xml"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"gosrc.io/xmpp/stanza"
|
||||
"net"
|
||||
"strings"
|
||||
@@ -15,19 +16,7 @@ import (
|
||||
// Tests are ran in parallel, so each test creating a server must use a different port so we do not get any
|
||||
// conflict. Using iota for this should do the trick.
|
||||
const (
|
||||
testComponentDomain = "localhost"
|
||||
defaultServerName = "testServer"
|
||||
defaultStreamID = "91bd0bba-012f-4d92-bb17-5fc41e6fe545"
|
||||
defaultComponentName = "Test Component"
|
||||
|
||||
// Default port is not standard XMPP port to avoid interfering
|
||||
// with local running XMPP server
|
||||
testHandshakePort = iota + 15222
|
||||
testDecoderPort
|
||||
testSendIqPort
|
||||
testSendRawPort
|
||||
testDisconnectPort
|
||||
testSManDisconnectPort
|
||||
defaultChannelTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
func TestHandshake(t *testing.T) {
|
||||
@@ -48,16 +37,14 @@ func TestHandshake(t *testing.T) {
|
||||
|
||||
// Tests connection process with a handshake exchange
|
||||
// Tests multiple session IDs. All connections should generate a unique stream ID
|
||||
func TestGenerateHandshake(t *testing.T) {
|
||||
func TestGenerateHandshakeId(t *testing.T) {
|
||||
// Using this array with a channel to make a queue of values to test
|
||||
// These are stream IDs that will be used to test the connection process, mixing them with the "secret" to generate
|
||||
// some handshake value
|
||||
var uuidsArray = [5]string{
|
||||
"cc9b3249-9582-4780-825f-4311b42f9b0e",
|
||||
"bba8be3c-d98e-4e26-b9bb-9ed34578a503",
|
||||
"dae72822-80e8-496b-b763-ab685f53a188",
|
||||
"a45d6c06-de49-4bb0-935b-1a2201b71028",
|
||||
"7dc6924f-0eca-4237-9898-18654b8d891e",
|
||||
var uuidsArray = [5]string{}
|
||||
for i := 1; i < len(uuidsArray); i++ {
|
||||
id, _ := uuid.NewRandom()
|
||||
uuidsArray[i] = id.String()
|
||||
}
|
||||
|
||||
// Channel to pass stream IDs as a queue
|
||||
@@ -95,7 +82,7 @@ func TestGenerateHandshake(t *testing.T) {
|
||||
Type: "service",
|
||||
}
|
||||
router := NewRouter()
|
||||
c, err := NewComponent(opts, router)
|
||||
c, err := NewComponent(opts, router, componentDefaultErrorHandler)
|
||||
if err != nil {
|
||||
t.Errorf("%+v", err)
|
||||
}
|
||||
@@ -126,7 +113,7 @@ func TestStreamManager(t *testing.T) {
|
||||
// The decoder is expected to be built after a valid connection
|
||||
// Based on the xmpp_component example.
|
||||
func TestDecoder(t *testing.T) {
|
||||
c, _ := mockConnection(t, testDecoderPort, handlerForComponentHandshakeDefaultID)
|
||||
c, _ := mockComponentConnection(t, testDecoderPort, handlerForComponentHandshakeDefaultID)
|
||||
if c.transport.GetDecoder() == nil {
|
||||
t.Errorf("Failed to initialize decoder. Decoder is nil.")
|
||||
}
|
||||
@@ -134,39 +121,103 @@ func TestDecoder(t *testing.T) {
|
||||
|
||||
// Tests sending an IQ to the server, and getting the response
|
||||
func TestSendIq(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
h := func(t *testing.T, c net.Conn) {
|
||||
handlerForComponentIQSend(t, c)
|
||||
done <- struct{}{}
|
||||
}
|
||||
|
||||
//Connecting to a mock server, initialized with given port and handler function
|
||||
c, m := mockConnection(t, testSendIqPort, handlerForComponentIQSend)
|
||||
c, m := mockComponentConnection(t, testSendIqPort, h)
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
iqReq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeGet, From: "test1@localhost/mremond-mbp", To: defaultServerName, Id: defaultStreamID, Lang: "en"})
|
||||
disco := iqReq.DiscoInfo()
|
||||
iqReq.Payload = disco
|
||||
|
||||
// Handle a possible error
|
||||
errChan := make(chan error)
|
||||
errorHandler := func(err error) {
|
||||
errChan <- err
|
||||
}
|
||||
c.ErrorHandler = errorHandler
|
||||
|
||||
var res chan stanza.IQ
|
||||
res, _ = c.SendIQ(ctx, iqReq)
|
||||
|
||||
select {
|
||||
case <-res:
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
case err := <-errChan:
|
||||
t.Errorf(err.Error())
|
||||
case <-time.After(defaultChannelTimeout):
|
||||
t.Errorf("Failed to receive response, to sent IQ, from mock server")
|
||||
}
|
||||
|
||||
m.Stop()
|
||||
select {
|
||||
case <-done:
|
||||
m.Stop()
|
||||
case <-time.After(defaultChannelTimeout):
|
||||
t.Errorf("The mock server failed to finish its job !")
|
||||
}
|
||||
}
|
||||
|
||||
// Checking that error handling is done properly client side when an invalid IQ is sent and the server responds in kind.
|
||||
func TestSendIqFail(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
h := func(t *testing.T, c net.Conn) {
|
||||
handlerForComponentIQSend(t, c)
|
||||
done <- struct{}{}
|
||||
}
|
||||
//Connecting to a mock server, initialized with given port and handler function
|
||||
c, m := mockComponentConnection(t, testSendIqFailPort, h)
|
||||
|
||||
ctx, _ := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
iqReq := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeGet, From: "test1@localhost/mremond-mbp", To: defaultServerName, Id: defaultStreamID, Lang: "en"})
|
||||
|
||||
// Removing the id to make the stanza invalid. The IQ constructor makes a random one if none is specified
|
||||
// so we need to overwrite it.
|
||||
iqReq.Id = ""
|
||||
disco := iqReq.DiscoInfo()
|
||||
iqReq.Payload = disco
|
||||
|
||||
errChan := make(chan error)
|
||||
errorHandler := func(err error) {
|
||||
errChan <- err
|
||||
}
|
||||
c.ErrorHandler = errorHandler
|
||||
|
||||
var res chan stanza.IQ
|
||||
res, _ = c.SendIQ(ctx, iqReq)
|
||||
|
||||
select {
|
||||
case r := <-res: // Do we get an IQ response from the server ?
|
||||
t.Errorf("We should not be getting an IQ response here : this should fail !")
|
||||
fmt.Println(r)
|
||||
case <-errChan: // Do we get a stream error from the server ?
|
||||
// If we get an error from the server, the test passes.
|
||||
case <-time.After(defaultChannelTimeout): // Timeout ?
|
||||
t.Errorf("Failed to receive response, to sent IQ, from mock server")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
m.Stop()
|
||||
case <-time.After(defaultChannelTimeout):
|
||||
t.Errorf("The mock server failed to finish its job !")
|
||||
}
|
||||
}
|
||||
|
||||
// Tests sending raw xml to the mock server.
|
||||
// TODO : check the server response client side ?
|
||||
// Right now, the server response is not checked and an err is passed in a channel if the test is supposed to err.
|
||||
// In this test, we use IQs
|
||||
func TestSendRaw(t *testing.T) {
|
||||
// Error channel for the handler
|
||||
errChan := make(chan error)
|
||||
done := make(chan struct{})
|
||||
// Handler for the mock server
|
||||
h := func(t *testing.T, c net.Conn) {
|
||||
// Completes the connection by exchanging handshakes
|
||||
handlerForComponentHandshakeDefaultID(t, c)
|
||||
receiveRawIq(t, c, errChan)
|
||||
return
|
||||
receiveIq(c, xml.NewDecoder(c))
|
||||
done <- struct{}{}
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
@@ -185,12 +236,19 @@ func TestSendRaw(t *testing.T) {
|
||||
shouldErr: true,
|
||||
}
|
||||
|
||||
// A handler for the component.
|
||||
// In the failing test, the server returns a stream error, which triggers this handler, component side.
|
||||
errChan := make(chan error)
|
||||
errHandler := func(err error) {
|
||||
errChan <- err
|
||||
}
|
||||
|
||||
// Tests for all the IQs
|
||||
for name, tcase := range testRequests {
|
||||
t.Run(name, func(st *testing.T) {
|
||||
//Connecting to a mock server, initialized with given port and handler function
|
||||
c, m := mockConnection(t, testSendRawPort, h)
|
||||
|
||||
c, m := mockComponentConnection(t, testSendRawPort, h)
|
||||
c.ErrorHandler = errHandler
|
||||
// Sending raw xml from test case
|
||||
err := c.SendRaw(tcase.req)
|
||||
if err != nil {
|
||||
@@ -198,21 +256,29 @@ func TestSendRaw(t *testing.T) {
|
||||
}
|
||||
// Just wait a little so the message has time to arrive
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
// We don't use the default "long" timeout here because waiting it out means passing the test.
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
case err = <-errChan:
|
||||
if err == nil && tcase.shouldErr {
|
||||
t.Errorf("Failed to get closing stream err")
|
||||
} else if err != nil && !tcase.shouldErr {
|
||||
t.Errorf("This test is not supposed to err ! => %s", err.Error())
|
||||
}
|
||||
}
|
||||
c.transport.Close()
|
||||
m.Stop()
|
||||
select {
|
||||
case <-done:
|
||||
m.Stop()
|
||||
case <-time.After(defaultChannelTimeout):
|
||||
t.Errorf("The mock server failed to finish its job !")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Tests the Disconnect method for Components
|
||||
func TestDisconnect(t *testing.T) {
|
||||
c, m := mockConnection(t, testDisconnectPort, handlerForComponentHandshakeDefaultID)
|
||||
c, m := mockComponentConnection(t, testDisconnectPort, handlerForComponentHandshakeDefaultID)
|
||||
err := c.transport.Ping()
|
||||
if err != nil {
|
||||
t.Errorf("Could not ping but not disconnected yet")
|
||||
@@ -257,14 +323,97 @@ func TestStreamManagerDisconnect(t *testing.T) {
|
||||
|
||||
//=============================================================================
|
||||
// Basic XMPP Server Mock Handlers.
|
||||
// Performs a Component connection with a handshake. It uses a default ID defined in this file as a constant.
|
||||
// Used in the mock server as a Handler
|
||||
func handlerForComponentHandshakeDefaultID(t *testing.T, c net.Conn) {
|
||||
decoder := xml.NewDecoder(c)
|
||||
checkOpenStreamHandshakeDefaultID(t, c, decoder)
|
||||
readHandshakeComponent(t, decoder)
|
||||
fmt.Fprintln(c, "<handshake/>") // That's all the server needs to return (see xep-0114)
|
||||
return
|
||||
|
||||
//===============================
|
||||
// Init mock server and connection
|
||||
// Creating a mock server and connecting a Component to it. Initialized with given port and handler function
|
||||
// The Component and mock are both returned
|
||||
func mockComponentConnection(t *testing.T, port int, handler func(t *testing.T, c net.Conn)) (*Component, *ServerMock) {
|
||||
// Init mock server
|
||||
testComponentAddress := fmt.Sprintf("%s:%d", testComponentDomain, port)
|
||||
mock := ServerMock{}
|
||||
mock.Start(t, testComponentAddress, handler)
|
||||
|
||||
//==================================
|
||||
// Create Component to connect to it
|
||||
c := makeBasicComponent(defaultComponentName, testComponentAddress, t)
|
||||
|
||||
//========================================
|
||||
// Connect the new Component to the server
|
||||
err := c.Connect()
|
||||
if err != nil {
|
||||
t.Errorf("%+v", err)
|
||||
}
|
||||
|
||||
return c, &mock
|
||||
}
|
||||
|
||||
func makeBasicComponent(name string, mockServerAddr string, t *testing.T) *Component {
|
||||
opts := ComponentOptions{
|
||||
TransportConfiguration: TransportConfiguration{
|
||||
Address: mockServerAddr,
|
||||
Domain: "localhost",
|
||||
},
|
||||
Domain: testComponentDomain,
|
||||
Secret: "mypass",
|
||||
Name: name,
|
||||
Category: "gateway",
|
||||
Type: "service",
|
||||
}
|
||||
router := NewRouter()
|
||||
c, err := NewComponent(opts, router, componentDefaultErrorHandler)
|
||||
if err != nil {
|
||||
t.Errorf("%+v", err)
|
||||
}
|
||||
c.transport, err = NewComponentTransport(c.ComponentOptions.TransportConfiguration)
|
||||
if err != nil {
|
||||
t.Errorf("%+v", err)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// This really should not be used as is.
|
||||
// It's just meant to be a placeholder when error handling is not needed at this level
|
||||
func componentDefaultErrorHandler(err error) {
|
||||
|
||||
}
|
||||
|
||||
// Sends IQ response to Component request.
|
||||
// No parsing of the request here. We just check that it's valid, and send the default response.
|
||||
func handlerForComponentIQSend(t *testing.T, c net.Conn) {
|
||||
// Completes the connection by exchanging handshakes
|
||||
handlerForComponentHandshakeDefaultID(t, c)
|
||||
respondToIQ(t, c)
|
||||
}
|
||||
|
||||
// Used for ID and handshake related tests
|
||||
func checkOpenStreamHandshakeID(t *testing.T, c net.Conn, decoder *xml.Decoder, streamID string) {
|
||||
c.SetDeadline(time.Now().Add(defaultTimeout))
|
||||
defer c.SetDeadline(time.Time{})
|
||||
|
||||
for { // TODO clean up. That for loop is not elegant and I prefer bounded recursion.
|
||||
token, err := decoder.Token()
|
||||
if err != nil {
|
||||
t.Errorf("cannot read next token: %s", err)
|
||||
}
|
||||
|
||||
switch elem := token.(type) {
|
||||
// Wait for first startElement
|
||||
case xml.StartElement:
|
||||
if elem.Name.Space != stanza.NSStream || elem.Name.Local != "stream" {
|
||||
err = errors.New("xmpp: expected <stream> but got <" + elem.Name.Local + "> in " + elem.Name.Space)
|
||||
return
|
||||
}
|
||||
if _, err := fmt.Fprintf(c, serverStreamOpen, "localhost", streamID, stanza.NSComponent, stanza.NSStream); err != nil {
|
||||
t.Errorf("cannot write server stream open: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkOpenStreamHandshakeDefaultID(t *testing.T, c net.Conn, decoder *xml.Decoder) {
|
||||
checkOpenStreamHandshakeID(t, c, decoder, defaultStreamID)
|
||||
}
|
||||
|
||||
// Performs a Component connection with a handshake. It uses a default ID defined in this file as a constant.
|
||||
@@ -303,152 +452,12 @@ func readHandshakeComponent(t *testing.T, decoder *xml.Decoder) {
|
||||
}
|
||||
}
|
||||
|
||||
func checkOpenStreamHandshakeDefaultID(t *testing.T, c net.Conn, decoder *xml.Decoder) {
|
||||
checkOpenStreamHandshakeID(t, c, decoder, defaultStreamID)
|
||||
}
|
||||
|
||||
// Used for ID and handshake related tests
|
||||
func checkOpenStreamHandshakeID(t *testing.T, c net.Conn, decoder *xml.Decoder, streamID string) {
|
||||
c.SetDeadline(time.Now().Add(defaultTimeout))
|
||||
defer c.SetDeadline(time.Time{})
|
||||
|
||||
for { // TODO clean up. That for loop is not elegant and I prefer bounded recursion.
|
||||
token, err := decoder.Token()
|
||||
if err != nil {
|
||||
t.Errorf("cannot read next token: %s", err)
|
||||
}
|
||||
|
||||
switch elem := token.(type) {
|
||||
// Wait for first startElement
|
||||
case xml.StartElement:
|
||||
if elem.Name.Space != stanza.NSStream || elem.Name.Local != "stream" {
|
||||
err = errors.New("xmpp: expected <stream> but got <" + elem.Name.Local + "> in " + elem.Name.Space)
|
||||
return
|
||||
}
|
||||
if _, err := fmt.Fprintf(c, serverStreamOpen, "localhost", streamID, stanza.NSComponent, stanza.NSStream); err != nil {
|
||||
t.Errorf("cannot write server stream open: %s", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//=============================================================================
|
||||
// Sends IQ response to Component request.
|
||||
// No parsing of the request here. We just check that it's valid, and send the default response.
|
||||
func handlerForComponentIQSend(t *testing.T, c net.Conn) {
|
||||
// Completes the connection by exchanging handshakes
|
||||
handlerForComponentHandshakeDefaultID(t, c)
|
||||
|
||||
// Decoder to parse the request
|
||||
// Performs a Component connection with a handshake. It uses a default ID defined in this file as a constant.
|
||||
// Used in the mock server as a Handler
|
||||
func handlerForComponentHandshakeDefaultID(t *testing.T, c net.Conn) {
|
||||
decoder := xml.NewDecoder(c)
|
||||
|
||||
iqReq, err := receiveIq(t, c, decoder)
|
||||
if err != nil {
|
||||
t.Errorf("Error receiving the IQ stanza : %v", err)
|
||||
} else if !iqReq.IsValid() {
|
||||
t.Errorf("server received an IQ stanza : %v", iqReq)
|
||||
}
|
||||
|
||||
// Crafting response
|
||||
iqResp := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, From: iqReq.To, To: iqReq.From, Id: iqReq.Id, Lang: "en"})
|
||||
disco := iqResp.DiscoInfo()
|
||||
disco.AddFeatures("vcard-temp",
|
||||
`http://jabber.org/protocol/address`)
|
||||
|
||||
disco.AddIdentity("Multicast", "service", "multicast")
|
||||
iqResp.Payload = disco
|
||||
|
||||
// Sending response to the Component
|
||||
mResp, err := xml.Marshal(iqResp)
|
||||
_, err = fmt.Fprintln(c, string(mResp))
|
||||
if err != nil {
|
||||
t.Errorf("Could not send response stanza : %s", err)
|
||||
}
|
||||
checkOpenStreamHandshakeDefaultID(t, c, decoder)
|
||||
readHandshakeComponent(t, decoder)
|
||||
fmt.Fprintln(c, "<handshake/>") // That's all the server needs to return (see xep-0114)
|
||||
return
|
||||
}
|
||||
|
||||
// Reads next request coming from the Component. Expecting it to be an IQ request
|
||||
func receiveIq(t *testing.T, c net.Conn, decoder *xml.Decoder) (stanza.IQ, error) {
|
||||
c.SetDeadline(time.Now().Add(defaultTimeout))
|
||||
defer c.SetDeadline(time.Time{})
|
||||
var iqStz stanza.IQ
|
||||
err := decoder.Decode(&iqStz)
|
||||
if err != nil {
|
||||
t.Errorf("cannot read the received IQ stanza: %s", err)
|
||||
}
|
||||
if !iqStz.IsValid() {
|
||||
t.Errorf("received IQ stanza is invalid : %s", err)
|
||||
}
|
||||
return iqStz, nil
|
||||
}
|
||||
|
||||
func receiveRawIq(t *testing.T, c net.Conn, errChan chan error) {
|
||||
c.SetDeadline(time.Now().Add(defaultTimeout))
|
||||
defer c.SetDeadline(time.Time{})
|
||||
decoder := xml.NewDecoder(c)
|
||||
var iq stanza.IQ
|
||||
err := decoder.Decode(&iq)
|
||||
if err != nil || !iq.IsValid() {
|
||||
s := stanza.StreamError{
|
||||
XMLName: xml.Name{Local: "stream:error"},
|
||||
Error: xml.Name{Local: "xml-not-well-formed"},
|
||||
Text: `XML was not well-formed`,
|
||||
}
|
||||
raw, _ := xml.Marshal(s)
|
||||
fmt.Fprintln(c, string(raw))
|
||||
fmt.Fprintln(c, `</stream:stream>`) // TODO : check this client side
|
||||
errChan <- fmt.Errorf("invalid xml")
|
||||
return
|
||||
}
|
||||
errChan <- nil
|
||||
return
|
||||
}
|
||||
|
||||
//===============================
|
||||
// Init mock server and connection
|
||||
// Creating a mock server and connecting a Component to it. Initialized with given port and handler function
|
||||
// The Component and mock are both returned
|
||||
func mockConnection(t *testing.T, port int, handler func(t *testing.T, c net.Conn)) (*Component, *ServerMock) {
|
||||
// Init mock server
|
||||
testComponentAddress := fmt.Sprintf("%s:%d", testComponentDomain, port)
|
||||
mock := ServerMock{}
|
||||
mock.Start(t, testComponentAddress, handler)
|
||||
|
||||
//==================================
|
||||
// Create Component to connect to it
|
||||
c := makeBasicComponent(defaultComponentName, testComponentAddress, t)
|
||||
|
||||
//========================================
|
||||
// Connect the new Component to the server
|
||||
err := c.Connect()
|
||||
if err != nil {
|
||||
t.Errorf("%+v", err)
|
||||
}
|
||||
|
||||
return c, &mock
|
||||
}
|
||||
|
||||
func makeBasicComponent(name string, mockServerAddr string, t *testing.T) *Component {
|
||||
opts := ComponentOptions{
|
||||
TransportConfiguration: TransportConfiguration{
|
||||
Address: mockServerAddr,
|
||||
Domain: "localhost",
|
||||
},
|
||||
Domain: testComponentDomain,
|
||||
Secret: "mypass",
|
||||
Name: name,
|
||||
Category: "gateway",
|
||||
Type: "service",
|
||||
}
|
||||
router := NewRouter()
|
||||
c, err := NewComponent(opts, router)
|
||||
if err != nil {
|
||||
t.Errorf("%+v", err)
|
||||
}
|
||||
c.transport, err = NewComponentTransport(c.ComponentOptions.TransportConfiguration)
|
||||
if err != nil {
|
||||
t.Errorf("%+v", err)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user