Clean a new bunch of stuf

This commit is contained in:
Florent Le Coz 2014-07-21 20:27:53 +02:00
parent a2cad40f91
commit 373505f483
13 changed files with 37 additions and 179 deletions

View File

@ -101,7 +101,7 @@ class RegisterBot(slixmpp.ClientXMPP):
resp['register']['password'] = self.password resp['register']['password'] = self.password
try: try:
resp.send(now=True) resp.send()
logging.info("Account created for %s!" % self.boundjid) logging.info("Account created for %s!" % self.boundjid)
except IqError as e: except IqError as e:
logging.error("Could not register account: %s" % logging.error("Could not register account: %s" %

View File

@ -144,7 +144,6 @@ class ClientXMPP(BaseXMPP):
:param use_ssl: Indicates if the older SSL connection method :param use_ssl: Indicates if the older SSL connection method
should be used. Defaults to ``False``. should be used. Defaults to ``False``.
""" """
self.session_started_event.clear()
# If an address was provided, disable using DNS SRV lookup; # If an address was provided, disable using DNS SRV lookup;
# otherwise, use the domain from the client JID with the standard # otherwise, use the domain from the client JID with the standard

View File

@ -143,7 +143,7 @@ class ComponentXMPP(BaseXMPP):
handshake = ET.Element('{jabber:component:accept}handshake') handshake = ET.Element('{jabber:component:accept}handshake')
handshake.text = hashlib.sha1(pre_hash).hexdigest().lower() handshake.text = hashlib.sha1(pre_hash).hexdigest().lower()
self.send_xml(handshake, now=True) self.send_xml(handshake)
def _handle_handshake(self, xml): def _handle_handshake(self, xml):
"""The handshake has been accepted. """The handshake has been accepted.

View File

@ -64,5 +64,4 @@ class FeatureBind(BasePlugin):
if 'session' not in self.features['features']: if 'session' not in self.features['features']:
log.debug("Established Session") log.debug("Established Session")
self.xmpp.sessionstarted = True self.xmpp.sessionstarted = True
self.xmpp.session_started_event.set()
self.xmpp.event('session_start') self.xmpp.event('session_start')

View File

@ -196,7 +196,7 @@ class FeatureMechanisms(BasePlugin):
self.attempted_mechs.add(self.mech.name) self.attempted_mechs.add(self.mech.name)
self.xmpp.disconnect() self.xmpp.disconnect()
else: else:
resp.send(now=True) resp.send()
return True return True
@ -217,7 +217,7 @@ class FeatureMechanisms(BasePlugin):
else: else:
if resp.get_value() == '': if resp.get_value() == '':
resp.del_value() resp.del_value()
resp.send(now=True) resp.send()
def _handle_success(self, stanza): def _handle_success(self, stanza):
"""SASL authentication succeeded. Restart the stream.""" """SASL authentication succeeded. Restart the stream."""

View File

@ -51,5 +51,4 @@ class FeatureSession(BasePlugin):
log.debug("Established Session") log.debug("Established Session")
self.xmpp.sessionstarted = True self.xmpp.sessionstarted = True
self.xmpp.session_started_event.set()
self.xmpp.event('session_start') self.xmpp.event('session_start')

View File

@ -55,7 +55,7 @@ class FeatureSTARTTLS(BasePlugin):
elif self.xmpp.disable_starttls: elif self.xmpp.disable_starttls:
return False return False
else: else:
self.xmpp.send(features['starttls'], now=True) self.xmpp.send(features['starttls'])
return True return True
def _handle_starttls_proceed(self, proceed): def _handle_starttls_proceed(self, proceed):

View File

@ -141,7 +141,6 @@ class XEP_0078(BasePlugin):
log.debug("Established Session") log.debug("Established Session")
self.xmpp.sessionstarted = True self.xmpp.sessionstarted = True
self.xmpp.session_started_event.set()
self.xmpp.event('session_start') self.xmpp.event('session_start')
return True return True

View File

@ -305,7 +305,7 @@ class XEP_0115(BasePlugin):
self.cache_caps(ver, info) self.cache_caps(ver, info)
self.assign_verstring(jid, ver) self.assign_verstring(jid, ver)
if self.xmpp.session_started_event.is_set() and self.broadcast: if self.xmpp.sessionstarted and self.broadcast:
if self.xmpp.is_component or preserve: if self.xmpp.is_component or preserve:
for contact in self.xmpp.roster[jid]: for contact in self.xmpp.roster[jid]:
self.xmpp.roster[jid][contact].send_last_presence() self.xmpp.roster[jid][contact].send_last_presence()

View File

@ -173,7 +173,7 @@ class XEP_0198(BasePlugin):
ack = stanza.Ack(self.xmpp) ack = stanza.Ack(self.xmpp)
with self.handled_lock: with self.handled_lock:
ack['h'] = self.handled ack['h'] = self.handled
self.xmpp.send_raw(str(ack), now=True) self.xmpp.send_raw(str(ack))
def request_ack(self, e=None): def request_ack(self, e=None):
"""Request an ack from the server.""" """Request an ack from the server."""
@ -199,14 +199,14 @@ class XEP_0198(BasePlugin):
self.enabled.set() self.enabled.set()
enable = stanza.Enable(self.xmpp) enable = stanza.Enable(self.xmpp)
enable['resume'] = self.allow_resume enable['resume'] = self.allow_resume
enable.send(now=True) enable.send()
self.handled = 0 self.handled = 0
elif self.sm_id and self.allow_resume: elif self.sm_id and self.allow_resume:
self.enabled.set() self.enabled.set()
resume = stanza.Resume(self.xmpp) resume = stanza.Resume(self.xmpp)
resume['h'] = self.handled resume['h'] = self.handled
resume['previd'] = self.sm_id resume['previd'] = self.sm_id
resume.send(now=True) resume.send()
# Wait for a response before allowing stream feature processing # Wait for a response before allowing stream feature processing
# to continue. The actual result processing will be done in the # to continue. The actual result processing will be done in the
@ -239,8 +239,7 @@ class XEP_0198(BasePlugin):
self.xmpp.features.add('stream_management') self.xmpp.features.add('stream_management')
self._handle_ack(stanza) self._handle_ack(stanza)
for id, stanza in self.unacked_queue: for id, stanza in self.unacked_queue:
self.xmpp.send(stanza, now=True, use_filters=False) self.xmpp.send(stanza, use_filters=False)
self.xmpp.session_started_event.set()
self.xmpp.event('session_resumed', stanza) self.xmpp.event('session_resumed', stanza)
def _handle_failed(self, stanza): def _handle_failed(self, stanza):

View File

@ -191,9 +191,6 @@ class Iq(RootStanza):
stanza. Only called if there is a callback parameter stanza. Only called if there is a callback parameter
(and therefore are in async mode). (and therefore are in async mode).
""" """
if timeout is None:
timeout = self.stream.response_timeout
if self.stream.session_bind_event.is_set(): if self.stream.session_bind_event.is_set():
matcher = MatchIDSender({ matcher = MatchIDSender({
'id': self['id'], 'id': self['id'],

View File

@ -1580,7 +1580,7 @@ class StanzaBase(ElementBase):
stanza sent immediately. Useful for stream stanza sent immediately. Useful for stream
initialization. Defaults to ``False``. initialization. Defaults to ``False``.
""" """
self.stream.send(self, now=now) self.stream.send(self)
def __copy__(self): def __copy__(self):
"""Return a copy of the stanza object that does not share the """Return a copy of the stanza object that does not share the

View File

@ -45,35 +45,6 @@ from slixmpp.xmlstream.resolver import resolve, default_resolver
#: The time in seconds to wait before timing out waiting for response stanzas. #: The time in seconds to wait before timing out waiting for response stanzas.
RESPONSE_TIMEOUT = 30 RESPONSE_TIMEOUT = 30
#: The time in seconds to wait for events from the event queue, and also the
#: time between checks for the process stop signal.
WAIT_TIMEOUT = 1.0
#: The number of threads to use to handle XML stream events. This is not the
#: same as the number of custom event handling threads.
#: :data:`HANDLER_THREADS` must be at least 1. For Python implementations
#: with a GIL, this should be left at 1, but for implemetnations without
#: a GIL increasing this value can provide better performance.
HANDLER_THREADS = 1
#: The time in seconds to delay between attempts to resend data
#: after an SSL error.
SSL_RETRY_DELAY = 0.5
#: The maximum number of times to attempt resending data due to
#: an SSL error.
SSL_RETRY_MAX = 10
#: Maximum time to delay between connection attempts is one hour.
RECONNECT_MAX_DELAY = 600
#: Maximum number of attempts to connect to the server before quitting
#: and raising a 'connect_failed' event. Setting this to ``None`` will
#: allow infinite reconnection attempts, and using ``0`` will disable
#: reconnections. Defaults to ``None``.
RECONNECT_MAX_ATTEMPTS = None
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -83,6 +54,11 @@ class RestartStream(Exception):
resending the stream header. resending the stream header.
""" """
class NotConnectedError(Exception):
"""
Raised when we try to send something over the wire but we are not
connected.
"""
class XMLStream(object): class XMLStream(object):
""" """
@ -166,36 +142,11 @@ class XMLStream(object):
self._der_cert = None self._der_cert = None
#: The time in seconds to wait for events from the event queue,
#: and also the time between checks for the process stop signal.
self.wait_timeout = WAIT_TIMEOUT
#: The time in seconds to wait before timing out waiting
#: for response stanzas.
self.response_timeout = RESPONSE_TIMEOUT
#: The current amount to time to delay attempting to reconnect. #: The current amount to time to delay attempting to reconnect.
#: This value doubles (with some jitter) with each failed #: This value doubles (with some jitter) with each failed
#: connection attempt up to :attr:`reconnect_max_delay` seconds. #: connection attempt up to :attr:`reconnect_max_delay` seconds.
self.reconnect_delay = None self.reconnect_delay = None
#: Maximum time to delay between connection attempts is one hour.
self.reconnect_max_delay = RECONNECT_MAX_DELAY
#: Maximum number of attempts to connect to the server before
#: quitting and raising a 'connect_failed' event. Setting to
#: ``None`` allows infinite reattempts, while setting it to ``0``
#: will disable reconnection attempts. Defaults to ``None``.
self.reconnect_max_attempts = RECONNECT_MAX_ATTEMPTS
#: The time in seconds to delay between attempts to resend data
#: after an SSL error.
self.ssl_retry_max = SSL_RETRY_MAX
#: The maximum number of times to attempt resending data due to
#: an SSL error.
self.ssl_retry_delay = SSL_RETRY_DELAY
#: The connection state machine tracks if the stream is #: The connection state machine tracks if the stream is
#: ``'connected'`` or ``'disconnected'``. #: ``'connected'`` or ``'disconnected'``.
self.state = StateMachine(('disconnected', 'connected')) self.state = StateMachine(('disconnected', 'connected'))
@ -267,20 +218,6 @@ class XMLStream(object):
#: :attr:`whitespace_keepalive` is enabled. #: :attr:`whitespace_keepalive` is enabled.
self.whitespace_keepalive_interval = 300 self.whitespace_keepalive_interval = 300
#: An :class:`~threading.Event` to signal receiving a closing
#: stream tag from the server.
self.stream_end_event = threading.Event()
self.stream_end_event.set()
#: An :class:`~threading.Event` to signal the start of a stream
#: session. Until this event fires, the send queue is not used
#: and data is sent immediately over the wire.
self.session_started_event = threading.Event()
#: The default time in seconds to wait for a session to start
#: after connecting before reconnecting and trying again.
self.session_timeout = 45
#: Flag for controlling if the session can be considered ended #: Flag for controlling if the session can be considered ended
#: if the connection is terminated. #: if the connection is terminated.
self.end_session_on_disconnect = True self.end_session_on_disconnect = True
@ -312,10 +249,6 @@ class XMLStream(object):
#: We use an ID prefix to ensure that all ID values are unique. #: We use an ID prefix to ensure that all ID values are unique.
self._id_prefix = '%s-' % uuid.uuid4() self._id_prefix = '%s-' % uuid.uuid4()
#: The :attr:`auto_reconnnect` setting controls whether or not
#: the stream will be restarted in the event of an error.
self.auto_reconnect = True
#: The :attr:`disconnect_wait` setting is the default value #: The :attr:`disconnect_wait` setting is the default value
#: for controlling if the system waits for the send queue to #: for controlling if the system waits for the send queue to
#: empty before ending the stream. This may be overridden by #: empty before ending the stream. This may be overridden by
@ -331,7 +264,6 @@ class XMLStream(object):
#: ``_xmpp-client._tcp`` service. #: ``_xmpp-client._tcp`` service.
self.dns_service = None self.dns_service = None
self.add_event_handler('connected', self._session_timeout_check)
self.add_event_handler('disconnected', self._remove_schedules) self.add_event_handler('disconnected', self._remove_schedules)
self.add_event_handler('session_start', self._start_keepalive) self.add_event_handler('session_start', self._start_keepalive)
self.add_event_handler('session_start', self._cert_expiration) self.add_event_handler('session_start', self._cert_expiration)
@ -887,12 +819,11 @@ class XMLStream(object):
# If the handler is disposable, we will go ahead and # If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be # remove it now instead of waiting for it to be
# processed in the queue. # processed in the queue.
with self.__event_handlers_lock: try:
try: h_index = self.__event_handlers[name].index(handler)
h_index = self.__event_handlers[name].index(handler) self.__event_handlers[name].pop(h_index)
self.__event_handlers[name].pop(h_index) except:
except: pass
pass
def schedule(self, name, seconds, callback, args=tuple(), def schedule(self, name, seconds, callback, args=tuple(),
kwargs={}, repeat=False): kwargs={}, repeat=False):
@ -954,34 +885,18 @@ class XMLStream(object):
""" """
return xml return xml
def send(self, data, mask=None, timeout=None, now=False, use_filters=True): def send(self, data, use_filters=True):
"""A wrapper for :meth:`send_raw()` for sending stanza objects. """A wrapper for :meth:`send_raw()` for sending stanza objects.
May optionally block until an expected response is received. May optionally block until an expected response is received.
:param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` :param data: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase`
stanza to send on the stream. stanza to send on the stream.
:param mask: **DEPRECATED**
An XML string snippet matching the structure
of the expected response. Execution will block
in this thread until the response is received
or a timeout occurs.
:param int timeout: Time in seconds to wait for a response before
continuing. Defaults to :attr:`response_timeout`.
:param bool now: Indicates if the send queue should be skipped,
sending the stanza immediately. Useful mainly
for stream initialization stanzas.
Defaults to ``False``.
:param bool use_filters: Indicates if outgoing filters should be :param bool use_filters: Indicates if outgoing filters should be
applied to the given stanza data. Disabling applied to the given stanza data. Disabling
filters is useful when resending stanzas. filters is useful when resending stanzas.
Defaults to ``True``. Defaults to ``True``.
""" """
if timeout is None:
timeout = self.response_timeout
if hasattr(mask, 'xml'):
mask = mask.xml
if isinstance(data, ElementBase): if isinstance(data, ElementBase):
if use_filters: if use_filters:
for filter in self.__filters['out']: for filter in self.__filters['out']:
@ -989,61 +904,37 @@ class XMLStream(object):
if data is None: if data is None:
return return
if mask is not None:
log.warning("Use of send mask waiters is deprecated.")
wait_for = Waiter("SendWait_%s" % self.new_id(),
MatchXMLMask(mask))
self.register_handler(wait_for)
if isinstance(data, ElementBase): if isinstance(data, ElementBase):
with self.send_queue_lock: if use_filters:
if use_filters: for filter in self.__filters['out_sync']:
for filter in self.__filters['out_sync']: data = filter(data)
data = filter(data) if data is None:
if data is None: return
return str_data = tostring(data.xml, xmlns=self.default_ns,
str_data = tostring(data.xml, xmlns=self.default_ns, stream=self,
stream=self, top_level=True)
top_level=True) self.send_raw(str_data)
self.send_raw(str_data)
else: else:
self.send_raw(data) self.send_raw(data)
if mask is not None:
return wait_for.wait(timeout)
def send_xml(self, data, mask=None, timeout=None, now=False): def send_xml(self, data):
"""Send an XML object on the stream, and optionally wait """Send an XML object on the stream
for a response.
:param data: The :class:`~xml.etree.ElementTree.Element` XML object :param data: The :class:`~xml.etree.ElementTree.Element` XML object
to send on the stream. to send on the stream.
:param mask: **DEPRECATED**
An XML string snippet matching the structure
of the expected response. Execution will block
in this thread until the response is received
or a timeout occurs.
:param int timeout: Time in seconds to wait for a response before
continuing. Defaults to :attr:`response_timeout`.
:param bool now: Indicates if the send queue should be skipped,
sending the stanza immediately. Useful mainly
for stream initialization stanzas.
Defaults to ``False``.
""" """
if timeout is None: return self.send(tostring(data))
timeout = self.response_timeout
return self.send(tostring(data), mask, timeout, now)
def send_raw(self, data): def send_raw(self, data):
"""Send raw data across the stream. """Send raw data across the stream.
:param string data: Any bytes or utf-8 string value. :param string data: Any bytes or utf-8 string value.
""" """
if not self.transport:
raise NotConnectedError()
if isinstance(data, str): if isinstance(data, str):
data = data.encode('utf-8') data = data.encode('utf-8')
if not self.transport: self.transport.write(data)
logger.error("Cannot send data, we are not connected.")
else:
self.transport.write(data)
def _start_thread(self, name, target, track=True): def _start_thread(self, name, target, track=True):
self.__thread[name] = threading.Thread(name=name, target=target) self.__thread[name] = threading.Thread(name=name, target=target)
@ -1055,31 +946,6 @@ class XMLStream(object):
with self.__thread_cond: with self.__thread_cond:
self.__thread_count += 1 self.__thread_count += 1
def _end_thread(self, name, early=False):
with self.__thread_cond:
curr_thread = threading.current_thread().name
if curr_thread in self.__active_threads:
self.__thread_count -= 1
self.__active_threads.remove(curr_thread)
if early:
log.debug('Threading deadlock prevention!')
log.debug(("Marked %s thread as ended due to " + \
"disconnect() call. %s threads remain.") % (
name, self.__thread_count))
else:
log.debug("Stopped %s thread. %s threads remain." % (
name, self.__thread_count))
else:
log.debug(("Finished exiting %s thread after early " + \
"termination from disconnect() call. " + \
"%s threads remain.") % (
name, self.__thread_count))
if self.__thread_count == 0:
self.__thread_cond.notify()
def _build_stanza(self, xml, default_ns=None): def _build_stanza(self, xml, default_ns=None):
"""Create a stanza object from a given XML object. """Create a stanza object from a given XML object.