Compare commits

...

84 Commits

Author SHA1 Message Date
Tom Nichols
494e3ef449 fixed indentation error 2010-07-14 15:40:27 -04:00
Thom Nichols
be5688007b moved parsing logic into TimeElement to aid reuse 2010-07-14 11:05:29 -04:00
Tom Nichols
ad7c1b06f4 XEP-0202 Entity Time plugin and fix for unused 'sid' parameter in StanzaBase. 2010-07-13 17:14:38 -04:00
Tom Nichols
083ac3faaf woops, broke resource binding request 2010-07-12 13:07:24 -04:00
Tom Nichols
a909731b03 removed digest_auth_started (it was never set to 'True') and did a little error handling cleanup 2010-07-12 12:55:53 -04:00
Tom Nichols
4864197d46 fixed indent 2010-07-12 12:54:58 -04:00
Tom Nichols
92a5ac2ba9 removed unused imports and fixed log msg 2010-07-12 12:25:55 -04:00
Tom Nichols
02ca5f0e42 fixed logging error (logging module was not imported) 2010-07-12 12:17:57 -04:00
Tom Nichols
1e009513ee removed some unused imports 2010-07-12 12:16:58 -04:00
Tom Nichols
55f83e8ab0 fixed variable name 2010-07-12 12:09:34 -04:00
Tom Nichols
d43fba3c8f adding pylint rcfile 2010-07-12 12:01:13 -04:00
Tom Nichols
9c5285987d removed ClientXMPP.server in favor of ClientXMPP.domain 2010-07-09 17:25:11 -04:00
Tom Nichols
d09cbef9a7 catch other DNS errors that might occur and fallback to JID domain. 2010-07-09 17:23:02 -04:00
Tom Nichols
9c850f080d removed useless 'use_tls' variable 2010-07-09 17:21:50 -04:00
Tom Nichols
879dd11daa reduced max quiesce delay to 6 minutes. We want to be fairly agressive here. 2010-07-09 16:16:07 -04:00
Tom Nichols
969c4652a4 wait, shouldn't 'port' default to 5222?? Would seem logical to me. 2010-07-09 16:15:18 -04:00
Tom Nichols
9506970042 removed useless 'use_tls' variable 2010-07-09 16:12:32 -04:00
Tom Nichols
3c6b07353d added keepalive to send thread 2010-07-09 16:06:53 -04:00
Brian Beggs
66c6c21ad8 kill the running threads before disconnecting 2010-07-09 15:36:13 -04:00
Brian Beggs
c5b5cc4af1 fix for md5 sasl authentication 2010-07-09 15:33:21 -04:00
Brian Beggs
e835843aab fixes to digest-md5 for ejabberd 2010-07-09 15:26:26 -04:00
Tom Nichols
d6681f16d2 fixed indentation error 2010-07-07 15:19:31 -04:00
Tom Nichols
fc952efae9 removed unused and redundant 'makeIq...' methods from basexmpp; cleaned up the (few\!) plugins that actually used them. 2010-07-07 15:18:59 -04:00
Tom Nichols
f7273affc5 notes on the usefulness of some of the 'makeIq' methods. In particular, they seem to duplicate behavior or be largely unused for their intended purpose. 2010-07-07 15:06:39 -04:00
Tom Nichols
34eb88f199 Merge branch 'hacks' of git@github.com:tomstrummer/SleekXMPP into hacks 2010-07-07 13:19:21 -04:00
Thom Nichols
f3cf5f6080 added SSL certificate verification to startTLS method 2010-07-07 11:33:12 -04:00
Thom Nichols
85d8b9270f client must validate the server's SSL certificate against the CA list if it is provided. 2010-07-06 17:37:57 -04:00
Tom Nichols
259dffeb6e send now has a priority and an 'init' parameter to denote stanzas that may be sent prior to session establishment. 2010-07-06 14:16:46 -04:00
Thom Nichols
0a30e6c017 cleaned up disconnect/reconnect logic just a little 2010-07-02 18:05:50 -04:00
Tom Nichols
d381ab320a merged changes from origin/hacks 2010-07-02 17:14:37 -04:00
Tom Nichols
6e93982fdf trying to get xmlstream to reconnect on stream failure 2010-07-02 16:46:34 -04:00
Tom Nichols
33602f232c allow 'ensure' to block if a transition is occurring 2010-07-02 16:45:55 -04:00
Tom Nichols
7968ca2892 added optional 'block_on_transition' param for 'ensure' function that's called while a transition is in-process 2010-07-02 14:34:59 -04:00
Tom Nichols
661cdd2018 'wait' could delay longer than desired if waiting threads were notified but did not achieve their lock condition afterwards. 2010-07-02 12:57:27 -04:00
Brian Beggs
4b00baab1e reconnection fix: xmlstream now catches XMLParserError and restarts the stream 2010-07-02 22:29:08 +08:00
Brian Beggs
fe1d3004cc xep_0047 initial module checkin 2010-07-02 22:29:08 +08:00
Thom Nichols
62da57a6c2 Merge branch 'master' of git://github.com/macdiesel/SleekXMPP into hacks 2010-07-01 17:50:45 -04:00
Thom Nichols
ba9633f8f7 Merge branch 'hacks' of github.com:tomstrummer/SleekXMPP into hacks 2010-07-01 17:06:50 -04:00
Tom Nichols
065a164223 proper logging. 2010-07-01 16:47:08 -04:00
Tom Nichols
cd2017b8b0 catch XML parse errors & don't attempt to reconnect. Also removed 'connecting' state from setStream method 2010-07-01 16:46:37 -04:00
Tom Nichols
dd9f33b7d9 removed some superfluous debug logging 2010-07-01 15:11:02 -04:00
Tom Nichols
0a23f84ec3 fix for statemachine where operations would unintentionally block if the lock was acquired in a long-running transition 2010-07-01 15:10:22 -04:00
Brian Beggs
f477ccf533 Merge remote branch 'tom/hacks' 2010-07-01 10:01:52 -04:00
Brian Beggs
d62a30b0f8 digest-md5 authentication now works with unicode-literals import. Re-added the __future__ imports that were removed. 2010-07-01 09:46:12 -04:00
Brian Beggs
d763795b2c Merge remote branch 'fritzy/master'
Conflicts:
	sleekxmpp/__init__.py
	sleekxmpp/basexmpp.py
	sleekxmpp/stanza/error.py
2010-07-01 09:17:45 -04:00
Brian Beggs
fff54eaf2f temporary removed future support for sleek to support digest-md5 auth 2010-07-01 08:44:39 -04:00
Brian Beggs
488d5b29d4 fixed typo 2010-06-30 14:48:45 -04:00
Brian Beggs
9bdb297fe2 basic checking for digest-md5 to make sure the necessary components are there to complete auth. If not a failed_auth event is dispatched and the socket disconnected. 2010-06-30 14:44:57 -04:00
Brian Beggs
fa7f72d0af Fixed a defect where handlers for SASL authentication were being added multiple times. This caused issues when trying to reconnect. A handler for the auth mech would get added each reconnection attempt, causing digest-md5, success and failure to be called x times for each x number of retries.
Handlers for sasl authentication as well as success and failure are now added during the __init__ method.
2010-06-30 14:30:18 -04:00
Brian Beggs
c538ffae79 digest-md5 auth now works, had to remove from __future__ import unicode_literals to get it working correctly. Also some improvments for the prioroity message sending. 2010-06-30 13:54:53 -04:00
Thom Nichols
5d87a54913 Merge branch 'hacks' of github.com:tomstrummer/SleekXMPP into hacks 2010-06-29 16:48:15 -04:00
Thom Nichols
bbf1cb8ba2 output traceback when plugin load fails 2010-06-25 16:31:38 -04:00
Thom Nichols
d22f6a2aa5 make scheduler thread a daemon to prevent shutdown hanging 2010-06-25 16:30:45 -04:00
Brian Beggs
c0a6291fea More digest-md5 changes 2010-06-21 09:23:56 -04:00
Brian Beggs
f5d0466462 working on digest-md5 authentication 2010-06-18 09:51:29 -04:00
Brian Beggs
f659e3081e Merge remote branch 'tom/hacks' 2010-06-10 10:52:58 -04:00
Brian Beggs
1aa34cb0fc Merge remote branch 'tom/hacks' 2010-06-04 12:52:52 -04:00
Brian Beggs
3f96226e29 Added additional logging when a plugin fails to import correctly. 2010-06-03 10:02:55 -04:00
Brian Beggs
71d72f431f Merge remote branch 'tom/hacks' 2010-06-03 09:54:48 -04:00
Brian Beggs
db4989c66d Merge remote branch 'tom/hacks' 2010-06-02 12:49:54 -04:00
Brian Beggs
b0066f3ef4 added try/catch block to plugin loading 2010-06-02 08:45:42 -04:00
Brian Beggs
c0457cf5d0 moddified plugin loading so plugins located outside of the plugins directory in sleek may be loaded. Added optional argument pluginModule that is a string that represents the module the desired plugin should be loaded from.
An exception on plugin loading now also will not cause the program to exit.  The exception is caught and loading of other plugins contains.
2010-06-02 08:28:49 -04:00
Brian Beggs
59b8406573 Added .pydevproject to the .gitignore 2010-06-02 07:34:43 -04:00
Brian Beggs
686943a2ec Merge remote branch 'tom/hacks' 2010-06-02 07:32:33 -04:00
Nathan Fritz
1e3a6e1b5f added muc room to readme 2010-05-26 11:46:56 -07:00
Nathan Fritz
fa92bc866b fixed dns unicode problem 2010-05-26 11:37:01 -07:00
Nathan Fritz
f4bc9d9722 plugins now are checked for post_init having ran when process() is called 2010-05-26 10:51:51 -07:00
Hernan E Grecco
9cfe19c1e1 Changed example.py to register first Xep_0030.
This a simple fix to prevent getting a key error as many plugins add
features to Xep_0030. A better fix would be to call pos_init after all
 plugins are loaded. An even better fix would be to define dependencies
for each plugin and registering on demand.
2010-05-26 06:49:01 +08:00
Hernan E Grecco
f18c790824 Fixed error registering a plugin. To add a feature to another plugin, it should look into xmpp.plugin dict 2010-05-26 06:49:01 +08:00
Nathan Fritz
f165b4b52b Merge branch 'master' of git@github.com:fritzy/SleekXMPP 2010-05-24 19:34:49 -07:00
Nathan Fritz
7ebc006516 updated README, index fix for component 2010-05-24 19:33:24 -07:00
Lance Stout
5ca4ede5ac Added a flag to registerPlugin to control calling the plugin's post_init method. 2010-05-25 07:28:48 +08:00
Lance Stout
35f4ef3452 Modified the return values for several methods so that they can be chained.
For example:

    iq.reply().error().setPayload(something.xml).send()
2010-05-25 07:28:43 +08:00
Lance Stout
828cba875f Added the error attribute 'code' to the Error object interface. 2010-05-25 07:28:43 +08:00
Nathan Fritz
3920ee3941 added plugin indexing to components 2010-05-24 14:27:13 -07:00
Nathan Fritz
feaa7539af added test_events and testing new del_event_handler 2010-05-20 13:09:04 -07:00
Lance Stout
c004f042f9 Added del_event_handler to remove handler functions for a given event.
All registered handlers for the event which use the given function will
be removed.

Using this method allows agents to reconfigure their behaviour on the fly
without needing to add extra state information to event handling functions.
2010-05-21 03:54:48 +08:00
Brian Beggs
dda3e733b5 Merge branch 'master' of https://github.com/macdiesel/SleekXMPP 2010-05-14 11:00:05 -04:00
Brian Beggs
4b322720b3 Merge remote branch 'tom/master' 2010-05-14 10:59:41 -04:00
Brian Beggs
2d89954412 Merge commit 'fritzy/master' 2010-05-13 10:01:46 -04:00
Nathan Fritz
ae41c08fec added test for unsolicided unavailable presence and fixed bug to make it pass 2010-05-12 18:07:20 -07:00
Nathan Fritz
223507f36f fixed a rather large memory leak 2010-05-12 13:45:36 -07:00
Brian Beggs
1521a8b5c9 Merge remote branch 'fritzy/master' 2010-05-12 07:46:07 -04:00
Brian Beggs
70f69c180c Fixes for disconnection problems detailed in http://github.com/fritzy/SleekXMPP/issues/#issue/20
Fixes to both ClientXMPP & xmlstream.  ClientXMPP was not tracking the changes to authenticated and sessionstarted after the client was disconnected.

xmlstream had some funkyness with state in the _process method that was cleaned up and hopefully made a little cleaner.

Also changed a DNS issue that was occuring that rendered me unable to disconnect.  I would recieve the following error upon reconnect.
Exception in thread process:
Exception in thread process:
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/local/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/xmlstream/xmlstream.py", line 202, in _process
    self.reconnect()
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/__init__.py", line 134, in reconnect
    XMLStream.reconnect(self)
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/xmlstream/xmlstream.py", line 289, in reconnect
    self.connect()
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/__init__.py", line 99, in connect
    answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.server, "SRV")
  File "/usr/local/lib/python2.6/site-packages/dns/resolver.py", line 732, in query
    return get_default_resolver().query(qname, rdtype, rdclass, tcp, source)
  File "/usr/local/lib/python2.6/site-packages/dns/resolver.py", line 617, in query
    source=source)
  File "/usr/local/lib/python2.6/site-packages/dns/query.py", line 113, in udp
    wire = q.to_wire()
  File "/usr/local/lib/python2.6/site-packages/dns/message.py", line 404, in to_wire
    r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
  File "/usr/local/lib/python2.6/site-packages/dns/renderer.py", line 152, in add_question
    self.output.write(struct.pack("!HH", rdtype, rdclass))
TypeError: unsupported operand type(s) for &: 'unicode' and 'long'

Seems I was getting this error when calling line 99 in ClientXMPP.  You can't bit-shift a 1 and a string and this is why this error is coming up. I removed the "SRV" argument and used the default of 1.  not sure exactly what this should be so it may need to be fixed back before it's merged back to trunk.

The line in question:
answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.server, "SRV")
2010-05-04 14:03:38 -04:00
24 changed files with 585 additions and 245 deletions

69
.pylintrc Normal file
View File

@@ -0,0 +1,69 @@
# Pylint configuration file.
# run `pylint --generate-rcfile` to see the default configuration
# run `pylint --rcfile=.pylintrc smallfoot` to perform analysis
# Brain-dead errors regarding standard language features
# W0142 = *args and **kwargs support
# W0403 = Relative imports
# Pointless whining
# R0201 = Method could be a function
# W0212 = Accessing protected attribute of client class
# W0613 = Unused argument
# W0232 = Class has no __init__ method
# R0903 = Too few public methods
# C0301 = Line too long
# R0913 = Too many arguments
# C0103 = Invalid name
# R0914 = Too many local variables
# PyLint's module importation is unreliable
# F0401 = Unable to import module
# W0402 = Uses of a deprecated module
# Already an error when wildcard imports are used
# W0614 = Unused import from wildcard
# Sometimes disabled depending on how bad a module is
# C0111 = Missing docstring
# Convention Errors related to whitespace:
# C0321,C0322,C0323,C0324
# Comments that we've put in the code:
# W0511
[MESSAGES CONTROL]
# Disable the message(s) with the given id(s).
disable=W0142,W0403,R0201,W0212,W0613,W0232,R0903,W0614,C0103,C0111,C0301,C0321,C0322,C0323,C0324,R0913,F0401,W0402,R0914,W0511,W0312
[REPORTS]
include-ids=y
reports=y
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html
output-format=text
# Put messages in a separate file for each module / package specified on the
# command line instead of printing them on stdout. Reports (if any) will be
# written in a file name "pylint_global.[txt|html]".
files-output=no
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=yes
[TYPECHECK]
# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).
#ignored-classes=Message
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,XXX,TODO

View File

@@ -14,47 +14,32 @@ from . xmlstream.xmlstream import XMLStream
from . xmlstream.xmlstream import RestartStream
from . xmlstream.matcher.xmlmask import MatchXMLMask
from . xmlstream.matcher.xpath import MatchXPath
from . xmlstream.matcher.many import MatchMany
from . xmlstream.handler.callback import Callback
from . xmlstream.stanzabase import StanzaBase
from . xmlstream import xmlstream as xmlstreammod
from . stanza.message import Message
from . stanza.iq import Iq
import time
import logging
import base64
import sys
import random
import copy
from . import plugins
#from . import stanza
from xml.etree.cElementTree import tostring
srvsupport = True
try:
import dns.resolver
import dns.rdatatype
import dns.exception
except ImportError:
srvsupport = False
#class PresenceStanzaType(object):
#
# def fromXML(self, xml):
# self.ptype = xml.get('type')
class ClientXMPP(basexmpp, XMLStream):
"""SleekXMPP's client class. Use only for good, not evil."""
def __init__(self, jid, password, ssl=False, plugin_config = {}, plugin_whitelist=[], escape_quotes=True):
global srvsupport
XMLStream.__init__(self)
self.default_ns = 'jabber:client'
basexmpp.__init__(self)
self.plugin_config = plugin_config
self.escape_quotes = escape_quotes
self.set_jid(jid)
self.server = None
self.port = 5222 # not used if DNS SRV is used
self.plugin_whitelist = plugin_whitelist
self.auto_reconnect = True
@@ -71,8 +56,13 @@ class ClientXMPP(basexmpp, XMLStream):
self.sessionstarted = False
self.bound = False
self.bindfail = False
self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True))
self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True))
XMLStream.registerHandler(self, Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True))
XMLStream.registerHandler(self, Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True))
#SASL Auth handlers
basexmpp.add_handler(self, "<challenge xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_sasl_digest_md5_auth, instream=True)
basexmpp.add_handler(self, "<response xmlns='urn:ietf:params:xml:ns:xmpp-sasl'/>", self.handler_sasl_digest_md5_auth_fail, instream=True)
basexmpp.add_handler(self, "<success xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_success, instream=True)
basexmpp.add_handler(self, "<failure xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_fail, instream=True)
#self.registerHandler(Callback('Roster Update', MatchXMLMask("<presence xmlns='%s' type='subscribe' />" % self.default_ns), self._handlePresenceSubscribe, thread=True))
self.registerFeature("<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls' />", self.handler_starttls, True)
self.registerFeature("<mechanisms xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_sasl_auth, True)
@@ -94,23 +84,25 @@ class ClientXMPP(basexmpp, XMLStream):
def connect(self, host=None, port=None):
"""Connect to the Jabber Server. Attempts SRV lookup, and if it fails, uses
the JID server."""
the JID server. You can optionally specify a host/port if you're not using
DNS and want to connect to a server address that is different from the XMPP domain."""
if self.state['connected']: return True
if host:
self.server = host
if host: # if a host was specified, don't attempt a DNS lookup.
if port is None: port = self.port
else:
if not self.srvsupport:
logging.debug("Did not supply (address, port) to connect to and no SRV support is installed (http://www.dnspython.org). Continuing to attempt connection, using domain from JID.")
logging.warn("Did not supply (address, port) to connect to and no SRV support is installed (http://www.dnspython.org). Continuing to attempt connection, using domain from JID.")
else:
logging.debug("Since no address is supplied, attempting SRV lookup.")
try:
answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.domain,
dns.rdatatype.SRV )
answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.domain, dns.rdatatype.SRV)
except dns.resolver.NXDOMAIN:
logging.debug("No appropriate SRV record found. Using JID server name.")
logging.info("No appropriate SRV record found for %s. Using domain as server address.", self.domain)
except dns.exception.DNSException:
# this could be a timeout or other DNS error. Worth retrying?
logging.exception("DNS error during SRV query for %s. Using domain as server address.", self.domain)
else:
# pick a random answer, weighted by priority
# there are less verbose ways of doing this (random.choice() with answer * priority), but I chose this way anyway
@@ -127,17 +119,13 @@ class ClientXMPP(basexmpp, XMLStream):
if picked <= priority:
(host,port) = addresses[priority]
break
# if SRV lookup was successful, we aren't using a particular server.
self.server = None
if not host:
# if all else fails take server from JID.
(host,port) = (self.domain, self.port)
self.server = None
logging.debug('Attempting connection to %s:%d', host, port )
#TODO option to not use TLS?
result = XMLStream.connect(self, host, port, use_tls=True)
result = XMLStream.connect(self, host, port)
if result:
self.event("connected")
else:
@@ -150,11 +138,17 @@ class ClientXMPP(basexmpp, XMLStream):
def reconnect(self):
self.disconnect(reconnect=True)
def disconnect(self, reconnect=False):
def disconnect(self, reconnect=False, error=False):
self.event("disconnected")
self.authenticated = False
self.sessionstarted = False
XMLStream.disconnect(self, reconnect)
XMLStream.disconnect(self, reconnect, error)
def sendRaw(self, data, priority=5, init=False):
if not init and not self.sessionstarted:
logging.warn("Attempt to send stanza before session has started:\n%s", data)
return False
XMLStream.sendRaw(self, data, priority, init)
def registerFeature(self, mask, pointer, breaker = False):
"""Register a stream feature."""
@@ -192,7 +186,7 @@ class ClientXMPP(basexmpp, XMLStream):
_stanza = "<proceed xmlns='urn:ietf:params:xml:ns:xmpp-tls' />"
if not self.event_handlers.get(_stanza,None): # don't add handler > once
self.add_handler( _stanza, self.handler_tls_start, instream=True )
self.sendXML(xml)
self.sendRaw(self.tostring(xml), priority=1, init=True)
return True
else:
logging.warning("The module tlslite is required in to some servers, and has not been found.")
@@ -207,24 +201,60 @@ class ClientXMPP(basexmpp, XMLStream):
if '{urn:ietf:params:xml:ns:xmpp-tls}starttls' in self.features:
return False
logging.debug("Starting SASL Auth")
self.add_handler("<success xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_success, instream=True)
self.add_handler("<failure xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_fail, instream=True)
sasl_mechs = xml.findall('{urn:ietf:params:xml:ns:xmpp-sasl}mechanism')
if len(sasl_mechs):
for sasl_mech in sasl_mechs:
self.features.append("sasl:%s" % sasl_mech.text)
if 'sasl:PLAIN' in self.features:
if 'sasl:DIGEST-MD5' in self.features:
self.sendRaw("<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='DIGEST-MD5'/>", priority=1, init=True)
elif 'sasl:PLAIN' in self.features:
if sys.version_info < (3,0):
self.send("""<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>""" % base64.b64encode(b'\x00' + bytes(self.username) + b'\x00' + bytes(self.password)).decode('utf-8'))
self.sendRaw("<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>" \
% base64.b64encode(b'\x00' + bytes(self.username) + b'\x00' + bytes(self.password)).decode('utf-8'),
priority=1, init=True)
else:
self.send("""<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>""" % base64.b64encode(b'\x00' + bytes(self.username, 'utf-8') + b'\x00' + bytes(self.password, 'utf-8')).decode('utf-8'))
self.sendRaw("<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>" \
% base64.b64encode(b'\x00' + bytes(self.username, 'utf-8') + b'\x00' + bytes(self.password, 'utf-8')).decode('utf-8'),
priority=1, init=True)
else:
logging.error("No appropriate login method.")
self.disconnect()
#if 'sasl:DIGEST-MD5' in self.features:
# self._auth_digestmd5()
logging.error("No appropriate login method: %s", sasl_mechs)
self.handler_auth_fail(xml)
return False
return True
def handler_sasl_digest_md5_auth(self, xml):
challenge = [item.split('=', 1) for item in base64.b64decode(xml.text).replace("\"", "").split(',', 6) ]
challenge = dict(challenge)
logging.debug("MD5 auth challenge: %s", challenge)
if challenge.get('rspauth'): #authenticated success... send response
self.sendRaw("""<response xmlns='urn:ietf:params:xml:ns:xmpp-sasl'/>""", priority=1, init=True )
return
#TODO: use realm if supplied by server, use default qop unless supplied by server
#Realm, nonce, qop should all be present
if not challenge.get('qop') or not challenge.get('nonce'):
logging.error("Error during digest-md5 authentication. Challenge missing critical information. Challenge: %s" %base64.b64decode(xml.text))
self.handler_auth_fail(xml)
return
#TODO: charset can be either UTF-8 or if not present use ISO 8859-1 defaulting for UTF-8 for now
#Compute the cnonce - a unique hex string only used in this request
cnonce = ""
for i in range(7):
cnonce+=hex(int(random.random()*65536*4096))[2:]
cnonce = base64.encodestring(cnonce)[0:-1]
a1 = b"%s:%s:%s" %(md5("%s:%s:%s" % (self.username, self.domain, self.password)), challenge["nonce"].encode("UTF-8"), cnonce.encode("UTF-8") )
a2 = "AUTHENTICATE:xmpp/%s" %self.domain
responseHash = md5digest("%s:%s:00000001:%s:auth:%s" %(md5digest(a1), challenge["nonce"], cnonce, md5digest(a2) ) )
response = 'charset=utf-8,username="%s",realm="%s",nonce="%s",nc=00000001,cnonce="%s",digest-uri="%s",response=%s,qop=%s,' \
% (self.username, self.domain, challenge["nonce"], cnonce, "xmpp/%s" % self.domain, responseHash, challenge["qop"])
self.sendRaw("<response xmlns='urn:ietf:params:xml:ns:xmpp-sasl'>%s</response>" % base64.encodestring(response)[:-1],
priority=1, init=True )
def handler_sasl_digest_md5_auth_fail(self, xml):
self.authenticated = False
self.handler_auth_fail(xml)
def handler_auth_success(self, xml):
logging.debug("Authentication successful.")
self.authenticated = True
@@ -233,17 +263,17 @@ class ClientXMPP(basexmpp, XMLStream):
def handler_auth_fail(self, xml):
logging.warning("Authentication failed.")
logging.debug(tostring(xml, 'utf-8'))
self.disconnect()
self.event("failed_auth")
def handler_bind_resource(self, xml):
logging.debug("Requesting resource: %s" % self.resource)
iq = self.Iq(stype='set')
res = ET.Element('resource')
res.text = self.resource
xml.append(res)
iq.append(xml)
response = iq.send()
iq = self.makeIqSet(xml)
response = iq.send(priority=2,init=True)
#response = self.send(iq, self.Iq(sid=iq['id']))
self.set_jid(response.xml.find('{urn:ietf:params:xml:ns:xmpp-bind}bind/{urn:ietf:params:xml:ns:xmpp-bind}jid').text)
self.bound = True
@@ -256,12 +286,12 @@ class ClientXMPP(basexmpp, XMLStream):
def handler_start_session(self, xml):
if self.authenticated and self.bound:
iq = self.makeIqSet(xml)
response = iq.send()
response = iq.send(priority=2,init=True)
logging.debug("Established Session")
self.sessionstarted = True
self.event("session_start")
else:
#bind probably hasn't happened yet
logging.warn("Bind has failed; not starting session!")
self.bindfail = True
def _handleRoster(self, iq, request=False):
@@ -273,3 +303,21 @@ class ClientXMPP(basexmpp, XMLStream):
if iq['type'] == 'set':
self.send(self.Iq().setValues({'type': 'result', 'id': iq['id']}).enable('roster'))
self.event("roster_update", iq)
def md5(data):
try:
import hashlib
md5 = hashlib.md5(data)
except ImportError:
import md5
md5 = md5.new(data)
return md5.digest()
def md5digest(data):
try:
import hashlib
md5 = hashlib.md5(data)
except ImportError:
import md5
md5 = md5.new(data)
return md5.hexdigest()

View File

@@ -9,11 +9,8 @@ from __future__ import with_statement, unicode_literals
from xml.etree import cElementTree as ET
from . xmlstream.xmlstream import XMLStream
from . xmlstream.matcher.xmlmask import MatchXMLMask
from . xmlstream.matcher.many import MatchMany
from . xmlstream.handler.xmlcallback import XMLCallback
from . xmlstream.handler.xmlwaiter import XMLWaiter
from . xmlstream.handler.waiter import Waiter
from . xmlstream.handler.callback import Callback
from . import plugins
@@ -23,7 +20,6 @@ from . stanza.presence import Presence
from . stanza.roster import Roster
from . stanza.nick import Nick
from . stanza.htmlim import HTMLIM
from . stanza.error import Error
import logging
import threading
@@ -111,7 +107,8 @@ class basexmpp(object):
logging.debug("Loaded Plugin %s%s" % (xep, self.plugin[plugin].description))
except:
logging.exception("Unable to load plugin: %s", plugin )
def register_plugins(self):
"""Initiates all plugins in the plugins/__init__.__all__"""
if self.plugin_whitelist:
@@ -137,7 +134,7 @@ class basexmpp(object):
self.registerHandler(XMLCallback('add_handler_%s' % self.getNewId(), MatchXMLMask(mask), pointer, threaded, disposable, instream))
def getId(self):
return "%x".upper() % self.id
return "%X" % self.id
def sendXML(self, data, mask=None, timeout=10):
return self.send(self.tostring(data), mask, timeout)
@@ -157,40 +154,33 @@ class basexmpp(object):
if mask is not None:
return waitfor.wait(timeout)
def makeIq(self, id=0, ifrom=None):
return self.Iq().setValues({'id': id, 'from': ifrom})
def makeIqGet(self, queryxmlns = None):
# TODO this should take a 'to' param since more often than not you set
# iq['to']=whatever immediately after.
iq = self.Iq().setValues({'type': 'get'})
if queryxmlns:
iq.append(ET.Element("{%s}query" % queryxmlns))
return iq
def makeIqResult(self, id):
# TODO this should take a 'to' param since more often than not you set
# iq['to']=whatever immediately after.
return self.Iq().setValues({'id': id, 'type': 'result'})
def makeIqSet(self, sub=None):
# TODO this should take a 'to' param since more often than not you set
# iq['to']=whatever immediately after.
iq = self.Iq().setValues({'type': 'set'})
if sub != None:
iq.append(sub)
return iq
def makeIqError(self, id, type='cancel', condition='feature-not-implemented', text=None):
# TODO not used.
iq = self.Iq().setValues({'id': id})
iq['error'].setValues({'type': type, 'condition': condition, 'text': text})
return iq
def makeIqQuery(self, iq, xmlns):
query = ET.Element("{%s}query" % xmlns)
iq.append(query)
return iq
def makeQueryRoster(self, iq=None):
query = ET.Element("{jabber:iq:roster}query")
if iq:
iq.append(query)
return query
def add_event_handler(self, name, pointer, threaded=False, disposable=False):
if not name in self.event_handlers:
self.event_handlers[name] = []

View File

@@ -12,21 +12,11 @@ from . basexmpp import basexmpp
from xml.etree import cElementTree as ET
from . xmlstream.xmlstream import XMLStream
from . xmlstream.xmlstream import RestartStream
from . xmlstream.matcher.xmlmask import MatchXMLMask
from . xmlstream.matcher.xpath import MatchXPath
from . xmlstream.matcher.many import MatchMany
from . xmlstream.handler.callback import Callback
from . xmlstream.stanzabase import StanzaBase
from . xmlstream import xmlstream as xmlstreammod
import time
import logging
import base64
import sys
import random
import copy
from . import plugins
from . import stanza
import hashlib
srvsupport = True
try:
@@ -58,7 +48,7 @@ class ComponentXMPP(basexmpp, XMLStream):
if key in self.plugin:
return self.plugin[key]
else:
logging.warning("""Plugin "%s" is not loaded.""" % key)
logging.warning("Plugin '%s' is not loaded.", key)
return False
def get(self, key, default):

View File

@@ -33,7 +33,7 @@ class gmail_notify(base.base_plugin):
def handler_gmailcheck(self, payload):
#TODO XEP 30 should cache results and have getFeature
result = self.xmpp['xep_0030'].getInfo(self.xmpp.server)
result = self.xmpp['xep_0030'].getInfo(self.xmpp.domain)
features = []
for feature in result.findall('{http://jabber.org/protocol/disco#info}query/{http://jabber.org/protocol/disco#info}feature'):
features.append(feature.get('var'))
@@ -50,7 +50,7 @@ class gmail_notify(base.base_plugin):
iq = self.xmpp.makeIqGet()
iq.attrib['from'] = self.xmpp.fulljid
iq.attrib['to'] = self.xmpp.jid
self.xmpp.makeIqQuery(iq, 'google:mail:notify')
iq.append(ET.Element('{google:mail:notify}query'))
emails = iq.send()
mailbox = emails.find('{google:mail:notify}mailbox')
total = int(mailbox.get('total-matched', 0))

View File

@@ -226,35 +226,31 @@ class xep_0009(base.base_plugin):
else:
raise ValueError()
def makeMethodCallQuery(self,pmethod,params):
query = self.xmpp.makeIqQuery(iq,"jabber:iq:rpc")
def makeIqMethodCall(self,pto,pmethod,params):
query = ET.Element("{jabber:iq:rpc}query")
methodCall = ET.Element('methodCall')
methodName = ET.Element('methodName')
methodName.text = pmethod
methodCall.append(methodName)
methodCall.append(params)
query.append(methodCall)
return query
def makeIqMethodCall(self,pto,pmethod,params):
iq = self.xmpp.makeIqSet()
iq = self.xmpp.makeIqSet(query)
iq.set('to',pto)
iq.append(self.makeMethodCallQuery(pmethod,params))
return iq
def makeIqMethodResponse(self,pto,pid,params):
iq = self.xmpp.makeIqResult(pid)
iq.set('to',pto)
query = self.xmpp.makeIqQuery(iq,"jabber:iq:rpc")
query = ET.Element("{jabber:iq:rpc}query")
methodResponse = ET.Element('methodResponse')
methodResponse.append(params)
query.append(methodResponse)
iq = self.xmpp.makeIqResult(pid)
iq.set('to',pto)
iq.append(query)
return iq
def makeIqMethodError(self,pto,id,pmethod,params,condition):
iq = self.xmpp.makeIqError(id)
iq.set('to',pto)
iq.append(self.makeMethodCallQuery(pmethod,params))
def makeIqMethodError(self,pto,pid,pmethod,params,condition):
iq = self.self.makeMethodCallQuery(pto,pmethod,params)
iq.setValues({'id':pid,'type':'error'})
iq.append(self.xmpp['xep_0086'].makeError(condition))
return iq

View File

@@ -0,0 +1,52 @@
'''
Created on Jul 1, 2010
@author: bbeggs
'''
from . import base
import logging
import threading
from xml.etree import cElementTree as ET
class xep_0047(base.base_plugin):
'''
In-band file transfer for xmpp.
Both message and iq transfer is supported with message being attempted first.
'''
def plugin_init(self):
self.xep = 'xep-047'
self.description = 'in-band file transfer'
self.acceptTransfers = self.config.get('acceptTransfers', True)
self.saveDirectory = self.config.get('saveDirectory', '/tmp')
self.stanzaType = self.config.get('stanzaType', 'message')
self.maxSendThreads = self.config.get('maxSendThreads', 1)
self.maxReceiveThreads = self.config.get('maxReceiveThreads', 1)
#thread setup
self.receiveThreads = {} #id:thread
self.sendThreads = {}
#add handlers to listen for incoming requests
self.xmpp.add_handler("<iq><open xmlns='http://jabber.org/protocol/ibb' /></iq>", self._handleIncomingTransferRequest)
def post_init(self):
self.post_inited = True
def sendFile(self, filePath, threaded=True):
#TODO use this method to send a file
pass
def _handleIncomingTransferRequest(self, xml):
pass
class receiverThread(threading.Thread):
def run(self):
pass
class senderThread(threading.Thread):
def run(self):
pass

View File

@@ -45,7 +45,7 @@ class xep_0078(base.base_plugin):
logging.debug("Starting jabber:iq:auth Authentication")
auth_request = self.xmpp.makeIqGet()
auth_request_query = ET.Element('{jabber:iq:auth}query')
auth_request.attrib['to'] = self.xmpp.server
auth_request.attrib['to'] = self.xmpp.domain
username = ET.Element('username')
username.text = self.xmpp.username
auth_request_query.append(username)

View File

@@ -38,7 +38,7 @@ class xep_0092(base.base_plugin):
def report_version(self, xml):
iq = self.xmpp.makeIqResult(xml.get('id', 'unknown'))
iq.attrib['to'] = xml.get('from', self.xmpp.server)
iq.attrib['to'] = xml.get('from', self.xmpp.domain)
query = ET.Element('{jabber:iq:version}query')
name = ET.Element('name')
name.text = self.name

View File

@@ -41,14 +41,14 @@ class xep_0199(base.base_plugin):
def handler_pingserver(self, xml):
if not self.running:
time.sleep(self.config.get('frequency', 300))
while self.sendPing(self.xmpp.server, self.config.get('timeout', 30)) is not False:
while self.sendPing(self.xmpp.domain, self.config.get('timeout', 30)) is not False:
time.sleep(self.config.get('frequency', 300))
logging.debug("Did not recieve ping back in time. Requesting Reconnect.")
self.xmpp.disconnect(reconnect=True)
def handler_ping(self, xml):
iq = self.xmpp.makeIqResult(xml.get('id', 'unknown'))
iq.attrib['to'] = xml.get('from', self.xmpp.server)
iq.attrib['to'] = xml.get('from', self.xmpp.domain)
self.xmpp.send(iq)
def sendPing(self, jid, timeout = 30):
@@ -56,17 +56,13 @@ class xep_0199(base.base_plugin):
Sends a ping to the specified jid, returning the time (in seconds)
to receive a reply, or None if no reply is received in timeout seconds.
"""
id = self.xmpp.getNewId()
iq = self.xmpp.makeIq(id)
iq.attrib['type'] = 'get'
iq = self.xmpp.makeIqGet()
iq.attrib['to'] = jid
ping = ET.Element('{http://www.xmpp.org/extensions/xep-0199.html#ns}ping')
iq.append(ping)
startTime = time.clock()
#pingresult = self.xmpp.send(iq, self.xmpp.makeIq(id), timeout)
pingresult = iq.send()
endTime = time.clock()
if pingresult == False:
#self.xmpp.disconnect(reconnect=True)
return False
return endTime - startTime

View File

@@ -0,0 +1,89 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2007 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SleekXMPP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SleekXMPP; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
from . import base
from xml.etree import cElementTree as ET
from datetime import datetime
XMLNS = 'urn:xmpp:time'
_XMLNS = '{%s}' % XMLNS
class xep_0202(base.base_plugin):
"""
Implements XEP-0202 Entity Time
TODO currently no support for the user's 'local' timezone; `<tzo>` is always reported as `Z` (UTC).
"""
def plugin_init(self):
self.xep = '0202'
self.description = "Entity Time"
self.xmpp.add_handler("<iq type='get'><time xmlns='%s' /></iq>" % XMLNS, self._handle_get)
def post_init(self):
base.base_plugin.post_init(self)
disco = self.xmpp.plugin.get('xep_0030',None)
if disco: disco.add_feature(XMLNS)
def send_request(self,to):
iq = self.xmpp.Iq( stream=self.xmpp, sto=to, stype='get',
xml = ET.Element(_XMLNS + 'time') )
resp = iq.send(iq) # wait for response
return TimeElement(
resp.find(_XMLNS + 'time/utc').text,
xml.find(_XMLNS + 'time/tzo').text )
def _handle_get(self,xml):
iq = self.xmpp.Iq( sid=xml.get('id'), sto=xml.get('from'), stype='result' )
iq.append( TimeElement().to_xml() )
self.xmpp.send(iq)
class TimeElement:
"""
Time response data
"""
def __init__(self, utc=None, tzo="Z"):
if utc is None:
self.utc = datetime.utcnow()
elif type(utc) is str: # parse ISO string
dt_format = '%Y-%m-%dT%H:%M:%S'
if utc.find('.') > -1: dt_format += '.%f' # milliseconds in format
self.utc = datetime.strptime( time_str, dt_format + 'Z' )
elif type(utc) is float: # parse posix timestamp
self.utc = datetime.utcfromtimestamp()
else: self.utc = utc
self.tzo = tzo
def to_xml(self):
time = ET.Element(_XMLNS+'time')
child = ET.Element('tzo')
child.text = str(self.tzo)
time.append( child )
child = ET.Element('utc')
child.text = datetime.isoformat(self.utc) + "Z"
time.append( child )
return time
def __str__(self):
return ET.tostring( self.to_xml() )

View File

@@ -1,9 +1,9 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import ElementBase, ET

View File

@@ -1,13 +1,12 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import StanzaBase
from xml.etree import cElementTree as ET
from . error import Error
from .. xmlstream.handler.waiter import Waiter
from .. xmlstream.matcher.id import MatcherId
from . rootstanza import RootStanza
@@ -67,11 +66,11 @@ class Iq(RootStanza):
self.xml.remove(child)
return self
def send(self, block=True, timeout=10):
def send(self, block=True, timeout=10, priority=5, init=False):
if block and self['type'] in ('get', 'set'):
waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id']))
self.stream.registerHandler(waitfor)
StanzaBase.send(self)
StanzaBase.send(self, priority, init)
return waitfor.wait(timeout)
else:
return StanzaBase.send(self)
return StanzaBase.send(self, priority, init)

View File

@@ -6,8 +6,6 @@
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import StanzaBase
from xml.etree import cElementTree as ET
from . error import Error
from . rootstanza import RootStanza
class Message(RootStanza):

View File

@@ -5,7 +5,7 @@
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import ElementBase, ET
from .. xmlstream.stanzabase import ElementBase
class Nick(ElementBase):
namespace = 'http://jabber.org/nick/nick'

View File

@@ -5,8 +5,7 @@
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import ElementBase, ET, JID
import logging
from .. xmlstream.stanzabase import ElementBase, ET
class Roster(ElementBase):
namespace = 'jabber:iq:roster'

View File

@@ -43,7 +43,7 @@ class testps(sleekxmpp.ClientXMPP):
self.node = "pstestnode_%s"
self.pshost = pshost
if pshost is None:
self.pshost = self.server
self.pshost = self.domain
self.nodenum = int(nodenum)
self.leafnode = self.nodenum + 1
self.collectnode = self.nodenum + 2

View File

@@ -20,12 +20,12 @@ class Callback(base.BaseHandler):
def prerun(self, payload): # prerun actually calls run?!? WTF! Then it gets run AGAIN!
base.BaseHandler.prerun(self, payload)
if self._instream:
logging.debug('callback "%s" prerun', self.name)
# logging.debug('callback "%s" prerun', self.name)
self.run(payload, True)
def run(self, payload, instream=False):
if not self._instream or instream:
logging.debug('callback "%s" run', self.name)
# logging.debug('callback "%s" run', self.name)
base.BaseHandler.run(self, payload)
#if self._thread:
# x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,))

View File

@@ -8,6 +8,7 @@
from . import base
from xml.etree import cElementTree
from xml.parsers.expat import ExpatError
import logging
ignore_ns = False
@@ -38,7 +39,7 @@ class MatchXMLMask(base.MatcherBase):
try:
maskobj = cElementTree.fromstring(maskobj)
except ExpatError:
logging.log(logging.WARNING, "Expat error: %s\nIn parsing: %s" % ('', maskobj))
logging.exception( "Expat error parsing: %s", maskobj)
if not use_ns and source.tag.split('}', 1)[-1] != maskobj.tag.split('}', 1)[-1]: # strip off ns and compare
return False
if use_ns and (source.tag != maskobj.tag and "{%s}%s" % (self.default_ns, maskobj.tag) != source.tag ):

View File

@@ -1,9 +1,9 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
See the file license.txt for copying permission.
"""
from xml.etree import cElementTree as ET
import logging
@@ -331,6 +331,7 @@ class StanzaBase(ElementBase):
self['to'] = sto
if sfrom is not None:
self['from'] = sfrom
if sid is not None: self['id'] = sid
self.tag = "{%s}%s" % (self.namespace, self.name)
def setType(self, value):
@@ -383,6 +384,7 @@ class StanzaBase(ElementBase):
def exception(self, e):
logging.error(traceback.format_tb(e))
def send(self):
self.stream.sendRaw(self.__str__())
def send(self, priority=5, init=False):
self.stream.sendRaw(self.__str__(), priority, init)

View File

@@ -5,27 +5,31 @@
See the file license.txt for copying permission.
"""
from __future__ import with_statement
import threading
import time
import logging
log = logging.getLogger(__name__)
class StateMachine(object):
def __init__(self, states=[]):
self.lock = threading.Condition(threading.RLock())
self.lock = threading.Lock()
self.notifier = threading.Event()
self.__states= []
self.addStates(states)
self.__default_state = self.__states[0]
self.__current_state = self.__default_state
def addStates(self, states):
with self.lock:
self.lock.acquire()
try:
for state in states:
if state in self.__states:
raise IndexError("The state '%s' is already in the StateMachine." % state)
self.__states.append( state )
finally: self.lock.release()
def transition(self, from_state, to_state, wait=0.0, func=None, args=[], kwargs={} ):
@@ -78,30 +82,34 @@ class StateMachine(object):
if not to_state in self.__states:
raise ValueError( "StateMachine does not contain to_state %s." % to_state )
with self.lock:
start = time.time()
while not self.__current_state in from_states:
# detect timeout:
if time.time() >= start + wait: return False
self.lock.wait(wait)
start = time.time()
while not self.__current_state in from_states or not self.lock.acquire(False):
# detect timeout:
remainder = start + wait - time.time()
if remainder > 0: self.notifier.wait(remainder)
else: return False
try: # lock is acquired; all other threads will return false or wait until notify/timeout
if self.__current_state in from_states: # should always be True due to lock
return_val = True
# Note that func might throw an exception, but that's OK, it aborts the transition
if func is not None: return_val = func(*args,**kwargs)
return_val = func(*args,**kwargs) if func is not None else True
# some 'false' value returned from func,
# indicating that transition should not occur:
if not return_val: return return_val
logging.debug(' ==== TRANSITION %s -> %s', self.__current_state, to_state)
self.__current_state = to_state
self.lock.notify_all()
log.debug(' ==== TRANSITION %s -> %s', self.__current_state, to_state)
self._set_state( to_state )
return return_val # some 'true' value returned by func or True if func was None
else:
logging.error( "StateMachine bug!! The lock should ensure this doesn't happen!" )
log.error( "StateMachine bug!! The lock should ensure this doesn't happen!" )
return False
finally:
self.notifier.set() # notify any waiting threads that the state has changed.
self.notifier.clear()
self.lock.release()
def transition_ctx(self, from_state, to_state, wait=0.0):
@@ -139,16 +147,24 @@ class StateMachine(object):
return _StateCtx(self, from_state, to_state, wait)
def ensure(self, state, wait=0.0):
def ensure(self, state, wait=0.0, block_on_transition=False ):
'''
Ensure the state machine is currently in `state`, or wait until it enters `state`.
'''
return self.ensure_any( (state,), wait=wait )
return self.ensure_any( (state,), wait=wait, block_on_transition=block_on_transition )
def ensure_any(self, states, wait=0.0):
def ensure_any(self, states, wait=0.0, block_on_transition=False):
'''
Ensure we are currently in one of the given `states`
Ensure we are currently in one of the given `states` or wait until
we enter one of those states.
Note that due to the nature of the function, you cannot guarantee that
the entirety of some operation completes while you remain in a given
state. That would require acquiring and holding a lock, which
would mean no other threads could do the same. (You'd essentially
be serializing all of the threads that are 'ensuring' their tasks
occurred in some state.
'''
if not (isinstance(states,tuple) or isinstance(states,list)):
raise ValueError('states arg should be a tuple or list')
@@ -157,18 +173,31 @@ class StateMachine(object):
if not state in self.__states:
raise ValueError( "StateMachine does not contain state '%s'" % state )
with self.lock:
start = time.time()
while not self.__current_state in states:
# detect timeout:
if time.time() >= start + wait: return False
self.lock.wait(wait)
return self.__current_state in states # should always be True due to lock
# if we're in the middle of a transition, determine whether we should
# 'fall back' to the 'current' state, or wait for the new state, in order to
# avoid an operation occurring in the wrong state.
# TODO another option would be an ensure_ctx that uses a semaphore to allow
# threads to indicate they want to remain in a particular state.
# will return immediately if no transition is in process.
if block_on_transition:
# we're not in the middle of a transition; don't hold the lock
if self.lock.acquire(False): self.lock.release()
# wait for the transition to complete
else: self.notifier.wait()
start = time.time()
while not self.__current_state in states:
# detect timeout:
remainder = start + wait - time.time()
if remainder > 0: self.notifier.wait(remainder)
else: return False
return True
def reset(self):
# TODO need to lock before calling this?
self.transition(self.__current_state, self._default_state)
self.transition(self.__current_state, self.__default_state)
def _set_state(self, state): #unsynchronized, only call internally after lock is acquired
@@ -202,33 +231,37 @@ class _StateCtx:
self.from_state = from_state
self.to_state = to_state
self.wait = wait
self._timeout = False
self._locked = False
def __enter__(self):
self.state_machine.lock.acquire()
start = time.time()
while not self.state_machine[ self.from_state ]:
while not self.state_machine[ self.from_state ] or not self.state_machine.lock.acquire(False):
# detect timeout:
if time.time() >= start + self.wait:
logging.debug('StateMachine timeout while waiting for state: %s', self.from_state )
self._timeout = True # to indicate we should not transition
remainder = start + self.wait - time.time()
if remainder > 0: self.state_machine.notifier.wait(remainder)
else:
log.debug('StateMachine timeout while waiting for state: %s', self.from_state )
return False
self.state_machine.lock.wait(self.wait)
logging.debug('StateMachine entered context in state: %s',
self._locked = True # lock has been acquired at this point
self.state_machine.notifier.clear()
log.debug('StateMachine entered context in state: %s',
self.state_machine.current_state() )
return True
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None:
logging.exception( "StateMachine exception in context, remaining in state: %s\n%s:%s",
self.state_machine.current_state(), exc_type.__name__, exc_val )
elif not self._timeout:
logging.debug(' ==== TRANSITION %s -> %s',
self.state_machine.current_state(), self.to_state)
self.state_machine._set_state( self.to_state )
log.exception( "StateMachine exception in context, remaining in state: %s\n%s:%s",
self.state_machine.current_state(), exc_type.__name__, exc_val )
if self._locked:
if exc_val is None:
log.debug(' ==== TRANSITION %s -> %s',
self.state_machine.current_state(), self.to_state)
self.state_machine._set_state( self.to_state )
self.state_machine.notifier.set()
self.state_machine.lock.release()
self.state_machine.lock.notify_all()
self.state_machine.lock.release()
return False # re-raise any exception

View File

@@ -14,15 +14,12 @@ except ImportError:
from . import statemachine
from . stanzabase import StanzaBase
from xml.etree import cElementTree
from xml.parsers import expat
import logging
import random
import socket
import threading
import time
import traceback
import types
import xml.sax.saxutils
from . import scheduler
HANDLER_THREADS = 1
@@ -42,24 +39,21 @@ if sys.version_info < (3, 0):
class RestartStream(Exception):
pass
class CloseStream(Exception):
pass
stanza_extensions = {}
RECONNECT_MAX_DELAY = 3600
RECONNECT_MAX_DELAY = 360
RECONNECT_QUIESCE_FACTOR = 1.6180339887498948 # Phi
RECONNECT_QUIESCE_JITTER = 0.11962656472 # molar Planck constant times c, joule meter/mole
DEFAULT_KEEPALIVE = 300 # send a single byte every 5 minutes
class XMLStream(object):
"A connection manager with XML events."
def __init__(self, socket=None, host='', port=0, escape_quotes=False):
def __init__(self, socket=None, host='', port=5222, escape_quotes=False):
global ssl_support
self.ssl_support = ssl_support
self.escape_quotes = escape_quotes
self.state = statemachine.StateMachine(('disconnected','connecting',
'connected'))
self.state = statemachine.StateMachine(('disconnected','connected'))
self.should_reconnect = True
self.setSocket(socket)
@@ -72,38 +66,43 @@ class XMLStream(object):
self.__stanza_extension = {}
self.__handlers = []
self.__tls_socket = None
self.filesocket = None
self.use_ssl = False
self.use_tls = False
self.ca_certs=None
self.keep_alive = DEFAULT_KEEPALIVE
self._last_sent_time = time.time()
self.stream_header = "<stream>"
self.stream_footer = "</stream>"
self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue()
self.sendqueue = queue.PriorityQueue()
self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {}
self.run = True
# booleans are not volatile in Python and changes
# do not seem to be detected easily between threads.
self.quit = threading.Event()
def setSocket(self, socket):
"Set the socket"
self.socket = socket
if socket is not None and self.state.transition('disconnected','connecting'):
self.filesocket = socket.makefile('rb', 0) # ElementTree.iterparse requires a file. 0 buffer files have to be binary
self.state.transition('connecting','connected')
if socket is not None:
with self.state.transition_ctx('disconnected','connected') as locked:
if not locked: raise Exception('Already connected')
# ElementTree.iterparse requires a file. 0 buffer files have to be binary
self.filesocket = socket.makefile('rb', 0)
def setFileSocket(self, filesocket):
self.filesocket = filesocket
def connect(self, host='', port=0, use_ssl=None, use_tls=None):
def connect(self, host='', port=5222, use_ssl=None):
"Establish a socket connection to the given XMPP server."
if not self.state.transition('disconnected','connected',
func=self.connectTCP, args=[host, port, use_ssl, use_tls] ):
func=self.connectTCP, args=[host, port, use_ssl] ):
if self.state['connected']: logging.debug('Already connected')
else: logging.warning("Connection failed" )
@@ -115,32 +114,31 @@ class XMLStream(object):
# TODO currently a caller can't distinguish between "connection failed" and
# "we're already trying to connect from another thread"
def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True):
def connectTCP(self, host='', port=5222, use_ssl=None, reattempt=True):
"Connect and create socket"
# Note that this is thread-safe by merit of being called solely from connect() which
# holds the state lock.
delay = 1.0 # reconnection delay
while self.run:
while not self.quit.is_set():
logging.debug('connecting....')
try:
if host and port:
self.address = (host, int(port))
if use_ssl is not None:
self.use_ssl = use_ssl
if use_tls is not None:
# TODO this variable doesn't seem to be used for anything!
self.use_tls = use_tls
if sys.version_info < (3, 0):
self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM)
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(None) #10)
self.socket.settimeout(None)
if self.use_ssl and self.ssl_support:
logging.debug("Socket Wrapped for SSL")
self.socket = ssl.wrap_socket(self.socket,ca_certs=self.ca_certs)
cert_policy = ssl.CERT_NONE if self.ca_certs is None else ssl.CERT_REQUIRED
self.socket = ssl.wrap_socket(self.socket,
ca_certs=self.ca_certs, cert_reqs=cert_policy)
self.socket.connect(self.address)
self.filesocket = self.socket.makefile('rb', 0)
@@ -152,14 +150,14 @@ class XMLStream(object):
if not reattempt: return False
except:
logging.exception("Connection error")
if not reattempt: return False
if not reattempt: return False
# quiesce if rconnection fails:
# This algorithm based loosely on Twisted internet.protocol
# http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310
delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY)
delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER)
logging.debug('Waiting %fs until next reconnect attempt...', delay)
logging.debug('Waiting %.3fs until next reconnect attempt...', delay)
time.sleep(delay)
@@ -169,12 +167,18 @@ class XMLStream(object):
def startTLS(self):
"Handshakes for TLS"
# TODO since this is not part of the 'connectTCP' method, it does not quiesce if
# The TLS negotiation throws an SSLError. It really should. Worse yet, some
# errors might be considered fatal (like certificate verification failure) in which
# case, should we even attempt to re-connect at all?
if self.ssl_support:
logging.info("Negotiating TLS")
# self.realsocket = self.socket # NOT USED
cert_policy = ssl.CERT_NONE if self.ca_certs is None else ssl.CERT_REQUIRED
self.socket = ssl.wrap_socket(self.socket,
ssl_version=ssl.PROTOCOL_TLSv1,
do_handshake_on_connect=False,
do_handshake_on_connect=False,
cert_reqs=cert_policy,
ca_certs=self.ca_certs)
self.socket.do_handshake()
if sys.version_info < (3,0):
@@ -191,8 +195,8 @@ class XMLStream(object):
raise RestartStream()
def process(self, threaded=True):
self.quit.clear()
self.scheduler.process(threaded=True)
self.run = True
for t in range(0, HANDLER_THREADS):
th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
th.setDaemon(True)
@@ -216,17 +220,15 @@ class XMLStream(object):
def _process(self):
"Start processing the socket."
logging.debug('Process thread starting...')
while self.run:
if not self.state.ensure('connected',wait=2): continue
while not self.quit.is_set():
if not self.state.ensure('connected',wait=2, block_on_transition=True): continue
try:
self.sendRaw(self.stream_header)
while self.run and self.__readXML(): pass
self.sendRaw(self.stream_header, priority=0, init=True)
self.__readXML() # this loops until the stream is terminated.
except socket.timeout:
logging.debug('socket rcv timeout')
pass
except CloseStream:
# TODO warn that the listener thread is exiting!!!
pass
# TODO currently this will re-send a stream header if this exception occurs.
# I don't think that's intended behavior.
logging.warn('socket rcv timeout')
except RestartStream:
logging.debug("Restarting stream...")
continue # DON'T re-initialize the stream -- this exception is sent
@@ -237,9 +239,12 @@ class XMLStream(object):
self.eventqueue.put(('quit', None, None))
except:
logging.exception('Unexpected error in RCV thread')
if self.should_reconnect:
self.disconnect(reconnect=True)
# if the RCV socket is terminated for whatever reason (e.g. we reach this point of
# code,) our only sane choice of action is an attempt to re-establish the connection.
reconnect = (self.should_reconnect and not self.quit.is_set())
self.disconnect(reconnect=reconnect, error=True)
logging.debug('Quitting Process thread')
def __readXML(self):
@@ -258,8 +263,8 @@ class XMLStream(object):
if event == b'end':
edepth += -1
if edepth == 0 and event == b'end':
# what is this case exactly? Premature EOF?
logging.debug("Ending readXML loop")
logging.warn("Premature EOF from read socket; Ending readXML loop")
# this is a premature EOF as far as I can tell; raise an exception so the stream get closed and re-established cleanly.
return False
elif edepth == 1:
#self.xmlin.put(xmlobj)
@@ -267,22 +272,26 @@ class XMLStream(object):
if root: root.clear()
if event == b'start':
edepth += 1
logging.debug("Exiting readXML loop")
logging.warn("Exiting readXML loop")
# TODO under what conditions will this _ever_ occur?
return False
def _sendThread(self):
logging.debug('send thread starting...')
while self.run:
if not self.state.ensure('connected',wait=2): continue
while not self.quit.is_set():
if not self.state.ensure('connected',wait=2, block_on_transition=True): continue
data = None
try:
data = self.sendqueue.get(True,5)
data = self.sendqueue.get(True,5)[1]
logging.debug("SEND: %s" % data)
self.socket.sendall(data.encode('utf-8'))
except queue.Empty:
# logging.debug('Nothing on send queue')
pass
self._last_sent_time = time.time()
except queue.Empty: # send keep-alive if necessary
now = time.time()
if self._last_sent_time + self.keep_alive < now:
self.socket.sendall(' ')
self._last_sent_time = time.time()
except socket.timeout:
# this is to prevent a thread blocked indefinitely
logging.debug('timeout sending packet data')
@@ -294,23 +303,24 @@ class XMLStream(object):
# the same thing concurrently. Oops! The safer option would be to throw
# some sort of event that could be handled by a common thread or the reader
# thread to perform reconnect and then re-initialize the handler threads as well.
if self.should_reconnect:
self.disconnect(reconnect=True)
reconnect = (self.should_reconnect and not self.quit.is_set())
self.disconnect(reconnect=reconnect, error=True)
def sendRaw(self, data):
self.sendqueue.put(data)
def sendRaw( self, data, priority=5, init=False ):
if not self.state.ensure('connected'): return False
self.sendqueue.put((priority, data))
return True
def disconnect(self, reconnect=False):
def disconnect(self, reconnect=False, error=False):
with self.state.transition_ctx('connected','disconnected') as locked:
if not locked:
logging.warning("Already disconnected.")
return
logging.debug("Disconnecting...")
self.sendRaw(self.stream_footer)
time.sleep(5)
#send end of stream
#wait for end of stream back
# don't send a footer on error; if the stream is already closed,
# this won't get sent until the stream is re-initialized!
if not error: self.sendRaw(self.stream_footer,init=True) #send end of stream
try:
# self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
@@ -327,18 +337,18 @@ class XMLStream(object):
'''
Disconnects and shuts down all event threads.
'''
self.disconnect()
self.run = False
self.scheduler.run = False
self.disconnect()
def incoming_filter(self, xmlobj):
return xmlobj
def __spawnEvent(self, xmlobj):
"watching xmlOut and processes handlers"
if logging.getLogger().isEnabledFor(logging.DEBUG):
logging.debug("RECV: %s" % cElementTree.tostring(xmlobj))
#convert XML into Stanza
# TODO surround this log statement with an if, it's expensive
logging.debug("RECV: %s" % cElementTree.tostring(xmlobj))
xmlobj = self.incoming_filter(xmlobj)
stanza = None
for stanza_class in self.__root_stanza:
@@ -352,11 +362,11 @@ class XMLStream(object):
# TODO inefficient linear search; performance might be improved by hashtable lookup
for handler in self.__handlers:
if handler.match(stanza):
logging.debug('matched stanza to handler %s', handler.name)
# logging.debug('matched stanza to handler %s', handler.name)
handler.prerun(stanza)
self.eventqueue.put(('stanza', handler, stanza))
if handler.checkDelete():
logging.debug('deleting callback %s', handler.name)
# logging.debug('deleting callback %s', handler.name)
self.__handlers.pop(self.__handlers.index(handler))
unhandled = False
if unhandled:
@@ -366,7 +376,7 @@ class XMLStream(object):
def _eventRunner(self):
logging.debug("Loading event runner")
while self.run:
while not self.quit.is_set():
try:
event = self.eventqueue.get(True, timeout=5)
except queue.Empty:

View File

@@ -118,10 +118,11 @@ class testpubsubstanzas(unittest.TestCase):
iq = self.ps.Iq()
iq['pubsub_owner']['default']
iq['pubsub_owner']['default']['node'] = 'mynode'
iq['pubsub_owner']['default']['type'] = 'leaf'
form = xep_0004.Form()
form.addField('pubsub#title', ftype='text-single', value='This thing is awesome')
iq['pubsub_owner']['default']['config'] = form
xmlstring = """<iq id="0"><pubsub xmlns="http://jabber.org/protocol/pubsub#owner"><default node="mynode"><x xmlns="jabber:x:data" type="form"><field var="pubsub#title" type="text-single"><value>This thing is awesome</value></field></x></default></pubsub></iq>"""
xmlstring = """<iq id="0"><pubsub xmlns="http://jabber.org/protocol/pubsub#owner"><default node="mynode" type="leaf"><x xmlns="jabber:x:data" type="form"><field var="pubsub#title" type="text-single"><value>This thing is awesome</value></field></x></default></pubsub></iq>"""
iq2 = self.ps.Iq(None, self.ps.ET.fromstring(xmlstring))
iq3 = self.ps.Iq()
values = iq2.getValues()

View File

@@ -256,6 +256,73 @@ class testStateMachine(unittest.TestCase):
self.assertTrue( s['three'] )
def testTransitionsDontUnintentionallyBlock(self):
'''
There was a bug where a long-running transition (e.g. one with a 'func'
arg or a `transition_ctx` call would cause any `transition` or `ensure`
call to block since the lock is acquired before checking the current
state. Attempts to acquire the mutex need to be non-blocking so when a
timeout is _not_ given, the caller can return immediately. At the same
time, threads that _do_ want to wait need the ability to be notified
(to avoid waiting beyond when the lock is released) so we've moved to a
combination of a plain-ol `threading.Lock` to act as mutex, and a
`threading.Event` to perform notification for threads who choose to wait.
'''
s = sm.StateMachine(('one','two','three'))
with s.transition_ctx('two','three') as result:
self.failIf( result )
self.assertTrue( s['one'] )
self.failIf( s.current_state in ('two','three') )
self.assertTrue( s['one'] )
statuses = {'t1':"not started",
't2':'not started'}
def t1():
print 'thread 1 started'
# no wait, so this should 'return False' immediately.
self.failIf( s.transition('two','three') )
statuses['t1'] = 'complete'
print 'thread 1 transitioned'
def t2():
print 'thread 2 started'
self.failIf( s['two'] )
self.failIf( s['three'] )
# we want this thread to acquire the lock, but for
# the second thread not to wait on the first.
with s.transition_ctx('one','two', 10) as locked:
statuses['t2'] = 'started'
print 'thread 2 has entered context'
self.assertTrue( locked )
# give thread1 a chance to complete while this
# thread still owns the lock
time.sleep(5)
self.assertTrue( s['two'] )
statuses['t2'] = 'complete'
t1 = threading.Thread(target=t1)
t2 = threading.Thread(target=t2)
t2.start() # this should acquire the lock
time.sleep(.2)
self.assertEqual( 'started', statuses['t2'] )
t1.start() # but it shouldn't prevent thread 1 from completing
time.sleep(1)
self.assertEqual( 'complete', statuses['t1'] )
t1.join()
t2.join()
self.assertEqual( 'complete', statuses['t2'] )
self.assertTrue( s['two'] )
suite = unittest.TestLoader().loadTestsFromTestCase(testStateMachine)
if __name__ == '__main__': unittest.main()