Fixed decoder usage.

Decoders have internal buffering, and creating many on a single TCP connection can cause issues in parsing exchanged XML documents.
This commit is contained in:
rcorniere
2019-12-10 17:15:16 +01:00
parent fd48f52f3d
commit 3c9b0db5b8
5 changed files with 130 additions and 133 deletions
+49 -49
View File
@@ -41,16 +41,21 @@ const (
// ClientHandler is passed by the test client to provide custom behaviour to
// the TCP server mock. This allows customizing the server behaviour to allow
// testing clients under various scenarii.
type ClientHandler func(t *testing.T, conn net.Conn)
type ClientHandler func(t *testing.T, serverConn *ServerConn)
// ServerMock is a simple TCP server that can be use to mock basic server
// behaviour to test clients.
type ServerMock struct {
t *testing.T
handler ClientHandler
listener net.Listener
connections []net.Conn
done chan struct{}
t *testing.T
handler ClientHandler
listener net.Listener
serverConnections []*ServerConn
done chan struct{}
}
type ServerConn struct {
connection net.Conn
decoder *xml.Decoder
}
// Start launches the mock TCP server, listening to an actual address / port.
@@ -68,9 +73,9 @@ func (mock *ServerMock) Stop() {
if mock.listener != nil {
mock.listener.Close()
}
// Close all existing connections
for _, c := range mock.connections {
c.Close()
// Close all existing serverConnections
for _, c := range mock.serverConnections {
c.connection.Close()
}
}
@@ -90,13 +95,14 @@ func (mock *ServerMock) init(addr string) error {
return nil
}
// loop accepts connections and creates a go routine per connection.
// loop accepts serverConnections and creates a go routine per connection.
// The go routine is running the client handler, that is used to provide the
// real TCP server behaviour.
func (mock *ServerMock) loop() {
listener := mock.listener
for {
conn, err := listener.Accept()
serverConn := &ServerConn{conn, xml.NewDecoder(conn)}
if err != nil {
select {
case <-mock.done:
@@ -106,9 +112,10 @@ func (mock *ServerMock) loop() {
}
return
}
mock.connections = append(mock.connections, conn)
mock.serverConnections = append(mock.serverConnections, serverConn)
// TODO Create and pass a context to cancel the handler if they are still around = avoid possible leak on complex handlers
go mock.handler(mock.t, conn)
go mock.handler(mock.t, serverConn)
}
}
@@ -116,27 +123,20 @@ func (mock *ServerMock) loop() {
// A few functions commonly used for tests. Trying to avoid duplicates in client and component test files.
//======================================================================================================================
func respondToIQ(t *testing.T, c net.Conn) {
recvBuf := make([]byte, 1024)
var iqR stanza.IQ
_, err := c.Read(recvBuf[:]) // recv data
func respondToIQ(t *testing.T, sc *ServerConn) {
// Decoder to parse the request
iqReq, err := receiveIq(sc)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
t.Errorf("read timeout: %s", err)
} else {
t.Errorf("read error: %s", err)
}
t.Fatalf("failed to receive IQ : %s", err.Error())
}
xml.Unmarshal(recvBuf, &iqR)
if !iqR.IsValid() {
mockIQError(c)
if !iqReq.IsValid() {
mockIQError(sc.connection)
return
}
// Crafting response
iqResp := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, From: iqR.To, To: iqR.From, Id: iqR.Id, Lang: "en"})
iqResp := stanza.NewIQ(stanza.Attrs{Type: stanza.IQTypeResult, From: iqReq.To, To: iqReq.From, Id: iqReq.Id, Lang: "en"})
disco := iqResp.DiscoInfo()
disco.AddFeatures("vcard-temp",
`http://jabber.org/protocol/address`)
@@ -146,7 +146,7 @@ func respondToIQ(t *testing.T, c net.Conn) {
// Sending response to the Component
mResp, err := xml.Marshal(iqResp)
_, err = fmt.Fprintln(c, string(mResp))
_, err = fmt.Fprintln(sc.connection, string(mResp))
if err != nil {
t.Errorf("Could not send response stanza : %s", err)
}
@@ -155,13 +155,13 @@ func respondToIQ(t *testing.T, c net.Conn) {
// When a presence stanza is automatically sent (right now it's the case in the client), we may want to discard it
// and test further stanzas.
func discardPresence(t *testing.T, c net.Conn) {
c.SetDeadline(time.Now().Add(defaultTimeout))
defer c.SetDeadline(time.Time{})
func discardPresence(t *testing.T, sc *ServerConn) {
sc.connection.SetDeadline(time.Now().Add(defaultTimeout))
defer sc.connection.SetDeadline(time.Time{})
var presenceStz stanza.Presence
recvBuf := make([]byte, len(InitialPresence))
_, err := c.Read(recvBuf[:]) // recv data
_, err := sc.connection.Read(recvBuf[:]) // recv data
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
@@ -178,11 +178,11 @@ func discardPresence(t *testing.T, c net.Conn) {
}
// Reads next request coming from the Component. Expecting it to be an IQ request
func receiveIq(c net.Conn, decoder *xml.Decoder) (*stanza.IQ, error) {
c.SetDeadline(time.Now().Add(defaultTimeout))
defer c.SetDeadline(time.Time{})
func receiveIq(sc *ServerConn) (*stanza.IQ, error) {
sc.connection.SetDeadline(time.Now().Add(defaultTimeout))
defer sc.connection.SetDeadline(time.Time{})
var iqStz stanza.IQ
err := decoder.Decode(&iqStz)
err := sc.decoder.Decode(&iqStz)
if err != nil {
return nil, err
}
@@ -202,14 +202,14 @@ func mockIQError(c net.Conn) {
fmt.Fprintln(c, `</stream:stream>`)
}
func sendStreamFeatures(t *testing.T, c net.Conn, _ *xml.Decoder) {
func sendStreamFeatures(t *testing.T, sc *ServerConn) {
// This is a basic server, supporting only 1 stream feature: SASL Plain Auth
features := `<stream:features>
<mechanisms xmlns="urn:ietf:params:xml:ns:xmpp-sasl">
<mechanism>PLAIN</mechanism>
</mechanisms>
</stream:features>`
if _, err := fmt.Fprintln(c, features); err != nil {
if _, err := fmt.Fprintln(sc.connection, features); err != nil {
t.Errorf("cannot send stream feature: %s", err)
}
}
@@ -237,29 +237,29 @@ func readAuth(t *testing.T, decoder *xml.Decoder) string {
return ""
}
func sendBindFeature(t *testing.T, c net.Conn, _ *xml.Decoder) {
func sendBindFeature(t *testing.T, sc *ServerConn) {
// This is a basic server, supporting only 1 stream feature after auth: resource binding
features := `<stream:features>
<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>
</stream:features>`
if _, err := fmt.Fprintln(c, features); err != nil {
if _, err := fmt.Fprintln(sc.connection, features); err != nil {
t.Errorf("cannot send stream feature: %s", err)
}
}
func sendRFC3921Feature(t *testing.T, c net.Conn, _ *xml.Decoder) {
func sendRFC3921Feature(t *testing.T, sc *ServerConn) {
// This is a basic server, supporting only 2 features after auth: resource & session binding
features := `<stream:features>
<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>
<session xmlns='urn:ietf:params:xml:ns:xmpp-session'/>
</stream:features>`
if _, err := fmt.Fprintln(c, features); err != nil {
if _, err := fmt.Fprintln(sc.connection, features); err != nil {
t.Errorf("cannot send stream feature: %s", err)
}
}
func bind(t *testing.T, c net.Conn, decoder *xml.Decoder) {
se, err := stanza.NextStart(decoder)
func bind(t *testing.T, sc *ServerConn) {
se, err := stanza.NextStart(sc.decoder)
if err != nil {
t.Errorf("cannot read bind: %s", err)
return
@@ -267,7 +267,7 @@ func bind(t *testing.T, c net.Conn, decoder *xml.Decoder) {
iq := &stanza.IQ{}
// Decode element into pointer storage
if err = decoder.DecodeElement(&iq, &se); err != nil {
if err = sc.decoder.DecodeElement(&iq, &se); err != nil {
t.Errorf("cannot decode bind iq: %s", err)
return
}
@@ -280,12 +280,12 @@ func bind(t *testing.T, c net.Conn, decoder *xml.Decoder) {
<jid>%s</jid>
</bind>
</iq>`
fmt.Fprintf(c, result, iq.Id, "test@localhost/test") // TODO use real JID
fmt.Fprintf(sc.connection, result, iq.Id, "test@localhost/test") // TODO use real JID
}
}
func session(t *testing.T, c net.Conn, decoder *xml.Decoder) {
se, err := stanza.NextStart(decoder)
func session(t *testing.T, sc *ServerConn) {
se, err := stanza.NextStart(sc.decoder)
if err != nil {
t.Errorf("cannot read session: %s", err)
return
@@ -293,7 +293,7 @@ func session(t *testing.T, c net.Conn, decoder *xml.Decoder) {
iq := &stanza.IQ{}
// Decode element into pointer storage
if err = decoder.DecodeElement(&iq, &se); err != nil {
if err = sc.decoder.DecodeElement(&iq, &se); err != nil {
t.Errorf("cannot decode session iq: %s", err)
return
}
@@ -301,6 +301,6 @@ func session(t *testing.T, c net.Conn, decoder *xml.Decoder) {
switch iq.Payload.(type) {
case *stanza.StreamSession:
result := `<iq id='%s' type='result'/>`
fmt.Fprintf(c, result, iq.Id)
fmt.Fprintf(sc.connection, result, iq.Id)
}
}