The xep_0065 plugin supports now multiple stream (multiple connected

sockets).

To send data over a stream, we need to pass the SID in order to
retrieve the good proxy thread (and so, the good socket).
This commit is contained in:
Sandro Munda
2012-06-04 19:39:48 +02:00
parent 44ee0633f2
commit 39505ae1ff

View File

@@ -27,6 +27,10 @@ class xep_0065(base_plugin):
dependencies = set(['xep_0030', ]) dependencies = set(['xep_0030', ])
xep = '0065' xep = '0065'
# A dict contains for each SID, the proxy thread currently
# running.
proxy_threads = {}
def plugin_init(self): def plugin_init(self):
""" Initializes the xep_0065 plugin and all event callbacks. """ Initializes the xep_0065 plugin and all event callbacks.
""" """
@@ -127,6 +131,9 @@ class xep_0065(base_plugin):
self.proxy_port, self.on_recv) self.proxy_port, self.on_recv)
self.proxy_thread.start() self.proxy_thread.start()
# Registers the new thread in the proxy_thread dict.
self.proxy_threads[sid] = self.proxy_thread
# Wait until the proxy is connected # Wait until the proxy is connected
self.proxy_thread.connected.wait() self.proxy_thread.connected.wait()
@@ -142,17 +149,20 @@ class xep_0065(base_plugin):
""" Handles all streamhost-used stanzas. """ Handles all streamhost-used stanzas.
""" """
# Sets the requester and the target. # Sets the SID, the requester and the target.
sid = iq['q']['sid']
requester = '%s' % self.xmpp.boundjid requester = '%s' % self.xmpp.boundjid
target = '%s' % iq['from'] target = '%s' % iq['from']
# The Requester will establish a connection to the SOCKS5 # The Requester will establish a connection to the SOCKS5
# proxy in the same way the Target did. # proxy in the same way the Target did.
self.proxy_thread = Proxy(iq['q']['sid'], requester, target, self.proxy_thread = Proxy(sid, requester, target, self.proxy_host,
self.proxy_host, self.proxy_port, self.proxy_port, self.on_recv)
self.on_recv)
self.proxy_thread.start() self.proxy_thread.start()
# Registers the new thread in the proxy_thread dict.
self.proxy_threads[sid] = self.proxy_thread
# Wait until the proxy is connected # Wait until the proxy is connected
self.proxy_thread.connected.wait() self.proxy_thread.connected.wait()
@@ -174,21 +184,34 @@ class xep_0065(base_plugin):
# Send the IQ. # Send the IQ.
act_iq.send() act_iq.send()
def send(self, msg): def send(self, sid, msg):
""" Sends the msg to the socket. """ Sends the msg to the socket.
sid : The SID to retrieve the good proxy stored in the
proxy_threads dict
msg : The message data. msg : The message data.
""" """
self.proxy_thread.send(msg) proxy = self.proxy_threads.get(sid)
if proxy:
proxy.send(msg)
else:
# TODO: raise an exception.
pass
def on_recv(self, data): def on_recv(self, sid, data):
""" A default behavior when socket are receiving data. """ Called when receiving data
This method should be overriden.
""" """
log.debug('Received data: %s' % data) if not data:
try:
del self.proxy_threads[sid]
except KeyError:
# TODO: internal error, raise an exception ?
pass
else:
log.debug('Received data: %s' % data)
self.xmpp.event('socks_recv', data)
class Proxy(Thread): class Proxy(Thread):
@@ -212,9 +235,6 @@ class Proxy(Thread):
# Initializes the thread. # Initializes the thread.
Thread.__init__(self) Thread.__init__(self)
# This thread is a daemon thread.
self.daemon = True
# Because the xep_0065 plugin uses the proxy_port as string, # Because the xep_0065 plugin uses the proxy_port as string,
# the Proxy class accepts the proxy_port argument as a string # the Proxy class accepts the proxy_port argument as a string
# or an integer. Here, we force to use the port as an integer. # or an integer. Here, we force to use the port as an integer.
@@ -252,26 +272,33 @@ class Proxy(Thread):
# The port MUST be 0. # The port MUST be 0.
self.s.connect((dest, 0)) self.s.connect((dest, 0))
log.info('Connected') log.info('Socket connected.')
self.connected.set() self.connected.set()
# Listen for data on the socket # Listen for data on the socket
self.listen() self.listen()
# Listen returns when the socket must be closed.
self.s.close()
log.info('Socket closed.')
def listen(self): def listen(self):
""" Listen for data on the socket. When receiving data, call """ Listen for data on the socket. When receiving data, call
the callback on_recv callable. the callback on_recv callable.
""" """
while True: socket_open = True
while socket_open:
ins, out, err = select([self.s, ], [], []) ins, out, err = select([self.s, ], [], [])
for s in ins: for s in ins:
data = self.recv_size(self.s) data = self.recv_size(self.s)
self.on_recv(data) if not data:
socket_open = False
self.on_recv(self.sid, data)
def recv_size(self, the_socket): def recv_size(self, the_socket):
# Data length is packed into 4 bytes.
total_len = 0 total_len = 0
total_data = [] total_data = []
size = sys.maxint size = sys.maxint
@@ -280,6 +307,9 @@ class Proxy(Thread):
while total_len < size: while total_len < size:
sock_data = the_socket.recv(recv_size) sock_data = the_socket.recv(recv_size)
if not sock_data:
return ''.join(total_data)
if not total_data: if not total_data:
if len(sock_data) > 4: if len(sock_data) > 4:
size_data += sock_data size_data += sock_data