Compare commits

..

2 Commits

Author SHA1 Message Date
nicoco
56004802fa Merge branch 'master' into xep356-iq 2023-12-19 14:13:35 +00:00
Nicolas Cedilnik
8bfe6177f4 xep0356: implement IQ privilege
Also included:

- correctly handle privileges from different
  servers
- check that privileges have been granted before
  attempting to send something and raise
  PermissionError if not
- use dataclass and enums to store permissions instead of
  untyped dict
2023-07-23 15:38:43 +02:00
21 changed files with 106 additions and 561 deletions

View File

@@ -1,6 +0,0 @@
steps:
mypy:
image: python:3
script:
- pip3 install mypy types-setuptools
- mypy slixmpp

View File

@@ -1,9 +0,0 @@
steps:
test_integration:
image: "python:3.11"
secrets: [ci_account1, ci_account1_password, ci_account2, ci_account2_password]
commands:
- apt-get update
- apt-get install -y python3-pip cython3 gpg
- pip3 install emoji aiohttp aiodns
- ./run_integration_tests.py

View File

@@ -1,17 +0,0 @@
steps:
unit_tests:
image: "python:${TAG}"
commands:
- apt-get update
- apt-get install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
matrix:
TAG:
- "3.7"
- "3.9"
- "3.8"
- "3.10"
- "3.11"
- "3.12"

View File

@@ -1,95 +0,0 @@
Projects Using Slixmpp
======================
Applications
------------
sendxmpp-py
~~~~~~~~~~~
sendxmpp is a command line program and is the XMPP equivalent of sendmail. It is a Python version of the original sendxmpp which is written in Perl.
- `Source <https://github.com/moparisthebest/sendxmpp-py>`_
Bots
----
BotLogMauve
~~~~~~~~~~~
XMPP bot which logs groupchat messages. Logs are in text format, with one file per day and per groupchat.
- `Source <https://git.khaganat.net/khaganat/BotLogMauve>`_
LinkBot
~~~~~~~
This bot reveals the title of any shared link in a groupchat for quick content insight.
- `Source <https://git.xmpp-it.net/mario/XMPPBot>`_
llama-bot
~~~~~~~~~
Llama-bot enables engaging communication with the LLM (large language model) of llama.cpp, providing seamless and dynamic conversation with it.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://github.com/decent-im/llama-bot>`_
- `Demo <xmpp:llama@decent.im?message>`_
Morbot
~~~~~~
Morbot is a simple Slixmpp bot that will take new articles from listed RSS feeds and send them to assigned XMPP MUCs.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://codeberg.org/TheCoffeMaker/Morbot>`_
Slixfeed
~~~~~~~~
Slixfeed aims to be an easy to use and fully-featured news aggregator bot for XMPP. It provides a convenient access to Blogs, Fediverse and News websites along with filtering functionality.
- `Groupchat <xmpp:slixfeed@chat.woodpeckersnest.space?join>`_
- `Source <https://gitgud.io/sjehuda/slixfeed>`_
sms4you
~~~~~~~
sms4you forwards messages from and to SMS and connects either with sms4you-xmpp or sms4you-email to choose the other mean of communication. Nice for receiving or sending SMS, independently from carrying a SIM card.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Homepage <https://sms4you-team.pages.debian.net/sms4you/>`_
- `Source <https://salsa.debian.org/sms4you-team/sms4you>`_
Stable Diffusion
~~~~~~~~~~~~~~~~
XMPP bot that generates digital images from textual descriptions.
- `Groupchat <xmpp:slidge@conference.nicoco.fr?join>`_
- `Source <https://www.nicoco.fr/blog/2022/08/31/xmpp-bot-stable-diffusion/>`_
WhisperBot
~~~~~~~~~~
XMPP bot that transliterates audio messages using OpenAI's Whisper libraries.
- `Groupchat <xmpp:slixmpp@muc.poez.io?join>`_
- `Source <https://codeberg.org/TheCoffeMaker/WhisperBot>`_
XMPP MUC Message Gateway
~~~~~~~~~~~~~~~~~~~~~~~~
A multipurpose JSON forwarder microservice from HTTP POST to XMPP MUC room over TLSv1.2 with SliXMPP.
- `Source <https://github.com/immanuelfodor/xmpp-muc-message-gateway>`_
Services
--------
AtomToPubsub
~~~~~~~~~~~~
AtomToPubsub is a simple Python script that parses Atom + RSS feeds and pushes the entries to a designated XMPP Pubsub Node.
- `Groupchat <xmpp:movim@conference.movim.eu?join>`_
- `Source <https://github.com/imattau/atomtopubsub>`_
Slidge
~~~~~~
Slidge is a general purpose XMPP gateway framework in Python.
- `Groupchat <xmpp:slidge@conference.nicoco.fr?join>`_
- `Homepage <https://slidge.im/core/>`_
- `Source <https://sr.ht/~nicoco/slidge>`_

View File

@@ -138,8 +138,8 @@ class ClientXMPP(BaseXMPP):
self.credentials['password'] = value self.credentials['password'] = value
def connect(self, address: Optional[Tuple[str, int]] = None, # type: ignore def connect(self, address: Optional[Tuple[str, int]] = None, # type: ignore
use_ssl: Optional[bool] = None, force_starttls: Optional[bool] = None, use_ssl: bool = False, force_starttls: bool = True,
disable_starttls: Optional[bool] = None) -> None: disable_starttls: bool = False) -> None:
"""Connect to the XMPP server. """Connect to the XMPP server.
When no address is given, a SRV lookup for the server will When no address is given, a SRV lookup for the server will
@@ -166,8 +166,8 @@ class ClientXMPP(BaseXMPP):
host, port = (self.boundjid.host, 5222) host, port = (self.boundjid.host, 5222)
self.dns_service = 'xmpp-client' self.dns_service = 'xmpp-client'
XMLStream.connect(self, host, port, use_ssl=use_ssl, return XMLStream.connect(self, host, port, use_ssl=use_ssl,
force_starttls=force_starttls, disable_starttls=disable_starttls) force_starttls=force_starttls, disable_starttls=disable_starttls)
def register_feature(self, name: str, handler: Callable, restart: bool = False, order: int = 5000) -> None: def register_feature(self, name: str, handler: Callable, restart: bool = False, order: int = 5000) -> None:
"""Register a stream feature handler. """Register a stream feature handler.

View File

@@ -9,8 +9,6 @@
import logging import logging
import hashlib import hashlib
from typing import Optional
from slixmpp import Message, Iq, Presence from slixmpp import Message, Iq, Presence
from slixmpp.basexmpp import BaseXMPP from slixmpp.basexmpp import BaseXMPP
from slixmpp.stanza import Handshake from slixmpp.stanza import Handshake
@@ -95,7 +93,7 @@ class ComponentXMPP(BaseXMPP):
for st in Message, Iq, Presence: for st in Message, Iq, Presence:
register_stanza_plugin(st, Error) register_stanza_plugin(st, Error)
def connect(self, host: str = None, port: int = 0, use_ssl: Optional[bool] = None) -> None: def connect(self, host=None, port=None, use_ssl=False):
"""Connect to the server. """Connect to the server.
@@ -106,15 +104,16 @@ class ComponentXMPP(BaseXMPP):
:param use_ssl: Flag indicating if SSL should be used by connecting :param use_ssl: Flag indicating if SSL should be used by connecting
directly to a port using SSL. directly to a port using SSL.
""" """
if host is not None: if host is None:
self.server_host = host host = self.server_host
if port: if port is None:
self.server_port = port port = self.server_port
self.server_name = self.boundjid.host self.server_name = self.boundjid.host
log.debug("Connecting to %s:%s", host, port) log.debug("Connecting to %s:%s", host, port)
XMLStream.connect(self, host=self.server_host, port=self.server_port, use_ssl=use_ssl) return XMLStream.connect(self, host=host, port=port,
use_ssl=use_ssl)
def incoming_filter(self, xml): def incoming_filter(self, xml):
""" """

View File

@@ -37,8 +37,7 @@ class FeatureMechanisms(BasePlugin):
'unencrypted_digest': False, 'unencrypted_digest': False,
'unencrypted_cram': False, 'unencrypted_cram': False,
'unencrypted_scram': True, 'unencrypted_scram': True,
'order': 100, 'order': 100
'tls_version': None,
} }
def plugin_init(self): def plugin_init(self):
@@ -97,20 +96,7 @@ class FeatureMechanisms(BasePlugin):
result[value] = creds.get('email', jid) result[value] = creds.get('email', jid)
elif value == 'channel_binding': elif value == 'channel_binding':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)): if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
version = self.xmpp.socket.version() result[value] = self.xmpp.socket.get_channel_binding()
# As of now, python does not implement anything else
# than tls-unique, which is forbidden on TLSv1.3
# see https://github.com/python/cpython/issues/95341
if version != 'TLSv1.3':
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-unique"
)
elif 'tls-exporter' in ssl.CHANNEL_BINDING_TYPES:
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-exporter"
)
else:
result[value] = None
else: else:
result[value] = None result[value] = None
elif value == 'host': elif value == 'host':
@@ -135,11 +121,6 @@ class FeatureMechanisms(BasePlugin):
result[value] = True result[value] = True
else: else:
result[value] = False result[value] = False
elif value == 'tls_version':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
result[value] = self.xmpp.socket.version()
elif value == 'binding_proposed':
result[value] = any(x for x in self.mech_list if x.endswith('-PLUS'))
else: else:
result[value] = self.config.get(value, False) result[value] = self.config.get(value, False)
return result return result

View File

@@ -7,8 +7,7 @@ import logging
import hashlib import hashlib
import base64 import base64
from asyncio import Future, Lock from asyncio import Future
from collections import defaultdict
from typing import Optional from typing import Optional
from slixmpp import __version__ from slixmpp import __version__
@@ -95,9 +94,6 @@ class XEP_0115(BasePlugin):
disco.assign_verstring = self.assign_verstring disco.assign_verstring = self.assign_verstring
disco.get_verstring = self.get_verstring disco.get_verstring = self.get_verstring
# prevent concurrent fetches for the same hash
self._locks = defaultdict(Lock)
def plugin_end(self): def plugin_end(self):
self.xmpp['xep_0030'].del_feature(feature=stanza.Capabilities.namespace) self.xmpp['xep_0030'].del_feature(feature=stanza.Capabilities.namespace)
self.xmpp.del_filter('out', self._filter_add_caps) self.xmpp.del_filter('out', self._filter_add_caps)
@@ -141,7 +137,7 @@ class XEP_0115(BasePlugin):
self.xmpp.event('entity_caps', p) self.xmpp.event('entity_caps', p)
async def _process_caps(self, pres: Presence): async def _process_caps(self, pres):
if not pres['caps']['hash']: if not pres['caps']['hash']:
log.debug("Received unsupported legacy caps: %s, %s, %s", log.debug("Received unsupported legacy caps: %s, %s, %s",
pres['caps']['node'], pres['caps']['node'],
@@ -151,11 +147,7 @@ class XEP_0115(BasePlugin):
return return
ver = pres['caps']['ver'] ver = pres['caps']['ver']
async with self._locks[ver]:
await self._process_caps_wrapped(pres, ver)
self._locks.pop(ver, None)
async def _process_caps_wrapped(self, pres: Presence, ver: str):
existing_verstring = await self.get_verstring(pres['from'].full) existing_verstring = await self.get_verstring(pres['from'].full)
if str(existing_verstring) == str(ver): if str(existing_verstring) == str(ver):
return return

View File

@@ -15,32 +15,6 @@ log = logging.getLogger(__name__)
class XEP_0221(BasePlugin): class XEP_0221(BasePlugin):
"""
XEP-0221: Data Forms Media Element
In certain implementations of Data Forms (XEP-0004), it can be
helpful to include media data such as small images. One example is
CAPTCHA Forms (XEP-0158). This plugin implements a method for
including media data in a data form.
Typical use pattern:
.. code-block:: python
self.register_plugin('xep_0221')
self['xep_0050'].add_command(node="showimage",
name="Show my image",
handler=self.form_handler)
def form_handler(self,iq,session):
image_url="https://xmpp.org/images/logos/xmpp-logo.svg"
form=self['xep_0004'].make_form('result','My Image')
form.addField(var='myimage', ftype='text-single', label='My Image', value=image_url)
form.field['myimage']['media'].add_uri(value=image_url, itype="image/svg")
session['payload']=form
return session
"""
name = 'xep_0221' name = 'xep_0221'
description = 'XEP-0221: Data Forms Media Element' description = 'XEP-0221: Data Forms Media Element'

View File

@@ -1,13 +1,8 @@
# Slixmpp: The Slick XMPP Library # Slixmpp: The Slick XMPP Library
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net> # Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permissio # See the file LICENSE for copying permissio
from abc import ABC
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from slixmpp.stanza import Message from slixmpp.stanza import Message
from slixmpp.xmlstream import ( from slixmpp.xmlstream import (
ElementBase, ElementBase,
@@ -15,83 +10,14 @@ from slixmpp.xmlstream import (
) )
NS = "urn:xmpp:fallback:0" NS = 'urn:xmpp:fallback:0'
class Fallback(ElementBase): class Fallback(ElementBase):
namespace = NS namespace = NS
name = "fallback" name = 'fallback'
plugin_attrib = "fallback" plugin_attrib = 'fallback'
plugin_multi_attrib = "fallbacks"
interfaces = {"for"}
def _find_fallback(self, fallback_for: str) -> "Fallback":
if self["for"] == fallback_for:
return self
for fallback in self.parent()["fallbacks"]:
if fallback["for"] == fallback_for:
return fallback
raise AttributeError("No fallback for this namespace", fallback_for)
def get_stripped_body(
self, fallback_for: str, element: Literal["body", "subject"] = "body"
) -> str:
"""
Get the body of a message, with the fallback part stripped
:param fallback_for: namespace of the fallback to strip
:param element: set this to "subject" get the stripped subject instead
of body
:return: body (or subject) content minus the fallback part
"""
fallback = self._find_fallback(fallback_for)
start = fallback[element]["start"]
end = fallback[element]["end"]
body = self.parent()[element]
if start == end == 0:
return ""
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
class FallbackMixin(ABC):
namespace = NS
name = NotImplemented
plugin_attrib = NotImplemented
interfaces = {"start", "end"}
def set_start(self, v: int):
self._set_attr("start", str(v))
def get_start(self):
return _int_or_zero(self._get_attr("start"))
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
return _int_or_zero(self._get_attr("end"))
class FallbackBody(FallbackMixin, ElementBase):
name = plugin_attrib = "body"
class FallbackSubject(FallbackMixin, ElementBase):
name = plugin_attrib = "subject"
def _int_or_zero(v: str):
try:
return int(v)
except ValueError:
return 0
def register_plugins(): def register_plugins():
register_stanza_plugin(Message, Fallback, iterable=True) register_stanza_plugin(Message, Fallback)
register_stanza_plugin(Fallback, FallbackBody)
register_stanza_plugin(Fallback, FallbackSubject)

View File

@@ -13,7 +13,7 @@ class XEP_0461(BasePlugin):
name = "xep_0461" name = "xep_0461"
description = "XEP-0461: Message Replies" description = "XEP-0461: Message Replies"
dependencies = {"xep_0030", "xep_0428"} dependencies = {"xep_0030"}
stanza = stanza stanza = stanza
namespace = stanza.NS namespace = stanza.NS

View File

@@ -2,7 +2,6 @@ from typing import Optional
from slixmpp.stanza import Message from slixmpp.stanza import Message
from slixmpp.xmlstream import ElementBase, register_stanza_plugin from slixmpp.xmlstream import ElementBase, register_stanza_plugin
from slixmpp.plugins.xep_0428.stanza import Fallback
NS = "urn:xmpp:reply:0" NS = "urn:xmpp:reply:0"
@@ -13,11 +12,39 @@ class Reply(ElementBase):
plugin_attrib = "reply" plugin_attrib = "reply"
interfaces = {"id", "to"} interfaces = {"id", "to"}
class FeatureFallBack(ElementBase):
# should also be a multi attrib
namespace = "urn:xmpp:fallback:0"
name = "fallback"
plugin_attrib = "feature_fallback"
interfaces = {"for"}
def get_fallback_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end:
return body[start:end]
else:
return ""
def get_stripped_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None): def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None):
""" """
Add plain text fallback for clients not implementing XEP-0461. Add plain text fallback for clients not implementing XEP-0461.
``msg["reply"].add_quoted_fallback("Some text", "Bob")`` will ``msg["feature_fallback"].add_quoted_fallback("Some text", "Bob")`` will
prepend "> Bob:\n> Some text\n" to the body of the message, and set the prepend "> Bob:\n> Some text\n" to the body of the message, and set the
fallback_body attributes accordingly, so that clients implementing fallback_body attributes accordingly, so that clients implementing
XEP-0461 can hide the fallback text. XEP-0461 can hide the fallback text.
@@ -30,27 +57,39 @@ class Reply(ElementBase):
if nickname: if nickname:
quoted = "> " + nickname + ":\n" + quoted quoted = "> " + nickname + ":\n" + quoted
msg["body"] = quoted + msg["body"] msg["body"] = quoted + msg["body"]
fallback = Fallback() msg["feature_fallback"]["for"] = NS
fallback["for"] = NS msg["feature_fallback"]["fallback_body"]["start"] = 0
fallback["body"]["start"] = 0 msg["feature_fallback"]["fallback_body"]["end"] = len(quoted)
fallback["body"]["end"] = len(quoted)
msg.append(fallback)
def get_fallback_body(self) -> str:
msg = self.parent() class FallBackBody(ElementBase):
for fallback in msg["fallbacks"]: # According to https://xmpp.org/extensions/inbox/compatibility-fallback.html
if fallback["for"] == NS: # this should be a multi_attrib *but* since it's a protoXEP, we'll see...
break namespace = FeatureFallBack.namespace
else: name = "body"
return "" plugin_attrib = "fallback_body"
start = fallback["body"]["start"] interfaces = {"start", "end"}
end = fallback["body"]["end"]
body = msg["body"] def set_start(self, v: int):
if start <= end: self._set_attr("start", str(v))
return body[start:end]
else: def get_start(self):
return "" try:
return int(self._get_attr("start"))
except ValueError:
return 0
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
try:
return int(self._get_attr("end"))
except ValueError:
return 0
def register_plugins(): def register_plugins():
register_stanza_plugin(Message, Reply) register_stanza_plugin(Message, Reply)
register_stanza_plugin(Message, FeatureFallBack)
register_stanza_plugin(FeatureFallBack, FallBackBody)

View File

@@ -3,7 +3,6 @@
# Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout # Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
import atexit
import unittest import unittest
from queue import Queue from queue import Queue
from xml.parsers.expat import ExpatError from xml.parsers.expat import ExpatError
@@ -751,9 +750,3 @@ class SlixTest(unittest.TestCase):
Error.namespace = 'jabber:client' Error.namespace = 'jabber:client'
for st in Message, Iq, Presence: for st in Message, Iq, Presence:
register_stanza_plugin(st, Error) register_stanza_plugin(st, Error)
@atexit.register
def cleanup():
loop = asyncio.get_event_loop()
loop.close()

View File

@@ -181,7 +181,7 @@ class SCRAM(Mech):
channel_binding = True channel_binding = True
required_credentials = {'username', 'password'} required_credentials = {'username', 'password'}
optional_credentials = {'authzid', 'channel_binding'} optional_credentials = {'authzid', 'channel_binding'}
security = {'tls_version', 'encrypted', 'unencrypted_scram', 'binding_proposed'} security = {'encrypted', 'unencrypted_scram'}
def setup(self, name): def setup(self, name):
self.use_channel_binding = False self.use_channel_binding = False
@@ -244,15 +244,11 @@ class SCRAM(Mech):
self.cnonce = bytes(('%s' % random.random())[2:]) self.cnonce = bytes(('%s' % random.random())[2:])
gs2_cbind_flag = b'n' gs2_cbind_flag = b'n'
if self.security_settings['binding_proposed']: if self.credentials['channel_binding']:
if self.credentials['channel_binding'] and \ if self.use_channel_binding:
self.use_channel_binding: gs2_cbind_flag = b'p=tls-unique'
if self.security_settings['tls_version'] != 'TLSv1.3': else:
gs2_cbind_flag = b'p=tls-unique' gs2_cbind_flag = b'y'
else:
gs2_cbind_flag = b'p=tls-exporter'
else:
gs2_cbind_flag = b'y'
authzid = b'' authzid = b''
if self.credentials['authzid']: if self.credentials['authzid']:
@@ -284,7 +280,7 @@ class SCRAM(Mech):
raise SASLCancelled('Invalid nonce') raise SASLCancelled('Invalid nonce')
cbind_data = b'' cbind_data = b''
if self.use_channel_binding and self.credentials['channel_binding']: if self.use_channel_binding:
cbind_data = self.credentials['channel_binding'] cbind_data = self.credentials['channel_binding']
cbind_input = self.gs2_header + cbind_data cbind_input = self.gs2_header + cbind_data
channel_binding = b'c=' + b64encode(cbind_input).replace(b'\n', b'') channel_binding = b'c=' + b64encode(cbind_input).replace(b'\n', b'')

View File

@@ -10,5 +10,5 @@ from slixmpp.xmlstream.tostring import tostring, highlight
from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
__all__ = ['JID', 'StanzaBase', 'ElementBase', __all__ = ['JID', 'StanzaBase', 'ElementBase',
'ET', 'tostring', 'highlight', 'XMLStream', 'ET', 'StateMachine', 'tostring', 'highlight', 'XMLStream',
'RESPONSE_TIMEOUT', 'register_stanza_plugin'] 'RESPONSE_TIMEOUT']

View File

@@ -1243,7 +1243,7 @@ class ElementBase(object):
self.init_plugin(item.__class__.plugin_multi_attrib) self.init_plugin(item.__class__.plugin_multi_attrib)
else: else:
self.iterables.append(item) self.iterables.append(item)
item.parent = weakref.ref(self)
return self return self
def appendxml(self, xml: ET.Element) -> ElementBase: def appendxml(self, xml: ET.Element) -> ElementBase:

View File

@@ -290,8 +290,8 @@ class XMLStream(asyncio.BaseProtocol):
self.xml_depth = 0 self.xml_depth = 0
self.xml_root = None self.xml_root = None
self.force_starttls = True self.force_starttls = None
self.disable_starttls = False self.disable_starttls = None
self.waiting_queue = asyncio.Queue() self.waiting_queue = asyncio.Queue()
@@ -405,9 +405,8 @@ class XMLStream(asyncio.BaseProtocol):
self.disconnected.set_result(True) self.disconnected.set_result(True)
self.disconnected = asyncio.Future() self.disconnected = asyncio.Future()
def connect(self, host: str = '', port: int = 0, use_ssl: Optional[bool] = None, def connect(self, host: str = '', port: int = 0, use_ssl: Optional[bool] = False,
force_starttls: Optional[bool] = None, force_starttls: Optional[bool] = True, disable_starttls: Optional[bool] = False) -> None:
disable_starttls: Optional[bool] = None) -> None:
"""Create a new socket and connect to the server. """Create a new socket and connect to the server.
:param host: The name of the desired server for the connection. :param host: The name of the desired server for the connection.

View File

@@ -1,149 +0,0 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza
from slixmpp.plugins import xep_0461
from slixmpp.plugins import xep_0444
class TestFallback(SlixTest):
def setUp(self):
stanza.register_plugins()
def testSingleFallbackBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackSubject(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["subject"]["start"] = 0
message["fallback"]["subject"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<subject start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackWholeBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
def testMultiFallback(self):
message = Message()
f1 = stanza.Fallback()
f1["for"] = "ns1"
f2 = stanza.Fallback()
f2["for"] = "ns2"
message.append(f1)
message.append(f2)
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns1' />
<fallback xmlns='urn:xmpp:fallback:0' for='ns2' />
</message>
""",
)
for i, fallback in enumerate(message["fallbacks"], start=1):
self.assertEqual(fallback["for"], f"ns{i}")
def testStripFallbackPartOfBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
<body start="0" end="9" />
</fallback>
</message>
""",
)
self.assertEqual(
message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body"
)
def testStripWholeBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
self.assertEqual(message["fallback"].get_stripped_body("ns"), "")
def testStripMultiFallback(self):
message = Message()
message["body"] = "> huuuuu\n👍"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
reaction_fallback = stanza.Fallback()
reaction_fallback["for"] = xep_0444.stanza.NS
reaction_fallback.enable("body")
message.append(reaction_fallback)
self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "👍")
self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "")
suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback)

View File

@@ -1,13 +1,11 @@
import unittest import unittest
from slixmpp import Message from slixmpp import Message
from slixmpp.test import SlixTest from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza as fallback_stanza
from slixmpp.plugins.xep_0461 import stanza from slixmpp.plugins.xep_0461 import stanza
class TestReply(SlixTest): class TestReply(SlixTest):
def setUp(self): def setUp(self):
fallback_stanza.register_plugins()
stanza.register_plugins() stanza.register_plugins()
def testReply(self): def testReply(self):
@@ -28,9 +26,9 @@ class TestReply(SlixTest):
def testFallback(self): def testFallback(self):
message = Message() message = Message()
message["body"] = "12345\nrealbody" message["body"] = "12345\nrealbody"
message["fallback"]["for"] = "NS" message["feature_fallback"]["for"] = "NS"
message["fallback"]["body"]["start"] = 0 message["feature_fallback"]["fallback_body"]["start"] = 0
message["fallback"]["body"]["end"] = 6 message["feature_fallback"]["fallback_body"]["end"] = 6
self.check( self.check(
message, message,
@@ -44,18 +42,18 @@ class TestReply(SlixTest):
""", """,
) )
assert message["fallback"].get_stripped_body("NS") == "realbody" assert message["feature_fallback"].get_stripped_body() == "realbody"
def testAddFallBackHelper(self): def testAddFallBackHelper(self):
msg = Message() msg = Message()
msg["body"] = "Great" msg["body"] = "Great"
msg["reply"].add_quoted_fallback("Anna wrote:\nHi, how are you?") msg["feature_fallback"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
# ugly dedent but the test does not pass without it
self.check( self.check(
msg, # language=XML msg,
""" """
<message xmlns="jabber:client" type="normal"> <message xmlns="jabber:client" type="normal">
<body>> Anna wrote:\n> Hi, how are you?\nGreat</body> <body>> Anna wrote:\n> Hi, how are you?\nGreat</body>
<reply xmlns="urn:xmpp:reply:0" />
<fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0"> <fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0">
<body start='0' end='33' /> <body start='0' end='33' />
</fallback> </fallback>
@@ -69,8 +67,8 @@ class TestReply(SlixTest):
msg = Message() msg = Message()
msg["body"] = "Great" msg["body"] = "Great"
msg["reply"].add_quoted_fallback(body) msg["feature_fallback"].add_quoted_fallback(body)
body2 = msg["reply"].get_fallback_body() body2 = msg["feature_fallback"].get_fallback_body()
self.assertTrue(body2 == quoted, body2) self.assertTrue(body2 == quoted, body2)

View File

@@ -1,76 +0,0 @@
import logging
import unittest
from slixmpp.test import SlixTest
class TestCaps(SlixTest):
def setUp(self):
self.stream_start(plugins=["xep_0115"])
def testConcurrentSameHash(self):
"""
Check that we only resolve a given ver string to a disco info once,
even if we receive several presences with that same ver string
consecutively.
"""
self.recv( # language=XML
"""
<presence from='romeo@montague.lit/orchard'>
<c xmlns='http://jabber.org/protocol/caps'
hash='sha-1'
node='a-node'
ver='h0TdMvqNR8FHUfFG1HauOLYZDqE='/>
</presence>
"""
)
self.recv( # language=XML
"""
<presence from='i-dont-know-much-shakespeare@montague.lit/orchard'>
<c xmlns='http://jabber.org/protocol/caps'
hash='sha-1'
node='a-node'
ver='h0TdMvqNR8FHUfFG1HauOLYZDqE='/>
</presence>
"""
)
self.send( # language=XML
"""
<iq xmlns="jabber:client"
id="1"
to="romeo@montague.lit/orchard"
type="get">
<query xmlns="http://jabber.org/protocol/disco#info"
node="a-node#h0TdMvqNR8FHUfFG1HauOLYZDqE="/>
</iq>
"""
)
self.send(None)
self.recv( # language=XML
"""
<iq from='romeo@montague.lit/orchard'
id='1'
type='result'>
<query xmlns='http://jabber.org/protocol/disco#info'
node='a-nodes#h0TdMvqNR8FHUfFG1HauOLYZDqE='>
<identity category='client' name='a client' type='pc'/>
<feature var='http://jabber.org/protocol/caps'/>
</query>
</iq>
"""
)
self.send(None)
self.assertTrue(
self.xmpp["xep_0030"].supports(
"romeo@montague.lit/orchard", "http://jabber.org/protocol/caps"
)
)
self.assertTrue(
self.xmpp["xep_0030"].supports(
"i-dont-know-much-shakespeare@montague.lit/orchard",
"http://jabber.org/protocol/caps",
)
)
logging.basicConfig(level=logging.DEBUG)
suite = unittest.TestLoader().loadTestsFromTestCase(TestCaps)

View File

@@ -9,8 +9,8 @@ class TestReply(SlixTest):
def testFallBackBody(self): def testFallBackBody(self):
async def on_reply(msg): async def on_reply(msg):
start = msg["fallback"]["body"]["start"] start = msg["feature_fallback"]["fallback_body"]["start"]
end = msg["fallback"]["body"]["end"] end = msg["feature_fallback"]["fallback_body"]["end"]
self.xmpp["xep_0461"].send_reply( self.xmpp["xep_0461"].send_reply(
reply_to=msg.get_from(), reply_to=msg.get_from(),
reply_id=msg.get_id(), reply_id=msg.get_id(),