155 lines
5.3 KiB
Python
155 lines
5.3 KiB
Python
"""
|
|
Slixmpp: The Slick XMPP Library
|
|
Copyright (C) 2012 Nathanael C. Fritz, Lance J.T. Stout
|
|
This file is part of Slixmpp.
|
|
|
|
See the file LICENSE for copying permission
|
|
"""
|
|
|
|
import logging
|
|
|
|
from datetime import datetime
|
|
from typing import Any, Dict, Callable, Optional, Awaitable
|
|
|
|
from slixmpp import JID
|
|
from slixmpp.stanza import Message, Iq
|
|
from slixmpp.xmlstream.handler import Collector
|
|
from slixmpp.xmlstream.matcher import MatchXMLMask
|
|
from slixmpp.xmlstream import register_stanza_plugin
|
|
from slixmpp.plugins import BasePlugin
|
|
from slixmpp.plugins.xep_0313 import stanza
|
|
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
|
|
|
|
|
|
class XEP_0313(BasePlugin):

    """
    XEP-0313 Message Archive Management
    """

    name = 'xep_0313'
    description = 'XEP-0313: Message Archive Management'
    dependencies = {'xep_0030', 'xep_0050', 'xep_0059', 'xep_0297'}
    stanza = stanza

    def plugin_init(self):
        """Register the MAM stanza plugins on Iq, Message and each other."""
        register_stanza_plugin(Iq, stanza.MAM)
        register_stanza_plugin(Iq, stanza.Preferences)
        register_stanza_plugin(Message, stanza.Result)
        register_stanza_plugin(Iq, stanza.Fin)
        # Forwarded (XEP-0297) lives inside a MAM result; RSM sets
        # (XEP-0059) live inside both the query and the <fin/> element.
        register_stanza_plugin(stanza.Result, self.xmpp['xep_0297'].stanza.Forwarded)
        register_stanza_plugin(stanza.MAM, self.xmpp['xep_0059'].stanza.Set)
        register_stanza_plugin(stanza.Fin, self.xmpp['xep_0059'].stanza.Set)

    def retrieve(
        self,
        jid: Optional[JID] = None,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        with_jid: Optional[JID] = None,
        ifrom: Optional[JID] = None,
        reverse: bool = False,
        timeout: Optional[int] = None,
        callback: Optional[Callable[[Iq], None]] = None,
        iterator: bool = False,
        rsm: Optional[Dict[str, Any]] = None
    ) -> Awaitable:
        """
        Send a MAM query and retrieve the results.

        While the query is pending, a Collector gathers the messages whose
        ``mam_result`` query id matches the query; once the IQ reply
        arrives the collected messages are stored in
        ``iq['mam']['results']`` (only for replies of type ``result``).

        :param JID jid: Entity holding the MAM records
        :param datetime start,end: MAM query temporal boundaries
        :param JID with_jid: Filter results on this JID
        :param JID ifrom: To change the from address of the query
        :param bool reverse: Get the results in reverse order
                             (only forwarded to the RSM iterator)
        :param int timeout: IQ timeout (not used when ``iterator`` is set)
        :param func callback: Custom callback for handling results
                              (not used when ``iterator`` is set)
        :param bool iterator: Use RSM and iterate over a paginated query
        :param dict rsm: RSM custom options; every value is sent as a
                         string, and ``max`` also sets the iterator page
                         size (10 by default)
        :return: The value of ``xep_0059.iterate(...)`` when ``iterator``
                 is true, otherwise the value of ``iq.send(...)``.
        """
        iq = self.xmpp.Iq()
        query_id = iq['id']

        iq['to'] = jid
        iq['from'] = ifrom
        iq['type'] = 'set'
        iq['mam']['queryid'] = query_id
        iq['mam']['start'] = start
        iq['mam']['end'] = end
        iq['mam']['with'] = with_jid

        amount = 10
        if rsm:
            for key, value in rsm.items():
                iq['mam']['rsm'][key] = str(value)
                if key == 'max':
                    amount = value

        # Shared between pre_cb and post_cb so that the collector created
        # before a page is sent can be stopped when its reply arrives.
        cb_data = {}

        # Build the XML mask that archived-message results must match.
        stanza_mask = self.xmpp.Message()
        # A fresh Message may carry an origin-id child (presumably added
        # automatically by the stream -- confirm); strip it so the mask
        # does not require one.  find() returns None when the child is
        # absent, and remove(None) would raise, so only remove it when
        # it is actually there.
        origin_id = stanza_mask.xml.find('{urn:xmpp:sid:0}origin-id')
        if origin_id is not None:
            stanza_mask.xml.remove(origin_id)
        del stanza_mask['id']
        del stanza_mask['lang']
        stanza_mask['from'] = jid
        stanza_mask['mam_result']['queryid'] = query_id
        xml_mask = str(stanza_mask)

        def pre_cb(query: Iq) -> None:
            """Re-key the mask on this page's IQ id and start collecting."""
            stanza_mask['mam_result']['queryid'] = query['id']
            page_mask = str(stanza_mask)
            query['mam']['queryid'] = query['id']
            page_collector = Collector(
                'MAM_Results_%s' % query_id,
                MatchXMLMask(page_mask))
            self.xmpp.register_handler(page_collector)
            cb_data['collector'] = page_collector

        def post_cb(result: Iq) -> None:
            """Stop the page collector and attach what it gathered."""
            results = cb_data['collector'].stop()
            if result['type'] == 'result':
                result['mam']['results'] = results

        if iterator:
            return self.xmpp['xep_0059'].iterate(
                iq, 'mam', 'results', amount=amount,
                reverse=reverse, recv_interface='mam_fin',
                pre_cb=pre_cb, post_cb=post_cb)

        collector = Collector(
            'MAM_Results_%s' % query_id,
            MatchXMLMask(xml_mask))
        self.xmpp.register_handler(collector)

        def wrapped_cb(result: Iq) -> None:
            """Stop collecting, attach the results, then call the user callback."""
            results = collector.stop()
            if result['type'] == 'result':
                result['mam']['results'] = results
            if callback:
                callback(result)

        return iq.send(timeout=timeout, callback=wrapped_cb)

    def get_preferences(self, timeout=None, callback=None):
        """
        Send an IQ of type ``get`` carrying a ``mam_prefs`` payload.

        :param timeout: Forwarded to ``iq.send``
        :param callback: Forwarded to ``iq.send``
        :return: The value of ``iq.send(...)``.
        """
        iq = self.xmpp.Iq()
        iq['type'] = 'get'
        query_id = iq['id']
        iq['mam_prefs']['query_id'] = query_id
        return iq.send(timeout=timeout, callback=callback)

    def set_preferences(self, jid=None, default=None, always=None, never=None,
                        ifrom=None, timeout=None, callback=None):
        """
        Send an IQ of type ``set`` carrying a ``mam_prefs`` payload.

        :param jid: Value for the IQ ``to`` address
        :param default: Value for ``mam_prefs['default']``
        :param always: Value for ``mam_prefs['always']``
        :param never: Value for ``mam_prefs['never']``
        :param ifrom: Value for the IQ ``from`` address
        :param timeout: Forwarded to ``iq.send``
        :param callback: Forwarded to ``iq.send``
        :return: The value of ``iq.send(...)``.
        """
        iq = self.xmpp.Iq()
        iq['type'] = 'set'
        iq['to'] = jid
        iq['from'] = ifrom
        iq['mam_prefs']['default'] = default
        iq['mam_prefs']['always'] = always
        iq['mam_prefs']['never'] = never
        return iq.send(timeout=timeout, callback=callback)

    def get_configuration_commands(self, jid, **kwargs):
        """
        Query the ``urn:xmpp:mam#configure`` disco node of ``jid``.

        :param jid: Entity to query
        :param kwargs: Extra keyword arguments forwarded unchanged to
                       ``xep_0030.get_items``
        :return: The value of ``xep_0030.get_items(...)``.
        """
        return self.xmpp['xep_0030'].get_items(
            jid=jid,
            node='urn:xmpp:mam#configure',
            **kwargs)