Improve handling disconnections.

- Add option for disconnecting without sending </stream>:

    self.disconnect(send_close=False)

- Optionally distinguish between session_end and disconnected based
  on if </stream> was sent.

    self.end_session_on_disconnect = False
This commit is contained in:
Lance Stout 2012-03-27 23:24:42 -07:00
parent f1fde07eb9
commit 94923ae898

View File

@ -262,6 +262,10 @@ class XMLStream(object):
#: after connecting before reconnecting and trying again. #: after connecting before reconnecting and trying again.
self.session_timeout = 45 self.session_timeout = 45
#: Flag for controlling if the session can be considered ended
#: if the connection is terminated.
self.end_session_on_disconnect = True
#: A queue of stream, custom, and scheduled events to be processed. #: A queue of stream, custom, and scheduled events to be processed.
self.event_queue = queue.Queue() self.event_queue = queue.Queue()
@ -304,7 +308,7 @@ class XMLStream(object):
self.add_event_handler('connected', self._handle_connected) self.add_event_handler('connected', self._handle_connected)
self.add_event_handler('session_start', self._start_keepalive) self.add_event_handler('session_start', self._start_keepalive)
self.add_event_handler('session_end', self._end_keepalive) self.add_event_handler('disconnected', self._end_keepalive)
def use_signals(self, signals=None): def use_signals(self, signals=None):
"""Register signal handlers for ``SIGHUP`` and ``SIGTERM``. """Register signal handlers for ``SIGHUP`` and ``SIGTERM``.
@ -581,7 +585,7 @@ class XMLStream(object):
self.session_timeout, self.session_timeout,
_handle_session_timeout) _handle_session_timeout)
def disconnect(self, reconnect=False, wait=None): def disconnect(self, reconnect=False, wait=None, send_close=True):
"""Terminate processing and close the XML streams. """Terminate processing and close the XML streams.
Optionally, the connection may be reconnected and Optionally, the connection may be reconnected and
@ -602,13 +606,20 @@ class XMLStream(object):
:param wait: Flag indicating if the send queue should :param wait: Flag indicating if the send queue should
be emptied before disconnecting, overriding be emptied before disconnecting, overriding
:attr:`disconnect_wait`. :attr:`disconnect_wait`.
:param send_close: Flag indicating if the stream footer
should be sent before terminating the
connection. Setting this to ``False``
prevents error loops when trying to
disconnect after a socket error.
""" """
self.state.transition('connected', 'disconnected', self.state.transition('connected', 'disconnected',
wait=2.0, wait=2.0,
func=self._disconnect, args=(reconnect, wait)) func=self._disconnect,
args=(reconnect, wait, send_close))
def _disconnect(self, reconnect=False, wait=None): def _disconnect(self, reconnect=False, wait=None, send_close=True):
self.event('session_end', direct=True) if self.end_session_on_disconnect or send_close:
self.event('session_end', direct=True)
# Wait for the send queue to empty. # Wait for the send queue to empty.
if wait is not None: if wait is not None:
@ -618,13 +629,20 @@ class XMLStream(object):
self.send_queue.join() self.send_queue.join()
# Send the end of stream marker. # Send the end of stream marker.
self.send_raw(self.stream_footer, now=True) if send_close:
self.send_raw(self.stream_footer, now=True)
self.session_started_event.clear() self.session_started_event.clear()
# Wait for confirmation that the stream was # Wait for confirmation that the stream was
# closed in the other direction. # closed in the other direction. If we didn't
# send a stream footer we don't need to wait
# since the server won't know to respond.
self.auto_reconnect = reconnect self.auto_reconnect = reconnect
log.debug('Waiting for %s from server', self.stream_footer) if send_close:
self.stream_end_event.wait(4) log.debug('Waiting for %s from server', self.stream_footer)
self.stream_end_event.wait(4)
else:
self.stream_end_event.set()
if not self.auto_reconnect: if not self.auto_reconnect:
self.stop.set() self.stop.set()
try: try:
@ -638,12 +656,14 @@ class XMLStream(object):
self.event("disconnected", direct=True) self.event("disconnected", direct=True)
return True return True
def reconnect(self, reattempt=True): def reconnect(self, reattempt=True, wait=False, send_close=True):
"""Reset the stream's state and reconnect to the server.""" """Reset the stream's state and reconnect to the server."""
log.debug("reconnecting...") log.debug("reconnecting...")
if self.state.ensure('connected'): if self.state.ensure('connected'):
self.state.transition('connected', 'disconnected', wait=2.0, self.state.transition('connected', 'disconnected',
func=self._disconnect, args=(True,)) wait=2.0,
func=self._disconnect,
args=(True, wait, send_close))
attempts = self.reconnect_max_attempts attempts = self.reconnect_max_attempts
@ -1190,7 +1210,7 @@ class XMLStream(object):
log.warning("Failed to send %s", data) log.warning("Failed to send %s", data)
if reconnect is None: if reconnect is None:
reconnect = self.auto_reconnect reconnect = self.auto_reconnect
self.disconnect(reconnect) self.disconnect(reconnect, send_close=False)
log.warning('SSL write error - reattempting') log.warning('SSL write error - reattempting')
time.sleep(self.ssl_retry_delay) time.sleep(self.ssl_retry_delay)
tries += 1 tries += 1
@ -1201,7 +1221,7 @@ class XMLStream(object):
log.warning("Failed to send %s", data) log.warning("Failed to send %s", data)
if reconnect is None: if reconnect is None:
reconnect = self.auto_reconnect reconnect = self.auto_reconnect
self.disconnect(reconnect) self.disconnect(reconnect, send_close=False)
else: else:
self.send_queue.put(data) self.send_queue.put(data)
return True return True
@ -1540,7 +1560,7 @@ class XMLStream(object):
log.debug('SSL error - max retries reached') log.debug('SSL error - max retries reached')
self.exception(serr) self.exception(serr)
log.warning("Failed to send %s", data) log.warning("Failed to send %s", data)
self.disconnect(self.auto_reconnect) self.disconnect(self.auto_reconnect, send_close=False)
log.warning('SSL write error - reattempting') log.warning('SSL write error - reattempting')
time.sleep(self.ssl_retry_delay) time.sleep(self.ssl_retry_delay)
tries += 1 tries += 1
@ -1551,7 +1571,7 @@ class XMLStream(object):
self.event('socket_error', serr, direct=True) self.event('socket_error', serr, direct=True)
log.warning("Failed to send %s", data) log.warning("Failed to send %s", data)
self.__failed_send_stanza = data self.__failed_send_stanza = data
self.disconnect(self.auto_reconnect) self.disconnect(self.auto_reconnect, send_close=False)
except Exception as ex: except Exception as ex:
log.exception('Unexpected error in send thread: %s', ex) log.exception('Unexpected error in send thread: %s', ex)
self.exception(ex) self.exception(ex)