Make tests pass for catching exceptions.

May now use sys.excepthook to catch exceptions
from threaded handlers.
This commit is contained in:
Lance Stout
2010-12-17 13:11:03 -05:00
parent 506eccf84d
commit 34c374a1e1
3 changed files with 47 additions and 16 deletions

View File

@@ -682,6 +682,7 @@ class XMLStream(object):
Event handlers and the send queue will be threaded
regardless of this parameter's value.
"""
self._thread_excepthook()
self.scheduler.process(threaded=True)
def start_thread(name, target):
@@ -954,3 +955,26 @@ class XMLStream(object):
self.disconnect()
self.event_queue.put(('quit', None, None))
return
def _thread_excepthook(self):
"""
If a threaded event handler raises an exception, there is no way to
catch it except with an excepthook. Currently, each thread has its own
excepthook, but ideally we could use the main sys.excepthook.
Modifies threading.Thread to use sys.excepthook when an exception
is not caught.
"""
init_old = threading.Thread.__init__
def init(self, *args, **kwargs):
init_old(self, *args, **kwargs)
run_old = self.run
def run_with_except_hook(*args, **kw):
try:
run_old(*args, **kw)
except (KeyboardInterrupt, SystemExit):
raise
except:
sys.excepthook(*sys.exc_info())
self.run = run_with_except_hook
threading.Thread.__init__ = init