From e70969f32257a9ee4002f5c33a91a6d2b34b18dc Mon Sep 17 00:00:00 2001 From: Ron Frederick Date: Sat, 24 Jun 2023 21:08:24 -0700 Subject: [PATCH] Add an option to disable expensive OpenSSL 3.x RSA private key checks This commit adds a new unsafe_skip_rsa_key_validation argument to import_private_key(), import_private_key_and_certs(), read_private_key(), read_private_key_and_certs(), read_private_key_list(), and load_keypairs() which can be used to disable somewhat expensive RSA private key validation code in OpenSSL 3.x, reducing the cost back to what it was in earlier OpenSSL versions. Skipping these checks is only recommended when keys being loaded are from a trusted source. A new set_default_skip_rsa_key_validation() function was also added, to set a global default for whether or not to disable this extra key validation. --- asyncssh/__init__.py | 4 +- asyncssh/crypto/rsa.py | 6 +- asyncssh/public_key.py | 170 ++++++++++++++++++++++++++++------------- asyncssh/rsa.py | 49 +++++++++++- docs/api.rst | 5 ++ setup.py | 4 +- tests/util.py | 12 +-- 7 files changed, 181 insertions(+), 69 deletions(-) diff --git a/asyncssh/__init__.py b/asyncssh/__init__.py index 9afc8002..5b99444f 100644 --- a/asyncssh/__init__.py +++ b/asyncssh/__init__.py @@ -86,6 +86,8 @@ from .public_key import load_keypairs, load_public_keys, load_certificates from .public_key import load_resident_keys +from .rsa import set_default_skip_rsa_key_validation + from .scp import scp from .session import DataType, SSHClientSession, SSHServerSession @@ -164,5 +166,5 @@ 'read_certificate_list', 'read_known_hosts', 'read_private_key', 'read_private_key_list', 'read_public_key', 'read_public_key_list', 'run_client', 'run_server', 'scp', 'set_debug_level', 'set_log_level', - 'set_sftp_log_level', + 'set_sftp_log_level', 'set_default_skip_rsa_key_validation', ] diff --git a/asyncssh/crypto/rsa.py b/asyncssh/crypto/rsa.py index a62f43ea..66a2774c 100644 --- a/asyncssh/crypto/rsa.py +++ b/asyncssh/crypto/rsa.py @@ -98,12 +98,14 @@ class RSAPrivateKey(_RSAKey): @classmethod def construct(cls, n: int, e: int, d: int, p: int, q: int, - dmp1: int, dmq1: int, iqmp: int) -> 'RSAPrivateKey': + dmp1: int, dmq1: int, iqmp: int, + skip_validation: bool) -> 'RSAPrivateKey': """Construct an RSA private key""" pub = rsa.RSAPublicNumbers(e, n) priv = rsa.RSAPrivateNumbers(p, q, d, dmp1, dmq1, iqmp, pub) - priv_key = priv.private_key() + priv_key = priv.private_key( + unsafe_skip_rsa_key_validation=skip_validation) return cls(priv_key, pub, priv) diff --git a/asyncssh/public_key.py b/asyncssh/public_key.py index f7fa4dc8..10b8f3ee 100644 --- a/asyncssh/public_key.py +++ b/asyncssh/public_key.py @@ -262,13 +262,13 @@ def generate(cls, algorithm: bytes, **kwargs) -> 'SSHKey': @classmethod def make_private(cls, key_params: object) -> 'SSHKey': - """Construct an RSA private key""" + """Construct a private key""" raise NotImplementedError @classmethod def make_public(cls, key_params: object) -> 'SSHKey': - """Construct an RSA public key""" + """Construct a public key""" raise NotImplementedError @@ -2402,7 +2402,9 @@ def _match_next(data: bytes, keytype: bytes, public: bool = False) -> \ return None, (), len(data) -def _decode_pkcs1_private(pem_name: bytes, key_data: object) -> SSHKey: +def _decode_pkcs1_private( + pem_name: bytes, key_data: object, + unsafe_skip_rsa_key_validation: Optional[bool]) -> SSHKey: """Decode a PKCS#1 format private key""" handler = _pem_map.get(pem_name) @@ -2415,6 +2417,10 @@ def _decode_pkcs1_private(pem_name: bytes, key_data: object) -> SSHKey: raise KeyImportError('Invalid %s private key' % pem_name.decode('ascii')) + if pem_name == b'RSA': + key_params = cast(Tuple, key_params) + \ + (unsafe_skip_rsa_key_validation,) + return handler.make_private(key_params) @@ -2434,7 +2440,9 @@ def _decode_pkcs1_public(pem_name: bytes, key_data: object) -> SSHKey: return handler.make_public(key_params) -def _decode_pkcs8_private(key_data: object) -> SSHKey: +def _decode_pkcs8_private( + key_data: object, + unsafe_skip_rsa_key_validation: Optional[bool]) -> SSHKey: """Decode a PKCS#8 format private key""" if (isinstance(key_data, tuple) and len(key_data) >= 3 and @@ -2455,6 +2463,10 @@ def _decode_pkcs8_private(key_data: object) -> SSHKey: handler.pem_name.decode('ascii') if handler.pem_name else 'PKCS#8') + if alg == ObjectIdentifier('1.2.840.113549.1.1.1'): + key_params = cast(Tuple, key_params) + \ + (unsafe_skip_rsa_key_validation,) + return handler.make_private(key_params) else: raise KeyImportError('Invalid PKCS#8 private key') @@ -2486,8 +2498,9 @@ def _decode_pkcs8_public(key_data: object) -> SSHKey: raise KeyImportError('Invalid PKCS#8 public key') -def _decode_openssh_private(data: bytes, - passphrase: Optional[BytesOrStr]) -> SSHKey: +def _decode_openssh_private( + data: bytes, passphrase: Optional[BytesOrStr], + unsafe_skip_rsa_key_validation: Optional[bool]) -> SSHKey: """Decode an OpenSSH format private key""" try: @@ -2578,6 +2591,10 @@ def _decode_openssh_private(data: bytes, if len(pad) >= block_size or pad != bytes(range(1, len(pad) + 1)): raise KeyImportError('Invalid OpenSSH private key') + if alg == b'ssh-rsa': + key_params = cast(Tuple, key_params) + \ + (unsafe_skip_rsa_key_validation,) + key = handler.make_private(key_params) key.set_comment(comment) return key @@ -2609,8 +2626,9 @@ def _decode_openssh_public(data: bytes) -> SSHKey: raise KeyImportError('Invalid OpenSSH private key') from None -def _decode_der_private(key_data: object, - passphrase: Optional[BytesOrStr]) -> SSHKey: +def _decode_der_private( + key_data: object, passphrase: Optional[BytesOrStr], + unsafe_skip_rsa_key_validation: Optional[bool]) -> SSHKey: """Decode a DER format private key""" # First, if there's a passphrase, try to decrypt PKCS#8 @@ -2623,7 +2641,7 @@ def _decode_der_private(key_data: object, # Then, try to decode PKCS#8 try: - return _decode_pkcs8_private(key_data) + return _decode_pkcs8_private(key_data, unsafe_skip_rsa_key_validation) except KeyImportError: # PKCS#8 failed - try PKCS#1 instead pass @@ -2631,7 +2649,8 @@ def _decode_der_private(key_data: object, # If that fails, try each of the possible PKCS#1 encodings for pem_name in _pem_map: try: - return _decode_pkcs1_private(pem_name, key_data) + return _decode_pkcs1_private(pem_name, key_data, + unsafe_skip_rsa_key_validation) except KeyImportError: # Try the next PKCS#1 encoding pass @@ -2667,13 +2686,15 @@ def _decode_der_certificate(data: bytes, return SSHX509Certificate.construct_from_der(data, comment) -def _decode_pem_private(pem_name: bytes, headers: Mapping[bytes, bytes], - data: bytes, passphrase: Optional[BytesOrStr]) -> \ - SSHKey: +def _decode_pem_private( + pem_name: bytes, headers: Mapping[bytes, bytes], + data: bytes, passphrase: Optional[BytesOrStr], + unsafe_skip_rsa_key_validation: Optional[bool]) -> SSHKey: """Decode a PEM format private key""" if pem_name == b'OPENSSH': - return _decode_openssh_private(data, passphrase) + return _decode_openssh_private(data, passphrase, + unsafe_skip_rsa_key_validation) if headers.get(b'Proc-Type') == b'4,ENCRYPTED': if passphrase is None: @@ -2715,9 +2736,10 @@ def _decode_pem_private(pem_name: bytes, headers: Mapping[bytes, bytes], 'private key') from None if pem_name: - return _decode_pkcs1_private(pem_name, key_data) + return _decode_pkcs1_private(pem_name, key_data, + unsafe_skip_rsa_key_validation) else: - return _decode_pkcs8_private(key_data) + return _decode_pkcs8_private(key_data, unsafe_skip_rsa_key_validation) def _decode_pem_public(pem_name: bytes, data: bytes) -> SSHKey: @@ -2750,8 +2772,10 @@ def _decode_pem_certificate(pem_name: bytes, data: bytes) -> SSHCertificate: return SSHX509Certificate.construct_from_der(data) -def _decode_private(data: bytes, passphrase: Optional[BytesOrStr]) -> \ - Tuple[Optional[SSHKey], Optional[int]]: +def _decode_private( + data: bytes, passphrase: Optional[BytesOrStr], + unsafe_skip_rsa_key_validation: Optional[bool]) -> \ + Tuple[Optional[SSHKey], Optional[int]]: """Decode a private key""" fmt, key_info, end = _match_next(data, b'PRIVATE KEY') @@ -2759,10 +2783,12 @@ def _decode_private(data: bytes, passphrase: Optional[BytesOrStr]) -> \ key: Optional[SSHKey] if fmt == 'der': - key = _decode_der_private(key_info[0], passphrase) + key = _decode_der_private(key_info[0], passphrase, + unsafe_skip_rsa_key_validation) elif fmt == 'pem': pem_name, headers, data = key_info - key = _decode_pem_private(pem_name, headers, data, passphrase) + key = _decode_pem_private(pem_name, headers, data, passphrase, + unsafe_skip_rsa_key_validation) else: key = None @@ -2799,7 +2825,7 @@ def _decode_public(data: bytes) -> Tuple[Optional[SSHKey], Optional[int]]: if fmt == 'pem' and key_info[0] == b'OPENSSH': key = _decode_openssh_public(key_info[2]) else: - key, _ = _decode_private(data, None) + key, _ = _decode_private(data, None, False) if key: key = key.convert_to_public() @@ -2836,14 +2862,16 @@ def _decode_certificate(data: bytes) -> \ return cert, end -def _decode_private_list(data: bytes, passphrase: Optional[BytesOrStr]) -> \ - Sequence[SSHKey]: +def _decode_private_list( + data: bytes, passphrase: Optional[BytesOrStr], + unsafe_skip_rsa_key_validation: Optional[bool]) -> Sequence[SSHKey]: """Decode a private key list""" keys: List[SSHKey] = [] while data: - key, end = _decode_private(data, passphrase) + key, end = _decode_private(data, passphrase, + unsafe_skip_rsa_key_validation) if key: keys.append(key) @@ -3122,8 +3150,9 @@ def generate_private_key(alg_name: str, comment: _Comment = None, key.set_comment(comment) return key -def import_private_key(data: BytesOrStr, - passphrase: Optional[BytesOrStr] = None) -> SSHKey: +def import_private_key( + data: BytesOrStr, passphrase: Optional[BytesOrStr] = None, + unsafe_skip_rsa_key_validation: Optional[bool] = None) -> SSHKey: """Import a private key This function imports a private key encoded in PKCS#1 or PKCS#8 DER @@ -3134,8 +3163,13 @@ def import_private_key(data: BytesOrStr, The data to import. :param passphrase: (optional) The passphrase to use to decrypt the key. + :param unsafe_skip_rsa_key_validation: (optional) + Whether or not to skip key validation when loading RSA private + keys, defaulting to performing these checks unless changed by + calling :func:`set_default_skip_rsa_key_validation`. :type data: `bytes` or ASCII `str` :type passphrase: `str` or `bytes` + :type unsafe_skip_rsa_key_validation: bool :returns: An :class:`SSHKey` private key @@ -3147,7 +3181,7 @@ def import_private_key(data: BytesOrStr, except UnicodeEncodeError: raise KeyImportError('Invalid encoding for key') from None - key, _ = _decode_private(data, passphrase) + key, _ = _decode_private(data, passphrase, unsafe_skip_rsa_key_validation) if key: return key @@ -3155,12 +3189,14 @@ def import_private_key(data: BytesOrStr, raise KeyImportError('Invalid private key') -def import_private_key_and_certs(data: bytes, - passphrase: Optional[BytesOrStr] = None) -> \ - Tuple[SSHKey, Optional[SSHX509CertificateChain]]: +def import_private_key_and_certs( + data: bytes, passphrase: Optional[BytesOrStr] = None, + unsafe_skip_rsa_key_validation: Optional[bool] = None) -> \ + Tuple[SSHKey, Optional[SSHX509CertificateChain]]: """Import a private key and optional certificate chain""" - key, end = _decode_private(data, passphrase) + key, end = _decode_private(data, passphrase, + unsafe_skip_rsa_key_validation) if key: return key, import_certificate_chain(data[end:]) @@ -3256,8 +3292,9 @@ def import_certificate_subject(data: str) -> str: raise KeyImportError('Invalid certificate subject') -def read_private_key(filename: FilePath, - passphrase: Optional[BytesOrStr] = None) -> SSHKey: +def read_private_key( + filename: FilePath, passphrase: Optional[BytesOrStr] = None, + unsafe_skip_rsa_key_validation: Optional[bool] = None) -> SSHKey: """Read a private key from a file This function reads a private key from a file. See the function @@ -3268,26 +3305,34 @@ def read_private_key(filename: FilePath, The file to read the key from. :param passphrase: (optional) The passphrase to use to decrypt the key. + :param unsafe_skip_rsa_key_validation: (optional) + Whether or not to skip key validation when loading RSA private + keys, defaulting to performing these checks unless changed by + calling :func:`set_default_skip_rsa_key_validation`. :type filename: :class:`PurePath ` or `str` :type passphrase: `str` or `bytes` + :type unsafe_skip_rsa_key_validation: bool :returns: An :class:`SSHKey` private key """ - key = import_private_key(read_file(filename), passphrase) + key = import_private_key(read_file(filename), passphrase, + unsafe_skip_rsa_key_validation) key.set_filename(filename) return key -def read_private_key_and_certs(filename: FilePath, - passphrase: Optional[BytesOrStr] = None) -> \ - Tuple[SSHKey, Optional[SSHX509CertificateChain]]: +def read_private_key_and_certs( + filename: FilePath, passphrase: Optional[BytesOrStr] = None, + unsafe_skip_rsa_key_validation: Optional[bool] = None) -> \ + Tuple[SSHKey, Optional[SSHX509CertificateChain]]: """Read a private key and optional certificate chain from a file""" - key, cert = import_private_key_and_certs(read_file(filename), passphrase) + key, cert = import_private_key_and_certs(read_file(filename), passphrase, + unsafe_skip_rsa_key_validation) key.set_filename(filename) @@ -3334,9 +3379,10 @@ def read_certificate(filename: FilePath) -> SSHCertificate: return import_certificate(read_file(filename)) -def read_private_key_list(filename: FilePath, - passphrase: Optional[BytesOrStr] = None) -> \ - Sequence[SSHKey]: +def read_private_key_list( + filename: FilePath, passphrase: Optional[BytesOrStr] = None, + unsafe_skip_rsa_key_validation: Optional[bool] = None) -> \ + Sequence[SSHKey]: """Read a list of private keys from a file This function reads a list of private keys from a file. See the @@ -3348,14 +3394,20 @@ def read_private_key_list(filename: FilePath, The file to read the keys from. :param passphrase: (optional) The passphrase to use to decrypt the keys. + :param unsafe_skip_rsa_key_validation: (optional) + Whether or not to skip key validation when loading RSA private + keys, defaulting to performing these checks unless changed by + calling :func:`set_default_skip_rsa_key_validation`. :type filename: :class:`PurePath ` or `str` :type passphrase: `str` or `bytes` + :type unsafe_skip_rsa_key_validation: bool :returns: A list of :class:`SSHKey` private keys """ - keys = _decode_private_list(read_file(filename), passphrase) + keys = _decode_private_list(read_file(filename), passphrase, + unsafe_skip_rsa_key_validation) for key in keys: key.set_filename(filename) @@ -3404,10 +3456,12 @@ def read_certificate_list(filename: FilePath) -> Sequence[SSHCertificate]: return _decode_certificate_list(read_file(filename)) -def load_keypairs(keylist: KeyPairListArg, - passphrase: Optional[BytesOrStr] = None, - certlist: CertListArg = (), skip_public: bool = False, - ignore_encrypted: bool = False) -> Sequence[SSHKeyPair]: +def load_keypairs( + keylist: KeyPairListArg, passphrase: Optional[BytesOrStr] = None, + certlist: CertListArg = (), skip_public: bool = False, + ignore_encrypted: bool = False, + unsafe_skip_rsa_key_validation: Optional[bool] = None) -> \ + Sequence[SSHKeyPair]: """Load SSH private keys and optional matching certificates This function loads a list of SSH keys and optional matching @@ -3427,10 +3481,15 @@ def load_keypairs(keylist: KeyPairListArg, An internal parameter used to skip public keys and certificates when IdentitiesOnly and IdentityFile are used to specify a mixture of private and public keys. + :param unsafe_skip_rsa_key_validation: (optional) + Whether or not to skip key validation when loading RSA private + keys, defaulting to performing these checks unless changed by + calling :func:`set_default_skip_rsa_key_validation`. :type keylist: *see* :ref:`SpecifyingPrivateKeys` :type passphrase: `str` or `bytes` :type certlist: *see* :ref:`SpecifyingCertificates` :type skip_public: `bool` + :type unsafe_skip_rsa_key_validation: bool :returns: A list of :class:`SSHKeyPair` objects @@ -3444,7 +3503,8 @@ def load_keypairs(keylist: KeyPairListArg, if isinstance(keylist, (PurePath, str)): try: - priv_keys = read_private_key_list(keylist, passphrase) + priv_keys = read_private_key_list(keylist, passphrase, + unsafe_skip_rsa_key_validation) keys_to_load = [keylist] if len(priv_keys) <= 1 else priv_keys except KeyImportError: keys_to_load = [keylist] @@ -3472,21 +3532,25 @@ def load_keypairs(keylist: KeyPairListArg, key_prefix = str(key_to_load) if allow_certs: - key, certs_to_load = \ - read_private_key_and_certs(key_to_load, passphrase) + key, certs_to_load = read_private_key_and_certs( + key_to_load, passphrase, + unsafe_skip_rsa_key_validation) if not certs_to_load: certs_to_load = key_prefix + '-cert.pub' else: - key = read_private_key(key_to_load, passphrase) + key = read_private_key(key_to_load, passphrase, + unsafe_skip_rsa_key_validation) pubkey_to_load = key_prefix + '.pub' elif isinstance(key_to_load, bytes): if allow_certs: - key, certs_to_load = \ - import_private_key_and_certs(key_to_load, passphrase) + key, certs_to_load = import_private_key_and_certs( + key_to_load, passphrase, + unsafe_skip_rsa_key_validation) else: - key = import_private_key(key_to_load, passphrase) + key = import_private_key(key_to_load, passphrase, + unsafe_skip_rsa_key_validation) else: key = key_to_load except KeyImportError as exc: diff --git a/asyncssh/rsa.py b/asyncssh/rsa.py index 8a0df76e..c1b67c49 100644 --- a/asyncssh/rsa.py +++ b/asyncssh/rsa.py @@ -43,9 +43,49 @@ _PrivateKeyArgs = Tuple[int, int, int, int, int, int, int, int] +_PrivateKeyConstructArgs = Tuple[int, int, int, int, int, int, int, int, bool] _PublicKeyArgs = Tuple[int, int] +_default_skip_rsa_key_validation = False + + +def set_default_skip_rsa_key_validation(skip_validation: bool) -> None: + """Set whether to disable RSA key validation in OpenSSL + + OpenSSL 3.x does additional validation when loading RSA keys + as an added security measure. However, the result is that + loading a key can take significantly longer than it did before. + + If all your RSA keys are coming from a trusted source, you can + call this function with a value of `True` to default to skipping + these checks on RSA keys, reducing the cost back down to what it + was in earlier releases. + + This can also be set on a case by case basis by using the new + `unsafe_skip_rsa_key_validation` argument on the functions used + to load keys. This will only affect loading keys of type RSA. + + .. note:: The extra cost only applies to loading existing keys, and + not to generating new keys. Also, in cases where a key is + used repeatedly, it can be loaded once into an `SSHKey` + object and reused without having to pay the cost each time. + So, this call should not be needed in most applications. + + If an application does need this, it is strongly + recommended that the `unsafe_skip_rsa_key_validation` + argument be used rather than using this function to + change the default behavior for all load operations. + + """ + + # pylint: disable=global-statement + + global _default_skip_rsa_key_validation + + _default_skip_rsa_key_validation = skip_validation + + class RSAKey(SSHKey): """Handler for RSA public key encryption""" @@ -94,9 +134,14 @@ def generate(cls, algorithm: bytes, *, # type: ignore def make_private(cls, key_params: object) -> SSHKey: """Construct an RSA private key""" - n, e, d, p, q, dmp1, dmq1, iqmp = cast(_PrivateKeyArgs, key_params) + n, e, d, p, q, dmp1, dmq1, iqmp, unsafe_skip_rsa_key_validation = \ + cast(_PrivateKeyConstructArgs, key_params) + + if unsafe_skip_rsa_key_validation is None: + unsafe_skip_rsa_key_validation = _default_skip_rsa_key_validation - return cls(RSAPrivateKey.construct(n, e, d, p, q, dmp1, dmq1, iqmp)) + return cls(RSAPrivateKey.construct(n, e, d, p, q, dmp1, dmq1, iqmp, + unsafe_skip_rsa_key_validation)) @classmethod def make_public(cls, key_params: object) -> SSHKey: diff --git a/docs/api.rst b/docs/api.rst index 348d0099..cfa96d03 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1617,6 +1617,11 @@ load_resident_keys .. index:: SSH agent support +set_default_skip_rsa_key_validation +----------------------------------- + +.. autofunction:: set_default_skip_rsa_key_validation + SSH Agent Support ================= diff --git a/setup.py b/setup.py index b9df7969..48f9a552 100755 --- a/setup.py +++ b/setup.py @@ -57,14 +57,14 @@ long_description = long_description, platforms = 'Any', python_requires = '>= 3.6', - install_requires = ['cryptography >= 3.1', 'typing_extensions >= 3.6'], + install_requires = ['cryptography >= 39.0', 'typing_extensions >= 3.6'], extras_require = { 'bcrypt': ['bcrypt >= 3.1.3'], 'fido2': ['fido2 >= 0.9.2'], 'gssapi': ['gssapi >= 1.2.0'], 'libnacl': ['libnacl >= 1.4.2'], 'pkcs11': ['python-pkcs11 >= 0.7.0'], - 'pyOpenSSL': ['pyOpenSSL >= 17.0.0'], + 'pyOpenSSL': ['pyOpenSSL >= 23.0.0'], 'pywin32': ['pywin32 >= 227'] }, packages = ['asyncssh', 'asyncssh.crypto'], diff --git a/tests/util.py b/tests/util.py index a869944c..6600a5bf 100644 --- a/tests/util.py +++ b/tests/util.py @@ -32,8 +32,7 @@ from unittest.mock import patch -from cryptography.hazmat.backends.openssl import backend - +from asyncssh import set_default_skip_rsa_key_validation from asyncssh.gss import gss_available from asyncssh.logging import logger from asyncssh.misc import ConnectionLost, SignalReceived @@ -77,15 +76,10 @@ # pylint: enable=no-member -# pylint: disable=protected-access - -# Disable RSA key blinding to speed up unit tests -backend._rsa_skip_check_key = True - -# pylint: enable=protected-access - _test_keys = {} +set_default_skip_rsa_key_validation(True) + def asynctest(coro): """Decorator for async tests, for use with AsyncTestCase"""