|
|
|
|
@@ -362,7 +362,7 @@ class XMLStream(object):
|
|
|
|
|
else:
|
|
|
|
|
delay = min(self.reconnect_delay * 2, self.reconnect_max_delay)
|
|
|
|
|
delay = random.normalvariate(delay, delay * 0.1)
|
|
|
|
|
log.debug('Waiting %s seconds before connecting.' , delay)
|
|
|
|
|
log.debug('Waiting %s seconds before connecting.', delay)
|
|
|
|
|
time.sleep(delay)
|
|
|
|
|
|
|
|
|
|
if self.use_proxy:
|
|
|
|
|
@@ -391,7 +391,7 @@ class XMLStream(object):
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
if not self.use_proxy:
|
|
|
|
|
log.debug("Connecting to %s:%s" , self.address)
|
|
|
|
|
log.debug("Connecting to %s:%s", self.address)
|
|
|
|
|
self.socket.connect(self.address)
|
|
|
|
|
|
|
|
|
|
self.set_socket(self.socket, ignore=True)
|
|
|
|
|
@@ -402,8 +402,8 @@ class XMLStream(object):
|
|
|
|
|
except Socket.error as serr:
|
|
|
|
|
error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
|
|
|
|
|
self.event('socket_error', serr)
|
|
|
|
|
log.error(error_msg % (self.address[0], self.address[1],
|
|
|
|
|
serr.errno, serr.strerror))
|
|
|
|
|
log.error(error_msg, self.address[0], self.address[1],
|
|
|
|
|
serr.errno, serr.strerror)
|
|
|
|
|
self.reconnect_delay = delay
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
@@ -435,18 +435,18 @@ class XMLStream(object):
|
|
|
|
|
headers = '\r\n'.join(headers) + '\r\n\r\n'
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
log.debug("Connecting to proxy: %s:%s" , address)
|
|
|
|
|
log.debug("Connecting to proxy: %s:%s", address)
|
|
|
|
|
self.socket.connect(address)
|
|
|
|
|
self.send_raw(headers, now=True)
|
|
|
|
|
resp = ''
|
|
|
|
|
while '\r\n\r\n' not in resp:
|
|
|
|
|
resp += self.socket.recv(1024).decode('utf-8')
|
|
|
|
|
log.debug('RECV: %s' , resp)
|
|
|
|
|
log.debug('RECV: %s', resp)
|
|
|
|
|
|
|
|
|
|
lines = resp.split('\r\n')
|
|
|
|
|
if '200' not in lines[0]:
|
|
|
|
|
self.event('proxy_error', resp)
|
|
|
|
|
log.error('Proxy Error: %s' , lines[0])
|
|
|
|
|
log.error('Proxy Error: %s', lines[0])
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Proxy connection established, continue connecting
|
|
|
|
|
@@ -455,8 +455,8 @@ class XMLStream(object):
|
|
|
|
|
except Socket.error as serr:
|
|
|
|
|
error_msg = "Could not connect to %s:%s. Socket Error #%s: %s"
|
|
|
|
|
self.event('socket_error', serr)
|
|
|
|
|
log.error(error_msg % (self.address[0], self.address[1],
|
|
|
|
|
serr.errno, serr.strerror))
|
|
|
|
|
log.error(error_msg, self.address[0], self.address[1],
|
|
|
|
|
serr.errno, serr.strerror)
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def _handle_connected(self, event=None):
|
|
|
|
|
@@ -468,7 +468,7 @@ class XMLStream(object):
|
|
|
|
|
def _handle_session_timeout():
|
|
|
|
|
if not self.session_started_event.isSet():
|
|
|
|
|
log.debug("Session start has taken more " + \
|
|
|
|
|
"than %d seconds" % self.session_timeout)
|
|
|
|
|
"than %d seconds", self.session_timeout)
|
|
|
|
|
self.disconnect(reconnect=self.auto_reconnect)
|
|
|
|
|
|
|
|
|
|
self.schedule("Session timeout check",
|
|
|
|
|
@@ -510,7 +510,7 @@ class XMLStream(object):
|
|
|
|
|
# Wait for confirmation that the stream was
|
|
|
|
|
# closed in the other direction.
|
|
|
|
|
self.auto_reconnect = reconnect
|
|
|
|
|
log.debug('Waiting for %s from server' , self.stream_footer)
|
|
|
|
|
log.debug('Waiting for %s from server', self.stream_footer)
|
|
|
|
|
self.stream_end_event.wait(4)
|
|
|
|
|
if not self.auto_reconnect:
|
|
|
|
|
self.stop.set()
|
|
|
|
|
@@ -601,7 +601,7 @@ class XMLStream(object):
|
|
|
|
|
"""
|
|
|
|
|
if self.ssl_support:
|
|
|
|
|
log.info("Negotiating TLS")
|
|
|
|
|
log.info("Using SSL version: %s" , str(self.ssl_version))
|
|
|
|
|
log.info("Using SSL version: %s", str(self.ssl_version))
|
|
|
|
|
if self.ca_certs is None:
|
|
|
|
|
cert_policy = ssl.CERT_NONE
|
|
|
|
|
else:
|
|
|
|
|
@@ -759,11 +759,11 @@ class XMLStream(object):
|
|
|
|
|
try:
|
|
|
|
|
answers = resolver.query(domain, dns.rdatatype.A)
|
|
|
|
|
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
|
|
|
|
|
log.warning("No A records for %s" , domain)
|
|
|
|
|
log.warning("No A records for %s", domain)
|
|
|
|
|
return [((domain, port), 0, 0)]
|
|
|
|
|
except dns.exception.Timeout:
|
|
|
|
|
log.warning("DNS resolution timed out " + \
|
|
|
|
|
"for A record of %s" % domain)
|
|
|
|
|
"for A record of %s", domain)
|
|
|
|
|
return [((domain, port), 0, 0)]
|
|
|
|
|
else:
|
|
|
|
|
return [((ans.address, port), 0, 0) for ans in answers]
|
|
|
|
|
@@ -808,7 +808,7 @@ class XMLStream(object):
|
|
|
|
|
if self.dns_answers[0] == address:
|
|
|
|
|
break
|
|
|
|
|
self.dns_answers.pop(idx)
|
|
|
|
|
log.debug("Trying to connect to %s:%s" , address)
|
|
|
|
|
log.debug("Trying to connect to %s:%s", address)
|
|
|
|
|
return address
|
|
|
|
|
|
|
|
|
|
def add_event_handler(self, name, pointer,
|
|
|
|
|
@@ -879,7 +879,7 @@ class XMLStream(object):
|
|
|
|
|
handler[0](copy.copy(data))
|
|
|
|
|
except Exception as e:
|
|
|
|
|
error_msg = 'Error processing event handler: %s'
|
|
|
|
|
log.exception(error_msg , str(handler[0]))
|
|
|
|
|
log.exception(error_msg, str(handler[0]))
|
|
|
|
|
if hasattr(data, 'exception'):
|
|
|
|
|
data.exception(e)
|
|
|
|
|
else:
|
|
|
|
|
@@ -994,7 +994,7 @@ class XMLStream(object):
|
|
|
|
|
Defaults to self.auto_reconnect.
|
|
|
|
|
"""
|
|
|
|
|
if now:
|
|
|
|
|
log.debug("SEND (IMMED): %s" , data)
|
|
|
|
|
log.debug("SEND (IMMED): %s", data)
|
|
|
|
|
try:
|
|
|
|
|
data = data.encode('utf-8')
|
|
|
|
|
total = len(data)
|
|
|
|
|
@@ -1004,10 +1004,10 @@ class XMLStream(object):
|
|
|
|
|
sent += self.socket.send(data[sent:])
|
|
|
|
|
count += 1
|
|
|
|
|
if count > 1:
|
|
|
|
|
log.debug('SENT: %d chunks' , count)
|
|
|
|
|
log.debug('SENT: %d chunks', count)
|
|
|
|
|
except Socket.error as serr:
|
|
|
|
|
self.event('socket_error', serr)
|
|
|
|
|
log.warning("Failed to send %s" , data)
|
|
|
|
|
log.warning("Failed to send %s", data)
|
|
|
|
|
if reconnect is None:
|
|
|
|
|
reconnect = self.auto_reconnect
|
|
|
|
|
self.disconnect(reconnect)
|
|
|
|
|
@@ -1187,8 +1187,7 @@ class XMLStream(object):
|
|
|
|
|
Arguments:
|
|
|
|
|
xml -- The XML stanza to analyze.
|
|
|
|
|
"""
|
|
|
|
|
log.debug("RECV: %s" , tostring(xml,
|
|
|
|
|
xmlns=self.default_ns,
|
|
|
|
|
log.debug("RECV: %s", tostring(xml, xmlns=self.default_ns,
|
|
|
|
|
stream=self))
|
|
|
|
|
# Apply any preprocessing filters.
|
|
|
|
|
xml = self.incoming_filter(xml)
|
|
|
|
|
@@ -1232,7 +1231,7 @@ class XMLStream(object):
|
|
|
|
|
func(*args)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
error_msg = 'Error processing event handler: %s'
|
|
|
|
|
log.exception(error_msg , str(func))
|
|
|
|
|
log.exception(error_msg, str(func))
|
|
|
|
|
if hasattr(orig, 'exception'):
|
|
|
|
|
orig.exception(e)
|
|
|
|
|
else:
|
|
|
|
|
@@ -1267,12 +1266,12 @@ class XMLStream(object):
|
|
|
|
|
handler.run(args[0])
|
|
|
|
|
except Exception as e:
|
|
|
|
|
error_msg = 'Error processing stream handler: %s'
|
|
|
|
|
log.exception(error_msg , handler.name)
|
|
|
|
|
log.exception(error_msg, handler.name)
|
|
|
|
|
orig.exception(e)
|
|
|
|
|
elif etype == 'schedule':
|
|
|
|
|
name = args[1]
|
|
|
|
|
try:
|
|
|
|
|
log.debug('Scheduled event: %s: %s' , name, args[0])
|
|
|
|
|
log.debug('Scheduled event: %s: %s', name, args[0])
|
|
|
|
|
handler(*args[0])
|
|
|
|
|
except Exception as e:
|
|
|
|
|
log.exception('Error processing scheduled task')
|
|
|
|
|
@@ -1291,7 +1290,7 @@ class XMLStream(object):
|
|
|
|
|
func(*args)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
error_msg = 'Error processing event handler: %s'
|
|
|
|
|
log.exception(error_msg , str(func))
|
|
|
|
|
log.exception(error_msg, str(func))
|
|
|
|
|
if hasattr(orig, 'exception'):
|
|
|
|
|
orig.exception(e)
|
|
|
|
|
else:
|
|
|
|
|
@@ -1324,7 +1323,7 @@ class XMLStream(object):
|
|
|
|
|
data = self.send_queue.get(True, 1)
|
|
|
|
|
except queue.Empty:
|
|
|
|
|
continue
|
|
|
|
|
log.debug("SEND: %s" , data)
|
|
|
|
|
log.debug("SEND: %s", data)
|
|
|
|
|
try:
|
|
|
|
|
enc_data = data.encode('utf-8')
|
|
|
|
|
total = len(enc_data)
|
|
|
|
|
@@ -1334,15 +1333,15 @@ class XMLStream(object):
|
|
|
|
|
sent += self.socket.send(enc_data[sent:])
|
|
|
|
|
count += 1
|
|
|
|
|
if count > 1:
|
|
|
|
|
log.debug('SENT: %d chunks' , count)
|
|
|
|
|
log.debug('SENT: %d chunks', count)
|
|
|
|
|
self.send_queue.task_done()
|
|
|
|
|
except Socket.error as serr:
|
|
|
|
|
self.event('socket_error', serr)
|
|
|
|
|
log.warning("Failed to send %s" , data)
|
|
|
|
|
log.warning("Failed to send %s", data)
|
|
|
|
|
self.__failed_send_stanza = data
|
|
|
|
|
self.disconnect(self.auto_reconnect)
|
|
|
|
|
except Exception as ex:
|
|
|
|
|
log.exception('Unexpected error in send thread: %s' , ex)
|
|
|
|
|
log.exception('Unexpected error in send thread: %s', ex)
|
|
|
|
|
self.exception(ex)
|
|
|
|
|
if not self.stop.is_set():
|
|
|
|
|
self.disconnect(self.auto_reconnect)
|
|
|
|
|
|