From f8ecbf86657503c61f6efc28789fdd8badf8f2d2 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 17 Apr 2024 17:13:48 -0300 Subject: [PATCH 01/12] Add async replication implementation Signed-off-by: Marcelo Henrique Neppel --- actions.yaml | 6 + lib/charms/postgresql_k8s/v0/postgresql.py | 14 +- metadata.yaml | 8 + poetry.lock | 39 +- pyproject.toml | 6 +- src/backups.py | 13 + src/charm.py | 96 ++- src/constants.py | 1 + src/patroni.py | 70 ++- src/relations/async_replication.py | 642 +++++++++++++++++++++ templates/patroni.yml.j2 | 12 + tests/unit/test_charm.py | 52 +- 12 files changed, 895 insertions(+), 64 deletions(-) create mode 100644 src/relations/async_replication.py diff --git a/actions.yaml b/actions.yaml index 03a90556cc..ba46f1108b 100644 --- a/actions.yaml +++ b/actions.yaml @@ -17,6 +17,12 @@ list-backups: description: Lists backups in s3 storage in AWS. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. +promote-cluster: + description: Promotes the cluster of choice to a primary cluster. Must be run against the leader unit. + params: + force-promotion: + type: boolean + description: Force the promotion of a cluster when there is already a primary cluster. restore: description: Restore a database backup using pgBackRest. S3 credentials are retrieved from a relation with the S3 integrator charm. diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index b0c30b5d5e..8783f76814 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -36,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 25 +LIBPATCH = 26 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -476,11 +476,11 @@ def set_up_database(self) -> None: """Set up postgres database with the right permissions.""" connection = None try: - self.create_user( - "admin", - extra_user_roles="pg_read_all_data,pg_write_all_data", - ) with self._connect_to_database() as connection, connection.cursor() as cursor: + cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';") + if cursor.fetchone() is not None: + return + # Allow access to the postgres database only to the system users. 
cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;") cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;") @@ -490,6 +490,10 @@ def set_up_database(self) -> None: sql.Identifier(user) ) ) + self.create_user( + "admin", + extra_user_roles="pg_read_all_data,pg_write_all_data", + ) cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;") except psycopg2.Error as e: logger.error(f"Failed to set up databases: {e}") diff --git a/metadata.yaml b/metadata.yaml index f1a385b7f6..54182e451a 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -39,6 +39,10 @@ peers: interface: upgrade provides: + async-primary: + interface: async_replication + limit: 1 + optional: true database: interface: postgresql_client db: @@ -51,6 +55,10 @@ provides: interface: grafana_dashboard requires: + async-replica: + interface: async_replication + limit: 1 + optional: true certificates: interface: tls-certificates limit: 1 diff --git a/poetry.lock b/poetry.lock index 23257c2625..3b442d7261 100644 --- a/poetry.lock +++ b/poetry.lock @@ -131,17 +131,17 @@ typecheck = ["mypy"] [[package]] name = "boto3" -version = "1.34.84" +version = "1.34.86" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.34.84-py3-none-any.whl", hash = "sha256:7a02f44af32095946587d748ebeb39c3fa15b9d7275307ff612a6760ead47e04"}, - {file = "boto3-1.34.84.tar.gz", hash = "sha256:91e6343474173e9b82f603076856e1d5b7b68f44247bdd556250857a3f16b37b"}, + {file = "boto3-1.34.86-py3-none-any.whl", hash = "sha256:be594c449a0079bd1898ba1b7d90e0e5ac6b5803b2ada03993da01179073808d"}, + {file = "boto3-1.34.86.tar.gz", hash = "sha256:992ba74459fef2bf1572050408db73d33c43e7531d81bda85a027f39156926a1"}, ] [package.dependencies] -botocore = ">=1.34.84,<1.35.0" +botocore = ">=1.34.86,<1.35.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -150,13 +150,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.34.84" +version = "1.34.86" description = "Low-level, data-driven core of boto 3." 
optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.34.84-py3-none-any.whl", hash = "sha256:da1ae0a912e69e10daee2a34dafd6c6c106450d20b8623665feceb2d96c173eb"}, - {file = "botocore-1.34.84.tar.gz", hash = "sha256:a2b309bf5594f0eb6f63f355ade79ba575ce8bf672e52e91da1a7933caa245e6"}, + {file = "botocore-1.34.86-py3-none-any.whl", hash = "sha256:57c1e3b2e1db745d22c45cbd761bbc0c143d2cfc2b532e3245cf5d874aa30b6d"}, + {file = "botocore-1.34.86.tar.gz", hash = "sha256:2fd62b63d8788e15629bfc95be1bd2d99c0da6c1d45ef1f40c0a0101e412f6b5"}, ] [package.dependencies] @@ -165,7 +165,7 @@ python-dateutil = ">=2.1,<3.0.0" urllib3 = {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version >= \"3.10\""} [package.extras] -crt = ["awscrt (==0.19.19)"] +crt = ["awscrt (==0.20.9)"] [[package]] name = "cachetools" @@ -976,13 +976,13 @@ files = [ [[package]] name = "matplotlib-inline" -version = "0.1.6" +version = "0.1.7" description = "Inline Matplotlib backend for Jupyter" optional = false -python-versions = ">=3.5" +python-versions = ">=3.8" files = [ - {file = "matplotlib-inline-0.1.6.tar.gz", hash = "sha256:f887e5f10ba98e8d2b150ddcf4702c1e5f8b3a20005eb0f74bfdbd360ee6f304"}, - {file = "matplotlib_inline-0.1.6-py3-none-any.whl", hash = "sha256:f1f41aab5328aa5aaea9b16d083b128102f8712542f819fe7e6a420ff581b311"}, + {file = "matplotlib_inline-0.1.7-py3-none-any.whl", hash = "sha256:df192d39a4ff8f21b1895d72e6a13f5fcc5099f00fa84384e0ea28c2cc0653ca"}, + {file = "matplotlib_inline-0.1.7.tar.gz", hash = "sha256:8423b23ec666be3d16e16b60bdd8ac4e86e840ebd1dd11a30b9f117f2fa0ab90"}, ] [package.dependencies] @@ -1517,8 +1517,8 @@ develop = false [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v13.1.1" -resolved_reference = "52f3d97ebb97f4f37ec9678af850ecfb97fcf71a" +reference = "v13.0.0" +resolved_reference = "b662fd727b538a4b2a65f25d9c25b9bd76e197ad" subdirectory = "python/pytest_plugins/github_secrets" [[package]] @@ -1572,8 +1572,8 @@ pyyaml = "*" [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v13.1.1" -resolved_reference = "52f3d97ebb97f4f37ec9678af850ecfb97fcf71a" +reference = "v13.0.0" +resolved_reference = "b662fd727b538a4b2a65f25d9c25b9bd76e197ad" subdirectory = "python/pytest_plugins/pytest_operator_cache" [[package]] @@ -1591,8 +1591,8 @@ pytest = "*" [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v13.1.1" -resolved_reference = "52f3d97ebb97f4f37ec9678af850ecfb97fcf71a" +reference = "v13.0.0" +resolved_reference = "b662fd727b538a4b2a65f25d9c25b9bd76e197ad" subdirectory = "python/pytest_plugins/pytest_operator_groups" [[package]] @@ -1645,7 +1645,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = 
"PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -2145,4 +2144,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "a7797f2edfcdb4005c42df91b8dac4bb389b9abcf015b1f990d5509145880b6a" +content-hash = "65a72ffc063a7e1ceda9f4419123a99269b6e4a9d2d5a6144945cecb80ba9c4b" diff --git a/pyproject.toml b/pyproject.toml index 9b231a34f5..019ba62aae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,10 +68,10 @@ optional = true [tool.poetry.group.integration.dependencies] lightkube = "^0.15.2" pytest = "^8.1.1" -pytest-github-secrets = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.1.1", subdirectory = "python/pytest_plugins/github_secrets"} +pytest-github-secrets = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.0.0", subdirectory = "python/pytest_plugins/github_secrets"} pytest-operator = "^0.34.0" -pytest-operator-cache = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.1.1", subdirectory = "python/pytest_plugins/pytest_operator_cache"} -pytest-operator-groups = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.1.1", subdirectory = "python/pytest_plugins/pytest_operator_groups"} +pytest-operator-cache = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.0.0", subdirectory = "python/pytest_plugins/pytest_operator_cache"} +pytest-operator-groups = {git = "https://github.com/canonical/data-platform-workflows", tag = "v13.0.0", subdirectory = "python/pytest_plugins/pytest_operator_groups"} juju = "^3.2.2" psycopg2-binary = "^2.9.9" boto3 = "^1.34.84" diff --git a/src/backups.py b/src/backups.py index 3bf9ebe8c7..952658e736 100644 --- a/src/backups.py +++ b/src/backups.py @@ -26,6 +26,7 @@ from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed from constants import BACKUP_USER, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER +from relations.async_replication import ASYNC_PRIMARY_RELATION, ASYNC_REPLICA_RELATION logger = logging.getLogger(__name__) @@ -717,6 +718,18 @@ def _pre_restore_checks(self, event: ActionEvent) -> bool: event.fail(error_message) return False + logger.info("Checking that the cluster is not replicating data to a standby cluster") + for relation in [ + self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(ASYNC_PRIMARY_RELATION), + ]: + if not relation: + continue + error_message = "Unit cannot restore backup as the cluster is replicating data to a standby cluster" + logger.error(f"Restore failed: {error_message}") + event.fail(error_message) + return False + logger.info("Checking that this unit was already elected the leader unit") if not self.charm.unit.is_leader(): error_message = "Unit cannot restore backup as it was not elected the leader unit yet" diff --git a/src/charm.py b/src/charm.py index 512a2438c8..258a3cccc1 100755 --- a/src/charm.py +++ b/src/charm.py @@ -81,6 +81,7 @@ WORKLOAD_OS_USER, ) from patroni import NotReadyError, Patroni +from relations.async_replication import PostgreSQLAsyncReplication from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model @@ -176,6 +177,7 @@ def __init__(self, *args): 
self.legacy_db_admin_relation = DbProvides(self, admin=True) self.backup = PostgreSQLBackups(self, "s3-parameters") self.tls = PostgreSQLTLS(self, PEER, [self.primary_endpoint, self.replicas_endpoint]) + self.async_replication = PostgreSQLAsyncReplication(self) self.restart_manager = RollingOpsManager( charm=self, relation="restart", callback=self._restart ) @@ -350,6 +352,18 @@ def _get_endpoints_to_remove(self) -> List[str]: endpoints_to_remove = list(set(old) - set(current)) return endpoints_to_remove + def get_unit_ip(self, unit: Unit) -> Optional[str]: + """Get the IP address of a specific unit.""" + # Check if host is current host. + if unit == self.unit: + return str(self.model.get_binding(PEER).network.bind_address) + # Check if host is a peer. + elif unit in self._peers.data: + return str(self._peers.data[unit].get("private-address")) + # Return None if the unit is not a peer neither the current unit. + else: + return None + def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: """The leader removes the departing units from the list of cluster members.""" # Allow leader to update endpoints if it isn't leaving. @@ -367,6 +381,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: self.postgresql_client_relation.update_read_only_endpoint() self._remove_from_endpoints(endpoints_to_remove) + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + def _on_peer_relation_changed(self, event: HookEvent) -> None: """Reconfigure cluster members.""" # The cluster must be initialized first in the leader unit @@ -411,9 +428,13 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None: # Restart the workload if it's stuck on the starting state after a timeline divergence # due to a backup that was restored. - if not self.is_primary and ( - self._patroni.member_replication_lag == "unknown" - or int(self._patroni.member_replication_lag) > 1000 + if ( + not self.is_primary + and not self.is_standby_leader + and ( + self._patroni.member_replication_lag == "unknown" + or int(self._patroni.member_replication_lag) > 1000 + ) ): self._patroni.reinitialize_postgresql() logger.debug("Deferring on_peer_relation_changed: reinitialising replica") @@ -439,8 +460,7 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None: else: self.unit_peer_data.pop("start-tls-server", None) - if not self.is_blocked: - self._set_active_status() + self.async_replication.handle_read_only_mode() def _on_config_changed(self, event) -> None: """Handle configuration changes, like enabling plugins.""" @@ -470,6 +490,9 @@ def _on_config_changed(self, event) -> None: if self.is_blocked and "Configuration Error" in self.unit.status.message: self._set_active_status() + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + if not self.unit.is_leader(): return @@ -497,6 +520,9 @@ def enable_disable_extensions(self, database: str = None) -> None: Args: database: optional database where to enable/disable the extension. 
""" + if self._patroni.get_primary() is None: + logger.debug("Early exit enable_disable_extensions: standby cluster") + return spi_module = ["refint", "autoinc", "insert_username", "moddatetime"] plugins_exception = {"uuid_ossp": '"uuid-ossp"'} original_status = self.unit.status @@ -640,6 +666,25 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: self._add_to_endpoints(self._endpoint) self._cleanup_old_cluster_resources() + + if not self.fix_leader_annotation(): + return + + # Create resources and add labels needed for replication. + try: + self._create_services() + except ApiError: + logger.exception("failed to create k8s services") + self.unit.status = BlockedStatus("failed to create k8s services") + return + + # Remove departing units when the leader changes. + self._remove_from_endpoints(self._get_endpoints_to_remove()) + + self._add_members(event) + + def fix_leader_annotation(self) -> bool: + """Fix the leader annotation if it's missing.""" client = Client() try: endpoint = client.get(Endpoints, name=self.cluster_name, namespace=self._namespace) @@ -656,23 +701,11 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: except ApiError as e: if e.status.code == 403: self.on_deployed_without_trust() - return + return False # Ignore the error only when the resource doesn't exist. if e.status.code != 404: raise e - - # Create resources and add labels needed for replication. - try: - self._create_services() - except ApiError: - logger.exception("failed to create k8s services") - self.unit.status = BlockedStatus("failed to create k8s services") - return - - # Remove departing units when the leader changes. - self._remove_from_endpoints(self._get_endpoints_to_remove()) - - self._add_members(event) + return True def _create_pgdata(self, container: Container): """Create the PostgreSQL data directory.""" @@ -743,6 +776,8 @@ def _set_active_status(self): try: if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name: self.unit.status = ActiveStatus("Primary") + elif self.is_standby_leader: + self.unit.status = ActiveStatus("Standby Leader") elif self._patroni.member_started: self.unit.status = ActiveStatus() except (RetryError, ConnectionError) as e: @@ -815,6 +850,9 @@ def _on_upgrade_charm(self, _) -> None: self.unit.status = BlockedStatus(f"failed to patch pod with error {e}") return + # Update the sync-standby endpoint in the async replication data. + self.async_replication.update_async_replication_data() + def _patch_pod_labels(self, member: str) -> None: """Add labels required for replication to the current pod. @@ -984,6 +1022,9 @@ def _on_set_password(self, event: ActionEvent) -> None: # Other units Patroni configuration will be reloaded in the peer relation changed event. self.update_config() + # Update the password in the async replication data. + self.async_replication.update_async_replication_data() + event.set_results({"password": password}) def _on_get_primary(self, event: ActionEvent) -> None: @@ -1105,6 +1146,9 @@ def _on_update_status(self, _) -> None: if self._handle_processes_failures(): return + # Update the sync-standby endpoint in the async replication data. 
+ self.async_replication.update_async_replication_data() + self._set_active_status() def _handle_processes_failures(self) -> bool: @@ -1127,8 +1171,15 @@ def _handle_processes_failures(self) -> bool: return False return True + try: + is_primary = self.is_primary + is_standby_leader = self.is_standby_leader + except RetryError: + return False + if ( - not self.is_primary + not is_primary + and not is_standby_leader and self._patroni.member_started and not self._patroni.member_streaming ): @@ -1166,6 +1217,11 @@ def is_primary(self) -> bool: """Return whether this unit is the primary instance.""" return self._unit == self._patroni.get_primary(unit_name_pattern=True) + @property + def is_standby_leader(self) -> bool: + """Return whether this unit is the standby leader instance.""" + return self._unit == self._patroni.get_standby_leader(unit_name_pattern=True) + @property def is_tls_enabled(self) -> bool: """Return whether TLS is enabled.""" diff --git a/src/constants.py b/src/constants.py index 0cdf7b26ec..af32248a60 100644 --- a/src/constants.py +++ b/src/constants.py @@ -20,6 +20,7 @@ WORKLOAD_OS_GROUP = "postgres" WORKLOAD_OS_USER = "postgres" METRICS_PORT = "9187" +POSTGRESQL_DATA_PATH = "/var/lib/postgresql/data/pgdata" POSTGRES_LOG_FILES = [ "/var/log/pgbackrest/*", "/var/log/postgresql/patroni.log", diff --git a/src/patroni.py b/src/patroni.py index 7c5f6d434e..32ffba7414 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -30,6 +30,10 @@ logger = logging.getLogger(__name__) +class ClusterNotPromotedError(Exception): + """Raised when a cluster is not promoted.""" + + class NotReadyError(Exception): """Raised when not all cluster members healthy or finished initial sync.""" @@ -38,6 +42,10 @@ class EndpointNotReadyError(Exception): """Raised when an endpoint is not ready.""" +class StandbyClusterAlreadyPromotedError(Exception): + """Raised when a standby cluster is already promoted.""" + + class SwitchoverFailedError(Exception): """Raised when a switchover failed for some reason.""" @@ -79,6 +87,20 @@ def _patroni_url(self) -> str: """Patroni REST API URL.""" return f"{'https' if self._tls_enabled else 'http'}://{self._endpoint}:8008" + # def configure_standby_cluster(self, host: str) -> None: + # """Configure this cluster as a standby cluster.""" + # requests.patch( + # f"{self._patroni_url}/config", + # verify=self._verify, + # json={ + # "standby_cluster": { + # "create_replica_methods": ["basebackup"], + # "host": host, + # "port": 5432, + # } + # }, + # ) + @property def rock_postgresql_version(self) -> Optional[str]: """Version of Postgresql installed in the Rock image.""" @@ -127,6 +149,36 @@ def get_primary(self, unit_name_pattern=False) -> str: break return primary + def get_standby_leader( + self, unit_name_pattern=False, check_whether_is_running: bool = False + ) -> str: + """Get standby leader instance. + + Args: + unit_name_pattern: whether to convert pod name to unit name + check_whether_is_running: whether to check if the standby leader is running + + Returns: + standby leader pod or unit name. + """ + standby_leader = None + # Request info from cluster endpoint (which returns all members of the cluster). 
+ for attempt in Retrying(stop=stop_after_attempt(len(self._endpoints) + 1)): + with attempt: + url = self._get_alternative_patroni_url(attempt) + r = requests.get(f"{url}/cluster", verify=self._verify) + for member in r.json()["members"]: + if member["role"] == "standby_leader": + if check_whether_is_running and member["state"] not in RUNNING_STATES: + logger.warning(f"standby leader {member['name']} is not running") + continue + standby_leader = member["name"] + if unit_name_pattern: + # Change the last dash to / in order to match unit name pattern. + standby_leader = "/".join(standby_leader.rsplit("-", 1)) + break + return standby_leader + def get_sync_standby_names(self) -> List[str]: """Get the list of sync standby unit names.""" sync_standbys = [] @@ -260,7 +312,7 @@ def member_started(self) -> bool: allow server time to start up. """ try: - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(90), wait=wait_fixed(3)): with attempt: r = requests.get(f"{self._patroni_url}/health", verify=self._verify) except RetryError: @@ -310,6 +362,19 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: Dict[str, Any json={"postgresql": {"parameters": parameters}}, ) + def promote_standby_cluster(self) -> None: + """Promote a standby cluster to be a regular cluster.""" + config_response = requests.get(f"{self._patroni_url}/config", verify=self._verify) + if "standby_cluster" not in config_response.json(): + raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted") + requests.patch( + f"{self._patroni_url}/config", verify=self._verify, json={"standby_cluster": None} + ) + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if self.get_primary() is None: + raise ClusterNotPromotedError("cluster not promoted") + @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def reinitialize_postgresql(self) -> None: """Reinitialize PostgreSQL.""" @@ -365,6 +430,7 @@ def render_patroni_yml_file( with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) # Render the template file with the correct values. + logger.warning(self._charm.async_replication.get_standby_endpoints()) rendered = template.render( connectivity=connectivity, enable_tls=enable_tls, @@ -386,6 +452,8 @@ def render_patroni_yml_file( minority_count=self._members_count // 2, version=self.rock_postgresql_version.split(".")[0], pg_parameters=parameters, + primary_cluster_endpoint=self._charm.async_replication.get_primary_cluster_endpoint(), + extra_replication_endpoints=self._charm.async_replication.get_standby_endpoints(), ) self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py new file mode 100644 index 0000000000..5e18ab8c86 --- /dev/null +++ b/src/relations/async_replication.py @@ -0,0 +1,642 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Async Replication implementation.""" + +import itertools +import json +import logging +from typing import List, Optional, Tuple + +from lightkube import ApiError, Client +from lightkube.resources.core_v1 import Endpoints, Service +from ops import ( + ActionEvent, + Application, + BlockedStatus, + MaintenanceStatus, + Object, + Relation, + RelationChangedEvent, + RelationDepartedEvent, + Secret, + SecretNotFoundError, + WaitingStatus, +) +from ops.pebble import ChangeError +from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed + +from constants import ( + APP_SCOPE, + POSTGRESQL_DATA_PATH, + WORKLOAD_OS_GROUP, + WORKLOAD_OS_USER, +) +from patroni import ClusterNotPromotedError, NotReadyError, StandbyClusterAlreadyPromotedError + +logger = logging.getLogger(__name__) + + +ASYNC_PRIMARY_RELATION = "async-primary" +ASYNC_REPLICA_RELATION = "async-replica" +INCOMPATIBLE_CLUSTER_VERSIONS_BLOCKING_MESSAGE = ( + "Incompatible cluster versions - cannot enable async replication" +) +READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" + + +class PostgreSQLAsyncReplication(Object): + """Defines the async-replication management logic.""" + + def __init__(self, charm): + """Constructor.""" + super().__init__(charm, "postgresql") + self.charm = charm + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_joined, self._on_async_relation_joined + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_async_relation_changed + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_async_relation_changed + ) + + # Departure events + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, + self._on_async_relation_departed, + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_async_relation_broken + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_async_relation_broken + ) + + # Actions + self.framework.observe(self.charm.on.promote_cluster_action, self._on_promote_cluster) + + self.container = self.charm.unit.get_container("postgresql") + + # def _are_cluster_versions_compatible(self) -> bool: + # """Check if the cluster versions are compatible.""" + # return primary_cluster.data.get("postgresql-version") + + def _can_promote_cluster(self, event: ActionEvent) -> bool: + """Check if the cluster can be promoted.""" + if not self.charm.is_cluster_initialised: + event.fail("Cluster not initialised yet.") + return False + + # Check if there is a relation. If not, see if there is a standby leader. If so promote it to leader. If not, + # fail the action telling that there is no relation and no standby leader. 
+ relation = self._relation + if relation is None: + standby_leader = self.charm._patroni.get_standby_leader() + if standby_leader is not None: + try: + self.charm._patroni.promote_standby_cluster() + if ( + self.charm.is_blocked + and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm._peers.data[self.charm.app].update({ + "promoted-cluster-counter": "" + }) + self.charm._set_active_status() + except (StandbyClusterAlreadyPromotedError, ClusterNotPromotedError) as e: + event.fail(str(e)) + return False + event.fail("No relation and no standby leader found.") + return False + + # Check if this cluster is already the primary cluster. If so, fail the action telling that it's already + # the primary cluster. + primary_cluster = self._get_primary_cluster() + if self.charm.app == primary_cluster: + event.fail("This cluster is already the primary cluster.") + return False + + if relation.app == primary_cluster: + if not event.params.get("force-promotion"): + event.fail( + f"{relation.app.name} is already the primary cluster. Pass `force-promotion=true` to promote anyway." + ) + return False + else: + logger.warning( + "%s is already the primary cluster. Forcing promotion of %s to primary cluster due to `force-promotion=true`.", + relation.app.name, + self.charm.app.name, + ) + + return True + + def _configure_primary_cluster( + self, primary_cluster: Application, event: RelationChangedEvent + ) -> bool: + """Configure the primary cluster.""" + if self.charm.app == primary_cluster: + self.charm.update_config() + if self._is_primary_cluster() and self.charm.unit.is_leader(): + self._update_primary_cluster_data() + if self.charm._patroni.get_standby_leader() is not None: + self.charm._patroni.promote_standby_cluster() + try: + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if not self.charm.is_primary: + raise ClusterNotPromotedError() + except RetryError: + logger.debug( + "Deferring on_async_relation_changed: standby cluster not promoted yet." 
+ ) + event.defer() + return True + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() + }) + self.charm._set_active_status() + return True + return False + + def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: + """Configure the standby cluster.""" + relation = self._relation + if relation.name == ASYNC_REPLICA_RELATION: + primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") + secret_id = ( + None + if primary_cluster_info is None + else json.loads(primary_cluster_info).get("secret-id") + ) + try: + secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) + except SecretNotFoundError: + logger.warning("Secret not found, deferring event") + logger.debug("Secret not found, deferring event") + event.defer() + return False + credentials = secret.peek_content() + logger.warning("Credentials: %s", credentials) + for key, password in credentials.items(): + user = key.split("-password")[0] + self.charm.set_secret(APP_SCOPE, key, password) + logger.warning("Synced %s password to %s", user, password) + logger.debug("Synced %s password", user) + self._remove_previous_cluster_information() + return True + + def _get_highest_promoted_cluster_counter_value(self) -> str: + """Return the highest promoted cluster counter.""" + promoted_cluster_counter = "0" + for async_relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if async_relation is None: + continue + for databag in [ + async_relation.data[async_relation.app], + self.charm._peers.data[self.charm.app], + ]: + relation_promoted_cluster_counter = databag.get("promoted-cluster-counter", "0") + if int(relation_promoted_cluster_counter) > int(promoted_cluster_counter): + promoted_cluster_counter = relation_promoted_cluster_counter + return promoted_cluster_counter + + def _get_primary_cluster(self) -> Optional[Application]: + """Return the primary cluster.""" + primary_cluster = None + promoted_cluster_counter = "0" + for async_relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if async_relation is None: + continue + for app, relation_data in { + async_relation.app: async_relation.data, + self.charm.app: self.charm._peers.data, + }.items(): + databag = relation_data[app] + relation_promoted_cluster_counter = databag.get("promoted-cluster-counter", "0") + if relation_promoted_cluster_counter > promoted_cluster_counter: + promoted_cluster_counter = relation_promoted_cluster_counter + primary_cluster = app + return primary_cluster + + def get_primary_cluster_endpoint(self) -> Optional[str]: + """Return the primary cluster endpoint.""" + primary_cluster = self._get_primary_cluster() + if primary_cluster is None or self.charm.app == primary_cluster: + return None + relation = self._relation + primary_cluster_data = relation.data[relation.app].get("primary-cluster-data") + if primary_cluster_data is None: + return None + return json.loads(primary_cluster_data).get("endpoint") + + def _get_secret(self) -> Secret: + """Return async replication necessary secrets.""" + try: + # Avoid recreating the secret. + secret = self.charm.model.get_secret(label=self._secret_label) + if not secret.id: + # Workaround for the secret id not being set with model uuid. 
+ secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" + return secret + except SecretNotFoundError: + logger.warning("Secret not found, creating a new one") + pass + + app_secret = self.charm.model.get_secret(label=f"{self.model.app.name}.app") + content = app_secret.peek_content() + + # Filter out unnecessary secrets. + shared_content = dict(filter(lambda x: "password" in x[0], content.items())) + + return self.charm.model.app.add_secret(content=shared_content, label=self._secret_label) + + def get_standby_endpoints(self) -> List[str]: + """Return the standby endpoints.""" + relation = self._relation + primary_cluster = self._get_primary_cluster() + # List the standby endpoints only for the primary cluster. + if relation is None or primary_cluster is None or self.charm.app != primary_cluster: + return [] + return [ + relation.data[unit].get("unit-address") + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ] + if relation is not None + for unit in relation.units + if relation.data[unit].get("unit-address") is not None + ] + + def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: + """Returns the PostgreSQL system identifier from this instance.""" + try: + system_identifier, error = self.container.exec( + [ + f'/usr/lib/postgresql/{self.charm._patroni.rock_postgresql_version.split(".")[0]}/bin/pg_controldata', + POSTGRESQL_DATA_PATH, + ], + user=WORKLOAD_OS_USER, + group=WORKLOAD_OS_GROUP, + ).wait_output() + except ChangeError as e: + return None, str(e) + if error != "": + return None, error + system_identifier = [ + line for line in system_identifier.splitlines() if "Database system identifier" in line + ][0].split(" ")[-1] + return system_identifier, None + + def _get_unit_ip(self) -> str: + """Reads some files to quickly figure out its own pod IP. 
+ + It should work for any Ubuntu-based image + """ + with open("/etc/hosts") as f: + hosts = f.read() + with open("/etc/hostname") as f: + hostname = f.read().replace("\n", "") + line = [ln for ln in hosts.split("\n") if ln.find(hostname) >= 0][0] + return line.split("\t")[0] + + def _handle_database_start(self, event: RelationChangedEvent) -> None: + """Handle the database start in the standby cluster.""" + try: + if self.charm._patroni.member_started: + self.charm._peers.data[self.charm.unit].update({"stopped": ""}) + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() + }) + + if self.charm.unit.is_leader(): + if all( + self.charm._peers.data[unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + for unit in {*self.charm._peers.units, self.charm.unit} + ): + self.charm._peers.data[self.charm.app].update({ + "cluster_initialised": "True" + }) + elif self._is_following_promoted_cluster(): + self.charm.unit.status = WaitingStatus( + "Waiting for the database to be started in all units" + ) + event.defer() + return + + self.charm._set_active_status() + elif not self.charm.unit.is_leader(): + raise NotReadyError() + else: + self.charm.fix_leader_annotation() + self.charm.unit.status = WaitingStatus( + "Still starting the database in the standby leader" + ) + event.defer() + except NotReadyError: + self.charm.unit.status = WaitingStatus("Waiting for the database to start") + logger.debug("Deferring on_async_relation_changed: database hasn't started yet.") + event.defer() + + def handle_read_only_mode(self) -> None: + """Handle read-only mode.""" + promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( + "promoted-cluster-counter", "" + ) + if not self.charm.is_blocked or ( + promoted_cluster_counter != "0" + and self.charm.unit.status.message == READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm._set_active_status() + if ( + promoted_cluster_counter == "0" + and self.charm.unit.status.message != READ_ONLY_MODE_BLOCKING_MESSAGE + ): + self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + + def _is_following_promoted_cluster(self) -> bool: + """Return True if this cluster is following the promoted cluster.""" + if self._get_primary_cluster() is None: + return False + return ( + self.charm._peers.data[self.charm.unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + ) + + def _is_primary_cluster(self) -> bool: + """Return the primary cluster name.""" + return self.charm.app == self._get_primary_cluster() + + def _on_async_relation_broken(self, _) -> None: + if "departing" in self.charm._peers.data[self.charm.unit]: + logger.debug("Early exit on_async_relation_broken: Skipping departing unit.") + return + + self.charm._peers.data[self.charm.unit].update({ + "stopped": "", + "unit-promoted-cluster-counter": "", + }) + + if self.charm._patroni.get_standby_leader() is not None: + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) + self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) + else: + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": ""}) + self.charm.update_config() + + def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: + """Update the Patroni configuration if one of the clusters was already promoted.""" + 
primary_cluster = self._get_primary_cluster() + logger.warning("Primary cluster: %s", primary_cluster) + if primary_cluster is None: + logger.debug("Early exit on_async_relation_changed: No primary cluster found.") + return + + if self._configure_primary_cluster(primary_cluster, event): + return + + # Return if this is a new unit. + if not self.charm.unit.is_leader() and self._is_following_promoted_cluster(): + logger.debug("Early exit on_async_relation_changed: following promoted cluster.") + return + + if not self._stop_database(event): + return + + if not all( + "stopped" in self.charm._peers.data[unit] + or self.charm._peers.data[unit].get("unit-promoted-cluster-counter") + == self._get_highest_promoted_cluster_counter_value() + for unit in {*self.charm._peers.units, self.charm.unit} + ): + self.charm.unit.status = WaitingStatus( + "Waiting for the database to be stopped in all units" + ) + logger.debug("Deferring on_async_relation_changed: not all units stopped.") + event.defer() + return + + if self._wait_for_standby_leader(event): + return + + # Update the asynchronous replication configuration and start the database. + self.charm.update_config() + self.container.start(self.charm._postgresql_service) + + self._handle_database_start(event) + + def _on_async_relation_departed(self, event: RelationDepartedEvent) -> None: + """Set a flag to avoid setting a wrong status message on relation broken event handler.""" + # This is needed because of https://bugs.launchpad.net/juju/+bug/1979811. + if event.departing_unit == self.charm.unit: + self.charm._peers.data[self.charm.unit].update({"departing": "True"}) + + def _on_async_relation_joined(self, _) -> None: + """Publish this unit address in the relation data.""" + self._relation.data[self.charm.unit].update({"unit-address": self._get_unit_ip()}) + + # Set the counter for new units. + highest_promoted_cluster_counter = self._get_highest_promoted_cluster_counter_value() + if highest_promoted_cluster_counter != "0": + self.charm._peers.data[self.charm.unit].update({ + "unit-promoted-cluster-counter": highest_promoted_cluster_counter + }) + + def _on_promote_cluster(self, event: ActionEvent) -> None: + """Promote this cluster to the primary cluster.""" + if not self._can_promote_cluster(event): + return + + relation = self._relation + + # Check if all units from the other cluster published their pod IPs in the relation data. + # If not, fail the action telling that all units must publish their pod addresses in the + # relation data. + for unit in relation.units: + if "unit-address" not in relation.data[unit]: + event.fail( + "All units from the other cluster must publish their pod addresses in the relation data." + ) + return + + system_identifier, error = self.get_system_identifier() + if error is not None: + logger.exception(error) + event.fail("Failed to get system identifier") + return + + # Increment the current cluster counter in this application side based on the highest counter value. + promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) + promoted_cluster_counter += 1 + logger.warning("Promoted cluster counter: %s", promoted_cluster_counter) + + self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) + + # Emit an async replication changed event for this unit (to promote this cluster before demoting the + # other if this one is a standby cluster, which is needed to correctly setup the async replication + # when performing a switchover). 
+ self._re_emit_async_relation_changed_event() + + # Set the status. + self.charm.unit.status = MaintenanceStatus("Promoting cluster...") + + @property + def _primary_cluster_endpoint(self) -> str: + """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + sync_standby_names = self.charm._patroni.get_sync_standby_names() + if len(sync_standby_names) > 0: + unit = self.model.get_unit(sync_standby_names[0]) + return self.charm.get_unit_ip(unit) + else: + return self.charm.get_unit_ip(self.charm.unit) + + def _re_emit_async_relation_changed_event(self) -> None: + """Re-emit the async relation changed event.""" + relation = self._relation + getattr(self.charm.on, f'{relation.name.replace("-", "_")}_relation_changed').emit( + relation, + app=relation.app, + unit=[unit for unit in relation.units if unit.app == relation.app][0], + ) + + @property + def _relation(self) -> Relation: + """Return the relation object.""" + for relation in [ + self.model.get_relation(ASYNC_PRIMARY_RELATION), + self.model.get_relation(ASYNC_REPLICA_RELATION), + ]: + if relation is not None: + return relation + + def _remove_previous_cluster_information(self) -> None: + client = Client() + for values in itertools.product( + [Endpoints, Service], + [ + f"patroni-{self.charm._name}", + f"patroni-{self.charm._name}-config", + f"patroni-{self.charm._name}-sync", + ], + ): + try: + client.delete( + values[0], + name=values[1], + namespace=self.charm._namespace, + ) + logger.warning(f"Deleted {values[0]} {values[1]}") + except ApiError as e: + # Ignore the error only when the resource doesn't exist. + if e.status.code != 404: + raise e + logger.warning(f"{values[0]} {values[1]} not found") + + @property + def _secret_label(self) -> str: + """Return the secret label.""" + return f"async-replication-secret-{self._get_highest_promoted_cluster_counter_value()}" + + def _stop_database(self, event: RelationChangedEvent) -> bool: + """Stop the database.""" + if ( + "stopped" not in self.charm._peers.data[self.charm.unit] + and not self._is_following_promoted_cluster() + ): + if not self.charm.unit.is_leader() and not self.container.exists(POSTGRESQL_DATA_PATH): + logger.debug("Early exit on_async_relation_changed: following promoted cluster.") + return False + + self.container.stop(self.charm._postgresql_service) + + if self.charm.unit.is_leader(): + self.charm._peers.data[self.charm.app].update({"cluster_initialised": ""}) + if not self._configure_standby_cluster(event): + return False + + logger.info("Removing and recreating pgdata folder") + self.container.exec(f"rm -r {POSTGRESQL_DATA_PATH}".split()).wait_output() + self.charm._create_pgdata(self.container) + + self.charm._peers.data[self.charm.unit].update({"stopped": "True"}) + + return True + + def update_async_replication_data(self) -> None: + """Updates the async-replication data, if the unit is the leader. + + This is used to update the standby units with the new primary information. + If the unit is not the leader, then the data is removed from its databag. 
+ """ + relation = self._relation + if relation is None: + return + relation.data[self.charm.unit].update({"unit-address": self._get_unit_ip()}) + if self._is_primary_cluster() and self.charm.unit.is_leader(): + self._update_primary_cluster_data() + + def _update_primary_cluster_data( + self, promoted_cluster_counter: int = None, system_identifier: str = None + ) -> None: + """Update the primary cluster data.""" + async_relation = self._relation + + if promoted_cluster_counter is not None: + for relation in [async_relation, self.charm._peers]: + relation.data[self.charm.app].update({ + "promoted-cluster-counter": str(promoted_cluster_counter) + }) + + # Update the data in the relation. + primary_cluster_data = { + "endpoint": self._primary_cluster_endpoint, + "postgresql-version": self.charm._patroni.rock_postgresql_version, + } + + # Retrieve the secrets that will be shared between the clusters. + if async_relation.name == ASYNC_PRIMARY_RELATION: + secret = self._get_secret() + secret.grant(async_relation) + primary_cluster_data["secret-id"] = secret.id + + if system_identifier is not None: + primary_cluster_data["system-id"] = system_identifier + + async_relation.data[self.charm.app]["primary-cluster-data"] = json.dumps( + primary_cluster_data + ) + + def _wait_for_standby_leader(self, event: RelationChangedEvent) -> bool: + """Wait for the standby leader to be up and running.""" + try: + standby_leader = self.charm._patroni.get_standby_leader(check_whether_is_running=True) + except RetryError: + standby_leader = None + logger.warning("Standby leader: %s", standby_leader) + if not self.charm.unit.is_leader() and standby_leader is None: + self.charm.unit.status = WaitingStatus( + "Waiting for the standby leader start the database" + ) + logger.debug("Deferring on_async_relation_changed: standby leader hasn't started yet.") + event.defer() + return True + return False diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 6d3c50802f..2d4ee5f92f 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -51,6 +51,11 @@ bootstrap: command: pgbackrest --stanza={{ restore_stanza }} --pg1-path={{ storage_path }}/pgdata --set={{ backup_id }} --type=immediate --target-action=promote restore no_params: True keep_existing_recovery_conf: True + {% elif primary_cluster_endpoint %} + standby_cluster: + host: {{ primary_cluster_endpoint }} + port: 5432 + create_replica_methods: ["basebackup"] {% else %} initdb: - auth-host: md5 @@ -61,6 +66,9 @@ bootstrap: pg_hba: - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 + {%- for endpoint in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}/32 md5 + {%- endfor %} bypass_api_service: true log: dir: /var/log/postgresql @@ -116,6 +124,10 @@ postgresql: - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 + - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.6/32 md5 + {%- for endpoint in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}/32 md5 + {%- endfor %} {%- for endpoint in endpoints %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}.{{ namespace }}.svc.cluster.local md5 {%- endfor %} diff --git a/tests/unit/test_charm.py 
b/tests/unit/test_charm.py index fbae133dc0..9e6a00d6cd 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -128,6 +128,7 @@ def test_on_leader_elected(self, _, __, ___, _set_secret, _get_secret, _____, _c @patch("charm.PostgresqlOperatorCharm._set_active_status") @patch("charm.Patroni.rock_postgresql_version", new_callable=PropertyMock) @patch("charm.Patroni.primary_endpoint_ready", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.enable_disable_extensions") @patch("charm.PostgresqlOperatorCharm.update_config") @patch("charm.PostgresqlOperatorCharm.postgresql") @patch( @@ -149,6 +150,7 @@ def test_on_postgresql_pebble_ready( _create_services, _postgresql, ___, + ____, _primary_endpoint_ready, _rock_postgresql_version, _set_active_status, @@ -1201,6 +1203,7 @@ def test_on_peer_relation_changed( @patch("charm.Patroni.reinitialize_postgresql") @patch("charm.Patroni.member_streaming", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock) @patch("charm.PostgresqlOperatorCharm.is_primary", new_callable=PropertyMock) @patch("charm.Patroni.is_database_running", new_callable=PropertyMock) @patch("charm.Patroni.member_started", new_callable=PropertyMock) @@ -1211,6 +1214,7 @@ def test_handle_processes_failures( _member_started, _is_database_running, _is_primary, + _is_standby_leader, _member_streaming, _reinitialize_postgresql, ): @@ -1270,6 +1274,7 @@ def test_handle_processes_failures( # Test when the unit is a replica and it's not streaming from primary. _restart.reset_mock() _is_primary.return_value = False + _is_standby_leader.return_value = False _member_streaming.return_value = False for values in itertools.product( [None, RetryError(last_attempt=1)], [True, False], [True, False] @@ -1444,8 +1449,9 @@ def test_handle_postgresql_restart_need( _restart.assert_not_called() @patch("charm.Patroni.member_started", new_callable=PropertyMock) + @patch("charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock) @patch("charm.Patroni.get_primary") - def test_set_active_status(self, _get_primary, _member_started): + def test_set_active_status(self, _get_primary, _is_standby_leader, _member_started): for values in itertools.product( [ RetryError(last_attempt=1), @@ -1453,26 +1459,42 @@ def test_set_active_status(self, _get_primary, _member_started): self.charm.unit.name, f"{self.charm.app.name}/2", ], + [ + RetryError(last_attempt=1), + ConnectionError, + True, + False, + ], [True, False], ): self.charm.unit.status = MaintenanceStatus("fake status") - _member_started.return_value = values[1] + _member_started.return_value = values[2] if isinstance(values[0], str): _get_primary.side_effect = None _get_primary.return_value = values[0] - self.charm._set_active_status() - self.assertIsInstance( - self.charm.unit.status, - ActiveStatus - if values[0] == self.charm.unit.name or values[1] - else MaintenanceStatus, - ) - self.assertEqual( - self.charm.unit.status.message, - "Primary" - if values[0] == self.charm.unit.name - else ("" if values[1] else "fake status"), - ) + if values[0] != self.charm.unit.name and not isinstance(values[1], bool): + _is_standby_leader.side_effect = values[1] + _is_standby_leader.return_value = None + self.charm._set_active_status() + self.assertIsInstance(self.charm.unit.status, MaintenanceStatus) + else: + _is_standby_leader.side_effect = None + _is_standby_leader.return_value = values[1] + self.charm._set_active_status() + self.assertIsInstance( + 
self.charm.unit.status, + ActiveStatus + if values[0] == self.charm.unit.name or values[1] or values[2] + else MaintenanceStatus, + ) + self.assertEqual( + self.charm.unit.status.message, + "Primary" + if values[0] == self.charm.unit.name + else ( + "Standby Leader" if values[1] else ("" if values[2] else "fake status") + ), + ) else: _get_primary.side_effect = values[0] _get_primary.return_value = None From d82230fbc52f6d1e786e9ca042a2ecdbfc283b5b Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 17 Apr 2024 17:17:09 -0300 Subject: [PATCH 02/12] Add async replication integration tests Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/helpers.py | 112 ++++- .../ha_tests/test_async_replication.py | 468 ++++++++++++++++++ tests/integration/helpers.py | 45 +- 3 files changed, 584 insertions(+), 41 deletions(-) create mode 100644 tests/integration/ha_tests/test_async_replication.py diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 4d1d50e11d..987daa7060 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -12,6 +12,7 @@ import kubernetes as kubernetes import psycopg2 import requests +from juju.model import Model from kubernetes import config from kubernetes.client.api import core_v1_api from kubernetes.stream import stream @@ -184,10 +185,10 @@ async def is_member_isolated( return True -async def check_writes(ops_test) -> int: +async def check_writes(ops_test, extra_model: Model = None) -> int: """Gets the total writes from the test charm and compares to the writes from db.""" total_expected_writes = await stop_continuous_writes(ops_test) - actual_writes, max_number_written = await count_writes(ops_test) + actual_writes, max_number_written = await count_writes(ops_test, extra_model=extra_model) for member, count in actual_writes.items(): assert ( count == max_number_written[member] @@ -196,13 +197,17 @@ async def check_writes(ops_test) -> int: return total_expected_writes -async def are_writes_increasing(ops_test, down_unit: str = None) -> None: +async def are_writes_increasing( + ops_test, down_unit: str = None, extra_model: Model = None +) -> None: """Verify new writes are continuing by counting the number of writes.""" - writes, _ = await count_writes(ops_test, down_unit=down_unit) + writes, _ = await count_writes(ops_test, down_unit=down_unit, extra_model=extra_model) for member, count in writes.items(): for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3), reraise=True): with attempt: - more_writes, _ = await count_writes(ops_test, down_unit=down_unit) + more_writes, _ = await count_writes( + ops_test, down_unit=down_unit, extra_model=extra_model + ) assert ( more_writes[member] > count ), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})" @@ -265,28 +270,34 @@ def copy_file_into_pod( async def count_writes( - ops_test: OpsTest, down_unit: str = None + ops_test: OpsTest, down_unit: str = None, extra_model: Model = None ) -> Tuple[Dict[str, int], Dict[str, int]]: """Count the number of writes in the database.""" app = await app_name(ops_test) password = await get_password(ops_test, database_app_name=app, down_unit=down_unit) - status = await ops_test.model.get_status() - for unit_name, unit in status["applications"][app]["units"].items(): - if unit_name != down_unit: - cluster = get_patroni_cluster(unit["address"]) - break + members = [] + for model in [ops_test.model, extra_model]: + if 
model is None: + continue + status = await model.get_status() + for unit_name, unit in status["applications"][app]["units"].items(): + if unit_name != down_unit: + members_data = get_patroni_cluster(unit["address"])["members"] + for index, member_data in enumerate(members_data): + members_data[index]["model"] = model.info.name + members.extend(members_data) + break count = {} maximum = {} - for member in cluster["members"]: + for member in members: if member["role"] != "replica" and member["host"].split(".")[0] != ( down_unit or "" ).replace("/", "-"): host = member["host"] # Translate the service hostname to an IP address. - model = ops_test.model.info - client = Client(namespace=model.name) + client = Client(namespace=member["model"]) service = client.get(Pod, name=host.split(".")[0]) ip = service.status.podIP @@ -295,12 +306,23 @@ async def count_writes( f" host='{ip}' password='{password}' connect_timeout=10" ) - with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: - cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") - results = cursor.fetchone() - count[member["name"]] = results[0] - maximum[member["name"]] = results[1] - connection.close() + member_name = f'{member["model"]}.{member["name"]}' + connection = None + try: + with psycopg2.connect( + connection_string + ) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") + results = cursor.fetchone() + count[member_name] = results[0] + maximum[member_name] = results[1] + except psycopg2.Error: + # Error raised when the connection is not possible. + count[member_name] = -1 + maximum[member_name] = -1 + finally: + if connection is not None: + connection.close() return count, maximum @@ -415,6 +437,42 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> Op return parameter_value +async def get_standby_leader(model: Model, application_name: str) -> str: + """Get the standby leader name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the standby leader. + """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "standby_leader": + return member["name"] + + +async def get_sync_standby(model: Model, application_name: str) -> str: + """Get the sync_standby name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the sync standby. 
+ """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "sync_standby": + return member["name"] + + @retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True) async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool: """Test a connection to a PostgreSQL server.""" @@ -720,23 +778,25 @@ async def send_signal_to_process( ) -async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: +async def start_continuous_writes(ops_test: OpsTest, app: str, model: Model = None) -> None: """Start continuous writes to PostgreSQL.""" # Start the process by relating the application to the database or # by calling the action if the relation already exists. + if model is None: + model = ops_test.model relations = [ relation - for relation in ops_test.model.applications[app].relations + for relation in model.applications[app].relations if not relation.is_peer and f"{relation.requires.application_name}:{relation.requires.name}" == f"{APPLICATION_NAME}:first-database" ] if not relations: - await ops_test.model.relate(app, f"{APPLICATION_NAME}:first-database") - await ops_test.model.wait_for_idle(status="active", timeout=1000) + await model.relate(app, f"{APPLICATION_NAME}:first-database") + await model.wait_for_idle(status="active", timeout=1000) else: action = ( - await ops_test.model.applications[APPLICATION_NAME] + await model.applications[APPLICATION_NAME] .units[0] .run_action("start-continuous-writes") ) @@ -744,7 +804,7 @@ async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): with attempt: action = ( - await ops_test.model.applications[APPLICATION_NAME] + await model.applications[APPLICATION_NAME] .units[0] .run_action("start-continuous-writes") ) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py new file mode 100644 index 0000000000..dbe1669354 --- /dev/null +++ b/tests/integration/ha_tests/test_async_replication.py @@ -0,0 +1,468 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+import contextlib +import logging +from asyncio import gather +from typing import Optional + +import psycopg2 +import pytest as pytest +from juju.model import Model +from lightkube import Client +from lightkube.resources.core_v1 import Pod +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed + +from tests.integration.ha_tests.helpers import ( + are_writes_increasing, + check_writes, + get_standby_leader, + get_sync_standby, + start_continuous_writes, +) +from tests.integration.helpers import ( + APPLICATION_NAME, + DATABASE_APP_NAME, + build_and_deploy, + get_leader_unit, + get_password, + get_primary, + get_unit_address, + wait_for_relation_removed_between, +) + +logger = logging.getLogger(__name__) + + +FAST_INTERVAL = "30s" +IDLE_PERIOD = 15 +TIMEOUT = 2000 + + +@contextlib.asynccontextmanager +async def fast_forward( + model: Model, fast_interval: str = "10s", slow_interval: Optional[str] = None +): + """Adaptation of OpsTest.fast_forward to work with different models.""" + update_interval_key = "update-status-hook-interval" + if slow_interval: + interval_after = slow_interval + else: + interval_after = (await model.get_config())[update_interval_key] + + await model.set_config({update_interval_key: fast_interval}) + yield + await model.set_config({update_interval_key: interval_after}) + + +@pytest.fixture(scope="module") +def first_model(ops_test: OpsTest) -> Model: + """Return the first model.""" + first_model = ops_test.model + return first_model + + +@pytest.fixture(scope="module") +async def second_model(ops_test: OpsTest, first_model, request) -> Model: + """Create and return the second model.""" + second_model_name = f"{first_model.info.name}-other" + if second_model_name not in await ops_test._controller.list_models(): + await ops_test._controller.add_model(second_model_name) + second_model = Model() + await second_model.connect(model_name=second_model_name) + yield second_model + if request.config.getoption("--keep-models"): + return + logger.info("Destroying second model") + await ops_test._controller.destroy_model(second_model_name, destroy_storage=True) + + +@pytest.fixture +async def second_model_continuous_writes(second_model) -> None: + """Cleans up continuous writes on the second model after a test run.""" + yield + # Clear the written data at the end. 
+ for attempt in Retrying(stop=stop_after_delay(10), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await second_model.applications[APPLICATION_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to clear up continuous_writes table" + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_deploy_async_replication_setup( + ops_test: OpsTest, first_model: Model, second_model: Model +) -> None: + """Build and deploy two PostgreSQL cluster in two separate models to test async replication.""" + await build_and_deploy(ops_test, 3, wait_for_idle=False) + await build_and_deploy(ops_test, 3, wait_for_idle=False, model=second_model) + await ops_test.model.deploy(APPLICATION_NAME, num_units=1) + await second_model.deploy(APPLICATION_NAME, num_units=1) + + async with ops_test.fast_forward(), fast_forward(second_model): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME, APPLICATION_NAME], + status="active", + timeout=TIMEOUT, + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME, APPLICATION_NAME], + status="active", + timeout=TIMEOUT, + ), + ) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_async_replication( + ops_test: OpsTest, + first_model: Model, + second_model: Model, + continuous_writes, +) -> None: + """Test async replication between two PostgreSQL clusters.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + first_offer_command = f"offer {DATABASE_APP_NAME}:async-primary async-primary" + await ops_test.juju(*first_offer_command.split()) + first_consume_command = ( + f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-primary" + ) + await ops_test.juju(*first_consume_command.split()) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + await second_model.relate(DATABASE_APP_NAME, "async-primary") + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Run the promote action. 
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-cluster") + await run_action.wait() + assert (run_action.results.get("return-code", None) == 0) or ( + run_action.results.get("Code", None) == "0" + ), "Promote action failed" + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_switchover( + ops_test: OpsTest, + first_model: Model, + second_model: Model, + second_model_continuous_writes, +): + """Test switching over to the second cluster.""" + second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" + await ops_test.juju(*second_offer_command.split()) + second_consume_command = ( + f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-replica" + ) + await ops_test.juju(*second_consume_command.split()) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + # Run the promote action. 
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the second cluster") + run_action = await leader_unit.run_action("promote-cluster", **{"force-promotion": True}) + await run_action.wait() + assert (run_action.results.get("return-code", None) == 0) or ( + run_action.results.get("Code", None) == "0" + ), "Promote action failed" + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME, model=second_model) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_promote_standby( + ops_test: OpsTest, + first_model: Model, + second_model: Model, + second_model_continuous_writes, +) -> None: + """Test promoting the standby cluster.""" + logger.info("breaking the relation") + await second_model.applications[DATABASE_APP_NAME].remove_relation( + "async-replica", "async-primary" + ) + wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="blocked", + idle_period=IDLE_PERIOD, + timeout=TIMEOUT, + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + # Run the promote action. 
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-cluster") + await run_action.wait() + assert (run_action.results.get("return-code", None) == 0) or ( + run_action.results.get("Code", None) == "0" + ), "Promote action failed" + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("removing the previous data") + primary = await get_primary(ops_test) + address = await get_unit_address(ops_test, primary) + password = await get_password(ops_test) + database_name = f'{APPLICATION_NAME.replace("-", "_")}_first_database' + connection = None + try: + connection = psycopg2.connect( + f"dbname={database_name} user=operator password={password} host={address}" + ) + connection.autocommit = True + cursor = connection.cursor() + cursor.execute("DROP TABLE IF EXISTS continuous_writes;") + except psycopg2.Error as e: + assert False, f"Failed to drop continuous writes table: {e}" + finally: + if connection is not None: + connection.close() + + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_reestablish_relation( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that the relation can be broken and re-established.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + logger.info("reestablishing the relation") + await second_model.relate(DATABASE_APP_NAME, "async-primary") + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Run the promote action. 
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-cluster") + await run_action.wait() + assert (run_action.results.get("return-code", None) == 0) or ( + run_action.results.get("Code", None) == "0" + ), "Promote action failed" + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_async_replication_failover_in_main_cluster( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication fails over correctly.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) + logger.info(f"Sync-standby: {sync_standby}") + logger.info("deleting the sync-standby pod") + client = Client(namespace=first_model.info.name) + client.delete(Pod, name=sync_standby.replace("/", "-")) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + # Check that the sync-standby unit is not the same as before. + new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) + logger.info(f"New sync-standby: {new_sync_standby}") + assert new_sync_standby != sync_standby, "Sync-standby is the same as before" + + logger.info("Ensure continuous_writes after the crashed unit") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). 
+ logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_async_replication_failover_in_secondary_cluster( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication fails back correctly.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) + logger.info(f"Standby leader: {standby_leader}") + logger.info("deleting the standby leader pod") + client = Client(namespace=second_model.info.name) + client.delete(Pod, name=standby_leader.replace("/", "-")) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("Ensure continuous_writes after the crashed unit") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 2bdcd36e1c..3ae53233d4 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -11,6 +11,7 @@ import psycopg2 import requests import yaml +from juju.model import Model from juju.unit import Unit from lightkube.core.client import Client from lightkube.core.exceptions import ApiError @@ -38,7 +39,9 @@ charm = None -async def app_name(ops_test: OpsTest, application_name: str = "postgresql-k8s") -> Optional[str]: +async def app_name( + ops_test: OpsTest, application_name: str = "postgresql-k8s", model: Model = None +) -> Optional[str]: """Returns the name of the cluster running PostgreSQL. This is important since not all deployments of the PostgreSQL charm have the application name @@ -46,8 +49,10 @@ async def app_name(ops_test: OpsTest, application_name: str = "postgresql-k8s") Note: if multiple clusters are running PostgreSQL this will return the one first found. """ - status = await ops_test.model.get_status() - for app in ops_test.model.applications: + if model is None: + model = ops_test.model + status = await model.get_status() + for app in model.applications: if application_name in status["applications"][app]["charm"]: return app @@ -60,11 +65,15 @@ async def build_and_deploy( database_app_name: str = DATABASE_APP_NAME, wait_for_idle: bool = True, status: str = "active", + model: Model = None, ) -> None: """Builds the charm and deploys a specified number of units.""" + if model is None: + model = ops_test.model + # It is possible for users to provide their own cluster for testing. Hence, check if there # is a pre-existing cluster. 
- if await app_name(ops_test, database_app_name): + if await app_name(ops_test, database_app_name, model): return global charm @@ -74,7 +83,7 @@ async def build_and_deploy( "postgresql-image": METADATA["resources"]["postgresql-image"]["upstream-source"], } ( - await ops_test.model.deploy( + await model.deploy( charm, resources=resources, application_name=database_app_name, @@ -86,7 +95,7 @@ async def build_and_deploy( ) if wait_for_idle: # Wait until the PostgreSQL charm is successfully deployed. - await ops_test.model.wait_for_idle( + await model.wait_for_idle( apps=[database_app_name], status=status, raise_on_blocked=True, @@ -408,9 +417,11 @@ def get_expected_k8s_resources(application: str) -> set: } -async def get_leader_unit(ops_test: OpsTest, app: str) -> Optional[Unit]: +async def get_leader_unit(ops_test: OpsTest, app: str, model: Model = None) -> Optional[Unit]: leader_unit = None - for unit in ops_test.model.applications[app].units: + if model is None: + model = ops_test.model + for unit in model.applications[app].units: if await unit.is_leader_from_status(): leader_unit = unit break @@ -588,13 +599,16 @@ async def check_tls_patroni_api(ops_test: OpsTest, unit_name: str, enabled: bool return False -def has_relation_exited(ops_test: OpsTest, endpoint_one: str, endpoint_two: str) -> bool: +def has_relation_exited( + ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None +) -> bool: """Returns true if the relation between endpoint_one and endpoint_two has been removed.""" - for rel in ops_test.model.relations: + relations = model.relations if model is not None else ops_test.model.relations + for rel in relations: endpoints = [endpoint.name for endpoint in rel.endpoints] - if endpoint_one not in endpoints and endpoint_two not in endpoints: - return True - return False + if endpoint_one in endpoints and endpoint_two in endpoints: + return False + return True @retry( @@ -746,7 +760,7 @@ async def wait_for_idle_on_blocked( def wait_for_relation_removed_between( - ops_test: OpsTest, endpoint_one: str, endpoint_two: str + ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None ) -> None: """Wait for relation to be removed before checking if it's waiting or idle. @@ -754,11 +768,12 @@ def wait_for_relation_removed_between( ops_test: running OpsTest instance endpoint_one: one endpoint of the relation. Doesn't matter if it's provider or requirer. endpoint_two: the other endpoint of the relation. + model: optional model to check for the relation. """ try: for attempt in Retrying(stop=stop_after_delay(3 * 60), wait=wait_fixed(3)): with attempt: - if has_relation_exited(ops_test, endpoint_one, endpoint_two): + if has_relation_exited(ops_test, endpoint_one, endpoint_two, model): break except RetryError: assert False, "Relation failed to exit after 3 minutes." 
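The helper changes in this patch all follow one convention: an optional model (or extra_model) argument that falls back to ops_test.model, so existing single-model callers keep working while the async replication tests can point the same helper at either the offering or the consuming model. Below is a minimal sketch of that pattern, assuming a hypothetical helper — the name and body are illustrative only, not part of the patch:

from typing import Optional

from juju.model import Model
from pytest_operator.plugin import OpsTest


async def get_unit_count(ops_test: OpsTest, app: str, model: Optional[Model] = None) -> int:
    # Fall back to the default test model, matching the convention used by the
    # helpers changed in this patch.
    if model is None:
        model = ops_test.model
    return len(model.applications[app].units)

A two-cluster test can then call the same helper once per model (for example with the second model fixture introduced in the new test module) without duplicating the helper itself.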
From 9a8e4e9f2ee679ef49e49635348adbdc703bdb1b Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Apr 2024 15:12:17 -0300 Subject: [PATCH 03/12] Add test for scaling Signed-off-by: Marcelo Henrique Neppel --- .../ha_tests/test_async_replication.py | 51 +++++++++++++++++++ tests/integration/helpers.py | 15 ++++-- 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index dbe1669354..4d007d008b 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -29,6 +29,7 @@ get_password, get_primary, get_unit_address, + scale_application, wait_for_relation_removed_between, ) @@ -466,3 +467,53 @@ async def test_async_replication_failover_in_secondary_cluster( # (check that all the units have all the writes). logger.info("checking whether no writes were lost") await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_scaling( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication works when scaling the clusters.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + logger.info("scaling out the first cluster") + first_cluster_original_size = len(first_model.applications[DATABASE_APP_NAME].units) + await scale_application(ops_test, DATABASE_APP_NAME, first_cluster_original_size + 1) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + logger.info("scaling out the second cluster") + second_cluster_original_size = len(second_model.applications[DATABASE_APP_NAME].units) + await scale_application( + ops_test, DATABASE_APP_NAME, second_cluster_original_size + 1, model=second_model + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + logger.info("scaling in the first cluster") + await scale_application(ops_test, DATABASE_APP_NAME, first_cluster_original_size) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + logger.info("scaling in the second cluster") + await scale_application( + ops_test, DATABASE_APP_NAME, second_cluster_original_size, model=second_model + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). 
+ logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 3ae53233d4..456de270a5 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -676,22 +676,27 @@ async def run_command_on_unit(ops_test: OpsTest, unit_name: str, command: str) - return stdout -async def scale_application(ops_test: OpsTest, application_name: str, scale: int) -> None: +async def scale_application( + ops_test: OpsTest, application_name: str, scale: int, model: Model = None +) -> None: """Scale a given application to a specific unit count. Args: ops_test: The ops test framework instance application_name: The name of the application scale: The number of units to scale to + model: The model to scale the application in """ - await ops_test.model.applications[application_name].scale(scale) + if model is None: + model = ops_test.model + await model.applications[application_name].scale(scale) if scale == 0: - await ops_test.model.block_until( - lambda: len(ops_test.model.applications[DATABASE_APP_NAME].units) == scale, + await model.block_until( + lambda: len(model.applications[DATABASE_APP_NAME].units) == scale, timeout=1000, ) else: - await ops_test.model.wait_for_idle( + await model.wait_for_idle( apps=[application_name], status="active", timeout=1000, From 455a82c5cf88307454f722fc65797d8873a598d9 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Apr 2024 20:21:47 -0300 Subject: [PATCH 04/12] Speedup tests Signed-off-by: Marcelo Henrique Neppel --- tests/integration/ha_tests/test_async_replication.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 4d007d008b..448ddf1294 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -36,8 +36,9 @@ logger = logging.getLogger(__name__) -FAST_INTERVAL = "30s" -IDLE_PERIOD = 15 +CLUSTER_SIZE = 3 +FAST_INTERVAL = "10s" +IDLE_PERIOD = 5 TIMEOUT = 2000 @@ -102,8 +103,8 @@ async def test_deploy_async_replication_setup( ops_test: OpsTest, first_model: Model, second_model: Model ) -> None: """Build and deploy two PostgreSQL cluster in two separate models to test async replication.""" - await build_and_deploy(ops_test, 3, wait_for_idle=False) - await build_and_deploy(ops_test, 3, wait_for_idle=False, model=second_model) + await build_and_deploy(ops_test, CLUSTER_SIZE, wait_for_idle=False) + await build_and_deploy(ops_test, CLUSTER_SIZE, wait_for_idle=False, model=second_model) await ops_test.model.deploy(APPLICATION_NAME, num_units=1) await second_model.deploy(APPLICATION_NAME, num_units=1) From 6a3009e6e7c1e26770f244e13bf7fd8132cbeef4 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 22 Apr 2024 20:22:55 -0300 Subject: [PATCH 05/12] Backup standby pgdata folder Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 5e18ab8c86..1b14cfa7ce 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -6,6 +6,7 @@ import itertools import json import logging +from datetime import datetime from typing import List, Optional, Tuple from lightkube import ApiError, Client @@ -192,6 +193,17 @@ def 
_configure_standby_cluster(self, event: RelationChangedEvent) -> bool:
         self.charm.set_secret(APP_SCOPE, key, password)
         logger.warning("Synced %s password to %s", user, password)
         logger.debug("Synced %s password", user)
+        system_identifier, error = self.get_system_identifier()
+        if error is not None:
+            raise Exception(error)
+        if system_identifier != relation.data[relation.app].get("system-id"):
+            # Store current data in a ZIP file, clean folder and generate configuration.
+            logger.info("Creating backup of pgdata folder")
+            filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz"
+            self.container.exec(
+                f"tar -zcf {filename} {POSTGRESQL_DATA_PATH}".split()
+            ).wait_output()
+            logger.warning("Please review the backup file %s and handle its removal", filename)
         self._remove_previous_cluster_information()
         return True
 

From 79452cf732b4ae75b6f45a19b325685d080a169d Mon Sep 17 00:00:00 2001
From: Marcelo Henrique Neppel
Date: Mon, 22 Apr 2024 22:07:20 -0300
Subject: [PATCH 06/12] Improve comments and logs

Signed-off-by: Marcelo Henrique Neppel
---
 src/relations/async_replication.py | 62 ++++++++++++++++++------------
 1 file changed, 38 insertions(+), 24 deletions(-)

diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py
index 1b14cfa7ce..bc86b628ee 100644
--- a/src/relations/async_replication.py
+++ b/src/relations/async_replication.py
@@ -1,7 +1,18 @@
 # Copyright 2024 Canonical Ltd.
 # See LICENSE file for licensing details.
 
-"""Async Replication implementation."""
+"""Async Replication implementation.
+
+The highest "promoted-cluster-counter" value is used to determine the primary cluster.
+The application on either side of the relation that has the highest value in its application
+relation databag is considered the primary cluster.
+
+The "unit-promoted-cluster-counter" field in the unit relation databag is used to determine
+whether the unit is following the promoted cluster. If the value matches the highest value
+in the application relation databag, then the unit is following the promoted cluster.
+Otherwise, the database in the unit needs to be restarted to follow the promoted cluster,
+when the unit is from the standby cluster (the one that was not promoted).
+"""
 
 import itertools
 import json
@@ -87,10 +98,6 @@ def __init__(self, charm):
 
         self.container = self.charm.unit.get_container("postgresql")
 
-    # def _are_cluster_versions_compatible(self) -> bool:
-    #     """Check if the cluster versions are compatible."""
-    #     return primary_cluster.data.get("postgresql-version")
-
     def _can_promote_cluster(self, event: ActionEvent) -> bool:
         """Check if the cluster can be promoted."""
         if not self.charm.is_cluster_initialised:
@@ -126,6 +133,8 @@ def _can_promote_cluster(self, event: ActionEvent) -> bool:
             event.fail("This cluster is already the primary cluster.")
             return False
 
+        # To promote the other cluster when there is already a primary cluster, the action must be called with
+        # `force-promotion=true`. If not, fail the action and report that the other cluster is already the primary.
         if relation.app == primary_cluster:
             if not event.params.get("force-promotion"):
                 event.fail(
@@ -149,6 +158,8 @@ def _configure_primary_cluster(
             self.charm.update_config()
         if self._is_primary_cluster() and self.charm.unit.is_leader():
            self._update_primary_cluster_data()
+            # If this is a standby cluster, remove the information from DCS to make it
+            # a normal cluster.
if self.charm._patroni.get_standby_leader() is not None: self.charm._patroni.promote_standby_cluster() try: @@ -173,6 +184,7 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: """Configure the standby cluster.""" relation = self._relation if relation.name == ASYNC_REPLICA_RELATION: + # Update the secrets between the clusters. primary_cluster_info = relation.data[relation.app].get("primary-cluster-data") secret_id = ( None @@ -182,22 +194,19 @@ def _configure_standby_cluster(self, event: RelationChangedEvent) -> bool: try: secret = self.charm.model.get_secret(id=secret_id, label=self._secret_label) except SecretNotFoundError: - logger.warning("Secret not found, deferring event") logger.debug("Secret not found, deferring event") event.defer() return False credentials = secret.peek_content() - logger.warning("Credentials: %s", credentials) for key, password in credentials.items(): user = key.split("-password")[0] self.charm.set_secret(APP_SCOPE, key, password) - logger.warning("Synced %s password to %s", user, password) logger.debug("Synced %s password", user) system_identifier, error = self.get_system_identifier() if error is not None: raise Exception(error) if system_identifier != relation.data[relation.app].get("system-id"): - # Store current data in a ZIP file, clean folder and generate configuration. + # Store current data in a tar.gz file. logger.info("Creating backup of pgdata folder") filename = f"{POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.tar.gz" self.container.exec( @@ -267,7 +276,7 @@ def _get_secret(self) -> Secret: secret._id = f"secret://{self.model.uuid}/{secret.get_info().id.split(':')[1]}" return secret except SecretNotFoundError: - logger.warning("Secret not found, creating a new one") + logger.debug("Secret not found, creating a new one") pass app_secret = self.charm.model.get_secret(label=f"{self.model.app.name}.app") @@ -332,12 +341,16 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: """Handle the database start in the standby cluster.""" try: if self.charm._patroni.member_started: + # If the database is started, update the databag in a way the unit is marked as configured + # for async replication. self.charm._peers.data[self.charm.unit].update({"stopped": ""}) self.charm._peers.data[self.charm.unit].update({ "unit-promoted-cluster-counter": self._get_highest_promoted_cluster_counter_value() }) if self.charm.unit.is_leader(): + # If this unit is the leader, check if all units are ready before making the cluster + # active again (including the health checks from the update status hook). if all( self.charm._peers.data[unit].get("unit-promoted-cluster-counter") == self._get_highest_promoted_cluster_counter_value() @@ -357,6 +370,7 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: elif not self.charm.unit.is_leader(): raise NotReadyError() else: + # If the standby leader fails to start, fix the leader annotation and defer the event. 
self.charm.fix_leader_annotation() self.charm.unit.status = WaitingStatus( "Still starting the database in the standby leader" @@ -368,7 +382,7 @@ def _handle_database_start(self, event: RelationChangedEvent) -> None: event.defer() def handle_read_only_mode(self) -> None: - """Handle read-only mode.""" + """Handle read-only mode (standby cluster that lost the relation with the primary cluster).""" promoted_cluster_counter = self.charm._peers.data[self.charm.app].get( "promoted-cluster-counter", "" ) @@ -384,7 +398,7 @@ def handle_read_only_mode(self) -> None: self.charm.unit.status = BlockedStatus(READ_ONLY_MODE_BLOCKING_MESSAGE) def _is_following_promoted_cluster(self) -> bool: - """Return True if this cluster is following the promoted cluster.""" + """Return True if this unit is following the promoted cluster.""" if self._get_primary_cluster() is None: return False return ( @@ -406,6 +420,8 @@ def _on_async_relation_broken(self, _) -> None: "unit-promoted-cluster-counter": "", }) + # If this is the standby cluster, set 0 in the "promoted-cluster-counter" field to set + # the cluster in read-only mode message also in the other units. if self.charm._patroni.get_standby_leader() is not None: if self.charm.unit.is_leader(): self.charm._peers.data[self.charm.app].update({"promoted-cluster-counter": "0"}) @@ -418,7 +434,7 @@ def _on_async_relation_broken(self, _) -> None: def _on_async_relation_changed(self, event: RelationChangedEvent) -> None: """Update the Patroni configuration if one of the clusters was already promoted.""" primary_cluster = self._get_primary_cluster() - logger.warning("Primary cluster: %s", primary_cluster) + logger.debug("Primary cluster: %s", primary_cluster) if primary_cluster is None: logger.debug("Early exit on_async_relation_changed: No primary cluster found.") return @@ -499,7 +515,7 @@ def _on_promote_cluster(self, event: ActionEvent) -> None: # Increment the current cluster counter in this application side based on the highest counter value. promoted_cluster_counter = int(self._get_highest_promoted_cluster_counter_value()) promoted_cluster_counter += 1 - logger.warning("Promoted cluster counter: %s", promoted_cluster_counter) + logger.debug("Promoted cluster counter: %s", promoted_cluster_counter) self._update_primary_cluster_data(promoted_cluster_counter, system_identifier) @@ -513,7 +529,7 @@ def _on_promote_cluster(self, event: ActionEvent) -> None: @property def _primary_cluster_endpoint(self) -> str: - """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + """Return the endpoint from one of the sync-standbys, or from the primary if there is no sync-standby.""" sync_standby_names = self.charm._patroni.get_sync_standby_names() if len(sync_standby_names) > 0: unit = self.model.get_unit(sync_standby_names[0]) @@ -541,6 +557,7 @@ def _relation(self) -> Relation: return relation def _remove_previous_cluster_information(self) -> None: + """Remove the previous cluster information.""" client = Client() for values in itertools.product( [Endpoints, Service], @@ -556,12 +573,12 @@ def _remove_previous_cluster_information(self) -> None: name=values[1], namespace=self.charm._namespace, ) - logger.warning(f"Deleted {values[0]} {values[1]}") + logger.debug(f"Deleted {values[0]} {values[1]}") except ApiError as e: # Ignore the error only when the resource doesn't exist. 
if e.status.code != 404: raise e - logger.warning(f"{values[0]} {values[1]} not found") + logger.debug(f"{values[0]} {values[1]} not found") @property def _secret_label(self) -> str: @@ -581,10 +598,13 @@ def _stop_database(self, event: RelationChangedEvent) -> bool: self.container.stop(self.charm._postgresql_service) if self.charm.unit.is_leader(): + # Remove the "cluster_initialised" flag to avoid self-healing in the update status hook. self.charm._peers.data[self.charm.app].update({"cluster_initialised": ""}) if not self._configure_standby_cluster(event): return False + # Remove and recreate the pgdata folder to enable replication of the data from the + # primary cluster. logger.info("Removing and recreating pgdata folder") self.container.exec(f"rm -r {POSTGRESQL_DATA_PATH}".split()).wait_output() self.charm._create_pgdata(self.container) @@ -597,7 +617,6 @@ def update_async_replication_data(self) -> None: """Updates the async-replication data, if the unit is the leader. This is used to update the standby units with the new primary information. - If the unit is not the leader, then the data is removed from its databag. """ relation = self._relation if relation is None: @@ -618,11 +637,7 @@ def _update_primary_cluster_data( "promoted-cluster-counter": str(promoted_cluster_counter) }) - # Update the data in the relation. - primary_cluster_data = { - "endpoint": self._primary_cluster_endpoint, - "postgresql-version": self.charm._patroni.rock_postgresql_version, - } + primary_cluster_data = {"endpoint": self._primary_cluster_endpoint} # Retrieve the secrets that will be shared between the clusters. if async_relation.name == ASYNC_PRIMARY_RELATION: @@ -643,7 +658,6 @@ def _wait_for_standby_leader(self, event: RelationChangedEvent) -> bool: standby_leader = self.charm._patroni.get_standby_leader(check_whether_is_running=True) except RetryError: standby_leader = None - logger.warning("Standby leader: %s", standby_leader) if not self.charm.unit.is_leader() and standby_leader is None: self.charm.unit.status = WaitingStatus( "Waiting for the standby leader start the database" From a53fde3e79d5233bd0e935381348126d2fd1caba Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 23 Apr 2024 01:08:30 -0300 Subject: [PATCH 07/12] Remove unused constant Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index bc86b628ee..0748f94017 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -51,9 +51,6 @@ ASYNC_PRIMARY_RELATION = "async-primary" ASYNC_REPLICA_RELATION = "async-replica" -INCOMPATIBLE_CLUSTER_VERSIONS_BLOCKING_MESSAGE = ( - "Incompatible cluster versions - cannot enable async replication" -) READ_ONLY_MODE_BLOCKING_MESSAGE = "Cluster in read-only mode" From c0e82591cf5db9161adef42fd607db72a39ab389 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 29 Apr 2024 14:07:38 -0300 Subject: [PATCH 08/12] Remove warning log call and add optional type hint Signed-off-by: Marcelo Henrique Neppel --- src/patroni.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/patroni.py b/src/patroni.py index 32ffba7414..3295841c64 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -151,7 +151,7 @@ def get_primary(self, unit_name_pattern=False) -> str: def get_standby_leader( self, unit_name_pattern=False, check_whether_is_running: bool = False - ) -> str: + ) -> Optional[str]: """Get 
standby leader instance. Args: @@ -430,7 +430,6 @@ def render_patroni_yml_file( with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) # Render the template file with the correct values. - logger.warning(self._charm.async_replication.get_standby_endpoints()) rendered = template.render( connectivity=connectivity, enable_tls=enable_tls, From 55a48517d3448b4f62568881f1f8169386edbe3d Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 29 Apr 2024 14:10:40 -0300 Subject: [PATCH 09/12] Fix juju3 markers Signed-off-by: Marcelo Henrique Neppel --- .../ha_tests/test_async_replication.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py index 448ddf1294..c10611f797 100644 --- a/tests/integration/ha_tests/test_async_replication.py +++ b/tests/integration/ha_tests/test_async_replication.py @@ -14,6 +14,7 @@ from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_fixed +from tests.integration import markers from tests.integration.ha_tests.helpers import ( are_writes_increasing, check_writes, @@ -97,7 +98,7 @@ async def second_model_continuous_writes(second_model) -> None: @pytest.mark.group(1) -@pytest.mark.juju3 +@markers.juju3 @pytest.mark.abort_on_fail async def test_deploy_async_replication_setup( ops_test: OpsTest, first_model: Model, second_model: Model @@ -124,7 +125,7 @@ async def test_deploy_async_replication_setup( @pytest.mark.group(1) -@pytest.mark.juju3 +@markers.juju3 @pytest.mark.abort_on_fail async def test_async_replication( ops_test: OpsTest, @@ -202,7 +203,7 @@ async def test_async_replication( @pytest.mark.group(1) -@pytest.mark.juju3 +@markers.juju3 @pytest.mark.abort_on_fail async def test_switchover( ops_test: OpsTest, @@ -257,7 +258,7 @@ async def test_switchover( @pytest.mark.group(1) -@pytest.mark.juju3 +@markers.juju3 @pytest.mark.abort_on_fail async def test_promote_standby( ops_test: OpsTest, @@ -332,7 +333,7 @@ async def test_promote_standby( @pytest.mark.group(1) -@pytest.mark.juju3 +@markers.juju3 @pytest.mark.abort_on_fail async def test_reestablish_relation( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes @@ -390,7 +391,7 @@ async def test_reestablish_relation( @pytest.mark.group(1) -@pytest.mark.juju3 +@markers.juju3 @pytest.mark.abort_on_fail async def test_async_replication_failover_in_main_cluster( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes @@ -433,7 +434,7 @@ async def test_async_replication_failover_in_main_cluster( @pytest.mark.group(1) -@pytest.mark.juju3 +@markers.juju3 @pytest.mark.abort_on_fail async def test_async_replication_failover_in_secondary_cluster( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes @@ -471,7 +472,7 @@ async def test_async_replication_failover_in_secondary_cluster( @pytest.mark.group(1) -@pytest.mark.juju3 +@markers.juju3 @pytest.mark.abort_on_fail async def test_scaling( ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes From d6707fbd79d0ce261dd058865095fff696753781 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 2 May 2024 12:02:04 -0300 Subject: [PATCH 10/12] Revert poetry.lock Signed-off-by: Marcelo Henrique Neppel --- poetry.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/poetry.lock b/poetry.lock index f66dfaa826..74133126a0 100644 --- a/poetry.lock +++ 
b/poetry.lock @@ -1645,6 +1645,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, From eadbbbaa2b8057dde240842712d8863e5d55c45d Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 3 May 2024 14:18:25 -0300 Subject: [PATCH 11/12] Add relation name to secret label Signed-off-by: Marcelo Henrique Neppel --- src/relations/async_replication.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py index 0748f94017..f647f169b5 100644 --- a/src/relations/async_replication.py +++ b/src/relations/async_replication.py @@ -40,6 +40,7 @@ from constants import ( APP_SCOPE, + PEER, POSTGRESQL_DATA_PATH, WORKLOAD_OS_GROUP, WORKLOAD_OS_USER, @@ -276,7 +277,7 @@ def _get_secret(self) -> Secret: logger.debug("Secret not found, creating a new one") pass - app_secret = self.charm.model.get_secret(label=f"{self.model.app.name}.app") + app_secret = self.charm.model.get_secret(label=f"{PEER}.{self.model.app.name}.app") content = app_secret.peek_content() # Filter out unnecessary secrets. From 3508a439e1e9737da598165ef611d13154b422db Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 24 May 2024 15:58:16 -0300 Subject: [PATCH 12/12] Handle pebble socket not accessible while upgrading Signed-off-by: Marcelo Henrique Neppel --- src/charm.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/charm.py b/src/charm.py index 09ba21a1b0..da556e841d 100755 --- a/src/charm.py +++ b/src/charm.py @@ -722,6 +722,12 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: # config-changed hook. # Get the postgresql container so we can configure/manipulate it. container = event.workload + if not container.can_connect(): + logger.debug( + "Defer on_postgresql_pebble_ready: Waiting for container to become available" + ) + event.defer() + return # Create the PostgreSQL data directory. This is needed on cloud environments # where the volume is mounted with more restrictive permissions.
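The can_connect() guard added in this last patch is the usual ops pattern for pebble-ready handlers that can fire while the Pebble socket is still unreachable (for example during an upgrade): bail out early and defer, letting Juju re-emit the event on a later dispatch once the workload container is available. A condensed sketch of the same guard in isolation, using a placeholder charm and container name that are not taken from this charm:

from ops.charm import CharmBase, WorkloadEvent
from ops.main import main


class ExampleCharm(CharmBase):
    def __init__(self, *args):
        super().__init__(*args)
        # "example" is a placeholder container name that would be defined in metadata.yaml.
        self.framework.observe(self.on.example_pebble_ready, self._on_pebble_ready)

    def _on_pebble_ready(self, event: WorkloadEvent) -> None:
        container = event.workload
        if not container.can_connect():
            # Pebble is not reachable yet (e.g. mid-upgrade); retry on a later dispatch.
            event.defer()
            return
        # From here on it is safe to talk to Pebble (push files, add layers, start services).


if __name__ == "__main__":
    main(ExampleCharm)

Deferring rather than raising lets the hook exit cleanly and have the event replayed once the container is reachable, which is why the patch above prefers it over failing the pebble-ready hook outright.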