Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: key interface #540

Merged
merged 1 commit into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ install:
#
helm install connaisseur helm --atomic --create-namespace --namespace $(NAMESPACE)

dev-install:
helm install --set deployment.replicasCount=1,deployment.imagePullPolicy=Never connaisseur helm --atomic --create-namespace --namespace $(NAMESPACE)

uninstall:
helm uninstall connaisseur -n $(NAMESPACE)
kubectl delete ns $(NAMESPACE)
Expand Down
28 changes: 0 additions & 28 deletions connaisseur/crypto.py

This file was deleted.

4 changes: 4 additions & 0 deletions connaisseur/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class UnknownAPIVersionError(UnknownTypeException):
pass


class WrongKeyError(UnknownTypeException):
pass


class AmbiguousDigestError(BaseConnaisseurException):
pass

Expand Down
90 changes: 90 additions & 0 deletions connaisseur/trust_root.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import base64
import re

import ecdsa
import rsa

from connaisseur.exceptions import InvalidFormatException

KMS_REGEX = r"^(awskms|gcpkms|azurekms|hashivault|k8s):\/{2,3}[a-zA-Z0-9_.+\/:-]+$"
KEYLESS_REGEX = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"


class TrustRootInterface:
"""
Interface from which all trust roots inherit.
"""

def __new__(cls, data: object):
instance = super(TrustRootInterface, cls).__new__(cls)
instance.__init__(data)
return instance

def __init__(self, data: object) -> None:
self.value = data

def __str__(self) -> str:
return self.value


class TrustRoot(TrustRootInterface):
"""
Abstract TrustRoot class used to represent key material or similar entities, used in
verification processes.

May contain a public key, reference to a key or any other type of trust root.
"""

value: object

def __new__(cls, data: str):
try:
tr_cls, tr_data = TrustRoot.__get_type_cls_and_data(data)
return tr_cls.__new__(tr_cls, tr_data)
except Exception as err:
msg = "Error loading the trust root."
raise InvalidFormatException(message=msg) from err

@staticmethod
def __get_type_cls_and_data(data: str):
if re.match(KEYLESS_REGEX, data):
return KeyLessTrustRoot, data
elif re.match(KMS_REGEX, data):
return KMSKey, data
elif key := TrustRoot.__check_and_return_ecdsa(data):
return ECDSAKey, key
elif key := TrustRoot.__check_and_return_rsa(data):
return RSAKey, key
return None, data

@staticmethod
def __check_and_return_ecdsa(data: str):
try:
return ecdsa.VerifyingKey.from_pem(data)
except Exception:
return None

@staticmethod
def __check_and_return_rsa(data: str):
try:
return rsa.PublicKey.load_pkcs1_openssl_pem(data)
except Exception:
return None


class ECDSAKey(TrustRootInterface):
def __str__(self) -> str:
return base64.b64encode(self.value.to_der()).decode("utf-8")


class RSAKey(TrustRootInterface):
def __str__(self) -> str:
return base64.b64encode(self.value.save_pkcs1("DER")).decode("utf-8")


class KMSKey(TrustRootInterface):
pass


class KeyLessTrustRoot(TrustRootInterface):
pass
146 changes: 81 additions & 65 deletions connaisseur/validators/cosign/cosign_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@

from concurrent.futures import ThreadPoolExecutor

from connaisseur.crypto import load_key
from connaisseur.exceptions import (
CosignError,
CosignTimeout,
NotFoundException,
InvalidFormatException,
UnexpectedCosignData,
ValidationError,
WrongKeyError,
)
from connaisseur.image import Image
from connaisseur.trust_root import KMSKey, TrustRoot, ECDSAKey
from connaisseur.util import safe_path_func # nosec
from connaisseur.validators.interface import ValidatorInterface

Expand All @@ -41,8 +42,9 @@ async def validate(
"threshold",
1 if trust_root != "*" or any(required) else len(self.trust_roots),
)
# vals is a validations dict for each required trust root containing validated digests and errors
vals = self.__get_pinned_keys(trust_root, required, threshold)
# vals is a validations dict for each required trust root containing validated
# digests and errors
vals = self.__get_pinned_trust_roots(trust_root, required, threshold)

# use concurrent.futures for now
# tasks = [self.__validation_task(k, str(image)) for k in self.vals.keys()]
Expand All @@ -52,7 +54,8 @@ async def validate(
num_workers = len(vals)
executor = ThreadPoolExecutor(num_workers)
# prepare tasks
# a copy of vals dictionaries is passed to concurrent validation to ensure thread-safe execution
# a copy of vals dictionaries is passed to concurrent validation to ensure
# thread-safe execution
arguments = [(k, v.copy(), str(image)) for k, v in vals.items()]
futures = [executor.submit(self.__validation_task, *arg) for arg in arguments]
# await results (output dropped as `vals` is updated within function)
Expand All @@ -63,45 +66,46 @@ async def validate(
vals=vals, threshold=threshold, required=required
)

def __get_pinned_keys(self, key_name: str, required: list, threshold: int):
def __get_pinned_trust_roots(self, tr_name: str, required: list, threshold: int):
"""
Extract the pinned key(s) selected for validation from the list of trust roots.
Extract the pinned trust root(s) selected for validation from the list of trust
roots.
"""
key_name = key_name or "default"
available_keys = list(map(lambda k: k["name"], self.trust_roots))
tr_name = tr_name or "default"
available_trs = list(map(lambda t: t["name"], self.trust_roots))

# generate list of pinned keys
if key_name == "*":
# generate list of pinned trust roots
if tr_name == "*":
if len(required) >= threshold:
pinned_keys = required
pinned_trs = required
else:
pinned_keys = available_keys
pinned_trs = available_trs
else:
pinned_keys = [key_name]
pinned_trs = [tr_name]

# check if pinned keys exist in available trust roots
missing_keys = set(pinned_keys) - set(available_keys)
if missing_keys:
msg = 'Trust roots "{key_names}" not configured for validator "{validator_name}".'
# check if pinned trust roots exist in available trust roots
missing_trs = set(pinned_trs) - set(available_trs)
if missing_trs:
msg = 'Trust roots "{tr_names}" not configured for validator "{validator_name}".'
raise NotFoundException(
message=msg,
key_names=", ".join(missing_keys),
tr_names=", ".join(missing_trs),
validator_name=self.name,
)

# construct key validation dictionary for pinned keys
keys = {
k["name"]: {
"name": k["name"],
"key": "".join(k["key"]),
trust_roots = {
t["name"]: {
"name": t["name"],
"trust_root": TrustRoot("".join(t["key"])),
phbelitz marked this conversation as resolved.
Show resolved Hide resolved
"digest": None,
"error": None,
}
for k in self.trust_roots
if k["name"] in pinned_keys
for t in self.trust_roots
if t["name"] in pinned_trs
}

return keys
return trust_roots

# async def __validation_task(self, trust_root: str, image: str):
def __validation_task(self, trust_root: str, val: dict, image: str):
Expand All @@ -121,13 +125,13 @@ def __validation_task(self, trust_root: str, val: dict, image: str):
# async def __get_cosign_validated_digests(self, image: str, trust_root: dict):
def __get_cosign_validated_digests(self, image: str, trust_root: dict):
"""
Get and process Cosign validation output for a given `image` and `key`
Get and process Cosign validation output for a given `image` and `trust_root`
and either return a list of valid digests or raise a suitable exception
in case no valid signature is found or Cosign fails.
"""
# returncode, stdout, stderr = await self.__invoke_cosign(image, trust_root["key"])
returncode, stdout, stderr = self.__invoke_cosign(image, trust_root["key"])

returncode, stdout, stderr = self.__validate_using_trust_root(
image, trust_root["trust_root"]
)
logging.info(
"COSIGN output of trust root '%s' for image'%s': RETURNCODE: %s; STDOUT: %s; STDERR: %s",
trust_root["name"],
Expand Down Expand Up @@ -217,35 +221,71 @@ def __get_cosign_validated_digests(self, image: str, trust_root: dict):
)
return digests.pop()

# async def __invoke_cosign(self, image: str, key: str):
def __invoke_cosign(self, image: str, key: str):
def __validate_using_trust_root(self, image: str, trust_root: TrustRoot):
"""
Invoke the Cosign binary in a subprocess for a specific `image` given a `key` and
return the returncode, stdout and stderr. Will raise an exception if Cosign times out.
Call the `CosignValidator.__invoke_cosign` method, using a specific trust root.
"""
# reminder when implementing RSA validation:
# ["--key", "/dev/stdin", self.value.save_pkcs1()]

# reminder when implementing Keyless validation:
# ["--cert-email", self.value, b""]

if isinstance(trust_root, ECDSAKey):
return self.__invoke_cosign(
image,
{
"option_kword": "--key",
"inline_tr": "/dev/stdin",
"trust_root": trust_root.value.to_pem(),
},
)
elif isinstance(trust_root, KMSKey):
return self.__invoke_cosign(
image,
{
"option_kword": "--key",
"inline_tr": trust_root.value,
},
)
msg = (
"The trust_root type {tr_type} is unsupported for a validator of type"
"{val_type}."
)
raise WrongKeyError(message=msg, tr_type=type(trust_root), val_type="cosign")

def __invoke_cosign(self, image: str, tr_args: dict):
"""
pubkey_config, env_vars, pubkey = CosignValidator.__get_pubkey_config(key)
Invoke the Cosign binary in a subprocess for a specific `image` given trust root
argument dict (`tr_args`) and return the returncode, stdout and stderr. The trust
root argument dict includes a Cosign option keyword and the trust root itself,
either as inline argument or pipeable input with an inline reference. The
composition of the dict is dependant on the type of trust root.

Raises an exception if Cosign times out.
"""
cmd = [
"/app/cosign/cosign",
"verify",
"--output",
"text",
*pubkey_config,
tr_args["option_kword"],
tr_args["inline_tr"],
*(["--k8s-keychain"] if self.k8s_keychain else []),
image,
]
env = self.__get_envs()
env.update(env_vars)

with subprocess.Popen( # nosec
cmd,
env=env,
env=self.__get_envs(),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as process:
try:
stdout, stderr = process.communicate(pubkey, timeout=60)
stdout, stderr = process.communicate(
input=tr_args.get("trust_root", None), timeout=60
)
except subprocess.TimeoutExpired as err:
process.kill()
msg = "Cosign timed out."
Expand All @@ -255,33 +295,9 @@ def __invoke_cosign(self, image: str, key: str):

return process.returncode, stdout.decode("utf-8"), stderr.decode("utf-8")

@staticmethod
def __get_pubkey_config(key: str):
"""
Return a tuple of the used Cosign verification command (flag-value list), a
dict of potentially required environment variables and public key in binary
PEM format to be used as stdin to Cosign based on the format of the input
key (reference).

Raise InvalidFormatException if none of the supported patterns is matched.
"""
try:
# key is ecdsa public key
pkey = load_key(key).to_pem() # raises if invalid
return ["--key", "/dev/stdin"], {}, pkey
except ValueError:
pass

# key is KMS reference
if re.match(r"^\w{2,20}://[\w:/-]{3,255}$", key):
return ["--key", key], {}, b""

msg = "Public key (reference) '{input_str}' does not match expected patterns."
raise InvalidFormatException(message=msg, input_str=key)

def __get_envs(self):
phbelitz marked this conversation as resolved.
Show resolved Hide resolved
"""
Sets up environment variables used by cosign.
Set up environment variables used by cosign.
"""
env = os.environ.copy()
# Extend the OS env vars only for passing to the subprocess below
phbelitz marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -295,7 +311,7 @@ def __get_envs(self):
@staticmethod
def __apply_policy(vals: dict, threshold: int, required: list):
"""
Validates the signature verification outcome against the policy for
Validate the signature verification outcome against the policy for
threshold and required trust roots.

Raises an exception if not compliant.
Expand Down
Loading