From 6ceebb05205e6cfaf57b8b941008869413afd1f3 Mon Sep 17 00:00:00 2001 From: Luca Bello <36242061+lucabello@users.noreply.github.com> Date: Thu, 25 Jan 2024 16:39:29 +0100 Subject: [PATCH] Add support for pebble log forwarding (#332) * add draft skeleton for pebble log forwarding * add enable/disable logic * fix static checks and linting * refactor log labels * refactor label names Co-authored-by: Luca Bello <36242061+lucabello@users.noreply.github.com> * enabling forwarding through relation events * fix static checks and linting * added unit tests for the logforwarder class * fix endpoints sorting * add docs and minor improvements * tox fmt * address one pr comment * doc fix and tox fmt * doc fix * more doc improvements * addressing PR comments in a live session * addressing comments * addressing live review * minor improvements and unit tests * enable/disable to be public * exposing loki data key to init * refactor again without docs * minor fixes * add self init * fixed library again * final fix * docs fix * tox fmt * make update_logging public * update unit tests * fix lint * move ManualLogForwarder out * remove old docs * minor docstring fix * use logger.warning * fix unit tests * minor docfix * charmcraft fetch-lib --------- Co-authored-by: IbraAoad Co-authored-by: Ibrahim Awwad --- lib/charms/loki_k8s/v1/loki_push_api.py | 250 +++++++++++++++++- .../v2/tls_certificates.py | 228 +++++++++++----- tests/unit/test_log_forwarder.py | 95 +++++++ 3 files changed, 500 insertions(+), 73 deletions(-) create mode 100644 tests/unit/test_log_forwarder.py diff --git a/lib/charms/loki_k8s/v1/loki_push_api.py b/lib/charms/loki_k8s/v1/loki_push_api.py index bcc3e6a64..acd4338c6 100644 --- a/lib/charms/loki_k8s/v1/loki_push_api.py +++ b/lib/charms/loki_k8s/v1/loki_push_api.py @@ -20,6 +20,10 @@ send telemetry, such as logs, to Loki through a Log Proxy by implementing the consumer side of the `loki_push_api` relation interface. +- `LogForwarder`: This object can be used by any Charmed Operator which needs to send the workload +standard output (stdout) through Pebble's log forwarding mechanism, to Loki endpoints through the +`loki_push_api` relation interface. + Filtering logs in Loki is largely performed on the basis of labels. In the Juju ecosystem, Juju topology labels are used to uniquely identify the workload which generates telemetry like logs. @@ -349,6 +353,45 @@ def _promtail_error(self, event): ) ``` +## LogForwarder class Usage + +Let's say that we have a charm's workload that writes logs to the standard output (stdout), +and we need to send those logs to a workload implementing the `loki_push_api` interface, +such as `Loki` or `Grafana Agent`. To know how to reach a Loki instance, a charm would +typically use the `loki_push_api` interface. + +Use the `LogForwarder` class by instantiating it in the `__init__` method of the charm: + +```python +from charms.loki_k8s.v1.loki_push_api import LogForwarder + +... + + def __init__(self, *args): + ... + self._log_forwarder = LogForwarder( + self, + relation_name="logging" # optional, defaults to `logging` + ) +``` + +The `LogForwarder` by default will observe relation events on the `logging` endpoint and +enable/disable log forwarding automatically. +Next, modify the `metadata.yaml` file to add: + +The `log-forwarding` relation in the `requires` section: +```yaml +requires: + logging: + interface: loki_push_api + optional: true +``` + +Once the LogForwader class is implemented in your charm and the relation (implementing the +`loki_push_api` interface) is active and healthy, the library will inject a Pebble layer in +each workload container the charm has access to, to configure Pebble's log forwarding +feature and start sending logs to Loki. + ## Alerting Rules This charm library also supports gathering alerting rules from all related Loki client @@ -463,6 +506,7 @@ def _alert_rules_error(self, event): WorkloadEvent, ) from ops.framework import EventBase, EventSource, Object, ObjectEvents +from ops.jujuversion import JujuVersion from ops.model import Container, ModelError, Relation from ops.pebble import APIError, ChangeError, Layer, PathError, ProtocolError @@ -474,7 +518,7 @@ def _alert_rules_error(self, event): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 1 +LIBPATCH = 2 logger = logging.getLogger(__name__) @@ -2313,6 +2357,210 @@ def _containers(self) -> Dict[str, Container]: return {cont: self._charm.unit.get_container(cont) for cont in self._logs_scheme.keys()} +class _PebbleLogClient: + @staticmethod + def check_juju_version() -> bool: + """Make sure the Juju version supports Log Forwarding.""" + juju_version = JujuVersion.from_environ() + if not juju_version > JujuVersion(version=str("3.3")): + msg = f"Juju version {juju_version} does not support Pebble log forwarding. Juju >= 3.4 is needed." + logger.warning(msg) + return False + return True + + @staticmethod + def _build_log_target( + unit_name: str, loki_endpoint: str, topology: JujuTopology, enable: bool + ) -> Dict: + """Build a log target for the log forwarding Pebble layer. + + Log target's syntax for enabling/disabling forwarding is explained here: + https://github.com/canonical/pebble?tab=readme-ov-file#log-forwarding + """ + services_value = ["all"] if enable else ["-all"] + + log_target = { + "override": "replace", + "services": services_value, + "type": "loki", + "location": loki_endpoint, + } + if enable: + log_target.update( + { + "labels": { + "product": "Juju", + "charm": topology._charm_name, + "juju_model": topology._model, + "juju_model_uuid": topology._model_uuid, + "juju_application": topology._application, + "juju_unit": topology._unit, + }, + } + ) + + return {unit_name: log_target} + + @staticmethod + def _build_log_targets( + loki_endpoints: Optional[Dict[str, str]], topology: JujuTopology, enable: bool + ): + """Build all the targets for the log forwarding Pebble layer.""" + targets = {} + if not loki_endpoints: + return targets + + for unit_name, endpoint in loki_endpoints.items(): + targets.update( + _PebbleLogClient._build_log_target( + unit_name=unit_name, + loki_endpoint=endpoint, + topology=topology, + enable=enable, + ) + ) + return targets + + @staticmethod + def disable_inactive_endpoints( + container: Container, active_endpoints: Dict[str, str], topology: JujuTopology + ): + """Disable forwarding for inactive endpoints by checking against the Pebble plan.""" + pebble_layer = container.get_plan().to_dict().get("log-targets", None) + if not pebble_layer: + return + + for unit_name, target in pebble_layer.items(): + # If the layer is a disabled log forwarding endpoint, skip it + if "-all" in target["services"]: # pyright: ignore + continue + + if unit_name not in active_endpoints: + layer = Layer( + { # pyright: ignore + "log-targets": _PebbleLogClient._build_log_targets( + loki_endpoints={unit_name: "(removed)"}, + topology=topology, + enable=False, + ) + } + ) + container.add_layer(f"{container.name}-log-forwarding", layer=layer, combine=True) + + @staticmethod + def enable_endpoints( + container: Container, active_endpoints: Dict[str, str], topology: JujuTopology + ): + """Enable forwarding for the specified Loki endpoints.""" + layer = Layer( + { # pyright: ignore + "log-targets": _PebbleLogClient._build_log_targets( + loki_endpoints=active_endpoints, + topology=topology, + enable=True, + ) + } + ) + container.add_layer(f"{container.name}-log-forwarding", layer, combine=True) + + +class LogForwarder(ConsumerBase): + """Forward the standard outputs of all workloads operated by a charm to one or multiple Loki endpoints.""" + + def __init__( + self, + charm: CharmBase, + *, + relation_name: str = DEFAULT_RELATION_NAME, + alert_rules_path: str = DEFAULT_ALERT_RULES_RELATIVE_PATH, + recursive: bool = True, + skip_alert_topology_labeling: bool = False, + ): + _PebbleLogClient.check_juju_version() + super().__init__( + charm, relation_name, alert_rules_path, recursive, skip_alert_topology_labeling + ) + self._charm = charm + self._relation_name = relation_name + + on = self._charm.on[self._relation_name] + self.framework.observe(on.relation_joined, self._update_logging) + self.framework.observe(on.relation_changed, self._update_logging) + self.framework.observe(on.relation_departed, self._update_logging) + self.framework.observe(on.relation_broken, self._update_logging) + + def _update_logging(self, _): + """Update the log forwarding to match the active Loki endpoints.""" + loki_endpoints = {} + + # Get the endpoints from relation data + for relation in self._charm.model.relations[self._relation_name]: + loki_endpoints.update(self._fetch_endpoints(relation)) + + if not loki_endpoints: + logger.warning("No Loki endpoints available") + return + + for container in self._charm.unit.containers.values(): + _PebbleLogClient.disable_inactive_endpoints( + container=container, + active_endpoints=loki_endpoints, + topology=self.topology, + ) + _PebbleLogClient.enable_endpoints( + container=container, active_endpoints=loki_endpoints, topology=self.topology + ) + + def is_ready(self, relation: Optional[Relation] = None): + """Check if the relation is active and healthy.""" + if not relation: + relations = self._charm.model.relations[self._relation_name] + if not relations: + return False + return all(self.is_ready(relation) for relation in relations) + + try: + if self._extract_urls(relation): + return True + return False + except (KeyError, json.JSONDecodeError): + return False + + def _extract_urls(self, relation: Relation) -> Dict[str, str]: + """Default getter function to extract Loki endpoints from a relation. + + Returns: + A dictionary of remote units and the respective Loki endpoint. + { + "loki/0": "http://loki:3100/loki/api/v1/push", + "another-loki/0": "http://another-loki:3100/loki/api/v1/push", + } + """ + endpoints: Dict = {} + + for unit in relation.units: + endpoint = relation.data[unit]["endpoint"] + deserialized_endpoint = json.loads(endpoint) + url = deserialized_endpoint["url"] + endpoints[unit.name] = url + + return endpoints + + def _fetch_endpoints(self, relation: Relation) -> Dict[str, str]: + """Fetch Loki Push API endpoints from relation data using the endpoints getter.""" + endpoints: Dict = {} + + if not self.is_ready(relation): + logger.warning(f"The relation '{relation.name}' is not ready yet.") + return endpoints + + # if the code gets here, the function won't raise anymore because it's + # also called in is_ready() + endpoints = self._extract_urls(relation) + + return endpoints + + class CosTool: """Uses cos-tool to inject label matchers into alert rule expressions and validate rules.""" diff --git a/lib/charms/tls_certificates_interface/v2/tls_certificates.py b/lib/charms/tls_certificates_interface/v2/tls_certificates.py index b8855bea1..08c5cb500 100644 --- a/lib/charms/tls_certificates_interface/v2/tls_certificates.py +++ b/lib/charms/tls_certificates_interface/v2/tls_certificates.py @@ -308,13 +308,13 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 21 +LIBPATCH = 22 PYDEPS = ["cryptography", "jsonschema"] REQUIRER_JSON_SCHEMA = { "$schema": "http://json-schema.org/draft-04/schema#", - "$id": "https://canonical.github.io/charm-relation-interfaces/tls_certificates/v2/schemas/requirer.json", # noqa: E501 + "$id": "https://canonical.github.io/charm-relation-interfaces/interfaces/tls_certificates/v1/schemas/requirer.json", "type": "object", "title": "`tls_certificates` requirer root schema", "description": "The `tls_certificates` root schema comprises the entire requirer databag for this interface.", # noqa: E501 @@ -349,7 +349,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven PROVIDER_JSON_SCHEMA = { "$schema": "http://json-schema.org/draft-04/schema#", - "$id": "https://canonical.github.io/charm-relation-interfaces/tls_certificates/v2/schemas/provider.json", # noqa: E501 + "$id": "https://canonical.github.io/charm-relation-interfaces/interfaces/tls_certificates/v1/schemas/provider.json", "type": "object", "title": "`tls_certificates` provider root schema", "description": "The `tls_certificates` root schema comprises the entire provider databag for this interface.", # noqa: E501 @@ -623,6 +623,40 @@ def _load_relation_data(relation_data_content: RelationDataContent) -> dict: return certificate_data +def _get_closest_future_time( + expiry_notification_time: datetime, expiry_time: datetime +) -> datetime: + """Return expiry_notification_time if not in the past, otherwise return expiry_time. + + Args: + expiry_notification_time (datetime): Notification time of impending expiration + expiry_time (datetime): Expiration time + + Returns: + datetime: expiry_notification_time if not in the past, expiry_time otherwise + """ + return ( + expiry_notification_time if datetime.utcnow() < expiry_notification_time else expiry_time + ) + + +def _get_certificate_expiry_time(certificate: str) -> Optional[datetime]: + """Extract expiry time from a certificate string. + + Args: + certificate (str): x509 certificate as a string + + Returns: + Optional[datetime]: Expiry datetime or None + """ + try: + certificate_object = x509.load_pem_x509_certificate(data=certificate.encode()) + return certificate_object.not_valid_after + except ValueError: + logger.warning("Could not load certificate.") + return None + + def generate_ca( private_key: bytes, subject: str, @@ -984,6 +1018,38 @@ def generate_csr( return signed_certificate.public_bytes(serialization.Encoding.PEM) +def csr_matches_certificate(csr: str, cert: str) -> bool: + """Check if a CSR matches a certificate. + + Args: + csr (str): Certificate Signing Request as a string + cert (str): Certificate as a string + Returns: + bool: True/False depending on whether the CSR matches the certificate. + """ + try: + csr_object = x509.load_pem_x509_csr(csr.encode("utf-8")) + cert_object = x509.load_pem_x509_certificate(cert.encode("utf-8")) + + if csr_object.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) != cert_object.public_key().public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ): + return False + if ( + csr_object.public_key().public_numbers().n # type: ignore[union-attr] + != cert_object.public_key().public_numbers().n # type: ignore[union-attr] + ): + return False + except ValueError: + logger.warning("Could not load certificate or CSR.") + return False + return True + + class CertificatesProviderCharmEvents(CharmEvents): """List of events that the TLS Certificates provider charm can leverage.""" @@ -1447,7 +1513,7 @@ def __init__( @property def _requirer_csrs(self) -> List[Dict[str, Union[bool, str]]]: - """Returns list of requirer's CSRs from relation data. + """Returns list of requirer's CSRs from relation unit data. Example: [ @@ -1592,6 +1658,92 @@ def request_certificate_renewal( ) logger.info("Certificate renewal request completed.") + def get_assigned_certificates(self) -> List[Dict[str, str]]: + """Get a list of certificates that were assigned to this unit. + + Returns: + List of certificates. For example: + [ + { + "ca": "-----BEGIN CERTIFICATE-----...", + "chain": [ + "-----BEGIN CERTIFICATE-----..." + ], + "certificate": "-----BEGIN CERTIFICATE-----...", + "certificate_signing_request": "-----BEGIN CERTIFICATE REQUEST-----...", + } + ] + """ + final_list = [] + for csr in self.get_certificate_signing_requests(fulfilled_only=True): + assert type(csr["certificate_signing_request"]) == str + if cert := self._find_certificate_in_relation_data(csr["certificate_signing_request"]): + final_list.append(cert) + return final_list + + def get_expiring_certificates(self) -> List[Dict[str, str]]: + """Get a list of certificates that were assigned to this unit that are expiring or expired. + + Returns: + List of certificates. For example: + [ + { + "ca": "-----BEGIN CERTIFICATE-----...", + "chain": [ + "-----BEGIN CERTIFICATE-----..." + ], + "certificate": "-----BEGIN CERTIFICATE-----...", + "certificate_signing_request": "-----BEGIN CERTIFICATE REQUEST-----...", + } + ] + """ + final_list = [] + for csr in self.get_certificate_signing_requests(fulfilled_only=True): + assert type(csr["certificate_signing_request"]) == str + if cert := self._find_certificate_in_relation_data(csr["certificate_signing_request"]): + expiry_time = _get_certificate_expiry_time(cert["certificate"]) + if not expiry_time: + continue + expiry_notification_time = expiry_time - timedelta( + hours=self.expiry_notification_time + ) + if datetime.utcnow() > expiry_notification_time: + final_list.append(cert) + return final_list + + def get_certificate_signing_requests( + self, + fulfilled_only: bool = False, + unfulfilled_only: bool = False, + ) -> List[Dict[str, Union[bool, str]]]: + """Gets the list of CSR's that were sent to the provider. + + You can choose to get only the CSR's that have a certificate assigned or only the CSR's + that don't. + + Args: + fulfilled_only (bool): This option will discard CSRs that don't have certificates yet. + unfulfilled_only (bool): This option will discard CSRs that have certificates signed. + Returns: + List of CSR dictionaries. For example: + [ + { + "certificate_signing_request": "-----BEGIN CERTIFICATE REQUEST-----...", + "ca": false + } + ] + """ + + final_list = [] + for csr in self._requirer_csrs: + assert type(csr["certificate_signing_request"]) == str + cert = self._find_certificate_in_relation_data(csr["certificate_signing_request"]) + if (unfulfilled_only and cert) or (fulfilled_only and not cert): + continue + final_list.append(csr) + + return final_list + @staticmethod def _relation_data_is_valid(certificates_data: dict) -> bool: """Checks whether relation data is valid based on json schema. @@ -1802,71 +1954,3 @@ def _on_update_status(self, event: UpdateStatusEvent) -> None: certificate=certificate_dict["certificate"], expiry=expiry_time.isoformat(), ) - - -def csr_matches_certificate(csr: str, cert: str) -> bool: - """Check if a CSR matches a certificate. - - expects to get the original string representations. - - Args: - csr (str): Certificate Signing Request - cert (str): Certificate - Returns: - bool: True/False depending on whether the CSR matches the certificate. - """ - try: - csr_object = x509.load_pem_x509_csr(csr.encode("utf-8")) - cert_object = x509.load_pem_x509_certificate(cert.encode("utf-8")) - - if csr_object.public_key().public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo, - ) != cert_object.public_key().public_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PublicFormat.SubjectPublicKeyInfo, - ): - return False - if ( - csr_object.public_key().public_numbers().n # type: ignore[union-attr] - != cert_object.public_key().public_numbers().n # type: ignore[union-attr] - ): - return False - except ValueError: - logger.warning("Could not load certificate or CSR.") - return False - return True - - -def _get_closest_future_time( - expiry_notification_time: datetime, expiry_time: datetime -) -> datetime: - """Return expiry_notification_time if not in the past, otherwise return expiry_time. - - Args: - expiry_notification_time (datetime): Notification time of impending expiration - expiry_time (datetime): Expiration time - - Returns: - datetime: expiry_notification_time if not in the past, expiry_time otherwise - """ - return ( - expiry_notification_time if datetime.utcnow() < expiry_notification_time else expiry_time - ) - - -def _get_certificate_expiry_time(certificate: str) -> Optional[datetime]: - """Extract expiry time from a certificate string. - - Args: - certificate (str): x509 certificate as a string - - Returns: - Optional[datetime]: Expiry datetime or None - """ - try: - certificate_object = x509.load_pem_x509_certificate(data=certificate.encode()) - return certificate_object.not_valid_after - except ValueError: - logger.warning("Could not load certificate.") - return None diff --git a/tests/unit/test_log_forwarder.py b/tests/unit/test_log_forwarder.py new file mode 100644 index 000000000..4463317b7 --- /dev/null +++ b/tests/unit/test_log_forwarder.py @@ -0,0 +1,95 @@ +# Copyright 2020 Canonical Ltd. +# See LICENSE file for licensing details. + +import json +import textwrap +import unittest + +from charms.loki_k8s.v1.loki_push_api import LogForwarder, _PebbleLogClient +from ops.charm import CharmBase +from ops.testing import Harness + + +class FakeCharm(CharmBase): + """Container charm for forwarding logs using the logforwarder class.""" + + metadata_yaml = textwrap.dedent( + """ + containers: + consumer: + resource: consumer-image + + requires: + logging: + interface: loki_push_api + """ + ) + + def __init__(self, *args): + super().__init__(*args) + self.log_forwarder = LogForwarder(self) + + +class TestLogForwarding(unittest.TestCase): + """Test that the Log Forwarder implementation works.""" + + def setUp(self): + self.harness = Harness(FakeCharm, meta=FakeCharm.metadata_yaml) + self.addCleanup(self.harness.cleanup) + self.harness.begin_with_initial_hooks() + + def test_handle_logging_with_relation_lifecycle(self): + rel_id = self.harness.add_relation("logging", "loki") + for i in range(2): + loki_unit = f"loki/{i}" + endpoint = f"http://loki-{i}:3100/loki/api/v1/push" + data = json.dumps({"url": f"{endpoint}"}) + self.harness.add_relation_unit(rel_id, loki_unit) + self.harness.set_planned_units(1) + self.harness.update_relation_data( + rel_id, + loki_unit, + {"endpoint": data}, + ) + relation_obj = self.harness.model.relations.get("logging")[0] + expected_endpoints = { + "loki/0": "http://loki-0:3100/loki/api/v1/push", + "loki/1": "http://loki-1:3100/loki/api/v1/push", + } + self.assertDictEqual( + self.harness.charm.log_forwarder._fetch_endpoints(relation_obj), expected_endpoints + ) + expected_layer_config = { + "loki/0": { + "override": "replace", + "type": "loki", + "location": "http://loki-0:3100/loki/api/v1/push", + "services": ["all"], + "labels": { + "product": "Juju", + "charm": self.harness.charm.log_forwarder.topology._charm_name, + "juju_model": self.harness.charm.log_forwarder.topology._model, + "juju_model_uuid": self.harness.charm.log_forwarder.topology._model_uuid, + "juju_application": self.harness.charm.log_forwarder.topology._application, + "juju_unit": self.harness.charm.log_forwarder.topology._unit, + }, + }, + "loki/1": { + "override": "replace", + "type": "loki", + "location": "http://loki-1:3100/loki/api/v1/push", + "services": ["all"], + "labels": { + "product": "Juju", + "charm": self.harness.charm.log_forwarder.topology._charm_name, + "juju_model": self.harness.charm.log_forwarder.topology._model, + "juju_model_uuid": self.harness.charm.log_forwarder.topology._model_uuid, + "juju_application": self.harness.charm.log_forwarder.topology._application, + "juju_unit": self.harness.charm.log_forwarder.topology._unit, + }, + }, + } + actual_layer_config = _PebbleLogClient._build_log_targets( + expected_endpoints, self.harness.charm.log_forwarder.topology, True + ) + self.assertDictEqual(expected_layer_config, actual_layer_config)