Make connect(), abort() and reconnect() work

All the auto_reconnect, connect_retry logic and that kind of stuf has been
entirely removed.
This commit is contained in:
Florent Le Coz 2014-07-21 20:31:37 +02:00
parent 373505f483
commit 5c769632e8

View File

@ -142,11 +142,6 @@ class XMLStream(object):
self._der_cert = None self._der_cert = None
#: The current amount to time to delay attempting to reconnect.
#: This value doubles (with some jitter) with each failed
#: connection attempt up to :attr:`reconnect_max_delay` seconds.
self.reconnect_delay = None
#: The connection state machine tracks if the stream is #: The connection state machine tracks if the stream is
#: ``'connected'`` or ``'disconnected'``. #: ``'connected'`` or ``'disconnected'``.
self.state = StateMachine(('disconnected', 'connected')) self.state = StateMachine(('disconnected', 'connected'))
@ -241,7 +236,6 @@ class XMLStream(object):
self.__thread_cond = threading.Condition() self.__thread_cond = threading.Condition()
self.__active_threads = set() self.__active_threads = set()
self._use_daemons = False self._use_daemons = False
self._disconnect_wait_for_threads = True
self._id = 0 self._id = 0
self._id_lock = threading.Lock() self._id_lock = threading.Lock()
@ -249,13 +243,6 @@ class XMLStream(object):
#: We use an ID prefix to ensure that all ID values are unique. #: We use an ID prefix to ensure that all ID values are unique.
self._id_prefix = '%s-' % uuid.uuid4() self._id_prefix = '%s-' % uuid.uuid4()
#: The :attr:`disconnect_wait` setting is the default value
#: for controlling if the system waits for the send queue to
#: empty before ending the stream. This may be overridden by
#: passing ``wait=True`` or ``wait=False`` to :meth:`disconnect`.
#: The default :attr:`disconnect_wait` value is ``False``.
self.disconnect_wait = False
#: A list of DNS results that have not yet been tried. #: A list of DNS results that have not yet been tried.
self.dns_answers = [] self.dns_answers = []
@ -380,6 +367,7 @@ class XMLStream(object):
self.parser = xml.etree.ElementTree.XMLPullParser(("start", "end")) self.parser = xml.etree.ElementTree.XMLPullParser(("start", "end"))
def connection_made(self, transport): def connection_made(self, transport):
self.event("connected")
self.transport = transport self.transport = transport
self.socket = self.transport.get_extra_info("socket") self.socket = self.transport.get_extra_info("socket")
self.init_parser() self.init_parser()
@ -396,102 +384,72 @@ class XMLStream(object):
stream=self, stream=self,
top_level=True, top_level=True,
open_only=True)) open_only=True))
# Perform any stream initialization actions, such
# as handshakes.
self.stream_end_event.clear()
self.start_stream_handler(self.xml_root)
# We have a successful stream connection, so reset
# exponential backoff for new reconnect attempts.
self.reconnect_delay = 1.0
self.xml_depth += 1 self.xml_depth += 1
if event == 'end': if event == 'end':
self.xml_depth -= 1 self.xml_depth -= 1
if self.xml_depth == 0: if self.xml_depth == 0:
# The stream's root element has closed, # The stream's root element has closed,
# terminating the stream. # terminating the stream.
log.debug("End of stream recieved") log.debug("End of stream received")
self.stream_end_event.set() self.abort()
return False
elif self.xml_depth == 1: elif self.xml_depth == 1:
# We only raise events for stanzas that are direct # We only raise events for stanzas that are direct
# children of the root element. # children of the root element.
try: self.__spawn_event(xml)
self.__spawn_event(xml)
except RestartStream:
return True
if self.xml_root is not None: if self.xml_root is not None:
# Keep the root element empty of children to # Keep the root element empty of children to
# save on memory use. # save on memory use.
self.xml_root.clear() self.xml_root.clear()
def connection_lost(self): def eof_received(self):
"""
When the TCP connection is properly closed by the remote end
"""
log.debug("eof_received")
def connection_lost(self, exception):
"""
On any kind of disconnection
"""
log.warning("connection_lost: %s", (exception,))
if self.end_session_on_disconnect:
self.event('session_end')
self.parser = None self.parser = None
self.transport = None self.transport = None
self.socket = None self.socket = None
self.event("disconnected")
def disconnect(self, wait=2.0):
"""Close the XML stream and wait for an acknowldgement from the server for
at most `wait` seconds. After the given number of seconds has
passed without a response from the serveur, abort() is called. If
wait is 0.0, this is equivalent to calling abort() directly.
Does nothing if we are not connected.
:param wait: Time to wait for a response from the server.
def _session_timeout_check(self, event=None):
""" """
Add check to ensure that a session is established within if self.transport:
a reasonable amount of time. self.send_raw(self.stream_footer)
""" self.schedule('Disconnect wait', wait,
self.abort, repeat=False)
def _handle_session_timeout():
if not self.session_started_event.is_set():
log.debug("Session start has taken more " + \
"than %d seconds", self.session_timeout)
self.disconnect(reconnect=self.auto_reconnect)
self.schedule("Session timeout check",
self.session_timeout,
_handle_session_timeout)
def disconnect(self, reconnect=False, wait=None, send_close=True):
"""Terminate processing and close the XML streams.
Optionally, the connection may be reconnected and
resume processing afterwards.
If the disconnect should take place after all items
in the send queue have been sent, use ``wait=True``.
.. warning::
If you are constantly adding items to the queue
such that it is never empty, then the disconnect will
not occur and the call will continue to block.
:param reconnect: Flag indicating if the connection
and processing should be restarted.
Defaults to ``False``.
:param wait: Flag indicating if the send queue should
be emptied before disconnecting, overriding
:attr:`disconnect_wait`.
:param send_close: Flag indicating if the stream footer
should be sent before terminating the
connection. Setting this to ``False``
prevents error loops when trying to
disconnect after a socket error.
"""
# TODO
pass
def abort(self): def abort(self):
self.session_started_event.clear() """
if self._disconnect_wait_for_threads: Forcibly close the connection
self._wait_for_threads() """
try: if self.transport:
self.socket.shutdown(Socket.SHUT_RDWR) self.transport.abort()
self.socket.close() self.event("killed", direct=True)
except Socket.error:
pass
self.state.transition_any(['connected', 'disconnected'], 'disconnected', func=lambda: True)
self.event("killed", direct=True)
def reconnect(self, reattempt=True, wait=False, send_close=True): def reconnect(self, wait=2.0):
"""Reset the stream's state and reconnect to the server.""" """Calls disconnect(), and once we are disconnected (after the timeout, or
when the server acknowledgement is received), call connect()
"""
log.debug("reconnecting...") log.debug("reconnecting...")
self.connect() self.disconnect(wait)
self.add_event_handler('disconnected', self.connect, disposable=True)
def configure_socket(self): def configure_socket(self):
"""Set timeout and other options for self.socket. """Set timeout and other options for self.socket.
@ -581,9 +539,10 @@ class XMLStream(object):
repeat=True) repeat=True)
def _remove_schedules(self, event): def _remove_schedules(self, event):
"""Remove whitespace keepalive and certificate expiration schedules.""" """Remove some schedules that become pointless when disconnected"""
self.cancel_schedule('Whitespace Keepalive') self.cancel_schedule('Whitespace Keepalive')
self.cancel_schedule('Certificate Expiration') self.cancel_schedule('Certificate Expiration')
self.cancel_schedule('Disconnect wait')
def start_stream_handler(self, xml): def start_stream_handler(self, xml):
"""Perform any initialization actions, such as handshakes, """Perform any initialization actions, such as handshakes,