Initial, mostly working XEP-0047 plugin.
This is inspired by the version from macdiesel and tomstrummer, but their version was heavily linked with XEP-0096 and focused solely on file transfer. This version is a more generic implementation.
This commit is contained in:
		
							
								
								
									
										1
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								setup.py
									
									
									
									
									
								
							| @@ -59,6 +59,7 @@ packages     = [ 'sleekxmpp', | |||||||
|                  'sleekxmpp/plugins/xep_0009/stanza', |                  'sleekxmpp/plugins/xep_0009/stanza', | ||||||
|                  'sleekxmpp/plugins/xep_0030', |                  'sleekxmpp/plugins/xep_0030', | ||||||
|                  'sleekxmpp/plugins/xep_0030/stanza', |                  'sleekxmpp/plugins/xep_0030/stanza', | ||||||
|  |                  'sleekxmpp/plugins/xep_0047', | ||||||
|                  'sleekxmpp/plugins/xep_0050', |                  'sleekxmpp/plugins/xep_0050', | ||||||
|                  'sleekxmpp/plugins/xep_0059', |                  'sleekxmpp/plugins/xep_0059', | ||||||
|                  'sleekxmpp/plugins/xep_0060', |                  'sleekxmpp/plugins/xep_0060', | ||||||
|   | |||||||
							
								
								
									
										13
									
								
								sleekxmpp/plugins/xep_0047/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										13
									
								
								sleekxmpp/plugins/xep_0047/__init__.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,13 @@ | |||||||
|  | """ | ||||||
|  |     SleekXMPP: The Sleek XMPP Library | ||||||
|  |     Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout | ||||||
|  |     This file is part of SleekXMPP. | ||||||
|  |  | ||||||
|  |     See the file LICENSE for copying permission. | ||||||
|  | """ | ||||||
|  |  | ||||||
|  | from sleekxmpp.plugins.xep_0047 import stanza | ||||||
|  | from sleekxmpp.plugins.xep_0047.stanza import Open, Close, Data | ||||||
|  | from sleekxmpp.plugins.xep_0047.stream import IBBytestream | ||||||
|  | from sleekxmpp.plugins.xep_0047.ibb import xep_0047 | ||||||
|  |  | ||||||
							
								
								
									
										141
									
								
								sleekxmpp/plugins/xep_0047/ibb.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										141
									
								
								sleekxmpp/plugins/xep_0047/ibb.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,141 @@ | |||||||
|  | import uuid | ||||||
|  | import logging | ||||||
|  | import threading | ||||||
|  |  | ||||||
|  | from sleekxmpp import Message, Iq | ||||||
|  | from sleekxmpp.exceptions import XMPPError, IqError, IqTimeout | ||||||
|  | from sleekxmpp.xmlstream.handler import Callback | ||||||
|  | from sleekxmpp.xmlstream.matcher import StanzaPath | ||||||
|  | from sleekxmpp.xmlstream import register_stanza_plugin | ||||||
|  | from sleekxmpp.plugins.base import base_plugin | ||||||
|  | from sleekxmpp.plugins.xep_0047 import stanza, Open, Close, Data, IBBytestream | ||||||
|  |  | ||||||
|  |  | ||||||
|  | log = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class xep_0047(base_plugin): | ||||||
|  |  | ||||||
|  |     def plugin_init(self): | ||||||
|  |         self.xep = '0047' | ||||||
|  |         self.description = 'In-band Bytestreams' | ||||||
|  |         self.stanza = stanza | ||||||
|  |  | ||||||
|  |         self.streams = {} | ||||||
|  |         self.pending_streams = {} | ||||||
|  |         self.pending_close_streams = {} | ||||||
|  |         self._stream_lock = threading.Lock() | ||||||
|  |  | ||||||
|  |         self.max_block_size = self.config.get('max_block_size', 8192) | ||||||
|  |         self.window_size = self.config.get('window_size', 1) | ||||||
|  |         self.auto_accept = self.config.get('auto_accept', True) | ||||||
|  |         self.accept_stream = self.config.get('accept_stream', None) | ||||||
|  |  | ||||||
|  |         register_stanza_plugin(Iq, Open) | ||||||
|  |         register_stanza_plugin(Iq, Close) | ||||||
|  |         register_stanza_plugin(Iq, Data) | ||||||
|  |  | ||||||
|  |         self.xmpp.register_handler(Callback( | ||||||
|  |             'IBB Open', | ||||||
|  |             StanzaPath('iq@type=set/ibb_open'), | ||||||
|  |             self._handle_open_request)) | ||||||
|  |  | ||||||
|  |         self.xmpp.register_handler(Callback( | ||||||
|  |             'IBB Close', | ||||||
|  |             StanzaPath('iq@type=set/ibb_close'), | ||||||
|  |             self._handle_close)) | ||||||
|  |  | ||||||
|  |         self.xmpp.register_handler(Callback( | ||||||
|  |             'IBB Data', | ||||||
|  |             StanzaPath('iq@type=set/ibb_data'), | ||||||
|  |             self._handle_data)) | ||||||
|  |  | ||||||
|  |     def post_init(self): | ||||||
|  |         self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/ibb') | ||||||
|  |  | ||||||
|  |     def _accept_stream(self, iq): | ||||||
|  |         if self.accept_stream is not None: | ||||||
|  |             return self.accept_stream(iq) | ||||||
|  |         if self.auto_accept: | ||||||
|  |             if iq['ibb_open']['block_size'] <= self.max_block_size: | ||||||
|  |                 return True | ||||||
|  |         return False | ||||||
|  |  | ||||||
|  |     def open_stream(self, jid, block_size=4096, sid=None, window=1, | ||||||
|  |                     ifrom=None, block=True, timeout=None, callback=None): | ||||||
|  |         if not block and callback is not None: | ||||||
|  |             callback = lambda resp: self._handle_opened_stream(resp, callback) | ||||||
|  |  | ||||||
|  |         if sid is None: | ||||||
|  |             sid = str(uuid.uuid4()) | ||||||
|  |  | ||||||
|  |         iq = self.xmpp.Iq() | ||||||
|  |         iq['type'] = 'set' | ||||||
|  |         iq['to'] = jid | ||||||
|  |         iq['from'] = ifrom | ||||||
|  |         iq['ibb_open']['block_size'] = block_size | ||||||
|  |         iq['ibb_open']['sid'] = sid | ||||||
|  |         iq['ibb_open']['stanza'] = 'iq' | ||||||
|  |  | ||||||
|  |         stream = IBBytestream(self.xmpp, sid, size,  | ||||||
|  |                               iq['to'], iq['from'], window) | ||||||
|  |          | ||||||
|  |         with self._stream_lock: | ||||||
|  |             self.pending_streams[iq['id']] = stream | ||||||
|  |                                                            | ||||||
|  |         resp = iq.send(block=block, timeout=timeout, callback=callback) | ||||||
|  |         if block: | ||||||
|  |             self._handle_opened_stream(resp) | ||||||
|  |             return stream | ||||||
|  |  | ||||||
|  |     def _handle_opened_stream(self, iq, callback=None): | ||||||
|  |         if iq['type'] == 'result': | ||||||
|  |             with self._stream_lock: | ||||||
|  |                 stream = self.pending_streams[iq['id']] | ||||||
|  |                 stream.sender = iq['to'] | ||||||
|  |                 stream.receiver = iq['from'] | ||||||
|  |                 stream.stream_started.set() | ||||||
|  |                 self.streams[stream.sid] = stream | ||||||
|  |                 self.xmpp.event('ibb_stream_start', stream) | ||||||
|  |  | ||||||
|  |         if callback is not None: | ||||||
|  |             callback(iq) | ||||||
|  |  | ||||||
|  |         with self._stream_lock: | ||||||
|  |             if iq['id'] in self.pending_streams: | ||||||
|  |                 del self.pending_streams[iq['id']] | ||||||
|  |  | ||||||
|  |     def _handle_open_request(self, iq): | ||||||
|  |         sid = iq['ibb_open']['sid'] | ||||||
|  |         size = iq['ibb_open']['block_size'] | ||||||
|  |         if not self._accept_stream(iq): | ||||||
|  |             raise XMPPError('not-acceptable') | ||||||
|  |  | ||||||
|  |         if size > self.max_block_size: | ||||||
|  |             raise XMPPError('resource-constraint') | ||||||
|  |  | ||||||
|  |         stream = IBBytestream(self.xmpp, sid, size,  | ||||||
|  |                               iq['from'], iq['to'], | ||||||
|  |                               self.window_size) | ||||||
|  |         stream.stream_started.set() | ||||||
|  |         self.streams[sid] = stream | ||||||
|  |         iq.reply() | ||||||
|  |         iq.send() | ||||||
|  |  | ||||||
|  |         self.xmpp.event('ibb_stream_start', stream) | ||||||
|  |  | ||||||
|  |     def _handle_data(self, iq): | ||||||
|  |         sid = iq['ibb_data']['sid'] | ||||||
|  |         stream = self.streams.get(sid, None) | ||||||
|  |         if stream is not None and iq['from'] != stream.sender: | ||||||
|  |             stream._recv_data(iq) | ||||||
|  |         else: | ||||||
|  |             raise XMPPError('item-not-found') | ||||||
|  |  | ||||||
|  |     def _handle_close(self, iq): | ||||||
|  |         sid = iq['ibb_close']['sid'] | ||||||
|  |         stream = self.streams.get(sid, None) | ||||||
|  |         if stream is not None and iq['from'] != stream.sender: | ||||||
|  |             stream._closed(iq) | ||||||
|  |         else: | ||||||
|  |             raise XMPPError('item-not-found') | ||||||
							
								
								
									
										63
									
								
								sleekxmpp/plugins/xep_0047/stanza.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								sleekxmpp/plugins/xep_0047/stanza.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,63 @@ | |||||||
|  | import re | ||||||
|  | import base64 | ||||||
|  |  | ||||||
|  | from sleekxmpp.exceptions import XMPPError | ||||||
|  | from sleekxmpp.xmlstream import register_stanza_plugin, ET, ElementBase | ||||||
|  | from sleekxmpp.thirdparty.suelta.util import bytes | ||||||
|  |  | ||||||
|  |  | ||||||
|  | VALID_B64 = re.compile(r'[A-Za-z0-9\+\/]*=*') | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def to_b64(data): | ||||||
|  |     return bytes(base64.b64encode(bytes(data))).decode('utf-8') | ||||||
|  |  | ||||||
|  | def from_b64(data): | ||||||
|  |     return bytes(base64.b64decode(bytes(data))).decode('utf-8') | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Open(ElementBase): | ||||||
|  |     name = 'open' | ||||||
|  |     namespace = 'http://jabber.org/protocol/ibb' | ||||||
|  |     plugin_attrib = 'ibb_open' | ||||||
|  |     interfaces = set(('block_size', 'sid', 'stanza')) | ||||||
|  |  | ||||||
|  |     def get_block_size(self): | ||||||
|  |         return int(self._get_attr('block-size')) | ||||||
|  |  | ||||||
|  |     def set_block_size(self, value): | ||||||
|  |         self._set_attr('block-size', str(value)) | ||||||
|  |  | ||||||
|  |     def del_block_size(self): | ||||||
|  |         self._del_attr('block-size') | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Data(ElementBase): | ||||||
|  |     name = 'data' | ||||||
|  |     namespace = 'http://jabber.org/protocol/ibb' | ||||||
|  |     plugin_attrib = 'ibb_data' | ||||||
|  |     interfaces = set(('seq', 'sid', 'data')) | ||||||
|  |     sub_interfaces = set(['data']) | ||||||
|  |  | ||||||
|  |     def get_seq(self): | ||||||
|  |         return int(self._get_attr('seq', '0')) | ||||||
|  |  | ||||||
|  |     def set_seq(self, value): | ||||||
|  |         self._set_attr('seq', str(value)) | ||||||
|  |  | ||||||
|  |     def get_data(self): | ||||||
|  |         b64_data = self._get_sub_text('data', '') | ||||||
|  |         if VALID_B64.match(b64_data).group() == b64_data: | ||||||
|  |             return from_b64(b64_data) | ||||||
|  |         else: | ||||||
|  |             raise XMPPError('not-acceptable') | ||||||
|  |  | ||||||
|  |     def set_data(self, value): | ||||||
|  |         self._set_sub_text('data', to_b64(value)) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class Close(ElementBase): | ||||||
|  |     name = 'close' | ||||||
|  |     namespace = 'http://jabber.org/protocol/ibb' | ||||||
|  |     plugin_attrib = 'ibb_close' | ||||||
|  |     interfaces = set(['sid']) | ||||||
							
								
								
									
										137
									
								
								sleekxmpp/plugins/xep_0047/stream.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										137
									
								
								sleekxmpp/plugins/xep_0047/stream.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,137 @@ | |||||||
|  | import socket | ||||||
|  | import threading | ||||||
|  | import logging | ||||||
|  | try: | ||||||
|  |     import queue | ||||||
|  | except ImportError: | ||||||
|  |     import Queue as queue | ||||||
|  |  | ||||||
|  | from sleekxmpp.exceptions import XMPPError | ||||||
|  |  | ||||||
|  |  | ||||||
|  | log = logging.getLogger(__name__) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class IBBytestream(object): | ||||||
|  |  | ||||||
|  |     def __init__(self, xmpp, sid, block_size, to, ifrom, window_size=1): | ||||||
|  |         self.xmpp = xmpp | ||||||
|  |         self.sid = sid | ||||||
|  |         self.block_size = block_size | ||||||
|  |         self.window_size = window_size | ||||||
|  |  | ||||||
|  |         self.receiver = to | ||||||
|  |         self.sender = ifrom | ||||||
|  |  | ||||||
|  |         self.send_seq = -1 | ||||||
|  |         self.recv_seq = -1 | ||||||
|  |  | ||||||
|  |         self._send_seq_lock = threading.Lock() | ||||||
|  |         self._recv_seq_lock = threading.Lock() | ||||||
|  |          | ||||||
|  |         self.stream_started = threading.Event() | ||||||
|  |         self.stream_in_closed = threading.Event() | ||||||
|  |         self.stream_out_closed = threading.Event() | ||||||
|  |  | ||||||
|  |         self.recv_queue = queue.Queue() | ||||||
|  |  | ||||||
|  |         self.send_window = threading.BoundedSemaphore(value=self.window_size) | ||||||
|  |         self.window_ids = set() | ||||||
|  |         self.window_empty = threading.Event() | ||||||
|  |         self.window_empty.set() | ||||||
|  |  | ||||||
|  |     def send(self, data): | ||||||
|  |         if not self.stream_started.is_set() or \ | ||||||
|  |                self.stream_out_closed.is_set(): | ||||||
|  |             raise socket.error | ||||||
|  |         data = data[0:self.block_size] | ||||||
|  |         self.send_window.acquire() | ||||||
|  |         with self._send_seq_lock: | ||||||
|  |             self.send_seq = (self.send_seq + 1) % 65535 | ||||||
|  |             seq = self.send_seq | ||||||
|  |         iq = self.xmpp.Iq() | ||||||
|  |         iq['type'] = 'set' | ||||||
|  |         iq['to'] = self.receiver | ||||||
|  |         iq['from'] = self.sender | ||||||
|  |         iq['ibb_data']['sid'] = self.sid | ||||||
|  |         iq['ibb_data']['seq'] = seq | ||||||
|  |         iq['ibb_data']['data'] = data  | ||||||
|  |         self.window_empty.clear() | ||||||
|  |         self.window_ids.add(iq['id']) | ||||||
|  |         iq.send(block=False, callback=self._recv_ack) | ||||||
|  |         return len(data) | ||||||
|  |  | ||||||
|  |     def sendall(self, data): | ||||||
|  |         sent_len = 0 | ||||||
|  |         while sent_len < len(data): | ||||||
|  |             sent_len += self.send(data[sent_len:]) | ||||||
|  |  | ||||||
|  |     def _recv_ack(self, iq): | ||||||
|  |         self.window_ids.remove(iq['id']) | ||||||
|  |         if not self.window_ids: | ||||||
|  |             self.window_empty.set() | ||||||
|  |         self.send_window.release() | ||||||
|  |         if iq['type'] == 'error': | ||||||
|  |             self.close() | ||||||
|  |  | ||||||
|  |     def _recv_data(self, iq): | ||||||
|  |         with self._recv_seq_lock: | ||||||
|  |             new_seq = iq['ibb_data']['seq'] | ||||||
|  |             if new_seq != (self.recv_seq + 1) % 65535: | ||||||
|  |                 self.close() | ||||||
|  |                 raise XMPPError('unexpected-request') | ||||||
|  |             self.recv_seq = new_seq | ||||||
|  |  | ||||||
|  |         data = iq['ibb_data']['data'] | ||||||
|  |         if len(data) > self.block_size: | ||||||
|  |             self.close() | ||||||
|  |             raise XMPPError('not-acceptable') | ||||||
|  |  | ||||||
|  |         self.recv_queue.put(data) | ||||||
|  |         self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data}) | ||||||
|  |         iq.reply() | ||||||
|  |         iq.send() | ||||||
|  |  | ||||||
|  |     def recv(self, *args, **kwargs): | ||||||
|  |         return self.read(block=True) | ||||||
|  |  | ||||||
|  |     def read(self, block=True, timeout=None, **kwargs): | ||||||
|  |         if not self.stream_started.is_set() or \ | ||||||
|  |                self.stream_in_closed.is_set(): | ||||||
|  |             raise socket.error | ||||||
|  |         if timeout is not None: | ||||||
|  |             block = True | ||||||
|  |         try: | ||||||
|  |             return self.recv_queue.get(block, timeout) | ||||||
|  |         except: | ||||||
|  |             return None | ||||||
|  |  | ||||||
|  |     def close(self): | ||||||
|  |         iq = self.xmpp.Iq() | ||||||
|  |         iq['type'] = 'set' | ||||||
|  |         iq['to'] = self.receiver | ||||||
|  |         iq['from'] = self.sender | ||||||
|  |         iq['ibb_close']['sid'] = self.sid | ||||||
|  |         self.stream_out_closed.set() | ||||||
|  |         iq.send(block=False,  | ||||||
|  |                 callback=lambda x: self.stream_in_closed.set()) | ||||||
|  |         self.xmpp.event('ibb_stream_end', self) | ||||||
|  |  | ||||||
|  |     def _closed(self, iq): | ||||||
|  |         self.stream_in_closed.set() | ||||||
|  |         self.stream_out_closed.set() | ||||||
|  |         while not self.window_empty.is_set(): | ||||||
|  |             log.info('waiting for send window to empty') | ||||||
|  |             self.window_empty.wait(timeout=1) | ||||||
|  |         iq.reply() | ||||||
|  |         iq.send() | ||||||
|  |         self.xmpp.event('ibb_stream_end', self) | ||||||
|  |  | ||||||
|  |     def makefile(self, *args, **kwargs): | ||||||
|  |         return self | ||||||
|  |  | ||||||
|  |     def connect(*args, **kwargs): | ||||||
|  |         return None | ||||||
|  |  | ||||||
|  |     def shutdown(self, *args, **kwargs): | ||||||
|  |         return None | ||||||
		Reference in New Issue
	
	Block a user
	 Lance Stout
					Lance Stout