From a5f8db73ef547de9d83b3ad9de5459e0a238d94d Mon Sep 17 00:00:00 2001 From: Shalin Nijel <89510971+shalinnijel2@users.noreply.github.com> Date: Mon, 18 Dec 2023 09:08:11 +0300 Subject: [PATCH 1/2] Charged order of validation. Start with checking validity of available certificates. --- iso15118/secc/states/iso15118_2_states.py | 5 +-- iso15118/shared/security.py | 39 ++++++++++++----------- 2 files changed, 23 insertions(+), 21 deletions(-) diff --git a/iso15118/secc/states/iso15118_2_states.py b/iso15118/secc/states/iso15118_2_states.py index 9b74e48e..9781c7bd 100644 --- a/iso15118/secc/states/iso15118_2_states.py +++ b/iso15118/secc/states/iso15118_2_states.py @@ -956,11 +956,12 @@ async def process_message( root_cert_path = self._mobility_operator_root_cert_path() try: root_cert = load_cert(root_cert_path) - verify_certs(leaf_cert, sub_ca_certs, root_cert) except FileNotFoundError: - logger.warning(f"MO Root Cert cannot be found {root_cert_path}") + logger.warning(f"MO Root Cert not available.") root_cert = None + verify_certs(leaf_cert, sub_ca_certs, root_cert) + # Note that the eMAID format (14 or 15 characters) will be validated # by the definition of the eMAID type in # shared/messages/iso15118_2/datatypes.py diff --git a/iso15118/shared/security.py b/iso15118/shared/security.py index 63109867..d9925364 100644 --- a/iso15118/shared/security.py +++ b/iso15118/shared/security.py @@ -484,13 +484,13 @@ def verify_certs( ): """ Verifies a certificate chain according to the following criteria: - 1. Verify the signature of each certificate contained in the cert chain + 1. Check that the current date is within the time span provided by the + certificate's notBefore and notAfter attributes + 2. Verify the signature of each certificate contained in the cert chain (throws CertSignatureError if not) 1.a) Get the sub_ca_certs in order: leaf -> sub_ca_2 -> sub_ca_1 -> root (if two sub-CAs are in use, otherwise: leaf -> sub_ca_2 -> root) 2.b) Do the actual signature verification from leaf to root - 2. Check that the current date is within the time span provided by the - certificate's notBefore and notAfter attributes 3. Checks that none of the certificates has been revoked. Args: @@ -516,13 +516,27 @@ def verify_certs( leaf_cert = load_der_x509_certificate(leaf_cert_bytes) sub_ca2_cert = None sub_ca1_cert = None - root_ca_cert = load_der_x509_certificate(root_ca_cert_bytes) + root_ca_cert = None + if root_ca_cert_bytes: + root_ca_cert = load_der_x509_certificate(root_ca_cert_bytes) sub_ca_der_certs: List[Certificate] = [ load_der_x509_certificate(cert) for cert in sub_ca_certs_bytes ] - # Step 1.a: Categorize the sub-CA certificates into sub-CA 1 and sub-CA 2. + # Step 1: Check that each certificate is valid, i.e. the current time is + # between the notBefore and notAfter timestamps of the certificate + try: + certs_to_check: List[Certificate] = [leaf_cert] + if len(sub_ca_der_certs) != 0: + certs_to_check.extend(sub_ca_der_certs) + if root_ca_cert: + certs_to_check.append(root_ca_cert) + check_validity(certs_to_check) + except (CertNotYetValidError, CertExpiredError) as exc: + raise exc + + # Step 2.a: Categorize the sub-CA certificates into sub-CA 1 and sub-CA 2. # A sub-CA 2 certificate's profile has its PathLength extension # attribute set to 0, whereas a sub-CA 1 certificate's profile has # its PathLength extension attribute set to 0. @@ -577,7 +591,7 @@ def verify_certs( if (sub_ca2_cert or sub_ca1_cert) and private_environment: raise CertChainLengthError(allowed_num_sub_cas=0, num_sub_cas=1) - # Step 1.b: Now that we have established the right order of sub-CA + # Step 2.b: Now that we have established the right order of sub-CA # certificates we can start verifying the signatures from leaf # certificate to root CA certificate cert_to_check = leaf_cert @@ -688,19 +702,6 @@ def verify_certs( f"of certificate {cert_to_check.subject}" ) - # Step 2: Check that each certificate is valid, i.e. the current time is - # between the notBefore and notAfter timestamps of the certificate - try: - certs_to_check: List[Certificate] = [leaf_cert] - if sub_ca2_cert: - certs_to_check.append(sub_ca2_cert) - if sub_ca1_cert: - certs_to_check.append(sub_ca1_cert) - certs_to_check.append(root_ca_cert) - check_validity(certs_to_check) - except (CertNotYetValidError, CertExpiredError) as exc: - raise exc - # Step 3: Check the OCSP (Online Certificate Status Protocol) response to # see whether or not a certificate has been revoked # TODO As OCSP is not supported for the CharIN Testival Europe 2021, we'll From b82fda8d2896babd56550441cc63a08d0b1c5d4f Mon Sep 17 00:00:00 2001 From: Shalin Nijel <89510971+shalinnijel2@users.noreply.github.com> Date: Mon, 18 Dec 2023 11:33:12 +0300 Subject: [PATCH 2/2] Added tests. --- iso15118/secc/states/iso15118_2_states.py | 11 +- iso15118/shared/security.py | 3 + .../sample_certs/contractLeafCert_Expired.der | Bin 0 -> 581 bytes tests/iso15118_2/sample_certs/load_certs.py | 17 +++- .../secc/states/test_iso15118_2_states.py | 95 ++++++++++++++++++ tests/iso15118_2/secc/states/test_messages.py | 6 +- 6 files changed, 122 insertions(+), 10 deletions(-) create mode 100644 tests/iso15118_2/sample_certs/contractLeafCert_Expired.der diff --git a/iso15118/secc/states/iso15118_2_states.py b/iso15118/secc/states/iso15118_2_states.py index 9781c7bd..61ebe74b 100644 --- a/iso15118/secc/states/iso15118_2_states.py +++ b/iso15118/secc/states/iso15118_2_states.py @@ -918,7 +918,7 @@ class PaymentDetails(StateSECC): def __init__(self, comm_session: SECCCommunicationSession): super().__init__(comm_session, Timeouts.V2G_SECC_SEQUENCE_TIMEOUT) - def _mobility_operator_root_cert_path(self) -> str: + def _mobility_operator_root_cert_path(self) -> Optional[str]: """Return the path to the MO root. Included to be patched in tests.""" return CertPath.MO_ROOT_DER @@ -955,9 +955,14 @@ async def process_message( # to the PKI that is used. root_cert_path = self._mobility_operator_root_cert_path() try: - root_cert = load_cert(root_cert_path) + if root_cert_path: + root_cert = load_cert(root_cert_path) + logger.info(f"Using MO root at {root_cert_path}") + else: + root_cert = None + logger.info("No suitable MO root found.") except FileNotFoundError: - logger.warning(f"MO Root Cert not available.") + logger.warning("MO Root Cert not available.") root_cert = None verify_certs(leaf_cert, sub_ca_certs, root_cert) diff --git a/iso15118/shared/security.py b/iso15118/shared/security.py index d9925364..e0b4df6c 100644 --- a/iso15118/shared/security.py +++ b/iso15118/shared/security.py @@ -536,6 +536,9 @@ def verify_certs( except (CertNotYetValidError, CertExpiredError) as exc: raise exc + if not root_ca_cert: + logger.info("Can't validate the chain as MO root is not present.") + return None # Step 2.a: Categorize the sub-CA certificates into sub-CA 1 and sub-CA 2. # A sub-CA 2 certificate's profile has its PathLength extension # attribute set to 0, whereas a sub-CA 1 certificate's profile has diff --git a/tests/iso15118_2/sample_certs/contractLeafCert_Expired.der b/tests/iso15118_2/sample_certs/contractLeafCert_Expired.der new file mode 100644 index 0000000000000000000000000000000000000000..bd9277ce2380fdb48b41bcd8812e3df81fa6d137 GIT binary patch literal 581 zcmXqLVsbQSVtlcHnTe5!iOImlfQyYotIgw_EekV~L71VEfdU(IC<`->WPrD)u4_d} zymL@Uysv+JaHx|}e3+w;r;8!K0WU}m7Y|!-d1gs+h9S2BCrE%zm?_lTP{=@ljf-=V zTIb|XIw_2jVobjN26E!OM#hFlh86~3a(8$=t z)Xdz{(2)$=A`OJu*uj2cVuX5*nUS5@iGhVhg~Ktq_hGF~zGd2nu=)m*FuPfImL)t} zUT)&ZTKad_d(NiN6HC@e{@sxmw_@d)8`@I)+E`vS#=6Bg9yK~$n6kKWy+Pv|10JAL zWrZ0T|FbX|Fc|QGc>Ew9FbJ7n7|4S7d@N!tBCcM)Z6}{Td1s>eTpxRP4MiE{w_gl$ z**LV>7+G1_85vm;4dM+{V0;6{Hi?Xql7eC@ef{$Ca=p}w#N2|MRK4W6H8KcogMX(Q;U$Jm)V`cz=cU6X`bOm!*yGJY2A|EtK`;xJXX<7 zY)jV--00!#|5A(NC@-oN)+_8@w>XymHHwTkQbuX1fyQZ?CgY)@Ly BqFev~ literal 0 HcmV?d00001 diff --git a/tests/iso15118_2/sample_certs/load_certs.py b/tests/iso15118_2/sample_certs/load_certs.py index 96b17bb8..e3ab57d2 100644 --- a/tests/iso15118_2/sample_certs/load_certs.py +++ b/tests/iso15118_2/sample_certs/load_certs.py @@ -1,4 +1,5 @@ from pathlib import Path +from typing import Optional from iso15118.shared.messages.iso15118_2.datatypes import ( Certificate, @@ -7,7 +8,7 @@ ) -def load_certificate_chain() -> CertificateChain: +def load_certificate_chain(contract: Optional[str] = None) -> CertificateChain: """Load the sample certificate chain. This includes the contract, sub-CA 2, and sub-CA 1 certificates. @@ -16,7 +17,7 @@ def load_certificate_chain() -> CertificateChain: The sample certificate chain, with certificates in DER (binary) form. """ return CertificateChain( - certificate=load_contract_certificate(), + certificate=load_contract_certificate(contract), sub_certificates=SubCertificates( certificates=[load_sub_ca_2_certificate(), load_sub_ca_1_certificate()], ), @@ -80,14 +81,20 @@ def load_sub_ca_2_certificate() -> Certificate: return sub_ca_2_certificate -def load_contract_certificate() -> Certificate: +def load_contract_certificate(contract_path: Optional[str] = None) -> Certificate: """Load the sample contract certificate. Returns: The sample contract certificate in DER (binary) form. """ - cert_dir = get_cert_dir() - with open(cert_dir / "contractLeafCert.der", "rb") as root_file: + + if contract_path is None: + cert_dir = get_cert_dir() + path = f"{cert_dir}/contractLeafCert.der" + else: + path = contract_path + + with open(path, "rb") as root_file: contract_certificate = root_file.read() return contract_certificate diff --git a/tests/iso15118_2/secc/states/test_iso15118_2_states.py b/tests/iso15118_2/secc/states/test_iso15118_2_states.py index 4feaa870..e87da545 100644 --- a/tests/iso15118_2/secc/states/test_iso15118_2_states.py +++ b/tests/iso15118_2/secc/states/test_iso15118_2_states.py @@ -213,6 +213,101 @@ async def test_payment_details_next_state_on_payment_details_req_auth( AuthorizationTokenType.EMAID ) + @patch( + "iso15118.secc.states.iso15118_2_states.PaymentDetails._mobility_operator_root_cert_path" # noqa + ) + @pytest.mark.parametrize( + "mo_root, " + "contract_cert, " + "is_authorized_return_value, " + "expected_response_code, " + "expected_next_state", + [ + ( + Path(__file__).parent.parent.parent + / "sample_certs" + / "moRootCACert.der", + Path(__file__).parent.parent.parent + / "sample_certs" + / "contractLeafCert.der", + AuthorizationResponse(AuthorizationStatus.ACCEPTED, ResponseCode.OK), + ResponseCode.OK, + Authorization, + ), + ( + None, + Path(__file__).parent.parent.parent + / "sample_certs" + / "contractLeafCert.der", + AuthorizationResponse(AuthorizationStatus.ACCEPTED, ResponseCode.OK), + ResponseCode.OK, + Authorization, + ), + ( + Path(__file__).parent.parent.parent + / "sample_certs" + / "moRootCACert.der", + Path(__file__).parent.parent.parent + / "sample_certs" + / "contractLeafCert_Expired.der", + None, + ResponseCode.FAILED_CERTIFICATE_EXPIRED, + Terminate, + ), + ( + None, + Path(__file__).parent.parent.parent + / "sample_certs" + / "contractLeafCert_Expired.der", + None, + ResponseCode.FAILED_CERTIFICATE_EXPIRED, + Terminate, + ), + ( + None, + Path(__file__).parent.parent.parent + / "sample_certs" + / "contractLeafCert.der", + AuthorizationResponse( + AuthorizationStatus.REJECTED, + ResponseCode.FAILED_CERTIFICATE_REVOKED, + ), + ResponseCode.FAILED_CERTIFICATE_REVOKED, + Terminate, + ), + ], + ) + async def test_payment_details_next_state_on_payment_details_contract_variants( + self, + mock_mo_root_path, + mo_root, + contract_cert, + is_authorized_return_value, + expected_response_code, + expected_next_state: StateSECC, + ): + self.comm_session.selected_auth_option = AuthEnum.PNC_V2 + mock_mo_root_path.return_value = mo_root + mock_is_authorized = AsyncMock(return_value=is_authorized_return_value) + self.comm_session.evse_controller.is_authorized = mock_is_authorized + payment_details = PaymentDetails(self.comm_session) + payment_details_req = get_dummy_v2g_message_payment_details_req(contract_cert) + await payment_details.process_message(payment_details_req) + + assert ( + payment_details.next_state == expected_next_state + ), "State did not progress after PaymentDetailsReq" + + if isinstance(payment_details.message, V2GMessageV2): + assert ( + payment_details.message.body.payment_details_res.response_code + == expected_response_code + ) + # if is_authorized_return_value is not None: + # mock_is_authorized.assert_called_once() + # else: + # mock_is_authorized.assert_not_called() + @pytest.mark.parametrize( "auth_type, is_authorized_return_value, expected_next_state," "expected_response_code, expected_evse_processing, is_ready_to_charge", diff --git a/tests/iso15118_2/secc/states/test_messages.py b/tests/iso15118_2/secc/states/test_messages.py index 1720b15c..e179cc32 100644 --- a/tests/iso15118_2/secc/states/test_messages.py +++ b/tests/iso15118_2/secc/states/test_messages.py @@ -140,10 +140,12 @@ def get_dummy_v2g_message_authorization_req( ) -def get_dummy_v2g_message_payment_details_req() -> V2GMessage: +def get_dummy_v2g_message_payment_details_req( + contract: Optional[str] = None, +) -> V2GMessage: payment_details_req = PaymentDetailsReq( emaid="1234567890abcd", - cert_chain=load_certificate_chain(), + cert_chain=load_certificate_chain(contract), ) return V2GMessage( header=MessageHeader(session_id=MOCK_SESSION_ID),