Skip to content

Commit

Permalink
pythongh-63284: Add support for TLS-PSK (pre-shared key) to the ssl m…
Browse files Browse the repository at this point in the history
…odule (python#103181)

Add support for TLS-PSK (pre-shared key) to the ssl module.

---------

Co-authored-by: Oleg Iarygin <oleg@arhadthedev.net>
Co-authored-by: Gregory P. Smith <greg@krypto.org>
  • Loading branch information
3 people authored Nov 27, 2023
1 parent fb202af commit e954ac7
Show file tree
Hide file tree
Showing 10 changed files with 561 additions and 1 deletion.
88 changes: 88 additions & 0 deletions Doc/library/ssl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2006,6 +2006,94 @@ to speed up repeated connections from the same clients.
>>> ssl.create_default_context().verify_mode # doctest: +SKIP
<VerifyMode.CERT_REQUIRED: 2>

.. method:: SSLContext.set_psk_client_callback(callback)

Enables TLS-PSK (pre-shared key) authentication on a client-side connection.

In general, certificate based authentication should be preferred over this method.

The parameter ``callback`` is a callable object with the signature:
``def callback(hint: str | None) -> tuple[str | None, bytes]``.
The ``hint`` parameter is an optional identity hint sent by the server.
The return value is a tuple in the form (client-identity, psk).
Client-identity is an optional string which may be used by the server to
select a corresponding PSK for the client. The string must be less than or
equal to ``256`` octets when UTF-8 encoded. PSK is a
:term:`bytes-like object` representing the pre-shared key. Return a zero
length PSK to reject the connection.

Setting ``callback`` to :const:`None` removes any existing callback.

.. note::
When using TLS 1.3:

- the ``hint`` parameter is always :const:`None`.
- client-identity must be a non-empty string.

Example usage::

context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
context.maximum_version = ssl.TLSVersion.TLSv1_2
context.set_ciphers('PSK')

# A simple lambda:
psk = bytes.fromhex('c0ffee')
context.set_psk_client_callback(lambda hint: (None, psk))

# A table using the hint from the server:
psk_table = { 'ServerId_1': bytes.fromhex('c0ffee'),
'ServerId_2': bytes.fromhex('facade')
}
def callback(hint):
return 'ClientId_1', psk_table.get(hint, b'')
context.set_psk_client_callback(callback)

.. versionadded:: 3.13

.. method:: SSLContext.set_psk_server_callback(callback, identity_hint=None)

Enables TLS-PSK (pre-shared key) authentication on a server-side connection.

In general, certificate based authentication should be preferred over this method.

The parameter ``callback`` is a callable object with the signature:
``def callback(identity: str | None) -> bytes``.
The ``identity`` parameter is an optional identity sent by the client which can
be used to select a corresponding PSK.
The return value is a :term:`bytes-like object` representing the pre-shared key.
Return a zero length PSK to reject the connection.

Setting ``callback`` to :const:`None` removes any existing callback.

The parameter ``identity_hint`` is an optional identity hint string sent to
the client. The string must be less than or equal to ``256`` octets when
UTF-8 encoded.

.. note::
When using TLS 1.3 the ``identity_hint`` parameter is not sent to the client.

Example usage::

context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.maximum_version = ssl.TLSVersion.TLSv1_2
context.set_ciphers('PSK')

# A simple lambda:
psk = bytes.fromhex('c0ffee')
context.set_psk_server_callback(lambda identity: psk)

# A table using the identity of the client:
psk_table = { 'ClientId_1': bytes.fromhex('c0ffee'),
'ClientId_2': bytes.fromhex('facade')
}
def callback(identity):
return psk_table.get(identity, b'')
context.set_psk_server_callback(callback, 'ServerId_1')

.. versionadded:: 3.13

.. index:: single: certificates

.. index:: single: X509 certificate
Expand Down
2 changes: 2 additions & 0 deletions Include/internal/pycore_global_objects_fini_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Include/internal/pycore_global_strings.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(call)
STRUCT_FOR_ID(call_exception_handler)
STRUCT_FOR_ID(call_soon)
STRUCT_FOR_ID(callback)
STRUCT_FOR_ID(cancel)
STRUCT_FOR_ID(capath)
STRUCT_FOR_ID(category)
Expand Down Expand Up @@ -460,6 +461,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(hook)
STRUCT_FOR_ID(id)
STRUCT_FOR_ID(ident)
STRUCT_FOR_ID(identity_hint)
STRUCT_FOR_ID(ignore)
STRUCT_FOR_ID(imag)
STRUCT_FOR_ID(importlib)
Expand Down
2 changes: 2 additions & 0 deletions Include/internal/pycore_runtime_init_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Include/internal/pycore_unicodeobject_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 99 additions & 0 deletions Lib/test/test_ssl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4236,6 +4236,105 @@ def test_session_handling(self):
self.assertEqual(str(e.exception),
'Session refers to a different SSLContext.')

@requires_tls_version('TLSv1_2')
def test_psk(self):
psk = bytes.fromhex('deadbeef')

client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
client_context.check_hostname = False
client_context.verify_mode = ssl.CERT_NONE
client_context.maximum_version = ssl.TLSVersion.TLSv1_2
client_context.set_ciphers('PSK')
client_context.set_psk_client_callback(lambda hint: (None, psk))

server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
server_context.maximum_version = ssl.TLSVersion.TLSv1_2
server_context.set_ciphers('PSK')
server_context.set_psk_server_callback(lambda identity: psk)

# correct PSK should connect
server = ThreadedEchoServer(context=server_context)
with server:
with client_context.wrap_socket(socket.socket()) as s:
s.connect((HOST, server.port))

# incorrect PSK should fail
incorrect_psk = bytes.fromhex('cafebabe')
client_context.set_psk_client_callback(lambda hint: (None, incorrect_psk))
server = ThreadedEchoServer(context=server_context)
with server:
with client_context.wrap_socket(socket.socket()) as s:
with self.assertRaises(ssl.SSLError):
s.connect((HOST, server.port))

# identity_hint and client_identity should be sent to the other side
identity_hint = 'identity-hint'
client_identity = 'client-identity'

def client_callback(hint):
self.assertEqual(hint, identity_hint)
return client_identity, psk

def server_callback(identity):
self.assertEqual(identity, client_identity)
return psk

client_context.set_psk_client_callback(client_callback)
server_context.set_psk_server_callback(server_callback, identity_hint)
server = ThreadedEchoServer(context=server_context)
with server:
with client_context.wrap_socket(socket.socket()) as s:
s.connect((HOST, server.port))

# adding client callback to server or vice versa raises an exception
with self.assertRaisesRegex(ssl.SSLError, 'Cannot add PSK server callback'):
client_context.set_psk_server_callback(server_callback, identity_hint)
with self.assertRaisesRegex(ssl.SSLError, 'Cannot add PSK client callback'):
server_context.set_psk_client_callback(client_callback)

# test with UTF-8 identities
identity_hint = '身份暗示' # Translation: "Identity hint"
client_identity = '客户身份' # Translation: "Customer identity"

client_context.set_psk_client_callback(client_callback)
server_context.set_psk_server_callback(server_callback, identity_hint)
server = ThreadedEchoServer(context=server_context)
with server:
with client_context.wrap_socket(socket.socket()) as s:
s.connect((HOST, server.port))

@requires_tls_version('TLSv1_3')
def test_psk_tls1_3(self):
psk = bytes.fromhex('deadbeef')
identity_hint = 'identity-hint'
client_identity = 'client-identity'

def client_callback(hint):
# identity_hint is not sent to the client in TLS 1.3
self.assertIsNone(hint)
return client_identity, psk

def server_callback(identity):
self.assertEqual(identity, client_identity)
return psk

client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
client_context.check_hostname = False
client_context.verify_mode = ssl.CERT_NONE
client_context.minimum_version = ssl.TLSVersion.TLSv1_3
client_context.set_ciphers('PSK')
client_context.set_psk_client_callback(client_callback)

server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
server_context.minimum_version = ssl.TLSVersion.TLSv1_3
server_context.set_ciphers('PSK')
server_context.set_psk_server_callback(server_callback, identity_hint)

server = ThreadedEchoServer(context=server_context)
with server:
with client_context.wrap_socket(socket.socket()) as s:
s.connect((HOST, server.port))


@unittest.skipUnless(has_tls_version('TLSv1_3'), "Test needs TLS 1.3")
class TestPostHandshakeAuth(unittest.TestCase):
Expand Down
1 change: 1 addition & 0 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,7 @@ Ajith Ramachandran
Dhushyanth Ramasamy
Ashwin Ramaswami
Jeff Ramnani
Grant Ramsay
Bayard Randel
Varpu Rantala
Brodie Rao
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for TLS-PSK (pre-shared key) mode to the :mod:`ssl` module.
Loading

0 comments on commit e954ac7

Please sign in to comment.