fix tls 1.3 wip

This commit is contained in:
mathieui 2024-02-01 09:45:33 +01:00
parent ef02b3a596
commit 51cbe87501
No known key found for this signature in database
GPG Key ID: C59F84CEEFD616E3
2 changed files with 31 additions and 14 deletions

View File

@ -97,7 +97,20 @@ class FeatureMechanisms(BasePlugin):
result[value] = creds.get('email', jid) result[value] = creds.get('email', jid)
elif value == 'channel_binding': elif value == 'channel_binding':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)): if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
result[value] = self.xmpp.socket.get_channel_binding() version = self.xmpp.socket.version()
# As of now, python does not implement anything else
# than tls-unique, which is forbidden on TLSv1.3
# see https://github.com/python/cpython/issues/95341
if version != 'TLSv1.3':
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-unique"
)
elif 'tls-exporter' in ssl.CHANNEL_BINDING_TYPES:
result[value] = self.xmpp.socket.get_channel_binding(
cb_type="tls-exporter"
)
else:
result[value] = None
else: else:
result[value] = None result[value] = None
elif value == 'host': elif value == 'host':
@ -122,6 +135,11 @@ class FeatureMechanisms(BasePlugin):
result[value] = True result[value] = True
else: else:
result[value] = False result[value] = False
elif value == 'tls_version':
if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)):
result[value] = self.xmpp.socket.version()
elif value == 'binding_proposed':
result[value] = any(x for x in self.mech_list if x.endswith('-PLUS'))
else: else:
result[value] = self.config.get(value, False) result[value] = self.config.get(value, False)
return result return result
@ -179,9 +197,6 @@ class FeatureMechanisms(BasePlugin):
log.exception("A credential value did not pass SASLprep.") log.exception("A credential value did not pass SASLprep.")
self.xmpp.disconnect() self.xmpp.disconnect()
if 'tls_version' in self.mech.security:
self.tls_version = self.xmpp.socket.version()
resp = stanza.Auth(self.xmpp) resp = stanza.Auth(self.xmpp)
resp['mechanism'] = self.mech.name resp['mechanism'] = self.mech.name
try: try:

View File

@ -181,14 +181,13 @@ class SCRAM(Mech):
channel_binding = True channel_binding = True
required_credentials = {'username', 'password'} required_credentials = {'username', 'password'}
optional_credentials = {'authzid', 'channel_binding'} optional_credentials = {'authzid', 'channel_binding'}
security = {'tls_version', 'encrypted', 'unencrypted_scram'} security = {'tls_version', 'encrypted', 'unencrypted_scram', 'binding_proposed'}
def setup(self, name): def setup(self, name):
self.use_channel_binding = False self.use_channel_binding = False
if name[-5:] == '-PLUS': if name[-5:] == '-PLUS':
name = name[:-5] name = name[:-5]
if self.security_settings['tls_version'] == 'TLSv1.2': self.use_channel_binding = True
self.use_channel_binding = True
self.hash_name = name[6:] self.hash_name = name[6:]
self.hash = hash(self.hash_name) self.hash = hash(self.hash_name)
@ -245,12 +244,15 @@ class SCRAM(Mech):
self.cnonce = bytes(('%s' % random.random())[2:]) self.cnonce = bytes(('%s' % random.random())[2:])
gs2_cbind_flag = b'n' gs2_cbind_flag = b'n'
if self.credentials['channel_binding'] and \ if self.security_settings['binding_proposed']:
self.security_settings['tls_version'] == 'TLSv1.2': if self.credentials['channel_binding'] and \
if self.use_channel_binding: self.use_channel_binding:
gs2_cbind_flag = b'p=tls-unique' if self.security_settings['tls_version'] != 'TLSv1.3':
else: gs2_cbind_flag = b'p=tls-unique'
gs2_cbind_flag = b'y' else:
gs2_cbind_flag = b'p=tls-exporter'
else:
gs2_cbind_flag = b'y'
authzid = b'' authzid = b''
if self.credentials['authzid']: if self.credentials['authzid']:
@ -282,7 +284,7 @@ class SCRAM(Mech):
raise SASLCancelled('Invalid nonce') raise SASLCancelled('Invalid nonce')
cbind_data = b'' cbind_data = b''
if self.use_channel_binding: if self.use_channel_binding and self.credentials['channel_binding']:
cbind_data = self.credentials['channel_binding'] cbind_data = self.credentials['channel_binding']
cbind_input = self.gs2_header + cbind_data cbind_input = self.gs2_header + cbind_data
channel_binding = b'c=' + b64encode(cbind_input).replace(b'\n', b'') channel_binding = b'c=' + b64encode(cbind_input).replace(b'\n', b'')