diff --git a/actions.yaml b/actions.yaml index 44231ad4a9..a60db6765f 100644 --- a/actions.yaml +++ b/actions.yaml @@ -17,6 +17,23 @@ list-backups: description: Lists backups in s3 storage. pre-upgrade-check: description: Run necessary pre-upgrade checks and preparations before executing a charm refresh. +promote-standby-cluster: + description: Promotes the standby cluster of choice to a leader. Must be ran against the charm unit leader of the standby cluster. + params: + force: + type: boolean + default: False + description: | + WARNING: this option set to True WILL WIPE OUT your current primary cluster! + If this option and "force-really-really-mean-it" are set both to true, then this unit will take over the primary role. + It only works in the case of cross-cluster replication, where both clusters are connected to each other in the async-primary. + force-really-really-mean-it: + type: boolean + default: False + description: | + WARNING: this option set to True WILL WIPE OUT your current primary cluster! + If this option and "force" are set both to true, then this unit will take over the primary role. + It only works in the case of cross-cluster replication, where both clusters are connected to each other in the async-primary. restore: description: Restore a database backup using pgBackRest. S3 credentials are retrieved from a relation with the S3 integrator charm. diff --git a/metadata.yaml b/metadata.yaml index b4bd1a4331..51ff15120f 100644 --- a/metadata.yaml +++ b/metadata.yaml @@ -26,6 +26,8 @@ peers: interface: upgrade provides: + async-primary: + interface: async_replication database: interface: postgresql_client db: @@ -37,6 +39,9 @@ provides: limit: 1 requires: + async-replica: + interface: async_replication + limit: 1 certificates: interface: tls-certificates limit: 1 diff --git a/poetry.lock b/poetry.lock index f6690f92b0..d07cd2791c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1571,7 +1571,6 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, diff --git a/src/charm.py b/src/charm.py index 1fdd086288..ead0960b6e 100755 --- a/src/charm.py +++ b/src/charm.py @@ -84,6 +84,7 @@ USER, USER_PASSWORD_KEY, ) +from relations.async_replication import PostgreSQLAsyncReplication from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides from relations.postgresql_provider import PostgreSQLProvider from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model @@ -182,6 +183,7 @@ def __init__(self, *args): ], log_slots=[f"{POSTGRESQL_SNAP_NAME}:logs"], ) + self.async_manager = 
PostgreSQLAsyncReplication(self) def patroni_scrape_config(self) -> List[Dict]: """Generates scrape config for the Patroni metrics endpoint.""" @@ -676,6 +678,7 @@ def _hosts(self) -> set: def _patroni(self) -> Patroni: """Returns an instance of the Patroni object.""" return Patroni( + self, self._unit_ip, self.cluster_name, self._member_name, diff --git a/src/cluster.py b/src/cluster.py index d4ae2ac873..0b807b5199 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -47,10 +47,22 @@ RUNNING_STATES = ["running", "streaming"] +class ClusterNotPromotedError(Exception): + """Raised when a cluster is not promoted.""" + + class NotReadyError(Exception): """Raised when not all cluster members healthy or finished initial sync.""" +class EndpointNotReadyError(Exception): + """Raised when an endpoint is not ready.""" + + +class StandbyClusterAlreadyPromotedError(Exception): + """Raised when a standby cluster is already promoted.""" + + class RemoveRaftMemberFailedError(Exception): """Raised when a remove raft member failed for some reason.""" @@ -68,6 +80,7 @@ class Patroni: def __init__( self, + charm, unit_ip: str, cluster_name: str, member_name: str, @@ -81,6 +94,7 @@ def __init__( """Initialize the Patroni class. Args: + charm: PostgreSQL charm instance. unit_ip: IP address of the current unit cluster_name: name of the cluster member_name: name of the member inside the cluster @@ -91,6 +105,7 @@ def __init__( rewind_password: password for the user used on rewinds tls_enabled: whether TLS is enabled """ + self.charm = charm self.unit_ip = unit_ip self.cluster_name = cluster_name self.member_name = member_name @@ -241,6 +256,32 @@ def get_primary(self, unit_name_pattern=False) -> str: primary = "/".join(primary.rsplit("-", 1)) return primary + def get_standby_leader(self, unit_name_pattern=False) -> str: + """Get standby leader instance. + + Args: + unit_name_pattern: whether to convert pod name to unit name + + Returns: + standby leader pod or unit name. + """ + # Request info from cluster endpoint (which returns all members of the cluster). + for attempt in Retrying(stop=stop_after_attempt(2 * len(self.peers_ips) + 1)): + with attempt: + url = self._get_alternative_patroni_url(attempt) + cluster_status = requests.get( + f"{url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}", + verify=self.verify, + timeout=API_REQUEST_TIMEOUT, + ) + for member in cluster_status.json()["members"]: + if member["role"] == "standby_leader": + standby_leader = member["name"] + if unit_name_pattern: + # Change the last dash to / in order to match unit name pattern. + standby_leader = "/".join(standby_leader.rsplit("-", 1)) + return standby_leader + def get_sync_standby_names(self) -> List[str]: """Get the list of sync standby unit names.""" sync_standbys = [] @@ -296,12 +337,12 @@ def are_all_members_ready(self) -> bool: except RetryError: return False - # Check if all members are running and one of them is a leader (primary), - # because sometimes there may exist (for some period of time) only - # replicas after a failed switchover. + # Check if all members are running and one of them is a leader (primary) or + # a standby leader, because sometimes there may exist (for some period of time) + # only replicas after a failed switchover. 
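Reviewer note: `get_standby_leader` above mirrors the existing `get_primary` lookup but filters on the `standby_leader` role that Patroni reports for the leader of a standby cluster. A minimal, standalone sketch of the same lookup against Patroni's cluster endpoint is below; the URL, port and sample payload are illustrative assumptions, not values taken from the charm.

```python
from typing import Optional

import requests

PATRONI_URL = "http://10.0.0.11:8008"  # assumed unit IP and default Patroni REST port


def standby_leader_from_cluster(url: str = PATRONI_URL) -> Optional[str]:
    """Return the member that Patroni reports as standby_leader, if any."""
    # GET /cluster returns, for example:
    # {"members": [{"name": "postgresql-0", "role": "standby_leader", "state": "running"},
    #              {"name": "postgresql-1", "role": "replica", "state": "streaming"}]}
    cluster = requests.get(f"{url}/cluster", timeout=10).json()
    for member in cluster["members"]:
        if member["role"] == "standby_leader":
            return member["name"]
    return None
```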
return all( member["state"] in RUNNING_STATES for member in cluster_status.json()["members"] - ) and any(member["role"] == "leader" for member in cluster_status.json()["members"]) + ) and any(member["role"] in ["leader", "standby_leader"] for member in cluster_status.json()["members"]) def get_patroni_health(self) -> Dict[str, str]: """Gets, retires and parses the Patroni health endpoint.""" @@ -423,6 +464,19 @@ def is_member_isolated(self) -> bool: return len(cluster_status.json()["members"]) == 0 + def promote_standby_cluster(self) -> None: + """Promote a standby cluster to be a regular cluster.""" + config_response = requests.get(f"{self._patroni_url}/config", verify=self.verify) + if "standby_cluster" not in config_response.json(): + raise StandbyClusterAlreadyPromotedError("standby cluster is already promoted") + requests.patch( + f"{self._patroni_url}/config", verify=self.verify, json={"standby_cluster": None} + ) + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + if self.get_primary() is None: + raise ClusterNotPromotedError("cluster not promoted") + def render_file(self, path: str, content: str, mode: int) -> None: """Write a content rendered from a template to a file. @@ -465,6 +519,9 @@ def render_patroni_yml_file( # Open the template patroni.yml file. with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) + + primary = self.charm.async_manager.get_primary_data() + # Render the template file with the correct values. rendered = template.render( conf_path=PATRONI_CONF_PATH, @@ -480,8 +537,12 @@ def render_patroni_yml_file( scope=self.cluster_name, self_ip=self.unit_ip, superuser=USER, - superuser_password=self.superuser_password, - replication_password=self.replication_password, + superuser_password=primary["superuser-password"] + if primary + else self.superuser_password, + replication_password=primary["replication-password"] + if primary + else self.replication_password, rewind_user=REWIND_USER, rewind_password=self.rewind_password, enable_pgbackrest=stanza is not None, @@ -492,8 +553,12 @@ def render_patroni_yml_file( version=self.get_postgresql_version().split(".")[0], minority_count=self.planned_units // 2, pg_parameters=parameters, + standby_cluster_endpoint=primary["endpoint"] if primary else None, + extra_replication_endpoints={"{}/32".format(primary["endpoint"])} + if primary + else self.charm.async_manager.standby_endpoints(), ) - self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o600) + self.render_file(f"{PATRONI_CONF_PATH}/patroni.yaml", rendered, 0o644) def start_patroni(self) -> bool: """Start Patroni service using snap. diff --git a/src/coordinator_ops.py b/src/coordinator_ops.py new file mode 100644 index 0000000000..2bd1c2becc --- /dev/null +++ b/src/coordinator_ops.py @@ -0,0 +1,219 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""The coordinated ops is a class that ensures a certain activity is ran together. + +The concept is similar to the "cohort" in snaps, where all units wait until they can +proceed to execute a certain activity, for example, restarting your service. + +The process starts with the leader issuing a new coordination request. Effectively, +that is implemented as the __coord_counter is increased +1 in the app level. +__coord_approved is set to "False". + +Each unit receives a relation-changed, which is then re-issued as a _coordinator_requested +event. Once the unit done its task, it should ack the request. 
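Stepping back to `promote_standby_cluster` in cluster.py above: promotion works by deleting the `standby_cluster` section from Patroni's dynamic configuration over its REST API. A standalone sketch of that call sequence follows; the URL is an assumption, and the real method additionally retries `get_primary()` until a primary appears.

```python
import requests

PATRONI_URL = "http://10.0.0.11:8008"  # assumed; the charm builds this from the unit IP


def promote_standby(url: str = PATRONI_URL) -> None:
    """Promote a standby cluster by removing its standby_cluster configuration."""
    config = requests.get(f"{url}/config", timeout=10).json()
    if "standby_cluster" not in config:
        raise RuntimeError("standby cluster is already promoted")
    # PATCHing a key to null removes it from Patroni's dynamic configuration,
    # after which Patroni promotes the standby leader to a regular leader.
    requests.patch(f"{url}/config", json={"standby_cluster": None}, timeout=10)
```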
+Each unit should ack the request by setting its own __coord_counter +equal to the app's value. + +Once all units have ack'ed the __coord_counter, the leader switches +__coord_approved to "True". All units then process that new change as a +"coordinator-approved" event and execute the activity they have been waiting for. + +If there is a need to coordinate several activities in sequence, e.g. a coordinated stop and then +a coordinated start, it is recommended that the leader unit issue two coordination requests, one after the other, as follows: + + + class MyCharm: + + def __init__(self, *args): + self.stop_coordinator = CoordinatedOpsManager(self, relation, tag="_stop_my_charm") + self.start_coordinator = CoordinatedOpsManager(self, relation, tag="_start_my_charm") + + self.framework.observe( + self.stop_coordinator.on.coordinator_requested, + self._on_coordinator_requested + ) + self.framework.observe( + self.stop_coordinator.on.coordinator_approved, + self._on_coordinator_approved + ) + self.framework.observe( + self.start_coordinator.on.coordinator_requested, + self._on_coordinator_requested + ) + self.framework.observe( + self.start_coordinator.on.coordinator_approved, + self._on_coordinator_approved + ) + + def _a_method(self): + # A method that kick-starts the restart coordination + ...... + if self.charm.unit.is_leader(): + self.stop_coordinator.coordinate() + + def _on_coordinator_requested(self, event): + if self.service_is_running and event.tag == "_stop_my_charm": + # We are in the stop phase + self.service.stop() + self.stop_coordinator.acknowledge(event) + elif event.tag == "_start_my_charm": + # We are in the start phase + self.service.start() + self.start_coordinator.acknowledge(event) + + def _on_coordinator_approved(self, event): + # All units have ack'ed the activity, which means we have stopped.
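To make the handshake described above concrete, here is a small, self-contained simulation of the databag state transitions, using plain dicts in place of Juju relation data. The key names mirror what CoordinatedOpsManager derives from relation + tag (e.g. "restart" + "_asyncreplica", as used later in async_replication.py), but nothing below touches ops or Juju.

```python
# Plain-dict stand-ins for the app databag and each unit's databag.
COUNTER = "_restart_asyncreplica_coord_counter"
APPROVED = "_restart_asyncreplica_coord_approved"

app_data = {COUNTER: "0", APPROVED: "True"}
unit_data = {"postgresql/0": {}, "postgresql/1": {}, "postgresql/2": {}}


def coordinate() -> None:
    """Leader: bump the counter and withdraw approval."""
    app_data[COUNTER] = str(int(app_data[COUNTER]) + 1)
    app_data[APPROVED] = "False"


def acknowledge(unit: str) -> None:
    """Unit: after finishing its task, copy the app counter into its own databag."""
    unit_data[unit][COUNTER] = app_data[COUNTER]


def leader_review() -> None:
    """Leader: approve once every unit has matched the counter."""
    if all(data.get(COUNTER) == app_data[COUNTER] for data in unit_data.values()):
        app_data[APPROVED] = "True"


coordinate()                  # leader requests a coordinated activity
for unit in unit_data:
    acknowledge(unit)         # each unit acks after stopping/starting its service
leader_review()
assert app_data[APPROVED] == "True"
```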
+ if self.charm.unit.is_leader() and event.tag == "_stop_my_charm": + # Now kickstart the restarting process + self.start_coordinator.coordinate() +""" + + +import logging +from typing import AnyStr + +from ops.charm import ( + CharmBase, + CharmEvents, + EventSource, + RelationChangedEvent, +) +from ops.framework import EventBase, Handle, Object + +logger = logging.getLogger(__name__) + + +class CoordinatorEventBase(EventBase): + """Base event for the coordination activities.""" + + def __init__(self, handle: "Handle", tag: str): + super().__init__(handle) + self._tag = tag + + @property + def tag(self): + """Returns the tag representing this coordinator's controllers.""" + return self._tag + + +class CoordinatorRequestedEvent(CoordinatorEventBase): + """Event to signal that the leader requested the units to coordinate a new activity.""" + + def __init__(self, handle: "Handle", tag: str): + super().__init__(handle, tag) + + +class CoordinatorApprovedEvent(CoordinatorEventBase): + """Event to signal that all units ack'ed the coordination request and can proceed.""" + + def __init__(self, handle: "Handle", tag: str): + super().__init__(handle, tag) + + +class CoordinatorCharmEvents(CharmEvents): + """List of events that the TLS Certificates requirer charm can leverage.""" + + coordinator_approved = EventSource(CoordinatorApprovedEvent) + coordinator_requested = EventSource(CoordinatorRequestedEvent) + + +class CoordinatedOpsManager(Object): + """Coordinates activities that demand the entire peer group to act at once.""" + + on = CoordinatorCharmEvents() + + def __init__(self, charm: CharmBase, relation: AnyStr, tag: AnyStr = ""): + super().__init__(charm, relation) + self.tag = tag + self.relation = relation + self.app = charm.app + self.name = relation + tag # use the tag to separate multiple coordinator objects + # in the same charm class. + self.charm = charm # Maintain a reference to charm, so we can emit events. + self.framework.observe(charm.on[self.relation].relation_changed, self._on_relation_changed) + + @property + def under_coordination(self): + """Returns True if the _coord_approved == False.""" + return ( + self.model.get_relation(self.relation) + .data[self.app] + .get(f"_{self.name}_coord_approved", "True") + == "False" + ) + + def coordinate(self): + """Process a request to ask a new coordination activity. + + If we are the leader, fire off a coordinator requested event in the self.name. + """ + logger.info("coordinate: starting") + if self.charm.unit.is_leader(): + counter = int( + self.model.get_relation(self.relation) + .data[self.app] + .get(f"_{self.name}_coord_counter", "0") + ) + self.model.get_relation(self.relation).data[self.app][ + f"_{self.name}_coord_counter" + ] = str(counter + 1 if counter < 10000000 else 0) + self.model.get_relation(self.relation).data[self.app][ + f"_{self.name}_coord_approved" + ] = "False" + logger.info("coordinate: tasks executed") + + def acknowledge(self, event): + """Runs the ack of the latest requested coordination. + + Each unit will set their own _counter to the same value as app's. + """ + coord_counter = f"_{self.name}_coord_counter" + self.model.get_relation(self.relation).data[self.charm.unit][coord_counter] = str( + self.model.get_relation(self.relation).data[self.app].get(coord_counter, 0) + ) + logger.info("acknowledge: updated internal counter") + + if not self.charm.unit.is_leader(): + # Nothing to do anymore. 
+ logger.info("acknowledge: this unit is not a leader") + return + + relation = self.model.get_relation(self.relation) + # Now, the leader must check if everyone has ack'ed + for unit in relation.units: + if relation.data[unit].get(coord_counter, "0") != relation.data[self.app].get( + coord_counter, "0" + ): + logger.info(f"acknowledge: {unit.name} still has a different coord_counter") + # We defer the event until _coord_approved == True. + # If we have _coord_counter differing, then we are not yet there. + event.defer() + return + logger.info("acknowledge: all units are set, set coord_approved == True") + # Just confirmed we have all units ack'ed. Now, set the approval. + relation.data[self.app][f"_{self.name}_coord_approved"] = "True" + + def _on_relation_changed(self: CharmBase, _: RelationChangedEvent): + """Process relation changed. + + First, determine whether this unit has received a new request for coordination. + + Then, if we are the leader, fire off a coordinator requested event. + """ + logger.info("coordinator: starting _on_relation_changed") + relation_data = self.model.get_relation(self.relation).data[self.app] + unit_data = self.model.get_relation(self.relation).data[self.charm.unit] + + if relation_data.get(f"_{self.name}_coord_approved", "False") == "True": + logger.info("coordinator: _on_relation_changed -- coordinator approved") + # We are approved to move on, issue the coordinator_approved event. + self.on.coordinator_approved.emit(self.tag) + return + coord_counter = f"_{self.name}_coord_counter" + if coord_counter in relation_data and relation_data.get( + coord_counter, "0" + ) != unit_data.get(coord_counter, "0"): + logger.info("coordinator: _on_relation_changed -- coordinator requested") + self.on.coordinator_requested.emit(self.tag) + return diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py new file mode 100644 index 0000000000..494c181b1e --- /dev/null +++ b/src/relations/async_replication.py @@ -0,0 +1,582 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Implements the state-machine.""" + +import json +import logging +import os +import pwd +import shutil +import subprocess +from datetime import datetime +from pathlib import Path +from subprocess import PIPE, run +from typing import Dict, Optional, Set, Tuple + +from ops.charm import ( + ActionEvent, + CharmBase, +) +from ops.framework import Object +from ops.model import ( + ActiveStatus, + MaintenanceStatus, + Relation, + Unit, + WaitingStatus, +) +from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed + +from cluster import ClusterNotPromotedError, StandbyClusterAlreadyPromotedError +from constants import ( + APP_SCOPE, + MONITORING_PASSWORD_KEY, + PATRONI_CONF_PATH, + POSTGRESQL_DATA_PATH, + REPLICATION_PASSWORD_KEY, + REWIND_PASSWORD_KEY, + USER_PASSWORD_KEY, +) +from coordinator_ops import CoordinatedOpsManager + +logger = logging.getLogger(__name__) + + +ASYNC_PRIMARY_RELATION = "async-primary" +ASYNC_REPLICA_RELATION = "async-replica" + + +class MoreThanOnePrimarySelectedError(Exception): + """Represents more than one primary has been selected.""" + + +class PostgreSQLAsyncReplication(Object): + """Defines the async-replication management logic.""" + + def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION) -> None: + super().__init__(charm, relation_name) + self.relation_name = relation_name + self.charm = charm + self.restart_coordinator = CoordinatedOpsManager(charm, "restart", tag="_asyncreplica") + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_primary_changed + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_standby_changed + ) + self.framework.observe( + self.restart_coordinator.on.coordinator_requested, self._on_coordination_request + ) + self.framework.observe( + self.restart_coordinator.on.coordinator_approved, self._on_coordination_approval + ) + + # Departure events + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_departure + ) + + # Actions + self.framework.observe( + self.charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster + ) + + # We treat both relations above as actually the same. 
+ # The big difference appears only at promote/demote actions + self.relation_set = { + *set(self.charm.model.relations[ASYNC_PRIMARY_RELATION]), + *set(self.charm.model.relations[ASYNC_REPLICA_RELATION]), + } + + @property + def endpoint(self) -> str: + """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + sync_standby_names = self.charm._patroni.get_sync_standby_names() + if len(sync_standby_names) > 0: + unit = self.model.get_unit(sync_standby_names[0]) + return self.charm._get_unit_ip(unit) + else: + return self.charm._get_unit_ip(self.charm.unit) + + def standby_endpoints(self) -> Set[str]: + """Returns the set of IPs used by each standby unit with a /32 mask.""" + standby_endpoints = set() + for rel in self.relation_set: + for unit in self._all_units(rel): + if not rel.data[unit].get("elected", None): + standby_endpoints.add("{}/32".format(str(rel.data[unit]["ingress-address"]))) + if "pod-address" in rel.data[unit]: + standby_endpoints.add("{}/32".format(str(rel.data[unit]["pod-address"]))) + return standby_endpoints + + def get_primary_data(self) -> Optional[Dict[str, str]]: + """Returns the primary info, if available and if the primary cluster is ready.""" + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit] and unit.name == self.charm.unit.name: + # If this unit is the leader, then return None + return None + + if rel.data[unit].get("elected", None) and rel.data[unit].get( + "primary-cluster-ready", None + ): + elected_data = json.loads(rel.data[unit]["elected"]) + return { + "endpoint": str(elected_data["endpoint"]), + "monitoring-password": elected_data["monitoring-password"], + "replication-password": elected_data["replication-password"], + "rewind-password": elected_data["rewind-password"], + "superuser-password": elected_data["superuser-password"], + } + return None + + def _all_units(self, relation): + found_units = {*relation.units, self.charm.unit} + logger.debug(f"Units found: {found_units}") + return found_units + + def _all_replica_published_unit_ips(self) -> bool: + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit]: + # This is the leader unit, it will not publish its own unit address + continue + if "unit-address" not in rel.data[unit]: + return False + return True + + def _on_departure(self, _): + for rel in [ + self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(ASYNC_PRIMARY_RELATION), + ]: + if not rel: # if no relation exits, then it rel == None + continue + if "pod-address" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["pod-address"] + if "elected" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["elected"] + if "primary-cluster-ready" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["primary-cluster-ready"] + if self.charm.unit.is_leader() and "promoted" in self.charm.app_peer_data: + del self.charm.app_peer_data["promoted"] + + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") + self.charm.update_config() + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") + self.charm.unit.status = ActiveStatus() + + def _on_primary_changed(self, event): + """Triggers a configuration change in the primary units.""" + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not 
primary_relation: + return + self.charm.unit.status = MaintenanceStatus("configuring main cluster") + logger.info("_on_primary_changed: primary_relation exists") + + primary = self._check_if_primary_already_selected() + if not primary: + # primary may not be available because the action of promoting a cluster was + # executed way after the relation changes. + # Defer it until + logger.debug("defer _on_primary_changed: primary not present") + event.defer() + return + logger.info("_on_primary_changed: primary cluster exists") + + if primary.name != self.charm.unit.name: + # this unit is not the system leader + logger.debug("early exit _on_primary_changed: unit is not the primary's leader") + self.charm.unit.status = ActiveStatus() + return + logger.info("_on_primary_changed: unit is the primary's leader") + + if not self._all_replica_published_unit_ips(): + # We will have more events happening, no need for retrigger + logger.debug("defer _on_primary_changed: not all replicas published pod details") + event.defer() + return + logger.info("_on_primary_changed: all replicas published pod details") + + # This unit is the leader, generate a new configuration and leave. + # There is nothing to do for the leader. + try: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") + except RetryError: + logger.debug("defer _on_primary_changed: failed to stop the container") + event.defer() + return + self.charm.update_config() + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") + + # Retrigger the other units' async-replica-changed + primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "True" + self.charm.unit.status = ActiveStatus() + + def _on_standby_changed(self, event): # noqa C901 + """Triggers a configuration change.""" + replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) + if not replica_relation: + return + logger.info("_on_standby_changed: replica relation available") + + self.charm.unit.status = MaintenanceStatus("configuring standby cluster") + primary = self._check_if_primary_already_selected() + if not primary: + return + logger.info("_on_standby_changed: primary is present") + + # Check if we have already published unit-address. If not, then we are waiting + # for the leader to catch all the unit ips and restart itself + if "unit-address" not in replica_relation.data[self.charm.unit]: + replica_relation.data[self.charm.unit]["unit-address"] = self.charm._unit_ip + # Finish here and wait for the retrigger from the primary cluster + event.defer() + return + logger.info("_on_standby_changed: unit-address published in own replica databag") + + primary_data = self.get_primary_data() + if not primary_data: + # We've made thus far. + # However, the get_primary_data will return != None ONLY if the primary cluster + # is ready and configured. Until then, we wait. 
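For readers tracking the databag keys used here: the `elected` value that `get_primary_data` parses is a JSON document published by the promoted cluster's leader unit (see `_promote_standby_cluster_from_two_clusters` later in this file). A sketch of its shape, with placeholder values only, is:

```python
import json

# Shape of the "elected" field written by the promoted cluster's leader unit.
# All values below are placeholders for illustration.
elected = json.dumps({
    "endpoint": "10.0.0.11",                # sync standby (or leader) IP of the primary cluster
    "monitoring-password": "<secret>",
    "replication-password": "<secret>",
    "rewind-password": "<secret>",
    "superuser-password": "<secret>",
    "system-id": "7340585545963421234",     # from pg_controldata
})

# What the standby side reads back via get_primary_data():
primary = json.loads(elected)
standby_cluster_endpoint = primary["endpoint"]  # ends up as standby_cluster.host in patroni.yml
```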
+ event.defer() + return + logger.info("_on_standby_changed: primary cluster is ready") + + if "system-id" not in replica_relation.data[self.charm.unit]: + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(f"Failed to get system identifier: {error}") + replica_relation.data[self.charm.unit]["system-id"] = system_identifier + + if self.charm.unit.is_leader(): + self.charm.set_secret( + APP_SCOPE, MONITORING_PASSWORD_KEY, primary_data["monitoring-password"] + ) + self.charm.set_secret( + APP_SCOPE, USER_PASSWORD_KEY, primary_data["superuser-password"] + ) + self.charm.set_secret( + APP_SCOPE, REPLICATION_PASSWORD_KEY, primary_data["replication-password"] + ) + self.charm.set_secret( + APP_SCOPE, REWIND_PASSWORD_KEY, primary_data["rewind-password"] + ) + del self.charm._peers.data[self.charm.app]["cluster_initialised"] + + if "cluster_initialised" in self.charm._peers.data[self.charm.app]: + return + + ################ + # Initiate restart logic + ################ + + # We need to: + # 1) Stop all standby units + # 2) Delete the k8s service + # 3) Remove the pgdata folder (if the clusters are different) + # 4) Start all standby units + # For that, the peer leader must first stop its own service and then, issue a + # coordination request to all units. All units ack that request once they all have + # their service stopped. + # Then, we get an approved coordination from the leader, which triggers the + # steps 2-4. + if self.charm.unit.is_leader() and not self.restart_coordinator.under_coordination: + # The leader now requests a ack from each unit that they have stopped. + self.restart_coordinator.coordinate() + + self.charm.unit.status = WaitingStatus("waiting for promotion of the main cluster") + + def _on_coordination_request(self, event): + # Stop the service. + # We need all replicas to be stopped, so we can remove the previous cluster info. + self.charm.unit.status = MaintenanceStatus("stopping database to enable async replication") + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3), reraise=True): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") + + replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) + for unit in replica_relation.units: + if "elected" not in replica_relation.data[unit]: + continue + elected_data = json.loads(replica_relation.data[unit]["elected"]) + if "system-id" not in elected_data: + continue + if replica_relation.data[self.charm.unit]["system-id"] != elected_data["system-id"]: + if self.charm.unit.is_leader(): + # Store current data in a ZIP file, clean folder and generate configuration. + logger.info("Creating backup of pgdata folder") + subprocess.check_call( + f"tar -zcf {POSTGRESQL_DATA_PATH}-{str(datetime.now()).replace(' ', '-').replace(':', '-')}.zip {POSTGRESQL_DATA_PATH}".split() + ) + logger.info("Removing and recreating pgdata folder") + try: + path = Path(POSTGRESQL_DATA_PATH) + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove contents of the data directory with error: {str(e)}" + ) + os.mkdir(POSTGRESQL_DATA_PATH) + os.chmod(POSTGRESQL_DATA_PATH, 0o750) + self.charm._patroni._change_owner(POSTGRESQL_DATA_PATH) + break + + # Remove previous cluster information to make it possible to initialise a new cluster. 
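The numbered steps in the comment above are the heart of the standby re-initialisation. As a reference, a condensed standalone sketch of the data-directory reset performed in `_on_coordination_request` follows; the path is an assumption standing in for POSTGRESQL_DATA_PATH.

```python
import os
import shutil
import subprocess
from datetime import datetime
from pathlib import Path

DATA_PATH = "/var/snap/charmed-postgresql/common/var/lib/postgresql"  # assumed stand-in


def reset_data_dir(path: str = DATA_PATH) -> None:
    """Archive the old data directory, then recreate it empty with restrictive permissions."""
    stamp = str(datetime.now()).replace(" ", "-").replace(":", "-")
    subprocess.check_call(f"tar -zcf {path}-{stamp}.zip {path}".split())  # keep a backup
    p = Path(path)
    if p.exists() and p.is_dir():
        shutil.rmtree(p)
    os.mkdir(path)
    os.chmod(path, 0o750)
    # The charm additionally chowns the directory back to the snap user afterwards.
```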
+ logger.info("Removing previous cluster information") + try: + path = Path(f"{PATRONI_CONF_PATH}/raft") + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove previous cluster information with error: {str(e)}" + ) + + self.restart_coordinator.acknowledge(event) + + def _on_coordination_approval(self, event): + """Runs when the coordinator guaranteed all units have stopped.""" + self.charm.unit.status = MaintenanceStatus("starting database to enable async replication") + + self.charm.update_config() + logger.info( + "_on_coordination_approval: configuration done, waiting for restart of the service" + ) + + # We are ready to restart the service now: all peers have configured themselves. + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") + + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if not self.charm._patroni.member_started: + raise Exception + except RetryError: + logger.debug("defer _on_coordination_approval: database hasn't started yet") + event.defer() + return + + if self.charm.unit.is_leader(): + self._handle_leader_startup(event) + elif not self.charm._patroni.get_standby_leader(unit_name_pattern=True): + self.charm.unit.status = WaitingStatus("waiting for standby leader to be ready") + event.defer() + return + self.charm.unit.status = ActiveStatus() + + def _handle_leader_startup(self, event) -> None: + diverging_databases = False + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if ( + self.charm._patroni.get_standby_leader(unit_name_pattern=True) + != self.charm.unit.name + ): + raise Exception + except RetryError: + diverging_databases = True + if diverging_databases: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3), reraise=True): + with attempt: + if not self.charm._patroni.stop_patroni(): + raise Exception("Failed to stop patroni service.") + + logger.info( + "Removing and recreating pgdata folder due to diverging databases between this and the other cluster" + ) + try: + path = Path(POSTGRESQL_DATA_PATH) + if path.exists() and path.is_dir(): + shutil.rmtree(path) + except OSError as e: + raise Exception( + f"Failed to remove contents of the data directory with error: {str(e)}" + ) + os.mkdir(POSTGRESQL_DATA_PATH) + os.chmod(POSTGRESQL_DATA_PATH, 0o750) + self.charm._patroni._change_owner(POSTGRESQL_DATA_PATH) + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3), reraise=True): + with attempt: + if not self.charm._patroni.start_patroni(): + raise Exception("Failed to start patroni service.") + try: + for attempt in Retrying(stop=stop_after_attempt(10), wait=wait_fixed(5)): + with attempt: + if not self.charm._patroni.member_started: + raise Exception + except RetryError: + logger.debug("defer _on_coordination_approval: database hasn't started yet") + event.defer() + return + + if not self.charm._patroni.are_all_members_ready(): + self.charm.unit.status = WaitingStatus("waiting for all members to be ready") + event.defer() + return + + self.charm._peers.data[self.charm.app]["cluster_initialised"] = "True" + + def _check_if_primary_already_selected(self) -> Optional[Unit]: + """Returns the unit if a primary is present.""" + result = None + if not self.relation_set: + return None + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit] and not result: + result = unit + elif 
"elected" in rel.data[unit] and result: + raise MoreThanOnePrimarySelectedError + return result + + def _on_promote_standby_cluster(self, event: ActionEvent) -> None: + """Moves a standby cluster to a primary, if none is present.""" + if ( + "cluster_initialised" not in self.charm._peers.data[self.charm.app] + or not self.charm._patroni.member_started + ): + event.fail("Cluster not initialized yet.") + return + + if not self.charm.unit.is_leader(): + event.fail("Not the charm leader unit.") + return + + # Now, publish that this unit is the leader + if not self.endpoint: + event.fail("No relation found.") + return + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if primary_relation: + self._promote_standby_cluster_from_two_clusters(event, primary_relation) + else: + # Remove the standby cluster information from the Patroni configuration. + try: + self.charm._patroni.promote_standby_cluster() + except Exception as e: + event.fail(str(e)) + + def _promote_standby_cluster_from_two_clusters( + self, event: ActionEvent, primary_relation: Relation + ) -> None: + # Let the exception error the unit + unit = self._check_if_primary_already_selected() + if unit: + event.fail(f"Cannot promote - {self.charm.app.name} is already the main cluster") + return + + try: + self.charm._patroni.promote_standby_cluster() + except StandbyClusterAlreadyPromotedError: + # Ignore this error for non-standby clusters. + pass + except ClusterNotPromotedError as e: + event.fail(str(e)) + + system_identifier, error = self.get_system_identifier() + if error is not None: + event.fail(f"Failed to get system identifier: {error}") + return + + # If this is a standby-leader, then execute switchover logic + primary_relation.data[self.charm.unit]["elected"] = json.dumps( + { + "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret(APP_SCOPE, MONITORING_PASSWORD_KEY), + "replication-password": self.charm._patroni.replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), + "superuser-password": self.charm._patroni.superuser_password, + "system-id": system_identifier, + } + ) + self.charm.app_peer_data["promoted"] = "True" + + # Now, check if postgresql it had originally published its unit IP in the + # replica relation databag. Delete it, if yes. + replica_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not replica_relation or "unit-address" not in replica_relation.data[self.charm.unit]: + return + del replica_relation.data[self.charm.unit]["unit-address"] + + def get_system_identifier(self) -> Tuple[Optional[str], Optional[str]]: + """Returns the PostgreSQL system identifier from this instance.""" + + def demote(): + pw_record = pwd.getpwnam("snap_daemon") + + def result(): + os.setgid(pw_record.pw_gid) + os.setuid(pw_record.pw_uid) + + return result + + process = run( + [ + f'/snap/charmed-postgresql/current/usr/lib/postgresql/{self.charm._patroni.get_postgresql_version().split(".")[0]}/bin/pg_controldata', + POSTGRESQL_DATA_PATH, + ], + stdout=PIPE, + stderr=PIPE, + preexec_fn=demote(), + ) + if process.returncode != 0: + return None, process.stderr.decode() + system_identifier = [ + line + for line in process.stdout.decode().splitlines() + if "Database system identifier" in line + ][0].split(" ")[-1] + return system_identifier, None + + def update_async_replication_data(self) -> None: + """Updates the async-replication data, if the unit is the leader. + + This is used to update the standby units with the new primary information. 
+ If the unit is not the leader, then the data is removed from its databag. + """ + if "promoted" not in self.charm.app_peer_data: + return + + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if self.charm.unit.is_leader(): + system_identifier, error = self.get_system_identifier() + if error is not None: + raise Exception(f"Failed to get system identifier: {error}") + primary_relation.data[self.charm.unit]["elected"] = json.dumps( + { + "endpoint": self.endpoint, + "monitoring-password": self.charm.get_secret( + APP_SCOPE, MONITORING_PASSWORD_KEY + ), + "replication-password": self.charm._patroni._replication_password, + "rewind-password": self.charm.get_secret(APP_SCOPE, REWIND_PASSWORD_KEY), + "superuser-password": self.charm._patroni._superuser_password, + "system-id": system_identifier, + } + ) + else: + primary_relation.data[self.charm.unit]["elected"] = "" diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 0732ec7156..6af6888291 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -103,6 +103,11 @@ bootstrap: command: pgbackrest {{ pgbackrest_configuration_file }} --stanza={{ restore_stanza }} --pg1-path={{ data_path }} --set={{ backup_id }} --type=immediate --target-action=promote restore no_params: True keep_existing_recovery_conf: True + {% elif standby_cluster_endpoint %} + standby_cluster: + host: {{ standby_cluster_endpoint }} + port: 5432 + create_replica_methods: ["basebackup"] {% else %} initdb: - encoding: UTF8 @@ -145,6 +150,9 @@ postgresql: - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5 {%- endif %} - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5 + {%- for e in extra_replication_endpoints %} + - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ e }} md5 + {%- endfor -%} # Allow replications connections from other cluster members. 
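To see what the new template branches in patroni.yml.j2 produce, the snippet below renders a stripped-down stand-in for that template with jinja2. It is a sketch of the branching logic only, not the charm's real template.

```python
from jinja2 import Template

snippet = Template("""
bootstrap:
{% if standby_cluster_endpoint %}
  standby_cluster:
    host: {{ standby_cluster_endpoint }}
    port: 5432
    create_replica_methods: ["basebackup"]
{% else %}
  initdb:
    - encoding: UTF8
{% endif %}
pg_hba:
{% for e in extra_replication_endpoints %}
  - host replication replication {{ e }} md5
{% endfor %}
""")

# Standby cluster: points Patroni at the primary cluster's endpoint and opens
# pg_hba for that endpoint's /32 address.
print(snippet.render(standby_cluster_endpoint="10.0.0.11",
                     extra_replication_endpoints={"10.0.0.11/32"}))

# Primary cluster: no standby_cluster block; the pg_hba entries are the standby units' IPs.
print(snippet.render(standby_cluster_endpoint=None,
                     extra_replication_endpoints={"10.1.0.21/32", "10.1.0.22/32"}))
```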
{%- for peer_ip in peers_ips %} - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ peer_ip }}/0 md5 diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index ef55b6c27b..2c65e788ca 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -11,6 +11,7 @@ import psycopg2 import requests import yaml +from juju.model import Model from pytest_operator.plugin import OpsTest from tenacity import ( RetryError, @@ -83,23 +84,21 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool: return True -async def are_writes_increasing( - ops_test, down_unit: str = None, use_ip_from_inside: bool = False -) -> None: +async def are_writes_increasing(ops_test, down_unit: str = None, use_ip_from_inside: bool = False, extra_model: Model = None) -> None: """Verify new writes are continuing by counting the number of writes.""" - writes, _ = await count_writes( - ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside - ) + writes, _ = await count_writes(ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside, extra_model=extra_model) for member, count in writes.items(): for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: - more_writes, _ = await count_writes( - ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside - ) - assert more_writes[member] > count, f"{member}: writes not continuing to DB" + more_writes, _ = await count_writes(ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside, extra_model=extra_model) + assert ( + more_writes[member] > count + ), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})" -async def app_name(ops_test: OpsTest, application_name: str = "postgresql") -> Optional[str]: +async def app_name( + ops_test: OpsTest, application_name: str = "postgresql", model: Model = None +) -> Optional[str]: """Returns the name of the cluster running PostgreSQL. This is important since not all deployments of the PostgreSQL charm have the application name @@ -107,8 +106,10 @@ async def app_name(ops_test: OpsTest, application_name: str = "postgresql") -> O Note: if multiple clusters are running PostgreSQL this will return the one first found. """ - status = await ops_test.model.get_status() - for app in ops_test.model.applications: + if model is None: + model = ops_test.model + status = await model.get_status() + for app in model.applications: if application_name in status["applications"][app]["charm"]: return app @@ -205,13 +206,12 @@ async def is_cluster_updated( ), "secondary not up to date with the cluster after restarting." 
-async def check_writes(ops_test, use_ip_from_inside: bool = False) -> int: +async def check_writes(ops_test, use_ip_from_inside: bool = False, extra_model: Model = None) -> int: """Gets the total writes from the test charm and compares to the writes from db.""" total_expected_writes = await stop_continuous_writes(ops_test) - actual_writes, max_number_written = await count_writes( - ops_test, use_ip_from_inside=use_ip_from_inside - ) + actual_writes, max_number_written = await count_writes(ops_test, use_ip_from_inside=use_ip_from_inside, extra_model=extra_model) for member, count in actual_writes.items(): + print(f"member: {member}, count: {count}, max_number_written: {max_number_written[member]}, total_expected_writes: {total_expected_writes}") assert ( count == max_number_written[member] ), f"{member}: writes to the db were missed: count of actual writes different from the max number written." @@ -220,21 +220,28 @@ async def check_writes(ops_test, use_ip_from_inside: bool = False) -> int: async def count_writes( - ops_test: OpsTest, down_unit: str = None, use_ip_from_inside: bool = False + ops_test: OpsTest, down_unit: str = None, use_ip_from_inside: bool = False, extra_model: Model = None ) -> Tuple[Dict[str, int], Dict[str, int]]: """Count the number of writes in the database.""" app = await app_name(ops_test) password = await get_password(ops_test, app, down_unit) - for unit in ops_test.model.applications[app].units: - if unit.name != down_unit: - cluster = get_patroni_cluster( - await ( - get_ip_from_inside_the_unit(ops_test, unit.name) - if use_ip_from_inside - else get_unit_ip(ops_test, unit.name) - ) - ) - break + members = [] + for model in [ops_test.model, extra_model]: + if model is None: + continue + for unit in model.applications[app].units: + if unit.name != down_unit: + members_data = get_patroni_cluster( + await ( + get_ip_from_inside_the_unit(ops_test, unit.name) + if use_ip_from_inside + else get_unit_ip(ops_test, unit.name) + ) + )["members"] + for index, member_data in enumerate(members_data): + members_data[index]["model"] = model.info.name + members.extend(members_data) + break down_ips = [] if down_unit: for unit in ops_test.model.applications[app].units: @@ -243,7 +250,7 @@ async def count_writes( down_ips.append(await get_unit_ip(ops_test, unit.name)) count = {} maximum = {} - for member in cluster["members"]: + for member in members: if member["role"] != "replica" and member["host"] not in down_ips: host = member["host"] @@ -252,12 +259,23 @@ async def count_writes( f" host='{host}' password='{password}' connect_timeout=10" ) - with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: - cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") - results = cursor.fetchone() - count[member["name"]] = results[0] - maximum[member["name"]] = results[1] - connection.close() + member_name = f'{member["model"]}.{member["name"]}' + connection = None + try: + with psycopg2.connect( + connection_string + ) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") + results = cursor.fetchone() + count[member_name] = results[0] + maximum[member_name] = results[1] + except psycopg2.Error: + # Error raised when the connection is not possible. 
+ count[member_name] = -1 + maximum[member_name] = -1 + finally: + if connection is not None: + connection.close() return count, maximum @@ -399,6 +417,42 @@ def get_random_unit(ops_test: OpsTest, app: str) -> str: return random.choice(ops_test.model.applications[app].units).name +async def get_standby_leader(model: Model, application_name: str) -> str: + """Get the standby leader name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the standby leader. + """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "standby_leader": + return member["name"] + + +async def get_sync_standby(model: Model, application_name: str) -> str: + """Get the sync_standby name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the sync standby. + """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "sync_standby": + return member["name"] + + async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> str: """Use the charm action to retrieve the password from provided application. @@ -415,20 +469,24 @@ async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> st return action.results["password"] -async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str: +async def get_unit_ip(ops_test: OpsTest, unit_name: str, model: Model = None) -> str: """Wrapper for getting unit ip. Args: ops_test: The ops test object passed into every test case unit_name: The name of the unit to get the address + model: Optional model instance to use Returns: The (str) ip of the unit """ - application = unit_name.split("/")[0] - for unit in ops_test.model.applications[application].units: - if unit.name == unit_name: - break - return await instance_ip(ops_test, unit.machine.hostname) + if model is None: + application = unit_name.split("/")[0] + for unit in ops_test.model.applications[application].units: + if unit.name == unit_name: + break + return await instance_ip(ops_test, unit.machine.hostname) + else: + return get_unit_address(ops_test, unit_name) @retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True) @@ -671,24 +729,26 @@ async def is_secondary_up_to_date( return True -async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: +async def start_continuous_writes(ops_test: OpsTest, app: str, model: Model = None) -> None: """Start continuous writes to PostgreSQL.""" # Start the process by relating the application to the database or # by calling the action if the relation already exists. 
+ if model is None: + model = ops_test.model relations = [ relation - for relation in ops_test.model.applications[app].relations + for relation in model.applications[app].relations if not relation.is_peer and f"{relation.requires.application_name}:{relation.requires.name}" == f"{APPLICATION_NAME}:first-database" ] if not relations: - await ops_test.model.relate(app, f"{APPLICATION_NAME}:first-database") - await ops_test.model.wait_for_idle(status="active", timeout=1000) + await model.relate(app, f"{APPLICATION_NAME}:first-database") + await model.wait_for_idle(status="active", timeout=1000) for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): with attempt: action = ( - await ops_test.model.applications[APPLICATION_NAME] + await model.applications[APPLICATION_NAME] .units[0] .run_action("start-continuous-writes") ) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py new file mode 100644 index 0000000000..9e193a8c47 --- /dev/null +++ b/tests/integration/ha_tests/test_async_replication.py @@ -0,0 +1,492 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. +import contextlib +import logging +from asyncio import gather +from typing import Optional + +import psycopg2 +import pytest as pytest +from juju.controller import Controller +from juju.model import Model +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed + +from ..helpers import ( + APPLICATION_NAME, + DATABASE_APP_NAME, + get_leader_unit, + get_password, + get_primary, + get_unit_address, + wait_for_relation_removed_between, +) +from .helpers import ( + app_name, + are_writes_increasing, + check_writes, + get_standby_leader, + get_sync_standby, + start_continuous_writes, +) + +logger = logging.getLogger(__name__) + + +FAST_INTERVAL = "60s" +IDLE_PERIOD = 30 +TIMEOUT = 2000 + + +@contextlib.asynccontextmanager +async def fast_forward( + model: Model, fast_interval: str = "10s", slow_interval: Optional[str] = None +): + """Adaptation of OpsTest.fast_forward to work with different models.""" + update_interval_key = "update-status-hook-interval" + if slow_interval: + interval_after = slow_interval + else: + interval_after = (await model.get_config())[update_interval_key] + + await model.set_config({update_interval_key: fast_interval}) + yield + await model.set_config({update_interval_key: interval_after}) + + +@pytest.fixture(scope="module") +async def controller(first_model) -> Controller: + """Return the controller.""" + return await first_model.get_controller() + + +@pytest.fixture(scope="module") +def first_model(ops_test: OpsTest) -> Model: + """Return the first model.""" + first_model = ops_test.model + return first_model + + +@pytest.fixture(scope="module") +async def second_model(controller, first_model) -> Model: + """Create and return the second model.""" + second_model_name = f"{first_model.info.name}-other" + await controller.add_model(second_model_name) + second_model = Model() + await second_model.connect(model_name=second_model_name) + return second_model + + +@pytest.fixture +async def second_model_continuous_writes(second_model) -> None: + """Cleans up continuous writes on the second model after a test run.""" + yield + # Clear the written data at the end. 
+ for attempt in Retrying(stop=stop_after_delay(10), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await second_model.applications[APPLICATION_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to clear up continuous_writes table" + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_deploy_async_replication_setup( + ops_test: OpsTest, first_model: Model, second_model: Model, charm +) -> None: + """Build and deploy two PostgreSQL cluster in two separate models to test async replication.""" + if not await app_name(ops_test): + charm = await ops_test.build_charm(".") + await ops_test.model.deploy( + charm, + num_units=3, + config={"profile": "testing"}, + ) + if not await app_name(ops_test, model=second_model): + charm = await ops_test.build_charm(".") + await second_model.deploy( + charm, + num_units=3, + config={"profile": "testing"}, + ) + await ops_test.model.deploy(APPLICATION_NAME, num_units=1) + await second_model.deploy(APPLICATION_NAME, num_units=1) + + async with ops_test.fast_forward(), fast_forward(second_model): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME, APPLICATION_NAME], + status="active", + timeout=TIMEOUT, + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME, APPLICATION_NAME], + status="active", + timeout=TIMEOUT, + ), + ) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_async_replication( + ops_test: OpsTest, + controller: Controller, + first_model: Model, + second_model: Model, + continuous_writes, +) -> None: + """Test async replication between two PostgreSQL clusters.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + await first_model.create_offer("async-primary", "async-primary", DATABASE_APP_NAME) + await second_model.consume( + f"admin/{first_model.info.name}.async-primary", controller=controller + ) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + await second_model.relate(DATABASE_APP_NAME, "async-primary") + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Run the promote action. 
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-standby-cluster") + await run_action.wait() + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@pytest.mark.juju3 +@pytest.mark.abort_on_fail +async def test_switchover( + ops_test: OpsTest, + controller: Controller, + first_model: Model, + second_model: Model, + second_model_continuous_writes, +): + """Test switching over to the second cluster.""" + logger.info("breaking the relation") + await second_model.applications[DATABASE_APP_NAME].remove_relation( + "async-replica", "async-primary" + ) + wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" + await ops_test.juju(*second_offer_command.split()) + await second_model.consume( + f"admin/{first_model.info.name}.async-replica", controller=controller + ) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + await second_model.relate(DATABASE_APP_NAME, "async-replica") + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + # Run the promote action. 
+    logger.info("Get leader unit")
+    leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model)
+    assert leader_unit is not None, "No leader unit found"
+    logger.info("promoting the second cluster")
+    run_action = await leader_unit.run_action("promote-standby-cluster")
+    await run_action.wait()
+
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    logger.info("starting continuous writes to the database")
+    await start_continuous_writes(ops_test, DATABASE_APP_NAME, model=second_model)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test, extra_model=second_model)
+
+
+@pytest.mark.group(1)
+@pytest.mark.juju3
+@pytest.mark.abort_on_fail
+async def test_promote_standby(
+    ops_test: OpsTest,
+    controller: Controller,
+    first_model: Model,
+    second_model: Model,
+    second_model_continuous_writes,
+) -> None:
+    """Test promoting the standby cluster."""
+    logger.info("breaking the relation")
+    await second_model.applications[DATABASE_APP_NAME].remove_relation(
+        "async-primary", "async-replica"
+    )
+    wait_for_relation_removed_between(ops_test, "async-replica", "async-primary", first_model)
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+    # Run the promote action.
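+    # Promote the now-detached first cluster through its leader unit so it can accept writes again.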
+    logger.info("Get leader unit")
+    leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME)
+    assert leader_unit is not None, "No leader unit found"
+    logger.info("promoting the first cluster")
+    run_action = await leader_unit.run_action("promote-standby-cluster")
+    await run_action.wait()
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    logger.info("removing the previous data")
+    primary = await get_primary(ops_test)
+    address = get_unit_address(ops_test, primary)
+    password = await get_password(ops_test, primary)
+    database_name = f'{APPLICATION_NAME.replace("-", "_")}_first_database'
+    connection = None
+    try:
+        connection = psycopg2.connect(
+            f"dbname={database_name} user=operator password={password} host={address}"
+        )
+        connection.autocommit = True
+        cursor = connection.cursor()
+        cursor.execute("DROP TABLE IF EXISTS continuous_writes;")
+    except psycopg2.Error as e:
+        assert False, f"Failed to drop continuous writes table: {e}"
+    finally:
+        if connection is not None:
+            connection.close()
+
+    logger.info("starting continuous writes to the database")
+    await start_continuous_writes(ops_test, DATABASE_APP_NAME)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+
+@pytest.mark.group(1)
+@pytest.mark.juju3
+@pytest.mark.abort_on_fail
+async def test_reestablish_relation(
+    ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes
+) -> None:
+    """Test that the relation can be broken and re-established."""
+    logger.info("starting continuous writes to the database")
+    await start_continuous_writes(ops_test, DATABASE_APP_NAME)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    logger.info("reestablishing the relation")
+    await second_model.relate(DATABASE_APP_NAME, "async-primary")
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    # Run the promote action.
+    logger.info("Get leader unit")
+    leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME)
+    assert leader_unit is not None, "No leader unit found"
+    logger.info("promoting the first cluster")
+    run_action = await leader_unit.run_action("promote-standby-cluster")
+    await run_action.wait()
+
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    # Verify that no writes to the database were missed after stopping the writes
+    # (check that all the units have all the writes).
+    logger.info("checking whether no writes were lost")
+    await check_writes(ops_test, extra_model=second_model)
+
+
+@pytest.mark.group(1)
+@pytest.mark.juju3
+@pytest.mark.abort_on_fail
+async def test_async_replication_failover_in_main_cluster(
+    ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes
+) -> None:
+    """Test that async replication fails over correctly in the main cluster."""
+    logger.info("starting continuous writes to the database")
+    await start_continuous_writes(ops_test, DATABASE_APP_NAME)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
+    logger.info(f"Sync-standby: {sync_standby}")
+    logger.info("deleting the sync-standby unit")
+    await first_model.applications[DATABASE_APP_NAME].destroy_units(sync_standby)
+
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    # Check that the sync-standby unit is not the same as before.
+    new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
+    logger.info(f"New sync-standby: {new_sync_standby}")
+    assert new_sync_standby != sync_standby, "Sync-standby is the same as before"
+
+    logger.info("Ensure continuous_writes after the crashed unit")
+    await are_writes_increasing(ops_test)
+
+    # Verify that no writes to the database were missed after stopping the writes
+    # (check that all the units have all the writes).
+    logger.info("checking whether no writes were lost")
+    await check_writes(ops_test, extra_model=second_model)
+
+
+@pytest.mark.group(1)
+@pytest.mark.juju3
+@pytest.mark.abort_on_fail
+async def test_async_replication_failover_in_secondary_cluster(
+    ops_test: OpsTest,
+    first_model: Model,
+    second_model: Model,
+    continuous_writes,
+    primary_start_timeout,
+) -> None:
+    """Test that async replication fails over correctly in the secondary cluster."""
+    logger.info("starting continuous writes to the database")
+    await start_continuous_writes(ops_test, DATABASE_APP_NAME)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME)
+    logger.info(f"Standby leader: {standby_leader}")
+    logger.info("deleting the standby leader unit")
+    await second_model.applications[DATABASE_APP_NAME].destroy_units(standby_leader)
+
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    logger.info("Ensure continuous_writes after the crashed unit")
+    await are_writes_increasing(ops_test)
+
+    # Verify that no writes to the database were missed after stopping the writes
+    # (check that all the units have all the writes).
+    logger.info("checking whether no writes were lost")
+    await check_writes(ops_test, extra_model=second_model)
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index 1845e69778..ffa67313f1 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -16,6 +16,7 @@
 import psycopg2
 import requests
 import yaml
+from juju.model import Model
 from juju.unit import Unit
 from pytest_operator.plugin import OpsTest
 from tenacity import (
@@ -658,17 +659,20 @@ async def get_tls_ca(
     return json.loads(relation_data[0]["application-data"]["certificates"])[0].get("ca")
 
 
-def get_unit_address(ops_test: OpsTest, unit_name: str) -> str:
+def get_unit_address(ops_test: OpsTest, unit_name: str, model: Model = None) -> str:
     """Get unit IP address.
 
     Args:
         ops_test: The ops test framework instance
        unit_name: The name of the unit
+        model: Optional model to use to get the unit address
 
     Returns:
         IP address of the unit
     """
-    return ops_test.model.units.get(unit_name).public_address
+    if model is None:
+        model = ops_test.model
+    return model.units.get(unit_name).public_address
 
 
 async def check_tls(ops_test: OpsTest, unit_name: str, enabled: bool) -> bool:
@@ -777,6 +781,18 @@ async def check_tls_patroni_api(ops_test: OpsTest, unit_name: str, enabled: bool
     return False
 
 
+def has_relation_exited(
+    ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None
+) -> bool:
+    """Returns true if the relation between endpoint_one and endpoint_two has been removed."""
+    relations = model.relations if model is not None else ops_test.model.relations
+    for rel in relations:
+        endpoints = [endpoint.name for endpoint in rel.endpoints]
+        if endpoint_one in endpoints and endpoint_two in endpoints:
+            return False
+    return True
+
+
 def remove_chown_workaround(original_charm_filename: str, patched_charm_filename: str) -> None:
     """Remove the chown workaround from the charm."""
     with zipfile.ZipFile(original_charm_filename, "r") as charm_file, zipfile.ZipFile(
@@ -990,3 +1006,23 @@ async def wait_for_idle_on_blocked(
         ),
         ops_test.model.block_until(lambda: unit.workload_status_message == status_message),
     )
+
+
+def wait_for_relation_removed_between(
+    ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None
+) -> None:
+    """Wait for relation to be removed before checking if it's waiting or idle.
+
+    Args:
+        ops_test: running OpsTest instance
+        endpoint_one: one endpoint of the relation. Doesn't matter if it's provider or requirer.
+        endpoint_two: the other endpoint of the relation.
+        model: optional model to check for the relation.
+    """
+    try:
+        for attempt in Retrying(stop=stop_after_delay(3 * 60), wait=wait_fixed(3)):
+            with attempt:
+                if has_relation_exited(ops_test, endpoint_one, endpoint_two, model):
+                    break
+    except RetryError:
+        assert False, "Relation failed to exit after 3 minutes."