diff --git a/slixmpp/features/feature_mechanisms/mechanisms.py b/slixmpp/features/feature_mechanisms/mechanisms.py index db2d73d0..a4d25784 100644 --- a/slixmpp/features/feature_mechanisms/mechanisms.py +++ b/slixmpp/features/feature_mechanisms/mechanisms.py @@ -37,7 +37,8 @@ class FeatureMechanisms(BasePlugin): 'unencrypted_digest': False, 'unencrypted_cram': False, 'unencrypted_scram': True, - 'order': 100 + 'order': 100, + 'tls_version': None, } def plugin_init(self): @@ -96,7 +97,20 @@ class FeatureMechanisms(BasePlugin): result[value] = creds.get('email', jid) elif value == 'channel_binding': if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)): - result[value] = self.xmpp.socket.get_channel_binding() + version = self.xmpp.socket.version() + # As of now, python does not implement anything else + # than tls-unique, which is forbidden on TLSv1.3 + # see https://github.com/python/cpython/issues/95341 + if version != 'TLSv1.3': + result[value] = self.xmpp.socket.get_channel_binding( + cb_type="tls-unique" + ) + elif 'tls-exporter' in ssl.CHANNEL_BINDING_TYPES: + result[value] = self.xmpp.socket.get_channel_binding( + cb_type="tls-exporter" + ) + else: + result[value] = None else: result[value] = None elif value == 'host': @@ -121,6 +135,11 @@ class FeatureMechanisms(BasePlugin): result[value] = True else: result[value] = False + elif value == 'tls_version': + if isinstance(self.xmpp.socket, (ssl.SSLSocket, ssl.SSLObject)): + result[value] = self.xmpp.socket.version() + elif value == 'binding_proposed': + result[value] = any(x for x in self.mech_list if x.endswith('-PLUS')) else: result[value] = self.config.get(value, False) return result diff --git a/slixmpp/util/sasl/mechanisms.py b/slixmpp/util/sasl/mechanisms.py index d53caec8..ea42c114 100644 --- a/slixmpp/util/sasl/mechanisms.py +++ b/slixmpp/util/sasl/mechanisms.py @@ -181,7 +181,7 @@ class SCRAM(Mech): channel_binding = True required_credentials = {'username', 'password'} optional_credentials = {'authzid', 'channel_binding'} - security = {'encrypted', 'unencrypted_scram'} + security = {'tls_version', 'encrypted', 'unencrypted_scram', 'binding_proposed'} def setup(self, name): self.use_channel_binding = False @@ -244,11 +244,15 @@ class SCRAM(Mech): self.cnonce = bytes(('%s' % random.random())[2:]) gs2_cbind_flag = b'n' - if self.credentials['channel_binding']: - if self.use_channel_binding: - gs2_cbind_flag = b'p=tls-unique' - else: - gs2_cbind_flag = b'y' + if self.security_settings['binding_proposed']: + if self.credentials['channel_binding'] and \ + self.use_channel_binding: + if self.security_settings['tls_version'] != 'TLSv1.3': + gs2_cbind_flag = b'p=tls-unique' + else: + gs2_cbind_flag = b'p=tls-exporter' + else: + gs2_cbind_flag = b'y' authzid = b'' if self.credentials['authzid']: @@ -280,7 +284,7 @@ class SCRAM(Mech): raise SASLCancelled('Invalid nonce') cbind_data = b'' - if self.use_channel_binding: + if self.use_channel_binding and self.credentials['channel_binding']: cbind_data = self.credentials['channel_binding'] cbind_input = self.gs2_header + cbind_data channel_binding = b'c=' + b64encode(cbind_input).replace(b'\n', b'')