XEP-0047: use asyncio’s Queue implementation, to prevent any possibility of deadlock.
This commit is contained in:
parent
ac31913a65
commit
766d0dfd40
@ -1,6 +1,6 @@
|
||||
import asyncio
|
||||
import socket
|
||||
import logging
|
||||
from queue import Queue
|
||||
|
||||
from slixmpp.stanza import Iq
|
||||
from slixmpp.exceptions import XMPPError
|
||||
@ -29,7 +29,7 @@ class IBBytestream(object):
|
||||
self.stream_in_closed = False
|
||||
self.stream_out_closed = False
|
||||
|
||||
self.recv_queue = Queue()
|
||||
self.recv_queue = asyncio.Queue()
|
||||
|
||||
def send(self, data):
|
||||
if not self.stream_started or self.stream_out_closed:
|
||||
@ -78,24 +78,19 @@ class IBBytestream(object):
|
||||
self.close()
|
||||
raise XMPPError('not-acceptable')
|
||||
|
||||
self.recv_queue.put(data)
|
||||
self.recv_queue.put_nowait(data)
|
||||
self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data})
|
||||
|
||||
if isinstance(stanza, Iq):
|
||||
stanza.reply().send()
|
||||
|
||||
def recv(self, *args, **kwargs):
|
||||
return self.read(block=True)
|
||||
return self.read()
|
||||
|
||||
def read(self, block=True, timeout=None, **kwargs):
|
||||
def read(self):
|
||||
if not self.stream_started or self.stream_in_closed:
|
||||
raise socket.error
|
||||
if timeout is not None:
|
||||
block = True
|
||||
try:
|
||||
return self.recv_queue.get(block, timeout)
|
||||
except:
|
||||
return None
|
||||
return self.recv_queue.get_nowait()
|
||||
|
||||
def close(self):
|
||||
iq = self.xmpp.Iq()
|
||||
@ -106,7 +101,7 @@ class IBBytestream(object):
|
||||
self.stream_out_closed = True
|
||||
def _close_stream(_):
|
||||
self.stream_in_closed = True
|
||||
iq.send(block=False, callback=_close_stream)
|
||||
iq.send(callback=_close_stream)
|
||||
self.xmpp.event('ibb_stream_end', self)
|
||||
|
||||
def _closed(self, iq):
|
||||
|
Loading…
Reference in New Issue
Block a user