Compare commits

..

2 Commits

Author SHA1 Message Date
nicoco
56004802fa Merge branch 'master' into xep356-iq 2023-12-19 14:13:35 +00:00
Nicolas Cedilnik
8bfe6177f4 xep0356: implement IQ privilege
Also included:

- correctly handle privileges from different
  servers
- check that privileges have been granted before
  attempting to send something and raise
  PermissionError if not
- use dataclass and enums to store permissions instead of
  untyped dict
2023-07-23 15:38:43 +02:00
45 changed files with 577 additions and 1224 deletions

5
.gitignore vendored
View File

@@ -15,8 +15,3 @@ slixmpp.egg-info/
.idea/
.vscode/
venv/
# Added by cargo
/target
/Cargo.lock

View File

@@ -1,6 +0,0 @@
steps:
mypy:
image: python:3
commands:
- pip3 install mypy types-setuptools
- mypy slixmpp

View File

@@ -1,10 +0,0 @@
steps:
test_integration:
image: "python:3.11"
secrets: [ci_account1, ci_account1_password, ci_account2, ci_account2_password, ci_muc_server]
commands:
- apt-get update
- apt-get install -y python3-pip cython3 gpg idn libidn-dev
- pip3 install emoji aiohttp aiodns
- python3 setup.py build_ext --inplace
- ./run_integration_tests.py

View File

@@ -1,17 +0,0 @@
steps:
unit_tests:
image: "python:${TAG}"
commands:
- apt-get update
- apt-get install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
matrix:
TAG:
- "3.7"
- "3.9"
- "3.8"
- "3.10"
- "3.11"
- "3.12"

View File

@@ -1,13 +0,0 @@
[package]
name = "slixmpp"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
jid = "0.10"
pyo3 = "0.21"
[lib]
crate-type = ["cdylib"]

View File

@@ -1064,12 +1064,5 @@
<file-release rdf:resource="https://codeberg.org/poezio/slixmpp/archive/slix-1.8.4.tar.gz"/>
</Version>
</release>
<release>
<Version>
<revision>1.8.5</revision>
<created>2024-02-02</created>
<file-release rdf:resource="https://codeberg.org/poezio/slixmpp/archive/slix-1.8.5.tar.gz"/>
</Version>
</release>
</Project>
</rdf:RDF>

View File

@@ -1,95 +0,0 @@
Projects Using Slixmpp
======================
Applications
------------
sendxmpp-py
~~~~~~~~~~~
sendxmpp is a command line program and is the XMPP equivalent of sendmail. It is a Python version of the original sendxmpp which is written in Perl.
- `Source <https://github.com/moparisthebest/sendxmpp-py>`_
Bots
----
BotLogMauve
~~~~~~~~~~~
XMPP bot which logs groupchat messages. Logs are in text format, with one file per day and per groupchat.
- `Source <https://git.khaganat.net/khaganat/BotLogMauve>`_
LinkBot
~~~~~~~
This bot reveals the title of any shared link in a groupchat for quick content insight.
- `Source <https://git.xmpp-it.net/mario/XMPPBot>`_
llama-bot
~~~~~~~~~
Llama-bot enables engaging communication with the LLM (large language model) of llama.cpp, providing seamless and dynamic conversation with it.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://github.com/decent-im/llama-bot>`_
- `Demo <xmpp:llama@decent.im?message>`_
Morbot
~~~~~~
Morbot is a simple Slixmpp bot that will take new articles from listed RSS feeds and send them to assigned XMPP MUCs.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://codeberg.org/TheCoffeMaker/Morbot>`_
Slixfeed
~~~~~~~~
Slixfeed aims to be an easy to use and fully-featured news aggregator bot for XMPP. It provides a convenient access to Blogs, Fediverse and News websites along with filtering functionality.
- `Groupchat <xmpp:slixfeed@chat.woodpeckersnest.space?join>`_
- `Source <https://gitgud.io/sjehuda/slixfeed>`_
sms4you
~~~~~~~
sms4you forwards messages from and to SMS and connects either with sms4you-xmpp or sms4you-email to choose the other mean of communication. Nice for receiving or sending SMS, independently from carrying a SIM card.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Homepage <https://sms4you-team.pages.debian.net/sms4you/>`_
- `Source <https://salsa.debian.org/sms4you-team/sms4you>`_
Stable Diffusion
~~~~~~~~~~~~~~~~
XMPP bot that generates digital images from textual descriptions.
- `Groupchat <xmpp:slidge@conference.nicoco.fr?join>`_
- `Source <https://www.nicoco.fr/blog/2022/08/31/xmpp-bot-stable-diffusion/>`_
WhisperBot
~~~~~~~~~~
XMPP bot that transliterates audio messages using OpenAI's Whisper libraries.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://codeberg.org/TheCoffeMaker/WhisperBot>`_
XMPP MUC Message Gateway
~~~~~~~~~~~~~~~~~~~~~~~~
A multipurpose JSON forwarder microservice from HTTP POST to XMPP MUC room over TLSv1.2 with SliXMPP.
- `Source <https://github.com/immanuelfodor/xmpp-muc-message-gateway>`_
Services
--------
AtomToPubsub
~~~~~~~~~~~~
AtomToPubsub is a simple Python script that parses Atom + RSS feeds and pushes the entries to a designated XMPP Pubsub Node.
- `Groupchat <xmpp:movim@conference.movim.eu?join>`_
- `Source <https://github.com/imattau/atomtopubsub>`_
Slidge
~~~~~~
Slidge is a general purpose XMPP gateway framework in Python.
- `Groupchat <xmpp:slidge@conference.nicoco.fr?join>`_
- `Homepage <https://slidge.im/core/>`_
- `Source <https://sr.ht/~nicoco/slidge>`_

View File

@@ -10,7 +10,7 @@ UNIQUE = uuid4().hex
class TestMUC(SlixIntegration):
async def asyncSetUp(self):
self.mucserver = self.envjid('CI_MUC_SERVER', default='chat.jabberfr.org')
self.mucserver = self.envjid('CI_MUC_SERVER')
self.muc = JID('%s@%s' % (UNIQUE, self.mucserver))
self.add_client(
self.envjid('CI_ACCOUNT1'),

View File

@@ -138,8 +138,8 @@ class ClientXMPP(BaseXMPP):
self.credentials['password'] = value
def connect(self, address: Optional[Tuple[str, int]] = None, # type: ignore
use_ssl: Optional[bool] = None, force_starttls: Optional[bool] = None,
disable_starttls: Optional[bool] = None) -> None:
use_ssl: bool = False, force_starttls: bool = True,
disable_starttls: bool = False) -> None:
"""Connect to the XMPP server.
When no address is given, a SRV lookup for the server will
@@ -166,7 +166,7 @@ class ClientXMPP(BaseXMPP):
host, port = (self.boundjid.host, 5222)
self.dns_service = 'xmpp-client'
XMLStream.connect(self, host, port, use_ssl=use_ssl,
return XMLStream.connect(self, host, port, use_ssl=use_ssl,
force_starttls=force_starttls, disable_starttls=disable_starttls)
def register_feature(self, name: str, handler: Callable, restart: bool = False, order: int = 5000) -> None:

View File

@@ -9,8 +9,6 @@
import logging
import hashlib
from typing import Optional
from slixmpp import Message, Iq, Presence
from slixmpp.basexmpp import BaseXMPP
from slixmpp.stanza import Handshake
@@ -95,9 +93,7 @@ class ComponentXMPP(BaseXMPP):
for st in Message, Iq, Presence:
register_stanza_plugin(st, Error)
def connect(self, host: Optional[str] = None, port: int = 0, use_ssl: Optional[bool] = None,
force_starttls: Optional[bool] = None,
disable_starttls: Optional[bool] = None) -> None:
def connect(self, host=None, port=None, use_ssl=False):
"""Connect to the server.
@@ -107,18 +103,17 @@ class ComponentXMPP(BaseXMPP):
Defauts to :attr:`server_port`.
:param use_ssl: Flag indicating if SSL should be used by connecting
directly to a port using SSL.
:param force_starttls: UNUSED
:param disable_starttls: UNUSED
"""
if host is not None:
self.server_host = host
if port:
self.server_port = port
if host is None:
host = self.server_host
if port is None:
port = self.server_port
self.server_name = self.boundjid.host
log.debug("Connecting to %s:%s", host, port)
XMLStream.connect(self, host=self.server_host, port=self.server_port, use_ssl=use_ssl)
return XMLStream.connect(self, host=host, port=port,
use_ssl=use_ssl)
def incoming_filter(self, xml):
"""

View File

@@ -37,8 +37,7 @@ class FeatureMechanisms(BasePlugin):
'unencrypted_digest': False,
'unencrypted_cram': False,
'unencrypted_scram': True,
'order': 100,
'tls_version': None,
'order': 100
}
def plugin_init(self):
@@ -97,20 +96,7 @@ class FeatureMechanisms(BasePlugin):
result[value] = creds.get('email', jid)
elif value == 'channel_binding':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
version = self.xmpp.socket.version()
# As of now, python does not implement anything else
# than tls-unique, which is forbidden on TLSv1.3
# see https://github.com/python/cpython/issues/95341
if version != 'TLSv1.3':
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-unique"
)
elif 'tls-exporter' in ssl.CHANNEL_BINDING_TYPES:
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-exporter"
)
else:
result[value] = None
result[value] = self.xmpp.socket.get_channel_binding()
else:
result[value] = None
elif value == 'host':
@@ -135,11 +121,6 @@ class FeatureMechanisms(BasePlugin):
result[value] = True
else:
result[value] = False
elif value == 'tls_version':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
result[value] = self.xmpp.socket.version()
elif value == 'binding_proposed':
result[value] = any(x for x in self.mech_list if x.endswith('-PLUS'))
else:
result[value] = self.config.get(value, False)
return result

View File

@@ -1 +1,445 @@
from libslixmpp import JID, InvalidJID
# slixmpp.jid
# ~~~~~~~~~~~~~~~~~~~~~~~
# This module allows for working with Jabber IDs (JIDs).
# Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details
from __future__ import annotations
import re
import socket
from functools import lru_cache
from typing import (
Optional,
Union,
)
from slixmpp.stringprep import nodeprep, resourceprep, idna, StringprepError
HAVE_INET_PTON = hasattr(socket, 'inet_pton')
#: The basic regex pattern that a JID must match in order to determine
#: the local, domain, and resource parts. This regex does NOT do any
#: validation, which requires application of nodeprep, resourceprep, etc.
JID_PATTERN = re.compile(
"^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$"
)
#: The set of escape sequences for the characters not allowed by nodeprep.
JID_ESCAPE_SEQUENCES = {'\\20', '\\22', '\\26', '\\27', '\\2f',
'\\3a', '\\3c', '\\3e', '\\40', '\\5c'}
#: The reverse mapping of escape sequences to their original forms.
JID_UNESCAPE_TRANSFORMATIONS = {'\\20': ' ',
'\\22': '"',
'\\26': '&',
'\\27': "'",
'\\2f': '/',
'\\3a': ':',
'\\3c': '<',
'\\3e': '>',
'\\40': '@',
'\\5c': '\\'}
# TODO: Find the best cache size for a standard usage.
@lru_cache(maxsize=1024)
def _parse_jid(data: str):
"""
Parse string data into the node, domain, and resource
components of a JID, if possible.
:param string data: A string that is potentially a JID.
:raises InvalidJID:
:returns: tuple of the validated local, domain, and resource strings
"""
match = JID_PATTERN.match(data)
if not match:
raise InvalidJID('JID could not be parsed')
(node, domain, resource) = match.groups()
node = _validate_node(node)
domain = _validate_domain(domain)
resource = _validate_resource(resource)
return node, domain, resource
def _validate_node(node: Optional[str]):
"""Validate the local, or username, portion of a JID.
:raises InvalidJID:
:returns: The local portion of a JID, as validated by nodeprep.
"""
if node is None:
return ''
try:
node = nodeprep(node)
except StringprepError:
raise InvalidJID('Nodeprep failed')
if not node:
raise InvalidJID('Localpart must not be 0 bytes')
if len(node) > 1023:
raise InvalidJID('Localpart must be less than 1024 bytes')
return node
def _validate_domain(domain: str):
"""Validate the domain portion of a JID.
IP literal addresses are left as-is, if valid. Domain names
are stripped of any trailing label separators (`.`), and are
checked with the nameprep profile of stringprep. If the given
domain is actually a punyencoded version of a domain name, it
is converted back into its original Unicode form. Domains must
also not start or end with a dash (`-`).
:raises InvalidJID:
:returns: The validated domain name
"""
ip_addr = False
# First, check if this is an IPv4 address
try:
socket.inet_aton(domain)
ip_addr = True
except socket.error:
pass
# Check if this is an IPv6 address
if not ip_addr and HAVE_INET_PTON and domain[0] == '[' and domain[-1] == ']':
try:
ip = domain[1:-1]
socket.inet_pton(socket.AF_INET6, ip)
ip_addr = True
except (socket.error, ValueError):
pass
if not ip_addr:
# This is a domain name, which must be checked further
if domain and domain[-1] == '.':
domain = domain[:-1]
try:
domain = idna(domain)
except StringprepError:
raise InvalidJID(f'idna validation failed: {domain}')
if ':' in domain:
raise InvalidJID(f'Domain containing a port: {domain}')
for label in domain.split('.'):
if not label:
raise InvalidJID(f'Domain containing too many dots: {domain}')
if '-' in (label[0], label[-1]):
raise InvalidJID(f'Domain starting or ending with -: {domain}')
if not domain:
raise InvalidJID('Domain must not be 0 bytes')
if len(domain) > 1023:
raise InvalidJID('Domain must be less than 1024 bytes')
return domain
def _validate_resource(resource: Optional[str]):
"""Validate the resource portion of a JID.
:raises InvalidJID:
:returns: The local portion of a JID, as validated by resourceprep.
"""
if resource is None:
return ''
try:
resource = resourceprep(resource)
except StringprepError:
raise InvalidJID('Resourceprep failed')
if not resource:
raise InvalidJID('Resource must not be 0 bytes')
if len(resource) > 1023:
raise InvalidJID('Resource must be less than 1024 bytes')
return resource
def _unescape_node(node: str):
"""Unescape a local portion of a JID.
.. note::
The unescaped local portion is meant ONLY for presentation,
and should not be used for other purposes.
"""
unescaped = []
seq = ''
for i, char in enumerate(node):
if char == '\\':
seq = node[i:i+3]
if seq not in JID_ESCAPE_SEQUENCES:
seq = ''
if seq:
if len(seq) == 3:
unescaped.append(JID_UNESCAPE_TRANSFORMATIONS.get(seq, char))
# Pop character off the escape sequence, and ignore it
seq = seq[1:]
else:
unescaped.append(char)
return ''.join(unescaped)
def _format_jid(
local: Optional[str] = None,
domain: Optional[str] = None,
resource: Optional[str] = None,
):
"""Format the given JID components into a full or bare JID.
:param string local: Optional. The local portion of the JID.
:param string domain: Required. The domain name portion of the JID.
:param strin resource: Optional. The resource portion of the JID.
:return: A full or bare JID string.
"""
if domain is None:
return ''
if local is not None:
result = local + '@' + domain
else:
result = domain
if resource is not None:
result += '/' + resource
return result
class InvalidJID(ValueError):
"""
Raised when attempting to create a JID that does not pass validation.
It can also be raised if modifying an existing JID in such a way as
to make it invalid, such trying to remove the domain from an existing
full JID while the local and resource portions still exist.
"""
# pylint: disable=R0903
class UnescapedJID:
"""
.. versionadded:: 1.1.10
"""
__slots__ = ('_node', '_domain', '_resource')
def __init__(
self,
node: Optional[str],
domain: Optional[str],
resource: Optional[str],
):
self._node = node
self._domain = domain
self._resource = resource
def __getattribute__(self, name: str):
"""Retrieve the given JID component.
:param name: one of: user, server, domain, resource,
full, or bare.
"""
if name == 'resource':
return self._resource or ''
if name in ('user', 'username', 'local', 'node'):
return self._node or ''
if name in ('server', 'domain', 'host'):
return self._domain or ''
if name in ('full', 'jid'):
return _format_jid(self._node, self._domain, self._resource)
if name == 'bare':
return _format_jid(self._node, self._domain)
return object.__getattribute__(self, name)
def __str__(self):
"""Use the full JID as the string value."""
return _format_jid(self._node, self._domain, self._resource)
def __repr__(self):
"""Use the full JID as the representation."""
return _format_jid(self._node, self._domain, self._resource)
class JID:
"""
A representation of a Jabber ID, or JID.
Each JID may have three components: a user, a domain, and an optional
resource. For example: user@domain/resource
When a resource is not used, the JID is called a bare JID.
The JID is a full JID otherwise.
**JID Properties:**
:full: The string value of the full JID.
:jid: Alias for ``full``.
:bare: The string value of the bare JID.
:node: The node portion of the JID.
:user: Alias for ``node``.
:local: Alias for ``node``.
:username: Alias for ``node``.
:domain: The domain name portion of the JID.
:server: Alias for ``domain``.
:host: Alias for ``domain``.
:resource: The resource portion of the JID.
:param string jid:
A string of the form ``'[user@]domain[/resource]'``.
:param bool bare:
If present, discard the provided resource.
:raises InvalidJID:
"""
__slots__ = ('_node', '_domain', '_resource', '_bare', '_full')
def __init__(self, jid: Optional[Union[str, 'JID']] = None, bare: bool = False):
if not jid:
self._node = ''
self._domain = ''
self._resource = ''
self._bare = ''
self._full = ''
return
elif not isinstance(jid, JID):
node, domain, resource = _parse_jid(jid)
self._node = node
self._domain = domain
self._resource = resource if not bare else ''
else:
self._node = jid._node
self._domain = jid._domain
self._resource = jid._resource if not bare else ''
self._update_bare_full()
def unescape(self):
"""Return an unescaped JID object.
Using an unescaped JID is preferred for displaying JIDs
to humans, and they should NOT be used for any other
purposes than for presentation.
:return: :class:`UnescapedJID`
.. versionadded:: 1.1.10
"""
return UnescapedJID(_unescape_node(self._node),
self._domain,
self._resource)
def _update_bare_full(self):
"""Format the given JID into a bare and a full JID.
"""
self._bare = (self._node + '@' + self._domain
if self._node
else self._domain)
self._full = (self._bare + '/' + self._resource
if self._resource
else self._bare)
@property
def bare(self) -> str:
return self._bare
@bare.setter
def bare(self, value: str):
node, domain, resource = _parse_jid(value)
assert not resource
self._node = node
self._domain = domain
self._update_bare_full()
@property
def node(self) -> str:
return self._node
@node.setter
def node(self, value: Optional[str]):
self._node = _validate_node(value)
self._update_bare_full()
@property
def domain(self) -> str:
return self._domain
@domain.setter
def domain(self, value: str):
self._domain = _validate_domain(value)
self._update_bare_full()
@property
def resource(self) -> str:
return self._resource
@resource.setter
def resource(self, value: Optional[str]):
self._resource = _validate_resource(value)
self._update_bare_full()
@property
def full(self) -> str:
return self._full
@full.setter
def full(self, value: str):
self._node, self._domain, self._resource = _parse_jid(value)
self._update_bare_full()
user = node
local = node
username = node
server = domain
host = domain
jid = full
def __str__(self):
"""Use the full JID as the string value."""
return self._full
def __repr__(self):
"""Use the full JID as the representation."""
return self._full
# pylint: disable=W0212
def __eq__(self, other):
"""Two JIDs are equal if they have the same full JID value."""
if isinstance(other, UnescapedJID):
return False
if not isinstance(other, JID):
try:
other = JID(other)
except InvalidJID:
return NotImplemented
return (self._node == other._node and
self._domain == other._domain and
self._resource == other._resource)
def __ne__(self, other):
"""Two JIDs are considered unequal if they are not equal."""
return not self == other
def __hash__(self):
"""Hash a JID based on the string version of its full JID."""
return hash(self._full)

View File

@@ -76,7 +76,6 @@ PLUGINS = [
'xep_0256', # Last Activity in Presence
'xep_0257', # Client Certificate Management for SASL EXTERNAL
'xep_0258', # Security Labels in XMPP
'xep_0264', # Jingle Content Thumbnails
# 'xep_0270', # XMPP Compliance Suites 2010. Dont automatically load
'xep_0279', # Server IP Check
'xep_0280', # Message Carbons
@@ -86,7 +85,6 @@ PLUGINS = [
# 'xep_0302', # XMPP Compliance Suites 2012. Dont automatically load
'xep_0308', # Last Message Correction
'xep_0313', # Message Archive Management
'xep_0317', # Hats
'xep_0319', # Last User Interaction in Presence
# 'xep_0323', # IoT Systems Sensor Data. Dont automatically load
# 'xep_0325', # IoT Systems Control. Dont automatically load
@@ -120,7 +118,6 @@ PLUGINS = [
'xep_0444', # Message Reactions
'xep_0447', # Stateless file sharing
'xep_0461', # Message Replies
'xep_0469', # Bookmarks Pinning
# Meant to be imported by plugins
]

View File

@@ -7,8 +7,7 @@ import logging
import hashlib
import base64
from asyncio import Future, Lock
from collections import defaultdict
from asyncio import Future
from typing import Optional
from slixmpp import __version__
@@ -95,9 +94,6 @@ class XEP_0115(BasePlugin):
disco.assign_verstring = self.assign_verstring
disco.get_verstring = self.get_verstring
# prevent concurrent fetches for the same hash
self._locks = defaultdict(Lock)
def plugin_end(self):
self.xmpp['xep_0030'].del_feature(feature=stanza.Capabilities.namespace)
self.xmpp.del_filter('out', self._filter_add_caps)
@@ -141,7 +137,7 @@ class XEP_0115(BasePlugin):
self.xmpp.event('entity_caps', p)
async def _process_caps(self, pres: Presence):
async def _process_caps(self, pres):
if not pres['caps']['hash']:
log.debug("Received unsupported legacy caps: %s, %s, %s",
pres['caps']['node'],
@@ -151,11 +147,7 @@ class XEP_0115(BasePlugin):
return
ver = pres['caps']['ver']
async with self._locks[ver]:
await self._process_caps_wrapped(pres, ver)
self._locks.pop(ver, None)
async def _process_caps_wrapped(self, pres: Presence, ver: str):
existing_verstring = await self.get_verstring(pres['from'].full)
if str(existing_verstring) == str(ver):
return

View File

@@ -137,14 +137,7 @@ class XEP_0199(BasePlugin):
async def _keepalive(self, event=None):
log.debug("Keepalive ping...")
try:
ifrom = None
if self.xmpp.is_component:
ifrom = self.xmpp.boundjid
rtt = await self.ping(
self.xmpp.boundjid.host,
timeout=self.timeout,
ifrom=ifrom
)
rtt = await self.ping(self.xmpp.boundjid.host, timeout=self.timeout)
except IqTimeout:
log.debug("Did not receive ping back in time. " + \
"Requesting Reconnect.")

View File

@@ -15,32 +15,6 @@ log = logging.getLogger(__name__)
class XEP_0221(BasePlugin):
"""
XEP-0221: Data Forms Media Element
In certain implementations of Data Forms (XEP-0004), it can be
helpful to include media data such as small images. One example is
CAPTCHA Forms (XEP-0158). This plugin implements a method for
including media data in a data form.
Typical use pattern:
.. code-block:: python
self.register_plugin('xep_0221')
self['xep_0050'].add_command(node="showimage",
name="Show my image",
handler=self.form_handler)
def form_handler(self,iq,session):
image_url="https://xmpp.org/images/logos/xmpp-logo.svg"
form=self['xep_0004'].make_form('result','My Image')
form.addField(var='myimage', ftype='text-single', label='My Image', value=image_url)
form.field['myimage']['media'].add_uri(value=image_url, itype="image/svg")
session['payload']=form
return session
"""
name = 'xep_0221'
description = 'XEP-0221: Data Forms Media Element'

View File

@@ -1,5 +0,0 @@
from slixmpp.plugins.base import register_plugin
from .thumbnail import XEP_0264
register_plugin(XEP_0264)

View File

@@ -1,36 +0,0 @@
from typing import Optional
from slixmpp import register_stanza_plugin
from slixmpp.plugins.xep_0234.stanza import File
from slixmpp.xmlstream import ElementBase
NS = "urn:xmpp:thumbs:1"
class Thumbnail(ElementBase):
name = plugin_attrib = "thumbnail"
namespace = NS
interfaces = {"uri", "media-type", "width", "height"}
def get_width(self) -> int:
return _int_or_none(self._get_attr("width"))
def get_height(self) -> int:
return _int_or_none(self._get_attr("height"))
def set_width(self, v: int) -> None:
self._set_attr("width", str(v))
def set_height(self, v: int) -> None:
self._set_attr("height", str(v))
def _int_or_none(v) -> Optional[int]:
try:
return int(v)
except ValueError:
return None
def register_plugin():
register_stanza_plugin(File, Thumbnail)

View File

@@ -1,24 +0,0 @@
import logging
from slixmpp.plugins import BasePlugin
from . import stanza
log = logging.getLogger(__name__)
class XEP_0264(BasePlugin):
"""
XEP-0264: Jingle Content Thumbnails
Can also be used with 0385 (Stateless inline media sharing)
"""
name = "xep_0264"
description = "XEP-0264: Jingle Content Thumbnails"
dependencies = {"xep_0234"}
stanza = stanza
def plugin_init(self):
stanza.register_plugin()

View File

@@ -52,10 +52,9 @@ class MAM(ElementBase):
#: fetch, not relevant for the stanza itself.
interfaces = {
'queryid', 'start', 'end', 'with', 'results',
'before_id', 'after_id', 'ids', 'flip_page',
'before_id', 'after_id', 'ids',
}
sub_interfaces = {'start', 'end', 'with', 'before_id', 'after_id', 'ids',
'flip_page'}
sub_interfaces = {'start', 'end', 'with', 'before_id', 'after_id', 'ids'}
def setup(self, xml=None):
ElementBase.setup(self, xml)
@@ -82,7 +81,7 @@ class MAM(ElementBase):
def get_start(self) -> Optional[datetime]:
fields = self.get_fields()
field = fields.get('start')
if field and field["value"]:
if field:
return xep_0082.parse(field['value'])
return None
@@ -95,7 +94,7 @@ class MAM(ElementBase):
def get_end(self) -> Optional[datetime]:
fields = self.get_fields()
field = fields.get('end')
if field and field["value"]:
if field:
return xep_0082.parse(field['value'])
return None
@@ -169,8 +168,6 @@ class MAM(ElementBase):
def del_results(self):
self._results = []
def get_flip_page(self):
return self.xml.find(f'{{{self.namespace}}}flip-page') is not None
class Fin(ElementBase):
"""A MAM fin element (end of query).

View File

@@ -1,11 +0,0 @@
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from slixmpp.plugins import register_plugin
from slixmpp.plugins.xep_0317 import stanza
from slixmpp.plugins.xep_0317.hats import XEP_0317
from slixmpp.plugins.xep_0317.stanza import Hat, Hats
register_plugin(XEP_0317)
__all__ = ['stanza', 'XEP_317']

View File

@@ -1,16 +0,0 @@
from slixmpp.plugins import BasePlugin
from . import stanza
class XEP_0317(BasePlugin):
"""
XEP-0317: Hats
"""
name = 'xep_0317'
description = 'XEP-0317: Hats'
dependencies = {'xep_0030', 'xep_0045', 'xep_0050'}
stanza = stanza
namespace = stanza.NS
def plugin_init(self):
stanza.register_plugin()

View File

@@ -1,58 +0,0 @@
from slixmpp import Presence
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
from typing import List, Tuple
NS = 'urn:xmpp:hats:0'
class Hats(ElementBase):
"""
Hats element, container for multiple hats:
.. code-block::xml
<hats xmlns='urn:xmpp:hats:0'>
<hat title='Host' uri='http://schemas.example.com/hats#host' xml:lang='en-us'>
<badge xmlns="urn:example:badges" fgcolor="#000000" bgcolor="#58C5BA"/>
</hat>
<hat title='Presenter' uri='http://schemas.example.com/hats#presenter' xml:lang='en-us'>
<badge xmlns="urn:example:badges" fgcolor="#000000" bgcolor="#EC0524"/>
</hat>
</hats>
"""
name = 'hats'
namespace = NS
plugin_attrib = 'hats'
def add_hats(self, data: List[Tuple[str, str]]) -> None:
for uri, title in data:
hat = Hat()
hat["uri"] = uri
hat["title"] = title
self.append(hat)
class Hat(ElementBase):
"""
Hat element, has a title and url, may contain arbitrary sub-elements.
.. code-block::xml
<hat title='Host' uri='http://schemas.example.com/hats#host' xml:lang='en-us'>
<badge xmlns="urn:example:badges" fgcolor="#000000" bgcolor="#58C5BA"/>
</hat>
"""
name = 'hat'
plugin_attrib = 'hat'
namespace = NS
interfaces = {'title', 'uri'}
plugin_multi_attrib = "hats"
def register_plugin() -> None:
register_stanza_plugin(Hats, Hat, iterable=True)
register_stanza_plugin(Presence, Hats)

View File

@@ -1,13 +1,8 @@
# Slixmpp: The Slick XMPP Library
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
# This file is part of Slixmpp.
# See the file LICENSE for copying permissio
from abc import ABC
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from slixmpp.stanza import Message
from slixmpp.xmlstream import (
ElementBase,
@@ -15,83 +10,14 @@ from slixmpp.xmlstream import (
)
NS = "urn:xmpp:fallback:0"
NS = 'urn:xmpp:fallback:0'
class Fallback(ElementBase):
namespace = NS
name = "fallback"
plugin_attrib = "fallback"
plugin_multi_attrib = "fallbacks"
interfaces = {"for"}
def _find_fallback(self, fallback_for: str) -> "Fallback":
if self["for"] == fallback_for:
return self
for fallback in self.parent()["fallbacks"]:
if fallback["for"] == fallback_for:
return fallback
raise AttributeError("No fallback for this namespace", fallback_for)
def get_stripped_body(
self, fallback_for: str, element: Literal["body", "subject"] = "body"
) -> str:
"""
Get the body of a message, with the fallback part stripped
:param fallback_for: namespace of the fallback to strip
:param element: set this to "subject" get the stripped subject instead
of body
:return: body (or subject) content minus the fallback part
"""
fallback = self._find_fallback(fallback_for)
start = fallback[element]["start"]
end = fallback[element]["end"]
body = self.parent()[element]
if start == end == 0:
return ""
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
class FallbackMixin(ABC):
namespace = NS
name = NotImplemented
plugin_attrib = NotImplemented
interfaces = {"start", "end"}
def set_start(self, v: int):
self._set_attr("start", str(v))
def get_start(self):
return _int_or_zero(self._get_attr("start"))
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
return _int_or_zero(self._get_attr("end"))
class FallbackBody(FallbackMixin, ElementBase):
name = plugin_attrib = "body"
class FallbackSubject(FallbackMixin, ElementBase):
name = plugin_attrib = "subject"
def _int_or_zero(v: str):
try:
return int(v)
except ValueError:
return 0
name = 'fallback'
plugin_attrib = 'fallback'
def register_plugins():
register_stanza_plugin(Message, Fallback, iterable=True)
register_stanza_plugin(Fallback, FallbackBody)
register_stanza_plugin(Fallback, FallbackSubject)
register_stanza_plugin(Message, Fallback)

View File

@@ -13,7 +13,7 @@ class XEP_0461(BasePlugin):
name = "xep_0461"
description = "XEP-0461: Message Replies"
dependencies = {"xep_0030", "xep_0428"}
dependencies = {"xep_0030"}
stanza = stanza
namespace = stanza.NS

View File

@@ -2,7 +2,6 @@ from typing import Optional
from slixmpp.stanza import Message
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
from slixmpp.plugins.xep_0428.stanza import Fallback
NS = "urn:xmpp:reply:0"
@@ -13,11 +12,39 @@ class Reply(ElementBase):
plugin_attrib = "reply"
interfaces = {"id", "to"}
class FeatureFallBack(ElementBase):
# should also be a multi attrib
namespace = "urn:xmpp:fallback:0"
name = "fallback"
plugin_attrib = "feature_fallback"
interfaces = {"for"}
def get_fallback_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end:
return body[start:end]
else:
return ""
def get_stripped_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None):
"""
Add plain text fallback for clients not implementing XEP-0461.
``msg["reply"].add_quoted_fallback("Some text", "Bob")`` will
``msg["feature_fallback"].add_quoted_fallback("Some text", "Bob")`` will
prepend "> Bob:\n> Some text\n" to the body of the message, and set the
fallback_body attributes accordingly, so that clients implementing
XEP-0461 can hide the fallback text.
@@ -30,27 +57,39 @@ class Reply(ElementBase):
if nickname:
quoted = "> " + nickname + ":\n" + quoted
msg["body"] = quoted + msg["body"]
fallback = Fallback()
fallback["for"] = NS
fallback["body"]["start"] = 0
fallback["body"]["end"] = len(quoted)
msg.append(fallback)
msg["feature_fallback"]["for"] = NS
msg["feature_fallback"]["fallback_body"]["start"] = 0
msg["feature_fallback"]["fallback_body"]["end"] = len(quoted)
def get_fallback_body(self) -> str:
msg = self.parent()
for fallback in msg["fallbacks"]:
if fallback["for"] == NS:
break
else:
return ""
start = fallback["body"]["start"]
end = fallback["body"]["end"]
body = msg["body"]
if start <= end:
return body[start:end]
else:
return ""
class FallBackBody(ElementBase):
# According to https://xmpp.org/extensions/inbox/compatibility-fallback.html
# this should be a multi_attrib *but* since it's a protoXEP, we'll see...
namespace = FeatureFallBack.namespace
name = "body"
plugin_attrib = "fallback_body"
interfaces = {"start", "end"}
def set_start(self, v: int):
self._set_attr("start", str(v))
def get_start(self):
try:
return int(self._get_attr("start"))
except ValueError:
return 0
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
try:
return int(self._get_attr("end"))
except ValueError:
return 0
def register_plugins():
register_stanza_plugin(Message, Reply)
register_stanza_plugin(Message, FeatureFallBack)
register_stanza_plugin(FeatureFallBack, FallBackBody)

View File

@@ -1,8 +0,0 @@
from slixmpp.plugins.base import register_plugin
from . import stanza
from .pinning import XEP_0469
register_plugin(XEP_0469)
__all__ = ['stanza', 'XEP_0469']

View File

@@ -1,17 +0,0 @@
from slixmpp.plugins import BasePlugin
from . import stanza
class XEP_0469(BasePlugin):
"""
XEP-0469: Bookmark Pinning
"""
name = "xep_0469"
description = "XEP-0469: Bookmark Pinning"
dependencies = {"xep_0402"}
stanza = stanza
def plugin_init(self):
stanza.register_plugin()

View File

@@ -1,31 +0,0 @@
from slixmpp import register_stanza_plugin
from slixmpp.plugins.xep_0402.stanza import Extensions
from slixmpp.xmlstream import ElementBase
NS = "urn:xmpp:bookmarks-pinning:0"
class Pinned(ElementBase):
"""
Pinned bookmark element
To enable it on a Conference element, use enable() like this:
.. code-block::python
# C being a Conference element
C['extensions'].enable('pinned')
Which will add the <pinned> element to the <extensions> element.
"""
namespace = NS
name = "pinned"
plugin_attrib = "pinned"
interfaces = {"pinned"}
bool_interfaces = {"pinned"}
is_extension = True
def register_plugin():
register_stanza_plugin(Extensions, Pinned)

View File

@@ -69,14 +69,12 @@ from slixmpp.plugins.xep_0249 import XEP_0249
from slixmpp.plugins.xep_0256 import XEP_0256
from slixmpp.plugins.xep_0257 import XEP_0257
from slixmpp.plugins.xep_0258 import XEP_0258
from slixmpp.plugins.xep_0264 import XEP_0264
from slixmpp.plugins.xep_0279 import XEP_0279
from slixmpp.plugins.xep_0280 import XEP_0280
from slixmpp.plugins.xep_0297 import XEP_0297
from slixmpp.plugins.xep_0300 import XEP_0300
from slixmpp.plugins.xep_0308 import XEP_0308
from slixmpp.plugins.xep_0313 import XEP_0313
from slixmpp.plugins.xep_0317 import XEP_0317
from slixmpp.plugins.xep_0319 import XEP_0319
from slixmpp.plugins.xep_0332 import XEP_0332
from slixmpp.plugins.xep_0333 import XEP_0333
@@ -102,7 +100,6 @@ from slixmpp.plugins.xep_0428 import XEP_0428
from slixmpp.plugins.xep_0437 import XEP_0437
from slixmpp.plugins.xep_0439 import XEP_0439
from slixmpp.plugins.xep_0444 import XEP_0444
from slixmpp.plugins.xep_0461 import XEP_0461
class PluginsDict(TypedDict):
@@ -165,14 +162,12 @@ class PluginsDict(TypedDict):
xep_0256: XEP_0256
xep_0257: XEP_0257
xep_0258: XEP_0258
xep_0264: XEP_0264
xep_0279: XEP_0279
xep_0280: XEP_0280
xep_0297: XEP_0297
xep_0300: XEP_0300
xep_0308: XEP_0308
xep_0313: XEP_0313
xep_0317: XEP_0317
xep_0319: XEP_0319
xep_0332: XEP_0332
xep_0333: XEP_0333
@@ -198,4 +193,3 @@ class PluginsDict(TypedDict):
xep_0437: XEP_0437
xep_0439: XEP_0439
xep_0444: XEP_0444
xep_0461: XEP_0461

View File

@@ -29,9 +29,9 @@ class SlixIntegration(IsolatedAsyncioTestCase):
self.clients = []
self.addAsyncCleanup(self._destroy)
def envjid(self, name: str, *, default: Optional[str] = None) -> JID:
def envjid(self, name):
"""Get a JID from an env var"""
value = os.getenv(name, default=default)
value = os.getenv(name)
return JID(value)
def envstr(self, name):

View File

@@ -3,7 +3,6 @@
# Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import atexit
import unittest
from queue import Queue
from xml.parsers.expat import ExpatError
@@ -751,12 +750,3 @@ class SlixTest(unittest.TestCase):
Error.namespace = 'jabber:client'
for st in Message, Iq, Presence:
register_stanza_plugin(st, Error)
@atexit.register
def cleanup():
try:
loop = asyncio.get_event_loop()
loop.close()
except:
pass

View File

@@ -181,7 +181,7 @@ class SCRAM(Mech):
channel_binding = True
required_credentials = {'username', 'password'}
optional_credentials = {'authzid', 'channel_binding'}
security = {'tls_version', 'encrypted', 'unencrypted_scram', 'binding_proposed'}
security = {'encrypted', 'unencrypted_scram'}
def setup(self, name):
self.use_channel_binding = False
@@ -244,13 +244,9 @@ class SCRAM(Mech):
self.cnonce = bytes(('%s' % random.random())[2:])
gs2_cbind_flag = b'n'
if self.security_settings['binding_proposed']:
if self.credentials['channel_binding'] and \
self.use_channel_binding:
if self.security_settings['tls_version'] != 'TLSv1.3':
if self.credentials['channel_binding']:
if self.use_channel_binding:
gs2_cbind_flag = b'p=tls-unique'
else:
gs2_cbind_flag = b'p=tls-exporter'
else:
gs2_cbind_flag = b'y'
@@ -284,7 +280,7 @@ class SCRAM(Mech):
raise SASLCancelled('Invalid nonce')
cbind_data = b''
if self.use_channel_binding and self.credentials['channel_binding']:
if self.use_channel_binding:
cbind_data = self.credentials['channel_binding']
cbind_input = self.gs2_header + cbind_data
channel_binding = b'c=' + b64encode(cbind_input).replace(b'\n', b'')

View File

@@ -5,5 +5,5 @@
# We don't want to have to import the entire library
# just to get the version info for setup.py
__version__ = '1.8.5'
__version_info__ = (1, 8, 5)
__version__ = '1.8.4'
__version_info__ = (1, 8, 4)

View File

@@ -10,5 +10,5 @@ from slixmpp.xmlstream.tostring import tostring, highlight
from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
__all__ = ['JID', 'StanzaBase', 'ElementBase',
'ET', 'tostring', 'highlight', 'XMLStream',
'RESPONSE_TIMEOUT', 'register_stanza_plugin']
'ET', 'StateMachine', 'tostring', 'highlight', 'XMLStream',
'RESPONSE_TIMEOUT']

View File

@@ -1243,7 +1243,7 @@ class ElementBase(object):
self.init_plugin(item.__class__.plugin_multi_attrib)
else:
self.iterables.append(item)
item.parent = weakref.ref(self)
return self
def appendxml(self, xml: ET.Element) -> ElementBase:

View File

@@ -290,8 +290,8 @@ class XMLStream(asyncio.BaseProtocol):
self.xml_depth = 0
self.xml_root = None
self.force_starttls = True
self.disable_starttls = False
self.force_starttls = None
self.disable_starttls = None
self.waiting_queue = asyncio.Queue()
@@ -405,9 +405,8 @@ class XMLStream(asyncio.BaseProtocol):
self.disconnected.set_result(True)
self.disconnected = asyncio.Future()
def connect(self, host: str = '', port: int = 0, use_ssl: Optional[bool] = None,
force_starttls: Optional[bool] = None,
disable_starttls: Optional[bool] = None) -> None:
def connect(self, host: str = '', port: int = 0, use_ssl: Optional[bool] = False,
force_starttls: Optional[bool] = True, disable_starttls: Optional[bool] = False) -> None:
"""Create a new socket and connect to the server.
:param host: The name of the desired server for the connection.
@@ -524,7 +523,7 @@ class XMLStream(asyncio.BaseProtocol):
else:
self.loop.run_until_complete(self.disconnected)
else:
tasks: List[Union[asyncio.Task, asyncio.Future]] = [asyncio.Task(asyncio.sleep(timeout))]
tasks: List[Awaitable] = [asyncio.sleep(timeout)]
if not forever:
tasks.append(self.disconnected)
self.loop.run_until_complete(asyncio.wait(tasks))
@@ -850,8 +849,6 @@ class XMLStream(asyncio.BaseProtocol):
log.debug("Connection error:", exc_info=True)
self.disconnect()
return False
if transp is None:
raise Exception("Transport should not be none")
der_cert = transp.get_extra_info("ssl_object").getpeercert(True)
pem_cert = ssl.DER_cert_to_PEM_cert(der_cert)
self.event('ssl_cert', pem_cert)

View File

@@ -1,278 +0,0 @@
use pyo3::exceptions::{PyNotImplementedError, PyValueError};
use pyo3::prelude::*;
pyo3::create_exception!(py_jid, InvalidJID, PyValueError, "Raised when attempting to create a JID that does not pass validation.\n\nIt can also be raised if modifying an existing JID in such a way as\nto make it invalid, such trying to remove the domain from an existing\nfull JID while the local and resource portions still exist.");
fn to_exc(err: jid::Error) -> PyErr {
InvalidJID::new_err(err.to_string())
}
/// A representation of a Jabber ID, or JID.
///
/// Each JID may have three components: a user, a domain, and an optional resource. For example:
/// user@domain/resource
///
/// When a resource is not used, the JID is called a bare JID. The JID is a full JID otherwise.
///
/// Raises InvalidJID if the parser rejects it.
#[pyclass(name = "JID", module = "slixmpp.jid")]
struct PyJid {
jid: Option<jid::Jid>,
}
#[pymethods]
impl PyJid {
#[new]
#[pyo3(signature = (jid=None, bare=false))]
fn new(jid: Option<&Bound<'_, PyAny>>, bare: bool) -> PyResult<Self> {
if let Some(jid) = jid {
if let Ok(py_jid) = jid.extract::<PyRef<PyJid>>() {
if bare {
if let Some(py_jid) = &(*py_jid).jid {
Ok(PyJid {
jid: Some(jid::Jid::Bare(py_jid.to_bare())),
})
} else {
Ok(PyJid { jid: None })
}
} else {
Ok(PyJid {
jid: (*py_jid).jid.clone(),
})
}
} else {
let jid: &str = jid.extract()?;
if jid.is_empty() {
Ok(PyJid { jid: None })
} else {
let mut jid = jid::Jid::new(jid).map_err(to_exc)?;
if bare {
jid = jid::Jid::Bare(jid.into_bare())
}
Ok(PyJid { jid: Some(jid) })
}
}
} else {
Ok(PyJid { jid: None })
}
}
/*
// TODO: implement or remove from the API
fn unescape() {
}
*/
#[getter]
fn get_bare(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.to_bare().to_string(),
}
}
#[setter]
fn set_bare(&mut self, bare: &str) -> PyResult<()> {
let bare = jid::BareJid::new(bare).map_err(to_exc)?;
self.jid = Some(match &self.jid {
Some(jid::Jid::Bare(_)) | None => jid::Jid::Bare(bare),
Some(jid::Jid::Full(jid)) => jid::Jid::Full(bare.with_resource(&jid.resource())),
});
Ok(())
}
#[getter]
fn get_full(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.to_string(),
}
}
#[setter]
fn set_full(&mut self, full: &str) -> PyResult<()> {
// JID.full = 'domain' is acceptable in slixmpp.
self.jid = Some(jid::Jid::new(full).map_err(to_exc)?);
Ok(())
}
#[getter]
fn get_node(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid
.node_str()
.map(ToString::to_string)
.unwrap_or_else(String::new),
}
}
#[setter]
fn set_node(&mut self, node: &str) -> PyResult<()> {
let node = jid::NodePart::new(node).map_err(to_exc)?;
self.jid = Some(match &self.jid {
Some(jid::Jid::Bare(jid)) => {
jid::Jid::Bare(jid::BareJid::from_parts(Some(&node), &jid.domain()))
}
Some(jid::Jid::Full(jid)) => jid::Jid::Full(jid::FullJid::from_parts(
Some(&node),
&jid.domain(),
&jid.resource(),
)),
None => Err(InvalidJID::new_err("JID.node must apply to a proper JID"))?,
});
Ok(())
}
#[getter]
fn get_domain(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.domain_str().to_string(),
}
}
#[setter]
fn set_domain(&mut self, domain: &str) -> PyResult<()> {
let domain = jid::DomainPart::new(domain).map_err(to_exc)?;
self.jid = Some(match &self.jid {
Some(jid::Jid::Bare(jid)) => {
jid::Jid::Bare(jid::BareJid::from_parts(jid.node().as_ref(), &domain))
}
Some(jid::Jid::Full(jid)) => jid::Jid::Full(jid::FullJid::from_parts(
jid.node().as_ref(),
&domain,
&jid.resource(),
)),
None => jid::Jid::Bare(jid::BareJid::from_parts(None, &domain)),
});
Ok(())
}
#[getter]
fn get_resource(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid
.resource_str()
.map(ToString::to_string)
.unwrap_or_else(String::new),
}
}
#[setter]
fn set_resource(&mut self, resource: &str) -> PyResult<()> {
let resource = jid::ResourcePart::new(resource).map_err(to_exc)?;
self.jid = Some(match &self.jid {
Some(jid::Jid::Bare(jid)) => jid::Jid::Full(jid.with_resource(&resource)),
Some(jid::Jid::Full(jid)) => jid::Jid::Full(jid::FullJid::from_parts(
jid.node().as_ref(),
&jid.domain(),
&resource,
)),
None => Err(InvalidJID::new_err(
"JID.resource must apply to a proper JID",
))?,
});
Ok(())
}
/// Use the full JID as the string value.
fn __str__(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.to_string(),
}
}
/// Use the full JID as the representation.
fn __repr__(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.to_string(),
}
}
/// Two JIDs are equal if they have the same full JID value.
fn __richcmp__(&self, other: &Bound<'_, PyAny>, op: pyo3::basic::CompareOp) -> PyResult<bool> {
let other = if let Ok(other) = other.extract::<PyRef<PyJid>>() {
other
} else if other.is_none() {
Bound::new(other.py(), PyJid::new(None, false)?)?.borrow()
} else {
Bound::new(other.py(), PyJid::new(Some(other), false)?)?.borrow()
};
match (&self.jid, &other.jid) {
(None, None) => Ok(true),
(Some(jid), Some(other)) => match op {
pyo3::basic::CompareOp::Eq => Ok(jid == other),
pyo3::basic::CompareOp::Ne => Ok(jid != other),
_ => Err(PyNotImplementedError::new_err(
"Only == and != are implemented",
)),
},
_ => Ok(false),
}
}
/// Hash a JID based on the string version of its full JID.
fn __hash__(&self) -> isize {
if let Some(jid) = &self.jid {
// Use the same algorithm as the Python JID.
let string = jid.to_string();
unsafe { pyo3::ffi::_Py_HashBytes(string.as_ptr() as *const _, string.len() as isize) }
} else {
0
}
}
// Aliases
#[getter]
fn get_user(&self) -> String {
self.get_node()
}
#[setter]
fn set_user(&mut self, user: &str) -> PyResult<()> {
self.set_node(user)
}
#[getter]
fn get_server(&self) -> String {
self.get_domain()
}
#[setter]
fn set_server(&mut self, server: &str) -> PyResult<()> {
self.set_domain(server)
}
#[getter]
fn get_host(&self) -> String {
self.get_domain()
}
#[setter]
fn set_host(&mut self, host: &str) -> PyResult<()> {
self.set_domain(host)
}
#[getter]
fn get_jid(&self) -> String {
self.get_full()
}
#[setter]
fn set_jid(&mut self, jid: &str) -> PyResult<()> {
self.set_full(jid)
}
}
#[pymodule]
#[pyo3(name = "libslixmpp")]
fn py_jid(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyJid>()?;
m.add("InvalidJID", py.get_type_bound::<InvalidJID>())?;
Ok(())
}

View File

@@ -3,6 +3,7 @@ from __future__ import unicode_literals
import unittest
from slixmpp.test import SlixTest
from slixmpp import JID, InvalidJID
from slixmpp.jid import nodeprep
class TestJIDClass(SlixTest):
@@ -191,12 +192,10 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, 'test.com/%s' % resource)
self.assertRaises(InvalidJID, JID, 'user@test.com/%s' % resource)
@unittest.skip('Rust')
def testTooLongDomainLabel(self):
domain = ('a' * 64) + '.com'
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainEmptyLabel(self):
domain = 'aaa..bbb.com'
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@@ -217,7 +216,6 @@ class TestJIDClass(SlixTest):
jid3 = JID('%s/resource' % domain)
jid4 = JID('user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainInvalidIPv6NoBrackets(self):
domain = '::1'
@@ -226,7 +224,6 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, '%s/resource' % domain)
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainInvalidIPv6MissingBracket(self):
domain = '[::1'
@@ -235,7 +232,6 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, '%s/resource' % domain)
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainInvalidIPv6WrongBracket(self):
domain = '[::]1]'
@@ -244,7 +240,6 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, '%s/resource' % domain)
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainWithPort(self):
domain = 'example.com:5555'
@@ -253,14 +248,12 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, '%s/resource' % domain)
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainWithTrailingDot(self):
domain = 'example.com.'
jid = JID('user@%s/resource' % domain)
self.assertEqual(jid.domain, 'example.com')
@unittest.skip('Rust')
def testDomainWithDashes(self):
domain = 'example.com-'
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@@ -268,13 +261,21 @@ class TestJIDClass(SlixTest):
domain = '-example.com'
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testACEDomain(self):
domain = 'xn--bcher-kva.ch'
jid = JID('user@%s/resource' % domain)
self.assertEqual(jid.domain.encode('utf-8'), b'b\xc3\xbccher.ch')
def testJIDUnescape(self):
jid = JID('here\\27s_a_wild_\\26_\\2fcr%zy\\2f_\\40ddress\\20for\\3a\\3cwv\\3e(\\22IMPS\\22)\\5c@example.com')
ujid = jid.unescape()
self.assertEqual(ujid.local, 'here\'s_a_wild_&_/cr%zy/_@ddress for:<wv>("imps")\\')
jid = JID('blah\\5cfoo\\5c20bar@example.com')
ujid = jid.unescape()
self.assertEqual(ujid.local, 'blah\\foo\\20bar')
def testStartOrEndWithEscapedSpaces(self):
local = ' foo'
self.assertRaises(InvalidJID, JID, '%s@example.com' % local)
@@ -287,5 +288,9 @@ class TestJIDClass(SlixTest):
#self.assertRaises(InvalidJID, JID, '%s@example.com' % '\\20foo2')
#self.assertRaises(InvalidJID, JID, '%s@example.com' % 'bar2\\20')
def testNodePrepIdemptotent(self):
node = 'ᴹᴵᴷᴬᴱᴸ'
self.assertEqual(nodeprep(node), nodeprep(nodeprep(node)))
suite = unittest.TestLoader().loadTestsFromTestCase(TestJIDClass)

View File

@@ -1,67 +0,0 @@
import unittest
from slixmpp import Presence
from slixmpp.test import SlixTest
import slixmpp.plugins.xep_0317 as xep_0317
from slixmpp.plugins.xep_0317 import stanza
class TestStanzaHats(SlixTest):
def setUp(self):
stanza.register_plugin()
def test_create_hats(self):
raw_xml = """
<hats xmlns="urn:xmpp:hats:0">
<hat uri="http://example.com/hats#Teacher" title="Teacher"/>
</hats>
"""
hats = xep_0317.Hats()
hat = xep_0317.Hat()
hat['uri'] = 'http://example.com/hats#Teacher'
hat['title'] = 'Teacher'
hats.append(hat)
self.check(hats, raw_xml, use_values=False)
def test_set_single_hat(self):
presence = Presence()
presence["hats"]["hat"]["uri"] = "test-uri"
presence["hats"]["hat"]["title"] = "test-title"
self.check(
presence, # language=XML
"""
<presence>
<hats xmlns='urn:xmpp:hats:0'>
<hat uri='test-uri' title='test-title'/>
</hats>
</presence>
""",
)
def test_set_multi_hat(self):
presence = Presence()
presence["hats"].add_hats([("uri1", "title1"), ("uri2", "title2")])
self.check(
presence, # language=XML
"""
<presence>
<hats xmlns='urn:xmpp:hats:0'>
<hat uri='uri1' title='title1'/>
<hat uri='uri2' title='title2'/>
</hats>
</presence>
""",
)
def test_get_hats(self):
presence = Presence()
presence["hats"].add_hats([("uri1", "title1"), ("uri2", "title2")])
for i, hat in enumerate(presence["hats"]["hats"], start=1):
self.assertEqual(hat["uri"], f"uri{i}")
self.assertEqual(hat["title"], f"title{i}")
suite = unittest.TestLoader().loadTestsFromTestCase(TestStanzaHats)

View File

@@ -1,149 +0,0 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza
from slixmpp.plugins import xep_0461
from slixmpp.plugins import xep_0444
class TestFallback(SlixTest):
def setUp(self):
stanza.register_plugins()
def testSingleFallbackBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackSubject(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["subject"]["start"] = 0
message["fallback"]["subject"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<subject start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackWholeBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
def testMultiFallback(self):
message = Message()
f1 = stanza.Fallback()
f1["for"] = "ns1"
f2 = stanza.Fallback()
f2["for"] = "ns2"
message.append(f1)
message.append(f2)
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns1' />
<fallback xmlns='urn:xmpp:fallback:0' for='ns2' />
</message>
""",
)
for i, fallback in enumerate(message["fallbacks"], start=1):
self.assertEqual(fallback["for"], f"ns{i}")
def testStripFallbackPartOfBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
<body start="0" end="9" />
</fallback>
</message>
""",
)
self.assertEqual(
message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body"
)
def testStripWholeBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
self.assertEqual(message["fallback"].get_stripped_body("ns"), "")
def testStripMultiFallback(self):
message = Message()
message["body"] = "> huuuuu\n👍"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
reaction_fallback = stanza.Fallback()
reaction_fallback["for"] = xep_0444.stanza.NS
reaction_fallback.enable("body")
message.append(reaction_fallback)
self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "👍")
self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "")
suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback)

View File

@@ -1,13 +1,11 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza as fallback_stanza
from slixmpp.plugins.xep_0461 import stanza
class TestReply(SlixTest):
def setUp(self):
fallback_stanza.register_plugins()
stanza.register_plugins()
def testReply(self):
@@ -28,9 +26,9 @@ class TestReply(SlixTest):
def testFallback(self):
message = Message()
message["body"] = "12345\nrealbody"
message["fallback"]["for"] = "NS"
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 6
message["feature_fallback"]["for"] = "NS"
message["feature_fallback"]["fallback_body"]["start"] = 0
message["feature_fallback"]["fallback_body"]["end"] = 6
self.check(
message,
@@ -44,18 +42,18 @@ class TestReply(SlixTest):
""",
)
assert message["fallback"].get_stripped_body("NS") == "realbody"
assert message["feature_fallback"].get_stripped_body() == "realbody"
def testAddFallBackHelper(self):
msg = Message()
msg["body"] = "Great"
msg["reply"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
msg["feature_fallback"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
# ugly dedent but the test does not pass without it
self.check(
msg, # language=XML
msg,
"""
<message xmlns="jabber:client" type="normal">
<body>> Anna wrote:\n> Hi, how are you?\nGreat</body>
<reply xmlns="urn:xmpp:reply:0" />
<fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0">
<body start='0' end='33' />
</fallback>
@@ -69,8 +67,8 @@ class TestReply(SlixTest):
msg = Message()
msg["body"] = "Great"
msg["reply"].add_quoted_fallback(body)
body2 = msg["reply"].get_fallback_body()
msg["feature_fallback"].add_quoted_fallback(body)
body2 = msg["feature_fallback"].get_fallback_body()
self.assertTrue(body2 == quoted, body2)

View File

@@ -1,36 +0,0 @@
import unittest
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0469 import stanza
from slixmpp.plugins.xep_0402 import stanza as b_stanza
class TestBookmarksPinning(SlixTest):
def setUp(self):
b_stanza.register_plugin()
stanza.register_plugin()
def test_pinned(self):
bookmark = b_stanza.Conference()
bookmark["password"] = "pass"
bookmark["nick"] = "nick"
bookmark["autojoin"] = False
bookmark["extensions"].enable("pinned")
self.check(
bookmark,
"""
<conference xmlns='urn:xmpp:bookmarks:1'
autojoin='false'>
<nick>nick</nick>
<password>pass</password>
<extensions>
<pinned xmlns="urn:xmpp:bookmarks-pinning:0" />
</extensions>
</conference>
""",
use_values=False
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestBookmarksPinning)

View File

@@ -1,76 +0,0 @@
import logging
import unittest
from slixmpp.test import SlixTest
class TestCaps(SlixTest):
def setUp(self):
self.stream_start(plugins=["xep_0115"])
def testConcurrentSameHash(self):
"""
Check that we only resolve a given ver string to a disco info once,
even if we receive several presences with that same ver string
consecutively.
"""
self.recv( # language=XML
"""
<presence from='romeo@montague.lit/orchard'>
<c xmlns='http://jabber.org/protocol/caps'
hash='sha-1'
node='a-node'
ver='h0TdMvqNR8FHUfFG1HauOLYZDqE='/>
</presence>
"""
)
self.recv( # language=XML
"""
<presence from='i-dont-know-much-shakespeare@montague.lit/orchard'>
<c xmlns='http://jabber.org/protocol/caps'
hash='sha-1'
node='a-node'
ver='h0TdMvqNR8FHUfFG1HauOLYZDqE='/>
</presence>
"""
)
self.send( # language=XML
"""
<iq xmlns="jabber:client"
id="1"
to="romeo@montague.lit/orchard"
type="get">
<query xmlns="http://jabber.org/protocol/disco#info"
node="a-node#h0TdMvqNR8FHUfFG1HauOLYZDqE="/>
</iq>
"""
)
self.send(None)
self.recv( # language=XML
"""
<iq from='romeo@montague.lit/orchard'
id='1'
type='result'>
<query xmlns='http://jabber.org/protocol/disco#info'
node='a-nodes#h0TdMvqNR8FHUfFG1HauOLYZDqE='>
<identity category='client' name='a client' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
</query>
</iq>
"""
)
self.send(None)
self.assertTrue(
self.xmpp["xep_0030"].supports(
"romeo@montague.lit/orchard", "http://jabber.org/protocol/caps"
)
)
self.assertTrue(
self.xmpp["xep_0030"].supports(
"i-dont-know-much-shakespeare@montague.lit/orchard",
"http://jabber.org/protocol/caps",
)
)
logging.basicConfig(level=logging.DEBUG)
suite = unittest.TestLoader().loadTestsFromTestCase(TestCaps)

View File

@@ -9,8 +9,8 @@ class TestReply(SlixTest):
def testFallBackBody(self):
async def on_reply(msg):
start = msg["fallback"]["body"]["start"]
end = msg["fallback"]["body"]["end"]
start = msg["feature_fallback"]["fallback_body"]["start"]
end = msg["feature_fallback"]["fallback_body"]["end"]
self.xmpp["xep_0461"].send_reply(
reply_to=msg.get_from(),
reply_id=msg.get_id(),