From 7e9abfd30bc2ec918296654b4135bb17ec88d180 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 24 Oct 2025 09:04:44 -0300 Subject: [PATCH 1/6] Disable logical replication relation Signed-off-by: Marcelo Henrique Neppel --- metadata.yaml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/metadata.yaml b/metadata.yaml index 56a4bd2d7d..bf0f09565a 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -36,9 +36,9 @@ provides: interface: postgresql_async limit: 1 optional: true - logical-replication-offer: - interface: postgresql_logical_replication - optional: true +# logical-replication-offer: +# interface: postgresql_logical_replication +# optional: true database: interface: postgresql_client cos-agent: @@ -54,10 +54,10 @@ requires: interface: tls-certificates limit: 1 optional: true - logical-replication: - interface: postgresql_logical_replication - limit: 1 - optional: true +# logical-replication: +# interface: postgresql_logical_replication +# limit: 1 +# optional: true client-certificates: interface: tls-certificates limit: 1 From 969cccae487053fcc13223e46e951fe08284bc29 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 24 Oct 2025 09:06:09 -0300 Subject: [PATCH 2/6] Disable logical replication integration tests Signed-off-by: Marcelo Henrique Neppel --- tests/spread/test_logical_replication.py/task.yaml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/spread/test_logical_replication.py/task.yaml b/tests/spread/test_logical_replication.py/task.yaml index 1d60b23430..7b4eb6d1a0 100644 --- a/tests/spread/test_logical_replication.py/task.yaml +++ b/tests/spread/test_logical_replication.py/task.yaml @@ -1,7 +1,7 @@ -summary: test_logical_replication.py -environment: - TEST_MODULE: ha_tests/test_logical_replication.py -execute: | - tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results" -artifacts: - - allure-results +# summary: test_logical_replication.py +# environment: +# TEST_MODULE: ha_tests/test_logical_replication.py +# execute: | +# tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results" +# artifacts: +# - allure-results From b169cbe744dc3d56279d90837248104e0d54d468 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 24 Oct 2025 09:08:22 -0300 Subject: [PATCH 3/6] Disable logical replication config options Signed-off-by: Marcelo Henrique Neppel --- config.yaml | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/config.yaml b/config.yaml index 64174e8c25..6c2505dc9a 100644 --- a/config.yaml +++ b/config.yaml @@ -100,12 +100,12 @@ options: Allowed values are one of 'none', 'pl', 'all'. Enables tracking of function call counts and time used. Specify pl to track only procedural-language functions type: string - logical-replication-subscription-request: - description: | - Set of databases corresponding to list of tables with schema notation in JSON format, which will be requested from - publiblisher cluster to subscribe on via logical replication. - Example: {"": [".", ...], ...} - type: string +# logical-replication-subscription-request: +# description: | +# Set of databases corresponding to list of tables with schema notation in JSON format, which will be requested from +# publiblisher cluster to subscribe on via logical replication. +# Example: {"": [".
", ...], ...} +# type: string memory-maintenance-work-mem: description: | Sets the maximum memory (KB) to be used for maintenance operations. @@ -912,15 +912,15 @@ options: Enables tracking of function call counts and time used. Specify pl to track only procedural-language functions type: string default: "none" - logical_replication_subscription_request: - description: | - Deprecated: Use logical-replication-subscription-request instead. - - Set of databases corresponding to list of tables with schema notation in JSON format, which will be requested from - publiblisher cluster to subscribe on via logical replication. - Example: {"": [".
", ...], ...} - type: string - default: "{}" +# logical_replication_subscription_request: +# description: | +# Deprecated: Use logical-replication-subscription-request instead. +# +# Set of databases corresponding to list of tables with schema notation in JSON format, which will be requested from +# publiblisher cluster to subscribe on via logical replication. +# Example: {"": [".
", ...], ...} +# type: string +# default: "{}" memory_maintenance_work_mem: description: | Deprecated: Use memory-maintenance-work-mem instead. From 1c5c79a37d0f7fa6082b001186842262ff890aff Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 24 Oct 2025 09:48:45 -0300 Subject: [PATCH 4/6] Completely disable logical replication within the charm code Signed-off-by: Marcelo Henrique Neppel --- src/backups.py | 22 +- src/charm.py | 35 +- src/config.py | 2 +- src/relations/logical_replication.py | 1326 +++++++++++++------------- tests/unit/test_charm.py | 18 +- 5 files changed, 697 insertions(+), 706 deletions(-) diff --git a/src/backups.py b/src/backups.py index 04e322a84e..ee67a00ab8 100644 --- a/src/backups.py +++ b/src/backups.py @@ -44,10 +44,6 @@ UNIT_SCOPE, ) from relations.async_replication import REPLICATION_CONSUMER_RELATION, REPLICATION_OFFER_RELATION -from relations.logical_replication import ( - LOGICAL_REPLICATION_OFFER_RELATION, - LOGICAL_REPLICATION_RELATION, -) logger = logging.getLogger(__name__) @@ -1240,15 +1236,15 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: return False logger.info("Checking that cluster does not have an active logical replication relation") - if self.model.get_relation(LOGICAL_REPLICATION_RELATION) or len( - self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()) - ): - error_message = ( - "Unit cannot restore backup with an active logical replication connection" - ) - logger.error(f"Restore failed: {error_message}") - event.fail(error_message) - return False + # if self.model.get_relation(LOGICAL_REPLICATION_RELATION) or len( + # self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()) + # ): + # error_message = ( + # "Unit cannot restore backup with an active logical replication connection" + # ) + # logger.error(f"Restore failed: {error_message}") + # event.fail(error_message) + # return False logger.info("Checking that this unit was already elected the leader unit") if not self.charm.unit.is_leader(): diff --git a/src/charm.py b/src/charm.py index aca347891f..1b9d7c3e03 100755 --- a/src/charm.py +++ b/src/charm.py @@ -130,10 +130,6 @@ ) from ldap import PostgreSQLLDAP from relations.async_replication import PostgreSQLAsyncReplication -from relations.logical_replication import ( - LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS, - PostgreSQLLogicalReplication, -) from relations.postgresql_provider import PostgreSQLProvider from relations.tls import TLS from relations.tls_transfer import TLSTransfer @@ -339,7 +335,7 @@ def __init__(self, *args): self.tls = TLS(self, PEER) self.tls_transfer = TLSTransfer(self, PEER) self.async_replication = PostgreSQLAsyncReplication(self) - self.logical_replication = PostgreSQLLogicalReplication(self) + # self.logical_replication = PostgreSQLLogicalReplication(self) self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart ) @@ -1516,8 +1512,8 @@ def _on_config_changed(self, event) -> None: # noqa: C901 # Update the sync-standby endpoint in the async replication data. self.async_replication.update_async_replication_data() - if not self.logical_replication.apply_changed_config(event): - return + # if not self.logical_replication.apply_changed_config(event): + # return if not self.unit.is_leader(): return @@ -2001,7 +1997,7 @@ def _on_update_status(self, _) -> None: self.backup.coordinate_stanza_fields() - self.logical_replication.retry_validations() + # self.logical_replication.retry_validations() self._set_primary_status_message() @@ -2093,9 +2089,8 @@ def _can_run_on_update_status(self) -> bool: return False if ( - self.is_blocked - and self.unit.status not in S3_BLOCK_MESSAGES - and self.unit.status.message != LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS + self.is_blocked and self.unit.status not in S3_BLOCK_MESSAGES + # and self.unit.status.message != LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS ): # If charm was failing to disable plugin, try again (user may have removed the objects) if self.unit.status.message == EXTENSION_OBJECT_MESSAGE: @@ -2141,12 +2136,12 @@ def _set_primary_status_message(self) -> None: BlockedStatus(self.app_peer_data["s3-initialization-block-message"]) ) return - if self.unit.is_leader() and ( - self.app_peer_data.get("logical-replication-validation") == "error" - or self.logical_replication.has_remote_publisher_errors() - ): - self.unit.status = BlockedStatus(LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS) - return + # if self.unit.is_leader() and ( + # self.app_peer_data.get("logical-replication-validation") == "error" + # or self.logical_replication.has_remote_publisher_errors() + # ): + # self.unit.status = BlockedStatus(LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS) + # return if ( self._patroni.get_primary(unit_name_pattern=True) == self.unit.name or self.is_standby_leader @@ -2413,7 +2408,7 @@ def update_config( self.model.config, self.get_available_memory(), limit_memory ) - replication_slots = self.logical_replication.replication_slots() + # replication_slots = self.logical_replication.replication_slots() # Update and reload configuration based on TLS files availability. self._patroni.render_patroni_yml_file( @@ -2430,7 +2425,7 @@ def update_config( parameters=pg_parameters, no_peers=no_peers, user_databases_map=self.relations_user_databases_map, - slots=replication_slots, + # slots=replication_slots, ) if no_peers: return True @@ -2464,7 +2459,7 @@ def update_config( self._api_update_config() - self._patroni.ensure_slots_controller_by_patroni(replication_slots) + # self._patroni.ensure_slots_controller_by_patroni(replication_slots) self._handle_postgresql_restart_need() diff --git a/src/config.py b/src/config.py index 45e104f8e9..19943c0e17 100644 --- a/src/config.py +++ b/src/config.py @@ -52,7 +52,7 @@ class CharmConfig(BaseConfigModel): logging_log_lock_waits: bool | None = Field(default=None) logging_log_min_duration_statement: int | None = Field(ge=-1, le=2147483647, default=None) logging_track_functions: Literal["none", "pl", "all"] | None = Field(default=None) - logical_replication_subscription_request: str | None + # logical_replication_subscription_request: str | None memory_maintenance_work_mem: int | None = Field(ge=1024, le=2147483647, default=None) memory_max_prepared_transactions: int | None = Field(ge=0, le=262143, default=None) memory_shared_buffers: int | None = Field(ge=16, le=1073741823, default=None) diff --git a/src/relations/logical_replication.py b/src/relations/logical_replication.py index 3426801a51..77f83179c9 100644 --- a/src/relations/logical_replication.py +++ b/src/relations/logical_replication.py @@ -6,666 +6,666 @@ TODO: add description after specification is accepted. """ -import json -import logging -from typing import ( - TYPE_CHECKING, -) - -from ops import ( - BlockedStatus, - EventBase, - LeaderElectedEvent, - Object, - Relation, - RelationBrokenEvent, - RelationChangedEvent, - RelationDepartedEvent, - RelationJoinedEvent, - Secret, - SecretChangedEvent, - SecretNotFoundError, -) -from tenacity import Retrying, stop_after_delay, wait_fixed - -from cluster_topology_observer import ClusterTopologyChangeEvent -from utils import new_password - -if TYPE_CHECKING: - from charm import PostgresqlOperatorCharm - -logger = logging.getLogger(__name__) - -LOGICAL_REPLICATION_OFFER_RELATION = "logical-replication-offer" -LOGICAL_REPLICATION_RELATION = "logical-replication" -SECRET_LABEL = "logical-replication-relation" # noqa: S105 -LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS = "Logical replication setup is invalid. Check logs" - - -class PostgreSQLLogicalReplication(Object): - """Defines the logical-replication logic.""" - - def __init__(self, charm: "PostgresqlOperatorCharm"): - super().__init__(charm, "postgresql_logical_replication") - self.charm = charm - # Relations - self.charm.framework.observe( - self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_joined, - self._on_offer_relation_joined, - ) - self.charm.framework.observe( - self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_changed, - self._on_offer_relation_changed, - ) - self.charm.framework.observe( - self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_departed, - self._on_offer_relation_departed, - ) - self.charm.framework.observe( - self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_broken, - self._on_offer_relation_broken, - ) - self.charm.framework.observe( - self.charm.on[LOGICAL_REPLICATION_RELATION].relation_joined, self._on_relation_joined - ) - self.charm.framework.observe( - self.charm.on[LOGICAL_REPLICATION_RELATION].relation_changed, self._on_relation_changed - ) - self.charm.framework.observe( - self.charm.on[LOGICAL_REPLICATION_RELATION].relation_departed, - self._on_relation_departed, - ) - self.charm.framework.observe( - self.charm.on[LOGICAL_REPLICATION_RELATION].relation_broken, self._on_relation_broken - ) - # Events - self.charm.framework.observe( - self.charm.on.cluster_topology_change, self._on_cluster_topology_change - ) - self.charm.framework.observe( - self.charm.on.leader_elected, self._on_cluster_topology_change - ) - self.framework.observe(self.charm.on.secret_changed, self._on_secret_changed) - - # region Relations - - def _on_offer_relation_joined(self, event: RelationJoinedEvent) -> None: - if not self.charm.unit.is_leader(): - logger.debug( - f"{LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} join early exit due to unit not being a leader" - ) - return - if not self.charm.primary_endpoint: - logger.debug( - f"Deferring {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} join due to primary unavailability" - ) - event.defer() - return - - secret = self._get_secret(event.relation.id) - logger.debug( - f"Sharing logical replciation secret to the {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id}" - ) - secret.grant(event.relation) - - self._save_published_resources_info(str(event.relation.id), secret.id, {}) # type: ignore - event.relation.data[self.model.app]["secret-id"] = secret.id # type: ignore - - def _on_offer_relation_changed(self, event: RelationChangedEvent) -> None: - if not self.charm.unit.is_leader(): - logger.debug( - f"{LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} change early exit due to unit not being a leader" - ) - return - if not self.charm.primary_endpoint: - logger.debug( - f"Deferring {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} change due to primary unavailability" - ) - event.defer() - return - self._process_offer(event.relation) - - def _on_offer_relation_departed(self, event: RelationDepartedEvent) -> None: - if event.departing_unit == self.charm.unit and self.charm._peers is not None: - logger.debug( - f"Marking unit as departed for {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} to skip break" - ) - self.charm.unit_peer_data.update({"departing": "True"}) - - def _on_offer_relation_broken(self, event: RelationBrokenEvent) -> None: - if not self.charm._peers or self.charm.is_unit_departing: - logger.debug( - f"{LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} break early exit due to unit departure" - ) - return - if not self.charm.unit.is_leader(): - logger.debug( - f"{LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} break early exit due to unit not being a leader" - ) - return - if not self.charm.primary_endpoint: - logger.debug( - f"Deferring {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} break due to primary unavailability" - ) - event.defer() - return - - published_resources = json.loads( - self.charm.app_peer_data.get("logical-replication-published-resources", "{}") - ) - active_relation_ids = [ - str(relation.id) - for relation in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()) - ] - - for relation_id, relation_resources in published_resources.copy().items(): - if relation_id in active_relation_ids: - continue - logger.info( - f"Cleaning up published logical replication resources for the redundant {LOGICAL_REPLICATION_OFFER_RELATION} #{relation_id}" - ) - try: - secret = self.model.get_secret(id=relation_resources["secret-id"]) - self.charm.postgresql.delete_user(secret.peek_content()["username"]) - secret.remove_all_revisions() - except SecretNotFoundError: - pass - for database, publication in relation_resources["publications"].items(): - self.charm.postgresql.drop_publication(database, publication["publication-name"]) - del published_resources[relation_id] - self.charm.app_peer_data["logical-replication-published-resources"] = json.dumps( - published_resources - ) - - self.charm.update_config() - - def _on_relation_joined(self, event: RelationJoinedEvent) -> None: - if not self.charm.unit.is_leader(): - logger.debug( - f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} join early exit due to unit not being a leader" - ) - return - if self.charm.app_peer_data.get("logical-replication-validation") == "ongoing": - logger.debug( - f"Deferring {LOGICAL_REPLICATION_RELATION} #{event.relation.id} join due to still ongoing logical replication config validation" - ) - event.defer() - return - if self.charm.app_peer_data.get("logical-replication-validation") == "error": - logger.debug( - f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} join early exit due to validation error" - ) - return - if not self._validate_subscription_request(): - return - event.relation.data[self.model.app]["subscription-request"] = ( - self.charm.config.logical_replication_subscription_request or "" - ) - - def _on_relation_changed(self, event: RelationChangedEvent) -> None: - if not self._relation_changed_checks(event): - return - - for error in json.loads(event.relation.data[event.app].get("errors", "[]")): - logger.error( - f"Got logical replication error from the publisher in {LOGICAL_REPLICATION_RELATION} #{event.relation.id}: {error}" - ) - self.charm.unit.status = BlockedStatus(LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS) - - secret_content = self.model.get_secret( - id=event.relation.data[event.app]["secret-id"] - ).get_content(refresh=True) - subscriptions = self._subscriptions_info() - publications = json.loads(event.relation.data[event.app].get("publications", "{}")) - - for database, publication in publications.items(): - subscription_name = self._subscription_name(event.relation.id, database) - if database in subscriptions: - self.charm.postgresql.refresh_subscription(database, subscription_name) - logger.info( - f"Refreshed subscription {subscription_name} in database {database} due to relation change" - ) - else: - publication_name = publication["publication-name"] - for attempt in Retrying( - stop=stop_after_delay(120), wait=wait_fixed(3), reraise=True - ): - with attempt: - self.charm.postgresql.create_subscription( - subscription_name, - secret_content["primary"], - database, - secret_content["username"], - secret_content["password"], - publication_name, - publication["replication-slot-name"], - ) - logger.info( - f"Created new subscription {subscription_name} for publication {publication_name} in database {database}" - ) - subscriptions[database] = subscription_name - - for database, subscription in subscriptions.copy().items(): - if database in publications: - continue - self.charm.postgresql.drop_subscription(database, subscription) - logger.info(f"Dropped redundant subscription {subscription} from database {database}") - del subscriptions[database] - - self.charm.app_peer_data["logical-replication-subscriptions"] = json.dumps({ - str(event.relation.id): subscriptions - }) - - def _on_relation_departed(self, event: RelationDepartedEvent) -> None: - if event.departing_unit == self.charm.unit and self.charm._peers is not None: - self.charm.unit_peer_data.update({"departing": "True"}) - - def _on_relation_broken(self, event: RelationBrokenEvent) -> None: - if not self.charm._peers or self.charm.is_unit_departing: - logger.debug(f"{LOGICAL_REPLICATION_RELATION} break skipped due to departing unit") - return - if not self.charm.unit.is_leader(): - logger.debug( - f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} break early exit due to unit not being a leader" - ) - return - if not self.charm.primary_endpoint: - logger.debug( - f"Deferring {LOGICAL_REPLICATION_RELATION} break until primary is available" - ) - event.defer() - return - - for database, subscription in self._subscriptions_info().items(): - self.charm.postgresql.drop_subscription(database, subscription) - logger.info( - f"Dropped subscription {subscription} from database {database} due to relation break" - ) - self.charm.app_peer_data["logical-replication-subscriptions"] = "" - - # endregion - - # region Events - - def _on_cluster_topology_change( - self, event: ClusterTopologyChangeEvent | LeaderElectedEvent - ) -> None: - if not self.charm.unit.is_leader(): - logger.debug( - "Logical replication tolopoly change early exit due to unit not being a leader" - ) - return - if not len(self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ())): - logger.debug( - f"Logical replication tolopoly change early exit due to {LOGICAL_REPLICATION_OFFER_RELATION} connections absence" - ) - return - if not self.charm.primary_endpoint: - logger.debug( - "Deferring logical replication topology change until primary is available" - ) - event.defer() - return - for relation in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): - self._get_secret(relation.id) - - def _on_secret_changed(self, event: SecretChangedEvent) -> None: - if not self.charm.unit.is_leader(): - logger.debug( - "Logical replication secret change early exit due to unit not being a leader" - ) - return - if not self.charm.primary_endpoint: - logger.debug("Deferring logical replication secret change until primary is available") - event.defer() - return - - if ( - (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)) - and event.secret.label - and event.secret.label.startswith(SECRET_LABEL) - ): - logger.info("Logical replication secret changed, updating subscriptions") - secret_content = self.model.get_secret( - id=relation.data[relation.app]["secret-id"], label=SECRET_LABEL - ).get_content(refresh=True) - for database, subscription in self._subscriptions_info().items(): - self.charm.postgresql.update_subscription( - database, - subscription, - secret_content["primary"], - secret_content["username"], - secret_content["password"], - ) - - # endregion - - def apply_changed_config(self, event: EventBase) -> bool: - """Validate & apply (relation) logical_replication_subscription_request config parameter.""" - if not self.charm.unit.is_leader(): - return True - if not self.charm.primary_endpoint: - logger.debug( - "Marking logical replication config validation as ongoing and deferring event until primary as available" - ) - self.charm.app_peer_data["logical-replication-validation"] = "ongoing" - event.defer() - return False - if self._validate_subscription_request(): - self._apply_updated_subscription_request() - return True - - def retry_validations(self) -> None: - """Run recurrent logical replication validation attempt. - - For subscribers - try to validate & apply subscription request. - For publishers - try to validate & process all the offer relations. - """ - if not self.charm.unit.is_leader() or not self.charm.primary_endpoint: - return - if ( - self.charm.app_peer_data.get("logical-replication-validation") == "error" - and self._validate_subscription_request() - ): - self._apply_updated_subscription_request() - for relation in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): - if json.loads(relation.data[self.model.app].get("errors", "[]")): - self._process_offer(relation) - - def has_remote_publisher_errors(self) -> bool: - """Check if remote publisher in logical-replication relation has any errors.""" - return bool( - relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION) - ) and json.loads(relation.data[relation.app].get("errors", "[]")) - - def replication_slots(self) -> dict[str, str]: - """Get list of all managed replication slots. - - Returns: dictionary in : format. - """ - return { - publication["replication-slot-name"]: database - for resources in json.loads( - self.charm.app_peer_data.get("logical-replication-published-resources", "{}") - ).values() - for database, publication in resources["publications"].items() - } - - def _apply_updated_subscription_request(self) -> None: - if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): - return - logger.debug( - "Logical replication config validation is passed, applying config to the active relations" - ) - subscription_request_config = json.loads( - self.charm.config.logical_replication_subscription_request or "{}" - ) - subscriptions = self._subscriptions_info() - relation.data[self.model.app]["subscription-request"] = ( # type: ignore - self.charm.config.logical_replication_subscription_request - ) - for database, subscription in subscriptions.copy().items(): - if database in subscription_request_config: - continue - self.charm.postgresql.drop_subscription(database, subscription) - logger.info(f"Dropped redundant subscription {subscription} from database {database}") - del subscriptions[database] - self.charm.app_peer_data["logical-replication-subscriptions"] = json.dumps({ - str(relation.id): subscriptions - }) - - def _validate_subscription_request(self) -> bool: - try: - subscription_request_config = json.loads( - self.charm.config.logical_replication_subscription_request or "{}" - ) - except json.JSONDecodeError as err: - return self._fail_validation(f"JSON decode error {err}") - - relation = self.model.get_relation(LOGICAL_REPLICATION_RELATION) - subscription_request_relation = ( - json.loads(relation.data[self.model.app].get("subscription-request", "{}")) - if relation - else {} - ) - - for database, schematables in subscription_request_config.items(): - if not self.charm.postgresql.database_exists(database): - return self._fail_validation(f"database {database} doesn't exist") - for schematable in schematables: - try: - schema, table = schematable.split(".") - except ValueError: - return self._fail_validation(f"table format isn't right at {schematable}") - if not self.charm.postgresql.table_exists(database, schema, table): - return self._fail_validation( - f"table {schematable} in database {database} doesn't exist" - ) - already_subscribed = ( - database in subscription_request_relation - and schematable in subscription_request_relation[database] - ) - if not already_subscribed and not self.charm.postgresql.is_table_empty( - database, schema, table - ): - return self._fail_validation( - f"table {schematable} in database {database} isn't empty" - ) - - self.charm.app_peer_data["logical-replication-validation"] = "" - return True - - def _fail_validation(self, message: str | None = None) -> bool: - if message: - logger.error(f"Logical replication validation: {message}") - self.charm.app_peer_data["logical-replication-validation"] = "error" - self.charm.unit.status = BlockedStatus(LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS) - return False - - def _validate_new_publication( - self, - database: str, - schematables: list[str], - publication_schematables: list[str] | None = None, - ) -> str | None: - if not self.charm.postgresql.database_exists(database): - return f"database {database} doesn't exist" - for schematable in schematables: - if publication_schematables is not None and schematable in publication_schematables: - continue - schema, table = schematable.split(".") - if not self.charm.postgresql.table_exists(database, schema, table): - return f"table {schematable} in database {database} doesn't exist" - return None - - def _relation_changed_checks(self, event: RelationChangedEvent) -> bool: - if not self.charm.unit.is_leader(): - logger.debug( - f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} change early exit due to unit not being a leader" - ) - return False - if not event.relation.data[event.app].get("secret-id"): - logger.warning( - f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} change early exit due to secret absence in remote application bag (unusual behavior)" - ) - return False - if not self.charm.primary_endpoint: - logger.debug( - f"Deferring {LOGICAL_REPLICATION_RELATION} #{event.relation.id} change due to primary unavailability" - ) - event.defer() - return False - return True - - def _process_offer(self, relation: Relation) -> None: - logger.debug( - f"Started processing offer for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" - ) - - subscriptions_request = json.loads( - relation.data[relation.app].get("subscription-request", "{}") - ) - publications = json.loads(relation.data[self.model.app].get("publications", "{}")) - secret = self._get_secret(relation.id) - user = secret.peek_content()["username"] - errors = [] - - for database, publication in publications.copy().items(): - if database in subscriptions_request: - continue - logger.info( - f"Dropping redundant publication {publication['publication-name']} in database {database} from {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" - ) - self.charm.postgresql.drop_publication(database, publication["publication-name"]) - del publications[database] - logger.info( - f"Revoking replication privileges on database {database} from user {user} from {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" - ) - self.charm.postgresql.revoke_replication_privileges( - user, database, publication["tables"] - ) - - for database, tables in subscriptions_request.items(): - if database not in publications: - if validation_error := self._validate_new_publication(database, tables): - errors.append(validation_error) - logger.error( - f"Cannot create new publication for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}: {validation_error}" - ) - continue - publication_name = self._publication_name(relation.id, database) - if self.charm.postgresql.publication_exists(database, publication_name): - error = f"conflicting publication {publication_name} in database {database}" - errors.append(error) - logger.error( - f"Cannot create new publication for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}: {error}" - ) - continue - logger.info( - f"Granting replication privileges on database {database} for user {user} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" - ) - self.charm.postgresql.grant_replication_privileges(user, database, tables) - logger.info( - f"Creating new publication {publication_name} for tables {', '.join(tables)} in database {database} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" - ) - self.charm.postgresql.create_publication(database, publication_name, tables) - publications[database] = { - "publication-name": publication_name, - "replication-slot-name": self._replication_slot_name(relation.id, database), - "tables": tables, - } - elif sorted(publication_tables := publications[database]["tables"]) != sorted(tables): - publication_name = publications[database]["publication-name"] - if validation_error := self._validate_new_publication( - database, tables, publication_tables - ): - errors.append(validation_error) - logger.error( - f"Cannot alter publication {publication_name} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}: {validation_error}" - ) - continue - if not self.charm.postgresql.publication_exists(database, publication_name): - errors.append( - f"managed publication {publication_name} in database {database} can't be found" - ) - logger.error( - f"Can't find managed publication {publication_name} in database {database} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" - ) - continue - logger.info( - f"Altering replication privileges on database {database} for user {user} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" - ) - self.charm.postgresql.grant_replication_privileges( - user, database, tables, publication_tables - ) - logger.info( - f"Altering publication {publication_name} tables from {','.join(publication_tables)} to {','.join(tables)} in database {database} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" - ) - self.charm.postgresql.alter_publication(database, publication_name, tables) - publications[database]["tables"] = tables - self._save_published_resources_info(str(relation.id), secret.id, publications) # type: ignore - relation.data[self.model.app]["publications"] = json.dumps(publications) - - self._save_published_resources_info(str(relation.id), secret.id, publications) # type: ignore - relation.data[self.model.app].update({ - "errors": json.dumps(errors), - "publications": json.dumps(publications), - }) - self.charm.update_config() - - logger.debug( - f"Successfully processed offer for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" - ) - - def _publication_name(self, relation_id: int, database: str) -> str: - return f"relation_{relation_id}_{database}" - - def _replication_slot_name(self, relation_id: int, database: str) -> str: - return f"relation_{relation_id}_{database}" - - def _subscription_name(self, relation_id: int, database: str) -> str: - return f"relation_{relation_id}_{database}" - - def _save_published_resources_info( - self, - relation_id: str, - secret_id: str, - publications: dict[str, dict[str, str | list[str]]], - ) -> None: - published_resources = json.loads( - self.charm.app_peer_data.get("logical-replication-published-resources", "{}") - ) - published_resources[relation_id] = { - "secret-id": secret_id, - "publications": publications, - } - self.charm.app_peer_data["logical-replication-published-resources"] = json.dumps( - published_resources - ) - - def _subscriptions_info(self) -> dict[str, str]: - for subscriptions_info in json.loads( - self.charm.app_peer_data.get("logical-replication-subscriptions", "{}") - ).values(): - return subscriptions_info - return {} - - def _create_user(self, relation_id: int) -> tuple[str, str]: - user = f"logical_replication_relation_{relation_id}" - password = new_password() - logger.info( - f"Creating new user {user} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation_id}" - ) - self.charm.postgresql.create_user(user, password, replication=True) - return user, password - - def _get_secret(self, relation_id: int) -> Secret: - """Returns logical replication secret. Updates, if content changed.""" - secret_label = f"{SECRET_LABEL}-{relation_id}" - primary = self.charm.primary_endpoint - try: - # Avoid recreating the secret. - secret = self.charm.model.get_secret(label=secret_label) - if not secret.id: - # Workaround for the secret id not being set with model uuid. - secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" - if (content := secret.peek_content())["primary"] != primary: - logger.debug( - f"Updating secret for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation_id}" - ) - content["primary"] = primary # type: ignore - secret.set_content(content) - return secret - except SecretNotFoundError: - logger.debug( - f"Creating new secret for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation_id}" - ) - username, password = self._create_user(relation_id) - return self.charm.model.app.add_secret( - content={ - "primary": primary, # type: ignore - "username": username, - "password": password, - }, - label=secret_label, - ) +# import json +# import logging +# from typing import ( +# TYPE_CHECKING, +# ) +# +# from ops import ( +# BlockedStatus, +# EventBase, +# LeaderElectedEvent, +# Object, +# Relation, +# RelationBrokenEvent, +# RelationChangedEvent, +# RelationDepartedEvent, +# RelationJoinedEvent, +# Secret, +# SecretChangedEvent, +# SecretNotFoundError, +# ) +# from tenacity import Retrying, stop_after_delay, wait_fixed +# +# from cluster_topology_observer import ClusterTopologyChangeEvent +# from utils import new_password +# +# if TYPE_CHECKING: +# from charm import PostgresqlOperatorCharm +# +# logger = logging.getLogger(__name__) +# +# LOGICAL_REPLICATION_OFFER_RELATION = "logical-replication-offer" +# LOGICAL_REPLICATION_RELATION = "logical-replication" +# SECRET_LABEL = "logical-replication-relation" +# LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS = "Logical replication setup is invalid. Check logs" +# +# +# class PostgreSQLLogicalReplication(Object): +# """Defines the logical-replication logic.""" +# +# def __init__(self, charm: "PostgresqlOperatorCharm"): +# super().__init__(charm, "postgresql_logical_replication") +# self.charm = charm +# # Relations +# self.charm.framework.observe( +# self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_joined, +# self._on_offer_relation_joined, +# ) +# self.charm.framework.observe( +# self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_changed, +# self._on_offer_relation_changed, +# ) +# self.charm.framework.observe( +# self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_departed, +# self._on_offer_relation_departed, +# ) +# self.charm.framework.observe( +# self.charm.on[LOGICAL_REPLICATION_OFFER_RELATION].relation_broken, +# self._on_offer_relation_broken, +# ) +# self.charm.framework.observe( +# self.charm.on[LOGICAL_REPLICATION_RELATION].relation_joined, self._on_relation_joined +# ) +# self.charm.framework.observe( +# self.charm.on[LOGICAL_REPLICATION_RELATION].relation_changed, self._on_relation_changed +# ) +# self.charm.framework.observe( +# self.charm.on[LOGICAL_REPLICATION_RELATION].relation_departed, +# self._on_relation_departed, +# ) +# self.charm.framework.observe( +# self.charm.on[LOGICAL_REPLICATION_RELATION].relation_broken, self._on_relation_broken +# ) +# # Events +# self.charm.framework.observe( +# self.charm.on.cluster_topology_change, self._on_cluster_topology_change +# ) +# self.charm.framework.observe( +# self.charm.on.leader_elected, self._on_cluster_topology_change +# ) +# self.framework.observe(self.charm.on.secret_changed, self._on_secret_changed) +# +# # region Relations +# +# def _on_offer_relation_joined(self, event: RelationJoinedEvent) -> None: +# if not self.charm.unit.is_leader(): +# logger.debug( +# f"{LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} join early exit due to unit not being a leader" +# ) +# return +# if not self.charm.primary_endpoint: +# logger.debug( +# f"Deferring {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} join due to primary unavailability" +# ) +# event.defer() +# return +# +# secret = self._get_secret(event.relation.id) +# logger.debug( +# f"Sharing logical replciation secret to the {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id}" +# ) +# secret.grant(event.relation) +# +# self._save_published_resources_info(str(event.relation.id), secret.id, {}) # type: ignore +# event.relation.data[self.model.app]["secret-id"] = secret.id # type: ignore +# +# def _on_offer_relation_changed(self, event: RelationChangedEvent) -> None: +# if not self.charm.unit.is_leader(): +# logger.debug( +# f"{LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} change early exit due to unit not being a leader" +# ) +# return +# if not self.charm.primary_endpoint: +# logger.debug( +# f"Deferring {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} change due to primary unavailability" +# ) +# event.defer() +# return +# self._process_offer(event.relation) +# +# def _on_offer_relation_departed(self, event: RelationDepartedEvent) -> None: +# if event.departing_unit == self.charm.unit and self.charm._peers is not None: +# logger.debug( +# f"Marking unit as departed for {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} to skip break" +# ) +# self.charm.unit_peer_data.update({"departing": "True"}) +# +# def _on_offer_relation_broken(self, event: RelationBrokenEvent) -> None: +# if not self.charm._peers or self.charm.is_unit_departing: +# logger.debug( +# f"{LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} break early exit due to unit departure" +# ) +# return +# if not self.charm.unit.is_leader(): +# logger.debug( +# f"{LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} break early exit due to unit not being a leader" +# ) +# return +# if not self.charm.primary_endpoint: +# logger.debug( +# f"Deferring {LOGICAL_REPLICATION_OFFER_RELATION} #{event.relation.id} break due to primary unavailability" +# ) +# event.defer() +# return +# +# published_resources = json.loads( +# self.charm.app_peer_data.get("logical-replication-published-resources", "{}") +# ) +# active_relation_ids = [ +# str(relation.id) +# for relation in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()) +# ] +# +# for relation_id, relation_resources in published_resources.copy().items(): +# if relation_id in active_relation_ids: +# continue +# logger.info( +# f"Cleaning up published logical replication resources for the redundant {LOGICAL_REPLICATION_OFFER_RELATION} #{relation_id}" +# ) +# try: +# secret = self.model.get_secret(id=relation_resources["secret-id"]) +# self.charm.postgresql.delete_user(secret.peek_content()["username"]) +# secret.remove_all_revisions() +# except SecretNotFoundError: +# pass +# for database, publication in relation_resources["publications"].items(): +# self.charm.postgresql.drop_publication(database, publication["publication-name"]) +# del published_resources[relation_id] +# self.charm.app_peer_data["logical-replication-published-resources"] = json.dumps( +# published_resources +# ) +# +# self.charm.update_config() +# +# def _on_relation_joined(self, event: RelationJoinedEvent) -> None: +# if not self.charm.unit.is_leader(): +# logger.debug( +# f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} join early exit due to unit not being a leader" +# ) +# return +# if self.charm.app_peer_data.get("logical-replication-validation") == "ongoing": +# logger.debug( +# f"Deferring {LOGICAL_REPLICATION_RELATION} #{event.relation.id} join due to still ongoing logical replication config validation" +# ) +# event.defer() +# return +# if self.charm.app_peer_data.get("logical-replication-validation") == "error": +# logger.debug( +# f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} join early exit due to validation error" +# ) +# return +# if not self._validate_subscription_request(): +# return +# event.relation.data[self.model.app]["subscription-request"] = ( +# self.charm.config.logical_replication_subscription_request or "" +# ) +# +# def _on_relation_changed(self, event: RelationChangedEvent) -> None: +# if not self._relation_changed_checks(event): +# return +# +# for error in json.loads(event.relation.data[event.app].get("errors", "[]")): +# logger.error( +# f"Got logical replication error from the publisher in {LOGICAL_REPLICATION_RELATION} #{event.relation.id}: {error}" +# ) +# self.charm.unit.status = BlockedStatus(LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS) +# +# secret_content = self.model.get_secret( +# id=event.relation.data[event.app]["secret-id"] +# ).get_content(refresh=True) +# subscriptions = self._subscriptions_info() +# publications = json.loads(event.relation.data[event.app].get("publications", "{}")) +# +# for database, publication in publications.items(): +# subscription_name = self._subscription_name(event.relation.id, database) +# if database in subscriptions: +# self.charm.postgresql.refresh_subscription(database, subscription_name) +# logger.info( +# f"Refreshed subscription {subscription_name} in database {database} due to relation change" +# ) +# else: +# publication_name = publication["publication-name"] +# for attempt in Retrying( +# stop=stop_after_delay(120), wait=wait_fixed(3), reraise=True +# ): +# with attempt: +# self.charm.postgresql.create_subscription( +# subscription_name, +# secret_content["primary"], +# database, +# secret_content["username"], +# secret_content["password"], +# publication_name, +# publication["replication-slot-name"], +# ) +# logger.info( +# f"Created new subscription {subscription_name} for publication {publication_name} in database {database}" +# ) +# subscriptions[database] = subscription_name +# +# for database, subscription in subscriptions.copy().items(): +# if database in publications: +# continue +# self.charm.postgresql.drop_subscription(database, subscription) +# logger.info(f"Dropped redundant subscription {subscription} from database {database}") +# del subscriptions[database] +# +# self.charm.app_peer_data["logical-replication-subscriptions"] = json.dumps({ +# str(event.relation.id): subscriptions +# }) +# +# def _on_relation_departed(self, event: RelationDepartedEvent) -> None: +# if event.departing_unit == self.charm.unit and self.charm._peers is not None: +# self.charm.unit_peer_data.update({"departing": "True"}) +# +# def _on_relation_broken(self, event: RelationBrokenEvent) -> None: +# if not self.charm._peers or self.charm.is_unit_departing: +# logger.debug(f"{LOGICAL_REPLICATION_RELATION} break skipped due to departing unit") +# return +# if not self.charm.unit.is_leader(): +# logger.debug( +# f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} break early exit due to unit not being a leader" +# ) +# return +# if not self.charm.primary_endpoint: +# logger.debug( +# f"Deferring {LOGICAL_REPLICATION_RELATION} break until primary is available" +# ) +# event.defer() +# return +# +# for database, subscription in self._subscriptions_info().items(): +# self.charm.postgresql.drop_subscription(database, subscription) +# logger.info( +# f"Dropped subscription {subscription} from database {database} due to relation break" +# ) +# self.charm.app_peer_data["logical-replication-subscriptions"] = "" +# +# # endregion +# +# # region Events +# +# def _on_cluster_topology_change( +# self, event: ClusterTopologyChangeEvent | LeaderElectedEvent +# ) -> None: +# if not self.charm.unit.is_leader(): +# logger.debug( +# "Logical replication tolopoly change early exit due to unit not being a leader" +# ) +# return +# if not len(self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ())): +# logger.debug( +# f"Logical replication tolopoly change early exit due to {LOGICAL_REPLICATION_OFFER_RELATION} connections absence" +# ) +# return +# if not self.charm.primary_endpoint: +# logger.debug( +# "Deferring logical replication topology change until primary is available" +# ) +# event.defer() +# return +# for relation in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): +# self._get_secret(relation.id) +# +# def _on_secret_changed(self, event: SecretChangedEvent) -> None: +# if not self.charm.unit.is_leader(): +# logger.debug( +# "Logical replication secret change early exit due to unit not being a leader" +# ) +# return +# if not self.charm.primary_endpoint: +# logger.debug("Deferring logical replication secret change until primary is available") +# event.defer() +# return +# +# if ( +# (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)) +# and event.secret.label +# and event.secret.label.startswith(SECRET_LABEL) +# ): +# logger.info("Logical replication secret changed, updating subscriptions") +# secret_content = self.model.get_secret( +# id=relation.data[relation.app]["secret-id"], label=SECRET_LABEL +# ).get_content(refresh=True) +# for database, subscription in self._subscriptions_info().items(): +# self.charm.postgresql.update_subscription( +# database, +# subscription, +# secret_content["primary"], +# secret_content["username"], +# secret_content["password"], +# ) +# +# # endregion +# +# def apply_changed_config(self, event: EventBase) -> bool: +# """Validate & apply (relation) logical_replication_subscription_request config parameter.""" +# if not self.charm.unit.is_leader(): +# return True +# if not self.charm.primary_endpoint: +# logger.debug( +# "Marking logical replication config validation as ongoing and deferring event until primary as available" +# ) +# self.charm.app_peer_data["logical-replication-validation"] = "ongoing" +# event.defer() +# return False +# if self._validate_subscription_request(): +# self._apply_updated_subscription_request() +# return True +# +# def retry_validations(self) -> None: +# """Run recurrent logical replication validation attempt. +# +# For subscribers - try to validate & apply subscription request. +# For publishers - try to validate & process all the offer relations. +# """ +# if not self.charm.unit.is_leader() or not self.charm.primary_endpoint: +# return +# if ( +# self.charm.app_peer_data.get("logical-replication-validation") == "error" +# and self._validate_subscription_request() +# ): +# self._apply_updated_subscription_request() +# for relation in self.model.relations.get(LOGICAL_REPLICATION_OFFER_RELATION, ()): +# if json.loads(relation.data[self.model.app].get("errors", "[]")): +# self._process_offer(relation) +# +# def has_remote_publisher_errors(self) -> bool: +# """Check if remote publisher in logical-replication relation has any errors.""" +# return bool( +# relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION) +# ) and json.loads(relation.data[relation.app].get("errors", "[]")) +# +# def replication_slots(self) -> dict[str, str]: +# """Get list of all managed replication slots. +# +# Returns: dictionary in : format. +# """ +# return { +# publication["replication-slot-name"]: database +# for resources in json.loads( +# self.charm.app_peer_data.get("logical-replication-published-resources", "{}") +# ).values() +# for database, publication in resources["publications"].items() +# } +# +# def _apply_updated_subscription_request(self) -> None: +# if not (relation := self.model.get_relation(LOGICAL_REPLICATION_RELATION)): +# return +# logger.debug( +# "Logical replication config validation is passed, applying config to the active relations" +# ) +# subscription_request_config = json.loads( +# self.charm.config.logical_replication_subscription_request or "{}" +# ) +# subscriptions = self._subscriptions_info() +# relation.data[self.model.app]["subscription-request"] = ( # type: ignore +# self.charm.config.logical_replication_subscription_request +# ) +# for database, subscription in subscriptions.copy().items(): +# if database in subscription_request_config: +# continue +# self.charm.postgresql.drop_subscription(database, subscription) +# logger.info(f"Dropped redundant subscription {subscription} from database {database}") +# del subscriptions[database] +# self.charm.app_peer_data["logical-replication-subscriptions"] = json.dumps({ +# str(relation.id): subscriptions +# }) +# +# def _validate_subscription_request(self) -> bool: +# try: +# subscription_request_config = json.loads( +# self.charm.config.logical_replication_subscription_request or "{}" +# ) +# except json.JSONDecodeError as err: +# return self._fail_validation(f"JSON decode error {err}") +# +# relation = self.model.get_relation(LOGICAL_REPLICATION_RELATION) +# subscription_request_relation = ( +# json.loads(relation.data[self.model.app].get("subscription-request", "{}")) +# if relation +# else {} +# ) +# +# for database, schematables in subscription_request_config.items(): +# if not self.charm.postgresql.database_exists(database): +# return self._fail_validation(f"database {database} doesn't exist") +# for schematable in schematables: +# try: +# schema, table = schematable.split(".") +# except ValueError: +# return self._fail_validation(f"table format isn't right at {schematable}") +# if not self.charm.postgresql.table_exists(database, schema, table): +# return self._fail_validation( +# f"table {schematable} in database {database} doesn't exist" +# ) +# already_subscribed = ( +# database in subscription_request_relation +# and schematable in subscription_request_relation[database] +# ) +# if not already_subscribed and not self.charm.postgresql.is_table_empty( +# database, schema, table +# ): +# return self._fail_validation( +# f"table {schematable} in database {database} isn't empty" +# ) +# +# self.charm.app_peer_data["logical-replication-validation"] = "" +# return True +# +# def _fail_validation(self, message: str | None = None) -> bool: +# if message: +# logger.error(f"Logical replication validation: {message}") +# self.charm.app_peer_data["logical-replication-validation"] = "error" +# self.charm.unit.status = BlockedStatus(LOGICAL_REPLICATION_VALIDATION_ERROR_STATUS) +# return False +# +# def _validate_new_publication( +# self, +# database: str, +# schematables: list[str], +# publication_schematables: list[str] | None = None, +# ) -> str | None: +# if not self.charm.postgresql.database_exists(database): +# return f"database {database} doesn't exist" +# for schematable in schematables: +# if publication_schematables is not None and schematable in publication_schematables: +# continue +# schema, table = schematable.split(".") +# if not self.charm.postgresql.table_exists(database, schema, table): +# return f"table {schematable} in database {database} doesn't exist" +# return None +# +# def _relation_changed_checks(self, event: RelationChangedEvent) -> bool: +# if not self.charm.unit.is_leader(): +# logger.debug( +# f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} change early exit due to unit not being a leader" +# ) +# return False +# if not event.relation.data[event.app].get("secret-id"): +# logger.warning( +# f"{LOGICAL_REPLICATION_RELATION} #{event.relation.id} change early exit due to secret absence in remote application bag (unusual behavior)" +# ) +# return False +# if not self.charm.primary_endpoint: +# logger.debug( +# f"Deferring {LOGICAL_REPLICATION_RELATION} #{event.relation.id} change due to primary unavailability" +# ) +# event.defer() +# return False +# return True +# +# def _process_offer(self, relation: Relation) -> None: +# logger.debug( +# f"Started processing offer for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" +# ) +# +# subscriptions_request = json.loads( +# relation.data[relation.app].get("subscription-request", "{}") +# ) +# publications = json.loads(relation.data[self.model.app].get("publications", "{}")) +# secret = self._get_secret(relation.id) +# user = secret.peek_content()["username"] +# errors = [] +# +# for database, publication in publications.copy().items(): +# if database in subscriptions_request: +# continue +# logger.info( +# f"Dropping redundant publication {publication['publication-name']} in database {database} from {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" +# ) +# self.charm.postgresql.drop_publication(database, publication["publication-name"]) +# del publications[database] +# logger.info( +# f"Revoking replication privileges on database {database} from user {user} from {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" +# ) +# self.charm.postgresql.revoke_replication_privileges( +# user, database, publication["tables"] +# ) +# +# for database, tables in subscriptions_request.items(): +# if database not in publications: +# if validation_error := self._validate_new_publication(database, tables): +# errors.append(validation_error) +# logger.error( +# f"Cannot create new publication for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}: {validation_error}" +# ) +# continue +# publication_name = self._publication_name(relation.id, database) +# if self.charm.postgresql.publication_exists(database, publication_name): +# error = f"conflicting publication {publication_name} in database {database}" +# errors.append(error) +# logger.error( +# f"Cannot create new publication for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}: {error}" +# ) +# continue +# logger.info( +# f"Granting replication privileges on database {database} for user {user} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" +# ) +# self.charm.postgresql.grant_replication_privileges(user, database, tables) +# logger.info( +# f"Creating new publication {publication_name} for tables {', '.join(tables)} in database {database} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" +# ) +# self.charm.postgresql.create_publication(database, publication_name, tables) +# publications[database] = { +# "publication-name": publication_name, +# "replication-slot-name": self._replication_slot_name(relation.id, database), +# "tables": tables, +# } +# elif sorted(publication_tables := publications[database]["tables"]) != sorted(tables): +# publication_name = publications[database]["publication-name"] +# if validation_error := self._validate_new_publication( +# database, tables, publication_tables +# ): +# errors.append(validation_error) +# logger.error( +# f"Cannot alter publication {publication_name} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}: {validation_error}" +# ) +# continue +# if not self.charm.postgresql.publication_exists(database, publication_name): +# errors.append( +# f"managed publication {publication_name} in database {database} can't be found" +# ) +# logger.error( +# f"Can't find managed publication {publication_name} in database {database} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" +# ) +# continue +# logger.info( +# f"Altering replication privileges on database {database} for user {user} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" +# ) +# self.charm.postgresql.grant_replication_privileges( +# user, database, tables, publication_tables +# ) +# logger.info( +# f"Altering publication {publication_name} tables from {','.join(publication_tables)} to {','.join(tables)} in database {database} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" +# ) +# self.charm.postgresql.alter_publication(database, publication_name, tables) +# publications[database]["tables"] = tables +# self._save_published_resources_info(str(relation.id), secret.id, publications) # type: ignore +# relation.data[self.model.app]["publications"] = json.dumps(publications) +# +# self._save_published_resources_info(str(relation.id), secret.id, publications) # type: ignore +# relation.data[self.model.app].update({ +# "errors": json.dumps(errors), +# "publications": json.dumps(publications), +# }) +# self.charm.update_config() +# +# logger.debug( +# f"Successfully processed offer for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation.id}" +# ) +# +# def _publication_name(self, relation_id: int, database: str) -> str: +# return f"relation_{relation_id}_{database}" +# +# def _replication_slot_name(self, relation_id: int, database: str) -> str: +# return f"relation_{relation_id}_{database}" +# +# def _subscription_name(self, relation_id: int, database: str) -> str: +# return f"relation_{relation_id}_{database}" +# +# def _save_published_resources_info( +# self, +# relation_id: str, +# secret_id: str, +# publications: dict[str, dict[str, str | list[str]]], +# ) -> None: +# published_resources = json.loads( +# self.charm.app_peer_data.get("logical-replication-published-resources", "{}") +# ) +# published_resources[relation_id] = { +# "secret-id": secret_id, +# "publications": publications, +# } +# self.charm.app_peer_data["logical-replication-published-resources"] = json.dumps( +# published_resources +# ) +# +# def _subscriptions_info(self) -> dict[str, str]: +# for subscriptions_info in json.loads( +# self.charm.app_peer_data.get("logical-replication-subscriptions", "{}") +# ).values(): +# return subscriptions_info +# return {} +# +# def _create_user(self, relation_id: int) -> tuple[str, str]: +# user = f"logical_replication_relation_{relation_id}" +# password = new_password() +# logger.info( +# f"Creating new user {user} for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation_id}" +# ) +# self.charm.postgresql.create_user(user, password, replication=True) +# return user, password +# +# def _get_secret(self, relation_id: int) -> Secret: +# """Returns logical replication secret. Updates, if content changed.""" +# secret_label = f"{SECRET_LABEL}-{relation_id}" +# primary = self.charm.primary_endpoint +# try: +# # Avoid recreating the secret. +# secret = self.charm.model.get_secret(label=secret_label) +# if not secret.id: +# # Workaround for the secret id not being set with model uuid. +# secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" +# if (content := secret.peek_content())["primary"] != primary: +# logger.debug( +# f"Updating secret for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation_id}" +# ) +# content["primary"] = primary # type: ignore +# secret.set_content(content) +# return secret +# except SecretNotFoundError: +# logger.debug( +# f"Creating new secret for {LOGICAL_REPLICATION_OFFER_RELATION} #{relation_id}" +# ) +# username, password = self._create_user(relation_id) +# return self.charm.model.app.add_secret( +# content={ +# "primary": primary, # type: ignore +# "username": username, +# "password": password, +# }, +# label=secret_label, +# ) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index a9ccfa1f6c..18fc39fb42 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -241,10 +241,10 @@ def test_on_config_changed(harness): "charm.PostgresqlOperatorCharm.is_cluster_initialised", new_callable=PropertyMock ) as _is_cluster_initialised, patch("charm.PostgresqlOperatorCharm.update_endpoint_addresses"), - patch( - "relations.logical_replication.PostgreSQLLogicalReplication.apply_changed_config", - return_value=True, - ), + # patch( + # "relations.logical_replication.PostgreSQLLogicalReplication.apply_changed_config", + # return_value=True, + # ), ): # Test when the cluster was not initialised yet. _is_cluster_initialised.return_value = False @@ -368,9 +368,9 @@ def test_enable_disable_extensions(harness, caplog): synchronous_node_count: type: string default: "all" - logical_replication_subscription_request: - type: string - default: "{}" + # logical_replication_subscription_request: + # type: string + # default: "{}" plugin_citext_enable: default: false type: boolean @@ -1177,7 +1177,7 @@ class _MockSnap: parameters={"test": "test"}, no_peers=False, user_databases_map={"operator": "all", "replication": "all", "rewind": "all"}, - slots={}, + # slots={}, ) _handle_postgresql_restart_need.assert_called_once_with() _restart_ldap_sync_service.assert_called_once() @@ -1208,7 +1208,7 @@ class _MockSnap: parameters={"test": "test"}, no_peers=False, user_databases_map={"operator": "all", "replication": "all", "rewind": "all"}, - slots={}, + # slots={}, ) _handle_postgresql_restart_need.assert_called_once() _restart_ldap_sync_service.assert_called_once() From 3e4449025dc74a8737629c41bc30efe2f6442622 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 24 Oct 2025 09:58:40 -0300 Subject: [PATCH 5/6] Fix spread test Signed-off-by: Marcelo Henrique Neppel --- .../test_logical_replication.py/task.yaml | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/spread/test_logical_replication.py/task.yaml b/tests/spread/test_logical_replication.py/task.yaml index 7b4eb6d1a0..1a70ab1175 100644 --- a/tests/spread/test_logical_replication.py/task.yaml +++ b/tests/spread/test_logical_replication.py/task.yaml @@ -1,7 +1,10 @@ -# summary: test_logical_replication.py -# environment: -# TEST_MODULE: ha_tests/test_logical_replication.py -# execute: | -# tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results" -# artifacts: -# - allure-results +summary: test_logical_replication.py +environment: + TEST_MODULE: ha_tests/test_logical_replication.py +execute: | + tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results" +artifacts: + - allure-results +systems: + - -ubuntu-24.04 + - -ubuntu-24.04-arm From 71905a4db256ba2c8e18e7adf0ecf86b4951844a Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 24 Oct 2025 12:32:11 -0300 Subject: [PATCH 6/6] Catch exception to allow retry without error Signed-off-by: Marcelo Henrique Neppel --- src/charm.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/charm.py b/src/charm.py index 1b9d7c3e03..24a7eb9a80 100755 --- a/src/charm.py +++ b/src/charm.py @@ -79,6 +79,7 @@ PostgreSQLGetCurrentTimelineError, PostgreSQLGrantDatabasePrivilegesToUserError, PostgreSQLListUsersError, + PostgreSQLUndefinedHostError, PostgreSQLUpdateUserPasswordError, ) from tenacity import RetryError, Retrying, retry, stop_after_attempt, stop_after_delay, wait_fixed @@ -1566,7 +1567,7 @@ def enable_disable_extensions(self, database: str | None = None) -> None: ) self.set_unit_status(BlockedStatus(EXTENSION_OBJECT_MESSAGE)) return - except PostgreSQLEnableDisableExtensionError as e: + except (PostgreSQLEnableDisableExtensionError, PostgreSQLUndefinedHostError) as e: logger.exception("failed to change plugins: %s", str(e)) if original_status.message == EXTENSION_OBJECT_MESSAGE: self.set_unit_status(ActiveStatus())