typing: matchers and senders

Leftover error that I cannot fix:

* https://github.com/python/mypy/issues/708

Leftover error that I am unsure of what to do:

* xml handlers are not properly typed

(it seems like nothing in slix is using it, considering a removal
 instead of adding an Union everywhere)
This commit is contained in:
mathieui 2021-04-21 23:20:25 +02:00
parent 4931e7e604
commit fed55d3dda
14 changed files with 188 additions and 104 deletions

View File

@ -4,10 +4,18 @@
# Part of Slixmpp: The Slick XMPP Library # Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz # :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from __future__ import annotations
import weakref import weakref
from weakref import ReferenceType
from typing import Optional, TYPE_CHECKING
from slixmpp.xmlstream.matcher.base import MatcherBase
if TYPE_CHECKING:
from slixmpp.xmlstream import XMLStream, StanzaBase
class BaseHandler(object): class BaseHandler:
""" """
Base class for stream handlers. Stream handlers are matched with Base class for stream handlers. Stream handlers are matched with
@ -26,8 +34,13 @@ class BaseHandler(object):
:param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream` :param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
instance that the handle will respond to. instance that the handle will respond to.
""" """
name: str
stream: Optional[ReferenceType[XMLStream]]
_destroy: bool
_matcher: MatcherBase
_payload: Optional[StanzaBase]
def __init__(self, name, matcher, stream=None): def __init__(self, name: str, matcher: MatcherBase, stream: Optional[XMLStream] = None):
#: The name of the handler #: The name of the handler
self.name = name self.name = name
@ -41,33 +54,33 @@ class BaseHandler(object):
self._payload = None self._payload = None
self._matcher = matcher self._matcher = matcher
def match(self, xml): def match(self, xml: StanzaBase) -> bool:
"""Compare a stanza or XML object with the handler's matcher. """Compare a stanza or XML object with the handler's matcher.
:param xml: An XML or :param xml: An XML or
:class:`~slixmpp.xmlstream.stanzabase.ElementBase` object :class:`~slixmpp.xmlstream.stanzabase.StanzaBase` object
""" """
return self._matcher.match(xml) return self._matcher.match(xml)
def prerun(self, payload): def prerun(self, payload: StanzaBase) -> None:
"""Prepare the handler for execution while the XML """Prepare the handler for execution while the XML
stream is being processed. stream is being processed.
:param payload: A :class:`~slixmpp.xmlstream.stanzabase.ElementBase` :param payload: A :class:`~slixmpp.xmlstream.stanzabase.StanzaBase`
object. object.
""" """
self._payload = payload self._payload = payload
def run(self, payload): def run(self, payload: StanzaBase) -> None:
"""Execute the handler after XML stream processing and during the """Execute the handler after XML stream processing and during the
main event loop. main event loop.
:param payload: A :class:`~slixmpp.xmlstream.stanzabase.ElementBase` :param payload: A :class:`~slixmpp.xmlstream.stanzabase.StanzaBase`
object. object.
""" """
self._payload = payload self._payload = payload
def check_delete(self): def check_delete(self) -> bool:
"""Check if the handler should be removed from the list """Check if the handler should be removed from the list
of stream handlers. of stream handlers.
""" """

View File

@ -1,10 +1,17 @@
# slixmpp.xmlstream.handler.callback # slixmpp.xmlstream.handler.callback
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Part of Slixmpp: The Slick XMPP Library # Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz # :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from __future__ import annotations
from typing import Optional, Callable, Any, TYPE_CHECKING
from slixmpp.xmlstream.handler.base import BaseHandler from slixmpp.xmlstream.handler.base import BaseHandler
from slixmpp.xmlstream.matcher.base import MatcherBase
if TYPE_CHECKING:
from slixmpp.xmlstream.stanzabase import StanzaBase
from slixmpp.xmlstream.xmlstream import XMLStream
class Callback(BaseHandler): class Callback(BaseHandler):
@ -28,8 +35,6 @@ class Callback(BaseHandler):
:param matcher: A :class:`~slixmpp.xmlstream.matcher.base.MatcherBase` :param matcher: A :class:`~slixmpp.xmlstream.matcher.base.MatcherBase`
derived object for matching stanza objects. derived object for matching stanza objects.
:param pointer: The function to execute during callback. :param pointer: The function to execute during callback.
:param bool thread: **DEPRECATED.** Remains only for
backwards compatibility.
:param bool once: Indicates if the handler should be used only :param bool once: Indicates if the handler should be used only
once. Defaults to False. once. Defaults to False.
:param bool instream: Indicates if the callback should be executed :param bool instream: Indicates if the callback should be executed
@ -38,31 +43,36 @@ class Callback(BaseHandler):
:param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream` :param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
instance this handler should monitor. instance this handler should monitor.
""" """
_once: bool
_instream: bool
_pointer: Callable[[StanzaBase], Any]
def __init__(self, name, matcher, pointer, thread=False, def __init__(self, name: str, matcher: MatcherBase,
once=False, instream=False, stream=None): pointer: Callable[[StanzaBase], Any],
once: bool = False, instream: bool = False,
stream: Optional[XMLStream] = None):
BaseHandler.__init__(self, name, matcher, stream) BaseHandler.__init__(self, name, matcher, stream)
self._pointer = pointer self._pointer = pointer
self._once = once self._once = once
self._instream = instream self._instream = instream
def prerun(self, payload): def prerun(self, payload: StanzaBase) -> None:
"""Execute the callback during stream processing, if """Execute the callback during stream processing, if
the callback was created with ``instream=True``. the callback was created with ``instream=True``.
:param payload: The matched :param payload: The matched
:class:`~slixmpp.xmlstream.stanzabase.ElementBase` object. :class:`~slixmpp.xmlstream.stanzabase.StanzaBase` object.
""" """
if self._once: if self._once:
self._destroy = True self._destroy = True
if self._instream: if self._instream:
self.run(payload, True) self.run(payload, True)
def run(self, payload, instream=False): def run(self, payload: StanzaBase, instream: bool = False) -> None:
"""Execute the callback function with the matched stanza payload. """Execute the callback function with the matched stanza payload.
:param payload: The matched :param payload: The matched
:class:`~slixmpp.xmlstream.stanzabase.ElementBase` object. :class:`~slixmpp.xmlstream.stanzabase.StanzaBase` object.
:param bool instream: Force the handler to execute during stream :param bool instream: Force the handler to execute during stream
processing. This should only be used by processing. This should only be used by
:meth:`prerun()`. Defaults to ``False``. :meth:`prerun()`. Defaults to ``False``.

View File

@ -4,11 +4,17 @@
# Part of Slixmpp: The Slick XMPP Library # Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2012 Nathanael C. Fritz, Lance J.T. Stout # :copyright: (c) 2012 Nathanael C. Fritz, Lance J.T. Stout
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from __future__ import annotations
import logging import logging
from queue import Queue, Empty from typing import List, Optional, TYPE_CHECKING
from slixmpp.xmlstream.stanzabase import StanzaBase
from slixmpp.xmlstream.handler.base import BaseHandler from slixmpp.xmlstream.handler.base import BaseHandler
from slixmpp.xmlstream.matcher.base import MatcherBase
if TYPE_CHECKING:
from slixmpp.xmlstream.xmlstream import XMLStream
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -27,35 +33,35 @@ class Collector(BaseHandler):
:param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream` :param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
instance this handler should monitor. instance this handler should monitor.
""" """
_stanzas: List[StanzaBase]
def __init__(self, name, matcher, stream=None): def __init__(self, name: str, matcher: MatcherBase, stream: Optional[XMLStream] = None):
BaseHandler.__init__(self, name, matcher, stream=stream) BaseHandler.__init__(self, name, matcher, stream=stream)
self._payload = Queue() self._stanzas = []
def prerun(self, payload): def prerun(self, payload: StanzaBase) -> None:
"""Store the matched stanza when received during processing. """Store the matched stanza when received during processing.
:param payload: The matched :param payload: The matched
:class:`~slixmpp.xmlstream.stanzabase.ElementBase` object. :class:`~slixmpp.xmlstream.stanzabase.StanzaBase` object.
""" """
self._payload.put(payload) self._stanzas.append(payload)
def run(self, payload): def run(self, payload: StanzaBase) -> None:
"""Do not process this handler during the main event loop.""" """Do not process this handler during the main event loop."""
pass pass
def stop(self): def stop(self) -> List[StanzaBase]:
""" """
Stop collection of matching stanzas, and return the ones that Stop collection of matching stanzas, and return the ones that
have been stored so far. have been stored so far.
""" """
stream_ref = self.stream
if stream_ref is None:
raise ValueError('stop() called without a stream!')
stream = stream_ref()
if stream is None:
raise ValueError('stop() called without a stream!')
self._destroy = True self._destroy = True
results = [] stream.remove_handler(self.name)
try: return self._stanzas
while True:
results.append(self._payload.get(False))
except Empty:
pass
self.stream().remove_handler(self.name)
return results

View File

@ -4,8 +4,19 @@
# Part of Slixmpp: The Slick XMPP Library # Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz # :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from __future__ import annotations
from asyncio import iscoroutinefunction, ensure_future
from typing import Optional, Callable, Awaitable, TYPE_CHECKING
from slixmpp.xmlstream.stanzabase import StanzaBase
from slixmpp.xmlstream.handler.base import BaseHandler from slixmpp.xmlstream.handler.base import BaseHandler
from slixmpp.xmlstream.asyncio import asyncio from slixmpp.xmlstream.matcher.base import MatcherBase
CoroutineFunction = Callable[[StanzaBase], Awaitable[None]]
if TYPE_CHECKING:
from slixmpp.xmlstream.xmlstream import XMLStream
class CoroutineCallback(BaseHandler): class CoroutineCallback(BaseHandler):
@ -34,15 +45,20 @@ class CoroutineCallback(BaseHandler):
instance this handler should monitor. instance this handler should monitor.
""" """
def __init__(self, name, matcher, pointer, once=False, _once: bool
instream=False, stream=None): _instream: bool
_pointer: CoroutineFunction
def __init__(self, name: str, matcher: MatcherBase,
pointer: CoroutineFunction, once: bool = False,
instream: bool = False, stream: Optional[XMLStream] = None):
BaseHandler.__init__(self, name, matcher, stream) BaseHandler.__init__(self, name, matcher, stream)
if not asyncio.iscoroutinefunction(pointer): if not iscoroutinefunction(pointer):
raise ValueError("Given function is not a coroutine") raise ValueError("Given function is not a coroutine")
async def pointer_wrapper(stanza, *args, **kwargs): async def pointer_wrapper(stanza: StanzaBase) -> None:
try: try:
await pointer(stanza, *args, **kwargs) await pointer(stanza)
except Exception as e: except Exception as e:
stanza.exception(e) stanza.exception(e)
@ -50,29 +66,29 @@ class CoroutineCallback(BaseHandler):
self._once = once self._once = once
self._instream = instream self._instream = instream
def prerun(self, payload): def prerun(self, payload: StanzaBase) -> None:
"""Execute the callback during stream processing, if """Execute the callback during stream processing, if
the callback was created with ``instream=True``. the callback was created with ``instream=True``.
:param payload: The matched :param payload: The matched
:class:`~slixmpp.xmlstream.stanzabase.ElementBase` object. :class:`~slixmpp.xmlstream.stanzabase.StanzaBase` object.
""" """
if self._once: if self._once:
self._destroy = True self._destroy = True
if self._instream: if self._instream:
self.run(payload, True) self.run(payload, True)
def run(self, payload, instream=False): def run(self, payload: StanzaBase, instream: bool = False) -> None:
"""Execute the callback function with the matched stanza payload. """Execute the callback function with the matched stanza payload.
:param payload: The matched :param payload: The matched
:class:`~slixmpp.xmlstream.stanzabase.ElementBase` object. :class:`~slixmpp.xmlstream.stanzabase.StanzaBase` object.
:param bool instream: Force the handler to execute during stream :param bool instream: Force the handler to execute during stream
processing. This should only be used by processing. This should only be used by
:meth:`prerun()`. Defaults to ``False``. :meth:`prerun()`. Defaults to ``False``.
""" """
if not self._instream or instream: if not self._instream or instream:
asyncio.ensure_future(self._pointer(payload)) ensure_future(self._pointer(payload))
if self._once: if self._once:
self._destroy = True self._destroy = True
del self._pointer del self._pointer

View File

@ -4,13 +4,19 @@
# Part of Slixmpp: The Slick XMPP Library # Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz # :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from __future__ import annotations
import logging import logging
import asyncio from asyncio import Event, wait_for, TimeoutError
from asyncio import Queue, wait_for, TimeoutError from typing import Optional, TYPE_CHECKING
import slixmpp import slixmpp
from slixmpp.xmlstream.stanzabase import StanzaBase
from slixmpp.xmlstream.handler.base import BaseHandler from slixmpp.xmlstream.handler.base import BaseHandler
from slixmpp.xmlstream.matcher.base import MatcherBase
if TYPE_CHECKING:
from slixmpp.xmlstream.xmlstream import XMLStream
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -28,24 +34,27 @@ class Waiter(BaseHandler):
:param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream` :param stream: The :class:`~slixmpp.xmlstream.xmlstream.XMLStream`
instance this handler should monitor. instance this handler should monitor.
""" """
_event: Event
def __init__(self, name, matcher, stream=None): def __init__(self, name: str, matcher: MatcherBase, stream: Optional[XMLStream] = None):
BaseHandler.__init__(self, name, matcher, stream=stream) BaseHandler.__init__(self, name, matcher, stream=stream)
self._payload = Queue() self._event = Event()
def prerun(self, payload): def prerun(self, payload: StanzaBase) -> None:
"""Store the matched stanza when received during processing. """Store the matched stanza when received during processing.
:param payload: The matched :param payload: The matched
:class:`~slixmpp.xmlstream.stanzabase.ElementBase` object. :class:`~slixmpp.xmlstream.stanzabase.StanzaBase` object.
""" """
self._payload.put_nowait(payload) if not self._event.is_set():
self._event.set()
self._payload = payload
def run(self, payload): def run(self, payload: StanzaBase) -> None:
"""Do not process this handler during the main event loop.""" """Do not process this handler during the main event loop."""
pass pass
async def wait(self, timeout=None): async def wait(self, timeout: Optional[int] = None) -> Optional[StanzaBase]:
"""Block an event handler while waiting for a stanza to arrive. """Block an event handler while waiting for a stanza to arrive.
Be aware that this will impact performance if called from a Be aware that this will impact performance if called from a
@ -59,17 +68,24 @@ class Waiter(BaseHandler):
:class:`~slixmpp.xmlstream.xmlstream.XMLStream.response_timeout` :class:`~slixmpp.xmlstream.xmlstream.XMLStream.response_timeout`
value. value.
""" """
stream_ref = self.stream
if stream_ref is None:
raise ValueError('wait() called without a stream')
stream = stream_ref()
if stream is None:
raise ValueError('wait() called without a stream')
if timeout is None: if timeout is None:
timeout = slixmpp.xmlstream.RESPONSE_TIMEOUT timeout = slixmpp.xmlstream.RESPONSE_TIMEOUT
stanza = None
try: try:
stanza = await self._payload.get() await wait_for(
self._event.wait(), timeout, loop=stream.loop
)
except TimeoutError: except TimeoutError:
log.warning("Timed out waiting for %s", self.name) log.warning("Timed out waiting for %s", self.name)
self.stream().remove_handler(self.name) stream.remove_handler(self.name)
return stanza return self._payload
def check_delete(self): def check_delete(self) -> bool:
"""Always remove waiters after use.""" """Always remove waiters after use."""
return True return True

View File

@ -4,6 +4,7 @@
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
from slixmpp.xmlstream.handler import Callback from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.stanzabase import StanzaBase
class XMLCallback(Callback): class XMLCallback(Callback):
@ -17,7 +18,7 @@ class XMLCallback(Callback):
run -- Overrides Callback.run run -- Overrides Callback.run
""" """
def run(self, payload, instream=False): def run(self, payload: StanzaBase, instream: bool = False) -> None:
""" """
Execute the callback function with the matched stanza's Execute the callback function with the matched stanza's
XML contents, instead of the stanza itself. XML contents, instead of the stanza itself.

View File

@ -3,6 +3,7 @@
# Copyright (C) 2010 Nathanael C. Fritz # Copyright (C) 2010 Nathanael C. Fritz
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
from slixmpp.xmlstream.stanzabase import StanzaBase
from slixmpp.xmlstream.handler import Waiter from slixmpp.xmlstream.handler import Waiter
@ -17,7 +18,7 @@ class XMLWaiter(Waiter):
prerun -- Overrides Waiter.prerun prerun -- Overrides Waiter.prerun
""" """
def prerun(self, payload): def prerun(self, payload: StanzaBase) -> None:
""" """
Store the XML contents of the stanza to return to the Store the XML contents of the stanza to return to the
waiting event handler. waiting event handler.

View File

@ -1,10 +1,13 @@
# slixmpp.xmlstream.matcher.base # slixmpp.xmlstream.matcher.base
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Part of Slixmpp: The Slick XMPP Library # Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz # :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from typing import Any
from slixmpp.xmlstream.stanzabase import StanzaBase
class MatcherBase(object): class MatcherBase(object):
""" """
@ -15,10 +18,10 @@ class MatcherBase(object):
:param criteria: Object to compare some aspect of a stanza against. :param criteria: Object to compare some aspect of a stanza against.
""" """
def __init__(self, criteria): def __init__(self, criteria: Any):
self._criteria = criteria self._criteria = criteria
def match(self, xml): def match(self, xml: StanzaBase) -> bool:
"""Check if a stanza matches the stored criteria. """Check if a stanza matches the stored criteria.
Meant to be overridden. Meant to be overridden.

View File

@ -5,6 +5,7 @@
# :copyright: (c) 2011 Nathanael C. Fritz # :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from slixmpp.xmlstream.matcher.base import MatcherBase from slixmpp.xmlstream.matcher.base import MatcherBase
from slixmpp.xmlstream.stanzabase import StanzaBase
class MatcherId(MatcherBase): class MatcherId(MatcherBase):
@ -13,12 +14,13 @@ class MatcherId(MatcherBase):
The ID matcher selects stanzas that have the same stanza 'id' The ID matcher selects stanzas that have the same stanza 'id'
interface value as the desired ID. interface value as the desired ID.
""" """
_criteria: str
def match(self, xml): def match(self, xml: StanzaBase) -> bool:
"""Compare the given stanza's ``'id'`` attribute to the stored """Compare the given stanza's ``'id'`` attribute to the stored
``id`` value. ``id`` value.
:param xml: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` :param xml: The :class:`~slixmpp.xmlstream.stanzabase.StanzaBase`
stanza to compare against. stanza to compare against.
""" """
return xml['id'] == self._criteria return bool(xml['id'] == self._criteria)

View File

@ -4,7 +4,17 @@
# Part of Slixmpp: The Slick XMPP Library # Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz # :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from slixmpp.xmlstream.matcher.base import MatcherBase from slixmpp.xmlstream.matcher.base import MatcherBase
from slixmpp.xmlstream.stanzabase import StanzaBase
from slixmpp.jid import JID
from slixmpp.types import TypedDict
class CriteriaType(TypedDict):
self: JID
peer: JID
id: str
class MatchIDSender(MatcherBase): class MatchIDSender(MatcherBase):
@ -14,12 +24,13 @@ class MatchIDSender(MatcherBase):
interface value as the desired ID, and that the 'from' value is one interface value as the desired ID, and that the 'from' value is one
of a set of approved entities that can respond to a request. of a set of approved entities that can respond to a request.
""" """
_criteria: CriteriaType
def match(self, xml): def match(self, xml: StanzaBase) -> bool:
"""Compare the given stanza's ``'id'`` attribute to the stored """Compare the given stanza's ``'id'`` attribute to the stored
``id`` value, and verify the sender's JID. ``id`` value, and verify the sender's JID.
:param xml: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` :param xml: The :class:`~slixmpp.xmlstream.stanzabase.StanzaBase`
stanza to compare against. stanza to compare against.
""" """

View File

@ -3,7 +3,9 @@
# Copyright (C) 2010 Nathanael C. Fritz # Copyright (C) 2010 Nathanael C. Fritz
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permission. # See the file LICENSE for copying permission.
from typing import Iterable
from slixmpp.xmlstream.matcher.base import MatcherBase from slixmpp.xmlstream.matcher.base import MatcherBase
from slixmpp.xmlstream.stanzabase import StanzaBase
class MatchMany(MatcherBase): class MatchMany(MatcherBase):
@ -18,8 +20,9 @@ class MatchMany(MatcherBase):
Methods: Methods:
match -- Overrides MatcherBase.match. match -- Overrides MatcherBase.match.
""" """
_criteria: Iterable[MatcherBase]
def match(self, xml): def match(self, xml: StanzaBase) -> bool:
""" """
Match a stanza against multiple criteria. The match is successful Match a stanza against multiple criteria. The match is successful
if one of the criteria matches. if one of the criteria matches.

View File

@ -4,8 +4,9 @@
# Part of Slixmpp: The Slick XMPP Library # Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz # :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from typing import cast, List
from slixmpp.xmlstream.matcher.base import MatcherBase from slixmpp.xmlstream.matcher.base import MatcherBase
from slixmpp.xmlstream.stanzabase import fix_ns from slixmpp.xmlstream.stanzabase import fix_ns, StanzaBase
class StanzaPath(MatcherBase): class StanzaPath(MatcherBase):
@ -17,22 +18,28 @@ class StanzaPath(MatcherBase):
:param criteria: Object to compare some aspect of a stanza against. :param criteria: Object to compare some aspect of a stanza against.
""" """
_criteria: List[str]
_raw_criteria: str
def __init__(self, criteria): def __init__(self, criteria: str):
self._criteria = fix_ns(criteria, split=True, self._criteria = cast(
propagate_ns=False, List[str],
default_ns='jabber:client') fix_ns(
criteria, split=True, propagate_ns=False,
default_ns='jabber:client'
)
)
self._raw_criteria = criteria self._raw_criteria = criteria
def match(self, stanza): def match(self, stanza: StanzaBase) -> bool:
""" """
Compare a stanza against a "stanza path". A stanza path is similar to Compare a stanza against a "stanza path". A stanza path is similar to
an XPath expression, but uses the stanza's interfaces and plugins an XPath expression, but uses the stanza's interfaces and plugins
instead of the underlying XML. See the documentation for the stanza instead of the underlying XML. See the documentation for the stanza
:meth:`~slixmpp.xmlstream.stanzabase.ElementBase.match()` method :meth:`~slixmpp.xmlstream.stanzabase.StanzaBase.match()` method
for more information. for more information.
:param stanza: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` :param stanza: The :class:`~slixmpp.xmlstream.stanzabase.StanzaBase`
stanza to compare against. stanza to compare against.
""" """
return stanza.match(self._criteria) or stanza.match(self._raw_criteria) return stanza.match(self._criteria) or stanza.match(self._raw_criteria)

View File

@ -1,4 +1,3 @@
# Slixmpp: The Slick XMPP Library # Slixmpp: The Slick XMPP Library
# Copyright (C) 2010 Nathanael C. Fritz # Copyright (C) 2010 Nathanael C. Fritz
# This file is part of Slixmpp. # This file is part of Slixmpp.
@ -6,8 +5,9 @@
import logging import logging
from xml.parsers.expat import ExpatError from xml.parsers.expat import ExpatError
from xml.etree.ElementTree import Element
from slixmpp.xmlstream.stanzabase import ET from slixmpp.xmlstream.stanzabase import ET, StanzaBase
from slixmpp.xmlstream.matcher.base import MatcherBase from slixmpp.xmlstream.matcher.base import MatcherBase
@ -33,32 +33,33 @@ class MatchXMLMask(MatcherBase):
:param criteria: Either an :class:`~xml.etree.ElementTree.Element` XML :param criteria: Either an :class:`~xml.etree.ElementTree.Element` XML
object or XML string to use as a mask. object or XML string to use as a mask.
""" """
_criteria: Element
def __init__(self, criteria, default_ns='jabber:client'): def __init__(self, criteria: str, default_ns: str = 'jabber:client'):
MatcherBase.__init__(self, criteria) MatcherBase.__init__(self, criteria)
if isinstance(criteria, str): if isinstance(criteria, str):
self._criteria = ET.fromstring(self._criteria) self._criteria = ET.fromstring(criteria)
self.default_ns = default_ns self.default_ns = default_ns
def setDefaultNS(self, ns): def setDefaultNS(self, ns: str) -> None:
"""Set the default namespace to use during comparisons. """Set the default namespace to use during comparisons.
:param ns: The new namespace to use as the default. :param ns: The new namespace to use as the default.
""" """
self.default_ns = ns self.default_ns = ns
def match(self, xml): def match(self, xml: StanzaBase) -> bool:
"""Compare a stanza object or XML object against the stored XML mask. """Compare a stanza object or XML object against the stored XML mask.
Overrides MatcherBase.match. Overrides MatcherBase.match.
:param xml: The stanza object or XML object to compare against. :param xml: The stanza object or XML object to compare against.
""" """
if hasattr(xml, 'xml'): real_xml = xml.xml
xml = xml.xml return self._mask_cmp(real_xml, self._criteria, True)
return self._mask_cmp(xml, self._criteria, True)
def _mask_cmp(self, source, mask, use_ns=False, default_ns='__no_ns__'): def _mask_cmp(self, source: Element, mask: Element, use_ns: bool = False,
default_ns: str = '__no_ns__') -> bool:
"""Compare an XML object against an XML mask. """Compare an XML object against an XML mask.
:param source: The :class:`~xml.etree.ElementTree.Element` XML object :param source: The :class:`~xml.etree.ElementTree.Element` XML object
@ -75,13 +76,6 @@ class MatchXMLMask(MatcherBase):
# If the element was not found. May happen during recursive calls. # If the element was not found. May happen during recursive calls.
return False return False
# Convert the mask to an XML object if it is a string.
if not hasattr(mask, 'attrib'):
try:
mask = ET.fromstring(mask)
except ExpatError:
log.warning("Expat error: %s\nIn parsing: %s", '', mask)
mask_ns_tag = "{%s}%s" % (self.default_ns, mask.tag) mask_ns_tag = "{%s}%s" % (self.default_ns, mask.tag)
if source.tag not in [mask.tag, mask_ns_tag]: if source.tag not in [mask.tag, mask_ns_tag]:
return False return False

View File

@ -4,7 +4,8 @@
# Part of Slixmpp: The Slick XMPP Library # Part of Slixmpp: The Slick XMPP Library
# :copyright: (c) 2011 Nathanael C. Fritz # :copyright: (c) 2011 Nathanael C. Fritz
# :license: MIT, see LICENSE for more details # :license: MIT, see LICENSE for more details
from slixmpp.xmlstream.stanzabase import ET, fix_ns from typing import cast
from slixmpp.xmlstream.stanzabase import ET, fix_ns, StanzaBase
from slixmpp.xmlstream.matcher.base import MatcherBase from slixmpp.xmlstream.matcher.base import MatcherBase
@ -17,23 +18,23 @@ class MatchXPath(MatcherBase):
If the value of :data:`IGNORE_NS` is set to ``True``, then XPath If the value of :data:`IGNORE_NS` is set to ``True``, then XPath
expressions will be matched without using namespaces. expressions will be matched without using namespaces.
""" """
_criteria: str
def __init__(self, criteria): def __init__(self, criteria: str):
self._criteria = fix_ns(criteria) self._criteria = cast(str, fix_ns(criteria))
def match(self, xml): def match(self, xml: StanzaBase) -> bool:
""" """
Compare a stanza's XML contents to an XPath expression. Compare a stanza's XML contents to an XPath expression.
If the value of :data:`IGNORE_NS` is set to ``True``, then XPath If the value of :data:`IGNORE_NS` is set to ``True``, then XPath
expressions will be matched without using namespaces. expressions will be matched without using namespaces.
:param xml: The :class:`~slixmpp.xmlstream.stanzabase.ElementBase` :param xml: The :class:`~slixmpp.xmlstream.stanzabase.StanzaBase`
stanza to compare against. stanza to compare against.
""" """
if hasattr(xml, 'xml'): real_xml = xml.xml
xml = xml.xml
x = ET.Element('x') x = ET.Element('x')
x.append(xml) x.append(real_xml)
return x.find(self._criteria) is not None return x.find(self._criteria) is not None