Compare commits
	
		
			2 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 56004802fa | ||
|   | 8bfe6177f4 | 
| @@ -1,13 +1,8 @@ | ||||
|  | ||||
| # Slixmpp: The Slick XMPP Library | ||||
| # Copyright (C) 2020 Mathieu Pasquet <mathieui@mathieui.net> | ||||
| # This file is part of Slixmpp. | ||||
| # See the file LICENSE for copying permissio | ||||
| from abc import ABC | ||||
| try: | ||||
|     from typing import Literal | ||||
| except ImportError: | ||||
|     from typing_extensions import Literal | ||||
|  | ||||
| from slixmpp.stanza import Message | ||||
| from slixmpp.xmlstream import ( | ||||
|     ElementBase, | ||||
| @@ -15,83 +10,14 @@ from slixmpp.xmlstream import ( | ||||
| ) | ||||
|  | ||||
|  | ||||
| NS = "urn:xmpp:fallback:0" | ||||
| NS = 'urn:xmpp:fallback:0' | ||||
|  | ||||
|  | ||||
| class Fallback(ElementBase): | ||||
|     namespace = NS | ||||
|     name = "fallback" | ||||
|     plugin_attrib = "fallback" | ||||
|     plugin_multi_attrib = "fallbacks" | ||||
|     interfaces = {"for"} | ||||
|  | ||||
|     def _find_fallback(self, fallback_for: str) -> "Fallback": | ||||
|         if self["for"] == fallback_for: | ||||
|             return self | ||||
|         for fallback in self.parent()["fallbacks"]: | ||||
|             if fallback["for"] == fallback_for: | ||||
|                 return fallback | ||||
|         raise AttributeError("No fallback for this namespace", fallback_for) | ||||
|  | ||||
|     def get_stripped_body( | ||||
|         self, fallback_for: str, element: Literal["body", "subject"] = "body" | ||||
|     ) -> str: | ||||
|         """ | ||||
|         Get the body of a message, with the fallback part stripped | ||||
|  | ||||
|         :param fallback_for: namespace of the fallback to strip | ||||
|         :param element: set this to "subject" get the stripped subject instead | ||||
|             of body | ||||
|  | ||||
|         :return: body (or subject) content minus the fallback part | ||||
|         """ | ||||
|         fallback = self._find_fallback(fallback_for) | ||||
|         start = fallback[element]["start"] | ||||
|         end = fallback[element]["end"] | ||||
|         body = self.parent()[element] | ||||
|         if start == end == 0: | ||||
|             return "" | ||||
|         if start <= end < len(body): | ||||
|             return body[:start] + body[end:] | ||||
|         else: | ||||
|             return body | ||||
|  | ||||
|  | ||||
| class FallbackMixin(ABC): | ||||
|     namespace = NS | ||||
|     name = NotImplemented | ||||
|     plugin_attrib = NotImplemented | ||||
|     interfaces = {"start", "end"} | ||||
|  | ||||
|     def set_start(self, v: int): | ||||
|         self._set_attr("start", str(v)) | ||||
|  | ||||
|     def get_start(self): | ||||
|         return _int_or_zero(self._get_attr("start")) | ||||
|  | ||||
|     def set_end(self, v: int): | ||||
|         self._set_attr("end", str(v)) | ||||
|  | ||||
|     def get_end(self): | ||||
|         return _int_or_zero(self._get_attr("end")) | ||||
|  | ||||
|  | ||||
| class FallbackBody(FallbackMixin, ElementBase): | ||||
|     name = plugin_attrib = "body" | ||||
|  | ||||
|  | ||||
| class FallbackSubject(FallbackMixin, ElementBase): | ||||
|     name = plugin_attrib = "subject" | ||||
|  | ||||
|  | ||||
| def _int_or_zero(v: str): | ||||
|     try: | ||||
|         return int(v) | ||||
|     except ValueError: | ||||
|         return 0 | ||||
|     name = 'fallback' | ||||
|     plugin_attrib = 'fallback' | ||||
|  | ||||
|  | ||||
| def register_plugins(): | ||||
|     register_stanza_plugin(Message, Fallback, iterable=True) | ||||
|     register_stanza_plugin(Fallback, FallbackBody) | ||||
|     register_stanza_plugin(Fallback, FallbackSubject) | ||||
|     register_stanza_plugin(Message, Fallback) | ||||
|   | ||||
| @@ -13,7 +13,7 @@ class XEP_0461(BasePlugin): | ||||
|     name = "xep_0461" | ||||
|     description = "XEP-0461: Message Replies" | ||||
|  | ||||
|     dependencies = {"xep_0030", "xep_0428"} | ||||
|     dependencies = {"xep_0030"} | ||||
|     stanza = stanza | ||||
|     namespace = stanza.NS | ||||
|  | ||||
|   | ||||
| @@ -2,7 +2,6 @@ from typing import Optional | ||||
|  | ||||
| from slixmpp.stanza import Message | ||||
| from slixmpp.xmlstream import ElementBase, register_stanza_plugin | ||||
| from slixmpp.plugins.xep_0428.stanza import Fallback | ||||
|  | ||||
| NS = "urn:xmpp:reply:0" | ||||
|  | ||||
| @@ -13,11 +12,39 @@ class Reply(ElementBase): | ||||
|     plugin_attrib = "reply" | ||||
|     interfaces = {"id", "to"} | ||||
|  | ||||
|  | ||||
| class FeatureFallBack(ElementBase): | ||||
|     # should also be a multi attrib | ||||
|     namespace = "urn:xmpp:fallback:0" | ||||
|     name = "fallback" | ||||
|     plugin_attrib = "feature_fallback" | ||||
|     interfaces = {"for"} | ||||
|  | ||||
|     def get_fallback_body(self): | ||||
|         # only works for a single fallback_body attrib | ||||
|         start = self["fallback_body"]["start"] | ||||
|         end = self["fallback_body"]["end"] | ||||
|         body = self.parent()["body"] | ||||
|         if start <= end: | ||||
|             return body[start:end] | ||||
|         else: | ||||
|             return "" | ||||
|  | ||||
|     def get_stripped_body(self): | ||||
|         # only works for a single fallback_body attrib | ||||
|         start = self["fallback_body"]["start"] | ||||
|         end = self["fallback_body"]["end"] | ||||
|         body = self.parent()["body"] | ||||
|         if start <= end < len(body): | ||||
|             return body[:start] + body[end:] | ||||
|         else: | ||||
|             return body | ||||
|  | ||||
|     def add_quoted_fallback(self, fallback: str, nickname: Optional[str] = None): | ||||
|         """ | ||||
|         Add plain text fallback for clients not implementing XEP-0461. | ||||
|  | ||||
|         ``msg["reply"].add_quoted_fallback("Some text", "Bob")`` will | ||||
|         ``msg["feature_fallback"].add_quoted_fallback("Some text", "Bob")`` will | ||||
|         prepend "> Bob:\n> Some text\n" to the body of the message, and set the | ||||
|         fallback_body attributes accordingly, so that clients implementing | ||||
|         XEP-0461 can hide the fallback text. | ||||
| @@ -30,27 +57,39 @@ class Reply(ElementBase): | ||||
|         if nickname: | ||||
|             quoted = "> " + nickname + ":\n" + quoted | ||||
|         msg["body"] = quoted + msg["body"] | ||||
|         fallback = Fallback() | ||||
|         fallback["for"] = NS | ||||
|         fallback["body"]["start"] = 0 | ||||
|         fallback["body"]["end"] = len(quoted) | ||||
|         msg.append(fallback) | ||||
|         msg["feature_fallback"]["for"] = NS | ||||
|         msg["feature_fallback"]["fallback_body"]["start"] = 0 | ||||
|         msg["feature_fallback"]["fallback_body"]["end"] = len(quoted) | ||||
|  | ||||
|     def get_fallback_body(self) -> str: | ||||
|         msg = self.parent() | ||||
|         for fallback in msg["fallbacks"]: | ||||
|             if fallback["for"] == NS: | ||||
|                 break | ||||
|         else: | ||||
|             return "" | ||||
|         start = fallback["body"]["start"] | ||||
|         end = fallback["body"]["end"] | ||||
|         body = msg["body"] | ||||
|         if start <= end: | ||||
|             return body[start:end] | ||||
|         else: | ||||
|             return "" | ||||
|  | ||||
| class FallBackBody(ElementBase): | ||||
|     # According to https://xmpp.org/extensions/inbox/compatibility-fallback.html | ||||
|     # this should be a multi_attrib *but* since it's a protoXEP, we'll see... | ||||
|     namespace = FeatureFallBack.namespace | ||||
|     name = "body" | ||||
|     plugin_attrib = "fallback_body" | ||||
|     interfaces = {"start", "end"} | ||||
|  | ||||
|     def set_start(self, v: int): | ||||
|         self._set_attr("start", str(v)) | ||||
|  | ||||
|     def get_start(self): | ||||
|         try: | ||||
|             return int(self._get_attr("start")) | ||||
|         except ValueError: | ||||
|             return 0 | ||||
|  | ||||
|     def set_end(self, v: int): | ||||
|         self._set_attr("end", str(v)) | ||||
|  | ||||
|     def get_end(self): | ||||
|         try: | ||||
|             return int(self._get_attr("end")) | ||||
|         except ValueError: | ||||
|             return 0 | ||||
|  | ||||
|  | ||||
| def register_plugins(): | ||||
|     register_stanza_plugin(Message, Reply) | ||||
|     register_stanza_plugin(Message, FeatureFallBack) | ||||
|     register_stanza_plugin(FeatureFallBack, FallBackBody) | ||||
|   | ||||
| @@ -10,5 +10,5 @@ from slixmpp.xmlstream.tostring import tostring, highlight | ||||
| from slixmpp.xmlstream.xmlstream import XMLStream, RESPONSE_TIMEOUT | ||||
|  | ||||
| __all__ = ['JID', 'StanzaBase', 'ElementBase', | ||||
|            'ET', 'tostring', 'highlight', 'XMLStream', | ||||
|            'RESPONSE_TIMEOUT', 'register_stanza_plugin'] | ||||
|            'ET', 'StateMachine', 'tostring', 'highlight', 'XMLStream', | ||||
|            'RESPONSE_TIMEOUT'] | ||||
|   | ||||
| @@ -1243,7 +1243,7 @@ class ElementBase(object): | ||||
|                 self.init_plugin(item.__class__.plugin_multi_attrib) | ||||
|         else: | ||||
|             self.iterables.append(item) | ||||
|         item.parent = weakref.ref(self) | ||||
|  | ||||
|         return self | ||||
|  | ||||
|     def appendxml(self, xml: ET.Element) -> ElementBase: | ||||
|   | ||||
| @@ -1,149 +0,0 @@ | ||||
| import unittest | ||||
|  | ||||
| from slixmpp import Message | ||||
| from slixmpp.test import SlixTest | ||||
| from slixmpp.plugins.xep_0428 import stanza | ||||
|  | ||||
| from slixmpp.plugins import xep_0461 | ||||
| from slixmpp.plugins import xep_0444 | ||||
|  | ||||
|  | ||||
| class TestFallback(SlixTest): | ||||
|     def setUp(self): | ||||
|         stanza.register_plugins() | ||||
|  | ||||
|     def testSingleFallbackBody(self): | ||||
|         message = Message() | ||||
|         message["fallback"]["for"] = "ns" | ||||
|         message["fallback"]["body"]["start"] = 0 | ||||
|         message["fallback"]["body"]["end"] = 8 | ||||
|  | ||||
|         self.check( | ||||
|             message,  # language=XML | ||||
|             """ | ||||
|             <message> | ||||
|               <fallback xmlns='urn:xmpp:fallback:0' for='ns'> | ||||
|                 <body start="0" end="8" /> | ||||
|               </fallback> | ||||
|             </message> | ||||
|             """, | ||||
|         ) | ||||
|  | ||||
|     def testSingleFallbackSubject(self): | ||||
|         message = Message() | ||||
|         message["fallback"]["for"] = "ns" | ||||
|         message["fallback"]["subject"]["start"] = 0 | ||||
|         message["fallback"]["subject"]["end"] = 8 | ||||
|  | ||||
|         self.check( | ||||
|             message,  # language=XML | ||||
|             """ | ||||
|             <message> | ||||
|               <fallback xmlns='urn:xmpp:fallback:0' for='ns'> | ||||
|                 <subject start="0" end="8" /> | ||||
|               </fallback> | ||||
|             </message> | ||||
|             """, | ||||
|         ) | ||||
|  | ||||
|     def testSingleFallbackWholeBody(self): | ||||
|         message = Message() | ||||
|         message["fallback"]["for"] = "ns" | ||||
|         message["fallback"].enable("body") | ||||
|         self.check( | ||||
|             message,  # language=XML | ||||
|             """ | ||||
|             <message> | ||||
|               <fallback xmlns='urn:xmpp:fallback:0' for='ns'> | ||||
|                 <body /> | ||||
|               </fallback> | ||||
|             </message> | ||||
|             """, | ||||
|         ) | ||||
|  | ||||
|     def testMultiFallback(self): | ||||
|         message = Message() | ||||
|  | ||||
|         f1 = stanza.Fallback() | ||||
|         f1["for"] = "ns1" | ||||
|  | ||||
|         f2 = stanza.Fallback() | ||||
|         f2["for"] = "ns2" | ||||
|  | ||||
|         message.append(f1) | ||||
|         message.append(f2) | ||||
|  | ||||
|         self.check( | ||||
|             message,  # language=XML | ||||
|             """ | ||||
|             <message> | ||||
|               <fallback xmlns='urn:xmpp:fallback:0' for='ns1' /> | ||||
|               <fallback xmlns='urn:xmpp:fallback:0' for='ns2' /> | ||||
|             </message> | ||||
|             """, | ||||
|         ) | ||||
|  | ||||
|         for i, fallback in enumerate(message["fallbacks"], start=1): | ||||
|             self.assertEqual(fallback["for"], f"ns{i}") | ||||
|  | ||||
|     def testStripFallbackPartOfBody(self): | ||||
|         message = Message() | ||||
|         message["body"] = "> quoted\nsome-body" | ||||
|         message["fallback"]["for"] = xep_0461.stanza.NS | ||||
|         message["fallback"]["body"]["start"] = 0 | ||||
|         message["fallback"]["body"]["end"] = 9 | ||||
|  | ||||
|         self.check( | ||||
|             message,  # language=XML | ||||
|             """ | ||||
|             <message> | ||||
|               <body>> quoted\nsome-body</body> | ||||
|               <fallback xmlns='urn:xmpp:fallback:0' for='urn:xmpp:reply:0'> | ||||
|                 <body start="0" end="9" /> | ||||
|               </fallback> | ||||
|             </message> | ||||
|             """, | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual( | ||||
|             message["fallback"].get_stripped_body(xep_0461.stanza.NS), "some-body" | ||||
|         ) | ||||
|  | ||||
|     def testStripWholeBody(self): | ||||
|         message = Message() | ||||
|         message["body"] = "> quoted\nsome-body" | ||||
|         message["fallback"]["for"] = "ns" | ||||
|         message["fallback"].enable("body") | ||||
|  | ||||
|         self.check( | ||||
|             message,  # language=XML | ||||
|             """ | ||||
|             <message> | ||||
|               <body>> quoted\nsome-body</body> | ||||
|               <fallback xmlns='urn:xmpp:fallback:0' for='ns'> | ||||
|                 <body /> | ||||
|               </fallback> | ||||
|             </message> | ||||
|             """, | ||||
|         ) | ||||
|  | ||||
|         self.assertEqual(message["fallback"].get_stripped_body("ns"), "") | ||||
|  | ||||
|     def testStripMultiFallback(self): | ||||
|         message = Message() | ||||
|         message["body"] = "> huuuuu\n👍" | ||||
|  | ||||
|         message["fallback"]["for"] = xep_0461.stanza.NS | ||||
|         message["fallback"]["body"]["start"] = 0 | ||||
|         message["fallback"]["body"]["end"] = 9 | ||||
|  | ||||
|         reaction_fallback = stanza.Fallback() | ||||
|         reaction_fallback["for"] = xep_0444.stanza.NS | ||||
|         reaction_fallback.enable("body") | ||||
|         message.append(reaction_fallback) | ||||
|  | ||||
|         self.assertEqual(message["fallback"].get_stripped_body(xep_0461.stanza.NS), "👍") | ||||
|         self.assertEqual(message["fallback"].get_stripped_body(xep_0444.stanza.NS), "") | ||||
|  | ||||
|  | ||||
| suite = unittest.TestLoader().loadTestsFromTestCase(TestFallback) | ||||
| @@ -1,13 +1,11 @@ | ||||
| import unittest | ||||
| from slixmpp import Message | ||||
| from slixmpp.test import SlixTest | ||||
| from slixmpp.plugins.xep_0428 import stanza as fallback_stanza | ||||
| from slixmpp.plugins.xep_0461 import stanza | ||||
|  | ||||
|  | ||||
| class TestReply(SlixTest): | ||||
|     def setUp(self): | ||||
|         fallback_stanza.register_plugins() | ||||
|         stanza.register_plugins() | ||||
|  | ||||
|     def testReply(self): | ||||
| @@ -28,9 +26,9 @@ class TestReply(SlixTest): | ||||
|     def testFallback(self): | ||||
|         message = Message() | ||||
|         message["body"] = "12345\nrealbody" | ||||
|         message["fallback"]["for"] = "NS" | ||||
|         message["fallback"]["body"]["start"] = 0 | ||||
|         message["fallback"]["body"]["end"] = 6 | ||||
|         message["feature_fallback"]["for"] = "NS" | ||||
|         message["feature_fallback"]["fallback_body"]["start"] = 0 | ||||
|         message["feature_fallback"]["fallback_body"]["end"] = 6 | ||||
|  | ||||
|         self.check( | ||||
|             message, | ||||
| @@ -44,18 +42,18 @@ class TestReply(SlixTest): | ||||
|             """, | ||||
|         ) | ||||
|  | ||||
|         assert message["fallback"].get_stripped_body("NS") == "realbody" | ||||
|         assert message["feature_fallback"].get_stripped_body() == "realbody" | ||||
|  | ||||
|     def testAddFallBackHelper(self): | ||||
|         msg = Message() | ||||
|         msg["body"] = "Great" | ||||
|         msg["reply"].add_quoted_fallback("Anna wrote:\nHi, how are you?") | ||||
|         msg["feature_fallback"].add_quoted_fallback("Anna wrote:\nHi, how are you?") | ||||
|         # ugly dedent but the test does not pass without it | ||||
|         self.check( | ||||
|             msg,  # language=XML | ||||
|             msg, | ||||
|             """ | ||||
|         <message xmlns="jabber:client" type="normal"> | ||||
|             <body>> Anna wrote:\n> Hi, how are you?\nGreat</body> | ||||
|             <reply xmlns="urn:xmpp:reply:0" /> | ||||
|             <fallback xmlns="urn:xmpp:fallback:0" for="urn:xmpp:reply:0"> | ||||
|                 <body start='0' end='33' /> | ||||
|             </fallback> | ||||
| @@ -69,8 +67,8 @@ class TestReply(SlixTest): | ||||
|  | ||||
|         msg = Message() | ||||
|         msg["body"] = "Great" | ||||
|         msg["reply"].add_quoted_fallback(body) | ||||
|         body2 = msg["reply"].get_fallback_body() | ||||
|         msg["feature_fallback"].add_quoted_fallback(body) | ||||
|         body2 = msg["feature_fallback"].get_fallback_body() | ||||
|         self.assertTrue(body2 == quoted, body2) | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -9,8 +9,8 @@ class TestReply(SlixTest): | ||||
|  | ||||
|     def testFallBackBody(self): | ||||
|         async def on_reply(msg): | ||||
|             start = msg["fallback"]["body"]["start"] | ||||
|             end = msg["fallback"]["body"]["end"] | ||||
|             start = msg["feature_fallback"]["fallback_body"]["start"] | ||||
|             end = msg["feature_fallback"]["fallback_body"]["end"] | ||||
|             self.xmpp["xep_0461"].send_reply( | ||||
|                 reply_to=msg.get_from(), | ||||
|                 reply_id=msg.get_id(), | ||||
|   | ||||
		Reference in New Issue
	
	Block a user