Compare commits

...

5 Commits

Author SHA1 Message Date
nicoco
985926ed7b XEP-0461: rely on XEP-0428 for fallback
Breaks the previous fallback helpers, we now
rely on XEP-0461 instead
2023-12-19 14:15:24 +00:00
nicoco
8d63bd68cf XEP-0428: add fallback body and subject elements
+ tests
+ helpers to strip the fallback content
2023-12-19 14:15:24 +00:00
nicoco
465e735d18 ElementBase: add weak ref to parent when using append() 2023-12-19 14:15:24 +00:00
nicoco
fea4ee83be fix slixmpp.xmlstream.__all__ 2023-12-19 14:15:24 +00:00
Nicolas Cedilnik
76a11d4899 xep0356: implement IQ privilege
Also included:

- correctly handle privileges from different
  servers
- check that privileges have been granted before
  attempting to send something and raise
  PermissionError if not
- use dataclass and enums to store permissions instead of
  untyped dict
2023-12-19 14:14:16 +00:00
14 changed files with 526 additions and 142 deletions

View File

@@ -1,7 +1,7 @@
from slixmpp.plugins.base import register_plugin
from slixmpp.plugins.xep_0356 import stanza
from slixmpp.plugins.xep_0356.stanza import Perm, Privilege
from slixmpp.plugins.xep_0356.privilege import XEP_0356
from . import stanza
from .privilege import XEP_0356
from .stanza import Perm, Privilege
register_plugin(XEP_0356)

View File

@@ -0,0 +1,36 @@
import dataclasses
from collections import defaultdict
from enum import Enum
class RosterAccess(str, Enum):
NONE = "none"
GET = "get"
SET = "set"
BOTH = "both"
class MessagePermission(str, Enum):
NONE = "none"
OUTGOING = "outgoing"
class IqPermission(str, Enum):
NONE = "none"
GET = "get"
SET = "set"
BOTH = "both"
class PresencePermission(str, Enum):
NONE = "none"
MANAGED_ENTITY = "managed_entity"
ROSTER = "roster"
@dataclasses.dataclass
class Permissions:
roster = RosterAccess.NONE
message = MessagePermission.NONE
iq = defaultdict(lambda: IqPermission.NONE)
presence = PresencePermission.NONE

View File

@@ -1,14 +1,16 @@
import logging
import typing
import uuid
from collections import defaultdict
from slixmpp import Message, JID, Iq
from slixmpp import JID, Iq, Message
from slixmpp.plugins.base import BasePlugin
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import StanzaBase
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0356 import stanza, Privilege, Perm
from slixmpp.xmlstream.matcher import StanzaPath
from . import stanza
from .permissions import IqPermission, MessagePermission, Permissions, RosterAccess
log = logging.getLogger(__name__)
@@ -29,7 +31,7 @@ class XEP_0356(BasePlugin):
dependencies = {"xep_0297"}
stanza = stanza
granted_privileges = {"roster": "none", "message": "none", "presence": "none"}
granted_privileges = defaultdict(Permissions)
def plugin_init(self):
if not self.xmpp.is_component:
@@ -49,32 +51,42 @@ class XEP_0356(BasePlugin):
def plugin_end(self):
self.xmpp.remove_handler("Privileges")
def _handle_privilege(self, msg: Message):
def _handle_privilege(self, msg: StanzaBase):
"""
Called when the XMPP server advertise the component's privileges.
Stores the privileges in this instance's granted_privileges attribute (a dict)
and raises the privileges_advertised event
"""
permissions = self.granted_privileges[msg.get_from()]
for perm in msg["privilege"]["perms"]:
self.granted_privileges[perm["access"]] = perm["type"]
access = perm["access"]
if access == "iq":
for ns in perm["namespaces"]:
permissions.iq[ns["ns"]] = ns["type"]
elif access in _VALID_ACCESSES:
setattr(permissions, access, perm["type"])
else:
log.warning("Received an invalid privileged access: %s", access)
log.debug(f"Privileges: {self.granted_privileges}")
self.xmpp.event("privileges_advertised")
def send_privileged_message(self, msg: Message):
if self.granted_privileges["message"] == "outgoing":
self._make_privileged_message(msg).send()
else:
log.error(
if (
self.granted_privileges[msg.get_from().domain].message
!= MessagePermission.OUTGOING
):
raise PermissionError(
"The server hasn't authorized us to send messages on behalf of other users"
)
else:
self._make_privileged_message(msg).send()
def _make_privileged_message(self, msg: Message):
stanza = self.xmpp.make_message(
mto=self.xmpp.server_host, mfrom=self.xmpp.boundjid.bare
)
stanza["privilege"]["forwarded"].append(msg)
return stanza
server = msg.get_from().domain
wrapped = self.xmpp.make_message(mto=server, mfrom=self.xmpp.boundjid.bare)
wrapped["privilege"]["forwarded"].append(msg)
return wrapped
def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
return self.xmpp.make_iq_get(
@@ -106,9 +118,15 @@ class XEP_0356(BasePlugin):
:param jid: user we want to fetch the roster from
"""
if self.granted_privileges["roster"] not in ("get", "both"):
log.error("The server did not grant us privileges to get rosters")
raise ValueError
if isinstance(jid, str):
jid = JID(jid)
if self.granted_privileges[jid.domain].roster not in (
RosterAccess.GET,
RosterAccess.BOTH,
):
raise PermissionError(
"The server did not grant us privileges to get rosters"
)
else:
return await self._make_get_roster(jid).send(**send_kwargs)
@@ -137,8 +155,56 @@ class XEP_0356(BasePlugin):
},
}
"""
if self.granted_privileges["roster"] not in ("set", "both"):
log.error("The server did not grant us privileges to set rosters")
raise ValueError
if isinstance(jid, str):
jid = JID(jid)
if self.granted_privileges[jid.domain].roster not in (
RosterAccess.GET,
RosterAccess.BOTH,
):
raise PermissionError(
"The server did not grant us privileges to set rosters"
)
else:
return await self._make_set_roster(jid, roster_items).send(**send_kwargs)
async def send_privileged_iq(
self, encapsulated_iq: Iq, iq_id: typing.Optional[str] = None
):
"""
Send an IQ on behalf of a user
Caution: the IQ *must* have the jabber:client namespace
"""
iq_id = iq_id or str(uuid.uuid4())
encapsulated_iq["id"] = iq_id
server = encapsulated_iq.get_to().domain
perms = self.granted_privileges.get(server)
if not perms:
raise PermissionError(f"{server} has not granted us any privilege")
itype = encapsulated_iq["type"]
for ns in encapsulated_iq.plugins.values():
type_ = perms.iq[ns.namespace]
if type_ == IqPermission.NONE:
raise PermissionError(
f"{server} has not granted any IQ privilege for namespace {ns.namespace}"
)
elif type_ == IqPermission.BOTH:
pass
elif type_ != itype:
raise PermissionError(
f"{server} has not granted IQ {itype} privilege for namespace {ns.namespace}"
)
iq = self.xmpp.make_iq(
itype=itype,
ifrom=self.xmpp.boundjid.bare,
ito=encapsulated_iq.get_from(),
id=iq_id,
)
iq["privileged_iq"].append(encapsulated_iq)
resp = await iq.send()
return resp["privilege"]["forwarded"]["iq"]
# does not include iq access that is handled differently
_VALID_ACCESSES = {"message", "roster", "presence"}

View File

@@ -1,13 +1,12 @@
from slixmpp.stanza import Message
from slixmpp.xmlstream import (
ElementBase,
register_stanza_plugin,
)
from slixmpp.plugins.xep_0297 import Forwarded
from slixmpp.stanza import Iq, Message
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
NS = "urn:xmpp:privilege:2"
class Privilege(ElementBase):
namespace = "urn:xmpp:privilege:2"
namespace = NS
name = "privilege"
plugin_attrib = "privilege"
@@ -25,26 +24,40 @@ class Privilege(ElementBase):
def presence(self):
return self.permission("presence")
def iq(self):
return self.permission("iq")
def add_perm(self, access, type):
def add_perm(self, access, type_):
# This should only be needed for servers, so maybe out of scope for slixmpp
perm = Perm()
perm["type"] = type
perm["type"] = type_
perm["access"] = access
self.append(perm)
class Perm(ElementBase):
namespace = "urn:xmpp:privilege:2"
namespace = NS
name = "perm"
plugin_attrib = "perm"
plugin_multi_attrib = "perms"
interfaces = {"type", "access"}
class NameSpace(ElementBase):
namespace = NS
name = "namespace"
plugin_attrib = "namespace"
plugin_multi_attrib = "namespaces"
interfaces = {"ns", "type"}
class PrivilegedIq(ElementBase):
namespace = NS
name = "privileged_iq"
plugin_attrib = "privileged_iq"
def register():
register_stanza_plugin(Message, Privilege)
register_stanza_plugin(Iq, Privilege)
register_stanza_plugin(Privilege, Forwarded)
register_stanza_plugin(Privilege, Perm, iterable=True)
register_stanza_plugin(Perm, NameSpace, iterable=True)
register_stanza_plugin(Iq, PrivilegedIq)

View File

@@ -1,8 +1,13 @@
# Slixmpp: The Slick XMPP Library
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
# This file is part of Slixmpp.
# See the file LICENSE for copying permissio
from abc import ABC
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from slixmpp.stanza import Message
from slixmpp.xmlstream import (
ElementBase,
@@ -10,14 +15,83 @@ from slixmpp.xmlstream import (
)
NS = 'urn:xmpp:fallback:0'
NS = "urn:xmpp:fallback:0"
class Fallback(ElementBase):
namespace = NS
name = 'fallback'
plugin_attrib = 'fallback'
name = "fallback"
plugin_attrib = "fallback"
plugin_multi_attrib = "fallbacks"
interfaces = {"for"}
def _find_fallback(self, fallback_for: str) -> "Fallback":
if self["for"] == fallback_for:
return self
for fallback in self.parent()["fallbacks"]:
if fallback["for"] == fallback_for:
return fallback
raise AttributeError("No fallback for this namespace", fallback_for)
def get_stripped_body(
self, fallback_for: str, element: Literal["body", "subject"] = "body"
) -> str:
"""
Get the body of a message, with the fallback part stripped
:param fallback_for: namespace of the fallback to strip
:param element: set this to "subject" get the stripped subject instead
of body
:return: body (or subject) content minus the fallback part
"""
fallback = self._find_fallback(fallback_for)
start = fallback[element]["start"]
end = fallback[element]["end"]
body = self.parent()[element]
if start == end == 0:
return ""
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
class FallbackMixin(ABC):
namespace = NS
name = NotImplemented
plugin_attrib = NotImplemented
interfaces = {"start", "end"}
def set_start(self, v: int):
self._set_attr("start", str(v))
def get_start(self):
return _int_or_zero(self._get_attr("start"))
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
return _int_or_zero(self._get_attr("end"))
class FallbackBody(FallbackMixin, ElementBase):
name = plugin_attrib = "body"
class FallbackSubject(FallbackMixin, ElementBase):
name = plugin_attrib = "subject"
def _int_or_zero(v: str):
try:
return int(v)
except ValueError:
return 0
def register_plugins():
register_stanza_plugin(Message, Fallback)
register_stanza_plugin(Message, Fallback, iterable=True)
register_stanza_plugin(Fallback, FallbackBody)
register_stanza_plugin(Fallback, FallbackSubject)

View File

@@ -13,7 +13,7 @@ class XEP_0461(BasePlugin):
name = "xep_0461"
description = "XEP-0461: Message Replies"
dependencies = {"xep_0030"}
dependencies = {"xep_0030", "xep_0428"}
stanza = stanza
namespace = stanza.NS

View File

@@ -2,6 +2,7 @@ from typing import Optional
from slixmpp.stanza import Message
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
from slixmpp.plugins.xep_0428.stanza import Fallback
NS = "urn:xmpp:reply:0"
@@ -12,39 +13,11 @@ class Reply(ElementBase):
plugin_attrib = "reply"
interfaces = {"id", "to"}
class FeatureFallBack(ElementBase):
# should also be a multi attrib
namespace = "urn:xmpp:fallback:0"
name = "fallback"
plugin_attrib = "feature_fallback"
interfaces = {"for"}
def get_fallback_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end:
return body[start:end]
else:
return ""
def get_stripped_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None):
"""
Add plain text fallback for clients not implementing XEP-0461.
``msg["feature_fallback"].add_quoted_fallback("Some text", "Bob")`` will
``msg["reply"].add_quoted_fallback("Some text", "Bob")`` will
prepend "> Bob:\n> Some text\n" to the body of the message, and set the
fallback_body attributes accordingly, so that clients implementing
XEP-0461 can hide the fallback text.
@@ -57,39 +30,27 @@ class FeatureFallBack(ElementBase):
if nickname:
quoted = "> " + nickname + ":\n" + quoted
msg["body"] = quoted + msg["body"]
msg["feature_fallback"]["for"] = NS
msg["feature_fallback"]["fallback_body"]["start"] = 0
msg["feature_fallback"]["fallback_body"]["end"] = len(quoted)
fallback = Fallback()
fallback["for"] = NS
fallback["body"]["start"] = 0
fallback["body"]["end"] = len(quoted)
msg.append(fallback)
class FallBackBody(ElementBase):
# According to https://xmpp.org/extensions/inbox/compatibility-fallback.html
# this should be a multi_attrib *but* since it's a protoXEP, we'll see...
namespace = FeatureFallBack.namespace
name = "body"
plugin_attrib = "fallback_body"
interfaces = {"start", "end"}
def set_start(self, v: int):
self._set_attr("start", str(v))
def get_start(self):
try:
return int(self._get_attr("start"))
except ValueError:
return 0
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
try:
return int(self._get_attr("end"))
except ValueError:
return 0
def get_fallback_body(self) -> str:
msg = self.parent()
for fallback in msg["fallbacks"]:
if fallback["for"] == NS:
break
else:
return ""
start = fallback["body"]["start"]
end = fallback["body"]["end"]
body = msg["body"]
if start <= end:
return body[start:end]
else:
return ""
def register_plugins():
register_stanza_plugin(Message, Reply)
register_stanza_plugin(Message, FeatureFallBack)
register_stanza_plugin(FeatureFallBack, FallBackBody)

View File

@@ -10,5 +10,5 @@ from slixmpp.xmlstream.tostring import tostring, highlight
from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
__all__ = ['JID', 'StanzaBase', 'ElementBase',
'ET', 'StateMachine', 'tostring', 'highlight', 'XMLStream',
'RESPONSE_TIMEOUT']
'ET', 'tostring', 'highlight', 'XMLStream',
'RESPONSE_TIMEOUT', 'register_stanza_plugin']

View File

@@ -1243,7 +1243,7 @@ class ElementBase(object):
self.init_plugin(item.__class__.plugin_multi_attrib)
else:
self.iterables.append(item)
item.parent = weakref.ref(self)
return self
def appendxml(self, xml: ET.Element) -> ElementBase:

View File

@@ -1,9 +1,7 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0356 import stanza
from slixmpp.plugins.xep_0356 import stanza, permissions
class TestPermissions(SlixTest):
@@ -12,30 +10,57 @@ class TestPermissions(SlixTest):
def testAdvertisePermission(self):
xmlstring = """
<message from='capulet.net' to='pubub.capulet.lit'>
<message from='capulet.lit' to='pubsub.capulet.lit'>
<privilege xmlns='urn:xmpp:privilege:2'>
<perm access='roster' type='both'/>
<perm access='message' type='outgoing'/>
<perm access='presence' type='managed_entity'/>
<perm access='iq' type='both'/>
</privilege>
</message>
"""
msg = self.Message()
msg["from"] = "capulet.net"
msg["to"] = "pubub.capulet.lit"
# This raises AttributeError: 'NoneType' object has no attribute 'use_origin_id'
# msg["id"] = "id"
msg["from"] = "capulet.lit"
msg["to"] = "pubsub.capulet.lit"
for access, type_ in [
("roster", "both"),
("message", "outgoing"),
("presence", "managed_entity"),
("roster", permissions.RosterAccess.BOTH),
("message", permissions.MessagePermission.OUTGOING),
("presence", permissions.PresencePermission.MANAGED_ENTITY),
("iq", permissions.IqPermission.BOTH),
]:
msg["privilege"].add_perm(access, type_)
self.check(msg, xmlstring)
# Should this one work? → # AttributeError: 'Message' object has no attribute 'permission'
# self.assertEqual(msg.permission["roster"], "both")
def testIqPermission(self):
x = stanza.Privilege()
x["access"] = "iq"
ns = stanza.NameSpace()
ns["ns"] = "some_ns"
ns["type"] = "get"
x["perm"]["access"] = "iq"
x["perm"].append(ns)
ns = stanza.NameSpace()
ns["ns"] = "some_other_ns"
ns["type"] = "both"
x["perm"].append(ns)
self.check(
x,
"""
<privilege xmlns='urn:xmpp:privilege:2'>
<perm access='iq'>
<namespace ns='some_ns' type='get' />
<namespace ns='some_other_ns' type='both' />
</perm>
</privilege>
"""
)
nss = set()
for perm in x["perms"]:
for ns in perm["namespaces"]:
nss.add((ns["ns"], ns["type"]))
assert nss == {("some_ns", "get"), ("some_other_ns", "both")}
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)

View File

@@ -0,0 +1,149 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza
from slixmpp.plugins import xep_0461
from slixmpp.plugins import xep_0444
class TestFallback(SlixTest):
def setUp(self):
stanza.register_plugins()
def testSingleFallbackBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackSubject(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["subject"]["start"] = 0
message["fallback"]["subject"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<subject start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackWholeBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
def testMultiFallback(self):
message = Message()
f1 = stanza.Fallback()
f1["for"] = "ns1"
f2 = stanza.Fallback()
f2["for"] = "ns2"
message.append(f1)
message.append(f2)
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns1' />
<fallback xmlns='urn:xmpp:fallback:0' for='ns2' />
</message>
""",
)
for i, fallback in enumerate(message["fallbacks"], start=1):
self.assertEqual(fallback["for"], f"ns{i}")
def testStripFallbackPartOfBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
<body start="0" end="9" />
</fallback>
</message>
""",
)
self.assertEqual(
message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body"
)
def testStripWholeBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
self.assertEqual(message["fallback"].get_stripped_body("ns"), "")
def testStripMultiFallback(self):
message = Message()
message["body"] = "> huuuuu\n👍"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
reaction_fallback = stanza.Fallback()
reaction_fallback["for"] = xep_0444.stanza.NS
reaction_fallback.enable("body")
message.append(reaction_fallback)
self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "👍")
self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "")
suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback)

View File

@@ -1,11 +1,13 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza as fallback_stanza
from slixmpp.plugins.xep_0461 import stanza
class TestReply(SlixTest):
def setUp(self):
fallback_stanza.register_plugins()
stanza.register_plugins()
def testReply(self):
@@ -26,9 +28,9 @@ class TestReply(SlixTest):
def testFallback(self):
message = Message()
message["body"] = "12345\nrealbody"
message["feature_fallback"]["for"] = "NS"
message["feature_fallback"]["fallback_body"]["start"] = 0
message["feature_fallback"]["fallback_body"]["end"] = 6
message["fallback"]["for"] = "NS"
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 6
self.check(
message,
@@ -42,18 +44,18 @@ class TestReply(SlixTest):
""",
)
assert message["feature_fallback"].get_stripped_body() == "realbody"
assert message["fallback"].get_stripped_body("NS") == "realbody"
def testAddFallBackHelper(self):
msg = Message()
msg["body"] = "Great"
msg["feature_fallback"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
# ugly dedent but the test does not pass without it
msg["reply"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
self.check(
msg,
msg, # language=XML
"""
<message xmlns="jabber:client" type="normal">
<body>> Anna wrote:\n> Hi, how are you?\nGreat</body>
<reply xmlns="urn:xmpp:reply:0" />
<fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0">
<body start='0' end='33' />
</fallback>
@@ -67,8 +69,8 @@ class TestReply(SlixTest):
msg = Message()
msg["body"] = "Great"
msg["feature_fallback"].add_quoted_fallback(body)
body2 = msg["feature_fallback"].get_fallback_body()
msg["reply"].add_quoted_fallback(body)
body2 = msg["reply"].get_fallback_body()
self.assertTrue(body2 == quoted, body2)

View File

@@ -1,7 +1,7 @@
import unittest
from slixmpp import ComponentXMPP, Iq, Message
from slixmpp.roster import RosterItem
from slixmpp import Message, JID, Iq
from slixmpp.plugins.xep_0356 import permissions
from slixmpp.test import SlixTest
@@ -9,9 +9,9 @@ class TestPermissions(SlixTest):
def setUp(self):
self.stream_start(
mode="component",
plugins=["xep_0356"],
plugins=["xep_0356", "xep_0045"],
jid="pubsub.capulet.lit",
server="capulet.net",
server="capulet.lit",
)
def testPluginEnd(self):
@@ -23,26 +23,44 @@ class TestPermissions(SlixTest):
self.assertFalse(exc)
def testGrantedPrivileges(self):
# https://xmpp.org/extensions/xep-0356.html#example-4
results = {"event": False}
x = self.xmpp["xep_0356"]
self.xmpp.add_event_handler(
"privileges_advertised", lambda msg: results.__setitem__("event", True)
)
self.recv(
"""
<message from='capulet.net' to='pubub.capulet.lit' id='54321'>
<message from='capulet.lit' to='pubsub.capulet.lit' id='54321'>
<privilege xmlns='urn:xmpp:privilege:2'>
<perm access='roster' type='both'/>
<perm access='message' type='outgoing'/>
<perm access='iq'>
<namespace ns='some_ns' type='get' />
<namespace ns='some_other_ns' type='both' />
</perm>
</privilege>
</message>
"""
)
self.assertEqual(self.xmpp["xep_0356"].granted_privileges["roster"], "both")
server = JID("capulet.lit")
self.assertEqual(
self.xmpp["xep_0356"].granted_privileges["message"], "outgoing"
x.granted_privileges[server].roster, permissions.RosterAccess.BOTH
)
self.assertEqual(
x.granted_privileges[server].message, permissions.MessagePermission.OUTGOING
)
self.assertEqual(
x.granted_privileges[server].presence, permissions.PresencePermission.NONE
)
self.assertEqual(
x.granted_privileges[server].iq["nope"], permissions.IqPermission.NONE
)
self.assertEqual(
x.granted_privileges[server].iq["some_ns"], permissions.IqPermission.GET
)
self.assertEqual(
x.granted_privileges[server].iq["some_other_ns"], permissions.IqPermission.BOTH
)
self.assertEqual(self.xmpp["xep_0356"].granted_privileges["presence"], "none")
self.assertTrue(results["event"])
def testGetRosterIq(self):
@@ -94,7 +112,7 @@ class TestPermissions(SlixTest):
def testMakeOutgoingMessage(self):
xmlstring = """
<message xmlns="jabber:component:accept" from='pubsub.capulet.lit' to='capulet.net'>
<message xmlns="jabber:component:accept" from='pubsub.capulet.lit' to='capulet.lit'>
<privilege xmlns='urn:xmpp:privilege:2'>
<forwarded xmlns='urn:xmpp:forward:0'>
<message from="juliet@capulet.lit" to="romeo@montague.lit" xmlns="jabber:client">
@@ -108,9 +126,49 @@ class TestPermissions(SlixTest):
msg["from"] = "juliet@capulet.lit"
msg["to"] = "romeo@montague.lit"
msg["body"] = "I do not hate you"
priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
self.check(priv_msg, xmlstring, use_values=False)
def testDetectServer(self):
msg = Message()
msg["from"] = "juliet@something"
msg["to"] = "romeo@montague.lit"
msg["body"] = "I do not hate you"
priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
assert priv_msg.get_to() == "something"
assert priv_msg.get_from() == "pubsub.capulet.lit"
def testIqOnBehalf(self):
iq = Iq()
iq["mucadmin_query"]["item"]["affiliation"] = "member"
iq.set_from("juliet@xxx")
iq.set_to("somemuc@conf")
iq.set_type("get")
self.xmpp["xep_0356"].granted_privileges["conf"].iq["http://jabber.org/protocol/muc#admin"] = permissions.IqPermission.BOTH
r = self.xmpp.loop.create_task(self.xmpp["xep_0356"].send_privileged_iq(iq, iq_id="0"))
self.send(
"""
<iq from="pubsub.capulet.lit"
to="juliet@xxx"
xmlns="jabber:component:accept"
type="get" id="0">
<privileged_iq xmlns='urn:xmpp:privilege:2'>
<iq xmlns='jabber:client'
type='get'
to='somemuc@conf'
from='juliet@xxx'
id="0">
<query xmlns='http://jabber.org/protocol/muc#admin'>
<item affiliation='member'/>
</query>
</iq>
</privileged_iq>
</iq>
""",
use_values=False
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)

View File

@@ -9,8 +9,8 @@ class TestReply(SlixTest):
def testFallBackBody(self):
async def on_reply(msg):
start = msg["feature_fallback"]["fallback_body"]["start"]
end = msg["feature_fallback"]["fallback_body"]["end"]
start = msg["fallback"]["body"]["start"]
end = msg["fallback"]["body"]["end"]
self.xmpp["xep_0461"].send_reply(
reply_to=msg.get_from(),
reply_id=msg.get_id(),