XMLStream: add a forever parameter to process(), defaulting to True, to select whether we want to stop the event loop after a disconnection

This commit is contained in:
Emmanuel Gil Peyrot 2015-02-28 14:04:42 +01:00
parent 0ef3fa2703
commit 632b7b4afe
3 changed files with 17 additions and 7 deletions

View File

@ -53,7 +53,8 @@ Running the event loop
:meth:`.XMLStream.process` is only a thin wrapper on top of :meth:`.XMLStream.process` is only a thin wrapper on top of
``loop.run_forever()`` (if ``timeout`` is provided then it will ``loop.run_forever()`` (if ``timeout`` is provided then it will
only run for this amount of time). only run for this amount of time, and if ``forever`` is False it will
run until disconnection).
Therefore you can handle the event loop in any way you like Therefore you can handle the event loop in any way you like
instead of using ``process()``. instead of using ``process()``.

View File

@ -203,9 +203,9 @@ class BaseXMPP(XMLStream):
log.warning('Legacy XMPP 0.9 protocol detected.') log.warning('Legacy XMPP 0.9 protocol detected.')
self.event('legacy_protocol') self.event('legacy_protocol')
def process(self, timeout=None): def process(self, *, forever=True, timeout=None):
self.init_plugins() self.init_plugins()
XMLStream.process(self, timeout) XMLStream.process(self, forever=forever, timeout=timeout)
def init_plugins(self): def init_plugins(self):
for name in self.plugin: for name in self.plugin:

View File

@ -207,6 +207,9 @@ class XMLStream(asyncio.BaseProtocol):
#: ``_xmpp-client._tcp`` service. #: ``_xmpp-client._tcp`` service.
self.dns_service = None self.dns_service = None
#: An asyncio Future being done when the stream is disconnected.
self.disconnected = asyncio.Future()
self.add_event_handler('disconnected', self._remove_schedules) self.add_event_handler('disconnected', self._remove_schedules)
self.add_event_handler('session_start', self._start_keepalive) self.add_event_handler('session_start', self._start_keepalive)
@ -299,7 +302,7 @@ class XMLStream(asyncio.BaseProtocol):
self.event("connection_failed", e) self.event("connection_failed", e)
asyncio.async(self._connect_routine()) asyncio.async(self._connect_routine())
def process(self, timeout=None): def process(self, *, forever=True, timeout=None):
"""Process all the available XMPP events (receiving or sending data on the """Process all the available XMPP events (receiving or sending data on the
socket(s), calling various registered callbacks, calling expired socket(s), calling various registered callbacks, calling expired
timers, handling signal events, etc). If timeout is None, this timers, handling signal events, etc). If timeout is None, this
@ -308,10 +311,15 @@ class XMLStream(asyncio.BaseProtocol):
""" """
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
if timeout is None: if timeout is None:
if forever:
loop.run_forever() loop.run_forever()
else: else:
future = asyncio.sleep(timeout) loop.run_until_complete(self.disconnected)
loop.run_until_complete(future) else:
tasks = [asyncio.sleep(timeout)]
if not forever:
tasks.append(self.disconnected)
loop.run_until_complete(asyncio.wait(tasks))
def init_parser(self): def init_parser(self):
"""init the XML parser. The parser must always be reset for each new """init the XML parser. The parser must always be reset for each new
@ -413,6 +421,7 @@ class XMLStream(asyncio.BaseProtocol):
if self.transport: if self.transport:
self.transport.abort() self.transport.abort()
self.event("killed") self.event("killed")
self.disconnected.set_result(True)
def reconnect(self, wait=2.0): def reconnect(self, wait=2.0):
"""Calls disconnect(), and once we are disconnected (after the timeout, or """Calls disconnect(), and once we are disconnected (after the timeout, or