Compare commits

..

42 Commits

Author SHA1 Message Date
Emmanuel Gil Peyrot
292f3206f6 Skip tests with known missing idna validation 2024-04-19 14:37:19 +02:00
Emmanuel Gil Peyrot
d1f2e196db Initial Rust version. 2024-04-19 14:30:50 +02:00
Emmanuel Gil Peyrot
f084ad2724 Remove UnescapedJID
It hadn’t been functional for many years, producing invalid JIDs and
being confusing for users anyway.  Better remove it.
2024-04-19 13:57:29 +02:00
mathieui
7c79f28587 XEP-0199: handle component case for keepalive ping 2024-03-22 20:48:36 +01:00
mathieui
dcaf812a28 ci: build cython module for itests 2024-02-09 23:28:15 +01:00
mathieui
ae4de043d2 itests: fix default server call 2024-02-09 23:11:29 +01:00
mathieui
998bbb80ad itests: hardcode default MUC server 2024-02-09 23:07:32 +01:00
mathieui
5a5b36ab39 xmlstream: make mypy even happier 2024-02-09 22:58:20 +01:00
mathieui
f151f0a7ab xmlstream/componentxmpp: fix some typing issues
Make mypy happier
2024-02-09 22:55:20 +01:00
mathieui
2424a3b36f slixtest: cleanup loop only if needed
if not, get_event_loop will throw, we can ignore this
2024-02-09 22:49:47 +01:00
mathieui
1c4bbbce8e ci: fix mypy step 2024-02-09 21:41:03 +01:00
mathieui
66d552d057 xep_0317: Fix compatibility with python < 3.9 2024-02-09 21:32:19 +01:00
nicoco
b8205a9ae4 Update plugin: XEP-0317 (hats)
Merge changes from nicoco's MR that I missed, improving tests and
interface.
2024-02-09 21:06:14 +01:00
nicoco
85b7210115 XEP-0264: Jingle Content Thumbnails (new plugin)
Cheogram actually uses it with SIMS to embed
a blurhash preview in the stanza.
2024-02-09 12:10:12 +01:00
nicoco
909c865524 XEP-0313: Do not try to parse date for fields without value.
Without this we end up passing "None"
instead of a str to the date parser,
which raises a TypeError.
It happens if you try to provide a form
to be filled, when slixmpp acts as a MAM
*server*.
2024-02-09 11:51:34 +01:00
nicoco
586d2f5107 XEP-0313: Add support for flipped page 2024-02-08 20:45:48 +01:00
nicoco
9f7260747f Add XEP_0461 to PluginDict 2024-02-08 20:34:16 +01:00
mathieui
c41209510a xep_049: implement bookmarks pinning stanzas 2024-02-04 11:59:36 +01:00
mathieui
9266486f46 xep_0317: add initial stanza support for hats 2024-02-04 11:32:24 +01:00
mathieui
5226858e0c Release 1.8.5 2024-02-02 01:59:31 +01:00
mathieui
7128ea249b Fix running process() with a timeout (closes #3505) 2024-02-02 01:00:25 +01:00
Maxime “pep” Buquet
992d80dd09 SCRAM: Restrict tls-unique to TLSv1.2
And prepare the code to work when CPython implements tls-exporter for
TLSv1.3.
This adds tls_version and binding_proposed attributes to the security
properties so we can detect if we were offerred channel binding SASL
mechanisms, and which TLS version we are on.

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2024-02-02 00:56:40 +01:00
mathieui
c25305e80f componentxmpp: fix default host for components 2023-12-29 14:13:41 +01:00
mathieui
6765f84133 tests: close event loop at exit
prevents a nice segfault
2023-12-29 13:53:58 +01:00
mathieui
31fe7f7e06 [CI] add woodpecker CI 2023-12-29 13:53:58 +01:00
nicoco
84a7ac020f XEP-0461: rely on XEP-0428 for fallback
Breaks the previous fallback helpers, we now
rely on XEP-0461 instead
2023-12-28 16:38:37 +00:00
nicoco
331c1c1e21 XEP-0428: add fallback body and subject elements
+ tests
+ helpers to strip the fallback content
2023-12-28 16:38:37 +00:00
nicoco
28a60c22e2 ElementBase: add weak ref to parent when using append() 2023-12-28 16:38:37 +00:00
nicoco
af934b5bdf fix slixmpp.xmlstream.__all__ 2023-12-28 16:38:37 +00:00
genghis
897f876504 Correct Slixfeed title and add groupchat link to Stable Diffusion 2023-12-28 16:04:19 +00:00
genghis
2888be17ab Correct groupchat link for WhisperBot 2023-12-28 16:04:19 +00:00
genghis
975e31229c Correct links so they match to their respective text 2023-12-28 16:04:19 +00:00
genghis
6e9e66139d Add Stable Diffusion 2023-12-28 16:04:19 +00:00
genghis
380ac04d52 Update docs/projects.rst 2023-12-28 16:04:19 +00:00
genghis
9e5b530607 Update docs/projects.rst 2023-12-28 16:04:19 +00:00
genghis
71de274fab Update docs/projects.rst 2023-12-28 16:04:19 +00:00
genghis
5a0b02378d Add document Projects
Bots and Services utilizing Slixmpp
2023-12-28 16:04:19 +00:00
sxavier
9fc82e9e6f xep_0221: Add documentation overview and example 2023-12-28 16:01:19 +00:00
nicoco
ca90d3908e xep-0115: perf: avoid simultaneous disco info queries for the same verstring 2023-12-28 15:56:44 +00:00
Daniel Roschka
7de5cbcf33 Fix connect parameters used for follow-up calls
XMLStream.connect() is supposed to persist the parameters
it gets called with to allow follow-up calls to call
XMLStream.connect() without any parameters to result in a connection
with the same properties as the original one. That's for example used by
XMLStream.reconnect() when establishing a new connection.

Unfortunately that was broken for some of the parameters and resulted
different TLS related settings on reconnections. This commit fixes that.
2023-12-27 11:45:04 +01:00
Nicolas Cedilnik
76a11d4899 xep0356: implement IQ privilege
Also included:

- correctly handle privileges from different
  servers
- check that privileges have been granted before
  attempting to send something and raise
  PermissionError if not
- use dataclass and enums to store permissions instead of
  untyped dict
2023-12-19 14:14:16 +00:00
mathieui
dcfa0f20f9 [docs] add readthedocs.yaml 2023-11-13 19:38:48 +01:00
58 changed files with 1636 additions and 756 deletions

7
.gitignore vendored
View File

@@ -14,4 +14,9 @@ slixmpp.egg-info/
.DS_STORE
.idea/
.vscode/
venv/
venv/
# Added by cargo
/target
/Cargo.lock

22
.readthedocs.yaml Normal file
View File

@@ -0,0 +1,22 @@
# .readthedocs.yaml
# Read the Docs configuration file
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
# Required
version: 2
# Set the version of Python and other tools you might need
build:
os: ubuntu-22.04
tools:
python: "3.11"
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/conf.py
# We recommend specifying your dependencies to enable reproducible builds:
# https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html
python:
install:
- requirements: docs/requirements.txt

7
.travis.yml Normal file
View File

@@ -0,0 +1,7 @@
language: python
python:
- "3.7"
- "3.8-dev"
install:
- "pip install ."
script: testall.py

6
.woodpecker/lint.yml Normal file
View File

@@ -0,0 +1,6 @@
steps:
mypy:
image: python:3
commands:
- pip3 install mypy types-setuptools
- mypy slixmpp

View File

@@ -0,0 +1,10 @@
steps:
test_integration:
image: "python:3.11"
secrets: [ci_account1, ci_account1_password, ci_account2, ci_account2_password, ci_muc_server]
commands:
- apt-get update
- apt-get install -y python3-pip cython3 gpg idn libidn-dev
- pip3 install emoji aiohttp aiodns
- python3 setup.py build_ext --inplace
- ./run_integration_tests.py

17
.woodpecker/test.yml Normal file
View File

@@ -0,0 +1,17 @@
steps:
unit_tests:
image: "python:${TAG}"
commands:
- apt-get update
- apt-get install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
matrix:
TAG:
- "3.7"
- "3.9"
- "3.8"
- "3.10"
- "3.11"
- "3.12"

13
Cargo.toml Normal file
View File

@@ -0,0 +1,13 @@
[package]
name = "slixmpp"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
jid = "0.10"
pyo3 = "0.21"
[lib]
crate-type = ["cdylib"]

View File

@@ -1,50 +0,0 @@
"""
Cythonize the .pyx file for sdist, and compile it for wheels.
NB: produced wheels are not valid, but the sdist should be.
"""
import os
from Cython.Build import cythonize
from subprocess import check_output, CalledProcessError, DEVNULL, call
from tempfile import TemporaryFile
from setuptools.command.build_py import build_py
def check_include(library_name, header):
command = [os.environ.get("PKG_CONFIG", "pkg-config"), "--cflags", library_name]
try:
cflags = check_output(command).decode("utf-8").split()
except FileNotFoundError:
print("pkg-config not found.")
return False
except CalledProcessError:
# pkg-config already prints the missing libraries on stderr.
return False
command = [os.environ.get("CC", "cc")] + cflags + ["-E", "-"]
with TemporaryFile("w+") as c_file:
c_file.write("#include <%s>" % header)
c_file.seek(0)
try:
return call(command, stdin=c_file, stdout=DEVNULL, stderr=DEVNULL) == 0
except FileNotFoundError:
print("%s headers not found." % library_name)
return False
class Build(build_py):
def run(self):
self.run_command("build_ext")
return super().run()
def initialize_options(self):
super().initialize_options()
has_python_headers = check_include("python3", "Python.h")
has_stringprep_headers = check_include("libidn", "stringprep.h")
if has_python_headers and has_stringprep_headers:
self.distribution.ext_modules = cythonize("slixmpp/stringprep.pyx")
else:
print("Falling back to the slow stringprep module.")

View File

@@ -1064,5 +1064,12 @@
<file-release rdf:resource="https://codeberg.org/poezio/slixmpp/archive/slix-1.8.4.tar.gz"/>
</Version>
</release>
<release>
<Version>
<revision>1.8.5</revision>
<created>2024-02-02</created>
<file-release rdf:resource="https://codeberg.org/poezio/slixmpp/archive/slix-1.8.5.tar.gz"/>
</Version>
</release>
</Project>
</rdf:RDF>

95
docs/projects.rst Normal file
View File

@@ -0,0 +1,95 @@
Projects Using Slixmpp
======================
Applications
------------
sendxmpp-py
~~~~~~~~~~~
sendxmpp is a command line program and is the XMPP equivalent of sendmail. It is a Python version of the original sendxmpp which is written in Perl.
- `Source <https://github.com/moparisthebest/sendxmpp-py>`_
Bots
----
BotLogMauve
~~~~~~~~~~~
XMPP bot which logs groupchat messages. Logs are in text format, with one file per day and per groupchat.
- `Source <https://git.khaganat.net/khaganat/BotLogMauve>`_
LinkBot
~~~~~~~
This bot reveals the title of any shared link in a groupchat for quick content insight.
- `Source <https://git.xmpp-it.net/mario/XMPPBot>`_
llama-bot
~~~~~~~~~
Llama-bot enables engaging communication with the LLM (large language model) of llama.cpp, providing seamless and dynamic conversation with it.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://github.com/decent-im/llama-bot>`_
- `Demo <xmpp:llama@decent.im?message>`_
Morbot
~~~~~~
Morbot is a simple Slixmpp bot that will take new articles from listed RSS feeds and send them to assigned XMPP MUCs.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://codeberg.org/TheCoffeMaker/Morbot>`_
Slixfeed
~~~~~~~~
Slixfeed aims to be an easy to use and fully-featured news aggregator bot for XMPP. It provides a convenient access to Blogs, Fediverse and News websites along with filtering functionality.
- `Groupchat <xmpp:slixfeed@chat.woodpeckersnest.space?join>`_
- `Source <https://gitgud.io/sjehuda/slixfeed>`_
sms4you
~~~~~~~
sms4you forwards messages from and to SMS and connects either with sms4you-xmpp or sms4you-email to choose the other mean of communication. Nice for receiving or sending SMS, independently from carrying a SIM card.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Homepage <https://sms4you-team.pages.debian.net/sms4you/>`_
- `Source <https://salsa.debian.org/sms4you-team/sms4you>`_
Stable Diffusion
~~~~~~~~~~~~~~~~
XMPP bot that generates digital images from textual descriptions.
- `Groupchat <xmpp:slidge@conference.nicoco.fr?join>`_
- `Source <https://www.nicoco.fr/blog/2022/08/31/xmpp-bot-stable-diffusion/>`_
WhisperBot
~~~~~~~~~~
XMPP bot that transliterates audio messages using OpenAI's Whisper libraries.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://codeberg.org/TheCoffeMaker/WhisperBot>`_
XMPP MUC Message Gateway
~~~~~~~~~~~~~~~~~~~~~~~~
A multipurpose JSON forwarder microservice from HTTP POST to XMPP MUC room over TLSv1.2 with SliXMPP.
- `Source <https://github.com/immanuelfodor/xmpp-muc-message-gateway>`_
Services
--------
AtomToPubsub
~~~~~~~~~~~~
AtomToPubsub is a simple Python script that parses Atom + RSS feeds and pushes the entries to a designated XMPP Pubsub Node.
- `Groupchat <xmpp:movim@conference.movim.eu?join>`_
- `Source <https://github.com/imattau/atomtopubsub>`_
Slidge
~~~~~~
Slidge is a general purpose XMPP gateway framework in Python.
- `Groupchat <xmpp:slidge@conference.nicoco.fr?join>`_
- `Homepage <https://slidge.im/core/>`_
- `Source <https://sr.ht/~nicoco/slidge>`_

View File

@@ -10,7 +10,7 @@ UNIQUE = uuid4().hex
class TestMUC(SlixIntegration):
async def asyncSetUp(self):
self.mucserver = self.envjid('CI_MUC_SERVER')
self.mucserver = self.envjid('CI_MUC_SERVER', default='chat.jabberfr.org')
self.muc = JID('%s@%s' % (UNIQUE, self.mucserver))
self.add_client(
self.envjid('CI_ACCOUNT1'),

15
mypy.ini Normal file
View File

@@ -0,0 +1,15 @@
[mypy]
check_untyped_defs = False
ignore_missing_imports = True
[mypy-slixmpp.types]
ignore_errors = True
[mypy-slixmpp.thirdparty.*]
ignore_errors = True
[mypy-slixmpp.plugins.*]
ignore_errors = True
[mypy-slixmpp.plugins.base]
ignore_errors = False

View File

@@ -1,67 +0,0 @@
[project]
name = "slixmpp"
version = "1.8.4"
description = 'Slixmpp is an elegant Python library for XMPP (aka Jabber).'
requires-python = ">=3.7"
dependencies = [
"aiodns >= 1.0",
"pyasn1",
"pyasn1_modules",
"typing_extensions; python_version < '3.8.0'",
]
classifiers = [
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Topic :: Internet :: XMPP',
'Topic :: Software Development :: Libraries :: Python Modules',
]
readme = "README.rst"
license = { file = "LICENSE" }
[[project.authors]]
name = "Florent Le Coz"
email = "louiz@louiz.org"
[project.urls]
Repository = 'https://codeberg.org/poezio/slixmpp'
[project.optional-dependencies]
XEP-0363 = ['aiohttp']
XEP-0444 = ['emoji']
XEP-0454 = ['cryptography']
Safer-XML-parsing = ['defusedxml']
[build-system]
requires = ["setuptools", "cython"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
packages = ["slixmpp"]
py-modules = ["_custom_build"]
[tool.setuptools.cmdclass]
build_py = "_custom_build.Build"
[tool.mypy]
check_untyped_defs = false
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = 'slixmpp.types'
ignore_errors = true
[[tool.mypy.overrides]]
module = 'slixmpp.thirdparty.*'
ignore_errors = true
[[tool.mypy.overrides]]
module = 'slixmpp.plugins.*'
ignore_errors = true
[[tool.mypy.overrides]]
module = 'slixmpp.plugins.base'
ignore_errors = false

103
setup.py Executable file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (C) 2007-2011 Nathanael C. Fritz
# All Rights Reserved
#
# This software is licensed as described in the README.rst and LICENSE
# file, which you should have received as part of this distribution.
import runpy
import os
from pathlib import Path
from subprocess import call, DEVNULL, check_output, CalledProcessError
from tempfile import TemporaryFile
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
from run_tests import TestCommand
version_mod = runpy.run_path('slixmpp/version.py')
VERSION = version_mod['__version__']
DESCRIPTION = ('Slixmpp is an elegant Python library for XMPP (aka Jabber).')
with open('README.rst', encoding='utf8') as readme:
LONG_DESCRIPTION = readme.read()
CLASSIFIERS = [
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Topic :: Internet :: XMPP',
'Topic :: Software Development :: Libraries :: Python Modules',
]
packages = [str(mod.parent) for mod in Path('slixmpp').rglob('__init__.py')]
def check_include(library_name, header):
command = [os.environ.get('PKG_CONFIG', 'pkg-config'), '--cflags', library_name]
try:
cflags = check_output(command).decode('utf-8').split()
except FileNotFoundError:
print('pkg-config not found.')
return False
except CalledProcessError:
# pkg-config already prints the missing libraries on stderr.
return False
command = [os.environ.get('CC', 'cc')] + cflags + ['-E', '-']
with TemporaryFile('w+') as c_file:
c_file.write('#include <%s>' % header)
c_file.seek(0)
try:
return call(command, stdin=c_file, stdout=DEVNULL, stderr=DEVNULL) == 0
except FileNotFoundError:
print('%s headers not found.' % library_name)
return False
HAS_PYTHON_HEADERS = check_include('python3', 'Python.h')
HAS_STRINGPREP_HEADERS = check_include('libidn', 'stringprep.h')
ext_modules = None
if HAS_PYTHON_HEADERS and HAS_STRINGPREP_HEADERS:
try:
from Cython.Build import cythonize
except ImportError:
print('Cython not found, falling back to the slow stringprep module.')
else:
ext_modules = cythonize('slixmpp/stringprep.pyx')
else:
print('Falling back to the slow stringprep module.')
setup(
name="slixmpp",
version=VERSION,
description=DESCRIPTION,
long_description=LONG_DESCRIPTION,
author='Florent Le Coz',
author_email='louiz@louiz.org',
url='https://codeberg.org/poezio/slixmpp',
license='MIT',
platforms=['any'],
package_data={'slixmpp': ['py.typed']},
packages=packages,
ext_modules=ext_modules,
install_requires=[
'aiodns>=1.0',
'pyasn1',
'pyasn1_modules',
'typing_extensions; python_version < "3.8.0"',
],
extras_require={
'XEP-0363': ['aiohttp'],
'XEP-0444 compliance': ['emoji'],
'XEP-0454': ['cryptography'],
'Safer XML parsing': ['defusedxml'],
},
classifiers=CLASSIFIERS,
cmdclass={'test': TestCommand}
)

View File

@@ -138,8 +138,8 @@ class ClientXMPP(BaseXMPP):
self.credentials['password'] = value
def connect(self, address: Optional[Tuple[str, int]] = None, # type: ignore
use_ssl: bool = False, force_starttls: bool = True,
disable_starttls: bool = False) -> None:
use_ssl: Optional[bool] = None, force_starttls: Optional[bool] = None,
disable_starttls: Optional[bool] = None) -> None:
"""Connect to the XMPP server.
When no address is given, a SRV lookup for the server will
@@ -166,8 +166,8 @@ class ClientXMPP(BaseXMPP):
host, port = (self.boundjid.host, 5222)
self.dns_service = 'xmpp-client'
return XMLStream.connect(self, host, port, use_ssl=use_ssl,
force_starttls=force_starttls, disable_starttls=disable_starttls)
XMLStream.connect(self, host, port, use_ssl=use_ssl,
force_starttls=force_starttls, disable_starttls=disable_starttls)
def register_feature(self, name: str, handler: Callable, restart: bool = False, order: int = 5000) -> None:
"""Register a stream feature handler.

View File

@@ -9,6 +9,8 @@
import logging
import hashlib
from typing import Optional
from slixmpp import Message, Iq, Presence
from slixmpp.basexmpp import BaseXMPP
from slixmpp.stanza import Handshake
@@ -93,7 +95,9 @@ class ComponentXMPP(BaseXMPP):
for st in Message, Iq, Presence:
register_stanza_plugin(st, Error)
def connect(self, host=None, port=None, use_ssl=False):
def connect(self, host: Optional[str] = None, port: int = 0, use_ssl: Optional[bool] = None,
force_starttls: Optional[bool] = None,
disable_starttls: Optional[bool] = None) -> None:
"""Connect to the server.
@@ -103,17 +107,18 @@ class ComponentXMPP(BaseXMPP):
Defauts to :attr:`server_port`.
:param use_ssl: Flag indicating if SSL should be used by connecting
directly to a port using SSL.
:param force_starttls: UNUSED
:param disable_starttls: UNUSED
"""
if host is None:
host = self.server_host
if port is None:
port = self.server_port
if host is not None:
self.server_host = host
if port:
self.server_port = port
self.server_name = self.boundjid.host
log.debug("Connecting to %s:%s", host, port)
return XMLStream.connect(self, host=host, port=port,
use_ssl=use_ssl)
XMLStream.connect(self, host=self.server_host, port=self.server_port, use_ssl=use_ssl)
def incoming_filter(self, xml):
"""

View File

@@ -37,7 +37,8 @@ class FeatureMechanisms(BasePlugin):
'unencrypted_digest': False,
'unencrypted_cram': False,
'unencrypted_scram': True,
'order': 100
'order': 100,
'tls_version': None,
}
def plugin_init(self):
@@ -96,7 +97,20 @@ class FeatureMechanisms(BasePlugin):
result[value] = creds.get('email', jid)
elif value == 'channel_binding':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
result[value] = self.xmpp.socket.get_channel_binding()
version = self.xmpp.socket.version()
# As of now, python does not implement anything else
# than tls-unique, which is forbidden on TLSv1.3
# see https://github.com/python/cpython/issues/95341
if version != 'TLSv1.3':
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-unique"
)
elif 'tls-exporter' in ssl.CHANNEL_BINDING_TYPES:
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-exporter"
)
else:
result[value] = None
else:
result[value] = None
elif value == 'host':
@@ -121,6 +135,11 @@ class FeatureMechanisms(BasePlugin):
result[value] = True
else:
result[value] = False
elif value == 'tls_version':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
result[value] = self.xmpp.socket.version()
elif value == 'binding_proposed':
result[value] = any(x for x in self.mech_list if x.endswith('-PLUS'))
else:
result[value] = self.config.get(value, False)
return result

View File

@@ -1,445 +1 @@
# slixmpp.jid
# ~~~~~~~~~~~~~~~~~~~~~~~
# This module allows for working with Jabber IDs (JIDs).
# Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details
from __future__ import annotations
import re
import socket
from functools import lru_cache
from typing import (
Optional,
Union,
)
from slixmpp.stringprep import nodeprep, resourceprep, idna, StringprepError
HAVE_INET_PTON = hasattr(socket, 'inet_pton')
#: The basic regex pattern that a JID must match in order to determine
#: the local, domain, and resource parts. This regex does NOT do any
#: validation, which requires application of nodeprep, resourceprep, etc.
JID_PATTERN = re.compile(
"^(?:([^\"&'/:<>@]{1,1023})@)?([^/@]{1,1023})(?:/(.{1,1023}))?$"
)
#: The set of escape sequences for the characters not allowed by nodeprep.
JID_ESCAPE_SEQUENCES = {'\\20', '\\22', '\\26', '\\27', '\\2f',
'\\3a', '\\3c', '\\3e', '\\40', '\\5c'}
#: The reverse mapping of escape sequences to their original forms.
JID_UNESCAPE_TRANSFORMATIONS = {'\\20': ' ',
'\\22': '"',
'\\26': '&',
'\\27': "'",
'\\2f': '/',
'\\3a': ':',
'\\3c': '<',
'\\3e': '>',
'\\40': '@',
'\\5c': '\\'}
# TODO: Find the best cache size for a standard usage.
@lru_cache(maxsize=1024)
def _parse_jid(data: str):
"""
Parse string data into the node, domain, and resource
components of a JID, if possible.
:param string data: A string that is potentially a JID.
:raises InvalidJID:
:returns: tuple of the validated local, domain, and resource strings
"""
match = JID_PATTERN.match(data)
if not match:
raise InvalidJID('JID could not be parsed')
(node, domain, resource) = match.groups()
node = _validate_node(node)
domain = _validate_domain(domain)
resource = _validate_resource(resource)
return node, domain, resource
def _validate_node(node: Optional[str]):
"""Validate the local, or username, portion of a JID.
:raises InvalidJID:
:returns: The local portion of a JID, as validated by nodeprep.
"""
if node is None:
return ''
try:
node = nodeprep(node)
except StringprepError:
raise InvalidJID('Nodeprep failed')
if not node:
raise InvalidJID('Localpart must not be 0 bytes')
if len(node) > 1023:
raise InvalidJID('Localpart must be less than 1024 bytes')
return node
def _validate_domain(domain: str):
"""Validate the domain portion of a JID.
IP literal addresses are left as-is, if valid. Domain names
are stripped of any trailing label separators (`.`), and are
checked with the nameprep profile of stringprep. If the given
domain is actually a punyencoded version of a domain name, it
is converted back into its original Unicode form. Domains must
also not start or end with a dash (`-`).
:raises InvalidJID:
:returns: The validated domain name
"""
ip_addr = False
# First, check if this is an IPv4 address
try:
socket.inet_aton(domain)
ip_addr = True
except socket.error:
pass
# Check if this is an IPv6 address
if not ip_addr and HAVE_INET_PTON and domain[0] == '[' and domain[-1] == ']':
try:
ip = domain[1:-1]
socket.inet_pton(socket.AF_INET6, ip)
ip_addr = True
except (socket.error, ValueError):
pass
if not ip_addr:
# This is a domain name, which must be checked further
if domain and domain[-1] == '.':
domain = domain[:-1]
try:
domain = idna(domain)
except StringprepError:
raise InvalidJID(f'idna validation failed: {domain}')
if ':' in domain:
raise InvalidJID(f'Domain containing a port: {domain}')
for label in domain.split('.'):
if not label:
raise InvalidJID(f'Domain containing too many dots: {domain}')
if '-' in (label[0], label[-1]):
raise InvalidJID(f'Domain starting or ending with -: {domain}')
if not domain:
raise InvalidJID('Domain must not be 0 bytes')
if len(domain) > 1023:
raise InvalidJID('Domain must be less than 1024 bytes')
return domain
def _validate_resource(resource: Optional[str]):
"""Validate the resource portion of a JID.
:raises InvalidJID:
:returns: The local portion of a JID, as validated by resourceprep.
"""
if resource is None:
return ''
try:
resource = resourceprep(resource)
except StringprepError:
raise InvalidJID('Resourceprep failed')
if not resource:
raise InvalidJID('Resource must not be 0 bytes')
if len(resource) > 1023:
raise InvalidJID('Resource must be less than 1024 bytes')
return resource
def _unescape_node(node: str):
"""Unescape a local portion of a JID.
.. note::
The unescaped local portion is meant ONLY for presentation,
and should not be used for other purposes.
"""
unescaped = []
seq = ''
for i, char in enumerate(node):
if char == '\\':
seq = node[i:i+3]
if seq not in JID_ESCAPE_SEQUENCES:
seq = ''
if seq:
if len(seq) == 3:
unescaped.append(JID_UNESCAPE_TRANSFORMATIONS.get(seq, char))
# Pop character off the escape sequence, and ignore it
seq = seq[1:]
else:
unescaped.append(char)
return ''.join(unescaped)
def _format_jid(
local: Optional[str] = None,
domain: Optional[str] = None,
resource: Optional[str] = None,
):
"""Format the given JID components into a full or bare JID.
:param string local: Optional. The local portion of the JID.
:param string domain: Required. The domain name portion of the JID.
:param strin resource: Optional. The resource portion of the JID.
:return: A full or bare JID string.
"""
if domain is None:
return ''
if local is not None:
result = local + '@' + domain
else:
result = domain
if resource is not None:
result += '/' + resource
return result
class InvalidJID(ValueError):
"""
Raised when attempting to create a JID that does not pass validation.
It can also be raised if modifying an existing JID in such a way as
to make it invalid, such trying to remove the domain from an existing
full JID while the local and resource portions still exist.
"""
# pylint: disable=R0903
class UnescapedJID:
"""
.. versionadded:: 1.1.10
"""
__slots__ = ('_node', '_domain', '_resource')
def __init__(
self,
node: Optional[str],
domain: Optional[str],
resource: Optional[str],
):
self._node = node
self._domain = domain
self._resource = resource
def __getattribute__(self, name: str):
"""Retrieve the given JID component.
:param name: one of: user, server, domain, resource,
full, or bare.
"""
if name == 'resource':
return self._resource or ''
if name in ('user', 'username', 'local', 'node'):
return self._node or ''
if name in ('server', 'domain', 'host'):
return self._domain or ''
if name in ('full', 'jid'):
return _format_jid(self._node, self._domain, self._resource)
if name == 'bare':
return _format_jid(self._node, self._domain)
return object.__getattribute__(self, name)
def __str__(self):
"""Use the full JID as the string value."""
return _format_jid(self._node, self._domain, self._resource)
def __repr__(self):
"""Use the full JID as the representation."""
return _format_jid(self._node, self._domain, self._resource)
class JID:
"""
A representation of a Jabber ID, or JID.
Each JID may have three components: a user, a domain, and an optional
resource. For example: user@domain/resource
When a resource is not used, the JID is called a bare JID.
The JID is a full JID otherwise.
**JID Properties:**
:full: The string value of the full JID.
:jid: Alias for ``full``.
:bare: The string value of the bare JID.
:node: The node portion of the JID.
:user: Alias for ``node``.
:local: Alias for ``node``.
:username: Alias for ``node``.
:domain: The domain name portion of the JID.
:server: Alias for ``domain``.
:host: Alias for ``domain``.
:resource: The resource portion of the JID.
:param string jid:
A string of the form ``'[user@]domain[/resource]'``.
:param bool bare:
If present, discard the provided resource.
:raises InvalidJID:
"""
__slots__ = ('_node', '_domain', '_resource', '_bare', '_full')
def __init__(self, jid: Optional[Union[str, 'JID']] = None, bare: bool = False):
if not jid:
self._node = ''
self._domain = ''
self._resource = ''
self._bare = ''
self._full = ''
return
elif not isinstance(jid, JID):
node, domain, resource = _parse_jid(jid)
self._node = node
self._domain = domain
self._resource = resource if not bare else ''
else:
self._node = jid._node
self._domain = jid._domain
self._resource = jid._resource if not bare else ''
self._update_bare_full()
def unescape(self):
"""Return an unescaped JID object.
Using an unescaped JID is preferred for displaying JIDs
to humans, and they should NOT be used for any other
purposes than for presentation.
:return: :class:`UnescapedJID`
.. versionadded:: 1.1.10
"""
return UnescapedJID(_unescape_node(self._node),
self._domain,
self._resource)
def _update_bare_full(self):
"""Format the given JID into a bare and a full JID.
"""
self._bare = (self._node + '@' + self._domain
if self._node
else self._domain)
self._full = (self._bare + '/' + self._resource
if self._resource
else self._bare)
@property
def bare(self) -> str:
return self._bare
@bare.setter
def bare(self, value: str):
node, domain, resource = _parse_jid(value)
assert not resource
self._node = node
self._domain = domain
self._update_bare_full()
@property
def node(self) -> str:
return self._node
@node.setter
def node(self, value: Optional[str]):
self._node = _validate_node(value)
self._update_bare_full()
@property
def domain(self) -> str:
return self._domain
@domain.setter
def domain(self, value: str):
self._domain = _validate_domain(value)
self._update_bare_full()
@property
def resource(self) -> str:
return self._resource
@resource.setter
def resource(self, value: Optional[str]):
self._resource = _validate_resource(value)
self._update_bare_full()
@property
def full(self) -> str:
return self._full
@full.setter
def full(self, value: str):
self._node, self._domain, self._resource = _parse_jid(value)
self._update_bare_full()
user = node
local = node
username = node
server = domain
host = domain
jid = full
def __str__(self):
"""Use the full JID as the string value."""
return self._full
def __repr__(self):
"""Use the full JID as the representation."""
return self._full
# pylint: disable=W0212
def __eq__(self, other):
"""Two JIDs are equal if they have the same full JID value."""
if isinstance(other, UnescapedJID):
return False
if not isinstance(other, JID):
try:
other = JID(other)
except InvalidJID:
return NotImplemented
return (self._node == other._node and
self._domain == other._domain and
self._resource == other._resource)
def __ne__(self, other):
"""Two JIDs are considered unequal if they are not equal."""
return not self == other
def __hash__(self):
"""Hash a JID based on the string version of its full JID."""
return hash(self._full)
from libslixmpp import JID, InvalidJID

View File

@@ -76,6 +76,7 @@ PLUGINS = [
'xep_0256', # Last Activity in Presence
'xep_0257', # Client Certificate Management for SASL EXTERNAL
'xep_0258', # Security Labels in XMPP
'xep_0264', # Jingle Content Thumbnails
# 'xep_0270', # XMPP Compliance Suites 2010. Dont automatically load
'xep_0279', # Server IP Check
'xep_0280', # Message Carbons
@@ -85,6 +86,7 @@ PLUGINS = [
# 'xep_0302', # XMPP Compliance Suites 2012. Dont automatically load
'xep_0308', # Last Message Correction
'xep_0313', # Message Archive Management
'xep_0317', # Hats
'xep_0319', # Last User Interaction in Presence
# 'xep_0323', # IoT Systems Sensor Data. Dont automatically load
# 'xep_0325', # IoT Systems Control. Dont automatically load
@@ -118,6 +120,7 @@ PLUGINS = [
'xep_0444', # Message Reactions
'xep_0447', # Stateless file sharing
'xep_0461', # Message Replies
'xep_0469', # Bookmarks Pinning
# Meant to be imported by plugins
]

View File

@@ -7,7 +7,8 @@ import logging
import hashlib
import base64
from asyncio import Future
from asyncio import Future, Lock
from collections import defaultdict
from typing import Optional
from slixmpp import __version__
@@ -94,6 +95,9 @@ class XEP_0115(BasePlugin):
disco.assign_verstring = self.assign_verstring
disco.get_verstring = self.get_verstring
# prevent concurrent fetches for the same hash
self._locks = defaultdict(Lock)
def plugin_end(self):
self.xmpp['xep_0030'].del_feature(feature=stanza.Capabilities.namespace)
self.xmpp.del_filter('out', self._filter_add_caps)
@@ -137,7 +141,7 @@ class XEP_0115(BasePlugin):
self.xmpp.event('entity_caps', p)
async def _process_caps(self, pres):
async def _process_caps(self, pres: Presence):
if not pres['caps']['hash']:
log.debug("Received unsupported legacy caps: %s, %s, %s",
pres['caps']['node'],
@@ -147,7 +151,11 @@ class XEP_0115(BasePlugin):
return
ver = pres['caps']['ver']
async with self._locks[ver]:
await self._process_caps_wrapped(pres, ver)
self._locks.pop(ver, None)
async def _process_caps_wrapped(self, pres: Presence, ver: str):
existing_verstring = await self.get_verstring(pres['from'].full)
if str(existing_verstring) == str(ver):
return

View File

@@ -137,7 +137,14 @@ class XEP_0199(BasePlugin):
async def _keepalive(self, event=None):
log.debug("Keepalive ping...")
try:
rtt = await self.ping(self.xmpp.boundjid.host, timeout=self.timeout)
ifrom = None
if self.xmpp.is_component:
ifrom = self.xmpp.boundjid
rtt = await self.ping(
self.xmpp.boundjid.host,
timeout=self.timeout,
ifrom=ifrom
)
except IqTimeout:
log.debug("Did not receive ping back in time. " + \
"Requesting Reconnect.")

View File

@@ -15,6 +15,32 @@ log = logging.getLogger(__name__)
class XEP_0221(BasePlugin):
"""
XEP-0221: Data Forms Media Element
In certain implementations of Data Forms (XEP-0004), it can be
helpful to include media data such as small images. One example is
CAPTCHA Forms (XEP-0158). This plugin implements a method for
including media data in a data form.
Typical use pattern:
.. code-block:: python
self.register_plugin('xep_0221')
self['xep_0050'].add_command(node="showimage",
name="Show my image",
handler=self.form_handler)
def form_handler(self,iq,session):
image_url="https://xmpp.org/images/logos/xmpp-logo.svg"
form=self['xep_0004'].make_form('result','My Image')
form.addField(var='myimage', ftype='text-single', label='My Image', value=image_url)
form.field['myimage']['media'].add_uri(value=image_url, itype="image/svg")
session['payload']=form
return session
"""
name = 'xep_0221'
description = 'XEP-0221: Data Forms Media Element'

View File

@@ -0,0 +1,5 @@
from slixmpp.plugins.base import register_plugin
from .thumbnail import XEP_0264
register_plugin(XEP_0264)

View File

@@ -0,0 +1,36 @@
from typing import Optional
from slixmpp import register_stanza_plugin
from slixmpp.plugins.xep_0234.stanza import File
from slixmpp.xmlstream import ElementBase
NS = "urn:xmpp:thumbs:1"
class Thumbnail(ElementBase):
name = plugin_attrib = "thumbnail"
namespace = NS
interfaces = {"uri", "media-type", "width", "height"}
def get_width(self) -> int:
return _int_or_none(self._get_attr("width"))
def get_height(self) -> int:
return _int_or_none(self._get_attr("height"))
def set_width(self, v: int) -> None:
self._set_attr("width", str(v))
def set_height(self, v: int) -> None:
self._set_attr("height", str(v))
def _int_or_none(v) -> Optional[int]:
try:
return int(v)
except ValueError:
return None
def register_plugin():
register_stanza_plugin(File, Thumbnail)

View File

@@ -0,0 +1,24 @@
import logging
from slixmpp.plugins import BasePlugin
from . import stanza
log = logging.getLogger(__name__)
class XEP_0264(BasePlugin):
"""
XEP-0264: Jingle Content Thumbnails
Can also be used with 0385 (Stateless inline media sharing)
"""
name = "xep_0264"
description = "XEP-0264: Jingle Content Thumbnails"
dependencies = {"xep_0234"}
stanza = stanza
def plugin_init(self):
stanza.register_plugin()

View File

@@ -52,9 +52,10 @@ class MAM(ElementBase):
#: fetch, not relevant for the stanza itself.
interfaces = {
'queryid', 'start', 'end', 'with', 'results',
'before_id', 'after_id', 'ids',
'before_id', 'after_id', 'ids', 'flip_page',
}
sub_interfaces = {'start', 'end', 'with', 'before_id', 'after_id', 'ids'}
sub_interfaces = {'start', 'end', 'with', 'before_id', 'after_id', 'ids',
'flip_page'}
def setup(self, xml=None):
ElementBase.setup(self, xml)
@@ -81,7 +82,7 @@ class MAM(ElementBase):
def get_start(self) -> Optional[datetime]:
fields = self.get_fields()
field = fields.get('start')
if field:
if field and field["value"]:
return xep_0082.parse(field['value'])
return None
@@ -94,7 +95,7 @@ class MAM(ElementBase):
def get_end(self) -> Optional[datetime]:
fields = self.get_fields()
field = fields.get('end')
if field:
if field and field["value"]:
return xep_0082.parse(field['value'])
return None
@@ -168,6 +169,8 @@ class MAM(ElementBase):
def del_results(self):
self._results = []
def get_flip_page(self):
return self.xml.find(f'{{{self.namespace}}}flip-page') is not None
class Fin(ElementBase):
"""A MAM fin element (end of query).

View File

@@ -0,0 +1,11 @@
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from slixmpp.plugins import register_plugin
from slixmpp.plugins.xep_0317 import stanza
from slixmpp.plugins.xep_0317.hats import XEP_0317
from slixmpp.plugins.xep_0317.stanza import Hat, Hats
register_plugin(XEP_0317)
__all__ = ['stanza', 'XEP_317']

View File

@@ -0,0 +1,16 @@
from slixmpp.plugins import BasePlugin
from . import stanza
class XEP_0317(BasePlugin):
"""
XEP-0317: Hats
"""
name = 'xep_0317'
description = 'XEP-0317: Hats'
dependencies = {'xep_0030', 'xep_0045', 'xep_0050'}
stanza = stanza
namespace = stanza.NS
def plugin_init(self):
stanza.register_plugin()

View File

@@ -0,0 +1,58 @@
from slixmpp import Presence
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
from typing import List, Tuple
NS = 'urn:xmpp:hats:0'
class Hats(ElementBase):
"""
Hats element, container for multiple hats:
.. code-block::xml
<hats xmlns='urn:xmpp:hats:0'>
<hat title='Host' uri='http://schemas.example.com/hats#host' xml:lang='en-us'>
<badge xmlns="urn:example:badges" fgcolor="#000000" bgcolor="#58C5BA"/>
</hat>
<hat title='Presenter' uri='http://schemas.example.com/hats#presenter' xml:lang='en-us'>
<badge xmlns="urn:example:badges" fgcolor="#000000" bgcolor="#EC0524"/>
</hat>
</hats>
"""
name = 'hats'
namespace = NS
plugin_attrib = 'hats'
def add_hats(self, data: List[Tuple[str, str]]) -> None:
for uri, title in data:
hat = Hat()
hat["uri"] = uri
hat["title"] = title
self.append(hat)
class Hat(ElementBase):
"""
Hat element, has a title and url, may contain arbitrary sub-elements.
.. code-block::xml
<hat title='Host' uri='http://schemas.example.com/hats#host' xml:lang='en-us'>
<badge xmlns="urn:example:badges" fgcolor="#000000" bgcolor="#58C5BA"/>
</hat>
"""
name = 'hat'
plugin_attrib = 'hat'
namespace = NS
interfaces = {'title', 'uri'}
plugin_multi_attrib = "hats"
def register_plugin() -> None:
register_stanza_plugin(Hats, Hat, iterable=True)
register_stanza_plugin(Presence, Hats)

View File

@@ -1,7 +1,7 @@
from slixmpp.plugins.base import register_plugin
from slixmpp.plugins.xep_0356 import stanza
from slixmpp.plugins.xep_0356.stanza import Perm, Privilege
from slixmpp.plugins.xep_0356.privilege import XEP_0356
from . import stanza
from .privilege import XEP_0356
from .stanza import Perm, Privilege
register_plugin(XEP_0356)

View File

@@ -0,0 +1,36 @@
import dataclasses
from collections import defaultdict
from enum import Enum
class RosterAccess(str, Enum):
NONE = "none"
GET = "get"
SET = "set"
BOTH = "both"
class MessagePermission(str, Enum):
NONE = "none"
OUTGOING = "outgoing"
class IqPermission(str, Enum):
NONE = "none"
GET = "get"
SET = "set"
BOTH = "both"
class PresencePermission(str, Enum):
NONE = "none"
MANAGED_ENTITY = "managed_entity"
ROSTER = "roster"
@dataclasses.dataclass
class Permissions:
roster = RosterAccess.NONE
message = MessagePermission.NONE
iq = defaultdict(lambda: IqPermission.NONE)
presence = PresencePermission.NONE

View File

@@ -1,14 +1,16 @@
import logging
import typing
import uuid
from collections import defaultdict
from slixmpp import Message, JID, Iq
from slixmpp import JID, Iq, Message
from slixmpp.plugins.base import BasePlugin
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import StanzaBase
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0356 import stanza, Privilege, Perm
from slixmpp.xmlstream.matcher import StanzaPath
from . import stanza
from .permissions import IqPermission, MessagePermission, Permissions, RosterAccess
log = logging.getLogger(__name__)
@@ -29,7 +31,7 @@ class XEP_0356(BasePlugin):
dependencies = {"xep_0297"}
stanza = stanza
granted_privileges = {"roster": "none", "message": "none", "presence": "none"}
granted_privileges = defaultdict(Permissions)
def plugin_init(self):
if not self.xmpp.is_component:
@@ -49,32 +51,42 @@ class XEP_0356(BasePlugin):
def plugin_end(self):
self.xmpp.remove_handler("Privileges")
def _handle_privilege(self, msg: Message):
def _handle_privilege(self, msg: StanzaBase):
"""
Called when the XMPP server advertise the component's privileges.
Stores the privileges in this instance's granted_privileges attribute (a dict)
and raises the privileges_advertised event
"""
permissions = self.granted_privileges[msg.get_from()]
for perm in msg["privilege"]["perms"]:
self.granted_privileges[perm["access"]] = perm["type"]
access = perm["access"]
if access == "iq":
for ns in perm["namespaces"]:
permissions.iq[ns["ns"]] = ns["type"]
elif access in _VALID_ACCESSES:
setattr(permissions, access, perm["type"])
else:
log.warning("Received an invalid privileged access: %s", access)
log.debug(f"Privileges: {self.granted_privileges}")
self.xmpp.event("privileges_advertised")
def send_privileged_message(self, msg: Message):
if self.granted_privileges["message"] == "outgoing":
self._make_privileged_message(msg).send()
else:
log.error(
if (
self.granted_privileges[msg.get_from().domain].message
!= MessagePermission.OUTGOING
):
raise PermissionError(
"The server hasn't authorized us to send messages on behalf of other users"
)
else:
self._make_privileged_message(msg).send()
def _make_privileged_message(self, msg: Message):
stanza = self.xmpp.make_message(
mto=self.xmpp.server_host, mfrom=self.xmpp.boundjid.bare
)
stanza["privilege"]["forwarded"].append(msg)
return stanza
server = msg.get_from().domain
wrapped = self.xmpp.make_message(mto=server, mfrom=self.xmpp.boundjid.bare)
wrapped["privilege"]["forwarded"].append(msg)
return wrapped
def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
return self.xmpp.make_iq_get(
@@ -106,9 +118,15 @@ class XEP_0356(BasePlugin):
:param jid: user we want to fetch the roster from
"""
if self.granted_privileges["roster"] not in ("get", "both"):
log.error("The server did not grant us privileges to get rosters")
raise ValueError
if isinstance(jid, str):
jid = JID(jid)
if self.granted_privileges[jid.domain].roster not in (
RosterAccess.GET,
RosterAccess.BOTH,
):
raise PermissionError(
"The server did not grant us privileges to get rosters"
)
else:
return await self._make_get_roster(jid).send(**send_kwargs)
@@ -137,8 +155,56 @@ class XEP_0356(BasePlugin):
},
}
"""
if self.granted_privileges["roster"] not in ("set", "both"):
log.error("The server did not grant us privileges to set rosters")
raise ValueError
if isinstance(jid, str):
jid = JID(jid)
if self.granted_privileges[jid.domain].roster not in (
RosterAccess.GET,
RosterAccess.BOTH,
):
raise PermissionError(
"The server did not grant us privileges to set rosters"
)
else:
return await self._make_set_roster(jid, roster_items).send(**send_kwargs)
async def send_privileged_iq(
self, encapsulated_iq: Iq, iq_id: typing.Optional[str] = None
):
"""
Send an IQ on behalf of a user
Caution: the IQ *must* have the jabber:client namespace
"""
iq_id = iq_id or str(uuid.uuid4())
encapsulated_iq["id"] = iq_id
server = encapsulated_iq.get_to().domain
perms = self.granted_privileges.get(server)
if not perms:
raise PermissionError(f"{server} has not granted us any privilege")
itype = encapsulated_iq["type"]
for ns in encapsulated_iq.plugins.values():
type_ = perms.iq[ns.namespace]
if type_ == IqPermission.NONE:
raise PermissionError(
f"{server} has not granted any IQ privilege for namespace {ns.namespace}"
)
elif type_ == IqPermission.BOTH:
pass
elif type_ != itype:
raise PermissionError(
f"{server} has not granted IQ {itype} privilege for namespace {ns.namespace}"
)
iq = self.xmpp.make_iq(
itype=itype,
ifrom=self.xmpp.boundjid.bare,
ito=encapsulated_iq.get_from(),
id=iq_id,
)
iq["privileged_iq"].append(encapsulated_iq)
resp = await iq.send()
return resp["privilege"]["forwarded"]["iq"]
# does not include iq access that is handled differently
_VALID_ACCESSES = {"message", "roster", "presence"}

View File

@@ -1,13 +1,12 @@
from slixmpp.stanza import Message
from slixmpp.xmlstream import (
ElementBase,
register_stanza_plugin,
)
from slixmpp.plugins.xep_0297 import Forwarded
from slixmpp.stanza import Iq, Message
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
NS = "urn:xmpp:privilege:2"
class Privilege(ElementBase):
namespace = "urn:xmpp:privilege:2"
namespace = NS
name = "privilege"
plugin_attrib = "privilege"
@@ -25,26 +24,40 @@ class Privilege(ElementBase):
def presence(self):
return self.permission("presence")
def iq(self):
return self.permission("iq")
def add_perm(self, access, type):
def add_perm(self, access, type_):
# This should only be needed for servers, so maybe out of scope for slixmpp
perm = Perm()
perm["type"] = type
perm["type"] = type_
perm["access"] = access
self.append(perm)
class Perm(ElementBase):
namespace = "urn:xmpp:privilege:2"
namespace = NS
name = "perm"
plugin_attrib = "perm"
plugin_multi_attrib = "perms"
interfaces = {"type", "access"}
class NameSpace(ElementBase):
namespace = NS
name = "namespace"
plugin_attrib = "namespace"
plugin_multi_attrib = "namespaces"
interfaces = {"ns", "type"}
class PrivilegedIq(ElementBase):
namespace = NS
name = "privileged_iq"
plugin_attrib = "privileged_iq"
def register():
register_stanza_plugin(Message, Privilege)
register_stanza_plugin(Iq, Privilege)
register_stanza_plugin(Privilege, Forwarded)
register_stanza_plugin(Privilege, Perm, iterable=True)
register_stanza_plugin(Perm, NameSpace, iterable=True)
register_stanza_plugin(Iq, PrivilegedIq)

View File

@@ -1,8 +1,13 @@
# Slixmpp: The Slick XMPP Library
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
# This file is part of Slixmpp.
# See the file LICENSE for copying permissio
from abc import ABC
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from slixmpp.stanza import Message
from slixmpp.xmlstream import (
ElementBase,
@@ -10,14 +15,83 @@ from slixmpp.xmlstream import (
)
NS = 'urn:xmpp:fallback:0'
NS = "urn:xmpp:fallback:0"
class Fallback(ElementBase):
namespace = NS
name = 'fallback'
plugin_attrib = 'fallback'
name = "fallback"
plugin_attrib = "fallback"
plugin_multi_attrib = "fallbacks"
interfaces = {"for"}
def _find_fallback(self, fallback_for: str) -> "Fallback":
if self["for"] == fallback_for:
return self
for fallback in self.parent()["fallbacks"]:
if fallback["for"] == fallback_for:
return fallback
raise AttributeError("No fallback for this namespace", fallback_for)
def get_stripped_body(
self, fallback_for: str, element: Literal["body", "subject"] = "body"
) -> str:
"""
Get the body of a message, with the fallback part stripped
:param fallback_for: namespace of the fallback to strip
:param element: set this to "subject" get the stripped subject instead
of body
:return: body (or subject) content minus the fallback part
"""
fallback = self._find_fallback(fallback_for)
start = fallback[element]["start"]
end = fallback[element]["end"]
body = self.parent()[element]
if start == end == 0:
return ""
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
class FallbackMixin(ABC):
namespace = NS
name = NotImplemented
plugin_attrib = NotImplemented
interfaces = {"start", "end"}
def set_start(self, v: int):
self._set_attr("start", str(v))
def get_start(self):
return _int_or_zero(self._get_attr("start"))
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
return _int_or_zero(self._get_attr("end"))
class FallbackBody(FallbackMixin, ElementBase):
name = plugin_attrib = "body"
class FallbackSubject(FallbackMixin, ElementBase):
name = plugin_attrib = "subject"
def _int_or_zero(v: str):
try:
return int(v)
except ValueError:
return 0
def register_plugins():
register_stanza_plugin(Message, Fallback)
register_stanza_plugin(Message, Fallback, iterable=True)
register_stanza_plugin(Fallback, FallbackBody)
register_stanza_plugin(Fallback, FallbackSubject)

View File

@@ -13,7 +13,7 @@ class XEP_0461(BasePlugin):
name = "xep_0461"
description = "XEP-0461: Message Replies"
dependencies = {"xep_0030"}
dependencies = {"xep_0030", "xep_0428"}
stanza = stanza
namespace = stanza.NS

View File

@@ -2,6 +2,7 @@ from typing import Optional
from slixmpp.stanza import Message
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
from slixmpp.plugins.xep_0428.stanza import Fallback
NS = "urn:xmpp:reply:0"
@@ -12,39 +13,11 @@ class Reply(ElementBase):
plugin_attrib = "reply"
interfaces = {"id", "to"}
class FeatureFallBack(ElementBase):
# should also be a multi attrib
namespace = "urn:xmpp:fallback:0"
name = "fallback"
plugin_attrib = "feature_fallback"
interfaces = {"for"}
def get_fallback_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end:
return body[start:end]
else:
return ""
def get_stripped_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None):
"""
Add plain text fallback for clients not implementing XEP-0461.
``msg["feature_fallback"].add_quoted_fallback("Some text", "Bob")`` will
``msg["reply"].add_quoted_fallback("Some text", "Bob")`` will
prepend "> Bob:\n> Some text\n" to the body of the message, and set the
fallback_body attributes accordingly, so that clients implementing
XEP-0461 can hide the fallback text.
@@ -57,39 +30,27 @@ class FeatureFallBack(ElementBase):
if nickname:
quoted = "> " + nickname + ":\n" + quoted
msg["body"] = quoted + msg["body"]
msg["feature_fallback"]["for"] = NS
msg["feature_fallback"]["fallback_body"]["start"] = 0
msg["feature_fallback"]["fallback_body"]["end"] = len(quoted)
fallback = Fallback()
fallback["for"] = NS
fallback["body"]["start"] = 0
fallback["body"]["end"] = len(quoted)
msg.append(fallback)
class FallBackBody(ElementBase):
# According to https://xmpp.org/extensions/inbox/compatibility-fallback.html
# this should be a multi_attrib *but* since it's a protoXEP, we'll see...
namespace = FeatureFallBack.namespace
name = "body"
plugin_attrib = "fallback_body"
interfaces = {"start", "end"}
def set_start(self, v: int):
self._set_attr("start", str(v))
def get_start(self):
try:
return int(self._get_attr("start"))
except ValueError:
return 0
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
try:
return int(self._get_attr("end"))
except ValueError:
return 0
def get_fallback_body(self) -> str:
msg = self.parent()
for fallback in msg["fallbacks"]:
if fallback["for"] == NS:
break
else:
return ""
start = fallback["body"]["start"]
end = fallback["body"]["end"]
body = msg["body"]
if start <= end:
return body[start:end]
else:
return ""
def register_plugins():
register_stanza_plugin(Message, Reply)
register_stanza_plugin(Message, FeatureFallBack)
register_stanza_plugin(FeatureFallBack, FallBackBody)

View File

@@ -0,0 +1,8 @@
from slixmpp.plugins.base import register_plugin
from . import stanza
from .pinning import XEP_0469
register_plugin(XEP_0469)
__all__ = ['stanza', 'XEP_0469']

View File

@@ -0,0 +1,17 @@
from slixmpp.plugins import BasePlugin
from . import stanza
class XEP_0469(BasePlugin):
"""
XEP-0469: Bookmark Pinning
"""
name = "xep_0469"
description = "XEP-0469: Bookmark Pinning"
dependencies = {"xep_0402"}
stanza = stanza
def plugin_init(self):
stanza.register_plugin()

View File

@@ -0,0 +1,31 @@
from slixmpp import register_stanza_plugin
from slixmpp.plugins.xep_0402.stanza import Extensions
from slixmpp.xmlstream import ElementBase
NS = "urn:xmpp:bookmarks-pinning:0"
class Pinned(ElementBase):
"""
Pinned bookmark element
To enable it on a Conference element, use enable() like this:
.. code-block::python
# C being a Conference element
C['extensions'].enable('pinned')
Which will add the <pinned> element to the <extensions> element.
"""
namespace = NS
name = "pinned"
plugin_attrib = "pinned"
interfaces = {"pinned"}
bool_interfaces = {"pinned"}
is_extension = True
def register_plugin():
register_stanza_plugin(Extensions, Pinned)

View File

@@ -69,12 +69,14 @@ from slixmpp.plugins.xep_0249 import XEP_0249
from slixmpp.plugins.xep_0256 import XEP_0256
from slixmpp.plugins.xep_0257 import XEP_0257
from slixmpp.plugins.xep_0258 import XEP_0258
from slixmpp.plugins.xep_0264 import XEP_0264
from slixmpp.plugins.xep_0279 import XEP_0279
from slixmpp.plugins.xep_0280 import XEP_0280
from slixmpp.plugins.xep_0297 import XEP_0297
from slixmpp.plugins.xep_0300 import XEP_0300
from slixmpp.plugins.xep_0308 import XEP_0308
from slixmpp.plugins.xep_0313 import XEP_0313
from slixmpp.plugins.xep_0317 import XEP_0317
from slixmpp.plugins.xep_0319 import XEP_0319
from slixmpp.plugins.xep_0332 import XEP_0332
from slixmpp.plugins.xep_0333 import XEP_0333
@@ -100,6 +102,7 @@ from slixmpp.plugins.xep_0428 import XEP_0428
from slixmpp.plugins.xep_0437 import XEP_0437
from slixmpp.plugins.xep_0439 import XEP_0439
from slixmpp.plugins.xep_0444 import XEP_0444
from slixmpp.plugins.xep_0461 import XEP_0461
class PluginsDict(TypedDict):
@@ -162,12 +165,14 @@ class PluginsDict(TypedDict):
xep_0256: XEP_0256
xep_0257: XEP_0257
xep_0258: XEP_0258
xep_0264: XEP_0264
xep_0279: XEP_0279
xep_0280: XEP_0280
xep_0297: XEP_0297
xep_0300: XEP_0300
xep_0308: XEP_0308
xep_0313: XEP_0313
xep_0317: XEP_0317
xep_0319: XEP_0319
xep_0332: XEP_0332
xep_0333: XEP_0333
@@ -193,3 +198,4 @@ class PluginsDict(TypedDict):
xep_0437: XEP_0437
xep_0439: XEP_0439
xep_0444: XEP_0444
xep_0461: XEP_0461

View File

@@ -29,9 +29,9 @@ class SlixIntegration(IsolatedAsyncioTestCase):
self.clients = []
self.addAsyncCleanup(self._destroy)
def envjid(self, name):
def envjid(self, name: str, *, default: Optional[str] = None) -> JID:
"""Get a JID from an env var"""
value = os.getenv(name)
value = os.getenv(name, default=default)
return JID(value)
def envstr(self, name):

View File

@@ -3,6 +3,7 @@
# Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import atexit
import unittest
from queue import Queue
from xml.parsers.expat import ExpatError
@@ -750,3 +751,12 @@ class SlixTest(unittest.TestCase):
Error.namespace = 'jabber:client'
for st in Message, Iq, Presence:
register_stanza_plugin(st, Error)
@atexit.register
def cleanup():
try:
loop = asyncio.get_event_loop()
loop.close()
except:
pass

View File

@@ -181,7 +181,7 @@ class SCRAM(Mech):
channel_binding = True
required_credentials = {'username', 'password'}
optional_credentials = {'authzid', 'channel_binding'}
security = {'encrypted', 'unencrypted_scram'}
security = {'tls_version', 'encrypted', 'unencrypted_scram', 'binding_proposed'}
def setup(self, name):
self.use_channel_binding = False
@@ -244,11 +244,15 @@ class SCRAM(Mech):
self.cnonce = bytes(('%s' % random.random())[2:])
gs2_cbind_flag = b'n'
if self.credentials['channel_binding']:
if self.use_channel_binding:
gs2_cbind_flag = b'p=tls-unique'
else:
gs2_cbind_flag = b'y'
if self.security_settings['binding_proposed']:
if self.credentials['channel_binding'] and \
self.use_channel_binding:
if self.security_settings['tls_version'] != 'TLSv1.3':
gs2_cbind_flag = b'p=tls-unique'
else:
gs2_cbind_flag = b'p=tls-exporter'
else:
gs2_cbind_flag = b'y'
authzid = b''
if self.credentials['authzid']:
@@ -280,7 +284,7 @@ class SCRAM(Mech):
raise SASLCancelled('Invalid nonce')
cbind_data = b''
if self.use_channel_binding:
if self.use_channel_binding and self.credentials['channel_binding']:
cbind_data = self.credentials['channel_binding']
cbind_input = self.gs2_header + cbind_data
channel_binding = b'c=' + b64encode(cbind_input).replace(b'\n', b'')

View File

@@ -5,5 +5,5 @@
# We don't want to have to import the entire library
# just to get the version info for setup.py
__version__ = '1.8.4'
__version_info__ = (1, 8, 4)
__version__ = '1.8.5'
__version_info__ = (1, 8, 5)

View File

@@ -10,5 +10,5 @@ from slixmpp.xmlstream.tostring import tostring, highlight
from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
__all__ = ['JID', 'StanzaBase', 'ElementBase',
'ET', 'StateMachine', 'tostring', 'highlight', 'XMLStream',
'RESPONSE_TIMEOUT']
'ET', 'tostring', 'highlight', 'XMLStream',
'RESPONSE_TIMEOUT', 'register_stanza_plugin']

View File

@@ -1243,7 +1243,7 @@ class ElementBase(object):
self.init_plugin(item.__class__.plugin_multi_attrib)
else:
self.iterables.append(item)
item.parent = weakref.ref(self)
return self
def appendxml(self, xml: ET.Element) -> ElementBase:

View File

@@ -290,8 +290,8 @@ class XMLStream(asyncio.BaseProtocol):
self.xml_depth = 0
self.xml_root = None
self.force_starttls = None
self.disable_starttls = None
self.force_starttls = True
self.disable_starttls = False
self.waiting_queue = asyncio.Queue()
@@ -405,8 +405,9 @@ class XMLStream(asyncio.BaseProtocol):
self.disconnected.set_result(True)
self.disconnected = asyncio.Future()
def connect(self, host: str = '', port: int = 0, use_ssl: Optional[bool] = False,
force_starttls: Optional[bool] = True, disable_starttls: Optional[bool] = False) -> None:
def connect(self, host: str = '', port: int = 0, use_ssl: Optional[bool] = None,
force_starttls: Optional[bool] = None,
disable_starttls: Optional[bool] = None) -> None:
"""Create a new socket and connect to the server.
:param host: The name of the desired server for the connection.
@@ -523,7 +524,7 @@ class XMLStream(asyncio.BaseProtocol):
else:
self.loop.run_until_complete(self.disconnected)
else:
tasks: List[Awaitable] = [asyncio.sleep(timeout)]
tasks: List[Union[asyncio.Task, asyncio.Future]] = [asyncio.Task(asyncio.sleep(timeout))]
if not forever:
tasks.append(self.disconnected)
self.loop.run_until_complete(asyncio.wait(tasks))
@@ -849,6 +850,8 @@ class XMLStream(asyncio.BaseProtocol):
log.debug("Connection error:", exc_info=True)
self.disconnect()
return False
if transp is None:
raise Exception("Transport should not be none")
der_cert = transp.get_extra_info("ssl_object").getpeercert(True)
pem_cert = ssl.DER_cert_to_PEM_cert(der_cert)
self.event('ssl_cert', pem_cert)

278
src/lib.rs Normal file
View File

@@ -0,0 +1,278 @@
use pyo3::exceptions::{PyNotImplementedError, PyValueError};
use pyo3::prelude::*;
pyo3::create_exception!(py_jid, InvalidJID, PyValueError, "Raised when attempting to create a JID that does not pass validation.\n\nIt can also be raised if modifying an existing JID in such a way as\nto make it invalid, such trying to remove the domain from an existing\nfull JID while the local and resource portions still exist.");
fn to_exc(err: jid::Error) -> PyErr {
InvalidJID::new_err(err.to_string())
}
/// A representation of a Jabber ID, or JID.
///
/// Each JID may have three components: a user, a domain, and an optional resource. For example:
/// user@domain/resource
///
/// When a resource is not used, the JID is called a bare JID. The JID is a full JID otherwise.
///
/// Raises InvalidJID if the parser rejects it.
#[pyclass(name = "JID", module = "slixmpp.jid")]
struct PyJid {
jid: Option<jid::Jid>,
}
#[pymethods]
impl PyJid {
#[new]
#[pyo3(signature = (jid=None, bare=false))]
fn new(jid: Option<&Bound<'_, PyAny>>, bare: bool) -> PyResult<Self> {
if let Some(jid) = jid {
if let Ok(py_jid) = jid.extract::<PyRef<PyJid>>() {
if bare {
if let Some(py_jid) = &(*py_jid).jid {
Ok(PyJid {
jid: Some(jid::Jid::Bare(py_jid.to_bare())),
})
} else {
Ok(PyJid { jid: None })
}
} else {
Ok(PyJid {
jid: (*py_jid).jid.clone(),
})
}
} else {
let jid: &str = jid.extract()?;
if jid.is_empty() {
Ok(PyJid { jid: None })
} else {
let mut jid = jid::Jid::new(jid).map_err(to_exc)?;
if bare {
jid = jid::Jid::Bare(jid.into_bare())
}
Ok(PyJid { jid: Some(jid) })
}
}
} else {
Ok(PyJid { jid: None })
}
}
/*
// TODO: implement or remove from the API
fn unescape() {
}
*/
#[getter]
fn get_bare(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.to_bare().to_string(),
}
}
#[setter]
fn set_bare(&mut self, bare: &str) -> PyResult<()> {
let bare = jid::BareJid::new(bare).map_err(to_exc)?;
self.jid = Some(match &self.jid {
Some(jid::Jid::Bare(_)) | None => jid::Jid::Bare(bare),
Some(jid::Jid::Full(jid)) => jid::Jid::Full(bare.with_resource(&jid.resource())),
});
Ok(())
}
#[getter]
fn get_full(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.to_string(),
}
}
#[setter]
fn set_full(&mut self, full: &str) -> PyResult<()> {
// JID.full = 'domain' is acceptable in slixmpp.
self.jid = Some(jid::Jid::new(full).map_err(to_exc)?);
Ok(())
}
#[getter]
fn get_node(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid
.node_str()
.map(ToString::to_string)
.unwrap_or_else(String::new),
}
}
#[setter]
fn set_node(&mut self, node: &str) -> PyResult<()> {
let node = jid::NodePart::new(node).map_err(to_exc)?;
self.jid = Some(match &self.jid {
Some(jid::Jid::Bare(jid)) => {
jid::Jid::Bare(jid::BareJid::from_parts(Some(&node), &jid.domain()))
}
Some(jid::Jid::Full(jid)) => jid::Jid::Full(jid::FullJid::from_parts(
Some(&node),
&jid.domain(),
&jid.resource(),
)),
None => Err(InvalidJID::new_err("JID.node must apply to a proper JID"))?,
});
Ok(())
}
#[getter]
fn get_domain(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.domain_str().to_string(),
}
}
#[setter]
fn set_domain(&mut self, domain: &str) -> PyResult<()> {
let domain = jid::DomainPart::new(domain).map_err(to_exc)?;
self.jid = Some(match &self.jid {
Some(jid::Jid::Bare(jid)) => {
jid::Jid::Bare(jid::BareJid::from_parts(jid.node().as_ref(), &domain))
}
Some(jid::Jid::Full(jid)) => jid::Jid::Full(jid::FullJid::from_parts(
jid.node().as_ref(),
&domain,
&jid.resource(),
)),
None => jid::Jid::Bare(jid::BareJid::from_parts(None, &domain)),
});
Ok(())
}
#[getter]
fn get_resource(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid
.resource_str()
.map(ToString::to_string)
.unwrap_or_else(String::new),
}
}
#[setter]
fn set_resource(&mut self, resource: &str) -> PyResult<()> {
let resource = jid::ResourcePart::new(resource).map_err(to_exc)?;
self.jid = Some(match &self.jid {
Some(jid::Jid::Bare(jid)) => jid::Jid::Full(jid.with_resource(&resource)),
Some(jid::Jid::Full(jid)) => jid::Jid::Full(jid::FullJid::from_parts(
jid.node().as_ref(),
&jid.domain(),
&resource,
)),
None => Err(InvalidJID::new_err(
"JID.resource must apply to a proper JID",
))?,
});
Ok(())
}
/// Use the full JID as the string value.
fn __str__(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.to_string(),
}
}
/// Use the full JID as the representation.
fn __repr__(&self) -> String {
match &self.jid {
None => String::new(),
Some(jid) => jid.to_string(),
}
}
/// Two JIDs are equal if they have the same full JID value.
fn __richcmp__(&self, other: &Bound<'_, PyAny>, op: pyo3::basic::CompareOp) -> PyResult<bool> {
let other = if let Ok(other) = other.extract::<PyRef<PyJid>>() {
other
} else if other.is_none() {
Bound::new(other.py(), PyJid::new(None, false)?)?.borrow()
} else {
Bound::new(other.py(), PyJid::new(Some(other), false)?)?.borrow()
};
match (&self.jid, &other.jid) {
(None, None) => Ok(true),
(Some(jid), Some(other)) => match op {
pyo3::basic::CompareOp::Eq => Ok(jid == other),
pyo3::basic::CompareOp::Ne => Ok(jid != other),
_ => Err(PyNotImplementedError::new_err(
"Only == and != are implemented",
)),
},
_ => Ok(false),
}
}
/// Hash a JID based on the string version of its full JID.
fn __hash__(&self) -> isize {
if let Some(jid) = &self.jid {
// Use the same algorithm as the Python JID.
let string = jid.to_string();
unsafe { pyo3::ffi::_Py_HashBytes(string.as_ptr() as *const _, string.len() as isize) }
} else {
0
}
}
// Aliases
#[getter]
fn get_user(&self) -> String {
self.get_node()
}
#[setter]
fn set_user(&mut self, user: &str) -> PyResult<()> {
self.set_node(user)
}
#[getter]
fn get_server(&self) -> String {
self.get_domain()
}
#[setter]
fn set_server(&mut self, server: &str) -> PyResult<()> {
self.set_domain(server)
}
#[getter]
fn get_host(&self) -> String {
self.get_domain()
}
#[setter]
fn set_host(&mut self, host: &str) -> PyResult<()> {
self.set_domain(host)
}
#[getter]
fn get_jid(&self) -> String {
self.get_full()
}
#[setter]
fn set_jid(&mut self, jid: &str) -> PyResult<()> {
self.set_full(jid)
}
}
#[pymodule]
#[pyo3(name = "libslixmpp")]
fn py_jid(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyJid>()?;
m.add("InvalidJID", py.get_type_bound::<InvalidJID>())?;
Ok(())
}

View File

@@ -3,7 +3,6 @@ from __future__ import unicode_literals
import unittest
from slixmpp.test import SlixTest
from slixmpp import JID, InvalidJID
from slixmpp.jid import nodeprep
class TestJIDClass(SlixTest):
@@ -192,10 +191,12 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, 'test.com/%s' % resource)
self.assertRaises(InvalidJID, JID, 'user@test.com/%s' % resource)
@unittest.skip('Rust')
def testTooLongDomainLabel(self):
domain = ('a' * 64) + '.com'
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainEmptyLabel(self):
domain = 'aaa..bbb.com'
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@@ -216,6 +217,7 @@ class TestJIDClass(SlixTest):
jid3 = JID('%s/resource' % domain)
jid4 = JID('user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainInvalidIPv6NoBrackets(self):
domain = '::1'
@@ -224,6 +226,7 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, '%s/resource' % domain)
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainInvalidIPv6MissingBracket(self):
domain = '[::1'
@@ -232,6 +235,7 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, '%s/resource' % domain)
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainInvalidIPv6WrongBracket(self):
domain = '[::]1]'
@@ -240,6 +244,7 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, '%s/resource' % domain)
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainWithPort(self):
domain = 'example.com:5555'
@@ -248,12 +253,14 @@ class TestJIDClass(SlixTest):
self.assertRaises(InvalidJID, JID, '%s/resource' % domain)
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testDomainWithTrailingDot(self):
domain = 'example.com.'
jid = JID('user@%s/resource' % domain)
self.assertEqual(jid.domain, 'example.com')
@unittest.skip('Rust')
def testDomainWithDashes(self):
domain = 'example.com-'
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@@ -261,21 +268,13 @@ class TestJIDClass(SlixTest):
domain = '-example.com'
self.assertRaises(InvalidJID, JID, 'user@%s/resource' % domain)
@unittest.skip('Rust')
def testACEDomain(self):
domain = 'xn--bcher-kva.ch'
jid = JID('user@%s/resource' % domain)
self.assertEqual(jid.domain.encode('utf-8'), b'b\xc3\xbccher.ch')
def testJIDUnescape(self):
jid = JID('here\\27s_a_wild_\\26_\\2fcr%zy\\2f_\\40ddress\\20for\\3a\\3cwv\\3e(\\22IMPS\\22)\\5c@example.com')
ujid = jid.unescape()
self.assertEqual(ujid.local, 'here\'s_a_wild_&_/cr%zy/_@ddress for:<wv>("imps")\\')
jid = JID('blah\\5cfoo\\5c20bar@example.com')
ujid = jid.unescape()
self.assertEqual(ujid.local, 'blah\\foo\\20bar')
def testStartOrEndWithEscapedSpaces(self):
local = ' foo'
self.assertRaises(InvalidJID, JID, '%s@example.com' % local)
@@ -288,9 +287,5 @@ class TestJIDClass(SlixTest):
#self.assertRaises(InvalidJID, JID, '%s@example.com' % '\\20foo2')
#self.assertRaises(InvalidJID, JID, '%s@example.com' % 'bar2\\20')
def testNodePrepIdemptotent(self):
node = 'ᴹᴵᴷᴬᴱᴸ'
self.assertEqual(nodeprep(node), nodeprep(nodeprep(node)))
suite = unittest.TestLoader().loadTestsFromTestCase(TestJIDClass)

View File

@@ -0,0 +1,67 @@
import unittest
from slixmpp import Presence
from slixmpp.test import SlixTest
import slixmpp.plugins.xep_0317 as xep_0317
from slixmpp.plugins.xep_0317 import stanza
class TestStanzaHats(SlixTest):
def setUp(self):
stanza.register_plugin()
def test_create_hats(self):
raw_xml = """
<hats xmlns="urn:xmpp:hats:0">
<hat uri="http://example.com/hats#Teacher" title="Teacher"/>
</hats>
"""
hats = xep_0317.Hats()
hat = xep_0317.Hat()
hat['uri'] = 'http://example.com/hats#Teacher'
hat['title'] = 'Teacher'
hats.append(hat)
self.check(hats, raw_xml, use_values=False)
def test_set_single_hat(self):
presence = Presence()
presence["hats"]["hat"]["uri"] = "test-uri"
presence["hats"]["hat"]["title"] = "test-title"
self.check(
presence, # language=XML
"""
<presence>
<hats xmlns='urn:xmpp:hats:0'>
<hat uri='test-uri' title='test-title'/>
</hats>
</presence>
""",
)
def test_set_multi_hat(self):
presence = Presence()
presence["hats"].add_hats([("uri1", "title1"), ("uri2", "title2")])
self.check(
presence, # language=XML
"""
<presence>
<hats xmlns='urn:xmpp:hats:0'>
<hat uri='uri1' title='title1'/>
<hat uri='uri2' title='title2'/>
</hats>
</presence>
""",
)
def test_get_hats(self):
presence = Presence()
presence["hats"].add_hats([("uri1", "title1"), ("uri2", "title2")])
for i, hat in enumerate(presence["hats"]["hats"], start=1):
self.assertEqual(hat["uri"], f"uri{i}")
self.assertEqual(hat["title"], f"title{i}")
suite = unittest.TestLoader().loadTestsFromTestCase(TestStanzaHats)

View File

@@ -1,9 +1,7 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0356 import stanza
from slixmpp.plugins.xep_0356 import stanza, permissions
class TestPermissions(SlixTest):
@@ -12,30 +10,57 @@ class TestPermissions(SlixTest):
def testAdvertisePermission(self):
xmlstring = """
<message from='capulet.net' to='pubub.capulet.lit'>
<message from='capulet.lit' to='pubsub.capulet.lit'>
<privilege xmlns='urn:xmpp:privilege:2'>
<perm access='roster' type='both'/>
<perm access='message' type='outgoing'/>
<perm access='presence' type='managed_entity'/>
<perm access='iq' type='both'/>
</privilege>
</message>
"""
msg = self.Message()
msg["from"] = "capulet.net"
msg["to"] = "pubub.capulet.lit"
# This raises AttributeError: 'NoneType' object has no attribute 'use_origin_id'
# msg["id"] = "id"
msg["from"] = "capulet.lit"
msg["to"] = "pubsub.capulet.lit"
for access, type_ in [
("roster", "both"),
("message", "outgoing"),
("presence", "managed_entity"),
("roster", permissions.RosterAccess.BOTH),
("message", permissions.MessagePermission.OUTGOING),
("presence", permissions.PresencePermission.MANAGED_ENTITY),
("iq", permissions.IqPermission.BOTH),
]:
msg["privilege"].add_perm(access, type_)
self.check(msg, xmlstring)
# Should this one work? → # AttributeError: 'Message' object has no attribute 'permission'
# self.assertEqual(msg.permission["roster"], "both")
def testIqPermission(self):
x = stanza.Privilege()
x["access"] = "iq"
ns = stanza.NameSpace()
ns["ns"] = "some_ns"
ns["type"] = "get"
x["perm"]["access"] = "iq"
x["perm"].append(ns)
ns = stanza.NameSpace()
ns["ns"] = "some_other_ns"
ns["type"] = "both"
x["perm"].append(ns)
self.check(
x,
"""
<privilege xmlns='urn:xmpp:privilege:2'>
<perm access='iq'>
<namespace ns='some_ns' type='get' />
<namespace ns='some_other_ns' type='both' />
</perm>
</privilege>
"""
)
nss = set()
for perm in x["perms"]:
for ns in perm["namespaces"]:
nss.add((ns["ns"], ns["type"]))
assert nss == {("some_ns", "get"), ("some_other_ns", "both")}
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)

View File

@@ -0,0 +1,149 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza
from slixmpp.plugins import xep_0461
from slixmpp.plugins import xep_0444
class TestFallback(SlixTest):
def setUp(self):
stanza.register_plugins()
def testSingleFallbackBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackSubject(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["subject"]["start"] = 0
message["fallback"]["subject"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<subject start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackWholeBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
def testMultiFallback(self):
message = Message()
f1 = stanza.Fallback()
f1["for"] = "ns1"
f2 = stanza.Fallback()
f2["for"] = "ns2"
message.append(f1)
message.append(f2)
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns1' />
<fallback xmlns='urn:xmpp:fallback:0' for='ns2' />
</message>
""",
)
for i, fallback in enumerate(message["fallbacks"], start=1):
self.assertEqual(fallback["for"], f"ns{i}")
def testStripFallbackPartOfBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
<body start="0" end="9" />
</fallback>
</message>
""",
)
self.assertEqual(
message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body"
)
def testStripWholeBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
self.assertEqual(message["fallback"].get_stripped_body("ns"), "")
def testStripMultiFallback(self):
message = Message()
message["body"] = "> huuuuu\n👍"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
reaction_fallback = stanza.Fallback()
reaction_fallback["for"] = xep_0444.stanza.NS
reaction_fallback.enable("body")
message.append(reaction_fallback)
self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "👍")
self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "")
suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback)

View File

@@ -1,11 +1,13 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza as fallback_stanza
from slixmpp.plugins.xep_0461 import stanza
class TestReply(SlixTest):
def setUp(self):
fallback_stanza.register_plugins()
stanza.register_plugins()
def testReply(self):
@@ -26,9 +28,9 @@ class TestReply(SlixTest):
def testFallback(self):
message = Message()
message["body"] = "12345\nrealbody"
message["feature_fallback"]["for"] = "NS"
message["feature_fallback"]["fallback_body"]["start"] = 0
message["feature_fallback"]["fallback_body"]["end"] = 6
message["fallback"]["for"] = "NS"
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 6
self.check(
message,
@@ -42,18 +44,18 @@ class TestReply(SlixTest):
""",
)
assert message["feature_fallback"].get_stripped_body() == "realbody"
assert message["fallback"].get_stripped_body("NS") == "realbody"
def testAddFallBackHelper(self):
msg = Message()
msg["body"] = "Great"
msg["feature_fallback"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
# ugly dedent but the test does not pass without it
msg["reply"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
self.check(
msg,
msg, # language=XML
"""
<message xmlns="jabber:client" type="normal">
<body>> Anna wrote:\n> Hi, how are you?\nGreat</body>
<reply xmlns="urn:xmpp:reply:0" />
<fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0">
<body start='0' end='33' />
</fallback>
@@ -67,8 +69,8 @@ class TestReply(SlixTest):
msg = Message()
msg["body"] = "Great"
msg["feature_fallback"].add_quoted_fallback(body)
body2 = msg["feature_fallback"].get_fallback_body()
msg["reply"].add_quoted_fallback(body)
body2 = msg["reply"].get_fallback_body()
self.assertTrue(body2 == quoted, body2)

View File

@@ -0,0 +1,36 @@
import unittest
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0469 import stanza
from slixmpp.plugins.xep_0402 import stanza as b_stanza
class TestBookmarksPinning(SlixTest):
def setUp(self):
b_stanza.register_plugin()
stanza.register_plugin()
def test_pinned(self):
bookmark = b_stanza.Conference()
bookmark["password"] = "pass"
bookmark["nick"] = "nick"
bookmark["autojoin"] = False
bookmark["extensions"].enable("pinned")
self.check(
bookmark,
"""
<conference xmlns='urn:xmpp:bookmarks:1'
autojoin='false'>
<nick>nick</nick>
<password>pass</password>
<extensions>
<pinned xmlns="urn:xmpp:bookmarks-pinning:0" />
</extensions>
</conference>
""",
use_values=False
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestBookmarksPinning)

View File

@@ -0,0 +1,76 @@
import logging
import unittest
from slixmpp.test import SlixTest
class TestCaps(SlixTest):
def setUp(self):
self.stream_start(plugins=["xep_0115"])
def testConcurrentSameHash(self):
"""
Check that we only resolve a given ver string to a disco info once,
even if we receive several presences with that same ver string
consecutively.
"""
self.recv( # language=XML
"""
<presence from='romeo@montague.lit/orchard'>
<c xmlns='http://jabber.org/protocol/caps'
hash='sha-1'
node='a-node'
ver='h0TdMvqNR8FHUfFG1HauOLYZDqE='/>
</presence>
"""
)
self.recv( # language=XML
"""
<presence from='i-dont-know-much-shakespeare@montague.lit/orchard'>
<c xmlns='http://jabber.org/protocol/caps'
hash='sha-1'
node='a-node'
ver='h0TdMvqNR8FHUfFG1HauOLYZDqE='/>
</presence>
"""
)
self.send( # language=XML
"""
<iq xmlns="jabber:client"
id="1"
to="romeo@montague.lit/orchard"
type="get">
<query xmlns="http://jabber.org/protocol/disco#info"
node="a-node#h0TdMvqNR8FHUfFG1HauOLYZDqE="/>
</iq>
"""
)
self.send(None)
self.recv( # language=XML
"""
<iq from='romeo@montague.lit/orchard'
id='1'
type='result'>
<query xmlns='http://jabber.org/protocol/disco#info'
node='a-nodes#h0TdMvqNR8FHUfFG1HauOLYZDqE='>
<identity category='client' name='a client' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
</query>
</iq>
"""
)
self.send(None)
self.assertTrue(
self.xmpp["xep_0030"].supports(
"romeo@montague.lit/orchard", "http://jabber.org/protocol/caps"
)
)
self.assertTrue(
self.xmpp["xep_0030"].supports(
"i-dont-know-much-shakespeare@montague.lit/orchard",
"http://jabber.org/protocol/caps",
)
)
logging.basicConfig(level=logging.DEBUG)
suite = unittest.TestLoader().loadTestsFromTestCase(TestCaps)

View File

@@ -1,7 +1,7 @@
import unittest
from slixmpp import ComponentXMPP, Iq, Message
from slixmpp.roster import RosterItem
from slixmpp import Message, JID, Iq
from slixmpp.plugins.xep_0356 import permissions
from slixmpp.test import SlixTest
@@ -9,9 +9,9 @@ class TestPermissions(SlixTest):
def setUp(self):
self.stream_start(
mode="component",
plugins=["xep_0356"],
plugins=["xep_0356", "xep_0045"],
jid="pubsub.capulet.lit",
server="capulet.net",
server="capulet.lit",
)
def testPluginEnd(self):
@@ -23,26 +23,44 @@ class TestPermissions(SlixTest):
self.assertFalse(exc)
def testGrantedPrivileges(self):
# https://xmpp.org/extensions/xep-0356.html#example-4
results = {"event": False}
x = self.xmpp["xep_0356"]
self.xmpp.add_event_handler(
"privileges_advertised", lambda msg: results.__setitem__("event", True)
)
self.recv(
"""
<message from='capulet.net' to='pubub.capulet.lit' id='54321'>
<message from='capulet.lit' to='pubsub.capulet.lit' id='54321'>
<privilege xmlns='urn:xmpp:privilege:2'>
<perm access='roster' type='both'/>
<perm access='message' type='outgoing'/>
<perm access='iq'>
<namespace ns='some_ns' type='get' />
<namespace ns='some_other_ns' type='both' />
</perm>
</privilege>
</message>
"""
)
self.assertEqual(self.xmpp["xep_0356"].granted_privileges["roster"], "both")
server = JID("capulet.lit")
self.assertEqual(
self.xmpp["xep_0356"].granted_privileges["message"], "outgoing"
x.granted_privileges[server].roster, permissions.RosterAccess.BOTH
)
self.assertEqual(
x.granted_privileges[server].message, permissions.MessagePermission.OUTGOING
)
self.assertEqual(
x.granted_privileges[server].presence, permissions.PresencePermission.NONE
)
self.assertEqual(
x.granted_privileges[server].iq["nope"], permissions.IqPermission.NONE
)
self.assertEqual(
x.granted_privileges[server].iq["some_ns"], permissions.IqPermission.GET
)
self.assertEqual(
x.granted_privileges[server].iq["some_other_ns"], permissions.IqPermission.BOTH
)
self.assertEqual(self.xmpp["xep_0356"].granted_privileges["presence"], "none")
self.assertTrue(results["event"])
def testGetRosterIq(self):
@@ -94,7 +112,7 @@ class TestPermissions(SlixTest):
def testMakeOutgoingMessage(self):
xmlstring = """
<message xmlns="jabber:component:accept" from='pubsub.capulet.lit' to='capulet.net'>
<message xmlns="jabber:component:accept" from='pubsub.capulet.lit' to='capulet.lit'>
<privilege xmlns='urn:xmpp:privilege:2'>
<forwarded xmlns='urn:xmpp:forward:0'>
<message from="juliet@capulet.lit" to="romeo@montague.lit" xmlns="jabber:client">
@@ -108,9 +126,49 @@ class TestPermissions(SlixTest):
msg["from"] = "juliet@capulet.lit"
msg["to"] = "romeo@montague.lit"
msg["body"] = "I do not hate you"
priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
self.check(priv_msg, xmlstring, use_values=False)
def testDetectServer(self):
msg = Message()
msg["from"] = "juliet@something"
msg["to"] = "romeo@montague.lit"
msg["body"] = "I do not hate you"
priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
assert priv_msg.get_to() == "something"
assert priv_msg.get_from() == "pubsub.capulet.lit"
def testIqOnBehalf(self):
iq = Iq()
iq["mucadmin_query"]["item"]["affiliation"] = "member"
iq.set_from("juliet@xxx")
iq.set_to("somemuc@conf")
iq.set_type("get")
self.xmpp["xep_0356"].granted_privileges["conf"].iq["http://jabber.org/protocol/muc#admin"] = permissions.IqPermission.BOTH
r = self.xmpp.loop.create_task(self.xmpp["xep_0356"].send_privileged_iq(iq, iq_id="0"))
self.send(
"""
<iq from="pubsub.capulet.lit"
to="juliet@xxx"
xmlns="jabber:component:accept"
type="get" id="0">
<privileged_iq xmlns='urn:xmpp:privilege:2'>
<iq xmlns='jabber:client'
type='get'
to='somemuc@conf'
from='juliet@xxx'
id="0">
<query xmlns='http://jabber.org/protocol/muc#admin'>
<item affiliation='member'/>
</query>
</iq>
</privileged_iq>
</iq>
""",
use_values=False
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)

View File

@@ -9,8 +9,8 @@ class TestReply(SlixTest):
def testFallBackBody(self):
async def on_reply(msg):
start = msg["feature_fallback"]["fallback_body"]["start"]
end = msg["feature_fallback"]["fallback_body"]["end"]
start = msg["fallback"]["body"]["start"]
end = msg["fallback"]["body"]["end"]
self.xmpp["xep_0461"].send_reply(
reply_to=msg.get_from(),
reply_id=msg.get_id(),

5
tox.ini Normal file
View File

@@ -0,0 +1,5 @@
[tox]
envlist = py34
[testenv]
deps = nose
commands = nosetests --where=tests --exclude=live -i slixtest.py