Merge branch 'upload-encrypt' into 'master'

XEP-0454: OMEMO Media Sharing

See merge request poezio/slixmpp!189
This commit is contained in:
Maxime Buquet 2022-03-21 17:01:40 +01:00
commit 82ff68cfac
7 changed files with 300 additions and 12 deletions

View File

@ -20,7 +20,7 @@ test:
script:
- apt update
- apt install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
test-3.10:
@ -31,7 +31,7 @@ test-3.10:
script:
- apt update
- apt install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
test-3.11:
@ -43,7 +43,7 @@ test-3.11:
script:
- apt update
- apt install -y python3 python3-pip cython3 gpg
- pip3 install emoji aiohttp
- pip3 install emoji aiohttp cryptography
- ./run_tests.py
test_integration:

View File

@ -892,6 +892,14 @@
<xmpp:since>1.6.0</xmpp:since>
</xmpp:SupportedXep>
</implements>
<implements>
<xmpp:SupportedXep>
<xmpp:xep rdf:resource="https://xmpp.org/extensions/xep-0454.html"/>
<xmpp:status>no thumbnail support</xmpp:status>
<xmpp:version>0.1.0</xmpp:version>
<xmpp:since>1.8.1</xmpp:since>
</xmpp:SupportedXep>
</implements>
<release>
<Version>

View File

@ -5,11 +5,16 @@
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from typing import Optional
import sys
import logging
from pathlib import Path
from getpass import getpass
from argparse import ArgumentParser
import slixmpp
from slixmpp import JID
from slixmpp.exceptions import IqTimeout
log = logging.getLogger(__name__)
@ -21,20 +26,40 @@ class HttpUpload(slixmpp.ClientXMPP):
A basic client asking an entity if they confirm the access to an HTTP URL.
"""
def __init__(self, jid, password, recipient, filename, domain=None):
def __init__(
self,
jid: JID,
password: str,
recipient: JID,
filename: Path,
domain: Optional[JID] = None,
encrypted: bool = False,
):
slixmpp.ClientXMPP.__init__(self, jid, password)
self.recipient = recipient
self.filename = filename
self.domain = domain
self.encrypted = encrypted
self.add_event_handler("session_start", self.start)
async def start(self, event):
log.info('Uploading file %s...', self.filename)
try:
url = await self['xep_0363'].upload_file(
self.filename, domain=self.domain, timeout=10
upload_file = self['xep_0363'].upload_file
if self.encrypted and not self['xep_0454']:
print(
'The xep_0454 module isn\'t available. '
'Ensure you have \'cryptography\' '
'from extras_require installed.',
file=sys.stderr,
)
return
elif self.encrypted:
upload_file = self['xep_0454'].upload_file
url = await upload_file(
self.filename, domain=self.domain, timeout=10,
)
except IqTimeout:
raise TimeoutError('Could not send message in time')
@ -79,6 +104,10 @@ if __name__ == '__main__':
parser.add_argument("--domain",
help="Domain to use for HTTP File Upload (leave out for your own servers)")
parser.add_argument("-e", "--encrypt", dest="encrypted",
help="Whether to encrypt", action="store_true",
default=False)
args = parser.parse_args()
# Setup logging.
@ -86,15 +115,41 @@ if __name__ == '__main__':
format='%(levelname)-8s %(message)s')
if args.jid is None:
args.jid = input("Username: ")
args.jid = JID(input("Username: "))
if args.password is None:
args.password = getpass("Password: ")
xmpp = HttpUpload(args.jid, args.password, args.recipient, args.file, args.domain)
domain = args.domain
if domain is not None:
domain = JID(domain)
if args.encrypted:
print(
'You are using the --encrypt flag. '
'Be aware that the transport being used is NOT end-to-end '
'encrypted. The server will be able to decrypt the file.',
file=sys.stderr,
)
xmpp = HttpUpload(
jid=args.jid,
password=args.password,
recipient=JID(args.recipient),
filename=Path(args.file),
domain=domain,
encrypted=args.encrypted,
)
xmpp.register_plugin('xep_0066')
xmpp.register_plugin('xep_0071')
xmpp.register_plugin('xep_0128')
xmpp.register_plugin('xep_0363')
try:
xmpp.register_plugin('xep_0454')
except slixmpp.plugins.base.PluginNotFound:
log.error(
'Could not load xep_0454. '
'Ensure you have \'cryptography\' from extras_require installed.'
)
# Connect to the XMPP server and start processing XMPP stanzas.
xmpp.connect()

View File

@ -86,10 +86,16 @@ setup(
package_data={'slixmpp': ['py.typed']},
packages=packages,
ext_modules=ext_modules,
install_requires=['aiodns>=1.0', 'pyasn1', 'pyasn1_modules', 'typing_extensions; python_version < "3.8.0"'],
install_requires=[
'aiodns>=1.0',
'pyasn1',
'pyasn1_modules',
'typing_extensions; python_version < "3.8.0"',
],
extras_require={
'XEP-0363': ['aiohttp'],
'XEP-0444 compliance': ['emoji'],
'XEP-0454': ['cryptography'],
'Safer XML parsing': ['defusedxml'],
},
classifiers=CLASSIFIERS,

View File

@ -14,6 +14,8 @@ from typing import (
IO,
)
from pathlib import Path
from slixmpp import JID, __version__
from slixmpp.stanza import Iq
from slixmpp.plugins import BasePlugin
@ -113,7 +115,7 @@ class XEP_0363(BasePlugin):
if feature == Request.namespace:
return info
def request_slot(self, jid: JID, filename: str, size: int,
def request_slot(self, jid: JID, filename: Path, size: int,
content_type: Optional[str] = None, *,
ifrom: Optional[JID] = None, **iqkwargs) -> Future:
"""Request an HTTP upload slot from a service.
@ -125,12 +127,12 @@ class XEP_0363(BasePlugin):
"""
iq = self.xmpp.make_iq_get(ito=jid, ifrom=ifrom)
request = iq['http_upload_request']
request['filename'] = filename
request['filename'] = str(filename)
request['size'] = str(size)
request['content-type'] = content_type or self.default_content_type
return iq.send(**iqkwargs)
async def upload_file(self, filename: str, size: Optional[int] = None,
async def upload_file(self, filename: Path, size: Optional[int] = None,
content_type: Optional[str] = None, *,
input_file: Optional[IO[bytes]]=None,
domain: Optional[JID] = None,

View File

@ -0,0 +1,176 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8 et ts=4 sts=4 sw=4
#
# Copyright © 2022 Maxime “pep” Buquet <pep@bouah.net>
#
# See the LICENSE file for copying permissions.
"""
XEP-0454: OMEMO Media Sharing
"""
from typing import IO, Optional, Tuple
from os import urandom
from pathlib import Path
from io import BytesIO, SEEK_END
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.base import register_plugin
class InvalidURL(Exception):
"""Raised for URLs that either aren't HTTPS or already contain a fragment."""
EXTENSIONS_MAP = {
'jpeg': 'jpg',
'text': 'txt',
}
class XEP_0454(BasePlugin):
"""
XEP-0454: OMEMO Media Sharing
"""
name = 'xep_0454'
description = 'XEP-0454: OMEMO Media Sharing'
dependencies = {'xep_0363'}
@staticmethod
def encrypt(input_file: Optional[IO[bytes]] = None, filename: Optional[Path] = None) -> Tuple[bytes, str]:
"""
Encrypts file as specified in XEP-0454 for use in file sharing
:param input_file: Binary file stream on the file.
:param filename: Path to the file to upload.
One of input_file or filename must be specified. If both are
passed, input_file will be used and filename ignored.
"""
if input_file is None and filename is None:
raise ValueError('Specify either filename or input_file parameter')
aes_gcm_iv = urandom(12)
aes_gcm_key = urandom(32)
aes_gcm = Cipher(
algorithms.AES(aes_gcm_key),
modes.GCM(aes_gcm_iv),
).encryptor()
if input_file is None:
input_file = open(filename, 'rb')
payload = b''
while True:
buf = input_file.read(4096)
if not buf:
break
payload += aes_gcm.update(buf)
payload += aes_gcm.finalize() + aes_gcm.tag
fragment = aes_gcm_iv.hex() + aes_gcm_key.hex()
return (payload, fragment)
@staticmethod
def decrypt(input_file: IO[bytes], fragment: str) -> bytes:
"""
Decrypts file-like.
:param input_file: Binary file stream on the file, containing the
tag (16 bytes) at the end.
:param fragment: 88 hex chars string composed of iv (24 chars)
+ key (64 chars).
"""
assert len(fragment) == 88
aes_gcm_iv = bytes.fromhex(fragment[:24])
aes_gcm_key = bytes.fromhex(fragment[24:])
# Find 16 bytes tag
input_file.seek(-16, SEEK_END)
tag = input_file.read()
aes_gcm = Cipher(
algorithms.AES(aes_gcm_key),
modes.GCM(aes_gcm_iv, tag),
).decryptor()
size = input_file.seek(0, SEEK_END)
input_file.seek(0)
count = size - 16
plain = b''
while count >= 0:
buf = input_file.read(4096)
count -= len(buf)
if count <= 0:
buf += input_file.read()
buf = buf[:-16]
plain += aes_gcm.update(buf)
plain += aes_gcm.finalize()
return plain
@staticmethod
def format_url(url: str, fragment: str) -> str:
"""Helper to format a HTTPS URL to an AESGCM URI"""
if not url.startswith('https://') or url.find('#') != -1:
raise InvalidURL
return 'aesgcm://' + url[len('https://'):] + '#' + fragment
@staticmethod
def map_extensions(ext: str) -> str:
"""
Apply conversions to extensions to reduce the number of
variations, (e.g., JPEG -> jpg).
"""
return EXTENSIONS_MAP.get(ext, ext).lower()
async def upload_file(
self,
filename: Path,
_size: Optional[int] = None,
content_type: Optional[str] = None,
**kwargs,
) -> str:
"""
Wrapper to xep_0363 (HTTP Upload)'s upload_file method.
:param input_file: Binary file stream on the file.
:param filename: Path to the file to upload.
Same as `XEP_0454.encrypt`, one of input_file or filename must be
specified. If both are passed, input_file will be used and
filename ignored.
Other arguments passed in are passed to the actual
`XEP_0363.upload_file` call.
"""
input_file = kwargs.get('input_file')
payload, fragment = self.encrypt(input_file, filename)
# Prepare kwargs for upload_file call
new_filename = urandom(12).hex() # Random filename to hide user-provided path
if filename.suffix:
new_filename += self.map_extensions(filename.suffix)
kwargs['filename'] = new_filename
input_enc = BytesIO(payload)
kwargs['input_file'] = input_enc
# Size must also be overriden if provided
size = input_enc.seek(0, SEEK_END)
input_enc.seek(0)
kwargs['size'] = size
kwargs['content_type'] = content_type
url = await self.xmpp['xep_0363'].upload_file(**kwargs)
return self.format_url(url, fragment)
register_plugin(XEP_0454)

41
tests/test_xep_0454.py Normal file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8 et ts=4 sts=4 sw=4
#
# Copyright © 2022 Maxime “pep” Buquet <pep@bouah.net>
#
# Distributed under terms of the GPLv3+ license.
"""
Tests for XEP-0454 (OMEMO Media Sharing) plugin.
"""
import unittest
from io import BytesIO
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0454 import XEP_0454
class TestMediaSharing(SlixTest):
def testEncryptDecryptSmall(self):
plain = b'qwertyuiop'
ciphertext, fragment = XEP_0454.encrypt(input_file=BytesIO(plain))
result = XEP_0454.decrypt(BytesIO(ciphertext), fragment)
self.assertEqual(plain, result)
def testEncryptDecrypt(self):
plain = b'a' * 4096 + b'qwertyuiop'
ciphertext, fragment = XEP_0454.encrypt(input_file=BytesIO(plain))
result = XEP_0454.decrypt(BytesIO(ciphertext), fragment)
self.assertEqual(plain, result)
def testFormatURL(self):
url = 'https://foo.bar'
fragment = 'a' * 88
result = XEP_0454.format_url(url, fragment)
self.assertEqual('aesgcm://foo.bar#' + 'a' * 88, result)
suite = unittest.TestLoader().loadTestsFromTestCase(TestMediaSharing)