Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
985926ed7b | ||
![]() |
8d63bd68cf | ||
![]() |
465e735d18 | ||
![]() |
fea4ee83be |
@ -1,8 +1,13 @@
|
||||
|
||||
# Slixmpp: The Slick XMPP Library
|
||||
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
|
||||
# This file is part of Slixmpp.
|
||||
# See the file LICENSE for copying permissio
|
||||
from abc import ABC
|
||||
try:
|
||||
from typing import Literal
|
||||
except ImportError:
|
||||
from typing_extensions import Literal
|
||||
|
||||
from slixmpp.stanza import Message
|
||||
from slixmpp.xmlstream import (
|
||||
ElementBase,
|
||||
@ -10,14 +15,83 @@ from slixmpp.xmlstream import (
|
||||
)
|
||||
|
||||
|
||||
NS = 'urn:xmpp:fallback:0'
|
||||
NS = "urn:xmpp:fallback:0"
|
||||
|
||||
|
||||
class Fallback(ElementBase):
|
||||
namespace = NS
|
||||
name = 'fallback'
|
||||
plugin_attrib = 'fallback'
|
||||
name = "fallback"
|
||||
plugin_attrib = "fallback"
|
||||
plugin_multi_attrib = "fallbacks"
|
||||
interfaces = {"for"}
|
||||
|
||||
def _find_fallback(self, fallback_for: str) -> "Fallback":
|
||||
if self["for"] == fallback_for:
|
||||
return self
|
||||
for fallback in self.parent()["fallbacks"]:
|
||||
if fallback["for"] == fallback_for:
|
||||
return fallback
|
||||
raise AttributeError("No fallback for this namespace", fallback_for)
|
||||
|
||||
def get_stripped_body(
|
||||
self, fallback_for: str, element: Literal["body", "subject"] = "body"
|
||||
) -> str:
|
||||
"""
|
||||
Get the body of a message, with the fallback part stripped
|
||||
|
||||
:param fallback_for: namespace of the fallback to strip
|
||||
:param element: set this to "subject" get the stripped subject instead
|
||||
of body
|
||||
|
||||
:return: body (or subject) content minus the fallback part
|
||||
"""
|
||||
fallback = self._find_fallback(fallback_for)
|
||||
start = fallback[element]["start"]
|
||||
end = fallback[element]["end"]
|
||||
body = self.parent()[element]
|
||||
if start == end == 0:
|
||||
return ""
|
||||
if start <= end < len(body):
|
||||
return body[:start] + body[end:]
|
||||
else:
|
||||
return body
|
||||
|
||||
|
||||
class FallbackMixin(ABC):
|
||||
namespace = NS
|
||||
name = NotImplemented
|
||||
plugin_attrib = NotImplemented
|
||||
interfaces = {"start", "end"}
|
||||
|
||||
def set_start(self, v: int):
|
||||
self._set_attr("start", str(v))
|
||||
|
||||
def get_start(self):
|
||||
return _int_or_zero(self._get_attr("start"))
|
||||
|
||||
def set_end(self, v: int):
|
||||
self._set_attr("end", str(v))
|
||||
|
||||
def get_end(self):
|
||||
return _int_or_zero(self._get_attr("end"))
|
||||
|
||||
|
||||
class FallbackBody(FallbackMixin, ElementBase):
|
||||
name = plugin_attrib = "body"
|
||||
|
||||
|
||||
class FallbackSubject(FallbackMixin, ElementBase):
|
||||
name = plugin_attrib = "subject"
|
||||
|
||||
|
||||
def _int_or_zero(v: str):
|
||||
try:
|
||||
return int(v)
|
||||
except ValueError:
|
||||
return 0
|
||||
|
||||
|
||||
def register_plugins():
|
||||
register_stanza_plugin(Message, Fallback)
|
||||
register_stanza_plugin(Message, Fallback, iterable=True)
|
||||
register_stanza_plugin(Fallback, FallbackBody)
|
||||
register_stanza_plugin(Fallback, FallbackSubject)
|
||||
|
@ -13,7 +13,7 @@ class XEP_0461(BasePlugin):
|
||||
name = "xep_0461"
|
||||
description = "XEP-0461: Message Replies"
|
||||
|
||||
dependencies = {"xep_0030"}
|
||||
dependencies = {"xep_0030", "xep_0428"}
|
||||
stanza = stanza
|
||||
namespace = stanza.NS
|
||||
|
||||
|
@ -2,6 +2,7 @@ from typing import Optional
|
||||
|
||||
from slixmpp.stanza import Message
|
||||
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
|
||||
from slixmpp.plugins.xep_0428.stanza import Fallback
|
||||
|
||||
NS = "urn:xmpp:reply:0"
|
||||
|
||||
@ -12,39 +13,11 @@ class Reply(ElementBase):
|
||||
plugin_attrib = "reply"
|
||||
interfaces = {"id", "to"}
|
||||
|
||||
|
||||
class FeatureFallBack(ElementBase):
|
||||
# should also be a multi attrib
|
||||
namespace = "urn:xmpp:fallback:0"
|
||||
name = "fallback"
|
||||
plugin_attrib = "feature_fallback"
|
||||
interfaces = {"for"}
|
||||
|
||||
def get_fallback_body(self):
|
||||
# only works for a single fallback_body attrib
|
||||
start = self["fallback_body"]["start"]
|
||||
end = self["fallback_body"]["end"]
|
||||
body = self.parent()["body"]
|
||||
if start <= end:
|
||||
return body[start:end]
|
||||
else:
|
||||
return ""
|
||||
|
||||
def get_stripped_body(self):
|
||||
# only works for a single fallback_body attrib
|
||||
start = self["fallback_body"]["start"]
|
||||
end = self["fallback_body"]["end"]
|
||||
body = self.parent()["body"]
|
||||
if start <= end < len(body):
|
||||
return body[:start] + body[end:]
|
||||
else:
|
||||
return body
|
||||
|
||||
def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None):
|
||||
"""
|
||||
Add plain text fallback for clients not implementing XEP-0461.
|
||||
|
||||
``msg["feature_fallback"].add_quoted_fallback("Some text", "Bob")`` will
|
||||
``msg["reply"].add_quoted_fallback("Some text", "Bob")`` will
|
||||
prepend "> Bob:\n> Some text\n" to the body of the message, and set the
|
||||
fallback_body attributes accordingly, so that clients implementing
|
||||
XEP-0461 can hide the fallback text.
|
||||
@ -57,39 +30,27 @@ class FeatureFallBack(ElementBase):
|
||||
if nickname:
|
||||
quoted = "> " + nickname + ":\n" + quoted
|
||||
msg["body"] = quoted + msg["body"]
|
||||
msg["feature_fallback"]["for"] = NS
|
||||
msg["feature_fallback"]["fallback_body"]["start"] = 0
|
||||
msg["feature_fallback"]["fallback_body"]["end"] = len(quoted)
|
||||
fallback = Fallback()
|
||||
fallback["for"] = NS
|
||||
fallback["body"]["start"] = 0
|
||||
fallback["body"]["end"] = len(quoted)
|
||||
msg.append(fallback)
|
||||
|
||||
|
||||
class FallBackBody(ElementBase):
|
||||
# According to https://xmpp.org/extensions/inbox/compatibility-fallback.html
|
||||
# this should be a multi_attrib *but* since it's a protoXEP, we'll see...
|
||||
namespace = FeatureFallBack.namespace
|
||||
name = "body"
|
||||
plugin_attrib = "fallback_body"
|
||||
interfaces = {"start", "end"}
|
||||
|
||||
def set_start(self, v: int):
|
||||
self._set_attr("start", str(v))
|
||||
|
||||
def get_start(self):
|
||||
try:
|
||||
return int(self._get_attr("start"))
|
||||
except ValueError:
|
||||
return 0
|
||||
|
||||
def set_end(self, v: int):
|
||||
self._set_attr("end", str(v))
|
||||
|
||||
def get_end(self):
|
||||
try:
|
||||
return int(self._get_attr("end"))
|
||||
except ValueError:
|
||||
return 0
|
||||
def get_fallback_body(self) -> str:
|
||||
msg = self.parent()
|
||||
for fallback in msg["fallbacks"]:
|
||||
if fallback["for"] == NS:
|
||||
break
|
||||
else:
|
||||
return ""
|
||||
start = fallback["body"]["start"]
|
||||
end = fallback["body"]["end"]
|
||||
body = msg["body"]
|
||||
if start <= end:
|
||||
return body[start:end]
|
||||
else:
|
||||
return ""
|
||||
|
||||
|
||||
def register_plugins():
|
||||
register_stanza_plugin(Message, Reply)
|
||||
register_stanza_plugin(Message, FeatureFallBack)
|
||||
register_stanza_plugin(FeatureFallBack, FallBackBody)
|
||||
|
@ -10,5 +10,5 @@ from slixmpp.xmlstream.tostring import tostring, highlight
|
||||
from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
|
||||
|
||||
__all__ = ['JID', 'StanzaBase', 'ElementBase',
|
||||
'ET', 'StateMachine', 'tostring', 'highlight', 'XMLStream',
|
||||
'RESPONSE_TIMEOUT']
|
||||
'ET', 'tostring', 'highlight', 'XMLStream',
|
||||
'RESPONSE_TIMEOUT', 'register_stanza_plugin']
|
||||
|
@ -1243,7 +1243,7 @@ class ElementBase(object):
|
||||
self.init_plugin(item.__class__.plugin_multi_attrib)
|
||||
else:
|
||||
self.iterables.append(item)
|
||||
|
||||
item.parent = weakref.ref(self)
|
||||
return self
|
||||
|
||||
def appendxml(self, xml: ET.Element) -> ElementBase:
|
||||
|
149
tests/test_stanza_xep_0428.py
Normal file
149
tests/test_stanza_xep_0428.py
Normal file
@ -0,0 +1,149 @@
|
||||
import unittest
|
||||
|
||||
from slixmpp import Message
|
||||
from slixmpp.test import SlixTest
|
||||
from slixmpp.plugins.xep_0428 import stanza
|
||||
|
||||
from slixmpp.plugins import xep_0461
|
||||
from slixmpp.plugins import xep_0444
|
||||
|
||||
|
||||
class TestFallback(SlixTest):
|
||||
def setUp(self):
|
||||
stanza.register_plugins()
|
||||
|
||||
def testSingleFallbackBody(self):
|
||||
message = Message()
|
||||
message["fallback"]["for"] = "ns"
|
||||
message["fallback"]["body"]["start"] = 0
|
||||
message["fallback"]["body"]["end"] = 8
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||
<body start="0" end="8" />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
def testSingleFallbackSubject(self):
|
||||
message = Message()
|
||||
message["fallback"]["for"] = "ns"
|
||||
message["fallback"]["subject"]["start"] = 0
|
||||
message["fallback"]["subject"]["end"] = 8
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||
<subject start="0" end="8" />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
def testSingleFallbackWholeBody(self):
|
||||
message = Message()
|
||||
message["fallback"]["for"] = "ns"
|
||||
message["fallback"].enable("body")
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||
<body />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
def testMultiFallback(self):
|
||||
message = Message()
|
||||
|
||||
f1 = stanza.Fallback()
|
||||
f1["for"] = "ns1"
|
||||
|
||||
f2 = stanza.Fallback()
|
||||
f2["for"] = "ns2"
|
||||
|
||||
message.append(f1)
|
||||
message.append(f2)
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns1' />
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns2' />
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
for i, fallback in enumerate(message["fallbacks"], start=1):
|
||||
self.assertEqual(fallback["for"], f"ns{i}")
|
||||
|
||||
def testStripFallbackPartOfBody(self):
|
||||
message = Message()
|
||||
message["body"] = "> quoted\nsome-body"
|
||||
message["fallback"]["for"] = xep_0461.stanza.NS
|
||||
message["fallback"]["body"]["start"] = 0
|
||||
message["fallback"]["body"]["end"] = 9
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<body>> quoted\nsome-body</body>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
|
||||
<body start="0" end="9" />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body"
|
||||
)
|
||||
|
||||
def testStripWholeBody(self):
|
||||
message = Message()
|
||||
message["body"] = "> quoted\nsome-body"
|
||||
message["fallback"]["for"] = "ns"
|
||||
message["fallback"].enable("body")
|
||||
|
||||
self.check(
|
||||
message, # language=XML
|
||||
"""
|
||||
<message>
|
||||
<body>> quoted\nsome-body</body>
|
||||
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||
<body />
|
||||
</fallback>
|
||||
</message>
|
||||
""",
|
||||
)
|
||||
|
||||
self.assertEqual(message["fallback"].get_stripped_body("ns"), "")
|
||||
|
||||
def testStripMultiFallback(self):
|
||||
message = Message()
|
||||
message["body"] = "> huuuuu\n👍"
|
||||
|
||||
message["fallback"]["for"] = xep_0461.stanza.NS
|
||||
message["fallback"]["body"]["start"] = 0
|
||||
message["fallback"]["body"]["end"] = 9
|
||||
|
||||
reaction_fallback = stanza.Fallback()
|
||||
reaction_fallback["for"] = xep_0444.stanza.NS
|
||||
reaction_fallback.enable("body")
|
||||
message.append(reaction_fallback)
|
||||
|
||||
self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "👍")
|
||||
self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "")
|
||||
|
||||
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback)
|
@ -1,11 +1,13 @@
|
||||
import unittest
|
||||
from slixmpp import Message
|
||||
from slixmpp.test import SlixTest
|
||||
from slixmpp.plugins.xep_0428 import stanza as fallback_stanza
|
||||
from slixmpp.plugins.xep_0461 import stanza
|
||||
|
||||
|
||||
class TestReply(SlixTest):
|
||||
def setUp(self):
|
||||
fallback_stanza.register_plugins()
|
||||
stanza.register_plugins()
|
||||
|
||||
def testReply(self):
|
||||
@ -26,9 +28,9 @@ class TestReply(SlixTest):
|
||||
def testFallback(self):
|
||||
message = Message()
|
||||
message["body"] = "12345\nrealbody"
|
||||
message["feature_fallback"]["for"] = "NS"
|
||||
message["feature_fallback"]["fallback_body"]["start"] = 0
|
||||
message["feature_fallback"]["fallback_body"]["end"] = 6
|
||||
message["fallback"]["for"] = "NS"
|
||||
message["fallback"]["body"]["start"] = 0
|
||||
message["fallback"]["body"]["end"] = 6
|
||||
|
||||
self.check(
|
||||
message,
|
||||
@ -42,18 +44,18 @@ class TestReply(SlixTest):
|
||||
""",
|
||||
)
|
||||
|
||||
assert message["feature_fallback"].get_stripped_body() == "realbody"
|
||||
assert message["fallback"].get_stripped_body("NS") == "realbody"
|
||||
|
||||
def testAddFallBackHelper(self):
|
||||
msg = Message()
|
||||
msg["body"] = "Great"
|
||||
msg["feature_fallback"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
|
||||
# ugly dedent but the test does not pass without it
|
||||
msg["reply"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
|
||||
self.check(
|
||||
msg,
|
||||
msg, # language=XML
|
||||
"""
|
||||
<message xmlns="jabber:client" type="normal">
|
||||
<body>> Anna wrote:\n> Hi, how are you?\nGreat</body>
|
||||
<reply xmlns="urn:xmpp:reply:0" />
|
||||
<fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0">
|
||||
<body start='0' end='33' />
|
||||
</fallback>
|
||||
@ -67,8 +69,8 @@ class TestReply(SlixTest):
|
||||
|
||||
msg = Message()
|
||||
msg["body"] = "Great"
|
||||
msg["feature_fallback"].add_quoted_fallback(body)
|
||||
body2 = msg["feature_fallback"].get_fallback_body()
|
||||
msg["reply"].add_quoted_fallback(body)
|
||||
body2 = msg["reply"].get_fallback_body()
|
||||
self.assertTrue(body2 == quoted, body2)
|
||||
|
||||
|
||||
|
@ -9,8 +9,8 @@ class TestReply(SlixTest):
|
||||
|
||||
def testFallBackBody(self):
|
||||
async def on_reply(msg):
|
||||
start = msg["feature_fallback"]["fallback_body"]["start"]
|
||||
end = msg["feature_fallback"]["fallback_body"]["end"]
|
||||
start = msg["fallback"]["body"]["start"]
|
||||
end = msg["fallback"]["body"]["end"]
|
||||
self.xmpp["xep_0461"].send_reply(
|
||||
reply_to=msg.get_from(),
|
||||
reply_id=msg.get_id(),
|
||||
|
Loading…
Reference in New Issue
Block a user