Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 95 additions & 6 deletions lib/charms/data_platform_libs/v0/data_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ def _on_topic_requested(self, event: TopicRequestedEvent):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 46
LIBPATCH = 48

PYDEPS = ["ops>=2.0.0"]

Expand Down Expand Up @@ -2569,7 +2569,7 @@ def __init__(


################################################################################
# Cross-charm Relatoins Data Handling and Evenets
# Cross-charm Relations Data Handling and Events
################################################################################

# Generic events
Expand Down Expand Up @@ -3268,7 +3268,7 @@ def __init__(
# Kafka Events


class KafkaProvidesEvent(RelationEvent):
class KafkaProvidesEvent(RelationEventWithSecret):
"""Base class for Kafka events."""

@property
Expand All @@ -3287,6 +3287,40 @@ def consumer_group_prefix(self) -> Optional[str]:

return self.relation.data[self.relation.app].get("consumer-group-prefix")

@property
def mtls_cert(self) -> Optional[str]:
"""Returns TLS cert of the client."""
if not self.relation.app:
return None

if not self.secrets_enabled:
raise SecretsUnavailableError("Secrets unavailable on current Juju version")

secret_field = f"{PROV_SECRET_PREFIX}{SECRET_GROUPS.MTLS}"
if secret_uri := self.relation.data[self.app].get(secret_field):
secret = self.framework.model.get_secret(id=secret_uri)
content = secret.get_content(refresh=True)
if content:
return content.get("mtls-cert")


class KafkaClientMtlsCertUpdatedEvent(KafkaProvidesEvent):
"""Event emitted when the mtls relation is updated."""

def __init__(self, handle, relation, old_mtls_cert: Optional[str] = None, app=None, unit=None):
super().__init__(handle, relation, app, unit)

self.old_mtls_cert = old_mtls_cert

def snapshot(self):
"""Return a snapshot of the event."""
return super().snapshot() | {"old_mtls_cert": self.old_mtls_cert}

def restore(self, snapshot):
"""Restore the event from a snapshot."""
super().restore(snapshot)
self.old_mtls_cert = snapshot["old_mtls_cert"]


class TopicRequestedEvent(KafkaProvidesEvent, ExtraRoleEvent):
"""Event emitted when a new topic is requested for use on this relation."""
Expand All @@ -3299,6 +3333,7 @@ class KafkaProvidesEvents(CharmEvents):
"""

topic_requested = EventSource(TopicRequestedEvent)
mtls_cert_updated = EventSource(KafkaClientMtlsCertUpdatedEvent)


class KafkaRequiresEvent(RelationEvent):
Expand Down Expand Up @@ -3416,6 +3451,13 @@ def __init__(self, charm: CharmBase, relation_data: KafkaProviderData) -> None:
def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
"""Event emitted when the relation has changed."""
super()._on_relation_changed_event(event)

new_data_keys = list(event.relation.data[event.app].keys())
if any(newval for newval in new_data_keys if self.relation_data._is_secret_field(newval)):
self.relation_data._register_secrets_to_relation(event.relation, new_data_keys)

getattr(self.on, "mtls_cert_updated").emit(event.relation, app=event.app, unit=event.unit)

# Leader only
if not self.relation_data.local_unit.is_leader():
return
Expand All @@ -3430,6 +3472,33 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
event.relation, app=event.app, unit=event.unit
)

def _on_secret_changed_event(self, event: SecretChangedEvent):
"""Event notifying about a new value of a secret."""
if not event.secret.label:
return

relation = self.relation_data._relation_from_secret_label(event.secret.label)
if not relation:
logging.info(
f"Received secret {event.secret.label} but couldn't parse, seems irrelevant"
)
return

if relation.app == self.charm.app:
logging.info("Secret changed event ignored for Secret Owner")

remote_unit = None
for unit in relation.units:
if unit.app != self.charm.app:
remote_unit = unit

old_mtls_cert = event.secret.get_content().get("mtls-cert")
# mtls-cert is the only secret that can be updated
logger.info("mtls-cert updated")
getattr(self.on, "mtls_cert_updated").emit(
relation, app=relation.app, unit=remote_unit, old_mtls_cert=old_mtls_cert
)


class KafkaProvides(KafkaProviderData, KafkaProviderEventHandlers):
"""Provider-side of the Kafka relation."""
Expand All @@ -3450,11 +3519,13 @@ def __init__(
extra_user_roles: Optional[str] = None,
consumer_group_prefix: Optional[str] = None,
additional_secret_fields: Optional[List[str]] = [],
mtls_cert: Optional[str] = None,
):
"""Manager of Kafka client relations."""
super().__init__(model, relation_name, extra_user_roles, additional_secret_fields)
self.topic = topic
self.consumer_group_prefix = consumer_group_prefix or ""
self.mtls_cert = mtls_cert

@property
def topic(self):
Expand All @@ -3468,6 +3539,15 @@ def topic(self, value):
raise ValueError(f"Error on topic '{value}', cannot be a wildcard.")
self._topic = value

def set_mtls_cert(self, relation_id: int, mtls_cert: str) -> None:
"""Set the mtls cert in the application relation databag / secret.

Args:
relation_id: the identifier for a particular relation.
mtls_cert: mtls cert.
"""
self.update_relation_data(relation_id, {"mtls-cert": mtls_cert})


class KafkaRequirerEventHandlers(RequirerEventHandlers):
"""Requires-side of the Kafka relation."""
Expand All @@ -3489,6 +3569,9 @@ def _on_relation_created_event(self, event: RelationCreatedEvent) -> None:
# Sets topic, extra user roles, and "consumer-group-prefix" in the relation
relation_data = {"topic": self.relation_data.topic}

if self.relation_data.mtls_cert:
relation_data["mtls-cert"] = self.relation_data.mtls_cert

if self.relation_data.extra_user_roles:
relation_data["extra-user-roles"] = self.relation_data.extra_user_roles

Expand Down Expand Up @@ -3547,15 +3630,17 @@ def __init__(
extra_user_roles: Optional[str] = None,
consumer_group_prefix: Optional[str] = None,
additional_secret_fields: Optional[List[str]] = [],
mtls_cert: Optional[str] = None,
) -> None:
KafkaRequirerData.__init__(
self,
charm.model,
relation_name,
topic,
extra_user_roles,
consumer_group_prefix,
additional_secret_fields,
extra_user_roles=extra_user_roles,
consumer_group_prefix=consumer_group_prefix,
additional_secret_fields=additional_secret_fields,
mtls_cert=mtls_cert,
)
KafkaRequirerEventHandlers.__init__(self, charm, self)

Expand Down Expand Up @@ -3675,6 +3760,10 @@ def _on_relation_changed_event(self, event: RelationChangedEvent) -> None:
event.relation, app=event.app, unit=event.unit
)

def _on_secret_changed_event(self, event: SecretChangedEvent) -> None:
"""Event emitted when the relation data has changed."""
pass


class OpenSearchProvides(OpenSearchProvidesData, OpenSearchProvidesEventHandlers):
"""Provider-side of the OpenSearch relation."""
Expand Down
1 change: 1 addition & 0 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,7 @@ def _render_pgbackrest_conf_file(self) -> bool:
storage_path=self.charm._storage_path,
user=BACKUP_USER,
retention_full=s3_parameters["delete-older-than-days"],
process_max=max(os.cpu_count() - 2, 1),
)
# Render pgBackRest config file.
self.charm._patroni.render_file(f"{PGBACKREST_CONF_PATH}/pgbackrest.conf", rendered, 0o644)
Expand Down
4 changes: 4 additions & 0 deletions templates/pgbackrest.conf.j2
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[global]
backup-standby=y
compress-type=zst
lock-path=/tmp
log-path={{ log_path }}
repo1-retention-full-type=time
Expand Down Expand Up @@ -47,3 +48,6 @@ pg{{ ns.count }}-user={{ user }}
{% set ns.count = ns.count + 1 %}
{%- endfor %}
{%- endif %}

[global:restore]
process-max={{process_max}}
2 changes: 2 additions & 0 deletions tests/unit/test_backups.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
from os import cpu_count
from pathlib import PosixPath
from subprocess import CompletedProcess, TimeoutExpired
from unittest.mock import ANY, MagicMock, PropertyMock, call, mock_open, patch
Expand Down Expand Up @@ -1754,6 +1755,7 @@ def test_render_pgbackrest_conf_file(harness, tls_ca_chain_filename):
storage_path=harness.charm._storage_path,
user="backup",
retention_full=30,
process_max=max(cpu_count() - 2, 1),
)

# Patch the `open` method with our mock.
Expand Down