XEP-0030 plugin: Fix PEP8, types

This commit is contained in:
mathieui 2021-03-01 20:54:53 +01:00
parent 2ff72d88fd
commit 10611525a0

View File

@ -7,14 +7,20 @@ import asyncio
import logging import logging
from asyncio import Future from asyncio import Future
from typing import Optional, Callable from typing import (
Optional,
Callable,
List,
Union,
)
from slixmpp import Iq from slixmpp import JID
from slixmpp import future_wrapper from slixmpp.stanza import Iq
from slixmpp.types import OptJid
from slixmpp.plugins import BasePlugin from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream.handler import Callback, CoroutineCallback from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.matcher import StanzaPath from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin, JID from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins.xep_0030 import stanza, DiscoInfo, DiscoItems from slixmpp.plugins.xep_0030 import stanza, DiscoInfo, DiscoItems
from slixmpp.plugins.xep_0030 import StaticDisco from slixmpp.plugins.xep_0030 import StaticDisco
@ -42,14 +48,14 @@ class XEP_0030(BasePlugin):
Node handler hierarchy: Node handler hierarchy:
:: ====== ======= ============================
JID Node Level
JID | Node | Level ====== ======= ============================
--------------------- None None Global
None | None | Global Given None All nodes for the JID
Given | None | All nodes for the JID None Given Node on self.xmpp.boundjid
None | Given | Node on self.xmpp.boundjid Given Given A single node
Given | Given | A single node ====== ======= ============================
Stream Handlers: Stream Handlers:
@ -62,19 +68,13 @@ class XEP_0030(BasePlugin):
Events: Events:
:: - :term:`disco_info` -- Received a disco#info Iq query result.
- :term:`disco_items` -- Received a disco#items Iq query result.
disco_info -- Received a disco#info Iq query result.
disco_items -- Received a disco#items Iq query result.
disco_info_query -- Received a disco#info Iq query request.
disco_items_query -- Received a disco#items Iq query request.
Attributes: Attributes:
:var static: Object containing the default set of :var static: Object containing the default set of
static node handlers. static node handlers.
:var default_handlers: A dictionary mapping operations to the default
global handler (by default, the static handlers).
""" """
name = 'xep_0030' name = 'xep_0030'
@ -85,20 +85,23 @@ class XEP_0030(BasePlugin):
'use_cache': True, 'use_cache': True,
'wrap_results': False 'wrap_results': False
} }
static: StaticDisco
def plugin_init(self): def plugin_init(self):
""" """
Start the XEP-0030 plugin. Start the XEP-0030 plugin.
""" """
self.xmpp.register_handler( self.xmpp.register_handler(CoroutineCallback(
CoroutineCallback('Disco Info', 'Disco Info',
StanzaPath('iq/disco_info'), StanzaPath('iq/disco_info'),
self._handle_disco_info)) self._handle_disco_info
))
self.xmpp.register_handler( self.xmpp.register_handler(CoroutineCallback(
CoroutineCallback('Disco Items', 'Disco Items',
StanzaPath('iq/disco_items'), StanzaPath('iq/disco_items'),
self._handle_disco_items)) self._handle_disco_items
))
register_stanza_plugin(Iq, DiscoInfo) register_stanza_plugin(Iq, DiscoInfo)
register_stanza_plugin(Iq, DiscoItems) register_stanza_plugin(Iq, DiscoItems)
@ -127,7 +130,7 @@ class XEP_0030(BasePlugin):
self.api.register(default_handler, op) self.api.register(default_handler, op)
self.api.register_default(default_handler, op) self.api.register_default(default_handler, op)
def set_node_handler(self, htype: str, jid: Optional[JID] = None, def set_node_handler(self, htype: str, jid: OptJid = None,
node: Optional[str] = None, node: Optional[str] = None,
handler: Optional[Callable] = None): handler: Optional[Callable] = None):
""" """
@ -142,14 +145,14 @@ class XEP_0030(BasePlugin):
Node handler hierarchy: Node handler hierarchy:
:: ====== ======= ============================
JID Node Level
JID | Node | Level ====== ======= ============================
--------------------- None None Global
None | None | Global Given None All nodes for the JID
Given | None | All nodes for the JID None Given Node on self.xmpp.boundjid
None | Given | Node on self.xmpp.boundjid Given Given A single node
Given | Given | A single node ====== ======= ============================
Handler types: Handler types:
@ -180,7 +183,7 @@ class XEP_0030(BasePlugin):
""" """
self.api.register(handler, htype, jid, node) self.api.register(handler, htype, jid, node)
def del_node_handler(self, htype, jid, node): def del_node_handler(self, htype: str, jid: OptJid, node: Optional[str]):
""" """
Remove a handler type for a JID and node combination. Remove a handler type for a JID and node combination.
@ -190,14 +193,14 @@ class XEP_0030(BasePlugin):
Node handler hierarchy: Node handler hierarchy:
:: ====== ======= ============================
JID Node Level
JID | Node | Level ====== ======= ============================
--------------------- None None Global
None | None | Global Given None All nodes for the JID
Given | None | All nodes for the JID None Given Node on self.xmpp.boundjid
None | Given | Node on self.xmpp.boundjid Given Given A single node
Given | Given | A single node ====== ======= ============================
:param htype: The type of handler to remove. :param htype: The type of handler to remove.
:param jid: The JID from which to remove the handler. :param jid: The JID from which to remove the handler.
@ -205,7 +208,8 @@ class XEP_0030(BasePlugin):
""" """
self.api.unregister(htype, jid, node) self.api.unregister(htype, jid, node)
def restore_defaults(self, jid=None, node=None, handlers=None): def restore_defaults(self, jid: OptJid = None, node: Optional[str] = None,
handlers: Optional[List[Callable]] = None):
""" """
Change all or some of a node's handlers to the default Change all or some of a node's handlers to the default
handlers. Useful for manually overriding the contents handlers. Useful for manually overriding the contents
@ -227,19 +231,15 @@ class XEP_0030(BasePlugin):
for op in handlers: for op in handlers:
self.api.restore_default(op, jid, node) self.api.restore_default(op, jid, node)
def supports(self, jid=None, node=None, feature=None, local=False, def supports(self, jid: OptJid = None, node: Optional[str] = None,
cached=True, ifrom=None) -> Future: feature: Optional[str] = None, local: bool = False,
cached: bool = True, ifrom: OptJid = None) -> Future:
""" """
Check if a JID supports a given feature. Check if a JID supports a given feature.
.. versionchanged:: 1.8.0 .. versionchanged:: 1.8.0
This function now returns a Future. This function now returns a Future.
Return values:
:param True: The feature is supported
:param False: The feature is not listed as supported
:param None: Nothing could be found due to a timeout
:param jid: Request info from this JID. :param jid: Request info from this JID.
:param node: The particular node to query. :param node: The particular node to query.
:param feature: The name of the feature to check. :param feature: The name of the feature to check.
@ -255,24 +255,27 @@ class XEP_0030(BasePlugin):
be useful. If set to false, then the cache will be useful. If set to false, then the cache will
be skipped, even if a result has already been be skipped, even if a result has already been
cached. Defaults to false. cached. Defaults to false.
:returns True: The feature is supported
:returns False: The feature is not listed as supported
:returns None: Nothing could be found due to a timeout
""" """
data = {'feature': feature, data = {'feature': feature,
'local': local, 'local': local,
'cached': cached} 'cached': cached}
return self.api['supports'](jid, node, ifrom, data) return self.api['supports'](jid, node, ifrom, data)
def has_identity(self, jid=None, node=None, category=None, itype=None, def has_identity(self, jid: OptJid = None, node: Optional[str] = None,
lang=None, local=False, cached=True, ifrom=None) -> Future: category: Optional[str] = None,
itype: Optional[str] = None, lang: Optional[str] = None,
local: bool = False, cached: bool = True,
ifrom: OptJid = None) -> Future:
""" """
Check if a JID provides a given identity. Check if a JID provides a given identity.
.. versionchanged:: 1.8.0 .. versionchanged:: 1.8.0
This function now returns a Future. This function now returns a Future.
Return values:
:param True: The identity is provided
:param False: The identity is not listed
:param None: Nothing could be found due to a timeout
:param jid: Request info from this JID. :param jid: Request info from this JID.
:param node: The particular node to query. :param node: The particular node to query.
@ -291,6 +294,10 @@ class XEP_0030(BasePlugin):
be useful. If set to false, then the cache will be useful. If set to false, then the cache will
be skipped, even if a result has already been be skipped, even if a result has already been
cached. Defaults to false. cached. Defaults to false.
:returns True: The identity is provided
:returns False: The identity is not listed
:returns None: Nothing could be found due to a timeout
""" """
data = {'category': category, data = {'category': category,
'itype': itype, 'itype': itype,
@ -301,7 +308,8 @@ class XEP_0030(BasePlugin):
async def get_info_from_domain(self, domain=None, timeout=None, async def get_info_from_domain(self, domain=None, timeout=None,
cached=True, callback=None): cached=True, callback=None):
"""Fetch disco#info of specified domain and one disco#items level below""" """Fetch disco#info of specified domain and one disco#items level below
"""
if domain is None: if domain is None:
domain = self.xmpp.boundjid.domain domain = self.xmpp.boundjid.domain
@ -322,7 +330,9 @@ class XEP_0030(BasePlugin):
) )
self.domain_infos[domain] = [ self.domain_infos[domain] = [
future.result() for future in info_futures if not future.exception()] future.result()
for future in info_futures if not future.exception()
]
results = self.domain_infos[domain] results = self.domain_infos[domain]
@ -330,8 +340,9 @@ class XEP_0030(BasePlugin):
callback(results) callback(results)
return results return results
async def get_info(self, jid=None, node=None, local=None, async def get_info(self, jid: OptJid = None, node: Optional[str] = None,
cached=None, **kwargs): local: Optional[bool] = None,
cached: Optional[bool] = None, **kwargs) -> Iq:
""" """
Retrieve the disco#info results from a given JID/node combination. Retrieve the disco#info results from a given JID/node combination.
@ -375,7 +386,7 @@ class XEP_0030(BasePlugin):
local = True local = True
if local: if local:
log.debug("Looking up local disco#info data " + \ log.debug("Looking up local disco#info data "
"for %s, node %s.", jid, node) "for %s, node %s.", jid, node)
info = await self.api['get_info']( info = await self.api['get_info'](
jid, node, kwargs.get('ifrom', None), jid, node, kwargs.get('ifrom', None),
@ -385,7 +396,7 @@ class XEP_0030(BasePlugin):
return self._wrap(kwargs.get('ifrom', None), jid, info) return self._wrap(kwargs.get('ifrom', None), jid, info)
if cached: if cached:
log.debug("Looking up cached disco#info data " + \ log.debug("Looking up cached disco#info data "
"for %s, node %s.", jid, node) "for %s, node %s.", jid, node)
info = await self.api['get_cached_info']( info = await self.api['get_cached_info'](
jid, node, jid, node,
@ -401,11 +412,10 @@ class XEP_0030(BasePlugin):
iq['to'] = jid iq['to'] = jid
iq['type'] = 'get' iq['type'] = 'get'
iq['disco_info']['node'] = node if node else '' iq['disco_info']['node'] = node if node else ''
return await iq.send(timeout=kwargs.get('timeout', None), return await iq.send(**kwargs)
callback=kwargs.get('callback', None),
timeout_callback=kwargs.get('timeout_callback', None))
def set_info(self, jid=None, node=None, info=None) -> Future: def set_info(self, jid: OptJid = None, node: Optional[str] = None,
info: Optional[Union[Iq, DiscoInfo]] = None) -> Future:
""" """
Set the disco#info data for a JID/node based on an existing Set the disco#info data for a JID/node based on an existing
disco#info stanza. disco#info stanza.
@ -418,7 +428,9 @@ class XEP_0030(BasePlugin):
info = info['disco_info'] info = info['disco_info']
return self.api['set_info'](jid, node, None, info) return self.api['set_info'](jid, node, None, info)
async def get_items(self, jid=None, node=None, local=False, **kwargs): async def get_items(self, jid: OptJid = None, node: Optional[str] = None,
local: bool = False, ifrom: OptJid = None,
**kwargs) -> Iq:
""" """
Retrieve the disco#items results from a given JID/node combination. Retrieve the disco#items results from a given JID/node combination.
@ -445,25 +457,22 @@ class XEP_0030(BasePlugin):
Otherwise the parameter is ignored. Otherwise the parameter is ignored.
""" """
if local or local is None and jid is None: if local or local is None and jid is None:
items = await self.api['get_items'](jid, node, items = await self.api['get_items'](jid, node, ifrom, kwargs)
kwargs.get('ifrom', None),
kwargs)
return self._wrap(kwargs.get('ifrom', None), jid, items) return self._wrap(kwargs.get('ifrom', None), jid, items)
iq = self.xmpp.Iq() iq = self.xmpp.Iq()
# Check dfrom parameter for backwards compatibility # Check dfrom parameter for backwards compatibility
iq['from'] = kwargs.get('ifrom', kwargs.get('dfrom', '')) iq['from'] = ifrom or kwargs.get('dfrom', '')
iq['to'] = jid iq['to'] = jid
iq['type'] = 'get' iq['type'] = 'get'
iq['disco_items']['node'] = node if node else '' iq['disco_items']['node'] = node if node else ''
if kwargs.get('iterator', False) and self.xmpp['xep_0059']: if kwargs.get('iterator', False) and self.xmpp['xep_0059']:
return self.xmpp['xep_0059'].iterate(iq, 'disco_items') return self.xmpp['xep_0059'].iterate(iq, 'disco_items')
else: else:
return await iq.send(timeout=kwargs.get('timeout', None), return await iq.send(**kwargs)
callback=kwargs.get('callback', None),
timeout_callback=kwargs.get('timeout_callback', None))
def set_items(self, jid=None, node=None, **kwargs) -> Future: def set_items(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Set or replace all items for the specified JID/node combination. Set or replace all items for the specified JID/node combination.
@ -480,7 +489,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['set_items'](jid, node, None, kwargs) return self.api['set_items'](jid, node, None, kwargs)
def del_items(self, jid=None, node=None, **kwargs) -> Future: def del_items(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove all items from the given JID/node combination. Remove all items from the given JID/node combination.
@ -493,7 +503,9 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_items'](jid, node, None, kwargs) return self.api['del_items'](jid, node, None, kwargs)
def add_item(self, jid='', name='', node=None, subnode='', ijid=None) -> Future: def add_item(self, jid: str = '', name: str = '',
node: Optional[str] = None, subnode: str = '',
ijid: OptJid = None) -> Future:
""" """
Add a new item element to the given JID/node combination. Add a new item element to the given JID/node combination.
@ -516,7 +528,8 @@ class XEP_0030(BasePlugin):
'inode': subnode} 'inode': subnode}
return self.api['add_item'](ijid, node, None, kwargs) return self.api['add_item'](ijid, node, None, kwargs)
def del_item(self, jid=None, node=None, **kwargs) -> Future: def del_item(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove a single item from the given JID/node combination. Remove a single item from the given JID/node combination.
@ -527,8 +540,9 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_item'](jid, node, None, kwargs) return self.api['del_item'](jid, node, None, kwargs)
def add_identity(self, category='', itype='', name='', def add_identity(self, category: str = '', itype: str = '', name: str = '',
node=None, jid=None, lang=None) -> Future: node: Optional[str] = None, jid: OptJid = None,
lang: Optional[str] = None) -> Future:
""" """
Add a new identity to the given JID/node combination. Add a new identity to the given JID/node combination.
@ -557,7 +571,7 @@ class XEP_0030(BasePlugin):
return self.api['add_identity'](jid, node, None, kwargs) return self.api['add_identity'](jid, node, None, kwargs)
def add_feature(self, feature: str, node: Optional[str] = None, def add_feature(self, feature: str, node: Optional[str] = None,
jid: Optional[JID] = None) -> Future: jid: OptJid = None) -> Future:
""" """
Add a feature to a JID/node combination. Add a feature to a JID/node combination.
@ -571,7 +585,7 @@ class XEP_0030(BasePlugin):
kwargs = {'feature': feature} kwargs = {'feature': feature}
return self.api['add_feature'](jid, node, None, kwargs) return self.api['add_feature'](jid, node, None, kwargs)
def del_identity(self, jid: Optional[JID] = None, def del_identity(self, jid: OptJid = None,
node: Optional[str] = None, **kwargs) -> Future: node: Optional[str] = None, **kwargs) -> Future:
""" """
Remove an identity from the given JID/node combination. Remove an identity from the given JID/node combination.
@ -588,7 +602,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_identity'](jid, node, None, kwargs) return self.api['del_identity'](jid, node, None, kwargs)
def del_feature(self, jid=None, node=None, **kwargs) -> Future: def del_feature(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove a feature from a given JID/node combination. Remove a feature from a given JID/node combination.
@ -601,7 +616,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_feature'](jid, node, None, kwargs) return self.api['del_feature'](jid, node, None, kwargs)
def set_identities(self, jid=None, node=None, **kwargs) -> Future: def set_identities(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Add or replace all identities for the given JID/node combination. Add or replace all identities for the given JID/node combination.
@ -618,7 +634,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['set_identities'](jid, node, None, kwargs) return self.api['set_identities'](jid, node, None, kwargs)
def del_identities(self, jid=None, node=None, **kwargs) -> Future: def del_identities(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove all identities for a JID/node combination. Remove all identities for a JID/node combination.
@ -635,7 +652,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_identities'](jid, node, None, kwargs) return self.api['del_identities'](jid, node, None, kwargs)
def set_features(self, jid=None, node=None, **kwargs) -> Future: def set_features(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Add or replace the set of supported features Add or replace the set of supported features
for a JID/node combination. for a JID/node combination.
@ -649,7 +667,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['set_features'](jid, node, None, kwargs) return self.api['set_features'](jid, node, None, kwargs)
def del_features(self, jid=None, node=None, **kwargs) -> Future: def del_features(self, jid: OptJid = None, node: Optional[str] = None,
**kwargs) -> Future:
""" """
Remove all features from a JID/node combination. Remove all features from a JID/node combination.
@ -661,7 +680,8 @@ class XEP_0030(BasePlugin):
""" """
return self.api['del_features'](jid, node, None, kwargs) return self.api['del_features'](jid, node, None, kwargs)
async def _run_node_handler(self, htype, jid, node=None, ifrom=None, data=None): async def _run_node_handler(self, htype, jid, node: Optional[str] = None,
ifrom: OptJid = None, data=None):
""" """
Execute the most specific node handler for the given Execute the most specific node handler for the given
JID/node combination. JID/node combination.
@ -676,7 +696,7 @@ class XEP_0030(BasePlugin):
return await self.api[htype](jid, node, ifrom, data) return await self.api[htype](jid, node, ifrom, data)
async def _handle_disco_info(self, iq): async def _handle_disco_info(self, iq: Iq):
""" """
Process an incoming disco#info stanza. If it is a get Process an incoming disco#info stanza. If it is a get
request, find and return the appropriate identities request, find and return the appropriate identities
@ -686,7 +706,7 @@ class XEP_0030(BasePlugin):
:param iq: The incoming disco#items stanza. :param iq: The incoming disco#items stanza.
""" """
if iq['type'] == 'get': if iq['type'] == 'get':
log.debug("Received disco info query from " + \ log.debug("Received disco info query from "
"<%s> to <%s>.", iq['from'], iq['to']) "<%s> to <%s>.", iq['from'], iq['to'])
info = await self.api['get_info'](iq['to'], info = await self.api['get_info'](iq['to'],
iq['disco_info']['node'], iq['disco_info']['node'],
@ -704,11 +724,11 @@ class XEP_0030(BasePlugin):
iq.set_payload(info.xml) iq.set_payload(info.xml)
iq.send() iq.send()
elif iq['type'] == 'result': elif iq['type'] == 'result':
log.debug("Received disco info result from " + \ log.debug("Received disco info result from "
"<%s> to <%s>.", iq['from'], iq['to']) "<%s> to <%s>.", iq['from'], iq['to'])
if self.use_cache: if self.use_cache:
log.debug("Caching disco info result from " \ log.debug("Caching disco info result from "
"<%s> to <%s>.", iq['from'], iq['to']) "<%s> to <%s>.", iq['from'], iq['to'])
if self.xmpp.is_component: if self.xmpp.is_component:
ito = iq['to'].full ito = iq['to'].full
else: else:
@ -719,7 +739,7 @@ class XEP_0030(BasePlugin):
iq) iq)
self.xmpp.event('disco_info', iq) self.xmpp.event('disco_info', iq)
async def _handle_disco_items(self, iq): async def _handle_disco_items(self, iq: Iq):
""" """
Process an incoming disco#items stanza. If it is a get Process an incoming disco#items stanza. If it is a get
request, find and return the appropriate items. If it request, find and return the appropriate items. If it
@ -728,7 +748,7 @@ class XEP_0030(BasePlugin):
:param iq: The incoming disco#items stanza. :param iq: The incoming disco#items stanza.
""" """
if iq['type'] == 'get': if iq['type'] == 'get':
log.debug("Received disco items query from " + \ log.debug("Received disco items query from "
"<%s> to <%s>.", iq['from'], iq['to']) "<%s> to <%s>.", iq['from'], iq['to'])
items = await self.api['get_items'](iq['to'], items = await self.api['get_items'](iq['to'],
iq['disco_items']['node'], iq['disco_items']['node'],
@ -742,11 +762,11 @@ class XEP_0030(BasePlugin):
iq.set_payload(items.xml) iq.set_payload(items.xml)
iq.send() iq.send()
elif iq['type'] == 'result': elif iq['type'] == 'result':
log.debug("Received disco items result from " + \ log.debug("Received disco items result from "
"%s to %s.", iq['from'], iq['to']) "%s to %s.", iq['from'], iq['to'])
self.xmpp.event('disco_items', iq) self.xmpp.event('disco_items', iq)
def _fix_default_info(self, info): def _fix_default_info(self, info: DiscoInfo):
""" """
Disco#info results for a JID are required to include at least Disco#info results for a JID are required to include at least
one identity and feature. As a default, if no other identity is one identity and feature. As a default, if no other identity is
@ -762,20 +782,20 @@ class XEP_0030(BasePlugin):
if not info['node']: if not info['node']:
if not info['identities']: if not info['identities']:
if self.xmpp.is_component: if self.xmpp.is_component:
log.debug("No identity found for this entity. " + \ log.debug("No identity found for this entity. "
"Using default component identity.") "Using default component identity.")
info.add_identity('component', 'generic') info.add_identity('component', 'generic')
else: else:
log.debug("No identity found for this entity. " + \ log.debug("No identity found for this entity. "
"Using default client identity.") "Using default client identity.")
info.add_identity('client', 'bot') info.add_identity('client', 'bot')
if not info['features']: if not info['features']:
log.debug("No features found for this entity. " + \ log.debug("No features found for this entity. "
"Using default disco#info feature.") "Using default disco#info feature.")
info.add_feature(info.namespace) info.add_feature(info.namespace)
return result return result
def _wrap(self, ito, ifrom, payload, force=False): def _wrap(self, ito: OptJid, ifrom: OptJid, payload, force=False) -> Iq:
""" """
Ensure that results are wrapped in an Iq stanza Ensure that results are wrapped in an Iq stanza
if self.wrap_results has been set to True. if self.wrap_results has been set to True.