diff --git a/Lib/ssl.py b/Lib/ssl.py index 1d5873726441e4..0c8fc9708a1c10 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -254,6 +254,33 @@ class _TLSMessageType: CHANGE_CIPHER_SPEC = 0x0101 +@_simple_enum(_Enum) +class ChannelBindings: + TLS_UNIQUE = "tls-unique" + TLS_EXPORTER = "tls-exporter" + + def _get_channel_binding(self, sslobj): + cls = type(self) + match self: + case cls.TLS_UNIQUE: + if sslobj.version() == "TLSv1.3": + warnings.warn( + "tls-unique channel binding is not specified for TLS 1.3", + DeprecationWarning, + stacklevel=3 + ) + return sslobj.get_channel_binding(self.value) + case cls.TLS_EXPORTER: + return sslobj.export_keying_material( + 32, + "EXPORTER-Channel-Binding", + context="", + require_extms=True + ) + case _: + raise ValueError(f"{self!r} channel binding type not implemented") + + if sys.platform == "win32": from _ssl import enum_certificates, enum_crls @@ -267,7 +294,7 @@ class _TLSMessageType: socket_error = OSError # keep that public name in module namespace -CHANNEL_BINDING_TYPES = ['tls-unique'] +CHANNEL_BINDING_TYPES = list(cb.value for cb in ChannelBindings) HAS_NEVER_CHECK_COMMON_NAME = hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT') @@ -924,7 +951,16 @@ def get_channel_binding(self, cb_type="tls-unique"): """Get channel binding data for current connection. Raise ValueError if the requested `cb_type` is not supported. Return bytes of the data or None if the data is not available (e.g. before the handshake).""" - return self._sslobj.get_channel_binding(cb_type) + return ChannelBindings(cb_type)._get_channel_binding(self._sslobj) + + def export_keying_material(self, length, label, context=None, require_extms=True): + """Export keying material for current connection + + See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3) + """ + return self._sslobj.export_keying_material( + length, label, context=context, require_extms=require_extms + ) def version(self): """Return a string identifying the protocol version used by the @@ -1336,7 +1372,7 @@ def accept(self): @_sslcopydoc def get_channel_binding(self, cb_type="tls-unique"): if self._sslobj is not None: - return self._sslobj.get_channel_binding(cb_type) + return ChannelBindings(cb_type)._get_channel_binding(self._sslobj) else: if cb_type not in CHANNEL_BINDING_TYPES: raise ValueError( @@ -1344,6 +1380,15 @@ def get_channel_binding(self, cb_type="tls-unique"): ) return None + @_sslcopydoc + def export_keying_material(self, length, label, context=None, require_extms=True): + if self._sslobj is not None: + return self._sslobj.export_keying_material( + length, label, context=context, require_extms=require_extms + ) + else: + return None + @_sslcopydoc def version(self): if self._sslobj is not None: diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 5007e08f321b5a..ded0b6b01a96b2 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -668,17 +668,17 @@ def test_unknown_channel_binding(self): ss.get_channel_binding("unknown-type") s.close() - @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, - "'tls-unique' channel binding not available") - def test_tls_unique_channel_binding(self): + def test_tls_channel_binding(self): # unconnected should return None for known type s = socket.socket(socket.AF_INET) with test_wrap_socket(s) as ss: self.assertIsNone(ss.get_channel_binding("tls-unique")) + self.assertIsNone(ss.get_channel_binding("tls-exporter")) # the same for server-side s = socket.socket(socket.AF_INET) with test_wrap_socket(s, server_side=True, certfile=CERTFILE) as ss: self.assertIsNone(ss.get_channel_binding("tls-unique")) + self.assertIsNone(ss.get_channel_binding("tls-exporter")) def test_dealloc_warn(self): ss = test_wrap_socket(socket.socket(socket.AF_INET)) @@ -2086,15 +2086,15 @@ def test_bio_handshake(self): self.assertIsNone(sslobj.version()) self.assertIsNotNone(sslobj.shared_ciphers()) self.assertRaises(ValueError, sslobj.getpeercert) - if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: - self.assertIsNone(sslobj.get_channel_binding('tls-unique')) + for cb in ssl.CHANNEL_BINDING_TYPES: + self.assertIsNone(sslobj.get_channel_binding(cb)) self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake) self.assertTrue(sslobj.cipher()) self.assertIsNotNone(sslobj.shared_ciphers()) self.assertIsNotNone(sslobj.version()) self.assertTrue(sslobj.getpeercert()) - if 'tls-unique' in ssl.CHANNEL_BINDING_TYPES: - self.assertTrue(sslobj.get_channel_binding('tls-unique')) + self.assertTrue(sslobj.get_channel_binding("tls-unique")) + self.assertTrue(sslobj.get_channel_binding("tls-exporter")) try: self.ssl_io_loop(sock, incoming, outgoing, sslobj.unwrap) except ssl.SSLSyscallError: @@ -2317,6 +2317,12 @@ def run(self): sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n") data = self.sslconn.get_channel_binding("tls-unique") self.write(repr(data).encode("us-ascii") + b"\n") + elif stripped == b'CB tls-exporter': + if support.verbose and self.server.connectionchatty: + sys.stdout.write(" server: read CB tls-exporter from client, sending our CB data...\n") + data = self.sslconn.get_channel_binding("tls-exporter") + self.write(repr(data).encode("us-ascii") + b"\n") + elif stripped == b'PHA': if support.verbose and self.server.connectionchatty: sys.stdout.write(" server: initiating post handshake auth\n") @@ -3737,14 +3743,31 @@ def test_default_ecdh_curve(self): s.connect((HOST, server.port)) self.assertIn("ECDH", s.cipher()[0]) - @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, - "'tls-unique' channel binding not available") - def test_tls_unique_channel_binding(self): - """Test tls-unique channel binding.""" + def test_tls_channel_binding_unique_tlsv1_2(self): + self._test_tls_channel_binding(ssl.TLSVersion.TLSv1_2, "tls-unique", 12) + + def test_tls_channel_binding_unique_tlsv1_3(self): + with warnings_helper.check_warnings(("tls-unique", DeprecationWarning)): + self._test_tls_channel_binding(ssl.TLSVersion.TLSv1_3, "tls-unique", 48) + + def test_tls_channel_binding_exporter_tlsv1_2(self): + self._test_tls_channel_binding( + ssl.TLSVersion.TLSv1_2, "tls-exporter", 32, "EXPORTER-Channel-Binding" + ) + + def test_tls_channel_binding_exporter_tlsv1_3(self): + self._test_tls_channel_binding( + ssl.TLSVersion.TLSv1_3, "tls-exporter", 32, "EXPORTER-Channel-Binding" + ) + + def _test_tls_channel_binding(self, version, cb, cb_size, ekm=None): + """Test tls-unique and tls-export channel binding.""" if support.verbose: sys.stdout.write("\n") client_context, server_context, hostname = testing_context() + server_context.minimum_version = version + server_context.maximum_version = version server = ThreadedEchoServer(context=server_context, chatty=True, @@ -3756,22 +3779,33 @@ def test_tls_unique_channel_binding(self): server_hostname=hostname) as s: s.connect((HOST, server.port)) # get the data - cb_data = s.get_channel_binding("tls-unique") + cb_data = s.get_channel_binding(cb) if support.verbose: sys.stdout.write( - " got channel binding data: {0!r}\n".format(cb_data)) + f" got {cb} channel binding data: {cb_data!r}\n" + ) + + if ekm: + cb_ekm = s.export_keying_material( + cb_size, ekm, context="", require_extms=True + ) + self.assertEqual(cb_ekm, cb_data) + # TLS 1.3: empty and no context result in equal values + # other: empty context and no context result in different values + cb_ekm_no_context = s.export_keying_material(cb_size, ekm) + if version == ssl.TLSVersion.TLSv1_3: + self.assertEqual(cb_data, cb_ekm_no_context) + else: + self.assertNotEqual(cb_data, cb_ekm_no_context) # check if it is sane self.assertIsNotNone(cb_data) - if s.version() == 'TLSv1.3': - self.assertEqual(len(cb_data), 48) - else: - self.assertEqual(len(cb_data), 12) # True for TLSv1 + self.assertEqual(len(cb_data), cb_size) # and compare with the peers version - s.write(b"CB tls-unique\n") - peer_data_repr = s.read().strip() - self.assertEqual(peer_data_repr, + s.write(f"CB {cb}\n".encode("ascii")) + peer_data = s.read().strip() + self.assertEqual(peer_data, repr(cb_data).encode("us-ascii")) # now, again @@ -3779,22 +3813,19 @@ def test_tls_unique_channel_binding(self): socket.socket(), server_hostname=hostname) as s: s.connect((HOST, server.port)) - new_cb_data = s.get_channel_binding("tls-unique") + new_cb_data = s.get_channel_binding(cb) if support.verbose: sys.stdout.write( - "got another channel binding data: {0!r}\n".format( - new_cb_data) + f" got another {cb} channel binding data: {new_cb_data!r}\n" ) # is it really unique self.assertNotEqual(cb_data, new_cb_data) - self.assertIsNotNone(cb_data) - if s.version() == 'TLSv1.3': - self.assertEqual(len(cb_data), 48) - else: - self.assertEqual(len(cb_data), 12) # True for TLSv1 - s.write(b"CB tls-unique\n") - peer_data_repr = s.read().strip() - self.assertEqual(peer_data_repr, + self.assertIsNotNone(new_cb_data) + self.assertEqual(len(cb_data), cb_size) + + s.write(f"CB {cb}\n".encode("ascii")) + peer_unique= s.read().strip() + self.assertEqual(peer_unique, repr(new_cb_data).encode("us-ascii")) def test_compression(self): diff --git a/Modules/_ssl.c b/Modules/_ssl.c index bf8bd9dea89b6b..30803d30007719 100644 --- a/Modules/_ssl.c +++ b/Modules/_ssl.c @@ -2699,10 +2699,9 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self, const char *cb_type) /*[clinic end generated code: output=34bac9acb6a61d31 input=08b7e43b99c17d41]*/ { - char buf[PySSL_CB_MAXLEN]; - size_t len; - if (strcmp(cb_type, "tls-unique") == 0) { + char buf[PySSL_CB_MAXLEN]; + size_t len; if (SSL_session_reused(self->ssl) ^ !self->socket_type) { /* if session is resumed XOR we are the client */ len = SSL_get_finished(self->ssl, buf, PySSL_CB_MAXLEN); @@ -2711,8 +2710,12 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self, /* if a new session XOR we are the server */ len = SSL_get_peer_finished(self->ssl, buf, PySSL_CB_MAXLEN); } - } - else { + /* It cannot be negative in current OpenSSL version as of July 2011 */ + if (len == 0) + Py_RETURN_NONE; + + return PyBytes_FromStringAndSize(buf, len); + } else { PyErr_Format( PyExc_ValueError, "'%s' channel binding type not implemented", @@ -2720,12 +2723,69 @@ _ssl__SSLSocket_get_channel_binding_impl(PySSLSocket *self, ); return NULL; } +} - /* It cannot be negative in current OpenSSL version as of July 2011 */ - if (len == 0) - Py_RETURN_NONE; +/*[clinic input] +_ssl._SSLSocket.export_keying_material + length: Py_ssize_t + label: str(accept={str, robuffer}, zeroes=True) + * + context: str(accept={str, robuffer, NoneType}, zeroes=True) = None + require_extms: bool = True + +Get keying material for current connection. + +See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3) +[clinic start generated code]*/ + +static PyObject * +_ssl__SSLSocket_export_keying_material_impl(PySSLSocket *self, + Py_ssize_t length, + const char *label, + Py_ssize_t label_length, + const char *context, + Py_ssize_t context_length, + int require_extms) +/*[clinic end generated code: output=7ff9f8a13d326773 input=1ed86925a1a8c634]*/ +{ + PyObject *km; - return PyBytes_FromStringAndSize(buf, len); + if (!SSL_is_init_finished(self->ssl)) { + Py_RETURN_NONE; + } + if (length < 1) { + PyErr_SetString(PyExc_ValueError, "invalid export length"); + return NULL; + } + if (require_extms) { + // RFC 9266 requires extended master secret for tls-exporter + // channel binding. EMS is always present with TLS 1.3 and an + // optional extension with TLS 1.2. + if (SSL_version(self->ssl) != TLS1_3_VERSION) { + int res = SSL_get_extms_support(self->ssl); + if (res == -1) { + return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__); + } + if (res == 0) { + PyErr_SetString(PyExc_ValueError, "connect has no extended master secret"); + return NULL; + } + } + } + km = PyBytes_FromStringAndSize(NULL, length); + if (km == NULL) { + return NULL; + } + if (SSL_export_keying_material( + self->ssl, + (unsigned char*)PyBytes_AS_STRING(km), (size_t)length, + label, (size_t)label_length, + (const unsigned char*)context, (size_t)context_length, + (context != NULL)) != 1) { + Py_DECREF(km); + return _setSSLError(get_state_sock(self), NULL, 0, __FILE__, __LINE__); + } + return km; } /*[clinic input] @@ -2918,6 +2978,7 @@ static PyMethodDef PySSLMethods[] = { _SSL__SSLSOCKET_VERIFY_CLIENT_POST_HANDSHAKE_METHODDEF _SSL__SSLSOCKET_GET_UNVERIFIED_CHAIN_METHODDEF _SSL__SSLSOCKET_GET_VERIFIED_CHAIN_METHODDEF + _SSL__SSLSOCKET_EXPORT_KEYING_MATERIAL_METHODDEF {NULL, NULL} }; diff --git a/Modules/clinic/_ssl.c.h b/Modules/clinic/_ssl.c.h index 24604dd43687c5..2a346a9fcf81f7 100644 --- a/Modules/clinic/_ssl.c.h +++ b/Modules/clinic/_ssl.c.h @@ -381,6 +381,50 @@ _ssl__SSLSocket_get_channel_binding(PySSLSocket *self, PyObject *const *args, Py return return_value; } +PyDoc_STRVAR(_ssl__SSLSocket_export_keying_material__doc__, +"export_keying_material($self, /, length, label, *, context=None,\n" +" require_extms=True)\n" +"--\n" +"\n" +"Get keying material for current connection.\n" +"\n" +"See RFC 5705 (for TLS 1.2) and RFC 8446 (for TLS 1.3)"); + +#define _SSL__SSLSOCKET_EXPORT_KEYING_MATERIAL_METHODDEF \ + {"export_keying_material", _PyCFunction_CAST(_ssl__SSLSocket_export_keying_material), METH_FASTCALL|METH_KEYWORDS, _ssl__SSLSocket_export_keying_material__doc__}, + +static PyObject * +_ssl__SSLSocket_export_keying_material_impl(PySSLSocket *self, + Py_ssize_t length, + const char *label, + Py_ssize_t label_length, + const char *context, + Py_ssize_t context_length, + int require_extms); + +static PyObject * +_ssl__SSLSocket_export_keying_material(PySSLSocket *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"length", "label", "context", "require_extms", NULL}; + static _PyArg_Parser _parser = {"ns#|$z#p:export_keying_material", _keywords, 0}; + Py_ssize_t length; + const char *label; + Py_ssize_t label_length; + const char *context = NULL; + Py_ssize_t context_length; + int require_extms = 1; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &length, &label, &label_length, &context, &context_length, &require_extms)) { + goto exit; + } + return_value = _ssl__SSLSocket_export_keying_material_impl(self, length, label, label_length, context, context_length, require_extms); + +exit: + return return_value; +} + PyDoc_STRVAR(_ssl__SSLSocket_verify_client_post_handshake__doc__, "verify_client_post_handshake($self, /)\n" "--\n" @@ -1330,4 +1374,4 @@ _ssl_enum_crls(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObje #ifndef _SSL_ENUM_CRLS_METHODDEF #define _SSL_ENUM_CRLS_METHODDEF #endif /* !defined(_SSL_ENUM_CRLS_METHODDEF) */ -/*[clinic end generated code: output=9d806f8ff4a06ed3 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=d6d65f40541312d3 input=a9049054013a1b77]*/