xep0356: implement IQ privilege
Also included: - correctly handle privileges from different servers - check that privileges have been granted before attempting to send something and raise PermissionError if not - use dataclass and enums to store permissions instead of untyped dict
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
from slixmpp.plugins.base import register_plugin
|
||||
|
||||
from slixmpp.plugins.xep_0356 import stanza
|
||||
from slixmpp.plugins.xep_0356.stanza import Perm, Privilege
|
||||
from slixmpp.plugins.xep_0356.privilege import XEP_0356
|
||||
from . import stanza
|
||||
from .privilege import XEP_0356
|
||||
from .stanza import Perm, Privilege
|
||||
|
||||
register_plugin(XEP_0356)
|
||||
|
||||
36
slixmpp/plugins/xep_0356/permissions.py
Normal file
36
slixmpp/plugins/xep_0356/permissions.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import dataclasses
|
||||
from collections import defaultdict
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class RosterAccess(str, Enum):
|
||||
NONE = "none"
|
||||
GET = "get"
|
||||
SET = "set"
|
||||
BOTH = "both"
|
||||
|
||||
|
||||
class MessagePermission(str, Enum):
|
||||
NONE = "none"
|
||||
OUTGOING = "outgoing"
|
||||
|
||||
|
||||
class IqPermission(str, Enum):
|
||||
NONE = "none"
|
||||
GET = "get"
|
||||
SET = "set"
|
||||
BOTH = "both"
|
||||
|
||||
|
||||
class PresencePermission(str, Enum):
|
||||
NONE = "none"
|
||||
MANAGED_ENTITY = "managed_entity"
|
||||
ROSTER = "roster"
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Permissions:
|
||||
roster = RosterAccess.NONE
|
||||
message = MessagePermission.NONE
|
||||
iq = defaultdict(lambda: IqPermission.NONE)
|
||||
presence = PresencePermission.NONE
|
||||
@@ -1,14 +1,16 @@
|
||||
import logging
|
||||
import typing
|
||||
import uuid
|
||||
from collections import defaultdict
|
||||
|
||||
from slixmpp import Message, JID, Iq
|
||||
from slixmpp import JID, Iq, Message
|
||||
from slixmpp.plugins.base import BasePlugin
|
||||
from slixmpp.xmlstream.matcher import StanzaPath
|
||||
from slixmpp.xmlstream import StanzaBase
|
||||
from slixmpp.xmlstream.handler import Callback
|
||||
from slixmpp.xmlstream import register_stanza_plugin
|
||||
|
||||
from slixmpp.plugins.xep_0356 import stanza, Privilege, Perm
|
||||
from slixmpp.xmlstream.matcher import StanzaPath
|
||||
|
||||
from . import stanza
|
||||
from .permissions import IqPermission, MessagePermission, Permissions, RosterAccess
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -29,7 +31,7 @@ class XEP_0356(BasePlugin):
|
||||
dependencies = {"xep_0297"}
|
||||
stanza = stanza
|
||||
|
||||
granted_privileges = {"roster": "none", "message": "none", "presence": "none"}
|
||||
granted_privileges = defaultdict(Permissions)
|
||||
|
||||
def plugin_init(self):
|
||||
if not self.xmpp.is_component:
|
||||
@@ -49,32 +51,42 @@ class XEP_0356(BasePlugin):
|
||||
def plugin_end(self):
|
||||
self.xmpp.remove_handler("Privileges")
|
||||
|
||||
def _handle_privilege(self, msg: Message):
|
||||
def _handle_privilege(self, msg: StanzaBase):
|
||||
"""
|
||||
Called when the XMPP server advertise the component's privileges.
|
||||
|
||||
Stores the privileges in this instance's granted_privileges attribute (a dict)
|
||||
and raises the privileges_advertised event
|
||||
"""
|
||||
permissions = self.granted_privileges[msg.get_from()]
|
||||
for perm in msg["privilege"]["perms"]:
|
||||
self.granted_privileges[perm["access"]] = perm["type"]
|
||||
access = perm["access"]
|
||||
if access == "iq":
|
||||
for ns in perm["namespaces"]:
|
||||
permissions.iq[ns["ns"]] = ns["type"]
|
||||
elif access in _VALID_ACCESSES:
|
||||
setattr(permissions, access, perm["type"])
|
||||
else:
|
||||
log.warning("Received an invalid privileged access: %s", access)
|
||||
log.debug(f"Privileges: {self.granted_privileges}")
|
||||
self.xmpp.event("privileges_advertised")
|
||||
|
||||
def send_privileged_message(self, msg: Message):
|
||||
if self.granted_privileges["message"] == "outgoing":
|
||||
self._make_privileged_message(msg).send()
|
||||
else:
|
||||
log.error(
|
||||
if (
|
||||
self.granted_privileges[msg.get_from().domain].message
|
||||
!= MessagePermission.OUTGOING
|
||||
):
|
||||
raise PermissionError(
|
||||
"The server hasn't authorized us to send messages on behalf of other users"
|
||||
)
|
||||
else:
|
||||
self._make_privileged_message(msg).send()
|
||||
|
||||
def _make_privileged_message(self, msg: Message):
|
||||
stanza = self.xmpp.make_message(
|
||||
mto=self.xmpp.server_host, mfrom=self.xmpp.boundjid.bare
|
||||
)
|
||||
stanza["privilege"]["forwarded"].append(msg)
|
||||
return stanza
|
||||
server = msg.get_from().domain
|
||||
wrapped = self.xmpp.make_message(mto=server, mfrom=self.xmpp.boundjid.bare)
|
||||
wrapped["privilege"]["forwarded"].append(msg)
|
||||
return wrapped
|
||||
|
||||
def _make_get_roster(self, jid: typing.Union[JID, str], **iq_kwargs):
|
||||
return self.xmpp.make_iq_get(
|
||||
@@ -106,9 +118,15 @@ class XEP_0356(BasePlugin):
|
||||
|
||||
:param jid: user we want to fetch the roster from
|
||||
"""
|
||||
if self.granted_privileges["roster"] not in ("get", "both"):
|
||||
log.error("The server did not grant us privileges to get rosters")
|
||||
raise ValueError
|
||||
if isinstance(jid, str):
|
||||
jid = JID(jid)
|
||||
if self.granted_privileges[jid.domain].roster not in (
|
||||
RosterAccess.GET,
|
||||
RosterAccess.BOTH,
|
||||
):
|
||||
raise PermissionError(
|
||||
"The server did not grant us privileges to get rosters"
|
||||
)
|
||||
else:
|
||||
return await self._make_get_roster(jid).send(**send_kwargs)
|
||||
|
||||
@@ -137,8 +155,56 @@ class XEP_0356(BasePlugin):
|
||||
},
|
||||
}
|
||||
"""
|
||||
if self.granted_privileges["roster"] not in ("set", "both"):
|
||||
log.error("The server did not grant us privileges to set rosters")
|
||||
raise ValueError
|
||||
if isinstance(jid, str):
|
||||
jid = JID(jid)
|
||||
if self.granted_privileges[jid.domain].roster not in (
|
||||
RosterAccess.GET,
|
||||
RosterAccess.BOTH,
|
||||
):
|
||||
raise PermissionError(
|
||||
"The server did not grant us privileges to set rosters"
|
||||
)
|
||||
else:
|
||||
return await self._make_set_roster(jid, roster_items).send(**send_kwargs)
|
||||
|
||||
async def send_privileged_iq(
|
||||
self, encapsulated_iq: Iq, iq_id: typing.Optional[str] = None
|
||||
):
|
||||
"""
|
||||
Send an IQ on behalf of a user
|
||||
|
||||
Caution: the IQ *must* have the jabber:client namespace
|
||||
"""
|
||||
iq_id = iq_id or str(uuid.uuid4())
|
||||
encapsulated_iq["id"] = iq_id
|
||||
server = encapsulated_iq.get_to().domain
|
||||
perms = self.granted_privileges.get(server)
|
||||
if not perms:
|
||||
raise PermissionError(f"{server} has not granted us any privilege")
|
||||
itype = encapsulated_iq["type"]
|
||||
for ns in encapsulated_iq.plugins.values():
|
||||
type_ = perms.iq[ns.namespace]
|
||||
if type_ == IqPermission.NONE:
|
||||
raise PermissionError(
|
||||
f"{server} has not granted any IQ privilege for namespace {ns.namespace}"
|
||||
)
|
||||
elif type_ == IqPermission.BOTH:
|
||||
pass
|
||||
elif type_ != itype:
|
||||
raise PermissionError(
|
||||
f"{server} has not granted IQ {itype} privilege for namespace {ns.namespace}"
|
||||
)
|
||||
iq = self.xmpp.make_iq(
|
||||
itype=itype,
|
||||
ifrom=self.xmpp.boundjid.bare,
|
||||
ito=encapsulated_iq.get_from(),
|
||||
id=iq_id,
|
||||
)
|
||||
iq["privileged_iq"].append(encapsulated_iq)
|
||||
|
||||
resp = await iq.send()
|
||||
return resp["privilege"]["forwarded"]["iq"]
|
||||
|
||||
|
||||
# does not include iq access that is handled differently
|
||||
_VALID_ACCESSES = {"message", "roster", "presence"}
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
from slixmpp.stanza import Message
|
||||
from slixmpp.xmlstream import (
|
||||
ElementBase,
|
||||
register_stanza_plugin,
|
||||
)
|
||||
from slixmpp.plugins.xep_0297 import Forwarded
|
||||
from slixmpp.stanza import Iq, Message
|
||||
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
|
||||
|
||||
NS = "urn:xmpp:privilege:2"
|
||||
|
||||
|
||||
class Privilege(ElementBase):
|
||||
namespace = "urn:xmpp:privilege:2"
|
||||
namespace = NS
|
||||
name = "privilege"
|
||||
plugin_attrib = "privilege"
|
||||
|
||||
@@ -25,26 +24,40 @@ class Privilege(ElementBase):
|
||||
def presence(self):
|
||||
return self.permission("presence")
|
||||
|
||||
def iq(self):
|
||||
return self.permission("iq")
|
||||
|
||||
def add_perm(self, access, type):
|
||||
def add_perm(self, access, type_):
|
||||
# This should only be needed for servers, so maybe out of scope for slixmpp
|
||||
perm = Perm()
|
||||
perm["type"] = type
|
||||
perm["type"] = type_
|
||||
perm["access"] = access
|
||||
self.append(perm)
|
||||
|
||||
|
||||
class Perm(ElementBase):
|
||||
namespace = "urn:xmpp:privilege:2"
|
||||
namespace = NS
|
||||
name = "perm"
|
||||
plugin_attrib = "perm"
|
||||
plugin_multi_attrib = "perms"
|
||||
interfaces = {"type", "access"}
|
||||
|
||||
|
||||
class NameSpace(ElementBase):
|
||||
namespace = NS
|
||||
name = "namespace"
|
||||
plugin_attrib = "namespace"
|
||||
plugin_multi_attrib = "namespaces"
|
||||
interfaces = {"ns", "type"}
|
||||
|
||||
|
||||
class PrivilegedIq(ElementBase):
|
||||
namespace = NS
|
||||
name = "privileged_iq"
|
||||
plugin_attrib = "privileged_iq"
|
||||
|
||||
|
||||
def register():
|
||||
register_stanza_plugin(Message, Privilege)
|
||||
register_stanza_plugin(Iq, Privilege)
|
||||
register_stanza_plugin(Privilege, Forwarded)
|
||||
register_stanza_plugin(Privilege, Perm, iterable=True)
|
||||
register_stanza_plugin(Perm, NameSpace, iterable=True)
|
||||
register_stanza_plugin(Iq, PrivilegedIq)
|
||||
|
||||
Reference in New Issue
Block a user