From 50d76731ce077d05bf65968a35d46ab78d133a85 Mon Sep 17 00:00:00 2001 From: FWinterborn <39853478+FWinterborn@users.noreply.github.com> Date: Thu, 3 Oct 2019 09:18:06 +0100 Subject: [PATCH 1/2] Added DH/ECDH info to output --- .../plugins/openssl_cipher_suites_plugin.py | 42 ++++++++++++++++++- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/sslyze/plugins/openssl_cipher_suites_plugin.py b/sslyze/plugins/openssl_cipher_suites_plugin.py index b507c78e..1099bccd 100644 --- a/sslyze/plugins/openssl_cipher_suites_plugin.py +++ b/sslyze/plugins/openssl_cipher_suites_plugin.py @@ -398,10 +398,12 @@ def __init__( ssl_version: OpenSslVersionEnum, key_size: Optional[int], # TODO(AD): Make it non-optional again by fixing client certificate handling post_handshake_response: Optional[str] = None, + dh_info: Optional[dict] = None, ) -> None: super().__init__(openssl_name, ssl_version) self.key_size = key_size self.post_handshake_response = post_handshake_response + self.dh_info = dh_info @classmethod def from_ongoing_ssl_connection( @@ -410,7 +412,13 @@ def from_ongoing_ssl_connection( keysize = ssl_connection.ssl_client.get_current_cipher_bits() picked_cipher_name = ssl_connection.ssl_client.get_current_cipher_name() status_msg = ssl_connection.post_handshake_check() - return AcceptedCipherSuite(picked_cipher_name, ssl_version, keysize, status_msg) + try: + dh_info = ssl_connection.ssl_client.get_dh_info() + if len(dh_info) == 0: + dh_info = None + except TypeError: + dh_info = None + return AcceptedCipherSuite(picked_cipher_name, ssl_version, keysize, status_msg, dh_info) class RejectedCipherSuite(CipherSuite): @@ -548,6 +556,29 @@ def _format_accepted_cipher_xml(cipher: AcceptedCipherSuite) -> Element: cipher_attributes["connectionStatus"] = cipher.post_handshake_response cipher_xml = Element("cipherSuite", attrib=cipher_attributes) + + if cipher.dh_info is not None: + attribs = { + "type": cipher.dh_info["type"], + "size": str(cipher.dh_info["size"]), + "public_key": str(cipher.dh_info["public_key"]), + } + + if cipher.dh_info["type"] == "DH": + attribs["generator"] = str(cipher.dh_info["generator"]) + attribs["prime"] = str(cipher.dh_info["prime"]) + elif cipher.dh_info["type"] == "ECDH": + curve = cipher.dh_info["curve"].lower() + attribs["curve"] = curve + + if curve != "x25519" and curve != "x448": + attribs["nist_curve"] = cipher.dh_info["nist_curve"] + attribs["x"] = cipher.dh_info["x"] + attribs["y"] = cipher.dh_info["y"] + + key_exchange_xml = Element("keyExchange", attrib=attribs) + cipher_xml.append(key_exchange_xml) + return cipher_xml ACCEPTED_CIPHER_LINE_FORMAT = " {cipher_name:<50}{dh_size:<15}{key_size:<10} {status:<60}" @@ -631,9 +662,16 @@ def _format_accepted_cipher_txt(self, cipher: AcceptedCipherSuite) -> str: # Always display ANON as the key size for anonymous ciphers to make it visible keysize_str = "ANONYMOUS" + dh_size = "" + if cipher.dh_info is not None: + if cipher.dh_info["type"] == "DH": + dh_size = "DH-{} bits".format(cipher.dh_info["size"]) + else: + dh_size = "ECDH-{} bits".format(cipher.dh_info["size"]) + cipher_line_txt = self.ACCEPTED_CIPHER_LINE_FORMAT.format( cipher_name=cipher.name, - dh_size="", + dh_size=dh_size, key_size=keysize_str, status=cipher.post_handshake_response if cipher.post_handshake_response is not None else "", ) From 7358b471a2ab66dbbccf1ffe84cf4af24ee93a63 Mon Sep 17 00:00:00 2001 From: Fraser Winterborn Date: Wed, 16 Oct 2019 09:45:13 +0100 Subject: [PATCH 2/2] Support returning TempKeyInfo object from nassl for DH info --- sslyze/cli/json_output.py | 4 +++ .../plugins/openssl_cipher_suites_plugin.py | 36 ++++--------------- 2 files changed, 11 insertions(+), 29 deletions(-) diff --git a/sslyze/cli/json_output.py b/sslyze/cli/json_output.py index 10fd294f..2fec3eec 100644 --- a/sslyze/cli/json_output.py +++ b/sslyze/cli/json_output.py @@ -15,6 +15,7 @@ from sslyze.plugins.utils.certificate_utils import CertificateUtils from sslyze.server_connectivity_info import ServerConnectivityInfo from sslyze.server_connectivity_tester import ServerConnectivityError +from nassl.temp_key_info import TempKeyInfo class JsonOutputGenerator(OutputGenerator): @@ -112,6 +113,9 @@ def default(self, obj: Any) -> Union[bool, int, float, str, Dict[str, Any]]: elif isinstance(obj, Path): result = str(obj) + elif isinstance(obj, TempKeyInfo): + return obj.as_dict() + elif isinstance(obj, object): # Some objects (like str) don't have a __dict__ if hasattr(obj, "__dict__"): diff --git a/sslyze/plugins/openssl_cipher_suites_plugin.py b/sslyze/plugins/openssl_cipher_suites_plugin.py index 1099bccd..59c2483d 100644 --- a/sslyze/plugins/openssl_cipher_suites_plugin.py +++ b/sslyze/plugins/openssl_cipher_suites_plugin.py @@ -4,6 +4,7 @@ from xml.etree.ElementTree import Element from nassl.ssl_client import OpenSslVersionEnum, ClientCertificateRequested +from nassl.temp_key_info import TempKeyInfo, OpenSslEvpPkeyEnum from sslyze.plugins.plugin_base import Plugin, PluginScanCommand from sslyze.plugins.plugin_base import PluginScanResult from sslyze.server_connectivity_info import ServerConnectivityInfo @@ -398,7 +399,7 @@ def __init__( ssl_version: OpenSslVersionEnum, key_size: Optional[int], # TODO(AD): Make it non-optional again by fixing client certificate handling post_handshake_response: Optional[str] = None, - dh_info: Optional[dict] = None, + dh_info: Optional[TempKeyInfo] = None, ) -> None: super().__init__(openssl_name, ssl_version) self.key_size = key_size @@ -412,12 +413,7 @@ def from_ongoing_ssl_connection( keysize = ssl_connection.ssl_client.get_current_cipher_bits() picked_cipher_name = ssl_connection.ssl_client.get_current_cipher_name() status_msg = ssl_connection.post_handshake_check() - try: - dh_info = ssl_connection.ssl_client.get_dh_info() - if len(dh_info) == 0: - dh_info = None - except TypeError: - dh_info = None + dh_info = ssl_connection.ssl_client.get_dh_info() return AcceptedCipherSuite(picked_cipher_name, ssl_version, keysize, status_msg, dh_info) @@ -558,25 +554,7 @@ def _format_accepted_cipher_xml(cipher: AcceptedCipherSuite) -> Element: cipher_xml = Element("cipherSuite", attrib=cipher_attributes) if cipher.dh_info is not None: - attribs = { - "type": cipher.dh_info["type"], - "size": str(cipher.dh_info["size"]), - "public_key": str(cipher.dh_info["public_key"]), - } - - if cipher.dh_info["type"] == "DH": - attribs["generator"] = str(cipher.dh_info["generator"]) - attribs["prime"] = str(cipher.dh_info["prime"]) - elif cipher.dh_info["type"] == "ECDH": - curve = cipher.dh_info["curve"].lower() - attribs["curve"] = curve - - if curve != "x25519" and curve != "x448": - attribs["nist_curve"] = cipher.dh_info["nist_curve"] - attribs["x"] = cipher.dh_info["x"] - attribs["y"] = cipher.dh_info["y"] - - key_exchange_xml = Element("keyExchange", attrib=attribs) + key_exchange_xml = Element("keyExchange", attrib=cipher.dh_info.as_dict()) cipher_xml.append(key_exchange_xml) return cipher_xml @@ -664,10 +642,10 @@ def _format_accepted_cipher_txt(self, cipher: AcceptedCipherSuite) -> str: dh_size = "" if cipher.dh_info is not None: - if cipher.dh_info["type"] == "DH": - dh_size = "DH-{} bits".format(cipher.dh_info["size"]) + if cipher.dh_info.key_type == OpenSslEvpPkeyEnum.DH: + dh_size = "DH-{} bits".format(cipher.dh_info.key_size) else: - dh_size = "ECDH-{} bits".format(cipher.dh_info["size"]) + dh_size = "ECDH-{} bits".format(cipher.dh_info.key_size) cipher_line_txt = self.ACCEPTED_CIPHER_LINE_FORMAT.format( cipher_name=cipher.name,