Cleanup how events are run, they are always direct by definition now

This commit is contained in:
Florent Le Coz 2014-07-23 17:01:17 +02:00
parent 5611b30022
commit 74117453b5
7 changed files with 41 additions and 75 deletions

View File

@ -108,10 +108,6 @@ when this bit of XML is received (with an assumed namespace of
handlers <event handler>`. Each stanza/handler pair is then put into the handlers <event handler>`. Each stanza/handler pair is then put into the
event queue. event queue.
.. note::
It is possible to skip the event queue and process an event immediately
by using ``direct=True`` when raising the event.
The code for :meth:`BaseXMPP._handle_message` follows this pattern, and The code for :meth:`BaseXMPP._handle_message` follows this pattern, and
raises a ``'message'`` event:: raises a ``'message'`` event::

View File

@ -152,7 +152,7 @@ class ComponentXMPP(BaseXMPP):
""" """
self.session_bind_event.set() self.session_bind_event.set()
self.session_started_event.set() self.session_started_event.set()
self.event('session_bind', self.boundjid, direct=True) self.event('session_bind', self.boundjid)
self.event('session_start') self.event('session_start')
def _handle_probe(self, pres): def _handle_probe(self, pres):

View File

@ -54,7 +54,7 @@ class FeatureBind(BasePlugin):
def _on_bind_response(self, response): def _on_bind_response(self, response):
self.xmpp.boundjid = JID(response['bind']['jid'], cache_lock=True) self.xmpp.boundjid = JID(response['bind']['jid'], cache_lock=True)
self.xmpp.bound = True self.xmpp.bound = True
self.xmpp.event('session_bind', self.xmpp.boundjid, direct=True) self.xmpp.event('session_bind', self.xmpp.boundjid)
self.xmpp.session_bind_event.set() self.xmpp.session_bind_event.set()
self.xmpp.features.add('bind') self.xmpp.features.add('bind')

View File

@ -172,8 +172,8 @@ class FeatureMechanisms(BasePlugin):
min_mech=self.min_mech) min_mech=self.min_mech)
except sasl.SASLNoAppropriateMechanism: except sasl.SASLNoAppropriateMechanism:
log.error("No appropriate login method.") log.error("No appropriate login method.")
self.xmpp.event("no_auth", direct=True) self.xmpp.event("no_auth")
self.xmpp.event("failed_auth", direct=True) self.xmpp.event("failed_auth")
self.attempted_mechs = set() self.attempted_mechs = set()
return self.xmpp.disconnect() return self.xmpp.disconnect()
except StringPrepError: except StringPrepError:
@ -232,7 +232,7 @@ class FeatureMechanisms(BasePlugin):
self.attempted_mechs = set() self.attempted_mechs = set()
self.xmpp.authenticated = True self.xmpp.authenticated = True
self.xmpp.features.add('mechanisms') self.xmpp.features.add('mechanisms')
self.xmpp.event('auth_success', stanza, direct=True) self.xmpp.event('auth_success', stanza)
# Restart the stream # Restart the stream
self.xmpp.init_parser() self.xmpp.init_parser()
self.xmpp.send_raw(self.xmpp.stream_header) self.xmpp.send_raw(self.xmpp.stream_header)
@ -241,6 +241,6 @@ class FeatureMechanisms(BasePlugin):
"""SASL authentication failed. Disconnect and shutdown.""" """SASL authentication failed. Disconnect and shutdown."""
self.attempted_mechs.add(self.mech.name) self.attempted_mechs.add(self.mech.name)
log.info("Authentication failed: %s", stanza['condition']) log.info("Authentication failed: %s", stanza['condition'])
self.xmpp.event("failed_auth", stanza, direct=True) self.xmpp.event("failed_auth", stanza)
self._send_auth() self._send_auth()
return True return True

View File

@ -77,7 +77,7 @@ class XEP_0077(BasePlugin):
if self.create_account and self.xmpp.event_handled('register'): if self.create_account and self.xmpp.event_handled('register'):
form = self.get_registration() form = self.get_registration()
self.xmpp.event('register', form, direct=True) self.xmpp.event('register', form)
return True return True
return False return False

View File

@ -82,12 +82,12 @@ class XEP_0078(BasePlugin):
resp = iq.send(now=True) resp = iq.send(now=True)
except IqError as err: except IqError as err:
log.info("Authentication failed: %s", err.iq['error']['condition']) log.info("Authentication failed: %s", err.iq['error']['condition'])
self.xmpp.event('failed_auth', direct=True) self.xmpp.event('failed_auth')
self.xmpp.disconnect() self.xmpp.disconnect()
return True return True
except IqTimeout: except IqTimeout:
log.info("Authentication failed: %s", 'timeout') log.info("Authentication failed: %s", 'timeout')
self.xmpp.event('failed_auth', direct=True) self.xmpp.event('failed_auth')
self.xmpp.disconnect() self.xmpp.disconnect()
return True return True
@ -123,11 +123,11 @@ class XEP_0078(BasePlugin):
result = iq.send(now=True) result = iq.send(now=True)
except IqError as err: except IqError as err:
log.info("Authentication failed") log.info("Authentication failed")
self.xmpp.event("failed_auth", direct=True) self.xmpp.event("failed_auth")
self.xmpp.disconnect() self.xmpp.disconnect()
except IqTimeout: except IqTimeout:
log.info("Authentication failed") log.info("Authentication failed")
self.xmpp.event("failed_auth", direct=True) self.xmpp.event("failed_auth")
self.xmpp.disconnect() self.xmpp.disconnect()
self.xmpp.features.add('auth') self.xmpp.features.add('auth')
@ -137,7 +137,7 @@ class XEP_0078(BasePlugin):
self.xmpp.boundjid = JID(self.xmpp.requested_jid, self.xmpp.boundjid = JID(self.xmpp.requested_jid,
resource=resource, resource=resource,
cache_lock=True) cache_lock=True)
self.xmpp.event('session_bind', self.xmpp.boundjid, direct=True) self.xmpp.event('session_bind', self.xmpp.boundjid)
log.debug("Established Session") log.debug("Established Session")
self.xmpp.sessionstarted = True self.xmpp.sessionstarted = True

View File

@ -612,16 +612,13 @@ class XMLStream(object):
""" """
return len(self.__event_handlers.get(name, [])) return len(self.__event_handlers.get(name, []))
def event(self, name, data={}, direct=False): def event(self, name, data={}):
"""Manually trigger a custom event. """Manually trigger a custom event.
:param name: The name of the event to trigger. :param name: The name of the event to trigger.
:param data: Data that will be passed to each event handler. :param data: Data that will be passed to each event handler.
Defaults to an empty dictionary, but is usually Defaults to an empty dictionary, but is usually
a stanza object. a stanza object.
:param direct: Runs the event directly if True, skipping the
event queue. All event handlers will run in the
same thread.
""" """
log.debug("Event triggered: " + name) log.debug("Event triggered: " + name)
@ -633,18 +630,13 @@ class XMLStream(object):
out_data = copy.copy(data) if len(handlers) > 1 else data out_data = copy.copy(data) if len(handlers) > 1 else data
old_exception = getattr(data, 'exception', None) old_exception = getattr(data, 'exception', None)
if direct: try:
try: handler_callback(out_data)
handler_callback(out_data) except Exception as e:
except Exception as e: if old_exception:
error_msg = 'Error processing event handler: %s' old_exception(e)
log.exception(error_msg, str(handler_callback)) else:
if old_exception: self.exception(e)
old_exception(e)
else:
self.exception(e)
else:
self.run_event(('event', handler, out_data))
if disposable: if disposable:
# If the handler is disposable, we will go ahead and # If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be # remove it now instead of waiting for it to be
@ -687,11 +679,19 @@ class XMLStream(object):
except KeyError: except KeyError:
log.debug("Tried to cancel unscheduled event: %s" % (name,)) log.debug("Tried to cancel unscheduled event: %s" % (name,))
def _safe_cb_run(self, name, cb):
log.debug('Scheduled event: %s', name)
try:
cb()
except Exception as e:
log.exception('Error processing scheduled task')
self.exception(e)
def _execute_and_reschedule(self, name, cb, seconds): def _execute_and_reschedule(self, name, cb, seconds):
"""Simple method that calls the given callback, and then schedule itself to """Simple method that calls the given callback, and then schedule itself to
be called after the given number of seconds. be called after the given number of seconds.
""" """
cb() self._safe_cb_run(name, cb)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
handle = loop.call_later(seconds, self._execute_and_reschedule, handle = loop.call_later(seconds, self._execute_and_reschedule,
name, cb, seconds) name, cb, seconds)
@ -701,7 +701,7 @@ class XMLStream(object):
""" """
Execute the callback and remove the handler for it. Execute the callback and remove the handler for it.
""" """
cb() self._safe_cb_run(name, cb)
del self.scheduled_events[name] del self.scheduled_events[name]
def incoming_filter(self, xml): def incoming_filter(self, xml):
@ -817,7 +817,7 @@ class XMLStream(object):
# Match the stanza against registered handlers. Handlers marked # Match the stanza against registered handlers. Handlers marked
# to run "in stream" will be executed immediately; the rest will # to run "in stream" will be executed immediately; the rest will
# be queued. # be queued.
unhandled = True handled = False
matched_handlers = [h for h in self.__handlers if h.match(stanza)] matched_handlers = [h for h in self.__handlers if h.match(stanza)]
for handler in matched_handlers: for handler in matched_handlers:
if len(matched_handlers) > 1: if len(matched_handlers) > 1:
@ -825,50 +825,20 @@ class XMLStream(object):
else: else:
stanza_copy = stanza stanza_copy = stanza
handler.prerun(stanza_copy) handler.prerun(stanza_copy)
self.run_event(('stanza', handler, stanza_copy))
try: try:
if handler.check_delete(): handler.run(stanza_copy)
self.__handlers.remove(handler)
except:
pass # not thread safe
unhandled = False
# Some stanzas require responses, such as Iq queries. A default
# handler will be executed immediately for this case.
if unhandled:
stanza.unhandled()
def run_event(self, event):
etype, handler = event[0:2]
args = event[2:]
orig = copy.copy(args[0])
if etype == 'stanza':
try:
handler.run(args[0])
except Exception as e: except Exception as e:
error_msg = 'Error processing stream handler: %s' error_msg = 'Error processing stream handler: %s'
log.exception(error_msg, handler.name) log.exception(error_msg, handler.name)
orig.exception(e) stanza_copy.exception(e)
elif etype == 'schedule': if handler.check_delete():
name = args[2] self.__handlers.remove(handler)
try: handled = True
log.debug('Scheduled event: %s: %s', name, args[0])
handler(*args[0], **args[1]) # Some stanzas require responses, such as Iq queries. A default
except Exception as e: # handler will be executed immediately for this case.
log.exception('Error processing scheduled task') if not handled:
self.exception(e) stanza.unhandled()
elif etype == 'event':
func, disposable = handler
try:
func(*args)
except Exception as e:
error_msg = 'Error processing event handler: %s'
log.exception(error_msg, str(func))
if hasattr(orig, 'exception'):
orig.exception(e)
else:
self.exception(e)
def exception(self, exception): def exception(self, exception):
"""Process an unknown exception. """Process an unknown exception.