Merge branch 'master' into develop

This commit is contained in:
Lance Stout 2012-10-31 13:42:32 -07:00
commit 4b7ec4a32a
4 changed files with 109 additions and 34 deletions

View File

@ -69,6 +69,20 @@ JID_CACHE = OrderedDict()
JID_CACHE_LOCK = threading.Lock() JID_CACHE_LOCK = threading.Lock()
JID_CACHE_MAX_SIZE = 1024 JID_CACHE_MAX_SIZE = 1024
def _cache(key, parts, locked):
JID_CACHE[key] = (parts, locked)
if len(JID_CACHE) > JID_CACHE_MAX_SIZE:
with JID_CACHE_LOCK:
while len(JID_CACHE) > JID_CACHE_MAX_SIZE:
found = None
for key, item in JID_CACHE.iteritems():
if not item[1]: # if not locked
found = key
break
if not found: # more than MAX_SIZE locked
# warn?
break
del JID_CACHE[found]
# pylint: disable=c0103 # pylint: disable=c0103
#: The nodeprep profile of stringprep used to validate the local, #: The nodeprep profile of stringprep used to validate the local,
@ -418,19 +432,29 @@ class JID(object):
# pylint: disable=W0212 # pylint: disable=W0212
def __init__(self, jid=None, **kwargs): def __init__(self, jid=None, **kwargs):
jid_data = (jid, kwargs.get('local', None),
kwargs.get('domain', None),
kwargs.get('resource', None))
locked = kwargs.get('cache_lock', False) locked = kwargs.get('cache_lock', False)
in_local = kwargs.get('local', None)
in_domain = kwargs.get('domain', None)
in_resource = kwargs.get('resource', None)
parts = None
if in_local or in_domain or in_resource:
parts = (in_local, in_domain, in_resource)
if jid_data in JID_CACHE: # only check cache if there is a jid string, or parts, not if there
parsed_jid, locked = JID_CACHE[jid_data] # are both
self._jid = parsed_jid self._jid = None
else: key = None
if jid is None: if (jid is not None) and (parts is None):
jid = '' if isinstance(jid, JID):
# it's already good to go, and there are no additions
self._jid = jid._jid
return
key = jid
self._jid, locked = JID_CACHE.get(jid, (None, locked))
elif jid is None and parts is not None:
key = parts
self._jid, locked = JID_CACHE.get(parts, (None, locked))
if not self._jid:
if not jid: if not jid:
parsed_jid = (None, None, None) parsed_jid = (None, None, None)
elif not isinstance(jid, JID): elif not isinstance(jid, JID):
@ -440,27 +464,16 @@ class JID(object):
local, domain, resource = parsed_jid local, domain, resource = parsed_jid
local = kwargs.get('local', local)
domain = kwargs.get('domain', domain)
resource = kwargs.get('resource', resource)
if 'local' in kwargs: if 'local' in kwargs:
local = _escape_node(local) local = _escape_node(in_local)
if 'domain' in kwargs: if 'domain' in kwargs:
domain = _validate_domain(domain) domain = _validate_domain(in_domain)
if 'resource' in kwargs: if 'resource' in kwargs:
resource = _validate_resource(resource) resource = _validate_resource(in_resource)
self._jid = (local, domain, resource) self._jid = (local, domain, resource)
if key:
JID_CACHE[jid_data] = (self._jid, locked) _cache(key, self._jid, locked)
if len(JID_CACHE) > JID_CACHE_MAX_SIZE:
with JID_CACHE_LOCK:
key, item = JID_CACHE.popitem(False)
if item[1]:
# Need to reinsert locked JIDs
JID_CACHE[key] = item
def unescape(self): def unescape(self):
"""Return an unescaped JID object. """Return an unescaped JID object.

View File

@ -324,6 +324,8 @@ class XEP_0030(BasePlugin):
callback -- Optional callback to execute when a reply is callback -- Optional callback to execute when a reply is
received instead of blocking and waiting for received instead of blocking and waiting for
the reply. the reply.
timeout_callback -- Optional callback to execute when no result
has been received in timeout seconds.
""" """
if local is None: if local is None:
if jid is not None and not isinstance(jid, JID): if jid is not None and not isinstance(jid, JID):
@ -364,7 +366,8 @@ class XEP_0030(BasePlugin):
iq['disco_info']['node'] = node if node else '' iq['disco_info']['node'] = node if node else ''
return iq.send(timeout=kwargs.get('timeout', None), return iq.send(timeout=kwargs.get('timeout', None),
block=kwargs.get('block', True), block=kwargs.get('block', True),
callback=kwargs.get('callback', None)) callback=kwargs.get('callback', None),
timeout_callback=kwargs.get('timeout_callback', None))
def set_info(self, jid=None, node=None, info=None): def set_info(self, jid=None, node=None, info=None):
""" """
@ -405,6 +408,8 @@ class XEP_0030(BasePlugin):
iterator -- If True, return a result set iterator using iterator -- If True, return a result set iterator using
the XEP-0059 plugin, if the plugin is loaded. the XEP-0059 plugin, if the plugin is loaded.
Otherwise the parameter is ignored. Otherwise the parameter is ignored.
timeout_callback -- Optional callback to execute when no result
has been received in timeout seconds.
""" """
if local or local is None and jid is None: if local or local is None and jid is None:
items = self.api['get_items'](jid, node, items = self.api['get_items'](jid, node,
@ -423,7 +428,8 @@ class XEP_0030(BasePlugin):
else: else:
return iq.send(timeout=kwargs.get('timeout', None), return iq.send(timeout=kwargs.get('timeout', None),
block=kwargs.get('block', True), block=kwargs.get('block', True),
callback=kwargs.get('callback', None)) callback=kwargs.get('callback', None),
timeout_callback=kwargs.get('timeout_callback', None))
def set_items(self, jid=None, node=None, **kwargs): def set_items(self, jid=None, node=None, **kwargs):
""" """

View File

@ -154,7 +154,7 @@ class Iq(RootStanza):
StanzaBase.reply(self, clear) StanzaBase.reply(self, clear)
return self return self
def send(self, block=True, timeout=None, callback=None, now=False): def send(self, block=True, timeout=None, callback=None, now=False, timeout_callback=None):
""" """
Send an <iq> stanza over the XML stream. Send an <iq> stanza over the XML stream.
@ -181,15 +181,32 @@ class Iq(RootStanza):
now -- Indicates if the send queue should be skipped and send now -- Indicates if the send queue should be skipped and send
the stanza immediately. Used during stream the stanza immediately. Used during stream
initialization. Defaults to False. initialization. Defaults to False.
timeout_callback -- Optional reference to a stream handler function.
Will be executed when the timeout expires before a
response has been received with the originally-sent IQ
stanza. Only called if there is a callback parameter
(and therefore are in async mode).
""" """
if timeout is None: if timeout is None:
timeout = self.stream.response_timeout timeout = self.stream.response_timeout
if callback is not None and self['type'] in ('get', 'set'): if callback is not None and self['type'] in ('get', 'set'):
handler_name = 'IqCallback_%s' % self['id'] handler_name = 'IqCallback_%s' % self['id']
handler = Callback(handler_name, if timeout_callback:
MatcherId(self['id']), self.callback = callback
callback, self.timeout_callback = timeout_callback
once=True) self.stream.schedule('IqTimeout_%s' % self['id'],
timeout,
self._fire_timeout,
repeat=False)
handler = Callback(handler_name,
MatcherId(self['id']),
self._handle_result,
once=True)
else:
handler = Callback(handler_name,
MatcherId(self['id']),
callback,
once=True)
self.stream.register_handler(handler) self.stream.register_handler(handler)
StanzaBase.send(self, now=now) StanzaBase.send(self, now=now)
return handler_name return handler_name
@ -206,6 +223,16 @@ class Iq(RootStanza):
else: else:
return StanzaBase.send(self, now=now) return StanzaBase.send(self, now=now)
def _handle_result(self, iq):
# we got the IQ, so don't fire the timeout
self.stream.scheduler.remove('IqTimeout_%s' % self['id'])
self.callback(iq)
def _fire_timeout(self):
# don't fire the handler for the IQ, if it finally does come in
self.stream.remove_handler('IqCallback_%s' % self['id'])
self.timeout_callback(self)
def _set_stanza_values(self, values): def _set_stanza_values(self, values):
""" """
Set multiple stanza interface values using a dictionary. Set multiple stanza interface values using a dictionary.

View File

@ -153,6 +153,35 @@ class TestHandlers(SleekTest):
self.failUnless(events == ['foo'], self.failUnless(events == ['foo'],
"Iq callback was not executed: %s" % events) "Iq callback was not executed: %s" % events)
def testIqTimeoutCallback(self):
"""Test that iq.send(tcallback=handle_foo, timeout_callback=handle_timeout) works."""
events = []
def handle_foo(iq):
events.append('foo')
def handle_timeout(iq):
events.append('timeout')
iq = self.Iq()
iq['type'] = 'get'
iq['id'] = 'test-foo'
iq['to'] = 'user@localhost'
iq['query'] = 'foo'
iq.send(callback=handle_foo, timeout_callback=handle_timeout, timeout=0.05)
self.send("""
<iq type="get" id="test-foo" to="user@localhost">
<query xmlns="foo" />
</iq>
""")
# Give event queue time to process
time.sleep(1)
self.failUnless(events == ['timeout'],
"Iq timeout was not executed: %s" % events)
def testMultipleHandlersForStanza(self): def testMultipleHandlersForStanza(self):
""" """
Test that multiple handlers for a single stanza work Test that multiple handlers for a single stanza work