XEP-0030 static: Fix PEP8, types

This commit is contained in:
mathieui 2021-03-01 20:55:46 +01:00
parent e3027dabb2
commit 49416dacbc

View File

@ -1,20 +1,46 @@
# Slixmpp: The Slick XMPP Library # Slixmpp: The Slick XMPP Library
# Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout # Copyright (C) 2010 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
from __future__ import annotations
import logging import logging
from slixmpp import Iq from typing import (
Optional,
Any,
Dict,
Tuple,
TYPE_CHECKING,
Union,
Collection,
)
from slixmpp import BaseXMPP, JID
from slixmpp.stanza import Iq
from slixmpp.types import TypedDict, OptJidStr, OptJid
from slixmpp.exceptions import XMPPError, IqError, IqTimeout from slixmpp.exceptions import XMPPError, IqError, IqTimeout
from slixmpp.xmlstream import JID
from slixmpp.plugins.xep_0030 import DiscoInfo, DiscoItems from slixmpp.plugins.xep_0030 import DiscoInfo, DiscoItems
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
if TYPE_CHECKING:
from slixmpp.plugins.xep_0030 import XEP_0030
class StaticDisco(object):
class NodeType(TypedDict):
info: DiscoInfo
items: DiscoItems
NodesType = Dict[
Tuple[str, str, str],
NodeType
]
class StaticDisco:
""" """
While components will likely require fully dynamic handling While components will likely require fully dynamic handling
@ -25,75 +51,87 @@ class StaticDisco(object):
StaticDisco provides a set of node handlers that will store StaticDisco provides a set of node handlers that will store
static sets of disco info and items in memory. static sets of disco info and items in memory.
Attributes: :var nodes: A dictionary mapping (JID, node) tuples to a dict
nodes -- A dictionary mapping (JID, node) tuples to a dict containing a disco#info and a disco#items stanza.
containing a disco#info and a disco#items stanza. :var xmpp: The main Slixmpp object.
xmpp -- The main Slixmpp object. :var disco: The instance of the XEP-0030 plugin.
""" """
def __init__(self, xmpp, disco): def __init__(self, xmpp: 'BaseXMPP', disco: 'XEP_0030'):
""" """
Create a static disco interface. Sets of disco#info and Create a static disco interface. Sets of disco#info and
disco#items are maintained for every given JID and node disco#items are maintained for every given JID and node
combination. These stanzas are used to store disco combination. These stanzas are used to store disco
information in memory without any additional processing. information in memory without any additional processing.
Arguments: :param xmpp: The main Slixmpp object.
xmpp -- The main Slixmpp object. :param disco: The XEP-0030 plugin.
""" """
self.nodes = {} self.nodes: NodesType = {}
self.xmpp = xmpp self.xmpp: BaseXMPP = xmpp
self.disco = disco self.disco: 'XEP_0030' = disco
def add_node(self, jid=None, node=None, ifrom=None): def add_node(self, jid: OptJidStr = None, node: Optional[str] = None,
""" ifrom: OptJidStr = None) -> NodeType:
Create a new set of stanzas for the provided
JID and node combination.
Arguments:
jid -- The JID that will own the new stanzas.
node -- The node that will own the new stanzas.
"""
if jid is None: if jid is None:
jid = self.xmpp.boundjid.full node_jid = self.xmpp.boundjid.full
elif isinstance(jid, JID):
node_jid = jid.full
if ifrom is None:
node_ifrom = ''
elif isinstance(ifrom, JID):
node_ifrom = ifrom.full
else:
node_ifrom = ifrom
if node is None:
node = ''
if (node_jid, node, node_ifrom) not in self.nodes:
node_dict: NodeType = {
'info': DiscoInfo(),
'items': DiscoItems(),
}
node_dict['info']['node'] = node
node_dict['items']['node'] = node
self.nodes[(node_jid, node, node_ifrom)] = node_dict
return self.nodes[(node_jid, node, node_ifrom)]
def get_node(self, jid: OptJidStr = None, node: Optional[str] = None,
ifrom: OptJidStr = None) -> NodeType:
if jid is None:
node_jid = self.xmpp.boundjid.full
elif isinstance(jid, JID):
node_jid = jid.full
else:
node_jid = jid
if node is None: if node is None:
node = '' node = ''
if ifrom is None: if ifrom is None:
ifrom = '' node_ifrom = ''
if isinstance(ifrom, JID): elif isinstance(ifrom, JID):
ifrom = ifrom.full node_ifrom = ifrom.full
if (jid, node, ifrom) not in self.nodes: else:
new_node = {'info': DiscoInfo(), 'items': DiscoItems()} node_ifrom = ifrom
new_node['info']['node'] = node if (node_jid, node, node_ifrom) not in self.nodes:
new_node['items']['node'] = node self.add_node(node_jid, node, node_ifrom)
self.nodes[(jid, node, ifrom)] = new_node return self.nodes[(node_jid, node, node_ifrom)]
return self.nodes[(jid, node, ifrom)]
def get_node(self, jid=None, node=None, ifrom=None): def node_exists(self, jid: OptJidStr = None, node: Optional[str] = None,
ifrom: OptJidStr = None) -> bool:
if jid is None: if jid is None:
jid = self.xmpp.boundjid.full node_jid = self.xmpp.boundjid.full
elif isinstance(jid, JID):
node_jid = jid.full
else:
node_jid = jid
if node is None: if node is None:
node = '' node = ''
if ifrom is None: if ifrom is None:
ifrom = '' node_ifrom = ''
if isinstance(ifrom, JID): elif isinstance(ifrom, JID):
ifrom = ifrom.full node_ifrom = ifrom.full
if (jid, node, ifrom) not in self.nodes: else:
self.add_node(jid, node, ifrom) node_ifrom = ifrom
return self.nodes[(jid, node, ifrom)] return (node_jid, node, node_ifrom) in self.nodes
def node_exists(self, jid=None, node=None, ifrom=None):
if jid is None:
jid = self.xmpp.boundjid.full
if node is None:
node = ''
if ifrom is None:
ifrom = ''
if isinstance(ifrom, JID):
ifrom = ifrom.full
if (jid, node, ifrom) not in self.nodes:
return False
return True
# ================================================================= # =================================================================
# Node Handlers # Node Handlers
@ -109,18 +147,20 @@ class StaticDisco(object):
# the requester's JID, except for cached results. To do that, # the requester's JID, except for cached results. To do that,
# register a custom node handler. # register a custom node handler.
async def supports(self, jid, node, ifrom, data): async def supports(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any) -> Optional[bool]:
""" """
Check if a JID supports a given feature. Check if a JID supports a given feature.
The data parameter may provide: The data parameter may provide:
feature -- The feature to check for support.
local -- If true, then the query is for a JID/node :param feature: The feature to check for support.
:param local: If true, then the query is for a JID/node
combination handled by this Slixmpp instance and combination handled by this Slixmpp instance and
no stanzas need to be sent. no stanzas need to be sent.
Otherwise, a disco stanza must be sent to the Otherwise, a disco stanza must be sent to the
remove JID to retrieve the info. remove JID to retrieve the info.
cached -- If true, then look for the disco info data from :param cached: If true, then look for the disco info data from
the local cache system. If no results are found, the local cache system. If no results are found,
send the query as usual. The self.use_cache send the query as usual. The self.use_cache
setting must be set to true for this option to setting must be set to true for this option to
@ -147,26 +187,29 @@ class StaticDisco(object):
except IqTimeout: except IqTimeout:
return None return None
async def has_identity(self, jid, node, ifrom, data): async def has_identity(self, jid: OptJid, node: Optional[str],
ifrom: OptJid, data: Dict[str, Any]
) -> Optional[bool]:
""" """
Check if a JID has a given identity. Check if a JID has a given identity.
The data parameter may provide: The data parameter may provide:
category -- The category of the identity to check.
itype -- The type of the identity to check. :param category: The category of the identity to check.
lang -- The language of the identity to check. :param itype: The type of the identity to check.
local -- If true, then the query is for a JID/node :param lang: The language of the identity to check.
combination handled by this Slixmpp instance and :param local: If true, then the query is for a JID/node
no stanzas need to be sent. combination handled by this Slixmpp instance and
Otherwise, a disco stanza must be sent to the no stanzas need to be sent.
remove JID to retrieve the info. Otherwise, a disco stanza must be sent to the
cached -- If true, then look for the disco info data from remove JID to retrieve the info.
the local cache system. If no results are found, :param cached: If true, then look for the disco info data from
send the query as usual. The self.use_cache the local cache system. If no results are found,
setting must be set to true for this option to send the query as usual. The self.use_cache
be useful. If set to false, then the cache will setting must be set to true for this option to
be skipped, even if a result has already been be useful. If set to false, then the cache will
cached. Defaults to false. be skipped, even if a result has already been
cached. Defaults to false.
""" """
identity = (data.get('category', None), identity = (data.get('category', None),
data.get('itype', None), data.get('itype', None),
@ -179,14 +222,17 @@ class StaticDisco(object):
info = await self.disco.get_info(jid=jid, node=node, info = await self.disco.get_info(jid=jid, node=node,
ifrom=ifrom, **data) ifrom=ifrom, **data)
info = self.disco._wrap(ifrom, jid, info, True) info = self.disco._wrap(ifrom, jid, info, True)
trunc = lambda i: (i[0], i[1], i[2])
def trunc(i):
return (i[0], i[1], i[2])
return identity in map(trunc, info['disco_info']['identities']) return identity in map(trunc, info['disco_info']['identities'])
except IqError: except IqError:
return False return False
except IqTimeout: except IqTimeout:
return None return None
def get_info(self, jid, node, ifrom, data): def get_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any) -> Optional[DiscoInfo]:
""" """
Return the stored info data for the requested JID/node combination. Return the stored info data for the requested JID/node combination.
@ -200,7 +246,8 @@ class StaticDisco(object):
else: else:
return self.get_node(jid, node)['info'] return self.get_node(jid, node)['info']
def set_info(self, jid, node, ifrom, data): def set_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: DiscoInfo):
""" """
Set the entire info stanza for a JID/node at once. Set the entire info stanza for a JID/node at once.
@ -209,7 +256,8 @@ class StaticDisco(object):
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info'] = data new_node['info'] = data
def del_info(self, jid, node, ifrom, data): def del_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any):
""" """
Reset the info stanza for a given JID/node combination. Reset the info stanza for a given JID/node combination.
@ -218,7 +266,8 @@ class StaticDisco(object):
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['info'] = DiscoInfo() self.get_node(jid, node)['info'] = DiscoInfo()
def get_items(self, jid, node, ifrom, data): def get_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any) -> Optional[DiscoItems]:
""" """
Return the stored items data for the requested JID/node combination. Return the stored items data for the requested JID/node combination.
@ -232,7 +281,8 @@ class StaticDisco(object):
else: else:
return self.get_node(jid, node)['items'] return self.get_node(jid, node)['items']
def set_items(self, jid, node, ifrom, data): def set_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Collection[Tuple]]):
""" """
Replace the stored items data for a JID/node combination. Replace the stored items data for a JID/node combination.
@ -243,7 +293,8 @@ class StaticDisco(object):
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['items']['items'] = items new_node['items']['items'] = items
def del_items(self, jid, node, ifrom, data): def del_items(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any):
""" """
Reset the items stanza for a given JID/node combination. Reset the items stanza for a given JID/node combination.
@ -252,15 +303,17 @@ class StaticDisco(object):
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['items'] = DiscoItems() self.get_node(jid, node)['items'] = DiscoItems()
def add_identity(self, jid, node, ifrom, data): def add_identity(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Optional[str]]):
""" """
Add a new identity to the JID/node combination. Add a new identity to the JID/node combination.
The data parameter may provide: The data parameter may provide:
category -- The general category to which the agent belongs.
itype -- A more specific designation with the category. :param category: The general category to which the agent belongs.
name -- Optional human readable name for this identity. :param itype: A more specific designation with the category.
lang -- Optional standard xml:lang value. :param name: Optional human readable name for this identity.
:param lang: Optional standard xml:lang value.
""" """
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info'].add_identity( new_node['info'].add_identity(
@ -269,27 +322,31 @@ class StaticDisco(object):
data.get('name', None), data.get('name', None),
data.get('lang', None)) data.get('lang', None))
def set_identities(self, jid, node, ifrom, data): def set_identities(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Collection[str]]):
""" """
Add or replace all identities for a JID/node combination. Add or replace all identities for a JID/node combination.
The data parameter should include: The data parameter should include:
identities -- A list of identities in tuple form:
(category, type, name, lang) :param identities: A list of identities in tuple form:
(category, type, name, lang)
""" """
identities = data.get('identities', set()) identities = data.get('identities', set())
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info']['identities'] = identities new_node['info']['identities'] = identities
def del_identity(self, jid, node, ifrom, data): def del_identity(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Optional[str]]):
""" """
Remove an identity from a JID/node combination. Remove an identity from a JID/node combination.
The data parameter may provide: The data parameter may provide:
category -- The general category to which the agent belonged.
itype -- A more specific designation with the category. :param category: The general category to which the agent belonged.
name -- Optional human readable name for this identity. :param itype: A more specific designation with the category.
lang -- Optional, standard xml:lang value. :param name: Optional human readable name for this identity.
:param lang: Optional, standard xml:lang value.
""" """
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['info'].del_identity( self.get_node(jid, node)['info'].del_identity(
@ -298,7 +355,8 @@ class StaticDisco(object):
data.get('name', None), data.get('name', None),
data.get('lang', None)) data.get('lang', None))
def del_identities(self, jid, node, ifrom, data): def del_identities(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any):
""" """
Remove all identities from a JID/node combination. Remove all identities from a JID/node combination.
@ -307,40 +365,47 @@ class StaticDisco(object):
if self.node_exists(jid, node): if self.node_exists(jid, node):
del self.get_node(jid, node)['info']['identities'] del self.get_node(jid, node)['info']['identities']
def add_feature(self, jid, node, ifrom, data): def add_feature(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, str]):
""" """
Add a feature to a JID/node combination. Add a feature to a JID/node combination.
The data parameter should include: The data parameter should include:
feature -- The namespace of the supported feature.
:param feature: The namespace of the supported feature.
""" """
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info'].add_feature( new_node['info'].add_feature(
data.get('feature', '')) data.get('feature', ''))
def set_features(self, jid, node, ifrom, data): def set_features(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, Collection[str]]):
""" """
Add or replace all features for a JID/node combination. Add or replace all features for a JID/node combination.
The data parameter should include: The data parameter should include:
features -- The new set of supported features.
:param features: The new set of supported features.
""" """
features = data.get('features', set()) features = data.get('features', set())
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['info']['features'] = features new_node['info']['features'] = features
def del_feature(self, jid, node, ifrom, data): def del_feature(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, str]):
""" """
Remove a feature from a JID/node combination. Remove a feature from a JID/node combination.
The data parameter should include: The data parameter should include:
feature -- The namespace of the removed feature.
:param feature: The namespace of the removed feature.
""" """
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['info'].del_feature( self.get_node(jid, node)['info'].del_feature(
data.get('feature', '')) data.get('feature', ''))
def del_features(self, jid, node, ifrom, data): def del_features(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any):
""" """
Remove all features from a JID/node combination. Remove all features from a JID/node combination.
@ -350,15 +415,17 @@ class StaticDisco(object):
return return
del self.get_node(jid, node)['info']['features'] del self.get_node(jid, node)['info']['features']
def add_item(self, jid, node, ifrom, data): def add_item(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, str]):
""" """
Add an item to a JID/node combination. Add an item to a JID/node combination.
The data parameter may include: The data parameter may include:
ijid -- The JID for the item.
inode -- Optional additional information to reference :param ijid: The JID for the item.
non-addressable items. :param inode: Optional additional information to reference
name -- Optional human readable name for the item. non-addressable items.
:param name: Optional human readable name for the item.
""" """
new_node = self.add_node(jid, node) new_node = self.add_node(jid, node)
new_node['items'].add_item( new_node['items'].add_item(
@ -366,20 +433,23 @@ class StaticDisco(object):
node=data.get('inode', ''), node=data.get('inode', ''),
name=data.get('name', '')) name=data.get('name', ''))
def del_item(self, jid, node, ifrom, data): def del_item(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Dict[str, str]):
""" """
Remove an item from a JID/node combination. Remove an item from a JID/node combination.
The data parameter may include: The data parameter may include:
ijid -- JID of the item to remove.
inode -- Optional extra identifying information. :param ijid: JID of the item to remove.
:param inode: Optional extra identifying information.
""" """
if self.node_exists(jid, node): if self.node_exists(jid, node):
self.get_node(jid, node)['items'].del_item( self.get_node(jid, node)['items'].del_item(
data.get('ijid', ''), data.get('ijid', ''),
node=data.get('inode', None)) node=data.get('inode', None))
def cache_info(self, jid, node, ifrom, data): def cache_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Union[Iq, DiscoInfo]):
""" """
Cache disco information for an external JID. Cache disco information for an external JID.
@ -388,12 +458,15 @@ class StaticDisco(object):
the disco#info substanza itself. the disco#info substanza itself.
""" """
if isinstance(data, Iq): if isinstance(data, Iq):
data = data['disco_info'] info = data['disco_info']
else:
info = data
new_node = self.add_node(jid, node, ifrom) new_node = self.add_node(jid, node, ifrom)
new_node['info'] = data new_node['info'] = info
def get_cached_info(self, jid, node, ifrom, data): def get_cached_info(self, jid: OptJid, node: Optional[str], ifrom: OptJid,
data: Any) -> Optional[DiscoInfo]:
""" """
Retrieve cached disco info data. Retrieve cached disco info data.
@ -401,5 +474,4 @@ class StaticDisco(object):
""" """
if not self.node_exists(jid, node, ifrom): if not self.node_exists(jid, node, ifrom):
return None return None
else: return self.get_node(jid, node, ifrom)['info']
return self.get_node(jid, node, ifrom)['info']