Merge branch 'develop' of github.com:fritzy/SleekXMPP into develop

This commit is contained in:
Lance Stout 2013-06-22 14:35:20 -07:00
commit e76a483931
3 changed files with 39 additions and 17 deletions

View File

@ -13,6 +13,7 @@
""" """
from socket import _fileobject from socket import _fileobject
import errno
import socket import socket
@ -29,7 +30,13 @@ class FileSocket(_fileobject):
"""Read data from the socket as if it were a file.""" """Read data from the socket as if it were a file."""
if self._sock is None: if self._sock is None:
return None return None
data = self._sock.recv(size) while True:
try:
data = self._sock.recv(size)
break
except socket.error as serr:
if serr.errno != errno.EINTR:
raise
if data is not None: if data is not None:
return data return data

View File

@ -20,6 +20,11 @@ import itertools
from sleekxmpp.util import Queue, QueueEmpty from sleekxmpp.util import Queue, QueueEmpty
#: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal.
WAIT_TIMEOUT = 1.0
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -120,6 +125,10 @@ class Scheduler(object):
#: Lock for accessing the task queue. #: Lock for accessing the task queue.
self.schedule_lock = threading.RLock() self.schedule_lock = threading.RLock()
#: The time in seconds to wait for events from the event queue,
#: and also the time between checks for the process stop signal.
self.wait_timeout = WAIT_TIMEOUT
def process(self, threaded=True, daemon=False): def process(self, threaded=True, daemon=False):
"""Begin accepting and processing scheduled tasks. """Begin accepting and processing scheduled tasks.
@ -139,24 +148,25 @@ class Scheduler(object):
self.run = True self.run = True
try: try:
while self.run and not self.stop.is_set(): while self.run and not self.stop.is_set():
wait = 0.1
updated = False updated = False
if self.schedule: if self.schedule:
wait = self.schedule[0].next - time.time() wait = self.schedule[0].next - time.time()
else:
wait = self.wait_timeout
try: try:
if wait <= 0.0: if wait <= 0.0:
newtask = self.addq.get(False) newtask = self.addq.get(False)
else: else:
if wait >= 3.0:
wait = 3.0
newtask = None newtask = None
elapsed = 0 while self.run and \
while not self.stop.is_set() and \ not self.stop.is_set() and \
newtask is None and \ newtask is None and \
elapsed < wait: wait > 0:
newtask = self.addq.get(True, 0.1) try:
elapsed += 0.1 newtask = self.addq.get(True, min(wait, self.wait_timeout))
except QueueEmpty: except QueueEmpty: # Nothing to add, nothing to do. Check run flags and continue waiting.
wait -= self.wait_timeout
except QueueEmpty: # Time to run some tasks, and no new tasks to add.
self.schedule_lock.acquire() self.schedule_lock.acquire()
#select only those tasks which are to be executed now #select only those tasks which are to be executed now
relevant = itertools.takewhile( relevant = itertools.takewhile(
@ -174,11 +184,11 @@ class Scheduler(object):
# only need to resort tasks if a repeated task has # only need to resort tasks if a repeated task has
# been kept in the list. # been kept in the list.
updated = True updated = True
else: else: # Add new task
updated = True
self.schedule_lock.acquire() self.schedule_lock.acquire()
if newtask is not None: if newtask is not None:
self.schedule.append(newtask) self.schedule.append(newtask)
updated = True
finally: finally:
if updated: if updated:
self.schedule.sort(key=lambda task: task.next) self.schedule.sort(key=lambda task: task.next)

View File

@ -49,7 +49,7 @@ RESPONSE_TIMEOUT = 30
#: The time in seconds to wait for events from the event queue, and also the #: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal. #: time between checks for the process stop signal.
WAIT_TIMEOUT = 0.1 WAIT_TIMEOUT = 1.0
#: The number of threads to use to handle XML stream events. This is not the #: The number of threads to use to handle XML stream events. This is not the
#: same as the number of custom event handling threads. #: same as the number of custom event handling threads.
@ -1294,6 +1294,9 @@ class XMLStream(object):
try: try:
sent += self.socket.send(data[sent:]) sent += self.socket.send(data[sent:])
count += 1 count += 1
except Socket.error as serr:
if serr.errno != errno.EINTR:
raise
except ssl.SSLError as serr: except ssl.SSLError as serr:
if tries >= self.ssl_retry_max: if tries >= self.ssl_retry_max:
log.debug('SSL error: max retries reached') log.debug('SSL error: max retries reached')
@ -1629,8 +1632,7 @@ class XMLStream(object):
try: try:
while not self.stop.is_set(): while not self.stop.is_set():
try: try:
wait = self.wait_timeout event = self.event_queue.get(True, timeout=self.wait_timeout)
event = self.event_queue.get(True, timeout=wait)
except QueueEmpty: except QueueEmpty:
event = None event = None
if event is None: if event is None:
@ -1693,13 +1695,13 @@ class XMLStream(object):
while not self.stop.is_set(): while not self.stop.is_set():
while not self.stop.is_set() and \ while not self.stop.is_set() and \
not self.session_started_event.is_set(): not self.session_started_event.is_set():
self.session_started_event.wait(timeout=0.1) self.session_started_event.wait(timeout=0.1) # Wait for session start
if self.__failed_send_stanza is not None: if self.__failed_send_stanza is not None:
data = self.__failed_send_stanza data = self.__failed_send_stanza
self.__failed_send_stanza = None self.__failed_send_stanza = None
else: else:
try: try:
data = self.send_queue.get(True, 1) data = self.send_queue.get(True, timeout=self.wait_timeout) # Wait for data to send
except QueueEmpty: except QueueEmpty:
continue continue
log.debug("SEND: %s", data) log.debug("SEND: %s", data)
@ -1715,6 +1717,9 @@ class XMLStream(object):
try: try:
sent += self.socket.send(enc_data[sent:]) sent += self.socket.send(enc_data[sent:])
count += 1 count += 1
except Socket.error as serr:
if serr.errno != errno.EINTR:
raise
except ssl.SSLError as serr: except ssl.SSLError as serr:
if tries >= self.ssl_retry_max: if tries >= self.ssl_retry_max:
log.debug('SSL error: max retries reached') log.debug('SSL error: max retries reached')