SleekTest may now run against a live stream.

Moved SleekTest to sleekxmpp.test package.
Corrected error in XML compare method.
Added TestLiveSocket to run stream tests against live streams.
Modified XMLStream to work with TestLiveSocket.
This commit is contained in:
Lance Stout
2010-10-07 19:43:51 -04:00
parent e02ffe8547
commit a8b948cd33
6 changed files with 423 additions and 53 deletions

View File

@@ -1,4 +1,5 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
@@ -11,8 +12,9 @@ import unittest
import sleekxmpp
from sleekxmpp import ClientXMPP, ComponentXMPP
from sleekxmpp.stanza import Message, Iq, Presence
from sleekxmpp.test import TestSocket
from sleekxmpp.test import TestSocket, TestLiveSocket
from sleekxmpp.xmlstream.stanzabase import registerStanzaPlugin, ET
from sleekxmpp.xmlstream.stanzabase import StanzaBase
from sleekxmpp.xmlstream.tostring import tostring
@@ -47,6 +49,25 @@ class SleekTest(unittest.TestCase):
compare -- Compare XML objects against each other.
"""
def parse_xml(self, xml_string):
try:
xml = ET.fromstring(xml_string)
return xml
except SyntaxError, e:
if 'unbound' in e.msg:
known_prefixes = {
'stream': 'http://etherx.jabber.org/streams'}
prefix = xml_string.split('<')[1].split(':')[0]
if prefix in known_prefixes:
xml_string = '<fixns xmlns:%s="%s">%s</fixns>' % (
prefix,
known_prefixes[prefix],
xml_string)
xml = self.parse_xml(xml_string)
xml = xml.getchildren()[0]
return xml
# ------------------------------------------------------------------
# Shortcut methods for creating stanza objects
@@ -117,7 +138,7 @@ class SleekTest(unittest.TestCase):
setStanzaValues() should be used. Defaults to
True.
"""
xml = ET.fromstring(xml_string)
xml = self.parse_xml(xml_string)
# Ensure that top level namespaces are used, even if they
# were not provided.
@@ -181,8 +202,8 @@ class SleekTest(unittest.TestCase):
"""
return self.check_stanza(Message, msg, xml_string,
defaults=['type'],
use_values=use_values)
defaults=['type'],
use_values=use_values)
def check_iq(self, iq, xml_string, use_values=True):
"""
@@ -217,59 +238,69 @@ class SleekTest(unittest.TestCase):
to True.
"""
return self.check_stanza(Presence, pres, xml_string,
defaults=['priority'],
use_values=use_values)
defaults=['priority'],
use_values=use_values)
# ------------------------------------------------------------------
# Methods for simulating stanza streams.
def stream_start(self, mode='client', skip=True, header=None):
def stream_start(self, mode='client', skip=True, header=None,
socket='mock', jid='tester@localhost',
password='test', server='localhost',
port=5222):
"""
Initialize an XMPP client or component using a dummy XML stream.
Arguments:
mode -- Either 'client' or 'component'. Defaults to 'client'.
skip -- Indicates if the first item in the sent queue (the
stream header) should be removed. Tests that wish
to test initializing the stream should set this to
False. Otherwise, the default of True should be used.
mode -- Either 'client' or 'component'. Defaults to 'client'.
skip -- Indicates if the first item in the sent queue (the
stream header) should be removed. Tests that wish
to test initializing the stream should set this to
False. Otherwise, the default of True should be used.
socket -- Either 'mock' or 'live' to indicate if the socket
should be a dummy, mock socket or a live, functioning
socket. Defaults to 'mock'.
jid -- The JID to use for the connection.
Defaults to 'tester@localhost'.
password -- The password to use for the connection.
Defaults to 'test'.
server -- The name of the XMPP server. Defaults to 'localhost'.
port -- The port to use when connecting to the server.
Defaults to 5222.
"""
if mode == 'client':
self.xmpp = ClientXMPP('tester@localhost', 'test')
self.xmpp = ClientXMPP(jid, password)
elif mode == 'component':
self.xmpp = ComponentXMPP('tester.localhost', 'test',
'localhost', 8888)
self.xmpp = ComponentXMPP(jid, password,
server, port)
else:
raise ValueError("Unknown XMPP connection mode.")
self.xmpp.setSocket(TestSocket())
self.xmpp.state.set('reconnect', False)
self.xmpp.state.set('is client', True)
self.xmpp.state.set('connected', True)
if socket == 'mock':
self.xmpp.set_socket(TestSocket())
# Must have the stream header ready for xmpp.process() to work.
if not header:
header = self.xmpp.stream_header
self.xmpp.socket.recv_data(header)
# Simulate connecting for mock sockets.
self.xmpp.state.set('reconnect', False)
self.xmpp.state.set('is client', True)
self.xmpp.state.set('connected', True)
# Must have the stream header ready for xmpp.process() to work.
if not header:
header = self.xmpp.stream_header
self.xmpp.socket.recv_data(header)
elif socket == 'live':
self.xmpp.socket_class = TestLiveSocket
self.xmpp.connect()
else:
raise ValueError("Unknown socket type.")
self.xmpp.connect = lambda a=None, b=None, c=None, d=None: True
self.xmpp.process(threaded=True)
if skip:
# Clear startup stanzas
self.xmpp.socket.next_sent(timeout=0.01)
self.xmpp.socket.next_sent(timeout=1)
if mode == 'component':
self.xmpp.socket.next_sent(timeout=0.01)
def stream_recv(self, data):
"""
Pass data to the dummy XMPP client as if it came from an XMPP server.
Arguments:
data -- String stanza XML to be received and processed by the
XMPP client or component.
"""
data = str(data)
self.xmpp.socket.recv_data(data)
self.xmpp.socket.next_sent(timeout=1)
def stream_make_header(self, sto='',
sfrom='',
@@ -308,6 +339,156 @@ class SleekTest(unittest.TestCase):
parts.append('xmlns="%s"' % default_ns)
return header % ' '.join(parts)
def stream_recv(self, data, stanza_class=StanzaBase, defaults=[],
use_values=True, timeout=1):
"""
Pass data to the dummy XMPP client as if it came from an XMPP server.
If using a live connection, verify what the server has sent.
Arguments:
data -- String stanza XML to be received and processed by
the XMPP client or component.
stanza_class -- The stanza object class for verifying data received
by a live connection. Defaults to StanzaBase.
defaults -- A list of stanza interfaces with default values that
may interfere with comparisons.
use_values -- Indicates if stanza comparisons should test using
getStanzaValues() and setStanzaValues().
Defaults to True.
timeout -- Time to wait in seconds for data to be received by
a live connection.
"""
if self.xmpp.socket.is_live:
# we are working with a live connection, so we should
# verify what has been received instead of simulating
# receiving data.
recv_data = self.xmpp.socket.next_recv(timeout)
if recv_data is None:
return False
stanza = stanza_class(xml=self.parse_xml(recv_data))
return self.check_stanza(stanza_class, stanza, data,
defaults=defaults,
use_values=use_values)
else:
# place the data in the dummy socket receiving queue.
data = str(data)
self.xmpp.socket.recv_data(data)
def stream_recv_header(self, sto='',
sfrom='',
sid='',
stream_ns="http://etherx.jabber.org/streams",
default_ns="jabber:client",
version="1.0",
xml_header=False,
timeout=1):
"""
Check that a given stream header was received.
Arguments:
sto -- The recipient of the stream header.
sfrom -- The agent sending the stream header.
sid -- The stream's id. Set to None to ignore.
stream_ns -- The namespace of the stream's root element.
default_ns -- The default stanza namespace.
version -- The stream version.
xml_header -- Indicates if the XML version header should be
appended before the stream header.
timeout -- Length of time to wait in seconds for a
response.
"""
header = self.stream_make_header(sto, sfrom, sid,
stream_ns=stream_ns,
default_ns=default_ns,
version=version,
xml_header=xml_header)
recv_header = self.xmpp.socket.next_recv(timeout)
if recv_header is None:
raise ValueError("Socket did not return data.")
# Apply closing elements so that we can construct
# XML objects for comparison.
header2 = header + '</stream:stream>'
recv_header2 = recv_header + '</stream:stream>'
xml = self.parse_xml(header2)
recv_xml = self.parse_xml(recv_header2)
if sid is None:
# Ignore the id sent by the server since
# we can't know in advance what it will be.
if 'id' in recv_xml.attrib:
del recv_xml.attrib['id']
# Ignore the xml:lang attribute for now.
if 'xml:lang' in recv_xml.attrib:
del recv_xml.attrib['xml:lang']
xml_ns = 'http://www.w3.org/XML/1998/namespace'
if '{%s}lang' % xml_ns in recv_xml.attrib:
del recv_xml.attrib['{%s}lang' % xml_ns]
if recv_xml.getchildren:
# We received more than just the header
for xml in recv_xml.getchildren():
self.xmpp.socket.recv_data(tostring(xml))
attrib = recv_xml.attrib
recv_xml.clear()
recv_xml.attrib = attrib
self.failUnless(
self.compare(xml, recv_xml),
"Stream headers do not match:\nDesired:\n%s\nReceived:\n%s" % (
'%s %s' % (xml.tag, xml.attrib),
'%s %s' % (recv_xml.tag, recv_xml.attrib)))
#tostring(xml), tostring(recv_xml)))#recv_header))
def stream_recv_feature(self, data, use_values=True, timeout=1):
"""
"""
if self.xmpp.socket.is_live:
# we are working with a live connection, so we should
# verify what has been received instead of simulating
# receiving data.
recv_data = self.xmpp.socket.next_recv(timeout)
if recv_data is None:
return False
xml = self.parse_xml(data)
recv_xml = self.parse_xml(recv_data)
self.failUnless(self.compare(xml, recv_xml),
"Features do not match.\nDesired:\n%s\nReceived:\n%s" % (
tostring(xml), tostring(recv_xml)))
else:
# place the data in the dummy socket receiving queue.
data = str(data)
self.xmpp.socket.recv_data(data)
def stream_recv_message(self, data, use_values=True, timeout=1):
"""
"""
return self.stream_recv(data, stanza_class=Message,
defaults=['type'],
use_values=use_values,
timeout=timeout)
def stream_recv_iq(self, data, use_values=True, timeout=1):
"""
"""
return self.stream_recv(data, stanza_class=Iq,
use_values=use_values,
timeout=timeout)
def stream_recv_presence(self, data, use_values=True, timeout=1):
"""
"""
return self.stream_recv(data, stanza_class=Presence,
defaults=['priority'],
use_values=use_values,
timeout=timeout)
def stream_send_header(self, sto='',
sfrom='',
sid='',
@@ -315,7 +496,7 @@ class SleekTest(unittest.TestCase):
default_ns="jabber:client",
version="1.0",
xml_header=False,
timeout=0.1):
timeout=1):
"""
Check that a given stream header was sent.
@@ -345,14 +526,26 @@ class SleekTest(unittest.TestCase):
header2 = header + '</stream:stream>'
sent_header2 = sent_header + '</stream:stream>'
xml = ET.fromstring(header2)
sent_xml = ET.fromstring(sent_header2)
xml = self.parse_xml(header2)
sent_xml = self.parse_xml(sent_header2)
self.failUnless(
self.compare(xml, sent_xml),
"Stream headers do not match:\nDesired:\n%s\nSent:\n%s" % (
header, sent_header))
def stream_send_feature(self, data, use_values=True, timeout=1):
"""
"""
sent_data = self.xmpp.socket.next_sent(timeout)
if sent_data is None:
return False
xml = self.parse_xml(data)
sent_xml = self.parse_xml(sent_data)
self.failUnless(self.compare(xml, sent_xml),
"Features do not match.\nDesired:\n%s\nSent:\n%s" % (
tostring(xml), tostring(sent_xml)))
def stream_send_stanza(self, stanza_class, data, defaults=None,
use_values=True, timeout=.1):
"""
@@ -372,7 +565,7 @@ class SleekTest(unittest.TestCase):
failing the check.
"""
if isintance(data, str):
data = stanza_class(xml=ET.fromstring(data))
data = stanza_class(xml=self.parse_xml(data))
sent = self.xmpp.socket.next_sent(timeout)
self.check_stanza(stanza_class, data, sent,
defaults=defaults,
@@ -393,7 +586,7 @@ class SleekTest(unittest.TestCase):
failing the check.
"""
if isinstance(data, str):
data = self.Message(xml=ET.fromstring(data))
data = self.Message(xml=self.parse_xml(data))
sent = self.xmpp.socket.next_sent(timeout)
self.check_message(data, sent, use_values)
@@ -412,7 +605,7 @@ class SleekTest(unittest.TestCase):
failing the check.
"""
if isinstance(data, str):
data = self.Iq(xml=ET.fromstring(data))
data = self.Iq(xml=self.parse_xml(data))
sent = self.xmpp.socket.next_sent(timeout)
self.check_iq(data, sent, use_values)
@@ -431,7 +624,7 @@ class SleekTest(unittest.TestCase):
failing the check.
"""
if isinstance(data, str):
data = self.Presence(xml=ET.fromstring(data))
data = self.Presence(xml=self.parse_xml(data))
sent = self.xmpp.socket.next_sent(timeout)
self.check_presence(data, sent, use_values)
@@ -505,7 +698,11 @@ class SleekTest(unittest.TestCase):
if xml.text != other.text:
return False
# Step 4: Recursively check children
# Step 4: Check children count
if len(xml.getchildren()) != len(other.getchildren()):
return False
# Step 5: Recursively check children
for child in xml:
child2s = other.findall("%s" % child.tag)
if child2s is None:
@@ -516,5 +713,16 @@ class SleekTest(unittest.TestCase):
else:
return False
# Step 6: Recursively check children the other way.
for child in other:
child2s = xml.findall("%s" % child.tag)
if child2s is None:
return False
for child2 in child2s:
if self.compare(child, child2):
break
else:
return False
# Everything matches
return True