Skip to content

Commit

Permalink
fix: construct cert data with correct issuer
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgordon-switch-ev committed Aug 3, 2022
1 parent c8e4c00 commit 1b8ed1a
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 20 deletions.
73 changes: 54 additions & 19 deletions iso15118/secc/states/iso15118_2_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -835,32 +835,54 @@ def __init__(self, comm_session: SECCCommunicationSession):
super().__init__(comm_session, Timeouts.V2G_SECC_SEQUENCE_TIMEOUT)

def _all_certificates_from_chain(
self, certificate_chain: CertificateChain
self, certificate_chain: CertificateChain, root_cert: Certificate
) -> List[Certificate]:
"""Return all certificates from a certificate chain as a list.
The order should be: leaf certificate, sub-CA 2, sub-CA 1,
if all three are present.
The order should be: leaf certificate, sub-CA 2, sub-CA 1, root,
if all are present.
"""
return [
chain = [
certificate_chain.certificate
] + certificate_chain.sub_certificates.certificates
if root_cert is not None:
chain.append(root_cert)
return chain

def _get_contract_certificate_hash_data(
def _get_certificate_hash_data(
self,
certificate_chain: Optional[CertificateChain],
root_cert: Optional[Certificate],
) -> Optional[List[Dict[str, str]]]:
"""Return a list of hash data for a contract certificate chain."""
if certificate_chain is None:
# If we do not have all certificates, we cannot create all the hash data.
# This is because the hash data requires the public key of a certificate's
# issuer. Thus, lacking the root certificate makes it impossible to construct
# the hash data.
#
# In this case, we will ultimately send the certificates we do have -- the
# CSMS may be able to obtain the corresponding root certificate from a
# root certificate pool.
if certificate_chain is None or root_cert is None:
return None

all_certificates = self._all_certificates_from_chain(
certificate_chain, root_cert
)
# Each certificate is followed by its issuer, except for the root,
# which is self-signed.
certificate_and_issuer_pairs = [
(all_certificates[i], all_certificates[i + 1])
for i in range(len(all_certificates) - 1)
] + [(root_cert, root_cert)]

return [
derive_certificate_hash_data(certificate)
for certificate in self._all_certificates_from_chain(certificate_chain)
derive_certificate_hash_data(certificate, issuer)
for certificate, issuer in certificate_and_issuer_pairs
]

def _build_pem_certificate_chain(
self, certificate_chain: Optional[CertificateChain]
self, certificate_chain: Optional[CertificateChain], root_cert: Certificate
) -> Optional[str]:
"""Return a string of certificates in PEM form concatenated together."""
if certificate_chain is None:
Expand All @@ -869,7 +891,10 @@ def _build_pem_certificate_chain(
return "".join(
[
certificate_to_pem_string(certificate)
for certificate in self._all_certificates_from_chain(certificate_chain)
for certificate in self._all_certificates_from_chain(
certificate_chain,
root_cert,
)
]
)

Expand Down Expand Up @@ -905,19 +930,20 @@ async def process_message(
# could be used to verify, need to be flexible with regards
# to the PKI that is used.
root_cert_path = self._mobility_operator_root_cert_path()
verify_certs(leaf_cert, sub_ca_certs, load_cert(root_cert_path))
root_cert = load_cert(root_cert_path)
verify_certs(leaf_cert, sub_ca_certs, root_cert)

# Note that the eMAID format (14 or 15 characters) will be validated
# by the definition of the eMAID type in
# shared/messages/iso15118_2/datatypes.py
self.comm_session.emaid = payment_details_req.emaid
self.comm_session.contract_cert_chain = payment_details_req.cert_chain

hash_data = self._get_contract_certificate_hash_data(
self.comm_session.contract_cert_chain
hash_data = self._get_certificate_hash_data(
self.comm_session.contract_cert_chain, root_cert
)
pem_certificate_chain = self._build_pem_certificate_chain(
self.comm_session.contract_cert_chain
self.comm_session.contract_cert_chain, root_cert
)

authorization_result = (
Expand All @@ -937,6 +963,7 @@ async def process_message(
evse_timestamp=time.time(),
)

# TODO: Should the next message really be Authorization? If so why?
self.create_next_message(
Authorization,
payment_details_res,
Expand All @@ -947,12 +974,15 @@ async def process_message(
# TODO GitHub#54: handle ONGOING case, and REJECTED more fully
payment_details_res = PaymentDetailsRes(
response_code=ResponseCode.FAILED,
gen_challenge=get_random_bytes(16),
evse_timestamp=time.time(),
)
self.stop_state_machine(
"Authorization failed",
payment_details_res,
ResponseCode.FAILED,

self.create_next_message(
Terminate,
SessionStopRes(response_code=ResponseCode.FAILED),
Timeouts.V2G_SECC_SEQUENCE_TIMEOUT,
Namespace.ISO_V2_MSG_DEF,
)

except (
Expand Down Expand Up @@ -1085,8 +1115,13 @@ async def process_message(
)
authorization_result = await self.comm_session.evse_controller.is_authorized(
id_token=id_token,
id_token_type=self.comm_session.selected_auth_option,
id_token_type=(
AuthorizationTokenType.EMAID
if self.comm_session.selected_auth_option == AuthEnum.PNC_V2
else AuthorizationTokenType.EXTERNAL
),
)

if authorization_result == AuthorizationStatus.ACCEPTED:
auth_status = EVSEProcessing.FINISHED
next_state = ChargeParameterDiscovery
Expand Down
35 changes: 34 additions & 1 deletion tests/secc/states/test_iso15118_2_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def test_welding_detection_to_session_stop_when_session_stop_received(
/ "sample_certs"
/ "moRootCACert.der",
)
async def test_payment_details_next_state_on_payment_details_req(
async def test_payment_details_next_state_on_payment_details_req_auth_success(
self, mo_root_cert_path_mock
):
self.comm_session.selected_auth_option = AuthEnum.PNC_V2
Expand All @@ -97,6 +97,39 @@ async def test_payment_details_next_state_on_payment_details_req(
AuthorizationTokenType.EMAID
)

@patch.object(
PaymentDetails,
"_mobility_operator_root_cert_path",
return_value=Path(__file__).parent.parent.parent
/ "sample_certs"
/ "moRootCACert.der",
)
async def test_payment_details_next_state_on_payment_details_req_auth_failed(
self, mo_root_cert_path_mock
):
self.comm_session.selected_auth_option = AuthEnum.PNC_V2

mock_is_authorized = AsyncMock(return_value=AuthorizationStatus.REJECTED)
self.comm_session.evse_controller.is_authorized = mock_is_authorized
self.comm_session.writer = Mock()
self.comm_session.writer.get_extra_info = Mock()
payment_details = PaymentDetails(self.comm_session)
payment_details_req = get_dummy_v2g_message_payment_details_req()
await payment_details.process_message(payment_details_req)

assert isinstance(
self.comm_session.contract_cert_chain, CertificateChain
), "Comm session certificate chain not populated"
assert (
payment_details.next_state == Terminate
), "State did not progress after PaymentDetailsReq"
mock_is_authorized.assert_called_once()
req_body = payment_details_req.body.payment_details_req
assert mock_is_authorized.call_args[1]["id_token"] == req_body.emaid
assert mock_is_authorized.call_args[1]["id_token_type"] == (
AuthorizationTokenType.EMAID
)

@pytest.mark.parametrize(
"is_authorized_return_value, expected_next_state",
[
Expand Down

0 comments on commit 1b8ed1a

Please sign in to comment.