XEP-0065: API changes
This commit is contained in:
		@@ -8,6 +8,46 @@ XEP-0065: SOCKS5 Bytestreams
 | 
			
		||||
    :members:
 | 
			
		||||
    :exclude-members: session_bind, plugin_init, plugin_end
 | 
			
		||||
 | 
			
		||||
Internal API methods
 | 
			
		||||
--------------------
 | 
			
		||||
 | 
			
		||||
The internal API is used here to authorize or pre-authorize streams.
 | 
			
		||||
 | 
			
		||||
.. glossary::
 | 
			
		||||
 | 
			
		||||
    authorized_sid (0065 version)
 | 
			
		||||
        - **jid**: :class:`~.JID` receiving the stream initiation.
 | 
			
		||||
        - **node**:  stream id
 | 
			
		||||
        - **ifrom**: who the stream is from.
 | 
			
		||||
        - **args**: :class:`~.Iq` of the stream request.
 | 
			
		||||
        - **returns**: ``True`` if the stream should be accepted,
 | 
			
		||||
          ``False`` otherwise.
 | 
			
		||||
 | 
			
		||||
        Check if the stream should be accepted. Uses
 | 
			
		||||
        the information setup by :term:`preauthorize_sid (0065 version)`
 | 
			
		||||
        by default.
 | 
			
		||||
 | 
			
		||||
    authorized (0065 version)
 | 
			
		||||
        - **jid**: :class:`~.JID` receiving the stream initiation.
 | 
			
		||||
        - **node**:  stream id
 | 
			
		||||
        - **ifrom**: who the stream is from.
 | 
			
		||||
        - **args**: :class:`~.Iq` of the stream request.
 | 
			
		||||
        - **returns**: ``True`` if the stream should be accepted,
 | 
			
		||||
          ``False`` otherwise.
 | 
			
		||||
 | 
			
		||||
        A fallback handler (run after :term:`authorized_sid (0065 version)`)
 | 
			
		||||
        to check if a stream should be accepted. Uses the ``auto_accept``
 | 
			
		||||
        parameter by default.
 | 
			
		||||
 | 
			
		||||
    preauthorize_sid (0065 version)
 | 
			
		||||
        - **jid**: :class:`~.JID` receiving the stream initiation.
 | 
			
		||||
        - **node**:  stream id
 | 
			
		||||
        - **ifrom**: who the stream will be from.
 | 
			
		||||
        - **args**: Unused.
 | 
			
		||||
 | 
			
		||||
        Register a stream id to be accepted automatically (called from
 | 
			
		||||
        other plugins such as XEP-0095).
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
Stanza elements
 | 
			
		||||
---------------
 | 
			
		||||
 
 | 
			
		||||
@@ -8,7 +8,7 @@ from uuid import uuid4
 | 
			
		||||
from slixmpp.stanza import Iq
 | 
			
		||||
from slixmpp.exceptions import XMPPError
 | 
			
		||||
from slixmpp.xmlstream import register_stanza_plugin
 | 
			
		||||
from slixmpp.xmlstream.handler import Callback
 | 
			
		||||
from slixmpp.xmlstream.handler import CoroutineCallback
 | 
			
		||||
from slixmpp.xmlstream.matcher import StanzaPath
 | 
			
		||||
from slixmpp.plugins.base import BasePlugin
 | 
			
		||||
 | 
			
		||||
@@ -34,10 +34,11 @@ class XEP_0065(BasePlugin):
 | 
			
		||||
        self._sessions = {}
 | 
			
		||||
        self._preauthed_sids = {}
 | 
			
		||||
 | 
			
		||||
        self.xmpp.register_handler(
 | 
			
		||||
            Callback('Socks5 Bytestreams',
 | 
			
		||||
                     StanzaPath('iq@type=set/socks/streamhost'),
 | 
			
		||||
                     self._handle_streamhost))
 | 
			
		||||
        self.xmpp.register_handler(CoroutineCallback(
 | 
			
		||||
            'Socks5 Bytestreams',
 | 
			
		||||
             StanzaPath('iq@type=set/socks/streamhost'),
 | 
			
		||||
             self._handle_streamhost
 | 
			
		||||
        ))
 | 
			
		||||
 | 
			
		||||
        self.api.register(self._authorized, 'authorized', default=True)
 | 
			
		||||
        self.api.register(self._authorized_sid, 'authorized_sid', default=True)
 | 
			
		||||
@@ -158,13 +159,13 @@ class XEP_0065(BasePlugin):
 | 
			
		||||
        digest.update(str(target).encode('utf8'))
 | 
			
		||||
        return digest.hexdigest()
 | 
			
		||||
 | 
			
		||||
    def _handle_streamhost(self, iq):
 | 
			
		||||
    async def _handle_streamhost(self, iq):
 | 
			
		||||
        """Handle incoming SOCKS5 session request."""
 | 
			
		||||
        sid = iq['socks']['sid']
 | 
			
		||||
        if not sid:
 | 
			
		||||
            raise XMPPError(etype='modify', condition='bad-request')
 | 
			
		||||
 | 
			
		||||
        if not self._accept_stream(iq):
 | 
			
		||||
        if not await self._accept_stream(iq):
 | 
			
		||||
            raise XMPPError(etype='modify', condition='not-acceptable')
 | 
			
		||||
 | 
			
		||||
        streamhosts = iq['socks']['streamhosts']
 | 
			
		||||
@@ -180,39 +181,37 @@ class XEP_0065(BasePlugin):
 | 
			
		||||
                    streamhost['host'],
 | 
			
		||||
                    streamhost['port']))
 | 
			
		||||
 | 
			
		||||
        async def gather(futures, iq, streamhosts):
 | 
			
		||||
            proxies = await asyncio.gather(*futures, return_exceptions=True)
 | 
			
		||||
            for streamhost, proxy in zip(streamhosts, proxies):
 | 
			
		||||
                if isinstance(proxy, ValueError):
 | 
			
		||||
                    continue
 | 
			
		||||
                elif isinstance(proxy, socket.error):
 | 
			
		||||
                    log.error('Socket error while connecting to the proxy.')
 | 
			
		||||
                    continue
 | 
			
		||||
                proxy = proxy[1]
 | 
			
		||||
                # TODO: what if the future never happens?
 | 
			
		||||
                try:
 | 
			
		||||
                    addr, port = await proxy.connected
 | 
			
		||||
                except socket.error:
 | 
			
		||||
                    log.exception('Socket error while connecting to the proxy.')
 | 
			
		||||
                    continue
 | 
			
		||||
                # TODO: make a better choice than just the first working one.
 | 
			
		||||
                used_streamhost = streamhost['jid']
 | 
			
		||||
                conn = proxy
 | 
			
		||||
                break
 | 
			
		||||
            else:
 | 
			
		||||
                raise XMPPError(etype='cancel', condition='item-not-found')
 | 
			
		||||
        proxies = await asyncio.gather(*proxy_futures, return_exceptions=True)
 | 
			
		||||
        for streamhost, proxy in zip(streamhosts, proxies):
 | 
			
		||||
            if isinstance(proxy, ValueError):
 | 
			
		||||
                continue
 | 
			
		||||
            elif isinstance(proxy, socket.error):
 | 
			
		||||
                log.error('Socket error while connecting to the proxy.')
 | 
			
		||||
                continue
 | 
			
		||||
            proxy = proxy[1]
 | 
			
		||||
            # TODO: what if the future never happens?
 | 
			
		||||
            try:
 | 
			
		||||
                addr, port = await proxy.connected
 | 
			
		||||
            except socket.error:
 | 
			
		||||
                log.exception('Socket error while connecting to the proxy.')
 | 
			
		||||
                continue
 | 
			
		||||
            # TODO: make a better choice than just the first working one.
 | 
			
		||||
            used_streamhost = streamhost['jid']
 | 
			
		||||
            conn = proxy
 | 
			
		||||
            break
 | 
			
		||||
        else:
 | 
			
		||||
            raise XMPPError(etype='cancel', condition='item-not-found')
 | 
			
		||||
 | 
			
		||||
            # TODO: close properly the connection to the other proxies.
 | 
			
		||||
        # TODO: close properly the connection to the other proxies.
 | 
			
		||||
 | 
			
		||||
            iq = iq.reply()
 | 
			
		||||
            self._sessions[sid] = conn
 | 
			
		||||
            iq['socks']['sid'] = sid
 | 
			
		||||
            iq['socks']['streamhost_used']['jid'] = used_streamhost
 | 
			
		||||
            iq.send()
 | 
			
		||||
            self.xmpp.event('socks5_stream', conn)
 | 
			
		||||
            self.xmpp.event('stream:%s:%s' % (sid, requester), conn)
 | 
			
		||||
        iq = iq.reply()
 | 
			
		||||
        self._sessions[sid] = conn
 | 
			
		||||
        iq['socks']['sid'] = sid
 | 
			
		||||
        iq['socks']['streamhost_used']['jid'] = used_streamhost
 | 
			
		||||
        iq.send()
 | 
			
		||||
        self.xmpp.event('socks5_stream', conn)
 | 
			
		||||
        self.xmpp.event('stream:%s:%s' % (sid, requester), conn)
 | 
			
		||||
 | 
			
		||||
        asyncio.ensure_future(gather(proxy_futures, iq, streamhosts))
 | 
			
		||||
 | 
			
		||||
    def activate(self, proxy, sid, target, ifrom=None, timeout=None, callback=None):
 | 
			
		||||
        """Activate the socks5 session that has been negotiated."""
 | 
			
		||||
@@ -253,14 +252,14 @@ class XEP_0065(BasePlugin):
 | 
			
		||||
        factory = lambda: Socks5Protocol(dest, 0, self.xmpp.event)
 | 
			
		||||
        return self.xmpp.loop.create_connection(factory, proxy, proxy_port)
 | 
			
		||||
 | 
			
		||||
    def _accept_stream(self, iq):
 | 
			
		||||
    async def _accept_stream(self, iq):
 | 
			
		||||
        receiver = iq['to']
 | 
			
		||||
        sender = iq['from']
 | 
			
		||||
        sid = iq['socks']['sid']
 | 
			
		||||
 | 
			
		||||
        if self.api['authorized_sid'](receiver, sid, sender, iq):
 | 
			
		||||
        if await self.api['authorized_sid'](receiver, sid, sender, iq):
 | 
			
		||||
            return True
 | 
			
		||||
        return self.api['authorized'](receiver, sid, sender, iq)
 | 
			
		||||
        return await self.api['authorized'](receiver, sid, sender, iq)
 | 
			
		||||
 | 
			
		||||
    def _authorized(self, jid, sid, ifrom, iq):
 | 
			
		||||
        return self.auto_accept
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user