control-c fixes

This commit is contained in:
Nathan Fritz
2010-05-29 10:19:28 +08:00
committed by Thom Nichols
parent 3e5cdc8664
commit 257bcadd96
3 changed files with 52 additions and 42 deletions

View File

@@ -31,11 +31,12 @@ class Task(object):
class Scheduler(object):
"""Threaded scheduler that allows for updates mid-execution unlike http://docs.python.org/library/sched.html#module-sched"""
def __init__(self):
def __init__(self, parentqueue=None):
self.addq = queue.Queue()
self.schedule = []
self.thread = None
self.run = False
self.parentqueue = parentqueue
def process(self, threaded=True):
if threaded:
@@ -47,29 +48,37 @@ class Scheduler(object):
def _process(self):
self.run = True
while self.run:
wait = 5
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
wait = 5
updated = False
if self.schedule:
wait = self.schedule[0].next - time.time()
try:
if wait <= 0.0:
newtask = self.addq.get(False)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
newtask = self.addq.get(True, wait)
except queue.Empty:
cleanup = []
for task in self.schedule:
if time.time() >= task.next:
updated = True
if not task.run():
cleanup.append(task)
else:
break
for task in cleanup:
x = self.schedule.pop(self.schedule.index(task))
else:
updated = True
self.schedule.append(newtask)
finally:
if updated: self.schedule = sorted(self.schedule, key=lambda task: task.next)
except KeyboardInterrupt:
self.run = False
logging.debug("Qutting Scheduler thread")
if self.parentqueue is not None:
self.parentqueue.put(('quit', None, None))
def add(self, name, seconds, callback, args=None, kwargs=None, repeat=False, qpointer=None):
self.addq.put(Task(name, seconds, callback, args, kwargs, repeat, qpointer))

View File

@@ -76,7 +76,7 @@ class XMLStream(object):
self.eventqueue = queue.Queue()
self.sendqueue = queue.Queue()
self.scheduler = scheduler.Scheduler()
self.scheduler = scheduler.Scheduler(self.eventqueue)
self.namespace_map = {}
@@ -149,6 +149,7 @@ class XMLStream(object):
def process(self, threaded=True):
self.scheduler.process(threaded=True)
for t in range(0, HANDLER_THREADS):
logging.debug("Starting HANDLER THREAD")
self.__thread['eventhandle%s' % t] = threading.Thread(name='eventhandle%s' % t, target=self._eventRunner)
self.__thread['eventhandle%s' % t].start()
self.__thread['sendthread'] = threading.Thread(name='sendthread', target=self._sendThread)
@@ -333,6 +334,9 @@ class XMLStream(object):
event = self.eventqueue.get(True, timeout=5)
except queue.Empty:
event = None
except KeyboardInterrupt:
self.run = False
self.scheduler.run = False
if event is not None:
etype = event[0]
handler = event[1]