Remove the send_thread() function, and the stop threading.event
This commit is contained in:
parent
4328762076
commit
a2cad40f91
@ -267,10 +267,6 @@ class XMLStream(object):
|
|||||||
#: :attr:`whitespace_keepalive` is enabled.
|
#: :attr:`whitespace_keepalive` is enabled.
|
||||||
self.whitespace_keepalive_interval = 300
|
self.whitespace_keepalive_interval = 300
|
||||||
|
|
||||||
#: An :class:`~threading.Event` to signal that the application
|
|
||||||
#: is stopping, and that all threads should shutdown.
|
|
||||||
self.stop = threading.Event()
|
|
||||||
|
|
||||||
#: An :class:`~threading.Event` to signal receiving a closing
|
#: An :class:`~threading.Event` to signal receiving a closing
|
||||||
#: stream tag from the server.
|
#: stream tag from the server.
|
||||||
self.stream_end_event = threading.Event()
|
self.stream_end_event = threading.Event()
|
||||||
@ -424,8 +420,6 @@ class XMLStream(object):
|
|||||||
localhost
|
localhost
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self.stop.clear()
|
|
||||||
|
|
||||||
if host and port:
|
if host and port:
|
||||||
self.address = (host, int(port))
|
self.address = (host, int(port))
|
||||||
try:
|
try:
|
||||||
@ -552,7 +546,6 @@ class XMLStream(object):
|
|||||||
|
|
||||||
def abort(self):
|
def abort(self):
|
||||||
self.session_started_event.clear()
|
self.session_started_event.clear()
|
||||||
self.set_stop()
|
|
||||||
if self._disconnect_wait_for_threads:
|
if self._disconnect_wait_for_threads:
|
||||||
self._wait_for_threads()
|
self._wait_for_threads()
|
||||||
try:
|
try:
|
||||||
@ -1087,9 +1080,6 @@ class XMLStream(object):
|
|||||||
if self.__thread_count == 0:
|
if self.__thread_count == 0:
|
||||||
self.__thread_cond.notify()
|
self.__thread_cond.notify()
|
||||||
|
|
||||||
def set_stop(self):
|
|
||||||
self.stop.set()
|
|
||||||
|
|
||||||
def _build_stanza(self, xml, default_ns=None):
|
def _build_stanza(self, xml, default_ns=None):
|
||||||
"""Create a stanza object from a given XML object.
|
"""Create a stanza object from a given XML object.
|
||||||
|
|
||||||
@ -1221,69 +1211,6 @@ class XMLStream(object):
|
|||||||
else:
|
else:
|
||||||
self.exception(e)
|
self.exception(e)
|
||||||
|
|
||||||
def _send_thread(self):
|
|
||||||
"""Extract stanzas from the send queue and send them on the stream."""
|
|
||||||
try:
|
|
||||||
while not self.stop.is_set():
|
|
||||||
while not self.stop.is_set() and \
|
|
||||||
not self.session_started_event.is_set():
|
|
||||||
self.session_started_event.wait(timeout=0.1) # Wait for session start
|
|
||||||
if self.__failed_send_stanza is not None:
|
|
||||||
data = self.__failed_send_stanza
|
|
||||||
self.__failed_send_stanza = None
|
|
||||||
else:
|
|
||||||
data = self.send_queue.get() # Wait for data to send
|
|
||||||
if data is None:
|
|
||||||
continue
|
|
||||||
log.debug("SEND: %s", data)
|
|
||||||
enc_data = data.encode('utf-8')
|
|
||||||
total = len(enc_data)
|
|
||||||
sent = 0
|
|
||||||
count = 0
|
|
||||||
tries = 0
|
|
||||||
try:
|
|
||||||
with self.send_lock:
|
|
||||||
while sent < total and not self.stop.is_set() and \
|
|
||||||
self.session_started_event.is_set():
|
|
||||||
try:
|
|
||||||
sent += self.socket.send(enc_data[sent:])
|
|
||||||
count += 1
|
|
||||||
except Socket.error as serr:
|
|
||||||
if serr.errno != errno.EINTR:
|
|
||||||
raise
|
|
||||||
except ssl.SSLError as serr:
|
|
||||||
if tries >= self.ssl_retry_max:
|
|
||||||
log.debug('SSL error: max retries reached')
|
|
||||||
self.exception(serr)
|
|
||||||
log.warning("Failed to send %s", data)
|
|
||||||
if not self.stop.is_set():
|
|
||||||
self.disconnect(self.auto_reconnect,
|
|
||||||
send_close=False)
|
|
||||||
log.warning('SSL write error: retrying')
|
|
||||||
if not self.stop.is_set():
|
|
||||||
time.sleep(self.ssl_retry_delay)
|
|
||||||
tries += 1
|
|
||||||
if count > 1:
|
|
||||||
log.debug('SENT: %d chunks', count)
|
|
||||||
self.send_queue.task_done()
|
|
||||||
except (Socket.error, ssl.SSLError) as serr:
|
|
||||||
self.event('socket_error', serr, direct=True)
|
|
||||||
log.warning("Failed to send %s", data)
|
|
||||||
if not self.stop.is_set():
|
|
||||||
self.__failed_send_stanza = data
|
|
||||||
self._end_thread('send')
|
|
||||||
self.disconnect(self.auto_reconnect, send_close=False)
|
|
||||||
return
|
|
||||||
except Exception as ex:
|
|
||||||
log.exception('Unexpected error in send thread: %s', ex)
|
|
||||||
self.exception(ex)
|
|
||||||
if not self.stop.is_set():
|
|
||||||
self._end_thread('send')
|
|
||||||
self.disconnect(self.auto_reconnect)
|
|
||||||
return
|
|
||||||
|
|
||||||
self._end_thread('send')
|
|
||||||
|
|
||||||
def exception(self, exception):
|
def exception(self, exception):
|
||||||
"""Process an unknown exception.
|
"""Process an unknown exception.
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user