From 94bbff991540105874afbedc4ecab3241f0fd014 Mon Sep 17 00:00:00 2001 From: michael Date: Tue, 20 Aug 2024 11:43:14 +0300 Subject: [PATCH 01/15] update k8s compute patch major lib --- .../v1/kubernetes_compute_resources_patch.py | 629 ++++++++++++++++++ requirements.txt | 1 + ...> test_kubernetes_compute_resources_v0.py} | 0 .../test_kubernetes_compute_resources_v1.py | 118 ++++ 4 files changed, 748 insertions(+) create mode 100644 lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py rename tests/unit/{test_kubernetes_compute_resources.py => test_kubernetes_compute_resources_v0.py} (100%) create mode 100644 tests/unit/test_kubernetes_compute_resources_v1.py diff --git a/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py new file mode 100644 index 0000000..4bdb7c8 --- /dev/null +++ b/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py @@ -0,0 +1,629 @@ +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. + +"""# KubernetesComputeResourcesPatch Library. + +This library is designed to enable developers to more simply patch the Kubernetes compute resource +limits and requests created by Juju during the deployment of a sidecar charm. + +When initialised, this library binds a handler to the parent charm's `config-changed` event. +The config-changed event is used because it is guaranteed to fire on startup, on upgrade and on +pod churn. Additionally, resource limits may be set by charm config options, which would also be +caught out-of-the-box by this handler. The handler applies the patch to the app's StatefulSet. +This should ensure that the resource limits are correct throughout the charm's life. Additional +optional user-provided events for re-applying the patch are supported but discouraged. + +The constructor takes a reference to the parent charm, a 'limits' and a 'requests' dictionaries +that together define the resource requirements. For information regarding the `lightkube` +`ResourceRequirements` model, please visit the `lightkube` +[docs](https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#resourcerequirements). + +Note that patching compute resources will keep on retrying for a default duration of KubernetesComputeResourcesPatch.PATCH_RETRY_STOP +if the patching failed due to a recoverable error (e.g: Network Latency). +This will put the charm using the library in a `Maintenance` state until the patch is either successful or all retries have been consumed, +which in that case, will raise the same exception as the original exception that caused it and will put the charm in error state. +If the failure is not recoverable (e.g: wrong spec provided), an exception same as the original exception that caused it will be raised and will put the charm in error state. + +## Getting Started + +To get started using the library, you just need to fetch the library using `charmcraft`. **Note +that you also need to add `lightkube` and `lightkube-models` to your charm's `requirements.txt`.** + +```shell +cd some-charm +charmcraft fetch-lib charms.observability_libs.v1.kubernetes_compute_resources_patch +cat << EOF >> requirements.txt +lightkube +lightkube-models +EOF +``` + +Then, to initialise the library: + +```python +# ... +from charms.observability_libs.v1.kubernetes_compute_resources_patch import ( + KubernetesComputeResourcesPatch, + ResourceRequirements, +) + +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... 
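+    # Note: `resource_reqs_func` is re-evaluated each time the patch is applied, i.e. on
+    # every `config-changed` event and on any additional `refresh_event` passed in.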
+ self.resources_patch = KubernetesComputeResourcesPatch( + self, + "container-name", + resource_reqs_func=lambda: ResourceRequirements( + limits={"cpu": "2"}, requests={"cpu": "1"} + ), + ) + self.framework.observe(self.resources_patch.on.patch_failed, self._on_resource_patch_failed) + + def _on_resource_patch_failed(self, event): + self.unit.status = BlockedStatus(event.message) + # ... +``` + +Or, if, for example, the resource specs are coming from config options: + +```python +class SomeCharm(CharmBase): + def __init__(self, *args): + # ... + self.resources_patch = KubernetesComputeResourcesPatch( + self, + "container-name", + resource_reqs_func=self._resource_spec_from_config, + ) + + def _resource_spec_from_config(self) -> ResourceRequirements: + spec = {"cpu": self.model.config.get("cpu"), "memory": self.model.config.get("memory")} + return ResourceRequirements(limits=spec, requests=spec) +``` + + +Additionally, you may wish to use mocks in your charm's unit testing to ensure that the library +does not try to make any API calls, or open any files during testing that are unlikely to be +present, and could break your tests. The easiest way to do this is during your test `setUp`: + +```python +# ... + +@patch.multiple( + "charm.KubernetesComputeResourcesPatch", + _namespace="test-namespace", + _is_patched=lambda *a, **kw: True, + is_ready=lambda *a, **kw: True, +) +@patch("lightkube.core.client.GenericSyncClient") +def setUp(self, *unused): + self.harness = Harness(SomeCharm) + # ... +``` + +References: +- https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ +- https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#resourcerequirements +""" + +import decimal +import logging +from decimal import Decimal +from math import ceil, floor +from typing import Callable, Dict, List, Optional, Union + +import tenacity +from lightkube import ApiError, Client # pyright: ignore +from lightkube.core import exceptions +from lightkube.models.apps_v1 import StatefulSetSpec +from lightkube.models.core_v1 import ( + Container, + PodSpec, + PodTemplateSpec, + ResourceRequirements, +) +from lightkube.resources.apps_v1 import StatefulSet +from lightkube.resources.core_v1 import Pod +from lightkube.types import PatchType +from lightkube.utils.quantity import equals_canonically, parse_quantity +from ops import MaintenanceStatus +from ops.charm import CharmBase +from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "2a6066f701444e8db44ba2f6af28da90" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 1 + + +_Decimal = Union[Decimal, float, str, int] # types that are potentially convertible to Decimal + + +def adjust_resource_requirements( + limits: Optional[dict], requests: Optional[dict], adhere_to_requests: bool = True +) -> ResourceRequirements: + """Adjust resource limits so that `limits` and `requests` are consistent with each other. + + Args: + limits: the "limits" portion of the resource spec. + requests: the "requests" portion of the resource spec. + adhere_to_requests: a flag indicating which portion should be adjusted when "limits" is + lower than "requests": + - if True, "limits" will be adjusted to max(limits, requests). 
+ - if False, "requests" will be adjusted to min(limits, requests). + + Returns: + An adjusted (limits, requests) 2-tuple. + + >>> adjust_resource_requirements({}, {}) + ResourceRequirements(claims=None, limits={}, requests={}) + >>> adjust_resource_requirements({"cpu": "1"}, {}) + ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1"}, {"cpu": "2"}, True) + ResourceRequirements(claims=None, limits={'cpu': '2'}, requests={'cpu': '2'}) + >>> adjust_resource_requirements({"cpu": "1"}, {"cpu": "2"}, False) + ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1"}, {"memory": "1G"}, True) + ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'memory': '1G', 'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1"}, {"memory": "1G"}, False) + ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'memory': '1G', 'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1", "memory": "1"}, {"memory": "2"}, True) + ResourceRequirements(\ +claims=None, limits={'cpu': '1', 'memory': '2'}, requests={'memory': '2', 'cpu': '1'}) + >>> adjust_resource_requirements({"cpu": "1", "memory": "1"}, {"memory": "1G"}, False) + ResourceRequirements(\ +claims=None, limits={'cpu': '1', 'memory': '1'}, requests={'memory': '1', 'cpu': '1'}) + >>> adjust_resource_requirements({"custom-resource": "1"}, {"custom-resource": "2"}, False) + Traceback (most recent call last): + ... + ValueError: Invalid limits spec: {'custom-resource': '1'} + """ + if not is_valid_spec(limits): + raise ValueError("Invalid limits spec: {}".format(limits)) + if not is_valid_spec(requests): + raise ValueError("Invalid default requests spec: {}".format(requests)) + + limits = sanitize_resource_spec_dict(limits) or {} + requests = sanitize_resource_spec_dict(requests) or {} + + # Make sure we do not modify in-place + limits, requests = limits.copy(), requests.copy() + + # Need to copy key-val pairs from "limits" to "requests", if they are not present in + # "requests". This replicates K8s behavior: + # https://kubernetes.io/docs/concepts/configuration/manage-resources-containers + requests.update({k: limits[k] for k in limits if k not in requests}) + + if adhere_to_requests: + # Keep limits fixed when `limits` is too low + adjusted, fixed = limits, requests + func = max + else: + # Pull down requests when limit is too low + fixed, adjusted = limits, requests + func = min + + # adjusted = {} + for k in adjusted: + if k not in fixed: + # The resource constraint is present in the "adjusted" dict but not in the "fixed" + # dict. Keep the "adjusted" value as is + continue + + adjusted_value = func(parse_quantity(fixed[k]), parse_quantity(adjusted[k])) # type: ignore[type-var] + adjusted[k] = ( + str(adjusted_value.quantize(decimal.Decimal("0.001"), rounding=decimal.ROUND_UP)) # type: ignore[union-attr] + .rstrip("0") + .rstrip(".") + ) + + return ( + ResourceRequirements(limits=adjusted, requests=fixed) + if adhere_to_requests + else ResourceRequirements(limits=fixed, requests=adjusted) + ) + + +def is_valid_spec(spec: Optional[dict], debug=False) -> bool: # noqa: C901 + """Check if the spec dict is valid. + + TODO: generally, the keys can be anything, not just cpu and memory. Perhaps user could pass + list of custom allowed keys in addition to the K8s ones? 
+ """ + if spec is None: + return True + if not isinstance(spec, dict): + if debug: + logger.error("Invalid resource spec type '%s': must be either None or dict.", spec) + return False + + for k, v in spec.items(): + valid_keys = ["cpu", "memory"] # K8s permits custom keys, but we limit here to what we use + if k not in valid_keys: + if debug: + logger.error("Invalid key in resource spec: %s; valid keys: %s.", k, valid_keys) + return False + try: + assert isinstance(v, (str, type(None))) # for type checker + pv = parse_quantity(v) + except ValueError: + if debug: + logger.error("Invalid resource spec entry: {%s: %s}.", k, v) + return False + + if pv and pv < 0: + if debug: + logger.error("Invalid resource spec entry: {%s: %s}; must be non-negative.", k, v) + return False + + return True + + +def sanitize_resource_spec_dict(spec: Optional[dict]) -> Optional[dict]: + """Fix spec values without altering semantics. + + The purpose of this helper function is to correct known issues. + This function is not intended for fixing user mistakes such as incorrect keys present; that is + left for the `is_valid_spec` function. + """ + if not spec: + return spec + + d = spec.copy() + + for k, v in spec.items(): + if not v: + # Need to ignore empty values input, otherwise the StatefulSet will have "0" as the + # setpoint, the pod will not be scheduled and the charm would be stuck in unknown/lost. + # This slightly changes the spec semantics compared to lightkube/k8s: a setpoint of + # `None` would be interpreted here as "no limit". + del d[k] + + # Round up memory to whole bytes. This is need to avoid K8s errors such as: + # fractional byte value "858993459200m" (0.8Gi) is invalid, must be an integer + memory = d.get("memory") + if memory: + as_decimal = parse_quantity(memory) + if as_decimal and as_decimal.remainder_near(floor(as_decimal)): + d["memory"] = str(ceil(as_decimal)) + return d + + +def _retry_on_condition(exception): + """Retry if the exception is an ApiError with a status code != 403. + + Returns: a boolean value to indicate whether to retry or not. 
+ """ + if isinstance(exception, ApiError) and str(exception.status.code) != "403": + return True + return False + + +class K8sResourcePatchFailedEvent(EventBase): + """Emitted when patching fails.""" + + def __init__(self, handle, message=None): + super().__init__(handle) + self.message = message + + def snapshot(self) -> Dict: + """Save grafana source information.""" + return {"message": self.message} + + def restore(self, snapshot): + """Restore grafana source information.""" + self.message = snapshot["message"] + + +class K8sResourcePatchEvents(ObjectEvents): + """Events raised by :class:`K8sResourcePatchEvents`.""" + + patch_failed = EventSource(K8sResourcePatchFailedEvent) + + +class ContainerNotFoundError(ValueError): + """Raised when a given container does not exist in the list of containers.""" + + +class ResourcePatcher: + """Helper class for patching a container's resource limits in a given StatefulSet.""" + + def __init__(self, namespace: str, statefulset_name: str, container_name: str): + self.namespace = namespace + self.statefulset_name = statefulset_name + self.container_name = container_name + self.client = Client() # pyright: ignore + + def _patched_delta(self, resource_reqs: ResourceRequirements) -> StatefulSet: + statefulset = self.client.get( + StatefulSet, name=self.statefulset_name, namespace=self.namespace + ) + + return StatefulSet( + spec=StatefulSetSpec( + selector=statefulset.spec.selector, # type: ignore[attr-defined] + serviceName=statefulset.spec.serviceName, # type: ignore[attr-defined] + template=PodTemplateSpec( + spec=PodSpec( + containers=[Container(name=self.container_name, resources=resource_reqs)] + ) + ), + ) + ) + + @classmethod + def _get_container(cls, container_name: str, containers: List[Container]) -> Container: + """Find our container from the container list, assuming list is unique by name. + + Typically, *.spec.containers[0] is the charm container, and [1] is the (only) workload. + + Raises: + ContainerNotFoundError, if the user-provided container name does not exist in the list. + + Returns: + An instance of :class:`Container` whose name matches the given name. + """ + try: + return next(iter(filter(lambda ctr: ctr.name == container_name, containers))) + except StopIteration: + raise ContainerNotFoundError(f"Container '{container_name}' not found") + + def is_patched(self, resource_reqs: ResourceRequirements) -> bool: + """Reports if the resource patch has been applied to the StatefulSet. + + Returns: + bool: A boolean indicating if the service patch has been applied. 
+ """ + return equals_canonically(self.get_templated(), resource_reqs) # pyright: ignore + + def get_templated(self) -> Optional[ResourceRequirements]: + """Returns the resource limits specified in the StatefulSet template.""" + statefulset = self.client.get( + StatefulSet, name=self.statefulset_name, namespace=self.namespace + ) + podspec_tpl = self._get_container( + self.container_name, + statefulset.spec.template.spec.containers, # type: ignore[attr-defined] + ) + return podspec_tpl.resources + + def get_actual(self, pod_name: str) -> Optional[ResourceRequirements]: + """Return the resource limits that are in effect for the container in the given pod.""" + pod = self.client.get(Pod, name=pod_name, namespace=self.namespace) + podspec = self._get_container( + self.container_name, pod.spec.containers # type: ignore[attr-defined] + ) + return podspec.resources + + def is_ready(self, pod_name, resource_reqs: ResourceRequirements): + """Reports if the resource patch has been applied and is in effect. + + Returns: + bool: A boolean indicating if the service patch has been applied and is in effect. + """ + logger.info( + "reqs=%s, templated=%s, actual=%s", + resource_reqs, + self.get_templated(), + self.get_actual(pod_name), + ) + return self.is_patched(resource_reqs) and equals_canonically( # pyright: ignore + resource_reqs, self.get_actual(pod_name) # pyright: ignore + ) + + def apply(self, resource_reqs: ResourceRequirements) -> None: + """Patch the Kubernetes resources created by Juju to limit cpu or mem.""" + # Need to ignore invalid input, otherwise the StatefulSet gives "FailedCreate" and the + # charm would be stuck in unknown/lost. + if self.is_patched(resource_reqs): + return + + self.client.patch( + StatefulSet, + self.statefulset_name, + self._patched_delta(resource_reqs), + namespace=self.namespace, + patch_type=PatchType.APPLY, + field_manager=self.__class__.__name__, + ) + + +class KubernetesComputeResourcesPatch(Object): + """A utility for patching the Kubernetes compute resources set up by Juju.""" + + on = K8sResourcePatchEvents() # pyright: ignore + PATCH_RETRY_STOP = tenacity.stop_after_delay(60 * 3) + PATCH_RETRY_WAIT = tenacity.wait_fixed(30) + PATCH_RETRY_IF = tenacity.retry_if_exception(_retry_on_condition) + + def __init__( + self, + charm: CharmBase, + container_name: str, + *, + resource_reqs_func: Callable[[], ResourceRequirements], + refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None, + ): + """Constructor for KubernetesComputeResourcesPatch. + + References: + - https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ + + Args: + charm: the charm that is instantiating the library. + container_name: the container for which to apply the resource limits. + resource_reqs_func: a callable returning a `ResourceRequirements`; if raises, should + only raise ValueError. + refresh_event: an optional bound event or list of bound events which + will be observed to re-apply the patch. + """ + super().__init__(charm, "{}_{}".format(self.__class__.__name__, container_name)) + self._charm = charm + self._container_name = container_name + self.resource_reqs_func = resource_reqs_func + self.patcher = ResourcePatcher(self._namespace, self._app, container_name) + + # Ensure this patch is applied during the 'config-changed' event, which is emitted every + # startup and every upgrade. 
The config-changed event is a good time to apply this kind of + # patch because it is always emitted after storage-attached, leadership and peer-created, + # all of which only fire after install. Patching the statefulset prematurely could result + # in those events firing without a workload. + self.framework.observe(charm.on.config_changed, self._on_config_changed) + + if not refresh_event: + refresh_event = [] + elif not isinstance(refresh_event, list): + refresh_event = [refresh_event] + for ev in refresh_event: + self.framework.observe(ev, self._on_config_changed) + + def _on_config_changed(self, _): + self._patch() + + def _patch(self) -> None: + """Patch the Kubernetes resources created by Juju to limit cpu or mem. + + This method will keep on retrying to patch the kubernetes resource for a default duration of 3 minutes, + if the patching failure is due to a recoverable error (e.g: Network Latency). + This will put the charm in a `Maintenance` state until the patch is either successful or all retries have been consumed, + which in that case, will raise the same exception as the original exception that caused it and will put the charm in error state. + If the failure is not recoverable (e.g: wrong spec provided), an exception same as the original exception that caused it will be raised and will put the charm in error state. + """ + try: + resource_reqs = self.resource_reqs_func() + limits = resource_reqs.limits + requests = resource_reqs.requests + except ValueError as e: + msg = f"Failed obtaining resource limit spec: {e}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + raise ValueError(msg) from e + + for spec in (limits, requests): + if not is_valid_spec(spec): + msg = f"Invalid resource limit spec: {spec}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + raise RuntimeError(msg) + + resource_reqs = ResourceRequirements( + limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] + requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] + ) + + try: + for attempt in tenacity.Retrying( + retry=self.PATCH_RETRY_IF, + stop=self.PATCH_RETRY_STOP, + wait=self.PATCH_RETRY_WAIT, + # if you don't succeed raise the last caught exception when you're done + reraise=True, + ): + with attempt: + self._charm.unit.status = MaintenanceStatus( + f"retrying patching resource limit... (attempt #{attempt.retry_state.attempt_number})" + ) + self.patcher.apply(resource_reqs) + + except exceptions.ConfigError as e: + msg = f"Error creating k8s client: {e}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + raise exceptions.ConfigError(msg) from e + + except ApiError as e: + if e.status.code == 403: + msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}" + + else: + msg = f"Kubernetes resources patch failed: {e}" + + logger.error(msg) + self.on.patch_failed.emit(message=msg) + raise + + except ValueError as e: + msg = f"Kubernetes resources patch failed: {e}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + raise ValueError(msg) from e + + else: + logger.info( + "Kubernetes resources for app '%s', container '%s' patched successfully: %s", + self._app, + self._container_name, + resource_reqs, + ) + + def is_ready(self) -> bool: + """Reports if the resource patch has been applied and is in effect. + + Returns: + bool: A boolean indicating if the service patch has been applied and is in effect. 
+ """ + try: + resource_reqs = self.resource_reqs_func() + limits = resource_reqs.limits + requests = resource_reqs.requests + except ValueError as e: + msg = f"Failed obtaining resource limit spec: {e}" + logger.error(msg) + return False + + if not is_valid_spec(limits) or not is_valid_spec(requests): + logger.error("Invalid resource requirements specs: %s, %s", limits, requests) + return False + + resource_reqs = ResourceRequirements( + limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] + requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] + ) + + try: + return self.patcher.is_ready(self._pod, resource_reqs) + except (ValueError, ApiError) as e: + msg = f"Failed to apply resource limit patch: {e}" + logger.error(msg) + self.on.patch_failed.emit(message=msg) + return False + + @property + def _app(self) -> str: + """Name of the current Juju application. + + Returns: + str: A string containing the name of the current Juju application. + """ + return self._charm.app.name + + @property + def _pod(self) -> str: + """Name of the unit's pod. + + Returns: + str: A string containing the name of the current unit's pod. + """ + return "-".join(self._charm.unit.name.rsplit("/", 1)) + + @property + def _namespace(self) -> str: + """The Kubernetes namespace we're running in. + + If a charm is deployed into the controller model (which certainly could happen as we move + to representing the controller as a charm) then self._charm.model.name !== k8s namespace. + Instead, the model name is controller in Juju and controller- for the + namespace in K8s. + + Returns: + str: A string containing the name of the current Kubernetes namespace. + """ + with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: + return f.read().strip() diff --git a/requirements.txt b/requirements.txt index 638e61a..8160700 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ ops PyYAML lightkube +tenacity diff --git a/tests/unit/test_kubernetes_compute_resources.py b/tests/unit/test_kubernetes_compute_resources_v0.py similarity index 100% rename from tests/unit/test_kubernetes_compute_resources.py rename to tests/unit/test_kubernetes_compute_resources_v0.py diff --git a/tests/unit/test_kubernetes_compute_resources_v1.py b/tests/unit/test_kubernetes_compute_resources_v1.py new file mode 100644 index 0000000..5dcf687 --- /dev/null +++ b/tests/unit/test_kubernetes_compute_resources_v1.py @@ -0,0 +1,118 @@ +# Copyright 2021 Canonical Ltd. +# See LICENSE file for licensing details. 
+import unittest +from unittest import mock +from unittest.mock import MagicMock, Mock + +import yaml +from charms.observability_libs.v1.kubernetes_compute_resources_patch import ( + KubernetesComputeResourcesPatch, + adjust_resource_requirements, + is_valid_spec, + sanitize_resource_spec_dict, +) +from ops.charm import CharmBase +from ops.testing import Harness + +from tests.unit.helpers import PROJECT_DIR + +CL_PATH = "charms.observability_libs.v1.kubernetes_compute_resources_patch.KubernetesComputeResourcesPatch" + + +class TestKubernetesComputeResourcesPatch(unittest.TestCase): + class _TestCharm(CharmBase): + def __init__(self, *args): + super().__init__(*args) + self.resources_patch = KubernetesComputeResourcesPatch( + self, + "placeholder", + resource_reqs_func=lambda: adjust_resource_requirements( + { + "cpu": self.model.config.get("cpu"), + "memory": self.model.config.get("memory"), + }, + None, + ), + ) + + def setUp(self) -> None: + with open(PROJECT_DIR / "config.yaml") as config_file: + config = yaml.safe_load(config_file) + self.harness = Harness( + self._TestCharm, meta=open(PROJECT_DIR / "metadata.yaml"), config=str(config) + ) + + @mock.patch("lightkube.core.client.GenericSyncClient", Mock) + @mock.patch(f"{CL_PATH}._namespace", "test-namespace") + def test_listener_is_attached_for_config_changed_event(self): + self.harness.begin() + charm = self.harness.charm + with mock.patch(f"{CL_PATH}._patch") as patch: + charm.on.config_changed.emit() + self.assertEqual(patch.call_count, 1) + + @mock.patch("lightkube.core.client.GenericSyncClient", Mock) + @mock.patch(f"{CL_PATH}._namespace", "test-namespace") + def test_patch_is_applied_regardless_of_leadership_status(self): + self.harness.begin() + charm = self.harness.charm + for is_leader in (True, False): + with self.subTest(is_leader=is_leader): + self.harness.set_leader(True) + with mock.patch(f"{CL_PATH}._patch") as patch: + charm.on.config_changed.emit() + self.assertEqual(patch.call_count, 1) + + @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") + @mock.patch("lightkube.core.client.GenericSyncClient") + @mock.patch( + "charms.observability_libs.v1.kubernetes_compute_resources_patch.ResourcePatcher.apply", + MagicMock(return_value=None), + ) + def test_patch_is_applied_during_startup_sequence(self, client_mock): + self.harness.begin_with_initial_hooks() + self.assertGreater(client_mock.call_count, 0) + + @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") + @mock.patch("lightkube.core.client.GenericSyncClient") + @mock.patch( + "charms.observability_libs.v1.kubernetes_compute_resources_patch.ResourcePatcher.apply", + MagicMock(return_value=None), + ) + def test_invalid_config_raises_exception(self, client_mock): + + self.harness.begin_with_initial_hooks() + + # Test invalid quantity values + for cpu, memory in [ + ("-1", ""), + ("", "-1Gi"), + ("-1", "1Gi"), + ("1", "-1Gi"), + ("4x", "1Gi"), + ("1", "1Gx"), + ]: + with self.subTest(cpu=cpu, memory=memory): + with self.assertRaises(ValueError): + self.harness.update_config({"cpu": cpu, "memory": memory}) + + +class TestResourceSpecDictValidation(unittest.TestCase): + def test_sanitize_resource_spec_dict(self): + self.assertEqual(None, sanitize_resource_spec_dict(None)) + self.assertEqual({}, sanitize_resource_spec_dict({})) + self.assertEqual({"bad": "combo"}, sanitize_resource_spec_dict({"bad": "combo"})) + self.assertEqual({"cpu": 1}, sanitize_resource_spec_dict({"cpu": 1})) + self.assertEqual({"cpu": "1"}, 
sanitize_resource_spec_dict({"cpu": "1"})) + self.assertEqual({"memory": "858993460"}, sanitize_resource_spec_dict({"memory": "0.8Gi"})) + + def test_is_valid_spec(self): + self.assertTrue(is_valid_spec(None)) + self.assertTrue(is_valid_spec({})) + self.assertTrue(is_valid_spec({"cpu": "1"})) + self.assertTrue(is_valid_spec({"memory": "858993460"})) + self.assertTrue(is_valid_spec({"memory": "0.8Gi"})) + self.assertTrue(is_valid_spec({"cpu": None, "memory": None})) + + self.assertFalse(is_valid_spec({"bad": "combo"})) + self.assertFalse(is_valid_spec({"invalid-key": "1"})) From 5d6af164dd183235de3f98a892f3f5a835d8a76c Mon Sep 17 00:00:00 2001 From: michael Date: Tue, 20 Aug 2024 11:45:35 +0300 Subject: [PATCH 02/15] add more exception types --- .../observability_libs/v1/kubernetes_compute_resources_patch.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py index 4bdb7c8..73ffdd3 100644 --- a/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py @@ -303,6 +303,8 @@ def _retry_on_condition(exception): """ if isinstance(exception, ApiError) and str(exception.status.code) != "403": return True + if isinstance(exception, exceptions.ConfigError) or isinstance(exception, ValueError): + return True return False From 34e4c42bb1be19d956d528f85b282e6b2a93b5ff Mon Sep 17 00:00:00 2001 From: michael Date: Tue, 20 Aug 2024 13:11:30 +0300 Subject: [PATCH 03/15] refactor --- .../v0/kubernetes_compute_resources_patch.py | 82 ++- .../v1/kubernetes_compute_resources_patch.py | 631 ------------------ 2 files changed, 62 insertions(+), 651 deletions(-) delete mode 100644 lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 2ab8a22..4f9081f 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -4,7 +4,7 @@ """# KubernetesComputeResourcesPatch Library. This library is designed to enable developers to more simply patch the Kubernetes compute resource -limits and requests created by Juju during the deployment of a sidecar charm. +limits and requests created by Juju during the deployment of a charm. When initialised, this library binds a handler to the parent charm's `config-changed` event. The config-changed event is used because it is guaranteed to fire on startup, on upgrade and on @@ -18,6 +18,11 @@ `ResourceRequirements` model, please visit the `lightkube` [docs](https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#resourcerequirements). +Note that patching compute resources will keep on retrying for a default duration of KubernetesComputeResourcesPatch.PATCH_RETRY_STOP +if the patching failed due to a recoverable error (e.g: Network Latency). +This will put the charm using the library in a `Maintenance` state until the patch is either successful or all retries have been consumed, +which in that case, will raise the same exception as the original exception that caused it and will put the charm in error state. 
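+Whether a failure is considered recoverable is decided by the module-level
+`_retry_on_condition` helper (roughly: API errors other than a 403, plus client
+configuration and value errors).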
+If the failure is not recoverable (e.g: wrong spec provided), an exception same as the original exception that caused it will be raised and will put the charm in error state. ## Getting Started @@ -26,7 +31,7 @@ ```shell cd some-charm -charmcraft fetch-lib charms.observability_libs.v0.kubernetes_compute_resources_patch +charmcraft fetch-lib charms.observability_libs.v1.kubernetes_compute_resources_patch cat << EOF >> requirements.txt lightkube lightkube-models @@ -37,7 +42,7 @@ ```python # ... -from charms.observability_libs.v0.kubernetes_compute_resources_patch import ( +from charms.observability_libs.v1.kubernetes_compute_resources_patch import ( KubernetesComputeResourcesPatch, ResourceRequirements, ) @@ -107,6 +112,7 @@ def setUp(self, *unused): from math import ceil, floor from typing import Callable, Dict, List, Optional, Union +import tenacity from lightkube import ApiError, Client # pyright: ignore from lightkube.core import exceptions from lightkube.models.apps_v1 import StatefulSetSpec @@ -120,6 +126,7 @@ def setUp(self, *unused): from lightkube.resources.core_v1 import Pod from lightkube.types import PatchType from lightkube.utils.quantity import equals_canonically, parse_quantity +from ops import MaintenanceStatus from ops.charm import CharmBase from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents @@ -133,7 +140,7 @@ def setUp(self, *unused): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 7 +LIBPATCH = 8 _Decimal = Union[Decimal, float, str, int] # types that are potentially convertible to Decimal @@ -289,6 +296,20 @@ def sanitize_resource_spec_dict(spec: Optional[dict]) -> Optional[dict]: return d +def _retry_on_condition(exception): + """Retry if the exception is an ApiError with a status code != 403. + + Returns: a boolean value to indicate whether to retry or not. + """ + if isinstance(exception, ApiError) and str(exception.status.code) != "403": + return True + if isinstance(exception, exceptions.ConfigError) or isinstance(exception, ValueError): + return True + return False + +class PatchFailedError(Exception): + """Raised when patching K8s resources requests and limits fails.""" + class K8sResourcePatchFailedEvent(EventBase): """Emitted when patching fails.""" @@ -391,7 +412,7 @@ def is_ready(self, pod_name, resource_reqs: ResourceRequirements): Returns: bool: A boolean indicating if the service patch has been applied and is in effect. """ - logger.info( + logger.debug( "reqs=%s, templated=%s, actual=%s", resource_reqs, self.get_templated(), @@ -406,7 +427,8 @@ def apply(self, resource_reqs: ResourceRequirements) -> None: # Need to ignore invalid input, otherwise the StatefulSet gives "FailedCreate" and the # charm would be stuck in unknown/lost. 
if self.is_patched(resource_reqs): - return + logger.debug(f"resource requests {resource_reqs} are already patched.") + return self.client.patch( StatefulSet, @@ -422,6 +444,9 @@ class KubernetesComputeResourcesPatch(Object): """A utility for patching the Kubernetes compute resources set up by Juju.""" on = K8sResourcePatchEvents() # pyright: ignore + PATCH_RETRY_STOP = tenacity.stop_after_delay(60 * 3) + PATCH_RETRY_WAIT = tenacity.wait_fixed(30) + PATCH_RETRY_IF = tenacity.retry_if_exception(_retry_on_condition) def __init__( self, @@ -468,23 +493,28 @@ def _on_config_changed(self, _): self._patch() def _patch(self) -> None: - """Patch the Kubernetes resources created by Juju to limit cpu or mem.""" + """Patch the Kubernetes resources created by Juju to limit cpu or mem. + + This method will keep on retrying to patch the kubernetes resource for a default duration of 3 minutes, + if the patching failure is due to a recoverable error (e.g: Network Latency). + This will put the charm in a `Maintenance` state until the patch is either successful or all retries have been consumed, + which in that case, will raise the same exception as the original exception that caused it and will put the charm in error state. + If the failure is not recoverable (e.g: wrong spec provided), an exception same as the original exception that caused it will be raised and will put the charm in error state. + """ try: resource_reqs = self.resource_reqs_func() limits = resource_reqs.limits requests = resource_reqs.requests except ValueError as e: msg = f"Failed obtaining resource limit spec: {e}" - logger.error(msg) - self.on.patch_failed.emit(message=msg) - return + logger.exception(msg) + raise ValueError(msg) from e for spec in (limits, requests): if not is_valid_spec(spec): msg = f"Invalid resource limit spec: {spec}" logger.error(msg) - self.on.patch_failed.emit(message=msg) - return + raise ValueError(msg) resource_reqs = ResourceRequirements( limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] @@ -492,27 +522,39 @@ def _patch(self) -> None: ) try: - self.patcher.apply(resource_reqs) + for attempt in tenacity.Retrying( + retry=self.PATCH_RETRY_IF, + stop=self.PATCH_RETRY_STOP, + wait=self.PATCH_RETRY_WAIT, + # if you don't succeed raise the last caught exception when you're done + reraise=True, + ): + with attempt: + self._charm.unit.status = MaintenanceStatus( + f"retrying patching resource limit... (attempt #{attempt.retry_state.attempt_number})" + ) + self.patcher.apply(resource_reqs) except exceptions.ConfigError as e: msg = f"Error creating k8s client: {e}" - logger.error(msg) - self.on.patch_failed.emit(message=msg) - return + logger.exception(msg) + raise PatchFailedError(msg) from e except ApiError as e: if e.status.code == 403: msg = f"Kubernetes resources patch failed: `juju trust` this application. 
{e}" + else: msg = f"Kubernetes resources patch failed: {e}" - logger.error(msg) - self.on.patch_failed.emit(message=msg) + logger.exception(msg) + raise PatchFailedError(msg) from e + except ValueError as e: msg = f"Kubernetes resources patch failed: {e}" - logger.error(msg) - self.on.patch_failed.emit(message=msg) + logger.exception(msg) + raise PatchFailedError(msg) from e else: logger.info( diff --git a/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py deleted file mode 100644 index 73ffdd3..0000000 --- a/lib/charms/observability_libs/v1/kubernetes_compute_resources_patch.py +++ /dev/null @@ -1,631 +0,0 @@ -# Copyright 2022 Canonical Ltd. -# See LICENSE file for licensing details. - -"""# KubernetesComputeResourcesPatch Library. - -This library is designed to enable developers to more simply patch the Kubernetes compute resource -limits and requests created by Juju during the deployment of a sidecar charm. - -When initialised, this library binds a handler to the parent charm's `config-changed` event. -The config-changed event is used because it is guaranteed to fire on startup, on upgrade and on -pod churn. Additionally, resource limits may be set by charm config options, which would also be -caught out-of-the-box by this handler. The handler applies the patch to the app's StatefulSet. -This should ensure that the resource limits are correct throughout the charm's life. Additional -optional user-provided events for re-applying the patch are supported but discouraged. - -The constructor takes a reference to the parent charm, a 'limits' and a 'requests' dictionaries -that together define the resource requirements. For information regarding the `lightkube` -`ResourceRequirements` model, please visit the `lightkube` -[docs](https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#resourcerequirements). - -Note that patching compute resources will keep on retrying for a default duration of KubernetesComputeResourcesPatch.PATCH_RETRY_STOP -if the patching failed due to a recoverable error (e.g: Network Latency). -This will put the charm using the library in a `Maintenance` state until the patch is either successful or all retries have been consumed, -which in that case, will raise the same exception as the original exception that caused it and will put the charm in error state. -If the failure is not recoverable (e.g: wrong spec provided), an exception same as the original exception that caused it will be raised and will put the charm in error state. - -## Getting Started - -To get started using the library, you just need to fetch the library using `charmcraft`. **Note -that you also need to add `lightkube` and `lightkube-models` to your charm's `requirements.txt`.** - -```shell -cd some-charm -charmcraft fetch-lib charms.observability_libs.v1.kubernetes_compute_resources_patch -cat << EOF >> requirements.txt -lightkube -lightkube-models -EOF -``` - -Then, to initialise the library: - -```python -# ... -from charms.observability_libs.v1.kubernetes_compute_resources_patch import ( - KubernetesComputeResourcesPatch, - ResourceRequirements, -) - -class SomeCharm(CharmBase): - def __init__(self, *args): - # ... 
- self.resources_patch = KubernetesComputeResourcesPatch( - self, - "container-name", - resource_reqs_func=lambda: ResourceRequirements( - limits={"cpu": "2"}, requests={"cpu": "1"} - ), - ) - self.framework.observe(self.resources_patch.on.patch_failed, self._on_resource_patch_failed) - - def _on_resource_patch_failed(self, event): - self.unit.status = BlockedStatus(event.message) - # ... -``` - -Or, if, for example, the resource specs are coming from config options: - -```python -class SomeCharm(CharmBase): - def __init__(self, *args): - # ... - self.resources_patch = KubernetesComputeResourcesPatch( - self, - "container-name", - resource_reqs_func=self._resource_spec_from_config, - ) - - def _resource_spec_from_config(self) -> ResourceRequirements: - spec = {"cpu": self.model.config.get("cpu"), "memory": self.model.config.get("memory")} - return ResourceRequirements(limits=spec, requests=spec) -``` - - -Additionally, you may wish to use mocks in your charm's unit testing to ensure that the library -does not try to make any API calls, or open any files during testing that are unlikely to be -present, and could break your tests. The easiest way to do this is during your test `setUp`: - -```python -# ... - -@patch.multiple( - "charm.KubernetesComputeResourcesPatch", - _namespace="test-namespace", - _is_patched=lambda *a, **kw: True, - is_ready=lambda *a, **kw: True, -) -@patch("lightkube.core.client.GenericSyncClient") -def setUp(self, *unused): - self.harness = Harness(SomeCharm) - # ... -``` - -References: -- https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ -- https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#resourcerequirements -""" - -import decimal -import logging -from decimal import Decimal -from math import ceil, floor -from typing import Callable, Dict, List, Optional, Union - -import tenacity -from lightkube import ApiError, Client # pyright: ignore -from lightkube.core import exceptions -from lightkube.models.apps_v1 import StatefulSetSpec -from lightkube.models.core_v1 import ( - Container, - PodSpec, - PodTemplateSpec, - ResourceRequirements, -) -from lightkube.resources.apps_v1 import StatefulSet -from lightkube.resources.core_v1 import Pod -from lightkube.types import PatchType -from lightkube.utils.quantity import equals_canonically, parse_quantity -from ops import MaintenanceStatus -from ops.charm import CharmBase -from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents - -logger = logging.getLogger(__name__) - -# The unique Charmhub library identifier, never change it -LIBID = "2a6066f701444e8db44ba2f6af28da90" - -# Increment this major API version when introducing breaking changes -LIBAPI = 1 - -# Increment this PATCH version before using `charmcraft publish-lib` or reset -# to 0 if you are raising the major API version -LIBPATCH = 1 - - -_Decimal = Union[Decimal, float, str, int] # types that are potentially convertible to Decimal - - -def adjust_resource_requirements( - limits: Optional[dict], requests: Optional[dict], adhere_to_requests: bool = True -) -> ResourceRequirements: - """Adjust resource limits so that `limits` and `requests` are consistent with each other. - - Args: - limits: the "limits" portion of the resource spec. - requests: the "requests" portion of the resource spec. - adhere_to_requests: a flag indicating which portion should be adjusted when "limits" is - lower than "requests": - - if True, "limits" will be adjusted to max(limits, requests). 
- - if False, "requests" will be adjusted to min(limits, requests). - - Returns: - An adjusted (limits, requests) 2-tuple. - - >>> adjust_resource_requirements({}, {}) - ResourceRequirements(claims=None, limits={}, requests={}) - >>> adjust_resource_requirements({"cpu": "1"}, {}) - ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'cpu': '1'}) - >>> adjust_resource_requirements({"cpu": "1"}, {"cpu": "2"}, True) - ResourceRequirements(claims=None, limits={'cpu': '2'}, requests={'cpu': '2'}) - >>> adjust_resource_requirements({"cpu": "1"}, {"cpu": "2"}, False) - ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'cpu': '1'}) - >>> adjust_resource_requirements({"cpu": "1"}, {"memory": "1G"}, True) - ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'memory': '1G', 'cpu': '1'}) - >>> adjust_resource_requirements({"cpu": "1"}, {"memory": "1G"}, False) - ResourceRequirements(claims=None, limits={'cpu': '1'}, requests={'memory': '1G', 'cpu': '1'}) - >>> adjust_resource_requirements({"cpu": "1", "memory": "1"}, {"memory": "2"}, True) - ResourceRequirements(\ -claims=None, limits={'cpu': '1', 'memory': '2'}, requests={'memory': '2', 'cpu': '1'}) - >>> adjust_resource_requirements({"cpu": "1", "memory": "1"}, {"memory": "1G"}, False) - ResourceRequirements(\ -claims=None, limits={'cpu': '1', 'memory': '1'}, requests={'memory': '1', 'cpu': '1'}) - >>> adjust_resource_requirements({"custom-resource": "1"}, {"custom-resource": "2"}, False) - Traceback (most recent call last): - ... - ValueError: Invalid limits spec: {'custom-resource': '1'} - """ - if not is_valid_spec(limits): - raise ValueError("Invalid limits spec: {}".format(limits)) - if not is_valid_spec(requests): - raise ValueError("Invalid default requests spec: {}".format(requests)) - - limits = sanitize_resource_spec_dict(limits) or {} - requests = sanitize_resource_spec_dict(requests) or {} - - # Make sure we do not modify in-place - limits, requests = limits.copy(), requests.copy() - - # Need to copy key-val pairs from "limits" to "requests", if they are not present in - # "requests". This replicates K8s behavior: - # https://kubernetes.io/docs/concepts/configuration/manage-resources-containers - requests.update({k: limits[k] for k in limits if k not in requests}) - - if adhere_to_requests: - # Keep limits fixed when `limits` is too low - adjusted, fixed = limits, requests - func = max - else: - # Pull down requests when limit is too low - fixed, adjusted = limits, requests - func = min - - # adjusted = {} - for k in adjusted: - if k not in fixed: - # The resource constraint is present in the "adjusted" dict but not in the "fixed" - # dict. Keep the "adjusted" value as is - continue - - adjusted_value = func(parse_quantity(fixed[k]), parse_quantity(adjusted[k])) # type: ignore[type-var] - adjusted[k] = ( - str(adjusted_value.quantize(decimal.Decimal("0.001"), rounding=decimal.ROUND_UP)) # type: ignore[union-attr] - .rstrip("0") - .rstrip(".") - ) - - return ( - ResourceRequirements(limits=adjusted, requests=fixed) - if adhere_to_requests - else ResourceRequirements(limits=fixed, requests=adjusted) - ) - - -def is_valid_spec(spec: Optional[dict], debug=False) -> bool: # noqa: C901 - """Check if the spec dict is valid. - - TODO: generally, the keys can be anything, not just cpu and memory. Perhaps user could pass - list of custom allowed keys in addition to the K8s ones? 
- """ - if spec is None: - return True - if not isinstance(spec, dict): - if debug: - logger.error("Invalid resource spec type '%s': must be either None or dict.", spec) - return False - - for k, v in spec.items(): - valid_keys = ["cpu", "memory"] # K8s permits custom keys, but we limit here to what we use - if k not in valid_keys: - if debug: - logger.error("Invalid key in resource spec: %s; valid keys: %s.", k, valid_keys) - return False - try: - assert isinstance(v, (str, type(None))) # for type checker - pv = parse_quantity(v) - except ValueError: - if debug: - logger.error("Invalid resource spec entry: {%s: %s}.", k, v) - return False - - if pv and pv < 0: - if debug: - logger.error("Invalid resource spec entry: {%s: %s}; must be non-negative.", k, v) - return False - - return True - - -def sanitize_resource_spec_dict(spec: Optional[dict]) -> Optional[dict]: - """Fix spec values without altering semantics. - - The purpose of this helper function is to correct known issues. - This function is not intended for fixing user mistakes such as incorrect keys present; that is - left for the `is_valid_spec` function. - """ - if not spec: - return spec - - d = spec.copy() - - for k, v in spec.items(): - if not v: - # Need to ignore empty values input, otherwise the StatefulSet will have "0" as the - # setpoint, the pod will not be scheduled and the charm would be stuck in unknown/lost. - # This slightly changes the spec semantics compared to lightkube/k8s: a setpoint of - # `None` would be interpreted here as "no limit". - del d[k] - - # Round up memory to whole bytes. This is need to avoid K8s errors such as: - # fractional byte value "858993459200m" (0.8Gi) is invalid, must be an integer - memory = d.get("memory") - if memory: - as_decimal = parse_quantity(memory) - if as_decimal and as_decimal.remainder_near(floor(as_decimal)): - d["memory"] = str(ceil(as_decimal)) - return d - - -def _retry_on_condition(exception): - """Retry if the exception is an ApiError with a status code != 403. - - Returns: a boolean value to indicate whether to retry or not. 
- """ - if isinstance(exception, ApiError) and str(exception.status.code) != "403": - return True - if isinstance(exception, exceptions.ConfigError) or isinstance(exception, ValueError): - return True - return False - - -class K8sResourcePatchFailedEvent(EventBase): - """Emitted when patching fails.""" - - def __init__(self, handle, message=None): - super().__init__(handle) - self.message = message - - def snapshot(self) -> Dict: - """Save grafana source information.""" - return {"message": self.message} - - def restore(self, snapshot): - """Restore grafana source information.""" - self.message = snapshot["message"] - - -class K8sResourcePatchEvents(ObjectEvents): - """Events raised by :class:`K8sResourcePatchEvents`.""" - - patch_failed = EventSource(K8sResourcePatchFailedEvent) - - -class ContainerNotFoundError(ValueError): - """Raised when a given container does not exist in the list of containers.""" - - -class ResourcePatcher: - """Helper class for patching a container's resource limits in a given StatefulSet.""" - - def __init__(self, namespace: str, statefulset_name: str, container_name: str): - self.namespace = namespace - self.statefulset_name = statefulset_name - self.container_name = container_name - self.client = Client() # pyright: ignore - - def _patched_delta(self, resource_reqs: ResourceRequirements) -> StatefulSet: - statefulset = self.client.get( - StatefulSet, name=self.statefulset_name, namespace=self.namespace - ) - - return StatefulSet( - spec=StatefulSetSpec( - selector=statefulset.spec.selector, # type: ignore[attr-defined] - serviceName=statefulset.spec.serviceName, # type: ignore[attr-defined] - template=PodTemplateSpec( - spec=PodSpec( - containers=[Container(name=self.container_name, resources=resource_reqs)] - ) - ), - ) - ) - - @classmethod - def _get_container(cls, container_name: str, containers: List[Container]) -> Container: - """Find our container from the container list, assuming list is unique by name. - - Typically, *.spec.containers[0] is the charm container, and [1] is the (only) workload. - - Raises: - ContainerNotFoundError, if the user-provided container name does not exist in the list. - - Returns: - An instance of :class:`Container` whose name matches the given name. - """ - try: - return next(iter(filter(lambda ctr: ctr.name == container_name, containers))) - except StopIteration: - raise ContainerNotFoundError(f"Container '{container_name}' not found") - - def is_patched(self, resource_reqs: ResourceRequirements) -> bool: - """Reports if the resource patch has been applied to the StatefulSet. - - Returns: - bool: A boolean indicating if the service patch has been applied. 
- """ - return equals_canonically(self.get_templated(), resource_reqs) # pyright: ignore - - def get_templated(self) -> Optional[ResourceRequirements]: - """Returns the resource limits specified in the StatefulSet template.""" - statefulset = self.client.get( - StatefulSet, name=self.statefulset_name, namespace=self.namespace - ) - podspec_tpl = self._get_container( - self.container_name, - statefulset.spec.template.spec.containers, # type: ignore[attr-defined] - ) - return podspec_tpl.resources - - def get_actual(self, pod_name: str) -> Optional[ResourceRequirements]: - """Return the resource limits that are in effect for the container in the given pod.""" - pod = self.client.get(Pod, name=pod_name, namespace=self.namespace) - podspec = self._get_container( - self.container_name, pod.spec.containers # type: ignore[attr-defined] - ) - return podspec.resources - - def is_ready(self, pod_name, resource_reqs: ResourceRequirements): - """Reports if the resource patch has been applied and is in effect. - - Returns: - bool: A boolean indicating if the service patch has been applied and is in effect. - """ - logger.info( - "reqs=%s, templated=%s, actual=%s", - resource_reqs, - self.get_templated(), - self.get_actual(pod_name), - ) - return self.is_patched(resource_reqs) and equals_canonically( # pyright: ignore - resource_reqs, self.get_actual(pod_name) # pyright: ignore - ) - - def apply(self, resource_reqs: ResourceRequirements) -> None: - """Patch the Kubernetes resources created by Juju to limit cpu or mem.""" - # Need to ignore invalid input, otherwise the StatefulSet gives "FailedCreate" and the - # charm would be stuck in unknown/lost. - if self.is_patched(resource_reqs): - return - - self.client.patch( - StatefulSet, - self.statefulset_name, - self._patched_delta(resource_reqs), - namespace=self.namespace, - patch_type=PatchType.APPLY, - field_manager=self.__class__.__name__, - ) - - -class KubernetesComputeResourcesPatch(Object): - """A utility for patching the Kubernetes compute resources set up by Juju.""" - - on = K8sResourcePatchEvents() # pyright: ignore - PATCH_RETRY_STOP = tenacity.stop_after_delay(60 * 3) - PATCH_RETRY_WAIT = tenacity.wait_fixed(30) - PATCH_RETRY_IF = tenacity.retry_if_exception(_retry_on_condition) - - def __init__( - self, - charm: CharmBase, - container_name: str, - *, - resource_reqs_func: Callable[[], ResourceRequirements], - refresh_event: Optional[Union[BoundEvent, List[BoundEvent]]] = None, - ): - """Constructor for KubernetesComputeResourcesPatch. - - References: - - https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/ - - Args: - charm: the charm that is instantiating the library. - container_name: the container for which to apply the resource limits. - resource_reqs_func: a callable returning a `ResourceRequirements`; if raises, should - only raise ValueError. - refresh_event: an optional bound event or list of bound events which - will be observed to re-apply the patch. - """ - super().__init__(charm, "{}_{}".format(self.__class__.__name__, container_name)) - self._charm = charm - self._container_name = container_name - self.resource_reqs_func = resource_reqs_func - self.patcher = ResourcePatcher(self._namespace, self._app, container_name) - - # Ensure this patch is applied during the 'config-changed' event, which is emitted every - # startup and every upgrade. 
The config-changed event is a good time to apply this kind of - # patch because it is always emitted after storage-attached, leadership and peer-created, - # all of which only fire after install. Patching the statefulset prematurely could result - # in those events firing without a workload. - self.framework.observe(charm.on.config_changed, self._on_config_changed) - - if not refresh_event: - refresh_event = [] - elif not isinstance(refresh_event, list): - refresh_event = [refresh_event] - for ev in refresh_event: - self.framework.observe(ev, self._on_config_changed) - - def _on_config_changed(self, _): - self._patch() - - def _patch(self) -> None: - """Patch the Kubernetes resources created by Juju to limit cpu or mem. - - This method will keep on retrying to patch the kubernetes resource for a default duration of 3 minutes, - if the patching failure is due to a recoverable error (e.g: Network Latency). - This will put the charm in a `Maintenance` state until the patch is either successful or all retries have been consumed, - which in that case, will raise the same exception as the original exception that caused it and will put the charm in error state. - If the failure is not recoverable (e.g: wrong spec provided), an exception same as the original exception that caused it will be raised and will put the charm in error state. - """ - try: - resource_reqs = self.resource_reqs_func() - limits = resource_reqs.limits - requests = resource_reqs.requests - except ValueError as e: - msg = f"Failed obtaining resource limit spec: {e}" - logger.error(msg) - self.on.patch_failed.emit(message=msg) - raise ValueError(msg) from e - - for spec in (limits, requests): - if not is_valid_spec(spec): - msg = f"Invalid resource limit spec: {spec}" - logger.error(msg) - self.on.patch_failed.emit(message=msg) - raise RuntimeError(msg) - - resource_reqs = ResourceRequirements( - limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] - requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] - ) - - try: - for attempt in tenacity.Retrying( - retry=self.PATCH_RETRY_IF, - stop=self.PATCH_RETRY_STOP, - wait=self.PATCH_RETRY_WAIT, - # if you don't succeed raise the last caught exception when you're done - reraise=True, - ): - with attempt: - self._charm.unit.status = MaintenanceStatus( - f"retrying patching resource limit... (attempt #{attempt.retry_state.attempt_number})" - ) - self.patcher.apply(resource_reqs) - - except exceptions.ConfigError as e: - msg = f"Error creating k8s client: {e}" - logger.error(msg) - self.on.patch_failed.emit(message=msg) - raise exceptions.ConfigError(msg) from e - - except ApiError as e: - if e.status.code == 403: - msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}" - - else: - msg = f"Kubernetes resources patch failed: {e}" - - logger.error(msg) - self.on.patch_failed.emit(message=msg) - raise - - except ValueError as e: - msg = f"Kubernetes resources patch failed: {e}" - logger.error(msg) - self.on.patch_failed.emit(message=msg) - raise ValueError(msg) from e - - else: - logger.info( - "Kubernetes resources for app '%s', container '%s' patched successfully: %s", - self._app, - self._container_name, - resource_reqs, - ) - - def is_ready(self) -> bool: - """Reports if the resource patch has been applied and is in effect. - - Returns: - bool: A boolean indicating if the service patch has been applied and is in effect. 
- """ - try: - resource_reqs = self.resource_reqs_func() - limits = resource_reqs.limits - requests = resource_reqs.requests - except ValueError as e: - msg = f"Failed obtaining resource limit spec: {e}" - logger.error(msg) - return False - - if not is_valid_spec(limits) or not is_valid_spec(requests): - logger.error("Invalid resource requirements specs: %s, %s", limits, requests) - return False - - resource_reqs = ResourceRequirements( - limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] - requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] - ) - - try: - return self.patcher.is_ready(self._pod, resource_reqs) - except (ValueError, ApiError) as e: - msg = f"Failed to apply resource limit patch: {e}" - logger.error(msg) - self.on.patch_failed.emit(message=msg) - return False - - @property - def _app(self) -> str: - """Name of the current Juju application. - - Returns: - str: A string containing the name of the current Juju application. - """ - return self._charm.app.name - - @property - def _pod(self) -> str: - """Name of the unit's pod. - - Returns: - str: A string containing the name of the current unit's pod. - """ - return "-".join(self._charm.unit.name.rsplit("/", 1)) - - @property - def _namespace(self) -> str: - """The Kubernetes namespace we're running in. - - If a charm is deployed into the controller model (which certainly could happen as we move - to representing the controller as a charm) then self._charm.model.name !== k8s namespace. - Instead, the model name is controller in Juju and controller- for the - namespace in K8s. - - Returns: - str: A string containing the name of the current Kubernetes namespace. - """ - with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") as f: - return f.read().strip() From 29d1c6ee4620cc5f4452bb7962e8a0eb27c63338 Mon Sep 17 00:00:00 2001 From: michael Date: Tue, 20 Aug 2024 13:12:24 +0300 Subject: [PATCH 04/15] correct version --- .../v0/kubernetes_compute_resources_patch.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 4f9081f..6887d96 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -31,7 +31,7 @@ ```shell cd some-charm -charmcraft fetch-lib charms.observability_libs.v1.kubernetes_compute_resources_patch +charmcraft fetch-lib charms.observability_libs.v0.kubernetes_compute_resources_patch cat << EOF >> requirements.txt lightkube lightkube-models @@ -42,7 +42,7 @@ ```python # ... 
-from charms.observability_libs.v1.kubernetes_compute_resources_patch import ( +from charms.observability_libs.v0.kubernetes_compute_resources_patch import ( KubernetesComputeResourcesPatch, ResourceRequirements, ) @@ -140,7 +140,7 @@ def setUp(self, *unused): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 8 +LIBPATCH = 7 _Decimal = Union[Decimal, float, str, int] # types that are potentially convertible to Decimal From a6ac58eb30286d34b269a1c585e5c8cc894ae373 Mon Sep 17 00:00:00 2001 From: michael Date: Tue, 20 Aug 2024 13:12:57 +0300 Subject: [PATCH 05/15] remove debug log --- .../v0/kubernetes_compute_resources_patch.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 6887d96..9c05158 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -307,9 +307,11 @@ def _retry_on_condition(exception): return True return False + class PatchFailedError(Exception): """Raised when patching K8s resources requests and limits fails.""" + class K8sResourcePatchFailedEvent(EventBase): """Emitted when patching fails.""" @@ -412,12 +414,6 @@ def is_ready(self, pod_name, resource_reqs: ResourceRequirements): Returns: bool: A boolean indicating if the service patch has been applied and is in effect. """ - logger.debug( - "reqs=%s, templated=%s, actual=%s", - resource_reqs, - self.get_templated(), - self.get_actual(pod_name), - ) return self.is_patched(resource_reqs) and equals_canonically( # pyright: ignore resource_reqs, self.get_actual(pod_name) # pyright: ignore ) @@ -428,7 +424,7 @@ def apply(self, resource_reqs: ResourceRequirements) -> None: # charm would be stuck in unknown/lost. if self.is_patched(resource_reqs): logger.debug(f"resource requests {resource_reqs} are already patched.") - return + return self.client.patch( StatefulSet, @@ -550,7 +546,6 @@ def _patch(self) -> None: logger.exception(msg) raise PatchFailedError(msg) from e - except ValueError as e: msg = f"Kubernetes resources patch failed: {e}" logger.exception(msg) From 89ed5ee6e0ab795e437457840c446556c5d80098 Mon Sep 17 00:00:00 2001 From: michael Date: Wed, 21 Aug 2024 16:31:58 +0300 Subject: [PATCH 06/15] refactor --- .../v0/kubernetes_compute_resources_patch.py | 205 +++++++++++++++--- ...y => test_kubernetes_compute_resources.py} | 47 +++- .../test_kubernetes_compute_resources_v1.py | 118 ---------- 3 files changed, 222 insertions(+), 148 deletions(-) rename tests/unit/{test_kubernetes_compute_resources_v0.py => test_kubernetes_compute_resources.py} (65%) delete mode 100644 tests/unit/test_kubernetes_compute_resources_v1.py diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 9c05158..15c5f98 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -18,11 +18,6 @@ `ResourceRequirements` model, please visit the `lightkube` [docs](https://gtsystem.github.io/lightkube-models/1.23/models/core_v1/#resourcerequirements). 
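The `is_patched`/`is_ready` checks touched by the hunks above compare the requested, templated, and actual `ResourceRequirements` canonically rather than textually, so equivalent spellings of the same quantity do not trigger a re-patch. A standalone illustration of that comparison (not part of the patch; it assumes `equals_canonically` accepts `ResourceRequirements` objects the way this library already calls it):

```python
# Illustrative only: canonical quantity comparison as used by is_patched()/is_ready().
from lightkube.models.core_v1 import ResourceRequirements
from lightkube.utils.quantity import equals_canonically, parse_quantity

requested = ResourceRequirements(limits={"cpu": "1", "memory": "1Gi"})
templated = ResourceRequirements(limits={"cpu": "1000m", "memory": "1073741824"})

# "1Gi" and its byte count parse to the same Decimal; "1" cpu equals "1000m".
assert parse_quantity("1Gi") == parse_quantity("1073741824")
# Canonical comparison ignores spelling differences, so no re-patch is needed.
assert equals_canonically(requested, templated)
```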
-Note that patching compute resources will keep on retrying for a default duration of KubernetesComputeResourcesPatch.PATCH_RETRY_STOP -if the patching failed due to a recoverable error (e.g: Network Latency). -This will put the charm using the library in a `Maintenance` state until the patch is either successful or all retries have been consumed, -which in that case, will raise the same exception as the original exception that caused it and will put the charm in error state. -If the failure is not recoverable (e.g: wrong spec provided), an exception same as the original exception that caused it will be raised and will put the charm in error state. ## Getting Started @@ -109,9 +104,11 @@ def setUp(self, *unused): import decimal import logging from decimal import Decimal +from enum import Enum, unique from math import ceil, floor -from typing import Callable, Dict, List, Optional, Union +from typing import Callable, Dict, List, Optional, Tuple, Union +import httpx import tenacity from lightkube import ApiError, Client # pyright: ignore from lightkube.core import exceptions @@ -126,7 +123,6 @@ def setUp(self, *unused): from lightkube.resources.core_v1 import Pod from lightkube.types import PatchType from lightkube.utils.quantity import equals_canonically, parse_quantity -from ops import MaintenanceStatus from ops.charm import CharmBase from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents @@ -140,7 +136,7 @@ def setUp(self, *unused): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 7 +LIBPATCH = 8 _Decimal = Union[Decimal, float, str, int] # types that are potentially convertible to Decimal @@ -308,8 +304,13 @@ def _retry_on_condition(exception): return False -class PatchFailedError(Exception): - """Raised when patching K8s resources requests and limits fails.""" +@unique +class PatchState(str, Enum): + """Patch operation possible states.""" + + in_progress = "in_progress" + succeeded = "succeeded" + failed = "failed" class K8sResourcePatchFailedEvent(EventBase): @@ -341,6 +342,10 @@ class ContainerNotFoundError(ValueError): class ResourcePatcher: """Helper class for patching a container's resource limits in a given StatefulSet.""" + api_url = "https://kubernetes.default.svc" + token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token" + ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + def __init__(self, namespace: str, statefulset_name: str, container_name: str): self.namespace = namespace self.statefulset_name = statefulset_name @@ -408,6 +413,99 @@ def get_actual(self, pod_name: str) -> Optional[ResourceRequirements]: ) return podspec.resources + def is_failed( + self, resource_reqs_func: Callable[[], ResourceRequirements] + ) -> Tuple[bool, str]: + """Returns a tuple indicating whether a patch operation has failed along with a failure message. + + Implementation is based on dry running the patch operation to catch if there would be failures (e.g: Wrong spec and Auth errors). + """ + try: + resource_reqs = resource_reqs_func() + limits = resource_reqs.limits + requests = resource_reqs.requests + except ValueError as e: + msg = f"Failed obtaining resource limit spec: {e}" + logger.error(msg) + return True, msg + + # Dry run does not catch negative values for resource requests and limits. 
+ if not is_valid_spec(limits) or not is_valid_spec(requests): + msg = f"Invalid resource requirements specs: {limits}, {requests}" + logger.error(msg) + return True, msg + + resource_reqs = ResourceRequirements( + limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] + requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] + ) + + response = self.dry_run_apply(resource_reqs) + if response.status_code == 403: + return True, "Kubernetes resources patch failed: `juju trust` this application." + if response.status_code != 200: + return True, response.json().get("message", "") + + return False, "" + + def is_in_progress(self) -> bool: + """Returns a boolean to indicate whether a patch operation is in progress. + + Implementation follows a similar approach to `kubectl rollout status statefulset` to track the progress of a rollout. + Reference: https://github.com/kubernetes/kubectl/blob/master/pkg/polymorphichelpers/rollout_status.go + """ + sts = self.client.get(StatefulSet, name=self.statefulset_name, namespace=self.namespace) + if sts.status is None: + logger.debug("status is not yet available") + return False + if sts.status.observedGeneration == 0 or ( + sts.metadata and sts.metadata.generation > sts.status.observedGeneration # type: ignore + ): + logger.debug("waiting for statefulset spec update to be observed...") + return True + if ( + sts.spec + and sts.spec.replicas is not None + and sts.status.readyReplicas < sts.spec.replicas # type: ignore + ): + logger.debug( + f"Waiting for {sts.spec.replicas-sts.status.readyReplicas} pods to be ready..." # type: ignore + ) + return True + + if ( + sts.spec + and sts.spec.updateStrategy + and sts.spec.updateStrategy.type == "rollingUpdate" + and sts.spec.updateStrategy.rollingUpdate is not None + ): + if ( + sts.spec.replicas is not None + and sts.spec.updateStrategy.rollingUpdate.partition is not None + ): + if sts.status.updatedReplicas < ( # type: ignore + sts.spec.replicas - sts.spec.updateStrategy.rollingUpdate.partition + ): + logger.debug( + f"Waiting for partitioned roll out to finish: {sts.status.updatedReplicas} out of {sts.spec.replicas - sts.spec.updateStrategy.rollingUpdate.partition} new pods have been updated..." + ) + return True + logger.debug( + f"partitioned roll out complete: {sts.status.updatedReplicas} new pods have been updated..." + ) + return False + + if sts.status.updateRevision != sts.status.currentRevision: + logger.debug( + f"waiting for statefulset rolling update to complete {sts.status.updatedReplicas} pods at revision {sts.status.updateRevision}..." + ) + return True + + logger.debug( + f"statefulset rolling update complete pods at revision {sts.status.currentRevision}" + ) + return False + def is_ready(self, pod_name, resource_reqs: ResourceRequirements): """Reports if the resource patch has been applied and is in effect. 
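The new `is_in_progress` helper reads the same StatefulSet status fields that `kubectl rollout status statefulset` inspects. A condensed, standalone sketch of that check (illustrative rather than part of the diff; it assumes lightkube client credentials are available, e.g. in-cluster):

```python
# Condensed rollout-progress check mirroring the is_in_progress() logic above.
from lightkube import Client
from lightkube.resources.apps_v1 import StatefulSet


def rollout_in_progress(name: str, namespace: str) -> bool:
    sts = Client().get(StatefulSet, name=name, namespace=namespace)
    status, spec, meta = sts.status, sts.spec, sts.metadata
    if status is None or spec is None:
        return False  # nothing observed yet, nothing to wait for
    if meta and meta.generation and (status.observedGeneration or 0) < meta.generation:
        return True  # the controller has not observed the patched spec yet
    if spec.replicas and (status.readyReplicas or 0) < spec.replicas:
        return True  # pods are still being recreated with the new resources
    # The rollout is done once every pod runs the newest revision.
    return status.updateRevision != status.currentRevision
```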
@@ -435,13 +533,36 @@ def apply(self, resource_reqs: ResourceRequirements) -> None: field_manager=self.__class__.__name__, ) + def dry_run_apply(self, resource_reqs: ResourceRequirements): + """Run a dry-run patch operation.""" + # Read the token for authentication + with open(self.token_path, "r") as token_file: + token = token_file.read() + + patch_url = f"{self.api_url}/apis/apps/v1/namespaces/{self.namespace}/statefulsets/{self.statefulset_name}" + + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/apply-patch+yaml", + "Accept": "application/json", + } + + with httpx.Client(verify=self.ca_path) as client: + response = client.patch( + url=patch_url, + headers=headers, + json=self._patched_delta(resource_reqs).__dict__, + params={"dryRun": "All"}, + ) + return response + class KubernetesComputeResourcesPatch(Object): """A utility for patching the Kubernetes compute resources set up by Juju.""" on = K8sResourcePatchEvents() # pyright: ignore - PATCH_RETRY_STOP = tenacity.stop_after_delay(60 * 3) - PATCH_RETRY_WAIT = tenacity.wait_fixed(30) + PATCH_RETRY_STOP = tenacity.stop_after_delay(20) + PATCH_RETRY_WAIT = tenacity.wait_fixed(5) PATCH_RETRY_IF = tenacity.retry_if_exception(_retry_on_condition) def __init__( @@ -491,11 +612,8 @@ def _on_config_changed(self, _): def _patch(self) -> None: """Patch the Kubernetes resources created by Juju to limit cpu or mem. - This method will keep on retrying to patch the kubernetes resource for a default duration of 3 minutes, + This method will keep on retrying to patch the kubernetes resource for a default duration of 20 seconds if the patching failure is due to a recoverable error (e.g: Network Latency). - This will put the charm in a `Maintenance` state until the patch is either successful or all retries have been consumed, - which in that case, will raise the same exception as the original exception that caused it and will put the charm in error state. - If the failure is not recoverable (e.g: wrong spec provided), an exception same as the original exception that caused it will be raised and will put the charm in error state. """ try: resource_reqs = self.resource_reqs_func() @@ -503,14 +621,16 @@ def _patch(self) -> None: requests = resource_reqs.requests except ValueError as e: msg = f"Failed obtaining resource limit spec: {e}" - logger.exception(msg) - raise ValueError(msg) from e + logger.error(msg) + self.on.patch_failed.emit(message=msg) + return for spec in (limits, requests): if not is_valid_spec(spec): msg = f"Invalid resource limit spec: {spec}" logger.error(msg) - raise ValueError(msg) + self.on.patch_failed.emit(message=msg) + return resource_reqs = ResourceRequirements( limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] @@ -526,15 +646,16 @@ def _patch(self) -> None: reraise=True, ): with attempt: - self._charm.unit.status = MaintenanceStatus( - f"retrying patching resource limit... 
(attempt #{attempt.retry_state.attempt_number})" + logger.info( + f"attempt #{attempt.retry_state.attempt_number} to patch resource limits" ) self.patcher.apply(resource_reqs) except exceptions.ConfigError as e: msg = f"Error creating k8s client: {e}" - logger.exception(msg) - raise PatchFailedError(msg) from e + logger.error(msg) + self.on.patch_failed.emit(message=msg) + return except ApiError as e: if e.status.code == 403: @@ -543,13 +664,13 @@ def _patch(self) -> None: else: msg = f"Kubernetes resources patch failed: {e}" - logger.exception(msg) - raise PatchFailedError(msg) from e + logger.error(msg) + self.on.patch_failed.emit(message=msg) except ValueError as e: msg = f"Kubernetes resources patch failed: {e}" - logger.exception(msg) - raise PatchFailedError(msg) from e + logger.error(msg) + self.on.patch_failed.emit(message=msg) else: logger.info( @@ -591,6 +712,36 @@ def is_ready(self) -> bool: self.on.patch_failed.emit(message=msg) return False + def get_status(self) -> Tuple[PatchState, str]: + """Return the status of patching the resource limits along with an optional message. + + Returns: + tuple(PatchState, str): A tuple where: + - The first value is an Enum `PatchState` indicating the status of the patch operation. + Possible values are: + - PatchState.succeeded: The patch was applied successfully. + - PatchState.failed: The patch failed. + - PatchState.in_progress: The patch is still in progress. + - The second value is a string that provides additional information or a message about the status. + + Example: + - ("succeeded", "Patch applied successfully.") + - ("failed", "Failed due to missing permissions.") + - ("in_progress", "Patch is in progress.") + """ + # succeeded + if self.is_ready(): + return PatchState.succeeded, "" + # waiting + if self.patcher.is_in_progress(): + return PatchState.in_progress, "" + # failed + is_failed, msg = self.patcher.is_failed(self.resource_reqs_func) + if is_failed: + return PatchState.failed, msg + # if none of the above, then it probably means nothing has been patched yet + return PatchState.succeeded, "" + @property def _app(self) -> str: """Name of the current Juju application. diff --git a/tests/unit/test_kubernetes_compute_resources_v0.py b/tests/unit/test_kubernetes_compute_resources.py similarity index 65% rename from tests/unit/test_kubernetes_compute_resources_v0.py rename to tests/unit/test_kubernetes_compute_resources.py index a7d03b5..e0cd4ac 100644 --- a/tests/unit/test_kubernetes_compute_resources_v0.py +++ b/tests/unit/test_kubernetes_compute_resources.py @@ -2,14 +2,12 @@ # See LICENSE file for licensing details. 
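The retry loop reworked above is plain `tenacity`: retry only while `_retry_on_condition` classifies the error as recoverable, stop after the configured delay, wait a fixed interval between attempts, and re-raise the last exception if every attempt fails. A self-contained sketch of the same pattern (`patch_statefulset` and the exception type are stand-ins, not names from the library):

```python
# Standalone sketch of the tenacity retry pattern used by _patch().
import tenacity


def patch_statefulset() -> None:
    """Stand-in for ResourcePatcher.apply(); may raise a transient error."""


for attempt in tenacity.Retrying(
    retry=tenacity.retry_if_exception_type(ConnectionError),  # stand-in for PATCH_RETRY_IF
    stop=tenacity.stop_after_delay(20),  # PATCH_RETRY_STOP
    wait=tenacity.wait_fixed(5),  # PATCH_RETRY_WAIT
    reraise=True,  # surface the last exception once the retries are exhausted
):
    with attempt:
        print(f"attempt #{attempt.retry_state.attempt_number}")
        patch_statefulset()
```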
import unittest from unittest import mock -from unittest.mock import Mock +from unittest.mock import MagicMock import yaml from charms.observability_libs.v0.kubernetes_compute_resources_patch import ( KubernetesComputeResourcesPatch, adjust_resource_requirements, - is_valid_spec, - sanitize_resource_spec_dict, ) from ops.charm import CharmBase from ops.testing import Harness @@ -88,6 +86,49 @@ def test_invalid_config_emits_custom_event(self, client_mock): after = self.harness.charm.patch_failed_counter self.assertGreater(after, before) + @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") + @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply",MagicMock(return_value=None)) + @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready",MagicMock(return_value=True)) + def test_get_status_success(self): + self.harness.begin_with_initial_hooks() + charm = self.harness.charm + status, msg = charm.resources_patch.get_status() + assert status == "succeeded" + assert msg == "" + + @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") + @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready",MagicMock(return_value=False)) + @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_in_progress",MagicMock(return_value=False)) + @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.dry_run_apply",MagicMock(return_value=httpx.Response(status_code=401, content='{"message": "unauthorized"}'))) + def test_get_status_failed(self): + self.harness.begin_with_initial_hooks() + charm = self.harness.charm + status, msg = charm.resources_patch.get_status() + assert status == "failed" + assert msg == "unauthorized" + + @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") + @mock.patch( + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply", + MagicMock(return_value=None), + ) + @mock.patch( + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready", + MagicMock(return_value=False), + ) + @mock.patch("lightkube.core.client.GenericSyncClient") + def test_get_status_in_progress(self, mock_client): + mock_client_get = MagicMock() + mock_client_get.metadata.generation = 2 + mock_client_get.status.observedGeneration = 1 + mock_client.return_value.request.return_value = mock_client_get + + self.harness.begin_with_initial_hooks() + charm = self.harness.charm + status, msg = charm.resources_patch.get_status() + assert status == "in_progress" + assert msg == "" + class TestResourceSpecDictValidation(unittest.TestCase): def test_sanitize_resource_spec_dict(self): diff --git a/tests/unit/test_kubernetes_compute_resources_v1.py b/tests/unit/test_kubernetes_compute_resources_v1.py deleted file mode 100644 index 5dcf687..0000000 --- a/tests/unit/test_kubernetes_compute_resources_v1.py +++ /dev/null @@ -1,118 +0,0 @@ -# Copyright 2021 Canonical Ltd. -# See LICENSE file for licensing details. 
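At this point in the series `get_status()` returns a `(PatchState, message)` tuple, exercised by the new tests above; a later patch in this same series replaces the tuple with an ops `StatusBase`. A hedged sketch of how a charm could map the tuple form onto a unit status (the helper name is illustrative, not part of the library):

```python
# Sketch: mapping the intermediate (PatchState, message) result onto a unit status.
from charms.observability_libs.v0.kubernetes_compute_resources_patch import PatchState
from ops import ActiveStatus, BlockedStatus, WaitingStatus
from ops.model import StatusBase


def to_unit_status(state: PatchState, message: str) -> StatusBase:
    if state == PatchState.failed:
        return BlockedStatus(message)
    if state == PatchState.in_progress:
        return WaitingStatus("waiting for resources patch to apply")
    return ActiveStatus()


# e.g. inside a hook: self.unit.status = to_unit_status(*self.resources_patch.get_status())
```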
-import unittest -from unittest import mock -from unittest.mock import MagicMock, Mock - -import yaml -from charms.observability_libs.v1.kubernetes_compute_resources_patch import ( - KubernetesComputeResourcesPatch, - adjust_resource_requirements, - is_valid_spec, - sanitize_resource_spec_dict, -) -from ops.charm import CharmBase -from ops.testing import Harness - -from tests.unit.helpers import PROJECT_DIR - -CL_PATH = "charms.observability_libs.v1.kubernetes_compute_resources_patch.KubernetesComputeResourcesPatch" - - -class TestKubernetesComputeResourcesPatch(unittest.TestCase): - class _TestCharm(CharmBase): - def __init__(self, *args): - super().__init__(*args) - self.resources_patch = KubernetesComputeResourcesPatch( - self, - "placeholder", - resource_reqs_func=lambda: adjust_resource_requirements( - { - "cpu": self.model.config.get("cpu"), - "memory": self.model.config.get("memory"), - }, - None, - ), - ) - - def setUp(self) -> None: - with open(PROJECT_DIR / "config.yaml") as config_file: - config = yaml.safe_load(config_file) - self.harness = Harness( - self._TestCharm, meta=open(PROJECT_DIR / "metadata.yaml"), config=str(config) - ) - - @mock.patch("lightkube.core.client.GenericSyncClient", Mock) - @mock.patch(f"{CL_PATH}._namespace", "test-namespace") - def test_listener_is_attached_for_config_changed_event(self): - self.harness.begin() - charm = self.harness.charm - with mock.patch(f"{CL_PATH}._patch") as patch: - charm.on.config_changed.emit() - self.assertEqual(patch.call_count, 1) - - @mock.patch("lightkube.core.client.GenericSyncClient", Mock) - @mock.patch(f"{CL_PATH}._namespace", "test-namespace") - def test_patch_is_applied_regardless_of_leadership_status(self): - self.harness.begin() - charm = self.harness.charm - for is_leader in (True, False): - with self.subTest(is_leader=is_leader): - self.harness.set_leader(True) - with mock.patch(f"{CL_PATH}._patch") as patch: - charm.on.config_changed.emit() - self.assertEqual(patch.call_count, 1) - - @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") - @mock.patch("lightkube.core.client.GenericSyncClient") - @mock.patch( - "charms.observability_libs.v1.kubernetes_compute_resources_patch.ResourcePatcher.apply", - MagicMock(return_value=None), - ) - def test_patch_is_applied_during_startup_sequence(self, client_mock): - self.harness.begin_with_initial_hooks() - self.assertGreater(client_mock.call_count, 0) - - @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") - @mock.patch("lightkube.core.client.GenericSyncClient") - @mock.patch( - "charms.observability_libs.v1.kubernetes_compute_resources_patch.ResourcePatcher.apply", - MagicMock(return_value=None), - ) - def test_invalid_config_raises_exception(self, client_mock): - - self.harness.begin_with_initial_hooks() - - # Test invalid quantity values - for cpu, memory in [ - ("-1", ""), - ("", "-1Gi"), - ("-1", "1Gi"), - ("1", "-1Gi"), - ("4x", "1Gi"), - ("1", "1Gx"), - ]: - with self.subTest(cpu=cpu, memory=memory): - with self.assertRaises(ValueError): - self.harness.update_config({"cpu": cpu, "memory": memory}) - - -class TestResourceSpecDictValidation(unittest.TestCase): - def test_sanitize_resource_spec_dict(self): - self.assertEqual(None, sanitize_resource_spec_dict(None)) - self.assertEqual({}, sanitize_resource_spec_dict({})) - self.assertEqual({"bad": "combo"}, sanitize_resource_spec_dict({"bad": "combo"})) - self.assertEqual({"cpu": 1}, sanitize_resource_spec_dict({"cpu": 1})) - self.assertEqual({"cpu": "1"}, 
sanitize_resource_spec_dict({"cpu": "1"})) - self.assertEqual({"memory": "858993460"}, sanitize_resource_spec_dict({"memory": "0.8Gi"})) - - def test_is_valid_spec(self): - self.assertTrue(is_valid_spec(None)) - self.assertTrue(is_valid_spec({})) - self.assertTrue(is_valid_spec({"cpu": "1"})) - self.assertTrue(is_valid_spec({"memory": "858993460"})) - self.assertTrue(is_valid_spec({"memory": "0.8Gi"})) - self.assertTrue(is_valid_spec({"cpu": None, "memory": None})) - - self.assertFalse(is_valid_spec({"bad": "combo"})) - self.assertFalse(is_valid_spec({"invalid-key": "1"})) From 9c8495d68c9fbdd4805f06d40618949842a89d4a Mon Sep 17 00:00:00 2001 From: michael Date: Wed, 21 Aug 2024 16:35:58 +0300 Subject: [PATCH 07/15] lint --- .../unit/test_kubernetes_compute_resources.py | 32 +++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/tests/unit/test_kubernetes_compute_resources.py b/tests/unit/test_kubernetes_compute_resources.py index e0cd4ac..a397cc5 100644 --- a/tests/unit/test_kubernetes_compute_resources.py +++ b/tests/unit/test_kubernetes_compute_resources.py @@ -2,12 +2,15 @@ # See LICENSE file for licensing details. import unittest from unittest import mock -from unittest.mock import MagicMock +from unittest.mock import MagicMock, Mock +import httpx import yaml from charms.observability_libs.v0.kubernetes_compute_resources_patch import ( KubernetesComputeResourcesPatch, adjust_resource_requirements, + is_valid_spec, + sanitize_resource_spec_dict, ) from ops.charm import CharmBase from ops.testing import Harness @@ -87,8 +90,14 @@ def test_invalid_config_emits_custom_event(self, client_mock): self.assertGreater(after, before) @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") - @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply",MagicMock(return_value=None)) - @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready",MagicMock(return_value=True)) + @mock.patch( + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply", + MagicMock(return_value=None), + ) + @mock.patch( + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready", + MagicMock(return_value=True), + ) def test_get_status_success(self): self.harness.begin_with_initial_hooks() charm = self.harness.charm @@ -97,9 +106,20 @@ def test_get_status_success(self): assert msg == "" @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") - @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready",MagicMock(return_value=False)) - @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_in_progress",MagicMock(return_value=False)) - @mock.patch("charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.dry_run_apply",MagicMock(return_value=httpx.Response(status_code=401, content='{"message": "unauthorized"}'))) + @mock.patch( + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready", + MagicMock(return_value=False), + ) + @mock.patch( + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_in_progress", + MagicMock(return_value=False), + ) + @mock.patch( + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.dry_run_apply", + MagicMock( + return_value=httpx.Response(status_code=401, 
content='{"message": "unauthorized"}') + ), + ) def test_get_status_failed(self): self.harness.begin_with_initial_hooks() charm = self.harness.charm From e98599e223b89a74df72a176235322495b92c928 Mon Sep 17 00:00:00 2001 From: michael Date: Wed, 21 Aug 2024 17:04:28 +0300 Subject: [PATCH 08/15] fix UT --- .../v0/kubernetes_compute_resources_patch.py | 2 +- tests/unit/test_kubernetes_compute_resources.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 15c5f98..66d6143 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -452,7 +452,7 @@ def is_in_progress(self) -> bool: """Returns a boolean to indicate whether a patch operation is in progress. Implementation follows a similar approach to `kubectl rollout status statefulset` to track the progress of a rollout. - Reference: https://github.com/kubernetes/kubectl/blob/master/pkg/polymorphichelpers/rollout_status.go + Reference: https://github.com/kubernetes/kubectl/blob/kubernetes-1.31.0/pkg/polymorphichelpers/rollout_status.go """ sts = self.client.get(StatefulSet, name=self.statefulset_name, namespace=self.namespace) if sts.status is None: diff --git a/tests/unit/test_kubernetes_compute_resources.py b/tests/unit/test_kubernetes_compute_resources.py index a397cc5..dd0ceaa 100644 --- a/tests/unit/test_kubernetes_compute_resources.py +++ b/tests/unit/test_kubernetes_compute_resources.py @@ -98,7 +98,8 @@ def test_invalid_config_emits_custom_event(self, client_mock): "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready", MagicMock(return_value=True), ) - def test_get_status_success(self): + @mock.patch("lightkube.core.client.GenericSyncClient") + def test_get_status_success(self, mock_client): self.harness.begin_with_initial_hooks() charm = self.harness.charm status, msg = charm.resources_patch.get_status() @@ -120,7 +121,8 @@ def test_get_status_success(self): return_value=httpx.Response(status_code=401, content='{"message": "unauthorized"}') ), ) - def test_get_status_failed(self): + @mock.patch("lightkube.core.client.GenericSyncClient") + def test_get_status_failed(self, mock_client): self.harness.begin_with_initial_hooks() charm = self.harness.charm status, msg = charm.resources_patch.get_status() From 18b376ffea5a4a1c9fc98375ccf79f1bd35827c6 Mon Sep 17 00:00:00 2001 From: michael Date: Wed, 21 Aug 2024 20:30:25 +0300 Subject: [PATCH 09/15] fixes --- .../v0/kubernetes_compute_resources_patch.py | 81 ++++++++++++------- 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 66d6143..8d1aef7 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -352,6 +352,22 @@ def __init__(self, namespace: str, statefulset_name: str, container_name: str): self.container_name = container_name self.client = Client() # pyright: ignore + def _raw_patch_data(self, resources: dict) -> dict: + return { + "spec": { + "template": { + "spec": { + "containers": [ + { + "name": f"{self.container_name}", + "resources": resources, + } + ] + } + } + } + } + def _patched_delta(self, 
resource_reqs: ResourceRequirements) -> StatefulSet: statefulset = self.client.get( StatefulSet, name=self.statefulset_name, namespace=self.namespace @@ -435,17 +451,17 @@ def is_failed( logger.error(msg) return True, msg - resource_reqs = ResourceRequirements( - limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] - requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] - ) + resources = { + "limits": sanitize_resource_spec_dict(limits), + "requests": sanitize_resource_spec_dict(requests), + } - response = self.dry_run_apply(resource_reqs) + response = self.dry_run_apply(resources) if response.status_code == 403: return True, "Kubernetes resources patch failed: `juju trust` this application." if response.status_code != 200: - return True, response.json().get("message", "") - + msg = f"Kubernetes resources patch failed: {response.json().get('message', '')}" + return True, msg return False, "" def is_in_progress(self) -> bool: @@ -454,28 +470,36 @@ def is_in_progress(self) -> bool: Implementation follows a similar approach to `kubectl rollout status statefulset` to track the progress of a rollout. Reference: https://github.com/kubernetes/kubectl/blob/kubernetes-1.31.0/pkg/polymorphichelpers/rollout_status.go """ - sts = self.client.get(StatefulSet, name=self.statefulset_name, namespace=self.namespace) - if sts.status is None: - logger.debug("status is not yet available") + try: + sts = self.client.get( + StatefulSet, name=self.statefulset_name, namespace=self.namespace + ) + except (ValueError, ApiError): + return False + + if sts.status is None or sts.spec is None: + logger.debug("status/spec are not yet available") return False if sts.status.observedGeneration == 0 or ( - sts.metadata and sts.metadata.generation > sts.status.observedGeneration # type: ignore + sts.metadata + and sts.status.observedGeneration + and sts.metadata.generation + and sts.metadata.generation > sts.status.observedGeneration ): logger.debug("waiting for statefulset spec update to be observed...") return True if ( - sts.spec - and sts.spec.replicas is not None - and sts.status.readyReplicas < sts.spec.replicas # type: ignore + sts.spec.replicas is not None + and sts.status.readyReplicas is not None + and sts.status.readyReplicas < sts.spec.replicas ): logger.debug( - f"Waiting for {sts.spec.replicas-sts.status.readyReplicas} pods to be ready..." # type: ignore + f"Waiting for {sts.spec.replicas-sts.status.readyReplicas} pods to be ready..." 
) return True if ( - sts.spec - and sts.spec.updateStrategy + sts.spec.updateStrategy and sts.spec.updateStrategy.type == "rollingUpdate" and sts.spec.updateStrategy.rollingUpdate is not None ): @@ -483,7 +507,7 @@ def is_in_progress(self) -> bool: sts.spec.replicas is not None and sts.spec.updateStrategy.rollingUpdate.partition is not None ): - if sts.status.updatedReplicas < ( # type: ignore + if sts.status.updatedReplicas and sts.status.updatedReplicas < ( sts.spec.replicas - sts.spec.updateStrategy.rollingUpdate.partition ): logger.debug( @@ -533,7 +557,7 @@ def apply(self, resource_reqs: ResourceRequirements) -> None: field_manager=self.__class__.__name__, ) - def dry_run_apply(self, resource_reqs: ResourceRequirements): + def dry_run_apply(self, resources: dict): """Run a dry-run patch operation.""" # Read the token for authentication with open(self.token_path, "r") as token_file: @@ -543,7 +567,7 @@ def dry_run_apply(self, resource_reqs: ResourceRequirements): headers = { "Authorization": f"Bearer {token}", - "Content-Type": "application/apply-patch+yaml", + "Content-Type": "application/strategic-merge-patch+json", "Accept": "application/json", } @@ -551,8 +575,8 @@ def dry_run_apply(self, resource_reqs: ResourceRequirements): response = client.patch( url=patch_url, headers=headers, - json=self._patched_delta(resource_reqs).__dict__, - params={"dryRun": "All"}, + json=self._raw_patch_data(resources), + params={"dryRun": "All", "fieldManager": self.__class__.__name__}, ) return response @@ -646,7 +670,7 @@ def _patch(self) -> None: reraise=True, ): with attempt: - logger.info( + logger.debug( f"attempt #{attempt.retry_state.attempt_number} to patch resource limits" ) self.patcher.apply(resource_reqs) @@ -729,17 +753,14 @@ def get_status(self) -> Tuple[PatchState, str]: - ("failed", "Failed due to missing permissions.") - ("in_progress", "Patch is in progress.") """ - # succeeded - if self.is_ready(): - return PatchState.succeeded, "" - # waiting - if self.patcher.is_in_progress(): - return PatchState.in_progress, "" # failed is_failed, msg = self.patcher.is_failed(self.resource_reqs_func) if is_failed: return PatchState.failed, msg - # if none of the above, then it probably means nothing has been patched yet + # waiting + if self.patcher.is_in_progress(): + return PatchState.in_progress, "" + # succeeded or nothing has been patched yet return PatchState.succeeded, "" @property From c646fda5b4617a55d973245a938f3fe034babb43 Mon Sep 17 00:00:00 2001 From: michael Date: Wed, 21 Aug 2024 21:27:37 +0300 Subject: [PATCH 10/15] change return type for get_status --- .../v0/kubernetes_compute_resources_patch.py | 59 ++++++++++--------- .../unit/test_kubernetes_compute_resources.py | 41 +++---------- 2 files changed, 39 insertions(+), 61 deletions(-) diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 8d1aef7..07b3582 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -76,6 +76,17 @@ def _resource_spec_from_config(self) -> ResourceRequirements: return ResourceRequirements(limits=spec, requests=spec) ``` +If you wish to pull the state of the resources patch operation and set the charm unit status based on that patch result, +you can achieve that using `get_status()` function. +```python +class SomeCharm(CharmBase): + def __init__(self, *args): + #... 
+ self.framework.observe(self.on.collect_unit_status, self._on_collect_unit_status) + #... + def _on_collect_unit_status(self, event: CollectStatusEvent): + event.add_status(self.resources_patch.get_status()) +``` Additionally, you may wish to use mocks in your charm's unit testing to ensure that the library does not try to make any API calls, or open any files during testing that are unlikely to be @@ -83,12 +94,14 @@ def _resource_spec_from_config(self) -> ResourceRequirements: ```python # ... +from ops import ActiveStatus @patch.multiple( "charm.KubernetesComputeResourcesPatch", _namespace="test-namespace", _is_patched=lambda *a, **kw: True, is_ready=lambda *a, **kw: True, + get_status=lambda _: ActiveStatus(), ) @patch("lightkube.core.client.GenericSyncClient") def setUp(self, *unused): @@ -104,7 +117,6 @@ def setUp(self, *unused): import decimal import logging from decimal import Decimal -from enum import Enum, unique from math import ceil, floor from typing import Callable, Dict, List, Optional, Tuple, Union @@ -123,8 +135,10 @@ def setUp(self, *unused): from lightkube.resources.core_v1 import Pod from lightkube.types import PatchType from lightkube.utils.quantity import equals_canonically, parse_quantity +from ops import ActiveStatus, BlockedStatus, WaitingStatus from ops.charm import CharmBase from ops.framework import BoundEvent, EventBase, EventSource, Object, ObjectEvents +from ops.model import StatusBase logger = logging.getLogger(__name__) @@ -304,15 +318,6 @@ def _retry_on_condition(exception): return False -@unique -class PatchState(str, Enum): - """Patch operation possible states.""" - - in_progress = "in_progress" - succeeded = "succeeded" - failed = "failed" - - class K8sResourcePatchFailedEvent(EventBase): """Emitted when patching fails.""" @@ -571,6 +576,7 @@ def dry_run_apply(self, resources: dict): "Accept": "application/json", } + # TODO: replace httpx with lightkube's client once https://github.com/gtsystem/lightkube/issues/73 is addressed with httpx.Client(verify=self.ca_path) as client: response = client.patch( url=patch_url, @@ -585,6 +591,7 @@ class KubernetesComputeResourcesPatch(Object): """A utility for patching the Kubernetes compute resources set up by Juju.""" on = K8sResourcePatchEvents() # pyright: ignore + # TODO: revisit values once we know leadership lease behavior PATCH_RETRY_STOP = tenacity.stop_after_delay(20) PATCH_RETRY_WAIT = tenacity.wait_fixed(5) PATCH_RETRY_IF = tenacity.retry_if_exception(_retry_on_condition) @@ -736,32 +743,28 @@ def is_ready(self) -> bool: self.on.patch_failed.emit(message=msg) return False - def get_status(self) -> Tuple[PatchState, str]: - """Return the status of patching the resource limits along with an optional message. + def get_status(self) -> StatusBase: + """Return the status of patching the resource limits in a `StatusBase` format. Returns: - tuple(PatchState, str): A tuple where: - - The first value is an Enum `PatchState` indicating the status of the patch operation. - Possible values are: - - PatchState.succeeded: The patch was applied successfully. - - PatchState.failed: The patch failed. - - PatchState.in_progress: The patch is still in progress. - - The second value is a string that provides additional information or a message about the status. + StatusBase: There is a 1:1 mapping between the state of the patching operation and a `StatusBase` value that the charm can be set to. + Possible values are: + - ActiveStatus: The patch was applied successfully. 
+ - BlockedStatus: The patch failed and requires a human intervention. + - WaitingStatus: The patch is still in progress. Example: - - ("succeeded", "Patch applied successfully.") - - ("failed", "Failed due to missing permissions.") - - ("in_progress", "Patch is in progress.") + - ActiveStatus("Patch applied successfully") + - BlockedStatus("Failed due to missing permissions") + - WaitingStatus("Patch is in progress") """ - # failed is_failed, msg = self.patcher.is_failed(self.resource_reqs_func) if is_failed: - return PatchState.failed, msg - # waiting + return BlockedStatus(msg) if self.patcher.is_in_progress(): - return PatchState.in_progress, "" - # succeeded or nothing has been patched yet - return PatchState.succeeded, "" + return WaitingStatus("waiting for resources patch to apply") + # patch successful or nothing has been patched yet + return ActiveStatus() @property def _app(self) -> str: diff --git a/tests/unit/test_kubernetes_compute_resources.py b/tests/unit/test_kubernetes_compute_resources.py index dd0ceaa..8a6a8af 100644 --- a/tests/unit/test_kubernetes_compute_resources.py +++ b/tests/unit/test_kubernetes_compute_resources.py @@ -12,6 +12,7 @@ is_valid_spec, sanitize_resource_spec_dict, ) +from ops import ActiveStatus, BlockedStatus, WaitingStatus from ops.charm import CharmBase from ops.testing import Harness @@ -89,32 +90,8 @@ def test_invalid_config_emits_custom_event(self, client_mock): after = self.harness.charm.patch_failed_counter self.assertGreater(after, before) + @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") - @mock.patch( - "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply", - MagicMock(return_value=None), - ) - @mock.patch( - "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready", - MagicMock(return_value=True), - ) - @mock.patch("lightkube.core.client.GenericSyncClient") - def test_get_status_success(self, mock_client): - self.harness.begin_with_initial_hooks() - charm = self.harness.charm - status, msg = charm.resources_patch.get_status() - assert status == "succeeded" - assert msg == "" - - @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") - @mock.patch( - "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready", - MagicMock(return_value=False), - ) - @mock.patch( - "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_in_progress", - MagicMock(return_value=False), - ) @mock.patch( "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.dry_run_apply", MagicMock( @@ -125,9 +102,8 @@ def test_get_status_success(self, mock_client): def test_get_status_failed(self, mock_client): self.harness.begin_with_initial_hooks() charm = self.harness.charm - status, msg = charm.resources_patch.get_status() - assert status == "failed" - assert msg == "unauthorized" + status = charm.resources_patch.get_status() + assert status == BlockedStatus("Kubernetes resources patch failed: unauthorized") @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") @mock.patch( @@ -135,8 +111,8 @@ def test_get_status_failed(self, mock_client): MagicMock(return_value=None), ) @mock.patch( - "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_ready", - MagicMock(return_value=False), + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.is_failed", + 
MagicMock(return_value=(False, "")), ) @mock.patch("lightkube.core.client.GenericSyncClient") def test_get_status_in_progress(self, mock_client): @@ -147,9 +123,8 @@ def test_get_status_in_progress(self, mock_client): self.harness.begin_with_initial_hooks() charm = self.harness.charm - status, msg = charm.resources_patch.get_status() - assert status == "in_progress" - assert msg == "" + status = charm.resources_patch.get_status() + assert status == WaitingStatus("waiting for resources patch to apply") class TestResourceSpecDictValidation(unittest.TestCase): From de93c9fbf9a1481b4f73d4f66d4cf160ec205734 Mon Sep 17 00:00:00 2001 From: michael Date: Wed, 21 Aug 2024 21:29:26 +0300 Subject: [PATCH 11/15] lint --- tests/unit/test_kubernetes_compute_resources.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/test_kubernetes_compute_resources.py b/tests/unit/test_kubernetes_compute_resources.py index 8a6a8af..67de707 100644 --- a/tests/unit/test_kubernetes_compute_resources.py +++ b/tests/unit/test_kubernetes_compute_resources.py @@ -12,7 +12,7 @@ is_valid_spec, sanitize_resource_spec_dict, ) -from ops import ActiveStatus, BlockedStatus, WaitingStatus +from ops import BlockedStatus, WaitingStatus from ops.charm import CharmBase from ops.testing import Harness @@ -90,7 +90,6 @@ def test_invalid_config_emits_custom_event(self, client_mock): after = self.harness.charm.patch_failed_counter self.assertGreater(after, before) - @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") @mock.patch( "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.dry_run_apply", From 3c4d8a5a51e2181e271a3cb73c3f8d82a516d18f Mon Sep 17 00:00:00 2001 From: michael Date: Mon, 26 Aug 2024 08:48:04 +0300 Subject: [PATCH 12/15] bymp lightkube --- .../v0/kubernetes_compute_resources_patch.py | 82 ++++++------------- requirements.txt | 2 +- .../unit/test_kubernetes_compute_resources.py | 31 ++++--- 3 files changed, 43 insertions(+), 72 deletions(-) diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 07b3582..82e1dd1 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -120,7 +120,6 @@ def setUp(self, *unused): from math import ceil, floor from typing import Callable, Dict, List, Optional, Tuple, Union -import httpx import tenacity from lightkube import ApiError, Client # pyright: ignore from lightkube.core import exceptions @@ -347,32 +346,12 @@ class ContainerNotFoundError(ValueError): class ResourcePatcher: """Helper class for patching a container's resource limits in a given StatefulSet.""" - api_url = "https://kubernetes.default.svc" - token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token" - ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" - def __init__(self, namespace: str, statefulset_name: str, container_name: str): self.namespace = namespace self.statefulset_name = statefulset_name self.container_name = container_name self.client = Client() # pyright: ignore - def _raw_patch_data(self, resources: dict) -> dict: - return { - "spec": { - "template": { - "spec": { - "containers": [ - { - "name": f"{self.container_name}", - "resources": resources, - } - ] - } - } - } - } - def _patched_delta(self, resource_reqs: ResourceRequirements) -> StatefulSet: statefulset = self.client.get( 
StatefulSet, name=self.statefulset_name, namespace=self.namespace @@ -456,17 +435,23 @@ def is_failed( logger.error(msg) return True, msg - resources = { - "limits": sanitize_resource_spec_dict(limits), - "requests": sanitize_resource_spec_dict(requests), - } + resource_reqs = ResourceRequirements( + limits=sanitize_resource_spec_dict(limits), # type: ignore[arg-type] + requests=sanitize_resource_spec_dict(requests), # type: ignore[arg-type] + ) - response = self.dry_run_apply(resources) - if response.status_code == 403: - return True, "Kubernetes resources patch failed: `juju trust` this application." - if response.status_code != 200: - msg = f"Kubernetes resources patch failed: {response.json().get('message', '')}" + try: + self.dry_run_apply(resource_reqs) + except ApiError as e: + if e.status.code == 403: + msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}" + else: + msg = f"Kubernetes resources patch failed: {e}" return True, msg + except ValueError as e: + msg = f"Kubernetes resources patch failed: {e}" + return True, msg + return False, "" def is_in_progress(self) -> bool: @@ -562,36 +547,23 @@ def apply(self, resource_reqs: ResourceRequirements) -> None: field_manager=self.__class__.__name__, ) - def dry_run_apply(self, resources: dict): + def dry_run_apply(self, resource_reqs: ResourceRequirements): """Run a dry-run patch operation.""" - # Read the token for authentication - with open(self.token_path, "r") as token_file: - token = token_file.read() - - patch_url = f"{self.api_url}/apis/apps/v1/namespaces/{self.namespace}/statefulsets/{self.statefulset_name}" - - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "application/strategic-merge-patch+json", - "Accept": "application/json", - } - - # TODO: replace httpx with lightkube's client once https://github.com/gtsystem/lightkube/issues/73 is addressed - with httpx.Client(verify=self.ca_path) as client: - response = client.patch( - url=patch_url, - headers=headers, - json=self._raw_patch_data(resources), - params={"dryRun": "All", "fieldManager": self.__class__.__name__}, - ) - return response + self.client.patch( + StatefulSet, + self.statefulset_name, + self._patched_delta(resource_reqs), + namespace=self.namespace, + patch_type=PatchType.APPLY, + field_manager=self.__class__.__name__, + dry_run=True, + ) class KubernetesComputeResourcesPatch(Object): """A utility for patching the Kubernetes compute resources set up by Juju.""" on = K8sResourcePatchEvents() # pyright: ignore - # TODO: revisit values once we know leadership lease behavior PATCH_RETRY_STOP = tenacity.stop_after_delay(20) PATCH_RETRY_WAIT = tenacity.wait_fixed(5) PATCH_RETRY_IF = tenacity.retry_if_exception(_retry_on_condition) @@ -758,8 +730,8 @@ def get_status(self) -> StatusBase: - BlockedStatus("Failed due to missing permissions") - WaitingStatus("Patch is in progress") """ - is_failed, msg = self.patcher.is_failed(self.resource_reqs_func) - if is_failed: + failed, msg = self.patcher.is_failed(self.resource_reqs_func) + if failed: return BlockedStatus(msg) if self.patcher.is_in_progress(): return WaitingStatus("waiting for resources patch to apply") diff --git a/requirements.txt b/requirements.txt index 8160700..3eb724c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,5 @@ ops PyYAML -lightkube +lightkube>=v0.15.4 tenacity diff --git a/tests/unit/test_kubernetes_compute_resources.py b/tests/unit/test_kubernetes_compute_resources.py index 67de707..8df4e00 100644 --- 
a/tests/unit/test_kubernetes_compute_resources.py +++ b/tests/unit/test_kubernetes_compute_resources.py @@ -12,6 +12,7 @@ is_valid_spec, sanitize_resource_spec_dict, ) +from lightkube import ApiError from ops import BlockedStatus, WaitingStatus from ops.charm import CharmBase from ops.testing import Harness @@ -25,11 +26,16 @@ class TestKubernetesComputeResourcesPatch(unittest.TestCase): class _TestCharm(CharmBase): def __init__(self, *args): super().__init__(*args) - self.resources_patch = KubernetesComputeResourcesPatch( - self, - "placeholder", - resource_reqs_func=lambda: adjust_resource_requirements(None, None), - ) + with mock.patch.object( + KubernetesComputeResourcesPatch, + "_namespace", + "test-namespace", + ): + self.resources_patch = KubernetesComputeResourcesPatch( + self, + "placeholder", + resource_reqs_func=lambda: adjust_resource_requirements(None, None), + ) self.framework.observe(self.resources_patch.on.patch_failed, self._patch_failed) self.patch_failed_counter = 0 @@ -44,7 +50,6 @@ def setUp(self) -> None: ) @mock.patch("lightkube.core.client.GenericSyncClient", Mock) - @mock.patch(f"{CL_PATH}._namespace", "test-namespace") def test_listener_is_attached_for_config_changed_event(self): self.harness.begin() charm = self.harness.charm @@ -53,7 +58,6 @@ def test_listener_is_attached_for_config_changed_event(self): self.assertEqual(patch.call_count, 1) @mock.patch("lightkube.core.client.GenericSyncClient", Mock) - @mock.patch(f"{CL_PATH}._namespace", "test-namespace") def test_patch_is_applied_regardless_of_leadership_status(self): self.harness.begin() charm = self.harness.charm @@ -64,13 +68,11 @@ def test_patch_is_applied_regardless_of_leadership_status(self): charm.on.config_changed.emit() self.assertEqual(patch.call_count, 1) - @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") @mock.patch("lightkube.core.client.GenericSyncClient") def test_patch_is_applied_during_startup_sequence(self, client_mock): self.harness.begin_with_initial_hooks() self.assertGreater(client_mock.call_count, 0) - @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") @mock.patch("lightkube.core.client.GenericSyncClient") def test_invalid_config_emits_custom_event(self, client_mock): self.harness.begin_with_initial_hooks() @@ -90,21 +92,18 @@ def test_invalid_config_emits_custom_event(self, client_mock): after = self.harness.charm.patch_failed_counter self.assertGreater(after, before) - @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") @mock.patch( - "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.dry_run_apply", - MagicMock( - return_value=httpx.Response(status_code=401, content='{"message": "unauthorized"}') - ), + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.dry_run_apply" ) @mock.patch("lightkube.core.client.GenericSyncClient") - def test_get_status_failed(self, mock_client): + def test_get_status_failed(self, mock_client, mock_resource_patcher): + response = httpx.Response(status_code=401, content='{"message": "unauthorized"}') + mock_resource_patcher.side_effect = ApiError(response=response) self.harness.begin_with_initial_hooks() charm = self.harness.charm status = charm.resources_patch.get_status() assert status == BlockedStatus("Kubernetes resources patch failed: unauthorized") - @mock.patch.object(KubernetesComputeResourcesPatch, "_namespace", "test-namespace") @mock.patch( 
"charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply", MagicMock(return_value=None), From b16e183038c289e86542231afb7e839b32e16677 Mon Sep 17 00:00:00 2001 From: michael Date: Mon, 26 Aug 2024 11:11:02 +0300 Subject: [PATCH 13/15] static checks --- .../v0/kubernetes_compute_resources_patch.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 82e1dd1..1d40612 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -118,7 +118,7 @@ def setUp(self, *unused): import logging from decimal import Decimal from math import ceil, floor -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union import tenacity from lightkube import ApiError, Client # pyright: ignore @@ -156,7 +156,9 @@ def setUp(self, *unused): def adjust_resource_requirements( - limits: Optional[dict], requests: Optional[dict], adhere_to_requests: bool = True + limits: Optional[Dict[Any, Any]], + requests: Optional[Dict[Any, Any]], + adhere_to_requests: bool = True, ) -> ResourceRequirements: """Adjust resource limits so that `limits` and `requests` are consistent with each other. From 57d46d23d75f6ca9af36184e59fc584eea6cd40c Mon Sep 17 00:00:00 2001 From: michael Date: Thu, 29 Aug 2024 08:59:44 +0300 Subject: [PATCH 14/15] converge dry_run --- .../v0/kubernetes_compute_resources_patch.py | 26 +++++++------------ .../unit/test_kubernetes_compute_resources.py | 2 +- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index 1d40612..a72341e 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -443,7 +443,7 @@ def is_failed( ) try: - self.dry_run_apply(resource_reqs) + self.apply(resource_reqs, dry_run=True) except ApiError as e: if e.status.code == 403: msg = f"Kubernetes resources patch failed: `juju trust` this application. {e}" @@ -466,7 +466,10 @@ def is_in_progress(self) -> bool: sts = self.client.get( StatefulSet, name=self.statefulset_name, namespace=self.namespace ) - except (ValueError, ApiError): + except (ValueError, ApiError) as e: + # Assumption: if there was a presistent issue, it'd have been caught in `is_failed` + # Wait until next run to try again. + logger.error(f"Failed to fetch statefulset from K8s api: {e}") return False if sts.status is None or sts.spec is None: @@ -532,12 +535,12 @@ def is_ready(self, pod_name, resource_reqs: ResourceRequirements): resource_reqs, self.get_actual(pod_name) # pyright: ignore ) - def apply(self, resource_reqs: ResourceRequirements) -> None: + def apply(self, resource_reqs: ResourceRequirements, dry_run=False) -> None: """Patch the Kubernetes resources created by Juju to limit cpu or mem.""" # Need to ignore invalid input, otherwise the StatefulSet gives "FailedCreate" and the # charm would be stuck in unknown/lost. 
- if self.is_patched(resource_reqs): - logger.debug(f"resource requests {resource_reqs} are already patched.") + if not dry_run and self.is_patched(resource_reqs): + logger.debug(f"Resource requests are already patched: {resource_reqs}") return self.client.patch( @@ -547,18 +550,7 @@ def apply(self, resource_reqs: ResourceRequirements) -> None: namespace=self.namespace, patch_type=PatchType.APPLY, field_manager=self.__class__.__name__, - ) - - def dry_run_apply(self, resource_reqs: ResourceRequirements): - """Run a dry-run patch operation.""" - self.client.patch( - StatefulSet, - self.statefulset_name, - self._patched_delta(resource_reqs), - namespace=self.namespace, - patch_type=PatchType.APPLY, - field_manager=self.__class__.__name__, - dry_run=True, + dry_run=dry_run, ) diff --git a/tests/unit/test_kubernetes_compute_resources.py b/tests/unit/test_kubernetes_compute_resources.py index 8df4e00..e64df26 100644 --- a/tests/unit/test_kubernetes_compute_resources.py +++ b/tests/unit/test_kubernetes_compute_resources.py @@ -93,7 +93,7 @@ def test_invalid_config_emits_custom_event(self, client_mock): self.assertGreater(after, before) @mock.patch( - "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.dry_run_apply" + "charms.observability_libs.v0.kubernetes_compute_resources_patch.ResourcePatcher.apply" ) @mock.patch("lightkube.core.client.GenericSyncClient") def test_get_status_failed(self, mock_client, mock_resource_patcher): From cd182e93e153191ab238026cde62de8fc7775df5 Mon Sep 17 00:00:00 2001 From: michael Date: Thu, 29 Aug 2024 09:01:52 +0300 Subject: [PATCH 15/15] lint --- .../observability_libs/v0/kubernetes_compute_resources_patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py index a72341e..34dd026 100644 --- a/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py +++ b/lib/charms/observability_libs/v0/kubernetes_compute_resources_patch.py @@ -467,7 +467,7 @@ def is_in_progress(self) -> bool: StatefulSet, name=self.statefulset_name, namespace=self.namespace ) except (ValueError, ApiError) as e: - # Assumption: if there was a presistent issue, it'd have been caught in `is_failed` + # Assumption: if there was a persistent issue, it'd have been caught in `is_failed` # Wait until next run to try again. logger.error(f"Failed to fetch statefulset from K8s api: {e}") return False
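The final shape converges the dry run into `apply(resource_reqs, dry_run=True)`, leaning on lightkube's server-side apply support for the `dryRun` option (the reason `requirements.txt` was bumped to `lightkube>=v0.15.4` earlier in the series). A self-contained sketch of that call outside the library (the function name and example limits are illustrative; client credentials are assumed to be available, e.g. in-cluster):

```python
# Standalone sketch of the dry-run server-side apply that is_failed() now relies on.
from lightkube import Client
from lightkube.models.apps_v1 import StatefulSetSpec
from lightkube.models.core_v1 import (
    Container,
    PodSpec,
    PodTemplateSpec,
    ResourceRequirements,
)
from lightkube.resources.apps_v1 import StatefulSet
from lightkube.types import PatchType


def dry_run_resources_patch(app: str, namespace: str, container: str) -> None:
    client = Client()
    live = client.get(StatefulSet, name=app, namespace=namespace)
    assert live.spec is not None
    delta = StatefulSet(
        spec=StatefulSetSpec(
            selector=live.spec.selector,  # required fields are reused from the live object
            serviceName=live.spec.serviceName,
            template=PodTemplateSpec(
                spec=PodSpec(
                    containers=[
                        Container(
                            name=container,
                            resources=ResourceRequirements(
                                limits={"cpu": "2"}, requests={"cpu": "1"}
                            ),
                        )
                    ]
                )
            ),
        )
    )
    client.patch(
        StatefulSet,
        app,
        delta,
        namespace=namespace,
        patch_type=PatchType.APPLY,
        field_manager="ResourcePatcher",
        dry_run=True,  # the API server validates and authorizes, but persists nothing
    )
```

Because the dry run goes through the normal apply code path, a 403 from a missing `juju trust` or a rejected quantity surfaces as the same `ApiError` that `is_failed()` turns into the message reported through `get_status()`.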