Correct handling of acks for XEP-0198 under heavy load.

This commit is contained in:
Lance Stout 2012-03-21 13:00:43 -07:00
parent d5484808a7
commit fa4c52e499
2 changed files with 41 additions and 33 deletions

View File

@ -60,6 +60,8 @@ class XEP_0198(BasePlugin):
#: the server. Setting this to ``1`` will send an ack request after #: the server. Setting this to ``1`` will send an ack request after
#: every sent stanza. Defaults to ``5``. #: every sent stanza. Defaults to ``5``.
self.window = self.config.get('window', 5) self.window = self.config.get('window', 5)
self.window_counter = self.window
self.window_counter_lock = threading.Lock()
#: Control whether or not the ability to resume the stream will be #: Control whether or not the ability to resume the stream will be
#: requested when enabling stream management. Defaults to ``True``. #: requested when enabling stream management. Defaults to ``True``.
@ -132,12 +134,12 @@ class XEP_0198(BasePlugin):
ack = stanza.Ack(self.xmpp) ack = stanza.Ack(self.xmpp)
with self.handled_lock: with self.handled_lock:
ack['h'] = self.handled ack['h'] = self.handled
ack.send() self.xmpp.send_raw(str(ack), now=True)
def request_ack(self, e=None): def request_ack(self, e=None):
"""Request an ack from the server.""" """Request an ack from the server."""
req = stanza.RequestAck(self.xmpp) req = stanza.RequestAck(self.xmpp)
req.send() self.xmpp.send_queue.put(str(req))
def _handle_sm_feature(self, features): def _handle_sm_feature(self, features):
""" """
@ -158,7 +160,7 @@ class XEP_0198(BasePlugin):
self.enabled.set() self.enabled.set()
enable = stanza.Enable(self.xmpp) enable = stanza.Enable(self.xmpp)
enable['resume'] = self.allow_resume enable['resume'] = self.allow_resume
enable.send() enable.send(now=True)
self.handled = 0 self.handled = 0
elif self.sm_id and self.allow_resume: elif self.sm_id and self.allow_resume:
self.enabled.set() self.enabled.set()
@ -261,6 +263,9 @@ class XEP_0198(BasePlugin):
self.seq = (self.seq + 1) % MAX_SEQ self.seq = (self.seq + 1) % MAX_SEQ
seq = self.seq seq = self.seq
self.unacked_queue.append((seq, stanza)) self.unacked_queue.append((seq, stanza))
if len(self.unacked_queue) > self.window: with self.window_counter_lock:
self.xmpp.event('need_ack') self.window_counter -= 1
if self.window_counter == 0:
self.window_counter = self.window
self.request_ack()
return stanza return stanza

View File

@ -268,6 +268,7 @@ class XMLStream(object):
#: A queue of string data to be sent over the stream. #: A queue of string data to be sent over the stream.
self.send_queue = queue.Queue() self.send_queue = queue.Queue()
self.send_queue_lock = threading.Lock() self.send_queue_lock = threading.Lock()
self.send_lock = threading.RLock()
#: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for #: A :class:`~sleekxmpp.xmlstream.scheduler.Scheduler` instance for
#: executing callbacks in the future based on time delays. #: executing callbacks in the future based on time delays.
@ -1180,21 +1181,22 @@ class XMLStream(object):
sent = 0 sent = 0
count = 0 count = 0
tries = 0 tries = 0
while sent < total and not self.stop.is_set(): with self.send_lock:
try: while sent < total and not self.stop.is_set():
sent += self.socket.send(data[sent:]) try:
count += 1 sent += self.socket.send(data[sent:])
except ssl.SSLError as serr: count += 1
if tries >= self.ssl_retry_max: except ssl.SSLError as serr:
log.debug('SSL error - max retries reached') if tries >= self.ssl_retry_max:
self.exception(serr) log.debug('SSL error - max retries reached')
log.warning("Failed to send %s", data) self.exception(serr)
if reconnect is None: log.warning("Failed to send %s", data)
reconnect = self.auto_reconnect if reconnect is None:
self.disconnect(reconnect) reconnect = self.auto_reconnect
log.warning('SSL write error - reattempting') self.disconnect(reconnect)
time.sleep(self.ssl_retry_delay) log.warning('SSL write error - reattempting')
tries += 1 time.sleep(self.ssl_retry_delay)
tries += 1
if count > 1: if count > 1:
log.debug('SENT: %d chunks', count) log.debug('SENT: %d chunks', count)
except Socket.error as serr: except Socket.error as serr:
@ -1531,19 +1533,20 @@ class XMLStream(object):
count = 0 count = 0
tries = 0 tries = 0
try: try:
while sent < total and not self.stop.is_set(): with self.send_lock:
try: while sent < total and not self.stop.is_set():
sent += self.socket.send(enc_data[sent:]) try:
count += 1 sent += self.socket.send(enc_data[sent:])
except ssl.SSLError as serr: count += 1
if tries >= self.ssl_retry_max: except ssl.SSLError as serr:
log.debug('SSL error - max retries reached') if tries >= self.ssl_retry_max:
self.exception(serr) log.debug('SSL error - max retries reached')
log.warning("Failed to send %s", data) self.exception(serr)
self.disconnect(self.auto_reconnect) log.warning("Failed to send %s", data)
log.warning('SSL write error - reattempting') self.disconnect(self.auto_reconnect)
time.sleep(self.ssl_retry_delay) log.warning('SSL write error - reattempting')
tries += 1 time.sleep(self.ssl_retry_delay)
tries += 1
if count > 1: if count > 1:
log.debug('SENT: %d chunks', count) log.debug('SENT: %d chunks', count)
self.send_queue.task_done() self.send_queue.task_done()