Skip to content

Commit

Permalink
feat: Implement KV manager (#571)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielArndt authored Jan 6, 2025
1 parent 609fd39 commit 4fe80a1
Show file tree
Hide file tree
Showing 6 changed files with 458 additions and 280 deletions.
250 changes: 232 additions & 18 deletions lib/charms/vault_k8s/v0/vault_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
VaultClient,
VaultClientError,
)
from charms.vault_k8s.v0.vault_kv import VaultKvProvides
from ops import CharmBase, EventBase, Object, Relation
from ops.pebble import PathError

Expand All @@ -83,7 +84,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 3
LIBPATCH = 4


SEND_CA_CERT_RELATION_NAME = "send-ca-cert"
Expand All @@ -101,6 +102,15 @@
}}
"""

KV_POLICY = """# Allows the KV requirer to create, read, update, delete and list secrets
path "{mount}/*" {{
capabilities = ["create", "read", "update", "delete", "list"]
}}
path "sys/internal/ui/mounts/{mount}" {{
capabilities = ["read"]
}}
"""


class LogAdapter(logging.LoggerAdapter):
"""Adapter for the logger to prepend a prefix to all log lines."""
Expand Down Expand Up @@ -586,24 +596,49 @@ class Naming:
provides a central place to manage them.
"""

key_prefix: str = ""
policy_prefix: str = "charm-autounseal-"
approle_prefix: str = "charm-autounseal-"
autounseal_approle_prefix: str = "charm-autounseal-"
autounseal_key_prefix: str = ""
autounseal_policy_prefix: str = "charm-autounseal-"
kv_mount_prefix: str = "charm-"
kv_secret_prefix: str = "vault-kv-"

@classmethod
def key_name(cls, relation_id: int) -> str:
def autounseal_key_name(cls, relation_id: int) -> str:
"""Return the key name for the relation."""
return f"{cls.key_prefix}{relation_id}"
return f"{cls.autounseal_key_prefix}{relation_id}"

@classmethod
def policy_name(cls, relation_id: int) -> str:
def autounseal_policy_name(cls, relation_id: int) -> str:
"""Return the policy name for the relation."""
return f"{cls.policy_prefix}{relation_id}"
return f"{cls.autounseal_policy_prefix}{relation_id}"

@classmethod
def approle_name(cls, relation_id: int) -> str:
def autounseal_approle_name(cls, relation_id: int) -> str:
"""Return the approle name for the relation."""
return f"{cls.approle_prefix}{relation_id}"
return f"{cls.autounseal_approle_prefix}{relation_id}"

@classmethod
def kv_secret_label(cls, unit_name: str) -> str:
"""Return the secret label for the KV backend."""
unit_name_dash = unit_name.replace("/", "-")
return f"{cls.kv_secret_prefix}{unit_name_dash}"

@classmethod
def kv_mount_path(cls, app_name: str, mount_suffix: str) -> str:
"""Return the mount path for the KV backend."""
return f"{cls.kv_mount_prefix}{app_name}-{mount_suffix}"

@classmethod
def kv_policy_name(cls, mount_path: str, unit_name: str) -> str:
"""Return the policy name for the KV backend."""
unit_name_dash = unit_name.replace("/", "-")
return f"{mount_path}-{unit_name_dash}"

@classmethod
def kv_role_name(cls, mount_path: str, unit_name: str) -> str:
"""Return the role name for the KV backend."""
unit_name_dash = unit_name.replace("/", "-")
return f"{mount_path}-{unit_name_dash}"


class AutounsealProviderManager:
Expand Down Expand Up @@ -662,7 +697,7 @@ def _detect_and_allow_deletion_of_orphaned_keys(self) -> None:
"""
existing_keys = self._get_existing_keys()
relation_key_names = [
Naming.key_name(relation.id)
Naming.autounseal_key_name(relation.id)
for relation in self._juju_facade.get_active_relations(self._provides.relation_name)
]
orphaned_keys = [key for key in existing_keys if key not in relation_key_names]
Expand Down Expand Up @@ -690,7 +725,7 @@ def _clean_up_roles(self) -> None:
"""Delete roles that are no longer associated with an autounseal Juju relation."""
existing_roles = self._get_existing_roles()
relation_role_names = [
Naming.approle_name(relation.id)
Naming.autounseal_approle_name(relation.id)
for relation in self._juju_facade.get_active_relations(self._provides.relation_name)
]
for role in existing_roles:
Expand All @@ -702,7 +737,7 @@ def _clean_up_policies(self) -> None:
"""Delete policies that are no longer associated with an autounseal Juju relation."""
existing_policies = self._get_existing_policies()
relation_policy_names = [
Naming.policy_name(relation.id)
Naming.autounseal_policy_name(relation.id)
for relation in self._juju_facade.get_active_relations(self._provides.relation_name)
]
for policy in existing_policies:
Expand All @@ -724,9 +759,9 @@ def create_credentials(self, relation: Relation, vault_address: str) -> tuple[st
Returns:
A tuple containing the key name, role ID, and approle secret ID.
"""
key_name = Naming.key_name(relation.id)
policy_name = Naming.policy_name(relation.id)
approle_name = Naming.approle_name(relation.id)
key_name = Naming.autounseal_key_name(relation.id)
policy_name = Naming.autounseal_policy_name(relation.id)
approle_name = Naming.autounseal_approle_name(relation.id)
self._create_key(key_name)
policy_content = AUTOUNSEAL_POLICY.format(mount=self.mount_path, key_name=key_name)
self._client.create_or_update_policy(
Expand Down Expand Up @@ -755,11 +790,11 @@ def _get_existing_keys(self) -> list[str]:

def _get_existing_roles(self) -> list[str]:
output = self._client.list("auth/approle/role")
return [role for role in output if role.startswith(Naming.approle_prefix)]
return [role for role in output if role.startswith(Naming.autounseal_approle_prefix)]

def _get_existing_policies(self) -> list[str]:
output = self._client.list("sys/policy")
return [policy for policy in output if policy.startswith(Naming.policy_prefix)]
return [policy for policy in output if policy.startswith(Naming.autounseal_policy_prefix)]


@dataclass
Expand Down Expand Up @@ -1062,3 +1097,182 @@ def calculate_pki_certificates_ttl(certificate: Certificate) -> int:
ca_validity_time = certificate.expiry_time - certificate.validity_start_time
ca_validity_seconds = ca_validity_time.total_seconds()
return int(ca_validity_seconds / 2)


class KVManager:
"""Encapsulates the business logic for managing KV credentials for requirer Charms."""

def __init__(
self,
charm: CharmBase,
vault_client: VaultClient,
vault_kv: VaultKvProvides,
ca_cert: str,
):
self._vault_client = vault_client
self._juju_facade = JujuFacade(charm)
self._vault_kv = vault_kv
self._ca_cert = ca_cert

def generate_credentials_for_requirer(
self,
relation: Relation,
app_name: str,
unit_name: str,
mount_suffix: str,
egress_subnets: list[str],
nonce: str,
vault_url: str,
):
"""Generate KV credentials for the requirer, and store the credentials in the relation.
This method ensures that the approle and policy are created or updated,
and that the approle secret ID is generated and stored in a Juju secret.
The Juju secret ID is then passed to the requirer, along with other
necessary information to access the KV backend.
Args:
relation: The relation of the requirer
app_name: The name of the requirer application
unit_name: The name of the requirer unit for this relation
mount_suffix: The suffix to append to the mount path, as provided by the requirer
egress_subnets: The egress subnets of the requirer
nonce: The nonce provided by the requirer
vault_url: The URL of the Vault server that the requirer can access
over this relation.
"""
if not self._juju_facade.is_leader:
logger.debug("Only leader unit can handle a vault-kv request")
return
mount = Naming.kv_mount_path(app_name, mount_suffix)
self._vault_client.enable_secrets_engine(SecretsBackend.KV_V2, mount)
secret_id = self._ensure_unit_credentials(
relation=relation,
unit_name=unit_name,
mount=mount,
nonce=nonce,
egress_subnets=egress_subnets,
)
self._vault_kv.set_kv_data(
relation=relation,
mount=mount,
ca_certificate=self._ca_cert,
vault_url=vault_url,
nonce=nonce,
credentials_juju_secret_id=secret_id,
)
self._remove_stale_nonce(relation=relation, nonce=nonce)

def _remove_stale_nonce(self, relation: Relation, nonce: str) -> None:
"""Remove stale nonce.
If the nonce is not present in the credentials, it is stale and should be removed.
Args:
relation: the reltaion from which to check the credentials for the nonce
nonce: the one to remove if stale
"""
credential_nonces = self._vault_kv.get_credentials(relation).keys()
if nonce not in set(credential_nonces):
self._vault_kv.remove_unit_credentials(relation, nonce=nonce)

def _ensure_unit_credentials(
self,
relation: Relation,
unit_name: str,
mount: str,
nonce: str,
egress_subnets: list[str],
) -> str:
"""Ensure a unit has credentials to access the vault-kv mount.
If the credentials are already configured for the provided egress
subnets, the existing Juju secret ID which contains the approle secret
ID is returned.
Otherwise, the necessary Vault policy and approle are created or
updated as necessary. A Vault secret ID is then generated for the
approle and stored in a Juju secret. The secret is granted to the
associate relation, and the ID of the Juju secret is returned.
Returns:
The ID of the Juju secret containing the approle secret ID.
"""
policy_name = Naming.kv_policy_name(mount, unit_name)
role_name = Naming.kv_role_name(mount, unit_name)

juju_secret_label = Naming.kv_secret_label(unit_name=unit_name)
current_credentials = self._vault_kv.get_credentials(relation)
credentials_juju_secret_id = current_credentials.get(nonce, None)

if self._is_vault_kv_role_configured(
label=juju_secret_label,
egress_subnets=egress_subnets,
role_name=role_name,
credentials_juju_secret_id=credentials_juju_secret_id,
):
logger.info("Vault KV role already configured for the provided egress subnets")
return credentials_juju_secret_id
self._vault_client.create_or_update_policy(policy_name, KV_POLICY.format(mount=mount))
role_id = self._vault_client.create_or_update_approle(
role_name,
policies=[policy_name],
cidrs=egress_subnets,
token_ttl="1h",
token_max_ttl="1h",
)
role_secret_id = self._vault_client.generate_role_secret_id(role_name, egress_subnets)
secret = self._juju_facade.set_app_secret_content(
content={"role-id": role_id, "role-secret-id": role_secret_id},
label=juju_secret_label,
)
self._juju_facade.grant_secret(relation, secret=secret)
if not secret.id:
raise ValueError(
f"Unexpected error, just created secret {juju_secret_label!r} has no id"
)
return secret.id

def _is_vault_kv_role_configured(
self,
label: str,
egress_subnets: list[str],
role_name: str,
credentials_juju_secret_id: str,
) -> bool:
"""Check if the Vault role is already configured for the provided egress subnets.
Args:
label: The label of the secret
egress_subnets: The egress subnets provided by the requirer.
role_name: The role name associated with KV for this unit.
credentials_juju_secret_id: The juju secret id.
Returns:
True if the role is already configured with the provided egress
subnets, False otherwise.
"""
try:
role_secret_id = self._juju_facade.get_latest_secret_content(
label=label,
id=credentials_juju_secret_id,
).get("role-secret-id")
except NoSuchSecretError:
return False
if not role_secret_id:
return False
role_data = self._vault_client.read_role_secret(role_name, role_secret_id)
if egress_subnets == role_data["cidr_list"]:
return True
return False

@staticmethod
def remove_unit_credentials(juju_facade: JujuFacade, unit_name: str) -> None:
"""Remove any KV credentials associated with the given unit.
Args:
juju_facade: The JujuFacade object to use for removing the secret
unit_name: The name of the unit for which to remove the secret
"""
juju_facade.remove_secret(Naming.kv_secret_label(unit_name=unit_name))
Loading

0 comments on commit 4fe80a1

Please sign in to comment.