Skip to content

Commit

Permalink
Ensure that the lead control-plane unit allocates a cos-token for itself
Browse files Browse the repository at this point in the history
  • Loading branch information
addyess committed Aug 29, 2024
1 parent 673a306 commit 4ec7be1
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 106 deletions.
12 changes: 10 additions & 2 deletions charms/worker/k8s/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,10 +448,18 @@ def _create_cos_tokens(self):

log.info("Prepare cos tokens")
if rel := self.model.get_relation("cos-tokens"):
self.distributor.allocate_tokens(relation=rel, token_strategy=TokenStrategy.COS)
self.distributor.allocate_tokens(
relation=rel,
token_strategy=TokenStrategy.COS,
token_type=ClusterTokenType.CONTROL_PLANE,
)

if rel := self.model.get_relation("cos-worker-tokens"):
self.distributor.allocate_tokens(relation=rel, token_strategy=TokenStrategy.COS)
self.distributor.allocate_tokens(
relation=rel,
token_strategy=TokenStrategy.COS,
token_type=ClusterTokenType.WORKER,
)

@on_error(
WaitingStatus("Waiting to enable features"),
Expand Down
262 changes: 167 additions & 95 deletions charms/worker/k8s/src/token_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import contextlib
import logging
from enum import Enum, auto
from typing import Optional
from typing import Dict, Optional

import charms.contextual_status as status
import ops
Expand All @@ -16,6 +16,7 @@
K8sdAPIManager,
K8sdConnectionError,
)
from pydantic import SecretStr

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -48,6 +49,129 @@ class ClusterTokenType(Enum):
NONE = ""


class TokenManager:
"""Protocol for managing tokens.
Attributes:
api_manager (K8sdAPIManager): An K8sdAPIManager object for interacting with k8sd API.
allocator_needs_tokens: Whether or not the allocator needs tokens.
strategy: The strategy for token creation.
revoke_on_join: Whether or not to revoke a token once it's joined.
"""

allocator_needs_tokens: bool
strategy: TokenStrategy
revoke_on_join: bool

def __init__(self, api_manager: K8sdAPIManager):
"""Initialize a TokenManager instance.
Args:
api_manager (K8sdAPIManager): An K8sdAPIManager object for interacting with k8sd API.
"""
self.api_manager = api_manager

def create(self, name: str, token_type: ClusterTokenType) -> SecretStr:
"""Create a token.
Args:
name (str): The name of the node.
token_type (ClusterTokenType): The type of cluster token.
Returns:
SecretStr: The created token.
"""
return SecretStr("")

def revoke(self, name: str, ignore_errors: bool):
"""Remove a token.
Args:
name (str): The name of the node.
ignore_errors (bool): Whether or not errors can be ignored
"""
...


class ClusterTokenManager(TokenManager):
"""Class for managing cluster tokens.
Attributes:
allocator_needs_tokens: Allocator don't need a token to join
strategy: The cluster strategy for token creation.
revoke_on_join: Revoke a token once it's joined.
"""

allocator_needs_tokens: bool = False
strategy: TokenStrategy = TokenStrategy.CLUSTER
revoke_on_join = True

def create(self, name: str, token_type: ClusterTokenType) -> SecretStr:
"""Create a cluster token.
Args:
name (str): The name of the node.
token_type (ClusterTokenType): The type of cluster token.
Returns:
str: The created cluster token.
"""
worker = token_type == ClusterTokenType.WORKER
return self.api_manager.create_join_token(name, worker=worker)

def revoke(self, name: str, ignore_errors: bool):
"""Remove a cluster token.
Args:
name (str): The name of the node.
ignore_errors (bool): Whether or not errors can be ignored
Raises:
K8sdConnectionError: reraises cluster token revoke failures
"""
try:
self.api_manager.remove_node(name)
except (K8sdConnectionError, InvalidResponseError) as e:
if ignore_errors or e.code == ErrorCodes.StatusNodeUnavailable:
# Let's just ignore some of these expected errors:
# "Remote end closed connection without response"
# "Failed to check if node is control-plane"
# Removing a node that doesn't exist
log.warning("Remove_Node %s: but with an expected error: %s", name, e)
else:
raise


class CosTokenManager(TokenManager):
"""Class for managing COS tokens.
Attributes:
allocator_needs_tokens: Allocator needs a token to join
strategy: The strategy for token creation.
revoke_on_join: Don't revoke a token once it's joined.
"""

allocator_needs_tokens: bool = True
strategy: TokenStrategy = TokenStrategy.COS
revoke_on_join = False

def create(self, name: str, _) -> SecretStr:
"""Create a COS token.
Args:
name (str): The name of the node.
Returns:
str: The created COS token.
"""
return self.api_manager.request_auth_token(
username=f"system:cos:{name}", groups=["system:cos"]
)

def revoke(self, _: str, __):
"""Remove a COS token intentionally left unimplemented."""


class TokenCollector:
"""Helper class for collecting tokens for units in a relation."""

Expand Down Expand Up @@ -147,70 +271,10 @@ def __init__(self, charm: ops.CharmBase, node_name: str, api_manager: K8sdAPIMan
self.charm = charm
self.node_name = node_name
self.api_manager = api_manager
self.token_creation_strategies = {
TokenStrategy.CLUSTER: self._create_cluster_token,
TokenStrategy.COS: self._create_cos_token,
self.token_strategies: Dict[TokenStrategy, TokenManager] = {
TokenStrategy.CLUSTER: ClusterTokenManager(api_manager),
TokenStrategy.COS: CosTokenManager(api_manager),
}
self.token_revoking_strategies = {
TokenStrategy.CLUSTER: self._revoke_cluster_token,
TokenStrategy.COS: self._revoke_cos_token,
}

def _create_cluster_token(self, name: str, token_type: ClusterTokenType):
"""Create a cluster token.
Args:
name (str): The name of the node.
token_type (ClusterTokenType): The type of cluster token.
Returns:
str: The created cluster token.
"""
worker = token_type == ClusterTokenType.WORKER
return self.api_manager.create_join_token(name, worker=worker)

def _create_cos_token(self, name: str, _):
"""Create a COS token.
Args:
name (str): The name of the node.
Returns:
str: The created COS token.
"""
return self.api_manager.request_auth_token(
username=f"system:cos:{name}", groups=["system:cos"]
)

def _revoke_cluster_token(self, name: str, ignore_errors: bool):
"""Remove a cluster token.
Args:
name (str): The name of the node.
ignore_errors (bool): Whether or not errors can be ignored
Raises:
K8sdConnectionError: reraises cluster token revoke failures
"""
try:
self.api_manager.remove_node(name)
except (K8sdConnectionError, InvalidResponseError) as e:
if ignore_errors or e.code == ErrorCodes.StatusNodeUnavailable:
# Let's just ignore some of these expected errors:
# "Remote end closed connection without response"
# "Failed to check if node is control-plane"
# Removing a node that doesn't exist
log.warning("Remove_Node %s: but with an expected error: %s", name, e)
else:
raise

def _revoke_cos_token(self, name: str, _):
"""Remove a COS token.
Args:
name (str): The name of the node.
"""
# TODO: implement removing cos token

def _get_juju_secret(self, relation: ops.Relation, unit: ops.Unit) -> Optional[str]:
"""Lookup juju secret offered to a unit on this relation.
Expand Down Expand Up @@ -282,45 +346,49 @@ def allocate_tokens(
token_strategy (TokenStrategy): The strategy of token creation.
token_type (ClusterTokenType): The type of cluster token.
Defaults to ClusterTokenType.NONE.
Raises:
ValueError: If an invalid token_strategy is provided.
"""
revoke_on_join = token_strategy == TokenStrategy.CLUSTER
units = relation.units
if self.charm.app == relation.app:
# include self in peer relations
units |= {self.charm.unit}
assert relation.app, f"Remote application doesn't exist on {relation.name}" # nosec

# Select the appropriate token creation strategy
token_strat = self.token_creation_strategies.get(token_strategy)
if not token_strat:
raise ValueError(f"Invalid token_strategy: {token_strategy}")

log.info("Allocating %s tokens", token_type.value)
status.add(ops.MaintenanceStatus(f"Allocating {token_type.value} tokens"))
tokenizer = self.token_strategies.get(token_strategy)
assert tokenizer, f"Invalid token_strategy: {token_strategy}" # nosec

log.info("Allocating %s %s tokens", token_type.name.title(), token_strategy.value)
status.add(
ops.MaintenanceStatus(
f"Allocating {token_type.name.title()} {token_strategy.value} tokens"
)
)
local_cluster = self.charm.get_cluster_name()
relation.data[self.charm.unit]["node-name"] = self.node_name
relation.data[self.charm.unit]["joined"] = local_cluster
relation.data[self.charm.unit]["cluster-name"] = local_cluster
if not tokenizer.allocator_needs_tokens:
# the allocator doesn't need a token to join, mark as already joined
relation.data[self.charm.unit]["joined"] = local_cluster

for unit in units:
secret_id = SECRET_ID.format(unit.name)
remote_cluster = relation.data[unit].get("joined")
node = relation.data[unit].get("node-name")
if not node:
log.info(
"Wait for node-name of %s unit=%s:%s",
token_type.value,
"Wait for %s token allocation of %s with unit=%s:%s",
token_strategy.value,
token_type.name.title(),
relation.name,
unit.name,
)
continue # wait for the joining unit to provide its node-name
if remote_cluster and remote_cluster != local_cluster:
# ignore this unit, it's not in our cluster
log.info(
"Ignoring token allocation of %s with unit=%s:%s (%s)",
token_type.value,
"Ignoring %s token allocation of %s with unit=%s:%s (%s)",
token_strategy.value,
token_type.name.title(),
relation.name,
unit.name,
node,
Expand All @@ -332,30 +400,33 @@ def allocate_tokens(
# if the unit leaves. Let's create a cache in
# our app's session of this data.
log.info(
"Completed token allocation of %s with unit=%s:%s (%s)",
token_type.value,
"Completed %s token allocation of %s with unit=%s:%s (%s)",
token_strategy.value,
token_type.name.title(),
relation.name,
unit.name,
node,
)
self.update_node(relation, unit, f"joined-{node}")
if revoke_on_join:
if tokenizer.revoke_on_join:
self._revoke_juju_secret(relation, unit)

continue # unit reports its joined already
if relation.data[self.charm.unit].get(secret_id):
# unit already assigned a token
log.info(
"Waiting for token to be recovered %s unit=%s:%s (%s)",
token_type.value,
token_type.name.title(),
relation.name,
unit.name,
node,
)
continue

log.info("Creating token for %s unit=%s node=%s", token_type.value, unit.name, node)
token = token_strat(node, token_type)
log.info(
"Creating token for %s unit=%s node=%s", token_type.name.title(), unit.name, node
)
token = tokenizer.create(node, token_type)
content = {"token": token.get_secret_value()}
secret = relation.app.add_secret(content)
secret.grant(relation, unit=unit)
Expand All @@ -377,9 +448,6 @@ def revoke_tokens(
token_type (ClusterTokenType, optional): The type of cluster token.
Defaults to ClusterTokenType.NONE.
to_remove (ops.Unit, optional): unit to ensure its token is revoked
Raises:
ValueError: If an invalid token_strategy is provided.
"""
# any unit currently in the relation
all_units = relation.units
Expand All @@ -406,19 +474,23 @@ def revoke_tokens(
",".join(sorted(u.name for u in remaining)),
)

token_strat = self.token_revoking_strategies.get(token_strategy)
if not token_strat:
raise ValueError(f"Invalid token_strategy: {token_strategy}")
tokenizer = self.token_strategies.get(token_strategy)
assert tokenizer, f"Invalid token_strategy: {token_strategy}" # nosec

status.add(ops.MaintenanceStatus(f"Revoking {token_type.value} tokens"))
status.add(
ops.MaintenanceStatus(
f"Revoking {token_strategy.value} {token_type.name.title()} tokens"
)
)
local_cluster = self.charm.get_cluster_name()
for unit in remove:
if node_state := app_databag.get(unit):
state, node = node_state.split("-", 1)
remote_cluster = (data := relation.data.get(unit)) and data.get("joined")
log.info(
"Revoking token for %s unit=%s:%s %s node=%s",
token_type.value,
"Revoking %s, token for %s unit=%s:%s %s node=%s",
token_strategy.value,
token_type.name.title(),
relation.name,
unit.name,
state,
Expand All @@ -427,6 +499,6 @@ def revoke_tokens(
ignore_errors = self.node_name == node # removing myself
ignore_errors |= state == "pending" # on pending tokens
ignore_errors |= local_cluster != remote_cluster # if cluster doesn't match
token_strat(node, ignore_errors)
tokenizer.revoke(node, ignore_errors)
self.drop_node(relation, unit)
self._revoke_juju_secret(relation, unit)
Loading

0 comments on commit 4ec7be1

Please sign in to comment.