Merge branch 'xep-0030-typing-and-co' into 'master'

XEP-0030: add typing and more docs

See merge request poezio/slixmpp!143
This commit is contained in:
mathieui 2021-03-04 22:14:59 +01:00
commit c1a598c34b
6 changed files with 425 additions and 286 deletions

View File

@ -9,14 +9,31 @@ XEP-0030: Service Discovery
:exclude-members: session_bind, plugin_init, plugin_end :exclude-members: session_bind, plugin_init, plugin_end
.. _api-0030:
Internal API Methods
--------------------
All ``api`` operations supported by the 0030 plugin are implemented as part of
the :class:`~.StaticDisco` class which implement an in-memory cache for disco
info and items.
.. automodule:: slixmpp.plugins.xep_0030.static
:members:
:member-order: bysource
Stanza elements Stanza elements
--------------- ---------------
.. automodule:: slixmpp.plugins.xep_0030.stanza.info .. automodule:: slixmpp.plugins.xep_0030.stanza.info
:members: :members:
:member-order: bysource
:undoc-members: :undoc-members:
.. automodule:: slixmpp.plugins.xep_0030.stanza.items .. automodule:: slixmpp.plugins.xep_0030.stanza.items
:members: :members:
:member-order: bysource
:undoc-members: :undoc-members:

View File

@ -7,14 +7,20 @@ import asyncio
import logging import logging
from asyncio import Future from asyncio import Future
from typing import Optional, Callable from typing import (
Optional,
Callable,
List,
Union,
)
from slixmpp import Iq from slixmpp import JID
from slixmpp import future_wrapper from slixmpp.stanza import Iq
from slixmpp.types import OptJid
from slixmpp.plugins import BasePlugin from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream.handler import Callback, CoroutineCallback from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.matcher import StanzaPath from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin, JID from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0030 import stanza, DiscoInfo, DiscoItems from slixmpp.plugins.xep_0030 import stanza, DiscoInfo, DiscoItems
from slixmpp.plugins.xep_0030 import StaticDisco from slixmpp.plugins.xep_0030 import StaticDisco
@ -42,14 +48,14 @@ class XEP_0030(BasePlugin):
Node handler hierarchy: Node handler hierarchy:
:: ====== ======= ============================
JID Node Level
JID | Node | Level ====== ======= ============================
--------------------- None None Global
None | None | Global Given None All nodes for the JID
Given | None | All nodes for the JID None Given Node on self.xmpp.boundjid
None | Given | Node on self.xmpp.boundjid Given Given A single node
Given | Given | A single node ====== ======= ============================
Stream Handlers: Stream Handlers:
@ -62,19 +68,13 @@ class XEP_0030(BasePlugin):
Events: Events:
:: - :term:`disco_info` -- Received a disco#info Iq query result.
- :term:`disco_items` -- Received a disco#items Iq query result.
disco_info -- Received a disco#info Iq query result.
disco_items -- Received a disco#items Iq query result.
disco_info_query -- Received a disco#info Iq query request.
disco_items_query -- Received a disco#items Iq query request.
Attributes: Attributes:
:var static: Object containing the default set of :var static: Object containing the default set of
static node handlers. static node handlers.
:var default_handlers: A dictionary mapping operations to the default
global handler (by default, the static handlers).
""" """
name = 'xep_0030' name = 'xep_0030'
@ -85,20 +85,23 @@ class XEP_0030(BasePlugin):
'use_cache': True, 'use_cache': True,
'wrap_results': False 'wrap_results': False
} }
static: StaticDisco
def plugin_init(self): def plugin_init(self):
""" """
Start the XEP-0030 plugin. Start the XEP-0030 plugin.
""" """
self.xmpp.register_handler( self.xmpp.register_handler(CoroutineCallback(
CoroutineCallback('Disco Info', 'Disco Info',
StanzaPath('iq/disco_info'), StanzaPath('iq/disco_info'),
self._handle_disco_info)) self._handle_disco_info
))
self.xmpp.register_handler( self.xmpp.register_handler(CoroutineCallback(
CoroutineCallback('Disco Items', 'Disco Items',
StanzaPath('iq/disco_items'), StanzaPath('iq/disco_items'),
self._handle_disco_items)) self._handle_disco_items
))
register_stanza_plugin(Iq, DiscoInfo) register_stanza_plugin(Iq, DiscoInfo)
register_stanza_plugin(Iq, DiscoItems) register_stanza_plugin(Iq, DiscoItems)
@ -127,7 +130,7 @@ class XEP_0030(BasePlugin):
self.api.register(default_handler, op) self.api.register(default_handler, op)
self.api.register_default(default_handler, op) self.api.register_default(default_handler, op)
def set_node_handler(self, htype: str, jid: Optional[JID] = None, def set_node_handler(self, htype: str, jid: OptJid = None,
node: Optional[str] = None, node: Optional[str] = None,
handler: Optional[Callable] = None): handler: Optional[Callable] = None):
""" """
@ -142,14 +145,14 @@ class XEP_0030(BasePlugin):
Node handler hierarchy: Node handler hierarchy:
:: ====== ======= ============================
JID Node Level
JID | Node | Level ====== ======= ============================
--------------------- None None Global
None | None | Global Given None All nodes for the JID
Given | None | All nodes for the JID None Given Node on self.xmpp.boundjid
None | Given | Node on self.xmpp.boundjid Given Given A single node
Given | Given | A single node ====== ======= ============================
Handler types: Handler types:
@ -180,7 +183,7 @@ class XEP_0030(BasePlugin):
""" """
self.api.register(handler, htype, jid, node) self.api.register(handler, htype, jid, node)
def del_node_handler(self, htype, jid, node): def del_node_handler(self, htype: str, jid: OptJid, node: Optional[str]):
""" """
Remove a handler type for a JID and node combination. Remove a handler type for a JID and node combination.
@ -190,14 +193,14 @@ class XEP_0030(BasePlugin):
Node handler hierarchy: Node handler hierarchy:
:: ====== ======= ============================
JID Node Level
JID | Node | Level ====== ======= ============================
--------------------- None None Global
None | None | Global Given None All nodes for the JID
Given | None | All nodes for the JID None Given Node on self.xmpp.boundjid
None | Given | Node on self.xmpp.boundjid Given Given A single node
Given | Given | A single node ====== ======= ============================
:param htype: The type of handler to remove. :param htype: The type of handler to remove.
:param jid: The JID from which to remove the handler. :param jid: The JID from which to remove the handler.
@ -205,7 +208,8 @@ class XEP_0030(BasePlugin):
""" """
self.api.unregister(htype, jid, node) self.api.unregister(htype, jid, node)
def restore_defaults(self, jid=None, node=None, handlers=None): def restore_defaults(self, jid: OptJid = None, node: Optional[str] = None,
handlers: Optional[List[Callable]] = None):
""" """
Change all or some of a node's handlers to the default Change all or some of a node's handlers to the default
handlers. Useful for manually overriding the contents handlers. Useful for manually overriding the contents
@ -227,19 +231,15 @@ class XEP_0030(BasePlugin):
for op in handlers: for op in handlers:
self.api.restore_default(op, jid, node) self.api.restore_default(op, jid, node)
def supports(self, jid=None, node=None, feature=None, local=False, def supports(self, jid: OptJid = None, node: Optional[str] = None,
cached=True, ifrom=None) -> Future: feature: Optional[str] = None, local: bool = False,
cached: bool = True, ifrom: OptJid = None) -> Future:
""" """
Check if a JID supports a given feature. Check if a JID supports a given feature.
.. versionchanged:: 1.8.0 .. versionchanged:: 1.8.0
This function now returns a Future. This function now returns a Future.
Return values:
:param True: The feature is supported
:param False: The feature is not listed as supported
:param None: Nothing could be found due to a timeout
:param jid: Request info from this JID. :param jid: Request info from this JID.
:param node: The particular node to query. :param node: The particular node to query.
:param feature: The name of the feature to check. :param feature: The name of the feature to check.
@ -255,24 +255,27 @@ class XEP_0030(BasePlugin):
be useful. If set to false, then the cache will be useful. If set to false, then the cache will
be skipped, even if a result has already been be skipped, even if a result has already been
cached. Defaults to false. cached. Defaults to false.
:returns True: The feature is supported
:returns False: The feature is not listed as supported
:returns None: Nothing could be found due to a timeout
""" """
data = {'feature': feature, data = {'feature': feature,
'local': local, 'local': local,
'cached': cached} 'cached': cached}
return self.api['supports'](jid, node, ifrom, data) return self.api['supports'](jid, node, ifrom, data)
def has_identity(self, jid=None, node=None, category=None, itype=None, def has_identity(self, jid: OptJid = None, node: Optional[str] = None,
lang=None, local=False, cached=True, ifrom=None) -> Future: category: Optional[str] = None,
itype: Optional[str] = None, lang: Optional[str] = None,
local: bool = False, cached: bool = True,
ifrom: OptJid = None) -> Future:
""" """
Check if a JID provides a given identity. Check if a JID provides a given identity.
.. versionchanged:: 1.8.0 .. versionchanged:: 1.8.0
This function now returns a Future. This function now returns a Future.
Return values:
:param True: The identity is provided
:param False: The identity is not listed
:param None: Nothing could be found due to a timeout
:param jid: Request info from this JID. :param jid: Request info from this JID.
:param node: The particular node to query. :param node: The particular node to query.
@ -291,6 +294,10 @@ class XEP_0030(BasePlugin):
be useful. If set to false, then the cache will be useful. If set to false, then the cache will
be skipped, even if a result has already been be skipped, even if a result has already been
cached. Defaults to false. cached. Defaults to false.
:returns True: The identity is provided
:returns False: The identity is not listed
:returns None: Nothing could be found due to a timeout
""" """
data = {'category': category, data = {'category': category,
'itype': itype, 'itype': itype,
@ -301,7 +308,8 @@ class XEP_0030(BasePlugin):
async def get_info_from_domain(self, domain=None, timeout=None, async def get_info_from_domain(self, domain=None, timeout=None,
cached=True, callback=None): cached=True, callback=None):
"""Fetch disco#info of specified domain and one disco#items level below""" """Fetch disco#info of specified domain and one disco#items level below
"""
if domain is None: if domain is None:
domain = self.xmpp.boundjid.domain domain = self.xmpp.boundjid.domain
@ -322,7 +330,9 @@ class XEP_0030(BasePlugin):
) )
self.domain_infos[domain] = [ self.domain_infos[domain] = [
future.result() for future in info_futures if not future.exception()] future.result()
for future in info_futures if not future.exception()
]
results = self.domain_infos[domain] results = self.domain_infos[domain]
@ -330,8 +340,9 @@ class XEP_0030(BasePlugin):
callback(results) callback(results)
return results return results
async def get_info(self, jid=None, node=None, local=None, async def get_info(self, jid: OptJid = None, node: Optional[str] = None,
cached=None, **kwargs): local: Optional[bool] = None,
cached: Optional[bool] = None, **kwargs) -> Iq:
""" """
Retrieve the disco#info results from a given JID/node combination. Retrieve the disco#info results from a given JID/node combination.
@ -375,7 +386,7 @@ class XEP_0030(BasePlugin):
local = True local = True
if local: if local:
log.debug("Looking up local disco#info data " + \ log.debug("Looking up local disco#info data "
"for %s, node %s.", jid, node) "for %s, node %s.", jid, node)
info = await self.api['get_info']( info = await self.api['get_info'](
jid, node, kwargs.get('ifrom', None), jid, node, kwargs.get('ifrom', None),
@ -385,7 +396,7 @@ class XEP_0030(BasePlugin):
return self._wrap(kwargs.get('ifrom', None), jid, info) return self._wrap(kwargs.get('ifrom', None), jid, info)
if cached: if cached:
log.debug("Looking up cached disco#info data " + \ log.debug("Looking up cached disco#info data "
"for %s, node %s.", jid, node) "for %s, node %s.", jid, node)
info = await self.api['get_cached_info']( info = await self.api['get_cached_info'](
jid, node, jid, node,
@ -401,11 +412,10 @@ class XEP_0030(BasePlugin):
iq['to'] = jid iq['to'] = jid
iq['type'] = 'get' iq['type'] = 'get'
iq['disco_info']['node'] = node if node else '' iq['disco_info']['node'] = node if node else ''
return await iq.send(timeout=kwargs.get('timeout', None), return await iq.send(**kwargs)
callback=kwargs.get('callback', None),
timeout_callback=kwargs.get('timeout_callback', None))
def set_info(self, jid=None, node=None, info=None) -> Future: def set_info(self, jid: OptJid = None, node: Optional[str] = None,
info: Optional[Union[Iq, DiscoInfo]] = None) -> Future:
""" """
Set the disco#info data for a JID/node based on an existing Set the disco#info data for a JID/node based on an existing
disco#info stanza. disco#info stanza.
@ -418,7 +428,9 @@ class XEP_0030(BasePlugin):
info = info['disco_info'] info = info['disco_info']
return self.api['set_info'](jid, node, None, info) return self.api['set_info'](jid, node, None, info)
async def get_items(self, jid=None, node=None, local=False, **kwargs): async def get_items(self, jid: OptJid = None, node: Optional[str] = None,
local: bool = False, ifrom: OptJid = None,
**kwargs) -> Iq:
""" """
Retrieve the disco#items results from a given JID/node combination. Retrieve the disco#items results from a given JID/node combination.
@ -445,25 +457,22 @@ class XEP_0030(BasePlugin):
Otherwise the parameter is ignored. Otherwise the parameter is ignored.
""" """
if local or local is None and jid is None: if local or local is None and jid is None:
items = await self.api['get_items'](jid, node, items = await self.api['get_items'](jid, node, ifrom, kwargs)
kwargs.get('ifrom', None),
kwargs)
return self._wrap(kwargs.get('ifrom', None), jid, items) return self._wrap(kwargs.get('ifrom', None), jid, items)
iq = self.xmpp.Iq() iq = self.xmpp.Iq()
# Check dfrom parameter for backwards compatibility # Check dfrom parameter for backwards compatibility
iq['from'] = kwargs.get('ifrom', kwargs.get('dfrom', '')) iq['from'] = ifrom or kwargs.get('dfrom', '')
iq['to'] = jid iq['to'] = jid
iq['type'] = 'get' iq['type'] = 'get'
iq['disco_items']['node'] = node if node else '' iq['disco_items']['node'] = node if node else ''
if kwargs.get('iterator', False) and self.xmpp['xep_0059']: if kwargs.get('iterator', False) and self.xmpp['xep_0059']:
return self.xmpp['xep_0059'].iterate(iq, 'disco_items') return self.xmpp['xep_0059'].iterate(iq, 'disco_items')
else: else:
return await iq.send(timeout=kwargs.get('timeout', None), return await iq.send(**kwargs)
callback=kwargs.get('callback', None),
timeout_callback=kwargs.get('timeout_callback', None))
def set_items(self, jid=None, node=None, **kwargs) -> Future: def set_items(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Set or replace all items for the specified JID/node combination. Set or replace all items for the specified JID/node combination.
@ -480,7 +489,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['set_items'](jid, node, None, kwargs) return self.api['set_items'](jid, node, None, kwargs)
def del_items(self, jid=None, node=None, **kwargs) -> Future: def del_items(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove all items from the given JID/node combination. Remove all items from the given JID/node combination.
@ -493,7 +503,9 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_items'](jid, node, None, kwargs) return self.api['del_items'](jid, node, None, kwargs)
def add_item(self, jid='', name='', node=None, subnode='', ijid=None) -> Future: def add_item(self, jid: str = '', name: str = '',
node: Optional[str] = None, subnode: str = '',
ijid: OptJid = None) -> Future:
""" """
Add a new item element to the given JID/node combination. Add a new item element to the given JID/node combination.
@ -516,7 +528,8 @@ class XEP_0030(BasePlugin):
'inode': subnode} 'inode': subnode}
return self.api['add_item'](ijid, node, None, kwargs) return self.api['add_item'](ijid, node, None, kwargs)
def del_item(self, jid=None, node=None, **kwargs) -> Future: def del_item(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove a single item from the given JID/node combination. Remove a single item from the given JID/node combination.
@ -527,8 +540,9 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_item'](jid, node, None, kwargs) return self.api['del_item'](jid, node, None, kwargs)
def add_identity(self, category='', itype='', name='', def add_identity(self, category: str = '', itype: str = '', name: str = '',
node=None, jid=None, lang=None) -> Future: node: Optional[str] = None, jid: OptJid = None,
lang: Optional[str] = None) -> Future:
""" """
Add a new identity to the given JID/node combination. Add a new identity to the given JID/node combination.
@ -557,7 +571,7 @@ class XEP_0030(BasePlugin):
return self.api['add_identity'](jid, node, None, kwargs) return self.api['add_identity'](jid, node, None, kwargs)
def add_feature(self, feature: str, node: Optional[str] = None, def add_feature(self, feature: str, node: Optional[str] = None,
jid: Optional[JID] = None) -> Future: jid: OptJid = None) -> Future:
""" """
Add a feature to a JID/node combination. Add a feature to a JID/node combination.
@ -571,7 +585,7 @@ class XEP_0030(BasePlugin):
kwargs = {'feature': feature} kwargs = {'feature': feature}
return self.api['add_feature'](jid, node, None, kwargs) return self.api['add_feature'](jid, node, None, kwargs)
def del_identity(self, jid: Optional[JID] = None, def del_identity(self, jid: OptJid = None,
node: Optional[str] = None, **kwargs) -> Future: node: Optional[str] = None, **kwargs) -> Future:
""" """
Remove an identity from the given JID/node combination. Remove an identity from the given JID/node combination.
@ -588,7 +602,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_identity'](jid, node, None, kwargs) return self.api['del_identity'](jid, node, None, kwargs)
def del_feature(self, jid=None, node=None, **kwargs) -> Future: def del_feature(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove a feature from a given JID/node combination. Remove a feature from a given JID/node combination.
@ -601,7 +616,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_feature'](jid, node, None, kwargs) return self.api['del_feature'](jid, node, None, kwargs)
def set_identities(self, jid=None, node=None, **kwargs) -> Future: def set_identities(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Add or replace all identities for the given JID/node combination. Add or replace all identities for the given JID/node combination.
@ -618,7 +634,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['set_identities'](jid, node, None, kwargs) return self.api['set_identities'](jid, node, None, kwargs)
def del_identities(self, jid=None, node=None, **kwargs) -> Future: def del_identities(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove all identities for a JID/node combination. Remove all identities for a JID/node combination.
@ -635,7 +652,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_identities'](jid, node, None, kwargs) return self.api['del_identities'](jid, node, None, kwargs)
def set_features(self, jid=None, node=None, **kwargs) -> Future: def set_features(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Add or replace the set of supported features Add or replace the set of supported features
for a JID/node combination. for a JID/node combination.
@ -649,7 +667,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['set_features'](jid, node, None, kwargs) return self.api['set_features'](jid, node, None, kwargs)
def del_features(self, jid=None, node=None, **kwargs) -> Future: def del_features(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove all features from a JID/node combination. Remove all features from a JID/node combination.
@ -661,7 +680,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_features'](jid, node, None, kwargs) return self.api['del_features'](jid, node, None, kwargs)
async def _run_node_handler(self, htype, jid, node=None, ifrom=None, data=None): async def _run_node_handler(self, htype, jid, node: Optional[str] = None,
ifrom: OptJid = None, data=None):
""" """
Execute the most specific node handler for the given Execute the most specific node handler for the given
JID/node combination. JID/node combination.
@ -676,7 +696,7 @@ class XEP_0030(BasePlugin):
return await self.api[htype](jid, node, ifrom, data) return await self.api[htype](jid, node, ifrom, data)
async def _handle_disco_info(self, iq): async def _handle_disco_info(self, iq: Iq):
""" """
Process an incoming disco#info stanza. If it is a get Process an incoming disco#info stanza. If it is a get
request, find and return the appropriate identities request, find and return the appropriate identities
@ -686,7 +706,7 @@ class XEP_0030(BasePlugin):
:param iq: The incoming disco#items stanza. :param iq: The incoming disco#items stanza.
""" """
if iq['type'] == 'get': if iq['type'] == 'get':
log.debug("Received disco info query from " + \ log.debug("Received disco info query from "
"<%s> to <%s>.", iq['from'], iq['to']) "<%s> to <%s>.", iq['from'], iq['to'])
info = await self.api['get_info'](iq['to'], info = await self.api['get_info'](iq['to'],
iq['disco_info']['node'], iq['disco_info']['node'],
@ -704,11 +724,11 @@ class XEP_0030(BasePlugin):
iq.set_payload(info.xml) iq.set_payload(info.xml)
iq.send() iq.send()
elif iq['type'] == 'result': elif iq['type'] == 'result':
log.debug("Received disco info result from " + \ log.debug("Received disco info result from "
"<%s> to <%s>.", iq['from'], iq['to']) "<%s> to <%s>.", iq['from'], iq['to'])
if self.use_cache: if self.use_cache:
log.debug("Caching disco info result from " \ log.debug("Caching disco info result from "
"<%s> to <%s>.", iq['from'], iq['to']) "<%s> to <%s>.", iq['from'], iq['to'])
if self.xmpp.is_component: if self.xmpp.is_component:
ito = iq['to'].full ito = iq['to'].full
else: else:
@ -719,7 +739,7 @@ class XEP_0030(BasePlugin):
iq) iq)
self.xmpp.event('disco_info', iq) self.xmpp.event('disco_info', iq)
async def _handle_disco_items(self, iq): async def _handle_disco_items(self, iq: Iq):
""" """
Process an incoming disco#items stanza. If it is a get Process an incoming disco#items stanza. If it is a get
request, find and return the appropriate items. If it request, find and return the appropriate items. If it
@ -728,7 +748,7 @@ class XEP_0030(BasePlugin):
:param iq: The incoming disco#items stanza. :param iq: The incoming disco#items stanza.
""" """
if iq['type'] == 'get': if iq['type'] == 'get':
log.debug("Received disco items query from " + \ log.debug("Received disco items query from "
"<%s> to <%s>.", iq['from'], iq['to']) "<%s> to <%s>.", iq['from'], iq['to'])
items = await self.api['get_items'](iq['to'], items = await self.api['get_items'](iq['to'],
iq['disco_items']['node'], iq['disco_items']['node'],
@ -742,11 +762,11 @@ class XEP_0030(BasePlugin):
iq.set_payload(items.xml) iq.set_payload(items.xml)
iq.send() iq.send()
elif iq['type'] == 'result': elif iq['type'] == 'result':
log.debug("Received disco items result from " + \ log.debug("Received disco items result from "
"%s to %s.", iq['from'], iq['to']) "%s to %s.", iq['from'], iq['to'])
self.xmpp.event('disco_items', iq) self.xmpp.event('disco_items', iq)
def _fix_default_info(self, info): def _fix_default_info(self, info: DiscoInfo):
""" """
Disco#info results for a JID are required to include at least Disco#info results for a JID are required to include at least
one identity and feature. As a default, if no other identity is one identity and feature. As a default, if no other identity is
@ -762,20 +782,20 @@ class XEP_0030(BasePlugin):
if not info['node']: if not info['node']:
if not info['identities']: if not info['identities']:
if self.xmpp.is_component: if self.xmpp.is_component:
log.debug("No identity found for this entity. " + \ log.debug("No identity found for this entity. "
"Using default component identity.") "Using default component identity.")
info.add_identity('component', 'generic') info.add_identity('component', 'generic')
else: else:
log.debug("No identity found for this entity. " + \ log.debug("No identity found for this entity. "
"Using default client identity.") "Using default client identity.")
info.add_identity('client', 'bot') info.add_identity('client', 'bot')
if not info['features']: if not info['features']:
log.debug("No features found for this entity. " + \ log.debug("No features found for this entity. "
"Using default disco#info feature.") "Using default disco#info feature.")
info.add_feature(info.namespace) info.add_feature(info.namespace)
return result return result
def _wrap(self, ito, ifrom, payload, force=False): def _wrap(self, ito: OptJid, ifrom: OptJid, payload, force=False) -> Iq:
""" """
Ensure that results are wrapped in an Iq stanza Ensure that results are wrapped in an Iq stanza
if self.wrap_results has been set to True. if self.wrap_results has been set to True.

View File

@ -1,10 +1,19 @@
# Slixmpp: The Slick XMPP Library # Slixmpp: The Slick XMPP Library
# Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout # Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
from typing import (
Iterable,
List,
Optional,
Set,
Tuple,
Union,
)
from slixmpp.xmlstream import ElementBase, ET from slixmpp.xmlstream import ElementBase, ET
IdentityType = Tuple[str, str, Optional[str], Optional[str]]
class DiscoInfo(ElementBase): class DiscoInfo(ElementBase):
@ -18,21 +27,23 @@ class DiscoInfo(ElementBase):
category with a type of 'pc' to indicate the agent is a human operated category with a type of 'pc' to indicate the agent is a human operated
client with a GUI, or a category of 'gateway' with a type of 'aim' to client with a GUI, or a category of 'gateway' with a type of 'aim' to
identify the agent as a gateway for the legacy AIM protocol. See identify the agent as a gateway for the legacy AIM protocol. See
<http://xmpp.org/registrar/disco-categories.html> for a full list of `XMPP Registrar Disco Categories`_ for a full list of
accepted category and type combinations. accepted category and type combinations.
.. _XMPP Registrar Disco Categories: <http://xmpp.org/registrar/disco-categories.html>
Features are simply a set of the namespaces that identify the supported Features are simply a set of the namespaces that identify the supported
features. For example, a client that supports service discovery will features. For example, a client that supports service discovery will
include the feature 'http://jabber.org/protocol/disco#info'. include the feature ``http://jabber.org/protocol/disco#info``.
Since clients and components may operate in several roles at once, identity Since clients and components may operate in several roles at once, identity
and feature information may be grouped into "nodes". If one were to write and feature information may be grouped into "nodes". If one were to write
all of the identities and features used by a client, then node names would all of the identities and features used by a client, then node names would
be like section headings. be like section headings.
Example disco#info stanzas: Example disco#info stanza:
:: .. code-block:: xml
<iq type="get"> <iq type="get">
<query xmlns="http://jabber.org/protocol/disco#info" /> <query xmlns="http://jabber.org/protocol/disco#info" />
@ -46,30 +57,26 @@ class DiscoInfo(ElementBase):
<feature var="urn:xmpp:ping" /> <feature var="urn:xmpp:ping" />
</query> </query>
</iq> </iq>
Stanza Interface:
::
node -- The name of the node to either
query or return info from.
identities -- A set of 4-tuples, where each tuple contains
the category, type, xml:lang, and name
of an identity.
features -- A set of namespaces for features.
""" """
name = 'query' name = 'query'
namespace = 'http://jabber.org/protocol/disco#info' namespace = 'http://jabber.org/protocol/disco#info'
plugin_attrib = 'disco_info' plugin_attrib = 'disco_info'
#: Stanza interfaces:
#:
#: - ``node``: The name of the node to either query or return the info from
#: - ``identities``: A set of 4-tuples, where each tuple contains the
#: category, type, xml:lang and name of an identity
#: - ``features``: A set of namespaces for features
#:
interfaces = {'node', 'features', 'identities'} interfaces = {'node', 'features', 'identities'}
lang_interfaces = {'identities'} lang_interfaces = {'identities'}
# Cache identities and features # Cache identities and features
_identities = set() _identities: Set[Tuple[str, str, Optional[str]]]
_features = set() _features: Set[str]
def setup(self, xml=None): def setup(self, xml: Optional[ET.ElementTree] = None):
""" """
Populate the stanza object using an optional XML object. Populate the stanza object using an optional XML object.
@ -84,7 +91,9 @@ class DiscoInfo(ElementBase):
self._identities = {id[0:3] for id in self['identities']} self._identities = {id[0:3] for id in self['identities']}
self._features = self['features'] self._features = self['features']
def add_identity(self, category, itype, name=None, lang=None): def add_identity(self, category: str, itype: str,
name: Optional[str] = None, lang: Optional[str] = None
) -> bool:
""" """
Add a new identity element. Each identity must be unique Add a new identity element. Each identity must be unique
in terms of all four identity components. in terms of all four identity components.
@ -113,7 +122,8 @@ class DiscoInfo(ElementBase):
return True return True
return False return False
def del_identity(self, category, itype, name=None, lang=None): def del_identity(self, category: str, itype: str, name=None,
lang: Optional[str] = None) -> bool:
""" """
Remove a given identity. Remove a given identity.
@ -134,7 +144,8 @@ class DiscoInfo(ElementBase):
return True return True
return False return False
def get_identities(self, lang=None, dedupe=True): def get_identities(self, lang: Optional[str] = None, dedupe: bool = True
) -> Iterable[IdentityType]:
""" """
Return a set of all identities in tuple form as so: Return a set of all identities in tuple form as so:
@ -147,6 +158,7 @@ class DiscoInfo(ElementBase):
:param dedupe: If True, de-duplicate identities, otherwise :param dedupe: If True, de-duplicate identities, otherwise
return a list of all identities. return a list of all identities.
""" """
identities: Union[List[IdentityType], Set[IdentityType]]
if dedupe: if dedupe:
identities = set() identities = set()
else: else:
@ -158,13 +170,14 @@ class DiscoInfo(ElementBase):
id_xml.attrib['type'], id_xml.attrib['type'],
id_xml.attrib.get('{%s}lang' % self.xml_ns, None), id_xml.attrib.get('{%s}lang' % self.xml_ns, None),
id_xml.attrib.get('name', None)) id_xml.attrib.get('name', None))
if dedupe: if isinstance(identities, set):
identities.add(id) identities.add(id)
else: else:
identities.append(id) identities.append(id)
return identities return identities
def set_identities(self, identities, lang=None): def set_identities(self, identities: Iterable[IdentityType],
lang: Optional[str] = None):
""" """
Add or replace all identities. The identities must be a in set Add or replace all identities. The identities must be a in set
where each identity is a tuple of the form: where each identity is a tuple of the form:
@ -187,7 +200,7 @@ class DiscoInfo(ElementBase):
category, itype, lang, name = identity category, itype, lang, name = identity
self.add_identity(category, itype, name, lang) self.add_identity(category, itype, name, lang)
def del_identities(self, lang=None): def del_identities(self, lang: Optional[str] = None):
""" """
Remove all identities. If a language was specified, only Remove all identities. If a language was specified, only
remove identities using that language. remove identities using that language.
@ -204,7 +217,7 @@ class DiscoInfo(ElementBase):
id_xml.attrib.get('{%s}lang' % self.xml_ns, None))) id_xml.attrib.get('{%s}lang' % self.xml_ns, None)))
self.xml.remove(id_xml) self.xml.remove(id_xml)
def add_feature(self, feature): def add_feature(self, feature: str) -> bool:
""" """
Add a single, new feature. Add a single, new feature.
@ -218,7 +231,7 @@ class DiscoInfo(ElementBase):
return True return True
return False return False
def del_feature(self, feature): def del_feature(self, feature: str) -> bool:
""" """
Remove a single feature. Remove a single feature.
@ -232,20 +245,21 @@ class DiscoInfo(ElementBase):
return True return True
return False return False
def get_features(self, dedupe=True): def get_features(self, dedupe: bool = True) -> Iterable[str]:
"""Return the set of all supported features.""" """Return the set of all supported features."""
features: Union[List[str], Set[str]]
if dedupe: if dedupe:
features = set() features = set()
else: else:
features = [] features = []
for feature_xml in self.xml.findall('{%s}feature' % self.namespace): for feature_xml in self.xml.findall('{%s}feature' % self.namespace):
if dedupe: if isinstance(features, set):
features.add(feature_xml.attrib['var']) features.add(feature_xml.attrib['var'])
else: else:
features.append(feature_xml.attrib['var']) features.append(feature_xml.attrib['var'])
return features return features
def set_features(self, features): def set_features(self, features: Iterable[str]):
""" """
Add or replace the set of supported features. Add or replace the set of supported features.

View File

@ -1,9 +1,34 @@
# Slixmpp: The Slick XMPP Library # Slixmpp: The Slick XMPP Library
# Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout # Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
from slixmpp.xmlstream import ElementBase, register_stanza_plugin from typing import (
Iterable,
Optional,
Set,
Tuple,
)
from slixmpp import JID
from slixmpp.xmlstream import (
ElementBase,
ET,
register_stanza_plugin,
)
class DiscoItem(ElementBase):
name = 'item'
namespace = 'http://jabber.org/protocol/disco#items'
plugin_attrib = name
interfaces = {'jid', 'node', 'name'}
def get_node(self) -> Optional[str]:
"""Return the item's node name or ``None``."""
return self._get_attr('node', None)
def get_name(self) -> Optional[str]:
"""Return the item's human readable name, or ``None``."""
return self._get_attr('name', None)
class DiscoItems(ElementBase): class DiscoItems(ElementBase):
@ -11,7 +36,7 @@ class DiscoItems(ElementBase):
""" """
Example disco#items stanzas: Example disco#items stanzas:
:: .. code-block:: xml
<iq type="get"> <iq type="get">
<query xmlns="http://jabber.org/protocol/disco#items" /> <query xmlns="http://jabber.org/protocol/disco#items" />
@ -28,25 +53,24 @@ class DiscoItems(ElementBase):
</query> </query>
</iq> </iq>
Stanza Interface:
::
node -- The name of the node to either
query or return info from.
items -- A list of 3-tuples, where each tuple contains
the JID, node, and name of an item.
""" """
name = 'query' name = 'query'
namespace = 'http://jabber.org/protocol/disco#items' namespace = 'http://jabber.org/protocol/disco#items'
plugin_attrib = 'disco_items' plugin_attrib = 'disco_items'
#: Stanza Interface:
#:
#: - ``node``: The name of the node to either
#: query or return info from.
#: - ``items``: A list of 3-tuples, where each tuple contains
#: the JID, node, and name of an item.
#:
interfaces = {'node', 'items'} interfaces = {'node', 'items'}
# Cache items # Cache items
_items = set() _items: Set[Tuple[JID, Optional[str]]]
def setup(self, xml=None): def setup(self, xml: Optional[ET.ElementTree] = None):
""" """
Populate the stanza object using an optional XML object. Populate the stanza object using an optional XML object.
@ -59,7 +83,8 @@ class DiscoItems(ElementBase):
ElementBase.setup(self, xml) ElementBase.setup(self, xml)
self._items = {item[0:2] for item in self['items']} self._items = {item[0:2] for item in self['items']}
def add_item(self, jid, node=None, name=None): def add_item(self, jid: JID, node: Optional[str] = None,
name: Optional[str] = None):
""" """
Add a new item element. Each item is required to have a Add a new item element. Each item is required to have a
JID, but may also specify a node value to reference JID, but may also specify a node value to reference
@ -80,7 +105,7 @@ class DiscoItems(ElementBase):
return True return True
return False return False
def del_item(self, jid, node=None): def del_item(self, jid: JID, node: Optional[str] = None) -> bool:
""" """
Remove a single item. Remove a single item.
@ -96,7 +121,7 @@ class DiscoItems(ElementBase):
return True return True
return False return False
def get_items(self): def get_items(self) -> Set[DiscoItem]:
"""Return all items.""" """Return all items."""
items = set() items = set()
for item in self['substanzas']: for item in self['substanzas']:
@ -104,7 +129,7 @@ class DiscoItems(ElementBase):
items.add((item['jid'], item['node'], item['name'])) items.add((item['jid'], item['node'], item['name']))
return items return items
def set_items(self, items): def set_items(self, items: Iterable[DiscoItem]):
""" """
Set or replace all items. The given items must be in a Set or replace all items. The given items must be in a
list or set where each item is a tuple of the form: list or set where each item is a tuple of the form:
@ -127,19 +152,4 @@ class DiscoItems(ElementBase):
self.iterables.remove(item) self.iterables.remove(item)
class DiscoItem(ElementBase):
name = 'item'
namespace = 'http://jabber.org/protocol/disco#items'
plugin_attrib = name
interfaces = {'jid', 'node', 'name'}
def get_node(self):
"""Return the item's node name or ``None``."""
return self._get_attr('node', None)
def get_name(self):
"""Return the item's human readable name, or ``None``."""
return self._get_attr('name', None)
register_stanza_plugin(DiscoItems, DiscoItem, iterable=True) register_stanza_plugin(DiscoItems, DiscoItem, iterable=True)

View File

@ -1,20 +1,46 @@
# Slixmpp: The Slick XMPP Library # Slixmpp: The Slick XMPP Library
# Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout # Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
from __future__ import annotations
import logging import logging
from slixmpp import Iq from typing import (
Optional,
Any,
Dict,
Tuple,
TYPE_CHECKING,
Union,
Collection,
)
from slixmpp import BaseXMPP, JID
from slixmpp.stanza import Iq
from slixmpp.types import TypedDict, OptJidStr, OptJid
from slixmpp.exceptions import XMPPError, IqError, IqTimeout from slixmpp.exceptions import XMPPError, IqError, IqTimeout
from slixmpp.xmlstream import JID
from slixmpp.plugins.xep_0030 import DiscoInfo, DiscoItems from slixmpp.plugins.xep_0030 import DiscoInfo, DiscoItems
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
if TYPE_CHECKING:
from slixmpp.plugins.xep_0030 import XEP_0030
class StaticDisco(object):
class NodeType(TypedDict):
info: DiscoInfo
items: DiscoItems
NodesType = Dict[
Tuple[str, str, str],
NodeType
]
class StaticDisco:
""" """
While components will likely require fully dynamic handling While components will likely require fully dynamic handling
@ -25,75 +51,87 @@ class StaticDisco(object):
StaticDisco provides a set of node handlers that will store StaticDisco provides a set of node handlers that will store
static sets of disco info and items in memory. static sets of disco info and items in memory.
Attributes: :var nodes: A dictionary mapping (JID, node) tuples to a dict
nodes -- A dictionary mapping (JID, node) tuples to a dict containing a disco#info and a disco#items stanza.
containing a disco#info and a disco#items stanza. :var xmpp: The main Slixmpp object.
xmpp -- The main Slixmpp object. :var disco: The instance of the XEP-0030 plugin.
""" """
def __init__(self, xmpp, disco): def __init__(self, xmpp: 'BaseXMPP', disco: 'XEP_0030'):
""" """
Create a static disco interface. Sets of disco#info and Create a static disco interface. Sets of disco#info and
disco#items are maintained for every given JID and node disco#items are maintained for every given JID and node
combination. These stanzas are used to store disco combination. These stanzas are used to store disco
information in memory without any additional processing. information in memory without any additional processing.
Arguments: :param xmpp: The main Slixmpp object.
xmpp -- The main Slixmpp object. :param disco: The XEP-0030 plugin.
""" """
self.nodes = {} self.nodes: NodesType = {}
self.xmpp = xmpp self.xmpp: BaseXMPP = xmpp
self.disco = disco self.disco: 'XEP_0030' = disco
def add_node(self, jid=None, node=None, ifrom=None): def add_node(self, jid: OptJidStr = None, node: Optional[str] = None,
""" ifrom: OptJidStr = None) -> NodeType:
Create a new set of stanzas for the provided
JID and node combination.
Arguments:
jid -- The JID that will own the new stanzas.
node -- The node that will own the new stanzas.
"""
if jid is None: if jid is None:
jid = self.xmpp.boundjid.full node_jid = self.xmpp.boundjid.full
elif isinstance(jid, JID):
node_jid = jid.full
if ifrom is None:
node_ifrom = ''
elif isinstance(ifrom, JID):
node_ifrom = ifrom.full
else:
node_ifrom = ifrom
if node is None:
node = ''
if (node_jid, node, node_ifrom) not in self.nodes:
node_dict: NodeType = {
'info': DiscoInfo(),
'items': DiscoItems(),
}
node_dict['info']['node'] = node
node_dict['items']['node'] = node
self.nodes[(node_jid, node, node_ifrom)] = node_dict
return self.nodes[(node_jid, node, node_ifrom)]
def get_node(self, jid: OptJidStr = None, node: Optional[str] = None,
ifrom: OptJidStr = None) -> NodeType:
if jid is None:
node_jid = self.xmpp.boundjid.full
elif isinstance(jid, JID):
node_jid = jid.full
else:
node_jid = jid
if node is None: if node is None:
node = '' node = ''
if ifrom is None: if ifrom is None:
ifrom = '' node_ifrom = ''
if isinstance(ifrom, JID): elif isinstance(ifrom, JID):
ifrom = ifrom.full node_ifrom = ifrom.full
if (jid, node, ifrom) not in self.nodes: else:
new_node = {'info': DiscoInfo(), 'items': DiscoItems()} node_ifrom = ifrom
new_node['info']['node'] = node if (node_jid, node, node_ifrom) not in self.nodes:
new_node['items']['node'] = node self.add_node(node_jid, node, node_ifrom)
self.nodes[(jid, node, ifrom)] = new_node return self.nodes[(node_jid, node, node_ifrom)]
return self.nodes[(jid, node, ifrom)]
def get_node(self, jid=None, node=None, ifrom=None): def node_exists(self, jid: OptJidStr = None, node: Optional[str] = None,
ifrom: OptJidStr = None) -> bool:
if jid is None: if jid is None:
jid = self.xmpp.boundjid.full node_jid = self.xmpp.boundjid.full
elif isinstance(jid, JID):
node_jid = jid.full
else:
node_jid = jid
if node is None: if node is None:
node = '' node = ''
if ifrom is None: if ifrom is None:
ifrom = '' node_ifrom = ''
if isinstance(ifrom, JID): elif isinstance(ifrom, JID):
ifrom = ifrom.full node_ifrom = ifrom.full
if (jid, node, ifrom) not in self.nodes: else:
self.add_node(jid, node, ifrom) node_ifrom = ifrom
return self.nodes[(jid, node, ifrom)] return (node_jid, node, node_ifrom) in self.nodes
def node_exists(self, jid=None, node=None, ifrom=None):
if jid is None:
jid = self.xmpp.boundjid.full
if node is None:
node = ''
if ifrom is None:
ifrom = ''
if isinstance(ifrom, JID):
ifrom = ifrom.full
if (jid, node, ifrom) not in self.nodes:
return False
return True
# ================================================================= # =================================================================
# Node Handlers # Node Handlers
@ -109,18 +147,20 @@ class StaticDisco(object):
# the requester's JID, except for cached results. To do that, # the requester's JID, except for cached results. To do that,
# register a custom node handler. # register a custom node handler.
async def supports(self, jid, node, ifrom, data): async def supports(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any) -> Optional[bool]:
""" """
Check if a JID supports a given feature. Check if a JID supports a given feature.
The data parameter may provide: The data parameter may provide:
feature -- The feature to check for support.
local -- If true, then the query is for a JID/node :param feature: The feature to check for support.
:param local: If true, then the query is for a JID/node
combination handled by this Slixmpp instance and combination handled by this Slixmpp instance and
no stanzas need to be sent. no stanzas need to be sent.
Otherwise, a disco stanza must be sent to the Otherwise, a disco stanza must be sent to the
remove JID to retrieve the info. remove JID to retrieve the info.
cached -- If true, then look for the disco info data from :param cached: If true, then look for the disco info data from
the local cache system. If no results are found, the local cache system. If no results are found,
send the query as usual. The self.use_cache send the query as usual. The self.use_cache
setting must be set to true for this option to setting must be set to true for this option to
@ -147,26 +187,29 @@ class StaticDisco(object):
except IqTimeout: except IqTimeout:
return None return None
async def has_identity(self, jid, node, ifrom, data): async def has_identity(self, jid: OptJid, node: Optional[str],
ifrom: OptJid, data: Dict[str, Any]
) -> Optional[bool]:
""" """
Check if a JID has a given identity. Check if a JID has a given identity.
The data parameter may provide: The data parameter may provide:
category -- The category of the identity to check.
itype -- The type of the identity to check. :param category: The category of the identity to check.
lang -- The language of the identity to check. :param itype: The type of the identity to check.
local -- If true, then the query is for a JID/node :param lang: The language of the identity to check.
combination handled by this Slixmpp instance and :param local: If true, then the query is for a JID/node
no stanzas need to be sent. combination handled by this Slixmpp instance and
Otherwise, a disco stanza must be sent to the no stanzas need to be sent.
remove JID to retrieve the info. Otherwise, a disco stanza must be sent to the
cached -- If true, then look for the disco info data from remove JID to retrieve the info.
the local cache system. If no results are found, :param cached: If true, then look for the disco info data from
send the query as usual. The self.use_cache the local cache system. If no results are found,
setting must be set to true for this option to send the query as usual. The self.use_cache
be useful. If set to false, then the cache will setting must be set to true for this option to
be skipped, even if a result has already been be useful. If set to false, then the cache will
cached. Defaults to false. be skipped, even if a result has already been
cached. Defaults to false.
""" """
identity = (data.get('category', None), identity = (data.get('category', None),
data.get('itype', None), data.get('itype', None),
@ -179,14 +222,17 @@ class StaticDisco(object):
info = await self.disco.get_info(jid=jid, node=node, info = await self.disco.get_info(jid=jid, node=node,
ifrom=ifrom, **data) ifrom=ifrom, **data)
info = self.disco._wrap(ifrom, jid, info, True) info = self.disco._wrap(ifrom, jid, info, True)
trunc = lambda i: (i[0], i[1], i[2])
def trunc(i):
return (i[0], i[1], i[2])
return identity in map(trunc, info['disco_info']['identities']) return identity in map(trunc, info['disco_info']['identities'])
except IqError: except IqError:
return False return False
except IqTimeout: except IqTimeout:
return None return None
def get_info(self, jid, node, ifrom, data): def get_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any) -> Optional[DiscoInfo]:
""" """
Return the stored info data for the requested JID/node combination. Return the stored info data for the requested JID/node combination.
@ -200,7 +246,8 @@ class StaticDisco(object):
else: else:
return self.get_node(jid, node)['info'] return self.get_node(jid, node)['info']
def set_info(self, jid, node, ifrom, data): def set_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: DiscoInfo):
""" """
Set the entire info stanza for a JID/node at once. Set the entire info stanza for a JID/node at once.
@ -209,7 +256,8 @@ class StaticDisco(object):
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info'] = data new_node['info'] = data
def del_info(self, jid, node, ifrom, data): def del_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any):
""" """
Reset the info stanza for a given JID/node combination. Reset the info stanza for a given JID/node combination.
@ -218,7 +266,8 @@ class StaticDisco(object):
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['info'] = DiscoInfo() self.get_node(jid, node)['info'] = DiscoInfo()
def get_items(self, jid, node, ifrom, data): def get_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any) -> Optional[DiscoItems]:
""" """
Return the stored items data for the requested JID/node combination. Return the stored items data for the requested JID/node combination.
@ -232,7 +281,8 @@ class StaticDisco(object):
else: else:
return self.get_node(jid, node)['items'] return self.get_node(jid, node)['items']
def set_items(self, jid, node, ifrom, data): def set_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Collection[Tuple]]):
""" """
Replace the stored items data for a JID/node combination. Replace the stored items data for a JID/node combination.
@ -243,7 +293,8 @@ class StaticDisco(object):
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['items']['items'] = items new_node['items']['items'] = items
def del_items(self, jid, node, ifrom, data): def del_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any):
""" """
Reset the items stanza for a given JID/node combination. Reset the items stanza for a given JID/node combination.
@ -252,15 +303,17 @@ class StaticDisco(object):
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['items'] = DiscoItems() self.get_node(jid, node)['items'] = DiscoItems()
def add_identity(self, jid, node, ifrom, data): def add_identity(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Optional[str]]):
""" """
Add a new identity to the JID/node combination. Add a new identity to the JID/node combination.
The data parameter may provide: The data parameter may provide:
category -- The general category to which the agent belongs.
itype -- A more specific designation with the category. :param category: The general category to which the agent belongs.
name -- Optional human readable name for this identity. :param itype: A more specific designation with the category.
lang -- Optional standard xml:lang value. :param name: Optional human readable name for this identity.
:param lang: Optional standard xml:lang value.
""" """
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info'].add_identity( new_node['info'].add_identity(
@ -269,27 +322,31 @@ class StaticDisco(object):
data.get('name', None), data.get('name', None),
data.get('lang', None)) data.get('lang', None))
def set_identities(self, jid, node, ifrom, data): def set_identities(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Collection[str]]):
""" """
Add or replace all identities for a JID/node combination. Add or replace all identities for a JID/node combination.
The data parameter should include: The data parameter should include:
identities -- A list of identities in tuple form:
(category, type, name, lang) :param identities: A list of identities in tuple form:
(category, type, name, lang)
""" """
identities = data.get('identities', set()) identities = data.get('identities', set())
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info']['identities'] = identities new_node['info']['identities'] = identities
def del_identity(self, jid, node, ifrom, data): def del_identity(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Optional[str]]):
""" """
Remove an identity from a JID/node combination. Remove an identity from a JID/node combination.
The data parameter may provide: The data parameter may provide:
category -- The general category to which the agent belonged.
itype -- A more specific designation with the category. :param category: The general category to which the agent belonged.
name -- Optional human readable name for this identity. :param itype: A more specific designation with the category.
lang -- Optional, standard xml:lang value. :param name: Optional human readable name for this identity.
:param lang: Optional, standard xml:lang value.
""" """
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['info'].del_identity( self.get_node(jid, node)['info'].del_identity(
@ -298,7 +355,8 @@ class StaticDisco(object):
data.get('name', None), data.get('name', None),
data.get('lang', None)) data.get('lang', None))
def del_identities(self, jid, node, ifrom, data): def del_identities(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any):
""" """
Remove all identities from a JID/node combination. Remove all identities from a JID/node combination.
@ -307,40 +365,47 @@ class StaticDisco(object):
if self.node_exists(jid, node): if self.node_exists(jid, node):
del self.get_node(jid, node)['info']['identities'] del self.get_node(jid, node)['info']['identities']
def add_feature(self, jid, node, ifrom, data): def add_feature(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, str]):
""" """
Add a feature to a JID/node combination. Add a feature to a JID/node combination.
The data parameter should include: The data parameter should include:
feature -- The namespace of the supported feature.
:param feature: The namespace of the supported feature.
""" """
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info'].add_feature( new_node['info'].add_feature(
data.get('feature', '')) data.get('feature', ''))
def set_features(self, jid, node, ifrom, data): def set_features(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Collection[str]]):
""" """
Add or replace all features for a JID/node combination. Add or replace all features for a JID/node combination.
The data parameter should include: The data parameter should include:
features -- The new set of supported features.
:param features: The new set of supported features.
""" """
features = data.get('features', set()) features = data.get('features', set())
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info']['features'] = features new_node['info']['features'] = features
def del_feature(self, jid, node, ifrom, data): def del_feature(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, str]):
""" """
Remove a feature from a JID/node combination. Remove a feature from a JID/node combination.
The data parameter should include: The data parameter should include:
feature -- The namespace of the removed feature.
:param feature: The namespace of the removed feature.
""" """
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['info'].del_feature( self.get_node(jid, node)['info'].del_feature(
data.get('feature', '')) data.get('feature', ''))
def del_features(self, jid, node, ifrom, data): def del_features(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any):
""" """
Remove all features from a JID/node combination. Remove all features from a JID/node combination.
@ -350,15 +415,17 @@ class StaticDisco(object):
return return
del self.get_node(jid, node)['info']['features'] del self.get_node(jid, node)['info']['features']
def add_item(self, jid, node, ifrom, data): def add_item(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, str]):
""" """
Add an item to a JID/node combination. Add an item to a JID/node combination.
The data parameter may include: The data parameter may include:
ijid -- The JID for the item.
inode -- Optional additional information to reference :param ijid: The JID for the item.
non-addressable items. :param inode: Optional additional information to reference
name -- Optional human readable name for the item. non-addressable items.
:param name: Optional human readable name for the item.
""" """
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['items'].add_item( new_node['items'].add_item(
@ -366,20 +433,23 @@ class StaticDisco(object):
node=data.get('inode', ''), node=data.get('inode', ''),
name=data.get('name', '')) name=data.get('name', ''))
def del_item(self, jid, node, ifrom, data): def del_item(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, str]):
""" """
Remove an item from a JID/node combination. Remove an item from a JID/node combination.
The data parameter may include: The data parameter may include:
ijid -- JID of the item to remove.
inode -- Optional extra identifying information. :param ijid: JID of the item to remove.
:param inode: Optional extra identifying information.
""" """
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['items'].del_item( self.get_node(jid, node)['items'].del_item(
data.get('ijid', ''), data.get('ijid', ''),
node=data.get('inode', None)) node=data.get('inode', None))
def cache_info(self, jid, node, ifrom, data): def cache_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Union[Iq, DiscoInfo]):
""" """
Cache disco information for an external JID. Cache disco information for an external JID.
@ -388,12 +458,15 @@ class StaticDisco(object):
the disco#info substanza itself. the disco#info substanza itself.
""" """
if isinstance(data, Iq): if isinstance(data, Iq):
data = data['disco_info'] info = data['disco_info']
else:
info = data
new_node = self.add_node(jid, node, ifrom) new_node = self.add_node(jid, node, ifrom)
new_node['info'] = data new_node['info'] = info
def get_cached_info(self, jid, node, ifrom, data): def get_cached_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any) -> Optional[DiscoInfo]:
""" """
Retrieve cached disco info data. Retrieve cached disco info data.
@ -401,5 +474,4 @@ class StaticDisco(object):
""" """
if not self.node_exists(jid, node, ifrom): if not self.node_exists(jid, node, ifrom):
return None return None
else: return self.get_node(jid, node, ifrom)['info']
return self.get_node(jid, node, ifrom)['info']

View File

@ -7,7 +7,10 @@
This file contains boilerplate to define types relevant to slixmpp. This file contains boilerplate to define types relevant to slixmpp.
""" """
from typing import Optional from typing import (
Optional,
Union,
)
try: try:
from typing import ( from typing import (
@ -31,7 +34,6 @@ PresenceShows = Literal[
'away', 'chat', 'dnd', 'xa', 'away', 'chat', 'dnd', 'xa',
] ]
MessageTypes = Literal[ MessageTypes = Literal[
'chat', 'error', 'groupchat', 'chat', 'error', 'groupchat',
'headline', 'normal', 'headline', 'normal',
@ -70,3 +72,7 @@ class MucRoomItem(TypedDict, total=False):
MucRoomItemKeys = Literal[ MucRoomItemKeys = Literal[
'jid', 'role', 'affiliation', 'show', 'status', 'alt_nick', 'jid', 'role', 'affiliation', 'show', 'status', 'alt_nick',
] ]
OptJid = Optional[JID]
JidStr = Union[str, JID]
OptJidStr = Optional[Union[str, JID]]