Compare commits

..

4 Commits

Author SHA1 Message Date
nicoco
985926ed7b XEP-0461: rely on XEP-0428 for fallback
Breaks the previous fallback helpers, we now
rely on XEP-0461 instead
2023-12-19 14:15:24 +00:00
nicoco
8d63bd68cf XEP-0428: add fallback body and subject elements
+ tests
+ helpers to strip the fallback content
2023-12-19 14:15:24 +00:00
nicoco
465e735d18 ElementBase: add weak ref to parent when using append() 2023-12-19 14:15:24 +00:00
nicoco
fea4ee83be fix slixmpp.xmlstream.__all__ 2023-12-19 14:15:24 +00:00
63 changed files with 187 additions and 1499 deletions

13
.github/pull_request_template.md vendored Normal file
View File

@@ -0,0 +1,13 @@
################ Please use Gitlab instead of Github ###################################
Hello, thank you for contributing to slixmpp!
Youre about to open a pull request on github. However this github repository is not the official place for contributions on slixmpp.
Please open your merge request on https://lab.louiz.org/poezio/slixmpp/
You should be able to log in there with your github credentials, clone the slixmpp repository in your namespace, push your existing pull request into a new branch, and then open a merge request with one click, within 3 minutes.
This will help us review your contribution, avoid spreading things everywhere and it will even run the tests automatically with your changes.
Thank you.

View File

@@ -1,6 +0,0 @@
steps:
mypy:
image: python:3
commands:
- pip3 install mypy types-setuptools
- mypy slixmpp

View File

@@ -1,10 +0,0 @@
steps:
test_integration:
image: "python:3.11"
secrets: [ci_account1, ci_account1_password, ci_account2, ci_account2_password, ci_muc_server]
commands:
- apt-get update
- apt-get install -y python3-pip cython3 gpg idn libidn-dev
- pip3 install emoji aiohttp aiodns
- python3 setup.py build_ext --inplace
- ./run_integration_tests.py

View File

@@ -1,17 +0,0 @@
steps:
unit_tests:
image: "python:${TAG}"
commands:
- apt-get update
- apt-get install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
matrix:
TAG:
- "3.7"
- "3.9"
- "3.8"
- "3.10"
- "3.11"
- "3.12"

View File

@@ -856,7 +856,7 @@
<xmpp:SupportedXep>
<xmpp:xep rdf:resource="https://xmpp.org/extensions/xep-0424.html"/>
<xmpp:status>complete</xmpp:status>
<xmpp:version>0.4.0</xmpp:version>
<xmpp:version>0.3.0</xmpp:version>
<xmpp:since>1.6.0</xmpp:since>
</xmpp:SupportedXep>
</implements>
@@ -864,7 +864,7 @@
<xmpp:SupportedXep>
<xmpp:xep rdf:resource="https://xmpp.org/extensions/xep-0425.html"/>
<xmpp:status>complete</xmpp:status>
<xmpp:version>0.3.0</xmpp:version>
<xmpp:version>0.2.1</xmpp:version>
<xmpp:since>1.6.0</xmpp:since>
</xmpp:SupportedXep>
</implements>
@@ -909,14 +909,6 @@
<xmpp:note>no thumbnail support</xmpp:note>
</xmpp:SupportedXep>
</implements>
<implements>
<xmpp:SupportedXep>
<xmpp:xep rdf:resource="https://xmpp.org/extensions/xep-0490.html"/>
<xmpp:status>complete</xmpp:status>
<xmpp:version>0.1.0</xmpp:version>
<xmpp:since>1.8.6</xmpp:since>
</xmpp:SupportedXep>
</implements>
<release>
<Version>
@@ -1072,19 +1064,5 @@
<file-release rdf:resource="https://codeberg.org/poezio/slixmpp/archive/slix-1.8.4.tar.gz"/>
</Version>
</release>
<release>
<Version>
<revision>1.8.5</revision>
<created>2024-02-02</created>
<file-release rdf:resource="https://codeberg.org/poezio/slixmpp/archive/slix-1.8.5.tar.gz"/>
</Version>
</release>
<release>
<Version>
<revision>1.8.6</revision>
<created>2024-12-26</created>
<file-release rdf:resource="https://codeberg.org/poezio/slixmpp/archive/slix-1.8.6.tar.gz"/>
</Version>
</release>
</Project>
</rdf:RDF>

View File

@@ -167,9 +167,8 @@ processing the same stanza twice.
- **Data:** :py:class:`~.Message`
- **Source:** :py:class:`BaseXMPP <.BaseXMPP>`
Makes the contents of message stanzas that include <body> tags available
whenever one is received.
Be sure to check the message type to handle error messages appropriately.
Makes the contents of message stanzas available whenever one is received. Be
sure to check the message type in order to handle error messages.
message_error
- **Data:** :py:class:`~.Message`

View File

@@ -1,104 +0,0 @@
Projects Using Slixmpp
======================
This page enumerates software in the form of applications, bots and gateways utilizing the XMPP protocols with slixmpp.
Applications
------------
sendxmpp-py
~~~~~~~~~~~
sendxmpp is a command line program and is the XMPP equivalent of sendmail. It is a Python version of the original sendxmpp which is written in Perl.
- `Source <https://code.moparisthebest.com/moparisthebest/sendxmpp-py>`_
- `Groupchat <xmpp:xmpp-ircd@chatrooms.hackerposse.com?join>`_
Bots
----
BotLogMauve
~~~~~~~~~~~
XMPP bot which logs groupchat messages. Logs are in text format, with one file per day and per groupchat.
- `Source <https://git.khaganat.net/khaganat/BotLogMauve>`_
BukuBot
~~~~~~~
BukuBot makes it possible to manage and search your bookmarks from your chat.
- `Source <https://codeberg.org/sch/BukuBot>`_
LinkBot
~~~~~~~
This bot reveals the title of any shared link in a groupchat for quick content insight.
- `Source <https://git.xmpp-it.net/mario/XMPPBot>`_
llama-bot
~~~~~~~~~
Llama-bot enables engaging communication with the LLM (large language model) of llama.cpp, providing seamless and dynamic conversation with it.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://github.com/decent-im/llama-bot>`_
- `Demo <xmpp:llama@decent.im?message>`_
Morbot
~~~~~~
Morbot is a simple Slixmpp bot that will take new articles from listed RSS feeds and send them to assigned XMPP MUCs.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://codeberg.org/TheCoffeMaker/Morbot>`_
Slixfeed
~~~~~~~~
Slixfeed aims to be an easy to use and fully-featured news aggregator bot for XMPP. It provides a convenient access to Blogs, Fediverse and News websites along with filtering functionality.
- `Groupchat <xmpp:slixfeed@chat.woodpeckersnest.space?join>`_
- `Source <https://gitgud.io/sjehuda/slixfeed>`_
sms4you
~~~~~~~
sms4you forwards messages from and to SMS and connects either with sms4you-xmpp or sms4you-email to choose the other mean of communication. Nice for receiving or sending SMS, independently from carrying a SIM card.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Homepage <https://sms4you-team.pages.debian.net/sms4you/>`_
- `Source <https://salsa.debian.org/sms4you-team/sms4you>`_
Stable Diffusion
~~~~~~~~~~~~~~~~
XMPP bot that generates digital images from textual descriptions.
- `Groupchat <xmpp:slidge@conference.nicoco.fr?join>`_
- `Source <https://www.nicoco.fr/blog/2022/08/31/xmpp-bot-stable-diffusion/>`_
WhisperBot
~~~~~~~~~~
XMPP bot that transliterates audio messages using OpenAI's Whisper libraries.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://codeberg.org/TheCoffeMaker/WhisperBot>`_
XMPP MUC Message Gateway
~~~~~~~~~~~~~~~~~~~~~~~~
A multipurpose JSON forwarder microservice from HTTP POST to XMPP MUC room over TLSv1.2 with SliXMPP.
- `Source <https://github.com/immanuelfodor/xmpp-muc-message-gateway>`_
Services
--------
AtomToPubsub
~~~~~~~~~~~~
AtomToPubsub is a simple Python script that parses Atom + RSS feeds and pushes the entries to a designated XMPP Pubsub Node.
- `Groupchat <xmpp:movim@conference.movim.eu?join>`_
- `Source <https://github.com/imattau/atomtopubsub>`_
Slidge
~~~~~~
Slidge is a general purpose XMPP gateway framework in Python.
- `Groupchat <xmpp:slidge@conference.nicoco.fr?join>`_
- `Homepage <https://slidge.im/core/>`_
- `Source <https://sr.ht/~nicoco/slidge>`_

View File

@@ -1,184 +0,0 @@
"""
Recognize image file formats based on their first few bytes.
Taken from cpython 3.11 source code before the removal in 3.13.
Licensed under Zero-Clause BSD
"""
from os import PathLike
import warnings
__all__ = ["what"]
warnings._deprecated(__name__, remove=(3, 13))
#-------------------------#
# Recognize image headers #
#-------------------------#
def what(file, h=None):
f = None
try:
if h is None:
if isinstance(file, (str, PathLike)):
f = open(file, 'rb')
h = f.read(32)
else:
location = file.tell()
h = file.read(32)
file.seek(location)
for tf in tests:
res = tf(h, f)
if res:
return res
finally:
if f: f.close()
return None
#---------------------------------#
# Subroutines per image file type #
#---------------------------------#
tests = []
def test_jpeg(h, f):
"""JPEG data with JFIF or Exif markers; and raw JPEG"""
if h[6:10] in (b'JFIF', b'Exif'):
return 'jpeg'
elif h[:4] == b'\xff\xd8\xff\xdb':
return 'jpeg'
tests.append(test_jpeg)
def test_png(h, f):
if h.startswith(b'\211PNG\r\n\032\n'):
return 'png'
tests.append(test_png)
def test_gif(h, f):
"""GIF ('87 and '89 variants)"""
if h[:6] in (b'GIF87a', b'GIF89a'):
return 'gif'
tests.append(test_gif)
def test_tiff(h, f):
"""TIFF (can be in Motorola or Intel byte order)"""
if h[:2] in (b'MM', b'II'):
return 'tiff'
tests.append(test_tiff)
def test_rgb(h, f):
"""SGI image library"""
if h.startswith(b'\001\332'):
return 'rgb'
tests.append(test_rgb)
def test_pbm(h, f):
"""PBM (portable bitmap)"""
if len(h) >= 3 and \
h[0] == ord(b'P') and h[1] in b'14' and h[2] in b' \t\n\r':
return 'pbm'
tests.append(test_pbm)
def test_pgm(h, f):
"""PGM (portable graymap)"""
if len(h) >= 3 and \
h[0] == ord(b'P') and h[1] in b'25' and h[2] in b' \t\n\r':
return 'pgm'
tests.append(test_pgm)
def test_ppm(h, f):
"""PPM (portable pixmap)"""
if len(h) >= 3 and \
h[0] == ord(b'P') and h[1] in b'36' and h[2] in b' \t\n\r':
return 'ppm'
tests.append(test_ppm)
def test_rast(h, f):
"""Sun raster file"""
if h.startswith(b'\x59\xA6\x6A\x95'):
return 'rast'
tests.append(test_rast)
def test_xbm(h, f):
"""X bitmap (X10 or X11)"""
if h.startswith(b'#define '):
return 'xbm'
tests.append(test_xbm)
def test_bmp(h, f):
if h.startswith(b'BM'):
return 'bmp'
tests.append(test_bmp)
def test_webp(h, f):
if h.startswith(b'RIFF') and h[8:12] == b'WEBP':
return 'webp'
tests.append(test_webp)
def test_exr(h, f):
if h.startswith(b'\x76\x2f\x31\x01'):
return 'exr'
tests.append(test_exr)
#--------------------#
# Small test program #
#--------------------#
def test():
import sys
recursive = 0
if sys.argv[1:] and sys.argv[1] == '-r':
del sys.argv[1:2]
recursive = 1
try:
if sys.argv[1:]:
testall(sys.argv[1:], recursive, 1)
else:
testall(['.'], recursive, 1)
except KeyboardInterrupt:
sys.stderr.write('\n[Interrupted]\n')
sys.exit(1)
def testall(list, recursive, toplevel):
import sys
import os
for filename in list:
if os.path.isdir(filename):
print(filename + '/:', end=' ')
if recursive or toplevel:
print('recursing down:')
import glob
names = glob.glob(os.path.join(glob.escape(filename), '*'))
testall(names, recursive, 0)
else:
print('*** directory (use -r) ***')
else:
print(filename + ':', end=' ')
sys.stdout.flush()
try:
print(what(filename))
except OSError:
print('*** not found ***')
if __name__ == '__main__':
test()

View File

@@ -1,184 +0,0 @@
"""
Recognize image file formats based on their first few bytes.
Taken from cpython 3.11 source code before the removal in 3.13.
Licensed under Zero-Clause BSD
"""
from os import PathLike
import warnings
__all__ = ["what"]
warnings._deprecated(__name__, remove=(3, 13))
#-------------------------#
# Recognize image headers #
#-------------------------#
def what(file, h=None):
f = None
try:
if h is None:
if isinstance(file, (str, PathLike)):
f = open(file, 'rb')
h = f.read(32)
else:
location = file.tell()
h = file.read(32)
file.seek(location)
for tf in tests:
res = tf(h, f)
if res:
return res
finally:
if f: f.close()
return None
#---------------------------------#
# Subroutines per image file type #
#---------------------------------#
tests = []
def test_jpeg(h, f):
"""JPEG data with JFIF or Exif markers; and raw JPEG"""
if h[6:10] in (b'JFIF', b'Exif'):
return 'jpeg'
elif h[:4] == b'\xff\xd8\xff\xdb':
return 'jpeg'
tests.append(test_jpeg)
def test_png(h, f):
if h.startswith(b'\211PNG\r\n\032\n'):
return 'png'
tests.append(test_png)
def test_gif(h, f):
"""GIF ('87 and '89 variants)"""
if h[:6] in (b'GIF87a', b'GIF89a'):
return 'gif'
tests.append(test_gif)
def test_tiff(h, f):
"""TIFF (can be in Motorola or Intel byte order)"""
if h[:2] in (b'MM', b'II'):
return 'tiff'
tests.append(test_tiff)
def test_rgb(h, f):
"""SGI image library"""
if h.startswith(b'\001\332'):
return 'rgb'
tests.append(test_rgb)
def test_pbm(h, f):
"""PBM (portable bitmap)"""
if len(h) >= 3 and \
h[0] == ord(b'P') and h[1] in b'14' and h[2] in b' \t\n\r':
return 'pbm'
tests.append(test_pbm)
def test_pgm(h, f):
"""PGM (portable graymap)"""
if len(h) >= 3 and \
h[0] == ord(b'P') and h[1] in b'25' and h[2] in b' \t\n\r':
return 'pgm'
tests.append(test_pgm)
def test_ppm(h, f):
"""PPM (portable pixmap)"""
if len(h) >= 3 and \
h[0] == ord(b'P') and h[1] in b'36' and h[2] in b' \t\n\r':
return 'ppm'
tests.append(test_ppm)
def test_rast(h, f):
"""Sun raster file"""
if h.startswith(b'\x59\xA6\x6A\x95'):
return 'rast'
tests.append(test_rast)
def test_xbm(h, f):
"""X bitmap (X10 or X11)"""
if h.startswith(b'#define '):
return 'xbm'
tests.append(test_xbm)
def test_bmp(h, f):
if h.startswith(b'BM'):
return 'bmp'
tests.append(test_bmp)
def test_webp(h, f):
if h.startswith(b'RIFF') and h[8:12] == b'WEBP':
return 'webp'
tests.append(test_webp)
def test_exr(h, f):
if h.startswith(b'\x76\x2f\x31\x01'):
return 'exr'
tests.append(test_exr)
#--------------------#
# Small test program #
#--------------------#
def test():
import sys
recursive = 0
if sys.argv[1:] and sys.argv[1] == '-r':
del sys.argv[1:2]
recursive = 1
try:
if sys.argv[1:]:
testall(sys.argv[1:], recursive, 1)
else:
testall(['.'], recursive, 1)
except KeyboardInterrupt:
sys.stderr.write('\n[Interrupted]\n')
sys.exit(1)
def testall(list, recursive, toplevel):
import sys
import os
for filename in list:
if os.path.isdir(filename):
print(filename + '/:', end=' ')
if recursive or toplevel:
print('recursing down:')
import glob
names = glob.glob(os.path.join(glob.escape(filename), '*'))
testall(names, recursive, 0)
else:
print('*** directory (use -r) ***')
else:
print(filename + ':', end=' ')
sys.stdout.flush()
try:
print(what(filename))
except OSError:
print('*** not found ***')
if __name__ == '__main__':
test()

View File

@@ -10,7 +10,7 @@ UNIQUE = uuid4().hex
class TestMUC(SlixIntegration):
async def asyncSetUp(self):
self.mucserver = self.envjid('CI_MUC_SERVER', default='chat.jabberfr.org')
self.mucserver = self.envjid('CI_MUC_SERVER')
self.muc = JID('%s@%s' % (UNIQUE, self.mucserver))
self.add_client(
self.envjid('CI_ACCOUNT1'),

View File

@@ -5,7 +5,7 @@ import logging
import unittest
from argparse import ArgumentParser
from setuptools import Command
from distutils.core import Command
from importlib import import_module
from pathlib import Path

View File

@@ -5,7 +5,7 @@ import logging
import unittest
from argparse import ArgumentParser
from setuptools import Command
from distutils.core import Command
from importlib import import_module
from pathlib import Path

View File

@@ -33,17 +33,12 @@ CLASSIFIERS = [
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
'Topic :: Internet :: XMPP',
'Topic :: Software Development :: Libraries :: Python Modules',
]
packages = [str(mod.parent) for mod in Path('slixmpp').rglob('__init__.py')]
def check_include(library_name, header):
command = [os.environ.get('PKG_CONFIG', 'pkg-config'), '--cflags', library_name]
try:
@@ -64,7 +59,6 @@ def check_include(library_name, header):
print('%s headers not found.' % library_name)
return False
HAS_PYTHON_HEADERS = check_include('python3', 'Python.h')
HAS_STRINGPREP_HEADERS = check_include('libidn', 'stringprep.h')
@@ -93,7 +87,7 @@ setup(
packages=packages,
ext_modules=ext_modules,
install_requires=[
'aiodns >= 1.0; sys_platform=="linux" or sys_platform=="darwin"',
'aiodns>=1.0',
'pyasn1',
'pyasn1_modules',
'typing_extensions; python_version < "3.8.0"',

View File

@@ -27,9 +27,3 @@ from slixmpp.clientxmpp import ClientXMPP
from slixmpp.componentxmpp import ComponentXMPP
from slixmpp.version import __version__, __version_info__
__all__ = [
'Message', 'Presence', 'Iq', 'JID', 'InvalidJID', 'ET', 'ElementBase',
'register_stanza_plugin', 'XMLStream', 'BaseXMPP', 'ClientXMPP', 'ComponentXMPP',
'__version__', '__version_info__'
]

View File

@@ -138,8 +138,8 @@ class ClientXMPP(BaseXMPP):
self.credentials['password'] = value
def connect(self, address: Optional[Tuple[str, int]] = None, # type: ignore
use_ssl: Optional[bool] = None, force_starttls: Optional[bool] = None,
disable_starttls: Optional[bool] = None) -> None:
use_ssl: bool = False, force_starttls: bool = True,
disable_starttls: bool = False) -> None:
"""Connect to the XMPP server.
When no address is given, a SRV lookup for the server will
@@ -166,8 +166,8 @@ class ClientXMPP(BaseXMPP):
host, port = (self.boundjid.host, 5222)
self.dns_service = 'xmpp-client'
XMLStream.connect(self, host, port, use_ssl=use_ssl,
force_starttls=force_starttls, disable_starttls=disable_starttls)
return XMLStream.connect(self, host, port, use_ssl=use_ssl,
force_starttls=force_starttls, disable_starttls=disable_starttls)
def register_feature(self, name: str, handler: Callable, restart: bool = False, order: int = 5000) -> None:
"""Register a stream feature handler.

View File

@@ -9,8 +9,6 @@
import logging
import hashlib
from typing import Optional
from slixmpp import Message, Iq, Presence
from slixmpp.basexmpp import BaseXMPP
from slixmpp.stanza import Handshake
@@ -95,9 +93,7 @@ class ComponentXMPP(BaseXMPP):
for st in Message, Iq, Presence:
register_stanza_plugin(st, Error)
def connect(self, host: Optional[str] = None, port: int = 0, use_ssl: Optional[bool] = None,
force_starttls: Optional[bool] = None,
disable_starttls: Optional[bool] = None) -> None:
def connect(self, host=None, port=None, use_ssl=False):
"""Connect to the server.
@@ -107,18 +103,17 @@ class ComponentXMPP(BaseXMPP):
Defauts to :attr:`server_port`.
:param use_ssl: Flag indicating if SSL should be used by connecting
directly to a port using SSL.
:param force_starttls: UNUSED
:param disable_starttls: UNUSED
"""
if host is not None:
self.server_host = host
if port:
self.server_port = port
if host is None:
host = self.server_host
if port is None:
port = self.server_port
self.server_name = self.boundjid.host
log.debug("Connecting to %s:%s", host, port)
XMLStream.connect(self, host=self.server_host, port=self.server_port, use_ssl=use_ssl)
return XMLStream.connect(self, host=host, port=port,
use_ssl=use_ssl)
def incoming_filter(self, xml):
"""

View File

@@ -37,8 +37,7 @@ class FeatureMechanisms(BasePlugin):
'unencrypted_digest': False,
'unencrypted_cram': False,
'unencrypted_scram': True,
'order': 100,
'tls_version': None,
'order': 100
}
def plugin_init(self):
@@ -97,20 +96,7 @@ class FeatureMechanisms(BasePlugin):
result[value] = creds.get('email', jid)
elif value == 'channel_binding':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
version = self.xmpp.socket.version()
# As of now, python does not implement anything else
# than tls-unique, which is forbidden on TLSv1.3
# see https://github.com/python/cpython/issues/95341
if version != 'TLSv1.3':
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-unique"
)
elif 'tls-exporter' in ssl.CHANNEL_BINDING_TYPES:
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-exporter"
)
else:
result[value] = None
result[value] = self.xmpp.socket.get_channel_binding()
else:
result[value] = None
elif value == 'host':
@@ -135,11 +121,6 @@ class FeatureMechanisms(BasePlugin):
result[value] = True
else:
result[value] = False
elif value == 'tls_version':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
result[value] = self.xmpp.socket.version()
elif value == 'binding_proposed':
result[value] = any(x for x in self.mech_list if x.endswith('-PLUS'))
else:
result[value] = self.config.get(value, False)
return result

View File

@@ -76,7 +76,6 @@ PLUGINS = [
'xep_0256', # Last Activity in Presence
'xep_0257', # Client Certificate Management for SASL EXTERNAL
'xep_0258', # Security Labels in XMPP
'xep_0264', # Jingle Content Thumbnails
# 'xep_0270', # XMPP Compliance Suites 2010. Dont automatically load
'xep_0279', # Server IP Check
'xep_0280', # Message Carbons
@@ -86,7 +85,6 @@ PLUGINS = [
# 'xep_0302', # XMPP Compliance Suites 2012. Dont automatically load
'xep_0308', # Last Message Correction
'xep_0313', # Message Archive Management
'xep_0317', # Hats
'xep_0319', # Last User Interaction in Presence
# 'xep_0323', # IoT Systems Sensor Data. Dont automatically load
# 'xep_0325', # IoT Systems Control. Dont automatically load
@@ -112,7 +110,7 @@ PLUGINS = [
'xep_0421', # Anonymous unique occupant identifiers for MUCs
'xep_0422', # Message Fastening
'xep_0424', # Message Retraction
'xep_0425', # Moderated Message Retraction
'xep_0425', # Message Moderation
'xep_0428', # Message Fallback
'xep_0437', # Room Activity Indicators
'xep_0439', # Quick Response
@@ -120,8 +118,6 @@ PLUGINS = [
'xep_0444', # Message Reactions
'xep_0447', # Stateless file sharing
'xep_0461', # Message Replies
'xep_0469', # Bookmarks Pinning
'xep_0490', # Message Displayed Synchronization
# Meant to be imported by plugins
]

View File

@@ -6,18 +6,14 @@
# Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2012 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details
from __future__ import annotations
import sys
import copy
import logging
import threading
from typing import Any, Dict, Set, ClassVar, Union, TYPE_CHECKING
from typing import Any, Dict, Set, ClassVar
if TYPE_CHECKING:
from slixmpp.clientxmpp import ClientXMPP
from slixmpp.componentxmpp import ComponentXMPP
log = logging.getLogger(__name__)
@@ -276,7 +272,7 @@ class BasePlugin(object):
#: `plugin.config['foo']`.
default_config: ClassVar[Dict[str, Any]] = {}
def __init__(self, xmpp: Union[ClientXMPP,ComponentXMPP], config=None):
def __init__(self, xmpp, config=None):
self.xmpp = xmpp
if self.xmpp:
self.api = self.xmpp.api.wrap(self.name)

View File

@@ -165,11 +165,11 @@ class DiscoInfo(ElementBase):
identities = []
for id_xml in self.xml.findall('{%s}identity' % self.namespace):
xml_lang = id_xml.attrib.get('{%s}lang' % self.xml_ns, None)
category = id_xml.attrib.get('category', None)
type_ = id_xml.attrib.get('type', None)
name = id_xml.attrib.get('name', None)
if lang is None or xml_lang == lang:
id = (category, type_, xml_lang, name)
id = (id_xml.attrib['category'],
id_xml.attrib['type'],
id_xml.attrib.get('{%s}lang' % self.xml_ns, None),
id_xml.attrib.get('name', None))
if isinstance(identities, set):
identities.add(id)
else:
@@ -253,12 +253,10 @@ class DiscoInfo(ElementBase):
else:
features = []
for feature_xml in self.xml.findall('{%s}feature' % self.namespace):
feature = feature_xml.attrib.get('var', None)
if feature:
if isinstance(features, set):
features.add(feature)
else:
features.append(feature)
if isinstance(features, set):
features.add(feature_xml.attrib['var'])
else:
features.append(feature_xml.attrib['var'])
return features
def set_features(self, features: Iterable[str]):

View File

@@ -49,13 +49,11 @@ from slixmpp.plugins.xep_0045.stanza import (
MUCUserItem,
)
from slixmpp.types import (
JidStr,
MucRole,
MucAffiliation,
MucRoomItem,
MucRoomItemKeys,
PresenceArgs,
PresenceShows,
)
JoinResult = Tuple[Presence, Message, List[Presence], List[Message]]
@@ -189,7 +187,7 @@ class XEP_0045(BasePlugin):
def _handle_config_change(self, msg: Message):
"""Handle a MUC configuration change (with status code)."""
self.xmpp.event('groupchat_config_status', msg)
self.xmpp.event('muc::%s::config_status' % msg['from'].bare, msg)
self.xmpp.event('muc::%s::config_status' % msg['from'].bare , msg)
def _client_handle_presence(self, pr: Presence):
"""As a client, handle a presence stanza"""
@@ -266,7 +264,7 @@ class XEP_0045(BasePlugin):
seconds: Optional[int] = None,
since: Optional[datetime] = None,
presence_options: Optional[PresenceArgs] = None,
timeout: int = 300) -> JoinResult:
timeout: Optional[int] = None) -> JoinResult:
"""
Try to join a MUC and block until we are joined or get an error.
@@ -312,7 +310,7 @@ class XEP_0045(BasePlugin):
stanza.send()
return await self._await_join(room, timeout)
async def _await_join(self, room: JID, timeout: int = 300) -> JoinResult:
async def _await_join(self, room: JID, timeout: Optional[int] = None) -> JoinResult:
"""Do the heavy lifting for awaiting a MUC join
A muc join, once the join stanza is sent, is:
@@ -360,7 +358,7 @@ class XEP_0045(BasePlugin):
return (pres, subject, occupant_buffer, history_buffer)
def join_muc(self, room: JID, nick: str, maxhistory="0", password='',
pstatus='', pshow: PresenceShows='chat', pfrom: JidStr='') -> asyncio.Future:
pstatus='', pshow='', pfrom='') -> asyncio.Future:
""" Join the specified room, requesting 'maxhistory' lines of history.
.. deprecated:: 1.8.0
@@ -414,7 +412,7 @@ class XEP_0045(BasePlugin):
)
del self.rooms[room]
def set_subject(self, room: JidStr, subject: str, *, mfrom: Optional[JID] = None):
def set_subject(self, room: JID, subject: str, *, mfrom: Optional[JID] = None):
"""Set a rooms subject.
:param room: JID of the room.
@@ -425,7 +423,7 @@ class XEP_0045(BasePlugin):
msg['subject'] = subject
msg.send()
async def get_room_config(self, room: JidStr, ifrom: Optional[JID] = None,
async def get_room_config(self, room: JID, ifrom: Optional[JID] = None,
**iqkwargs) -> Form:
"""Get the room config form in 0004 plugin format.
@@ -440,7 +438,7 @@ class XEP_0045(BasePlugin):
raise ValueError("Configuration form not found")
return form
async def set_room_config(self, room: JidStr, config: Form, *,
async def set_room_config(self, room: JID, config: Form, *,
ifrom: Optional[JID] = None, **iqkwargs):
"""Send a room config form.
@@ -453,8 +451,8 @@ class XEP_0045(BasePlugin):
iq = self.xmpp.make_iq_set(query, ito=room, ifrom=ifrom)
await iq.send(**iqkwargs)
async def cancel_config(self, room: JidStr, *,
ifrom: Optional[JidStr] = None, **iqkwargs):
async def cancel_config(self, room: JID, *,
ifrom: Optional[JID] = None, **iqkwargs):
"""Cancel a requested config form.
:param room: Room to cancel the form for.
@@ -464,8 +462,8 @@ class XEP_0045(BasePlugin):
iq = self.xmpp.make_iq_set(query, ito=room, ifrom=ifrom)
await iq.send(**iqkwargs)
async def destroy(self, room: JidStr, reason: str = '', altroom: Optional[JidStr] = None, *,
ifrom: Optional[JidStr] = None, **iqkwargs):
async def destroy(self, room: JID, reason: str = '', altroom: Optional[JID] = None, *,
ifrom: Optional[JID] = None, **iqkwargs):
"""Destroy a room.
:param room: Room JID to destroy.
@@ -481,10 +479,10 @@ class XEP_0045(BasePlugin):
iq['mucowner_query']['destroy']['reason'] = reason
await iq.send(**iqkwargs)
async def set_affiliation(self, room: JidStr, affiliation: MucAffiliation, *,
jid: Optional[JidStr] = None,
async def set_affiliation(self, room: JID, affiliation: MucAffiliation, *,
jid: Optional[JID] = None,
nick: Optional[str] = None, reason: str = '',
ifrom: Optional[JidStr] = None, **iqkwargs):
ifrom: Optional[JID] = None, **iqkwargs):
""" Change room affiliation for a JID or nickname.
:param room: Room to modify.
@@ -495,7 +493,7 @@ class XEP_0045(BasePlugin):
if affiliation not in AFFILIATIONS:
raise ValueError('%s is not a valid affiliation' % affiliation)
if affiliation == 'outcast' and not jid:
raise ValueError('Outcast affiliation requires using a jid')
raise ValueError('Outcast affiliation requires a using a jid')
if not any((jid, nick)):
raise ValueError('One of jid or nick must be set')
iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom)
@@ -508,8 +506,8 @@ class XEP_0045(BasePlugin):
iq['mucadmin_query']['item']['reason'] = reason
await iq.send(**iqkwargs)
async def get_affiliation_list(self, room: JidStr, affiliation: MucAffiliation, *,
ifrom: Optional[JidStr] = None, **iqkwargs) -> List[JID]:
async def get_affiliation_list(self, room: JID, affiliation: MucAffiliation, *,
ifrom: Optional[JID] = None, **iqkwargs) -> List[JID]:
"""Get a list of JIDs with the specified affiliation
:param room: Room to get affiliations from.
@@ -520,9 +518,9 @@ class XEP_0045(BasePlugin):
result = await iq.send(**iqkwargs)
return [item['jid'] for item in result['mucadmin_query']]
async def send_affiliation_list(self, room: JidStr,
affiliations: List[Tuple[JidStr, MucAffiliation]], *,
ifrom: Optional[JidStr] = None, **iqkwargs):
async def send_affiliation_list(self, room: JID,
affiliations: List[Tuple[JID, MucAffiliation]], *,
ifrom: Optional[JID] = None, **iqkwargs):
"""Send an affiliation delta list.
:param room: Room to send the affiliations to.
@@ -536,8 +534,8 @@ class XEP_0045(BasePlugin):
iq['mucadmin_query'].append(item)
await iq.send(**iqkwargs)
async def set_role(self, room: JidStr, nick: str, role: MucRole, *,
reason: str = '', ifrom: Optional[JidStr] = None, **iqkwargs):
async def set_role(self, room: JID, nick: str, role: MucRole, *,
reason: str = '', ifrom: Optional[JID] = None, **iqkwargs):
""" Change role property of a nick in a room.
Typically, roles are temporary (they last only as long as you are in the
room), whereas affiliations are permanent (they last across groupchat
@@ -557,8 +555,8 @@ class XEP_0045(BasePlugin):
iq['mucadmin_query']['item']['reason'] = reason
await iq.send(**iqkwargs)
async def get_roles_list(self, room: JidStr, role: MucRole, *,
ifrom: Optional[JidStr] = None, **iqkwargs) -> List[str]:
async def get_roles_list(self, room: JID, role: MucRole, *,
ifrom: Optional[JID] = None, **iqkwargs) -> List[str]:
""""Get a list of JIDs with the specified role
:param room: Room to get roles from.
@@ -569,8 +567,8 @@ class XEP_0045(BasePlugin):
result = await iq.send(**iqkwargs)
return [item['nick'] for item in result['mucadmin_query']]
async def send_role_list(self, room: JidStr, roles: List[Tuple[str, MucRole]], *,
ifrom: Optional[JidStr] = None, **iqkwargs):
async def send_role_list(self, room: JID, roles: List[Tuple[str, MucRole]], *,
ifrom: Optional[JID] = None, **iqkwargs):
"""Send a role delta list.
:param room: Room to send the roles to.
@@ -584,8 +582,8 @@ class XEP_0045(BasePlugin):
iq['mucadmin_query'].append(item)
await iq.send(**iqkwargs)
def invite(self, room: JidStr, jid: JidStr, reason: str = '', *,
mfrom: Optional[JidStr] = None):
def invite(self, room: JID, jid: JID, reason: str = '', *,
mfrom: Optional[JID] = None):
""" Invite a jid to a room (mediated invitation).
:param room: Room to invite the user in.
@@ -598,8 +596,8 @@ class XEP_0045(BasePlugin):
msg['muc']['invite']['reason'] = reason
self.xmpp.send(msg)
def invite_server(self, room: JidStr, jid: JidStr,
invite_from: JidStr, reason: str = ''):
def invite_server(self, room: JID, jid: JID,
invite_from: JID, reason: str = ''):
"""Send a mediated invite to a user, as a MUC service.
.. versionadded:: 1.8.0
@@ -617,8 +615,8 @@ class XEP_0045(BasePlugin):
msg['muc']['invite']['reason'] = reason
msg.send()
def decline(self, room: JidStr, jid: JidStr, reason: str = '', *,
mfrom: Optional[JidStr] = None):
def decline(self, room: JID, jid: JID, reason: str = '', *,
mfrom: Optional[JID] = None):
"""Decline a mediated invitation.
:param room: Room the invitation came from.
@@ -631,7 +629,7 @@ class XEP_0045(BasePlugin):
msg['muc']['decline']['reason'] = reason
self.xmpp.send(msg)
def request_voice(self, room: JidStr, role: str, *, mfrom: Optional[JidStr] = None):
def request_voice(self, room: JID, role: str, *, mfrom: Optional[JID] = None):
"""Request voice in a moderated room.
:param room: Room to request voice from.
@@ -648,49 +646,29 @@ class XEP_0045(BasePlugin):
"""Check if a JID is present in a room.
:param room: Room to check.
:param jid: FULL JID to check.
:param jid: JID to check.
"""
bare_match = False
for nick in self.rooms[room]:
entry = self.rooms[room][nick]
if not entry.get('jid'):
continue
if entry['jid'] == jid.full:
if entry is not None and entry['jid'].full == jid:
return True
elif JID(entry['jid']).bare == jid.bare:
bare_match = True
if bare_match:
logging.info(
"Could not retrieve full JID, falling back to bare JID for %s in %s",
jid, room
)
return bare_match
return False
def get_nick(self, room: JID, jid: JID) -> Optional[str]:
"""Get the nickname of a specific JID in a room.
:param room: Room to inspect.
:param jid: FULL JID whose nick to return.
:param jid: JID whose nick to return.
"""
bare_match = None
for nick in self.rooms[room]:
entry = self.rooms[room][nick]
if not entry.get('jid'):
continue
if entry['jid'] == jid.full:
if entry is not None and entry['jid'].full == jid:
return nick
elif JID(entry['jid']).bare == jid.bare:
bare_match = nick
if bare_match:
logging.info(
"Could not retrieve full JID, falling back to bare JID for %s in %s",
jid, room
)
return bare_match
return None
def get_joined_rooms(self) -> List[JID]:
"""Get the list of rooms we sent a join presence to
@@ -726,7 +704,7 @@ class XEP_0045(BasePlugin):
raise ValueError("Room %s is not joined" % room)
return list(self.rooms[room].keys())
def get_users_by_affiliation(self, room: JidStr, affiliation='member', *, ifrom: Optional[JidStr] = None):
def get_users_by_affiliation(self, room: JID, affiliation='member', *, ifrom: Optional[JID] = None):
# Preserve old API
if affiliation not in AFFILIATIONS:
raise ValueError("Affiliation %s does not exist" % affiliation)

View File

@@ -28,7 +28,7 @@ class MUCBase(ElementBase):
plugin_attrib = 'muc'
interfaces = {'affiliation', 'role', 'jid', 'nick', 'room', 'status_codes'}
def get_status_codes(self) -> Set[int]:
def get_status_codes(self) -> Set[str]:
status = self.xml.findall(f'{{{NS_USER}}}status')
return {int(status.attrib['code']) for status in status}
@@ -275,8 +275,7 @@ class MUCUserItem(ElementBase):
jid = self.xml.attrib.get('jid', None)
if jid:
return JID(jid)
else:
return None
return jid
class MUCActor(ElementBase):
@@ -289,8 +288,7 @@ class MUCActor(ElementBase):
jid = self.xml.attrib.get('jid', None)
if jid:
return JID(jid)
else:
return None
return jid
class MUCDestroy(ElementBase):

View File

@@ -7,8 +7,7 @@ import logging
import hashlib
import base64
from asyncio import Future, Lock
from collections import defaultdict
from asyncio import Future
from typing import Optional
from slixmpp import __version__
@@ -95,9 +94,6 @@ class XEP_0115(BasePlugin):
disco.assign_verstring = self.assign_verstring
disco.get_verstring = self.get_verstring
# prevent concurrent fetches for the same hash
self._locks = defaultdict(Lock)
def plugin_end(self):
self.xmpp['xep_0030'].del_feature(feature=stanza.Capabilities.namespace)
self.xmpp.del_filter('out', self._filter_add_caps)
@@ -141,7 +137,7 @@ class XEP_0115(BasePlugin):
self.xmpp.event('entity_caps', p)
async def _process_caps(self, pres: Presence):
async def _process_caps(self, pres):
if not pres['caps']['hash']:
log.debug("Received unsupported legacy caps: %s, %s, %s",
pres['caps']['node'],
@@ -151,11 +147,7 @@ class XEP_0115(BasePlugin):
return
ver = pres['caps']['ver']
async with self._locks[ver]:
await self._process_caps_wrapped(pres, ver)
self._locks.pop(ver, None)
async def _process_caps_wrapped(self, pres: Presence, ver: str):
existing_verstring = await self.get_verstring(pres['from'].full)
if str(existing_verstring) == str(ver):
return

View File

@@ -137,14 +137,7 @@ class XEP_0199(BasePlugin):
async def _keepalive(self, event=None):
log.debug("Keepalive ping...")
try:
ifrom = None
if self.xmpp.is_component:
ifrom = self.xmpp.boundjid
rtt = await self.ping(
self.xmpp.boundjid.host,
timeout=self.timeout,
ifrom=ifrom
)
rtt = await self.ping(self.xmpp.boundjid.host, timeout=self.timeout)
except IqTimeout:
log.debug("Did not receive ping back in time. " + \
"Requesting Reconnect.")

View File

@@ -15,32 +15,6 @@ log = logging.getLogger(__name__)
class XEP_0221(BasePlugin):
"""
XEP-0221: Data Forms Media Element
In certain implementations of Data Forms (XEP-0004), it can be
helpful to include media data such as small images. One example is
CAPTCHA Forms (XEP-0158). This plugin implements a method for
including media data in a data form.
Typical use pattern:
.. code-block:: python
self.register_plugin('xep_0221')
self['xep_0050'].add_command(node="showimage",
name="Show my image",
handler=self.form_handler)
def form_handler(self,iq,session):
image_url="https://xmpp.org/images/logos/xmpp-logo.svg"
form=self['xep_0004'].make_form('result','My Image')
form.addField(var='myimage', ftype='text-single', label='My Image', value=image_url)
form.field['myimage']['media'].add_uri(value=image_url, itype="image/svg")
session['payload']=form
return session
"""
name = 'xep_0221'
description = 'XEP-0221: Data Forms Media Element'

View File

@@ -20,18 +20,6 @@ class XEP_0223(BasePlugin):
"""
XEP-0223: Persistent Storage of Private Data via PubSub
If a specific pubsub node requires additional publish options, edit the
:attr:`.node_profile` attribute of this plugin:
.. code-block:: python
self.xmpp.plugin["xep_0223"].node_profiles["urn:some:node"] = {
"pubsub#max_items" = "max"
}
This makes :meth:`.store` add these publish options whenever it is called
for the ``urn:some:node`` node.
"""
name = 'xep_0223'
@@ -40,7 +28,6 @@ class XEP_0223(BasePlugin):
profile = {'pubsub#persist_items': True,
'pubsub#access_model': 'whitelist'}
node_profiles = dict[str, dict[str, str]]()
def configure(self, node: str, **iqkwargs) -> Future:
"""
@@ -83,8 +70,7 @@ class XEP_0223(BasePlugin):
value='http://jabber.org/protocol/pubsub#publish-options')
fields = options['fields']
profile = self.profile | self.node_profiles.get(node, {})
for field, value in profile.items():
for field, value in self.profile.items():
if field not in fields:
options.add_field(var=field)
options.get_fields()[field]['value'] = value

View File

@@ -10,7 +10,6 @@ from asyncio import Future
from typing import Optional
from slixmpp import JID
from slixmpp.exceptions import XMPPError
from slixmpp.stanza import Iq, Message, Presence
from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.matcher import StanzaPath
@@ -140,13 +139,6 @@ class XEP_0231(BasePlugin):
self.xmpp.event('bob', iq)
elif iq['type'] == 'get':
data = await self.api['get_bob'](iq['to'], None, iq['from'], args=cid)
if data is None:
raise XMPPError(
"item-not-found",
f"Bits of binary '{cid}' is not available",
)
if isinstance(data, Iq):
data['id'] = iq['id']
data.send()

View File

@@ -1,5 +0,0 @@
from slixmpp.plugins.base import register_plugin
from .thumbnail import XEP_0264
register_plugin(XEP_0264)

View File

@@ -1,36 +0,0 @@
from typing import Optional
from slixmpp import register_stanza_plugin
from slixmpp.plugins.xep_0234.stanza import File
from slixmpp.xmlstream import ElementBase
NS = "urn:xmpp:thumbs:1"
class Thumbnail(ElementBase):
name = plugin_attrib = "thumbnail"
namespace = NS
interfaces = {"uri", "media-type", "width", "height"}
def get_width(self) -> int:
return _int_or_none(self._get_attr("width"))
def get_height(self) -> int:
return _int_or_none(self._get_attr("height"))
def set_width(self, v: int) -> None:
self._set_attr("width", str(v))
def set_height(self, v: int) -> None:
self._set_attr("height", str(v))
def _int_or_none(v) -> Optional[int]:
try:
return int(v)
except ValueError:
return None
def register_plugin():
register_stanza_plugin(File, Thumbnail)

View File

@@ -1,24 +0,0 @@
import logging
from slixmpp.plugins import BasePlugin
from . import stanza
log = logging.getLogger(__name__)
class XEP_0264(BasePlugin):
"""
XEP-0264: Jingle Content Thumbnails
Can also be used with 0385 (Stateless inline media sharing)
"""
name = "xep_0264"
description = "XEP-0264: Jingle Content Thumbnails"
dependencies = {"xep_0234"}
stanza = stanza
def plugin_init(self):
stanza.register_plugin()

View File

@@ -5,6 +5,7 @@
# See the file LICENSE for copying permissio
import logging
import slixmpp
from slixmpp.stanza import Message
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
@@ -44,8 +45,5 @@ class XEP_0308(BasePlugin):
def session_bind(self, jid):
self.xmpp.plugin['xep_0030'].add_feature(Replace.namespace)
def is_correction(self, msg: Message):
return msg.xml.find('{%s}replace' % Replace.namespace) is not None
def _handle_correction(self, msg: Message):
def _handle_correction(self, msg):
self.xmpp.event('message_correction', msg)

View File

@@ -52,10 +52,9 @@ class MAM(ElementBase):
#: fetch, not relevant for the stanza itself.
interfaces = {
'queryid', 'start', 'end', 'with', 'results',
'before_id', 'after_id', 'ids', 'flip_page',
'before_id', 'after_id', 'ids',
}
sub_interfaces = {'start', 'end', 'with', 'before_id', 'after_id', 'ids',
'flip_page'}
sub_interfaces = {'start', 'end', 'with', 'before_id', 'after_id', 'ids'}
def setup(self, xml=None):
ElementBase.setup(self, xml)
@@ -82,7 +81,7 @@ class MAM(ElementBase):
def get_start(self) -> Optional[datetime]:
fields = self.get_fields()
field = fields.get('start')
if field and field["value"]:
if field:
return xep_0082.parse(field['value'])
return None
@@ -95,7 +94,7 @@ class MAM(ElementBase):
def get_end(self) -> Optional[datetime]:
fields = self.get_fields()
field = fields.get('end')
if field and field["value"]:
if field:
return xep_0082.parse(field['value'])
return None
@@ -169,8 +168,6 @@ class MAM(ElementBase):
def del_results(self):
self._results = []
def get_flip_page(self):
return self.xml.find(f'{{{self.namespace}}}flip-page') is not None
class Fin(ElementBase):
"""A MAM fin element (end of query).

View File

@@ -1,11 +0,0 @@
# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from slixmpp.plugins import register_plugin
from slixmpp.plugins.xep_0317 import stanza
from slixmpp.plugins.xep_0317.hats import XEP_0317
from slixmpp.plugins.xep_0317.stanza import Hat, Hats
register_plugin(XEP_0317)
__all__ = ['stanza', 'XEP_317']

View File

@@ -1,16 +0,0 @@
from slixmpp.plugins import BasePlugin
from . import stanza
class XEP_0317(BasePlugin):
"""
XEP-0317: Hats
"""
name = 'xep_0317'
description = 'XEP-0317: Hats'
dependencies = {'xep_0030', 'xep_0045', 'xep_0050'}
stanza = stanza
namespace = stanza.NS
def plugin_init(self):
stanza.register_plugin()

View File

@@ -1,58 +0,0 @@
from slixmpp import Presence
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
from typing import List, Tuple
NS = 'urn:xmpp:hats:0'
class Hats(ElementBase):
"""
Hats element, container for multiple hats:
.. code-block::xml
<hats xmlns='urn:xmpp:hats:0'>
<hat title='Host' uri='http://schemas.example.com/hats#host' xml:lang='en-us'>
<badge xmlns="urn:example:badges" fgcolor="#000000" bgcolor="#58C5BA"/>
</hat>
<hat title='Presenter' uri='http://schemas.example.com/hats#presenter' xml:lang='en-us'>
<badge xmlns="urn:example:badges" fgcolor="#000000" bgcolor="#EC0524"/>
</hat>
</hats>
"""
name = 'hats'
namespace = NS
plugin_attrib = 'hats'
def add_hats(self, data: List[Tuple[str, str]]) -> None:
for uri, title in data:
hat = Hat()
hat["uri"] = uri
hat["title"] = title
self.append(hat)
class Hat(ElementBase):
"""
Hat element, has a title and url, may contain arbitrary sub-elements.
.. code-block::xml
<hat title='Host' uri='http://schemas.example.com/hats#host' xml:lang='en-us'>
<badge xmlns="urn:example:badges" fgcolor="#000000" bgcolor="#58C5BA"/>
</hat>
"""
name = 'hat'
plugin_attrib = 'hat'
namespace = NS
interfaces = {'title', 'uri'}
plugin_multi_attrib = "hats"
def register_plugin() -> None:
register_stanza_plugin(Hats, Hat, iterable=True)
register_stanza_plugin(Presence, Hats)

View File

@@ -4,6 +4,7 @@
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from slixmpp.plugins.base import register_plugin
from .retraction import XEP_0424
from slixmpp.plugins.xep_0424.stanza import *
from slixmpp.plugins.xep_0424.retraction import XEP_0424
register_plugin(XEP_0424)

View File

@@ -31,7 +31,7 @@ class XEP_0424(BasePlugin):
stanza.register_plugins()
self.xmpp.register_handler(Callback(
"Message Retracted",
StanzaPath("message/retract"),
StanzaPath("message/apply_to/retract"),
self._handle_retract_message,
))
@@ -64,6 +64,7 @@ class XEP_0424(BasePlugin):
if include_fallback:
msg['body'] = fallback_text
msg.enable('fallback')
msg['retract']['id'] = id
msg['apply_to']['id'] = id
msg['apply_to'].enable('retract')
msg.enable('store')
msg.send()

View File

@@ -8,27 +8,28 @@ from slixmpp.xmlstream import (
ElementBase,
register_stanza_plugin,
)
from slixmpp.plugins.xep_0422.stanza import ApplyTo
from slixmpp.plugins.xep_0359 import OriginID
NS = 'urn:xmpp:message-retract:1'
NS = 'urn:xmpp:message-retract:0'
class Retract(ElementBase):
namespace = NS
name = 'retract'
plugin_attrib = 'retract'
interfaces = {'reason', 'id'}
sub_interfaces = {'reason'}
class Retracted(ElementBase):
namespace = NS
name = 'retracted'
plugin_attrib = 'retracted'
interfaces = {'stamp', 'id', 'reason'}
sub_interfaces = {'reason'}
interfaces = {'stamp'}
def register_plugins():
register_stanza_plugin(Message, Retract)
register_stanza_plugin(ApplyTo, Retract)
register_stanza_plugin(Message, Retracted)
register_stanza_plugin(Retracted, OriginID)

View File

@@ -13,10 +13,10 @@ from slixmpp.plugins.xep_0425 import stanza
class XEP_0425(BasePlugin):
'''XEP-0425: Moderated Message Retraction'''
'''XEP-0425: Message Moderation'''
name = 'xep_0425'
description = 'XEP-0425: Moderated Message Retraction'
description = 'XEP-0425: Message Moderation'
dependencies = {'xep_0424', 'xep_0421'}
stanza = stanza
namespace = stanza.NS
@@ -25,7 +25,7 @@ class XEP_0425(BasePlugin):
stanza.register_plugins()
self.xmpp.register_handler(Callback(
'Moderated Message',
StanzaPath('message/retract/moderated'),
StanzaPath('message/apply_to/moderated/retract'),
self._handle_moderated,
))
@@ -42,7 +42,7 @@ class XEP_0425(BasePlugin):
async def moderate(self, room: JID, id: str, reason: str = '', *,
ifrom: Optional[JID] = None, **iqkwargs):
iq = self.xmpp.make_iq_set(ito=room.bare, ifrom=ifrom)
iq['moderate']['id'] = id
iq['moderate']['reason'] = reason
iq['moderate'].enable('retract')
iq['apply_to']['id'] = id
iq['apply_to']['moderate']['reason'] = reason
iq['apply_to']['moderate'].enable('retract')
await iq.send(**iqkwargs)

View File

@@ -8,11 +8,12 @@ from slixmpp.xmlstream import (
ElementBase,
register_stanza_plugin,
)
from slixmpp.plugins.xep_0422.stanza import ApplyTo
from slixmpp.plugins.xep_0421.stanza import OccupantId
from slixmpp.plugins.xep_0424.stanza import Retract, Retracted
NS = 'urn:xmpp:message-moderate:1'
NS = 'urn:xmpp:message-moderate:0'
class Moderate(ElementBase):
@@ -27,17 +28,17 @@ class Moderated(ElementBase):
namespace = NS
name = 'moderated'
plugin_attrib = 'moderated'
interfaces = {'by'}
interfaces = {'reason', 'by'}
sub_interfaces = {'reason'}
def register_plugins():
# for moderation requests
register_stanza_plugin(Iq, Moderate)
register_stanza_plugin(Iq, ApplyTo)
register_stanza_plugin(ApplyTo, Moderate)
register_stanza_plugin(Moderate, Retract)
# for moderation events
register_stanza_plugin(Retract, Moderated)
register_stanza_plugin(Message, Moderated)
register_stanza_plugin(ApplyTo, Moderated)
register_stanza_plugin(Moderated, Retract)
register_stanza_plugin(Moderated, Retracted)
register_stanza_plugin(Moderated, OccupantId)
# for tombstones
register_stanza_plugin(Retracted, Moderated)

View File

@@ -1,5 +1,3 @@
from typing import Optional
from slixmpp.plugins import BasePlugin
from slixmpp.types import JidStr
from slixmpp.xmlstream import StanzaBase
@@ -38,35 +36,13 @@ class XEP_0461(BasePlugin):
def _handle_reply_to_message(self, msg: StanzaBase):
self.xmpp.event("message_reply", msg)
def make_reply(self, reply_to: JidStr, reply_id: str,
fallback: Optional[str] = None,
quoted_nick: Optional[str] = None, **msg_kwargs):
"""Create a replies message stanza
def send_reply(self, reply_to: JidStr, reply_id: str, **msg_kwargs):
"""
:param reply_to: Full JID of the quoted author
:param reply_id: ID of the message to reply to
:param fallback: Body of the quoted message
:param quoted_nick: nickname of the quoted participant
:param msg_kwargs: Parameters are consistent with the make_message method,
required parameters are ``mto`` and ``mbody``
"""
msg = self.xmpp.make_message(**msg_kwargs)
msg["reply"]["to"] = reply_to
msg["reply"]["id"] = reply_id
if fallback:
msg["reply"].add_quoted_fallback(fallback, quoted_nick)
return msg
def send_reply(self, reply_to: JidStr, reply_id: str,
fallback: Optional[str] = None,
quoted_nick: Optional[str] = None, **msg_kwargs):
"""
:param reply_to: Full JID of the quoted author
:param reply_id: ID of the message to reply to
:param fallback: Body of the quoted message
:param quoted_nick: nickname of the quoted participant
"""
msg = self.make_reply(reply_to, reply_id, fallback, quoted_nick, **msg_kwargs)
msg.send()

View File

@@ -30,11 +30,11 @@ class Reply(ElementBase):
if nickname:
quoted = "> " + nickname + ":\n" + quoted
msg["body"] = quoted + msg["body"]
fallback_elem = Fallback()
fallback_elem["for"] = NS
fallback_elem["body"]["start"] = 0
fallback_elem["body"]["end"] = len(quoted)
msg.append(fallback_elem)
fallback = Fallback()
fallback["for"] = NS
fallback["body"]["start"] = 0
fallback["body"]["end"] = len(quoted)
msg.append(fallback)
def get_fallback_body(self) -> str:
msg = self.parent()
@@ -50,23 +50,6 @@ class Reply(ElementBase):
return body[start:end]
else:
return ""
def strip_fallback_content(self) -> str:
msg = self.parent()
for fallback in msg["fallbacks"]:
if fallback["for"] == NS:
break
else:
return msg["body"]
start = fallback["body"]["start"]
end = fallback["body"]["end"]
body = msg["body"]
if 0 <= start < end <= len(body):
return body[:start] + body[end:]
else:
return body
def register_plugins():

View File

@@ -1,8 +0,0 @@
from slixmpp.plugins.base import register_plugin
from . import stanza
from .pinning import XEP_0469
register_plugin(XEP_0469)
__all__ = ['stanza', 'XEP_0469']

View File

@@ -1,17 +0,0 @@
from slixmpp.plugins import BasePlugin
from . import stanza
class XEP_0469(BasePlugin):
"""
XEP-0469: Bookmark Pinning
"""
name = "xep_0469"
description = "XEP-0469: Bookmark Pinning"
dependencies = {"xep_0402"}
stanza = stanza
def plugin_init(self):
stanza.register_plugin()

View File

@@ -1,31 +0,0 @@
from slixmpp import register_stanza_plugin
from slixmpp.plugins.xep_0402.stanza import Extensions
from slixmpp.xmlstream import ElementBase
NS = "urn:xmpp:bookmarks-pinning:0"
class Pinned(ElementBase):
"""
Pinned bookmark element
To enable it on a Conference element, use enable() like this:
.. code-block::python
# C being a Conference element
C['extensions'].enable('pinned')
Which will add the <pinned> element to the <extensions> element.
"""
namespace = NS
name = "pinned"
plugin_attrib = "pinned"
interfaces = {"pinned"}
bool_interfaces = {"pinned"}
is_extension = True
def register_plugin():
register_stanza_plugin(Extensions, Pinned)

View File

@@ -1,8 +0,0 @@
from slixmpp.plugins.base import register_plugin
from . import stanza
from .mds import XEP_0490
register_plugin(XEP_0490)
__all__ = ['stanza', 'XEP_0490']

View File

@@ -1,42 +0,0 @@
from asyncio import Future
from slixmpp import Iq
from slixmpp.plugins import BasePlugin
from slixmpp.types import JidStr
from . import stanza
from ..xep_0004 import Form
class XEP_0490(BasePlugin):
"""
XEP-0490: Message Displayed Synchronization
"""
name = "xep_0490"
description = "XEP-0490: Message Displayed Synchronization"
dependencies = {"xep_0060", "xep_0163", "xep_0223", "xep_0359"}
stanza = stanza
def plugin_init(self):
stanza.register_plugin()
self.xmpp.plugin["xep_0163"].register_pep(
"message_displayed_synchronization",
stanza.Displayed,
)
self.xmpp.plugin["xep_0223"].node_profiles[self.stanza.NS] = {
"pubsub#max_items": "max",
"pubsub#send_last_published_item": "never",
}
def flag_chat(self, chat: JidStr, stanza_id: str, **kwargs) -> Future[Iq]:
displayed = stanza.Displayed()
displayed["stanza_id"]["id"] = stanza_id
return self.xmpp.plugin["xep_0223"].store(
displayed, node=stanza.NS, id=str(chat), **kwargs
)
def catch_up(self, **kwargs):
return self.xmpp.plugin["xep_0060"].get_items(
self.xmpp.boundjid.bare, stanza.NS, **kwargs
)

View File

@@ -1,17 +0,0 @@
from slixmpp import register_stanza_plugin
from slixmpp.plugins.xep_0060.stanza import Item
from slixmpp.xmlstream import ElementBase
from slixmpp.plugins.xep_0359.stanza import StanzaID
NS = "urn:xmpp:mds:displayed:0"
class Displayed(ElementBase):
namespace = NS
name = "displayed"
plugin_attrib = "displayed"
def register_plugin():
register_stanza_plugin(Displayed, StanzaID)
register_stanza_plugin(Item, Displayed)

View File

@@ -69,14 +69,12 @@ from slixmpp.plugins.xep_0249 import XEP_0249
from slixmpp.plugins.xep_0256 import XEP_0256
from slixmpp.plugins.xep_0257 import XEP_0257
from slixmpp.plugins.xep_0258 import XEP_0258
from slixmpp.plugins.xep_0264 import XEP_0264
from slixmpp.plugins.xep_0279 import XEP_0279
from slixmpp.plugins.xep_0280 import XEP_0280
from slixmpp.plugins.xep_0297 import XEP_0297
from slixmpp.plugins.xep_0300 import XEP_0300
from slixmpp.plugins.xep_0308 import XEP_0308
from slixmpp.plugins.xep_0313 import XEP_0313
from slixmpp.plugins.xep_0317 import XEP_0317
from slixmpp.plugins.xep_0319 import XEP_0319
from slixmpp.plugins.xep_0332 import XEP_0332
from slixmpp.plugins.xep_0333 import XEP_0333
@@ -102,8 +100,6 @@ from slixmpp.plugins.xep_0428 import XEP_0428
from slixmpp.plugins.xep_0437 import XEP_0437
from slixmpp.plugins.xep_0439 import XEP_0439
from slixmpp.plugins.xep_0444 import XEP_0444
from slixmpp.plugins.xep_0461 import XEP_0461
from slixmpp.plugins.xep_0490 import XEP_0490
class PluginsDict(TypedDict):
@@ -166,14 +162,12 @@ class PluginsDict(TypedDict):
xep_0256: XEP_0256
xep_0257: XEP_0257
xep_0258: XEP_0258
xep_0264: XEP_0264
xep_0279: XEP_0279
xep_0280: XEP_0280
xep_0297: XEP_0297
xep_0300: XEP_0300
xep_0308: XEP_0308
xep_0313: XEP_0313
xep_0317: XEP_0317
xep_0319: XEP_0319
xep_0332: XEP_0332
xep_0333: XEP_0333
@@ -199,5 +193,3 @@ class PluginsDict(TypedDict):
xep_0437: XEP_0437
xep_0439: XEP_0439
xep_0444: XEP_0444
xep_0461: XEP_0461
xep_0490: XEP_0490

View File

@@ -29,9 +29,9 @@ class SlixIntegration(IsolatedAsyncioTestCase):
self.clients = []
self.addAsyncCleanup(self._destroy)
def envjid(self, name: str, *, default: Optional[str] = None) -> JID:
def envjid(self, name):
"""Get a JID from an env var"""
value = os.getenv(name, default=default)
value = os.getenv(name)
return JID(value)
def envstr(self, name):

View File

@@ -3,7 +3,6 @@
# Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
import atexit
import unittest
from queue import Queue
from xml.parsers.expat import ExpatError
@@ -751,12 +750,3 @@ class SlixTest(unittest.TestCase):
Error.namespace = 'jabber:client'
for st in Message, Iq, Presence:
register_stanza_plugin(st, Error)
@atexit.register
def cleanup():
try:
loop = asyncio.get_event_loop()
loop.close()
except:
pass

View File

@@ -53,20 +53,17 @@ MucAffiliation = Literal[
'outcast', 'member', 'admin', 'owner', 'none'
]
OptJid = Optional[JID]
JidStr = Union[str, JID]
OptJidStr = Optional[Union[str, JID]]
class PresenceArgs(TypedDict, total=False):
pfrom: JidStr
pto: JidStr
pfrom: JID
pto: JID
pshow: PresenceShows
ptype: PresenceTypes
pstatus: str
class MucRoomItem(TypedDict, total=False):
jid: str
jid: JID
role: MucRole
affiliation: MucAffiliation
show: Optional[PresenceShows]
@@ -78,6 +75,10 @@ MucRoomItemKeys = Literal[
'jid', 'role', 'affiliation', 'show', 'status', 'alt_nick',
]
OptJid = Optional[JID]
JidStr = Union[str, JID]
OptJidStr = Optional[Union[str, JID]]
MAMDefault = Literal['always', 'never', 'roster']
FilterString = Literal['in', 'out', 'out_sync']

View File

@@ -181,7 +181,7 @@ class SCRAM(Mech):
channel_binding = True
required_credentials = {'username', 'password'}
optional_credentials = {'authzid', 'channel_binding'}
security = {'tls_version', 'encrypted', 'unencrypted_scram', 'binding_proposed'}
security = {'encrypted', 'unencrypted_scram'}
def setup(self, name):
self.use_channel_binding = False
@@ -244,15 +244,11 @@ class SCRAM(Mech):
self.cnonce = bytes(('%s' % random.random())[2:])
gs2_cbind_flag = b'n'
if self.security_settings['binding_proposed']:
if self.credentials['channel_binding'] and \
self.use_channel_binding:
if self.security_settings['tls_version'] != 'TLSv1.3':
gs2_cbind_flag = b'p=tls-unique'
else:
gs2_cbind_flag = b'p=tls-exporter'
else:
gs2_cbind_flag = b'y'
if self.credentials['channel_binding']:
if self.use_channel_binding:
gs2_cbind_flag = b'p=tls-unique'
else:
gs2_cbind_flag = b'y'
authzid = b''
if self.credentials['authzid']:
@@ -284,7 +280,7 @@ class SCRAM(Mech):
raise SASLCancelled('Invalid nonce')
cbind_data = b''
if self.use_channel_binding and self.credentials['channel_binding']:
if self.use_channel_binding:
cbind_data = self.credentials['channel_binding']
cbind_input = self.gs2_header + cbind_data
channel_binding = b'c=' + b64encode(cbind_input).replace(b'\n', b'')

View File

@@ -5,5 +5,5 @@
# We don't want to have to import the entire library
# just to get the version info for setup.py
__version__ = '1.8.6'
__version_info__ = (1, 8, 6)
__version__ = '1.8.4'
__version_info__ = (1, 8, 4)

View File

@@ -732,9 +732,6 @@ class ElementBase(object):
return plugin[full_attrib]
return plugin
else:
# XXX: This is legacy from SleekXMPP
# We've probably missed the opportunity to fix it
logging.warning("Unknown stanza interface: %s" % full_attrib)
return ''
def __setitem__(self, attrib: str, value: Any) -> Any:
@@ -1233,7 +1230,7 @@ class ElementBase(object):
if type(item) == XML_TYPE:
return self.appendxml(item)
else:
raise TypeError(f"Cannot append {item!r} to a stanza")
raise TypeError
self.xml.append(item.xml)
if item.__class__ == self.plugin_tag_map.get(item.tag_name(), None):
self.init_plugin(item.plugin_attrib,

View File

@@ -290,8 +290,8 @@ class XMLStream(asyncio.BaseProtocol):
self.xml_depth = 0
self.xml_root = None
self.force_starttls = True
self.disable_starttls = False
self.force_starttls = None
self.disable_starttls = None
self.waiting_queue = asyncio.Queue()
@@ -405,9 +405,8 @@ class XMLStream(asyncio.BaseProtocol):
self.disconnected.set_result(True)
self.disconnected = asyncio.Future()
def connect(self, host: str = '', port: int = 0, use_ssl: Optional[bool] = None,
force_starttls: Optional[bool] = None,
disable_starttls: Optional[bool] = None) -> None:
def connect(self, host: str = '', port: int = 0, use_ssl: Optional[bool] = False,
force_starttls: Optional[bool] = True, disable_starttls: Optional[bool] = False) -> None:
"""Create a new socket and connect to the server.
:param host: The name of the desired server for the connection.
@@ -524,7 +523,7 @@ class XMLStream(asyncio.BaseProtocol):
else:
self.loop.run_until_complete(self.disconnected)
else:
tasks: List[Union[asyncio.Task, asyncio.Future]] = [asyncio.Task(asyncio.sleep(timeout))]
tasks: List[Awaitable] = [asyncio.sleep(timeout)]
if not forever:
tasks.append(self.disconnected)
self.loop.run_until_complete(asyncio.wait(tasks))
@@ -850,8 +849,6 @@ class XMLStream(asyncio.BaseProtocol):
log.debug("Connection error:", exc_info=True)
self.disconnect()
return False
if transp is None:
raise Exception("Transport should not be none")
der_cert = transp.get_extra_info("ssl_object").getpeercert(True)
pem_cert = ssl.DER_cert_to_PEM_cert(der_cert)
self.event('ssl_cert', pem_cert)
@@ -1350,7 +1347,6 @@ class XMLStream(asyncio.BaseProtocol):
if isinstance(data, (RootStanza, str)) and not passthrough:
self.__queued_stanzas.append((data, use_filters))
log.debug('NOT SENT: %s %s', type(data), data)
self.event('stanza_not_sent', data)
return
self.waiting_queue.put_nowait((data, use_filters))
@@ -1414,11 +1410,7 @@ class XMLStream(asyncio.BaseProtocol):
# Convert the raw XML object into a stanza object. If no registered
# stanza type applies, a generic StanzaBase stanza will be used.
try:
stanza: Optional[StanzaBase] = self._build_stanza(xml)
except Exception as exc:
log.exception("Unable to parse stanza: %s,\n%s", exc, xml)
stanza = None
stanza: Optional[StanzaBase] = self._build_stanza(xml)
for filter in self.__filters['in']:
if stanza is not None:
filter = cast(SyncFilter, filter)

View File

@@ -1,67 +0,0 @@
import unittest
from slixmpp import Presence
from slixmpp.test import SlixTest
import slixmpp.plugins.xep_0317 as xep_0317
from slixmpp.plugins.xep_0317 import stanza
class TestStanzaHats(SlixTest):
def setUp(self):
stanza.register_plugin()
def test_create_hats(self):
raw_xml = """
<hats xmlns="urn:xmpp:hats:0">
<hat uri="http://example.com/hats#Teacher" title="Teacher"/>
</hats>
"""
hats = xep_0317.Hats()
hat = xep_0317.Hat()
hat['uri'] = 'http://example.com/hats#Teacher'
hat['title'] = 'Teacher'
hats.append(hat)
self.check(hats, raw_xml, use_values=False)
def test_set_single_hat(self):
presence = Presence()
presence["hats"]["hat"]["uri"] = "test-uri"
presence["hats"]["hat"]["title"] = "test-title"
self.check(
presence, # language=XML
"""
<presence>
<hats xmlns='urn:xmpp:hats:0'>
<hat uri='test-uri' title='test-title'/>
</hats>
</presence>
""",
)
def test_set_multi_hat(self):
presence = Presence()
presence["hats"].add_hats([("uri1", "title1"), ("uri2", "title2")])
self.check(
presence, # language=XML
"""
<presence>
<hats xmlns='urn:xmpp:hats:0'>
<hat uri='uri1' title='title1'/>
<hat uri='uri2' title='title2'/>
</hats>
</presence>
""",
)
def test_get_hats(self):
presence = Presence()
presence["hats"].add_hats([("uri1", "title1"), ("uri2", "title2")])
for i, hat in enumerate(presence["hats"]["hats"], start=1):
self.assertEqual(hat["uri"], f"uri{i}")
self.assertEqual(hat["title"], f"title{i}")
suite = unittest.TestLoader().loadTestsFromTestCase(TestStanzaHats)

View File

@@ -2,33 +2,38 @@ import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0424 import stanza
from slixmpp.plugins.xep_0422 import stanza as astanza
class TestRetraction(SlixTest):
def setUp(self):
astanza.register_plugins()
stanza.register_plugins()
def testRetract(self):
message = Message()
message['retract']['id'] = 'some-id'
message['apply_to']['id'] = 'some-id'
message['apply_to']['retract']
self.check(message, """
<message>
<retract xmlns="urn:xmpp:message-retract:1" id="some-id"/>
<apply-to xmlns="urn:xmpp:fasten:0" id="some-id">
<retract xmlns="urn:xmpp:message-retract:0"/>
</apply-to>
</message>
""", use_values=False)
def testRetracted(self):
message = Message()
message['retracted']['stamp'] = '2019-09-20T23:09:32Z'
message['retracted']['id'] = 'originid'
message['retracted']['origin_id']['id'] = 'originid'
self.check(message, """
<message>
<retracted stamp="2019-09-20T23:09:32Z"
xmlns="urn:xmpp:message-retract:1"
id="originid" />
<retracted stamp="2019-09-20T23:09:32Z" xmlns="urn:xmpp:message-retract:0">
<origin-id xmlns="urn:xmpp:sid:0" id="originid"/>
</retracted>
</message>
""")

View File

@@ -1,48 +1,45 @@
import unittest
from slixmpp import Message, Iq, JID
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0424 import stanza as stanza424
from slixmpp.plugins.xep_0425 import stanza
class TestModeration(SlixTest):
def setUp(self):
stanza424.register_plugins()
stanza.register_plugins()
def testModerate(self):
iq = Iq()
iq['type'] = 'set'
iq['id'] = 'a'
iq['moderate']['id'] = 'some-id'
iq['moderate'].enable('retract')
iq['moderate']['reason'] = 'R'
iq['apply_to']['id'] = 'some-id'
iq['apply_to']['moderate'].enable('retract')
iq['apply_to']['moderate']['reason'] = 'R'
self.check(iq, """
<iq type='set' id='a'>
<moderate xmlns='urn:xmpp:message-moderate:1'>
<retract xmlns='urn:xmpp:message-retract:1'/>
<reason>R</reason>
</moderate>
<apply-to id="some-id" xmlns="urn:xmpp:fasten:0">
<moderate xmlns='urn:xmpp:message-moderate:0'>
<retract xmlns='urn:xmpp:message-retract:0'/>
<reason>R</reason>
</moderate>
</apply-to>
</iq>
""", use_values=False)
def testModerated(self):
message = Message()
message['retract']['id'] = 'some-id'
message['retract']['moderated']['by'] = JID('toto@titi')
message['retract']['moderated']['occupant-id']['id'] = 'oc-id'
message['retract']['reason'] = 'R'
message['moderated']['by'] = JID('toto@titi')
message['moderated']['retracted']['stamp'] = '2019-09-20T23:09:32Z'
message['moderated']['reason'] = 'R'
self.check(message, """
<message>
<retract id='some-id' xmlns='urn:xmpp:message-retract:1'>
<moderated by='toto@titi' xmlns='urn:xmpp:message-moderate:1'>
<occupant-id xmlns="urn:xmpp:occupant-id:0" id="oc-id" />
</moderated>
<moderated xmlns="urn:xmpp:message-moderate:0" by="toto@titi">
<retracted stamp="2019-09-20T23:09:32Z" xmlns="urn:xmpp:message-retract:0" />
<reason>R</reason>
</retract>
</moderated>
</message>
""")

View File

@@ -1,36 +0,0 @@
import unittest
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0469 import stanza
from slixmpp.plugins.xep_0402 import stanza as b_stanza
class TestBookmarksPinning(SlixTest):
def setUp(self):
b_stanza.register_plugin()
stanza.register_plugin()
def test_pinned(self):
bookmark = b_stanza.Conference()
bookmark["password"] = "pass"
bookmark["nick"] = "nick"
bookmark["autojoin"] = False
bookmark["extensions"].enable("pinned")
self.check(
bookmark,
"""
<conference xmlns='urn:xmpp:bookmarks:1'
autojoin='false'>
<nick>nick</nick>
<password>pass</password>
<extensions>
<pinned xmlns="urn:xmpp:bookmarks-pinning:0" />
</extensions>
</conference>
""",
use_values=False
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestBookmarksPinning)

View File

@@ -1,76 +0,0 @@
import logging
import unittest
from slixmpp.test import SlixTest
class TestCaps(SlixTest):
def setUp(self):
self.stream_start(plugins=["xep_0115"])
def testConcurrentSameHash(self):
"""
Check that we only resolve a given ver string to a disco info once,
even if we receive several presences with that same ver string
consecutively.
"""
self.recv( # language=XML
"""
<presence from='romeo@montague.lit/orchard'>
<c xmlns='http://jabber.org/protocol/caps'
hash='sha-1'
node='a-node'
ver='h0TdMvqNR8FHUfFG1HauOLYZDqE='/>
</presence>
"""
)
self.recv( # language=XML
"""
<presence from='i-dont-know-much-shakespeare@montague.lit/orchard'>
<c xmlns='http://jabber.org/protocol/caps'
hash='sha-1'
node='a-node'
ver='h0TdMvqNR8FHUfFG1HauOLYZDqE='/>
</presence>
"""
)
self.send( # language=XML
"""
<iq xmlns="jabber:client"
id="1"
to="romeo@montague.lit/orchard"
type="get">
<query xmlns="http://jabber.org/protocol/disco#info"
node="a-node#h0TdMvqNR8FHUfFG1HauOLYZDqE="/>
</iq>
"""
)
self.send(None)
self.recv( # language=XML
"""
<iq from='romeo@montague.lit/orchard'
id='1'
type='result'>
<query xmlns='http://jabber.org/protocol/disco#info'
node='a-nodes#h0TdMvqNR8FHUfFG1HauOLYZDqE='>
<identity category='client' name='a client' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
</query>
</iq>
"""
)
self.send(None)
self.assertTrue(
self.xmpp["xep_0030"].supports(
"romeo@montague.lit/orchard", "http://jabber.org/protocol/caps"
)
)
self.assertTrue(
self.xmpp["xep_0030"].supports(
"i-dont-know-much-shakespeare@montague.lit/orchard",
"http://jabber.org/protocol/caps",
)
)
logging.basicConfig(level=logging.DEBUG)
suite = unittest.TestLoader().loadTestsFromTestCase(TestCaps)

View File

@@ -8,14 +8,12 @@ class TestReply(SlixTest):
self.stream_start(plugins=["xep_0461"])
def testFallBackBody(self):
def on_reply(msg):
async def on_reply(msg):
start = msg["fallback"]["body"]["start"]
end = msg["fallback"]["body"]["end"]
self.xmpp["xep_0461"].send_reply(
reply_to=msg.get_from(),
reply_id=msg.get_id(),
fallback=msg["reply"].strip_fallback_content(),
quoted_nick="res",
mto="test@test.com",
mbody=f"{start} to {end}",
)
@@ -28,7 +26,7 @@ class TestReply(SlixTest):
<reply xmlns="urn:xmpp:reply:0" id="some-id" />
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
<body start="0" end="9" />
<body start="0" end="8" />
</fallback>
</message>
"""
@@ -36,11 +34,8 @@ class TestReply(SlixTest):
self.send(
"""
<message xmlns="jabber:client" to="test@test.com" type="normal">
<body>&gt; res:\n&gt; some-body\n0 to 9</body>
<reply xmlns="urn:xmpp:reply:0" id="other-id" to="from@from.com/res" />
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
<body start="0" end="19" />
</fallback>
<body>0 to 8</body>
</message>
"""
)

View File

@@ -1,135 +0,0 @@
import unittest.mock
from slixmpp.test import SlixTest
# from slixmpp.plugins import xep_0490
class TestMessageDisplaySynchronization(SlixTest):
def setUp(self):
self.stream_start(jid="juliet@capulet.lit", plugins={"xep_0490"})
def test_catch_up(self):
future = self.xmpp.plugin["xep_0490"].catch_up()
self.send( # language=XML
"""
<iq type="get" to="juliet@capulet.lit" id="1">
<pubsub xmlns="http://jabber.org/protocol/pubsub">
<items node="urn:xmpp:mds:displayed:0" />
</pubsub>
</iq>
"""
)
self.recv( # language=XML
"""
<iq type='result'
to='juliet@capulet.lit/balcony'
id='1'>
<pubsub xmlns='http://jabber.org/protocol/pubsub'>
<items node='urn:xmpp:mds:displayed:0'>
<item id='romeo@montegue.lit'>
<displayed xmlns='urn:xmpp:mds:displayed:0'>
<stanza-id xmlns='urn:xmpp:sid:0'
id='0f710f2b-52ed-4d52-b928-784dad74a52b'
by='juliet@capulet.lit'/>
</displayed>
</item>
<item id='example@conference.shakespeare.lit'>
<displayed xmlns='urn:xmpp:mds:displayed:0'>
<stanza-id xmlns='urn:xmpp:sid:0'
id='ca21deaf-812c-48f1-8f16-339a674f2864'
by='example@conference.shakespeare.lit'/>
</displayed>
</item>
</items>
</pubsub>
</iq>
"""
)
iq = future.result()
item = list(iq["pubsub"]["items"])
self.assertEqual(item[0]["id"], "romeo@montegue.lit")
self.assertEqual(
item[0]["displayed"]["stanza_id"]["id"],
"0f710f2b-52ed-4d52-b928-784dad74a52b",
)
self.assertEqual(item[1]["id"], "example@conference.shakespeare.lit")
self.assertEqual(
item[1]["displayed"]["stanza_id"]["id"],
"ca21deaf-812c-48f1-8f16-339a674f2864",
)
def test_flag_chat(self):
self.xmpp.plugin["xep_0490"].flag_chat(
"romeo@montegue.lit", "0f710f2b-52ed-4d52-b928-784dad74a52b"
)
self.send( # language=XML
"""
<iq type='set' id='1'>
<pubsub xmlns='http://jabber.org/protocol/pubsub'>
<publish node='urn:xmpp:mds:displayed:0'>
<item id='romeo@montegue.lit'>
<displayed xmlns='urn:xmpp:mds:displayed:0'>
<stanza-id xmlns='urn:xmpp:sid:0'
id="0f710f2b-52ed-4d52-b928-784dad74a52b" />
</displayed>
</item>
</publish>
<publish-options>
<x xmlns='jabber:x:data' type='submit'>
<field var='FORM_TYPE' type='hidden'>
<value>http://jabber.org/protocol/pubsub#publish-options</value>
</field>
<field var='pubsub#persist_items'>
<value>1</value>
</field>
<field var='pubsub#max_items'>
<value>max</value>
</field>
<field var='pubsub#send_last_published_item'>
<value>never</value>
</field>
<field var='pubsub#access_model'>
<value>whitelist</value>
</field>
</x>
</publish-options>
</pubsub>
</iq>
""",
use_values=False,
)
def test_notification(self):
handler = unittest.mock.Mock()
self.xmpp.add_event_handler(
"message_displayed_synchronization_publish", handler
)
self.recv( # language=XML
"""
<message from='juliet@capulet.lit' to='juliet@capulet.lit/balcony' type='headline' id='new-displayed-pep-event'>
<event xmlns='http://jabber.org/protocol/pubsub#event'>
<items node='urn:xmpp:mds:displayed:0'>
<item id='romeo@montegue.lit'>
<displayed xmlns='urn:xmpp:mds:displayed:0'>
<stanza-id xmlns='urn:xmpp:sid:0' by='juliet@capulet.lit' id='0423e3a9-d516-493d-bb06-bee0e51ab9fb'/>
</displayed>
</item>
</items>
</event>
</message>
"""
)
handler.assert_called()
msg = handler.call_args[0][0]
self.assertEqual(
msg["pubsub_event"]["items"]["item"]["id"], "romeo@montegue.lit"
)
self.assertEqual(
msg["pubsub_event"]["items"]["item"]["displayed"]["stanza_id"]["id"],
"0423e3a9-d516-493d-bb06-bee0e51ab9fb",
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestMessageDisplaySynchronization)