Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JTT-529: Changed order of validation. Start with checking validity of availabl… #350

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions iso15118/secc/states/iso15118_2_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ class PaymentDetails(StateSECC):
def __init__(self, comm_session: SECCCommunicationSession):
super().__init__(comm_session, Timeouts.V2G_SECC_SEQUENCE_TIMEOUT)

def _mobility_operator_root_cert_path(self) -> str:
def _mobility_operator_root_cert_path(self) -> Optional[str]:
"""Return the path to the MO root. Included to be patched in tests."""
return CertPath.MO_ROOT_DER

Expand Down Expand Up @@ -955,12 +955,18 @@ async def process_message(
# to the PKI that is used.
root_cert_path = self._mobility_operator_root_cert_path()
try:
root_cert = load_cert(root_cert_path)
verify_certs(leaf_cert, sub_ca_certs, root_cert)
if root_cert_path:
root_cert = load_cert(root_cert_path)
logger.info(f"Using MO root at {root_cert_path}")
else:
root_cert = None
logger.info("No suitable MO root found.")
except FileNotFoundError:
logger.warning(f"MO Root Cert cannot be found {root_cert_path}")
logger.warning("MO Root Cert not available.")
root_cert = None

verify_certs(leaf_cert, sub_ca_certs, root_cert)

# Note that the eMAID format (14 or 15 characters) will be validated
# by the definition of the eMAID type in
# shared/messages/iso15118_2/datatypes.py
Expand Down
42 changes: 23 additions & 19 deletions iso15118/shared/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,13 @@ def verify_certs(
):
"""
Verifies a certificate chain according to the following criteria:
1. Verify the signature of each certificate contained in the cert chain
1. Check that the current date is within the time span provided by the
certificate's notBefore and notAfter attributes
2. Verify the signature of each certificate contained in the cert chain
(throws CertSignatureError if not)
1.a) Get the sub_ca_certs in order: leaf -> sub_ca_2 -> sub_ca_1 -> root
(if two sub-CAs are in use, otherwise: leaf -> sub_ca_2 -> root)
2.b) Do the actual signature verification from leaf to root
2. Check that the current date is within the time span provided by the
certificate's notBefore and notAfter attributes
3. Checks that none of the certificates has been revoked.

Args:
Expand All @@ -516,13 +516,30 @@ def verify_certs(
leaf_cert = load_der_x509_certificate(leaf_cert_bytes)
sub_ca2_cert = None
sub_ca1_cert = None
root_ca_cert = load_der_x509_certificate(root_ca_cert_bytes)
root_ca_cert = None
if root_ca_cert_bytes:
root_ca_cert = load_der_x509_certificate(root_ca_cert_bytes)

sub_ca_der_certs: List[Certificate] = [
load_der_x509_certificate(cert) for cert in sub_ca_certs_bytes
]

# Step 1.a: Categorize the sub-CA certificates into sub-CA 1 and sub-CA 2.
# Step 1: Check that each certificate is valid, i.e. the current time is
# between the notBefore and notAfter timestamps of the certificate
try:
certs_to_check: List[Certificate] = [leaf_cert]
if len(sub_ca_der_certs) != 0:
certs_to_check.extend(sub_ca_der_certs)
if root_ca_cert:
certs_to_check.append(root_ca_cert)
check_validity(certs_to_check)
except (CertNotYetValidError, CertExpiredError) as exc:
raise exc

if not root_ca_cert:
logger.info("Can't validate the chain as MO root is not present.")
return None
# Step 2.a: Categorize the sub-CA certificates into sub-CA 1 and sub-CA 2.
# A sub-CA 2 certificate's profile has its PathLength extension
# attribute set to 0, whereas a sub-CA 1 certificate's profile has
# its PathLength extension attribute set to 0.
Expand Down Expand Up @@ -577,7 +594,7 @@ def verify_certs(
if (sub_ca2_cert or sub_ca1_cert) and private_environment:
raise CertChainLengthError(allowed_num_sub_cas=0, num_sub_cas=1)

# Step 1.b: Now that we have established the right order of sub-CA
# Step 2.b: Now that we have established the right order of sub-CA
# certificates we can start verifying the signatures from leaf
# certificate to root CA certificate
cert_to_check = leaf_cert
Expand Down Expand Up @@ -688,19 +705,6 @@ def verify_certs(
f"of certificate {cert_to_check.subject}"
)

# Step 2: Check that each certificate is valid, i.e. the current time is
# between the notBefore and notAfter timestamps of the certificate
try:
certs_to_check: List[Certificate] = [leaf_cert]
if sub_ca2_cert:
certs_to_check.append(sub_ca2_cert)
if sub_ca1_cert:
certs_to_check.append(sub_ca1_cert)
certs_to_check.append(root_ca_cert)
check_validity(certs_to_check)
except (CertNotYetValidError, CertExpiredError) as exc:
raise exc

# Step 3: Check the OCSP (Online Certificate Status Protocol) response to
# see whether or not a certificate has been revoked
# TODO As OCSP is not supported for the CharIN Testival Europe 2021, we'll
Expand Down
Binary file not shown.
17 changes: 12 additions & 5 deletions tests/iso15118_2/sample_certs/load_certs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from pathlib import Path
from typing import Optional

from iso15118.shared.messages.iso15118_2.datatypes import (
Certificate,
Expand All @@ -7,7 +8,7 @@
)


def load_certificate_chain() -> CertificateChain:
def load_certificate_chain(contract: Optional[str] = None) -> CertificateChain:
"""Load the sample certificate chain.

This includes the contract, sub-CA 2, and sub-CA 1 certificates.
Expand All @@ -16,7 +17,7 @@ def load_certificate_chain() -> CertificateChain:
The sample certificate chain, with certificates in DER (binary) form.
"""
return CertificateChain(
certificate=load_contract_certificate(),
certificate=load_contract_certificate(contract),
sub_certificates=SubCertificates(
certificates=[load_sub_ca_2_certificate(), load_sub_ca_1_certificate()],
),
Expand Down Expand Up @@ -80,14 +81,20 @@ def load_sub_ca_2_certificate() -> Certificate:
return sub_ca_2_certificate


def load_contract_certificate() -> Certificate:
def load_contract_certificate(contract_path: Optional[str] = None) -> Certificate:
"""Load the sample contract certificate.

Returns:
The sample contract certificate in DER (binary) form.
"""
cert_dir = get_cert_dir()
with open(cert_dir / "contractLeafCert.der", "rb") as root_file:

if contract_path is None:
cert_dir = get_cert_dir()
path = f"{cert_dir}/contractLeafCert.der"
else:
path = contract_path

with open(path, "rb") as root_file:
contract_certificate = root_file.read()

return contract_certificate
95 changes: 95 additions & 0 deletions tests/iso15118_2/secc/states/test_iso15118_2_states.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,101 @@ async def test_payment_details_next_state_on_payment_details_req_auth(
AuthorizationTokenType.EMAID
)

@patch(
"iso15118.secc.states.iso15118_2_states.PaymentDetails._mobility_operator_root_cert_path" # noqa
)
@pytest.mark.parametrize(
"mo_root, "
"contract_cert, "
"is_authorized_return_value, "
"expected_response_code, "
"expected_next_state",
[
(
Path(__file__).parent.parent.parent
/ "sample_certs"
/ "moRootCACert.der",
Path(__file__).parent.parent.parent
/ "sample_certs"
/ "contractLeafCert.der",
AuthorizationResponse(AuthorizationStatus.ACCEPTED, ResponseCode.OK),
ResponseCode.OK,
Authorization,
),
(
None,
Path(__file__).parent.parent.parent
/ "sample_certs"
/ "contractLeafCert.der",
AuthorizationResponse(AuthorizationStatus.ACCEPTED, ResponseCode.OK),
ResponseCode.OK,
Authorization,
),
(
Path(__file__).parent.parent.parent
/ "sample_certs"
/ "moRootCACert.der",
Path(__file__).parent.parent.parent
/ "sample_certs"
/ "contractLeafCert_Expired.der",
None,
ResponseCode.FAILED_CERTIFICATE_EXPIRED,
Terminate,
),
(
None,
Path(__file__).parent.parent.parent
/ "sample_certs"
/ "contractLeafCert_Expired.der",
None,
ResponseCode.FAILED_CERTIFICATE_EXPIRED,
Terminate,
),
(
None,
Path(__file__).parent.parent.parent
/ "sample_certs"
/ "contractLeafCert.der",
AuthorizationResponse(
AuthorizationStatus.REJECTED,
ResponseCode.FAILED_CERTIFICATE_REVOKED,
),
ResponseCode.FAILED_CERTIFICATE_REVOKED,
Terminate,
),
],
)
async def test_payment_details_next_state_on_payment_details_contract_variants(
self,
mock_mo_root_path,
mo_root,
contract_cert,
is_authorized_return_value,
expected_response_code,
expected_next_state: StateSECC,
):
self.comm_session.selected_auth_option = AuthEnum.PNC_V2
mock_mo_root_path.return_value = mo_root
mock_is_authorized = AsyncMock(return_value=is_authorized_return_value)
self.comm_session.evse_controller.is_authorized = mock_is_authorized
payment_details = PaymentDetails(self.comm_session)
payment_details_req = get_dummy_v2g_message_payment_details_req(contract_cert)
await payment_details.process_message(payment_details_req)

assert (
payment_details.next_state == expected_next_state
), "State did not progress after PaymentDetailsReq"

if isinstance(payment_details.message, V2GMessageV2):
assert (
payment_details.message.body.payment_details_res.response_code
== expected_response_code
)
# if is_authorized_return_value is not None:
# mock_is_authorized.assert_called_once()
# else:
# mock_is_authorized.assert_not_called()

@pytest.mark.parametrize(
"auth_type, is_authorized_return_value, expected_next_state,"
"expected_response_code, expected_evse_processing, is_ready_to_charge",
Expand Down
6 changes: 4 additions & 2 deletions tests/iso15118_2/secc/states/test_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,12 @@ def get_dummy_v2g_message_authorization_req(
)


def get_dummy_v2g_message_payment_details_req() -> V2GMessage:
def get_dummy_v2g_message_payment_details_req(
contract: Optional[str] = None,
) -> V2GMessage:
payment_details_req = PaymentDetailsReq(
emaid="1234567890abcd",
cert_chain=load_certificate_chain(),
cert_chain=load_certificate_chain(contract),
)
return V2GMessage(
header=MessageHeader(session_id=MOCK_SESSION_ID),
Expand Down