Compare commits

...

15 Commits

Author SHA1 Message Date
mathieui
24f35e433f slixmpp 1.2.4 release 2017-01-30 23:02:45 +01:00
mathieui
22664ee7b8 Fix carbons 2017-01-28 00:02:27 +01:00
Clint Olson
6476cfcde5 Remove unused import caught by Codacy. 2017-01-23 23:58:53 -08:00
Clint Olson
5bb347e884 Fix partially-merged Google plugin from acc52fd935. 2017-01-23 23:51:59 -08:00
Emmanuel Gil Peyrot
eb1251b919 Fix a typo in the title of the MUC documentation. 2017-01-08 17:38:11 +00:00
Emmanuel Gil Peyrot
820144c40c Add missing asyncio.coroutine decorators. 2016-12-30 13:41:15 +01:00
Emmanuel Gil Peyrot
6034df0a78 Check for XML parsing errors and disconnect in that case. 2016-12-29 18:59:09 +01:00
Emmanuel Gil Peyrot
df4012e66d XMLStream: Break a long line to make it more readable. 2016-12-29 18:41:09 +01:00
Emmanuel Gil Peyrot
c372f3071a Examples: Use argparse for http_over_xmpp. 2016-12-29 18:34:37 +01:00
Emmanuel Gil Peyrot
829c8b27b6 Test more things before trying to build our stringprep module. 2016-12-25 13:28:51 +01:00
mathieui
fb3ac78bf9 slixmpp 1.2.3 2016-12-07 21:47:54 +01:00
mathieui
ffd9436e5c Fix roster push origin detection and tests 2016-12-07 19:06:25 +01:00
louiz’
bbb1344d79 Add very basic gitlab-ci.yml file 2016-12-05 00:13:13 +01:00
Emmanuel Gil Peyrot
457785b286 XEP-0380: Add a helper to test for the presence of an EME tag. 2016-11-26 16:41:48 +00:00
Emmanuel Gil Peyrot
4847f834bd Add a plugin for XEP-0380: Explicit Message Encryption. 2016-11-26 16:29:19 +00:00
22 changed files with 659 additions and 53 deletions

8
.gitlab-ci.yml Normal file
View File

@@ -0,0 +1,8 @@
test:
tags:
- docker
image: ubuntu:latest
script:
- apt update
- apt install -y python3 cython3
- ./run_tests.py

View File

@@ -1,7 +1,7 @@
.. _mucbot:
=========================
Mulit-User Chat (MUC) Bot
Multi-User Chat (MUC) Bot
=========================
.. note::

View File

@@ -13,7 +13,7 @@
from slixmpp import ClientXMPP
from optparse import OptionParser
from argparse import ArgumentParser
import logging
import getpass
@@ -58,40 +58,40 @@ if __name__ == '__main__':
# ./http_over_xmpp.py -J <jid> -P <pwd> -i <ip> -p <port> [-v]
#
parser = OptionParser()
parser = ArgumentParser()
# Output verbosity options.
parser.add_option(
parser.add_argument(
'-v', '--verbose', help='set logging to DEBUG', action='store_const',
dest='loglevel', const=logging.DEBUG, default=logging.ERROR
)
# JID and password options.
parser.add_option('-J', '--jid', dest='jid', help='JID')
parser.add_option('-P', '--password', dest='password', help='Password')
parser.add_argument('-J', '--jid', dest='jid', help='JID')
parser.add_argument('-P', '--password', dest='password', help='Password')
# XMPP server ip and port options.
parser.add_option(
parser.add_argument(
'-i', '--ipaddr', dest='ipaddr',
help='IP Address of the XMPP server', default=None
)
parser.add_option(
parser.add_argument(
'-p', '--port', dest='port',
help='Port of the XMPP server', default=None
)
opts, args = parser.parse_args()
args = parser.parse_args()
# Setup logging.
logging.basicConfig(level=opts.loglevel,
logging.basicConfig(level=args.loglevel,
format='%(levelname)-8s %(message)s')
if opts.jid is None:
opts.jid = input('Username: ')
if opts.password is None:
opts.password = getpass.getpass('Password: ')
if args.jid is None:
args.jid = input('Username: ')
if args.password is None:
args.password = getpass.getpass('Password: ')
xmpp = HTTPOverXMPPClient(opts.jid, opts.password)
xmpp = HTTPOverXMPPClient(args.jid, args.password)
xmpp.connect()
xmpp.process()

View File

@@ -9,7 +9,7 @@
import os
from pathlib import Path
from subprocess import call, DEVNULL
from subprocess import call, DEVNULL, check_output, CalledProcessError
from tempfile import TemporaryFile
try:
from setuptools import setup
@@ -35,18 +35,31 @@ CLASSIFIERS = [
packages = [str(mod.parent) for mod in Path('slixmpp').rglob('__init__.py')]
def check_include(header):
command = [os.environ.get('CC', 'cc'), '-E', '-']
def check_include(library_name, header):
command = [os.environ.get('PKG_CONFIG', 'pkg-config'), '--cflags', library_name]
try:
cflags = check_output(command).decode('utf-8').split()
except FileNotFoundError:
print('pkg-config not found.')
return False
except CalledProcessError:
# pkg-config already prints the missing libraries on stderr.
return False
command = [os.environ.get('CC', 'cc')] + cflags + ['-E', '-']
with TemporaryFile('w+') as c_file:
c_file.write('#include <%s>' % header)
c_file.seek(0)
try:
return call(command, stdin=c_file, stdout=DEVNULL, stderr=DEVNULL) == 0
except FileNotFoundError:
print('%s headers not found.' % library_name)
return False
HAS_PYTHON_HEADERS = check_include('python3', 'Python.h')
HAS_STRINGPREP_HEADERS = check_include('libidn', 'stringprep.h')
ext_modules = None
if check_include('stringprep.h'):
if HAS_PYTHON_HEADERS and HAS_STRINGPREP_HEADERS:
try:
from Cython.Build import cythonize
except ImportError:
@@ -54,7 +67,7 @@ if check_include('stringprep.h'):
else:
ext_modules = cythonize('slixmpp/stringprep.pyx')
else:
print('libidn-dev not found, falling back to the slow stringprep module.')
print('Falling back to the slow stringprep module.')
setup(
name="slixmpp",

View File

@@ -15,6 +15,7 @@
import asyncio
import logging
from slixmpp.jid import JID
from slixmpp.stanza import StreamFeatures
from slixmpp.basexmpp import BaseXMPP
from slixmpp.exceptions import XMPPError
@@ -110,7 +111,13 @@ class ClientXMPP(BaseXMPP):
self._handle_stream_features))
def roster_push_filter(iq):
from_ = iq['from']
if from_ and from_ != self.boundjid.bare:
if from_ and from_ != JID('') and from_ != self.boundjid.bare:
reply = iq.reply()
reply['type'] = 'error'
reply['error']['type'] = 'cancel'
reply['error']['code'] = 503
reply['error']['condition'] = 'service-unavailable'
reply.send()
return
self.event('roster_update', iq)
self.register_handler(

View File

@@ -0,0 +1,47 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.plugins.base import register_plugin, BasePlugin
from slixmpp.plugins.google.gmail import Gmail
from slixmpp.plugins.google.auth import GoogleAuth
from slixmpp.plugins.google.settings import GoogleSettings
from slixmpp.plugins.google.nosave import GoogleNoSave
class Google(BasePlugin):
"""
Google: Custom GTalk Features
Also see: <https://developers.google.com/talk/jep_extensions/extensions>
"""
name = 'google'
description = 'Google: Custom GTalk Features'
dependencies = set([
'gmail',
'google_settings',
'google_nosave',
'google_auth'
])
def __getitem__(self, attr):
if attr in ('settings', 'nosave', 'auth'):
return self.xmpp['google_%s' % attr]
elif attr == 'gmail':
return self.xmpp['gmail']
else:
raise KeyError(attr)
register_plugin(Gmail)
register_plugin(GoogleAuth)
register_plugin(GoogleSettings)
register_plugin(GoogleNoSave)
register_plugin(Google)

View File

@@ -0,0 +1,10 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.plugins.google.auth import stanza
from slixmpp.plugins.google.auth.auth import GoogleAuth

View File

@@ -0,0 +1,47 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.google.auth import stanza
class GoogleAuth(BasePlugin):
"""
Google: Auth Extensions (JID Domain Discovery, OAuth2)
Also see:
<https://developers.google.com/talk/jep_extensions/jid_domain_change>
<https://developers.google.com/talk/jep_extensions/oauth>
"""
name = 'google_auth'
description = 'Google: Auth Extensions (JID Domain Discovery, OAuth2)'
dependencies = set(['feature_mechanisms'])
stanza = stanza
def plugin_init(self):
self.xmpp.namespace_map['http://www.google.com/talk/protocol/auth'] = 'ga'
register_stanza_plugin(self.xmpp['feature_mechanisms'].stanza.Auth,
stanza.GoogleAuth)
self.xmpp.add_filter('out', self._auth)
def plugin_end(self):
self.xmpp.del_filter('out', self._auth)
def _auth(self, stanza):
if isinstance(stanza, self.xmpp['feature_mechanisms'].stanza.Auth):
stanza.stream = self.xmpp
stanza['google']['client_uses_full_bind_result'] = True
if stanza['mechanism'] == 'X-OAUTH2':
stanza['google']['service'] = 'oauth2'
print(stanza)
return stanza

View File

@@ -0,0 +1,10 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.plugins.google.gmail import stanza
from slixmpp.plugins.google.gmail.notifications import Gmail

View File

@@ -0,0 +1,101 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.xmlstream import ElementBase, register_stanza_plugin
class GmailQuery(ElementBase):
namespace = 'google:mail:notify'
name = 'query'
plugin_attrib = 'gmail'
interfaces = set(['newer_than_time', 'newer_than_tid', 'search'])
def get_search(self):
return self._get_attr('q', '')
def set_search(self, search):
self._set_attr('q', search)
def del_search(self):
self._del_attr('q')
def get_newer_than_time(self):
return self._get_attr('newer-than-time', '')
def set_newer_than_time(self, value):
self._set_attr('newer-than-time', value)
def del_newer_than_time(self):
self._del_attr('newer-than-time')
def get_newer_than_tid(self):
return self._get_attr('newer-than-tid', '')
def set_newer_than_tid(self, value):
self._set_attr('newer-than-tid', value)
def del_newer_than_tid(self):
self._del_attr('newer-than-tid')
class MailBox(ElementBase):
namespace = 'google:mail:notify'
name = 'mailbox'
plugin_attrib = 'gmail_messages'
interfaces = set(['result_time', 'url', 'matched', 'estimate'])
def get_matched(self):
return self._get_attr('total-matched', '')
def get_estimate(self):
return self._get_attr('total-estimate', '') == '1'
def get_result_time(self):
return self._get_attr('result-time', '')
class MailThread(ElementBase):
namespace = 'google:mail:notify'
name = 'mail-thread-info'
plugin_attrib = 'thread'
plugin_multi_attrib = 'threads'
interfaces = set(['tid', 'participation', 'messages', 'date',
'senders', 'url', 'labels', 'subject', 'snippet'])
sub_interfaces = set(['labels', 'subject', 'snippet'])
def get_senders(self):
result = []
senders = self.xml.findall('{%s}senders/{%s}sender' % (
self.namespace, self.namespace))
for sender in senders:
result.append(MailSender(xml=sender))
return result
class MailSender(ElementBase):
namespace = 'google:mail:notify'
name = 'sender'
plugin_attrib = name
interfaces = set(['address', 'name', 'originator', 'unread'])
def get_originator(self):
return self.xml.attrib.get('originator', '0') == '1'
def get_unread(self):
return self.xml.attrib.get('unread', '0') == '1'
class NewMail(ElementBase):
namespace = 'google:mail:notify'
name = 'new-mail'
plugin_attrib = 'gmail_notification'
register_stanza_plugin(MailBox, MailThread, iterable=True)

View File

@@ -0,0 +1,10 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.plugins.google.nosave import stanza
from slixmpp.plugins.google.nosave.nosave import GoogleNoSave

View File

@@ -0,0 +1,78 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.stanza import Iq, Message
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.google.nosave import stanza
class GoogleNoSave(BasePlugin):
"""
Google: Off the Record Chats
NOTE: This is NOT an encryption method.
Also see <https://developers.google.com/talk/jep_extensions/otr>.
"""
name = 'google_nosave'
description = 'Google: Off the Record Chats'
dependencies = set(['google_settings'])
stanza = stanza
def plugin_init(self):
register_stanza_plugin(Message, stanza.NoSave)
register_stanza_plugin(Iq, stanza.NoSaveQuery)
self.xmpp.register_handler(
Callback('Google Nosave',
StanzaPath('iq@type=set/google_nosave'),
self._handle_nosave_change))
def plugin_end(self):
self.xmpp.remove_handler('Google Nosave')
def enable(self, jid=None, timeout=None, callback=None):
if jid is None:
self.xmpp['google_settings'].update({'archiving_enabled': False},
timeout=timeout, callback=callback)
else:
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq['google_nosave']['item']['jid'] = jid
iq['google_nosave']['item']['value'] = True
return iq.send(timeout=timeout, callback=callback)
def disable(self, jid=None, timeout=None, callback=None):
if jid is None:
self.xmpp['google_settings'].update({'archiving_enabled': True},
timeout=timeout, callback=callback)
else:
iq = self.xmpp.Iq()
iq['type'] = 'set'
iq['google_nosave']['item']['jid'] = jid
iq['google_nosave']['item']['value'] = False
return iq.send(timeout=timeout, callback=callback)
def get(self, timeout=None, callback=None):
iq = self.xmpp.Iq()
iq['type'] = 'get'
iq.enable('google_nosave')
return iq.send(timeout=timeout, callback=callback)
def _handle_nosave_change(self, iq):
reply = self.xmpp.Iq()
reply['type'] = 'result'
reply['id'] = iq['id']
reply['to'] = iq['from']
reply.send()
self.xmpp.event('google_nosave_change', iq)

View File

@@ -0,0 +1,10 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.plugins.google.settings import stanza
from slixmpp.plugins.google.settings.settings import GoogleSettings

View File

@@ -0,0 +1,110 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
This file is part of slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.xmlstream import ET, ElementBase
class UserSettings(ElementBase):
name = 'usersetting'
namespace = 'google:setting'
plugin_attrib = 'google_settings'
interfaces = set(['auto_accept_suggestions',
'mail_notifications',
'archiving_enabled',
'gmail',
'email_verified',
'domain_privacy_notice',
'display_name'])
def _get_setting(self, setting):
xml = self.xml.find('{%s}%s' % (self.namespace, setting))
if xml is not None:
return xml.attrib.get('value', '') == 'true'
return False
def _set_setting(self, setting, value):
self._del_setting(setting)
if value in (True, False):
xml = ET.Element('{%s}%s' % (self.namespace, setting))
xml.attrib['value'] = 'true' if value else 'false'
self.xml.append(xml)
def _del_setting(self, setting):
xml = self.xml.find('{%s}%s' % (self.namespace, setting))
if xml is not None:
self.xml.remove(xml)
def get_display_name(self):
xml = self.xml.find('{%s}%s' % (self.namespace, 'displayname'))
if xml is not None:
return xml.attrib.get('value', '')
return ''
def set_display_name(self, value):
self._del_setting(setting)
if value:
xml = ET.Element('{%s}%s' % (self.namespace, 'displayname'))
xml.attrib['value'] = value
self.xml.append(xml)
def del_display_name(self):
self._del_setting('displayname')
def get_auto_accept_suggestions(self):
return self._get_setting('autoacceptsuggestions')
def get_mail_notifications(self):
return self._get_setting('mailnotifications')
def get_archiving_enabled(self):
return self._get_setting('archivingenabled')
def get_gmail(self):
return self._get_setting('gmail')
def get_email_verified(self):
return self._get_setting('emailverified')
def get_domain_privacy_notice(self):
return self._get_setting('domainprivacynotice')
def set_auto_accept_suggestions(self, value):
self._set_setting('autoacceptsuggestions', value)
def set_mail_notifications(self, value):
self._set_setting('mailnotifications', value)
def set_archiving_enabled(self, value):
self._set_setting('archivingenabled', value)
def set_gmail(self, value):
self._set_setting('gmail', value)
def set_email_verified(self, value):
self._set_setting('emailverified', value)
def set_domain_privacy_notice(self, value):
self._set_setting('domainprivacynotice', value)
def del_auto_accept_suggestions(self):
self._del_setting('autoacceptsuggestions')
def del_mail_notifications(self):
self._del_setting('mailnotifications')
def del_archiving_enabled(self):
self._del_setting('archivingenabled')
def del_gmail(self):
self._del_setting('gmail')
def del_email_verified(self):
self._del_setting('emailverified')
def del_domain_privacy_notice(self):
self._del_setting('domainprivacynotice')

View File

@@ -55,6 +55,7 @@ class XEP_0065(BasePlugin):
"""Returns the socket associated to the SID."""
return self._sessions.get(sid, None)
@asyncio.coroutine
def handshake(self, to, ifrom=None, sid=None, timeout=None):
""" Starts the handshake to establish the socks5 bytestreams
connection.
@@ -104,6 +105,7 @@ class XEP_0065(BasePlugin):
iq['socks'].add_streamhost(proxy, host, port)
return iq.send(timeout=timeout, callback=callback)
@asyncio.coroutine
def discover_proxies(self, jid=None, ifrom=None, timeout=None):
"""Auto-discover the JIDs of SOCKS5 proxies on an XMPP server."""
if jid is None:

View File

@@ -61,10 +61,12 @@ class XEP_0280(BasePlugin):
self.xmpp.plugin['xep_0030'].add_feature('urn:xmpp:carbons:2')
def _handle_carbon_received(self, msg):
self.xmpp.event('carbon_received', msg)
if msg['from'].bare == self.xmpp.boundjid.bare:
self.xmpp.event('carbon_received', msg)
def _handle_carbon_sent(self, msg):
self.xmpp.event('carbon_sent', msg)
if msg['from'].bare == self.xmpp.boundjid.bare:
self.xmpp.event('carbon_sent', msg)
def enable(self, ifrom=None, timeout=None, callback=None,
timeout_callback=None):

View File

@@ -0,0 +1,15 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2016 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.plugins.base import register_plugin
from slixmpp.plugins.xep_0380.stanza import Encryption
from slixmpp.plugins.xep_0380.eme import XEP_0380
register_plugin(XEP_0380)

View File

@@ -0,0 +1,69 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2016 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
import logging
import slixmpp
from slixmpp.stanza import Message
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin, ElementBase, ET
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.xep_0380 import stanza, Encryption
log = logging.getLogger(__name__)
class XEP_0380(BasePlugin):
"""
XEP-0380: Explicit Message Encryption
"""
name = 'xep_0380'
description = 'XEP-0380: Explicit Message Encryption'
dependencies = {'xep_0030'}
default_config = {
'template': 'This message is encrypted with {name} ({namespace})',
}
mechanisms = {
'jabber:x:encrypted': 'Legacy OpenPGP',
'urn:xmpp:ox:0': 'OpenPGP for XMPP',
'urn:xmpp:otr:0': 'OTR',
'eu.siacs.conversations.axolotl': 'Legacy OMEMO',
'urn:xmpp:omemo:0': 'OMEMO',
}
def plugin_init(self):
self.xmpp.register_handler(
Callback('Explicit Message Encryption',
StanzaPath('message/eme'),
self._handle_eme))
register_stanza_plugin(Message, Encryption)
def plugin_end(self):
self.xmpp.remove_handler('Chat State')
def session_bind(self, jid):
self.xmpp.plugin['xep_0030'].add_feature(Encryption.namespace)
def has_eme(self, msg):
return msg.xml.find('{%s}encryption' % Encryption.namespace) is not None
def replace_body_with_eme(self, msg):
eme = msg['eme']
namespace = eme['namespace']
name = self.mechanisms[namespace] if namespace in self.mechanisms else eme['name']
body = self.config['template'].format(name=name, namespace=namespace)
msg['body'] = body
def _handle_eme(self, msg):
self.xmpp.event('message_encryption', msg)

View File

@@ -0,0 +1,16 @@
"""
Slixmpp: The Slick XMPP Library
Copyright (C) 2016 Emmanuel Gil Peyrot <linkmauve@linkmauve.fr>
This file is part of Slixmpp.
See the file LICENSE for copying permission.
"""
from slixmpp.xmlstream import ElementBase
class Encryption(ElementBase):
name = 'encryption'
namespace = 'urn:xmpp:eme:0'
plugin_attrib = 'eme'
interfaces = {'namespace', 'name'}

View File

@@ -9,5 +9,5 @@
# We don't want to have to import the entire library
# just to get the version info for setup.py
__version__ = '1.2.2'
__version_info__ = (1, 2, 2)
__version__ = '1.2.4'
__version_info__ = (1, 2, 4)

View File

@@ -19,7 +19,7 @@ import ssl
import weakref
import uuid
import xml.etree.ElementTree
import xml.etree.ElementTree as ET
from slixmpp.xmlstream.asyncio import asyncio
from slixmpp.xmlstream import tostring, highlight
@@ -339,7 +339,7 @@ class XMLStream(asyncio.BaseProtocol):
"""
self.xml_depth = 0
self.xml_root = None
self.parser = xml.etree.ElementTree.XMLPullParser(("start", "end"))
self.parser = ET.XMLPullParser(("start", "end"))
def connection_made(self, transport):
"""Called when the TCP connection has been established with the server
@@ -359,32 +359,46 @@ class XMLStream(asyncio.BaseProtocol):
the stream is opened, etc).
"""
self.parser.feed(data)
for event, xml in self.parser.read_events():
if event == 'start':
if self.xml_depth == 0:
# We have received the start of the root element.
self.xml_root = xml
log.debug('RECV: %s', highlight(tostring(self.xml_root, xmlns=self.default_ns,
stream=self,
top_level=True,
open_only=True)))
self.start_stream_handler(self.xml_root)
self.xml_depth += 1
if event == 'end':
self.xml_depth -= 1
if self.xml_depth == 0:
# The stream's root element has closed,
# terminating the stream.
log.debug("End of stream received")
self.abort()
elif self.xml_depth == 1:
# A stanza is an XML element that is a direct child of
# the root element, hence the check of depth == 1
self._spawn_event(xml)
if self.xml_root is not None:
# Keep the root element empty of children to
# save on memory use.
self.xml_root.clear()
try:
for event, xml in self.parser.read_events():
if event == 'start':
if self.xml_depth == 0:
# We have received the start of the root element.
self.xml_root = xml
log.debug('RECV: %s',
highlight(tostring(self.xml_root,
xmlns=self.default_ns,
stream=self,
top_level=True,
open_only=True)))
self.start_stream_handler(self.xml_root)
self.xml_depth += 1
if event == 'end':
self.xml_depth -= 1
if self.xml_depth == 0:
# The stream's root element has closed,
# terminating the stream.
log.debug("End of stream received")
self.abort()
elif self.xml_depth == 1:
# A stanza is an XML element that is a direct child of
# the root element, hence the check of depth == 1
self._spawn_event(xml)
if self.xml_root is not None:
# Keep the root element empty of children to
# save on memory use.
self.xml_root.clear()
except ET.ParseError:
log.error('Parse error: %r', data)
# Due to cyclic dependencies, this cant be imported at the module
# level.
from slixmpp.stanza.stream_error import StreamError
error = StreamError()
error['condition'] = 'not-well-formed'
error['text'] = 'Server sent: %r' % data
self.send(error)
self.disconnect()
def is_connected(self):
return self.transport is not None

View File

@@ -0,0 +1,37 @@
import unittest
from slixmpp import Message
from slixmpp.test import SlixTest
import slixmpp.plugins.xep_0380 as xep_0380
from slixmpp.xmlstream import register_stanza_plugin
class TestEME(SlixTest):
def setUp(self):
register_stanza_plugin(Message, xep_0380.stanza.Encryption)
def testCreateEME(self):
"""Testing creating EME."""
xmlstring = """
<message>
<encryption xmlns="urn:xmpp:eme:0" namespace="%s"%s />
</message>
"""
msg = self.Message()
self.check(msg, "<message />")
msg['eme']['namespace'] = 'urn:xmpp:otr:0'
self.check(msg, xmlstring % ('urn:xmpp:otr:0', ''))
msg['eme']['namespace'] = 'urn:xmpp:openpgp:0'
self.check(msg, xmlstring % ('urn:xmpp:openpgp:0', ''))
msg['eme']['name'] = 'OX'
self.check(msg, xmlstring % ('urn:xmpp:openpgp:0', ' name="OX"'))
del msg['eme']
self.check(msg, "<message />")
suite = unittest.TestLoader().loadTestsFromTestCase(TestEME)