Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transform KubernetesComputeResourcesPatch "Push" statuses into "Pull" statuses #107

Merged
merged 16 commits into from
Aug 29, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""# KubernetesComputeResourcesPatch Library.

This library is designed to enable developers to more simply patch the Kubernetes compute resource
limits and requests created by Juju during the deployment of a sidecar charm.
limits and requests created by Juju during the deployment of a charm.

When initialised, this library binds a handler to the parent charm's `config-changed` event.
The config-changed event is used because it is guaranteed to fire on startup, on upgrade and on
Expand Down Expand Up @@ -76,19 +76,32 @@ def _resource_spec_from_config(self) -> ResourceRequirements:
return ResourceRequirements(limits=spec, requests=spec)
```

If you wish to pull the state of the resources patch operation and set the charm unit status based on that patch result,
you can achieve that using `get_status()` function.
```python
class SomeCharm(CharmBase):
def __init__(self, *args):
#...
self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status)
#...
def _on_collect_unit_status(self, event: CollectStatusEvent):
event.add_status(self.resources_patch.get_status())
```

Additionally, you may wish to use mocks in your charm's unit testing to ensure that the library
does not try to make any API calls, or open any files during testing that are unlikely to be
present, and could break your tests. The easiest way to do this is during your test `setUp`:

```python
# ...
from ops import ActiveStatus

@patch.multiple(
"charm.KubernetesComputeResourcesPatch",
_namespace="test-namespace",
_is_patched=lambda *a, **kw: True,
is_ready=lambda *a, **kw: True,
get_status=lambda _: ActiveStatus(),
)
@patch("lightkube.core.client.GenericSyncClient")
def setUp(self, *unused):
Expand All @@ -105,8 +118,9 @@ def setUp(self, *unused):
import logging
from decimal import Decimal
from math import ceil, floor
from typing import Callable, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import tenacity
from lightkube import ApiError, Client # pyright: ignore
from lightkube.core import exceptions
from lightkube.models.apps_v1 import StatefulSetSpec
Expand All @@ -120,8 +134,10 @@ def setUp(self, *unused):
from lightkube.resources.core_v1 import Pod
from lightkube.types import PatchType
from lightkube.utils.quantity import equals_canonically, parse_quantity
from ops import ActiveStatus, BlockedStatus, WaitingStatus
from ops.charm import CharmBase
from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents
from ops.model import StatusBase

logger = logging.getLogger(__name__)

Expand All @@ -133,14 +149,16 @@ def setUp(self, *unused):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 7
LIBPATCH = 8


_Decimal = Union[Decimal, float, str, int] # types that are potentially convertible to Decimal


def adjust_resource_requirements(
limits: Optional[dict], requests: Optional[dict], adhere_to_requests: bool = True
limits: Optional[Dict[Any, Any]],
requests: Optional[Dict[Any, Any]],
adhere_to_requests: bool = True,
) -> ResourceRequirements:
"""Adjust resource limits so that `limits` and `requests` are consistent with each other.

Expand Down Expand Up @@ -289,6 +307,18 @@ def sanitize_resource_spec_dict(spec: Optional[dict]) -> Optional[dict]:
return d


def _retry_on_condition(exception):
"""Retry if the exception is an ApiError with a status code != 403.

Returns: a boolean value to indicate whether to retry or not.
"""
if isinstance(exception, ApiError) and str(exception.status.code) != "403":
return True
if isinstance(exception, exceptions.ConfigError) or isinstance(exception, ValueError):
return True
return False


class K8sResourcePatchFailedEvent(EventBase):
"""Emitted when patching fails."""

Expand Down Expand Up @@ -385,18 +415,119 @@ def get_actual(self, pod_name: str) -> Optional[ResourceRequirements]:
)
return podspec.resources

def is_failed(
self, resource_reqs_func: Callable[[], ResourceRequirements]
) -> Tuple[bool, str]:
"""Returns a tuple indicating whether a patch operation has failed along with a failure message.

Implementation is based on dry running the patch operation to catch if there would be failures (e.g: Wrong spec and Auth errors).
"""
try:
resource_reqs = resource_reqs_func()
limits = resource_reqs.limits
requests = resource_reqs.requests
except ValueError as e:
msg = f"Failed obtaining resource limit spec: {e}"
logger.error(msg)
return True, msg

# Dry run does not catch negative values for resource requests and limits.
if not is_valid_spec(limits) or not is_valid_spec(requests):
msg = f"Invalid resource requirements specs: {limits}, {requests}"
logger.error(msg)
return True, msg

resource_reqs = ResourceRequirements(
limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type]
requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type]
)

try:
self.dry_run_apply(resource_reqs)
except ApiError as e:
if e.status.code == 403:
msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}"
else:
msg = f"Kubernetes resources patch failed: {e}"
return True, msg
except ValueError as e:
msg = f"Kubernetes resources patch failed: {e}"
return True, msg

return False, ""

def is_in_progress(self) -> bool:
"""Returns a boolean to indicate whether a patch operation is in progress.

Implementation follows a similar approach to `kubectl rollout status statefulset` to track the progress of a rollout.
Reference: https://github.com/kubernetes/kubectl/blob/kubernetes-1.31.0/pkg/polymorphichelpers/rollout_status.go
"""
michaeldmitry marked this conversation as resolved.
Show resolved Hide resolved
try:
sts = self.client.get(
StatefulSet, name=self.statefulset_name, namespace=self.namespace
)
except (ValueError, ApiError):
return False
michaeldmitry marked this conversation as resolved.
Show resolved Hide resolved

if sts.status is None or sts.spec is None:
logger.debug("status/spec are not yet available")
return False
if sts.status.observedGeneration == 0 or (
sts.metadata
and sts.status.observedGeneration
and sts.metadata.generation
and sts.metadata.generation > sts.status.observedGeneration
):
logger.debug("waiting for statefulset spec update to be observed...")
return True
if (
sts.spec.replicas is not None
and sts.status.readyReplicas is not None
and sts.status.readyReplicas < sts.spec.replicas
):
logger.debug(
f"Waiting for {sts.spec.replicas-sts.status.readyReplicas} pods to be ready..."
)
return True

if (
sts.spec.updateStrategy
and sts.spec.updateStrategy.type == "rollingUpdate"
and sts.spec.updateStrategy.rollingUpdate is not None
):
if (
sts.spec.replicas is not None
and sts.spec.updateStrategy.rollingUpdate.partition is not None
):
if sts.status.updatedReplicas and sts.status.updatedReplicas < (
sts.spec.replicas - sts.spec.updateStrategy.rollingUpdate.partition
):
logger.debug(
f"Waiting for partitioned roll out to finish: {sts.status.updatedReplicas} out of {sts.spec.replicas - sts.spec.updateStrategy.rollingUpdate.partition} new pods have been updated..."
)
return True
logger.debug(
f"partitioned roll out complete: {sts.status.updatedReplicas} new pods have been updated..."
)
return False

if sts.status.updateRevision != sts.status.currentRevision:
logger.debug(
f"waiting for statefulset rolling update to complete {sts.status.updatedReplicas} pods at revision {sts.status.updateRevision}..."
)
return True

logger.debug(
f"statefulset rolling update complete pods at revision {sts.status.currentRevision}"
)
return False

def is_ready(self, pod_name, resource_reqs: ResourceRequirements):
"""Reports if the resource patch has been applied and is in effect.

Returns:
bool: A boolean indicating if the service patch has been applied and is in effect.
"""
logger.info(
"reqs=%s, templated=%s, actual=%s",
resource_reqs,
self.get_templated(),
self.get_actual(pod_name),
)
return self.is_patched(resource_reqs) and equals_canonically( # pyright: ignore
resource_reqs, self.get_actual(pod_name) # pyright: ignore
)
Expand All @@ -406,6 +537,7 @@ def apply(self, resource_reqs: ResourceRequirements) -> None:
# Need to ignore invalid input, otherwise the StatefulSet gives "FailedCreate" and the
# charm would be stuck in unknown/lost.
if self.is_patched(resource_reqs):
logger.debug(f"resource requests {resource_reqs} are already patched.")
michaeldmitry marked this conversation as resolved.
Show resolved Hide resolved
return

self.client.patch(
Expand All @@ -417,11 +549,26 @@ def apply(self, resource_reqs: ResourceRequirements) -> None:
field_manager=self.__class__.__name__,
)

def dry_run_apply(self, resource_reqs: ResourceRequirements):
"""Run a dry-run patch operation."""
self.client.patch(
StatefulSet,
self.statefulset_name,
self._patched_delta(resource_reqs),
namespace=self.namespace,
patch_type=PatchType.APPLY,
field_manager=self.__class__.__name__,
dry_run=True,
)
michaeldmitry marked this conversation as resolved.
Show resolved Hide resolved


class KubernetesComputeResourcesPatch(Object):
"""A utility for patching the Kubernetes compute resources set up by Juju."""

on = K8sResourcePatchEvents() # pyright: ignore
PATCH_RETRY_STOP = tenacity.stop_after_delay(20)
PATCH_RETRY_WAIT = tenacity.wait_fixed(5)
michaeldmitry marked this conversation as resolved.
Show resolved Hide resolved
PATCH_RETRY_IF = tenacity.retry_if_exception(_retry_on_condition)

def __init__(
self,
Expand Down Expand Up @@ -468,7 +615,11 @@ def _on_config_changed(self, _):
self._patch()

def _patch(self) -> None:
"""Patch the Kubernetes resources created by Juju to limit cpu or mem."""
"""Patch the Kubernetes resources created by Juju to limit cpu or mem.

This method will keep on retrying to patch the kubernetes resource for a default duration of 20 seconds
if the patching failure is due to a recoverable error (e.g: Network Latency).
"""
try:
resource_reqs = self.resource_reqs_func()
limits = resource_reqs.limits
Expand All @@ -492,7 +643,18 @@ def _patch(self) -> None:
)

try:
self.patcher.apply(resource_reqs)
for attempt in tenacity.Retrying(
retry=self.PATCH_RETRY_IF,
stop=self.PATCH_RETRY_STOP,
wait=self.PATCH_RETRY_WAIT,
# if you don't succeed raise the last caught exception when you're done
reraise=True,
):
with attempt:
logger.debug(
f"attempt #{attempt.retry_state.attempt_number} to patch resource limits"
)
self.patcher.apply(resource_reqs)

except exceptions.ConfigError as e:
msg = f"Error creating k8s client: {e}"
Expand All @@ -503,6 +665,7 @@ def _patch(self) -> None:
except ApiError as e:
if e.status.code == 403:
msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}"

else:
msg = f"Kubernetes resources patch failed: {e}"

Expand Down Expand Up @@ -554,6 +717,29 @@ def is_ready(self) -> bool:
self.on.patch_failed.emit(message=msg)
return False

def get_status(self) -> StatusBase:
"""Return the status of patching the resource limits in a `StatusBase` format.

Returns:
StatusBase: There is a 1:1 mapping between the state of the patching operation and a `StatusBase` value that the charm can be set to.
Possible values are:
- ActiveStatus: The patch was applied successfully.
- BlockedStatus: The patch failed and requires a human intervention.
- WaitingStatus: The patch is still in progress.

Example:
- ActiveStatus("Patch applied successfully")
- BlockedStatus("Failed due to missing permissions")
- WaitingStatus("Patch is in progress")
"""
failed, msg = self.patcher.is_failed(self.resource_reqs_func)
if failed:
return BlockedStatus(msg)
if self.patcher.is_in_progress():
return WaitingStatus("waiting for resources patch to apply")
# patch successful or nothing has been patched yet
return ActiveStatus()

@property
def _app(self) -> str:
"""Name of the current Juju application.
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@

ops
PyYAML
lightkube
lightkube>=v0.15.4
tenacity
Loading
Loading