Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
985926ed7b | ||
![]() |
8d63bd68cf | ||
![]() |
465e735d18 | ||
![]() |
fea4ee83be |
@ -1,8 +1,13 @@
|
|||||||
|
|
||||||
# Slixmpp: The Slick XMPP Library
|
# Slixmpp: The Slick XMPP Library
|
||||||
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
|
# Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net>
|
||||||
# This file is part of Slixmpp.
|
# This file is part of Slixmpp.
|
||||||
# See the file LICENSE for copying permissio
|
# See the file LICENSE for copying permissio
|
||||||
|
from abc import ABC
|
||||||
|
try:
|
||||||
|
from typing import Literal
|
||||||
|
except ImportError:
|
||||||
|
from typing_extensions import Literal
|
||||||
|
|
||||||
from slixmpp.stanza import Message
|
from slixmpp.stanza import Message
|
||||||
from slixmpp.xmlstream import (
|
from slixmpp.xmlstream import (
|
||||||
ElementBase,
|
ElementBase,
|
||||||
@ -10,14 +15,83 @@ from slixmpp.xmlstream import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
NS = 'urn:xmpp:fallback:0'
|
NS = "urn:xmpp:fallback:0"
|
||||||
|
|
||||||
|
|
||||||
class Fallback(ElementBase):
|
class Fallback(ElementBase):
|
||||||
namespace = NS
|
namespace = NS
|
||||||
name = 'fallback'
|
name = "fallback"
|
||||||
plugin_attrib = 'fallback'
|
plugin_attrib = "fallback"
|
||||||
|
plugin_multi_attrib = "fallbacks"
|
||||||
|
interfaces = {"for"}
|
||||||
|
|
||||||
|
def _find_fallback(self, fallback_for: str) -> "Fallback":
|
||||||
|
if self["for"] == fallback_for:
|
||||||
|
return self
|
||||||
|
for fallback in self.parent()["fallbacks"]:
|
||||||
|
if fallback["for"] == fallback_for:
|
||||||
|
return fallback
|
||||||
|
raise AttributeError("No fallback for this namespace", fallback_for)
|
||||||
|
|
||||||
|
def get_stripped_body(
|
||||||
|
self, fallback_for: str, element: Literal["body", "subject"] = "body"
|
||||||
|
) -> str:
|
||||||
|
"""
|
||||||
|
Get the body of a message, with the fallback part stripped
|
||||||
|
|
||||||
|
:param fallback_for: namespace of the fallback to strip
|
||||||
|
:param element: set this to "subject" get the stripped subject instead
|
||||||
|
of body
|
||||||
|
|
||||||
|
:return: body (or subject) content minus the fallback part
|
||||||
|
"""
|
||||||
|
fallback = self._find_fallback(fallback_for)
|
||||||
|
start = fallback[element]["start"]
|
||||||
|
end = fallback[element]["end"]
|
||||||
|
body = self.parent()[element]
|
||||||
|
if start == end == 0:
|
||||||
|
return ""
|
||||||
|
if start <= end < len(body):
|
||||||
|
return body[:start] + body[end:]
|
||||||
|
else:
|
||||||
|
return body
|
||||||
|
|
||||||
|
|
||||||
|
class FallbackMixin(ABC):
|
||||||
|
namespace = NS
|
||||||
|
name = NotImplemented
|
||||||
|
plugin_attrib = NotImplemented
|
||||||
|
interfaces = {"start", "end"}
|
||||||
|
|
||||||
|
def set_start(self, v: int):
|
||||||
|
self._set_attr("start", str(v))
|
||||||
|
|
||||||
|
def get_start(self):
|
||||||
|
return _int_or_zero(self._get_attr("start"))
|
||||||
|
|
||||||
|
def set_end(self, v: int):
|
||||||
|
self._set_attr("end", str(v))
|
||||||
|
|
||||||
|
def get_end(self):
|
||||||
|
return _int_or_zero(self._get_attr("end"))
|
||||||
|
|
||||||
|
|
||||||
|
class FallbackBody(FallbackMixin, ElementBase):
|
||||||
|
name = plugin_attrib = "body"
|
||||||
|
|
||||||
|
|
||||||
|
class FallbackSubject(FallbackMixin, ElementBase):
|
||||||
|
name = plugin_attrib = "subject"
|
||||||
|
|
||||||
|
|
||||||
|
def _int_or_zero(v: str):
|
||||||
|
try:
|
||||||
|
return int(v)
|
||||||
|
except ValueError:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
def register_plugins():
|
def register_plugins():
|
||||||
register_stanza_plugin(Message, Fallback)
|
register_stanza_plugin(Message, Fallback, iterable=True)
|
||||||
|
register_stanza_plugin(Fallback, FallbackBody)
|
||||||
|
register_stanza_plugin(Fallback, FallbackSubject)
|
||||||
|
@ -13,7 +13,7 @@ class XEP_0461(BasePlugin):
|
|||||||
name = "xep_0461"
|
name = "xep_0461"
|
||||||
description = "XEP-0461: Message Replies"
|
description = "XEP-0461: Message Replies"
|
||||||
|
|
||||||
dependencies = {"xep_0030"}
|
dependencies = {"xep_0030", "xep_0428"}
|
||||||
stanza = stanza
|
stanza = stanza
|
||||||
namespace = stanza.NS
|
namespace = stanza.NS
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ from typing import Optional
|
|||||||
|
|
||||||
from slixmpp.stanza import Message
|
from slixmpp.stanza import Message
|
||||||
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
|
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
|
||||||
|
from slixmpp.plugins.xep_0428.stanza import Fallback
|
||||||
|
|
||||||
NS = "urn:xmpp:reply:0"
|
NS = "urn:xmpp:reply:0"
|
||||||
|
|
||||||
@ -12,39 +13,11 @@ class Reply(ElementBase):
|
|||||||
plugin_attrib = "reply"
|
plugin_attrib = "reply"
|
||||||
interfaces = {"id", "to"}
|
interfaces = {"id", "to"}
|
||||||
|
|
||||||
|
|
||||||
class FeatureFallBack(ElementBase):
|
|
||||||
# should also be a multi attrib
|
|
||||||
namespace = "urn:xmpp:fallback:0"
|
|
||||||
name = "fallback"
|
|
||||||
plugin_attrib = "feature_fallback"
|
|
||||||
interfaces = {"for"}
|
|
||||||
|
|
||||||
def get_fallback_body(self):
|
|
||||||
# only works for a single fallback_body attrib
|
|
||||||
start = self["fallback_body"]["start"]
|
|
||||||
end = self["fallback_body"]["end"]
|
|
||||||
body = self.parent()["body"]
|
|
||||||
if start <= end:
|
|
||||||
return body[start:end]
|
|
||||||
else:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
def get_stripped_body(self):
|
|
||||||
# only works for a single fallback_body attrib
|
|
||||||
start = self["fallback_body"]["start"]
|
|
||||||
end = self["fallback_body"]["end"]
|
|
||||||
body = self.parent()["body"]
|
|
||||||
if start <= end < len(body):
|
|
||||||
return body[:start] + body[end:]
|
|
||||||
else:
|
|
||||||
return body
|
|
||||||
|
|
||||||
def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None):
|
def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None):
|
||||||
"""
|
"""
|
||||||
Add plain text fallback for clients not implementing XEP-0461.
|
Add plain text fallback for clients not implementing XEP-0461.
|
||||||
|
|
||||||
``msg["feature_fallback"].add_quoted_fallback("Some text", "Bob")`` will
|
``msg["reply"].add_quoted_fallback("Some text", "Bob")`` will
|
||||||
prepend "> Bob:\n> Some text\n" to the body of the message, and set the
|
prepend "> Bob:\n> Some text\n" to the body of the message, and set the
|
||||||
fallback_body attributes accordingly, so that clients implementing
|
fallback_body attributes accordingly, so that clients implementing
|
||||||
XEP-0461 can hide the fallback text.
|
XEP-0461 can hide the fallback text.
|
||||||
@ -57,39 +30,27 @@ class FeatureFallBack(ElementBase):
|
|||||||
if nickname:
|
if nickname:
|
||||||
quoted = "> " + nickname + ":\n" + quoted
|
quoted = "> " + nickname + ":\n" + quoted
|
||||||
msg["body"] = quoted + msg["body"]
|
msg["body"] = quoted + msg["body"]
|
||||||
msg["feature_fallback"]["for"] = NS
|
fallback = Fallback()
|
||||||
msg["feature_fallback"]["fallback_body"]["start"] = 0
|
fallback["for"] = NS
|
||||||
msg["feature_fallback"]["fallback_body"]["end"] = len(quoted)
|
fallback["body"]["start"] = 0
|
||||||
|
fallback["body"]["end"] = len(quoted)
|
||||||
|
msg.append(fallback)
|
||||||
|
|
||||||
|
def get_fallback_body(self) -> str:
|
||||||
class FallBackBody(ElementBase):
|
msg = self.parent()
|
||||||
# According to https://xmpp.org/extensions/inbox/compatibility-fallback.html
|
for fallback in msg["fallbacks"]:
|
||||||
# this should be a multi_attrib *but* since it's a protoXEP, we'll see...
|
if fallback["for"] == NS:
|
||||||
namespace = FeatureFallBack.namespace
|
break
|
||||||
name = "body"
|
else:
|
||||||
plugin_attrib = "fallback_body"
|
return ""
|
||||||
interfaces = {"start", "end"}
|
start = fallback["body"]["start"]
|
||||||
|
end = fallback["body"]["end"]
|
||||||
def set_start(self, v: int):
|
body = msg["body"]
|
||||||
self._set_attr("start", str(v))
|
if start <= end:
|
||||||
|
return body[start:end]
|
||||||
def get_start(self):
|
else:
|
||||||
try:
|
return ""
|
||||||
return int(self._get_attr("start"))
|
|
||||||
except ValueError:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
def set_end(self, v: int):
|
|
||||||
self._set_attr("end", str(v))
|
|
||||||
|
|
||||||
def get_end(self):
|
|
||||||
try:
|
|
||||||
return int(self._get_attr("end"))
|
|
||||||
except ValueError:
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
def register_plugins():
|
def register_plugins():
|
||||||
register_stanza_plugin(Message, Reply)
|
register_stanza_plugin(Message, Reply)
|
||||||
register_stanza_plugin(Message, FeatureFallBack)
|
|
||||||
register_stanza_plugin(FeatureFallBack, FallBackBody)
|
|
||||||
|
@ -10,5 +10,5 @@ from slixmpp.xmlstream.tostring import tostring, highlight
|
|||||||
from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
|
from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT
|
||||||
|
|
||||||
__all__ = ['JID', 'StanzaBase', 'ElementBase',
|
__all__ = ['JID', 'StanzaBase', 'ElementBase',
|
||||||
'ET', 'StateMachine', 'tostring', 'highlight', 'XMLStream',
|
'ET', 'tostring', 'highlight', 'XMLStream',
|
||||||
'RESPONSE_TIMEOUT']
|
'RESPONSE_TIMEOUT', 'register_stanza_plugin']
|
||||||
|
@ -1243,7 +1243,7 @@ class ElementBase(object):
|
|||||||
self.init_plugin(item.__class__.plugin_multi_attrib)
|
self.init_plugin(item.__class__.plugin_multi_attrib)
|
||||||
else:
|
else:
|
||||||
self.iterables.append(item)
|
self.iterables.append(item)
|
||||||
|
item.parent = weakref.ref(self)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def appendxml(self, xml: ET.Element) -> ElementBase:
|
def appendxml(self, xml: ET.Element) -> ElementBase:
|
||||||
|
149
tests/test_stanza_xep_0428.py
Normal file
149
tests/test_stanza_xep_0428.py
Normal file
@ -0,0 +1,149 @@
|
|||||||
|
import unittest
|
||||||
|
|
||||||
|
from slixmpp import Message
|
||||||
|
from slixmpp.test import SlixTest
|
||||||
|
from slixmpp.plugins.xep_0428 import stanza
|
||||||
|
|
||||||
|
from slixmpp.plugins import xep_0461
|
||||||
|
from slixmpp.plugins import xep_0444
|
||||||
|
|
||||||
|
|
||||||
|
class TestFallback(SlixTest):
|
||||||
|
def setUp(self):
|
||||||
|
stanza.register_plugins()
|
||||||
|
|
||||||
|
def testSingleFallbackBody(self):
|
||||||
|
message = Message()
|
||||||
|
message["fallback"]["for"] = "ns"
|
||||||
|
message["fallback"]["body"]["start"] = 0
|
||||||
|
message["fallback"]["body"]["end"] = 8
|
||||||
|
|
||||||
|
self.check(
|
||||||
|
message, # language=XML
|
||||||
|
"""
|
||||||
|
<message>
|
||||||
|
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||||
|
<body start="0" end="8" />
|
||||||
|
</fallback>
|
||||||
|
</message>
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
def testSingleFallbackSubject(self):
|
||||||
|
message = Message()
|
||||||
|
message["fallback"]["for"] = "ns"
|
||||||
|
message["fallback"]["subject"]["start"] = 0
|
||||||
|
message["fallback"]["subject"]["end"] = 8
|
||||||
|
|
||||||
|
self.check(
|
||||||
|
message, # language=XML
|
||||||
|
"""
|
||||||
|
<message>
|
||||||
|
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||||
|
<subject start="0" end="8" />
|
||||||
|
</fallback>
|
||||||
|
</message>
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
def testSingleFallbackWholeBody(self):
|
||||||
|
message = Message()
|
||||||
|
message["fallback"]["for"] = "ns"
|
||||||
|
message["fallback"].enable("body")
|
||||||
|
self.check(
|
||||||
|
message, # language=XML
|
||||||
|
"""
|
||||||
|
<message>
|
||||||
|
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||||
|
<body />
|
||||||
|
</fallback>
|
||||||
|
</message>
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
def testMultiFallback(self):
|
||||||
|
message = Message()
|
||||||
|
|
||||||
|
f1 = stanza.Fallback()
|
||||||
|
f1["for"] = "ns1"
|
||||||
|
|
||||||
|
f2 = stanza.Fallback()
|
||||||
|
f2["for"] = "ns2"
|
||||||
|
|
||||||
|
message.append(f1)
|
||||||
|
message.append(f2)
|
||||||
|
|
||||||
|
self.check(
|
||||||
|
message, # language=XML
|
||||||
|
"""
|
||||||
|
<message>
|
||||||
|
<fallback xmlns='urn:xmpp:fallback:0' for='ns1' />
|
||||||
|
<fallback xmlns='urn:xmpp:fallback:0' for='ns2' />
|
||||||
|
</message>
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
for i, fallback in enumerate(message["fallbacks"], start=1):
|
||||||
|
self.assertEqual(fallback["for"], f"ns{i}")
|
||||||
|
|
||||||
|
def testStripFallbackPartOfBody(self):
|
||||||
|
message = Message()
|
||||||
|
message["body"] = "> quoted\nsome-body"
|
||||||
|
message["fallback"]["for"] = xep_0461.stanza.NS
|
||||||
|
message["fallback"]["body"]["start"] = 0
|
||||||
|
message["fallback"]["body"]["end"] = 9
|
||||||
|
|
||||||
|
self.check(
|
||||||
|
message, # language=XML
|
||||||
|
"""
|
||||||
|
<message>
|
||||||
|
<body>> quoted\nsome-body</body>
|
||||||
|
<fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'>
|
||||||
|
<body start="0" end="9" />
|
||||||
|
</fallback>
|
||||||
|
</message>
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body"
|
||||||
|
)
|
||||||
|
|
||||||
|
def testStripWholeBody(self):
|
||||||
|
message = Message()
|
||||||
|
message["body"] = "> quoted\nsome-body"
|
||||||
|
message["fallback"]["for"] = "ns"
|
||||||
|
message["fallback"].enable("body")
|
||||||
|
|
||||||
|
self.check(
|
||||||
|
message, # language=XML
|
||||||
|
"""
|
||||||
|
<message>
|
||||||
|
<body>> quoted\nsome-body</body>
|
||||||
|
<fallback xmlns='urn:xmpp:fallback:0' for='ns'>
|
||||||
|
<body />
|
||||||
|
</fallback>
|
||||||
|
</message>
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(message["fallback"].get_stripped_body("ns"), "")
|
||||||
|
|
||||||
|
def testStripMultiFallback(self):
|
||||||
|
message = Message()
|
||||||
|
message["body"] = "> huuuuu\n👍"
|
||||||
|
|
||||||
|
message["fallback"]["for"] = xep_0461.stanza.NS
|
||||||
|
message["fallback"]["body"]["start"] = 0
|
||||||
|
message["fallback"]["body"]["end"] = 9
|
||||||
|
|
||||||
|
reaction_fallback = stanza.Fallback()
|
||||||
|
reaction_fallback["for"] = xep_0444.stanza.NS
|
||||||
|
reaction_fallback.enable("body")
|
||||||
|
message.append(reaction_fallback)
|
||||||
|
|
||||||
|
self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "👍")
|
||||||
|
self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "")
|
||||||
|
|
||||||
|
|
||||||
|
suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback)
|
@ -1,11 +1,13 @@
|
|||||||
import unittest
|
import unittest
|
||||||
from slixmpp import Message
|
from slixmpp import Message
|
||||||
from slixmpp.test import SlixTest
|
from slixmpp.test import SlixTest
|
||||||
|
from slixmpp.plugins.xep_0428 import stanza as fallback_stanza
|
||||||
from slixmpp.plugins.xep_0461 import stanza
|
from slixmpp.plugins.xep_0461 import stanza
|
||||||
|
|
||||||
|
|
||||||
class TestReply(SlixTest):
|
class TestReply(SlixTest):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
fallback_stanza.register_plugins()
|
||||||
stanza.register_plugins()
|
stanza.register_plugins()
|
||||||
|
|
||||||
def testReply(self):
|
def testReply(self):
|
||||||
@ -26,9 +28,9 @@ class TestReply(SlixTest):
|
|||||||
def testFallback(self):
|
def testFallback(self):
|
||||||
message = Message()
|
message = Message()
|
||||||
message["body"] = "12345\nrealbody"
|
message["body"] = "12345\nrealbody"
|
||||||
message["feature_fallback"]["for"] = "NS"
|
message["fallback"]["for"] = "NS"
|
||||||
message["feature_fallback"]["fallback_body"]["start"] = 0
|
message["fallback"]["body"]["start"] = 0
|
||||||
message["feature_fallback"]["fallback_body"]["end"] = 6
|
message["fallback"]["body"]["end"] = 6
|
||||||
|
|
||||||
self.check(
|
self.check(
|
||||||
message,
|
message,
|
||||||
@ -42,18 +44,18 @@ class TestReply(SlixTest):
|
|||||||
""",
|
""",
|
||||||
)
|
)
|
||||||
|
|
||||||
assert message["feature_fallback"].get_stripped_body() == "realbody"
|
assert message["fallback"].get_stripped_body("NS") == "realbody"
|
||||||
|
|
||||||
def testAddFallBackHelper(self):
|
def testAddFallBackHelper(self):
|
||||||
msg = Message()
|
msg = Message()
|
||||||
msg["body"] = "Great"
|
msg["body"] = "Great"
|
||||||
msg["feature_fallback"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
|
msg["reply"].add_quoted_fallback("Anna wrote:\nHi, how are you?")
|
||||||
# ugly dedent but the test does not pass without it
|
|
||||||
self.check(
|
self.check(
|
||||||
msg,
|
msg, # language=XML
|
||||||
"""
|
"""
|
||||||
<message xmlns="jabber:client" type="normal">
|
<message xmlns="jabber:client" type="normal">
|
||||||
<body>> Anna wrote:\n> Hi, how are you?\nGreat</body>
|
<body>> Anna wrote:\n> Hi, how are you?\nGreat</body>
|
||||||
|
<reply xmlns="urn:xmpp:reply:0" />
|
||||||
<fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0">
|
<fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0">
|
||||||
<body start='0' end='33' />
|
<body start='0' end='33' />
|
||||||
</fallback>
|
</fallback>
|
||||||
@ -67,8 +69,8 @@ class TestReply(SlixTest):
|
|||||||
|
|
||||||
msg = Message()
|
msg = Message()
|
||||||
msg["body"] = "Great"
|
msg["body"] = "Great"
|
||||||
msg["feature_fallback"].add_quoted_fallback(body)
|
msg["reply"].add_quoted_fallback(body)
|
||||||
body2 = msg["feature_fallback"].get_fallback_body()
|
body2 = msg["reply"].get_fallback_body()
|
||||||
self.assertTrue(body2 == quoted, body2)
|
self.assertTrue(body2 == quoted, body2)
|
||||||
|
|
||||||
|
|
||||||
|
@ -9,8 +9,8 @@ class TestReply(SlixTest):
|
|||||||
|
|
||||||
def testFallBackBody(self):
|
def testFallBackBody(self):
|
||||||
async def on_reply(msg):
|
async def on_reply(msg):
|
||||||
start = msg["feature_fallback"]["fallback_body"]["start"]
|
start = msg["fallback"]["body"]["start"]
|
||||||
end = msg["feature_fallback"]["fallback_body"]["end"]
|
end = msg["fallback"]["body"]["end"]
|
||||||
self.xmpp["xep_0461"].send_reply(
|
self.xmpp["xep_0461"].send_reply(
|
||||||
reply_to=msg.get_from(),
|
reply_to=msg.get_from(),
|
||||||
reply_id=msg.get_id(),
|
reply_id=msg.get_id(),
|
||||||
|
Loading…
Reference in New Issue
Block a user