Standardize importing of queue class.

This will make it easier to enable gevent support.
This commit is contained in:
Lance Stout 2012-07-24 02:39:54 -07:00
parent 352ee2f2fd
commit 3e43b36a9d
8 changed files with 47 additions and 42 deletions

View File

@ -1,11 +1,8 @@
import socket import socket
import threading import threading
import logging import logging
try:
import queue
except ImportError:
import Queue as queue
from sleekxmpp.util import Queue
from sleekxmpp.exceptions import XMPPError from sleekxmpp.exceptions import XMPPError
@ -33,7 +30,7 @@ class IBBytestream(object):
self.stream_in_closed = threading.Event() self.stream_in_closed = threading.Event()
self.stream_out_closed = threading.Event() self.stream_out_closed = threading.Event()
self.recv_queue = queue.Queue() self.recv_queue = Queue()
self.send_window = threading.BoundedSemaphore(value=self.window_size) self.send_window = threading.BoundedSemaphore(value=self.window_size)
self.window_ids = set() self.window_ids = set()

View File

@ -8,10 +8,8 @@
import socket import socket
import threading import threading
try:
import queue from sleekxmpp.util import Queue
except ImportError:
import Queue as queue
class TestLiveSocket(object): class TestLiveSocket(object):
@ -39,8 +37,8 @@ class TestLiveSocket(object):
""" """
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.recv_buffer = [] self.recv_buffer = []
self.recv_queue = queue.Queue() self.recv_queue = Queue()
self.send_queue = queue.Queue() self.send_queue = Queue()
self.send_queue_lock = threading.Lock() self.send_queue_lock = threading.Lock()
self.recv_queue_lock = threading.Lock() self.recv_queue_lock = threading.Lock()
self.is_live = True self.is_live = True

View File

@ -7,10 +7,8 @@
""" """
import socket import socket
try:
import queue from sleekxmpp.util import Queue
except ImportError:
import Queue as queue
class TestSocket(object): class TestSocket(object):
@ -36,8 +34,8 @@ class TestSocket(object):
Same as arguments for socket.socket Same as arguments for socket.socket
""" """
self.socket = socket.socket(*args, **kwargs) self.socket = socket.socket(*args, **kwargs)
self.recv_queue = queue.Queue() self.recv_queue = Queue()
self.send_queue = queue.Queue() self.send_queue = Queue()
self.is_live = False self.is_live = False
self.disconnected = False self.disconnected = False

View File

@ -8,13 +8,10 @@
import unittest import unittest
from xml.parsers.expat import ExpatError from xml.parsers.expat import ExpatError
try:
import Queue as queue
except:
import queue
import sleekxmpp import sleekxmpp
from sleekxmpp import ClientXMPP, ComponentXMPP from sleekxmpp import ClientXMPP, ComponentXMPP
from sleekxmpp.util import Queue
from sleekxmpp.stanza import Message, Iq, Presence from sleekxmpp.stanza import Message, Iq, Presence
from sleekxmpp.test import TestSocket, TestLiveSocket from sleekxmpp.test import TestSocket, TestLiveSocket
from sleekxmpp.exceptions import XMPPError, IqTimeout, IqError from sleekxmpp.exceptions import XMPPError, IqTimeout, IqError
@ -338,7 +335,7 @@ class SleekTest(unittest.TestCase):
# We will use this to wait for the session_start event # We will use this to wait for the session_start event
# for live connections. # for live connections.
skip_queue = queue.Queue() skip_queue = Queue()
if socket == 'mock': if socket == 'mock':
self.xmpp.set_socket(TestSocket()) self.xmpp.set_socket(TestSocket())

View File

@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
"""
sleekxmpp.util
~~~~~~~~~~~~~~
Part of SleekXMPP: The Sleek XMPP Library
:copyright: (c) 2012 Nathanael C. Fritz, Lance J.T. Stout
:license: MIT, see LICENSE for more details
"""
# =====================================================================
# Standardize import of Queue class:
try:
import queue
except ImportError:
import Queue as queue
Queue = queue.Queue
QueueEmpty = queue.Empty

View File

@ -10,11 +10,8 @@
""" """
import logging import logging
try:
import queue
except ImportError:
import Queue as queue
from sleekxmpp.util import Queue, QueueEmpty
from sleekxmpp.xmlstream.handler.base import BaseHandler from sleekxmpp.xmlstream.handler.base import BaseHandler
@ -37,7 +34,7 @@ class Waiter(BaseHandler):
def __init__(self, name, matcher, stream=None): def __init__(self, name, matcher, stream=None):
BaseHandler.__init__(self, name, matcher, stream=stream) BaseHandler.__init__(self, name, matcher, stream=stream)
self._payload = queue.Queue() self._payload = Queue()
def prerun(self, payload): def prerun(self, payload):
"""Store the matched stanza when received during processing. """Store the matched stanza when received during processing.
@ -74,7 +71,7 @@ class Waiter(BaseHandler):
try: try:
stanza = self._payload.get(True, 1) stanza = self._payload.get(True, 1)
break break
except queue.Empty: except QueueEmpty:
elapsed_time += 1 elapsed_time += 1
if elapsed_time >= timeout: if elapsed_time >= timeout:
log.warning("Timed out waiting for %s", self.name) log.warning("Timed out waiting for %s", self.name)

View File

@ -15,10 +15,8 @@
import time import time
import threading import threading
import logging import logging
try:
import queue from sleekxmpp.util import Queue, QueueEmpty
except ImportError:
import Queue as queue
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -102,7 +100,7 @@ class Scheduler(object):
def __init__(self, parentstop=None): def __init__(self, parentstop=None):
#: A queue for storing tasks #: A queue for storing tasks
self.addq = queue.Queue() self.addq = Queue()
#: A list of tasks in order of execution time. #: A list of tasks in order of execution time.
self.schedule = [] self.schedule = []
@ -157,7 +155,7 @@ class Scheduler(object):
elapsed < wait: elapsed < wait:
newtask = self.addq.get(True, 0.1) newtask = self.addq.get(True, 0.1)
elapsed += 0.1 elapsed += 0.1
except queue.Empty: except QueueEmpty:
cleanup = [] cleanup = []
self.schedule_lock.acquire() self.schedule_lock.acquire()
for task in self.schedule: for task in self.schedule:

View File

@ -26,14 +26,11 @@ import time
import random import random
import weakref import weakref
import uuid import uuid
try:
import queue
except ImportError:
import Queue as queue
from xml.parsers.expat import ExpatError from xml.parsers.expat import ExpatError
import sleekxmpp import sleekxmpp
from sleekxmpp.util import Queue, QueueEmpty
from sleekxmpp.thirdparty.statemachine import StateMachine from sleekxmpp.thirdparty.statemachine import StateMachine
from sleekxmpp.xmlstream import Scheduler, tostring, cert from sleekxmpp.xmlstream import Scheduler, tostring, cert
from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET, ElementBase from sleekxmpp.xmlstream.stanzabase import StanzaBase, ET, ElementBase
@ -270,10 +267,10 @@ class XMLStream(object):
self.end_session_on_disconnect = True self.end_session_on_disconnect = True
#: A queue of stream, custom, and scheduled events to be processed. #: A queue of stream, custom, and scheduled events to be processed.
self.event_queue = queue.Queue() self.event_queue = Queue()
#: A queue of string data to be sent over the stream. #: A queue of string data to be sent over the stream.
self.send_queue = queue.Queue() self.send_queue = Queue()
self.send_queue_lock = threading.Lock() self.send_queue_lock = threading.Lock()
self.send_lock = threading.RLock() self.send_lock = threading.RLock()
@ -1586,7 +1583,7 @@ class XMLStream(object):
try: try:
wait = self.wait_timeout wait = self.wait_timeout
event = self.event_queue.get(True, timeout=wait) event = self.event_queue.get(True, timeout=wait)
except queue.Empty: except QueueEmpty:
event = None event = None
if event is None: if event is None:
continue continue
@ -1655,7 +1652,7 @@ class XMLStream(object):
else: else:
try: try:
data = self.send_queue.get(True, 1) data = self.send_queue.get(True, 1)
except queue.Empty: except QueueEmpty:
continue continue
log.debug("SEND: %s", data) log.debug("SEND: %s", data)
enc_data = data.encode('utf-8') enc_data = data.encode('utf-8')