Add validation for JIDs.

This commit is contained in:
Lance Stout 2012-07-22 00:16:35 -07:00
parent 01cc0e6def
commit e4e18a416f
8 changed files with 382 additions and 150 deletions

View File

@ -49,6 +49,7 @@ packages = [ 'sleekxmpp',
'sleekxmpp/stanza', 'sleekxmpp/stanza',
'sleekxmpp/test', 'sleekxmpp/test',
'sleekxmpp/roster', 'sleekxmpp/roster',
'sleekxmpp/util',
'sleekxmpp/xmlstream', 'sleekxmpp/xmlstream',
'sleekxmpp/xmlstream/matcher', 'sleekxmpp/xmlstream/matcher',
'sleekxmpp/xmlstream/handler', 'sleekxmpp/xmlstream/handler',

View File

@ -10,6 +10,7 @@ from sleekxmpp.basexmpp import BaseXMPP
from sleekxmpp.clientxmpp import ClientXMPP from sleekxmpp.clientxmpp import ClientXMPP
from sleekxmpp.componentxmpp import ComponentXMPP from sleekxmpp.componentxmpp import ComponentXMPP
from sleekxmpp.stanza import Message, Presence, Iq from sleekxmpp.stanza import Message, Presence, Iq
from sleekxmpp.jid import JID
from sleekxmpp.xmlstream.handler import * from sleekxmpp.xmlstream.handler import *
from sleekxmpp.xmlstream import XMLStream, RestartStream from sleekxmpp.xmlstream import XMLStream, RestartStream
from sleekxmpp.xmlstream.matcher import * from sleekxmpp.xmlstream.matcher import *

262
sleekxmpp/jid.py Normal file
View File

@ -0,0 +1,262 @@
# -*- coding: utf-8 -*-
"""
sleekxmpp.jid
~~~~~~~~~~~~~~~~~~~~~~~
This module allows for working with Jabber IDs (JIDs) by
providing accessors for the various components of a JID.
Part of SleekXMPP: The Sleek XMPP Library
:copyright: (c) 2011 Nathanael C. Fritz
:license: MIT, see LICENSE for more details
"""
from __future__ import unicode_literals
import re
import socket
import stringprep
import encodings.idna
from sleekxmpp.util import stringprep_profiles
# Characters rejected inside a domain label by parse_jid: the control
# characters U+0000-U+001F, ASCII space and punctuation, and DEL (U+007F).
# The hyphen is not listed here; parse_jid checks it separately.
ILLEGAL_CHARS = (
    '\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r'
    '\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19'
    '\x1a\x1b\x1c\x1d\x1e\x1f'
    ' !"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~\x7f'
)

# Matches ``[node@]domain[/resource]`` with three capture groups (node,
# domain, resource). Each part is limited to between 1 and 1023 characters.
JID_PATTERN = (
    "^(?:([^\"&'/:<>@]{1,1023})@)?"
    "([^/@]{1,1023})"
    "(?:/(.{1,1023}))?$"
)
# Stringprep profile applied to the node (local) part of a JID.
# Calling ``nodeprep(text)`` returns the prepared text or raises
# ``stringprep_profiles.StringPrepError``.
nodeprep = stringprep_profiles.create(
    nfkc=True,
    bidi=True,
    mappings=[
        # Table B.1: characters that are mapped to nothing.
        stringprep_profiles.b1_mapping,
        # Table B.2: case folding for use with NFKC. The Nodeprep profile
        # specifies B.1 and B.2 as its mapping tables; map_table_b2 always
        # returns a string, so it must stay last in this list.
        stringprep.map_table_b2],
    prohibited=[
        stringprep.in_table_c11,
        stringprep.in_table_c12,
        stringprep.in_table_c21,
        stringprep.in_table_c22,
        stringprep.in_table_c3,
        stringprep.in_table_c4,
        stringprep.in_table_c5,
        stringprep.in_table_c6,
        stringprep.in_table_c7,
        stringprep.in_table_c8,
        stringprep.in_table_c9,
        # Additional characters that may not appear in a node.
        lambda c: c in '\'"&/:<>@'],
    unassigned=[stringprep.in_table_a1])
# Stringprep profile applied to the resource part of a JID.
# Calling ``resourceprep(text)`` returns the prepared text or raises
# ``stringprep_profiles.StringPrepError``.
resourceprep = stringprep_profiles.create(
    unassigned=[stringprep.in_table_a1],
    prohibited=[
        stringprep.in_table_c12,
        stringprep.in_table_c21,
        stringprep.in_table_c22,
        stringprep.in_table_c3,
        stringprep.in_table_c4,
        stringprep.in_table_c5,
        stringprep.in_table_c6,
        stringprep.in_table_c7,
        stringprep.in_table_c8,
        stringprep.in_table_c9,
    ],
    mappings=[stringprep_profiles.b1_mapping],
    bidi=True,
    nfkc=True,
)
class InvalidJID(ValueError):
    """Raised by ``parse_jid`` when text cannot be accepted as a JID."""
def _is_ip_address(domain):
    """Return True if ``domain`` is an IPv4 or IPv6 address literal."""
    try:
        socket.inet_aton(domain)
        return True
    except (socket.error, ValueError):
        # ValueError covers text the socket layer refuses outright (for
        # example an embedded NUL character); that is simply not an address.
        pass

    # inet_pton is not available on every platform.
    if hasattr(socket, 'inet_pton'):
        try:
            # IPv6 literals may be written in square brackets.
            socket.inet_pton(socket.AF_INET6, domain.strip('[]'))
            return True
        except (socket.error, ValueError):
            pass
    return False


def _prepare_domain(domain):
    """Nameprep and validate every label of a domain name.

    Returns the domain rebuilt from its prepared labels and raises
    ``InvalidJID`` if any label is unacceptable.
    """
    domain_parts = []
    for label in domain.split('.'):
        try:
            label = encodings.idna.nameprep(label)
            # Called only for its validation (raises UnicodeError); the
            # ASCII form itself is not kept.
            encodings.idna.ToASCII(label)
        except UnicodeError:
            raise InvalidJID('Could not prepare domain label')

        if any(char in ILLEGAL_CHARS for char in label):
            raise InvalidJID('Domain label contains an illegal character')

        # startswith/endswith stay safe even if a label were empty.
        if label.startswith('-') or label.endswith('-'):
            raise InvalidJID('Domain label starts or ends with a hyphen')

        domain_parts.append(label)
    return '.'.join(domain_parts)


def parse_jid(data):
    """
    Parse string data into the node, domain, and resource
    components of a JID.

    The domain is accepted unchanged when it is an IP address literal;
    otherwise each of its labels is nameprepped and validated. The node
    and resource, when present, are passed through ``nodeprep`` and
    ``resourceprep``.

    :param string data: A string of the form ``'[user@]domain[/resource]'``.
    :returns: A ``(node, domain, resource)`` tuple; ``node`` and
              ``resource`` are ``None`` when absent.
    :raises InvalidJID: If the text does not match ``JID_PATTERN`` or any
                        component fails validation.
    """
    match = re.match(JID_PATTERN, data)
    if not match:
        raise InvalidJID('JID could not be parsed')

    node, domain, resource = match.groups()

    if not _is_ip_address(domain):
        domain = _prepare_domain(domain)

    try:
        if node is not None:
            node = nodeprep(node)
        if resource is not None:
            resource = resourceprep(resource)
    except stringprep_profiles.StringPrepError:
        raise InvalidJID('Node or resource failed stringprep')

    return node, domain, resource
class JID(object):

    """
    A representation of a Jabber ID, or JID.

    Each JID may have three components: a user, a domain, and an optional
    resource. For example: user@domain/resource

    When a resource is not used, the JID is called a bare JID.
    The JID is a full JID otherwise.

    The components are stored as a ``(local, domain, resource)`` tuple in
    ``_jid``; an absent component is stored as ``None`` and read back as
    an empty string.

    **JID Properties:**
        :jid: Alias for ``full``.
        :full: The string value of the full JID.
        :bare: The string value of the bare JID.
        :user: The username portion of the JID.
        :username: Alias for ``user``.
        :local: Alias for ``user``.
        :node: Alias for ``user``.
        :domain: The domain name portion of the JID.
        :server: Alias for ``domain``.
        :host: Alias for ``domain``.
        :resource: The resource portion of the JID.

    :param string jid: A string of the form ``'[user@]domain[/resource]'``.
    """

    def __init__(self, jid=None, local=None, domain=None, resource=None):
        """Initialize a new JID.

        :param jid: A JID string (parsed with ``parse_jid``), another
                    ``JID`` to copy, or ``None``/``''`` for an empty JID.
        :param local: Optional replacement for the local part.
        :param domain: Optional replacement for the domain.
        :param resource: Optional replacement for the resource.

        The keyword replacements are stored as given; they are not passed
        through ``parse_jid``. A falsy replacement leaves the component
        taken from ``jid`` untouched.
        """
        self._jid = (None, None, None)

        if isinstance(jid, JID):
            parts = jid._jid
        elif jid is None or jid == '':
            parts = (None, None, None)
        else:
            parts = parse_jid(jid)

        orig_local, orig_domain, orig_resource = parts
        self._jid = (local or orig_local or None,
                     domain or orig_domain or None,
                     resource or orig_resource or None)

    def regenerate(self):
        """Deprecated; does nothing."""
        pass

    def reset(self, data):
        """Start fresh from a new JID string.

        :param string data: A string of the form ``'[user@]domain[/resource]'``.
        """
        self._jid = JID(data)._jid

    def __getattr__(self, name):
        """Return a JID component or one of the derived string forms.

        :param name: one of: ``user``, ``username``, ``local``,
                     ``node``, ``server``, ``domain``, ``host``,
                     ``resource``, ``full``, ``jid``, or ``bare``.
        :raises AttributeError: For any other name.
        """
        if name == 'resource':
            return self._jid[2] or ''
        elif name in ('user', 'username', 'local', 'node'):
            return self._jid[0] or ''
        elif name in ('server', 'domain', 'host'):
            return self._jid[1] or ''
        elif name in ('full', 'jid'):
            return str(self)
        elif name == 'bare':
            return str(JID(local=self._jid[0],
                           domain=self._jid[1]))
        # __getattr__ only runs after normal lookup has failed, and object
        # defines no __getattr__ to delegate to, so report the miss here.
        raise AttributeError(name)

    def __setattr__(self, name, value):
        """Replace a JID component, or the whole JID.

        :param name: one of: ``user``, ``username``, ``local``,
                     ``node``, ``server``, ``domain``, ``host``,
                     ``resource``, ``full``, ``jid``, or ``bare``.
        :param value: The new string value of the JID component. An empty
                      or ``None`` value removes a single component.

        Assigning ``full``/``jid`` or ``bare`` parses the value as a JID;
        assigning ``bare`` keeps the current resource. Any other name is
        set as an ordinary attribute.
        """
        # The component tuple is rebuilt directly rather than through
        # JID(self, part=value): the constructor treats a falsy override
        # as "keep the old value", which would make clearing impossible.
        if name == 'resource':
            self._jid = (self._jid[0], self._jid[1], value or None)
        elif name in ('user', 'username', 'local', 'node'):
            self._jid = (value or None, self._jid[1], self._jid[2])
        elif name in ('server', 'domain', 'host'):
            self._jid = (self._jid[0], value or None, self._jid[2])
        elif name in ('full', 'jid'):
            self._jid = JID(value)._jid
        elif name == 'bare':
            parsed = JID(value)._jid
            self._jid = (parsed[0], parsed[1], self._jid[2])
        else:
            object.__setattr__(self, name, value)

    def __str__(self):
        """Use the full JID as the string value."""
        local, domain, resource = self._jid
        result = []
        if local:
            result.append(local)
            result.append('@')
        if domain:
            result.append(domain)
        if resource:
            result.append('/')
            result.append(resource)
        return ''.join(result)

    def __repr__(self):
        """Return the same text as ``__str__``."""
        return self.__str__()

    def __eq__(self, other):
        """
        Two JIDs are considered equal if they have the same full JID value.

        A non-JID operand is converted with ``JID(other)`` first; if that
        conversion fails the operands are unequal.
        """
        if not isinstance(other, JID):
            try:
                other = JID(other)
            except (ValueError, TypeError):
                # InvalidJID is a ValueError; TypeError covers operands
                # that are not text at all.
                return False
        return self._jid == other._jid

    def __ne__(self, other):
        """Two JIDs are considered unequal if they are not equal."""
        # Delegate so that non-JID operands are coerced exactly as in
        # __eq__ instead of being asked for a ``_jid`` attribute.
        return not self == other

    def __hash__(self):
        """Hash a JID based on the string version of its full JID."""
        return hash(self.__str__())

    def __copy__(self):
        """Generate a duplicate JID."""
        return JID(self)

View File

View File

@ -0,0 +1,116 @@
from __future__ import unicode_literals
import sys
import stringprep
import unicodedata
class StringPrepError(UnicodeError):
    """Raised when text violates a rule of a stringprep profile."""
def to_unicode(data):
    """Return ``data`` as a text (unicode) string.

    On Python 2 this is ``unicode(data)``. On Python 3 byte strings are
    decoded as UTF-8, because ``str()`` of a bytes object would yield its
    ``"b'...'"`` representation rather than its content; everything else
    goes through ``str()``.

    :raises UnicodeError: If byte input cannot be decoded.
    """
    if sys.version_info < (3, 0):
        return unicode(data)
    if isinstance(data, (bytes, bytearray)):
        return data.decode('utf-8')
    return str(data)
def b1_mapping(char):
    """Map a character from stringprep table B.1 to nothing.

    :returns: ``''`` if ``char`` is in table B.1 ("commonly mapped to
              nothing"), otherwise ``None`` to signal that this mapping
              does not apply.
    """
    return '' if stringprep.in_table_b1(char) else None
def c12_mapping(char):
    """Map a character from stringprep table C.1.2 to an ASCII space.

    :returns: ``' '`` if ``char`` is in table C.1.2, otherwise ``None``
              to signal that this mapping does not apply.
    """
    if stringprep.in_table_c12(char):
        return ' '
    return None
def map_input(data, tables=None):
    """
    Each character in the input stream MUST be checked against
    a mapping table.

    :param data: The text to map.
    :param tables: An iterable of mapping functions. Each takes one
                   character and returns its replacement string, or
                   ``None`` if it does not apply. The first function to
                   return a replacement wins. ``None`` or an empty
                   iterable leaves the text unchanged.
    :returns: The mapped text.
    """
    tables = tables or ()
    result = []
    for char in data:
        replacement = None
        for mapping in tables:
            replacement = mapping(char)
            if replacement is not None:
                break
        # No mapping applied: keep the character as it is.
        if replacement is None:
            replacement = char
        result.append(replacement)
    return ''.join(result)
def normalize(data, nfkc=True):
    """
    Apply the Unicode normalization step of a profile.

    A profile selects one of two options:
    - no normalization (``nfkc`` false): ``data`` is returned as is
    - Unicode normalization with form KC (``nfkc`` true)
    """
    return unicodedata.normalize('NFKC', data) if nfkc else data
def prohibit_output(data, tables=None):
    """
    Before the text can be emitted, it MUST be checked for prohibited
    code points.

    :param data: The text to check.
    :param tables: An iterable of predicates taking one character and
                   returning true if it is prohibited. ``None`` or an
                   empty iterable prohibits nothing.
    :raises StringPrepError: On the first prohibited character found.
    """
    tables = tables or ()
    for char in data:
        for check in tables:
            if check(char):
                raise StringPrepError("Prohibited code point: %s" % char)
def check_bidi(data):
    """
    Enforce the bidirectional text rules:

    1) The characters in section 5.8 MUST be prohibited.
    2) If a string contains any RandALCat character, the string MUST NOT
       contain any LCat character.
    3) If a string contains any RandALCat character, a RandALCat
       character MUST be the first character of the string, and a
       RandALCat character MUST be the last character of the string.

    An empty string satisfies all three rules.

    :raises StringPrepError: If any rule is violated.
    """
    if not data:
        # Nothing to check, and no first/last character to inspect.
        return

    has_lcat = False
    has_randal = False
    for c in data:
        if stringprep.in_table_c8(c):
            raise StringPrepError("BIDI violation: section 6 (1)")
        if stringprep.in_table_d1(c):
            has_randal = True
        elif stringprep.in_table_d2(c):
            has_lcat = True

    if has_randal and has_lcat:
        raise StringPrepError("BIDI violation: section 6 (2)")

    if has_randal:
        first_randal = stringprep.in_table_d1(data[0])
        last_randal = stringprep.in_table_d1(data[-1])
        if not (first_randal and last_randal):
            raise StringPrepError("BIDI violation: section 6 (3)")
def _check_unassigned(data, tables):
    """Raise StringPrepError if ``tables`` flag any character of ``data``."""
    for char in data:
        for check in tables:
            if check(char):
                raise StringPrepError("Unassigned code point: %s" % char)


def create(nfkc=True, bidi=True, mappings=None,
           prohibited=None, unassigned=None):
    """Build a stringprep profile function.

    :param nfkc: Whether to apply NFKC normalization.
    :param bidi: Whether to enforce the bidirectional text rules.
    :param mappings: Mapping functions handed to ``map_input``.
    :param prohibited: Predicates handed to ``prohibit_output``.
    :param unassigned: Predicates identifying unassigned code points.
    :returns: A function ``profile(data, query=False)`` that returns the
              prepared text or raises ``StringPrepError``.
    """
    # The helpers iterate these, so an omitted argument must become empty.
    mappings = mappings or []
    prohibited = prohibited or []
    unassigned = unassigned or []

    def profile(data, query=False):
        try:
            data = to_unicode(data)
        except UnicodeError:
            raise StringPrepError

        data = map_input(data, mappings)
        data = normalize(data, nfkc)
        prohibit_output(data, prohibited)
        if bidi:
            check_bidi(data)
        # NOTE(review): unassigned code points are only checked when
        # ``query`` is true; confirm this is the intended direction.
        if query and unassigned:
            _check_unassigned(data, unassigned)
        return data
    return profile

View File

@ -6,7 +6,7 @@
See the file LICENSE for copying permission. See the file LICENSE for copying permission.
""" """
from sleekxmpp.xmlstream.jid import JID from sleekxmpp.jid import JID
from sleekxmpp.xmlstream.scheduler import Scheduler from sleekxmpp.xmlstream.scheduler import Scheduler
from sleekxmpp.xmlstream.stanzabase import StanzaBase, ElementBase, ET from sleekxmpp.xmlstream.stanzabase import StanzaBase, ElementBase, ET
from sleekxmpp.xmlstream.stanzabase import register_stanza_plugin from sleekxmpp.xmlstream.stanzabase import register_stanza_plugin

View File

@ -1,148 +0,0 @@
# -*- coding: utf-8 -*-
"""
sleekxmpp.xmlstream.jid
~~~~~~~~~~~~~~~~~~~~~~~
This module allows for working with Jabber IDs (JIDs) by
providing accessors for the various components of a JID.
Part of SleekXMPP: The Sleek XMPP Library
:copyright: (c) 2011 Nathanael C. Fritz
:license: MIT, see LICENSE for more details
"""
from __future__ import unicode_literals
class JID(object):

    """
    A representation of a Jabber ID, or JID.

    Each JID may have three components: a user, a domain, and an optional
    resource. For example: user@domain/resource

    When a resource is not used, the JID is called a bare JID.
    The JID is a full JID otherwise.

    The full JID string is kept in ``_jid``; the individual parts are
    split out of it on first access and cached until the next ``reset``.

    **JID Properties:**
        :jid: Alias for ``full``.
        :full: The value of the full JID.
        :bare: The value of the bare JID.
        :user: The username portion of the JID.
        :domain: The domain name portion of the JID.
        :server: Alias for ``domain``.
        :host: Alias for ``domain``.
        :resource: The resource portion of the JID.

    :param string jid: A string of the form ``'[user@]domain[/resource]'``.
    """

    def __init__(self, jid):
        """Initialize a new JID"""
        self.reset(jid)

    def reset(self, jid):
        """Start fresh from a new JID string.

        :param string jid: A string of the form ``'[user@]domain[/resource]'``.
        """
        if isinstance(jid, JID):
            jid = jid.full
        self._full = self._jid = jid
        # Drop every cached part; they are recomputed lazily.
        self._domain = None
        self._resource = None
        self._user = None
        self._bare = None

    def __getattr__(self, name):
        """Handle getting the JID values, using cache if available.

        :param name: One of: user, server, domain, host, resource,
                     full, jid, or bare.
        :raises AttributeError: For any other name.
        """
        if name == 'resource':
            if self._resource is None and '/' in self._jid:
                self._resource = self._jid.split('/', 1)[-1]
            return self._resource or ""
        elif name == 'user':
            if self._user is None:
                # Strip the resource first: it may itself contain '@'.
                bare = self._jid.split('/', 1)[0]
                if '@' in bare:
                    self._user = bare.split('@', 1)[0]
            return self._user or ""
        elif name in ('server', 'domain', 'host'):
            if self._domain is None:
                # Strip the resource first: it may itself contain '@'.
                bare = self._jid.split('/', 1)[0]
                self._domain = bare.split('@', 1)[-1]
            return self._domain or ""
        elif name in ('full', 'jid'):
            return self._jid or ""
        elif name == 'bare':
            if self._bare is None:
                self._bare = self._jid.split('/', 1)[0]
            return self._bare or ""
        # Anything else is a genuine miss; returning None here would make
        # every attribute name appear to exist.
        raise AttributeError(name)

    def __setattr__(self, name, value):
        """Edit a JID by updating its individual values, resetting the
        generated JID in the end.

        Arguments:
            name  -- The name of the JID part. One of: user, domain,
                     server, host, resource, full, jid, or bare.
            value -- The new value for the JID part.
        """
        if name in ('resource', 'user', 'domain'):
            object.__setattr__(self, "_%s" % name, value)
            self.regenerate()
        elif name in ('server', 'host'):
            self.domain = value
        elif name in ('full', 'jid'):
            self.reset(value)
            self.regenerate()
        elif name == 'bare':
            if '@' in value:
                u, d = value.split('@', 1)
                object.__setattr__(self, "_user", u)
                object.__setattr__(self, "_domain", d)
            else:
                object.__setattr__(self, "_user", '')
                object.__setattr__(self, "_domain", value)
            self.regenerate()
        else:
            object.__setattr__(self, name, value)

    def regenerate(self):
        """Generate a new JID based on current values, useful after editing."""
        jid = ""
        if self.user:
            jid = "%s@" % self.user
        jid += self.domain
        if self.resource:
            jid += "/%s" % self.resource
        self.reset(jid)

    def __str__(self):
        """Use the full JID as the string value."""
        return self.full

    def __repr__(self):
        """Return the full JID, the same as ``__str__``."""
        return self.full

    def __eq__(self, other):
        """
        Two JIDs are considered equal if they have the same full JID value.
        """
        other = JID(other)
        return self.full == other.full

    def __ne__(self, other):
        """Two JIDs are considered unequal if they are not equal."""
        return not self == other

    def __hash__(self):
        """Hash a JID based on the string version of its full JID."""
        return hash(self.full)

    def __copy__(self):
        """Generate a duplicate JID."""
        return JID(self.jid)

View File

@ -1,5 +1,5 @@
from sleekxmpp.test import * from sleekxmpp.test import *
from sleekxmpp.xmlstream.jid import JID from sleekxmpp import JID
class TestJIDClass(SleekTest): class TestJIDClass(SleekTest):