From f8ecbf86657503c61f6efc28789fdd8badf8f2d2 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 17 Apr 2024 17:13:48 -0300 Subject: [PATCH 1/7] Add async replication implementation Signed-off-by: Marcelo Henrique Neppel --- actions.yaml | 6 + lib/charms/postgresql_k8s/v0/postgresql.py | 14 +- metadata.yaml | 8 + poetry.lock | 39 +- pyproject.toml | 6 +- src/backups.py | 13 + src/charm.py | 96 ++- src/constants.py | 1 + src/patroni.py | 70 ++- src/relations/async_replication.py | 642 +++++++++++++++++++++ templates/patroni.yml.j2 | 12 + tests/unit/test_charm.py | 52 +- 12 files changed, 895 insertions(+), 64 deletions(-) create mode 100644 src/relations/async_replication.py diff --git a/actions.yaml b/actions.yaml index 03a90556cc..ba46f1108b 100644 --- a/actions.yaml +++ b/actions.yaml @@ -17,6 +17,12 @@ list-backups: description: Lists backups in s3 storage in AWS. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. +promote-cluster: + description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit. + params: + force-promotion: + type: boolean + description: Force the promotion of a cluster when there is already a primary cluster. restore: description: Restore a database backup using pgBackRest. S3 credentials are retrieved from a relation with the S3 integrator charm. diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index b0c30b5d5e..8783f76814 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -36,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 25 +LIBPATCH = 26 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -476,11 +476,11 @@ def set_up_database(self) -> None: """Set up postgres database with the right permissions.""" connection = None try: - self.create_user( - "admin", - extra_user_roles="pg_read_all_data,pg_write_all_data", - ) with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';") + if cursor.fetchone() is not None: + return + # Allow access to the postgres database only to the system users. cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;") cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;") @@ -490,6 +490,10 @@ def set_up_database(self) -> None: sql.Identifier(user) ) ) + self.create_user( + "admin", + extra_user_roles="pg_read_all_data,pg_write_all_data", + ) cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;") except psycopg2.Error as e: logger.error(f"Failed to set up databases: {e}") diff --git a/metadata.yaml b/metadata.yaml index f1a385b7f6..54182e451a 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -39,6 +39,10 @@ peers: interface: upgrade provides: + async-primary: + interface: async_replication + limit: 1 + optional: true database: interface: postgresql_client db: @@ -51,6 +55,10 @@ provides: interface: grafana_dashboard requires: + async-replica: + interface: async_replication + limit: 1 + optional: true certificates: interface: tls-certificates limit: 1 diff --git a/poetry.lock b/poetry.lock index 23257c2625..3b442d7261 100644 --- a/poetry.lock +++ b/poetry.lock @@ -131,17 +131,17 @@ typecheck = ["mypy"] [[package]] name = "boto3" -version = "1.34.84" +version = "1.34.86" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.34.84-py3-none-any.whl", hash = "sha256:7a02f44af32095946587d748ebeb39c3fa15b9d7275307ff612a6760ead47e04"}, - {file = "boto3-1.34.84.tar.gz", hash = "sha256:91e6343474173e9b82f603076856e1d5b7b68f44247bdd556250857a3f16b37b"}, + {file = "boto3-1.34.86-py3-none-any.whl", hash = "sha256:be594c449a0079bd1898ba1b7d90e0e5ac6b5803b2ada03993da01179073808d"}, + {file = "boto3-1.34.86.tar.gz", hash = "sha256:992ba74459fef2bf1572050408db73d33c43e7531d81bda85a027f39156926a1"}, ] [package.dependencies] -botocore = ">=1.34.84,<1.35.0" +botocore = ">=1.34.86,<1.35.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -150,13 +150,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.34.84" +version = "1.34.86" description = "Low-level, data-driven core of boto 3." optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.34.84-py3-none-any.whl", hash = "sha256:da1ae0a912e69e10daee2a34dafd6c6c106450d20b8623665feceb2d96c173eb"}, - {file = "botocore-1.34.84.tar.gz", hash = "sha256:a2b309bf5594f0eb6f63f355ade79ba575ce8bf672e52e91da1a7933caa245e6"}, + {file = "botocore-1.34.86-py3-none-any.whl", hash = "sha256:57c1e3b2e1db745d22c45cbd761bbc0c143d2cfc2b532e3245cf5d874aa30b6d"}, + {file = "botocore-1.34.86.tar.gz", hash = "sha256:2fd62b63d8788e15629bfc95be1bd2d99c0da6c1d45ef1f40c0a0101e412f6b5"}, ] [package.dependencies] @@ -165,7 +165,7 @@ python-dateutil = ">=2.1,<3.0.0" urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version >= \"3.10\""} [package.extras] -crt = ["awscrt (==0.19.19)"] +crt = ["awscrt (==0.20.9)"] [[package]] name = "cachetools" @@ -976,13 +976,13 @@ files = [ [[package]] name = "matplotlib-inline" -version = "0.1.6" +version = "0.1.7" description = "Inline Matplotlib backend for Jupyter" optional = false -python-versions = ">=3.5" +python-versions = ">=3.8" files = [ - {file = "matplotlib-inline-0.1.6.tar.gz", hash = "sha256:f887e5f10ba98e8d2b150ddcf4702c1e5f8b3a20005eb0f74bfdbd360ee6f304"}, - {file = "matplotlib_inline-0.1.6-py3-none-any.whl", hash = "sha256:f1f41aab5328aa5aaea9b16d083b128102f8712542f819fe7e6a420ff581b311"}, + {file = "matplotlib_inline-0.1.7-py3-none-any.whl", hash = "sha256:df192d39a4ff8f21b1895d72e6a13f5fcc5099f00fa84384e0ea28c2cc0653ca"}, + {file = "matplotlib_inline-0.1.7.tar.gz", hash = "sha256:8423b23ec666be3d16e16b60bdd8ac4e86e840ebd1dd11a30b9f117f2fa0ab90"}, ] [package.dependencies] @@ -1517,8 +1517,8 @@ develop = false [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v13.1.1" -resolved_reference = "52f3d97ebb97f4f37ec9678af850ecfb97fcf71a" +reference = "v13.0.0" +resolved_reference = "b662fd727b538a4b2a65f25d9c25b9bd76e197ad" subdirectory = "python/pytest_plugins/github_secrets" [[package]] @@ -1572,8 +1572,8 @@ pyyaml = "*" [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v13.1.1" -resolved_reference = "52f3d97ebb97f4f37ec9678af850ecfb97fcf71a" +reference = "v13.0.0" +resolved_reference = "b662fd727b538a4b2a65f25d9c25b9bd76e197ad" subdirectory = "python/pytest_plugins/pytest_operator_cache" [[package]] @@ -1591,8 +1591,8 @@ pytest = "*" [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v13.1.1" -resolved_reference = "52f3d97ebb97f4f37ec9678af850ecfb97fcf71a" +reference = "v13.0.0" +resolved_reference = "b662fd727b538a4b2a65f25d9c25b9bd76e197ad" subdirectory = "python/pytest_plugins/pytest_operator_groups" [[package]] @@ -1645,7 +1645,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -2145,4 +2144,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "a7797f2edfcdb4005c42df91b8dac4bb389b9abcf015b1f990d5509145880b6a" +content-hash = "65a72ffc063a7e1ceda9f4419123a99269b6e4a9d2d5a6144945cecb80ba9c4b" diff --git a/pyproject.toml b/pyproject.toml index 9b231a34f5..019ba62aae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,10 +68,10 @@ optional = true [tool.poetry.group.integration.dependencies] lightkube = "^0.15.2" pytest = "^8.1.1" -pytest-github-secrets = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.1.1", subdirectory = "python/pytest_plugins/github_secrets"} +pytest-github-secrets = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.0.0", subdirectory = "python/pytest_plugins/github_secrets"} pytest-operator = "^0.34.0" -pytest-operator-cache = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.1.1", subdirectory = "python/pytest_plugins/pytest_operator_cache"} -pytest-operator-groups = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.1.1", subdirectory = "python/pytest_plugins/pytest_operator_groups"} +pytest-operator-cache = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.0.0", subdirectory = "python/pytest_plugins/pytest_operator_cache"} +pytest-operator-groups = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.0.0", subdirectory = "python/pytest_plugins/pytest_operator_groups"} juju = "^3.2.2" psycopg2-binary = "^2.9.9" boto3 = "^1.34.84" diff --git a/src/backups.py b/src/backups.py index 3bf9ebe8c7..952658e736 100644 --- a/src/backups.py +++ b/src/backups.py @@ -26,6 +26,7 @@ from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed from constants import BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER +from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION logger = logging.getLogger(__name__) @@ -717,6 +718,18 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: event.fail(error_message) return False + logger.info("Checking that the cluster is not replicating data to a standby cluster") + for relation in [ + self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(ASYNC_PRIMARY_RELATION), + ]: + if not relation: + continue + error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster" + logger.error(f"Restore failed: {error_message}") + event.fail(error_message) + return False + logger.info("Checking that this unit was already elected the leader unit") if not self.charm.unit.is_leader(): error_message = "Unit cannot restore backup as it was not elected the leader unit yet" diff --git a/src/charm.py b/src/charm.py index 512a2438c8..258a3cccc1 100755 --- a/src/charm.py +++ b/src/charm.py @@ -81,6 +81,7 @@ WORKLOAD_OS_USER, ) from patroni import NotReadyError, Patroni +from relations.async_replication import PostgreSQLAsyncReplication from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model @@ -176,6 +177,7 @@ def __init__(self, *args): self.legacy_db_admin_relation = DbProvides(self, admin=True) self.backup = PostgreSQLBackups(self, "s3-parameters") self.tls = PostgreSQLTLS(self, PEER, [self.primary_endpoint, self.replicas_endpoint]) + self.async_replication = PostgreSQLAsyncReplication(self) self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart ) @@ -350,6 +352,18 @@ def _get_endpoints_to_remove(self) -> List[str]: endpoints_to_remove = list(set(old) - set(current)) return endpoints_to_remove + def get_unit_ip(self, unit: Unit) -> Optional[str]: + """Get the IP address of a specific unit.""" + # Check if host is current host. + if unit == self.unit: + return str(self.model.get_binding(PEER).network.bind_address) + # Check if host is a peer. + elif unit in self._peers.data: + return str(self._peers.data[unit].get("private-address")) + # Return None if the unit is not a peer neither the current unit. + else: + return None + def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: """The leader removes the departing units from the list of cluster members.""" # Allow leader to update endpoints if it isn't leaving. @@ -367,6 +381,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: self.postgresql_client_relation.update_read_only_endpoint() self._remove_from_endpoints(endpoints_to_remove) + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + def _on_peer_relation_changed(self, event: HookEvent) -> None: """Reconfigure cluster members.""" # The cluster must be initialized first in the leader unit @@ -411,9 +428,13 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None: # Restart the workload if it's stuck on the starting state after a timeline divergence # due to a backup that was restored. - if not self.is_primary and ( - self._patroni.member_replication_lag == "unknown" - or int(self._patroni.member_replication_lag) > 1000 + if ( + not self.is_primary + and not self.is_standby_leader + and ( + self._patroni.member_replication_lag == "unknown" + or int(self._patroni.member_replication_lag) > 1000 + ) ): self._patroni.reinitialize_postgresql() logger.debug("Deferring on_peer_relation_changed: reinitialising replica") @@ -439,8 +460,7 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None: else: self.unit_peer_data.pop("start-tls-server", None) - if not self.is_blocked: - self._set_active_status() + self.async_replication.handle_read_only_mode() def _on_config_changed(self, event) -> None: """Handle configuration changes, like enabling plugins.""" @@ -470,6 +490,9 @@ def _on_config_changed(self, event) -> None: if self.is_blocked and "Configuration Error" in self.unit.status.message: self._set_active_status() + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + if not self.unit.is_leader(): return @@ -497,6 +520,9 @@ def enable_disable_extensions(self, database: str = None) -> None: Args: database: optional database where to enable/disable the extension. """ + if self._patroni.get_primary() is None: + logger.debug("Early exit enable_disable_extensions: standby cluster") + return spi_module = ["refint", "autoinc", "insert_username", "moddatetime"] plugins_exception = {"uuid_ossp": '"uuid-ossp"'} original_status = self.unit.status @@ -640,6 +666,25 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: self._add_to_endpoints(self._endpoint) self._cleanup_old_cluster_resources() + + if not self.fix_leader_annotation(): + return + + # Create resources and add labels needed for replication. + try: + self._create_services() + except ApiError: + logger.exception("failed to create k8s services") + self.unit.status = BlockedStatus("failed to create k8s services") + return + + # Remove departing units when the leader changes. + self._remove_from_endpoints(self._get_endpoints_to_remove()) + + self._add_members(event) + + def fix_leader_annotation(self) -> bool: + """Fix the leader annotation if it's missing.""" client = Client() try: endpoint = client.get(Endpoints, name=self.cluster_name, namespace=self._namespace) @@ -656,23 +701,11 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: except ApiError as e: if e.status.code == 403: self.on_deployed_without_trust() - return + return False # Ignore the error only when the resource doesn't exist. if e.status.code != 404: raise e - - # Create resources and add labels needed for replication. - try: - self._create_services() - except ApiError: - logger.exception("failed to create k8s services") - self.unit.status = BlockedStatus("failed to create k8s services") - return - - # Remove departing units when the leader changes. - self._remove_from_endpoints(self._get_endpoints_to_remove()) - - self._add_members(event) + return True def _create_pgdata(self, container: Container): """Create the PostgreSQL data directory.""" @@ -743,6 +776,8 @@ def _set_active_status(self): try: if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name: self.unit.status = ActiveStatus("Primary") + elif self.is_standby_leader: + self.unit.status = ActiveStatus("Standby Leader") elif self._patroni.member_started: self.unit.status = ActiveStatus() except (RetryError, ConnectionError) as e: @@ -815,6 +850,9 @@ def _on_upgrade_charm(self, _) -> None: self.unit.status = BlockedStatus(f"failed to patch pod with error {e}") return + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + def _patch_pod_labels(self, member: str) -> None: """Add labels required for replication to the current pod. @@ -984,6 +1022,9 @@ def _on_set_password(self, event: ActionEvent) -> None: # Other units Patroni configuration will be reloaded in the peer relation changed event. self.update_config() + # Update the password in the async replication data. + self.async_replication.update_async_replication_data() + event.set_results({"password": password}) def _on_get_primary(self, event: ActionEvent) -> None: @@ -1105,6 +1146,9 @@ def _on_update_status(self, _) -> None: if self._handle_processes_failures(): return + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + self._set_active_status() def _handle_processes_failures(self) -> bool: @@ -1127,8 +1171,15 @@ def _handle_processes_failures(self) -> bool: return False return True + try: + is_primary = self.is_primary + is_standby_leader = self.is_standby_leader + except RetryError: + return False + if ( - not self.is_primary + not is_primary + and not is_standby_leader and self._patroni.member_started and not self._patroni.member_streaming ): @@ -1166,6 +1217,11 @@ def is_primary(self) -> bool: """Return whether this unit is the primary instance.""" return self._unit == self._patroni.get_primary(unit_name_pattern=True) + @property + def is_standby_leader(self) -> bool: + """Return whether this unit is the standby leader instance.""" + return self._unit == self._patroni.get_standby_leader(unit_name_pattern=True) + @property def is_tls_enabled(self) -> bool: """Return whether TLS is enabled.""" diff --git a/src/constants.py b/src/constants.py index 0cdf7b26ec..af32248a60 100644 --- a/src/constants.py +++ b/src/constants.py @@ -20,6 +20,7 @@ WORKLOAD_OS_GROUP = "postgres" WORKLOAD_OS_USER = "postgres" METRICS_PORT = "9187" +POSTGRESQL_DATA_PATH = "/var/lib/postgresql/data/pgdata" POSTGRES_LOG_FILES = [ "/var/log/pgbackrest/*", "/var/log/postgresql/patroni.log", diff --git a/src/patroni.py b/src/patroni.py index 7c5f6d434e..32ffba7414 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -30,6 +30,10 @@ logger = logging.getLogger(__name__) +class ClusterNotPromotedError(Exception): + """Raised when a cluster is not promoted.""" + + class NotReadyError(Exception): """Raised when not all cluster members healthy or finished initial sync.""" @@ -38,6 +42,10 @@ class EndpointNotReadyError(Exception): """Raised when an endpoint is not ready.""" +class StandbyClusterAlreadyPromotedError(Exception): + """Raised when a standby cluster is already promoted.""" + + class SwitchoverFailedError(Exception): """Raised when a switchover failed for some reason.""" @@ -79,6 +87,20 @@ def _patroni_url(self) -> str: """Patroni REST API URL.""" return f"{'https' if self._tls_enabled else 'http'}://{self._endpoint}:8008" + # def configure_standby_cluster(self, host: str) -> None: + # """Configure this cluster as a standby cluster.""" + # requests.patch( + # f"{self._patroni_url}/config", + # verify=self._verify, + # json={ + # "standby_cluster": { + # "create_replica_methods": ["basebackup"], + # "host": host, + # "port": 5432, + # } + # }, + # ) + @property def rock_postgresql_version(self) -> Optional[str]: """Version of Postgresql installed in the Rock image.""" @@ -127,6 +149,36 @@ def get_primary(self, unit_name_pattern=False) -> str: break return primary + def get_standby_leader( + self, unit_name_pattern=False, check_whether_is_running: bool = False + ) -> str: + """Get standby leader instance. + + Args: + unit_name_pattern: whether to convert pod name to unit name + check_whether_is_running: whether to check if the standby leader is running + + Returns: + standby leader pod or unit name. + """ + standby_leader = None + # Request info from cluster endpoint (which returns all members of the cluster). + for attempt in Retrying(stop=stop_after_attempt(len(self._endpoints) + 1)): + with attempt: + url = self._get_alternative_patroni_url(attempt) + r = requests.get(f"{url}/cluster", verify=self._verify) + for member in r.json()["members"]: + if member["role"] == "standby_leader": + if check_whether_is_running and member["state"] not in RUNNING_STATES: + logger.warning(f"standby leader {member['name']} is not running") + continue + standby_leader = member["name"] + if unit_name_pattern: + # Change the last dash to / in order to match unit name pattern. + standby_leader = "/".join(standby_leader.rsplit("-", 1)) + break + return standby_leader + def get_sync_standby_names(self) -> List[str]: """Get the list of sync standby unit names.""" sync_standbys = [] @@ -260,7 +312,7 @@ def member_started(self) -> bool: allow server time to start up. """ try: - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(90), wait=wait_fixed(3)): with attempt: r = requests.get(f"{self._patroni_url}/health", verify=self._verify) except RetryError: @@ -310,6 +362,19 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: Dict[str, Any json={"postgresql": {"parameters": parameters}}, ) + def promote_standby_cluster(self) -> None: + """Promote a standby cluster to be a regular cluster.""" + config_response = requests.get(f"{self._patroni_url}/config", verify=self._verify) + if "standby_cluster" not in config_response.json(): + raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted") + requests.patch( + f"{self._patroni_url}/config", verify=self._verify, json={"standby_cluster": None} + ) + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if self.get_primary() is None: + raise ClusterNotPromotedError("cluster not promoted") + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def reinitialize_postgresql(self) -> None: """Reinitialize PostgreSQL.""" @@ -365,6 +430,7 @@ def render_patroni_yml_file( with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) # Render the template file with the correct values. + logger.warning(self._charm.async_replication.get_standby_endpoints()) rendered = template.render( connectivity=connectivity, enable_tls=enable_tls, @@ -386,6 +452,8 @@ def render_patroni_yml_file( minority_count=self._members_count // 2, version=self.rock_postgresql_version.split(".")[0], pg_parameters=parameters, + primary_cluster_endpoint=self._charm.async_replication.get_primary_cluster_endpoint(), + extra_replication_endpoints=self._charm.async_replication.get_standby_endpoints(), ) self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py new file mode 100644 index 0000000000..5e18ab8c86 --- /dev/null +++ b/src/relations/async_replication.py @@ -0,0 +1,642 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Async Replication implementation.""" + +import itertools +import json +import logging +from typing import List, Optional, Tuple + +from lightkube import ApiError, Client +from lightkube.resources.core_v1 import Endpoints, Service +from ops import ( + ActionEvent, + Application, + BlockedStatus, + MaintenanceStatus, + Object, + Relation, + RelationChangedEvent, + RelationDepartedEvent, + Secret, + SecretNotFoundError, + WaitingStatus, +) +from ops.pebble import ChangeError +from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed + +from constants import ( + APP_SCOPE, + POSTGRESQL_DATA_PATH, + WORKLOAD_OS_GROUP, + WORKLOAD_OS_USER, +) +from patroni import ClusterNotPromotedError, NotReadyError, StandbyClusterAlreadyPromotedError + +logger = logging.getLogger(__name__) + + +ASYNC_PRIMARY_RELATION = "async-primary" +ASYNC_REPLICA_RELATION = "async-replica" +INCOMPATIBLE_CLUSTER_VERSIONS_BLOCKING_MESSAGE = ( + "Incompatible cluster versions - cannot enable async replication" +) +READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" + + +class PostgreSQLAsyncReplication(Object): + """Defines the async-replication management logic.""" + + def __init__(self, charm): + """Constructor.""" + super().__init__(charm, "postgresql") + self.charm = charm + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_async_relation_changed + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_async_relation_changed + ) + + # Departure events + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_async_relation_broken + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_async_relation_broken + ) + + # Actions + self.framework.observe(self.charm.on.promote_cluster_action, self._on_promote_cluster) + + self.container = self.charm.unit.get_container("postgresql") + + # def _are_cluster_versions_compatible(self) -> bool: + # """Check if the cluster versions are compatible.""" + # return primary_cluster.data.get("postgresql-version") + + def _can_promote_cluster(self, event: ActionEvent) -> bool: + """Check if the cluster can be promoted.""" + if not self.charm.is_cluster_initialised: + event.fail("Cluster not initialised yet.") + return False + + # Check if there is a relation. If not, see if there is a standby leader. If so promote it to leader. If not, + # fail the action telling that there is no relation and no standby leader. + relation = self._relation + if relation is None: + standby_leader = self.charm._patroni.get_standby_leader() + if standby_leader is not None: + try: + self.charm._patroni.promote_standby_cluster() + if ( + self.charm.is_blocked + and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm._peers.data[self.charm.app].update({ + "promoted-cluster-counter": "" + }) + self.charm._set_active_status() + except (StandbyClusterAlreadyPromotedError, ClusterNotPromotedError) as e: + event.fail(str(e)) + return False + event.fail("No relation and no standby leader found.") + return False + + # Check if this cluster is already the primary cluster. If so, fail the action telling that it's already + # the primary cluster. + primary_cluster = self._get_primary_cluster() + if self.charm.app == primary_cluster: + event.fail("This cluster is already the primary cluster.") + return False + + if relation.app == primary_cluster: + if not event.params.get("force-promotion"): + event.fail( + f"{relation.app.name} is already the primary cluster. Pass `force-promotion=true` to promote anyway." + ) + return False + else: + logger.warning( + "%s is already the primary cluster. Forcing promotion of %s to primary cluster due to `force-promotion=true`.", + relation.app.name, + self.charm.app.name, + ) + + return True + + def _configure_primary_cluster( + self, primary_cluster: Application, event: RelationChangedEvent + ) -> bool: + """Configure the primary cluster.""" + if self.charm.app == primary_cluster: + self.charm.update_config() + if self._is_primary_cluster() and self.charm.unit.is_leader(): + self._update_primary_cluster_data() + if self.charm._patroni.get_standby_leader() is not None: + self.charm._patroni.promote_standby_cluster() + try: + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if not self.charm.is_primary: + raise ClusterNotPromotedError() + except RetryError: + logger.debug( + "Deferring on_async_relation_changed: standby cluster not promoted yet." + ) + event.defer() + return True + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() + }) + self.charm._set_active_status() + return True + return False + + def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: + """Configure the standby cluster.""" + relation = self._relation + if relation.name == ASYNC_REPLICA_RELATION: + primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") + secret_id = ( + None + if primary_cluster_info is None + else json.loads(primary_cluster_info).get("secret-id") + ) + try: + secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) + except SecretNotFoundError: + logger.warning("Secret not found, deferring event") + logger.debug("Secret not found, deferring event") + event.defer() + return False + credentials = secret.peek_content() + logger.warning("Credentials: %s", credentials) + for key, password in credentials.items(): + user = key.split("-password")[0] + self.charm.set_secret(APP_SCOPE, key, password) + logger.warning("Synced %s password to %s", user, password) + logger.debug("Synced %s password", user) + self._remove_previous_cluster_information() + return True + + def _get_highest_promoted_cluster_counter_value(self) -> str: + """Return the highest promoted cluster counter.""" + promoted_cluster_counter = "0" + for async_relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if async_relation is None: + continue + for databag in [ + async_relation.data[async_relation.app], + self.charm._peers.data[self.charm.app], + ]: + relation_promoted_cluster_counter = databag.get("promoted-cluster-counter", "0") + if int(relation_promoted_cluster_counter) > int(promoted_cluster_counter): + promoted_cluster_counter = relation_promoted_cluster_counter + return promoted_cluster_counter + + def _get_primary_cluster(self) -> Optional[Application]: + """Return the primary cluster.""" + primary_cluster = None + promoted_cluster_counter = "0" + for async_relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if async_relation is None: + continue + for app, relation_data in { + async_relation.app: async_relation.data, + self.charm.app: self.charm._peers.data, + }.items(): + databag = relation_data[app] + relation_promoted_cluster_counter = databag.get("promoted-cluster-counter", "0") + if relation_promoted_cluster_counter > promoted_cluster_counter: + promoted_cluster_counter = relation_promoted_cluster_counter + primary_cluster = app + return primary_cluster + + def get_primary_cluster_endpoint(self) -> Optional[str]: + """Return the primary cluster endpoint.""" + primary_cluster = self._get_primary_cluster() + if primary_cluster is None or self.charm.app == primary_cluster: + return None + relation = self._relation + primary_cluster_data = relation.data[relation.app].get("primary-cluster-data") + if primary_cluster_data is None: + return None + return json.loads(primary_cluster_data).get("endpoint") + + def _get_secret(self) -> Secret: + """Return async replication necessary secrets.""" + try: + # Avoid recreating the secret. + secret = self.charm.model.get_secret(label=self._secret_label) + if not secret.id: + # Workaround for the secret id not being set with model uuid. + secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + return secret + except SecretNotFoundError: + logger.warning("Secret not found, creating a new one") + pass + + app_secret = self.charm.model.get_secret(label=f"{self.model.app.name}.app") + content = app_secret.peek_content() + + # Filter out unnecessary secrets. + shared_content = dict(filter(lambda x: "password" in x[0], content.items())) + + return self.charm.model.app.add_secret(content=shared_content, label=self._secret_label) + + def get_standby_endpoints(self) -> List[str]: + """Return the standby endpoints.""" + relation = self._relation + primary_cluster = self._get_primary_cluster() + # List the standby endpoints only for the primary cluster. + if relation is None or primary_cluster is None or self.charm.app != primary_cluster: + return [] + return [ + relation.data[unit].get("unit-address") + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ] + if relation is not None + for unit in relation.units + if relation.data[unit].get("unit-address") is not None + ] + + def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: + """Returns the PostgreSQL system identifier from this instance.""" + try: + system_identifier, error = self.container.exec( + [ + f'/usr/lib/postgresql/{self.charm._patroni.rock_postgresql_version.split(".")[0]}/bin/pg_controldata', + POSTGRESQL_DATA_PATH, + ], + user=WORKLOAD_OS_USER, + group=WORKLOAD_OS_GROUP, + ).wait_output() + except ChangeError as e: + return None, str(e) + if error != "": + return None, error + system_identifier = [ + line for line in system_identifier.splitlines() if "Database system identifier" in line + ][0].split(" ")[-1] + return system_identifier, None + + def _get_unit_ip(self) -> str: + """Reads some files to quickly figure out its own pod IP. + + It should work for any Ubuntu-based image + """ + with open("/etc/hosts") as f: + hosts = f.read() + with open("/etc/hostname") as f: + hostname = f.read().replace("\n", "") + line = [ln for ln in hosts.split("\n") if ln.find(hostname) >= 0][0] + return line.split("\t")[0] + + def _handle_database_start(self, event: RelationChangedEvent) -> None: + """Handle the database start in the standby cluster.""" + try: + if self.charm._patroni.member_started: + self.charm._peers.data[self.charm.unit].update({"stopped": ""}) + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() + }) + + if self.charm.unit.is_leader(): + if all( + self.charm._peers.data[unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + for unit in {*self.charm._peers.units, self.charm.unit} + ): + self.charm._peers.data[self.charm.app].update({ + "cluster_initialised": "True" + }) + elif self._is_following_promoted_cluster(): + self.charm.unit.status = WaitingStatus( + "Waiting for the database to be started in all units" + ) + event.defer() + return + + self.charm._set_active_status() + elif not self.charm.unit.is_leader(): + raise NotReadyError() + else: + self.charm.fix_leader_annotation() + self.charm.unit.status = WaitingStatus( + "Still starting the database in the standby leader" + ) + event.defer() + except NotReadyError: + self.charm.unit.status = WaitingStatus("Waiting for the database to start") + logger.debug("Deferring on_async_relation_changed: database hasn't started yet.") + event.defer() + + def handle_read_only_mode(self) -> None: + """Handle read-only mode.""" + promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( + "promoted-cluster-counter", "" + ) + if not self.charm.is_blocked or ( + promoted_cluster_counter != "0" + and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm._set_active_status() + if ( + promoted_cluster_counter == "0" + and self.charm.unit.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + + def _is_following_promoted_cluster(self) -> bool: + """Return True if this cluster is following the promoted cluster.""" + if self._get_primary_cluster() is None: + return False + return ( + self.charm._peers.data[self.charm.unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + ) + + def _is_primary_cluster(self) -> bool: + """Return the primary cluster name.""" + return self.charm.app == self._get_primary_cluster() + + def _on_async_relation_broken(self, _) -> None: + if "departing" in self.charm._peers.data[self.charm.unit]: + logger.debug("Early exit on_async_relation_broken: Skipping departing unit.") + return + + self.charm._peers.data[self.charm.unit].update({ + "stopped": "", + "unit-promoted-cluster-counter": "", + }) + + if self.charm._patroni.get_standby_leader() is not None: + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) + self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + else: + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": ""}) + self.charm.update_config() + + def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: + """Update the Patroni configuration if one of the clusters was already promoted.""" + primary_cluster = self._get_primary_cluster() + logger.warning("Primary cluster: %s", primary_cluster) + if primary_cluster is None: + logger.debug("Early exit on_async_relation_changed: No primary cluster found.") + return + + if self._configure_primary_cluster(primary_cluster, event): + return + + # Return if this is a new unit. + if not self.charm.unit.is_leader() and self._is_following_promoted_cluster(): + logger.debug("Early exit on_async_relation_changed: following promoted cluster.") + return + + if not self._stop_database(event): + return + + if not all( + "stopped" in self.charm._peers.data[unit] + or self.charm._peers.data[unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + for unit in {*self.charm._peers.units, self.charm.unit} + ): + self.charm.unit.status = WaitingStatus( + "Waiting for the database to be stopped in all units" + ) + logger.debug("Deferring on_async_relation_changed: not all units stopped.") + event.defer() + return + + if self._wait_for_standby_leader(event): + return + + # Update the asynchronous replication configuration and start the database. + self.charm.update_config() + self.container.start(self.charm._postgresql_service) + + self._handle_database_start(event) + + def _on_async_relation_departed(self, event: RelationDepartedEvent) -> None: + """Set a flag to avoid setting a wrong status message on relation broken event handler.""" + # This is needed because of https://bugs.launchpad.net/juju/+bug/1979811. + if event.departing_unit == self.charm.unit: + self.charm._peers.data[self.charm.unit].update({"departing": "True"}) + + def _on_async_relation_joined(self, _) -> None: + """Publish this unit address in the relation data.""" + self._relation.data[self.charm.unit].update({"unit-address": self._get_unit_ip()}) + + # Set the counter for new units. + highest_promoted_cluster_counter = self._get_highest_promoted_cluster_counter_value() + if highest_promoted_cluster_counter != "0": + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": highest_promoted_cluster_counter + }) + + def _on_promote_cluster(self, event: ActionEvent) -> None: + """Promote this cluster to the primary cluster.""" + if not self._can_promote_cluster(event): + return + + relation = self._relation + + # Check if all units from the other cluster published their pod IPs in the relation data. + # If not, fail the action telling that all units must publish their pod addresses in the + # relation data. + for unit in relation.units: + if "unit-address" not in relation.data[unit]: + event.fail( + "All units from the other cluster must publish their pod addresses in the relation data." + ) + return + + system_identifier, error = self.get_system_identifier() + if error is not None: + logger.exception(error) + event.fail("Failed to get system identifier") + return + + # Increment the current cluster counter in this application side based on the highest counter value. + promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) + promoted_cluster_counter += 1 + logger.warning("Promoted cluster counter: %s", promoted_cluster_counter) + + self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) + + # Emit an async replication changed event for this unit (to promote this cluster before demoting the + # other if this one is a standby cluster, which is needed to correctly setup the async replication + # when performing a switchover). + self._re_emit_async_relation_changed_event() + + # Set the status. + self.charm.unit.status = MaintenanceStatus("Promoting cluster...") + + @property + def _primary_cluster_endpoint(self) -> str: + """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + sync_standby_names = self.charm._patroni.get_sync_standby_names() + if len(sync_standby_names) > 0: + unit = self.model.get_unit(sync_standby_names[0]) + return self.charm.get_unit_ip(unit) + else: + return self.charm.get_unit_ip(self.charm.unit) + + def _re_emit_async_relation_changed_event(self) -> None: + """Re-emit the async relation changed event.""" + relation = self._relation + getattr(self.charm.on, f'{relation.name.replace("-", "_")}_relation_changed').emit( + relation, + app=relation.app, + unit=[unit for unit in relation.units if unit.app == relation.app][0], + ) + + @property + def _relation(self) -> Relation: + """Return the relation object.""" + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if relation is not None: + return relation + + def _remove_previous_cluster_information(self) -> None: + client = Client() + for values in itertools.product( + [Endpoints, Service], + [ + f"patroni-{self.charm._name}", + f"patroni-{self.charm._name}-config", + f"patroni-{self.charm._name}-sync", + ], + ): + try: + client.delete( + values[0], + name=values[1], + namespace=self.charm._namespace, + ) + logger.warning(f"Deleted {values[0]} {values[1]}") + except ApiError as e: + # Ignore the error only when the resource doesn't exist. + if e.status.code != 404: + raise e + logger.warning(f"{values[0]} {values[1]} not found") + + @property + def _secret_label(self) -> str: + """Return the secret label.""" + return f"async-replication-secret-{self._get_highest_promoted_cluster_counter_value()}" + + def _stop_database(self, event: RelationChangedEvent) -> bool: + """Stop the database.""" + if ( + "stopped" not in self.charm._peers.data[self.charm.unit] + and not self._is_following_promoted_cluster() + ): + if not self.charm.unit.is_leader() and not self.container.exists(POSTGRESQL_DATA_PATH): + logger.debug("Early exit on_async_relation_changed: following promoted cluster.") + return False + + self.container.stop(self.charm._postgresql_service) + + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"cluster_initialised": ""}) + if not self._configure_standby_cluster(event): + return False + + logger.info("Removing and recreating pgdata folder") + self.container.exec(f"rm -r {POSTGRESQL_DATA_PATH}".split()).wait_output() + self.charm._create_pgdata(self.container) + + self.charm._peers.data[self.charm.unit].update({"stopped": "True"}) + + return True + + def update_async_replication_data(self) -> None: + """Updates the async-replication data, if the unit is the leader. + + This is used to update the standby units with the new primary information. + If the unit is not the leader, then the data is removed from its databag. + """ + relation = self._relation + if relation is None: + return + relation.data[self.charm.unit].update({"unit-address": self._get_unit_ip()}) + if self._is_primary_cluster() and self.charm.unit.is_leader(): + self._update_primary_cluster_data() + + def _update_primary_cluster_data( + self, promoted_cluster_counter: int = None, system_identifier: str = None + ) -> None: + """Update the primary cluster data.""" + async_relation = self._relation + + if promoted_cluster_counter is not None: + for relation in [async_relation, self.charm._peers]: + relation.data[self.charm.app].update({ + "promoted-cluster-counter": str(promoted_cluster_counter) + }) + + # Update the data in the relation. + primary_cluster_data = { + "endpoint": self._primary_cluster_endpoint, + "postgresql-version": self.charm._patroni.rock_postgresql_version, + } + + # Retrieve the secrets that will be shared between the clusters. + if async_relation.name == ASYNC_PRIMARY_RELATION: + secret = self._get_secret() + secret.grant(async_relation) + primary_cluster_data["secret-id"] = secret.id + + if system_identifier is not None: + primary_cluster_data["system-id"] = system_identifier + + async_relation.data[self.charm.app]["primary-cluster-data"] = json.dumps( + primary_cluster_data + ) + + def _wait_for_standby_leader(self, event: RelationChangedEvent) -> bool: + """Wait for the standby leader to be up and running.""" + try: + standby_leader = self.charm._patroni.get_standby_leader(check_whether_is_running=True) + except RetryError: + standby_leader = None + logger.warning("Standby leader: %s", standby_leader) + if not self.charm.unit.is_leader() and standby_leader is None: + self.charm.unit.status = WaitingStatus( + "Waiting for the standby leader start the database" + ) + logger.debug("Deferring on_async_relation_changed: standby leader hasn't started yet.") + event.defer() + return True + return False diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 6d3c50802f..2d4ee5f92f 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -51,6 +51,11 @@ bootstrap: command: pgbackrest --stanza={{ restore_stanza }} --pg1-path={{ storage_path }}/pgdata --set={{ backup_id }} --type=immediate --target-action=promote restore no_params: True keep_existing_recovery_conf: True + {% elif primary_cluster_endpoint %} + standby_cluster: + host: {{ primary_cluster_endpoint }} + port: 5432 + create_replica_methods: ["basebackup"] {% else %} initdb: - auth-host: md5 @@ -61,6 +66,9 @@ bootstrap: pg_hba: - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 + {%- for endpoint in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}/32 md5 + {%- endfor %} bypass_api_service: true log: dir: /var/log/postgresql @@ -116,6 +124,10 @@ postgresql: - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 + - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.6/32 md5 + {%- for endpoint in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}/32 md5 + {%- endfor %} {%- for endpoint in endpoints %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}.{{ namespace }}.svc.cluster.local md5 {%- endfor %} diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index fbae133dc0..9e6a00d6cd 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -128,6 +128,7 @@ def test_on_leader_elected(self, _, __, ___, _set_secret, _get_secret, _____, _c @patch("charm.PostgresqlOperatorCharm._set_active_status") @patch("charm.Patroni.rock_postgresql_version", new_callable=PropertyMock) @patch("charm.Patroni.primary_endpoint_ready", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.enable_disable_extensions") @patch("charm.PostgresqlOperatorCharm.update_config") @patch("charm.PostgresqlOperatorCharm.postgresql") @patch( @@ -149,6 +150,7 @@ def test_on_postgresql_pebble_ready( _create_services, _postgresql, ___, + ____, _primary_endpoint_ready, _rock_postgresql_version, _set_active_status, @@ -1201,6 +1203,7 @@ def test_on_peer_relation_changed( @patch("charm.Patroni.reinitialize_postgresql") @patch("charm.Patroni.member_streaming", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock) @patch("charm.PostgresqlOperatorCharm.is_primary", new_callable=PropertyMock) @patch("charm.Patroni.is_database_running", new_callable=PropertyMock) @patch("charm.Patroni.member_started", new_callable=PropertyMock) @@ -1211,6 +1214,7 @@ def test_handle_processes_failures( _member_started, _is_database_running, _is_primary, + _is_standby_leader, _member_streaming, _reinitialize_postgresql, ): @@ -1270,6 +1274,7 @@ def test_handle_processes_failures( # Test when the unit is a replica and it's not streaming from primary. _restart.reset_mock() _is_primary.return_value = False + _is_standby_leader.return_value = False _member_streaming.return_value = False for values in itertools.product( [None, RetryError(last_attempt=1)], [True, False], [True, False] @@ -1444,8 +1449,9 @@ def test_handle_postgresql_restart_need( _restart.assert_not_called() @patch("charm.Patroni.member_started", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock) @patch("charm.Patroni.get_primary") - def test_set_active_status(self, _get_primary, _member_started): + def test_set_active_status(self, _get_primary, _is_standby_leader, _member_started): for values in itertools.product( [ RetryError(last_attempt=1), @@ -1453,26 +1459,42 @@ def test_set_active_status(self, _get_primary, _member_started): self.charm.unit.name, f"{self.charm.app.name}/2", ], + [ + RetryError(last_attempt=1), + ConnectionError, + True, + False, + ], [True, False], ): self.charm.unit.status = MaintenanceStatus("fake status") - _member_started.return_value = values[1] + _member_started.return_value = values[2] if isinstance(values[0], str): _get_primary.side_effect = None _get_primary.return_value = values[0] - self.charm._set_active_status() - self.assertIsInstance( - self.charm.unit.status, - ActiveStatus - if values[0] == self.charm.unit.name or values[1] - else MaintenanceStatus, - ) - self.assertEqual( - self.charm.unit.status.message, - "Primary" - if values[0] == self.charm.unit.name - else ("" if values[1] else "fake status"), - ) + if values[0] != self.charm.unit.name and not isinstance(values[1], bool): + _is_standby_leader.side_effect = values[1] + _is_standby_leader.return_value = None + self.charm._set_active_status() + self.assertIsInstance(self.charm.unit.status, MaintenanceStatus) + else: + _is_standby_leader.side_effect = None + _is_standby_leader.return_value = values[1] + self.charm._set_active_status() + self.assertIsInstance( + self.charm.unit.status, + ActiveStatus + if values[0] == self.charm.unit.name or values[1] or values[2] + else MaintenanceStatus, + ) + self.assertEqual( + self.charm.unit.status.message, + "Primary" + if values[0] == self.charm.unit.name + else ( + "Standby Leader" if values[1] else ("" if values[2] else "fake status") + ), + ) else: _get_primary.side_effect = values[0] _get_primary.return_value = None From 6a3009e6e7c1e26770f244e13bf7fd8132cbeef4 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Apr 2024 20:22:55 -0300 Subject: [PATCH 2/7] Backup standby pgdata folder Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 5e18ab8c86..1b14cfa7ce 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -6,6 +6,7 @@ import itertools import json import logging +from datetime import datetime from typing import List, Optional, Tuple from lightkube import ApiError, Client @@ -192,6 +193,17 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: self.charm.set_secret(APP_SCOPE, key, password) logger.warning("Synced %s password to %s", user, password) logger.debug("Synced %s password", user) + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(error) + if system_identifier != relation.data[relation.app].get("system-id"): + # Store current data in a ZIP file, clean folder and generate configuration. + logger.info("Creating backup of pgdata folder") + filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz" + self.container.exec( + f"tar -zcf {filename} {POSTGRESQL_DATA_PATH}".split() + ).wait_output() + logger.warning("Please review the backup file %s and handle its removal", filename) self._remove_previous_cluster_information() return True From 79452cf732b4ae75b6f45a19b325685d080a169d Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Apr 2024 22:07:20 -0300 Subject: [PATCH 3/7] Improve comments and logs Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 62 ++++++++++++++++++------------ 1 file changed, 38 insertions(+), 24 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 1b14cfa7ce..bc86b628ee 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -1,7 +1,18 @@ # Copyright 2024 Canonical Ltd. # See LICENSE file for licensing details. -"""Async Replication implementation.""" +"""Async Replication implementation. + +The highest "promoted-cluster-counter" value is used to determine the primary cluster. +The application in any side of the relation which has the highest value in its application +relation databag is considered the primary cluster. + +The "unit-promoted-cluster-counter" field in the unit relation databag is used to determine +if the unit is following the promoted cluster. If the value is the same as the highest value +in the application relation databag, then the unit is following the promoted cluster. +Otherwise, it's needed to restart the database in the unit to follow the promoted cluster +if the unit is from the standby cluster (the one that was not promoted). +""" import itertools import json @@ -87,10 +98,6 @@ def __init__(self, charm): self.container = self.charm.unit.get_container("postgresql") - # def _are_cluster_versions_compatible(self) -> bool: - # """Check if the cluster versions are compatible.""" - # return primary_cluster.data.get("postgresql-version") - def _can_promote_cluster(self, event: ActionEvent) -> bool: """Check if the cluster can be promoted.""" if not self.charm.is_cluster_initialised: @@ -126,6 +133,8 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool: event.fail("This cluster is already the primary cluster.") return False + # To promote the other cluster if there is already a primary cluster, the action must be called with + # `force-promotion=true`. If not, fail the action telling that the other cluster is already the primary. if relation.app == primary_cluster: if not event.params.get("force-promotion"): event.fail( @@ -149,6 +158,8 @@ def _configure_primary_cluster( self.charm.update_config() if self._is_primary_cluster() and self.charm.unit.is_leader(): self._update_primary_cluster_data() + # If this is a standby cluster, remove the information from DCS to make it + # a normal cluster. if self.charm._patroni.get_standby_leader() is not None: self.charm._patroni.promote_standby_cluster() try: @@ -173,6 +184,7 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: """Configure the standby cluster.""" relation = self._relation if relation.name == ASYNC_REPLICA_RELATION: + # Update the secrets between the clusters. primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") secret_id = ( None @@ -182,22 +194,19 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: try: secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) except SecretNotFoundError: - logger.warning("Secret not found, deferring event") logger.debug("Secret not found, deferring event") event.defer() return False credentials = secret.peek_content() - logger.warning("Credentials: %s", credentials) for key, password in credentials.items(): user = key.split("-password")[0] self.charm.set_secret(APP_SCOPE, key, password) - logger.warning("Synced %s password to %s", user, password) logger.debug("Synced %s password", user) system_identifier, error = self.get_system_identifier() if error is not None: raise Exception(error) if system_identifier != relation.data[relation.app].get("system-id"): - # Store current data in a ZIP file, clean folder and generate configuration. + # Store current data in a tar.gz file. logger.info("Creating backup of pgdata folder") filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz" self.container.exec( @@ -267,7 +276,7 @@ def _get_secret(self) -> Secret: secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" return secret except SecretNotFoundError: - logger.warning("Secret not found, creating a new one") + logger.debug("Secret not found, creating a new one") pass app_secret = self.charm.model.get_secret(label=f"{self.model.app.name}.app") @@ -332,12 +341,16 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: """Handle the database start in the standby cluster.""" try: if self.charm._patroni.member_started: + # If the database is started, update the databag in a way the unit is marked as configured + # for async replication. self.charm._peers.data[self.charm.unit].update({"stopped": ""}) self.charm._peers.data[self.charm.unit].update({ "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() }) if self.charm.unit.is_leader(): + # If this unit is the leader, check if all units are ready before making the cluster + # active again (including the health checks from the update status hook). if all( self.charm._peers.data[unit].get("unit-promoted-cluster-counter") == self._get_highest_promoted_cluster_counter_value() @@ -357,6 +370,7 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: elif not self.charm.unit.is_leader(): raise NotReadyError() else: + # If the standby leader fails to start, fix the leader annotation and defer the event. self.charm.fix_leader_annotation() self.charm.unit.status = WaitingStatus( "Still starting the database in the standby leader" @@ -368,7 +382,7 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: event.defer() def handle_read_only_mode(self) -> None: - """Handle read-only mode.""" + """Handle read-only mode (standby cluster that lost the relation with the primary cluster).""" promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( "promoted-cluster-counter", "" ) @@ -384,7 +398,7 @@ def handle_read_only_mode(self) -> None: self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) def _is_following_promoted_cluster(self) -> bool: - """Return True if this cluster is following the promoted cluster.""" + """Return True if this unit is following the promoted cluster.""" if self._get_primary_cluster() is None: return False return ( @@ -406,6 +420,8 @@ def _on_async_relation_broken(self, _) -> None: "unit-promoted-cluster-counter": "", }) + # If this is the standby cluster, set 0 in the "promoted-cluster-counter" field to set + # the cluster in read-only mode message also in the other units. if self.charm._patroni.get_standby_leader() is not None: if self.charm.unit.is_leader(): self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) @@ -418,7 +434,7 @@ def _on_async_relation_broken(self, _) -> None: def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: """Update the Patroni configuration if one of the clusters was already promoted.""" primary_cluster = self._get_primary_cluster() - logger.warning("Primary cluster: %s", primary_cluster) + logger.debug("Primary cluster: %s", primary_cluster) if primary_cluster is None: logger.debug("Early exit on_async_relation_changed: No primary cluster found.") return @@ -499,7 +515,7 @@ def _on_promote_cluster(self, event: ActionEvent) -> None: # Increment the current cluster counter in this application side based on the highest counter value. promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) promoted_cluster_counter += 1 - logger.warning("Promoted cluster counter: %s", promoted_cluster_counter) + logger.debug("Promoted cluster counter: %s", promoted_cluster_counter) self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) @@ -513,7 +529,7 @@ def _on_promote_cluster(self, event: ActionEvent) -> None: @property def _primary_cluster_endpoint(self) -> str: - """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + """Return the endpoint from one of the sync-standbys, or from the primary if there is no sync-standby.""" sync_standby_names = self.charm._patroni.get_sync_standby_names() if len(sync_standby_names) > 0: unit = self.model.get_unit(sync_standby_names[0]) @@ -541,6 +557,7 @@ def _relation(self) -> Relation: return relation def _remove_previous_cluster_information(self) -> None: + """Remove the previous cluster information.""" client = Client() for values in itertools.product( [Endpoints, Service], @@ -556,12 +573,12 @@ def _remove_previous_cluster_information(self) -> None: name=values[1], namespace=self.charm._namespace, ) - logger.warning(f"Deleted {values[0]} {values[1]}") + logger.debug(f"Deleted {values[0]} {values[1]}") except ApiError as e: # Ignore the error only when the resource doesn't exist. if e.status.code != 404: raise e - logger.warning(f"{values[0]} {values[1]} not found") + logger.debug(f"{values[0]} {values[1]} not found") @property def _secret_label(self) -> str: @@ -581,10 +598,13 @@ def _stop_database(self, event: RelationChangedEvent) -> bool: self.container.stop(self.charm._postgresql_service) if self.charm.unit.is_leader(): + # Remove the "cluster_initialised" flag to avoid self-healing in the update status hook. self.charm._peers.data[self.charm.app].update({"cluster_initialised": ""}) if not self._configure_standby_cluster(event): return False + # Remove and recreate the pgdata folder to enable replication of the data from the + # primary cluster. logger.info("Removing and recreating pgdata folder") self.container.exec(f"rm -r {POSTGRESQL_DATA_PATH}".split()).wait_output() self.charm._create_pgdata(self.container) @@ -597,7 +617,6 @@ def update_async_replication_data(self) -> None: """Updates the async-replication data, if the unit is the leader. This is used to update the standby units with the new primary information. - If the unit is not the leader, then the data is removed from its databag. """ relation = self._relation if relation is None: @@ -618,11 +637,7 @@ def _update_primary_cluster_data( "promoted-cluster-counter": str(promoted_cluster_counter) }) - # Update the data in the relation. - primary_cluster_data = { - "endpoint": self._primary_cluster_endpoint, - "postgresql-version": self.charm._patroni.rock_postgresql_version, - } + primary_cluster_data = {"endpoint": self._primary_cluster_endpoint} # Retrieve the secrets that will be shared between the clusters. if async_relation.name == ASYNC_PRIMARY_RELATION: @@ -643,7 +658,6 @@ def _wait_for_standby_leader(self, event: RelationChangedEvent) -> bool: standby_leader = self.charm._patroni.get_standby_leader(check_whether_is_running=True) except RetryError: standby_leader = None - logger.warning("Standby leader: %s", standby_leader) if not self.charm.unit.is_leader() and standby_leader is None: self.charm.unit.status = WaitingStatus( "Waiting for the standby leader start the database" From a53fde3e79d5233bd0e935381348126d2fd1caba Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 23 Apr 2024 01:08:30 -0300 Subject: [PATCH 4/7] Remove unused constant Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index bc86b628ee..0748f94017 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -51,9 +51,6 @@ ASYNC_PRIMARY_RELATION = "async-primary" ASYNC_REPLICA_RELATION = "async-replica" -INCOMPATIBLE_CLUSTER_VERSIONS_BLOCKING_MESSAGE = ( - "Incompatible cluster versions - cannot enable async replication" -) READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" From c0e82591cf5db9161adef42fd607db72a39ab389 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 29 Apr 2024 14:07:38 -0300 Subject: [PATCH 5/7] Remove warning log call and add optional type hint Signed-off-by: Marcelo Henrique Neppel --- src/patroni.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/patroni.py b/src/patroni.py index 32ffba7414..3295841c64 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -151,7 +151,7 @@ def get_primary(self, unit_name_pattern=False) -> str: def get_standby_leader( self, unit_name_pattern=False, check_whether_is_running: bool = False - ) -> str: + ) -> Optional[str]: """Get standby leader instance. Args: @@ -430,7 +430,6 @@ def render_patroni_yml_file( with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) # Render the template file with the correct values. - logger.warning(self._charm.async_replication.get_standby_endpoints()) rendered = template.render( connectivity=connectivity, enable_tls=enable_tls, From d6707fbd79d0ce261dd058865095fff696753781 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 2 May 2024 12:02:04 -0300 Subject: [PATCH 6/7] Revert poetry.lock Signed-off-by: Marcelo Henrique Neppel --- poetry.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/poetry.lock b/poetry.lock index f66dfaa826..74133126a0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1645,6 +1645,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, From 8360ec15612ee92bc99eff27568ee2688a0aae7b Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 2 May 2024 12:04:02 -0300 Subject: [PATCH 7/7] Revert poetry.lock Signed-off-by: Marcelo Henrique Neppel --- poetry.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index 74133126a0..e5436014db 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. [[package]] name = "allure-pytest"