Merge branch 'remove-more-extra-args' into 'master'

More cleanup of extra args, types, docs, and API

See merge request poezio/slixmpp!126
This commit is contained in:
Link Mauve 2021-02-13 20:25:57 +01:00
commit f50dfd6644
16 changed files with 509 additions and 168 deletions

View File

@ -8,6 +8,10 @@ XEP-0047: In-band Bytestreams
:members: :members:
:exclude-members: session_bind, plugin_init, plugin_end :exclude-members: session_bind, plugin_init, plugin_end
.. module:: slixmpp.plugins.xep_0047
.. autoclass:: IBBytestream
:members:
Stanza elements Stanza elements
--------------- ---------------

View File

@ -8,6 +8,12 @@ XEP-0363: HTTP File Upload
:members: :members:
:exclude-members: session_bind, plugin_init, plugin_end :exclude-members: session_bind, plugin_init, plugin_end
.. autoclass:: UploadServiceNotFound
.. autoclass:: FileTooBig
.. autoclass:: HTTPError
Stanza elements Stanza elements
--------------- ---------------

35
itests/test_bob.py Normal file
View File

@ -0,0 +1,35 @@
import asyncio
import unittest
from slixmpp.test.integration import SlixIntegration
class TestBOB(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.add_client(
self.envjid('CI_ACCOUNT2'),
self.envstr('CI_ACCOUNT2_PASSWORD'),
)
self.register_plugins(['xep_0231'])
self.data = b'to' * 257
await self.connect_clients()
async def test_bob(self):
"""Check we can send and receive a BOB."""
cid = self.clients[0]['xep_0231'].set_bob(
self.data,
'image/jpeg',
)
recv = await self.clients[1]['xep_0231'].get_bob(
jid=self.clients[0].boundjid,
cid=cid,
)
self.assertEqual(self.data, recv['bob']['data'])
suite = unittest.TestLoader().loadTestsFromTestCase(TestBOB)

37
itests/test_httpupload.py Normal file
View File

@ -0,0 +1,37 @@
try:
import aiohttp
except ImportError:
aiohttp = None
import unittest
from io import BytesIO
from slixmpp.test.integration import SlixIntegration
class TestHTTPUpload(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.register_plugins(['xep_0363'])
# Minimal data, we do not want to clutter the remote server
self.data = b'tototo'
await self.connect_clients()
@unittest.skipIf(aiohttp is None, "aiohttp is not installed")
async def test_httpupload(self):
"""Check we can upload a file properly."""
url = await self.clients[0]['xep_0363'].upload_file(
'toto.txt',
input_file=BytesIO(self.data),
size=len(self.data),
)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
text = await resp.text()
self.assertEqual(text.encode('utf-8'), self.data)
suite = unittest.TestLoader().loadTestsFromTestCase(TestHTTPUpload)

40
itests/test_ibb.py Normal file
View File

@ -0,0 +1,40 @@
import asyncio
import unittest
from slixmpp.test.integration import SlixIntegration
class TestIBB(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.add_client(
self.envjid('CI_ACCOUNT2'),
self.envstr('CI_ACCOUNT2_PASSWORD'),
)
config = {'block_size': 256, 'auto_accept': True}
self.register_plugins(['xep_0047'], [config])
self.data = b'to' * 257
await self.connect_clients()
async def test_ibb(self):
"""Check we can send and receive data through ibb"""
coro_in = self.clients[1].wait_until('ibb_stream_start')
coro_out = self.clients[0]['xep_0047'].open_stream(
self.clients[1].boundjid,
sid='toto'
)
instream, outstream = await asyncio.gather(coro_in, coro_out)
async def send_and_close():
await outstream.sendall(self.data)
await outstream.close()
in_data, _ = await asyncio.gather(instream.gather(), send_and_close())
self.assertEqual(self.data, in_data)
suite = unittest.TestLoader().loadTestsFromTestCase(TestIBB)

64
itests/test_pep.py Normal file
View File

@ -0,0 +1,64 @@
import asyncio
import unittest
from uuid import uuid4
from slixmpp.exceptions import IqError
from slixmpp.test.integration import SlixIntegration
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
from slixmpp.plugins.xep_0060.stanza import Item
class Mystanza(ElementBase):
namespace = 'random-ns'
name = 'mystanza'
plugin_attrib = 'mystanza'
interfaces = {'test'}
register_stanza_plugin(Item, Mystanza)
class TestPEP(SlixIntegration):
async def asyncSetUp(self):
await super().asyncSetUp()
self.add_client(
self.envjid('CI_ACCOUNT1'),
self.envstr('CI_ACCOUNT1_PASSWORD'),
)
self.add_client(
self.envjid('CI_ACCOUNT2'),
self.envstr('CI_ACCOUNT2_PASSWORD'),
)
self.register_plugins(['xep_0222', 'xep_0223'])
for client in self.clients:
client.auto_authorize = True
await self.connect_clients()
async def test_pep_public(self):
"""Check we can get and set public PEP data"""
stanza = Mystanza()
stanza['test'] = str(uuid4().hex)
await self.clients[0]['xep_0222'].store(stanza, id='toto')
fetched = await self.clients[0]['xep_0222'].retrieve(
stanza.namespace,
)
fetched_stanza = fetched['pubsub']['items']['item']['mystanza']
self.assertEqual(fetched_stanza['test'], stanza['test'])
async def test_pep_private(self):
"""Check we can get and set private PEP data"""
stanza = Mystanza()
stanza['test'] = str(uuid4().hex)
await self.clients[0]['xep_0223'].store(
stanza, node='private-random', id='toto'
)
fetched = await self.clients[0]['xep_0223'].retrieve(
'private-random',
)
fetched_stanza = fetched['pubsub']['items']['item']['mystanza']
self.assertEqual(fetched_stanza['test'], stanza['test'])
with self.assertRaises(IqError):
fetched = await self.clients[1]['xep_0060'].get_item(
jid=self.clients[0].boundjid.bare,
node='private-random',
item_id='toto',
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestPEP)

View File

@ -1,8 +1,17 @@
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp
# See the file LICENSE for copying permission
import asyncio import asyncio
import uuid import uuid
import logging import logging
from slixmpp import Message, Iq from typing import (
Optional,
Union,
)
from slixmpp import JID
from slixmpp.stanza import Message, Iq
from slixmpp.exceptions import XMPPError from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath from slixmpp.xmlstream.matcher import StanzaPath
@ -15,9 +24,27 @@ log = logging.getLogger(__name__)
class XEP_0047(BasePlugin): class XEP_0047(BasePlugin):
"""
XEP-0047: In-Band Bytestreams
Events registered by this plugin:
- :term:`ibb_stream_start`
- :term:`ibb_stream_end`
- :term:`ibb_stream_data`
- :term:`stream:[stream id]:[peer jid]`
Plugin Parameters:
- ``block_size`` (default: ``4096``): default block size to negociate
- ``max_block_size`` (default: ``8192``): max block size to accept
- ``auto_accept`` (default: ``False``): if incoming streams should be
accepted automatically.
"""
name = 'xep_0047' name = 'xep_0047'
description = 'XEP-0047: In-band Bytestreams' description = 'XEP-0047: In-Band Bytestreams'
dependencies = {'xep_0030'} dependencies = {'xep_0030'}
stanza = stanza stanza = stanza
default_config = { default_config = {
@ -105,17 +132,29 @@ class XEP_0047(BasePlugin):
def _preauthorize_sid(self, jid, sid, ifrom, data): def _preauthorize_sid(self, jid, sid, ifrom, data):
self._preauthed_sids[(jid, sid, ifrom)] = True self._preauthed_sids[(jid, sid, ifrom)] = True
def open_stream(self, jid, block_size=None, sid=None, use_messages=False, async def open_stream(self, jid: JID, *, block_size: Optional[int] = None,
ifrom=None, timeout=None, callback=None): sid: Optional[str] = None, use_messages: bool = False,
ifrom: Optional[JID] = None,
**iqkwargs) -> IBBytestream:
"""Open an IBB stream with a peer JID.
.. versionchanged:: 1.8.0
This function is now a coroutine and must be awaited.
All parameters except ``jid`` are keyword-args only.
:param jid: The remote JID to initiate the stream with.
:param block_size: The block size to advertise.
:param sid: The IBB stream id (if not provided, will be auto-generated).
:param use_messages: If the stream should use message stanzas instead of iqs.
:returns: The opened byte stream with the remote JID
:raises .IqError: When the remote entity denied the stream.
"""
if sid is None: if sid is None:
sid = str(uuid.uuid4()) sid = str(uuid.uuid4())
if block_size is None: if block_size is None:
block_size = self.block_size block_size = self.block_size
iq = self.xmpp.Iq() iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom)
iq['type'] = 'set'
iq['to'] = jid
iq['from'] = ifrom
iq['ibb_open']['block_size'] = block_size iq['ibb_open']['block_size'] = block_size
iq['ibb_open']['sid'] = sid iq['ibb_open']['sid'] = sid
iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq' iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq'
@ -123,25 +162,21 @@ class XEP_0047(BasePlugin):
stream = IBBytestream(self.xmpp, sid, block_size, stream = IBBytestream(self.xmpp, sid, block_size,
iq['from'], iq['to'], use_messages) iq['from'], iq['to'], use_messages)
stream_future = asyncio.Future() callback = iqkwargs.pop('callback', None)
result = await iq.send(**iqkwargs)
def _handle_opened_stream(iq): log.debug('IBB stream (%s) accepted by %s', stream.sid, result['from'])
log.debug('IBB stream (%s) accepted by %s', stream.sid, iq['from']) stream.self_jid = result['to']
stream.self_jid = iq['to'] stream.peer_jid = result['from']
stream.peer_jid = iq['from']
stream.stream_started = True stream.stream_started = True
self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream) self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
stream_future.set_result(stream)
if callback is not None: if callback is not None:
callback(stream) self.xmpp.add_event_handler('ibb_stream_start', callback, disposable=True)
self.xmpp.event('ibb_stream_start', stream) self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream) self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
return stream
iq.send(timeout=timeout, callback=_handle_opened_stream) def _handle_open_request(self, iq: Iq):
return stream_future
def _handle_open_request(self, iq):
sid = iq['ibb_open']['sid'] sid = iq['ibb_open']['sid']
size = iq['ibb_open']['block_size'] or self.block_size size = iq['ibb_open']['block_size'] or self.block_size
@ -165,7 +200,7 @@ class XEP_0047(BasePlugin):
self.xmpp.event('ibb_stream_start', stream) self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream) self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream)
def _handle_data(self, stanza): def _handle_data(self, stanza: Union[Iq, Message]):
sid = stanza['ibb_data']['sid'] sid = stanza['ibb_data']['sid']
stream = self.api['get_stream'](stanza['to'], sid, stanza['from']) stream = self.api['get_stream'](stanza['to'], sid, stanza['from'])
if stream is not None and stanza['from'] == stream.peer_jid: if stream is not None and stanza['from'] == stream.peer_jid:
@ -173,7 +208,7 @@ class XEP_0047(BasePlugin):
else: else:
raise XMPPError('item-not-found') raise XMPPError('item-not-found')
def _handle_close(self, iq): def _handle_close(self, iq: Iq):
sid = iq['ibb_close']['sid'] sid = iq['ibb_close']['sid']
stream = self.api['get_stream'](iq['to'], sid, iq['from']) stream = self.api['get_stream'](iq['to'], sid, iq['from'])
if stream is not None and iq['from'] == stream.peer_jid: if stream is not None and iq['from'] == stream.peer_jid:

View File

@ -1,3 +1,6 @@
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp
# See the file LICENSE for copying permission
import re import re
import base64 import base64

View File

@ -1,17 +1,32 @@
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp
# See the file LICENSE for copying permission
import asyncio import asyncio
import socket import socket
import logging import logging
from slixmpp.stanza import Iq from typing import (
from slixmpp.exceptions import XMPPError Optional,
IO,
Union,
)
from slixmpp import JID
from slixmpp.stanza import Iq, Message
from slixmpp.exceptions import XMPPError, IqTimeout
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
class IBBytestream(object): class IBBytestream(object):
"""XEP-0047 Stream abstraction. Created by the ibb plugin automatically.
def __init__(self, xmpp, sid, block_size, jid, peer, use_messages=False): Provides send methods and triggers :term:`ibb_stream_data` events.
"""
def __init__(self, xmpp, sid: str, block_size: int, jid: JID, peer: JID,
use_messages: bool = False):
self.xmpp = xmpp self.xmpp = xmpp
self.sid = sid self.sid = sid
self.block_size = block_size self.block_size = block_size
@ -31,7 +46,12 @@ class IBBytestream(object):
self.recv_queue = asyncio.Queue() self.recv_queue = asyncio.Queue()
async def send(self, data, timeout=None): async def send(self, data: bytes, timeout: Optional[int] = None) -> int:
"""Send a single block of data.
:param data: Data to send (will be truncated if above block size).
:returns: Number of bytes sent.
"""
if not self.stream_started or self.stream_out_closed: if not self.stream_started or self.stream_out_closed:
raise socket.error raise socket.error
if len(data) > self.block_size: if len(data) > self.block_size:
@ -58,19 +78,62 @@ class IBBytestream(object):
await iq.send(timeout=timeout) await iq.send(timeout=timeout)
return len(data) return len(data)
async def sendall(self, data, timeout=None): async def sendall(self, data: bytes, timeout: Optional[int] = None):
"""Send all the contents of ``data`` in chunks.
:param data: Raw data to send.
"""
sent_len = 0 sent_len = 0
while sent_len < len(data): while sent_len < len(data):
sent_len += await self.send(data[sent_len:self.block_size], timeout=timeout) sent_len += await self.send(data[sent_len:sent_len+self.block_size], timeout=timeout)
async def sendfile(self, file, timeout=None): async def gather(self, max_data: Optional[int] = None, timeout: int = 3600) -> bytes:
"""Gather all data sent on a stream until it is closed, and return it.
.. versionadded:: 1.8.0
:param max_data: Max number of bytes to receive. (received data may be
over this limit depending on block_size)
:param timeout: Timeout after which an error will be raised.
:raises .IqTimeout: If the timeout is reached.
:returns: All bytes accumulated in the stream.
"""
result = b''
end_future = asyncio.Future()
def on_close(stream):
if stream is self:
end_future.set_result(True)
def on_data(stream):
nonlocal result
if stream is self:
result += stream.read()
if max_data and len(result) > max_data:
end_future.set_result(True)
self.xmpp.add_event_handler('ibb_stream_end', on_close)
self.xmpp.add_event_handler('ibb_stream_data', on_data)
try:
await asyncio.wait_for(end_future, timeout, loop=self.xmpp.loop)
except asyncio.TimeoutError:
raise IqTimeout(result)
finally:
self.xmpp.del_event_handler('ibb_stream_end', on_close)
self.xmpp.del_event_handler('ibb_stream_data', on_data)
return result
async def sendfile(self, file: IO[bytes], timeout: Optional[int] = None):
"""Send the contents of a file over the wire, in chunks.
:param file: The opened file (or file-like) object, in bytes mode."""
while True: while True:
data = file.read(self.block_size) data = file.read(self.block_size)
if not data: if not data:
break break
await self.send(data, timeout=timeout) await self.send(data, timeout=timeout)
def _recv_data(self, stanza): def _recv_data(self, stanza: Union[Message, Iq]):
new_seq = stanza['ibb_data']['seq'] new_seq = stanza['ibb_data']['seq']
if new_seq != (self.recv_seq + 1) % 65536: if new_seq != (self.recv_seq + 1) % 65536:
self.close() self.close()
@ -96,7 +159,8 @@ class IBBytestream(object):
raise socket.error raise socket.error
return self.recv_queue.get_nowait() return self.recv_queue.get_nowait()
def close(self, timeout=None): def close(self, timeout: Optional[int] = None) -> asyncio.Future:
"""Close the stream."""
iq = self.xmpp.Iq() iq = self.xmpp.Iq()
iq['type'] = 'set' iq['type'] = 'set'
iq['to'] = self.peer_jid iq['to'] = self.peer_jid
@ -109,7 +173,7 @@ class IBBytestream(object):
self.xmpp.event('ibb_stream_end', self) self.xmpp.event('ibb_stream_end', self)
return future return future
def _closed(self, iq): def _closed(self, iq: Iq):
self.stream_in_closed = True self.stream_in_closed = True
self.stream_out_closed = True self.stream_out_closed = True
iq.reply().send() iq.reply().send()

View File

@ -5,6 +5,7 @@
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
import logging import logging
from asyncio import Future
from typing import Optional, Callable, List from typing import Optional, Callable, List
from slixmpp import JID from slixmpp import JID
from slixmpp.xmlstream import register_stanza_plugin, ElementBase from slixmpp.xmlstream import register_stanza_plugin, ElementBase
@ -28,9 +29,11 @@ class XEP_0222(BasePlugin):
profile = {'pubsub#persist_items': True, profile = {'pubsub#persist_items': True,
'pubsub#send_last_published_item': 'never'} 'pubsub#send_last_published_item': 'never'}
def configure(self, node, ifrom=None, callback=None, timeout=None): def configure(self, node: str, **iqkwargs) -> Future:
""" """
Update a node's configuration to match the public storage profile. Update a node's configuration to match the public storage profile.
:param node: Node to set the configuration at.
""" """
config = self.xmpp['xep_0004'].Form() config = self.xmpp['xep_0004'].Form()
config['type'] = 'submit' config['type'] = 'submit'
@ -38,29 +41,26 @@ class XEP_0222(BasePlugin):
for field, value in self.profile.items(): for field, value in self.profile.items():
config.add_field(var=field, value=value) config.add_field(var=field, value=value)
return self.xmpp['xep_0060'].set_node_config(None, node, config, return self.xmpp['xep_0060'].set_node_config(
ifrom=ifrom, jid=None, node=node, config=config, **iqkwargs
callback=callback, )
timeout=timeout)
def store(self, stanza: ElementBase, node: Optional[str] = None, def store(self, stanza: ElementBase, node: Optional[str] = None,
id: Optional[str] = None, ifrom: Optional[JID] = None, id: Optional[str] = None, **pubsubkwargs) -> Future:
options: Optional[Form] = None,
callback: Optional[Callable] = None,
timeout: Optional[int] = None):
""" """
Store public data via PEP. Store public data via PEP.
This is just a (very) thin wrapper around the XEP-0060 publish() This is just a (very) thin wrapper around the XEP-0060 publish()
method to set the defaults expected by PEP. method to set the defaults expected by PEP.
:param stanza: The private content to store. :param stanza: The public content to store.
:param node: The node to publish the content to. If not specified, :param node: The node to publish the content to. If not specified,
the stanza's namespace will be used. the stanza's namespace will be used.
:param id: Optionally specify the ID of the item. :param id: Optionally specify the ID of the item.
:param options: Publish options to use, which will be modified to :param options: Publish options to use, which will be modified to
fit the persistent storage option profile. fit the persistent storage option profile.
""" """
options = pubsubkwargs.pop('options', None)
if not options: if not options:
options = self.xmpp['xep_0004'].stanza.Form() options = self.xmpp['xep_0004'].stanza.Form()
options['type'] = 'submit' options['type'] = 'submit'
@ -75,17 +75,12 @@ class XEP_0222(BasePlugin):
options.add_field(var=field) options.add_field(var=field)
options.get_fields()[field]['value'] = value options.get_fields()[field]['value'] = value
return self.xmpp['xep_0163'].publish(stanza, node, pubsubkwargs['options'] = options
options=options,
ifrom=ifrom, return self.xmpp['xep_0163'].publish(stanza, node, id=id, **pubsubkwargs)
callback=callback,
timeout=timeout)
def retrieve(self, node: str, id: Optional[str] = None, def retrieve(self, node: str, id: Optional[str] = None,
item_ids: Optional[List[str]] = None, item_ids: Optional[List[str]] = None, **iqkwargs) -> Future:
ifrom: Optional[JID] = None,
callback: Optional[Callable] = None,
timeout: Optional[int] = None):
""" """
Retrieve public data via PEP. Retrieve public data via PEP.
@ -96,23 +91,17 @@ class XEP_0222(BasePlugin):
:param id: Optionally specify the ID of the item. :param id: Optionally specify the ID of the item.
:param item_ids: Specify a group of IDs. If id is also specified, it :param item_ids: Specify a group of IDs. If id is also specified, it
will be included in item_ids. will be included in item_ids.
:param ifrom: Specify the sender's JID.
:param timeout: The length of time (in seconds) to wait for a response
before exiting the send call if blocking is used.
Defaults to slixmpp.xmlstream.RESPONSE_TIMEOUT
:param callback: Optional reference to a stream handler function. Will
be executed when a reply stanza is received.
""" """
if item_ids is None: if item_ids is None:
item_ids = [] item_ids = []
if id is not None: if id is not None:
item_ids.append(id) item_ids.append(id)
return self.xmpp['xep_0060'].get_items(None, node, return self.xmpp['xep_0060'].get_items(
jid=None, node=node,
item_ids=item_ids, item_ids=item_ids,
ifrom=ifrom, **iqkwargs
callback=callback, )
timeout=timeout)
register_plugin(XEP_0222) register_plugin(XEP_0222)

View File

@ -5,6 +5,7 @@
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
import logging import logging
from asyncio import Future
from typing import Optional, Callable, List from typing import Optional, Callable, List
from slixmpp import JID from slixmpp import JID
from slixmpp.xmlstream import register_stanza_plugin, ElementBase from slixmpp.xmlstream import register_stanza_plugin, ElementBase
@ -28,28 +29,24 @@ class XEP_0223(BasePlugin):
profile = {'pubsub#persist_items': True, profile = {'pubsub#persist_items': True,
'pubsub#access_model': 'whitelist'} 'pubsub#access_model': 'whitelist'}
def configure(self, node, ifrom=None, callback=None, timeout=None): def configure(self, node: str, **iqkwargs) -> Future:
""" """
Update a node's configuration to match the public storage profile. Update a node's configuration to match the private storage profile.
:param node: Node to set the configuration at.
""" """
# TODO: that cannot possibly work, why is this here?
config = self.xmpp['xep_0004'].Form() config = self.xmpp['xep_0004'].Form()
config['type'] = 'submit' config['type'] = 'submit'
for field, value in self.profile.items(): for field, value in self.profile.items():
config.add_field(var=field, value=value) config.add_field(var=field, value=value)
return self.xmpp['xep_0060'].set_node_config(None, node, config, return self.xmpp['xep_0060'].set_node_config(
ifrom=ifrom, jid=None, node=node, config=config, **iqkwargs
callback=callback, )
timeout=timeout)
def store(self, stanza: ElementBase, node: Optional[str] = None, def store(self, stanza: ElementBase, node: Optional[str] = None,
id: Optional[str] = None, ifrom: Optional[JID] = None, id: Optional[str] = None, **pubsubkwargs) -> Future:
options: Optional[Form] = None,
callback: Optional[Callable] = None,
timeout: Optional[int] = None,
timeout_callback: Optional[Callable] = None):
""" """
Store private data via PEP. Store private data via PEP.
@ -63,6 +60,7 @@ class XEP_0223(BasePlugin):
:param options: Publish options to use, which will be modified to :param options: Publish options to use, which will be modified to
fit the persistent storage option profile. fit the persistent storage option profile.
""" """
options = pubsubkwargs.pop('options', None)
if not options: if not options:
options = self.xmpp['xep_0004'].stanza.Form() options = self.xmpp['xep_0004'].stanza.Form()
options['type'] = 'submit' options['type'] = 'submit'
@ -77,17 +75,11 @@ class XEP_0223(BasePlugin):
options.add_field(var=field) options.add_field(var=field)
options.get_fields()[field]['value'] = value options.get_fields()[field]['value'] = value
return self.xmpp['xep_0163'].publish(stanza, node, options=options, pubsubkwargs['options'] = options
ifrom=ifrom, callback=callback, return self.xmpp['xep_0163'].publish(stanza, node, id=id, **pubsubkwargs)
timeout=timeout,
timeout_callback=timeout_callback)
def retrieve(self, node: str, id: Optional[str] = None, def retrieve(self, node: str, id: Optional[str] = None,
item_ids: Optional[List[str]] = None, item_ids: Optional[List[str]] = None, **iqkwargs) -> Future:
ifrom: Optional[JID] = None,
callback: Optional[Callable] = None,
timeout: Optional[int] = None,
timeout_callback: Optional[Callable] = None):
""" """
Retrieve private data via PEP. Retrieve private data via PEP.
@ -98,22 +90,17 @@ class XEP_0223(BasePlugin):
:param id: Optionally specify the ID of the item. :param id: Optionally specify the ID of the item.
:param item_ids: Specify a group of IDs. If id is also specified, it :param item_ids: Specify a group of IDs. If id is also specified, it
will be included in item_ids. will be included in item_ids.
:param ifrom: Specify the sender's JID.
:param timeout: The length of time (in seconds) to wait for a response
before exiting the send call if blocking is used.
Defaults to slixmpp.xmlstream.RESPONSE_TIMEOUT
:param callback: Optional reference to a stream handler function. Will
be executed when a reply stanza is received.
""" """
if item_ids is None: if item_ids is None:
item_ids = [] item_ids = []
if id is not None: if id is not None:
item_ids.append(id) item_ids.append(id)
return self.xmpp['xep_0060'].get_items(None, node, return self.xmpp['xep_0060'].get_items(
item_ids=item_ids, ifrom=ifrom, jid=None, node=node,
callback=callback, timeout=timeout, item_ids=item_ids,
timeout_callback=timeout_callback) **iqkwargs
)
register_plugin(XEP_0223) register_plugin(XEP_0223)

View File

@ -1,4 +1,3 @@
# Slixmpp: The Slick XMPP Library # Slixmpp: The Slick XMPP Library
# Copyright (C) 2012 Nathanael C. Fritz, # Copyright (C) 2012 Nathanael C. Fritz,
# Emmanuel Gil Peyrot <linkmauve@linkmauve.fr> # Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
@ -7,7 +6,10 @@
import logging import logging
import hashlib import hashlib
from slixmpp import future_wrapper from asyncio import Future
from typing import Optional
from slixmpp import future_wrapper, JID
from slixmpp.stanza import Iq, Message, Presence from slixmpp.stanza import Iq, Message, Presence
from slixmpp.exceptions import XMPPError from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.handler import Callback
@ -65,7 +67,20 @@ class XEP_0231(BasePlugin):
def session_bind(self, jid): def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature('urn:xmpp:bob') self.xmpp['xep_0030'].add_feature('urn:xmpp:bob')
def set_bob(self, data, mtype, cid=None, max_age=None): def set_bob(self, data: bytes, mtype: str, cid: Optional[str] = None,
max_age: Optional[int] = None) -> str:
"""Register a blob of binary data as a BOB.
.. versionchanged:: 1.8.0
If ``max_age`` is specified, the registered data will be destroyed
after that time.
:param data: Data to register.
:param mtype: Mime Type of the data (e.g. ``image/jpeg``).
:param cid: Content-ID (will be auto-generated if left out).
:param max_age: Duration of content availability.
:returns: The cid value.
"""
if cid is None: if cid is None:
cid = 'sha1+%s@bob.xmpp.org' % hashlib.sha1(data).hexdigest() cid = 'sha1+%s@bob.xmpp.org' % hashlib.sha1(data).hexdigest()
@ -76,12 +91,24 @@ class XEP_0231(BasePlugin):
bob['max_age'] = max_age bob['max_age'] = max_age
self.api['set_bob'](args=bob) self.api['set_bob'](args=bob)
# Schedule destruction of the data
if max_age is not None and max_age > 0:
self.xmpp.loop.call_later(max_age, self.del_bob, cid)
return cid return cid
@future_wrapper @future_wrapper
def get_bob(self, jid=None, cid=None, cached=True, ifrom=None, def get_bob(self, jid: Optional[JID] = None, cid: Optional[str] = None,
timeout=None, callback=None): cached: bool = True, ifrom: Optional[JID] = None,
**iqkwargs) -> Future:
"""Get a BOB.
.. versionchanged:: 1.8.0
Results not in cache do not raise an error when ``cached`` is True.
:param jid: JID to fetch the BOB from.
:param cid: Content ID (actually required).
:param cached: To fetch the BOB from the local cache first (from CID only)
"""
if cached: if cached:
data = self.api['get_bob'](None, None, ifrom, args=cid) data = self.api['get_bob'](None, None, ifrom, args=cid)
if data is not None: if data is not None:
@ -91,17 +118,14 @@ class XEP_0231(BasePlugin):
return iq return iq
return data return data
iq = self.xmpp.Iq() iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
iq['to'] = jid
iq['from'] = ifrom
iq['type'] = 'get'
iq['bob']['cid'] = cid iq['bob']['cid'] = cid
return iq.send(timeout=timeout, callback=callback) return iq.send(**iqkwargs)
def del_bob(self, cid): def del_bob(self, cid: str):
self.api['del_bob'](args=cid) self.api['del_bob'](args=cid)
def _handle_bob_iq(self, iq): def _handle_bob_iq(self, iq: Iq):
cid = iq['bob']['cid'] cid = iq['bob']['cid']
if iq['type'] == 'result': if iq['type'] == 'result':
@ -131,7 +155,6 @@ class XEP_0231(BasePlugin):
def _get_bob(self, jid, node, ifrom, cid): def _get_bob(self, jid, node, ifrom, cid):
if cid in self._cids: if cid in self._cids:
return self._cids[cid] return self._cids[cid]
raise XMPPError('item-not-found')
def _del_bob(self, jid, node, ifrom, cid): def _del_bob(self, jid, node, ifrom, cid):
if cid in self._cids: if cid in self._cids:

View File

@ -5,7 +5,10 @@
# See the file LICENSE for copying permissio # See the file LICENSE for copying permissio
import logging import logging
import slixmpp from asyncio import Future
from typing import Optional
from slixmpp import JID
from slixmpp.stanza import Message, Iq from slixmpp.stanza import Message, Iq
from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath from slixmpp.xmlstream.matcher import StanzaPath
@ -21,6 +24,11 @@ class XEP_0280(BasePlugin):
""" """
XEP-0280 Message Carbons XEP-0280 Message Carbons
Events triggered by this plugin:
- :term:`carbon_received`
- :term:`carbon_sent`
""" """
name = 'xep_0280' name = 'xep_0280'
@ -57,28 +65,22 @@ class XEP_0280(BasePlugin):
def session_bind(self, jid): def session_bind(self, jid):
self.xmpp.plugin['xep_0030'].add_feature('urn:xmpp:carbons:2') self.xmpp.plugin['xep_0030'].add_feature('urn:xmpp:carbons:2')
def _handle_carbon_received(self, msg): def _handle_carbon_received(self, msg: Message):
if msg['from'].bare == self.xmpp.boundjid.bare: if msg['from'].bare == self.xmpp.boundjid.bare:
self.xmpp.event('carbon_received', msg) self.xmpp.event('carbon_received', msg)
def _handle_carbon_sent(self, msg): def _handle_carbon_sent(self, msg: Message):
if msg['from'].bare == self.xmpp.boundjid.bare: if msg['from'].bare == self.xmpp.boundjid.bare:
self.xmpp.event('carbon_sent', msg) self.xmpp.event('carbon_sent', msg)
def enable(self, ifrom=None, timeout=None, callback=None, def enable(self, ifrom: Optional[JID] = None, **iqkwargs) -> Future:
timeout_callback=None): """Enable carbons."""
iq = self.xmpp.Iq() iq = self.xmpp.make_iq_set(ifrom=ifrom)
iq['type'] = 'set'
iq['from'] = ifrom
iq.enable('carbon_enable') iq.enable('carbon_enable')
return iq.send(timeout_callback=timeout_callback, timeout=timeout, return iq.send(**iqkwargs)
callback=callback)
def disable(self, ifrom=None, timeout=None, callback=None, def disable(self, ifrom: Optional[JID] = None, **iqkwargs) -> Future:
timeout_callback=None): """Disable carbons."""
iq = self.xmpp.Iq() iq = self.xmpp.make_iq_set(ifrom=ifrom)
iq['type'] = 'set'
iq['from'] = ifrom
iq.enable('carbon_disable') iq.enable('carbon_disable')
return iq.send(timeout_callback=timeout_callback, timeout=timeout, return iq.send(**iqkwargs)
callback=callback)

View File

@ -1,4 +1,3 @@
# slixmpp: The Slick XMPP Library # slixmpp: The Slick XMPP Library
# Copyright (C) 2018 Emmanuel Gil Peyrot # Copyright (C) 2018 Emmanuel Gil Peyrot
# This file is part of slixmpp. # This file is part of slixmpp.
@ -6,6 +5,12 @@
from slixmpp.plugins.base import register_plugin from slixmpp.plugins.base import register_plugin
from slixmpp.plugins.xep_0363.stanza import Request, Slot, Put, Get, Header from slixmpp.plugins.xep_0363.stanza import Request, Slot, Put, Get, Header
from slixmpp.plugins.xep_0363.http_upload import XEP_0363 from slixmpp.plugins.xep_0363.http_upload import (
XEP_0363,
UploadServiceNotFound,
FileTooBig,
HTTPError,
FileUploadError,
)
register_plugin(XEP_0363) register_plugin(XEP_0363)

View File

@ -1,18 +1,21 @@
""" # slixmpp: The Slick XMPP Library
slixmpp: The Slick XMPP Library # Copyright (C) 2018 Emmanuel Gil Peyrot
Copyright (C) 2018 Emmanuel Gil Peyrot # This file is part of slixmpp.
This file is part of slixmpp. # See the file LICENSE for copying permission.
See the file LICENSE for copying permission.
"""
import logging import logging
import os.path import os.path
from aiohttp import ClientSession from aiohttp import ClientSession
from asyncio import Future
from mimetypes import guess_type from mimetypes import guess_type
from typing import (
Optional,
IO,
)
from slixmpp import Iq, __version__ from slixmpp import JID, __version__
from slixmpp.stanza import Iq
from slixmpp.plugins import BasePlugin from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream import register_stanza_plugin from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.handler import Callback
@ -25,19 +28,39 @@ class FileUploadError(Exception):
pass pass
class UploadServiceNotFound(FileUploadError): class UploadServiceNotFound(FileUploadError):
pass """
Raised if no upload service can be found.
"""
class FileTooBig(FileUploadError): class FileTooBig(FileUploadError):
"""
Raised if the file size is above advertised server limits.
args:
- size of the file
- max file size allowed
"""
def __str__(self): def __str__(self):
return 'File size too large: {} (max: {} bytes)' \ return 'File size too large: {} (max: {} bytes)' \
.format(self.args[0], self.args[1]) .format(self.args[0], self.args[1])
class HTTPError(FileUploadError): class HTTPError(FileUploadError):
"""
Raised when we receive an HTTP error response during upload.
args:
- HTTP Error code
- Content of the HTTP response
"""
def __str__(self): def __str__(self):
return 'Could not upload file: %d (%s)' % (self.args[0], self.args[1]) return 'Could not upload file: %d (%s)' % (self.args[0], self.args[1])
class XEP_0363(BasePlugin): class XEP_0363(BasePlugin):
''' This plugin only supports Python 3.5+ ''' """
XEP-0363: HTTP File Upload
"""
name = 'xep_0363' name = 'xep_0363'
description = 'XEP-0363: HTTP File Upload' description = 'XEP-0363: HTTP File Upload'
@ -62,9 +85,7 @@ class XEP_0363(BasePlugin):
self._handle_request)) self._handle_request))
def plugin_end(self): def plugin_end(self):
self._http_session.close()
self.xmpp.remove_handler('HTTP Upload Request') self.xmpp.remove_handler('HTTP Upload Request')
self.xmpp.remove_handler('HTTP Upload Slot')
self.xmpp['xep_0030'].del_feature(feature=Request.namespace) self.xmpp['xep_0030'].del_feature(feature=Request.namespace)
def session_bind(self, jid): def session_bind(self, jid):
@ -73,9 +94,14 @@ class XEP_0363(BasePlugin):
def _handle_request(self, iq): def _handle_request(self, iq):
self.xmpp.event('http_upload_request', iq) self.xmpp.event('http_upload_request', iq)
async def find_upload_service(self, domain=None, timeout=None): async def find_upload_service(self, domain: Optional[JID] = None, **iqkwargs) -> Optional[Iq]:
"""Find an upload service on a domain (our own by default).
:param domain: Domain to disco to find a service.
"""
results = await self.xmpp['xep_0030'].get_info_from_domain( results = await self.xmpp['xep_0030'].get_info_from_domain(
domain=domain, timeout=timeout) domain=domain, **iqkwargs
)
candidates = [] candidates = []
for info in results: for info in results:
@ -87,26 +113,49 @@ class XEP_0363(BasePlugin):
if feature == Request.namespace: if feature == Request.namespace:
return info return info
def request_slot(self, jid, filename, size, content_type=None, ifrom=None, def request_slot(self, jid: JID, filename: str, size: int,
timeout=None, callback=None, timeout_callback=None): content_type: Optional[str] = None, *,
iq = self.xmpp.Iq() ifrom: Optional[JID] = None, **iqkwargs) -> Future:
iq['to'] = jid """Request an HTTP upload slot from a service.
iq['from'] = ifrom
iq['type'] = 'get' :param jid: Service to request the slot from.
:param filename: Name of the file that will be uploaded.
:param size: size of the file in bytes.
:param content_type: Type of the file that will be uploaded.
"""
iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
request = iq['http_upload_request'] request = iq['http_upload_request']
request['filename'] = filename request['filename'] = filename
request['size'] = str(size) request['size'] = str(size)
request['content-type'] = content_type or self.default_content_type request['content-type'] = content_type or self.default_content_type
return iq.send(timeout=timeout, callback=callback, return iq.send(**iqkwargs)
timeout_callback=timeout_callback)
async def upload_file(self, filename, size=None, content_type=None, *, async def upload_file(self, filename: str, size: Optional[int] = None,
input_file=None, ifrom=None, domain=None, timeout=None, content_type: Optional[str] = None, *,
callback=None, timeout_callback=None): input_file: Optional[IO[bytes]]=None,
''' Helper function which does all of the uploading process. ''' domain: Optional[JID] = None,
**iqkwargs) -> str:
'''Helper function which does all of the uploading discovery and
process.
:param filename: Path to the file to upload (or only the name if
``input_file`` is provided.
:param size: size of the file in bytes.
:param content_type: Type of the file that will be uploaded.
:param input_file: Binary file stream on the file.
:param domain: Domain to query to find an HTTP upload service.
:raises .UploadServiceNotFound: If slixmpp is unable to find an
an available upload service.
:raises .FileTooBig: If the filesize is above what is accepted by
the service.
:raises .HTTPError: If there is an error in the HTTP operation.
:returns: The URL of the uploaded file.
'''
timeout = iqkwargs.get('timeout', None)
if self.upload_service is None: if self.upload_service is None:
info_iq = await self.find_upload_service( info_iq = await self.find_upload_service(
domain=domain, timeout=timeout) domain=domain, **iqkwargs
)
if info_iq is None: if info_iq is None:
raise UploadServiceNotFound() raise UploadServiceNotFound()
self.upload_service = info_iq['from'] self.upload_service = info_iq['from']
@ -137,9 +186,7 @@ class XEP_0363(BasePlugin):
basename = os.path.basename(filename) basename = os.path.basename(filename)
slot_iq = await self.request_slot(self.upload_service, basename, size, slot_iq = await self.request_slot(self.upload_service, basename, size,
content_type, ifrom, timeout, content_type, **iqkwargs)
callback=callback,
timeout_callback=timeout_callback)
slot = slot_iq['http_upload_slot'] slot = slot_iq['http_upload_slot']
headers = { headers = {

View File

@ -14,7 +14,7 @@ class TestInBandByteStreams(SlixTest):
def tearDown(self): def tearDown(self):
self.stream_close() self.stream_close()
def testOpenStream(self): async def testOpenStream(self):
"""Test requesting a stream, successfully""" """Test requesting a stream, successfully"""
events = [] events = []
@ -25,7 +25,7 @@ class TestInBandByteStreams(SlixTest):
self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', await self.xmpp['xep_0047'].open_stream('tester@localhost/receiver',
sid='testing') sid='testing')
self.send(""" self.send("""
@ -45,7 +45,7 @@ class TestInBandByteStreams(SlixTest):
self.assertEqual(events, ['ibb_stream_start']) self.assertEqual(events, ['ibb_stream_start'])
def testAysncOpenStream(self): async def testAysncOpenStream(self):
"""Test requesting a stream, aysnc""" """Test requesting a stream, aysnc"""
events = set() events = set()
@ -58,7 +58,7 @@ class TestInBandByteStreams(SlixTest):
self.xmpp.add_event_handler('ibb_stream_start', on_stream_start) self.xmpp.add_event_handler('ibb_stream_start', on_stream_start)
self.xmpp['xep_0047'].open_stream('tester@localhost/receiver', await self.xmpp['xep_0047'].open_stream('tester@localhost/receiver',
sid='testing', sid='testing',
callback=stream_callback) callback=stream_callback)