Improve the send queue code a bit

This commit is contained in:
mathieui 2019-08-23 10:57:20 +02:00
parent d97efa0bd8
commit 110bbf8afc
No known key found for this signature in database
GPG Key ID: C59F84CEEFD616E3

View File

@ -12,7 +12,7 @@
:license: MIT, see LICENSE for more details :license: MIT, see LICENSE for more details
""" """
from typing import Optional from typing import Optional, Set, Callable
import functools import functools
import logging import logging
@ -896,10 +896,18 @@ class XMLStream(asyncio.BaseProtocol):
""" """
return xml return xml
async def continue_slow_send(self, task, already_used): async def _continue_slow_send(self,
log.debug('rescheduled task: %s', task) task: asyncio.Task,
already_used: Set[Callable[[ElementBase], Optional[StanzaBase]]]
) -> None:
"""
Used when an item in the send queue has taken too long to process.
This is away from the send queue and can take as much time as needed.
:param asyncio.Task task: the Task wrapping the coroutine
:param set already_used: Filters already used on this outgoing stanza
"""
data = await task data = await task
log.debug('data for rescheduled task %s : %s', task, data)
for filter in self.__filters['out']: for filter in self.__filters['out']:
if filter in already_used: if filter in already_used:
continue continue
@ -921,10 +929,12 @@ class XMLStream(asyncio.BaseProtocol):
else: else:
self.send_raw(data) self.send_raw(data)
async def run_filters(self): async def run_filters(self):
""" """
Background loop that processes stanzas to send. Background loop that processes stanzas to send.
""" """
class ContinueQueue(Exception): pass
while True: while True:
(data, use_filters) = await self.waiting_queue.get() (data, use_filters) = await self.waiting_queue.get()
try: try:
@ -935,9 +945,17 @@ class XMLStream(asyncio.BaseProtocol):
already_run_filters.add(filter) already_run_filters.add(filter)
if iscoroutinefunction(filter): if iscoroutinefunction(filter):
task = asyncio.create_task(filter(data)) task = asyncio.create_task(filter(data))
completed, pending = await wait({task}, timeout=1) completed, pending = await wait(
{task},
timeout=1,
)
if pending: if pending:
asyncio.ensure_future(self.continue_slow_send(task, already_run_filters)) asyncio.ensure_future(
self._continue_slow_send(
task,
already_run_filters
)
)
raise Exception("Slow coro, rescheduling") raise Exception("Slow coro, rescheduling")
data = task.result() data = task.result()
else: else:
@ -956,8 +974,10 @@ class XMLStream(asyncio.BaseProtocol):
self.send_raw(str_data) self.send_raw(str_data)
else: else:
self.send_raw(data) self.send_raw(data)
except: except ContinueQueue as exc:
log.error('Could not send stanza %s', data, exc_info=True) log.info('Stanza in send queue not sent: %s', exc)
except Exception:
log.error('Exception raised in send queue:', exc_info=True)
self.waiting_queue.task_done() self.waiting_queue.task_done()
def send(self, data, use_filters=True): def send(self, data, use_filters=True):