Skip to content

Commit

Permalink
key helpers: verification key to object: In progress
Browse files Browse the repository at this point in the history
Asciinema: https://asciinema.org/a/627150
Asciinema: https://asciinema.org/a/627165
Asciinema: https://asciinema.org/a/627183
Signed-off-by: John Andersen <johnandersenpdx@gmail.com>
  • Loading branch information
pdxjohnny committed Dec 16, 2023
1 parent b697db6 commit ddd623b
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 117 deletions.
14 changes: 11 additions & 3 deletions docs/registration_policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ from jsonschema import validate, ValidationError

from scitt_emulator.scitt import ClaimInvalidError, CWTClaims
from scitt_emulator.verify_statement import verify_statement
from scitt_emulator.key_helpers import verification_key_to_object


def main():
Expand All @@ -107,23 +108,30 @@ def main():
f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}"
)

cwt_cose_key, _pycose_cose_key = verify_statement(msg)
verification_key = verify_statement(msg)
unittest.TestCase().assertTrue(
cwt_cose_key,
verification_key,
"Failed to verify signature on statement",
)

cwt_protected = cwt.decode(msg.phdr[CWTClaims], cwt_cose_key)
cwt_protected = cwt.decode(msg.phdr[CWTClaims], verification_key.cwt)
issuer = cwt_protected[1]
subject = cwt_protected[2]

issuer_key_as_object = verification_key_to_object(verification_key)
unittest.TestCase().assertTrue(
issuer_key_as_object,
"Failed to convert issuer key to JSON schema verifiable object",
)

SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text())

try:
validate(
instance={
"$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json",
"issuer": issuer,
"issuer_key": issuer_key_as_object,
"subject": subject,
"claim": json.loads(msg.payload.decode()),
},
Expand Down
17 changes: 17 additions & 0 deletions scitt_emulator/key_helper_dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dataclasses import dataclass, field
from typing import List, Any, Union

import cwt
import pycose.keys.ec2


@dataclass
class VerificationKey:
transforms: List[Any]
original: Any
original_content_type: str
original_bytes: bytes
original_bytes_encoding: str
usable: bool
cwt: Union[cwt.COSEKey, None]
cose: Union[pycose.keys.ec2.EC2Key, None]
43 changes: 43 additions & 0 deletions scitt_emulator/key_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import itertools
import importlib.metadata
from typing import Optional, Callable, List, Tuple

from scitt_emulator.key_helper_dataclasses import VerificationKey


ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT = "scitt_emulator.key_helpers.verification_key_to_object"


def verification_key_to_object(
verification_key: VerificationKey,
*,
key_transforms: Optional[List[Callable[[VerificationKey], dict]]] = None,
) -> bool:
"""
Resolve keys for statement issuer and verify signature on COSESign1
statement and embedded CWT
"""
if key_transforms is None:
key_transforms = []
# There is some difference in the return value of entry_points across
# Python versions/envs (conda vs. non-conda). Python 3.8 returns a dict.
entrypoints = importlib.metadata.entry_points()
if isinstance(entrypoints, dict):
for entrypoint in entrypoints.get(ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT, []):
key_transforms.append(entrypoint.load())
elif isinstance(entrypoints, getattr(importlib.metadata, "EntryPoints", list)):
for entrypoint in entrypoints:
if entrypoint.group == ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT:
key_transforms.append(entrypoint.load())
else:
raise TypeError(f"importlib.metadata.entry_points returned unknown type: {type(entrypoints)}: {entrypoints!r}")

# Load keys from issuer and attempt verification. Return key used to verify
for verification_key_as_object in itertools.chain(
*[key_transform(unverified_issuer) for key_transform in key_transforms]
):
# Skip keys that we couldn't derive COSE keys for
if verification_key_as_object:
return verification_key_as_object

return None
61 changes: 32 additions & 29 deletions scitt_emulator/key_loader_format_did_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,48 @@
import cwt.algs.ec2
import pycose
import pycose.keys.ec2
import cryptography.hazmat.primitives.asymmetric.ec
from cryptography.hazmat.primitives import serialization

# TODO Remove this once we have a example flow for proper key verification
import jwcrypto.jwk

from scitt_emulator.did_helpers import DID_KEY_METHOD, did_key_to_cryptography_key
from scitt_emulator.key_helper_dataclasses import VerificationKey


# TODO What is the correct content type? Should we differ if it's been expanded?
CONTENT_TYPE = "application/key+did"


def key_loader_format_did_key(
unverified_issuer: str,
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
jwk_keys = []
cwt_cose_keys = []
pycose_cose_keys = []
cryptography_keys = []

) -> List[VerificationKey]:
if not unverified_issuer.startswith(DID_KEY_METHOD):
return pycose_cose_keys

cryptography_keys.append(did_key_to_cryptography_key(unverified_issuer))

for cryptography_key in cryptography_keys:
jwk_keys.append(
jwcrypto.jwk.JWK.from_pem(
cryptography_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
)
return []
key = did_key_to_cryptography_key(unverified_issuer)
return [
VerificationKey(
transforms=[key],
original=key,
original_content_type=CONTENT_TYPE,
original_bytes=unverified_issuer.encode("utf-8"),
original_bytes_encoding="utf-8",
usable=False,
cwt=None,
cose=None,
)

for jwk_key in jwk_keys:
cwt_cose_key = cwt.COSEKey.from_pem(
jwk_key.export_to_pem(),
kid=jwk_key.thumbprint(),
]


def transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk(
key: cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey,
) -> jwcrypto.jwk.JWK:
if not isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey):
raise TypeError(key)
return jwcrypto.jwk.JWK.from_pem(
key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
cwt_cose_keys.append(cwt_cose_key)
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))

return pycose_cose_keys
)
58 changes: 41 additions & 17 deletions scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@
import jwcrypto.jwk

from scitt_emulator.did_helpers import did_web_to_url
from scitt_emulator.key_helper_dataclasses import VerificationKey


CONTENT_TYPE = "application/jwk+json"


def key_loader_format_url_referencing_oidc_issuer(
unverified_issuer: str,
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
jwk_keys = []
cwt_cose_keys = []
pycose_cose_keys = []
keys = []

if unverified_issuer.startswith("did:web:"):
unverified_issuer = did_web_to_url(unverified_issuer)

if "://" not in unverified_issuer or unverified_issuer.startswith("file://"):
return pycose_cose_keys
return keys

# TODO Logging for URLErrors
# Check if OIDC issuer
Expand All @@ -44,18 +46,40 @@ def key_loader_format_url_referencing_oidc_issuer(
jwks = json.loads(response.read())
for jwk_key_as_dict in jwks["keys"]:
jwk_key_as_string = json.dumps(jwk_key_as_dict)
jwk_keys.append(
jwcrypto.jwk.JWK.from_json(jwk_key_as_string),
jwk_key = jwcrypto.jwk.JWK.from_json(jwk_key_as_string)
keys.append(
VerificationKey(
transforms=[jwk_key],
original=jwk_key,
original_content_type=CONTENT_TYPE,
original_bytes=jwk_key_as_string.encode("utf-8"),
original_bytes_encoding="utf-8",
usable=False,
cwt=None,
cose=None,
)
)

for jwk_key in jwk_keys:
cwt_cose_key = cwt.COSEKey.from_pem(
jwk_key.export_to_pem(),
kid=jwk_key.thumbprint(),
)
cwt_cose_keys.append(cwt_cose_key)
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))

return pycose_cose_keys
return keys


def transform_key_instance_jwcrypto_jwk_to_cwt_cose(
key: jwcrypto.jwk.JWK,
) -> cwt.COSEKey:
if not isinstance(key, jwcrypto.jwk.JWK):
raise TypeError(key)
return cwt.COSEKey.from_pem(
key.export_to_pem(),
kid=key.thumbprint(),
)


def to_object_oidc_issuer(verification_key: VerificationKey) -> dict:
if verification_key.original_content_type != CONTENT_TYPE:
return

return {
**verification_key.original.export_public(as_dict=True),
"use": "sig",
"kid": verification_key.original.thumbprint(),
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@
def key_loader_format_url_referencing_ssh_authorized_keys(
unverified_issuer: str,
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
jwk_keys = []
cwt_cose_keys = []
pycose_cose_keys = []

cryptography_ssh_keys = []
keys = []

if unverified_issuer.startswith("did:web:"):
unverified_issuer = did_web_to_url(unverified_issuer)

if "://" not in unverified_issuer or unverified_issuer.startswith("file://"):
return pycose_cose_keys
return keys

# Try loading ssh keys. Example: https://github.com/username.keys
with contextlib.suppress(urllib.request.URLError):
Expand All @@ -38,28 +34,18 @@ def key_loader_format_url_referencing_ssh_authorized_keys(
with contextlib.suppress(
(ValueError, cryptography.exceptions.UnsupportedAlgorithm)
):
cryptography_ssh_keys.append(
serialization.load_ssh_public_key(line)
key = serialization.load_ssh_public_key(line)
keys.append(
VerificationKey(
transforms=[key],
original=key,
original_content_type=CONTENT_TYPE,
original_bytes=line.encode("utf-8"),
original_bytes_encoding="utf-8",
usable=False,
cwt=None,
cose=None,
)
)

for cryptography_ssh_key in cryptography_ssh_keys:
jwk_keys.append(
jwcrypto.jwk.JWK.from_pem(
cryptography_ssh_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
)
)

for jwk_key in jwk_keys:
cwt_cose_key = cwt.COSEKey.from_pem(
jwk_key.export_to_pem(),
kid=jwk_key.thumbprint(),
)
cwt_cose_keys.append(cwt_cose_key)
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))

return pycose_cose_keys
return keys
Loading

0 comments on commit ddd623b

Please sign in to comment.