trying to get xmlstream to reconnect on stream failure

This commit is contained in:
Tom Nichols 2010-07-02 16:46:34 -04:00
parent 33602f232c
commit 6e93982fdf
2 changed files with 51 additions and 37 deletions

View File

@ -159,11 +159,11 @@ class ClientXMPP(basexmpp, XMLStream):
def reconnect(self): def reconnect(self):
self.disconnect(reconnect=True) self.disconnect(reconnect=True)
def disconnect(self, reconnect=False): def disconnect(self, reconnect=False, error=False):
self.event("disconnected") self.event("disconnected")
self.authenticated = False self.authenticated = False
self.sessionstarted = False self.sessionstarted = False
XMLStream.disconnect(self, reconnect) XMLStream.disconnect(self, reconnect, error)
def registerFeature(self, mask, pointer, breaker = False): def registerFeature(self, mask, pointer, breaker = False):
"""Register a stream feature.""" """Register a stream feature."""

View File

@ -42,9 +42,6 @@ if sys.version_info < (3, 0):
class RestartStream(Exception): class RestartStream(Exception):
pass pass
class CloseStream(Exception):
pass
stanza_extensions = {} stanza_extensions = {}
RECONNECT_MAX_DELAY = 3600 RECONNECT_MAX_DELAY = 3600
@ -86,7 +83,9 @@ class XMLStream(object):
self.namespace_map = {} self.namespace_map = {}
self.run = True # booleans are not volatile in Python and changes
# do not seem to be detected easily between threads.
self.quit = threading.Event()
def setSocket(self, socket): def setSocket(self, socket):
"Set the socket" "Set the socket"
@ -123,7 +122,7 @@ class XMLStream(object):
# holds the state lock. # holds the state lock.
delay = 1.0 # reconnection delay delay = 1.0 # reconnection delay
while self.run: while not self.quit.is_set():
logging.debug('connecting....') logging.debug('connecting....')
try: try:
if host and port: if host and port:
@ -160,7 +159,7 @@ class XMLStream(object):
# http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310 # http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310
delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY) delay = min(delay * RECONNECT_QUIESCE_FACTOR, RECONNECT_MAX_DELAY)
delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER) delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER)
logging.debug('Waiting %fs until next reconnect attempt...', delay) logging.debug('Waiting %.3fs until next reconnect attempt...', delay)
time.sleep(delay) time.sleep(delay)
@ -192,8 +191,8 @@ class XMLStream(object):
raise RestartStream() raise RestartStream()
def process(self, threaded=True): def process(self, threaded=True):
self.quit.clear()
self.scheduler.process(threaded=True) self.scheduler.process(threaded=True)
self.run = True
for t in range(0, HANDLER_THREADS): for t in range(0, HANDLER_THREADS):
th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner) th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
th.setDaemon(True) th.setDaemon(True)
@ -217,16 +216,16 @@ class XMLStream(object):
def _process(self): def _process(self):
"Start processing the socket." "Start processing the socket."
logging.debug('Process thread starting...') logging.debug('Process thread starting...')
while self.run: while not self.quit.is_set():
if not self.state.ensure('connected',wait=2): continue if not self.state.ensure('connected',wait=2, block_on_transition=True): continue
try: try:
logging.debug(' ------------------------------- starting process loop...')
self.sendPriorityRaw(self.stream_header) self.sendPriorityRaw(self.stream_header)
while self.run and self.__readXML(): pass self.__readXML() # this loops until the stream is terminated.
except socket.timeout: except socket.timeout:
logging.debug('socket rcv timeout') # TODO currently this will re-send a stream header if this exception occurs.
pass # I don't think that's intended behavior.
except CloseStream: logging.warn('socket rcv timeout')
# TODO warn that the listener thread is exiting!!!
pass pass
except RestartStream: except RestartStream:
logging.debug("Restarting stream...") logging.debug("Restarting stream...")
@ -236,13 +235,25 @@ class XMLStream(object):
logging.debug("System interrupt detected") logging.debug("System interrupt detected")
self.shutdown() self.shutdown()
self.eventqueue.put(('quit', None, None)) self.eventqueue.put(('quit', None, None))
except cElementTree.XMLParserError:
logging.warn('XML RCV parsing error!', exc_info=1)
# don't restart the stream on an XML parse error.
except: except:
logging.exception('Unexpected error in RCV thread') logging.exception('Unexpected error in RCV thread')
if self.should_reconnect:
self.disconnect(reconnect=True) # if the RCV socket is terminated for whatever reason, our only sane choice of action is an attempt
# to re-establish the connection.
if not self.quit.is_set():
logging.info( 'about to reconnect..........' )
logging.info( 'about to reconnect..........' )
logging.info( 'about to reconnect..........' )
logging.info( 'about to reconnect..........' )
try:
self.disconnect(reconnect=self.should_reconnect, error=True)
except:
logging.exception( "WTF disconnect!" )
logging.info( 'reconnect complete!' )
logging.info( 'reconnect complete!' )
logging.info( 'reconnect complete!' )
logging.info( 'reconnect complete!' )
logging.info( 'reconnect complete!' )
logging.debug('Quitting Process thread') logging.debug('Quitting Process thread')
@ -262,8 +273,8 @@ class XMLStream(object):
if event == b'end': if event == b'end':
edepth += -1 edepth += -1
if edepth == 0 and event == b'end': if edepth == 0 and event == b'end':
# what is this case exactly? Premature EOF? logging.warn("Premature EOF from read socket; Ending readXML loop")
logging.debug("Ending readXML loop") # this is a premature EOF as far as I can tell; raise an exception so the stream get closed and re-established cleanly.
return False return False
elif edepth == 1: elif edepth == 1:
#self.xmlin.put(xmlobj) #self.xmlin.put(xmlobj)
@ -271,13 +282,14 @@ class XMLStream(object):
if root: root.clear() if root: root.clear()
if event == b'start': if event == b'start':
edepth += 1 edepth += 1
logging.debug("Exiting readXML loop") logging.warn("Exiting readXML loop")
# TODO under what conditions will this _ever_ occur?
return False return False
def _sendThread(self): def _sendThread(self):
logging.debug('send thread starting...') logging.debug('send thread starting...')
while self.run: while not self.quit.is_set():
if not self.state.ensure('connected',wait=2): continue if not self.state.ensure('connected',wait=2, block_on_transition=True): continue
data = None data = None
try: try:
@ -299,7 +311,7 @@ class XMLStream(object):
# some sort of event that could be handled by a common thread or the reader # some sort of event that could be handled by a common thread or the reader
# thread to perform reconnect and then re-initialize the handler threads as well. # thread to perform reconnect and then re-initialize the handler threads as well.
if self.should_reconnect: if self.should_reconnect:
self.disconnect(reconnect=True) self.disconnect(reconnect=True, error=True)
def sendRaw(self, data): def sendRaw(self, data):
self.sendqueue.put((1, data)) self.sendqueue.put((1, data))
@ -309,16 +321,18 @@ class XMLStream(object):
self.sendqueue.put((0, data)) self.sendqueue.put((0, data))
return True return True
def disconnect(self, reconnect=False): def disconnect(self, reconnect=False, error=False):
logging.info('AAAAAAAAAAAAAAAAAAAAAAAA')
with self.state.transition_ctx('connected','disconnected') as locked: with self.state.transition_ctx('connected','disconnected') as locked:
logging.info('BBBBBBBBBBBBBBBBBBBBBBBBBB')
if not locked: if not locked:
logging.warning("Already disconnected.") logging.warning("Already disconnected.")
return return
logging.debug("Disconnecting...") logging.debug("Disconnecting...")
self.sendRaw(self.stream_footer) # don't send a footer on error; if the stream is already closed,
time.sleep(5) # this won't get sent until the stream is re-initialized!
#send end of stream if not error: self.sendRaw(self.stream_footer) #send end of stream
#wait for end of stream back
try: try:
# self.socket.shutdown(socket.SHUT_RDWR) # self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close() self.socket.close()
@ -336,7 +350,7 @@ class XMLStream(object):
Disconnects and shuts down all event threads. Disconnects and shuts down all event threads.
''' '''
self.disconnect() self.disconnect()
self.run = False self.quit.set()
self.scheduler.run = False self.scheduler.run = False
def incoming_filter(self, xmlobj): def incoming_filter(self, xmlobj):
@ -344,9 +358,9 @@ class XMLStream(object):
def __spawnEvent(self, xmlobj): def __spawnEvent(self, xmlobj):
"watching xmlOut and processes handlers" "watching xmlOut and processes handlers"
#convert XML into Stanza if logging.getLogger().isEnabledFor(logging.DEBUG):
# TODO surround this log statement with an if, it's expensive
logging.debug("RECV: %s" % cElementTree.tostring(xmlobj)) logging.debug("RECV: %s" % cElementTree.tostring(xmlobj))
#convert XML into Stanza
xmlobj = self.incoming_filter(xmlobj) xmlobj = self.incoming_filter(xmlobj)
stanza = None stanza = None
for stanza_class in self.__root_stanza: for stanza_class in self.__root_stanza:
@ -374,7 +388,7 @@ class XMLStream(object):
def _eventRunner(self): def _eventRunner(self):
logging.debug("Loading event runner") logging.debug("Loading event runner")
while self.run: while not self.quit.is_set():
try: try:
event = self.eventqueue.get(True, timeout=5) event = self.eventqueue.get(True, timeout=5)
except queue.Empty: except queue.Empty: