Compare commits

...

135 Commits

Author SHA1 Message Date
Tom Nichols
494e3ef449 fixed indentation error 2010-07-14 15:40:27 -04:00
Thom Nichols
be5688007b moved parsing logic into TimeElement to aid reuse 2010-07-14 11:05:29 -04:00
Tom Nichols
ad7c1b06f4 XEP-0202 Entity Time plugin and fix for unused 'sid' parameter in StanzaBase. 2010-07-13 17:14:38 -04:00
Tom Nichols
083ac3faaf woops, broke resource binding request 2010-07-12 13:07:24 -04:00
Tom Nichols
a909731b03 removed digest_auth_started (it was never set to 'True') and did a little error handling cleanup 2010-07-12 12:55:53 -04:00
Tom Nichols
4864197d46 fixed indent 2010-07-12 12:54:58 -04:00
Tom Nichols
92a5ac2ba9 removed unused imports and fixed log msg 2010-07-12 12:25:55 -04:00
Tom Nichols
02ca5f0e42 fixed logging error (logging module was not imported) 2010-07-12 12:17:57 -04:00
Tom Nichols
1e009513ee removed some unused imports 2010-07-12 12:16:58 -04:00
Tom Nichols
55f83e8ab0 fixed variable name 2010-07-12 12:09:34 -04:00
Tom Nichols
d43fba3c8f adding pylint rcfile 2010-07-12 12:01:13 -04:00
Tom Nichols
9c5285987d removed ClientXMPP.server in favor of ClientXMPP.domain 2010-07-09 17:25:11 -04:00
Tom Nichols
d09cbef9a7 catch other DNS errors that might occur and fallback to JID domain. 2010-07-09 17:23:02 -04:00
Tom Nichols
9c850f080d removed useless 'use_tls' variable 2010-07-09 17:21:50 -04:00
Tom Nichols
879dd11daa reduced max quiesce delay to 6 minutes. We want to be fairly agressive here. 2010-07-09 16:16:07 -04:00
Tom Nichols
969c4652a4 wait, shouldn't 'port' default to 5222?? Would seem logical to me. 2010-07-09 16:15:18 -04:00
Tom Nichols
9506970042 removed useless 'use_tls' variable 2010-07-09 16:12:32 -04:00
Tom Nichols
3c6b07353d added keepalive to send thread 2010-07-09 16:06:53 -04:00
Brian Beggs
66c6c21ad8 kill the running threads before disconnecting 2010-07-09 15:36:13 -04:00
Brian Beggs
c5b5cc4af1 fix for md5 sasl authentication 2010-07-09 15:33:21 -04:00
Brian Beggs
e835843aab fixes to digest-md5 for ejabberd 2010-07-09 15:26:26 -04:00
Tom Nichols
d6681f16d2 fixed indentation error 2010-07-07 15:19:31 -04:00
Tom Nichols
fc952efae9 removed unused and redundant 'makeIq...' methods from basexmpp; cleaned up the (few\!) plugins that actually used them. 2010-07-07 15:18:59 -04:00
Tom Nichols
f7273affc5 notes on the usefulness of some of the 'makeIq' methods. In particular, they seem to duplicate behavior or be largely unused for their intended purpose. 2010-07-07 15:06:39 -04:00
Tom Nichols
34eb88f199 Merge branch 'hacks' of git@github.com:tomstrummer/SleekXMPP into hacks 2010-07-07 13:19:21 -04:00
Thom Nichols
f3cf5f6080 added SSL certificate verification to startTLS method 2010-07-07 11:33:12 -04:00
Thom Nichols
85d8b9270f client must validate the server's SSL certificate against the CA list if it is provided. 2010-07-06 17:37:57 -04:00
Tom Nichols
259dffeb6e send now has a priority and an 'init' parameter to denote stanzas that may be sent prior to session establishment. 2010-07-06 14:16:46 -04:00
Thom Nichols
0a30e6c017 cleaned up disconnect/reconnect logic just a little 2010-07-02 18:05:50 -04:00
Tom Nichols
d381ab320a merged changes from origin/hacks 2010-07-02 17:14:37 -04:00
Tom Nichols
6e93982fdf trying to get xmlstream to reconnect on stream failure 2010-07-02 16:46:34 -04:00
Tom Nichols
33602f232c allow 'ensure' to block if a transition is occurring 2010-07-02 16:45:55 -04:00
Tom Nichols
7968ca2892 added optional 'block_on_transition' param for 'ensure' function that's called while a transition is in-process 2010-07-02 14:34:59 -04:00
Tom Nichols
661cdd2018 'wait' could delay longer than desired if waiting threads were notified but did not achieve their lock condition afterwards. 2010-07-02 12:57:27 -04:00
Brian Beggs
4b00baab1e reconnection fix: xmlstream now catches XMLParserError and restarts the stream 2010-07-02 22:29:08 +08:00
Brian Beggs
fe1d3004cc xep_0047 initial module checkin 2010-07-02 22:29:08 +08:00
Thom Nichols
62da57a6c2 Merge branch 'master' of git://github.com/macdiesel/SleekXMPP into hacks 2010-07-01 17:50:45 -04:00
Thom Nichols
ba9633f8f7 Merge branch 'hacks' of github.com:tomstrummer/SleekXMPP into hacks 2010-07-01 17:06:50 -04:00
Tom Nichols
065a164223 proper logging. 2010-07-01 16:47:08 -04:00
Tom Nichols
cd2017b8b0 catch XML parse errors & don't attempt to reconnect. Also removed 'connecting' state from setStream method 2010-07-01 16:46:37 -04:00
Tom Nichols
dd9f33b7d9 removed some superfluous debug logging 2010-07-01 15:11:02 -04:00
Tom Nichols
0a23f84ec3 fix for statemachine where operations would unintentionally block if the lock was acquired in a long-running transition 2010-07-01 15:10:22 -04:00
Brian Beggs
f477ccf533 Merge remote branch 'tom/hacks' 2010-07-01 10:01:52 -04:00
Brian Beggs
d62a30b0f8 digest-md5 authentication now works with unicode-literals import. Re-added the __future__ imports that were removed. 2010-07-01 09:46:12 -04:00
Brian Beggs
d763795b2c Merge remote branch 'fritzy/master'
Conflicts:
	sleekxmpp/__init__.py
	sleekxmpp/basexmpp.py
	sleekxmpp/stanza/error.py
2010-07-01 09:17:45 -04:00
Brian Beggs
fff54eaf2f temporary removed future support for sleek to support digest-md5 auth 2010-07-01 08:44:39 -04:00
Brian Beggs
488d5b29d4 fixed typo 2010-06-30 14:48:45 -04:00
Brian Beggs
9bdb297fe2 basic checking for digest-md5 to make sure the necessary components are there to complete auth. If not a failed_auth event is dispatched and the socket disconnected. 2010-06-30 14:44:57 -04:00
Brian Beggs
fa7f72d0af Fixed a defect where handlers for SASL authentication were being added multiple times. This caused issues when trying to reconnect. A handler for the auth mech would get added each reconnection attempt, causing digest-md5, success and failure to be called x times for each x number of retries.
Handlers for sasl authentication as well as success and failure are now added during the __init__ method.
2010-06-30 14:30:18 -04:00
Brian Beggs
c538ffae79 digest-md5 auth now works, had to remove from __future__ import unicode_literals to get it working correctly. Also some improvments for the prioroity message sending. 2010-06-30 13:54:53 -04:00
Thom Nichols
5d87a54913 Merge branch 'hacks' of github.com:tomstrummer/SleekXMPP into hacks 2010-06-29 16:48:15 -04:00
Tom Nichols
8bdfa77024 Merge branch 'hacks' of git@github.com:tomstrummer/SleekXMPP into hacks 2010-06-28 11:10:34 -04:00
Tom Nichols
15ac3e9fba race condition where we were transitioning to 'disconnected' and immediately reconnecting in another thread before the socket.close call occurred. Now we're locking the state machine until the disconnect routine completes. 2010-06-28 11:06:26 -04:00
Tom Nichols
e8d37b409c make the scheduler a daemon thread to prevent hanging when the main thread exits. 2010-06-28 11:04:18 -04:00
Tom Nichols
898f96f265 print the traceback if we can't load a plugin for some reason 2010-06-28 11:03:46 -04:00
Thom Nichols
bbf1cb8ba2 output traceback when plugin load fails 2010-06-25 16:31:38 -04:00
Thom Nichols
d22f6a2aa5 make scheduler thread a daemon to prevent shutdown hanging 2010-06-25 16:30:45 -04:00
Brian Beggs
c0a6291fea More digest-md5 changes 2010-06-21 09:23:56 -04:00
Brian Beggs
f5d0466462 working on digest-md5 authentication 2010-06-18 09:51:29 -04:00
Brian Beggs
f659e3081e Merge remote branch 'tom/hacks' 2010-06-10 10:52:58 -04:00
Tom Nichols
4fccd77685 Merge branch 'hacks' of git@github.com:tomstrummer/SleekXMPP into hacks 2010-06-08 10:40:15 -04:00
Thom Nichols
bf2bf29fc6 fixed mis-named variable, doc typo and using conformant Condition methods. 2010-06-08 09:02:51 -04:00
Thom Nichols
34dc236126 added documentation for transition_ctx and removed some superfluous comment lines 2010-06-07 14:41:42 -04:00
Thom Nichols
9464736551 added __str__ 2010-06-07 13:58:15 -04:00
Thom Nichols
47f1fb1690 context manager now returns a boolean 'result' as the context variable to indicate whether the transition timed out or if you are actually locked when entering the context body 2010-06-07 13:43:37 -04:00
Thom Nichols
66cf0c2021 context manager is working but there's a fatal flaw: inside the body of the 'with' statement, there's no way to tell whether or not the transition occurred or timed out. 2010-06-07 13:16:02 -04:00
Thom Nichols
e7c37c4ec5 connect uses the new function-on-state-transition so when the connect method returns you are guaranteed to be either in the 'connected' or 'disconnected' state. Could remove the 'connecting' state except uses it. 2010-06-04 17:00:51 -04:00
Brian Beggs
1aa34cb0fc Merge remote branch 'tom/hacks' 2010-06-04 12:52:52 -04:00
Thom Nichols
919c8c5633 tweaked connectTCP call slightly to reduce possibility of 'connecting' state limbo 2010-06-03 15:21:26 -04:00
Thom Nichols
f54501a346 added function execution on transition, and more unit tests. 2010-06-03 14:12:06 -04:00
Thom Nichols
d20cd6b3e6 added function execution on transition, and more unit tests. 2010-06-03 13:51:11 -04:00
Brian Beggs
3f96226e29 Added additional logging when a plugin fails to import correctly. 2010-06-03 10:02:55 -04:00
Brian Beggs
71d72f431f Merge remote branch 'tom/hacks' 2010-06-03 09:54:48 -04:00
Thom Nichols
da6e1e47dc whups, somehow I lost the 'connecting' lock in connect() 2010-06-03 08:09:09 -04:00
Thom Nichols
2f0f18a8c6 added function to retrieve the current state 2010-06-03 08:07:56 -04:00
Thom Nichols
1c32668e18 fixed quiesce algorithm; state transition if connect fails; note about use_tls instance variable. 2010-06-03 07:47:27 -04:00
Tom Nichols
77bff9cce7 Merge branch 'hacks' of git@github.com:tomstrummer/SleekXMPP into hacks 2010-06-02 15:45:51 -04:00
Thom Nichols
1f3cfb98f1 Merge branch 'master' into hacks 2010-06-02 14:18:46 -04:00
Thom Nichols
4295a66c70 reconnection quiesce logic 2010-06-02 14:18:09 -04:00
Thom Nichols
8227affd7f removed unnecessary flags and arguments from disconnect method 2010-06-02 14:17:36 -04:00
Thom Nichols
3a2f989c5e Merge branch 'master' into hacks 2010-06-02 14:15:07 -04:00
Nathan Fritz
85a2715c7d hack fix for session before bind 2010-06-03 01:30:24 +08:00
Nathan Fritz
b03e6168a8 if binding and session are advertised in the same go, do session first 2010-06-03 01:30:23 +08:00
Brian Beggs
2a43f59a58 added try/catch block to plugin loading 2010-06-03 01:29:49 +08:00
Brian Beggs
184f7cb8a4 moddified plugin loading so plugins located outside of the plugins directory in sleek may be loaded. Added optional argument pluginModule that is a string that represents the module the desired plugin should be loaded from.
An exception on plugin loading now also will not cause the program to exit.  The exception is caught and loading of other plugins contains.
2010-06-03 01:29:49 +08:00
Brian Beggs
e1aa4d0b93 Added .pydevproject to the .gitignore 2010-06-03 01:29:48 +08:00
Brian Beggs
db4989c66d Merge remote branch 'tom/hacks' 2010-06-02 12:49:54 -04:00
Thom Nichols
7930ed22f2 overhauled state machine. Now allows for atomic transitions.
Next step: atomic function calls (and maybe 'handlers') on state transition.
2010-06-02 12:39:54 -04:00
Brian Beggs
b0066f3ef4 added try/catch block to plugin loading 2010-06-02 08:45:42 -04:00
Brian Beggs
c0457cf5d0 moddified plugin loading so plugins located outside of the plugins directory in sleek may be loaded. Added optional argument pluginModule that is a string that represents the module the desired plugin should be loaded from.
An exception on plugin loading now also will not cause the program to exit.  The exception is caught and loading of other plugins contains.
2010-06-02 08:28:49 -04:00
Brian Beggs
59b8406573 Added .pydevproject to the .gitignore 2010-06-02 07:34:43 -04:00
Brian Beggs
686943a2ec Merge remote branch 'tom/hacks' 2010-06-02 07:32:33 -04:00
Thom Nichols
060b4c3938 Merge branch 'hacks' of github.com:tomstrummer/SleekXMPP 2010-06-01 22:55:01 -04:00
Thom Nichols
49f5767aea merged changes from fritzy 2010-06-01 22:54:30 -04:00
Thom Nichols
4eb210bff5 fixed some major reconnection errors 2010-06-01 22:51:49 -04:00
Thom Nichols
1780ca900a merged a lot of fritzy's changes 2010-06-01 22:40:37 -04:00
Nathan Fritz
e6c2fde283 included jobs plugin 2010-06-01 22:07:53 +08:00
Nathan Fritz
ecf902bf16 Scheduler waits too longer, and pubsubstate registration was backwards 2010-06-01 22:07:53 +08:00
Lance stout
d76c0931ef Added missing 'internal-server-error' condition to error stanza interface. 2010-06-01 22:07:53 +08:00
Lance stout
e18793152f Touched up the style of creating an Iq stanza. 2010-06-01 22:07:53 +08:00
Lance stout
e388680269 Added 'resource-constraint' to the list of error conditions. 2010-06-01 22:07:53 +08:00
Lance Stout
bee42e4a2f Added unit tests for the new XEP-0030 stanza objects. All pass.
(cherry picked from commit e1b814f27bf160f20bb30c315ca30769d217482d)
2010-06-01 22:07:53 +08:00
Lance Stout
8e3227ae5e Updated the XEP-0030 plugin to work with stanza objects instead of manipulating XML directly.
Four new events have been added:
  disco_info - A disco#info result has been received
  disco_info_request - A disco#info request has been received
  disco_items - A disco#items result has been received
  disco_items_request - A disco#items request has been received

For disco_info_request and disco_items_request two default handlers are registered. These handlers will only run if they are the only handler for these two events so that multiple responses are not returned and cause errors.

In your own handlers for these two events, you can call the default handlers to preserve the static node behaviour as so:
  self.plugin['xep_0030'].handle_disco_info(iq, True)

The forwarded=True will disable the check for other registered handlers.

Agents can now dynamically respond to disco requests by using these events.
(cherry picked from commit 0fc3381492a8bd75e6a9858539a972334881d8ff)
2010-06-01 22:07:53 +08:00
Nathan Fritz
257bcadd96 control-c fixes 2010-06-01 22:07:52 +08:00
Nathan Fritz
3e5cdc8664 added pubsubjobs test 2010-06-01 22:07:52 +08:00
Nathan Fritz
194e6bcb51 added pubsub state stanzas and scheduled events 2010-06-01 22:07:52 +08:00
Nathan Fritz
2e7024419a adding scheduler 2010-06-01 22:07:52 +08:00
Nathan Fritz
5235313aab added muc room to readme 2010-06-01 22:07:51 +08:00
Nathan Fritz
a2719b0bb0 plugins now are checked for post_init having ran when process() is called 2010-06-01 22:07:51 +08:00
Hernan E Grecco
71ad715caa Changed example.py to register first Xep_0030.
This a simple fix to prevent getting a key error as many plugins add
features to Xep_0030. A better fix would be to call pos_init after all
 plugins are loaded. An even better fix would be to define dependencies
for each plugin and registering on demand.
2010-06-01 22:07:51 +08:00
Hernan E Grecco
d452085049 Fixed error registering a plugin. To add a feature to another plugin, it should look into xmpp.plugin dict 2010-06-01 22:07:51 +08:00
Nathan Fritz
8b3b8aca9e updated README, index fix for component 2010-06-01 22:07:51 +08:00
Lance Stout
e00dea7c0c Added a flag to registerPlugin to control calling the plugin's post_init method. 2010-06-01 22:07:51 +08:00
Lance Stout
520bf72e11 Modified the return values for several methods so that they can be chained.
For example:

    iq.reply().error().setPayload(something.xml).send()
2010-06-01 22:07:51 +08:00
Lance Stout
040f426f1a Added the error attribute 'code' to the Error object interface. 2010-06-01 22:07:51 +08:00
Nathan Fritz
226b0e4297 added plugin indexing to components 2010-06-01 22:07:50 +08:00
Nathan Fritz
0b2cd176b1 added test_events and testing new del_event_handler 2010-06-01 22:07:50 +08:00
Lance Stout
56b5cbe5b1 Added del_event_handler to remove handler functions for a given event.
All registered handlers for the event which use the given function will
be removed.

Using this method allows agents to reconfigure their behaviour on the fly
without needing to add extra state information to event handling functions.
2010-06-01 22:07:50 +08:00
Thom Nichols
3e83b16a58 Merge branch 'hacks' of github.com:tomstrummer/SleekXMPP 2010-05-18 16:11:27 -04:00
Tom Nichols
de4d611d30 fixed SRV query - should use dns.rdatatype.SRV 2010-05-14 11:22:17 -04:00
Tom Nichols
e8d0fc37dc updated ignore file 2010-05-14 11:21:53 -04:00
Brian Beggs
dda3e733b5 Merge branch 'master' of https://github.com/macdiesel/SleekXMPP 2010-05-14 11:00:05 -04:00
Brian Beggs
4b322720b3 Merge remote branch 'tom/master' 2010-05-14 10:59:41 -04:00
Tom Nichols
3f41fdd231 fixed SRV query - should use dns.rdatatype.SRV 2010-05-13 14:39:32 -04:00
Tom Nichols
8e95ae2948 attempt to add support for self-signed certificate certs 2010-05-13 13:49:00 -04:00
Tom Nichols
341c110b6a Merge branch 'master' of git@github.com:tomstrummer/SleekXMPP into hacks 2010-05-13 13:48:27 -04:00
Nathan Fritz
7522839141 added test for unsolicided unavailable presence and fixed bug to make it pass 2010-05-14 01:47:19 +08:00
Nathan Fritz
4c410dd48a fixed a rather large memory leak 2010-05-14 01:47:19 +08:00
Brian Beggs
2d89954412 Merge commit 'fritzy/master' 2010-05-13 10:01:46 -04:00
Tom Nichols
a92075a659 merged 2010-05-12 16:54:01 -04:00
Tom Nichols
7552efee5c some reconnetion fixes 2010-05-12 16:51:14 -04:00
Tom Nichols
6bc6ebb95d updated ignore file 2010-05-12 16:46:23 -04:00
Brian Beggs
e0c32b6d9b Fixes for disconnection problems detailed in http://github.com/fritzy/SleekXMPP/issues/#issue/20
Fixes to both ClientXMPP & xmlstream.  ClientXMPP was not tracking the changes to authenticated and sessionstarted after the client was disconnected.

xmlstream had some funkyness with state in the _process method that was cleaned up and hopefully made a little cleaner.

Also changed a DNS issue that was occuring that rendered me unable to disconnect.  I would recieve the following error upon reconnect.
Exception in thread process:
Exception in thread process:
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/local/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/xmlstream/xmlstream.py", line 202, in _process
    self.reconnect()
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/__init__.py", line 134, in reconnect
    XMLStream.reconnect(self)
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/xmlstream/xmlstream.py", line 289, in reconnect
    self.connect()
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/__init__.py", line 99, in connect
    answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.server, "SRV")
  File "/usr/local/lib/python2.6/site-packages/dns/resolver.py", line 732, in query
    return get_default_resolver().query(qname, rdtype, rdclass, tcp, source)
  File "/usr/local/lib/python2.6/site-packages/dns/resolver.py", line 617, in query
    source=source)
  File "/usr/local/lib/python2.6/site-packages/dns/query.py", line 113, in udp
    wire = q.to_wire()
  File "/usr/local/lib/python2.6/site-packages/dns/message.py", line 404, in to_wire
    r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
  File "/usr/local/lib/python2.6/site-packages/dns/renderer.py", line 152, in add_question
    self.output.write(struct.pack("!HH", rdtype, rdclass))
TypeError: unsupported operand type(s) for &: 'unicode' and 'long'

Seems I was getting this error when calling line 99 in ClientXMPP.  You can't bit-shift a 1 and a string and this is why this error is coming up. I removed the "SRV" argument and used the default of 1.  not sure exactly what this should be so it may need to be fixed back before it's merged back to trunk.

The line in question:
answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.server, "SRV")
2010-05-13 04:43:25 +08:00
Brian Beggs
1521a8b5c9 Merge remote branch 'fritzy/master' 2010-05-12 07:46:07 -04:00
Brian Beggs
70f69c180c Fixes for disconnection problems detailed in http://github.com/fritzy/SleekXMPP/issues/#issue/20
Fixes to both ClientXMPP & xmlstream.  ClientXMPP was not tracking the changes to authenticated and sessionstarted after the client was disconnected.

xmlstream had some funkyness with state in the _process method that was cleaned up and hopefully made a little cleaner.

Also changed a DNS issue that was occuring that rendered me unable to disconnect.  I would recieve the following error upon reconnect.
Exception in thread process:
Exception in thread process:
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/local/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/xmlstream/xmlstream.py", line 202, in _process
    self.reconnect()
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/__init__.py", line 134, in reconnect
    XMLStream.reconnect(self)
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/xmlstream/xmlstream.py", line 289, in reconnect
    self.connect()
  File "/home/macdiesel/tmp/workspace/SleekXMPP/sleekxmpp/__init__.py", line 99, in connect
    answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.server, "SRV")
  File "/usr/local/lib/python2.6/site-packages/dns/resolver.py", line 732, in query
    return get_default_resolver().query(qname, rdtype, rdclass, tcp, source)
  File "/usr/local/lib/python2.6/site-packages/dns/resolver.py", line 617, in query
    source=source)
  File "/usr/local/lib/python2.6/site-packages/dns/query.py", line 113, in udp
    wire = q.to_wire()
  File "/usr/local/lib/python2.6/site-packages/dns/message.py", line 404, in to_wire
    r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
  File "/usr/local/lib/python2.6/site-packages/dns/renderer.py", line 152, in add_question
    self.output.write(struct.pack("!HH", rdtype, rdclass))
TypeError: unsupported operand type(s) for &: 'unicode' and 'long'

Seems I was getting this error when calling line 99 in ClientXMPP.  You can't bit-shift a 1 and a string and this is why this error is coming up. I removed the "SRV" argument and used the default of 1.  not sure exactly what this should be so it may need to be fixed back before it's merged back to trunk.

The line in question:
answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.server, "SRV")
2010-05-04 14:03:38 -04:00
35 changed files with 2041 additions and 467 deletions

4
.gitignore vendored
View File

@ -1,2 +1,6 @@
*.pyc
.project
build/
*.swp
.pydevproject
.settings

69
.pylintrc Normal file
View File

@ -0,0 +1,69 @@
# Pylint configuration file.
# run `pylint --generate-rcfile` to see the default configuration
# run `pylint --rcfile=.pylintrc smallfoot` to perform analysis
# Brain-dead errors regarding standard language features
# W0142 = *args and **kwargs support
# W0403 = Relative imports
# Pointless whining
# R0201 = Method could be a function
# W0212 = Accessing protected attribute of client class
# W0613 = Unused argument
# W0232 = Class has no __init__ method
# R0903 = Too few public methods
# C0301 = Line too long
# R0913 = Too many arguments
# C0103 = Invalid name
# R0914 = Too many local variables
# PyLint's module importation is unreliable
# F0401 = Unable to import module
# W0402 = Uses of a deprecated module
# Already an error when wildcard imports are used
# W0614 = Unused import from wildcard
# Sometimes disabled depending on how bad a module is
# C0111 = Missing docstring
# Convention Errors related to whitespace:
# C0321,C0322,C0323,C0324
# Comments that we've put in the code:
# W0511
[MESSAGES CONTROL]
# Disable the message(s) with the given id(s).
disable=W0142,W0403,R0201,W0212,W0613,W0232,R0903,W0614,C0103,C0111,C0301,C0321,C0322,C0323,C0324,R0913,F0401,W0402,R0914,W0511,W0312
[REPORTS]
include-ids=y
reports=y
# Set the output format. Available formats are text, parseable, colorized, msvs
# (visual studio) and html
output-format=text
# Put messages in a separate file for each module / package specified on the
# command line instead of printing them on stdout. Reports (if any) will be
# written in a file name "pylint_global.[txt|html]".
files-output=no
[VARIABLES]
# Tells whether we should check for unused import in __init__ files.
init-import=yes
[TYPECHECK]
# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set).
#ignored-classes=Message
[MISCELLANEOUS]
# List of note tags to take in consideration, separated by a comma.
notes=FIXME,XXX,TODO

View File

@ -0,0 +1,171 @@
import logging
import sleekxmpp
from optparse import OptionParser
from xml.etree import cElementTree as ET
import os
import time
import sys
import unittest
import sleekxmpp.plugins.xep_0004
from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath
from sleekxmpp.xmlstream.handler.waiter import Waiter
try:
import configparser
except ImportError:
import ConfigParser as configparser
try:
import queue
except ImportError:
import Queue as queue
class TestClient(sleekxmpp.ClientXMPP):
def __init__(self, jid, password):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.add_event_handler("session_start", self.start)
#self.add_event_handler("message", self.message)
self.waitforstart = queue.Queue()
def start(self, event):
self.getRoster()
self.sendPresence()
self.waitforstart.put(True)
class TestPubsubServer(unittest.TestCase):
statev = {}
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)
def setUp(self):
pass
def test001getdefaultconfig(self):
"""Get the default node config"""
self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2')
self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode3')
self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode4')
self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode5')
result = self.xmpp1['xep_0060'].getNodeConfig(self.pshost)
self.statev['defaultconfig'] = result
self.failUnless(isinstance(result, sleekxmpp.plugins.xep_0004.Form))
def test002createdefaultnode(self):
"""Create a node without config"""
self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode1'))
def test003deletenode(self):
"""Delete recently created node"""
self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode1'))
def test004createnode(self):
"""Create a node with a config"""
self.statev['defaultconfig'].field['pubsub#access_model'].setValue('open')
self.statev['defaultconfig'].field['pubsub#notify_retract'].setValue(True)
self.statev['defaultconfig'].field['pubsub#persist_items'].setValue(True)
self.statev['defaultconfig'].field['pubsub#presence_based_delivery'].setValue(True)
p = self.xmpp2.Presence()
p['to'] = self.pshost
p.send()
self.failUnless(self.xmpp1['xep_0060'].create_node(self.pshost, 'testnode2', self.statev['defaultconfig'], ntype='job'))
def test005reconfigure(self):
"""Retrieving node config and reconfiguring"""
nconfig = self.xmpp1['xep_0060'].getNodeConfig(self.pshost, 'testnode2')
self.failUnless(nconfig, "No configuration returned")
#print("\n%s ==\n %s" % (nconfig.getValues(), self.statev['defaultconfig'].getValues()))
self.failUnless(nconfig.getValues() == self.statev['defaultconfig'].getValues(), "Configuration does not match")
self.failUnless(self.xmpp1['xep_0060'].setNodeConfig(self.pshost, 'testnode2', nconfig))
def test006subscribetonode(self):
"""Subscribe to node from account 2"""
self.failUnless(self.xmpp2['xep_0060'].subscribe(self.pshost, "testnode2"))
def test007publishitem(self):
"""Publishing item"""
item = ET.Element('{http://netflint.net/protocol/test}test')
w = Waiter('wait publish', StanzaPath('message/pubsub_event/items'))
self.xmpp2.registerHandler(w)
#result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test1', item),))
result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test1', item)
msg = w.wait(5) # got to get a result in 5 seconds
self.failUnless(msg != False, "Account #2 did not get message event")
#result = self.xmpp1['xep_0060'].setItem(self.pshost, "testnode2", (('test2', item),))
result = self.xmpp1['jobs'].createJob(self.pshost, "testnode2", 'test2', item)
w = Waiter('wait publish2', StanzaPath('message/pubsub_event/items'))
self.xmpp2.registerHandler(w)
self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test1')
msg = w.wait(5) # got to get a result in 5 seconds
self.xmpp2['jobs'].claimJob(self.pshost, 'testnode2', 'test2')
self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test1')
self.xmpp2['jobs'].finishJob(self.pshost, 'testnode2', 'test2')
print result
#need to add check for update
def test900cleanup(self):
"Cleaning up"
#self.failUnless(self.xmpp1['xep_0060'].deleteNode(self.pshost, 'testnode2'), "Could not delete test node.")
time.sleep(10)
if __name__ == '__main__':
#parse command line arguements
optp = OptionParser()
optp.add_option('-q','--quiet', help='set logging to ERROR', action='store_const', dest='loglevel', const=logging.ERROR, default=logging.INFO)
optp.add_option('-d','--debug', help='set logging to DEBUG', action='store_const', dest='loglevel', const=logging.DEBUG, default=logging.INFO)
optp.add_option('-v','--verbose', help='set logging to COMM', action='store_const', dest='loglevel', const=5, default=logging.INFO)
optp.add_option("-c","--config", dest="configfile", default="config.xml", help="set config file to use")
optp.add_option("-n","--nodenum", dest="nodenum", default="1", help="set node number to use")
optp.add_option("-p","--pubsub", dest="pubsub", default="1", help="set pubsub host to use")
opts,args = optp.parse_args()
logging.basicConfig(level=opts.loglevel, format='%(levelname)-8s %(message)s')
#load xml config
logging.info("Loading config file: %s" % opts.configfile)
config = configparser.RawConfigParser()
config.read(opts.configfile)
#init
logging.info("Account 1 is %s" % config.get('account1', 'jid'))
xmpp1 = TestClient(config.get('account1','jid'), config.get('account1','pass'))
logging.info("Account 2 is %s" % config.get('account2', 'jid'))
xmpp2 = TestClient(config.get('account2','jid'), config.get('account2','pass'))
xmpp1.registerPlugin('xep_0004')
xmpp1.registerPlugin('xep_0030')
xmpp1.registerPlugin('xep_0060')
xmpp1.registerPlugin('xep_0199')
xmpp1.registerPlugin('jobs')
xmpp2.registerPlugin('xep_0004')
xmpp2.registerPlugin('xep_0030')
xmpp2.registerPlugin('xep_0060')
xmpp2.registerPlugin('xep_0199')
xmpp2.registerPlugin('jobs')
if not config.get('account1', 'server'):
# we don't know the server, but the lib can probably figure it out
xmpp1.connect()
else:
xmpp1.connect((config.get('account1', 'server'), 5222))
xmpp1.process(threaded=True)
#init
if not config.get('account2', 'server'):
# we don't know the server, but the lib can probably figure it out
xmpp2.connect()
else:
xmpp2.connect((config.get('account2', 'server'), 5222))
xmpp2.process(threaded=True)
TestPubsubServer.xmpp1 = xmpp1
TestPubsubServer.xmpp2 = xmpp2
TestPubsubServer.pshost = config.get('settings', 'pubsub')
xmpp1.waitforstart.get(True)
xmpp2.waitforstart.get(True)
testsuite = unittest.TestLoader().loadTestsFromTestCase(TestPubsubServer)
alltests_suite = unittest.TestSuite([testsuite])
result = unittest.TextTestRunner(verbosity=2).run(alltests_suite)
xmpp1.disconnect()
xmpp2.disconnect()

View File

@ -5,7 +5,6 @@ from xml.etree import cElementTree as ET
import os
import time
import sys
import thread
import unittest
import sleekxmpp.plugins.xep_0004
from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath

View File

@ -1,11 +1,11 @@
#!/usr/bin/python2.5
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
See the file license.txt for copying permission.
"""
from __future__ import absolute_import, unicode_literals
from . basexmpp import basexmpp
@ -14,51 +14,39 @@ from . xmlstream.xmlstream import XMLStream
from . xmlstream.xmlstream import RestartStream
from . xmlstream.matcher.xmlmask import MatchXMLMask
from . xmlstream.matcher.xpath import MatchXPath
from . xmlstream.matcher.many import MatchMany
from . xmlstream.handler.callback import Callback
from . xmlstream.stanzabase import StanzaBase
from . xmlstream import xmlstream as xmlstreammod
from . stanza.message import Message
from . stanza.iq import Iq
import time
import logging
import base64
import sys
import random
import copy
from . import plugins
#from . import stanza
from xml.etree.cElementTree import tostring
srvsupport = True
try:
import dns.resolver
import dns.rdatatype
import dns.exception
except ImportError:
srvsupport = False
#class PresenceStanzaType(object):
#
# def fromXML(self, xml):
# self.ptype = xml.get('type')
class ClientXMPP(basexmpp, XMLStream):
"""SleekXMPP's client class. Use only for good, not evil."""
def __init__(self, jid, password, ssl=False, plugin_config = {}, plugin_whitelist=[], escape_quotes=True):
global srvsupport
XMLStream.__init__(self)
self.default_ns = 'jabber:client'
basexmpp.__init__(self)
self.plugin_config = plugin_config
self.escape_quotes = escape_quotes
self.set_jid(jid)
self.port = 5222 # not used if DNS SRV is used
self.plugin_whitelist = plugin_whitelist
self.auto_reconnect = True
self.srvsupport = srvsupport
self.password = password
self.registered_features = []
self.stream_header = """<stream:stream to='%s' xmlns:stream='http://etherx.jabber.org/streams' xmlns='%s' version='1.0'>""" % (self.server,self.default_ns)
self.stream_header = """<stream:stream to='%s' xmlns:stream='http://etherx.jabber.org/streams' xmlns='%s' version='1.0'>""" % (self.domain,self.default_ns)
self.stream_footer = "</stream:stream>"
#self.map_namespace('http://etherx.jabber.org/streams', 'stream')
#self.map_namespace('jabber:client', '')
@ -66,8 +54,15 @@ class ClientXMPP(basexmpp, XMLStream):
#TODO: Use stream state here
self.authenticated = False
self.sessionstarted = False
self.registerHandler(Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True))
self.registerHandler(Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True))
self.bound = False
self.bindfail = False
XMLStream.registerHandler(self, Callback('Stream Features', MatchXPath('{http://etherx.jabber.org/streams}features'), self._handleStreamFeatures, thread=True))
XMLStream.registerHandler(self, Callback('Roster Update', MatchXPath('{%s}iq/{jabber:iq:roster}query' % self.default_ns), self._handleRoster, thread=True))
#SASL Auth handlers
basexmpp.add_handler(self, "<challenge xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_sasl_digest_md5_auth, instream=True)
basexmpp.add_handler(self, "<response xmlns='urn:ietf:params:xml:ns:xmpp-sasl'/>", self.handler_sasl_digest_md5_auth_fail, instream=True)
basexmpp.add_handler(self, "<success xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_success, instream=True)
basexmpp.add_handler(self, "<failure xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_fail, instream=True)
#self.registerHandler(Callback('Roster Update', MatchXMLMask("<presence xmlns='%s' type='subscribe' />" % self.default_ns), self._handlePresenceSubscribe, thread=True))
self.registerFeature("<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls' />", self.handler_starttls, True)
self.registerFeature("<mechanisms xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_sasl_auth, True)
@ -87,18 +82,27 @@ class ClientXMPP(basexmpp, XMLStream):
def get(self, key, default):
return self.plugin.get(key, default)
def connect(self, address=tuple()):
def connect(self, host=None, port=None):
"""Connect to the Jabber Server. Attempts SRV lookup, and if it fails, uses
the JID server."""
if not address or len(address) < 2:
the JID server. You can optionally specify a host/port if you're not using
DNS and want to connect to a server address that is different from the XMPP domain."""
if self.state['connected']: return True
if host: # if a host was specified, don't attempt a DNS lookup.
if port is None: port = self.port
else:
if not self.srvsupport:
logging.debug("Did not supply (address, port) to connect to and no SRV support is installed (http://www.dnspython.org). Continuing to attempt connection, using server hostname from JID.")
logging.warn("Did not supply (address, port) to connect to and no SRV support is installed (http://www.dnspython.org). Continuing to attempt connection, using domain from JID.")
else:
logging.debug("Since no address is supplied, attempting SRV lookup.")
try:
answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.server, dns.rdatatype.SRV)
answers = dns.resolver.query("_xmpp-client._tcp.%s" % self.domain, dns.rdatatype.SRV)
except dns.resolver.NXDOMAIN:
logging.debug("No appropriate SRV record found. Using JID server name.")
logging.info("No appropriate SRV record found for %s. Using domain as server address.", self.domain)
except dns.exception.DNSException:
# this could be a timeout or other DNS error. Worth retrying?
logging.exception("DNS error during SRV query for %s. Using domain as server address.", self.domain)
else:
# pick a random answer, weighted by priority
# there are less verbose ways of doing this (random.choice() with answer * priority), but I chose this way anyway
@ -113,12 +117,15 @@ class ClientXMPP(basexmpp, XMLStream):
picked = random.randint(0, intmax)
for priority in priorities:
if picked <= priority:
address = addresses[priority]
(host,port) = addresses[priority]
break
if not address:
if not host:
# if all else fails take server from JID.
address = (self.server, 5222)
result = XMLStream.connect(self, address[0], address[1], use_tls=True)
(host,port) = (self.domain, self.port)
logging.debug('Attempting connection to %s:%d', host, port )
result = XMLStream.connect(self, host, port)
if result:
self.event("connected")
else:
@ -129,13 +136,19 @@ class ClientXMPP(basexmpp, XMLStream):
# overriding reconnect and disconnect so that we can get some events
# should events be part of or required by xmlstream? Maybe that would be cleaner
def reconnect(self):
logging.info("Reconnecting")
self.event("disconnected")
XMLStream.reconnect(self)
self.disconnect(reconnect=True)
def disconnect(self, init=True, close=False, reconnect=False):
def disconnect(self, reconnect=False, error=False):
self.event("disconnected")
XMLStream.disconnect(self, reconnect)
self.authenticated = False
self.sessionstarted = False
XMLStream.disconnect(self, reconnect, error)
def sendRaw(self, data, priority=5, init=False):
if not init and not self.sessionstarted:
logging.warn("Attempt to send stanza before session has started:\n%s", data)
return False
XMLStream.sendRaw(self, data, priority, init)
def registerFeature(self, mask, pointer, breaker = False):
"""Register a stream feature."""
@ -155,6 +168,7 @@ class ClientXMPP(basexmpp, XMLStream):
self._handleRoster(iq, request=True)
def _handleStreamFeatures(self, features):
logging.debug('handling stream features')
self.features = []
for sub in features.xml:
self.features.append(sub.tag)
@ -162,13 +176,17 @@ class ClientXMPP(basexmpp, XMLStream):
for feature in self.registered_features:
if feature[0].match(subelement):
#if self.maskcmp(subelement, feature[0], True):
# This calls the feature handler & optionally breaks
if feature[1](subelement) and feature[2]: #if breaker, don't continue
return True
def handler_starttls(self, xml):
logging.debug( 'TLS start handler; SSL support: %s', self.ssl_support )
if not self.authenticated and self.ssl_support:
self.add_handler("<proceed xmlns='urn:ietf:params:xml:ns:xmpp-tls' />", self.handler_tls_start, instream=True)
self.sendXML(xml)
_stanza = "<proceed xmlns='urn:ietf:params:xml:ns:xmpp-tls' />"
if not self.event_handlers.get(_stanza,None): # don't add handler > once
self.add_handler( _stanza, self.handler_tls_start, instream=True )
self.sendRaw(self.tostring(xml), priority=1, init=True)
return True
else:
logging.warning("The module tlslite is required in to some servers, and has not been found.")
@ -183,57 +201,98 @@ class ClientXMPP(basexmpp, XMLStream):
if '{urn:ietf:params:xml:ns:xmpp-tls}starttls' in self.features:
return False
logging.debug("Starting SASL Auth")
self.add_handler("<success xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_success, instream=True)
self.add_handler("<failure xmlns='urn:ietf:params:xml:ns:xmpp-sasl' />", self.handler_auth_fail, instream=True)
sasl_mechs = xml.findall('{urn:ietf:params:xml:ns:xmpp-sasl}mechanism')
if len(sasl_mechs):
for sasl_mech in sasl_mechs:
self.features.append("sasl:%s" % sasl_mech.text)
if 'sasl:PLAIN' in self.features:
if 'sasl:DIGEST-MD5' in self.features:
self.sendRaw("<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='DIGEST-MD5'/>", priority=1, init=True)
elif 'sasl:PLAIN' in self.features:
if sys.version_info < (3,0):
self.send("""<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>""" % base64.b64encode(b'\x00' + bytes(self.username) + b'\x00' + bytes(self.password)).decode('utf-8'))
self.sendRaw("<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>" \
% base64.b64encode(b'\x00' + bytes(self.username) + b'\x00' + bytes(self.password)).decode('utf-8'),
priority=1, init=True)
else:
self.send("""<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>""" % base64.b64encode(b'\x00' + bytes(self.username, 'utf-8') + b'\x00' + bytes(self.password, 'utf-8')).decode('utf-8'))
self.sendRaw("<auth xmlns='urn:ietf:params:xml:ns:xmpp-sasl' mechanism='PLAIN'>%s</auth>" \
% base64.b64encode(b'\x00' + bytes(self.username, 'utf-8') + b'\x00' + bytes(self.password, 'utf-8')).decode('utf-8'),
priority=1, init=True)
else:
logging.error("No appropriate login method.")
self.disconnect()
#if 'sasl:DIGEST-MD5' in self.features:
# self._auth_digestmd5()
logging.error("No appropriate login method: %s", sasl_mechs)
self.handler_auth_fail(xml)
return False
return True
def handler_sasl_digest_md5_auth(self, xml):
challenge = [item.split('=', 1) for item in base64.b64decode(xml.text).replace("\"", "").split(',', 6) ]
challenge = dict(challenge)
logging.debug("MD5 auth challenge: %s", challenge)
if challenge.get('rspauth'): #authenticated success... send response
self.sendRaw("""<response xmlns='urn:ietf:params:xml:ns:xmpp-sasl'/>""", priority=1, init=True )
return
#TODO: use realm if supplied by server, use default qop unless supplied by server
#Realm, nonce, qop should all be present
if not challenge.get('qop') or not challenge.get('nonce'):
logging.error("Error during digest-md5 authentication. Challenge missing critical information. Challenge: %s" %base64.b64decode(xml.text))
self.handler_auth_fail(xml)
return
#TODO: charset can be either UTF-8 or if not present use ISO 8859-1 defaulting for UTF-8 for now
#Compute the cnonce - a unique hex string only used in this request
cnonce = ""
for i in range(7):
cnonce+=hex(int(random.random()*65536*4096))[2:]
cnonce = base64.encodestring(cnonce)[0:-1]
a1 = b"%s:%s:%s" %(md5("%s:%s:%s" % (self.username, self.domain, self.password)), challenge["nonce"].encode("UTF-8"), cnonce.encode("UTF-8") )
a2 = "AUTHENTICATE:xmpp/%s" %self.domain
responseHash = md5digest("%s:%s:00000001:%s:auth:%s" %(md5digest(a1), challenge["nonce"], cnonce, md5digest(a2) ) )
response = 'charset=utf-8,username="%s",realm="%s",nonce="%s",nc=00000001,cnonce="%s",digest-uri="%s",response=%s,qop=%s,' \
% (self.username, self.domain, challenge["nonce"], cnonce, "xmpp/%s" % self.domain, responseHash, challenge["qop"])
self.sendRaw("<response xmlns='urn:ietf:params:xml:ns:xmpp-sasl'>%s</response>" % base64.encodestring(response)[:-1],
priority=1, init=True )
def handler_sasl_digest_md5_auth_fail(self, xml):
self.authenticated = False
self.handler_auth_fail(xml)
def handler_auth_success(self, xml):
logging.debug("Authentication successful.")
self.authenticated = True
self.features = []
raise RestartStream()
def handler_auth_fail(self, xml):
logging.info("Authentication failed.")
logging.warning("Authentication failed.")
logging.debug(tostring(xml, 'utf-8'))
self.disconnect()
self.event("failed_auth")
def handler_bind_resource(self, xml):
logging.debug("Requesting resource: %s" % self.resource)
iq = self.Iq(stype='set')
res = ET.Element('resource')
res.text = self.resource
xml.append(res)
iq.append(xml)
response = iq.send()
iq = self.makeIqSet(xml)
response = iq.send(priority=2,init=True)
#response = self.send(iq, self.Iq(sid=iq['id']))
self.set_jid(response.xml.find('{urn:ietf:params:xml:ns:xmpp-bind}bind/{urn:ietf:params:xml:ns:xmpp-bind}jid').text)
self.bound = True
logging.info("Node set to: %s" % self.fulljid)
if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features:
if "{urn:ietf:params:xml:ns:xmpp-session}session" not in self.features or self.bindfail:
logging.debug("Established Session")
self.sessionstarted = True
self.event("session_start")
def handler_start_session(self, xml):
if self.authenticated:
if self.authenticated and self.bound:
iq = self.makeIqSet(xml)
response = iq.send()
response = iq.send(priority=2,init=True)
logging.debug("Established Session")
self.sessionstarted = True
self.event("session_start")
else:
logging.warn("Bind has failed; not starting session!")
self.bindfail = True
def _handleRoster(self, iq, request=False):
if iq['type'] == 'set' or (iq['type'] == 'result' and request):
@ -244,3 +303,21 @@ class ClientXMPP(basexmpp, XMLStream):
if iq['type'] == 'set':
self.send(self.Iq().setValues({'type': 'result', 'id': iq['id']}).enable('roster'))
self.event("roster_update", iq)
def md5(data):
try:
import hashlib
md5 = hashlib.md5(data)
except ImportError:
import md5
md5 = md5.new(data)
return md5.digest()
def md5digest(data):
try:
import hashlib
md5 = hashlib.md5(data)
except ImportError:
import md5
md5 = md5.new(data)
return md5.hexdigest()

View File

@ -1,19 +1,16 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
See the file license.txt for copying permission.
"""
from __future__ import with_statement, unicode_literals
from xml.etree import cElementTree as ET
from . xmlstream.xmlstream import XMLStream
from . xmlstream.matcher.xmlmask import MatchXMLMask
from . xmlstream.matcher.many import MatchMany
from . xmlstream.handler.xmlcallback import XMLCallback
from . xmlstream.handler.xmlwaiter import XMLWaiter
from . xmlstream.handler.waiter import Waiter
from . xmlstream.handler.callback import Callback
from . import plugins
@ -23,7 +20,6 @@ from . stanza.presence import Presence
from . stanza.roster import Roster
from . stanza.nick import Nick
from . stanza.htmlim import HTMLIM
from . stanza.error import Error
import logging
import threading
@ -49,7 +45,7 @@ class basexmpp(object):
self.resource = ''
self.jid = ''
self.username = ''
self.server = ''
self.domain = ''
self.plugin = {}
self.auto_authorize = True
self.auto_subscribe = True
@ -84,27 +80,34 @@ class basexmpp(object):
self.resource = self.getjidresource(jid)
self.jid = self.getjidbare(jid)
self.username = jid.split('@', 1)[0]
self.server = jid.split('@',1)[-1].split('/', 1)[0]
self.domain = jid.split('@',1)[-1].split('/', 1)[0]
def process(self, *args, **kwargs):
for idx in self.plugin:
if not self.plugin[idx].post_inited: self.plugin[idx].post_init()
return super(basexmpp, self).process(*args, **kwargs)
def registerPlugin(self, plugin, pconfig = {}):
def registerPlugin(self, plugin, pconfig = {}, pluginModule = None):
"""Register a plugin not in plugins.__init__.__all__ but in the plugins
directory."""
# discover relative "path" to the plugins module from the main app, and import it.
# TODO:
# gross, this probably isn't necessary anymore, especially for an installed module
__import__("%s.%s" % (globals()['plugins'].__name__, plugin))
# init the plugin class
self.plugin[plugin] = getattr(getattr(plugins, plugin), plugin)(self, pconfig) # eek
# all of this for a nice debug? sure.
xep = ''
if hasattr(self.plugin[plugin], 'xep'):
xep = "(XEP-%s) " % self.plugin[plugin].xep
logging.debug("Loaded Plugin %s%s" % (xep, self.plugin[plugin].description))
try:
if pluginModule:
module = __import__(pluginModule, globals(), locals(), [plugin])
else:
module = __import__("%s.%s" % (globals()['plugins'].__name__, plugin), globals(), locals(), [plugin])
# init the plugin class
self.plugin[plugin] = getattr(module, plugin)(self, pconfig) # eek
# all of this for a nice debug? sure.
xep = ''
if hasattr(self.plugin[plugin], 'xep'):
xep = "(XEP-%s) " % self.plugin[plugin].xep
logging.debug("Loaded Plugin %s%s" % (xep, self.plugin[plugin].description))
except:
logging.exception("Unable to load plugin: %s", plugin )
def register_plugins(self):
"""Initiates all plugins in the plugins/__init__.__all__"""
@ -131,7 +134,7 @@ class basexmpp(object):
self.registerHandler(XMLCallback('add_handler_%s' % self.getNewId(), MatchXMLMask(mask), pointer, threaded, disposable, instream))
def getId(self):
return "%x".upper() % self.id
return "%X" % self.id
def sendXML(self, data, mask=None, timeout=10):
return self.send(self.tostring(data), mask, timeout)
@ -151,40 +154,33 @@ class basexmpp(object):
if mask is not None:
return waitfor.wait(timeout)
def makeIq(self, id=0, ifrom=None):
return self.Iq().setValues({'id': id, 'from': ifrom})
def makeIqGet(self, queryxmlns = None):
# TODO this should take a 'to' param since more often than not you set
# iq['to']=whatever immediately after.
iq = self.Iq().setValues({'type': 'get'})
if queryxmlns:
iq.append(ET.Element("{%s}query" % queryxmlns))
return iq
def makeIqResult(self, id):
# TODO this should take a 'to' param since more often than not you set
# iq['to']=whatever immediately after.
return self.Iq().setValues({'id': id, 'type': 'result'})
def makeIqSet(self, sub=None):
# TODO this should take a 'to' param since more often than not you set
# iq['to']=whatever immediately after.
iq = self.Iq().setValues({'type': 'set'})
if sub != None:
iq.append(sub)
return iq
def makeIqError(self, id, type='cancel', condition='feature-not-implemented', text=None):
# TODO not used.
iq = self.Iq().setValues({'id': id})
iq['error'].setValues({'type': type, 'condition': condition, 'text': text})
return iq
def makeIqQuery(self, iq, xmlns):
query = ET.Element("{%s}query" % xmlns)
iq.append(query)
return iq
def makeQueryRoster(self, iq=None):
query = ET.Element("{jabber:iq:roster}query")
if iq:
iq.append(query)
return query
def add_event_handler(self, name, pointer, threaded=False, disposable=False):
if not name in self.event_handlers:
self.event_handlers[name] = []

View File

@ -12,21 +12,11 @@ from . basexmpp import basexmpp
from xml.etree import cElementTree as ET
from . xmlstream.xmlstream import XMLStream
from . xmlstream.xmlstream import RestartStream
from . xmlstream.matcher.xmlmask import MatchXMLMask
from . xmlstream.matcher.xpath import MatchXPath
from . xmlstream.matcher.many import MatchMany
from . xmlstream.handler.callback import Callback
from . xmlstream.stanzabase import StanzaBase
from . xmlstream import xmlstream as xmlstreammod
import time
import logging
import base64
import sys
import random
import copy
from . import plugins
from . import stanza
import hashlib
srvsupport = True
try:
@ -58,7 +48,7 @@ class ComponentXMPP(basexmpp, XMLStream):
if key in self.plugin:
return self.plugin[key]
else:
logging.warning("""Plugin "%s" is not loaded.""" % key)
logging.warning("Plugin '%s' is not loaded.", key)
return False
def get(self, key, default):

View File

@ -33,7 +33,7 @@ class gmail_notify(base.base_plugin):
def handler_gmailcheck(self, payload):
#TODO XEP 30 should cache results and have getFeature
result = self.xmpp['xep_0030'].getInfo(self.xmpp.server)
result = self.xmpp['xep_0030'].getInfo(self.xmpp.domain)
features = []
for feature in result.findall('{http://jabber.org/protocol/disco#info}query/{http://jabber.org/protocol/disco#info}feature'):
features.append(feature.get('var'))
@ -50,7 +50,7 @@ class gmail_notify(base.base_plugin):
iq = self.xmpp.makeIqGet()
iq.attrib['from'] = self.xmpp.fulljid
iq.attrib['to'] = self.xmpp.jid
self.xmpp.makeIqQuery(iq, 'google:mail:notify')
iq.append(ET.Element('{google:mail:notify}query'))
emails = iq.send()
mailbox = emails.find('{google:mail:notify}mailbox')
total = int(mailbox.get('total-matched', 0))

44
sleekxmpp/plugins/jobs.py Normal file
View File

@ -0,0 +1,44 @@
from . import base
import logging
from xml.etree import cElementTree as ET
class jobs(base.base_plugin):
def plugin_init(self):
self.xep = 'pubsubjob'
self.description = "Job distribution over Pubsub"
def post_init(self):
pass
#TODO add event
def createJobNode(self, host, jid, node, config=None):
pass
def createJob(self, host, node, jobid=None, payload=None):
return self.xmpp.plugin['xep_0060'].setItem(host, node, ((jobid, payload),))
def claimJob(self, host, node, jobid, ifrom=None):
return self._setState(host, node, jobid, ET.Element('{http://andyet.net/protocol/pubsubjob}claimed'))
def unclaimJob(self, jobid):
return self._setState(host, node, jobid, ET.Element('{http://andyet.net/protocol/pubsubjob}unclaimed'))
def finishJob(self, host, node, jobid, payload=None):
finished = ET.Element('{http://andyet.net/protocol/pubsubjob}finished')
if payload is not None:
finished.append(payload)
return self._setState(host, node, jobid, finished)
def _setState(self, host, node, jobid, state, ifrom=None):
iq = self.xmpp.Iq()
iq['to'] = host
if ifrom: iq['from'] = ifrom
iq['type'] = 'set'
iq['psstate']['node'] = node
iq['psstate']['item'] = jobid
iq['psstate']['payload'] = state
result = iq.send()
if result is None or result['type'] != 'result':
return False
return True

View File

@ -10,6 +10,39 @@ def stanzaPlugin(stanza, plugin):
stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin
stanza.plugin_tag_map["{%s}%s" % (plugin.namespace, plugin.name)] = plugin
class PubsubState(ElementBase):
namespace = 'http://jabber.org/protocol/psstate'
name = 'state'
plugin_attrib = 'psstate'
interfaces = set(('node', 'item', 'payload'))
plugin_attrib_map = {}
plugin_tag_map = {}
def setPayload(self, value):
self.xml.append(value)
def getPayload(self):
childs = self.xml.getchildren()
if len(childs) > 0:
return childs[0]
def delPayload(self):
for child in self.xml.getchildren():
self.xml.remove(child)
stanzaPlugin(Iq, PubsubState)
class PubsubStateEvent(ElementBase):
namespace = 'http://jabber.org/protocol/psstate#event'
name = 'event'
plugin_attrib = 'psstate_event'
intefaces = set(tuple())
plugin_attrib_map = {}
plugin_tag_map = {}
stanzaPlugin(Message, PubsubStateEvent)
stanzaPlugin(PubsubStateEvent, PubsubState)
class Pubsub(ElementBase):
namespace = 'http://jabber.org/protocol/pubsub'
name = 'pubsub'
@ -321,18 +354,6 @@ class Options(ElementBase):
stanzaPlugin(Pubsub, Options)
stanzaPlugin(Subscribe, Options)
#iq = Iq()
#iq['pubsub']['defaultconfig']
#print(iq)
#from xml.etree import cElementTree as ET
#iq = Iq()
#item = Item()
#item['payload'] = ET.Element("{http://netflint.net/p/crap}stupidshit")
#item['id'] = 'aa11bbcc'
#iq['pubsub']['items'].append(item)
#print(iq)
class OwnerAffiliations(Affiliations):
namespace = 'http://jabber.org/protocol/pubsub#owner'
interfaces = set(('node'))

View File

@ -188,7 +188,6 @@ class Form(FieldContainer):
#def getXML(self, tostring = False):
def getXML(self, ftype=None):
logging.debug("creating form as %s" % ftype)
if ftype:
self.type = ftype
form = ET.Element('{jabber:x:data}x')

View File

@ -226,35 +226,31 @@ class xep_0009(base.base_plugin):
else:
raise ValueError()
def makeMethodCallQuery(self,pmethod,params):
query = self.xmpp.makeIqQuery(iq,"jabber:iq:rpc")
def makeIqMethodCall(self,pto,pmethod,params):
query = ET.Element("{jabber:iq:rpc}query")
methodCall = ET.Element('methodCall')
methodName = ET.Element('methodName')
methodName.text = pmethod
methodCall.append(methodName)
methodCall.append(params)
query.append(methodCall)
return query
def makeIqMethodCall(self,pto,pmethod,params):
iq = self.xmpp.makeIqSet()
iq = self.xmpp.makeIqSet(query)
iq.set('to',pto)
iq.append(self.makeMethodCallQuery(pmethod,params))
return iq
def makeIqMethodResponse(self,pto,pid,params):
iq = self.xmpp.makeIqResult(pid)
iq.set('to',pto)
query = self.xmpp.makeIqQuery(iq,"jabber:iq:rpc")
query = ET.Element("{jabber:iq:rpc}query")
methodResponse = ET.Element('methodResponse')
methodResponse.append(params)
query.append(methodResponse)
iq = self.xmpp.makeIqResult(pid)
iq.set('to',pto)
iq.append(query)
return iq
def makeIqMethodError(self,pto,id,pmethod,params,condition):
iq = self.xmpp.makeIqError(id)
iq.set('to',pto)
iq.append(self.makeMethodCallQuery(pmethod,params))
def makeIqMethodError(self,pto,pid,pmethod,params,condition):
iq = self.self.makeMethodCallQuery(pto,pmethod,params)
iq.setValues({'id':pid,'type':'error'})
iq.append(self.xmpp['xep_0086'].makeError(condition))
return iq

View File

@ -1,25 +1,184 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2007 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
SleekXMPP is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SleekXMPP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SleekXMPP; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
See the file license.txt for copying permissio
"""
from . import base
import logging
from xml.etree import cElementTree as ET
from . import base
from .. xmlstream.handler.callback import Callback
from .. xmlstream.matcher.xpath import MatchXPath
from .. xmlstream.stanzabase import ElementBase, ET, JID
from .. stanza.iq import Iq
class DiscoInfo(ElementBase):
namespace = 'http://jabber.org/protocol/disco#info'
name = 'query'
plugin_attrib = 'disco_info'
interfaces = set(('node', 'features', 'identities'))
def getFeatures(self):
features = []
featuresXML = self.xml.findall('{%s}feature' % self.namespace)
for feature in featuresXML:
features.append(feature.attrib['var'])
return features
def setFeatures(self, features):
self.delFeatures()
for name in features:
self.addFeature(name)
def delFeatures(self):
featuresXML = self.xml.findall('{%s}feature' % self.namespace)
for feature in featuresXML:
self.xml.remove(feature)
def addFeature(self, feature):
featureXML = ET.Element('{%s}feature' % self.namespace,
{'var': feature})
self.xml.append(featureXML)
def delFeature(self, feature):
featuresXML = self.xml.findall('{%s}feature' % self.namespace)
for featureXML in featuresXML:
if featureXML.attrib['var'] == feature:
self.xml.remove(featureXML)
def getIdentities(self):
ids = []
idsXML = self.xml.findall('{%s}identity' % self.namespace)
for idXML in idsXML:
idData = (idXML.attrib['category'],
idXML.attrib['type'],
idXML.attrib.get('name', ''))
ids.append(idData)
return ids
def setIdentities(self, ids):
self.delIdentities()
for idData in ids:
self.addIdentity(*idData)
def delIdentities(self):
idsXML = self.xml.findall('{%s}identity' % self.namespace)
for idXML in idsXML:
self.xml.remove(idXML)
def addIdentity(self, category, id_type, name=''):
idXML = ET.Element('{%s}identity' % self.namespace,
{'category': category,
'type': id_type,
'name': name})
self.xml.append(idXML)
def delIdentity(self, category, id_type, name=''):
idsXML = self.xml.findall('{%s}identity' % self.namespace)
for idXML in idsXML:
idData = (idXML.attrib['category'],
idXML.attrib['type'])
delId = (category, id_type)
if idData == delId:
self.xml.remove(idXML)
class DiscoItems(ElementBase):
namespace = 'http://jabber.org/protocol/disco#items'
name = 'query'
plugin_attrib = 'disco_items'
interfaces = set(('node', 'items'))
def getItems(self):
items = []
itemsXML = self.xml.findall('{%s}item' % self.namespace)
for item in itemsXML:
itemData = (item.attrib['jid'],
item.attrib.get('node'),
item.attrib.get('name'))
items.append(itemData)
return items
def setItems(self, items):
self.delItems()
for item in items:
self.addItem(*item)
def delItems(self):
itemsXML = self.xml.findall('{%s}item' % self.namespace)
for item in itemsXML:
self.xml.remove(item)
def addItem(self, jid, node='', name=''):
itemXML = ET.Element('{%s}item' % self.namespace, {'jid': jid})
if name:
itemXML.attrib['name'] = name
if node:
itemXML.attrib['node'] = node
self.xml.append(itemXML)
def delItem(self, jid, node=''):
itemsXML = self.xml.findall('{%s}item' % self.namespace)
for itemXML in itemsXML:
itemData = (itemXML.attrib['jid'],
itemXML.attrib.get('node', ''))
itemDel = (jid, node)
if itemData == itemDel:
self.xml.remove(itemXML)
class DiscoNode(object):
"""
Collection object for grouping info and item information
into nodes.
"""
def __init__(self, name):
self.name = name
self.info = DiscoInfo()
self.items = DiscoItems()
# This is a bit like poor man's inheritance, but
# to simplify adding information to the node we
# map node functions to either the info or items
# stanza objects.
#
# We don't want to make DiscoNode inherit from
# DiscoInfo and DiscoItems because DiscoNode is
# not an actual stanza, and doing so would create
# confusion and potential bugs.
self._map(self.items, 'items', ['get', 'set', 'del'])
self._map(self.items, 'item', ['add', 'del'])
self._map(self.info, 'identities', ['get', 'set', 'del'])
self._map(self.info, 'identity', ['add', 'del'])
self._map(self.info, 'features', ['get', 'set', 'del'])
self._map(self.info, 'feature', ['add', 'del'])
def isEmpty(self):
"""
Test if the node contains any information. Useful for
determining if a node can be deleted.
"""
ids = self.getIdentities()
features = self.getFeatures()
items = self.getItems()
if not ids and not features and not items:
return True
return False
def _map(self, obj, interface, access):
"""
Map functions of the form obj.accessInterface
to self.accessInterface for each given access type.
"""
interface = interface.title()
for access_type in access:
method = access_type + interface
if hasattr(obj, method):
setattr(self, method, getattr(obj, method))
class xep_0030(base.base_plugin):
"""
@ -29,85 +188,137 @@ class xep_0030(base.base_plugin):
def plugin_init(self):
self.xep = '0030'
self.description = 'Service Discovery'
self.features = {'main': ['http://jabber.org/protocol/disco#info', 'http://jabber.org/protocol/disco#items']}
self.identities = {'main': [{'category': 'client', 'type': 'pc', 'name': 'SleekXMPP'}]}
self.items = {'main': []}
self.xmpp.add_handler("<iq type='get' xmlns='%s'><query xmlns='http://jabber.org/protocol/disco#info' /></iq>" % self.xmpp.default_ns, self.info_handler)
self.xmpp.add_handler("<iq type='get' xmlns='%s'><query xmlns='http://jabber.org/protocol/disco#items' /></iq>" % self.xmpp.default_ns, self.item_handler)
self.xmpp.registerHandler(
Callback('Disco Items',
MatchXPath('{%s}iq/{%s}query' % (self.xmpp.default_ns,
DiscoItems.namespace)),
self.handle_item_query))
self.xmpp.registerHandler(
Callback('Disco Info',
MatchXPath('{%s}iq/{%s}query' % (self.xmpp.default_ns,
DiscoInfo.namespace)),
self.handle_info_query))
self.xmpp.stanzaPlugin(Iq, DiscoInfo)
self.xmpp.stanzaPlugin(Iq, DiscoItems)
self.xmpp.add_event_handler('disco_items_request', self.handle_disco_items)
self.xmpp.add_event_handler('disco_info_request', self.handle_disco_info)
self.nodes = {'main': DiscoNode('main')}
def add_node(self, node):
if node not in self.nodes:
self.nodes[node] = DiscoNode(node)
def del_node(self, node):
if node in self.nodes:
del self.nodes[node]
def handle_item_query(self, iq):
if iq['type'] == 'get':
logging.debug("Items requested by %s" % iq['from'])
self.xmpp.event('disco_items_request', iq)
elif iq['type'] == 'result':
logging.debug("Items result from %s" % iq['from'])
self.xmpp.event('disco_items', iq)
def handle_info_query(self, iq):
if iq['type'] == 'get':
logging.debug("Info requested by %s" % iq['from'])
self.xmpp.event('disco_info_request', iq)
elif iq['type'] == 'result':
logging.debug("Info result from %s" % iq['from'])
self.xmpp.event('disco_info', iq)
def handle_disco_info(self, iq, forwarded=False):
"""
A default handler for disco#info requests. If another
handler is registered, this one will defer and not run.
"""
handlers = self.xmpp.event_handlers['disco_info_request']
if not forwarded and len(handlers) > 1:
return
node_name = iq['disco_info']['node']
if not node_name:
node_name = 'main'
logging.debug("Using default handler for disco#info on node '%s'." % node_name)
if node_name in self.nodes:
node = self.nodes[node_name]
iq.reply().setPayload(node.info.xml).send()
else:
logging.debug("Node %s requested, but does not exist." % node_name)
iq.reply().error().setPayload(iq['disco_info'].xml)
iq['error']['code'] = '404'
iq['error']['type'] = 'cancel'
iq['error']['condition'] = 'item-not-found'
iq.send()
def handle_disco_items(self, iq, forwarded=False):
"""
A default handler for disco#items requests. If another
handler is registered, this one will defer and not run.
If this handler is called by your own custom handler with
forwarded set to True, then it will run as normal.
"""
handlers = self.xmpp.event_handlers['disco_items_request']
if not forwarded and len(handlers) > 1:
return
node_name = iq['disco_items']['node']
if not node_name:
node_name = 'main'
logging.debug("Using default handler for disco#items on node '%s'." % node_name)
if node_name in self.nodes:
node = self.nodes[node_name]
iq.reply().setPayload(node.items.xml).send()
else:
logging.debug("Node %s requested, but does not exist." % node_name)
iq.reply().error().setPayload(iq['disco_items'].xml)
iq['error']['code'] = '404'
iq['error']['type'] = 'cancel'
iq['error']['condition'] = 'item-not-found'
iq.send()
# Older interface methods for backwards compatibility
def getInfo(self, jid, node=''):
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['to'] = jid
iq['from'] = self.xmpp.fulljid
iq['disco_info']['node'] = node
iq.send()
def getItems(self, jid, node=''):
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq['to'] = jid
iq['from'] = self.xmpp.fulljid
iq['disco_items']['node'] = node
iq.send()
def add_feature(self, feature, node='main'):
if not node in self.features:
self.features[node] = []
self.features[node].append(feature)
self.add_node(node)
self.nodes[node].addFeature(feature)
def add_identity(self, category=None, itype=None, name=None, node='main'):
if not node in self.identities:
self.identities[node] = []
self.identities[node].append({'category': category, 'type': itype, 'name': name})
def add_identity(self, category='', itype='', name='', node='main'):
self.add_node(node)
self.nodes[node].addIdentity(category=category,
id_type=itype,
name=name)
def add_item(self, jid=None, name=None, node='main', subnode=''):
if not node in self.items:
self.items[node] = []
self.items[node].append({'jid': jid, 'name': name, 'node': subnode})
def info_handler(self, xml):
logging.debug("Info request from %s" % xml.get('from', ''))
iq = self.xmpp.makeIqResult(xml.get('id', self.xmpp.getNewId()))
iq.attrib['from'] = xml.get('to')
iq.attrib['to'] = xml.get('from', self.xmpp.server)
query = xml.find('{http://jabber.org/protocol/disco#info}query')
node = query.get('node', 'main')
for identity in self.identities.get(node, []):
idxml = ET.Element('identity')
for attrib in identity:
if identity[attrib]:
idxml.attrib[attrib] = identity[attrib]
query.append(idxml)
for feature in self.features.get(node, []):
featxml = ET.Element('feature')
featxml.attrib['var'] = feature
query.append(featxml)
iq.append(query)
#print ET.tostring(iq)
self.xmpp.send(iq)
def item_handler(self, xml):
logging.debug("Item request from %s" % xml.get('from', ''))
iq = self.xmpp.makeIqResult(xml.get('id', self.xmpp.getNewId()))
iq.attrib['from'] = xml.get('to')
iq.attrib['to'] = xml.get('from', self.xmpp.server)
query = self.xmpp.makeIqQuery(iq, 'http://jabber.org/protocol/disco#items').find('{http://jabber.org/protocol/disco#items}query')
node = xml.find('{http://jabber.org/protocol/disco#items}query').get('node', 'main')
for item in self.items.get(node, []):
itemxml = ET.Element('item')
itemxml.attrib = item
if itemxml.attrib['jid'] is None:
itemxml.attrib['jid'] = xml.get('to')
query.append(itemxml)
self.xmpp.send(iq)
def getItems(self, jid, node=None):
iq = self.xmpp.makeIqGet()
iq.attrib['from'] = self.xmpp.fulljid
iq.attrib['to'] = jid
self.xmpp.makeIqQuery(iq, 'http://jabber.org/protocol/disco#items')
if node:
iq.find('{http://jabber.org/protocol/disco#items}query').attrib['node'] = node
return iq.send()
def getInfo(self, jid, node=None):
iq = self.xmpp.makeIqGet()
iq.attrib['from'] = self.xmpp.fulljid
iq.attrib['to'] = jid
self.xmpp.makeIqQuery(iq, 'http://jabber.org/protocol/disco#info')
if node:
iq.find('{http://jabber.org/protocol/disco#info}query').attrib['node'] = node
return iq.send()
def parseInfo(self, xml):
result = {'identity': {}, 'feature': []}
for identity in xml.findall('{http://jabber.org/protocol/disco#info}query/{{http://jabber.org/protocol/disco#info}identity'):
result['identity'][identity['name']] = identity.attrib
for feature in xml.findall('{http://jabber.org/protocol/disco#info}query/{{http://jabber.org/protocol/disco#info}feature'):
result['feature'].append(feature.get('var', '__unknown__'))
return result
def add_item(self, jid=None, name='', node='main', subnode=''):
self.add_node(node)
self.add_node(subnode)
if jid is None:
jid = self.xmpp.fulljid
self.nodes[node].addItem(jid=jid, name=name, node=subnode)

View File

@ -0,0 +1,52 @@
'''
Created on Jul 1, 2010
@author: bbeggs
'''
from . import base
import logging
import threading
from xml.etree import cElementTree as ET
class xep_0047(base.base_plugin):
'''
In-band file transfer for xmpp.
Both message and iq transfer is supported with message being attempted first.
'''
def plugin_init(self):
self.xep = 'xep-047'
self.description = 'in-band file transfer'
self.acceptTransfers = self.config.get('acceptTransfers', True)
self.saveDirectory = self.config.get('saveDirectory', '/tmp')
self.stanzaType = self.config.get('stanzaType', 'message')
self.maxSendThreads = self.config.get('maxSendThreads', 1)
self.maxReceiveThreads = self.config.get('maxReceiveThreads', 1)
#thread setup
self.receiveThreads = {} #id:thread
self.sendThreads = {}
#add handlers to listen for incoming requests
self.xmpp.add_handler("<iq><open xmlns='http://jabber.org/protocol/ibb' /></iq>", self._handleIncomingTransferRequest)
def post_init(self):
self.post_inited = True
def sendFile(self, filePath, threaded=True):
#TODO use this method to send a file
pass
def _handleIncomingTransferRequest(self, xml):
pass
class receiverThread(threading.Thread):
def run(self):
pass
class senderThread(threading.Thread):
def run(self):
pass

View File

@ -14,12 +14,14 @@ class xep_0060(base.base_plugin):
self.xep = '0060'
self.description = 'Publish-Subscribe'
def create_node(self, jid, node, config=None, collection=False):
def create_node(self, jid, node, config=None, collection=False, ntype=None):
pubsub = ET.Element('{http://jabber.org/protocol/pubsub}pubsub')
create = ET.Element('create')
create.set('node', node)
pubsub.append(create)
configure = ET.Element('configure')
if collection:
ntype = 'collection'
#if config is None:
# submitform = self.xmpp.plugin['xep_0004'].makeForm('submit')
#else:
@ -29,11 +31,11 @@ class xep_0060(base.base_plugin):
submitform.field['FORM_TYPE'].setValue('http://jabber.org/protocol/pubsub#node_config')
else:
submitform.addField('FORM_TYPE', 'hidden', value='http://jabber.org/protocol/pubsub#node_config')
if collection:
if ntype:
if 'pubsub#node_type' in submitform.field:
submitform.field['pubsub#node_type'].setValue('collection')
submitform.field['pubsub#node_type'].setValue(ntype)
else:
submitform.addField('pubsub#node_type', value='collection')
submitform.addField('pubsub#node_type', value=ntype)
else:
if 'pubsub#node_type' in submitform.field:
submitform.field['pubsub#node_type'].setValue('leaf')

View File

@ -45,7 +45,7 @@ class xep_0078(base.base_plugin):
logging.debug("Starting jabber:iq:auth Authentication")
auth_request = self.xmpp.makeIqGet()
auth_request_query = ET.Element('{jabber:iq:auth}query')
auth_request.attrib['to'] = self.xmpp.server
auth_request.attrib['to'] = self.xmpp.domain
username = ET.Element('username')
username.text = self.xmpp.username
auth_request_query.append(username)

View File

@ -38,7 +38,7 @@ class xep_0092(base.base_plugin):
def report_version(self, xml):
iq = self.xmpp.makeIqResult(xml.get('id', 'unknown'))
iq.attrib['to'] = xml.get('from', self.xmpp.server)
iq.attrib['to'] = xml.get('from', self.xmpp.domain)
query = ET.Element('{jabber:iq:version}query')
name = ET.Element('name')
name.text = self.name

View File

@ -41,14 +41,14 @@ class xep_0199(base.base_plugin):
def handler_pingserver(self, xml):
if not self.running:
time.sleep(self.config.get('frequency', 300))
while self.sendPing(self.xmpp.server, self.config.get('timeout', 30)) is not False:
while self.sendPing(self.xmpp.domain, self.config.get('timeout', 30)) is not False:
time.sleep(self.config.get('frequency', 300))
logging.debug("Did not recieve ping back in time. Requesting Reconnect.")
self.xmpp.disconnect(reconnect=True)
def handler_ping(self, xml):
iq = self.xmpp.makeIqResult(xml.get('id', 'unknown'))
iq.attrib['to'] = xml.get('from', self.xmpp.server)
iq.attrib['to'] = xml.get('from', self.xmpp.domain)
self.xmpp.send(iq)
def sendPing(self, jid, timeout = 30):
@ -56,17 +56,13 @@ class xep_0199(base.base_plugin):
Sends a ping to the specified jid, returning the time (in seconds)
to receive a reply, or None if no reply is received in timeout seconds.
"""
id = self.xmpp.getNewId()
iq = self.xmpp.makeIq(id)
iq.attrib['type'] = 'get'
iq = self.xmpp.makeIqGet()
iq.attrib['to'] = jid
ping = ET.Element('{http://www.xmpp.org/extensions/xep-0199.html#ns}ping')
iq.append(ping)
startTime = time.clock()
#pingresult = self.xmpp.send(iq, self.xmpp.makeIq(id), timeout)
pingresult = iq.send()
endTime = time.clock()
if pingresult == False:
#self.xmpp.disconnect(reconnect=True)
return False
return endTime - startTime

View File

@ -0,0 +1,89 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2007 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SleekXMPP is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SleekXMPP; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
from . import base
from xml.etree import cElementTree as ET
from datetime import datetime
XMLNS = 'urn:xmpp:time'
_XMLNS = '{%s}' % XMLNS
class xep_0202(base.base_plugin):
"""
Implements XEP-0202 Entity Time
TODO currently no support for the user's 'local' timezone; `<tzo>` is always reported as `Z` (UTC).
"""
def plugin_init(self):
self.xep = '0202'
self.description = "Entity Time"
self.xmpp.add_handler("<iq type='get'><time xmlns='%s' /></iq>" % XMLNS, self._handle_get)
def post_init(self):
base.base_plugin.post_init(self)
disco = self.xmpp.plugin.get('xep_0030',None)
if disco: disco.add_feature(XMLNS)
def send_request(self,to):
iq = self.xmpp.Iq( stream=self.xmpp, sto=to, stype='get',
xml = ET.Element(_XMLNS + 'time') )
resp = iq.send(iq) # wait for response
return TimeElement(
resp.find(_XMLNS + 'time/utc').text,
xml.find(_XMLNS + 'time/tzo').text )
def _handle_get(self,xml):
iq = self.xmpp.Iq( sid=xml.get('id'), sto=xml.get('from'), stype='result' )
iq.append( TimeElement().to_xml() )
self.xmpp.send(iq)
class TimeElement:
"""
Time response data
"""
def __init__(self, utc=None, tzo="Z"):
if utc is None:
self.utc = datetime.utcnow()
elif type(utc) is str: # parse ISO string
dt_format = '%Y-%m-%dT%H:%M:%S'
if utc.find('.') > -1: dt_format += '.%f' # milliseconds in format
self.utc = datetime.strptime( time_str, dt_format + 'Z' )
elif type(utc) is float: # parse posix timestamp
self.utc = datetime.utcfromtimestamp()
else: self.utc = utc
self.tzo = tzo
def to_xml(self):
time = ET.Element(_XMLNS+'time')
child = ET.Element('tzo')
child.text = str(self.tzo)
time.append( child )
child = ET.Element('utc')
child.text = datetime.isoformat(self.utc) + "Z"
time.append( child )
return time
def __str__(self):
return ET.tostring( self.to_xml() )

View File

@ -1,9 +1,9 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import ElementBase, ET
@ -11,7 +11,7 @@ class Error(ElementBase):
namespace = 'jabber:client'
name = 'error'
plugin_attrib = 'error'
conditions = set(('bad-request', 'conflict', 'feature-not-implemented', 'forbidden', 'gone', 'item-not-found', 'jid-malformed', 'not-acceptable', 'not-allowed', 'not-authorized', 'payment-required', 'recipient-unavailable', 'redirect', 'registration-required', 'remote-server-not-found', 'remote-server-timeout', 'service-unavailable', 'subscription-required', 'undefined-condition', 'unexpected-request'))
conditions = set(('bad-request', 'conflict', 'feature-not-implemented', 'forbidden', 'gone', 'internal-server-error', 'item-not-found', 'jid-malformed', 'not-acceptable', 'not-allowed', 'not-authorized', 'payment-required', 'recipient-unavailable', 'redirect', 'registration-required', 'remote-server-not-found', 'remote-server-timeout', 'resource-constraint', 'service-unavailable', 'subscription-required', 'undefined-condition', 'unexpected-request'))
interfaces = set(('code', 'condition', 'text', 'type'))
types = set(('cancel', 'continue', 'modify', 'auth', 'wait'))
sub_interfaces = set(('text',))

View File

@ -1,13 +1,12 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import StanzaBase
from xml.etree import cElementTree as ET
from . error import Error
from .. xmlstream.handler.waiter import Waiter
from .. xmlstream.matcher.id import MatcherId
from . rootstanza import RootStanza
@ -67,11 +66,11 @@ class Iq(RootStanza):
self.xml.remove(child)
return self
def send(self, block=True, timeout=10):
def send(self, block=True, timeout=10, priority=5, init=False):
if block and self['type'] in ('get', 'set'):
waitfor = Waiter('IqWait_%s' % self['id'], MatcherId(self['id']))
self.stream.registerHandler(waitfor)
StanzaBase.send(self)
StanzaBase.send(self, priority, init)
return waitfor.wait(timeout)
else:
return StanzaBase.send(self)
return StanzaBase.send(self, priority, init)

View File

@ -6,8 +6,6 @@
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import StanzaBase
from xml.etree import cElementTree as ET
from . error import Error
from . rootstanza import RootStanza
class Message(RootStanza):

View File

@ -5,7 +5,7 @@
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import ElementBase, ET
from .. xmlstream.stanzabase import ElementBase
class Nick(ElementBase):
namespace = 'http://jabber.org/nick/nick'

View File

@ -5,8 +5,7 @@
See the file license.txt for copying permission.
"""
from .. xmlstream.stanzabase import ElementBase, ET, JID
import logging
from .. xmlstream.stanzabase import ElementBase, ET
class Roster(ElementBase):
namespace = 'jabber:iq:roster'

View File

@ -43,7 +43,7 @@ class testps(sleekxmpp.ClientXMPP):
self.node = "pstestnode_%s"
self.pshost = pshost
if pshost is None:
self.pshost = self.server
self.pshost = self.domain
self.nodenum = int(nodenum)
self.leafnode = self.nodenum + 1
self.collectnode = self.nodenum + 2

View File

@ -18,7 +18,7 @@ class BaseHandler(object):
def match(self, xml):
return self._matcher.match(xml)
def prerun(self, payload):
def prerun(self, payload): # what's the point of this if the payload is called again in run??
self._payload = payload
def run(self, payload):

View File

@ -17,13 +17,15 @@ class Callback(base.BaseHandler):
self._once = once
self._instream = instream
def prerun(self, payload):
def prerun(self, payload): # prerun actually calls run?!? WTF! Then it gets run AGAIN!
base.BaseHandler.prerun(self, payload)
if self._instream:
# logging.debug('callback "%s" prerun', self.name)
self.run(payload, True)
def run(self, payload, instream=False):
if not self._instream or instream:
# logging.debug('callback "%s" run', self.name)
base.BaseHandler.run(self, payload)
#if self._thread:
# x = threading.Thread(name="Callback_%s" % self.name, target=self._pointer, args=(payload,))

View File

@ -8,6 +8,7 @@
from . import base
from xml.etree import cElementTree
from xml.parsers.expat import ExpatError
import logging
ignore_ns = False
@ -38,7 +39,7 @@ class MatchXMLMask(base.MatcherBase):
try:
maskobj = cElementTree.fromstring(maskobj)
except ExpatError:
logging.log(logging.WARNING, "Expat error: %s\nIn parsing: %s" % ('', maskobj))
logging.exception( "Expat error parsing: %s", maskobj)
if not use_ns and source.tag.split('}', 1)[-1] != maskobj.tag.split('}', 1)[-1]: # strip off ns and compare
return False
if use_ns and (source.tag != maskobj.tag and "{%s}%s" % (self.default_ns, maskobj.tag) != source.tag ):

View File

@ -0,0 +1,88 @@
try:
import queue
except ImportError:
import Queue as queue
import time
import threading
import logging
class Task(object):
"""Task object for the Scheduler class"""
def __init__(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.name = name
self.seconds = seconds
self.callback = callback
self.args = args or tuple()
self.kwargs = kwargs or {}
self.repeat = repeat
self.next = time.time() + self.seconds
self.qpointer = qpointer
def run(self):
if self.qpointer is not None:
self.qpointer.put(('schedule', self.callback, self.args))
else:
self.callback(*self.args, **self.kwargs)
self.reset()
return self.repeat
def reset(self):
self.next = time.time() + self.seconds
class Scheduler(object):
"""Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched"""
def __init__(self, parentqueue=None):
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = False
self.parentqueue = parentqueue
def process(self, threaded=True):
if threaded:
self.thread = threading.Thread(name='shedulerprocess', target=self._process)
self.thread.daemon = True
self.thread.start()
else:
self._process()
def _process(self):
self.run = True
while self.run:
try:
wait = 1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
except KeyboardInterrupt:
self.run = False
logging.debug("Quitting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))
def quit(self):
self.run = False

View File

@ -1,9 +1,9 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
See the file license.txt for copying permission.
"""
from xml.etree import cElementTree as ET
import logging
@ -79,6 +79,9 @@ class ElementBase(tostring.ToString):
self.idx = 0
return self
def __bool__(self):
return True
def __next__(self):
self.idx += 1
if self.idx > len(self.iterables):
@ -319,6 +322,8 @@ class StanzaBase(ElementBase):
def __init__(self, stream=None, xml=None, stype=None, sto=None, sfrom=None, sid=None):
self.stream = stream
if stream is not None:
self.namespace = stream.default_ns
ElementBase.__init__(self, xml)
if stype is not None:
self['type'] = stype
@ -326,8 +331,7 @@ class StanzaBase(ElementBase):
self['to'] = sto
if sfrom is not None:
self['from'] = sfrom
if stream is not None:
self.namespace = stream.default_ns
if sid is not None: self['id'] = sid
self.tag = "{%s}%s" % (self.namespace, self.name)
def setType(self, value):
@ -380,6 +384,7 @@ class StanzaBase(ElementBase):
def exception(self, e):
logging.error(traceback.format_tb(e))
def send(self):
self.stream.sendRaw(self.__str__())
def send(self, priority=5, init=False):
self.stream.sendRaw(self.__str__(), priority, init)

View File

@ -5,55 +5,263 @@
See the file license.txt for copying permission.
"""
from __future__ import with_statement
import threading
import time
import logging
log = logging.getLogger(__name__)
class StateMachine(object):
def __init__(self, states=[], groups=[]):
def __init__(self, states=[]):
self.lock = threading.Lock()
self.__state = {}
self.__default_state = {}
self.__group = {}
self.notifier = threading.Event()
self.__states= []
self.addStates(states)
self.addGroups(groups)
self.__default_state = self.__states[0]
self.__current_state = self.__default_state
def addStates(self, states):
with self.lock:
self.lock.acquire()
try:
for state in states:
if state in self.__state or state in self.__group:
raise IndexError("The state or group '%s' is already in the StateMachine." % state)
self.__state[state] = states[state]
self.__default_state[state] = states[state]
if state in self.__states:
raise IndexError("The state '%s' is already in the StateMachine." % state)
self.__states.append( state )
finally: self.lock.release()
def addGroups(self, groups):
with self.lock:
for gstate in groups:
if gstate in self.__state or gstate in self.__group:
raise IndexError("The key or group '%s' is already in the StateMachine." % gstate)
for state in groups[gstate]:
if state in self.__state:
raise IndexError("The group %s contains a key %s which is not set in the StateMachine." % (gstate, state))
self.__group[gstate] = groups[gstate]
def set(self, state, status):
with self.lock:
if state in self.__state:
self.__state[state] = bool(status)
def transition(self, from_state, to_state, wait=0.0, func=None, args=[], kwargs={} ):
'''
Transition from the given `from_state` to the given `to_state`.
This method will return `True` if the state machine is now in `to_state`. It
will return `False` if a timeout occurred the transition did not occur.
If `wait` is 0 (the default,) this method returns immediately if the state machine
is not in `from_state`.
If you want the thread to block and transition once the state machine to enters
`from_state`, set `wait` to a non-negative value. Note there is no 'block
indefinitely' flag since this leads to deadlock. If you want to wait indefinitely,
choose a reasonable value for `wait` (e.g. 20 seconds) and do so in a while loop like so:
::
while not thread_should_exit and not state_machine.transition('disconnected', 'connecting', wait=20 ):
pass # timeout will occur every 20s unless transition occurs
if thread_should_exit: return
# perform actions here after successful transition
This allows the thread to be responsive by setting `thread_should_exit=True`.
The optional `func` argument allows the user to pass a callable operation which occurs
within the context of the state transition (e.g. while the state machine is locked.)
If `func` returns a True value, the transition will occur. If `func` returns a non-
True value or if an exception is thrown, the transition will not occur. Any thrown
exception is not caught by the state machine and is the caller's responsibility to handle.
If `func` completes normally, this method will return the value returned by `func.` If
values for `args` and `kwargs` are provided, they are expanded and passed like so:
`func( *args, **kwargs )`.
'''
return self.transition_any( (from_state,), to_state, wait=wait,
func=func, args=args, kwargs=kwargs )
def transition_any(self, from_states, to_state, wait=0.0, func=None, args=[], kwargs={} ):
'''
Transition from any of the given `from_states` to the given `to_state`.
'''
if not (isinstance(from_states,tuple) or isinstance(from_states,list)):
raise ValueError( "from_states should be a list or tuple" )
for state in from_states:
if not state in self.__states:
raise ValueError( "StateMachine does not contain from_state %s." % state )
if not to_state in self.__states:
raise ValueError( "StateMachine does not contain to_state %s." % to_state )
start = time.time()
while not self.__current_state in from_states or not self.lock.acquire(False):
# detect timeout:
remainder = start + wait - time.time()
if remainder > 0: self.notifier.wait(remainder)
else: return False
try: # lock is acquired; all other threads will return false or wait until notify/timeout
if self.__current_state in from_states: # should always be True due to lock
# Note that func might throw an exception, but that's OK, it aborts the transition
return_val = func(*args,**kwargs) if func is not None else True
# some 'false' value returned from func,
# indicating that transition should not occur:
if not return_val: return return_val
log.debug(' ==== TRANSITION %s -> %s', self.__current_state, to_state)
self._set_state( to_state )
return return_val # some 'true' value returned by func or True if func was None
else:
raise KeyError("StateMachine does not contain state %s." % state)
log.error( "StateMachine bug!! The lock should ensure this doesn't happen!" )
return False
finally:
self.notifier.set() # notify any waiting threads that the state has changed.
self.notifier.clear()
self.lock.release()
def __getitem__(self, key):
if key in self.__group:
for state in self.__group[key]:
if not self.__state[state]:
return False
return True
return self.__state[key]
def __getattr__(self, attr):
return self.__getitem__(attr)
def transition_ctx(self, from_state, to_state, wait=0.0):
'''
Use the state machine as a context manager. The transition occurs on /exit/ from
the `with` context, so long as no exception is thrown. For example:
::
with state_machine.transition_ctx('one','two', wait=5) as locked:
if locked:
# the state machine is currently locked in state 'one', and will
# transition to 'two' when the 'with' statement ends, so long as
# no exception is thrown.
print 'Currently locked in state one: %s' % state_machine['one']
else:
# The 'wait' timed out, and no lock has been acquired
print 'Timed out before entering state "one"'
print 'Since no exception was thrown, we are now in state "two": %s' % state_machine['two']
The other main difference between this method and `transition()` is that the
state machine is locked for the duration of the `with` statement. Normally,
after a `transition()` occurs, the state machine is immediately unlocked and
available to another thread to call `transition()` again.
'''
if not from_state in self.__states:
raise ValueError( "StateMachine does not contain from_state %s." % from_state )
if not to_state in self.__states:
raise ValueError( "StateMachine does not contain to_state %s." % to_state )
return _StateCtx(self, from_state, to_state, wait)
def ensure(self, state, wait=0.0, block_on_transition=False ):
'''
Ensure the state machine is currently in `state`, or wait until it enters `state`.
'''
return self.ensure_any( (state,), wait=wait, block_on_transition=block_on_transition )
def ensure_any(self, states, wait=0.0, block_on_transition=False):
'''
Ensure we are currently in one of the given `states` or wait until
we enter one of those states.
Note that due to the nature of the function, you cannot guarantee that
the entirety of some operation completes while you remain in a given
state. That would require acquiring and holding a lock, which
would mean no other threads could do the same. (You'd essentially
be serializing all of the threads that are 'ensuring' their tasks
occurred in some state.
'''
if not (isinstance(states,tuple) or isinstance(states,list)):
raise ValueError('states arg should be a tuple or list')
for state in states:
if not state in self.__states:
raise ValueError( "StateMachine does not contain state '%s'" % state )
# if we're in the middle of a transition, determine whether we should
# 'fall back' to the 'current' state, or wait for the new state, in order to
# avoid an operation occurring in the wrong state.
# TODO another option would be an ensure_ctx that uses a semaphore to allow
# threads to indicate they want to remain in a particular state.
# will return immediately if no transition is in process.
if block_on_transition:
# we're not in the middle of a transition; don't hold the lock
if self.lock.acquire(False): self.lock.release()
# wait for the transition to complete
else: self.notifier.wait()
start = time.time()
while not self.__current_state in states:
# detect timeout:
remainder = start + wait - time.time()
if remainder > 0: self.notifier.wait(remainder)
else: return False
return True
def reset(self):
self.__state = self.__default_state
# TODO need to lock before calling this?
self.transition(self.__current_state, self.__default_state)
def _set_state(self, state): #unsynchronized, only call internally after lock is acquired
self.__current_state = state
return state
def current_state(self):
'''
Return the current state name.
'''
return self.__current_state
def __getitem__(self, state):
'''
Non-blocking, non-synchronized test to determine if we are in the given state.
Use `StateMachine.ensure(state)` to wait until the machine enters a certain state.
'''
return self.__current_state == state
def __str__(self):
return "".join(( "StateMachine(", ','.join(self.__states), "): ", self.__current_state ))
class _StateCtx:
def __init__( self, state_machine, from_state, to_state, wait ):
self.state_machine = state_machine
self.from_state = from_state
self.to_state = to_state
self.wait = wait
self._locked = False
def __enter__(self):
start = time.time()
while not self.state_machine[ self.from_state ] or not self.state_machine.lock.acquire(False):
# detect timeout:
remainder = start + self.wait - time.time()
if remainder > 0: self.state_machine.notifier.wait(remainder)
else:
log.debug('StateMachine timeout while waiting for state: %s', self.from_state )
return False
self._locked = True # lock has been acquired at this point
self.state_machine.notifier.clear()
log.debug('StateMachine entered context in state: %s',
self.state_machine.current_state() )
return True
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_val is not None:
log.exception( "StateMachine exception in context, remaining in state: %s\n%s:%s",
self.state_machine.current_state(), exc_type.__name__, exc_val )
if self._locked:
if exc_val is None:
log.debug(' ==== TRANSITION %s -> %s',
self.state_machine.current_state(), self.to_state)
self.state_machine._set_state( self.to_state )
self.state_machine.notifier.set()
self.state_machine.lock.release()
return False # re-raise any exception

View File

@ -1,9 +1,9 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file license.txt for copying permission.
See the file license.txt for copying permission.
"""
from __future__ import with_statement, unicode_literals
@ -14,14 +14,13 @@ except ImportError:
from . import statemachine
from . stanzabase import StanzaBase
from xml.etree import cElementTree
from xml.parsers import expat
import logging
import random
import socket
import threading
import time
import traceback
import types
import xml.sax.saxutils
from . import scheduler
HANDLER_THREADS = 1
@ -40,20 +39,22 @@ if sys.version_info < (3, 0):
class RestartStream(Exception):
pass
class CloseStream(Exception):
pass
stanza_extensions = {}
RECONNECT_MAX_DELAY = 360
RECONNECT_QUIESCE_FACTOR = 1.6180339887498948 # Phi
RECONNECT_QUIESCE_JITTER = 0.11962656472 # molar Planck constant times c, joule meter/mole
DEFAULT_KEEPALIVE = 300 # send a single byte every 5 minutes
class XMLStream(object):
"A connection manager with XML events."
def __init__(self, socket=None, host='', port=0, escape_quotes=False):
def __init__(self, socket=None, host='', port=5222, escape_quotes=False):
global ssl_support
self.ssl_support = ssl_support
self.escape_quotes = escape_quotes
self.state = statemachine.StateMachine()
self.state.addStates({'connected':False, 'is client':False, 'ssl':False, 'tls':False, 'reconnect':True, 'processing':False, 'disconnecting':False}) #set initial states
self.state = statemachine.StateMachine(('disconnected','connected'))
self.should_reconnect = True
self.setSocket(socket)
self.address = (host, int(port))
@ -65,79 +66,128 @@ class XMLStream(object):
self.__stanza_extension = {}
self.__handlers = []
self.__tls_socket = None
self.filesocket = None
self.use_ssl = False
self.use_tls = False
self.ca_certs=None
self.keep_alive = DEFAULT_KEEPALIVE
self._last_sent_time = time.time()
self.stream_header = "<stream>"
self.stream_footer = "</stream>"
self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue()
self.sendqueue = queue.PriorityQueue()
self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {}
self.run = True
# booleans are not volatile in Python and changes
# do not seem to be detected easily between threads.
self.quit = threading.Event()
def setSocket(self, socket):
"Set the socket"
self.socket = socket
if socket is not None:
self.filesocket = socket.makefile('rb', 0) # ElementTree.iterparse requires a file. 0 buffer files have to be binary
self.state.set('connected', True)
with self.state.transition_ctx('disconnected','connected') as locked:
if not locked: raise Exception('Already connected')
# ElementTree.iterparse requires a file. 0 buffer files have to be binary
self.filesocket = socket.makefile('rb', 0)
def setFileSocket(self, filesocket):
self.filesocket = filesocket
def connect(self, host='', port=0, use_ssl=False, use_tls=True):
"Link to connectTCP"
return self.connectTCP(host, port, use_ssl, use_tls)
def connect(self, host='', port=5222, use_ssl=None):
"Establish a socket connection to the given XMPP server."
def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True):
if not self.state.transition('disconnected','connected',
func=self.connectTCP, args=[host, port, use_ssl] ):
if self.state['connected']: logging.debug('Already connected')
else: logging.warning("Connection failed" )
return False
logging.debug('Connection complete.')
return True
# TODO currently a caller can't distinguish between "connection failed" and
# "we're already trying to connect from another thread"
def connectTCP(self, host='', port=5222, use_ssl=None, reattempt=True):
"Connect and create socket"
while reattempt and not self.state['connected']:
if host and port:
self.address = (host, int(port))
if use_ssl is not None:
self.use_ssl = use_ssl
if use_tls is not None:
self.use_tls = use_tls
self.state.set('is client', True)
if sys.version_info < (3, 0):
self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM)
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(None)
if self.use_ssl and self.ssl_support:
logging.debug("Socket Wrapped for SSL")
self.socket = ssl.wrap_socket(self.socket)
# Note that this is thread-safe by merit of being called solely from connect() which
# holds the state lock.
delay = 1.0 # reconnection delay
while not self.quit.is_set():
logging.debug('connecting....')
try:
if host and port:
self.address = (host, int(port))
if use_ssl is not None:
self.use_ssl = use_ssl
if sys.version_info < (3, 0):
self.socket = filesocket.Socket26(socket.AF_INET, socket.SOCK_STREAM)
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(None)
if self.use_ssl and self.ssl_support:
logging.debug("Socket Wrapped for SSL")
cert_policy = ssl.CERT_NONE if self.ca_certs is None else ssl.CERT_REQUIRED
self.socket = ssl.wrap_socket(self.socket,
ca_certs=self.ca_certs, cert_reqs=cert_policy)
self.socket.connect(self.address)
#self.filesocket = self.socket.makefile('rb', 0)
self.filesocket = self.socket.makefile('rb', 0)
self.state.set('connected', True)
return True
except socket.error as serr:
logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror))
time.sleep(1)
logging.exception("Socket Error #%s: %s", serr.errno, serr.strerror)
if not reattempt: return False
except:
logging.exception("Connection error")
if not reattempt: return False
# quiesce if rconnection fails:
# This algorithm based loosely on Twisted internet.protocol
# http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310
delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY)
delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER)
logging.debug('Waiting %.3fs until next reconnect attempt...', delay)
time.sleep(delay)
def connectUnix(self, filepath):
"Connect to Unix file and create socket"
def startTLS(self):
"Handshakes for TLS"
# TODO since this is not part of the 'connectTCP' method, it does not quiesce if
# The TLS negotiation throws an SSLError. It really should. Worse yet, some
# errors might be considered fatal (like certificate verification failure) in which
# case, should we even attempt to re-connect at all?
if self.ssl_support:
logging.info("Negotiating TLS")
self.realsocket = self.socket
self.socket = ssl.wrap_socket(self.socket, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=False)
# self.realsocket = self.socket # NOT USED
cert_policy = ssl.CERT_NONE if self.ca_certs is None else ssl.CERT_REQUIRED
self.socket = ssl.wrap_socket(self.socket,
ssl_version=ssl.PROTOCOL_TLSv1,
do_handshake_on_connect=False,
cert_reqs=cert_policy,
ca_certs=self.ca_certs)
self.socket.do_handshake()
if sys.version_info < (3,0):
from . filesocket import filesocket
self.filesocket = filesocket(self.socket)
else:
self.filesocket = self.socket.makefile('rb', 0)
logging.debug("TLS negotitation successful")
return True
else:
logging.warning("Tried to enable TLS, but ssl module not found.")
@ -145,67 +195,57 @@ class XMLStream(object):
raise RestartStream()
def process(self, threaded=True):
self.quit.clear()
self.scheduler.process(threaded=True)
for t in range(0, HANDLER_THREADS):
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
self.__thread['eventhandle%s' % t].start()
self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
self.__thread['sendthread'].start()
th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
th.setDaemon(True)
self.__thread['eventhandle%s' % t] = th
th.start()
th = threading.Thread(name='sendthread', target=self._sendThread)
th.setDaemon(True)
self.__thread['sendthread'] = th
th.start()
if threaded:
self.__thread['process'] = threading.Thread(name='process', target=self._process)
self.__thread['process'].start()
th = threading.Thread(name='process', target=self._process)
th.setDaemon(True)
self.__thread['process'] = th
th.start()
else:
self._process()
def schedule(self, seconds, handler, args=None):
threading.Timer(seconds, handler, args).start()
def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False):
self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue)
def _process(self):
"Start processing the socket."
firstrun = True
while self.run and (firstrun or self.state['reconnect']):
self.state.set('processing', True)
firstrun = False
logging.debug('Process thread starting...')
while not self.quit.is_set():
if not self.state.ensure('connected',wait=2, block_on_transition=True): continue
try:
if self.state['is client']:
self.sendRaw(self.stream_header)
while self.run and self.__readXML():
if self.state['is client']:
self.sendRaw(self.stream_header)
except KeyboardInterrupt:
logging.debug("Keyboard Escape Detected")
self.state.set('processing', False)
self.state.set('reconnect', False)
self.disconnect()
self.run = False
self.sendRaw(self.stream_header, priority=0, init=True)
self.__readXML() # this loops until the stream is terminated.
except socket.timeout:
# TODO currently this will re-send a stream header if this exception occurs.
# I don't think that's intended behavior.
logging.warn('socket rcv timeout')
except RestartStream:
logging.debug("Restarting stream...")
continue # DON'T re-initialize the stream -- this exception is sent
# specifically when we've initialized TLS and need to re-send the <stream> header.
except (KeyboardInterrupt, SystemExit):
logging.debug("System interrupt detected")
self.shutdown()
self.eventqueue.put(('quit', None, None))
return
except CloseStream:
return
except SystemExit:
self.eventqueue.put(('quit', None, None))
return
except socket.error:
if not self.state.reconnect:
return
else:
self.state.set('processing', False)
traceback.print_exc()
self.disconnect(reconnect=True)
except:
if not self.state.reconnect:
return
else:
self.state.set('processing', False)
traceback.print_exc()
self.disconnect(reconnect=True)
if self.state['reconnect']:
self.reconnect()
self.state.set('processing', False)
self.eventqueue.put(('quit', None, None))
#self.__thread['readXML'] = threading.Thread(name='readXML', target=self.__readXML)
#self.__thread['readXML'].start()
#self.__thread['spawnEvents'] = threading.Thread(name='spawnEvents', target=self.__spawnEvents)
#self.__thread['spawnEvents'].start()
logging.exception('Unexpected error in RCV thread')
# if the RCV socket is terminated for whatever reason (e.g. we reach this point of
# code,) our only sane choice of action is an attempt to re-establish the connection.
reconnect = (self.should_reconnect and not self.quit.is_set())
self.disconnect(reconnect=reconnect, error=True)
logging.debug('Quitting Process thread')
def __readXML(self):
"Parses the incoming stream, adding to xmlin queue as it goes"
@ -218,83 +258,97 @@ class XMLStream(object):
if edepth == 0: # and xmlobj.tag.split('}', 1)[-1] == self.basetag:
if event == b'start':
root = xmlobj
logging.debug('handling start stream')
self.start_stream_handler(root)
if event == b'end':
edepth += -1
if edepth == 0 and event == b'end':
self.disconnect(reconnect=self.state['reconnect'])
logging.warn("Premature EOF from read socket; Ending readXML loop")
# this is a premature EOF as far as I can tell; raise an exception so the stream get closed and re-established cleanly.
return False
elif edepth == 1:
#self.xmlin.put(xmlobj)
try:
self.__spawnEvent(xmlobj)
except RestartStream:
return True
except CloseStream:
return False
if root:
root.clear()
self.__spawnEvent(xmlobj)
if root: root.clear()
if event == b'start':
edepth += 1
logging.warn("Exiting readXML loop")
# TODO under what conditions will this _ever_ occur?
return False
def _sendThread(self):
while self.run:
data = self.sendqueue.get(True)
logging.debug("SEND: %s" % data)
logging.debug('send thread starting...')
while not self.quit.is_set():
if not self.state.ensure('connected',wait=2, block_on_transition=True): continue
data = None
try:
self.socket.send(data.encode('utf-8'))
#self.socket.send(bytes(data, "utf-8"))
#except socket.error,(errno, strerror):
data = self.sendqueue.get(True,5)[1]
logging.debug("SEND: %s" % data)
self.socket.sendall(data.encode('utf-8'))
self._last_sent_time = time.time()
except queue.Empty: # send keep-alive if necessary
now = time.time()
if self._last_sent_time + self.keep_alive < now:
self.socket.sendall(' ')
self._last_sent_time = time.time()
except socket.timeout:
# this is to prevent a thread blocked indefinitely
logging.debug('timeout sending packet data')
except:
logging.warning("Failed to send %s" % data)
self.state.set('connected', False)
if self.state.reconnect:
logging.error("Disconnected. Socket Error.")
traceback.print_exc()
self.disconnect(reconnect=True)
logging.exception("Socket error in SEND thread")
# TODO it's somewhat unsafe for the sender thread to assume it can just
# re-intitialize the connection, since the receiver thread could be doing
# the same thing concurrently. Oops! The safer option would be to throw
# some sort of event that could be handled by a common thread or the reader
# thread to perform reconnect and then re-initialize the handler threads as well.
reconnect = (self.should_reconnect and not self.quit.is_set())
self.disconnect(reconnect=reconnect, error=True)
def sendRaw(self, data):
self.sendqueue.put(data)
def sendRaw( self, data, priority=5, init=False ):
if not self.state.ensure('connected'): return False
self.sendqueue.put((priority, data))
return True
def disconnect(self, reconnect=False):
self.state.set('reconnect', reconnect)
if self.state['disconnecting']:
return
if not self.state['reconnect']:
logging.debug("Disconnecting...")
self.state.set('disconnecting', True)
self.run = False
if self.state['connected']:
self.sendRaw(self.stream_footer)
time.sleep(1)
#send end of stream
#wait for end of stream back
try:
self.socket.close()
self.filesocket.close()
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error as serr:
#logging.warning("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
#thread.exit_thread()
pass
if self.state['processing']:
#raise CloseStream
pass
def disconnect(self, reconnect=False, error=False):
with self.state.transition_ctx('connected','disconnected') as locked:
if not locked:
logging.warning("Already disconnected.")
return
def reconnect(self):
self.state.set('tls',False)
self.state.set('ssl',False)
time.sleep(1)
self.connect()
logging.debug("Disconnecting...")
# don't send a footer on error; if the stream is already closed,
# this won't get sent until the stream is re-initialized!
if not error: self.sendRaw(self.stream_footer,init=True) #send end of stream
try:
# self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
except socket.error as (errno,strerror):
logging.exception("Error while disconnecting. Socket Error #%s: %s" % (errno, strerror))
try:
self.filesocket.close()
except socket.error as (errno,strerror):
logging.exception("Error closing filesocket.")
if reconnect: self.connect()
def shutdown(self):
'''
Disconnects and shuts down all event threads.
'''
self.run = False
self.scheduler.run = False
self.disconnect()
def incoming_filter(self, xmlobj):
return xmlobj
def __spawnEvent(self, xmlobj):
"watching xmlOut and processes handlers"
if logging.getLogger().isEnabledFor(logging.DEBUG):
logging.debug("RECV: %s" % cElementTree.tostring(xmlobj))
#convert XML into Stanza
logging.debug("RECV: %s" % cElementTree.tostring(xmlobj))
xmlobj = self.incoming_filter(xmlobj)
stanza = None
for stanza_class in self.__root_stanza:
@ -305,11 +359,15 @@ class XMLStream(object):
if stanza is None:
stanza = StanzaBase(self, xmlobj)
unhandled = True
# TODO inefficient linear search; performance might be improved by hashtable lookup
for handler in self.__handlers:
if handler.match(stanza):
# logging.debug('matched stanza to handler %s', handler.name)
handler.prerun(stanza)
self.eventqueue.put(('stanza', handler, stanza))
if handler.checkDelete(): self.__handlers.pop(self.__handlers.index(handler))
if handler.checkDelete():
# logging.debug('deleting callback %s', handler.name)
self.__handlers.pop(self.__handlers.index(handler))
unhandled = False
if unhandled:
stanza.unhandled()
@ -318,24 +376,26 @@ class XMLStream(object):
def _eventRunner(self):
logging.debug("Loading event runner")
while self.run:
while not self.quit.is_set():
try:
event = self.eventqueue.get(True, timeout=5)
except queue.Empty:
# logging.debug('Nothing on event queue')
event = None
if event is not None:
etype = event[0]
handler = event[1]
args = event[2:]
#etype, handler, *args = event #python 3.x way
#etype, handler, *args = event #python 3.x way
if etype == 'stanza':
try:
handler.run(args[0])
except Exception as e:
traceback.print_exc()
logging.exception("Exception in event handler")
args[0].exception(e)
elif etype == 'sched':
try:
#handler(*args[0])
handler.run(*args)
except:
logging.error(traceback.format_exc())
@ -432,4 +492,4 @@ class XMLStream(object):
def start_stream_handler(self, xml):
"""Meant to be overridden"""
pass
logging.warn("No start stream handler has been implemented.")

155
tests/test_disco.py Normal file
View File

@ -0,0 +1,155 @@
import unittest
from xml.etree import cElementTree as ET
from sleekxmpp.xmlstream.matcher.stanzapath import StanzaPath
from . import xmlcompare
import sleekxmpp.plugins.xep_0030 as sd
def stanzaPlugin(stanza, plugin):
stanza.plugin_attrib_map[plugin.plugin_attrib] = plugin
stanza.plugin_tag_map["{%s}%s" % (plugin.namespace, plugin.name)] = plugin
class testdisco(unittest.TestCase):
def setUp(self):
self.sd = sd
stanzaPlugin(self.sd.Iq, self.sd.DiscoInfo)
stanzaPlugin(self.sd.Iq, self.sd.DiscoItems)
def try3Methods(self, xmlstring, iq):
iq2 = self.sd.Iq(None, self.sd.ET.fromstring(xmlstring))
values = iq2.getValues()
iq3 = self.sd.Iq()
iq3.setValues(values)
self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3), str(iq)+"3 methods for creating stanza don't match")
def testCreateInfoQueryNoNode(self):
"""Testing disco#info query with no node."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = ''
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" /></iq>"""
self.try3Methods(xmlstring, iq)
def testCreateInfoQueryWithNode(self):
"""Testing disco#info query with a node."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" node="foo" /></iq>"""
self.try3Methods(xmlstring, iq)
def testCreateInfoQueryNoNode(self):
"""Testing disco#items query with no node."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_items']['node'] = ''
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#items" /></iq>"""
self.try3Methods(xmlstring, iq)
def testCreateItemsQueryWithNode(self):
"""Testing disco#items query with a node."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_items']['node'] = 'foo'
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#items" node="foo" /></iq>"""
self.try3Methods(xmlstring, iq)
def testInfoIdentities(self):
"""Testing adding identities to disco#info."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
iq['disco_info'].addIdentity('conference', 'text', 'Chatroom')
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" node="foo"><identity category="conference" type="text" name="Chatroom" /></query></iq>"""
self.try3Methods(xmlstring, iq)
def testInfoFeatures(self):
"""Testing adding features to disco#info."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_info']['node'] = 'foo'
iq['disco_info'].addFeature('foo')
iq['disco_info'].addFeature('bar')
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#info" node="foo"><feature var="foo" /><feature var="bar" /></query></iq>"""
self.try3Methods(xmlstring, iq)
def testItems(self):
"""Testing adding features to disco#info."""
iq = self.sd.Iq()
iq['id'] = "0"
iq['disco_items']['node'] = 'foo'
iq['disco_items'].addItem('user@localhost')
iq['disco_items'].addItem('user@localhost', 'foo')
iq['disco_items'].addItem('user@localhost', 'bar', 'Testing')
xmlstring = """<iq id="0"><query xmlns="http://jabber.org/protocol/disco#items" node="foo"><item jid="user@localhost" /><item node="foo" jid="user@localhost" /><item node="bar" jid="user@localhost" name="Testing" /></query></iq>"""
self.try3Methods(xmlstring, iq)
def testAddRemoveIdentities(self):
"""Test adding and removing identities to disco#info stanza"""
ids = [('automation', 'commands', 'AdHoc'),
('conference', 'text', 'ChatRoom')]
info = self.sd.DiscoInfo()
info.addIdentity(*ids[0])
self.failUnless(info.getIdentities() == [ids[0]])
info.delIdentity('automation', 'commands')
self.failUnless(info.getIdentities() == [])
info.setIdentities(ids)
self.failUnless(info.getIdentities() == ids)
info.delIdentity('automation', 'commands')
self.failUnless(info.getIdentities() == [ids[1]])
info.delIdentities()
self.failUnless(info.getIdentities() == [])
def testAddRemoveFeatures(self):
"""Test adding and removing features to disco#info stanza"""
features = ['foo', 'bar', 'baz']
info = self.sd.DiscoInfo()
info.addFeature(features[0])
self.failUnless(info.getFeatures() == [features[0]])
info.delFeature('foo')
self.failUnless(info.getFeatures() == [])
info.setFeatures(features)
self.failUnless(info.getFeatures() == features)
info.delFeature('bar')
self.failUnless(info.getFeatures() == ['foo', 'baz'])
info.delFeatures()
self.failUnless(info.getFeatures() == [])
def testAddRemoveItems(self):
"""Test adding and removing items to disco#items stanza"""
items = [('user@localhost', None, None),
('user@localhost', 'foo', None),
('user@localhost', 'bar', 'Test')]
info = self.sd.DiscoItems()
self.failUnless(True, ""+str(items[0]))
info.addItem(*(items[0]))
self.failUnless(info.getItems() == [items[0]], info.getItems())
info.delItem('user@localhost')
self.failUnless(info.getItems() == [])
info.setItems(items)
self.failUnless(info.getItems() == items)
info.delItem('user@localhost', 'foo')
self.failUnless(info.getItems() == [items[0], items[2]])
info.delItems()
self.failUnless(info.getItems() == [])
suite = unittest.TestLoader().loadTestsFromTestCase(testdisco)

View File

@ -97,6 +97,21 @@ class testpubsubstanzas(unittest.TestCase):
iq3.setValues(values)
self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3))
def testState(self):
"Testing iq/psstate stanzas"
from sleekxmpp.plugins import xep_0004
iq = self.ps.Iq()
iq['psstate']['node']= 'mynode'
iq['psstate']['item']= 'myitem'
pl = ET.Element('{http://andyet.net/protocol/pubsubqueue}claimed')
iq['psstate']['payload'] = pl
xmlstring = """<iq id="0"><state xmlns="http://jabber.org/protocol/psstate" node="mynode" item="myitem"><claimed xmlns="http://andyet.net/protocol/pubsubqueue" /></state></iq>"""
iq2 = self.ps.Iq(None, self.ps.ET.fromstring(xmlstring))
iq3 = self.ps.Iq()
values = iq2.getValues()
iq3.setValues(values)
self.failUnless(xmlstring == str(iq) == str(iq2) == str(iq3))
def testDefault(self):
"Testing iq/pubsub_owner/default stanzas"
from sleekxmpp.plugins import xep_0004

328
tests/test_statemachine.py Normal file
View File

@ -0,0 +1,328 @@
import unittest
import time, threading, random, functools
if __name__ == '__main__':
import sys, os
sys.path.insert(0, os.getcwd())
import sleekxmpp.xmlstream.statemachine as sm
class testStateMachine(unittest.TestCase):
def setUp(self): pass
def testDefaults(self):
"Test ensure transitions occur correctly in a single thread"
s = sm.StateMachine(('one','two','three'))
self.assertTrue(s['one'])
self.failIf(s['two'])
try:
s['booga']
self.fail('s.booga is an invalid state and should throw an exception!')
except: pass #expected exception
# just make sure __str__ works, no reason to test its exact value:
print str(s)
def testTransitions(self):
"Test ensure transitions occur correctly in a single thread"
s = sm.StateMachine(('one','two','three'))
self.assertTrue( s.transition('one', 'two') )
self.assertTrue( s['two'] )
self.failIf( s['one'] )
self.assertTrue( s.transition('two', 'three') )
self.assertTrue( s['three'] )
self.failIf( s['two'] )
self.assertTrue( s.transition('three', 'one') )
self.assertTrue( s['one'] )
self.failIf( s['three'] )
# should return False immediately w/ no wait:
self.failIf( s.transition('three', 'one') )
self.assertTrue( s['one'] )
self.failIf( s['three'] )
# test fail condition w/ a short delay:
self.failIf( s.transition('two', 'three') )
# Ensure bad states are weeded out:
try:
s.transition('blah', 'three')
s.fail('Exception expected')
except: pass
try:
s.transition('one', 'blahblah')
s.fail('Exception expected')
except: pass
def testTransitionsBlocking(self):
"Test that transitions block from more than one thread"
s = sm.StateMachine(('one','two','three'))
self.assertTrue(s['one'])
now = time.time()
self.failIf( s.transition('two', 'one', wait=5.0) )
self.assertTrue( time.time() > now + 4 )
self.assertTrue( time.time() < now + 7 )
def testThreadedTransitions(self):
"Test that transitions are atomic in > one thread"
s = sm.StateMachine(('one','two','three'))
self.assertTrue(s['one'])
thread_state = {'ready': False, 'transitioned': False}
def t1():
if s['two']:
print 'thread has already transitioned!'
self.fail()
thread_state['ready'] = True
print 'Thread is ready'
# this will block until the main thread transitions to 'two'
self.assertTrue( s.transition('two','three', wait=20) )
print 'transitioned to three!'
thread_state['transitioned'] = True
thread = threading.Thread(target=t1)
thread.daemon = True
thread.start()
start = time.time()
while not thread_state['ready']:
print 'not ready'
if time.time() > start+10: self.fail('Timeout waiting for thread to init!')
time.sleep(0.1)
time.sleep(0.2) # the thread should be blocking on the 'transition' call at this point.
self.failIf( thread_state['transitioned'] ) # ensure it didn't 'go' yet.
print 'transitioning to two!'
self.assertTrue( s.transition('one','two') )
time.sleep(0.2) # second thread should have transitioned now:
self.assertTrue( thread_state['transitioned'] )
def testForRaceCondition(self):
"""Attempt to allow two threads to perform the same transition;
only one should ever make it."""
s = sm.StateMachine(('one','two','three'))
def t1(num):
while True:
if not trigger['go'] or thread_state[num] in (True,False):
time.sleep( random.random()/100 ) # < .01s
if thread_state[num] == 'quit': break
continue
thread_state[num] = s.transition('one','two' )
# print '-',
thread_count = 20
threads = []
thread_state = {}
def reset():
for c in range(thread_count): thread_state[c] = "reset"
trigger = {'go':False} # use of a plain boolean seems to be non-volatile between threads.
for c in range(thread_count):
thread_state[c] = "reset"
thread = threading.Thread( target= functools.partial(t1,c) )
threads.append( thread )
thread.daemon = True
thread.start()
for x in range(100): # this will take 10s to execute
# print "+",
trigger['go'] = True
time.sleep(.1)
trigger['go'] = False
winners = 0
for (num, state) in thread_state.items():
if state == True: winners = winners +1
elif state != False: raise Exception( "!%d!%s!" % (num,state) )
self.assertEqual( 1, winners, "Expected one winner! %d" % winners )
self.assertTrue( s.ensure('two') )
self.assertTrue( s.transition('two','one') ) # return to the first state.
reset()
# now let the threads quit gracefully:
for c in range(thread_count): thread_state[c] = 'quit'
time.sleep(2)
def testTransitionFunctions(self):
"test that a `func` argument allows or blocks the transition correctly."
s = sm.StateMachine(('one','two','three'))
def alwaysFalse(): return False
def alwaysTrue(): return True
self.failIf( s.transition('one','two', func=alwaysFalse) )
self.assertTrue(s['one'])
self.failIf(s['two'])
self.assertTrue( s.transition('one','two', func=alwaysTrue) )
self.failIf(s['one'])
self.assertTrue(s['two'])
def testTransitionFuncException(self):
"if a transition function throws an exeption, ensure we're in a sane state"
s = sm.StateMachine(('one','two','three'))
def alwaysException(): raise Exception('whups!')
try:
self.failIf( s.transition('one','two', func=alwaysException) )
self.fail("exception should have been thrown")
except: pass #expected exception
self.assertTrue(s['one'])
self.failIf(s['two'])
# ensure a subsequent attempt completes normally:
self.assertTrue( s.transition('one','two') )
self.failIf(s['one'])
self.assertTrue(s['two'])
def testContextManager(self):
s = sm.StateMachine(('one','two','three'))
with s.transition_ctx('one','two'):
self.assertTrue( s['one'] )
self.failIf( s['two'] )
#successful transition b/c no exception was thrown
self.assertTrue( s['two'] )
self.failIf( s['one'] )
# failed transition because exception is thrown:
try:
with s.transition_ctx('two','three'):
raise Exception("boom!")
self.fail('exception expected')
except: pass
self.failIf( s.current_state() in ('one','three') )
self.assertTrue( s['two'] )
def testCtxManagerTransitionFailure(self):
s = sm.StateMachine(('one','two','three'))
with s.transition_ctx('two','three') as result:
self.failIf( result )
self.assertTrue( s['one'] )
self.failIf( s.current_state in ('two','three') )
self.assertTrue( s['one'] )
def r1():
print 'thread 1 started'
self.assertTrue( s.transition('one','two') )
print 'thread 1 transitioned'
def r2():
print 'thread 2 started'
self.failIf( s['two'] )
with s.transition_ctx('two','three', 10) as result:
self.assertTrue( result )
self.assertTrue( s['two'] )
print 'thread 2 will transition on exit from the context manager...'
self.assertTrue( s['three'] )
print 'transitioned to %s' % s.current_state()
t1 = threading.Thread(target=r1)
t2 = threading.Thread(target=r2)
t2.start() # this should block until r1 goes
time.sleep(1)
t1.start()
t1.join()
t2.join()
self.assertTrue( s['three'] )
def testTransitionsDontUnintentionallyBlock(self):
'''
There was a bug where a long-running transition (e.g. one with a 'func'
arg or a `transition_ctx` call would cause any `transition` or `ensure`
call to block since the lock is acquired before checking the current
state. Attempts to acquire the mutex need to be non-blocking so when a
timeout is _not_ given, the caller can return immediately. At the same
time, threads that _do_ want to wait need the ability to be notified
(to avoid waiting beyond when the lock is released) so we've moved to a
combination of a plain-ol `threading.Lock` to act as mutex, and a
`threading.Event` to perform notification for threads who choose to wait.
'''
s = sm.StateMachine(('one','two','three'))
with s.transition_ctx('two','three') as result:
self.failIf( result )
self.assertTrue( s['one'] )
self.failIf( s.current_state in ('two','three') )
self.assertTrue( s['one'] )
statuses = {'t1':"not started",
't2':'not started'}
def t1():
print 'thread 1 started'
# no wait, so this should 'return False' immediately.
self.failIf( s.transition('two','three') )
statuses['t1'] = 'complete'
print 'thread 1 transitioned'
def t2():
print 'thread 2 started'
self.failIf( s['two'] )
self.failIf( s['three'] )
# we want this thread to acquire the lock, but for
# the second thread not to wait on the first.
with s.transition_ctx('one','two', 10) as locked:
statuses['t2'] = 'started'
print 'thread 2 has entered context'
self.assertTrue( locked )
# give thread1 a chance to complete while this
# thread still owns the lock
time.sleep(5)
self.assertTrue( s['two'] )
statuses['t2'] = 'complete'
t1 = threading.Thread(target=t1)
t2 = threading.Thread(target=t2)
t2.start() # this should acquire the lock
time.sleep(.2)
self.assertEqual( 'started', statuses['t2'] )
t1.start() # but it shouldn't prevent thread 1 from completing
time.sleep(1)
self.assertEqual( 'complete', statuses['t1'] )
t1.join()
t2.join()
self.assertEqual( 'complete', statuses['t2'] )
self.assertTrue( s['two'] )
suite = unittest.TestLoader().loadTestsFromTestCase(testStateMachine)
if __name__ == '__main__': unittest.main()