added pubsub state stanzas and scheduled events
This commit is contained in:
@@ -4,6 +4,7 @@ except ImportError:
|
||||
import Queue as queue
|
||||
import time
|
||||
import threading
|
||||
import logging
|
||||
|
||||
class Task(object):
|
||||
"""Task object for the Scheduler class"""
|
||||
@@ -34,7 +35,7 @@ class Scheduler(object):
|
||||
self.addq = queue.Queue()
|
||||
self.schedule = []
|
||||
self.thread = None
|
||||
self.run = True
|
||||
self.run = False
|
||||
|
||||
def process(self, threaded=True):
|
||||
if threaded:
|
||||
@@ -44,6 +45,7 @@ class Scheduler(object):
|
||||
self._process()
|
||||
|
||||
def _process(self):
|
||||
self.run = True
|
||||
while self.run:
|
||||
wait = 5
|
||||
updated = False
|
||||
@@ -67,7 +69,7 @@ class Scheduler(object):
|
||||
self.schedule.append(newtask)
|
||||
finally:
|
||||
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
|
||||
print [x.name for x in self.schedule]
|
||||
logging.debug("Qutting Scheduler thread")
|
||||
|
||||
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
|
||||
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))
|
||||
|
@@ -319,6 +319,8 @@ class StanzaBase(ElementBase):
|
||||
|
||||
def __init__(self, stream=None, xml=None, stype=None, sto=None, sfrom=None, sid=None):
|
||||
self.stream = stream
|
||||
if stream is not None:
|
||||
self.namespace = stream.default_ns
|
||||
ElementBase.__init__(self, xml)
|
||||
if stype is not None:
|
||||
self['type'] = stype
|
||||
@@ -326,8 +328,6 @@ class StanzaBase(ElementBase):
|
||||
self['to'] = sto
|
||||
if sfrom is not None:
|
||||
self['from'] = sfrom
|
||||
if stream is not None:
|
||||
self.namespace = stream.default_ns
|
||||
self.tag = "{%s}%s" % (self.namespace, self.name)
|
||||
|
||||
def setType(self, value):
|
||||
|
@@ -180,6 +180,7 @@ class XMLStream(object):
|
||||
self.state.set('reconnect', False)
|
||||
self.disconnect()
|
||||
self.run = False
|
||||
self.scheduler.run = False
|
||||
self.eventqueue.put(('quit', None, None))
|
||||
return
|
||||
except CloseStream:
|
||||
@@ -226,6 +227,7 @@ class XMLStream(object):
|
||||
edepth += -1
|
||||
if edepth == 0 and event == b'end':
|
||||
self.disconnect(reconnect=self.state['reconnect'])
|
||||
logging.debug("Ending readXML loop")
|
||||
return False
|
||||
elif edepth == 1:
|
||||
#self.xmlin.put(xmlobj)
|
||||
@@ -234,11 +236,13 @@ class XMLStream(object):
|
||||
except RestartStream:
|
||||
return True
|
||||
except CloseStream:
|
||||
logging.debug("Ending readXML loop")
|
||||
return False
|
||||
if root:
|
||||
root.clear()
|
||||
if event == b'start':
|
||||
edepth += 1
|
||||
logging.debug("Ending readXML loop")
|
||||
|
||||
def _sendThread(self):
|
||||
while self.run:
|
||||
@@ -268,6 +272,7 @@ class XMLStream(object):
|
||||
logging.debug("Disconnecting...")
|
||||
self.state.set('disconnecting', True)
|
||||
self.run = False
|
||||
self.scheduler.run = False
|
||||
if self.state['connected']:
|
||||
self.sendRaw(self.stream_footer)
|
||||
time.sleep(1)
|
||||
@@ -339,7 +344,8 @@ class XMLStream(object):
|
||||
args[0].exception(e)
|
||||
elif etype == 'schedule':
|
||||
try:
|
||||
handler.run(*args)
|
||||
logging.debug(args)
|
||||
handler(*args[0])
|
||||
except:
|
||||
logging.error(traceback.format_exc())
|
||||
elif etype == 'quit':
|
||||
|
Reference in New Issue
Block a user