Compare commits

...

4 Commits

Author SHA1 Message Date
nicoco
985926ed7b XEP-0461: rely on XEP-0428 for fallback
Breaks the previous fallback helpers, we now
rely on XEP-0461 instead
2023-12-19 14:15:24 +00:00
nicoco
8d63bd68cf XEP-0428: add fallback body and subject elements
+ tests
+ helpers to strip the fallback content
2023-12-19 14:15:24 +00:00
nicoco
465e735d18 ElementBase: add weak ref to parent when using append() 2023-12-19 14:15:24 +00:00
nicoco
fea4ee83be fix slixmpp.xmlstream.__all__ 2023-12-19 14:15:24 +00:00
8 changed files with 266 additions and 80 deletions

View File

@ -1,8 +1,13 @@
# Slixmpp: The Slick XMPP Library # Slixmpp: The Slick XMPP Library
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net> # Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
# This file is part of Slixmpp. # This file is part of Slixmpp.
# See the file LICENSE for copying permissio # See the file LICENSE for copying permissio
from abc import ABC
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
from slixmpp.stanza import Message from slixmpp.stanza import Message
from slixmpp.xmlstream import ( from slixmpp.xmlstream import (
ElementBase, ElementBase,
@ -10,14 +15,83 @@ from slixmpp.xmlstream import (
) )
NS = 'urn:xmpp:fallback:0' NS = "urn:xmpp:fallback:0"
class Fallback(ElementBase): class Fallback(ElementBase):
namespace = NS namespace = NS
name = 'fallback' name = "fallback"
plugin_attrib = 'fallback' plugin_attrib = "fallback"
plugin_multi_attrib = "fallbacks"
interfaces = {"for"}
def _find_fallback(self, fallback_for: str) -> "Fallback":
if self["for"] == fallback_for:
return self
for fallback in self.parent()["fallbacks"]:
if fallback["for"] == fallback_for:
return fallback
raise AttributeError("No fallback for this namespace", fallback_for)
def get_stripped_body(
self, fallback_for: str, element: Literal["body", "subject"] = "body"
) -> str:
"""
Get the body of a message, with the fallback part stripped
:param fallback_for: namespace of the fallback to strip
:param element: set this to "subject" get the stripped subject instead
of body
:return: body (or subject) content minus the fallback part
"""
fallback = self._find_fallback(fallback_for)
start = fallback[element]["start"]
end = fallback[element]["end"]
body = self.parent()[element]
if start == end == 0:
return ""
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
class FallbackMixin(ABC):
namespace = NS
name = NotImplemented
plugin_attrib = NotImplemented
interfaces = {"start", "end"}
def set_start(self, v: int):
self._set_attr("start", str(v))
def get_start(self):
return _int_or_zero(self._get_attr("start"))
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
return _int_or_zero(self._get_attr("end"))
class FallbackBody(FallbackMixin, ElementBase):
name = plugin_attrib = "body"
class FallbackSubject(FallbackMixin, ElementBase):
name = plugin_attrib = "subject"
def _int_or_zero(v: str):
try:
return int(v)
except ValueError:
return 0
def register_plugins(): def register_plugins():
register_stanza_plugin(Message, Fallback) register_stanza_plugin(Message, Fallback, iterable=True)
register_stanza_plugin(Fallback, FallbackBody)
register_stanza_plugin(Fallback, FallbackSubject)

View File

@ -13,7 +13,7 @@ class XEP_0461(BasePlugin):
name = "xep_0461" name = "xep_0461"
description = "XEP-0461: Message Replies" description = "XEP-0461: Message Replies"
dependencies = {"xep_0030"} dependencies = {"xep_0030", "xep_0428"}
stanza = stanza stanza = stanza
namespace = stanza.NS namespace = stanza.NS

View File

@ -2,6 +2,7 @@ from typing import Optional
from slixmpp.stanza import Message from slixmpp.stanza import Message
from slixmpp.xmlstream import ElementBase, register_stanza_plugin from slixmpp.xmlstream import ElementBase, register_stanza_plugin
from slixmpp.plugins.xep_0428.stanza import Fallback
NS = "urn:xmpp:reply:0" NS = "urn:xmpp:reply:0"
@ -12,39 +13,11 @@ class Reply(ElementBase):
plugin_attrib = "reply" plugin_attrib = "reply"
interfaces = {"id", "to"} interfaces = {"id", "to"}
class FeatureFallBack(ElementBase):
# should also be a multi attrib
namespace = "urn:xmpp:fallback:0"
name = "fallback"
plugin_attrib = "feature_fallback"
interfaces = {"for"}
def get_fallback_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end:
return body[start:end]
else:
return ""
def get_stripped_body(self):
# only works for a single fallback_body attrib
start = self["fallback_body"]["start"]
end = self["fallback_body"]["end"]
body = self.parent()["body"]
if start <= end < len(body):
return body[:start] + body[end:]
else:
return body
def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None): def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None):
""" """
Add plain text fallback for clients not implementing XEP-0461. Add plain text fallback for clients not implementing XEP-0461.
``msg["feature_fallback"].add_quoted_fallback("Some text", "Bob")`` will ``msg["reply"].add_quoted_fallback("Some text", "Bob")`` will
prepend "> Bob:\n> Some text\n" to the body of the message, and set the prepend "> Bob:\n> Some text\n" to the body of the message, and set the
fallback_body attributes accordingly, so that clients implementing fallback_body attributes accordingly, so that clients implementing
XEP-0461 can hide the fallback text. XEP-0461 can hide the fallback text.
@ -57,39 +30,27 @@ class FeatureFallBack(ElementBase):
if nickname: if nickname:
quoted = "> " + nickname + ":\n" + quoted quoted = "> " + nickname + ":\n" + quoted
msg["body"] = quoted + msg["body"] msg["body"] = quoted + msg["body"]
msg["feature_fallback"]["for"] = NS fallback = Fallback()
msg["feature_fallback"]["fallback_body"]["start"] = 0 fallback["for"] = NS
msg["feature_fallback"]["fallback_body"]["end"] = len(quoted) fallback["body"]["start"] = 0
fallback["body"]["end"] = len(quoted)
msg.append(fallback)
def get_fallback_body(self) -> str:
class FallBackBody(ElementBase): msg = self.parent()
# According to https://xmpp.org/extensions/inbox/compatibility-fallback.html for fallback in msg["fallbacks"]:
# this should be a multi_attrib *but* since it's a protoXEP, we'll see... if fallback["for"] == NS:
namespace = FeatureFallBack.namespace break
name = "body" else:
plugin_attrib = "fallback_body" return ""
interfaces = {"start", "end"} start = fallback["body"]["start"]
end = fallback["body"]["end"]
def set_start(self, v: int): body = msg["body"]
self._set_attr("start", str(v)) if start <= end:
return body[start:end]
def get_start(self): else:
try: return ""
return int(self._get_attr("start"))
except ValueError:
return 0
def set_end(self, v: int):
self._set_attr("end", str(v))
def get_end(self):
try:
return int(self._get_attr("end"))
except ValueError:
return 0
def register_plugins(): def register_plugins():
register_stanza_plugin(Message, Reply) register_stanza_plugin(Message, Reply)
register_stanza_plugin(Message, FeatureFallBack)
register_stanza_plugin(FeatureFallBack, FallBackBody)

View File

@ -10,5 +10,5 @@ from slixmpp.xmlstream.tostring import tostring, highlight
from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
__all__ = ['JID', 'StanzaBase', 'ElementBase', __all__ = ['JID', 'StanzaBase', 'ElementBase',
'ET', 'StateMachine', 'tostring', 'highlight', 'XMLStream', 'ET', 'tostring', 'highlight', 'XMLStream',
'RESPONSE_TIMEOUT'] 'RESPONSE_TIMEOUT', 'register_stanza_plugin']

View File

@ -1243,7 +1243,7 @@ class ElementBase(object):
self.init_plugin(item.__class__.plugin_multi_attrib) self.init_plugin(item.__class__.plugin_multi_attrib)
else: else:
self.iterables.append(item) self.iterables.append(item)
item.parent = weakref.ref(self)
return self return self
def appendxml(self, xml: ET.Element) -> ElementBase: def appendxml(self, xml: ET.Element) -> ElementBase:

View File

@ -0,0 +1,149 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza
from slixmpp.plugins import xep_0461
from slixmpp.plugins import xep_0444
class TestFallback(SlixTest):
def setUp(self):
stanza.register_plugins()
def testSingleFallbackBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackSubject(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"]["subject"]["start"] = 0
message["fallback"]["subject"]["end"] = 8
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<subject start="0" end="8" />
</fallback>
</message>
""",
)
def testSingleFallbackWholeBody(self):
message = Message()
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
def testMultiFallback(self):
message = Message()
f1 = stanza.Fallback()
f1["for"] = "ns1"
f2 = stanza.Fallback()
f2["for"] = "ns2"
message.append(f1)
message.append(f2)
self.check(
message, # language=XML
"""
<message>
<fallback xmlns='urn:xmpp:fallback:0' for='ns1' />
<fallback xmlns='urn:xmpp:fallback:0' for='ns2' />
</message>
""",
)
for i, fallback in enumerate(message["fallbacks"], start=1):
self.assertEqual(fallback["for"], f"ns{i}")
def testStripFallbackPartOfBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
<body start="0" end="9" />
</fallback>
</message>
""",
)
self.assertEqual(
message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body"
)
def testStripWholeBody(self):
message = Message()
message["body"] = "> quoted\nsome-body"
message["fallback"]["for"] = "ns"
message["fallback"].enable("body")
self.check(
message, # language=XML
"""
<message>
<body>&gt; quoted\nsome-body</body>
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
<body />
</fallback>
</message>
""",
)
self.assertEqual(message["fallback"].get_stripped_body("ns"), "")
def testStripMultiFallback(self):
message = Message()
message["body"] = "> huuuuu\n👍"
message["fallback"]["for"] = xep_0461.stanza.NS
message["fallback"]["body"]["start"] = 0
message["fallback"]["body"]["end"] = 9
reaction_fallback = stanza.Fallback()
reaction_fallback["for"] = xep_0444.stanza.NS
reaction_fallback.enable("body")
message.append(reaction_fallback)
self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "👍")
self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "")
suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback)

View File

@ -1,11 +1,13 @@
import unittest import unittest
from slixmpp import Message from slixmpp import Message
from slixmpp.test import SlixTest from slixmpp.test import SlixTest
from slixmpp.plugins.xep_0428 import stanza as fallback_stanza
from slixmpp.plugins.xep_0461 import stanza from slixmpp.plugins.xep_0461 import stanza
class TestReply(SlixTest): class TestReply(SlixTest):
def setUp(self): def setUp(self):
fallback_stanza.register_plugins()
stanza.register_plugins() stanza.register_plugins()
def testReply(self): def testReply(self):
@ -26,9 +28,9 @@ class TestReply(SlixTest):
def testFallback(self): def testFallback(self):
message = Message() message = Message()
message["body"] = "12345\nrealbody" message["body"] = "12345\nrealbody"
message["feature_fallback"]["for"] = "NS" message["fallback"]["for"] = "NS"
message["feature_fallback"]["fallback_body"]["start"] = 0 message["fallback"]["body"]["start"] = 0
message["feature_fallback"]["fallback_body"]["end"] = 6 message["fallback"]["body"]["end"] = 6
self.check( self.check(
message, message,
@ -42,18 +44,18 @@ class TestReply(SlixTest):
""", """,
) )
assert message["feature_fallback"].get_stripped_body() == "realbody" assert message["fallback"].get_stripped_body("NS") == "realbody"
def testAddFallBackHelper(self): def testAddFallBackHelper(self):
msg = Message() msg = Message()
msg["body"] = "Great" msg["body"] = "Great"
msg["feature_fallback"].add_quoted_fallback("Anna wrote:\nHi, how are you?") msg["reply"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
# ugly dedent but the test does not pass without it
self.check( self.check(
msg, msg, # language=XML
""" """
<message xmlns="jabber:client" type="normal"> <message xmlns="jabber:client" type="normal">
<body>> Anna wrote:\n> Hi, how are you?\nGreat</body> <body>> Anna wrote:\n> Hi, how are you?\nGreat</body>
<reply xmlns="urn:xmpp:reply:0" />
<fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0"> <fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0">
<body start='0' end='33' /> <body start='0' end='33' />
</fallback> </fallback>
@ -67,8 +69,8 @@ class TestReply(SlixTest):
msg = Message() msg = Message()
msg["body"] = "Great" msg["body"] = "Great"
msg["feature_fallback"].add_quoted_fallback(body) msg["reply"].add_quoted_fallback(body)
body2 = msg["feature_fallback"].get_fallback_body() body2 = msg["reply"].get_fallback_body()
self.assertTrue(body2 == quoted, body2) self.assertTrue(body2 == quoted, body2)

View File

@ -9,8 +9,8 @@ class TestReply(SlixTest):
def testFallBackBody(self): def testFallBackBody(self):
async def on_reply(msg): async def on_reply(msg):
start = msg["feature_fallback"]["fallback_body"]["start"] start = msg["fallback"]["body"]["start"]
end = msg["feature_fallback"]["fallback_body"]["end"] end = msg["fallback"]["body"]["end"]
self.xmpp["xep_0461"].send_reply( self.xmpp["xep_0461"].send_reply(
reply_to=msg.get_from(), reply_to=msg.get_from(),
reply_id=msg.get_id(), reply_id=msg.get_id(),