Save progress on SI file transfer

This commit is contained in:
Lance Stout 2013-02-19 01:00:04 -08:00
parent 0a2737dc77
commit 3732139fc3
9 changed files with 474 additions and 32 deletions

View File

@ -21,10 +21,10 @@ class XEP_0047(BasePlugin):
dependencies = set(['xep_0030'])
stanza = stanza
default_config = {
'block_size': 4096,
'max_block_size': 8192,
'window_size': 1,
'auto_accept': True,
'accept_stream': None
'auto_accept': False,
}
def plugin_init(self):
@ -33,6 +33,9 @@ class XEP_0047(BasePlugin):
self.pending_close_streams = {}
self._stream_lock = threading.Lock()
self._preauthed_sids_lock = threading.Lock()
self._preauthed_sids = {}
register_stanza_plugin(Iq, Open)
register_stanza_plugin(Iq, Close)
register_stanza_plugin(Iq, Data)
@ -58,6 +61,10 @@ class XEP_0047(BasePlugin):
StanzaPath('message/ibb_data'),
self._handle_data))
self.api.register(self._authorized, 'authorized', default=True)
self.api.register(self._authorized_sid, 'authorized_sid', default=True)
self.api.register(self._preauthorize_sid, 'preauthorize_sid', default=True)
def plugin_end(self):
self.xmpp.remove_handler('IBB Open')
self.xmpp.remove_handler('IBB Close')
@ -69,17 +76,37 @@ class XEP_0047(BasePlugin):
self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/ibb')
def _accept_stream(self, iq):
if self.accept_stream is not None:
return self.accept_stream(iq)
receiver = iq['to']
sender = iq['from']
sid = iq['ibb_open']['sid']
if self.api['authorized_sid'](receiver, sid, sender, iq):
return True
return self.api['authorized'](receiver, sid, sender, iq)
def _authorized(self, jid, sid, ifrom, iq):
if self.auto_accept:
if iq['ibb_open']['block_size'] <= self.max_block_size:
return True
return False
def open_stream(self, jid, block_size=4096, sid=None, window=1, use_messages=False,
def _authorized_sid(self, jid, sid, ifrom, iq):
with self._preauthed_sids_lock:
if (jid, sid, ifrom) in self._preauthed_sids:
del self._preauthed_sids[(jid, sid, ifrom)]
return True
return False
def _preauthorize_sid(self, jid, sid, ifrom, data):
with self._preauthed_sids_lock:
self._preauthed_sids[(jid, sid, ifrom)] = True
def open_stream(self, jid, block_size=None, sid=None, window=1, use_messages=False,
ifrom=None, block=True, timeout=None, callback=None):
if sid is None:
sid = str(uuid.uuid4())
if block_size is None:
block_size = self.block_size
iq = self.xmpp.Iq()
iq['type'] = 'set'
@ -90,7 +117,7 @@ class XEP_0047(BasePlugin):
iq['ibb_open']['stanza'] = 'iq'
stream = IBBytestream(self.xmpp, sid, block_size,
iq['to'], iq['from'], window,
iq['from'], iq['to'], window,
use_messages)
with self._stream_lock:
@ -118,11 +145,12 @@ class XEP_0047(BasePlugin):
with self._stream_lock:
stream = self.pending_streams.get(iq['id'], None)
if stream is not None:
stream.sender = iq['to']
stream.receiver = iq['from']
stream.self_jid = iq['to']
stream.peer_jid = iq['from']
stream.stream_started.set()
self.streams[stream.sid] = stream
self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream)
with self._stream_lock:
if iq['id'] in self.pending_streams:
@ -130,15 +158,19 @@ class XEP_0047(BasePlugin):
def _handle_open_request(self, iq):
sid = iq['ibb_open']['sid']
size = iq['ibb_open']['block_size']
size = iq['ibb_open']['block_size'] or self.block_size
if not sid:
raise XMPPError(etype='modify', condition='bad-request')
if not self._accept_stream(iq):
raise XMPPError('not-acceptable')
raise XMPPError(etype='modify', condition='not-acceptable')
if size > self.max_block_size:
raise XMPPError('resource-constraint')
stream = IBBytestream(self.xmpp, sid, size,
iq['from'], iq['to'],
iq['to'], iq['from'],
self.window_size)
stream.stream_started.set()
self.streams[sid] = stream
@ -146,11 +178,12 @@ class XEP_0047(BasePlugin):
iq.send()
self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream)
def _handle_data(self, stanza):
sid = stanza['ibb_data']['sid']
stream = self.streams.get(sid, None)
if stream is not None and stanza['from'] != stream.sender:
if stream is not None and stanza['from'] == stream.peer_jid:
stream._recv_data(stanza)
else:
raise XMPPError('item-not-found')
@ -158,7 +191,7 @@ class XEP_0047(BasePlugin):
def _handle_close(self, iq):
sid = iq['ibb_close']['sid']
stream = self.streams.get(sid, None)
if stream is not None and iq['from'] != stream.sender:
if stream is not None and iq['from'] == stream.peer_jid:
stream._closed(iq)
else:
raise XMPPError('item-not-found')

View File

@ -12,15 +12,17 @@ log = logging.getLogger(__name__)
class IBBytestream(object):
def __init__(self, xmpp, sid, block_size, to, ifrom, window_size=1, use_messages=False):
def __init__(self, xmpp, sid, block_size, jid, peer, window_size=1, use_messages=False):
self.xmpp = xmpp
self.sid = sid
self.block_size = block_size
self.window_size = window_size
self.use_messages = use_messages
self.receiver = to
self.sender = ifrom
if jid is None:
jid = xmpp.boundjid
self.self_jid = jid
self.peer_jid = peer
self.send_seq = -1
self.recv_seq = -1
@ -50,8 +52,8 @@ class IBBytestream(object):
seq = self.send_seq
if self.use_messages:
msg = self.xmpp.Message()
msg['to'] = self.receiver
msg['from'] = self.sender
msg['to'] = self.peer_jid
msg['from'] = self.self_jid
msg['id'] = self.xmpp.new_id()
msg['ibb_data']['sid'] = self.sid
msg['ibb_data']['seq'] = seq
@ -61,8 +63,8 @@ class IBBytestream(object):
else:
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq['to'] = self.receiver
iq['from'] = self.sender
iq['to'] = self.peer_jid
iq['from'] = self.self_jid
iq['ibb_data']['sid'] = self.sid
iq['ibb_data']['seq'] = seq
iq['ibb_data']['data'] = data
@ -121,8 +123,8 @@ class IBBytestream(object):
def close(self):
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq['to'] = self.receiver
iq['from'] = self.sender
iq['to'] = self.peer_jid
iq['from'] = self.self_jid
iq['ibb_close']['sid'] = self.sid
self.stream_out_closed.set()
iq.send(block=False,
@ -132,9 +134,6 @@ class IBBytestream(object):
def _closed(self, iq):
self.stream_in_closed.set()
self.stream_out_closed.set()
while not self.window_empty.is_set():
log.info('waiting for send window to empty')
self.window_empty.wait(timeout=1)
iq.reply()
iq.send()
self.xmpp.event('ibb_stream_end', self)

View File

@ -25,6 +25,9 @@ class XEP_0065(base_plugin):
name = 'xep_0065'
description = "Socks5 Bytestreams"
dependencies = set(['xep_0030'])
default_config = {
'auto_accept': False
}
def plugin_init(self):
register_stanza_plugin(Iq, Socks5)
@ -33,11 +36,18 @@ class XEP_0065(base_plugin):
self._sessions = {}
self._sessions_lock = threading.Lock()
self._preauthed_sids_lock = threading.Lock()
self._preauthed_sids = {}
self.xmpp.register_handler(
Callback('Socks5 Bytestreams',
StanzaPath('iq@type=set/socks/streamhost'),
self._handle_streamhost))
self.api.register(self._authorized, 'authorized', default=True)
self.api.register(self._authorized_sid, 'authorized_sid', default=True)
self.api.register(self._preauthorize_sid, 'preauthorize_sid', default=True)
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature(Socks5.namespace)
@ -50,14 +60,15 @@ class XEP_0065(base_plugin):
"""Returns the socket associated to the SID."""
return self._sessions.get(sid, None)
def handshake(self, to, ifrom=None, timeout=None):
def handshake(self, to, ifrom=None, sid=None, timeout=None):
""" Starts the handshake to establish the socks5 bytestreams
connection.
"""
if not self._proxies:
self._proxies = self.discover_proxies()
sid = uuid4().hex
if sid is None:
sid = uuid4().hex
used = self.request_stream(to, sid=sid, ifrom=ifrom, timeout=timeout)
proxy = used['socks']['streamhost_used']['jid']
@ -72,10 +83,12 @@ class XEP_0065(base_plugin):
self.xmpp.boundjid,
to,
self._proxies[proxy][0],
self._proxies[proxy][1])
self._proxies[proxy][1],
peer=to)
# Request that the proxy activate the session with the target.
self.activate(proxy, sid, to, timeout=timeout)
self.xmpp.event('stream:%s:%s' % (sid, conn.peer_jid), conn)
return self.get_socket(sid)
def request_stream(self, to, sid=None, ifrom=None, block=True, timeout=None, callback=None):
@ -139,19 +152,24 @@ class XEP_0065(base_plugin):
"""Handle incoming SOCKS5 session request."""
sid = iq['socks']['sid']
if not sid:
raise XMPPError(etype='modify', condition='bad-request')
if not self._accept_stream(iq):
raise XMPPError(etype='modify', condition='not-acceptable')
streamhosts = iq['socks']['streamhosts']
conn = None
used_streamhost = None
sender = iq['from']
for streamhost in streamhosts:
try:
conn = self._connect_proxy(sid,
iq['from'],
sender,
self.xmpp.boundjid,
streamhost['host'],
streamhost['port'])
streamhost['port'],
peer=sender)
used_streamhost = streamhost['jid']
break
except socket.error:
@ -165,6 +183,8 @@ class XEP_0065(base_plugin):
iq['socks']['sid'] = sid
iq['socks']['streamhost_used']['jid'] = used_streamhost
iq.send()
self.xmpp.event('socks5_stream', conn)
self.xmpp.event('stream:%s:%s' % (sid, conn.peer_jid), conn)
def activate(self, proxy, sid, target, ifrom=None, block=True, timeout=None, callback=None):
"""Activate the socks5 session that has been negotiated."""
@ -191,7 +211,7 @@ class XEP_0065(base_plugin):
with self._sessions_lock:
self._sessions = {}
def _connect_proxy(self, sid, requester, target, proxy, proxy_port):
def _connect_proxy(self, sid, requester, target, proxy, proxy_port, peer=None):
""" Establishes a connection between the client and the server-side
Socks5 proxy.
@ -200,6 +220,8 @@ class XEP_0065(base_plugin):
target : The JID of the target. <str>
proxy_host : The hostname or the IP of the proxy. <str>
proxy_port : The port of the proxy. <str> or <int>
peer : The JID for the other side of the stream, regardless
of target or requester status.
"""
# Because the xep_0065 plugin uses the proxy_port as string,
# the Proxy class accepts the proxy_port argument as a string
@ -230,6 +252,34 @@ class XEP_0065(base_plugin):
_close()
sock.close = close
self.xmpp.event('socks_connected', sid)
sock.peer_jid = peer
sock.self_jid = target if requester == peer else requester
self.xmpp.event('socks_connected', sid)
return sock
def _accept_stream(self, iq):
receiver = iq['to']
sender = iq['from']
sid = iq['socks']['sid']
if self.api['authorized_sid'](receiver, sid, sender, iq):
return True
return self.api['authorized'](receiver, sid, sender, iq)
def _authorized(self, jid, sid, ifrom, iq):
return self.auto_accept
def _authorized_sid(self, jid, sid, ifrom, iq):
with self._preauthed_sids_lock:
log.debug('>>> authed sids: %s', self._preauthed_sids)
log.debug('>>> lookup: %s %s %s', jid, sid, ifrom)
if (jid, sid, ifrom) in self._preauthed_sids:
del self._preauthed_sids[(jid, sid, ifrom)]
return True
return False
def _preauthorize_sid(self, jid, sid, ifrom, data):
log.debug('>>>> %s %s %s %s', jid, sid, ifrom, data)
with self._preauthed_sids_lock:
self._preauthed_sids[(jid, sid, ifrom)] = True

View File

@ -0,0 +1,16 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.plugins.base import register_plugin
from sleekxmpp.plugins.xep_0095 import stanza
from sleekxmpp.plugins.xep_0095.stanza import SI
from sleekxmpp.plugins.xep_0095.stream_initiation import XEP_0095
register_plugin(XEP_0095)

View File

@ -0,0 +1,25 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.xmlstream import ElementBase
class SI(ElementBase):
name = 'si'
namespace = 'http://jabber.org/protocol/si'
plugin_attrib = 'si'
interfaces = set(['id', 'mime_type', 'profile'])
def get_mime_type(self):
return self._get_attr('mime-type', 'application/octet-stream')
def set_mime_type(self, value):
self._set_attr('mime-type', value)
def del_mime_type(self):
self._del_attr('mime-type')

View File

@ -0,0 +1,197 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
import threading
from uuid import uuid4
from sleekxmpp import Iq, Message
from sleekxmpp.exceptions import XMPPError
from sleekxmpp.plugins import BasePlugin
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.xmlstream import register_stanza_plugin, JID
from sleekxmpp.plugins.xep_0095 import stanza, SI
log = logging.getLogger(__name__)
SOCKS5 = 'http://jabber.org/protocol/bytestreams'
IBB = 'http://jabber.org/protocol/ibb'
class XEP_0095(BasePlugin):
name = 'xep_0095'
description = 'XEP-0095: Stream Initiation'
dependencies = set(['xep_0020', 'xep_0030', 'xep_0047', 'xep_0065'])
stanza = stanza
def plugin_init(self):
self._profiles = {}
self._methods = {}
self._pending_lock = threading.Lock()
self._pending= {}
self.register_method(SOCKS5, 'xep_0065')
self.register_method(IBB, 'xep_0047')
register_stanza_plugin(Iq, SI)
register_stanza_plugin(SI, self.xmpp['xep_0020'].stanza.FeatureNegotiation)
self.xmpp.register_handler(
Callback('SI Request',
StanzaPath('iq@type=set/si'),
self._handle_request))
self.api.register(self._add_pending, 'add_pending', default=True)
self.api.register(self._get_pending, 'get_pending', default=True)
self.api.register(self._del_pending, 'del_pending', default=True)
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature(SI.namespace)
def plugin_end(self):
self.xmpp.remove_handler('SI Request')
self.xmpp['xep_0030'].del_feature(feature=SI.namespace)
def register_profile(self, profile_name, plugin):
self._profiles[profile_name] = plugin
def unregister_profile(self, profile_name):
try:
del self._profiles[profile_name]
except KeyError:
pass
def register_method(self, method, plugin_name):
self._methods[method] = plugin_name
def _handle_request(self, iq):
profile = iq['si']['profile']
sid = iq['si']['id']
if not sid:
raise XMPPError(etype='modify', condition='bad-request')
if profile not in self._profiles:
raise XMPPError(
etype='modify',
condition='bad-request',
extension='bad-profile',
extension_ns=SI.namespace)
neg = iq['si']['feature_neg']['form']['fields']
options = neg['stream-method']['options'] or []
methods = []
for opt in options:
methods.append(opt['value'])
for method in methods:
if method in self._methods:
supported = True
break
else:
raise XMPPError('bad-request',
extension='no-valid-streams',
extension_ns=SI.namespace)
selected_method = SOCKS5 if SOCKS5 in methods else IBB
receiver = iq['to']
sender = iq['from']
self.api['add_pending'](receiver, sid, sender, {
'response_id': iq['id'],
'method': selected_method,
'profile': profile
})
self.xmpp.event('si_request', iq)
def offer(self, jid, sid=None, mime_type=None, profile=None,
methods=None, payload=None, ifrom=None,
**iqargs):
if sid is None:
sid = uuid4().hex
if methods is None:
methods = list(self._methods.keys())
if not isinstance(methods, (list, tuple, set)):
methods = [methods]
si = self.xmpp.Iq()
si['to'] = jid
si['from'] = ifrom
si['type'] = 'set'
si['si']['id'] = sid
si['si']['mime_type'] = mime_type
si['si']['profile'] = profile
if not isinstance(payload, (list, tuple, set)):
payload = [payload]
for item in payload:
si['si'].append(item)
si['si']['feature_neg']['form'].add_field(
var='stream-method',
ftype='list-single',
options=methods)
return si.send(**iqargs)
def accept(self, jid, sid, payload=None, ifrom=None, stream_handler=None):
stream = self.api['get_pending'](ifrom, sid, jid)
iq = self.xmpp.Iq()
iq['id'] = stream['response_id']
iq['to'] = jid
iq['from'] = ifrom
iq['type'] = 'result'
if payload:
iq['si'].append(payload)
iq['si']['feature_neg']['form']['type'] = 'submit'
iq['si']['feature_neg']['form'].add_field(
var='stream-method',
ftype='list-single',
value=stream['method'])
if ifrom is None:
ifrom = self.xmpp.boundjid
method_plugin = self._methods[stream['method']]
self.xmpp[method_plugin].api['preauthorize_sid'](ifrom, sid, jid)
self.api['del_pending'](ifrom, sid, jid)
if stream_handler:
self.xmpp.add_event_handler('stream:%s:%s' % (sid, jid),
stream_handler,
threaded=True,
disposable=True)
return iq.send()
def decline(self, jid, sid, ifrom=None):
stream = self.api['get_pending'](ifrom, sid, jid)
if not stream:
return
iq = self.xmpp.Iq()
iq['id'] = stream['response_id']
iq['to'] = jid
iq['from'] = ifrom
iq['type'] = 'error'
iq['error']['condition'] = 'forbidden'
iq['error']['text'] = 'Offer declined'
self.api['del_pending'](ifrom, sid, jid)
return iq.send()
def _add_pending(self, jid, node, ifrom, data):
with self._pending_lock:
self._pending[(jid, node, ifrom)] = data
def _get_pending(self, jid, node, ifrom, data):
with self._pending_lock:
return self._pending.get((jid, node, ifrom), None)
def _del_pending(self, jid, node, ifrom, data):
with self._pending_lock:
if (jid, node, ifrom) in self._pending:
del self._pending[(jid, node, ifrom)]

View File

@ -0,0 +1,16 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
from sleekxmpp.plugins.base import register_plugin
from sleekxmpp.plugins.xep_0096 import stanza
from sleekxmpp.plugins.xep_0096.stanza import File
from sleekxmpp.plugins.xep_0096.file_transfer import XEP_0096
register_plugin(XEP_0096)

View File

@ -0,0 +1,58 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import logging
from sleekxmpp import Iq, Message
from sleekxmpp.plugins import BasePlugin
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.xmlstream import register_stanza_plugin, JID
from sleekxmpp.plugins.xep_0096 import stanza, File
log = logging.getLogger(__name__)
class XEP_0096(BasePlugin):
name = 'xep_0096'
description = 'XEP-0096: SI File Transfer'
dependencies = set(['xep_0095'])
stanza = stanza
def plugin_init(self):
register_stanza_plugin(self.xmpp['xep_0095'].stanza.SI, File)
self.xmpp['xep_0095'].register_profile(File.namespace, self)
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature(File.namespace)
def plugin_end(self):
self.xmpp['xep_0030'].del_feature(feature=File.namespace)
self.xmpp['xep_0095'].unregister_profile(File.namespace, self)
def request_file_transfer(self, jid, sid=None, name=None, size=None,
desc=None, hash=None, date=None,
allow_ranged=False, mime_type=None,
**iqargs):
data = File()
data['name'] = name
data['size'] = size
data['date'] = date
data['desc'] = desc
if allow_ranged:
data.enable('range')
return self.xmpp['xep_0095'].offer(jid,
sid=sid,
mime_type=mime_type,
profile=File.namespace,
payload=data,
**iqargs)

View File

@ -0,0 +1,48 @@
"""
SleekXMPP: The Sleek XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of SleekXMPP.
See the file LICENSE for copying permission.
"""
import datetime as dt
from sleekxmpp.xmlstream import ElementBase, register_stanza_plugin
from sleekxmpp.plugins import xep_0082
class File(ElementBase):
name = 'file'
namespace = 'http://jabber.org/protocol/si/profile/file-transfer'
plugin_attrib = 'file'
interfaces = set(['name', 'size', 'date', 'hash', 'desc'])
sub_interfaces = set(['desc'])
def set_size(self, value):
self._set_attr('size', str(value))
def get_date(self):
timestamp = self._get_attr('date')
return xep_0082.parse(timestamp)
def set_date(self, value):
if isinstance(value, dt.datetime):
value = xep_0082.format_datetime(value)
self._set_attr('date', value)
class Range(ElementBase):
name = 'range'
namespace = 'http://jabber.org/protocol/si/profile/file-transfer'
plugin_attrib = 'range'
interfaces = set(['length', 'offset'])
def set_length(self, value):
self._set_attr('length', str(value))
def set_offset(self, value):
self._set_attr('offset', str(value))
register_stanza_plugin(File, Range)