diff --git a/lib/charms/data_platform_libs/v0/data_interfaces.py b/lib/charms/data_platform_libs/v0/data_interfaces.py index c0ddec58c9..021de1dc89 100644 --- a/lib/charms/data_platform_libs/v0/data_interfaces.py +++ b/lib/charms/data_platform_libs/v0/data_interfaces.py @@ -331,7 +331,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 46 +LIBPATCH = 48 PYDEPS = ["ops>=2.0.0"] @@ -2569,7 +2569,7 @@ def __init__( ################################################################################ -# Cross-charm Relatoins Data Handling and Evenets +# Cross-charm Relations Data Handling and Events ################################################################################ # Generic events @@ -3268,7 +3268,7 @@ def __init__( # Kafka Events -class KafkaProvidesEvent(RelationEvent): +class KafkaProvidesEvent(RelationEventWithSecret): """Base class for Kafka events.""" @property @@ -3287,6 +3287,40 @@ def consumer_group_prefix(self) -> Optional[str]: return self.relation.data[self.relation.app].get("consumer-group-prefix") + @property + def mtls_cert(self) -> Optional[str]: + """Returns TLS cert of the client.""" + if not self.relation.app: + return None + + if not self.secrets_enabled: + raise SecretsUnavailableError("Secrets unavailable on current Juju version") + + secret_field = f"{PROV_SECRET_PREFIX}{SECRET_GROUPS.MTLS}" + if secret_uri := self.relation.data[self.app].get(secret_field): + secret = self.framework.model.get_secret(id=secret_uri) + content = secret.get_content(refresh=True) + if content: + return content.get("mtls-cert") + + +class KafkaClientMtlsCertUpdatedEvent(KafkaProvidesEvent): + """Event emitted when the mtls relation is updated.""" + + def __init__(self, handle, relation, old_mtls_cert: Optional[str] = None, app=None, unit=None): + super().__init__(handle, relation, app, unit) + + self.old_mtls_cert = old_mtls_cert + + def snapshot(self): + """Return a snapshot of the event.""" + return super().snapshot() | {"old_mtls_cert": self.old_mtls_cert} + + def restore(self, snapshot): + """Restore the event from a snapshot.""" + super().restore(snapshot) + self.old_mtls_cert = snapshot["old_mtls_cert"] + class TopicRequestedEvent(KafkaProvidesEvent, ExtraRoleEvent): """Event emitted when a new topic is requested for use on this relation.""" @@ -3299,6 +3333,7 @@ class KafkaProvidesEvents(CharmEvents): """ topic_requested = EventSource(TopicRequestedEvent) + mtls_cert_updated = EventSource(KafkaClientMtlsCertUpdatedEvent) class KafkaRequiresEvent(RelationEvent): @@ -3416,6 +3451,13 @@ def __init__(self, charm: CharmBase, relation_data: KafkaProviderData) -> None: def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: """Event emitted when the relation has changed.""" super()._on_relation_changed_event(event) + + new_data_keys = list(event.relation.data[event.app].keys()) + if any(newval for newval in new_data_keys if self.relation_data._is_secret_field(newval)): + self.relation_data._register_secrets_to_relation(event.relation, new_data_keys) + + getattr(self.on, "mtls_cert_updated").emit(event.relation, app=event.app, unit=event.unit) + # Leader only if not self.relation_data.local_unit.is_leader(): return @@ -3430,6 +3472,33 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: event.relation, app=event.app, unit=event.unit ) + def _on_secret_changed_event(self, event: SecretChangedEvent): + """Event notifying about a new value of a secret.""" + if not event.secret.label: + return + + relation = self.relation_data._relation_from_secret_label(event.secret.label) + if not relation: + logging.info( + f"Received secret {event.secret.label} but couldn't parse, seems irrelevant" + ) + return + + if relation.app == self.charm.app: + logging.info("Secret changed event ignored for Secret Owner") + + remote_unit = None + for unit in relation.units: + if unit.app != self.charm.app: + remote_unit = unit + + old_mtls_cert = event.secret.get_content().get("mtls-cert") + # mtls-cert is the only secret that can be updated + logger.info("mtls-cert updated") + getattr(self.on, "mtls_cert_updated").emit( + relation, app=relation.app, unit=remote_unit, old_mtls_cert=old_mtls_cert + ) + class KafkaProvides(KafkaProviderData, KafkaProviderEventHandlers): """Provider-side of the Kafka relation.""" @@ -3450,11 +3519,13 @@ def __init__( extra_user_roles: Optional[str] = None, consumer_group_prefix: Optional[str] = None, additional_secret_fields: Optional[List[str]] = [], + mtls_cert: Optional[str] = None, ): """Manager of Kafka client relations.""" super().__init__(model, relation_name, extra_user_roles, additional_secret_fields) self.topic = topic self.consumer_group_prefix = consumer_group_prefix or "" + self.mtls_cert = mtls_cert @property def topic(self): @@ -3468,6 +3539,15 @@ def topic(self, value): raise ValueError(f"Error on topic '{value}', cannot be a wildcard.") self._topic = value + def set_mtls_cert(self, relation_id: int, mtls_cert: str) -> None: + """Set the mtls cert in the application relation databag / secret. + + Args: + relation_id: the identifier for a particular relation. + mtls_cert: mtls cert. + """ + self.update_relation_data(relation_id, {"mtls-cert": mtls_cert}) + class KafkaRequirerEventHandlers(RequirerEventHandlers): """Requires-side of the Kafka relation.""" @@ -3489,6 +3569,9 @@ def _on_relation_created_event(self, event: RelationCreatedEvent) -> None: # Sets topic, extra user roles, and "consumer-group-prefix" in the relation relation_data = {"topic": self.relation_data.topic} + if self.relation_data.mtls_cert: + relation_data["mtls-cert"] = self.relation_data.mtls_cert + if self.relation_data.extra_user_roles: relation_data["extra-user-roles"] = self.relation_data.extra_user_roles @@ -3547,15 +3630,17 @@ def __init__( extra_user_roles: Optional[str] = None, consumer_group_prefix: Optional[str] = None, additional_secret_fields: Optional[List[str]] = [], + mtls_cert: Optional[str] = None, ) -> None: KafkaRequirerData.__init__( self, charm.model, relation_name, topic, - extra_user_roles, - consumer_group_prefix, - additional_secret_fields, + extra_user_roles=extra_user_roles, + consumer_group_prefix=consumer_group_prefix, + additional_secret_fields=additional_secret_fields, + mtls_cert=mtls_cert, ) KafkaRequirerEventHandlers.__init__(self, charm, self) @@ -3675,6 +3760,10 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None: event.relation, app=event.app, unit=event.unit ) + def _on_secret_changed_event(self, event: SecretChangedEvent) -> None: + """Event emitted when the relation data has changed.""" + pass + class OpenSearchProvides(OpenSearchProvidesData, OpenSearchProvidesEventHandlers): """Provider-side of the OpenSearch relation.""" diff --git a/src/backups.py b/src/backups.py index 4c414d75e8..74f754b276 100644 --- a/src/backups.py +++ b/src/backups.py @@ -1262,6 +1262,7 @@ def _render_pgbackrest_conf_file(self) -> bool: storage_path=self.charm._storage_path, user=BACKUP_USER, retention_full=s3_parameters["delete-older-than-days"], + process_max=max(os.cpu_count() - 2, 1), ) # Render pgBackRest config file. self.charm._patroni.render_file(f"{PGBACKREST_CONF_PATH}/pgbackrest.conf", rendered, 0o644) diff --git a/templates/pgbackrest.conf.j2 b/templates/pgbackrest.conf.j2 index 46776c2e7c..bf43b83516 100644 --- a/templates/pgbackrest.conf.j2 +++ b/templates/pgbackrest.conf.j2 @@ -1,5 +1,6 @@ [global] backup-standby=y +compress-type=zst lock-path=/tmp log-path={{ log_path }} repo1-retention-full-type=time @@ -47,3 +48,6 @@ pg{{ ns.count }}-user={{ user }} {% set ns.count = ns.count + 1 %} {%- endfor %} {%- endif %} + +[global:restore] +process-max={{process_max}} diff --git a/tests/unit/test_backups.py b/tests/unit/test_backups.py index 5cd5cac8c4..2f93daa663 100644 --- a/tests/unit/test_backups.py +++ b/tests/unit/test_backups.py @@ -1,5 +1,6 @@ # Copyright 2023 Canonical Ltd. # See LICENSE file for licensing details. +from os import cpu_count from pathlib import PosixPath from subprocess import CompletedProcess, TimeoutExpired from unittest.mock import ANY, MagicMock, PropertyMock, call, mock_open, patch @@ -1754,6 +1755,7 @@ def test_render_pgbackrest_conf_file(harness, tls_ca_chain_filename): storage_path=harness.charm._storage_path, user="backup", retention_full=30, + process_max=max(cpu_count() - 2, 1), ) # Patch the `open` method with our mock.