package xmpp

import (
	"bytes"
	"encoding/xml"
	"fmt"
	"gosrc.io/xmpp/stanza"
	"strconv"
	"testing"
	"time"
)

const (
	streamManagementID = "test-stream_management-id"
)

func TestClient_Send(t *testing.T) {
	buffer := bytes.NewBufferString("")
	client := Client{}
	data := []byte("https://da.wikipedia.org/wiki/J%C3%A6vnd%C3%B8gn")
	if err := client.sendWithWriter(buffer, data); err != nil {
		t.Errorf("Writing failed: %v", err)
	}

	if buffer.String() != string(data) {
		t.Errorf("Incorrect value sent to buffer: '%s'", buffer.String())
	}
}

// Stream management test.
// Connection is established, then the server sends supported features and so on.
// After the bind, client attempts a stream management enablement, and server replies in kind.
func Test_StreamManagement(t *testing.T) {
	serverDone := make(chan struct{})
	clientDone := make(chan struct{})

	client, mock := initSrvCliForResumeTests(t, func(t *testing.T, sc *ServerConn) {
		checkClientOpenStream(t, sc)

		sendStreamFeatures(t, sc) // Send initial features
		readAuth(t, sc.decoder)
		sc.connection.Write([]byte("<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>"))

		checkClientOpenStream(t, sc)       // Reset stream
		sendFeaturesStreamManagment(t, sc) // Send post auth features
		bind(t, sc)
		enableStreamManagement(t, sc, false, true)
		serverDone <- struct{}{}
	}, testClientStreamManagement, true, true)
	go func() {
		var state SMState
		var err error
		// Client is ok, we now open XMPP session
		if client.Session, err = NewSession(client, state); err != nil {
			t.Fatalf("failed to open XMPP session: %s", err)
		}
		clientDone <- struct{}{}
	}()

	waitForEntity(t, clientDone)
	waitForEntity(t, serverDone)
	mock.Stop()
}

// Absence of stream management test.
// Connection is established, then the server sends supported features and so on.
// Client has stream management disabled in its config, and should not ask for it. Server is not set up to reply.
func Test_NoStreamManagement(t *testing.T) {
	serverDone := make(chan struct{})
	clientDone := make(chan struct{})

	// Setup Mock server
	client, mock := initSrvCliForResumeTests(t, func(t *testing.T, sc *ServerConn) {
		checkClientOpenStream(t, sc)

		sendStreamFeatures(t, sc) // Send initial features
		readAuth(t, sc.decoder)
		sc.connection.Write([]byte("<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>"))

		checkClientOpenStream(t, sc)         // Reset stream
		sendFeaturesNoStreamManagment(t, sc) // Send post auth features
		bind(t, sc)
		serverDone <- struct{}{}
	}, testClientStreamManagement, true, false)

	go func() {
		var state SMState

		// Client is ok, we now open XMPP session
		var err error
		if client.Session, err = NewSession(client, state); err != nil {
			t.Fatalf("failed to open XMPP session: %s", err)
		}
		clientDone <- struct{}{}
	}()

	waitForEntity(t, clientDone)
	waitForEntity(t, serverDone)

	mock.Stop()
}

func Test_StreamManagementNotSupported(t *testing.T) {
	serverDone := make(chan struct{})
	clientDone := make(chan struct{})

	client, mock := initSrvCliForResumeTests(t, func(t *testing.T, sc *ServerConn) {
		checkClientOpenStream(t, sc)

		sendStreamFeatures(t, sc) // Send initial features
		readAuth(t, sc.decoder)
		sc.connection.Write([]byte("<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>"))

		checkClientOpenStream(t, sc)         // Reset stream
		sendFeaturesNoStreamManagment(t, sc) // Send post auth features
		bind(t, sc)
		serverDone <- struct{}{}
	}, testClientStreamManagement, true, true)

	go func() {
		var state SMState
		var err error
		// Client is ok, we now open XMPP session
		if client.Session, err = NewSession(client, state); err != nil {
			t.Fatalf("failed to open XMPP session: %s", err)
		}
		clientDone <- struct{}{}
	}()

	// Wait for client
	waitForEntity(t, clientDone)

	// Check if client got a positive stream management response from the server
	if client.Session.Features.DoesStreamManagement() {
		t.Fatalf("server does not provide stream management")
	}

	// Wait for server
	waitForEntity(t, serverDone)
	mock.Stop()
}

func Test_StreamManagementNoResume(t *testing.T) {
	serverDone := make(chan struct{})
	clientDone := make(chan struct{})

	client, mock := initSrvCliForResumeTests(t, func(t *testing.T, sc *ServerConn) {
		checkClientOpenStream(t, sc)

		sendStreamFeatures(t, sc) // Send initial features
		readAuth(t, sc.decoder)
		sc.connection.Write([]byte("<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>"))

		checkClientOpenStream(t, sc)       // Reset stream
		sendFeaturesStreamManagment(t, sc) // Send post auth features
		bind(t, sc)
		enableStreamManagement(t, sc, false, false)
		serverDone <- struct{}{}
	}, testClientStreamManagement, true, true)

	go func() {
		var state SMState
		var err error
		// Client is ok, we now open XMPP session
		if client.Session, err = NewSession(client, state); err != nil {
			t.Fatalf("failed to open XMPP session: %s", err)
		}
		clientDone <- struct{}{}
	}()
	waitForEntity(t, clientDone)
	if IsStreamResumable(client) {
		t.Fatalf("server does not support resumption but client says stream is resumable")
	}
	waitForEntity(t, serverDone)
	mock.Stop()
}

func Test_StreamManagementResume(t *testing.T) {
	serverDone := make(chan struct{})
	clientDone := make(chan struct{})
	// Setup Mock server
	mock := ServerMock{}
	mock.Start(t, testXMPPAddress, func(t *testing.T, sc *ServerConn) {
		checkClientOpenStream(t, sc)

		sendStreamFeatures(t, sc) // Send initial features
		readAuth(t, sc.decoder)
		sc.connection.Write([]byte("<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>"))

		checkClientOpenStream(t, sc)       // Reset stream
		sendFeaturesStreamManagment(t, sc) // Send post auth features
		bind(t, sc)
		enableStreamManagement(t, sc, false, true)
		discardPresence(t, sc)
		serverDone <- struct{}{}
	})

	// Test / Check result
	config := Config{
		TransportConfiguration: TransportConfiguration{
			Address: testXMPPAddress,
		},
		Jid:                    "test@localhost",
		Credential:             Password("test"),
		Insecure:               true,
		StreamManagementEnable: true,
		streamManagementResume: true} // Enable stream management

	var client *Client
	router := NewRouter()
	client, err := NewClient(&config, router, clientDefaultErrorHandler)
	if err != nil {
		t.Errorf("connect create XMPP client: %s", err)
	}

	// =================================================================
	// Connect client, then disconnect it so we can resume the session
	go func() {
		err = client.Connect()
		if err != nil {
			t.Fatalf("could not connect client to mock server: %s", err)
		}
		clientDone <- struct{}{}
	}()

	waitForEntity(t, clientDone)

	// ===========================================================================================
	// Check that the client correctly went into "disconnected" state, after being disconnected
	statusCorrectChan := make(chan struct{})
	kill := make(chan struct{})

	transp, ok := client.transport.(*XMPPTransport)
	if !ok {
		t.Fatalf("problem with client transport ")
	}

	transp.conn.Close()

	waitForEntity(t, serverDone)
	mock.Stop()

	go checkClientResumeStatus(client, statusCorrectChan, kill)
	select {
	case <-statusCorrectChan:
	//	Test passed
	case <-time.After(5 * time.Second):
		kill <- struct{}{}
		t.Fatalf("Client is not in disconnected state while it should be. Timed out")
	}

	// Check if the client can have its connection resumed using its state but also its configuration
	if !IsStreamResumable(client) {
		t.Fatalf("should support resumption")
	}

	// Reboot server. We need to make a new one because (at least for now) the mock server can only have one handler
	// and they should be different between a first connection and a stream resume since exchanged messages
	// are different (See XEP-0198)
	mock2 := ServerMock{}
	mock2.Start(t, testXMPPAddress, func(t *testing.T, sc *ServerConn) {
		//	Reconnect
		checkClientOpenStream(t, sc)

		sendStreamFeatures(t, sc) // Send initial features
		readAuth(t, sc.decoder)
		sc.connection.Write([]byte("<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>"))

		checkClientOpenStream(t, sc)       // Reset stream
		sendFeaturesStreamManagment(t, sc) // Send post auth features
		resumeStream(t, sc)
		serverDone <- struct{}{}
	})

	// Reconnect
	go func() {
		err = client.Resume()
		if err != nil {
			t.Fatalf("could not connect client to mock server: %s", err)
		}
		clientDone <- struct{}{}
	}()

	waitForEntity(t, clientDone)
	waitForEntity(t, serverDone)

	mock2.Stop()
}

func Test_StreamManagementFail(t *testing.T) {
	serverDone := make(chan struct{})
	clientDone := make(chan struct{})
	// Setup Mock server
	mock := ServerMock{}
	mock.Start(t, testXMPPAddress, func(t *testing.T, sc *ServerConn) {
		checkClientOpenStream(t, sc)

		sendStreamFeatures(t, sc) // Send initial features
		readAuth(t, sc.decoder)
		sc.connection.Write([]byte("<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>"))

		checkClientOpenStream(t, sc)       // Reset stream
		sendFeaturesStreamManagment(t, sc) // Send post auth features
		bind(t, sc)
		enableStreamManagement(t, sc, true, true)
		serverDone <- struct{}{}
	})

	// Test / Check result
	config := Config{
		TransportConfiguration: TransportConfiguration{
			Address: testXMPPAddress,
		},
		Jid:                    "test@localhost",
		Credential:             Password("test"),
		Insecure:               true,
		StreamManagementEnable: true,
		streamManagementResume: true} // Enable stream management

	var client *Client
	router := NewRouter()
	client, err := NewClient(&config, router, clientDefaultErrorHandler)
	if err != nil {
		t.Errorf("connect create XMPP client: %s", err)
	}

	var state SMState
	go func() {
		_, err = client.transport.Connect()
		if err != nil {
			return
		}

		// Client is ok, we now open XMPP session
		if client.Session, err = NewSession(client, state); err == nil {
			t.Fatalf("test is supposed to err")
		}
		if client.Session.SMState.StreamErrorGroup == nil {
			t.Fatalf("error was not stored correctly in session state")
		}
		clientDone <- struct{}{}
	}()

	waitForEntity(t, serverDone)
	waitForEntity(t, clientDone)

	mock.Stop()
}

func Test_SendStanzaQueueWithSM(t *testing.T) {
	serverDone := make(chan struct{})
	clientDone := make(chan struct{})
	// Setup Mock server
	mock := ServerMock{}
	mock.Start(t, testXMPPAddress, func(t *testing.T, sc *ServerConn) {
		checkClientOpenStream(t, sc)

		sendStreamFeatures(t, sc) // Send initial features
		readAuth(t, sc.decoder)
		sc.connection.Write([]byte("<success xmlns=\"urn:ietf:params:xml:ns:xmpp-sasl\"/>"))

		checkClientOpenStream(t, sc)       // Reset stream
		sendFeaturesStreamManagment(t, sc) // Send post auth features
		bind(t, sc)
		enableStreamManagement(t, sc, false, true)

		// Ignore the initial presence sent to the server by the client so we can move on to the next packet.
		discardPresence(t, sc)

		// Used here to silently discard the IQ sent by the client, in order to later trigger a resend
		skipPacket(t, sc)
		// Respond to the client ACK request with a number of processed stanzas of 0. This should trigger a resend
		// of previously ignored stanza to the server, which this handler element will be expecting.
		respondWithAck(t, sc, 0)
		serverDone <- struct{}{}
	})

	// Test / Check result
	config := Config{
		TransportConfiguration: TransportConfiguration{
			Address: testXMPPAddress,
		},
		Jid:                    "test@localhost",
		Credential:             Password("test"),
		Insecure:               true,
		StreamManagementEnable: true,
		streamManagementResume: true} // Enable stream management

	var client *Client
	router := NewRouter()
	client, err := NewClient(&config, router, clientDefaultErrorHandler)
	if err != nil {
		t.Errorf("connect create XMPP client: %s", err)
	}

	go func() {
		err = client.Connect()

		client.SendRaw(`<iq id='ls72g593' type='get'>
  <query xmlns='jabber:iq:roster'/>
</iq>
`)

		// Last stanza was discarded silently by the server. Let's ask an ack for it. This should trigger resend as the server
		// will respond with an acknowledged number of stanzas of 0.
		r := stanza.SMRequest{}
		client.Send(r)
		clientDone <- struct{}{}
	}()
	waitForEntity(t, serverDone)
	waitForEntity(t, clientDone)

	mock.Stop()
}

//========================================================================
// Helper functions for tests

func skipPacket(t *testing.T, sc *ServerConn) {
	var p stanza.IQ
	se, err := stanza.NextStart(sc.decoder)

	if err != nil {
		t.Fatalf("cannot read packet: %s", err)
		return
	}
	if err := sc.decoder.DecodeElement(&p, &se); err != nil {
		t.Fatalf("cannot decode packet: %s", err)
		return
	}
}

func respondWithAck(t *testing.T, sc *ServerConn, h int) {

	//  Mock server reads the ack request
	var p stanza.SMRequest
	se, err := stanza.NextStart(sc.decoder)

	if err != nil {
		t.Fatalf("cannot read packet: %s", err)
		return
	}
	if err := sc.decoder.DecodeElement(&p, &se); err != nil {
		t.Fatalf("cannot decode packet: %s", err)
		return
	}

	// Mock server sends the ack response
	a := stanza.SMAnswer{
		H: uint(h),
	}
	data, err := xml.Marshal(a)
	_, err = sc.connection.Write(data)
	if err != nil {
		t.Fatalf("failed to send response ack")
	}

	// Mock server reads the re-sent stanza that was previously discarded intentionally
	var p2 stanza.IQ
	nse, err := stanza.NextStart(sc.decoder)

	if err != nil {
		t.Fatalf("cannot read packet: %s", err)
		return
	}
	if err := sc.decoder.DecodeElement(&p2, &nse); err != nil {
		t.Fatalf("cannot decode packet: %s", err)
		return
	}
}

func sendFeaturesStreamManagment(t *testing.T, sc *ServerConn) {
	// This is a basic server, supporting only 2 features after auth: stream management & session binding
	features := `<stream:features>
  <bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>
  <sm xmlns='urn:xmpp:sm:3'/>
</stream:features>`
	if _, err := fmt.Fprintln(sc.connection, features); err != nil {
		t.Fatalf("cannot send stream feature: %s", err)
	}
}

func sendFeaturesNoStreamManagment(t *testing.T, sc *ServerConn) {
	// This is a basic server, supporting only 2 features after auth: stream management & session binding
	features := `<stream:features>
  <bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>
</stream:features>`
	if _, err := fmt.Fprintln(sc.connection, features); err != nil {
		t.Fatalf("cannot send stream feature: %s", err)
	}
}

// enableStreamManagement is a function for the mock server that can either mock a successful session, or fail depending on
// the value of the "fail" boolean. True means the session should fail.
func enableStreamManagement(t *testing.T, sc *ServerConn, fail bool, resume bool) {
	// Decode element into pointer storage
	var ed stanza.SMEnable
	se, err := stanza.NextStart(sc.decoder)

	if err != nil {
		t.Fatalf("cannot read stream management enable: %s", err)
		return
	}
	if err := sc.decoder.DecodeElement(&ed, &se); err != nil {
		t.Fatalf("cannot decode stream management enable: %s", err)
		return
	}

	if fail {
		f := stanza.SMFailed{
			H:                nil,
			StreamErrorGroup: &stanza.UnexpectedRequest{},
		}
		data, err := xml.Marshal(f)
		if err != nil {
			t.Fatalf("failed to marshall error response: %s", err)
		}
		sc.connection.Write(data)
	} else {
		e := &stanza.SMEnabled{
			Resume: strconv.FormatBool(resume),
			Id:     streamManagementID,
		}
		data, err := xml.Marshal(e)
		if err != nil {
			t.Fatalf("failed to marshall error response: %s", err)
		}
		sc.connection.Write(data)
	}
}

func resumeStream(t *testing.T, sc *ServerConn) {
	h := uint(0)
	response := stanza.SMResumed{
		PrevId: streamManagementID,
		H:      &h,
	}

	data, err := xml.Marshal(response)
	if err != nil {
		t.Fatalf("failed to marshall stream management enabled response : %s", err)
	}

	writtenChan := make(chan struct{})

	go func() {
		sc.connection.Write(data)
		writtenChan <- struct{}{}
	}()
	select {
	case <-writtenChan:
		// We're done here
		return
	case <-time.After(defaultTimeout):
		t.Fatalf("failed to write enabled nonza to client")
	}
}

func checkClientResumeStatus(client *Client, statusCorrectChan chan struct{}, killChan chan struct{}) {
	for {
		if client.CurrentState.getState() == StateDisconnected {
			statusCorrectChan <- struct{}{}
		}
		select {
		case <-killChan:
			return
		case <-time.After(time.Millisecond * 10):
			//	Keep checking status value
		}
	}
}

func initSrvCliForResumeTests(t *testing.T, serverHandler func(*testing.T, *ServerConn), port int, StreamManagementEnable, StreamManagementResume bool) (*Client, *ServerMock) {
	mock := &ServerMock{}
	testServerAddress := fmt.Sprintf("%s:%d", testClientDomain, port)

	mock.Start(t, testServerAddress, serverHandler)
	config := Config{
		TransportConfiguration: TransportConfiguration{
			Address: testServerAddress,
		},
		Jid:                    "test@localhost",
		Credential:             Password("test"),
		Insecure:               true,
		StreamManagementEnable: StreamManagementEnable,
		streamManagementResume: StreamManagementResume}

	var client *Client
	var err error
	router := NewRouter()
	if client, err = NewClient(&config, router, clientDefaultErrorHandler); err != nil {
		t.Fatalf("connect create XMPP client: %s", err)
	}

	if _, err = client.transport.Connect(); err != nil {
		t.Fatalf("XMPP connection failed: %s", err)
	}

	return client, mock
}

func waitForEntity(t *testing.T, entityDone chan struct{}) {
	select {
	case <-entityDone:
	case <-time.After(defaultTimeout):
		t.Fatalf("test timed out")
	}
}