reconnection quiesce logic

This commit is contained in:
Thom Nichols 2010-06-02 14:18:09 -04:00
parent 8227affd7f
commit 4295a66c70

View File

@ -16,6 +16,7 @@ from . stanzabase import StanzaBase
from xml.etree import cElementTree from xml.etree import cElementTree
from xml.parsers import expat from xml.parsers import expat
import logging import logging
import random
import socket import socket
import threading import threading
import time import time
@ -46,6 +47,10 @@ class CloseStream(Exception):
stanza_extensions = {} stanza_extensions = {}
RECONNECT_MAX_DELAY = 3600
RECONNECT_QUIESCE_FACTOR = 1.6180339887498948 # Phi
RECONNECT_QUIESCE_JITTER = 0.11962656472 # molar Planck constant times c, joule meter/mole
class XMLStream(object): class XMLStream(object):
"A connection manager with XML events." "A connection manager with XML events."
@ -101,7 +106,12 @@ class XMLStream(object):
def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True): def connectTCP(self, host='', port=0, use_ssl=None, use_tls=None, reattempt=True):
"Connect and create socket" "Connect and create socket"
while reattempt and not self.state['connected']: # the self.state part is redundant.
# Note that this is thread-safe by merit of being called solely from connect() which
# holds the state lock.
delay = 1.0 # reconnection delay
while self.run:
logging.debug('connecting....') logging.debug('connecting....')
try: try:
if host and port: if host and port:
@ -115,21 +125,35 @@ class XMLStream(object):
else: else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(None) #10) self.socket.settimeout(None) #10)
if self.use_ssl and self.ssl_support: if self.use_ssl and self.ssl_support:
logging.debug("Socket Wrapped for SSL") logging.debug("Socket Wrapped for SSL")
self.socket = ssl.wrap_socket(self.socket,ca_certs=self.ca_certs) self.socket = ssl.wrap_socket(self.socket,ca_certs=self.ca_certs)
except:
logging.exception("Connection error")
try:
self.socket.connect(self.address) self.socket.connect(self.address)
self.filesocket = self.socket.makefile('rb', 0) self.filesocket = self.socket.makefile('rb', 0)
if not self.state.transition('connecting','connected'): if not self.state.transition('connecting','connected'):
logging.error( "State transition error!!!! Shouldn't have happened" ) logging.error( "State transition error!!!! Shouldn't have happened" )
logging.debug('connect complete.') logging.debug('connect complete.')
return True return True
except socket.error as serr: except socket.error as serr:
logging.error("Could not connect. Socket Error #%s: %s" % (serr.errno, serr.strerror)) logging.exception("Socket Error #%s: %s", serr.errno, serr.strerror)
time.sleep(1) # TODO proper quiesce if connection attempt fails if not reattempt: return False
except:
logging.exception("Connection error")
if not reattempt: return False
# quiesce if rconnection fails:
# This code based loosely on Twisted internet.protocol
# http://twistedmatrix.com/trac/browser/trunk/twisted/internet/protocol.py#L310
delay = min(delay * RECONNECT_QUIESCE_JITTER, RECONNECT_MAX_DELAY)
delay = random.normalvariate(delay, delay * RECONNECT_QUIESCE_JITTER)
logging.debug('Waiting %fs until next reconnect attempt...', delay)
time.sleep(delay)
def connectUnix(self, filepath): def connectUnix(self, filepath):
"Connect to Unix file and create socket" "Connect to Unix file and create socket"