Integrate a modified version of Dave Cridland's Suelta SASL library.

This commit is contained in:
Lance Stout
2011-08-03 17:00:51 -07:00
parent 9a6eb333e6
commit d4091dbde6
24 changed files with 1419 additions and 125 deletions

View File

@@ -0,0 +1,5 @@
from sleekxmpp.thirdparty.suelta.mechanisms.anonymous import ANONYMOUS
from sleekxmpp.thirdparty.suelta.mechanisms.plain import PLAIN
from sleekxmpp.thirdparty.suelta.mechanisms.cram_md5 import CRAM_MD5
from sleekxmpp.thirdparty.suelta.mechanisms.digest_md5 import DIGEST_MD5
from sleekxmpp.thirdparty.suelta.mechanisms.scram_hmac import SCRAM_HMAC

View File

@@ -0,0 +1,36 @@
from sleekxmpp.thirdparty.suelta.sasl import Mechanism, register_mechanism
from sleekxmpp.thirdparty.suelta.exceptions import SASLError, SASLCancelled
class ANONYMOUS(Mechanism):
"""
"""
def __init__(self, sasl, name):
"""
"""
super(ANONYMOUS, self).__init__(self, sasl, name, 0)
def get_values(self):
"""
"""
return {}
def process(self, challenge=None):
"""
"""
return b'Anonymous, Suelta'
def okay(self):
"""
"""
return True
def get_user(self):
"""
"""
return 'anonymous'
register_mechanism('ANONYMOUS', 0, ANONYMOUS, use_hashes=False)

View File

@@ -0,0 +1,63 @@
import sys
import hmac
from sleekxmpp.thirdparty.suelta.util import hash, bytes
from sleekxmpp.thirdparty.suelta.sasl import Mechanism, register_mechanism
from sleekxmpp.thirdparty.suelta.exceptions import SASLError, SASLCancelled
class CRAM_MD5(Mechanism):
"""
"""
def __init__(self, sasl, name):
"""
"""
super(CRAM_MD5, self).__init__(sasl, name, 2)
self.hash = hash(name[5:])
if self.hash is None:
raise SASLCancelled(self.sasl, self)
if not self.sasl.tls_active():
if not self.sasl.sec_query(self, 'CRAM-MD5'):
raise SASLCancelled(self.sasl, self)
def prep(self):
"""
"""
if 'savepass' not in self.values:
if self.sasl.sec_query(self, 'CLEAR-PASSWORD'):
self.values['savepass'] = True
if 'savepass' not in self.values:
del self.values['password']
def process(self, challenge):
"""
"""
if challenge is None:
return None
self.check_values(['username', 'password'])
username = bytes(self.values['username'])
password = bytes(self.values['password'])
mac = hmac.HMAC(key=password, digestmod=self.hash)
mac.update(challenge)
return username + b' ' + bytes(mac.hexdigest())
def okay(self):
"""
"""
return True
def get_user(self):
"""
"""
return self.values['username']
register_mechanism('CRAM-', 20, CRAM_MD5)

View File

@@ -0,0 +1,273 @@
import sys
import random
from sleekxmpp.thirdparty.suelta.util import hash, bytes, quote
from sleekxmpp.thirdparty.suelta.sasl import Mechanism, register_mechanism
from sleekxmpp.thirdparty.suelta.exceptions import SASLError, SASLCancelled
def parse_challenge(stuff):
"""
"""
ret = {}
var = b''
val = b''
in_var = True
in_quotes = False
new = False
escaped = False
for c in stuff:
if sys.version_info >= (3, 0):
c = bytes([c])
if in_var:
if c.isspace():
continue
if c == b'=':
in_var = False
new = True
else:
var += c
else:
if new:
if c == b'"':
in_quotes = True
else:
val += c
new = False
elif in_quotes:
if escaped:
escaped = False
val += c
else:
if c == b'\\':
escaped = True
elif c == b'"':
in_quotes = False
else:
val += c
else:
if c == b',':
if var:
ret[var] = val
var = b''
val = b''
in_var = True
else:
val += c
if var:
ret[var] = val
return ret
class DIGEST_MD5(Mechanism):
"""
"""
enc_magic = 'Digest session key to client-to-server signing key magic'
dec_magic = 'Digest session key to server-to-client signing key magic'
def __init__(self, sasl, name):
"""
"""
super(DIGEST_MD5, self).__init__(sasl, name, 3)
self.hash = hash(name[7:])
if self.hash is None:
raise SASLCancelled(self.sasl, self)
if not self.sasl.tls_active():
if not self.sasl.sec_query(self, '-ENCRYPTION, DIGEST-MD5'):
raise SASLCancelled(self.sasl, self)
self._rspauth_okay = False
self._digest_uri = None
self._a1 = None
self._enc_buf = b''
self._enc_key = None
self._enc_seq = 0
self._max_buffer = 65536
self._dec_buf = b''
self._dec_key = None
self._dec_seq = 0
self._qops = [b'auth']
self._qop = b'auth'
def MAC(self, seq, msg, key):
"""
"""
mac = hmac.HMAC(key=key, digestmod=self.hash)
seqnum = num_to_bytes(seq)
mac.update(seqnum)
mac.update(msg)
return mac.digest()[:10] + b'\x00\x01' + seqnum
def encode(self, text):
"""
"""
self._enc_buf += text
def flush(self):
"""
"""
result = b''
# Leave buffer space for the MAC
mbuf = self._max_buffer - 10 - 2 - 4
while self._enc_buf:
msg = self._encbuf[:mbuf]
mac = self.MAC(self._enc_seq, msg, self._enc_key, self.hash)
self._enc_seq += 1
msg += mac
result += num_to_bytes(len(msg)) + msg
self._enc_buf = self._enc_buf[mbuf:]
return result
def decode(self, text):
"""
"""
self._dec_buf += text
result = b''
while len(self._dec_buf) > 4:
num = bytes_to_num(self._dec_buf)
if len(self._dec_buf) < (num + 4):
return result
mac = self._dec_buf[4:4 + num]
self._dec_buf = self._dec_buf[4 + num:]
msg = mac[:-16]
mac_conf = self.MAC(self._dec_mac, msg, self._dec_key)
if mac[-16:] != mac_conf:
self._desc_sec = None
return result
self._dec_seq += 1
result += msg
return result
def response(self):
"""
"""
vitals = ['username']
if not self.has_values(['key_hash']):
vitals.append('password')
self.check_values(vitals)
resp = {}
if 'auth-int' in self._qops:
self._qop = b'auth-int'
resp['qop'] = self._qop
if 'realm' in self.values:
resp['realm'] = quote(self.values['realm'])
resp['username'] = quote(bytes(self.values['username']))
resp['nonce'] = quote(self.values['nonce'])
if self.values['nc']:
self._cnonce = self.values['cnonce']
else:
self._cnonce = bytes('%s' % random.random())[2:]
resp['cnonce'] = quote(self._cnonce)
self.values['nc'] += 1
resp['nc'] = bytes('%08x' % self.values['nc'])
service = bytes(self.sasl.service)
host = bytes(self.sasl.host)
self._digest_uri = service + b'/' + host
resp['digest-uri'] = quote(self._digest_uri)
a2 = b'AUTHENTICATE:' + self._digest_uri
if self._qop != b'auth':
a2 += b':00000000000000000000000000000000'
resp['maxbuf'] = b'16777215' # 2**24-1
resp['response'] = self.gen_hash(a2)
return b','.join([bytes(k) + b'=' + bytes(v) for k, v in resp.items()])
def gen_hash(self, a2):
"""
"""
if not self.has_values(['key_hash']):
key_hash = self.hash()
user = bytes(self.values['username'])
password = bytes(self.values['password'])
realm = bytes(self.values['realm'])
kh = user + b':' + realm + b':' + password
key_hash.update(kh)
self.values['key_hash'] = key_hash.digest()
a1 = self.hash(self.values['key_hash'])
a1h = b':' + self.values['nonce'] + b':' + self._cnonce
a1.update(a1h)
response = self.hash()
self._a1 = a1.digest()
rv = bytes(a1.hexdigest().lower())
rv += b':' + self.values['nonce']
rv += b':' + bytes('%08x' % self.values['nc'])
rv += b':' + self._cnonce
rv += b':' + self._qop
rv += b':' + bytes(self.hash(a2).hexdigest().lower())
response.update(rv)
return bytes(response.hexdigest().lower())
def mutual_auth(self, cmp_hash):
"""
"""
a2 = b':' + self._digest_uri
if self._qop != b'auth':
a2 += b':00000000000000000000000000000000'
if self.gen_hash(a2) == cmp_hash:
self._rspauth_okay = True
def prep(self):
"""
"""
if 'password' in self.values:
del self.values['password']
self.values['cnonce'] = self._cnonce
def process(self, challenge=None):
"""
"""
if challenge is None:
if self.has_values(['username', 'realm', 'nonce', 'key_hash',
'nc', 'cnonce', 'qops']):
self._qops = self.values['qops']
return self.response()
else:
return None
d = parse_challenge(challenge)
if b'rspauth' in d:
self.mutual_auth(d[b'rspauth'])
else:
if b'realm' not in d:
d[b'realm'] = self.sasl.def_realm
for key in ['nonce', 'realm']:
if bytes(key) in d:
self.values[key] = d[bytes(key)]
self.values['nc'] = 0
self._qops = [b'auth']
if b'qop' in d:
self._qops = [x.strip() for x in d[b'qop'].split(b',')]
self.values['qops'] = self._qops
if b'maxbuf' in d:
self._max_buffer = int(d[b'maxbuf'])
return self.response()
def okay(self):
"""
"""
if self._rspauth_okay and self._qop == b'auth-int':
self._enc_key = self.hash(self._a1 + self.enc_magic).digest()
self._dec_key = self.hash(self._a1 + self.dec_magic).digest()
self.encoding = True
return self._rspauth_okay
register_mechanism('DIGEST-', 30, DIGEST_MD5)

View File

@@ -0,0 +1,61 @@
import sys
from sleekxmpp.thirdparty.suelta.util import bytes
from sleekxmpp.thirdparty.suelta.sasl import Mechanism, register_mechanism
from sleekxmpp.thirdparty.suelta.exceptions import SASLError, SASLCancelled
class PLAIN(Mechanism):
"""
"""
def __init__(self, sasl, name):
"""
"""
super(PLAIN, self).__init__(sasl, name)
if not self.sasl.tls_active():
if not self.sasl.sec_query(self, '-ENCRYPTION, PLAIN'):
raise SASLCancelled(self.sasl, self)
else:
if not self.sasl.sec_query(self, '+ENCRYPTION, PLAIN'):
raise SASLCancelled(self.sasl, self)
self.check_values(['username', 'password'])
def prep(self):
"""
Prepare for processing by deleting the password if
the user has not approved storing it in the clear.
"""
if 'savepass' not in self.values:
if self.sasl.sec_query(self, 'CLEAR-PASSWORD'):
self.values['savepass'] = True
if 'savepass' not in self.values:
del self.values['password']
return True
def process(self, challenge=None):
"""
Process a challenge request and return the response.
:param challenge: A challenge issued by the server that
must be answered for authentication.
"""
user = bytes(self.values['username'])
password = bytes(self.values['password'])
return b'\x00' + user + b'\x00' + password
def okay(self):
"""
Mutual authentication is not supported by PLAIN.
:returns: ``True``
"""
return True
register_mechanism('PLAIN', 1, PLAIN, use_hashes=False)

View File

@@ -0,0 +1,176 @@
import sys
import hmac
import random
from base64 import b64encode, b64decode
from sleekxmpp.thirdparty.suelta.util import hash, bytes, num_to_bytes, bytes_to_num, XOR
from sleekxmpp.thirdparty.suelta.sasl import Mechanism, register_mechanism
from sleekxmpp.thirdparty.suelta.exceptions import SASLError, SASLCancelled
def parse_challenge(challenge):
"""
"""
items = {}
for key, value in [item.split(b'=', 1) for item in challenge.split(b',')]:
items[key] = value
return items
class SCRAM_HMAC(Mechanism):
"""
"""
def __init__(self, sasl, name):
"""
"""
super(SCRAM_HMAC, self).__init__(sasl, name, 0)
self._cb = False
if name[-5:] == '-PLUS':
name = name[:-5]
self._cb = True
self.hash = hash(self.name[6:])
if self.hash is None:
raise SASLCancelled(self.sasl, self)
if not self.sasl.tls_active():
if not self.sasl.sec_query(self, '-ENCRYPTION, SCRAM'):
raise SASLCancelled(self.sasl, self)
self._step = 0
self._rspauth = False
def HMAC(self, key, msg):
"""
"""
return hmac.HMAC(key=key, msg=msg, digestmod=self.hash).digest()
def Hi(self, text, salt, iterations):
"""
"""
text = bytes(text)
ui_1 = self.HMAC(text, salt + b'\0\0\0\01')
ui = ui_1
for i in range(iterations - 1):
ui_1 = self.HMAC(text, ui_1)
ui = XOR(ui, ui_1)
return ui
def H(self, text):
"""
"""
return self.hash(text).digest()
def prep(self):
if 'password' in self.values:
del self.values['password']
def process(self, challenge=None):
"""
"""
steps = {
0: self.process_one,
1: self.process_two,
2: self.process_three
}
return steps[self._step](challenge)
def process_one(self, challenge):
"""
"""
vitals = ['username']
if 'SaltedPassword' not in self.values:
vitals.append('password')
if 'Iterations' not in self.values:
vitals.append('password')
self.check_values(vitals)
username = bytes(self.values['username'])
self._step = 1
self._cnonce = bytes(('%s' % random.random())[2:])
self._soup = b'n=' + username + b',r=' + self._cnonce
self._gs2header = b''
if not self.sasl.tls_active():
if self._cb:
self._gs2header = b'p=tls-unique,,'
else:
self._gs2header = b'y,,'
else:
self._gs2header = b'n,,'
return self._gs2header + self._soup
def process_two(self, challenge):
"""
"""
data = parse_challenge(challenge)
self._step = 2
self._soup += b',' + challenge + b','
self._nonce = data[b'r']
self._salt = b64decode(data[b's'])
self._iter = int(data[b'i'])
if self._nonce[:len(self._cnonce)] != self._cnonce:
raise SASLCancelled(self.sasl, self)
cbdata = self.sasl.tls_active()
c = self._gs2header
if not cbdata and self._cb:
c += None
r = b'c=' + b64encode(c).replace(b'\n', b'')
r += b',r=' + self._nonce
self._soup += r
if 'Iterations' in self.values:
if self.values['Iterations'] != self._iter:
if 'SaltedPassword' in self.values:
del self.values['SaltedPassword']
if 'Salt' in self.values:
if self.values['Salt'] != self._salt:
if 'SaltedPassword' in self.values:
del self.values['SaltedPassword']
self.values['Iterations'] = self._iter
self.values['Salt'] = self._salt
if 'SaltedPassword' not in self.values:
self.check_values(['password'])
password = bytes(self.values['password'])
salted_pass = self.Hi(password, self._salt, self._iter)
self.values['SaltedPassword'] = salted_pass
salted_pass = self.values['SaltedPassword']
client_key = self.HMAC(salted_pass, b'Client Key')
stored_key = self.H(client_key)
client_sig = self.HMAC(stored_key, self._soup)
client_proof = XOR(client_key, client_sig)
r += b',p=' + b64encode(client_proof).replace(b'\n', b'')
server_key = self.HMAC(self.values['SaltedPassword'], b'Server Key')
self.server_sig = self.HMAC(server_key, self._soup)
return r
def process_three(self, challenge=None):
"""
"""
data = parse_challenge(challenge)
if b64decode(data[b'v']) == self.server_sig:
self._rspauth = True
def okay(self):
"""
"""
return self._rspauth
def get_user(self):
return self.values['username']
register_mechanism('SCRAM-', 60, SCRAM_HMAC)
register_mechanism('SCRAM-', 70, SCRAM_HMAC, extra='-PLUS')