merged changes from fritzy

This commit is contained in:
Thom Nichols
2010-06-01 22:54:30 -04:00
24 changed files with 933 additions and 135 deletions

View File

@@ -0,0 +1,87 @@
try:
import queue
except ImportError:
import Queue as queue
import time
import threading
import logging
class Task(object):
"""Task object for the Scheduler class"""
def __init__(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.name = name
self.seconds = seconds
self.callback = callback
self.args = args or tuple()
self.kwargs = kwargs or {}
self.repeat = repeat
self.next = time.time() + self.seconds
self.qpointer = qpointer
def run(self):
if self.qpointer is not None:
self.qpointer.put(('schedule', self.callback, self.args))
else:
self.callback(*self.args, **self.kwargs)
self.reset()
return self.repeat
def reset(self):
self.next = time.time() + self.seconds
class Scheduler(object):
"""Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched"""
def __init__(self, parentqueue=None):
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = False
self.parentqueue = parentqueue
def process(self, threaded=True):
if threaded:
self.thread = threading.Thread(name='shedulerprocess', target=self._process)
self.thread.start()
else:
self._process()
def _process(self):
self.run = True
while self.run:
try:
wait = 1
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
except KeyboardInterrupt:
self.run = False
logging.debug("Quitting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))
def quit(self):
self.run = False

View File

@@ -78,6 +78,9 @@ class ElementBase(tostring.ToString):
def __iter__(self):
self.idx = 0
return self
def __bool__(self):
return True
def __next__(self):
self.idx += 1
@@ -319,6 +322,8 @@ class StanzaBase(ElementBase):
def __init__(self, stream=None, xml=None, stype=None, sto=None, sfrom=None, sid=None):
self.stream = stream
if stream is not None:
self.namespace = stream.default_ns
ElementBase.__init__(self, xml)
if stype is not None:
self['type'] = stype
@@ -326,13 +331,11 @@ class StanzaBase(ElementBase):
self['to'] = sto
if sfrom is not None:
self['from'] = sfrom
if stream is not None:
self.namespace = stream.default_ns
self.tag = "{%s}%s" % (self.namespace, self.name)
def setType(self, value):
if value in self.types:
self.xml.attrib['type'] = value
self.xml.attrib['type'] = value
return self
def getPayload(self):
@@ -340,15 +343,18 @@ class StanzaBase(ElementBase):
def setPayload(self, value):
self.xml.append(value)
return self
def delPayload(self):
self.clear()
return self
def clear(self):
for child in self.xml.getchildren():
self.xml.remove(child)
for plugin in list(self.plugins.keys()):
del self.plugins[plugin]
return self
def reply(self):
self['from'], self['to'] = self['to'], self['from']
@@ -357,6 +363,7 @@ class StanzaBase(ElementBase):
def error(self):
self['type'] = 'error'
return self
def getTo(self):
return JID(self._getAttr('to'))

View File

@@ -22,6 +22,7 @@ import time
import traceback
import types
import xml.sax.saxutils
from . import scheduler
HANDLER_THREADS = 1
@@ -76,6 +77,7 @@ class XMLStream(object):
self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue()
self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {}
@@ -151,6 +153,7 @@ class XMLStream(object):
raise RestartStream()
def process(self, threaded=True):
self.scheduler.process(threaded=True)
for t in range(0, HANDLER_THREADS):
th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
th.setDaemon(True)
@@ -168,8 +171,8 @@ class XMLStream(object):
else:
self._process()
def schedule(self, seconds, handler, args=None):
threading.Timer(seconds, handler, args).start()
def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False):
self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue)
def _process(self):
"Start processing the socket."
@@ -189,6 +192,7 @@ class XMLStream(object):
self.state.set('reconnect', False)
self.disconnect()
self.run = False
self.scheduler.run = False
self.eventqueue.put(('quit', None, None))
return
except CloseStream:
@@ -237,6 +241,7 @@ class XMLStream(object):
edepth += -1
if edepth == 0 and event == b'end':
self.disconnect(reconnect=self.state['reconnect'])
logging.debug("Ending readXML loop")
return False
elif edepth == 1:
#self.xmlin.put(xmlobj)
@@ -245,11 +250,13 @@ class XMLStream(object):
except RestartStream:
return True
except CloseStream:
logging.debug("Ending readXML loop")
return False
if root:
root.clear()
if event == b'start':
edepth += 1
logging.debug("Ending readXML loop")
def _sendThread(self):
while self.run:
@@ -279,6 +286,7 @@ class XMLStream(object):
logging.debug("Disconnecting...")
self.state.set('disconnecting', True)
self.run = False
self.scheduler.run = False
if self.state['connected']:
self.sendRaw(self.stream_footer)
time.sleep(1)
@@ -337,6 +345,9 @@ class XMLStream(object):
event = self.eventqueue.get(True, timeout=5)
except queue.Empty:
event = None
except KeyboardInterrupt:
self.run = False
self.scheduler.run = False
if event is not None:
etype = event[0]
handler = event[1]
@@ -348,9 +359,10 @@ class XMLStream(object):
except Exception as e:
traceback.print_exc()
args[0].exception(e)
elif etype == 'sched':
elif etype == 'schedule':
try:
handler.run(*args)
logging.debug(args)
handler(*args[0])
except:
logging.error(traceback.format_exc())
elif etype == 'quit':