diff --git a/Makefile b/Makefile index 14b653668..544bcedf7 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,9 @@ install: # helm install connaisseur helm --atomic --create-namespace --namespace $(NAMESPACE) +dev-install: + helm install --set deployment.replicasCount=1,deployment.imagePullPolicy=Never connaisseur helm --atomic --create-namespace --namespace $(NAMESPACE) + uninstall: helm uninstall connaisseur -n $(NAMESPACE) kubectl delete ns $(NAMESPACE) diff --git a/connaisseur/crypto.py b/connaisseur/crypto.py deleted file mode 100644 index a54d64f44..000000000 --- a/connaisseur/crypto.py +++ /dev/null @@ -1,28 +0,0 @@ -import base64 -import binascii -import hashlib - -import ecdsa - - -def verify_signature( - public_key: ecdsa.VerifyingKey, signature_base64: str, message: str -): - """ - Verify the given bas64-encoded signature with the base64-encoded public - key and serialized message. The message should not contain any whitespaces. - - Raise ValidationError if unsuccessful. - """ - signature = base64.b64decode(signature_base64) - - msg_bytes = bytearray(message, "utf-8") - - return public_key.verify(signature, msg_bytes, hashfunc=hashlib.sha256) - - -def load_key(pem_key: str): - try: - return ecdsa.VerifyingKey.from_pem(pem_key) - except (ecdsa.der.UnexpectedDER, binascii.Error, TypeError, AttributeError) as err: - raise ValueError from err diff --git a/connaisseur/exceptions.py b/connaisseur/exceptions.py index 356552759..dfdefc26b 100644 --- a/connaisseur/exceptions.py +++ b/connaisseur/exceptions.py @@ -91,6 +91,10 @@ class UnknownAPIVersionError(UnknownTypeException): pass +class WrongKeyError(UnknownTypeException): + pass + + class AmbiguousDigestError(BaseConnaisseurException): pass diff --git a/connaisseur/trust_root.py b/connaisseur/trust_root.py new file mode 100644 index 000000000..e94583c8e --- /dev/null +++ b/connaisseur/trust_root.py @@ -0,0 +1,90 @@ +import base64 +import re + +import ecdsa +import rsa + +from connaisseur.exceptions import InvalidFormatException + +KMS_REGEX = r"^(awskms|gcpkms|azurekms|hashivault|k8s):\/{2,3}[a-zA-Z0-9_.+\/:-]+$" +KEYLESS_REGEX = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$" + + +class TrustRootInterface: + """ + Interface from which all trust roots inherit. + """ + + def __new__(cls, data: object): + instance = super(TrustRootInterface, cls).__new__(cls) + instance.__init__(data) + return instance + + def __init__(self, data: object) -> None: + self.value = data + + def __str__(self) -> str: + return self.value + + +class TrustRoot(TrustRootInterface): + """ + Abstract TrustRoot class used to represent key material or similar entities, used in + verification processes. + + May contain a public key, reference to a key or any other type of trust root. + """ + + value: object + + def __new__(cls, data: str): + try: + tr_cls, tr_data = TrustRoot.__get_type_cls_and_data(data) + return tr_cls.__new__(tr_cls, tr_data) + except Exception as err: + msg = "Error loading the trust root." + raise InvalidFormatException(message=msg) from err + + @staticmethod + def __get_type_cls_and_data(data: str): + if re.match(KEYLESS_REGEX, data): + return KeyLessTrustRoot, data + elif re.match(KMS_REGEX, data): + return KMSKey, data + elif key := TrustRoot.__check_and_return_ecdsa(data): + return ECDSAKey, key + elif key := TrustRoot.__check_and_return_rsa(data): + return RSAKey, key + return None, data + + @staticmethod + def __check_and_return_ecdsa(data: str): + try: + return ecdsa.VerifyingKey.from_pem(data) + except Exception: + return None + + @staticmethod + def __check_and_return_rsa(data: str): + try: + return rsa.PublicKey.load_pkcs1_openssl_pem(data) + except Exception: + return None + + +class ECDSAKey(TrustRootInterface): + def __str__(self) -> str: + return base64.b64encode(self.value.to_der()).decode("utf-8") + + +class RSAKey(TrustRootInterface): + def __str__(self) -> str: + return base64.b64encode(self.value.save_pkcs1("DER")).decode("utf-8") + + +class KMSKey(TrustRootInterface): + pass + + +class KeyLessTrustRoot(TrustRootInterface): + pass diff --git a/connaisseur/validators/cosign/cosign_validator.py b/connaisseur/validators/cosign/cosign_validator.py index 9a720accf..674e7587e 100644 --- a/connaisseur/validators/cosign/cosign_validator.py +++ b/connaisseur/validators/cosign/cosign_validator.py @@ -7,7 +7,6 @@ from concurrent.futures import ThreadPoolExecutor -from connaisseur.crypto import load_key from connaisseur.exceptions import ( CosignError, CosignTimeout, @@ -15,8 +14,10 @@ InvalidFormatException, UnexpectedCosignData, ValidationError, + WrongKeyError, ) from connaisseur.image import Image +from connaisseur.trust_root import KMSKey, TrustRoot, ECDSAKey from connaisseur.util import safe_path_func # nosec from connaisseur.validators.interface import ValidatorInterface @@ -41,8 +42,9 @@ async def validate( "threshold", 1 if trust_root != "*" or any(required) else len(self.trust_roots), ) - # vals is a validations dict for each required trust root containing validated digests and errors - vals = self.__get_pinned_keys(trust_root, required, threshold) + # vals is a validations dict for each required trust root containing validated + # digests and errors + vals = self.__get_pinned_trust_roots(trust_root, required, threshold) # use concurrent.futures for now # tasks = [self.__validation_task(k, str(image)) for k in self.vals.keys()] @@ -52,7 +54,8 @@ async def validate( num_workers = len(vals) executor = ThreadPoolExecutor(num_workers) # prepare tasks - # a copy of vals dictionaries is passed to concurrent validation to ensure thread-safe execution + # a copy of vals dictionaries is passed to concurrent validation to ensure + # thread-safe execution arguments = [(k, v.copy(), str(image)) for k, v in vals.items()] futures = [executor.submit(self.__validation_task, *arg) for arg in arguments] # await results (output dropped as `vals` is updated within function) @@ -63,45 +66,46 @@ async def validate( vals=vals, threshold=threshold, required=required ) - def __get_pinned_keys(self, key_name: str, required: list, threshold: int): + def __get_pinned_trust_roots(self, tr_name: str, required: list, threshold: int): """ - Extract the pinned key(s) selected for validation from the list of trust roots. + Extract the pinned trust root(s) selected for validation from the list of trust + roots. """ - key_name = key_name or "default" - available_keys = list(map(lambda k: k["name"], self.trust_roots)) + tr_name = tr_name or "default" + available_trs = list(map(lambda t: t["name"], self.trust_roots)) - # generate list of pinned keys - if key_name == "*": + # generate list of pinned trust roots + if tr_name == "*": if len(required) >= threshold: - pinned_keys = required + pinned_trs = required else: - pinned_keys = available_keys + pinned_trs = available_trs else: - pinned_keys = [key_name] + pinned_trs = [tr_name] - # check if pinned keys exist in available trust roots - missing_keys = set(pinned_keys) - set(available_keys) - if missing_keys: - msg = 'Trust roots "{key_names}" not configured for validator "{validator_name}".' + # check if pinned trust roots exist in available trust roots + missing_trs = set(pinned_trs) - set(available_trs) + if missing_trs: + msg = 'Trust roots "{tr_names}" not configured for validator "{validator_name}".' raise NotFoundException( message=msg, - key_names=", ".join(missing_keys), + tr_names=", ".join(missing_trs), validator_name=self.name, ) # construct key validation dictionary for pinned keys - keys = { - k["name"]: { - "name": k["name"], - "key": "".join(k["key"]), + trust_roots = { + t["name"]: { + "name": t["name"], + "trust_root": TrustRoot("".join(t["key"])), "digest": None, "error": None, } - for k in self.trust_roots - if k["name"] in pinned_keys + for t in self.trust_roots + if t["name"] in pinned_trs } - return keys + return trust_roots # async def __validation_task(self, trust_root: str, image: str): def __validation_task(self, trust_root: str, val: dict, image: str): @@ -121,13 +125,13 @@ def __validation_task(self, trust_root: str, val: dict, image: str): # async def __get_cosign_validated_digests(self, image: str, trust_root: dict): def __get_cosign_validated_digests(self, image: str, trust_root: dict): """ - Get and process Cosign validation output for a given `image` and `key` + Get and process Cosign validation output for a given `image` and `trust_root` and either return a list of valid digests or raise a suitable exception in case no valid signature is found or Cosign fails. """ - # returncode, stdout, stderr = await self.__invoke_cosign(image, trust_root["key"]) - returncode, stdout, stderr = self.__invoke_cosign(image, trust_root["key"]) - + returncode, stdout, stderr = self.__validate_using_trust_root( + image, trust_root["trust_root"] + ) logging.info( "COSIGN output of trust root '%s' for image'%s': RETURNCODE: %s; STDOUT: %s; STDERR: %s", trust_root["name"], @@ -217,35 +221,71 @@ def __get_cosign_validated_digests(self, image: str, trust_root: dict): ) return digests.pop() - # async def __invoke_cosign(self, image: str, key: str): - def __invoke_cosign(self, image: str, key: str): + def __validate_using_trust_root(self, image: str, trust_root: TrustRoot): """ - Invoke the Cosign binary in a subprocess for a specific `image` given a `key` and - return the returncode, stdout and stderr. Will raise an exception if Cosign times out. + Call the `CosignValidator.__invoke_cosign` method, using a specific trust root. + """ + # reminder when implementing RSA validation: + # ["--key", "/dev/stdin", self.value.save_pkcs1()] + + # reminder when implementing Keyless validation: + # ["--cert-email", self.value, b""] + + if isinstance(trust_root, ECDSAKey): + return self.__invoke_cosign( + image, + { + "option_kword": "--key", + "inline_tr": "/dev/stdin", + "trust_root": trust_root.value.to_pem(), + }, + ) + elif isinstance(trust_root, KMSKey): + return self.__invoke_cosign( + image, + { + "option_kword": "--key", + "inline_tr": trust_root.value, + }, + ) + msg = ( + "The trust_root type {tr_type} is unsupported for a validator of type" + "{val_type}." + ) + raise WrongKeyError(message=msg, tr_type=type(trust_root), val_type="cosign") + + def __invoke_cosign(self, image: str, tr_args: dict): """ - pubkey_config, env_vars, pubkey = CosignValidator.__get_pubkey_config(key) + Invoke the Cosign binary in a subprocess for a specific `image` given trust root + argument dict (`tr_args`) and return the returncode, stdout and stderr. The trust + root argument dict includes a Cosign option keyword and the trust root itself, + either as inline argument or pipeable input with an inline reference. The + composition of the dict is dependant on the type of trust root. + Raises an exception if Cosign times out. + """ cmd = [ "/app/cosign/cosign", "verify", "--output", "text", - *pubkey_config, + tr_args["option_kword"], + tr_args["inline_tr"], *(["--k8s-keychain"] if self.k8s_keychain else []), image, ] - env = self.__get_envs() - env.update(env_vars) with subprocess.Popen( # nosec cmd, - env=env, + env=self.__get_envs(), stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) as process: try: - stdout, stderr = process.communicate(pubkey, timeout=60) + stdout, stderr = process.communicate( + input=tr_args.get("trust_root", None), timeout=60 + ) except subprocess.TimeoutExpired as err: process.kill() msg = "Cosign timed out." @@ -255,33 +295,9 @@ def __invoke_cosign(self, image: str, key: str): return process.returncode, stdout.decode("utf-8"), stderr.decode("utf-8") - @staticmethod - def __get_pubkey_config(key: str): - """ - Return a tuple of the used Cosign verification command (flag-value list), a - dict of potentially required environment variables and public key in binary - PEM format to be used as stdin to Cosign based on the format of the input - key (reference). - - Raise InvalidFormatException if none of the supported patterns is matched. - """ - try: - # key is ecdsa public key - pkey = load_key(key).to_pem() # raises if invalid - return ["--key", "/dev/stdin"], {}, pkey - except ValueError: - pass - - # key is KMS reference - if re.match(r"^\w{2,20}://[\w:/-]{3,255}$", key): - return ["--key", key], {}, b"" - - msg = "Public key (reference) '{input_str}' does not match expected patterns." - raise InvalidFormatException(message=msg, input_str=key) - def __get_envs(self): """ - Sets up environment variables used by cosign. + Set up environment variables used by cosign. """ env = os.environ.copy() # Extend the OS env vars only for passing to the subprocess below @@ -295,7 +311,7 @@ def __get_envs(self): @staticmethod def __apply_policy(vals: dict, threshold: int, required: list): """ - Validates the signature verification outcome against the policy for + Validate the signature verification outcome against the policy for threshold and required trust roots. Raises an exception if not compliant. diff --git a/connaisseur/validators/notaryv1/key_store.py b/connaisseur/validators/notaryv1/key_store.py index da64306ae..4cb99e188 100644 --- a/connaisseur/validators/notaryv1/key_store.py +++ b/connaisseur/validators/notaryv1/key_store.py @@ -1,5 +1,5 @@ -from connaisseur.crypto import load_key -from connaisseur.exceptions import InvalidKeyFormatError, NotFoundException +from connaisseur.exceptions import NotFoundException +from connaisseur.trust_root import TrustRoot class KeyStore: @@ -11,20 +11,9 @@ class KeyStore: keys: dict hashes: dict - def __init__(self, root_pub_key: str = None): + def __init__(self, root_key: TrustRoot = None): self.hashes = {} - - if root_pub_key: - try: - key = load_key(root_pub_key) - except ValueError as err: - msg = "The public root key has an invalid format." - raise InvalidKeyFormatError( - message=msg, root_pub_key=root_pub_key - ) from err - self.keys = {"root": key} - else: - self.keys = {} + self.keys = {"root": root_key} if root_key else {} def get_key(self, key_id: str): """ @@ -66,17 +55,12 @@ def update(self, trust_data): # root.json, as it is the only file that contains a key which was used to sign # itself. signature_keys = [sig.get("keyid") for sig in trust_data.signatures] - keys = {k: v for k, v in keys.items() if k not in signature_keys} - - for key_id in keys: - try: - key = load_key(keys[key_id]["keyval"]["public"]) - except ValueError as err: - msg = "Key {key_id} has an invalid format." - raise InvalidKeyFormatError( - message=msg, key_id=key_id, key=keys[key_id]["keyval"]["public"] - ) from err - self.keys.setdefault(key_id, key) + keys = { + k: TrustRoot(v["keyval"]["public"]) + for k, v in keys.items() + if k not in signature_keys + } + self.keys = dict(keys, **self.keys) # update hashes hashes = trust_data.get_hashes() diff --git a/connaisseur/validators/notaryv1/notary.py b/connaisseur/validators/notaryv1/notary.py index 0f30526a8..110bcfb27 100644 --- a/connaisseur/validators/notaryv1/notary.py +++ b/connaisseur/validators/notaryv1/notary.py @@ -23,7 +23,7 @@ class Notary: name: str host: str - pub_root_keys: list + root_keys: list is_acr: bool auth: dict cert: Optional[ssl.SSLContext] @@ -42,7 +42,7 @@ def __init__( ): # pylint: disable=unused-argument self.name = name self.host = host - self.pub_root_keys = trust_roots or [] + self.root_keys = trust_roots or [] self.is_acr = is_acr if auth is None: auth = {} @@ -66,9 +66,7 @@ def get_key(self, key_name: str = None): """ key_name = key_name or "default" try: - key = next( - key["key"] for key in self.pub_root_keys if key["name"] == key_name - ) + key = next(key["key"] for key in self.root_keys if key["name"] == key_name) except StopIteration as err: msg = ( 'Trust root "{key_name}" not configured for validator "{notary_name}".' diff --git a/connaisseur/validators/notaryv1/notaryv1_validator.py b/connaisseur/validators/notaryv1/notaryv1_validator.py index ff43e92de..d1dc99568 100644 --- a/connaisseur/validators/notaryv1/notaryv1_validator.py +++ b/connaisseur/validators/notaryv1/notaryv1_validator.py @@ -9,6 +9,7 @@ NotFoundException, ) from connaisseur.image import Image +from connaisseur.trust_root import TrustRoot from connaisseur.validators.interface import ValidatorInterface from connaisseur.validators.notaryv1.key_store import KeyStore from connaisseur.validators.notaryv1.notary import Notary @@ -34,7 +35,7 @@ async def validate( if delegations is None: delegations = [] # get the public root key - pub_key = self.notary.get_key(trust_root) + root_key = TrustRoot(self.notary.get_key(trust_root)) # prepend `targets/` to the required delegation roles, if not already present req_delegations = list( map(NotaryV1Validator.__normalize_delegation, delegations) @@ -43,7 +44,7 @@ async def validate( # get list of targets fields, containing tag to signed digest mapping from # `targets.json` and all potential delegation roles signed_image_targets = await self.__process_chain_of_trust( - image, req_delegations, pub_key + image, req_delegations, root_key ) # search for digests or tag, depending on given image @@ -92,7 +93,7 @@ def __normalize_delegation(delegation_role: str): return delegation_role async def __process_chain_of_trust( - self, image: Image, req_delegations: list, pub_root_key: str + self, image: Image, req_delegations: list, root_key: TrustRoot ): # pylint: disable=too-many-branches """ Process the whole chain of trust, provided by the notary @@ -106,7 +107,7 @@ async def __process_chain_of_trust( Raise `NotFoundExceptions` should no required delegations be present in the trust data, or no image targets be found. """ - key_store = KeyStore(pub_root_key) + key_store = KeyStore(root_key) tuf_roles = ["root", "snapshot", "timestamp", "targets"] diff --git a/connaisseur/validators/notaryv1/trust_data.py b/connaisseur/validators/notaryv1/trust_data.py index 372e64f56..0e08b07b3 100644 --- a/connaisseur/validators/notaryv1/trust_data.py +++ b/connaisseur/validators/notaryv1/trust_data.py @@ -7,13 +7,14 @@ import pytz from dateutil import parser -from connaisseur.crypto import verify_signature from connaisseur.exceptions import ( InvalidTrustDataFormatError, NoSuchClassError, NotFoundException, ValidationError, + WrongKeyError, ) +from connaisseur.trust_root import TrustRoot, ECDSAKey from connaisseur.util import validate_schema from connaisseur.validators.notaryv1.key_store import KeyStore @@ -89,15 +90,32 @@ def validate_signature(self, keystore: KeyStore): msg = json.dumps(self.signed, separators=(",", ":")) for signature in self.signatures: key_id = "root" if self.kind == "root" else signature["keyid"] - pub_key = keystore.get_key(key_id) + key = keystore.get_key(key_id) sig = signature["sig"] try: - verify_signature(pub_key, sig, msg) + TrustData.__validate_signature_with_key(sig, msg, key) except Exception as err: msg = "Failed to verify signature of trust data {trust_data_kind}." raise ValidationError(message=msg, trust_data_kind=self.kind) from err + @staticmethod + def __validate_signature_with_key(signature: str, payload: str, key: TrustRoot): + if isinstance(key, ECDSAKey): + return TrustData.__validate_signature_with_ecdsa(signature, payload, key) + msg = ( + "The key type {key_type} is unsupported for a validator of type {val_type}." + ) + raise WrongKeyError(message=msg, key_type=type(key), val_type="notaryv1") + + @staticmethod + def __validate_signature_with_ecdsa(signature: str, payload: str, key: TrustRoot): + signature_decoded = base64.b64decode(signature) + payload_bytes = bytearray(payload, "utf-8") + return key.value.verify( + signature_decoded, payload_bytes, hashfunc=hashlib.sha256 + ) + def validate_hash(self, keystore: KeyStore): """ Validate the given hash from a `keystore` corresponds to the trust diff --git a/requirements.txt b/requirements.txt index 739faef00..66bdae673 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,5 @@ python-dateutil~=2.8.2 pytz~=2022.1 PyYAML~=6.0 requests~=2.27.1 -rfc3339-validator~=0.1.4 \ No newline at end of file +rfc3339-validator~=0.1.4 +rsa~=4.7.2 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 25597e297..bfd9e21cb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,6 +8,7 @@ import connaisseur.config as co import connaisseur.admission_request as admreq import connaisseur.alert as alert +from connaisseur.trust_root import TrustRoot import connaisseur.validators.notaryv1.trust_data as td import connaisseur.validators.notaryv1.key_store as ks import connaisseur.validators.notaryv1.notary as no @@ -269,7 +270,7 @@ def mock_expiry(self): @pytest.fixture def sample_key_store(m_trust_data): - sample_key = ( + sample_key = TrustRoot( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEtR5kwrDK22SyCu" "7WMF8tCjVgeORAS2PWacRcBN/VQdVK4PVk1w4pMWlz9AHQthDG" "l+W2k3elHkPbR+gNkK2PCA==" @@ -282,7 +283,7 @@ def sample_key_store(m_trust_data): @pytest.fixture def alice_key_store(m_trust_data): - sample_key = ( + sample_key = TrustRoot( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEtR5kwrD" "K22SyCu7WMF8tCjVgeORAS2PWacRcBN/VQdVK4PVk1" "w4pMWlz9AHQthDGl+W2k3elHkPbR+gNkK2PCA==" diff --git a/tests/test_crypto.py b/tests/test_crypto.py deleted file mode 100644 index b3f5a5fcc..000000000 --- a/tests/test_crypto.py +++ /dev/null @@ -1,113 +0,0 @@ -import pytest -import json -import base64 -from . import conftest as fix -import connaisseur.crypto as cr - - -def get_message(path: str): - with open(f"tests/data/trust_data/{path}.json", "r") as file: - msg = json.load(file) - - return json.dumps(msg["signed"], separators=(",", ":")) - - -root_pub = ( - "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEtR5kwrDK22SyCu" - "7WMF8tCjVgeORAS2PWacRcBN/VQdVK4PVk1w4pMWlz9AHQthDG" - "l+W2k3elHkPbR+gNkK2PCA==" -) - -targets_pub = ( - "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAErIGdt5pelfW" - "OSjmY7k+/TypV0IFF9XLA+K4swhclLJb79cLoeBBDqkkUrk" - "fhN5gxRnA//wA3amL4WXkaGsb9zQ==" -) - -key1 = ( - "-----BEGIN PUBLIC KEY-----\n" - "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEM0xl8F5nwIV3IAru1Pf85WCo4cfT\n" - "OQ91jhxVaQ3xHMeW430q7R4H/tJmAXUZBe+nOTX8pgtmrLpT+Hu/H7pUhw==\n" - "-----END PUBLIC KEY-----\n" -) -key2 = ( - "-----BEGIN PUBLIC KEY-----\n" - "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEWGcErqaO7+y3PzNTHt7PVx0+Xtgv\n" - "LV5mFW91CxzN8uQht/Ig6+FAymrn2lOtUz5BqF4pSQizcdqN475t6raTWw==\n" - "-----END PUBLIC KEY-----\n" -) - - -@pytest.mark.parametrize( - "key, out, exception", - [ - ( - root_pub, - ( - b"tR5kwrDK22SyCu" - b"7WMF8tCjVgeORAS2PWacRcBN/VQdVK4PVk1w4pMWlz9AHQthDG" - b"l+W2k3elHkPbR+gNkK2PCA==" - ), - fix.no_exc(), - ), - ( - targets_pub, - ( - b"rIGdt5pelfW" - b"OSjmY7k+/TypV0IFF9XLA+K4swhclLJb79cLoeBBDqkkUrk" - b"fhN5gxRnA//wA3amL4WXkaGsb9zQ==" - ), - fix.no_exc(), - ), - ( - key1, - ( - b"M0xl8F5nwIV3IAru1Pf85WCo4cfT" - b"OQ91jhxVaQ3xHMeW430q7R4H/tJmAXUZBe+nOTX8pgtmrLpT+Hu/H7pUhw==" - ), - fix.no_exc(), - ), - ( - key2, - ( - b"WGcErqaO7+y3PzNTHt7PVx0+Xtgv" - b"LV5mFW91CxzN8uQht/Ig6+FAymrn2lOtUz5BqF4pSQizcdqN475t6raTWw==" - ), - fix.no_exc(), - ), - ("", "", pytest.raises(ValueError)), - ], -) -def test_load_key(key: str, out: str, exception): - with exception: - pub_key = cr.load_key(key) - assert base64.b64encode(pub_key.to_string()) == out - - -root_sig = ( - "77lGn17vJPsru39/mO6quh+yuMQvhLyqz4PhvMySLpnpzYu2x+" - "YIsXfH2gngP8hYzOWvovE6iQPKBoJv3zWMsQ==" -) -targets_sig = ( - "ayUgIwW4LmtW+kuzHuyU7lkn8awoXlymBcXeO8j++JSAUpU" - "3BSuFsBe7yx3SOOsxh57u+vWkCOzPdLEYVyQrqg==" -) - - -@pytest.mark.parametrize( - "public, signature, message", - [ - ( - cr.load_key(root_pub), - root_sig, - get_message("sample_root"), - ), - ( - cr.load_key(targets_pub), - targets_sig, - get_message("sample_targets"), - ), - ], -) -def test_verify_signature(public, signature: str, message: str): - assert cr.verify_signature(public, signature, message) diff --git a/tests/test_trust_root.py b/tests/test_trust_root.py new file mode 100644 index 000000000..5ccd838f1 --- /dev/null +++ b/tests/test_trust_root.py @@ -0,0 +1,83 @@ +import pytest +from . import conftest as fix +import connaisseur.exceptions as exc +import connaisseur.trust_root as trust_root + + +sample_ecdsa = "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEOXYta5TgdCwXTCnLU09W5T4M4r9f\nQQrqJuADP6U7g5r9ICgPSmZuRHP/1AYUfOQW3baveKsT969EfELKj1lfCA==\n-----END PUBLIC KEY-----" +sample_rsa = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAs5pC7R5OTSTUMJHUniPk\nrLfmGDAUxZtRlvIE+pGPCD6cUXH22advkK87xwpupjxdVYuKTFnWHUIyFJwjI3vu\nsievezcAr0E/xxyeo49tWog9kFoooK3qmXjpETC8OpvNROZ0K3qhlm9PZkGo3gSJ\n/B4rMU/d+jkCI8eiUPpdVQOczdBoD5nzQAF1mfmffWGsbKY+d8/l77Vset0GXExR\nzUtnglMhREyHNpDeQUg5OEn+kuGLlTzIxpIF+MlbzP3+xmNEzH2iafr0ae2g5kX2\n880priXpxG8GXW2ybZmPvchclnvFu4ZfZcM10FpgYJFvR/9iofFeAka9u5z6VZcc\nmQIDAQAB\n-----END PUBLIC KEY-----" +awskms1 = "awskms:///1234abcd-12ab-34cd-56ef-1234567890ab" +awskms2 = "awskms://localhost:4566/1234abcd-12ab-34cd-56ef-1234567890ab" +awskms3 = "awskms:///arn:aws:kms:us-east-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab" +awskms4 = "awskms://localhost:4566/arn:aws:kms:us-east-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab" +awskms5 = "awskms:///alias/ExampleAlias" +awskms6 = "awskms://localhost:4566/alias/ExampleAlias" +awskms7 = "awskms:///arn:aws:kms:us-east-2:111122223333:alias/ExampleAlias" +awskms8 = ( + "awskms://localhost:4566/arn:aws:kms:us-east-2:111122223333:alias/ExampleAlias" +) +gcpkms = "gcpkms://projects/example_project/locations/example_location/keyRings/example_keyring/cryptoKeys/example_key/versions/example_keyversion" +azurekms = "azurekms://example_vault_name/example_key" +hashicorpkms = "hashivault://example_keyname" +k8skms = "k8s://example_ns/example_key" +sample_mail = "mail@example.com" + +sample_ecdsa2 = "-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEEi2WD/E/UXF4+yoE5e4cjpJMNgQw\n8PAVALRX+8f8I8B+XneAtnOHDTI8L6wBeFRTzl6G4OmgDyCRYTb5MV3hog==\n-----END PUBLIC KEY-----" + + +def cb(image, key_args): + return key_args[:2] + + +@pytest.mark.parametrize( + "data, class_, exception", + [ + (sample_ecdsa, trust_root.ECDSAKey, fix.no_exc()), + (sample_rsa, trust_root.RSAKey, fix.no_exc()), + (sample_mail, trust_root.KeyLessTrustRoot, fix.no_exc()), + ("iamnotakey", None, pytest.raises(exc.InvalidFormatException)), + ] + + list( + map( + lambda x: (x, trust_root.KMSKey, fix.no_exc()), + [ + awskms1, + awskms2, + awskms3, + awskms4, + awskms5, + awskms6, + awskms7, + awskms8, + gcpkms, + azurekms, + hashicorpkms, + k8skms, + ], + ) + ), +) +def test_keys(data, class_, exception): + with exception: + key = trust_root.TrustRoot(data) + assert isinstance(key, class_) + + +@pytest.mark.parametrize( + "key, out", + [ + ( + sample_ecdsa, + "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEOXYta5TgdCwXTCnLU09W5T4M4r9fQQrqJuADP6U7g5r9ICgPSmZuRHP/1AYUfOQW3baveKsT969EfELKj1lfCA==", + ), + ( + sample_rsa, + "MIIBCgKCAQEAs5pC7R5OTSTUMJHUniPkrLfmGDAUxZtRlvIE+pGPCD6cUXH22advkK87xwpupjxdVYuKTFnWHUIyFJwjI3vusievezcAr0E/xxyeo49tWog9kFoooK3qmXjpETC8OpvNROZ0K3qhlm9PZkGo3gSJ/B4rMU/d+jkCI8eiUPpdVQOczdBoD5nzQAF1mfmffWGsbKY+d8/l77Vset0GXExRzUtnglMhREyHNpDeQUg5OEn+kuGLlTzIxpIF+MlbzP3+xmNEzH2iafr0ae2g5kX2880priXpxG8GXW2ybZmPvchclnvFu4ZfZcM10FpgYJFvR/9iofFeAka9u5z6VZccmQIDAQAB", + ), + (awskms1, awskms1), + (sample_mail, sample_mail), + ], +) +def test_str(key, out): + k = trust_root.TrustRoot(key) + assert str(k) == out diff --git a/tests/validators/cosign/test_cosign_validator.py b/tests/validators/cosign/test_cosign_validator.py index 0782067c3..7188993b3 100644 --- a/tests/validators/cosign/test_cosign_validator.py +++ b/tests/validators/cosign/test_cosign_validator.py @@ -2,8 +2,10 @@ import pytest_subprocess import subprocess from ... import conftest as fix +from connaisseur.image import Image import connaisseur.validators.cosign.cosign_validator as co import connaisseur.exceptions as exc +from connaisseur.trust_root import TrustRoot example_key = ( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE6uuXb" @@ -24,7 +26,7 @@ "name": "default", "key": example_key, }, - {"name": "test", "key": "..."}, + {"name": "test", "key": example_key2}, ], }, { @@ -100,12 +102,12 @@ def gen_vals(static_cosign, root_no: list = None, digest=None, error=None): root_no = range(len(static_cosign["trust_roots"])) if not isinstance(digest, list): - digest = [digest for k in static_cosign["trust_roots"]] + digest = [digest] * len(static_cosign["trust_roots"]) return { static_cosign["trust_roots"][num]["name"]: { "name": static_cosign["trust_roots"][num]["name"], - "key": "".join(static_cosign["trust_roots"][num]["key"]), + "trust_root": TrustRoot("".join(static_cosign["trust_roots"][num]["key"])), "digest": digest[num], "error": error, } @@ -115,7 +117,7 @@ def gen_vals(static_cosign, root_no: list = None, digest=None, error=None): def str_vals(vals): for k in vals.keys(): - vals[k]["key"] = str(vals[k]["key"]) + vals[k]["trust_root"] = str(vals[k]["trust_root"]) return vals @@ -222,7 +224,7 @@ def test_get_pinned_keys( with exception: val = co.CosignValidator(**static_cosigns[index]) assert str_vals( - val._CosignValidator__get_pinned_keys(key_name, required, threshold) + val._CosignValidator__get_pinned_trust_roots(key_name, required, threshold) ) == str_vals(key) @@ -315,7 +317,7 @@ async def test_validate( 0, cosign_multiline_payload, cosign_stderr_at_success, - "", + "testimage:v1", [ "2f6d89c49ad745bfd5d997f9b2d253329323da4c500c7fe343e068c0382b8df4", "2f6d89c49ad745bfd5d997f9b2d253329323da4c500c7fe343e068c0382b8df4", @@ -392,24 +394,113 @@ def test_get_cosign_validated_digests( @pytest.mark.parametrize( - "image, process_input, input_type, k8s_keychain", + "image, key, process_input, exception", [ ( "testimage:v1", - ( - "-----BEGIN PUBLIC KEY-----\n" - "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE6uuXbZhEfTYb4Mnb/LdrtXKTIIbz\n" - "NBp8mwriocbaxXxzquvbZpv4QtOTPoIw+0192MW9dWlSVaQPJd7IaiZIIQ==\n" - "-----END PUBLIC KEY-----\n" + example_key, + { + "option_kword": "--key", + "inline_tr": "/dev/stdin", + "trust_root": ( + b"-----BEGIN PUBLIC KEY-----\n" + b"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE6uuXbZhEfTYb4Mnb/LdrtXKTIIbz\n" + b"NBp8mwriocbaxXxzquvbZpv4QtOTPoIw+0192MW9dWlSVaQPJd7IaiZIIQ==\n" + b"-----END PUBLIC KEY-----\n" + ), + }, + fix.no_exc(), + ), + ( + "testimage:v1", + "k8s://example_ns/example_key", + { + "option_kword": "--key", + "inline_tr": "k8s://example_ns/example_key", + }, + fix.no_exc(), + ), + ( + "testimage:v1", + "mail@example.com", + {"option_kword": "", "inline_tr": ""}, + pytest.raises(exc.WrongKeyError), + ), + ], +) +def test_validate_using_key(fake_process, image, key, process_input, exception): + def stdin_function(input): + return {"stderr": input.decode(), "stdout": input} + + # as we are mocking the subprocess the output doesn't change with the input. To check that the + # .communicate() method is invoked with the correct input, we append it to stderr as explained in the docs + # https://pytest-subprocess.readthedocs.io/en/latest/usage.html#passing-input + # It seems there is a bug that, when appending the input to a data stream (e.g. stderr), + # eats the other data stream (stdout in that case). Thus, simply appending to both. + + im = Image(image) + fake_process_calls = [ + "/app/cosign/cosign", + "verify", + "--output", + "text", + process_input["option_kword"], + process_input["inline_tr"], + *[], + str(im), + ] + fake_process.register_subprocess( + fake_process_calls, + stderr=cosign_stderr_at_success, + stdout=bytes(cosign_payload, "utf-8"), + stdin_callable=stdin_function, + ) + config = static_cosigns[0].copy() + val = co.CosignValidator(**config) + with exception: + returncode, stdout, stderr = val._CosignValidator__validate_using_trust_root( + im, TrustRoot(key) + ) + assert fake_process_calls in fake_process.calls + assert (returncode, stdout, stderr) == ( + 0, + "{}{}".format( + cosign_payload, process_input.get("trust_root", b"").decode() + ), + "{}{}".format( + cosign_stderr_at_success, process_input.get("trust_root", b"").decode() ), - "key", + ) + + +@pytest.mark.parametrize( + "image, process_input, k8s_keychain", + [ + ( + "testimage:v1", + { + "option_kword": "--key", + "inline_tr": "/dev/stdin", + "trust_root": ( + b"-----BEGIN PUBLIC KEY-----\n" + b"MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE6uuXbZhEfTYb4Mnb/LdrtXKTIIbz\n" + b"NBp8mwriocbaxXxzquvbZpv4QtOTPoIw+0192MW9dWlSVaQPJd7IaiZIIQ==\n" + b"-----END PUBLIC KEY-----\n" + ), + }, {"secret_name": "thesecret"}, ), - ("testimage:v1", "k8s://connaisseur/test_key", "ref", False), - ("testimage:v1", "k8s://connaisseur/test_key", "ref", True), + ( + "testimage:v1", + { + "option_kword": "--key", + "inline_tr": "k8s://connaisseur/test_key", + }, + False, + ), ], ) -def test_invoke_cosign(fake_process, image, process_input, input_type, k8s_keychain): +def test_invoke_cosign(fake_process, image, process_input, k8s_keychain): def stdin_function(input): return {"stderr": input.decode(), "stdout": input} @@ -418,15 +509,16 @@ def stdin_function(input): # https://pytest-subprocess.readthedocs.io/en/latest/usage.html#passing-input # It seems there is a bug that, when appending the input to a data stream (e.g. stderr), # eats the other data stream (stdout in that case). Thus, simply appending to both. + im = Image(image) fake_process_calls = [ "/app/cosign/cosign", "verify", "--output", "text", - "--key", - "/dev/stdin" if input_type == "key" else process_input, + process_input["option_kword"], + process_input["inline_tr"], *(["--k8s-keychain"] if k8s_keychain else []), - image, + str(im), ] fake_process.register_subprocess( fake_process_calls, @@ -437,15 +529,13 @@ def stdin_function(input): config = static_cosigns[0].copy() config["auth"] = {"k8s_keychain": k8s_keychain} val = co.CosignValidator(**config) - returncode, stdout, stderr = val._CosignValidator__invoke_cosign( - image, process_input - ) + returncode, stdout, stderr = val._CosignValidator__invoke_cosign(im, process_input) assert fake_process_calls in fake_process.calls assert (returncode, stdout, stderr) == ( 0, - "{}{}".format(cosign_payload, process_input if input_type == "key" else ""), + "{}{}".format(cosign_payload, process_input.get("trust_root", b"").decode()), "{}{}".format( - cosign_stderr_at_success, process_input if input_type == "key" else "" + cosign_stderr_at_success, process_input.get("trust_root", b"").decode() ), ) @@ -481,52 +571,18 @@ def callback_function(input): with pytest.raises(exc.CosignTimeout) as err: co.CosignValidator(**static_cosigns[0])._CosignValidator__invoke_cosign( - image, example_pubkey + image, + { + "option_kword": "--key", + "inline_tr": "/dev/stdin", + "trust_root": example_pubkey, + }, ) mock_kill.assert_has_calls([mocker.call()]) assert "Cosign timed out." in str(err.value) -@pytest.mark.parametrize( - "pubkey, output, exception", - [ - ( - ( - "-----BEGIN PUBLIC KEY-----\n" - "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE6uuXbZhEfTYb4Mnb/LdrtXKTIIbz\n" - "NBp8mwriocbaxXxzquvbZpv4QtOTPoIw+0192MW9dWlSVaQPJd7IaiZIIQ==\n" - "-----END PUBLIC KEY-----\n" - ), - ( - ["--key", "/dev/stdin"], - {}, - b"-----BEGIN PUBLIC KEY-----\nMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAE6uuXbZhEfTYb4Mnb/LdrtXKTIIbz\nNBp8mwriocbaxXxzquvbZpv4QtOTPoIw+0192MW9dWlSVaQPJd7IaiZIIQ==\n-----END PUBLIC KEY-----\n", - ), - fix.no_exc(), - ), - ( - "k8s://connaisseur/test_key", - (["--key", "k8s://connaisseur/test_key"], {}, b""), - fix.no_exc(), - ), - ( - "123step123step", - ([], {}, b""), - pytest.raises(exc.InvalidFormatException, match=r".*Public key.*"), - ), - ], -) -def test_get_pubkey_config(pubkey, output, exception): - with exception: - assert ( - co.CosignValidator(**static_cosigns[0])._CosignValidator__get_pubkey_config( - pubkey - ) - == output - ) - - def test_get_envs(monkeypatch): env = co.CosignValidator(**static_cosigns[0])._CosignValidator__get_envs() assert env["DOCKER_CONFIG"] == "/app/connaisseur-config/cosign1/.docker/" diff --git a/tests/validators/notaryv1/test_keystore.py b/tests/validators/notaryv1/test_keystore.py index a24614ba5..269c4c7b6 100644 --- a/tests/validators/notaryv1/test_keystore.py +++ b/tests/validators/notaryv1/test_keystore.py @@ -1,10 +1,10 @@ +import base64 +from connaisseur.trust_root import TrustRoot import pytest -import ecdsa from ... import conftest as fix import connaisseur.validators.notaryv1.key_store as ks from connaisseur.validators.notaryv1.trust_data import TrustData import connaisseur.exceptions as exc -from connaisseur.crypto import load_key sample_key = ( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEtR5kwrDK22SyCu" @@ -12,19 +12,19 @@ "l+W2k3elHkPbR+gNkK2PCA==" ) pub_root_keys = { - "7c62922e6be165f1ea08252f77410152b9e4ec0d7bf4e69c1cc43f0e6c73da20": load_key( + "7c62922e6be165f1ea08252f77410152b9e4ec0d7bf4e69c1cc43f0e6c73da20": TrustRoot( ( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAErIGdt5pelfWOSjmY7k+/TypV0IFF" "9XLA+K4swhclLJb79cLoeBBDqkkUrkfhN5gxRnA//wA3amL4WXkaGsb9zQ==" ), ), - "7dbacd611d5933ca3f0fad581ed233881c501229343613f63f2d4b5771ee4299": load_key( + "7dbacd611d5933ca3f0fad581ed233881c501229343613f63f2d4b5771ee4299": TrustRoot( ( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEzRo/rFtBEAJLvEFU7xem34GpEsw" "xsw6nW9YiBqbAcba6LWZuem7slTp+List+NKAVK3EzJCjUixooO5ss4Erug==" ), ), - "f1997e14be3d33c5677282b6a73060d8124f4020f464644e27ab76f703eb6f7e": load_key( + "f1997e14be3d33c5677282b6a73060d8124f4020f464644e27ab76f703eb6f7e": TrustRoot( ( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEMza1L1+e8vfZ1q7+GA5E0st13g7j" "WR7fdQSsxkdrpJ6IkUq9D6f9BUopD83YvLBMEMy20MBvsICJnXMu8IZlYA==" @@ -33,13 +33,13 @@ } target_keys = { - "6984a67934a29955b3f969835c58ee0dd09158f5bec43726d319515b56b0a878": load_key( + "6984a67934a29955b3f969835c58ee0dd09158f5bec43726d319515b56b0a878": TrustRoot( ( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEEchQNiJJt4PTaEeAzaztL+TQZqTa" "0iM0YSf+w0LjSElobVsYgnqIbCWe6pGX3UvcCngNw7N4uGkdVNVMS2Tslg==" ), ), - "70aa109003a93131c63499c70dcfc8db3ba33ca81bdd1abcd52c067a8acc0492": load_key( + "70aa109003a93131c63499c70dcfc8db3ba33ca81bdd1abcd52c067a8acc0492": TrustRoot( ( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEM0xl8F5nwIV3IAru1Pf85WCo4cfT" "OQ91jhxVaQ3xHMeW430q7R4H/tJmAXUZBe+nOTX8pgtmrLpT+Hu/H7pUhw==" @@ -61,67 +61,42 @@ @pytest.mark.parametrize( - "pub_key, exception", + "pub_key", [ ( - ( + TrustRoot( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEtR5kwrDK22SyCu" "7WMF8tCjVgeORAS2PWacRcBN/VQdVK4PVk1w4pMWlz9AHQthDG" "l+W2k3elHkPbR+gNkK2PCA==" - ), - fix.no_exc(), + ) ), ( - ( + TrustRoot( "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAErIGdt5pelfWOSjmY7k+/TypV0IFF" "9XLA+K4swhclLJb79cLoeBBDqkkUrkfhN5gxRnA//wA3amL4WXkaGsb9zQ==" - ), - fix.no_exc(), + ) ), - (None, fix.no_exc()), - ( - ( - "-----BEGIN PUBLIC KEY-----" - "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEWGcErqaO7+y3PzNTHt7PVx0+Xtgv" - "LV5mFW91CxzN8uQht/Ig6+FAymrn2lOtUz5BqF4pSQizcdqN475t6raTWw==" - "-----END PUBLIC KEY-----" - ), - pytest.raises(exc.InvalidKeyFormatError), - ), - ( - ( - "-----BEGIN PUBLIC KEY-----\n" - "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEWGcErqaO7+y3PzNTHt7PVx0+Xtgv\n" - "LV5mFW91CxzN8uQht/Ig6+FAymrn2lOtUz5BqF4pSQizcdqN475t6raTWwthismakesnosense==C\n" - "-----END PUBLIC KEY-----\n" - ), - pytest.raises(exc.InvalidKeyFormatError), - ), - ("hello! i'm happy to be here.", pytest.raises(exc.InvalidKeyFormatError)), + (None), ], ) -def test_key_store(pub_key: str, exception): - with exception: - k = ks.KeyStore(pub_key) - if pub_key: - key = ecdsa.VerifyingKey.from_pem(pub_key) - assert k.keys == {"root": key} - else: - assert k.keys == {} - assert k.hashes == {} +def test_key_store(pub_key: TrustRoot): + k = ks.KeyStore(pub_key) + if pub_key: + assert k.keys == {"root": pub_key} + else: + assert k.keys == {} + assert k.hashes == {} @pytest.mark.parametrize( "k_id, k_value, exception", [ - ("root", load_key(sample_key), fix.no_exc()), + ("root", sample_key, fix.no_exc()), ( "7c62922e6be165f1ea08252f77410152b9e4ec0d7bf4e69c1cc43f0e6c73da20", - load_key( - ( - "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAErIGdt5pelfWOSjmY7k+/TypV0IFF" - "9XLA+K4swhclLJb79cLoeBBDqkkUrkfhN5gxRnA//wA3amL4WXkaGsb9zQ==" - ) + ( + "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAErIGdt5pelfWOSjmY7k+/TypV0IFF" + "9XLA+K4swhclLJb79cLoeBBDqkkUrkfhN5gxRnA//wA3amL4WXkaGsb9zQ==" ), fix.no_exc(), ), @@ -130,7 +105,8 @@ def test_key_store(pub_key: str, exception): ) def test_get_key(sample_key_store, k_id, k_value, exception): with exception: - assert sample_key_store.get_key(k_id) == k_value + k = sample_key_store.get_key(k_id).value.to_string() + assert k_value.endswith(base64.b64encode(k).decode("utf-8")) @pytest.mark.parametrize( @@ -153,51 +129,40 @@ def test_get_hash(sample_key_store, role: str, _hash: str, _len: int, exception) @pytest.mark.parametrize( - "data, role, keys, hashes, exception", + "data, role, keys, hashes", [ ( fix.get_td("sample_root"), "root", - dict(**{"root": load_key(sample_key)}, **pub_root_keys), + dict(**{"root": TrustRoot(sample_key)}, **pub_root_keys), {}, - fix.no_exc(), ), ( fix.get_td("sample_targets"), "targets", - dict(**{"root": load_key(sample_key)}, **target_keys), + dict(**{"root": TrustRoot(sample_key)}, **target_keys), {}, - fix.no_exc(), ), ( fix.get_td("sample_snapshot"), "snapshot", - {"root": load_key(sample_key)}, + {"root": TrustRoot(sample_key)}, snapshot_hashes, - fix.no_exc(), ), ( fix.get_td("sample_timestamp"), "timestamp", - {"root": load_key(sample_key)}, + {"root": TrustRoot(sample_key)}, timestamp_hashes, - fix.no_exc(), - ), - ( - fix.get_td("wrong_key_format"), - "targets", - {}, - {}, - pytest.raises(exc.InvalidKeyFormatError), ), ], ) -def test_update( - m_trust_data, sample_key_store, data, role, keys: dict, hashes: dict, exception -): - with exception: - k = ks.KeyStore(sample_key) - trust_data = TrustData(data, role) - k.update(trust_data) - assert k.keys == keys - assert k.hashes == hashes +def test_update(m_trust_data, data, role, keys: dict, hashes: dict): + def transform_dict(d): + return {k: v.value.to_string() for k, v in d.items()} + + k = ks.KeyStore(TrustRoot(sample_key)) + td = TrustData(data, role) + k.update(td) + assert transform_dict(k.keys) == transform_dict(keys) + assert k.hashes == hashes diff --git a/tests/validators/notaryv1/test_notary.py b/tests/validators/notaryv1/test_notary.py index b0324efb2..b6ce5f71d 100644 --- a/tests/validators/notaryv1/test_notary.py +++ b/tests/validators/notaryv1/test_notary.py @@ -76,7 +76,7 @@ def test_notary(sample_notaries, index, exception): no = notary.Notary(**sample_notaries[index]) assert no.name == static_notaries[index]["name"] assert no.host == static_notaries[index]["host"] - assert no.pub_root_keys == static_notaries[index]["trust_roots"] + assert no.root_keys == static_notaries[index]["trust_roots"] assert no.is_acr == static_notaries[index].get("is_acr", False) diff --git a/tests/validators/notaryv1/test_notaryv1_validator.py b/tests/validators/notaryv1/test_notaryv1_validator.py index be19653e2..b98252222 100644 --- a/tests/validators/notaryv1/test_notaryv1_validator.py +++ b/tests/validators/notaryv1/test_notaryv1_validator.py @@ -1,5 +1,6 @@ import os import re +from connaisseur.trust_root import TrustRoot import pytest from aioresponses import aioresponses from ... import conftest as fix @@ -316,7 +317,7 @@ async def test_process_chain_of_trust( aio.get(re.compile(r".*"), callback=fix.async_callback, repeat=True) signed_targets = ( await sample_nv1._NotaryV1Validator__process_chain_of_trust( - Image(image), delegations, key + Image(image), delegations, TrustRoot(key) ) ) diff --git a/tests/validators/notaryv1/test_trust_data.py b/tests/validators/notaryv1/test_trust_data.py index 7391e6bf8..355b39ab9 100644 --- a/tests/validators/notaryv1/test_trust_data.py +++ b/tests/validators/notaryv1/test_trust_data.py @@ -4,9 +4,8 @@ import datetime as dt from ... import conftest as fix import connaisseur.validators.notaryv1.trust_data as td -from connaisseur.validators.notaryv1.key_store import KeyStore import connaisseur.exceptions as exc -from connaisseur.crypto import load_key +from connaisseur.trust_root import TrustRoot pub_root_keys = { "2cd463575a31cb3184320e889e82fb1f9e3bbebee2ae42b2f825b0c8a734e798": { @@ -279,6 +278,94 @@ def test_validate_signature( assert trust_data_.validate_signature(sample_key_store) is None +@pytest.mark.parametrize( + "signature, payload, key, exception", + [ + ( + ( + "hx/VtTJT2r1nmkHtPZacncvosKca4XnLbMxNmeuH0cw5sTsUsznRuZ" + "mgd4vKPaQUbnCA3RMQpNlaGRWz1TR8CQ==" + ), + "iliketurtles", + TrustRoot( + ( + "-----BEGIN PUBLIC KEY-----\n" + "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEEi2WD/E/UXF4+yoE5e4cjpJMNgQw\n" + "8PAVALRX+8f8I8B+XneAtnOHDTI8L6wBeFRTzl6G4OmgDyCRYTb5MV3hog==\n" + "-----END PUBLIC KEY-----" + ) + ), + fix.no_exc(), + ), + ( + "", + "", + TrustRoot("mail@example.com"), + pytest.raises(exc.WrongKeyError), + ), + ], +) +def test_validate_signature_with_key( + signature: str, payload: str, key: TrustRoot, exception +): + with exception: + assert ( + td.TrustData._TrustData__validate_signature_with_key( + signature, payload, key + ) + is True + ) + + +@pytest.mark.parametrize( + "signature, payload, key, exception", + [ + ( + ( + "hx/VtTJT2r1nmkHtPZacncvosKca4XnLbMxNmeuH0cw5sTsUsznRuZ" + "mgd4vKPaQUbnCA3RMQpNlaGRWz1TR8CQ==" + ), + "iliketurtles", + TrustRoot( + ( + "-----BEGIN PUBLIC KEY-----\n" + "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEEi2WD/E/UXF4+yoE5e4cjpJMNgQw\n" + "8PAVALRX+8f8I8B+XneAtnOHDTI8L6wBeFRTzl6G4OmgDyCRYTb5MV3hog==\n" + "-----END PUBLIC KEY-----" + ) + ), + fix.no_exc(), + ), + ( + ( + "hx/VtTJT2r1nmkHtPZacncvosKca4XnLbMxNmeuH0cw5sTsUsznRuZ" + "mgd4vKPaQUbnCA3RMQpNlaGRWz1TR8CM==" + ), + "iliketurtles", + TrustRoot( + ( + "-----BEGIN PUBLIC KEY-----\n" + "MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEEi2WD/E/UXF4+yoE5e4cjpJMNgQw\n" + "8PAVALRX+8f8I8B+XneAtnOHDTI8L6wBeFRTzl6G4OmgDyCRYTb5MV3hog==\n" + "-----END PUBLIC KEY-----" + ) + ), + pytest.raises(Exception), + ), + ], +) +def test_validate_signature_with_ecdsa( + signature: str, payload: str, key: TrustRoot, exception +): + with exception: + assert ( + td.TrustData._TrustData__validate_signature_with_ecdsa( + signature, payload, key + ) + is True + ) + + @pytest.mark.parametrize( "data, role, exception", [