Compare commits
3 Commits
pyproject-
...
xep356-iq
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
56004802fa | ||
|
|
dcfa0f20f9 | ||
|
|
8bfe6177f4 |
22
.readthedocs.yaml
Normal file
22
.readthedocs.yaml
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
# .readthedocs.yaml
|
||||||
|
# Read the Docs configuration file
|
||||||
|
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
|
||||||
|
|
||||||
|
# Required
|
||||||
|
version: 2
|
||||||
|
|
||||||
|
# Set the version of Python and other tools you might need
|
||||||
|
build:
|
||||||
|
os: ubuntu-22.04
|
||||||
|
tools:
|
||||||
|
python: "3.11"
|
||||||
|
|
||||||
|
# Build documentation in the docs/ directory with Sphinx
|
||||||
|
sphinx:
|
||||||
|
configuration: docs/conf.py
|
||||||
|
|
||||||
|
# We recommend specifying your dependencies to enable reproducible builds:
|
||||||
|
# https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html
|
||||||
|
python:
|
||||||
|
install:
|
||||||
|
- requirements: docs/requirements.txt
|
||||||
7
.travis.yml
Normal file
7
.travis.yml
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
language: python
|
||||||
|
python:
|
||||||
|
- "3.7"
|
||||||
|
- "3.8-dev"
|
||||||
|
install:
|
||||||
|
- "pip install ."
|
||||||
|
script: testall.py
|
||||||
@@ -1,50 +0,0 @@
|
|||||||
"""
|
|
||||||
Cythonize the .pyx file for sdist, and compile it for wheels.
|
|
||||||
NB: produced wheels are not valid, but the sdist should be.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
from Cython.Build import cythonize
|
|
||||||
from subprocess import check_output, CalledProcessError, DEVNULL, call
|
|
||||||
from tempfile import TemporaryFile
|
|
||||||
|
|
||||||
from setuptools.command.build_py import build_py
|
|
||||||
|
|
||||||
|
|
||||||
def check_include(library_name, header):
|
|
||||||
command = [os.environ.get("PKG_CONFIG", "pkg-config"), "--cflags", library_name]
|
|
||||||
try:
|
|
||||||
cflags = check_output(command).decode("utf-8").split()
|
|
||||||
except FileNotFoundError:
|
|
||||||
print("pkg-config not found.")
|
|
||||||
return False
|
|
||||||
except CalledProcessError:
|
|
||||||
# pkg-config already prints the missing libraries on stderr.
|
|
||||||
return False
|
|
||||||
command = [os.environ.get("CC", "cc")] + cflags + ["-E", "-"]
|
|
||||||
with TemporaryFile("w+") as c_file:
|
|
||||||
c_file.write("#include <%s>" % header)
|
|
||||||
c_file.seek(0)
|
|
||||||
try:
|
|
||||||
return call(command, stdin=c_file, stdout=DEVNULL, stderr=DEVNULL) == 0
|
|
||||||
except FileNotFoundError:
|
|
||||||
print("%s headers not found." % library_name)
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
class Build(build_py):
|
|
||||||
def run(self):
|
|
||||||
self.run_command("build_ext")
|
|
||||||
return super().run()
|
|
||||||
|
|
||||||
def initialize_options(self):
|
|
||||||
super().initialize_options()
|
|
||||||
|
|
||||||
has_python_headers = check_include("python3", "Python.h")
|
|
||||||
has_stringprep_headers = check_include("libidn", "stringprep.h")
|
|
||||||
|
|
||||||
if has_python_headers and has_stringprep_headers:
|
|
||||||
self.distribution.ext_modules = cythonize("slixmpp/stringprep.pyx")
|
|
||||||
|
|
||||||
else:
|
|
||||||
print("Falling back to the slow stringprep module.")
|
|
||||||
15
mypy.ini
Normal file
15
mypy.ini
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
[mypy]
|
||||||
|
check_untyped_defs = False
|
||||||
|
ignore_missing_imports = True
|
||||||
|
|
||||||
|
[mypy-slixmpp.types]
|
||||||
|
ignore_errors = True
|
||||||
|
|
||||||
|
[mypy-slixmpp.thirdparty.*]
|
||||||
|
ignore_errors = True
|
||||||
|
|
||||||
|
[mypy-slixmpp.plugins.*]
|
||||||
|
ignore_errors = True
|
||||||
|
|
||||||
|
[mypy-slixmpp.plugins.base]
|
||||||
|
ignore_errors = False
|
||||||
@@ -1,67 +0,0 @@
|
|||||||
[project]
|
|
||||||
name = "slixmpp"
|
|
||||||
version = "1.8.4"
|
|
||||||
description = 'Slixmpp is an elegant Python library for XMPP (aka Jabber).'
|
|
||||||
requires-python = ">=3.7"
|
|
||||||
dependencies = [
|
|
||||||
"aiodns >= 1.0",
|
|
||||||
"pyasn1",
|
|
||||||
"pyasn1_modules",
|
|
||||||
"typing_extensions; python_version < '3.8.0'",
|
|
||||||
]
|
|
||||||
classifiers = [
|
|
||||||
'Intended Audience :: Developers',
|
|
||||||
'License :: OSI Approved :: MIT License',
|
|
||||||
'Programming Language :: Python',
|
|
||||||
'Programming Language :: Python :: 3.7',
|
|
||||||
'Programming Language :: Python :: 3.8',
|
|
||||||
'Programming Language :: Python :: 3.9',
|
|
||||||
'Topic :: Internet :: XMPP',
|
|
||||||
'Topic :: Software Development :: Libraries :: Python Modules',
|
|
||||||
]
|
|
||||||
readme = "README.rst"
|
|
||||||
license = { file = "LICENSE" }
|
|
||||||
|
|
||||||
[[project.authors]]
|
|
||||||
name = "Florent Le Coz"
|
|
||||||
email = "louiz@louiz.org"
|
|
||||||
|
|
||||||
[project.urls]
|
|
||||||
Repository = 'https://codeberg.org/poezio/slixmpp'
|
|
||||||
|
|
||||||
[project.optional-dependencies]
|
|
||||||
XEP-0363 = ['aiohttp']
|
|
||||||
XEP-0444 = ['emoji']
|
|
||||||
XEP-0454 = ['cryptography']
|
|
||||||
Safer-XML-parsing = ['defusedxml']
|
|
||||||
|
|
||||||
[build-system]
|
|
||||||
requires = ["setuptools", "cython"]
|
|
||||||
build-backend = "setuptools.build_meta"
|
|
||||||
|
|
||||||
[tool.setuptools]
|
|
||||||
packages = ["slixmpp"]
|
|
||||||
py-modules = ["_custom_build"]
|
|
||||||
|
|
||||||
[tool.setuptools.cmdclass]
|
|
||||||
build_py = "_custom_build.Build"
|
|
||||||
|
|
||||||
[tool.mypy]
|
|
||||||
check_untyped_defs = false
|
|
||||||
ignore_missing_imports = true
|
|
||||||
|
|
||||||
[[tool.mypy.overrides]]
|
|
||||||
module = 'slixmpp.types'
|
|
||||||
ignore_errors = true
|
|
||||||
|
|
||||||
[[tool.mypy.overrides]]
|
|
||||||
module = 'slixmpp.thirdparty.*'
|
|
||||||
ignore_errors = true
|
|
||||||
|
|
||||||
[[tool.mypy.overrides]]
|
|
||||||
module = 'slixmpp.plugins.*'
|
|
||||||
ignore_errors = true
|
|
||||||
|
|
||||||
[[tool.mypy.overrides]]
|
|
||||||
module = 'slixmpp.plugins.base'
|
|
||||||
ignore_errors = false
|
|
||||||
103
setup.py
Executable file
103
setup.py
Executable file
@@ -0,0 +1,103 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
#
|
||||||
|
# Copyright (C) 2007-2011 Nathanael C. Fritz
|
||||||
|
# All Rights Reserved
|
||||||
|
#
|
||||||
|
# This software is licensed as described in the README.rst and LICENSE
|
||||||
|
# file, which you should have received as part of this distribution.
|
||||||
|
|
||||||
|
import runpy
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
from subprocess import call, DEVNULL, check_output, CalledProcessError
|
||||||
|
from tempfile import TemporaryFile
|
||||||
|
try:
|
||||||
|
from setuptools import setup
|
||||||
|
except ImportError:
|
||||||
|
from distutils.core import setup
|
||||||
|
|
||||||
|
from run_tests import TestCommand
|
||||||
|
|
||||||
|
version_mod = runpy.run_path('slixmpp/version.py')
|
||||||
|
VERSION = version_mod['__version__']
|
||||||
|
|
||||||
|
DESCRIPTION = ('Slixmpp is an elegant Python library for XMPP (aka Jabber).')
|
||||||
|
with open('README.rst', encoding='utf8') as readme:
|
||||||
|
LONG_DESCRIPTION = readme.read()
|
||||||
|
|
||||||
|
CLASSIFIERS = [
|
||||||
|
'Intended Audience :: Developers',
|
||||||
|
'License :: OSI Approved :: MIT License',
|
||||||
|
'Programming Language :: Python',
|
||||||
|
'Programming Language :: Python :: 3.7',
|
||||||
|
'Programming Language :: Python :: 3.8',
|
||||||
|
'Programming Language :: Python :: 3.9',
|
||||||
|
'Topic :: Internet :: XMPP',
|
||||||
|
'Topic :: Software Development :: Libraries :: Python Modules',
|
||||||
|
]
|
||||||
|
|
||||||
|
packages = [str(mod.parent) for mod in Path('slixmpp').rglob('__init__.py')]
|
||||||
|
|
||||||
|
def check_include(library_name, header):
|
||||||
|
command = [os.environ.get('PKG_CONFIG', 'pkg-config'), '--cflags', library_name]
|
||||||
|
try:
|
||||||
|
cflags = check_output(command).decode('utf-8').split()
|
||||||
|
except FileNotFoundError:
|
||||||
|
print('pkg-config not found.')
|
||||||
|
return False
|
||||||
|
except CalledProcessError:
|
||||||
|
# pkg-config already prints the missing libraries on stderr.
|
||||||
|
return False
|
||||||
|
command = [os.environ.get('CC', 'cc')] + cflags + ['-E', '-']
|
||||||
|
with TemporaryFile('w+') as c_file:
|
||||||
|
c_file.write('#include <%s>' % header)
|
||||||
|
c_file.seek(0)
|
||||||
|
try:
|
||||||
|
return call(command, stdin=c_file, stdout=DEVNULL, stderr=DEVNULL) == 0
|
||||||
|
except FileNotFoundError:
|
||||||
|
print('%s headers not found.' % library_name)
|
||||||
|
return False
|
||||||
|
|
||||||
|
HAS_PYTHON_HEADERS = check_include('python3', 'Python.h')
|
||||||
|
HAS_STRINGPREP_HEADERS = check_include('libidn', 'stringprep.h')
|
||||||
|
|
||||||
|
ext_modules = None
|
||||||
|
if HAS_PYTHON_HEADERS and HAS_STRINGPREP_HEADERS:
|
||||||
|
try:
|
||||||
|
from Cython.Build import cythonize
|
||||||
|
except ImportError:
|
||||||
|
print('Cython not found, falling back to the slow stringprep module.')
|
||||||
|
else:
|
||||||
|
ext_modules = cythonize('slixmpp/stringprep.pyx')
|
||||||
|
else:
|
||||||
|
print('Falling back to the slow stringprep module.')
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name="slixmpp",
|
||||||
|
version=VERSION,
|
||||||
|
description=DESCRIPTION,
|
||||||
|
long_description=LONG_DESCRIPTION,
|
||||||
|
author='Florent Le Coz',
|
||||||
|
author_email='louiz@louiz.org',
|
||||||
|
url='https://codeberg.org/poezio/slixmpp',
|
||||||
|
license='MIT',
|
||||||
|
platforms=['any'],
|
||||||
|
package_data={'slixmpp': ['py.typed']},
|
||||||
|
packages=packages,
|
||||||
|
ext_modules=ext_modules,
|
||||||
|
install_requires=[
|
||||||
|
'aiodns>=1.0',
|
||||||
|
'pyasn1',
|
||||||
|
'pyasn1_modules',
|
||||||
|
'typing_extensions; python_version < "3.8.0"',
|
||||||
|
],
|
||||||
|
extras_require={
|
||||||
|
'XEP-0363': ['aiohttp'],
|
||||||
|
'XEP-0444 compliance': ['emoji'],
|
||||||
|
'XEP-0454': ['cryptography'],
|
||||||
|
'Safer XML parsing': ['defusedxml'],
|
||||||
|
},
|
||||||
|
classifiers=CLASSIFIERS,
|
||||||
|
cmdclass={'test': TestCommand}
|
||||||
|
)
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
from slixmpp.plugins.base import register_plugin
|
from slixmpp.plugins.base import register_plugin
|
||||||
|
|
||||||
from slixmpp.plugins.xep_0356 import stanza
|
from . import stanza
|
||||||
from slixmpp.plugins.xep_0356.stanza import Perm, Privilege
|
from .privilege import XEP_0356
|
||||||
from slixmpp.plugins.xep_0356.privilege import XEP_0356
|
from .stanza import Perm, Privilege
|
||||||
|
|
||||||
register_plugin(XEP_0356)
|
register_plugin(XEP_0356)
|
||||||
|
|||||||
36
slixmpp/plugins/xep_0356/permissions.py
Normal file
36
slixmpp/plugins/xep_0356/permissions.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
import dataclasses
|
||||||
|
from collections import defaultdict
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
|
||||||
|
class RosterAccess(str, Enum):
|
||||||
|
NONE = "none"
|
||||||
|
GET = "get"
|
||||||
|
SET = "set"
|
||||||
|
BOTH = "both"
|
||||||
|
|
||||||
|
|
||||||
|
class MessagePermission(str, Enum):
|
||||||
|
NONE = "none"
|
||||||
|
OUTGOING = "outgoing"
|
||||||
|
|
||||||
|
|
||||||
|
class IqPermission(str, Enum):
|
||||||
|
NONE = "none"
|
||||||
|
GET = "get"
|
||||||
|
SET = "set"
|
||||||
|
BOTH = "both"
|
||||||
|
|
||||||
|
|
||||||
|
class PresencePermission(str, Enum):
|
||||||
|
NONE = "none"
|
||||||
|
MANAGED_ENTITY = "managed_entity"
|
||||||
|
ROSTER = "roster"
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Permissions:
|
||||||
|
roster = RosterAccess.NONE
|
||||||
|
message = MessagePermission.NONE
|
||||||
|
iq = defaultdict(lambda: IqPermission.NONE)
|
||||||
|
presence = PresencePermission.NONE
|
||||||
@@ -1,14 +1,16 @@
|
|||||||
import logging
|
import logging
|
||||||
import typing
|
import typing
|
||||||
|
import uuid
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
from slixmpp import Message, JID, Iq
|
from slixmpp import JID, Iq, Message
|
||||||
from slixmpp.plugins.base import BasePlugin
|
from slixmpp.plugins.base import BasePlugin
|
||||||
from slixmpp.xmlstream.matcher import StanzaPath
|
from slixmpp.xmlstream import StanzaBase
|
||||||
from slixmpp.xmlstream.handler import Callback
|
from slixmpp.xmlstream.handler import Callback
|
||||||
from slixmpp.xmlstream import register_stanza_plugin
|
from slixmpp.xmlstream.matcher import StanzaPath
|
||||||
|
|
||||||
from slixmpp.plugins.xep_0356 import stanza, Privilege, Perm
|
|
||||||
|
|
||||||
|
from . import stanza
|
||||||
|
from .permissions import IqPermission, MessagePermission, Permissions, RosterAccess
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -29,7 +31,7 @@ class XEP_0356(BasePlugin):
|
|||||||
dependencies = {"xep_0297"}
|
dependencies = {"xep_0297"}
|
||||||
stanza = stanza
|
stanza = stanza
|
||||||
|
|
||||||
granted_privileges = {"roster": "none", "message": "none", "presence": "none"}
|
granted_privileges = defaultdict(Permissions)
|
||||||
|
|
||||||
def plugin_init(self):
|
def plugin_init(self):
|
||||||
if not self.xmpp.is_component:
|
if not self.xmpp.is_component:
|
||||||
@@ -49,32 +51,42 @@ class XEP_0356(BasePlugin):
|
|||||||
def plugin_end(self):
|
def plugin_end(self):
|
||||||
self.xmpp.remove_handler("Privileges")
|
self.xmpp.remove_handler("Privileges")
|
||||||
|
|
||||||
def _handle_privilege(self, msg: Message):
|
def _handle_privilege(self, msg: StanzaBase):
|
||||||
"""
|
"""
|
||||||
Called when the XMPP server advertise the component's privileges.
|
Called when the XMPP server advertise the component's privileges.
|
||||||
|
|
||||||
Stores the privileges in this instance's granted_privileges attribute (a dict)
|
Stores the privileges in this instance's granted_privileges attribute (a dict)
|
||||||
and raises the privileges_advertised event
|
and raises the privileges_advertised event
|
||||||
"""
|
"""
|
||||||
|
permissions = self.granted_privileges[msg.get_from()]
|
||||||
for perm in msg["privilege"]["perms"]:
|
for perm in msg["privilege"]["perms"]:
|
||||||
self.granted_privileges[perm["access"]] = perm["type"]
|
access = perm["access"]
|
||||||
|
if access == "iq":
|
||||||
|
for ns in perm["namespaces"]:
|
||||||
|
permissions.iq[ns["ns"]] = ns["type"]
|
||||||
|
elif access in _VALID_ACCESSES:
|
||||||
|
setattr(permissions, access, perm["type"])
|
||||||
|
else:
|
||||||
|
log.warning("Received an invalid privileged access: %s", access)
|
||||||
log.debug(f"Privileges: {self.granted_privileges}")
|
log.debug(f"Privileges: {self.granted_privileges}")
|
||||||
self.xmpp.event("privileges_advertised")
|
self.xmpp.event("privileges_advertised")
|
||||||
|
|
||||||
def send_privileged_message(self, msg: Message):
|
def send_privileged_message(self, msg: Message):
|
||||||
if self.granted_privileges["message"] == "outgoing":
|
if (
|
||||||
self._make_privileged_message(msg).send()
|
self.granted_privileges[msg.get_from().domain].message
|
||||||
else:
|
!= MessagePermission.OUTGOING
|
||||||
log.error(
|
):
|
||||||
|
raise PermissionError(
|
||||||
"The server hasn't authorized us to send messages on behalf of other users"
|
"The server hasn't authorized us to send messages on behalf of other users"
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
self._make_privileged_message(msg).send()
|
||||||
|
|
||||||
def _make_privileged_message(self, msg: Message):
|
def _make_privileged_message(self, msg: Message):
|
||||||
stanza = self.xmpp.make_message(
|
server = msg.get_from().domain
|
||||||
mto=self.xmpp.server_host, mfrom=self.xmpp.boundjid.bare
|
wrapped = self.xmpp.make_message(mto=server, mfrom=self.xmpp.boundjid.bare)
|
||||||
)
|
wrapped["privilege"]["forwarded"].append(msg)
|
||||||
stanza["privilege"]["forwarded"].append(msg)
|
return wrapped
|
||||||
return stanza
|
|
||||||
|
|
||||||
def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
|
def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
|
||||||
return self.xmpp.make_iq_get(
|
return self.xmpp.make_iq_get(
|
||||||
@@ -106,9 +118,15 @@ class XEP_0356(BasePlugin):
|
|||||||
|
|
||||||
:param jid: user we want to fetch the roster from
|
:param jid: user we want to fetch the roster from
|
||||||
"""
|
"""
|
||||||
if self.granted_privileges["roster"] not in ("get", "both"):
|
if isinstance(jid, str):
|
||||||
log.error("The server did not grant us privileges to get rosters")
|
jid = JID(jid)
|
||||||
raise ValueError
|
if self.granted_privileges[jid.domain].roster not in (
|
||||||
|
RosterAccess.GET,
|
||||||
|
RosterAccess.BOTH,
|
||||||
|
):
|
||||||
|
raise PermissionError(
|
||||||
|
"The server did not grant us privileges to get rosters"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
return await self._make_get_roster(jid).send(**send_kwargs)
|
return await self._make_get_roster(jid).send(**send_kwargs)
|
||||||
|
|
||||||
@@ -137,8 +155,56 @@ class XEP_0356(BasePlugin):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
if self.granted_privileges["roster"] not in ("set", "both"):
|
if isinstance(jid, str):
|
||||||
log.error("The server did not grant us privileges to set rosters")
|
jid = JID(jid)
|
||||||
raise ValueError
|
if self.granted_privileges[jid.domain].roster not in (
|
||||||
|
RosterAccess.GET,
|
||||||
|
RosterAccess.BOTH,
|
||||||
|
):
|
||||||
|
raise PermissionError(
|
||||||
|
"The server did not grant us privileges to set rosters"
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
return await self._make_set_roster(jid, roster_items).send(**send_kwargs)
|
return await self._make_set_roster(jid, roster_items).send(**send_kwargs)
|
||||||
|
|
||||||
|
async def send_privileged_iq(
|
||||||
|
self, encapsulated_iq: Iq, iq_id: typing.Optional[str] = None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Send an IQ on behalf of a user
|
||||||
|
|
||||||
|
Caution: the IQ *must* have the jabber:client namespace
|
||||||
|
"""
|
||||||
|
iq_id = iq_id or str(uuid.uuid4())
|
||||||
|
encapsulated_iq["id"] = iq_id
|
||||||
|
server = encapsulated_iq.get_to().domain
|
||||||
|
perms = self.granted_privileges.get(server)
|
||||||
|
if not perms:
|
||||||
|
raise PermissionError(f"{server} has not granted us any privilege")
|
||||||
|
itype = encapsulated_iq["type"]
|
||||||
|
for ns in encapsulated_iq.plugins.values():
|
||||||
|
type_ = perms.iq[ns.namespace]
|
||||||
|
if type_ == IqPermission.NONE:
|
||||||
|
raise PermissionError(
|
||||||
|
f"{server} has not granted any IQ privilege for namespace {ns.namespace}"
|
||||||
|
)
|
||||||
|
elif type_ == IqPermission.BOTH:
|
||||||
|
pass
|
||||||
|
elif type_ != itype:
|
||||||
|
raise PermissionError(
|
||||||
|
f"{server} has not granted IQ {itype} privilege for namespace {ns.namespace}"
|
||||||
|
)
|
||||||
|
iq = self.xmpp.make_iq(
|
||||||
|
itype=itype,
|
||||||
|
ifrom=self.xmpp.boundjid.bare,
|
||||||
|
ito=encapsulated_iq.get_from(),
|
||||||
|
id=iq_id,
|
||||||
|
)
|
||||||
|
iq["privileged_iq"].append(encapsulated_iq)
|
||||||
|
|
||||||
|
resp = await iq.send()
|
||||||
|
return resp["privilege"]["forwarded"]["iq"]
|
||||||
|
|
||||||
|
|
||||||
|
# does not include iq access that is handled differently
|
||||||
|
_VALID_ACCESSES = {"message", "roster", "presence"}
|
||||||
|
|||||||
@@ -1,13 +1,12 @@
|
|||||||
from slixmpp.stanza import Message
|
|
||||||
from slixmpp.xmlstream import (
|
|
||||||
ElementBase,
|
|
||||||
register_stanza_plugin,
|
|
||||||
)
|
|
||||||
from slixmpp.plugins.xep_0297 import Forwarded
|
from slixmpp.plugins.xep_0297 import Forwarded
|
||||||
|
from slixmpp.stanza import Iq, Message
|
||||||
|
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
|
||||||
|
|
||||||
|
NS = "urn:xmpp:privilege:2"
|
||||||
|
|
||||||
|
|
||||||
class Privilege(ElementBase):
|
class Privilege(ElementBase):
|
||||||
namespace = "urn:xmpp:privilege:2"
|
namespace = NS
|
||||||
name = "privilege"
|
name = "privilege"
|
||||||
plugin_attrib = "privilege"
|
plugin_attrib = "privilege"
|
||||||
|
|
||||||
@@ -25,26 +24,40 @@ class Privilege(ElementBase):
|
|||||||
def presence(self):
|
def presence(self):
|
||||||
return self.permission("presence")
|
return self.permission("presence")
|
||||||
|
|
||||||
def iq(self):
|
def add_perm(self, access, type_):
|
||||||
return self.permission("iq")
|
|
||||||
|
|
||||||
def add_perm(self, access, type):
|
|
||||||
# This should only be needed for servers, so maybe out of scope for slixmpp
|
# This should only be needed for servers, so maybe out of scope for slixmpp
|
||||||
perm = Perm()
|
perm = Perm()
|
||||||
perm["type"] = type
|
perm["type"] = type_
|
||||||
perm["access"] = access
|
perm["access"] = access
|
||||||
self.append(perm)
|
self.append(perm)
|
||||||
|
|
||||||
|
|
||||||
class Perm(ElementBase):
|
class Perm(ElementBase):
|
||||||
namespace = "urn:xmpp:privilege:2"
|
namespace = NS
|
||||||
name = "perm"
|
name = "perm"
|
||||||
plugin_attrib = "perm"
|
plugin_attrib = "perm"
|
||||||
plugin_multi_attrib = "perms"
|
plugin_multi_attrib = "perms"
|
||||||
interfaces = {"type", "access"}
|
interfaces = {"type", "access"}
|
||||||
|
|
||||||
|
|
||||||
|
class NameSpace(ElementBase):
|
||||||
|
namespace = NS
|
||||||
|
name = "namespace"
|
||||||
|
plugin_attrib = "namespace"
|
||||||
|
plugin_multi_attrib = "namespaces"
|
||||||
|
interfaces = {"ns", "type"}
|
||||||
|
|
||||||
|
|
||||||
|
class PrivilegedIq(ElementBase):
|
||||||
|
namespace = NS
|
||||||
|
name = "privileged_iq"
|
||||||
|
plugin_attrib = "privileged_iq"
|
||||||
|
|
||||||
|
|
||||||
def register():
|
def register():
|
||||||
register_stanza_plugin(Message, Privilege)
|
register_stanza_plugin(Message, Privilege)
|
||||||
|
register_stanza_plugin(Iq, Privilege)
|
||||||
register_stanza_plugin(Privilege, Forwarded)
|
register_stanza_plugin(Privilege, Forwarded)
|
||||||
register_stanza_plugin(Privilege, Perm, iterable=True)
|
register_stanza_plugin(Privilege, Perm, iterable=True)
|
||||||
|
register_stanza_plugin(Perm, NameSpace, iterable=True)
|
||||||
|
register_stanza_plugin(Iq, PrivilegedIq)
|
||||||
|
|||||||
@@ -1,9 +1,7 @@
|
|||||||
import unittest
|
import unittest
|
||||||
from slixmpp import Message
|
|
||||||
from slixmpp.test import SlixTest
|
from slixmpp.test import SlixTest
|
||||||
from slixmpp.xmlstream import register_stanza_plugin
|
|
||||||
|
|
||||||
from slixmpp.plugins.xep_0356 import stanza
|
from slixmpp.plugins.xep_0356 import stanza, permissions
|
||||||
|
|
||||||
|
|
||||||
class TestPermissions(SlixTest):
|
class TestPermissions(SlixTest):
|
||||||
@@ -12,30 +10,57 @@ class TestPermissions(SlixTest):
|
|||||||
|
|
||||||
def testAdvertisePermission(self):
|
def testAdvertisePermission(self):
|
||||||
xmlstring = """
|
xmlstring = """
|
||||||
<message from='capulet.net' to='pubub.capulet.lit'>
|
<message from='capulet.lit' to='pubsub.capulet.lit'>
|
||||||
<privilege xmlns='urn:xmpp:privilege:2'>
|
<privilege xmlns='urn:xmpp:privilege:2'>
|
||||||
<perm access='roster' type='both'/>
|
<perm access='roster' type='both'/>
|
||||||
<perm access='message' type='outgoing'/>
|
<perm access='message' type='outgoing'/>
|
||||||
<perm access='presence' type='managed_entity'/>
|
<perm access='presence' type='managed_entity'/>
|
||||||
|
<perm access='iq' type='both'/>
|
||||||
</privilege>
|
</privilege>
|
||||||
</message>
|
</message>
|
||||||
"""
|
"""
|
||||||
msg = self.Message()
|
msg = self.Message()
|
||||||
msg["from"] = "capulet.net"
|
msg["from"] = "capulet.lit"
|
||||||
msg["to"] = "pubub.capulet.lit"
|
msg["to"] = "pubsub.capulet.lit"
|
||||||
# This raises AttributeError: 'NoneType' object has no attribute 'use_origin_id'
|
|
||||||
# msg["id"] = "id"
|
|
||||||
|
|
||||||
for access, type_ in [
|
for access, type_ in [
|
||||||
("roster", "both"),
|
("roster", permissions.RosterAccess.BOTH),
|
||||||
("message", "outgoing"),
|
("message", permissions.MessagePermission.OUTGOING),
|
||||||
("presence", "managed_entity"),
|
("presence", permissions.PresencePermission.MANAGED_ENTITY),
|
||||||
|
("iq", permissions.IqPermission.BOTH),
|
||||||
]:
|
]:
|
||||||
msg["privilege"].add_perm(access, type_)
|
msg["privilege"].add_perm(access, type_)
|
||||||
|
|
||||||
self.check(msg, xmlstring)
|
self.check(msg, xmlstring)
|
||||||
# Should this one work? → # AttributeError: 'Message' object has no attribute 'permission'
|
|
||||||
# self.assertEqual(msg.permission["roster"], "both")
|
def testIqPermission(self):
|
||||||
|
x = stanza.Privilege()
|
||||||
|
x["access"] = "iq"
|
||||||
|
ns = stanza.NameSpace()
|
||||||
|
ns["ns"] = "some_ns"
|
||||||
|
ns["type"] = "get"
|
||||||
|
x["perm"]["access"] = "iq"
|
||||||
|
x["perm"].append(ns)
|
||||||
|
ns = stanza.NameSpace()
|
||||||
|
ns["ns"] = "some_other_ns"
|
||||||
|
ns["type"] = "both"
|
||||||
|
x["perm"].append(ns)
|
||||||
|
self.check(
|
||||||
|
x,
|
||||||
|
"""
|
||||||
|
<privilege xmlns='urn:xmpp:privilege:2'>
|
||||||
|
<perm access='iq'>
|
||||||
|
<namespace ns='some_ns' type='get' />
|
||||||
|
<namespace ns='some_other_ns' type='both' />
|
||||||
|
</perm>
|
||||||
|
</privilege>
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
nss = set()
|
||||||
|
for perm in x["perms"]:
|
||||||
|
for ns in perm["namespaces"]:
|
||||||
|
nss.add((ns["ns"], ns["type"]))
|
||||||
|
assert nss == {("some_ns", "get"), ("some_other_ns", "both")}
|
||||||
|
|
||||||
|
|
||||||
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)
|
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from slixmpp import ComponentXMPP, Iq, Message
|
from slixmpp import Message, JID, Iq
|
||||||
from slixmpp.roster import RosterItem
|
from slixmpp.plugins.xep_0356 import permissions
|
||||||
from slixmpp.test import SlixTest
|
from slixmpp.test import SlixTest
|
||||||
|
|
||||||
|
|
||||||
@@ -9,9 +9,9 @@ class TestPermissions(SlixTest):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.stream_start(
|
self.stream_start(
|
||||||
mode="component",
|
mode="component",
|
||||||
plugins=["xep_0356"],
|
plugins=["xep_0356", "xep_0045"],
|
||||||
jid="pubsub.capulet.lit",
|
jid="pubsub.capulet.lit",
|
||||||
server="capulet.net",
|
server="capulet.lit",
|
||||||
)
|
)
|
||||||
|
|
||||||
def testPluginEnd(self):
|
def testPluginEnd(self):
|
||||||
@@ -23,26 +23,44 @@ class TestPermissions(SlixTest):
|
|||||||
self.assertFalse(exc)
|
self.assertFalse(exc)
|
||||||
|
|
||||||
def testGrantedPrivileges(self):
|
def testGrantedPrivileges(self):
|
||||||
# https://xmpp.org/extensions/xep-0356.html#example-4
|
|
||||||
results = {"event": False}
|
results = {"event": False}
|
||||||
|
x = self.xmpp["xep_0356"]
|
||||||
self.xmpp.add_event_handler(
|
self.xmpp.add_event_handler(
|
||||||
"privileges_advertised", lambda msg: results.__setitem__("event", True)
|
"privileges_advertised", lambda msg: results.__setitem__("event", True)
|
||||||
)
|
)
|
||||||
self.recv(
|
self.recv(
|
||||||
"""
|
"""
|
||||||
<message from='capulet.net' to='pubub.capulet.lit' id='54321'>
|
<message from='capulet.lit' to='pubsub.capulet.lit' id='54321'>
|
||||||
<privilege xmlns='urn:xmpp:privilege:2'>
|
<privilege xmlns='urn:xmpp:privilege:2'>
|
||||||
<perm access='roster' type='both'/>
|
<perm access='roster' type='both'/>
|
||||||
<perm access='message' type='outgoing'/>
|
<perm access='message' type='outgoing'/>
|
||||||
|
<perm access='iq'>
|
||||||
|
<namespace ns='some_ns' type='get' />
|
||||||
|
<namespace ns='some_other_ns' type='both' />
|
||||||
|
</perm>
|
||||||
</privilege>
|
</privilege>
|
||||||
</message>
|
</message>
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
self.assertEqual(self.xmpp["xep_0356"].granted_privileges["roster"], "both")
|
server = JID("capulet.lit")
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
self.xmpp["xep_0356"].granted_privileges["message"], "outgoing"
|
x.granted_privileges[server].roster, permissions.RosterAccess.BOTH
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
x.granted_privileges[server].message, permissions.MessagePermission.OUTGOING
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
x.granted_privileges[server].presence, permissions.PresencePermission.NONE
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
x.granted_privileges[server].iq["nope"], permissions.IqPermission.NONE
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
x.granted_privileges[server].iq["some_ns"], permissions.IqPermission.GET
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
x.granted_privileges[server].iq["some_other_ns"], permissions.IqPermission.BOTH
|
||||||
)
|
)
|
||||||
self.assertEqual(self.xmpp["xep_0356"].granted_privileges["presence"], "none")
|
|
||||||
self.assertTrue(results["event"])
|
self.assertTrue(results["event"])
|
||||||
|
|
||||||
def testGetRosterIq(self):
|
def testGetRosterIq(self):
|
||||||
@@ -94,7 +112,7 @@ class TestPermissions(SlixTest):
|
|||||||
|
|
||||||
def testMakeOutgoingMessage(self):
|
def testMakeOutgoingMessage(self):
|
||||||
xmlstring = """
|
xmlstring = """
|
||||||
<message xmlns="jabber:component:accept" from='pubsub.capulet.lit' to='capulet.net'>
|
<message xmlns="jabber:component:accept" from='pubsub.capulet.lit' to='capulet.lit'>
|
||||||
<privilege xmlns='urn:xmpp:privilege:2'>
|
<privilege xmlns='urn:xmpp:privilege:2'>
|
||||||
<forwarded xmlns='urn:xmpp:forward:0'>
|
<forwarded xmlns='urn:xmpp:forward:0'>
|
||||||
<message from="juliet@capulet.lit" to="romeo@montague.lit" xmlns="jabber:client">
|
<message from="juliet@capulet.lit" to="romeo@montague.lit" xmlns="jabber:client">
|
||||||
@@ -108,9 +126,49 @@ class TestPermissions(SlixTest):
|
|||||||
msg["from"] = "juliet@capulet.lit"
|
msg["from"] = "juliet@capulet.lit"
|
||||||
msg["to"] = "romeo@montague.lit"
|
msg["to"] = "romeo@montague.lit"
|
||||||
msg["body"] = "I do not hate you"
|
msg["body"] = "I do not hate you"
|
||||||
|
|
||||||
priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
|
priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
|
||||||
self.check(priv_msg, xmlstring, use_values=False)
|
self.check(priv_msg, xmlstring, use_values=False)
|
||||||
|
|
||||||
|
def testDetectServer(self):
|
||||||
|
msg = Message()
|
||||||
|
msg["from"] = "juliet@something"
|
||||||
|
msg["to"] = "romeo@montague.lit"
|
||||||
|
msg["body"] = "I do not hate you"
|
||||||
|
|
||||||
|
priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
|
||||||
|
assert priv_msg.get_to() == "something"
|
||||||
|
assert priv_msg.get_from() == "pubsub.capulet.lit"
|
||||||
|
|
||||||
|
def testIqOnBehalf(self):
|
||||||
|
iq = Iq()
|
||||||
|
iq["mucadmin_query"]["item"]["affiliation"] = "member"
|
||||||
|
iq.set_from("juliet@xxx")
|
||||||
|
iq.set_to("somemuc@conf")
|
||||||
|
iq.set_type("get")
|
||||||
|
self.xmpp["xep_0356"].granted_privileges["conf"].iq["http://jabber.org/protocol/muc#admin"] = permissions.IqPermission.BOTH
|
||||||
|
r = self.xmpp.loop.create_task(self.xmpp["xep_0356"].send_privileged_iq(iq, iq_id="0"))
|
||||||
|
self.send(
|
||||||
|
"""
|
||||||
|
<iq from="pubsub.capulet.lit"
|
||||||
|
to="juliet@xxx"
|
||||||
|
xmlns="jabber:component:accept"
|
||||||
|
type="get" id="0">
|
||||||
|
<privileged_iq xmlns='urn:xmpp:privilege:2'>
|
||||||
|
<iq xmlns='jabber:client'
|
||||||
|
type='get'
|
||||||
|
to='somemuc@conf'
|
||||||
|
from='juliet@xxx'
|
||||||
|
id="0">
|
||||||
|
<query xmlns='http://jabber.org/protocol/muc#admin'>
|
||||||
|
<item affiliation='member'/>
|
||||||
|
</query>
|
||||||
|
</iq>
|
||||||
|
</privileged_iq>
|
||||||
|
</iq>
|
||||||
|
""",
|
||||||
|
use_values=False
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)
|
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)
|
||||||
|
|||||||
Reference in New Issue
Block a user