merged a lot of fritzy's changes
This commit is contained in:
@@ -22,6 +22,7 @@ import time
|
||||
import traceback
|
||||
import types
|
||||
import xml.sax.saxutils
|
||||
from . import scheduler
|
||||
|
||||
HANDLER_THREADS = 1
|
||||
|
||||
@@ -76,6 +77,7 @@ class XMLStream(object):
|
||||
|
||||
self.eventqueue = queue.Queue()
|
||||
self.sendqueue = queue.Queue()
|
||||
self.scheduler = scheduler.Scheduler(self.eventqueue)
|
||||
|
||||
self.namespace_map = {}
|
||||
|
||||
@@ -151,7 +153,9 @@ class XMLStream(object):
|
||||
raise RestartStream()
|
||||
|
||||
def process(self, threaded=True):
|
||||
self.scheduler.process(threaded=True)
|
||||
for t in range(0, HANDLER_THREADS):
|
||||
<<<<<<< HEAD
|
||||
th = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
|
||||
th.setDaemon(True)
|
||||
self.__thread['eventhandle%s' % t] = th
|
||||
@@ -160,6 +164,13 @@ class XMLStream(object):
|
||||
th.setDaemon(True)
|
||||
self.__thread['sendthread'] = th
|
||||
th.start()
|
||||
=======
|
||||
logging.debug("Starting HANDLER THREAD")
|
||||
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
|
||||
self.__thread['eventhandle%s' % t].start()
|
||||
self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
|
||||
self.__thread['sendthread'].start()
|
||||
>>>>>>> master
|
||||
if threaded:
|
||||
th = threading.Thread(name='process', target=self._process)
|
||||
th.setDaemon(True)
|
||||
@@ -168,8 +179,8 @@ class XMLStream(object):
|
||||
else:
|
||||
self._process()
|
||||
|
||||
def schedule(self, seconds, handler, args=None):
|
||||
threading.Timer(seconds, handler, args).start()
|
||||
def schedule(self, name, seconds, callback, args=None, kwargs=None, repeat=False):
|
||||
self.scheduler.add(name, seconds, callback, args, kwargs, repeat, qpointer=self.eventqueue)
|
||||
|
||||
def _process(self):
|
||||
"Start processing the socket."
|
||||
@@ -189,6 +200,7 @@ class XMLStream(object):
|
||||
self.state.set('reconnect', False)
|
||||
self.disconnect()
|
||||
self.run = False
|
||||
self.scheduler.run = False
|
||||
self.eventqueue.put(('quit', None, None))
|
||||
return
|
||||
except CloseStream:
|
||||
@@ -237,6 +249,7 @@ class XMLStream(object):
|
||||
edepth += -1
|
||||
if edepth == 0 and event == b'end':
|
||||
self.disconnect(reconnect=self.state['reconnect'])
|
||||
logging.debug("Ending readXML loop")
|
||||
return False
|
||||
elif edepth == 1:
|
||||
#self.xmlin.put(xmlobj)
|
||||
@@ -245,11 +258,13 @@ class XMLStream(object):
|
||||
except RestartStream:
|
||||
return True
|
||||
except CloseStream:
|
||||
logging.debug("Ending readXML loop")
|
||||
return False
|
||||
if root:
|
||||
root.clear()
|
||||
if event == b'start':
|
||||
edepth += 1
|
||||
logging.debug("Ending readXML loop")
|
||||
|
||||
def _sendThread(self):
|
||||
while self.run:
|
||||
@@ -279,6 +294,7 @@ class XMLStream(object):
|
||||
logging.debug("Disconnecting...")
|
||||
self.state.set('disconnecting', True)
|
||||
self.run = False
|
||||
self.scheduler.run = False
|
||||
if self.state['connected']:
|
||||
self.sendRaw(self.stream_footer)
|
||||
time.sleep(1)
|
||||
@@ -337,6 +353,9 @@ class XMLStream(object):
|
||||
event = self.eventqueue.get(True, timeout=5)
|
||||
except queue.Empty:
|
||||
event = None
|
||||
except KeyboardInterrupt:
|
||||
self.run = False
|
||||
self.scheduler.run = False
|
||||
if event is not None:
|
||||
etype = event[0]
|
||||
handler = event[1]
|
||||
@@ -348,9 +367,10 @@ class XMLStream(object):
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
args[0].exception(e)
|
||||
elif etype == 'sched':
|
||||
elif etype == 'schedule':
|
||||
try:
|
||||
handler.run(*args)
|
||||
logging.debug(args)
|
||||
handler(*args[0])
|
||||
except:
|
||||
logging.error(traceback.format_exc())
|
||||
elif etype == 'quit':
|
||||
|
||||
Reference in New Issue
Block a user