diff --git a/client_internal_test.go b/client_internal_test.go index e517a42..140eab7 100644 --- a/client_internal_test.go +++ b/client_internal_test.go @@ -176,6 +176,8 @@ func Test_StreamManagementNoResume(t *testing.T) { } func Test_StreamManagementResume(t *testing.T) { + serverDone := make(chan struct{}) + clientDone := make(chan struct{}) // Setup Mock server mock := ServerMock{} mock.Start(t, testXMPPAddress, func(t *testing.T, sc *ServerConn) { @@ -190,6 +192,7 @@ func Test_StreamManagementResume(t *testing.T) { bind(t, sc) enableStreamManagement(t, sc, false, true) discardPresence(t, sc) + serverDone <- struct{}{} }) // Test / Check result @@ -210,11 +213,20 @@ func Test_StreamManagementResume(t *testing.T) { t.Errorf("connect create XMPP client: %s", err) } - err = client.Connect() - if err != nil { - t.Fatalf("could not connect client to mock server: %s", err) - } + // ================================================================= + // Connect client, then disconnect it so we can resume the session + go func() { + err = client.Connect() + if err != nil { + t.Fatalf("could not connect client to mock server: %s", err) + } + clientDone <- struct{}{} + }() + waitForEntity(t, clientDone) + + // =========================================================================================== + // Check that the client correctly went into "disconnected" state, after being disconnected statusCorrectChan := make(chan struct{}) kill := make(chan struct{}) @@ -224,9 +236,10 @@ func Test_StreamManagementResume(t *testing.T) { } transp.conn.Close() + + waitForEntity(t, serverDone) mock.Stop() - // Check if status is correctly updated because of the disconnect go checkClientResumeStatus(client, statusCorrectChan, kill) select { case <-statusCorrectChan: @@ -256,17 +269,27 @@ func Test_StreamManagementResume(t *testing.T) { checkClientOpenStream(t, sc) // Reset stream sendFeaturesStreamManagment(t, sc) // Send post auth features resumeStream(t, sc) + serverDone <- struct{}{} }) // Reconnect - err = client.Resume() - if err != nil { - t.Fatalf("could not connect client to mock server: %s", err) - } + go func() { + err = client.Resume() + if err != nil { + t.Fatalf("could not connect client to mock server: %s", err) + } + clientDone <- struct{}{} + }() + + waitForEntity(t, clientDone) + waitForEntity(t, serverDone) + mock2.Stop() } func Test_StreamManagementFail(t *testing.T) { + serverDone := make(chan struct{}) + clientDone := make(chan struct{}) // Setup Mock server mock := ServerMock{} mock.Start(t, testXMPPAddress, func(t *testing.T, sc *ServerConn) { @@ -280,6 +303,7 @@ func Test_StreamManagementFail(t *testing.T) { sendFeaturesStreamManagment(t, sc) // Send post auth features bind(t, sc) enableStreamManagement(t, sc, true, true) + serverDone <- struct{}{} }) // Test / Check result @@ -301,26 +325,33 @@ func Test_StreamManagementFail(t *testing.T) { } var state SMState - _, err = client.transport.Connect() - if err != nil { - return - } + go func() { + _, err = client.transport.Connect() + if err != nil { + return + } - // Client is ok, we now open XMPP session - if client.Session, err = NewSession(client, state); err == nil { - t.Fatalf("test is supposed to err") - } - if client.Session.SMState.StreamErrorGroup == nil { - t.Fatalf("error was not stored correctly in session state") - } + // Client is ok, we now open XMPP session + if client.Session, err = NewSession(client, state); err == nil { + t.Fatalf("test is supposed to err") + } + if client.Session.SMState.StreamErrorGroup == nil { + t.Fatalf("error was not stored correctly in session state") + } + clientDone <- struct{}{} + }() + + waitForEntity(t, serverDone) + waitForEntity(t, clientDone) mock.Stop() } func Test_SendStanzaQueueWithSM(t *testing.T) { + serverDone := make(chan struct{}) + clientDone := make(chan struct{}) // Setup Mock server mock := ServerMock{} - serverDone := make(chan struct{}) mock.Start(t, testXMPPAddress, func(t *testing.T, sc *ServerConn) { checkClientOpenStream(t, sc) @@ -340,7 +371,8 @@ func Test_SendStanzaQueueWithSM(t *testing.T) { skipPacket(t, sc) // Respond to the client ACK request with a number of processed stanzas of 0. This should trigger a resend // of previously ignored stanza to the server, which this handler element will be expecting. - respondWithAck(t, sc, 0, serverDone) + respondWithAck(t, sc, 0) + serverDone <- struct{}{} }) // Test / Check result @@ -361,24 +393,22 @@ func Test_SendStanzaQueueWithSM(t *testing.T) { t.Errorf("connect create XMPP client: %s", err) } - err = client.Connect() + go func() { + err = client.Connect() - client.SendRaw(` + client.SendRaw(` `) - // Last stanza was discarded silently by the server. Let's ask an ack for it. This should trigger resend as the server - // will respond with an acknowledged number of stanzas of 0. - r := stanza.SMRequest{} - client.Send(r) - - select { - case <-time.After(defaultChannelTimeout): - t.Fatalf("server failed to complete the test in time") - case <-serverDone: - // Test completed successfully - } + // Last stanza was discarded silently by the server. Let's ask an ack for it. This should trigger resend as the server + // will respond with an acknowledged number of stanzas of 0. + r := stanza.SMRequest{} + client.Send(r) + clientDone <- struct{}{} + }() + waitForEntity(t, serverDone) + waitForEntity(t, clientDone) mock.Stop() } @@ -400,7 +430,7 @@ func skipPacket(t *testing.T, sc *ServerConn) { } } -func respondWithAck(t *testing.T, sc *ServerConn, h int, serverDone chan struct{}) { +func respondWithAck(t *testing.T, sc *ServerConn, h int) { // Mock server reads the ack request var p stanza.SMRequest @@ -437,7 +467,6 @@ func respondWithAck(t *testing.T, sc *ServerConn, h int, serverDone chan struct{ t.Fatalf("cannot decode packet: %s", err) return } - serverDone <- struct{}{} } func sendFeaturesStreamManagment(t *testing.T, sc *ServerConn) { diff --git a/component_test.go b/component_test.go index f2b5a2f..59ac08e 100644 --- a/component_test.go +++ b/component_test.go @@ -38,6 +38,8 @@ func TestHandshake(t *testing.T) { // Tests connection process with a handshake exchange // Tests multiple session IDs. All serverConnections should generate a unique stream ID func TestGenerateHandshakeId(t *testing.T) { + clientDone := make(chan struct{}) + serverDone := make(chan struct{}) // Using this array with a channel to make a queue of values to test // These are stream IDs that will be used to test the connection process, mixing them with the "secret" to generate // some handshake value @@ -57,11 +59,10 @@ func TestGenerateHandshakeId(t *testing.T) { // Performs a Component connection with a handshake. It expects to have an ID sent its way through the "uchan" // channel of this file. Otherwise it will hang for ever. h := func(t *testing.T, sc *ServerConn) { - checkOpenStreamHandshakeID(t, sc, <-uchan) readHandshakeComponent(t, sc.decoder) sc.connection.Write([]byte("")) // That's all the server needs to return (see xep-0114) - return + serverDone <- struct{}{} } // Init mock server @@ -92,14 +93,45 @@ func TestGenerateHandshakeId(t *testing.T) { } // Try connecting, and storing the resulting streamID in a map. - m := make(map[string]bool) - for range uuidsArray { - streamId, _ := c.transport.Connect() - m[c.handshake(streamId)] = true - } - if len(uuidsArray) != len(m) { - t.Errorf("Handshake does not produce a unique id. Expected: %d unique ids, got: %d", len(uuidsArray), len(m)) - } + go func() { + m := make(map[string]bool) + for range uuidsArray { + idChan := make(chan string) + go func() { + streamId, err := c.transport.Connect() + if err != nil { + t.Fatalf("failed to mock component connection to get a handshake: %s", err) + } + idChan <- streamId + }() + + var streamId string + select { + case streamId = <-idChan: + case <-time.After(defaultTimeout): + t.Fatalf("test timed out") + } + + hs := stanza.Handshake{ + Value: c.handshake(streamId), + } + m[hs.Value] = true + hsRaw, err := xml.Marshal(hs) + if err != nil { + t.Fatalf("could not marshal handshake: %s", err) + } + c.SendRaw(string(hsRaw)) + waitForEntity(t, serverDone) + c.transport.Close() + } + if len(uuidsArray) != len(m) { + t.Errorf("Handshake does not produce a unique id. Expected: %d unique ids, got: %d", len(uuidsArray), len(m)) + } + clientDone <- struct{}{} + }() + + waitForEntity(t, clientDone) + mock.Stop() } // Test that NewStreamManager can accept a Component. @@ -121,10 +153,11 @@ func TestDecoder(t *testing.T) { // Tests sending an IQ to the server, and getting the response func TestSendIq(t *testing.T) { - done := make(chan struct{}) + serverDone := make(chan struct{}) + clientDone := make(chan struct{}) h := func(t *testing.T, sc *ServerConn) { handlerForComponentIQSend(t, sc) - done <- struct{}{} + serverDone <- struct{}{} } //Connecting to a mock server, initialized with given port and handler function @@ -145,24 +178,23 @@ func TestSendIq(t *testing.T) { } c.ErrorHandler = errorHandler - var res chan stanza.IQ - res, _ = c.SendIQ(ctx, iqReq) + go func() { + var res chan stanza.IQ + res, _ = c.SendIQ(ctx, iqReq) - select { - case <-res: - case err := <-errChan: - t.Errorf(err.Error()) - case <-time.After(defaultChannelTimeout): - t.Errorf("Failed to receive response, to sent IQ, from mock server") - } + select { + case <-res: + case err := <-errChan: + t.Fatalf(err.Error()) + } + clientDone <- struct{}{} + }() + + waitForEntity(t, clientDone) + waitForEntity(t, serverDone) - select { - case <-done: - m.Stop() - case <-time.After(defaultChannelTimeout): - t.Errorf("The mock server failed to finish its job !") - } cancel() + m.Stop() } // Checking that error handling is done properly client side when an invalid IQ is sent and the server responds in kind.