new state machine in place

This commit is contained in:
Nathan Fritz
2010-10-13 18:15:21 -07:00
parent b0e036d03c
commit 7ad7a29a8f
8 changed files with 462 additions and 356 deletions

View File

@@ -104,7 +104,7 @@ class Scheduler(object):
quit -- Stop the scheduler.
"""
def __init__(self, parentqueue=None):
def __init__(self, parentqueue=None, parentstop=None):
"""
Create a new scheduler.
@@ -116,6 +116,7 @@ class Scheduler(object):
self.thread = None
self.run = False
self.parentqueue = parentqueue
self.parentstop = parentstop
def process(self, threaded=True):
"""
@@ -135,38 +136,41 @@ class Scheduler(object):
def _process(self):
"""Process scheduled tasks."""
self.run = True
while self.run:
try:
wait = 1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
try:
while self.run:
wait = 1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated:
self.schedule = sorted(self.schedule,
key=lambda task: task.next)
except KeyboardInterrupt:
self.run = False
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated:
self.schedule = sorted(self.schedule,
key=lambda task: task.next)
except KeyboardInterrupt:
self.run = False
if self.parentstop is not None: self.parentstop.set()
except SystemExit:
self.run = False
if self.parentstop is not None: self.parentstop.set()
logging.debug("Qutting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))

View File

@@ -1,59 +0,0 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from __future__ import with_statement
import threading
class StateMachine(object):
def __init__(self, states=[], groups=[]):
self.lock = threading.Lock()
self.__state = {}
self.__default_state = {}
self.__group = {}
self.addStates(states)
self.addGroups(groups)
def addStates(self, states):
with self.lock:
for state in states:
if state in self.__state or state in self.__group:
raise IndexError("The state or group '%s' is already in the StateMachine." % state)
self.__state[state] = states[state]
self.__default_state[state] = states[state]
def addGroups(self, groups):
with self.lock:
for gstate in groups:
if gstate in self.__state or gstate in self.__group:
raise IndexError("The key or group '%s' is already in the StateMachine." % gstate)
for state in groups[gstate]:
if state in self.__state:
raise IndexError("The group %s contains a key %s which is not set in the StateMachine." % (gstate, state))
self.__group[gstate] = groups[gstate]
def set(self, state, status):
with self.lock:
if state in self.__state:
self.__state[state] = bool(status)
else:
raise KeyError("StateMachine does not contain state %s." % state)
def __getitem__(self, key):
if key in self.__group:
for state in self.__group[key]:
if not self.__state[state]:
return False
return True
return self.__state[key]
def __getattr__(self, attr):
return self.__getitem__(attr)
def reset(self):
self.__state = self.__default_state

View File

@@ -1,139 +0,0 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from __future__ import with_statement
import threading
class StateError(Exception):
"""Raised whenever a state transition was attempted but failed."""
class StateManager(object):
"""
At the very core of SleekXMPP there is a need to track various
library configuration settings, XML stream features, and the
network connection status. The state manager is responsible for
tracking this information in a thread-safe manner.
State 'variables' store the current state of these items as simple
string values or booleans. Changing those values must be done
according to transitions defined when creating the state variable.
If a state variable is given a value that is not allowed according
to the transition definitions, a StateError is raised. When a
valid value is assigned an event is raised named:
_state_changed_nameofthestatevariable
The event carries a dictionary containing the previous and the new
state values.
"""
def __init__(self, event_func=None):
"""
Initialize the state manager. The parameter event_func should be
the event() method of a SleekXMPP object in order to enable
_state_changed_* events.
"""
self.main_lock = threading.Lock()
self.locks = {}
self.state_variables = {}
if event_func is not None:
self.event = event_func
else:
self.event = lambda name, data: None
def add(self, name, default=False, values=None, transitions=None):
"""
Create a new state variable.
When transitions is specified, only those defined state change
transitions will be allowed.
When values is specified (and not transitions), any state changes
between those values are allowed.
If neither values nor transitions are defined, then the state variable
will be a binary switch between True and False.
"""
if name in self.state_variables:
raise IndexError("State variable %s already exists" % name)
self.locks[name] = threading.Lock()
with self.locks[name]:
var = {'value': default,
'default': default,
'transitions': {}}
if transitions is not None:
for start in transitions:
var['transitions'][start] = set(transitions[start])
elif values is not None:
values = set(values)
for value in values:
var['transitions'][value] = values
elif values is None:
var['transitions'] = {True: [False],
False: [True]}
self.state_variables[name] = var
def addStates(self, var_defs):
"""
Create multiple state variables at once.
"""
for var, data in var_defs:
self.add(var,
default=data.get('default', False),
values=data.get('values', None),
transitions=data.get('transitions', None))
def force_set(self, name, val):
"""
Force setting a state variable's value by overriding transition checks.
"""
with self.locks[name]:
self.state_variables[name]['value'] = val
def reset(self, name):
"""
Reset a state variable to its default value.
"""
with self.locks[name]:
default = self.state_variables[name]['default']
self.state_variables[name]['value'] = default
def __getitem__(self, name):
"""
Get the value of a state variable if it exists.
"""
with self.locks[name]:
if name not in self.state_variables:
raise IndexError("State variable %s does not exist" % name)
return self.state_variables[name]['value']
def __setitem__(self, name, val):
"""
Attempt to set the value of a state variable, but raise StateError
if the transition is undefined.
A _state_changed_* event is triggered after a successful transition.
"""
with self.locks[name]:
if name not in self.state_variables:
raise IndexError("State variable %s does not exist" % name)
current = self.state_variables[name]['value']
if current == val:
return
if val in self.state_variables[name]['transitions'][current]:
self.state_variables[name]['value'] = val
self.event('_state_changed_%s' % name, {'from': current, 'to': val})
else:
raise StateError("Can not transition from '%s' to '%s'" % (str(current), str(val)))

View File

@@ -21,7 +21,8 @@ try:
except ImportError:
import Queue as queue
from sleekxmpp.xmlstream import StateMachine, Scheduler, tostring
from sleekxmpp.thirdparty.statemachine import StateMachine
from sleekxmpp.xmlstream import Scheduler, tostring
from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET
# In Python 2.x, file socket objects are broken. A patched socket
@@ -92,6 +93,8 @@ class XMLStream(object):
stream_header -- The closing tag of the stream's root element.
use_ssl -- Flag indicating if SSL should be used.
use_tls -- Flag indicating if TLS should be used.
stop -- threading Event used to stop all threads.
auto_reconnect-- Flag to determine whether we auto reconnect.
Methods:
add_event_handler -- Add a handler for a custom event.
@@ -152,15 +155,8 @@ class XMLStream(object):
self.ssl_support = SSL_SUPPORT
# TODO: Integrate the new state machine.
self.state = StateMachine()
self.state.addStates({'connected': False,
'is client': False,
'ssl': False,
'tls': False,
'reconnect': True,
'processing': False,
'disconnecting': False})
self.state = StateMachine(('disconnected', 'connected'))
self.state._set_state('disconnected')
self.address = (host, int(port))
self.filesocket = None
@@ -178,9 +174,10 @@ class XMLStream(object):
self.stream_header = "<stream>"
self.stream_footer = "</stream>"
self.stop = threading.Event()
self.event_queue = queue.Queue()
self.send_queue = queue.Queue()
self.scheduler = Scheduler(self.event_queue)
self.scheduler = Scheduler(self.event_queue, self.stop)
self.namespace_map = {}
@@ -193,7 +190,8 @@ class XMLStream(object):
self._id = 0
self._id_lock = threading.Lock()
self.run = True
self.auto_reconnect = True
self.is_client = False
def new_id(self):
"""
@@ -232,17 +230,23 @@ class XMLStream(object):
if host and port:
self.address = (host, int(port))
self.is_client = True
# Respect previous SSL and TLS usage directives.
if use_ssl is not None:
self.use_ssl = use_ssl
if use_tls is not None:
self.use_tls = use_tls
self.state.set('is client', True)
# Repeatedly attempt to connect until a successful connection
# is established.
while reattempt and not self.state['connected']:
connected = self.state.transition('disconnected', 'connected', func=self._connect)
while reattempt and not connected:
connected = self.state.transition('disconnected', 'connected', func=self._connect)
return connected
def _connect(self):
self.stop.clear()
self.socket = self.socket_class(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(None)
if self.use_ssl and self.ssl_support:
@@ -257,13 +261,15 @@ class XMLStream(object):
try:
self.socket.connect(self.address)
self.set_socket(self.socket)
self.state.set('connected', True)
self.set_socket(self.socket, ignore=True)
#this event is where you should set your application state
self.event("connected", direct=True)
return True
except socket.error as serr:
error_msg = "Could not connect. Socket Error #%s: %s"
logging.error(error_msg % (serr.errno, serr.strerror))
time.sleep(1)
return False
def disconnect(self, reconnect=False):
"""
@@ -277,40 +283,38 @@ class XMLStream(object):
and processing should be restarted.
Defaults to False.
"""
self.event("disconnected")
self.state.set('reconnect', reconnect)
if self.state['disconnecting']:
return
if not self.state['reconnect']:
logging.debug("Disconnecting...")
self.state.set('disconnecting', True)
self.run = False
self.scheduler.run = False
if self.state['connected']:
# Send the end of stream marker.
self.send_raw(self.stream_footer)
# Wait for confirmation that the stream was
# closed in the other direction.
time.sleep(1)
self.state.transition('connected', 'disconnected', wait=0.0, func=self._disconnect, args=(reconnect,))
def _disconnect(self, reconnect=False):
# Send the end of stream marker.
self.send_raw(self.stream_footer)
# Wait for confirmation that the stream was
# closed in the other direction.
time.sleep(1)
if not reconnect:
self.auto_reconnect = False
self.stop.set()
try:
self.socket.close()
self.filesocket.close()
self.socket.shutdown(socket.SHUT_RDWR)
except socket.error as serr:
pass
finally:
#clear your application state
self.event("disconnected", direct=True)
return True
def reconnect(self):
"""
Reset the stream's state and reconnect to the server.
"""
logging.info("Reconnecting")
self.event("disconnected")
self.state.set('tls', False)
self.state.set('ssl', False)
time.sleep(1)
self.connect()
logging.debug("reconnecting...")
self.state.transition('connected', 'disconnected', wait=0.0, func=self._disconnect, args=(True,))
return self.state.transition('disconnected', 'connected', wait=0.0, func=self._connect)
def set_socket(self, socket):
def set_socket(self, socket, ignore=False):
"""
Set the socket to use for the stream.
@@ -318,6 +322,7 @@ class XMLStream(object):
Arguments:
socket -- The new socket to use.
ignore -- don't set the state
"""
self.socket = socket
if socket is not None:
@@ -331,7 +336,8 @@ class XMLStream(object):
self.filesocket = FileSocket(self.socket)
else:
self.filesocket = self.socket.makefile('rb', 0)
self.state.set('connected', True)
if not ignore:
self.state._set_state('connected')
def start_tls(self):
"""
@@ -490,17 +496,21 @@ class XMLStream(object):
self.__event_handlers[name] = filter(filter_pointers,
self.__event_handlers[name])
def event(self, name, data={}):
def event(self, name, data={}, direct=False):
"""
Manually trigger a custom event.
Arguments:
name -- The name of the event to trigger.
data -- Data that will be passed to each event handler.
Defaults to an empty dictionary.
name -- The name of the event to trigger.
data -- Data that will be passed to each event handler.
Defaults to an empty dictionary.
direct -- Runs the event directly if True.
"""
for handler in self.__event_handlers.get(name, []):
self.event_queue.put(('event', handler, copy.copy(data)))
if direct:
handler[0](copy.copy(data))
else:
self.event_queue.put(('event', handler, copy.copy(data)))
if handler[2]:
# If the handler is disposable, we will go ahead and
# remove it now instead of waiting for it to be
@@ -640,49 +650,36 @@ class XMLStream(object):
# The body of this loop will only execute once per connection.
# Additional passes will be made only if an error occurs and
# reconnecting is permitted.
while self.run and (firstrun or self.state['reconnect']):
self.state.set('processing', True)
while not self.stop.isSet() and firstrun or self.auto_reconnect:
firstrun = False
try:
if self.state['is client']:
if self.is_client:
self.send_raw(self.stream_header)
# The call to self.__read_xml will block and prevent
# the body of the loop from running until a disconnect
# occurs. After any reconnection, the stream header will
# be resent and processing will resume.
while self.run and self.__read_xml():
while not self.stop.isSet() and self.__read_xml():
# Ensure the stream header is sent for any
# new connections.
if self.state['is client']:
if self.is_client:
self.send_raw(self.stream_header)
except KeyboardInterrupt:
logging.debug("Keyboard Escape Detected")
self.state.set('processing', False)
self.state.set('reconnect', False)
self.disconnect()
self.run = False
self.scheduler.run = False
self.event_queue.put(('quit', None, None))
return
logging.debug("Keyboard Escape Detected in _process")
self.stop.set()
except SystemExit:
self.event_queue.put(('quit', None, None))
return
logging.debug("SystemExit in _process")
self.stop.set()
except socket.error:
if not self.state.reconnect:
return
self.state.set('processing', False)
logging.exception('Socket Error')
self.disconnect(reconnect=True)
except:
if not self.state.reconnect:
return
self.state.set('processing', False)
logging.exception('Connection error. Reconnecting.')
self.disconnect(reconnect=True)
if self.state['reconnect']:
if not self.stop.isSet() and self.auto_reconnect:
self.reconnect()
self.state.set('processing', False)
self.event_queue.put(('quit', None, None))
else:
self.disconnect()
self.event_queue.put(('quit', None, None))
self.scheduler.run = False
def __read_xml(self):
"""
@@ -705,7 +702,6 @@ class XMLStream(object):
if depth == 0:
# The stream's root element has closed,
# terminating the stream.
self.disconnect(reconnect=self.state['reconnect'])
logging.debug("Ending read XML loop")
return False
elif depth == 1:
@@ -754,8 +750,11 @@ class XMLStream(object):
stanza_copy = stanza_type(self, copy.deepcopy(xml))
handler.prerun(stanza_copy)
self.event_queue.put(('stanza', handler, stanza_copy))
if handler.checkDelete():
self.__handlers.pop(self.__handlers.index(handler))
try:
if handler.checkDelete():
self.__handlers.pop(self.__handlers.index(handler))
except:
pass #not thread safe
unhandled = False
# Some stanzas require responses, such as Iq queries. A default
@@ -773,61 +772,73 @@ class XMLStream(object):
handlers may be spawned in individual threads.
"""
logging.debug("Loading event runner")
while self.run:
try:
event = self.event_queue.get(True, timeout=5)
except queue.Empty:
event = None
except KeyboardInterrupt:
self.run = False
self.scheduler.run = False
if event is None:
continue
try:
while not self.stop.isSet():
try:
event = self.event_queue.get(True, timeout=5)
except queue.Empty:
event = None
if event is None:
continue
etype, handler = event[0:2]
args = event[2:]
etype, handler = event[0:2]
args = event[2:]
if etype == 'stanza':
try:
handler.run(args[0])
except Exception as e:
error_msg = 'Error processing stream handler: %s'
logging.exception(error_msg % handler.name)
args[0].exception(e)
elif etype == 'schedule':
try:
logging.debug(args)
handler(*args[0])
except:
logging.exception('Error processing scheduled task')
elif etype == 'event':
func, threaded, disposable = handler
try:
if threaded:
x = threading.Thread(name="Event_%s" % str(func),
target=func,
args=args)
x.start()
else:
func(*args)
except:
logging.exception('Error processing event handler: %s')
elif etype == 'quit':
logging.debug("Quitting event runner thread")
return False
if etype == 'stanza':
try:
handler.run(args[0])
except Exception as e:
error_msg = 'Error processing stream handler: %s'
logging.exception(error_msg % handler.name)
args[0].exception(e)
elif etype == 'schedule':
try:
logging.debug(args)
handler(*args[0])
except:
logging.exception('Error processing scheduled task')
elif etype == 'event':
func, threaded, disposable = handler
try:
if threaded:
x = threading.Thread(name="Event_%s" % str(func),
target=func,
args=args)
x.start()
else:
func(*args)
except:
logging.exception('Error processing event handler: %s')
elif etype == 'quit':
logging.debug("Quitting event runner thread")
return False
except KeyboardInterrupt:
logging.debug("Keyboard Escape Detected in _event_runner")
self.disconnect()
return
except SystemExit:
self.disconnect()
self.event_queue.put(('quit', None, None))
return
def _send_thread(self):
"""
Extract stanzas from the send queue and send them on the stream.
"""
while self.run:
data = self.send_queue.get(True)
logging.debug("SEND: %s" % data)
try:
self.socket.send(data.encode('utf-8'))
except:
logging.warning("Failed to send %s" % data)
self.state.set('connected', False)
if self.state.reconnect:
logging.exception("Disconnected. Socket Error.")
self.disconnect(reconnect=True)
try:
while not self.stop.isSet():
data = self.send_queue.get(True)
logging.debug("SEND: %s" % data)
try:
self.socket.send(data.encode('utf-8'))
except:
logging.warning("Failed to send %s" % data)
self.disconnect(self.auto_reconnect)
except KeyboardInterrupt:
logging.debug("Keyboard Escape Detected in _send_thread")
self.disconnect()
return
except SystemExit:
self.disconnect()
self.event_queue.put(('quit', None, None))
return