Compare commits

...

2 Commits

Author SHA1 Message Date
nicoco
56004802fa Merge branch 'master' into xep356-iq 2023-12-19 14:13:35 +00:00
Nicolas Cedilnik
8bfe6177f4 xep0356: implement IQ privilege
Also included:

- correctly handle privileges from different
  servers
- check that privileges have been granted before
  attempting to send something and raise
  PermissionError if not
- use dataclass and enums to store permissions instead of
  untyped dict
2023-07-23 15:38:43 +02:00
6 changed files with 260 additions and 62 deletions

View File

@ -1,7 +1,7 @@
from slixmpp.plugins.base import register_plugin from slixmpp.plugins.base import register_plugin
from slixmpp.plugins.xep_0356 import stanza from . import stanza
from slixmpp.plugins.xep_0356.stanza import Perm, Privilege from .privilege import XEP_0356
from slixmpp.plugins.xep_0356.privilege import XEP_0356 from .stanza import Perm, Privilege
register_plugin(XEP_0356) register_plugin(XEP_0356)

View File

@ -0,0 +1,36 @@
import dataclasses
from collections import defaultdict
from enum import Enum
class RosterAccess(str, Enum):
NONE = "none"
GET = "get"
SET = "set"
BOTH = "both"
class MessagePermission(str, Enum):
NONE = "none"
OUTGOING = "outgoing"
class IqPermission(str, Enum):
NONE = "none"
GET = "get"
SET = "set"
BOTH = "both"
class PresencePermission(str, Enum):
NONE = "none"
MANAGED_ENTITY = "managed_entity"
ROSTER = "roster"
@dataclasses.dataclass
class Permissions:
roster = RosterAccess.NONE
message = MessagePermission.NONE
iq = defaultdict(lambda: IqPermission.NONE)
presence = PresencePermission.NONE

View File

@ -1,14 +1,16 @@
import logging import logging
import typing import typing
import uuid
from collections import defaultdict
from slixmpp import Message, JID, Iq from slixmpp import JID, Iq, Message
from slixmpp.plugins.base import BasePlugin from slixmpp.plugins.base import BasePlugin
from slixmpp.xmlstream.matcher import StanzaPath from slixmpp.xmlstream import StanzaBase
from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream import register_stanza_plugin from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.plugins.xep_0356 import stanza, Privilege, Perm
from . import stanza
from .permissions import IqPermission, MessagePermission, Permissions, RosterAccess
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -29,7 +31,7 @@ class XEP_0356(BasePlugin):
dependencies = {"xep_0297"} dependencies = {"xep_0297"}
stanza = stanza stanza = stanza
granted_privileges = {"roster": "none", "message": "none", "presence": "none"} granted_privileges = defaultdict(Permissions)
def plugin_init(self): def plugin_init(self):
if not self.xmpp.is_component: if not self.xmpp.is_component:
@ -49,32 +51,42 @@ class XEP_0356(BasePlugin):
def plugin_end(self): def plugin_end(self):
self.xmpp.remove_handler("Privileges") self.xmpp.remove_handler("Privileges")
def _handle_privilege(self, msg: Message): def _handle_privilege(self, msg: StanzaBase):
""" """
Called when the XMPP server advertise the component's privileges. Called when the XMPP server advertise the component's privileges.
Stores the privileges in this instance's granted_privileges attribute (a dict) Stores the privileges in this instance's granted_privileges attribute (a dict)
and raises the privileges_advertised event and raises the privileges_advertised event
""" """
permissions = self.granted_privileges[msg.get_from()]
for perm in msg["privilege"]["perms"]: for perm in msg["privilege"]["perms"]:
self.granted_privileges[perm["access"]] = perm["type"] access = perm["access"]
if access == "iq":
for ns in perm["namespaces"]:
permissions.iq[ns["ns"]] = ns["type"]
elif access in _VALID_ACCESSES:
setattr(permissions, access, perm["type"])
else:
log.warning("Received an invalid privileged access: %s", access)
log.debug(f"Privileges: {self.granted_privileges}") log.debug(f"Privileges: {self.granted_privileges}")
self.xmpp.event("privileges_advertised") self.xmpp.event("privileges_advertised")
def send_privileged_message(self, msg: Message): def send_privileged_message(self, msg: Message):
if self.granted_privileges["message"] == "outgoing": if (
self._make_privileged_message(msg).send() self.granted_privileges[msg.get_from().domain].message
else: != MessagePermission.OUTGOING
log.error( ):
raise PermissionError(
"The server hasn't authorized us to send messages on behalf of other users" "The server hasn't authorized us to send messages on behalf of other users"
) )
else:
self._make_privileged_message(msg).send()
def _make_privileged_message(self, msg: Message): def _make_privileged_message(self, msg: Message):
stanza = self.xmpp.make_message( server = msg.get_from().domain
mto=self.xmpp.server_host, mfrom=self.xmpp.boundjid.bare wrapped = self.xmpp.make_message(mto=server, mfrom=self.xmpp.boundjid.bare)
) wrapped["privilege"]["forwarded"].append(msg)
stanza["privilege"]["forwarded"].append(msg) return wrapped
return stanza
def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs): def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
return self.xmpp.make_iq_get( return self.xmpp.make_iq_get(
@ -106,9 +118,15 @@ class XEP_0356(BasePlugin):
:param jid: user we want to fetch the roster from :param jid: user we want to fetch the roster from
""" """
if self.granted_privileges["roster"] not in ("get", "both"): if isinstance(jid, str):
log.error("The server did not grant us privileges to get rosters") jid = JID(jid)
raise ValueError if self.granted_privileges[jid.domain].roster not in (
RosterAccess.GET,
RosterAccess.BOTH,
):
raise PermissionError(
"The server did not grant us privileges to get rosters"
)
else: else:
return await self._make_get_roster(jid).send(**send_kwargs) return await self._make_get_roster(jid).send(**send_kwargs)
@ -137,8 +155,56 @@ class XEP_0356(BasePlugin):
}, },
} }
""" """
if self.granted_privileges["roster"] not in ("set", "both"): if isinstance(jid, str):
log.error("The server did not grant us privileges to set rosters") jid = JID(jid)
raise ValueError if self.granted_privileges[jid.domain].roster not in (
RosterAccess.GET,
RosterAccess.BOTH,
):
raise PermissionError(
"The server did not grant us privileges to set rosters"
)
else: else:
return await self._make_set_roster(jid, roster_items).send(**send_kwargs) return await self._make_set_roster(jid, roster_items).send(**send_kwargs)
async def send_privileged_iq(
self, encapsulated_iq: Iq, iq_id: typing.Optional[str] = None
):
"""
Send an IQ on behalf of a user
Caution: the IQ *must* have the jabber:client namespace
"""
iq_id = iq_id or str(uuid.uuid4())
encapsulated_iq["id"] = iq_id
server = encapsulated_iq.get_to().domain
perms = self.granted_privileges.get(server)
if not perms:
raise PermissionError(f"{server} has not granted us any privilege")
itype = encapsulated_iq["type"]
for ns in encapsulated_iq.plugins.values():
type_ = perms.iq[ns.namespace]
if type_ == IqPermission.NONE:
raise PermissionError(
f"{server} has not granted any IQ privilege for namespace {ns.namespace}"
)
elif type_ == IqPermission.BOTH:
pass
elif type_ != itype:
raise PermissionError(
f"{server} has not granted IQ {itype} privilege for namespace {ns.namespace}"
)
iq = self.xmpp.make_iq(
itype=itype,
ifrom=self.xmpp.boundjid.bare,
ito=encapsulated_iq.get_from(),
id=iq_id,
)
iq["privileged_iq"].append(encapsulated_iq)
resp = await iq.send()
return resp["privilege"]["forwarded"]["iq"]
# does not include iq access that is handled differently
_VALID_ACCESSES = {"message", "roster", "presence"}

View File

@ -1,13 +1,12 @@
from slixmpp.stanza import Message
from slixmpp.xmlstream import (
ElementBase,
register_stanza_plugin,
)
from slixmpp.plugins.xep_0297 import Forwarded from slixmpp.plugins.xep_0297 import Forwarded
from slixmpp.stanza import Iq, Message
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
NS = "urn:xmpp:privilege:2"
class Privilege(ElementBase): class Privilege(ElementBase):
namespace = "urn:xmpp:privilege:2" namespace = NS
name = "privilege" name = "privilege"
plugin_attrib = "privilege" plugin_attrib = "privilege"
@ -25,26 +24,40 @@ class Privilege(ElementBase):
def presence(self): def presence(self):
return self.permission("presence") return self.permission("presence")
def iq(self): def add_perm(self, access, type_):
return self.permission("iq")
def add_perm(self, access, type):
# This should only be needed for servers, so maybe out of scope for slixmpp # This should only be needed for servers, so maybe out of scope for slixmpp
perm = Perm() perm = Perm()
perm["type"] = type perm["type"] = type_
perm["access"] = access perm["access"] = access
self.append(perm) self.append(perm)
class Perm(ElementBase): class Perm(ElementBase):
namespace = "urn:xmpp:privilege:2" namespace = NS
name = "perm" name = "perm"
plugin_attrib = "perm" plugin_attrib = "perm"
plugin_multi_attrib = "perms" plugin_multi_attrib = "perms"
interfaces = {"type", "access"} interfaces = {"type", "access"}
class NameSpace(ElementBase):
namespace = NS
name = "namespace"
plugin_attrib = "namespace"
plugin_multi_attrib = "namespaces"
interfaces = {"ns", "type"}
class PrivilegedIq(ElementBase):
namespace = NS
name = "privileged_iq"
plugin_attrib = "privileged_iq"
def register(): def register():
register_stanza_plugin(Message, Privilege) register_stanza_plugin(Message, Privilege)
register_stanza_plugin(Iq, Privilege)
register_stanza_plugin(Privilege, Forwarded) register_stanza_plugin(Privilege, Forwarded)
register_stanza_plugin(Privilege, Perm, iterable=True) register_stanza_plugin(Privilege, Perm, iterable=True)
register_stanza_plugin(Perm, NameSpace, iterable=True)
register_stanza_plugin(Iq, PrivilegedIq)

View File

@ -1,9 +1,7 @@
import unittest import unittest
from slixmpp import Message
from slixmpp.test import SlixTest from slixmpp.test import SlixTest
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0356 import stanza from slixmpp.plugins.xep_0356 import stanza, permissions
class TestPermissions(SlixTest): class TestPermissions(SlixTest):
@ -12,30 +10,57 @@ class TestPermissions(SlixTest):
def testAdvertisePermission(self): def testAdvertisePermission(self):
xmlstring = """ xmlstring = """
<message from='capulet.net' to='pubub.capulet.lit'> <message from='capulet.lit' to='pubsub.capulet.lit'>
<privilege xmlns='urn:xmpp:privilege:2'> <privilege xmlns='urn:xmpp:privilege:2'>
<perm access='roster' type='both'/> <perm access='roster' type='both'/>
<perm access='message' type='outgoing'/> <perm access='message' type='outgoing'/>
<perm access='presence' type='managed_entity'/> <perm access='presence' type='managed_entity'/>
<perm access='iq' type='both'/>
</privilege> </privilege>
</message> </message>
""" """
msg = self.Message() msg = self.Message()
msg["from"] = "capulet.net" msg["from"] = "capulet.lit"
msg["to"] = "pubub.capulet.lit" msg["to"] = "pubsub.capulet.lit"
# This raises AttributeError: 'NoneType' object has no attribute 'use_origin_id'
# msg["id"] = "id"
for access, type_ in [ for access, type_ in [
("roster", "both"), ("roster", permissions.RosterAccess.BOTH),
("message", "outgoing"), ("message", permissions.MessagePermission.OUTGOING),
("presence", "managed_entity"), ("presence", permissions.PresencePermission.MANAGED_ENTITY),
("iq", permissions.IqPermission.BOTH),
]: ]:
msg["privilege"].add_perm(access, type_) msg["privilege"].add_perm(access, type_)
self.check(msg, xmlstring) self.check(msg, xmlstring)
# Should this one work? → # AttributeError: 'Message' object has no attribute 'permission'
# self.assertEqual(msg.permission["roster"], "both") def testIqPermission(self):
x = stanza.Privilege()
x["access"] = "iq"
ns = stanza.NameSpace()
ns["ns"] = "some_ns"
ns["type"] = "get"
x["perm"]["access"] = "iq"
x["perm"].append(ns)
ns = stanza.NameSpace()
ns["ns"] = "some_other_ns"
ns["type"] = "both"
x["perm"].append(ns)
self.check(
x,
"""
<privilege xmlns='urn:xmpp:privilege:2'>
<perm access='iq'>
<namespace ns='some_ns' type='get' />
<namespace ns='some_other_ns' type='both' />
</perm>
</privilege>
"""
)
nss = set()
for perm in x["perms"]:
for ns in perm["namespaces"]:
nss.add((ns["ns"], ns["type"]))
assert nss == {("some_ns", "get"), ("some_other_ns", "both")}
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions) suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)

View File

@ -1,7 +1,7 @@
import unittest import unittest
from slixmpp import ComponentXMPP, Iq, Message from slixmpp import Message, JID, Iq
from slixmpp.roster import RosterItem from slixmpp.plugins.xep_0356 import permissions
from slixmpp.test import SlixTest from slixmpp.test import SlixTest
@ -9,9 +9,9 @@ class TestPermissions(SlixTest):
def setUp(self): def setUp(self):
self.stream_start( self.stream_start(
mode="component", mode="component",
plugins=["xep_0356"], plugins=["xep_0356", "xep_0045"],
jid="pubsub.capulet.lit", jid="pubsub.capulet.lit",
server="capulet.net", server="capulet.lit",
) )
def testPluginEnd(self): def testPluginEnd(self):
@ -23,26 +23,44 @@ class TestPermissions(SlixTest):
self.assertFalse(exc) self.assertFalse(exc)
def testGrantedPrivileges(self): def testGrantedPrivileges(self):
# https://xmpp.org/extensions/xep-0356.html#example-4
results = {"event": False} results = {"event": False}
x = self.xmpp["xep_0356"]
self.xmpp.add_event_handler( self.xmpp.add_event_handler(
"privileges_advertised", lambda msg: results.__setitem__("event", True) "privileges_advertised", lambda msg: results.__setitem__("event", True)
) )
self.recv( self.recv(
""" """
<message from='capulet.net' to='pubub.capulet.lit' id='54321'> <message from='capulet.lit' to='pubsub.capulet.lit' id='54321'>
<privilege xmlns='urn:xmpp:privilege:2'> <privilege xmlns='urn:xmpp:privilege:2'>
<perm access='roster' type='both'/> <perm access='roster' type='both'/>
<perm access='message' type='outgoing'/> <perm access='message' type='outgoing'/>
<perm access='iq'>
<namespace ns='some_ns' type='get' />
<namespace ns='some_other_ns' type='both' />
</perm>
</privilege> </privilege>
</message> </message>
""" """
) )
self.assertEqual(self.xmpp["xep_0356"].granted_privileges["roster"], "both") server = JID("capulet.lit")
self.assertEqual( self.assertEqual(
self.xmpp["xep_0356"].granted_privileges["message"], "outgoing" x.granted_privileges[server].roster, permissions.RosterAccess.BOTH
)
self.assertEqual(
x.granted_privileges[server].message, permissions.MessagePermission.OUTGOING
)
self.assertEqual(
x.granted_privileges[server].presence, permissions.PresencePermission.NONE
)
self.assertEqual(
x.granted_privileges[server].iq["nope"], permissions.IqPermission.NONE
)
self.assertEqual(
x.granted_privileges[server].iq["some_ns"], permissions.IqPermission.GET
)
self.assertEqual(
x.granted_privileges[server].iq["some_other_ns"], permissions.IqPermission.BOTH
) )
self.assertEqual(self.xmpp["xep_0356"].granted_privileges["presence"], "none")
self.assertTrue(results["event"]) self.assertTrue(results["event"])
def testGetRosterIq(self): def testGetRosterIq(self):
@ -94,7 +112,7 @@ class TestPermissions(SlixTest):
def testMakeOutgoingMessage(self): def testMakeOutgoingMessage(self):
xmlstring = """ xmlstring = """
<message xmlns="jabber:component:accept" from='pubsub.capulet.lit' to='capulet.net'> <message xmlns="jabber:component:accept" from='pubsub.capulet.lit' to='capulet.lit'>
<privilege xmlns='urn:xmpp:privilege:2'> <privilege xmlns='urn:xmpp:privilege:2'>
<forwarded xmlns='urn:xmpp:forward:0'> <forwarded xmlns='urn:xmpp:forward:0'>
<message from="juliet@capulet.lit" to="romeo@montague.lit" xmlns="jabber:client"> <message from="juliet@capulet.lit" to="romeo@montague.lit" xmlns="jabber:client">
@ -108,9 +126,49 @@ class TestPermissions(SlixTest):
msg["from"] = "juliet@capulet.lit" msg["from"] = "juliet@capulet.lit"
msg["to"] = "romeo@montague.lit" msg["to"] = "romeo@montague.lit"
msg["body"] = "I do not hate you" msg["body"] = "I do not hate you"
priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg) priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
self.check(priv_msg, xmlstring, use_values=False) self.check(priv_msg, xmlstring, use_values=False)
def testDetectServer(self):
msg = Message()
msg["from"] = "juliet@something"
msg["to"] = "romeo@montague.lit"
msg["body"] = "I do not hate you"
priv_msg = self.xmpp["xep_0356"]._make_privileged_message(msg)
assert priv_msg.get_to() == "something"
assert priv_msg.get_from() == "pubsub.capulet.lit"
def testIqOnBehalf(self):
iq = Iq()
iq["mucadmin_query"]["item"]["affiliation"] = "member"
iq.set_from("juliet@xxx")
iq.set_to("somemuc@conf")
iq.set_type("get")
self.xmpp["xep_0356"].granted_privileges["conf"].iq["http://jabber.org/protocol/muc#admin"] = permissions.IqPermission.BOTH
r = self.xmpp.loop.create_task(self.xmpp["xep_0356"].send_privileged_iq(iq, iq_id="0"))
self.send(
"""
<iq from="pubsub.capulet.lit"
to="juliet@xxx"
xmlns="jabber:component:accept"
type="get" id="0">
<privileged_iq xmlns='urn:xmpp:privilege:2'>
<iq xmlns='jabber:client'
type='get'
to='somemuc@conf'
from='juliet@xxx'
id="0">
<query xmlns='http://jabber.org/protocol/muc#admin'>
<item affiliation='member'/>
</query>
</iq>
</privileged_iq>
</iq>
""",
use_values=False
)
suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions) suite = unittest.TestLoader().loadTestsFromTestCase(TestPermissions)