diff --git a/iso15118/secc/states/iso15118_2_states.py b/iso15118/secc/states/iso15118_2_states.py index 80da02296..7758b8a3f 100644 --- a/iso15118/secc/states/iso15118_2_states.py +++ b/iso15118/secc/states/iso15118_2_states.py @@ -835,32 +835,54 @@ def __init__(self, comm_session: SECCCommunicationSession): super().__init__(comm_session, Timeouts.V2G_SECC_SEQUENCE_TIMEOUT) def _all_certificates_from_chain( - self, certificate_chain: CertificateChain + self, certificate_chain: CertificateChain, root_cert: Certificate ) -> List[Certificate]: """Return all certificates from a certificate chain as a list. - The order should be: leaf certificate, sub-CA 2, sub-CA 1, - if all three are present. + The order should be: leaf certificate, sub-CA 2, sub-CA 1, root, + if all are present. """ - return [ + chain = [ certificate_chain.certificate ] + certificate_chain.sub_certificates.certificates + if root_cert is not None: + chain.append(root_cert) + return chain - def _get_contract_certificate_hash_data( + def _get_certificate_hash_data( self, certificate_chain: Optional[CertificateChain], + root_cert: Optional[Certificate], ) -> Optional[List[Dict[str, str]]]: """Return a list of hash data for a contract certificate chain.""" - if certificate_chain is None: + # If we do not have all certificates, we cannot create all the hash data. + # This is because the hash data requires the public key of a certificate's + # issuer. Thus, lacking the root certificate makes it impossible to construct + # the hash data. + # + # In this case, we will ultimately send the certificates we do have -- the + # CSMS may be able to obtain the corresponding root certificate from a + # root certificate pool. + if certificate_chain is None or root_cert is None: return None + all_certificates = self._all_certificates_from_chain( + certificate_chain, root_cert + ) + # Each certificate is followed by its issuer, except for the root, + # which is self-signed. + certificate_and_issuer_pairs = [ + (all_certificates[i], all_certificates[i + 1]) + for i in range(len(all_certificates) - 1) + ] + [(root_cert, root_cert)] + return [ - derive_certificate_hash_data(certificate) - for certificate in self._all_certificates_from_chain(certificate_chain) + derive_certificate_hash_data(certificate, issuer) + for certificate, issuer in certificate_and_issuer_pairs ] def _build_pem_certificate_chain( - self, certificate_chain: Optional[CertificateChain] + self, certificate_chain: Optional[CertificateChain], root_cert: Certificate ) -> Optional[str]: """Return a string of certificates in PEM form concatenated together.""" if certificate_chain is None: @@ -869,7 +891,10 @@ def _build_pem_certificate_chain( return "".join( [ certificate_to_pem_string(certificate) - for certificate in self._all_certificates_from_chain(certificate_chain) + for certificate in self._all_certificates_from_chain( + certificate_chain, + root_cert, + ) ] ) @@ -905,7 +930,8 @@ async def process_message( # could be used to verify, need to be flexible with regards # to the PKI that is used. root_cert_path = self._mobility_operator_root_cert_path() - verify_certs(leaf_cert, sub_ca_certs, load_cert(root_cert_path)) + root_cert = load_cert(root_cert_path) + verify_certs(leaf_cert, sub_ca_certs, root_cert) # Note that the eMAID format (14 or 15 characters) will be validated # by the definition of the eMAID type in @@ -913,11 +939,11 @@ async def process_message( self.comm_session.emaid = payment_details_req.emaid self.comm_session.contract_cert_chain = payment_details_req.cert_chain - hash_data = self._get_contract_certificate_hash_data( - self.comm_session.contract_cert_chain + hash_data = self._get_certificate_hash_data( + self.comm_session.contract_cert_chain, root_cert ) pem_certificate_chain = self._build_pem_certificate_chain( - self.comm_session.contract_cert_chain + self.comm_session.contract_cert_chain, root_cert ) authorization_result = ( @@ -937,6 +963,7 @@ async def process_message( evse_timestamp=time.time(), ) + # TODO: Should the next message really be Authorization? If so why? self.create_next_message( Authorization, payment_details_res, @@ -947,12 +974,15 @@ async def process_message( # TODO GitHub#54: handle ONGOING case, and REJECTED more fully payment_details_res = PaymentDetailsRes( response_code=ResponseCode.FAILED, + gen_challenge=get_random_bytes(16), evse_timestamp=time.time(), ) - self.stop_state_machine( - "Authorization failed", - payment_details_res, - ResponseCode.FAILED, + + self.create_next_message( + Terminate, + SessionStopRes(response_code=ResponseCode.FAILED), + Timeouts.V2G_SECC_SEQUENCE_TIMEOUT, + Namespace.ISO_V2_MSG_DEF, ) except ( @@ -1085,8 +1115,13 @@ async def process_message( ) authorization_result = await self.comm_session.evse_controller.is_authorized( id_token=id_token, - id_token_type=self.comm_session.selected_auth_option, + id_token_type=( + AuthorizationTokenType.EMAID + if self.comm_session.selected_auth_option == AuthEnum.PNC_V2 + else AuthorizationTokenType.EXTERNAL + ), ) + if authorization_result == AuthorizationStatus.ACCEPTED: auth_status = EVSEProcessing.FINISHED next_state = ChargeParameterDiscovery diff --git a/tests/secc/states/test_iso15118_2_states.py b/tests/secc/states/test_iso15118_2_states.py index bc73ee1f6..7eb20bd9e 100644 --- a/tests/secc/states/test_iso15118_2_states.py +++ b/tests/secc/states/test_iso15118_2_states.py @@ -73,7 +73,7 @@ async def test_welding_detection_to_session_stop_when_session_stop_received( / "sample_certs" / "moRootCACert.der", ) - async def test_payment_details_next_state_on_payment_details_req( + async def test_payment_details_next_state_on_payment_details_req_auth_success( self, mo_root_cert_path_mock ): self.comm_session.selected_auth_option = AuthEnum.PNC_V2 @@ -97,6 +97,39 @@ async def test_payment_details_next_state_on_payment_details_req( AuthorizationTokenType.EMAID ) + @patch.object( + PaymentDetails, + "_mobility_operator_root_cert_path", + return_value=Path(__file__).parent.parent.parent + / "sample_certs" + / "moRootCACert.der", + ) + async def test_payment_details_next_state_on_payment_details_req_auth_failed( + self, mo_root_cert_path_mock + ): + self.comm_session.selected_auth_option = AuthEnum.PNC_V2 + + mock_is_authorized = AsyncMock(return_value=AuthorizationStatus.REJECTED) + self.comm_session.evse_controller.is_authorized = mock_is_authorized + self.comm_session.writer = Mock() + self.comm_session.writer.get_extra_info = Mock() + payment_details = PaymentDetails(self.comm_session) + payment_details_req = get_dummy_v2g_message_payment_details_req() + await payment_details.process_message(payment_details_req) + + assert isinstance( + self.comm_session.contract_cert_chain, CertificateChain + ), "Comm session certificate chain not populated" + assert ( + payment_details.next_state == Terminate + ), "State did not progress after PaymentDetailsReq" + mock_is_authorized.assert_called_once() + req_body = payment_details_req.body.payment_details_req + assert mock_is_authorized.call_args[1]["id_token"] == req_body.emaid + assert mock_is_authorized.call_args[1]["id_token_type"] == ( + AuthorizationTokenType.EMAID + ) + @pytest.mark.parametrize( "is_authorized_return_value, expected_next_state", [