Compare commits

...

75 Commits

Author SHA1 Message Date
Maxime “pep” Buquet
752f4258df Release 1.8.3
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-11-12 21:39:50 +01:00
Maxime “pep” Buquet
b60b1b985d CVE-2022-45197: Fix missing certificate hostname validation
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-11-12 21:36:11 +01:00
Maxime Buquet
e93e43df66 Merge branch 'fix-adhoc-crash' into 'master'
fix crash on adhoc command with bad clients

See merge request poezio/slixmpp!222
2022-11-06 12:55:56 +00:00
nicoco
cfd1af88eb fix crash on adhoc command with bad clients
If a command has no "next" handler, slixmpp
crashes if a client acts as if there was a
next step.
This raises an XMPPError instead
2022-11-06 08:12:37 +01:00
Maxime Buquet
7a0fb97083 Merge branch 'restore-stringprep-warning' into 'master'
logger: remove NullHandler for the "slixmpp" handler

See merge request poezio/slixmpp!220
2022-10-03 08:20:19 +00:00
nicoco
189bbcce19 logger: remove NullHandler for the "slixmpp" handler
This does not seem to accomplish anything besides
hiding the "using the slow, pure python stringprep"
warning, unless you import logging and add another
handler before to the "slixmpp" logger *BEFORE*
importing slixmpp.
2022-10-03 10:16:02 +02:00
Maxime Buquet
79607e43f1 Merge branch 'fix-0084' into 'master'
xep_0084: fix typo and getters

See merge request poezio/slixmpp!219
2022-09-23 08:59:05 +00:00
nicoco
e062181f84 xep_0084: fix typo and getters
"with" instead of "width"; wrong syntax for getters
2022-09-22 23:19:16 +02:00
mathieui
97b0c7ffac Merge branch 'xep0055' into 'master'
Add XEP-0055 (Jabber Search)

See merge request poezio/slixmpp!204
2022-09-12 18:18:51 +00:00
nicoco
c2ece57dee Add XEP-0055 (Jabber Search) 2022-09-11 23:22:44 +02:00
mathieui
afdfa1ee57 Merge branch 'xep0363-as-component' into 'master'
XEP-0363: Fix upload service auto discovery for components

See merge request poezio/slixmpp!207
2022-09-09 16:07:46 +00:00
mathieui
cba5dc7ddc Merge branch 'component-ifrom' into 'master'
xep_0030: fix ifrom for disco queries sent by components

See merge request poezio/slixmpp!216
2022-09-09 16:06:38 +00:00
mathieui
b3a6c7a4ea Merge branch 'aiodns-gethostbyname' into 'master'
Use gethostbyname when using aiodns

See merge request poezio/slixmpp!212
2022-09-09 16:04:14 +00:00
mathieui
11e27d1d7d Merge branch 'mypy-workaround' into 'master'
Fix gitlab pipelines

See merge request poezio/slixmpp!217
2022-09-09 16:02:18 +00:00
nicoco
fbdff30dda fix emoji==2.0.0 compatibility 2022-08-29 00:59:14 +02:00
nicoco
62701bc562 xmlstream: ignore task type (mypy)
This is not satisfying, but having gitlab pipelines running would be nice, wouldn't it?
2022-08-29 00:20:36 +02:00
nicoco
b14918808c xep_0030: fix ifrom for disco queries sent by components
xep_0030 automatically sends disco queries with ifrom=None
Prosody's mod_component had a workaround to allow this non-standard behaviour, but it will change in a future release.
2022-08-29 00:03:55 +02:00
Link Mauve
f5cb9fe66b Merge branch 'doap-sasl-anon' into 'master'
DOAP: Add 0175. It's been here forever

See merge request poezio/slixmpp!215
2022-08-23 09:18:02 +00:00
Maxime “pep” Buquet
8bd53f7098 DOAP: Add 0175. It's been here forever
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-08-23 11:12:26 +02:00
Link Mauve
c955cf1c66 Merge branch 'xep-0461' into 'master'
XEP-0461: Message Replies

See merge request poezio/slixmpp!213
2022-08-21 12:24:08 +00:00
Maxime Buquet
6904ae63f5 Merge branch 'optional-setters' into 'master'
JID: Make node and resource setters accept None

See merge request poezio/slixmpp!214
2022-08-21 12:22:17 +00:00
Emmanuel Gil Peyrot
1caada197a JID: Make node and resource setters accept None
This is the proper way to unset these.
2022-08-21 14:18:53 +02:00
nicoco
450aaa7f86 XEP-0461: Message Replies 2022-08-20 13:35:38 +02:00
Daniel Roschka
d43c83800e Use gethostbyname when using aiodns
Slixmpp behaves differently when resolving host names, whether aiodns
is used or not. With aiodns only DNS is used, while without
`asyncio.loop.getaddrinfo()` is used instead, which utilizes the Name
Service Switch (NSS) to resolve host names by other means (hosts-file,
mDNS, ...) as well.

To unify the behavior, this replaces the use of
`aiodns.DNSResolver().query()` with
`aiodns.DNSResolver().gethostbyname()`. This makes the behavior
resolving host names more consistent between using aiodns or not, as
both now honor the NSS configuration and removes the need for the
previously existing workaround to resolve localhost.
2022-07-31 13:15:25 +02:00
nicoco
14786abd34 Revert "Make it clear that filename does *not* have to be path, and is mandatory"
This reverts commit ed820bf551.
2022-07-16 20:23:48 +02:00
Maxime Buquet
1f47acaec1 Merge branch 'fix-xep_0115-static' into 'master'
XEP-0115: Make get_caps() async

See merge request poezio/slixmpp!203
2022-07-16 19:02:06 +02:00
nicoco
ed820bf551 Make it clear that filename does *not* have to be path, and is mandatory 2022-07-16 17:17:22 +02:00
nicoco
afedfa4b06 Merge branch 'master' of https://lab.louiz.org/poezio/slixmpp 2022-07-16 17:08:21 +02:00
Maxime Buquet
5998069203 Merge branch 'mini_dateutil-no-more' into 'master'
Remove mini_dateutil and replace it with datetime

See merge request poezio/slixmpp!210
2022-07-12 13:39:02 +02:00
Maxime Buquet
356f16f5af Merge branch 'prevent-naive-datetime' into 'master'
XEP-0203: Prevent naïve datetime from being passed

Closes #3471

See merge request poezio/slixmpp!211
2022-07-12 13:38:52 +02:00
Link Mauve
b8f301b26f Merge branch 'affs-outcast-jid' into 'master'
xep_0045: Require JID when setting outcast affiliation

See merge request poezio/slixmpp!188
2022-07-12 13:28:02 +02:00
Link Mauve
ffaeb31219 Merge branch 'nicoco-master-patch-90506' into 'master'
Add xep_0356 to plugins.__all__

See merge request poezio/slixmpp!201
2022-07-12 13:26:54 +02:00
Link Mauve
9560f39de7 Merge branch 'xep0356-v0.4' into 'master'
XEP-0356: namespace version bump

See merge request poezio/slixmpp!206
2022-07-12 13:26:14 +02:00
Link Mauve
f7a38a028a Merge branch 'default-to-CAs' into 'master'
xmlstream: load default CA store by default

See merge request poezio/slixmpp!209
2022-07-12 13:24:32 +02:00
Emmanuel Gil Peyrot
65d70fe417 XEP-0203: Prevent naïve datetime from being passed
The specification says “The format MUST adhere to the dateTime format
specified in XEP-0082 and MUST be expressed in UTC.”

We now respect this requirement, by rejecting naïve datetimes with a
ValueError exception, and converting the passed datetime to UTC.

Fixes #3471.
2022-07-12 13:15:31 +02:00
Emmanuel Gil Peyrot
108a256537 thirdparty: Remove the mini_dateutil module
The builtin datetime module already provides the same features, there is
no need to carry that code any longer.
2022-07-12 12:55:20 +02:00
Emmanuel Gil Peyrot
78a5f79240 XEP-0202: Remove usage of mini_dateutil
Like the previous commit, we now use the builtin datetime module always.
2022-07-12 12:54:35 +02:00
Emmanuel Gil Peyrot
fc63768cfc XEP-0082: Move from mini_dateutil to datetime
Since datetime got merged into Python (probably around py3k), it’s now
usable for all of our needs and so we can do away with the old fallback.
2022-07-12 12:51:22 +02:00
Maxime “pep” Buquet
90e79af18a xmlstream: load default CA store by default
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-07-11 14:46:00 +02:00
Link Mauve
5e5a741994 Merge branch 'dns-reconnect' into 'master'
Fix delayed reconnect after DNS failure

See merge request poezio/slixmpp!208
2022-06-22 11:50:12 +02:00
Georg Lukas
b44ab17c8f Fix delayed reconnect after DNS failure
The XML stream will re-schedule a reconnect on socket errors, except
for DNS failures. If a user has no uplink connection, then DNS will
also fail, preventing an automatic reconnection.

This patch consolidates the two code paths and sets a maximum back-off
time of 5min (300s).
2022-06-22 11:39:44 +02:00
Nicolas Cedilnik
afb5419b68 XEP-0363: Fix upload service auto discovery for components 2022-06-18 06:09:36 +02:00
Nicolas Cedilnik
a1a5f3984d XEP-0356: namespace version bump 2022-06-09 16:45:36 +02:00
Nicolas Cedilnik
8eb8769862 XEP-0115: Make get_caps() async 2022-06-09 15:33:02 +02:00
Link Mauve
5ceb48bbcd Merge branch 'origin-id-non-default' into 'master'
Change origin-id defaults to False

See merge request poezio/slixmpp!202
2022-05-28 17:44:42 +02:00
Maxime “pep” Buquet
916894ab7c Change origin-id defaults to False
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-05-28 13:50:09 +02:00
Nicoco K
2b45c22fcb Add xep_0356 to plugins.__all__ 2022-05-19 14:40:45 +02:00
mathieui
566e7dc771 Merge branch 'nicoco-master-patch-38938' into 'master'
Fix typo in chat markers (fixes #3469)

Closes #3469

See merge request poezio/slixmpp!199
2022-05-15 16:44:10 +02:00
Nicoco K
aa492f905c Fix typo in chat markers (fixes #3469) 2022-05-15 07:48:00 +02:00
mathieui
e1a240ec6c Merge branch 'release-version-1.8.2' into 'master'
Update version to 1.8.2

See merge request poezio/slixmpp!197
2022-04-06 22:44:40 +02:00
mathieui
771839242c Update version to 1.8.2 2022-04-06 22:41:40 +02:00
mathieui
8bac744009 Merge branch 'starttls-exception' into 'master'
features_starttls/Proceed: raise exception on InvalidCABundle

See merge request poezio/slixmpp!196
2022-04-05 20:15:49 +02:00
Maxime “pep” Buquet
88d2f5dae4 features_starttls/Proceed: raise exception on InvalidCABundle
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-04-05 19:42:49 +02:00
mathieui
f7902d056e Merge branch 'exn-invalidcabundle-arg' into 'master'
Pass in useful value when raising InvalidCABundle

See merge request poezio/slixmpp!195
2022-04-05 19:42:06 +02:00
Maxime “pep” Buquet
41afbb10df Pass in useful value when raising InvalidCABundle
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-04-05 01:24:14 +02:00
mathieui
aca4addb9c Merge branch 'fix-old-session' into 'master'
stream features: fix old "session" establishment

Closes #3468

See merge request poezio/slixmpp!193
2022-04-01 21:01:31 +02:00
mathieui
914ce40fd5 stream features: fix old "session" establishment
As it is and old and deprecated code path, nobody noticed that it was
broken by the new filtering code.

Fix #3468
2022-04-01 20:56:02 +02:00
Maxime Buquet
82ff68cfac Merge branch 'upload-encrypt' into 'master'
XEP-0454: OMEMO Media Sharing

See merge request poezio/slixmpp!189
2022-03-21 17:01:40 +01:00
Maxime “pep” Buquet
28d44ecf74 xep_0454: str.removeprefix is available since 3.9
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-20 21:34:55 +01:00
Maxime “pep” Buquet
06e4e480c1 xep_0454: keep original filename extension if available
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-20 01:02:14 +01:00
Maxime “pep” Buquet
82ee250295 xep_0454: use staticmethods where possible
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-20 01:02:14 +01:00
Maxime “pep” Buquet
53d38a8115 setup.py: add cryptography in extras_require; update example
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-20 01:02:14 +01:00
Maxime “pep” Buquet
0fba8fd7f8 doap: add 454 entry
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
b899baabd8 xep_0454: also include finalize's result in the payload
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
acad41f3b7 xep_0454: Don't force content-type to application/octect-stream
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
bde5aaaf3e examples/http_upload.py: Add --encrypt parameter to send encrypted files
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
7222ade0dd xep_0454: Ensure format_url returns a str
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
14a6c7801d tests: XEP-0454
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
b52540e49f xep_0454: implement decrypt method
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
c1aeab328b xep_0454: use streaming API from CipherContext
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
51644e301b xep_0454: Add wrapper to xep_363's upload_file
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
bc8af3cc61 xep_0454: new plugin. OMEMO Media Sharing
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
3c08f471cf xep_0363: change filename to Path
This shouldn't break anything as I'm not using Path specific APIs

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
54b724c28b examples/http_upload: Add some typing
Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-19 10:31:34 +01:00
Maxime “pep” Buquet
60df4ef7aa xep_0045: Require JID when setting outcast affiliation
Found out when reading poezio/poezio#3536.

“An admin or owner can ban one or more users from a room. The ban MUST
be performed based on the occupant's bare JID.”

Signed-off-by: Maxime “pep” Buquet <pep@bouah.net>
2022-03-16 16:12:20 +01:00
42 changed files with 1000 additions and 365 deletions

View File

@@ -20,7 +20,7 @@ test:
script:
- apt update
- apt install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
test-3.10:
@@ -31,7 +31,7 @@ test-3.10:
script:
- apt update
- apt install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
test-3.11:
@@ -43,7 +43,7 @@ test-3.11:
script:
- apt update
- apt install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
test_integration:

View File

@@ -455,6 +455,14 @@
<xmpp:since>1.0</xmpp:since>
</xmpp:SupportedXep>
</implements>
<implements>
<xmpp:SupportedXep>
<xmpp:xep rdf:resource="https://xmpp.org/extensions/xep-0175.html"/>
<xmpp:status>complete</xmpp:status>
<xmpp:version>1.2</xmpp:version>
<xmpp:since>1.0</xmpp:since>
</xmpp:SupportedXep>
</implements>
<implements>
<xmpp:SupportedXep>
<xmpp:xep rdf:resource="https://xmpp.org/extensions/xep-0184.html"/>
@@ -892,6 +900,14 @@
<xmpp:since>1.6.0</xmpp:since>
</xmpp:SupportedXep>
</implements>
<implements>
<xmpp:SupportedXep>
<xmpp:xep rdf:resource="https://xmpp.org/extensions/xep-0454.html"/>
<xmpp:status>no thumbnail support</xmpp:status>
<xmpp:version>0.1.0</xmpp:version>
<xmpp:since>1.8.1</xmpp:since>
</xmpp:SupportedXep>
</implements>
<release>
<Version>
@@ -1026,5 +1042,19 @@
<file-release rdf:resource="https://lab.louiz.org/poezio/slixmpp/-/archive/slix-1.8.1/slixmpp-slix-1.8.1.tar.gz"/>
</Version>
</release>
<release>
<Version>
<revision>1.8.2</revision>
<created>2022-04-06</created>
<file-release rdf:resource="https://lab.louiz.org/poezio/slixmpp/-/archive/slix-1.8.2/slixmpp-slix-1.8.2.tar.gz"/>
</Version>
</release>
<release>
<Version>
<revision>1.8.3</revision>
<created>2022-11-12</created>
<file-release rdf:resource="https://lab.louiz.org/poezio/slixmpp/-/archive/slix-1.8.3/slixmpp-slix-1.8.3.tar.gz"/>
</Version>
</release>
</Project>
</rdf:RDF>

View File

@@ -0,0 +1,18 @@
XEP-0055: Jabber search
=======================
.. module:: slixmpp.plugins.xep_0055
.. autoclass:: XEP_0055
:members:
:exclude-members: session_bind, plugin_init, plugin_end
Stanza elements
---------------
.. automodule:: slixmpp.plugins.xep_0055.stanza
:members:
:undoc-members:

View File

@@ -5,11 +5,16 @@
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from typing import Optional
import sys
import logging
from pathlib import Path
from getpass import getpass
from argparse import ArgumentParser
import slixmpp
from slixmpp import JID
from slixmpp.exceptions import IqTimeout
log = logging.getLogger(__name__)
@@ -21,20 +26,40 @@ class HttpUpload(slixmpp.ClientXMPP):
A basic client asking an entity if they confirm the access to an HTTP URL.
"""
def __init__(self, jid, password, recipient, filename, domain=None):
def __init__(
self,
jid: JID,
password: str,
recipient: JID,
filename: Path,
domain: Optional[JID] = None,
encrypted: bool = False,
):
slixmpp.ClientXMPP.__init__(self, jid, password)
self.recipient = recipient
self.filename = filename
self.domain = domain
self.encrypted = encrypted
self.add_event_handler("session_start", self.start)
async def start(self, event):
log.info('Uploading file %s...', self.filename)
try:
url = await self['xep_0363'].upload_file(
self.filename, domain=self.domain, timeout=10
upload_file = self['xep_0363'].upload_file
if self.encrypted and not self['xep_0454']:
print(
'The xep_0454 module isn\'t available. '
'Ensure you have \'cryptography\' '
'from extras_require installed.',
file=sys.stderr,
)
return
elif self.encrypted:
upload_file = self['xep_0454'].upload_file
url = await upload_file(
self.filename, domain=self.domain, timeout=10,
)
except IqTimeout:
raise TimeoutError('Could not send message in time')
@@ -79,6 +104,10 @@ if __name__ == '__main__':
parser.add_argument("--domain",
help="Domain to use for HTTP File Upload (leave out for your own servers)")
parser.add_argument("-e", "--encrypt", dest="encrypted",
help="Whether to encrypt", action="store_true",
default=False)
args = parser.parse_args()
# Setup logging.
@@ -86,15 +115,41 @@ if __name__ == '__main__':
format='%(levelname)-8s %(message)s')
if args.jid is None:
args.jid = input("Username: ")
args.jid = JID(input("Username: "))
if args.password is None:
args.password = getpass("Password: ")
xmpp = HttpUpload(args.jid, args.password, args.recipient, args.file, args.domain)
domain = args.domain
if domain is not None:
domain = JID(domain)
if args.encrypted:
print(
'You are using the --encrypt flag. '
'Be aware that the transport being used is NOT end-to-end '
'encrypted. The server will be able to decrypt the file.',
file=sys.stderr,
)
xmpp = HttpUpload(
jid=args.jid,
password=args.password,
recipient=JID(args.recipient),
filename=Path(args.file),
domain=domain,
encrypted=args.encrypted,
)
xmpp.register_plugin('xep_0066')
xmpp.register_plugin('xep_0071')
xmpp.register_plugin('xep_0128')
xmpp.register_plugin('xep_0363')
try:
xmpp.register_plugin('xep_0454')
except slixmpp.plugins.base.PluginNotFound:
log.error(
'Could not load xep_0454. '
'Ensure you have \'cryptography\' from extras_require installed.'
)
# Connect to the XMPP server and start processing XMPP stanzas.
xmpp.connect()

View File

@@ -86,10 +86,16 @@ setup(
package_data={'slixmpp': ['py.typed']},
packages=packages,
ext_modules=ext_modules,
install_requires=['aiodns>=1.0', 'pyasn1', 'pyasn1_modules', 'typing_extensions; python_version < "3.8.0"'],
install_requires=[
'aiodns>=1.0',
'pyasn1',
'pyasn1_modules',
'typing_extensions; python_version < "3.8.0"',
],
extras_require={
'XEP-0363': ['aiohttp'],
'XEP-0444 compliance': ['emoji'],
'XEP-0454': ['cryptography'],
'Safer XML parsing': ['defusedxml'],
},
classifiers=CLASSIFIERS,

View File

@@ -5,7 +5,6 @@
# See the file LICENSE for copying permission.
import logging
from os import getenv
logging.getLogger(__name__).addHandler(logging.NullHandler())
# Use defusedxml if wanted
# Since enabling it can have adverse consequences for the programs using

View File

@@ -140,7 +140,7 @@ class BaseXMPP(XMLStream):
self.use_presence_ids = True
#: XEP-0359 <origin-id/> tag that gets added to <message/> stanzas.
self.use_origin_id = True
self.use_origin_id = False
#: The API registry is a way to process callbacks based on
#: JID+node combinations. Each callback in the registry is

View File

@@ -3,8 +3,12 @@
# Copyright (C) 2011 Nathanael C. Fritz
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from slixmpp.xmlstream import StanzaBase, ElementBase
from typing import Set, ClassVar
from slixmpp.xmlstream import StanzaBase, ElementBase
from slixmpp.xmlstream.xmlstream import InvalidCABundle
import logging
log = logging.getLogger(__name__)
class STARTTLS(StanzaBase):
@@ -36,6 +40,12 @@ class Proceed(StanzaBase):
namespace = 'urn:ietf:params:xml:ns:xmpp-tls'
interfaces: ClassVar[Set[str]] = set()
def exception(self, e: Exception) -> None:
log.exception('Error handling {%s}%s stanza',
self.namespace, self.name)
if isinstance(e, InvalidCABundle):
raise e
class Failure(StanzaBase):
"""

View File

@@ -368,7 +368,7 @@ class JID:
return self._node
@node.setter
def node(self, value: str):
def node(self, value: Optional[str]):
self._node = _validate_node(value)
self._update_bare_full()
@@ -386,7 +386,7 @@ class JID:
return self._resource
@resource.setter
def resource(self, value: str):
def resource(self, value: Optional[str]):
self._resource = _validate_resource(value)
self._update_bare_full()

View File

@@ -1,4 +1,3 @@
# Slixmpp: The Slick XMPP Library
# Copyright (C) 2010 Nathanael C. Fritz
# This file is part of Slixmpp.
@@ -24,6 +23,7 @@ __all__ = [
'xep_0049', # Private XML Storage
'xep_0050', # Ad-hoc Commands
'xep_0054', # vcard-temp
'xep_0055', # Jabber Search
'xep_0059', # Result Set Management
'xep_0060', # Pubsub (Client)
'xep_0065', # SOCKS5 Bytestreams
@@ -93,6 +93,7 @@ __all__ = [
'xep_0335', # JSON Containers
'xep_0352', # Client State Indication
'xep_0353', # Jingle Message Initiation
'xep_0356', # Privileged entity
'xep_0359', # Unique and Stable Stanza IDs
'xep_0363', # HTTP File Upload
'xep_0369', # MIX-CORE

View File

@@ -385,6 +385,8 @@ class XEP_0030(BasePlugin):
local = True
ifrom = kwargs.pop('ifrom', None)
if self.xmpp.is_component and ifrom is None:
ifrom = self.xmpp.boundjid
if local:
log.debug("Looking up local disco#info data "
"for %s, node %s.", jid, node)

View File

@@ -493,6 +493,8 @@ class XEP_0045(BasePlugin):
"""
if affiliation not in AFFILIATIONS:
raise ValueError('%s is not a valid affiliation' % affiliation)
if affiliation == 'outcast' and not jid:
raise ValueError('Outcast affiliation requires a using a jid')
if not any((jid, nick)):
raise ValueError('One of jid or nick must be set')
iq = self.xmpp.make_iq_set(ito=room, ifrom=ifrom)

View File

@@ -620,6 +620,8 @@ class XEP_0050(BasePlugin):
async def _await_if_needed(handler, *args):
if handler is None:
raise XMPPError("bad-request", text="The command is completed")
if asyncio.iscoroutinefunction(handler):
log.debug(f"%s is async", handler)
return await handler(*args)

View File

@@ -0,0 +1,6 @@
from slixmpp.plugins.base import register_plugin
from .search import XEP_0055
register_plugin(XEP_0055)

View File

@@ -0,0 +1,89 @@
import logging
from slixmpp import CoroutineCallback, StanzaPath, Iq, register_stanza_plugin
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream import StanzaBase
from . import stanza
class XEP_0055(BasePlugin):
"""
XEP-0055: Jabber Search
The config options are only useful for a "server-side" search feature,
and if the ``provide_search`` option is set to True.
API
===
``search_get_form``: customize the search form content (ie fields)
``search_query``: return search results
"""
name = "xep_0055"
description = "XEP-0055: Jabber search"
dependencies = {"xep_0004", "xep_0030"}
stanza = stanza
default_config = {
"form_fields": {"first", "last"},
"form_instructions": "",
"form_title": "",
"provide_search": True
}
def plugin_init(self):
register_stanza_plugin(Iq, stanza.Search)
register_stanza_plugin(stanza.Search, self.xmpp["xep_0004"].stanza.Form)
if self.provide_search:
self.xmpp["xep_0030"].add_feature(stanza.Search.namespace)
self.xmpp.register_handler(
CoroutineCallback(
"search",
StanzaPath("/iq/search"),
self._handle_search,
)
)
self.api.register(self._get_form, "search_get_form")
self.api.register(self._get_results, "search_query")
async def _handle_search(self, iq: StanzaBase):
if iq["search"]["form"].get_values():
reply = await self.api["search_query"](None, None, iq.get_from(), iq)
reply["search"]["form"]["type"] = "result"
else:
reply = await self.api["search_get_form"](None, None, iq.get_from(), iq)
reply["search"]["form"].add_field(
"FORM_TYPE", value=stanza.Search.namespace, ftype="hidden"
)
reply.send()
async def _get_form(self, jid, node, ifrom, iq):
reply = iq.reply()
form = reply["search"]["form"]
form["title"] = self.form_title
form["instructions"] = self.form_instructions
for field in self.form_fields:
form.add_field(field)
return reply
async def _get_results(self, jid, node, ifrom, iq):
reply = iq.reply()
form = reply["search"]["form"]
form["type"] = "result"
for field in self.form_fields:
form.add_reported(field)
return reply
def make_search_iq(self, **kwargs):
iq = self.xmpp.make_iq(itype="set", **kwargs)
iq["search"]["form"].set_type("submit")
iq["search"]["form"].add_field(
"FORM_TYPE", value=stanza.Search.namespace, ftype="hidden"
)
return iq
log = logging.getLogger(__name__)

View File

@@ -0,0 +1,10 @@
from typing import Set, ClassVar
from slixmpp.xmlstream import ElementBase
class Search(ElementBase):
namespace = "jabber:iq:search"
name = "query"
plugin_attrib = "search"
interfaces: ClassVar[Set[str]] = set()

View File

@@ -6,7 +6,6 @@
import datetime as dt
from slixmpp.plugins import BasePlugin, register_plugin
from slixmpp.thirdparty import tzutc, tzoffset, parse_iso
# =====================================================================
@@ -21,7 +20,10 @@ def parse(time_str):
Arguments:
time_str -- A formatted timestamp string.
"""
return parse_iso(time_str)
try:
return dt.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S.%f%z')
except ValueError:
return dt.datetime.strptime(time_str, '%Y-%m-%dT%H:%M:%S%z')
def format_date(time_obj):
@@ -52,7 +54,7 @@ def format_time(time_obj):
if isinstance(time_obj, dt.datetime):
time_obj = time_obj.timetz()
timestamp = time_obj.isoformat()
if time_obj.tzinfo == tzutc():
if time_obj.tzinfo == dt.timezone.utc:
timestamp = timestamp[:-6]
return '%sZ' % timestamp
return timestamp
@@ -69,7 +71,7 @@ def format_datetime(time_obj):
time_obj -- A datetime object.
"""
timestamp = time_obj.isoformat('T')
if time_obj.tzinfo == tzutc():
if time_obj.tzinfo == dt.timezone.utc:
timestamp = timestamp[:-6]
return '%sZ' % timestamp
return timestamp
@@ -128,9 +130,9 @@ def time(hour=None, min=None, sec=None, micro=None, offset=None, obj=False):
if micro is None:
micro = now.microsecond
if offset in (None, 0):
offset = tzutc()
offset = dt.timezone.utc
elif not isinstance(offset, dt.tzinfo):
offset = tzoffset(None, offset)
offset = dt.timezone(dt.timedelta(seconds=offset))
value = dt.time(hour, min, sec, micro, offset)
if obj:
return value
@@ -175,9 +177,9 @@ def datetime(year=None, month=None, day=None, hour=None,
if micro is None:
micro = now.microsecond
if offset in (None, 0):
offset = tzutc()
offset = dt.timezone.utc
elif not isinstance(offset, dt.tzinfo):
offset = tzoffset(None, offset)
offset = dt.timezone(dt.timedelta(seconds=offset))
value = dt.datetime(year, month, day, hour,
min, sec, micro, offset)

View File

@@ -80,16 +80,16 @@ class Info(ElementBase):
self._set_int('bytes', value)
def get_height(self) -> int:
self._get_int('height')
return self._get_int('height')
def set_height(self, value: int):
self._set_int('height', value)
def get_width(self) -> int:
self._get_int(self, 'width')
return self._get_int('width')
def set_width(self, value: int):
self._set_int('with', value)
self._set_int('width', value)
class Pointer(ElementBase):

View File

@@ -60,7 +60,7 @@ class StaticCaps(object):
return False
if node in (None, ''):
info = self.caps.get_caps(jid)
info = await self.caps.get_caps(jid)
if info and feature in info['features']:
return True
@@ -134,7 +134,7 @@ class StaticCaps(object):
def get_verstring(self, jid, node, ifrom, data):
return self.jid_vers.get(jid, None)
def get_caps(self, jid, node, ifrom, data):
async def get_caps(self, jid, node, ifrom, data):
verstring = data.get('verstring', None)
if verstring is None:
return None

View File

@@ -8,7 +8,6 @@ import datetime as dt
from slixmpp.xmlstream import ElementBase
from slixmpp.plugins import xep_0082
from slixmpp.thirdparty import tzutc, tzoffset
class EntityTime(ElementBase):
@@ -87,7 +86,7 @@ class EntityTime(ElementBase):
seconds (positive or negative) to offset.
"""
time = xep_0082.time(offset=value)
if xep_0082.parse(time).tzinfo == tzutc():
if xep_0082.parse(time).tzinfo == dt.timezone.utc:
self._set_sub_text('tzo', 'Z')
else:
self._set_sub_text('tzo', time[-6:])
@@ -111,6 +110,6 @@ class EntityTime(ElementBase):
date = value
if not isinstance(value, dt.datetime):
date = xep_0082.parse(value)
date = date.astimezone(tzutc())
date = date.astimezone(dt.timezone.utc)
value = xep_0082.format_datetime(date)
self._set_sub_text('utc', value)

View File

@@ -30,6 +30,10 @@ class Delay(ElementBase):
def set_stamp(self, value):
if isinstance(value, dt.datetime):
if value.tzinfo is None:
raise ValueError(f'Datetime provided without timezone information: {value}')
if value.tzinfo != dt.timezone.utc:
value = value.astimezone(dt.timezone.utc)
value = xep_0082.format_datetime(value)
self._set_attr('stamp', value)

View File

@@ -1,4 +1,3 @@
# slixmpp: The Slick XMPP Library
# Copyright (C) 2016 Emmanuel Gil Peyrot
# This file is part of slixmpp.
@@ -68,11 +67,11 @@ class XEP_0333(BasePlugin):
:param JID mto: recipient of the marker
:param str id: Identifier of the marked message
:param str marker: Marker to send (one of
displayed, retrieved, or acknowledged)
displayed, received, or acknowledged)
:param str thread: Message thread
:param str mfrom: Use a specific JID to send the message
"""
if marker not in ('displayed', 'retrieved', 'acknowledged'):
if marker not in ('displayed', 'received', 'acknowledged'):
raise ValueError('Invalid marker: %s' % marker)
msg = self.xmpp.make_message(mto=mto, mfrom=mfrom)
if thread:

View File

@@ -7,7 +7,7 @@ from slixmpp.plugins.xep_0297 import Forwarded
class Privilege(ElementBase):
namespace = "urn:xmpp:privilege:1"
namespace = "urn:xmpp:privilege:2"
name = "privilege"
plugin_attrib = "privilege"
@@ -24,7 +24,10 @@ class Privilege(ElementBase):
def presence(self):
return self.permission("presence")
def iq(self):
return self.permission("iq")
def add_perm(self, access, type):
# This should only be needed for servers, so maybe out of scope for slixmpp
perm = Perm()
@@ -34,7 +37,7 @@ class Privilege(ElementBase):
class Perm(ElementBase):
namespace = "urn:xmpp:privilege:1"
namespace = "urn:xmpp:privilege:2"
name = "perm"
plugin_attrib = "perm"
plugin_multi_attrib = "perms"
@@ -44,4 +47,4 @@ class Perm(ElementBase):
def register():
register_stanza_plugin(Message, Privilege)
register_stanza_plugin(Privilege, Forwarded)
register_stanza_plugin(Privilege, Perm, iterable=True)
register_stanza_plugin(Privilege, Perm, iterable=True)

View File

@@ -14,6 +14,8 @@ from typing import (
IO,
)
from pathlib import Path
from slixmpp import JID, __version__
from slixmpp.stanza import Iq
from slixmpp.plugins import BasePlugin
@@ -99,12 +101,17 @@ class XEP_0363(BasePlugin):
:param domain: Domain to disco to find a service.
"""
if domain is None and self.xmpp.is_component:
domain = self.xmpp.server_host
results = await self.xmpp['xep_0030'].get_info_from_domain(
domain=domain, **iqkwargs
)
candidates = []
for info in results:
if not info['disco_info']:
continue
for identity in info['disco_info']['identities']:
if identity[0] == 'store' and identity[1] == 'file':
candidates.append(info)
@@ -113,7 +120,7 @@ class XEP_0363(BasePlugin):
if feature == Request.namespace:
return info
def request_slot(self, jid: JID, filename: str, size: int,
def request_slot(self, jid: JID, filename: Path, size: int,
content_type: Optional[str] = None, *,
ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""Request an HTTP upload slot from a service.
@@ -125,12 +132,12 @@ class XEP_0363(BasePlugin):
"""
iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
request = iq['http_upload_request']
request['filename'] = filename
request['filename'] = str(filename)
request['size'] = str(size)
request['content-type'] = content_type or self.default_content_type
return iq.send(**iqkwargs)
async def upload_file(self, filename: str, size: Optional[int] = None,
async def upload_file(self, filename: Path, size: Optional[int] = None,
content_type: Optional[str] = None, *,
input_file: Optional[IO[bytes]]=None,
domain: Optional[JID] = None,

View File

@@ -6,9 +6,7 @@
from typing import Set, Iterable
from slixmpp.xmlstream import ElementBase
try:
from emoji import UNICODE_EMOJI
if UNICODE_EMOJI.get('en'):
UNICODE_EMOJI = UNICODE_EMOJI['en']
from emoji import EMOJI_DATA as UNICODE_EMOJI
except ImportError:
UNICODE_EMOJI = None

View File

@@ -0,0 +1,176 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8 et ts=4 sts=4 sw=4
#
# Copyright © 2022 Maxime “pep” Buquet <pep@bouah.net>
#
# See the LICENSE file for copying permissions.
"""
XEP-0454: OMEMO Media Sharing
"""
from typing import IO, Optional, Tuple
from os import urandom
from pathlib import Path
from io import BytesIO, SEEK_END
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.base import register_plugin
class InvalidURL(Exception):
"""Raised for URLs that either aren't HTTPS or already contain a fragment."""
EXTENSIONS_MAP = {
'jpeg': 'jpg',
'text': 'txt',
}
class XEP_0454(BasePlugin):
"""
XEP-0454: OMEMO Media Sharing
"""
name = 'xep_0454'
description = 'XEP-0454: OMEMO Media Sharing'
dependencies = {'xep_0363'}
@staticmethod
def encrypt(input_file: Optional[IO[bytes]] = None, filename: Optional[Path] = None) -> Tuple[bytes, str]:
"""
Encrypts file as specified in XEP-0454 for use in file sharing
:param input_file: Binary file stream on the file.
:param filename: Path to the file to upload.
One of input_file or filename must be specified. If both are
passed, input_file will be used and filename ignored.
"""
if input_file is None and filename is None:
raise ValueError('Specify either filename or input_file parameter')
aes_gcm_iv = urandom(12)
aes_gcm_key = urandom(32)
aes_gcm = Cipher(
algorithms.AES(aes_gcm_key),
modes.GCM(aes_gcm_iv),
).encryptor()
if input_file is None:
input_file = open(filename, 'rb')
payload = b''
while True:
buf = input_file.read(4096)
if not buf:
break
payload += aes_gcm.update(buf)
payload += aes_gcm.finalize() + aes_gcm.tag
fragment = aes_gcm_iv.hex() + aes_gcm_key.hex()
return (payload, fragment)
@staticmethod
def decrypt(input_file: IO[bytes], fragment: str) -> bytes:
"""
Decrypts file-like.
:param input_file: Binary file stream on the file, containing the
tag (16 bytes) at the end.
:param fragment: 88 hex chars string composed of iv (24 chars)
+ key (64 chars).
"""
assert len(fragment) == 88
aes_gcm_iv = bytes.fromhex(fragment[:24])
aes_gcm_key = bytes.fromhex(fragment[24:])
# Find 16 bytes tag
input_file.seek(-16, SEEK_END)
tag = input_file.read()
aes_gcm = Cipher(
algorithms.AES(aes_gcm_key),
modes.GCM(aes_gcm_iv, tag),
).decryptor()
size = input_file.seek(0, SEEK_END)
input_file.seek(0)
count = size - 16
plain = b''
while count >= 0:
buf = input_file.read(4096)
count -= len(buf)
if count <= 0:
buf += input_file.read()
buf = buf[:-16]
plain += aes_gcm.update(buf)
plain += aes_gcm.finalize()
return plain
@staticmethod
def format_url(url: str, fragment: str) -> str:
"""Helper to format a HTTPS URL to an AESGCM URI"""
if not url.startswith('https://') or url.find('#') != -1:
raise InvalidURL
return 'aesgcm://' + url[len('https://'):] + '#' + fragment
@staticmethod
def map_extensions(ext: str) -> str:
"""
Apply conversions to extensions to reduce the number of
variations, (e.g., JPEG -> jpg).
"""
return EXTENSIONS_MAP.get(ext, ext).lower()
async def upload_file(
self,
filename: Path,
_size: Optional[int] = None,
content_type: Optional[str] = None,
**kwargs,
) -> str:
"""
Wrapper to xep_0363 (HTTP Upload)'s upload_file method.
:param input_file: Binary file stream on the file.
:param filename: Path to the file to upload.
Same as `XEP_0454.encrypt`, one of input_file or filename must be
specified. If both are passed, input_file will be used and
filename ignored.
Other arguments passed in are passed to the actual
`XEP_0363.upload_file` call.
"""
input_file = kwargs.get('input_file')
payload, fragment = self.encrypt(input_file, filename)
# Prepare kwargs for upload_file call
new_filename = urandom(12).hex() # Random filename to hide user-provided path
if filename.suffix:
new_filename += self.map_extensions(filename.suffix)
kwargs['filename'] = new_filename
input_enc = BytesIO(payload)
kwargs['input_file'] = input_enc
# Size must also be overriden if provided
size = input_enc.seek(0, SEEK_END)
input_enc.seek(0)
kwargs['size'] = size
kwargs['content_type'] = content_type
url = await self.xmpp['xep_0363'].upload_file(**kwargs)
return self.format_url(url, fragment)
register_plugin(XEP_0454)

View File

@@ -0,0 +1,6 @@
from slixmpp.plugins.base import register_plugin
from .reply import XEP_0461
from . import stanza
register_plugin(XEP_0461)

View File

@@ -0,0 +1,48 @@
from slixmpp.plugins import BasePlugin
from slixmpp.types import JidStr
from slixmpp.xmlstream import StanzaBase
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from . import stanza
class XEP_0461(BasePlugin):
"""XEP-0461: Message Replies"""
name = "xep_0461"
description = "XEP-0461: Message Replies"
dependencies = {"xep_0030"}
stanza = stanza
namespace = stanza.NS
def plugin_init(self) -> None:
stanza.register_plugins()
self.xmpp.register_handler(
Callback(
"Message replied to",
StanzaPath("message/reply"),
self._handle_reply_to_message,
)
)
def plugin_end(self):
self.xmpp.plugin["xep_0030"].del_feature(feature=stanza.NS)
def session_bind(self, jid):
self.xmpp.plugin["xep_0030"].add_feature(feature=stanza.NS)
def _handle_reply_to_message(self, msg: StanzaBase):
self.xmpp.event("message_reply", msg)
def send_reply(self, reply_to: JidStr, reply_id: str, **msg_kwargs):
"""
:param reply_to: Full JID of the quoted author
:param reply_id: ID of the message to reply to
"""
msg = self.xmpp.make_message(**msg_kwargs)
msg["reply"]["to"] = reply_to
msg["reply"]["id"] = reply_id
msg.send()

View File

@@ -0,0 +1,47 @@
from slixmpp.stanza import Message
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
NS = "urn:xmpp:reply:0"
class Reply(ElementBase):
namespace = NS
name = "reply"
plugin_attrib = "reply"
interfaces = {"id", "to"}
class FeatureFallBack(ElementBase):
# should also be a multi attrib
namespace = "urn:xmpp:feature-fallback:0"
name = "fallback"
plugin_attrib = "feature_fallback"
interfaces = {"for"}
def get_stripped_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
try:
start = int(start)
end = int(end)
except ValueError:
return body
else:
return body[:start] + body[end:]
class FallBackBody(ElementBase):
# According to https://xmpp.org/extensions/inbox/compatibility-fallback.html
# this should be a multi_attrib *but* since it's a protoXEP, we'll see...
namespace = FeatureFallBack.namespace
name = "body"
plugin_attrib = "fallback_body"
interfaces = {"start", "end"}
def register_plugins():
register_stanza_plugin(Message, Reply)
register_stanza_plugin(Message, FeatureFallBack)
register_stanza_plugin(FeatureFallBack, FallBackBody)

View File

@@ -64,9 +64,9 @@ class Message(RootStanza):
if self.stream:
use_ids = getattr(self.stream, 'use_message_ids', None)
if use_ids:
self['id'] = self.stream.new_id()
self.set_id(self.stream.new_id())
else:
del self['origin_id']
self.del_origin_id()
def get_type(self):
"""
@@ -96,8 +96,8 @@ class Message(RootStanza):
self.xml.attrib['id'] = value
if self.stream:
use_orig_ids = getattr(self.stream, 'use_origin_id', None)
if not use_orig_ids:
if not getattr(self.stream, 'use_origin_id', False):
self.del_origin_id()
return None
sub = self.xml.find(ORIGIN_NAME)

View File

@@ -3,5 +3,4 @@ try:
except:
from slixmpp.thirdparty.gnupg import GPG
from slixmpp.thirdparty.mini_dateutil import tzutc, tzoffset, parse_iso
from slixmpp.thirdparty.orderedset import OrderedSet

View File

@@ -1,273 +0,0 @@
# This module is a very stripped down version of the dateutil
# package for when dateutil has not been installed. As a replacement
# for dateutil.parser.parse, the parsing methods from
# http://blog.mfabrik.com/2008/06/30/relativity-of-time-shortcomings-in-python-datetime-and-workaround/
#As such, the following copyrights and licenses applies:
# dateutil - Extensions to the standard python 2.3+ datetime module.
#
# Copyright (c) 2003-2011 - Gustavo Niemeyer <gustavo@niemeyer.net>
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# fixed_dateime
#
# Copyright (c) 2008, Red Innovation Ltd., Finland
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of Red Innovation nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY RED INNOVATION ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL RED INNOVATION BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import re
import math
import datetime
ZERO = datetime.timedelta(0)
try:
from dateutil.parser import parse as parse_iso
from dateutil.tz import tzoffset, tzutc
except:
# As a stopgap, define the two timezones here based
# on the dateutil code.
class tzutc(datetime.tzinfo):
def utcoffset(self, dt):
return ZERO
def dst(self, dt):
return ZERO
def tzname(self, dt):
return "UTC"
def __eq__(self, other):
return (isinstance(other, tzutc) or
(isinstance(other, tzoffset) and other._offset == ZERO))
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return "%s()" % self.__class__.__name__
__reduce__ = object.__reduce__
class tzoffset(datetime.tzinfo):
def __init__(self, name, offset):
self._name = name
self._offset = datetime.timedelta(minutes=offset)
def utcoffset(self, dt):
return self._offset
def dst(self, dt):
return ZERO
def tzname(self, dt):
return self._name
def __eq__(self, other):
return (isinstance(other, tzoffset) and
self._offset == other._offset)
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
return "%s(%s, %s)" % (self.__class__.__name__,
repr(self._name),
self._offset.days*86400+self._offset.seconds)
__reduce__ = object.__reduce__
_fixed_offset_tzs = { }
UTC = tzutc()
def _get_fixed_offset_tz(offsetmins):
"""For internal use only: Returns a tzinfo with
the given fixed offset. This creates only one instance
for each offset; the zones are kept in a dictionary"""
if offsetmins == 0:
return UTC
if not offsetmins in _fixed_offset_tzs:
if offsetmins < 0:
sign = '-'
absoff = -offsetmins
else:
sign = '+'
absoff = offsetmins
name = "UTC%s%02d:%02d" % (sign, int(absoff / 60), absoff % 60)
inst = tzoffset(name,offsetmins)
_fixed_offset_tzs[offsetmins] = inst
return _fixed_offset_tzs[offsetmins]
_iso8601_parser = re.compile(r"""
^
(?P<year> [0-9]{4})?(?P<ymdsep>-?)?
(?P<month>[0-9]{2})?(?P=ymdsep)?
(?P<day> [0-9]{2})?
(?P<time>
(?: # time part... optional... at least hour must be specified
(?:T|\s+)?
(?P<hour>[0-9]{2})
(?:
# minutes, separated with :, or none, from hours
(?P<hmssep>[:]?)
(?P<minute>[0-9]{2})
(?:
# same for seconds, separated with :, or none, from hours
(?P=hmssep)
(?P<second>[0-9]{2})
)?
)?
# fractions
(?: [,.] (?P<frac>[0-9]{1,10}))?
# timezone, Z, +-hh or +-hh:?mm. MUST BE, but complain if not there.
(
(?P<tzempty>Z)
|
(?P<tzh>[+-][0-9]{2})
(?: :? # optional separator
(?P<tzm>[0-9]{2})
)?
)?
)
)?
$
""", re.X) # """
def parse_iso(timestamp):
"""Internal function for parsing a timestamp in
ISO 8601 format"""
timestamp = timestamp.strip()
m = _iso8601_parser.match(timestamp)
if not m:
raise ValueError("Not a proper ISO 8601 timestamp!: %s" % timestamp)
vals = m.groupdict()
def_vals = {'year': 1970, 'month': 1, 'day': 1}
for key in vals:
if vals[key] is None:
vals[key] = def_vals.get(key, 0)
elif key not in ['time', 'ymdsep', 'hmssep', 'tzempty']:
vals[key] = int(vals[key])
year = vals['year']
month = vals['month']
day = vals['day']
if m.group('time') is None:
return datetime.date(year, month, day)
h, min, s, us = None, None, None, 0
frac = 0
if m.group('tzempty') == None and m.group('tzh') == None:
raise ValueError("Not a proper ISO 8601 timestamp: " +
"missing timezone (Z or +hh[:mm])!")
if m.group('frac'):
frac = m.group('frac')
power = len(frac)
frac = int(frac) / 10.0 ** power
if m.group('hour'):
h = vals['hour']
if m.group('minute'):
min = vals['minute']
if m.group('second'):
s = vals['second']
if frac != None:
# ok, fractions of hour?
if min == None:
frac, min = math.modf(frac * 60.0)
min = int(min)
# fractions of second?
if s == None:
frac, s = math.modf(frac * 60.0)
s = int(s)
# and extract microseconds...
us = int(frac * 1000000)
if m.group('tzempty') == 'Z':
offsetmins = 0
else:
# timezone: hour diff with sign
offsetmins = vals['tzh'] * 60
tzm = m.group('tzm')
# add optional minutes
if tzm != None:
tzm = int(tzm)
offsetmins += tzm if offsetmins > 0 else -tzm
tz = _get_fixed_offset_tz(offsetmins)
return datetime.datetime(year, month, day, h, min, s, us, tz)

View File

@@ -5,5 +5,5 @@
# We don't want to have to import the entire library
# just to get the version info for setup.py
__version__ = '1.8.1'
__version_info__ = (1, 8, 1)
__version__ = '1.8.3'
__version_info__ = (1, 8, 3)

View File

@@ -15,7 +15,13 @@ from slixmpp.types import Protocol
log = logging.getLogger(__name__)
class AnswerProtocol(Protocol):
class GetHostByNameAnswerProtocol(Protocol):
name: str
aliases: List[str]
addresses: List[str]
class QueryAnswerProtocol(Protocol):
host: str
priority: int
weight: int
@@ -23,6 +29,9 @@ class AnswerProtocol(Protocol):
class ResolverProtocol(Protocol):
def gethostbyname(self, host: str, socket_family: socket.AddressFamily) -> Future:
...
def query(self, query: str, querytype: str) -> Future:
...
@@ -147,11 +156,6 @@ async def resolve(host: str, port: int, *, loop: AbstractEventLoop,
results = []
for host, port in hosts:
if host == 'localhost':
if use_ipv6:
results.append((host, '::1', port))
results.append((host, '127.0.0.1', port))
if use_ipv6:
aaaa = await get_AAAA(host, resolver=resolver,
use_aiodns=use_aiodns, loop=loop)
@@ -201,13 +205,13 @@ async def get_A(host: str, *, loop: AbstractEventLoop,
return []
# Using aiodns:
future = resolver.query(host, 'A')
future = resolver.gethostbyname(host, socket.AF_INET)
try:
recs = cast(Iterable[AnswerProtocol], await future)
recs = cast(GetHostByNameAnswerProtocol, await future)
except Exception as e:
log.debug('DNS: Exception while querying for %s A records: %s', host, e)
recs = []
return [rec.host for rec in recs]
return []
return [addr for addr in recs.addresses]
async def get_AAAA(host: str, *, loop: AbstractEventLoop,
@@ -249,13 +253,13 @@ async def get_AAAA(host: str, *, loop: AbstractEventLoop,
return []
# Using aiodns:
future = resolver.query(host, 'AAAA')
future = resolver.gethostbyname(host, socket.AF_INET6)
try:
recs = cast(Iterable[AnswerProtocol], await future)
recs = cast(GetHostByNameAnswerProtocol, await future)
except Exception as e:
log.debug('DNS: Exception while querying for %s AAAA records: %s', host, e)
recs = []
return [rec.host for rec in recs]
return []
return [addr for addr in recs.addresses]
async def get_SRV(host: str, port: int, service: str,
@@ -295,12 +299,12 @@ async def get_SRV(host: str, port: int, service: str,
try:
future = resolver.query('_%s._%s.%s' % (service, proto, host),
'SRV')
recs = cast(Iterable[AnswerProtocol], await future)
recs = cast(Iterable[QueryAnswerProtocol], await future)
except Exception as e:
log.debug('DNS: Exception while querying for %s SRV records: %s', host, e)
return []
answers: Dict[int, List[AnswerProtocol]] = {}
answers: Dict[int, List[QueryAnswerProtocol]] = {}
for rec in recs:
if rec.priority not in answers:
answers[rec.priority] = []

View File

@@ -35,6 +35,7 @@ import ssl
import uuid
import warnings
import weakref
import collections
from contextlib import contextmanager
import xml.etree.ElementTree as ET
@@ -82,7 +83,7 @@ class InvalidCABundle(Exception):
Exception raised when the CA Bundle file hasn't been found.
"""
def __init__(self, path: Optional[Path]):
def __init__(self, path: Optional[Union[Path, Iterable[Path]]]):
self.path = path
@@ -298,8 +299,8 @@ class XMLStream(asyncio.BaseProtocol):
self.scheduled_events = {}
self.ssl_context = ssl.create_default_context()
self.ssl_context.check_hostname = False
self.ssl_context.verify_mode = ssl.CERT_NONE
self.ssl_context.check_hostname = True
self.ssl_context.verify_mode = ssl.CERT_REQUIRED
self.event_when_connected = "connected"
@@ -483,25 +484,21 @@ class XMLStream(asyncio.BaseProtocol):
if self._current_connection_attempt is None:
return
try:
server_hostname = self.default_domain if self.use_ssl else None
await self.loop.create_connection(lambda: self,
self.address[0],
self.address[1],
ssl=ssl_context,
server_hostname=self.default_domain if self.use_ssl else None)
server_hostname=server_hostname)
self._connect_loop_wait = 0
except Socket.gaierror as e:
self.event('connection_failed',
'No DNS record available for %s' % self.default_domain)
self.reschedule_connection_attempt()
except OSError as e:
log.debug('Connection failed: %s', e)
self.event("connection_failed", e)
if self._current_connection_attempt is None:
return
self._connect_loop_wait = self._connect_loop_wait * 2 + 1
self._current_connection_attempt = asyncio.ensure_future(
self._connect_routine(),
loop=self.loop,
)
self.reschedule_connection_attempt()
def process(self, *, forever: bool = True, timeout: Optional[int] = None) -> None:
"""Process all the available XMPP events (receiving or sending data on the
@@ -578,7 +575,7 @@ class XMLStream(asyncio.BaseProtocol):
stream=self,
top_level=True,
open_only=True))
self.start_stream_handler(self.xml_root)
self.start_stream_handler(self.xml_root) # type:ignore
self.xml_depth += 1
if event == 'end':
self.xml_depth -= 1
@@ -637,6 +634,20 @@ class XMLStream(asyncio.BaseProtocol):
self._set_disconnected_future()
self.event("disconnected", self.disconnect_reason or exception)
def reschedule_connection_attempt(self) -> None:
"""
Increase the exponential back-off and initate another background
_connect_routine call to connect to the server.
"""
# abort if there is no ongoing connection attempt
if self._current_connection_attempt is None:
return
self._connect_loop_wait = min(300, self._connect_loop_wait * 2 + 1)
self._current_connection_attempt = asyncio.ensure_future(
self._connect_routine(),
loop=self.loop,
)
def cancel_connection_attempt(self) -> None:
"""
Immediately cancel the current create_connection() Future.
@@ -793,11 +804,14 @@ class XMLStream(asyncio.BaseProtocol):
if bundle.is_file():
ca_cert = bundle
break
if ca_cert is None:
raise InvalidCABundle(ca_cert)
if ca_cert is None and \
isinstance(self.ca_certs, (Path, collections.abc.Iterable)):
raise InvalidCABundle(self.ca_certs)
self.ssl_context.verify_mode = ssl.CERT_REQUIRED
self.ssl_context.load_verify_locations(cafile=ca_cert)
else:
self.ssl_context.set_default_verify_paths()
return self.ssl_context
@@ -814,15 +828,15 @@ class XMLStream(asyncio.BaseProtocol):
try:
if hasattr(self.loop, 'start_tls'):
transp = await self.loop.start_tls(self.transport,
self, ssl_context)
self, ssl_context,
server_hostname=self.default_domain)
# Python < 3.7
else:
transp, _ = await self.loop.create_connection(
lambda: self,
ssl=self.ssl_context,
sock=self.socket,
server_hostname=self.default_domain
)
server_hostname=self.default_domain)
except ssl.SSLError as e:
log.debug('SSL: Unable to connect', exc_info=True)
log.error('CERT: Invalid certificate trust chain.')
@@ -1254,7 +1268,7 @@ class XMLStream(asyncio.BaseProtocol):
already_run_filters.add(filter)
if iscoroutinefunction(filter):
filter = cast(AsyncFilter, filter)
task = asyncio.create_task(filter(data))
task = asyncio.create_task(filter(data)) # type:ignore
completed, pending = await wait(
{task},
timeout=1,
@@ -1318,10 +1332,16 @@ class XMLStream(asyncio.BaseProtocol):
# Avoid circular imports
from slixmpp.stanza.rootstanza import RootStanza
from slixmpp.stanza import Iq, Handshake
passthrough = (
(isinstance(data, Iq) and data.get_plugin('bind', check=True))
or isinstance(data, Handshake)
)
passthrough = False
if isinstance(data, Iq):
if data.get_plugin('bind', check=True):
passthrough = True
elif data.get_plugin('session', check=True):
passthrough = True
elif isinstance(data, Handshake):
passthrough = True
if isinstance(data, (RootStanza, str)) and not passthrough:
self.__queued_stanzas.append((data, use_filters))
log.debug('NOT SENT: %s %s', type(data), data)

View File

@@ -0,0 +1,59 @@
import unittest
from slixmpp import register_stanza_plugin, Iq
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0055 import stanza
class TestJabberSearch(SlixTest):
def setUp(self):
register_stanza_plugin(Iq, stanza.Search)
self.stream_start(plugins={"xep_0055"})
def testRequestSearchFields(self):
iq = self.Iq()
iq.set_from("juliet@capulet.com/balcony")
iq.set_to("characters.shakespeare.lit")
iq.set_type("get")
iq.enable("search")
iq["id"] = "0"
self.check(
iq,
"""
<iq type='get'
from='juliet@capulet.com/balcony'
to='characters.shakespeare.lit'>
<query xmlns='jabber:iq:search'/>
</iq>
""",
)
def testSendSearch(self):
iq = self.xmpp["xep_0055"].make_search_iq(
ifrom="juliet@capulet.com/balcony", ito="characters.shakespeare.lit"
)
iq["search"]["form"].add_field(var="x-gender", value="male")
self.check(
iq,
"""
<iq type='set'
from='juliet@capulet.com/balcony'
to='characters.shakespeare.lit'>
<query xmlns='jabber:iq:search'>
<x xmlns='jabber:x:data' type='submit'>
<field type='hidden' var='FORM_TYPE'>
<value>jabber:iq:search</value>
</field>
<field var='x-gender'>
<value>male</value>
</field>
</x>
</query>
</iq>
""",
use_values=False,
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestJabberSearch)

View File

@@ -13,7 +13,7 @@ class TestPermissions(SlixTest):
def testAdvertisePermission(self):
xmlstring = """
<message from='capulet.net' to='pubub.capulet.lit'>
<privilege xmlns='urn:xmpp:privilege:1'>
<privilege xmlns='urn:xmpp:privilege:2'>
<perm access='roster' type='both'/>
<perm access='message' type='outgoing'/>
<perm access='presence' type='managed_entity'/>

View File

@@ -0,0 +1,48 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0461 import stanza
class TestReply(SlixTest):
def setUp(self):
stanza.register_plugins()
def testReply(self):
message = Message()
message["reply"]["id"] = "some-id"
message["body"] = "some-body"
self.check(
message,
"""
<message>
<reply xmlns="urn:xmpp:reply:0" id="some-id" />
<body>some-body</body>
</message>
""",
)
def testFallback(self):
message = Message()
message["body"] = "12345\nrealbody"
message["feature_fallback"]["for"] = "NS"
message["feature_fallback"]["fallback_body"]["start"] = "0"
message["feature_fallback"]["fallback_body"]["end"] = "6"
self.check(
message,
"""
<message xmlns="jabber:client">
<body>12345\nrealbody</body>
<fallback xmlns='urn:xmpp:feature-fallback:0' for='NS'>
<body start="0" end="6" />
</fallback>
</message>
""",
)
assert message["feature_fallback"].get_stripped_body() == "realbody"
suite = unittest.TestLoader().loadTestsFromTestCase(TestReply)

View File

@@ -0,0 +1,170 @@
import unittest
from slixmpp.test import SlixTest
class TestJabberSearch(SlixTest):
def setUp(self):
self.stream_start(
mode="component",
plugin_config={
"xep_0055": {
"form_fields": {"first", "last"},
"form_instructions": "INSTRUCTIONS",
"form_title": "User Directory Search",
}
},
jid="characters.shakespeare.lit",
plugins={"xep_0055"}
)
self.xmpp["xep_0055"].api.register(get_results, "search_query")
self.xmpp["xep_0055"].api.register(get_results, "search_query")
def tearDown(self):
self.stream_close()
def testRequestingSearchFields(self):
self.recv(
"""
<iq type='get'
from='juliet@capulet.com/balcony'
to='characters.shakespeare.lit'
id='search3'
xml:lang='en'>
<query xmlns='jabber:iq:search'/>
</iq>
"""
)
self.send(
"""
<iq type='result'
from='characters.shakespeare.lit'
to='juliet@capulet.com/balcony'
id='search3'
xml:lang='en'>
<query xmlns='jabber:iq:search'>
<x xmlns='jabber:x:data' type='form'>
<title>User Directory Search</title>
<instructions>INSTRUCTIONS</instructions>
<field type='hidden'
var='FORM_TYPE'>
<value>jabber:iq:search</value>
</field>
<field var='first'/>
<field var='last'/>
</x>
</query>
</iq>
""",
use_values=False,
)
def testSearchResult(self):
self.recv(
"""
<iq type='get'
from='juliet@capulet.com/balcony'
to='characters.shakespeare.lit'
id='search2'
xml:lang='en'>
<query xmlns='jabber:iq:search'>
<x xmlns='jabber:x:data' type='submit'>
<field type='hidden' var='FORM_TYPE'>
<value>jabber:iq:search</value>
</field>
<field var='last'>
<value>Montague</value>
</field>
</x>
</query>
</iq>
"""
)
self.send(
"""
<iq type='result'
from='characters.shakespeare.lit'
to='juliet@capulet.com/balcony'
id='search2'
xml:lang='en'>
<query xmlns='jabber:iq:search'>
<x xmlns='jabber:x:data' type='result'>
<field type='hidden' var='FORM_TYPE'>
<value>jabber:iq:search</value>
</field>
<reported>
<field var='first' label='Given Name' />
<field var='last' label='Family Name' />
</reported>
<item>
<field var='first'><value>Benvolio</value></field>
<field var='last'><value>Montague</value></field>
</item>
</x>
</query>
</iq>
""",
use_values=False, # TypeError: element indices must be integers without that
)
def testSearchNoResult(self):
self.xmpp["xep_0055"].api.register(get_results, "search_query")
self.recv(
"""
<iq type='get'
from='juliet@capulet.com/balcony'
to='characters.shakespeare.lit'
id='search2'
xml:lang='en'>
<query xmlns='jabber:iq:search'>
<x xmlns='jabber:x:data' type='submit'>
<field type='hidden' var='FORM_TYPE'>
<value>jabber:iq:search</value>
</field>
<field var='last'>
<value>Capulet</value>
</field>
</x>
</query>
</iq>
"""
)
self.send(
"""
<iq type='result'
from='characters.shakespeare.lit'
to='juliet@capulet.com/balcony'
id='search2'
xml:lang='en'>
<query xmlns='jabber:iq:search'>
<x xmlns='jabber:x:data' type='result'>
<field type='hidden' var='FORM_TYPE'>
<value>jabber:iq:search</value>
</field>
<reported>
<field var='first' label='Given Name' />
<field var='last' label='Family Name' />
</reported>
</x>
</query>
</iq>
""",
use_values=False, # TypeError: element indices must be integers without that
)
async def get_results(jid, node, ifrom, iq):
reply = iq.reply()
form = reply["search"]["form"]
form["type"] = "result"
form.add_reported("first", label="Given Name")
form.add_reported("last", label="Family Name")
d = iq["search"]["form"].get_values()
if d["last"] == "Montague":
form.add_item({"first": "Benvolio", "last": "Montague"})
return reply
suite = unittest.TestLoader().loadTestsFromTestCase(TestJabberSearch)

View File

@@ -31,7 +31,7 @@ class TestPermissions(SlixTest):
self.recv(
"""
<message from='capulet.net' to='pubub.capulet.lit' id='54321'>
<privilege xmlns='urn:xmpp:privilege:1'>
<privilege xmlns='urn:xmpp:privilege:2'>
<perm access='roster' type='both'/>
<perm access='message' type='outgoing'/>
</privilege>
@@ -95,7 +95,7 @@ class TestPermissions(SlixTest):
def testMakeOutgoingMessage(self):
xmlstring = """
<message xmlns="jabber:component:accept" from='pubsub.capulet.lit' to='capulet.net'>
<privilege xmlns='urn:xmpp:privilege:1'>
<privilege xmlns='urn:xmpp:privilege:2'>
<forwarded xmlns='urn:xmpp:forward:0'>
<message from="juliet@capulet.lit" to="romeo@montague.lit" xmlns="jabber:client">
<body>I do not hate you</body>

View File

@@ -0,0 +1,48 @@
import logging
import unittest
from slixmpp.test import SlixTest
class TestReply(SlixTest):
def setUp(self):
self.stream_start(plugins=["xep_0461"])
def tearDown(self):
self.stream_close()
def testFallBackBody(self):
async def on_reply(msg):
start = msg["feature_fallback"]["fallback_body"]["start"]
end = msg["feature_fallback"]["fallback_body"]["end"]
self.xmpp["xep_0461"].send_reply(
reply_to=msg.get_from(),
reply_id=msg.get_id(),
mto="test@test.com",
mbody=f"{start} to {end}",
)
self.xmpp.add_event_handler("message_reply", on_reply)
self.recv(
"""
<message id="other-id" from="from@from.com/res">
<reply xmlns="urn:xmpp:reply:0" id="some-id" />
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:feature-fallback:0' for='urn:xmpp:reply:0'>
<body start="0" end="8" />
</fallback>
</message>
"""
)
self.send(
"""
<message xmlns="jabber:client" to="test@test.com" type="normal">
<reply xmlns="urn:xmpp:reply:0" id="other-id" to="from@from.com/res" />
<body>0 to 8</body>
</message>
"""
)
logging.basicConfig(level=logging.DEBUG)
suite = unittest.TestLoader().loadTestsFromTestCase(TestReply)

41
tests/test_xep_0454.py Normal file
View File

@@ -0,0 +1,41 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8 et ts=4 sts=4 sw=4
#
# Copyright © 2022 Maxime “pep” Buquet <pep@bouah.net>
#
# Distributed under terms of the GPLv3+ license.
"""
Tests for XEP-0454 (OMEMO Media Sharing) plugin.
"""
import unittest
from io import BytesIO
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0454 import XEP_0454
class TestMediaSharing(SlixTest):
def testEncryptDecryptSmall(self):
plain = b'qwertyuiop'
ciphertext, fragment = XEP_0454.encrypt(input_file=BytesIO(plain))
result = XEP_0454.decrypt(BytesIO(ciphertext), fragment)
self.assertEqual(plain, result)
def testEncryptDecrypt(self):
plain = b'a' * 4096 + b'qwertyuiop'
ciphertext, fragment = XEP_0454.encrypt(input_file=BytesIO(plain))
result = XEP_0454.decrypt(BytesIO(ciphertext), fragment)
self.assertEqual(plain, result)
def testFormatURL(self):
url = 'https://foo.bar'
fragment = 'a' * 88
result = XEP_0454.format_url(url, fragment)
self.assertEqual('aesgcm://foo.bar#' + 'a' * 88, result)
suite = unittest.TestLoader().loadTestsFromTestCase(TestMediaSharing)