diff --git a/actions.yaml b/actions.yaml
index 03a90556cc..1edc4666dc 100644
--- a/actions.yaml
+++ b/actions.yaml
@@ -43,3 +43,21 @@ set-tls-private-key:
     private-key:
       type: string
       description: The content of private key for communications with clients. Content will be auto-generated if this option is not specified.
+
+promote-standby-cluster:
+  description: Promotes the standby cluster of choice to a leader. Must be run against the leader unit of the standby cluster.
+  params:
+    force:
+      type: boolean
+      default: False
+      description: |
+        WARNING: setting this option to True WILL WIPE OUT your current primary cluster!
+        If both this option and "force-really-really-mean-it" are set to true, this unit will take over the primary role.
+        It only applies to cross-cluster replication, where both clusters are connected to each other through the async-primary relation.
+    force-really-really-mean-it:
+      type: boolean
+      default: False
+      description: |
+        WARNING: setting this option to True WILL WIPE OUT your current primary cluster!
+        If both this option and "force" are set to true, this unit will take over the primary role.
+        It only applies to cross-cluster replication, where both clusters are connected to each other through the async-primary relation.
diff --git a/metadata.yaml b/metadata.yaml
index 74485cbe02..e1cfa75799 100644
--- a/metadata.yaml
+++ b/metadata.yaml
@@ -39,6 +39,8 @@ peers:
     interface: upgrade
 
 provides:
+  async-primary:
+    interface: async_replication
   database:
     interface: postgresql_client
   db:
@@ -51,6 +53,9 @@ provides:
     interface: grafana_dashboard
 
 requires:
+  async-replica:
+    interface: async_replication
+    limit: 1
   certificates:
     interface: tls-certificates
     limit: 1
diff --git a/src/charm.py b/src/charm.py
index 6797c86c17..f4b3ecd80b 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -76,6 +76,7 @@
     WORKLOAD_OS_USER,
 )
 from patroni import NotReadyError, Patroni
+from relations.async_replication import PostgreSQLAsyncReplication
 from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
 from relations.postgresql_provider import PostgreSQLProvider
 from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
@@ -152,6 +153,7 @@ def __init__(self, *args):
         postgresql_db_port = ServicePort(5432, name="database")
         patroni_api_port = ServicePort(8008, name="api")
         self.service_patcher = KubernetesServicePatch(self, [postgresql_db_port, patroni_api_port])
+        self.async_manager = PostgreSQLAsyncReplication(self)
 
     def _generate_metrics_jobs(self, enable_tls: bool) -> Dict:
         """Generate spec for Prometheus scraping."""
diff --git a/src/coordinator_ops.py b/src/coordinator_ops.py
new file mode 100644
index 0000000000..08384a2fa9
--- /dev/null
+++ b/src/coordinator_ops.py
@@ -0,0 +1,217 @@
+# Copyright 2023 Canonical Ltd.
+# See LICENSE file for licensing details.
+
+"""The coordinated ops module provides a class that ensures a certain activity is run together.
+
+The concept is similar to the "cohort" in snaps, where all units wait until they can
+proceed to execute a certain activity, for example, restarting your service.
+
+The process starts with the leader issuing a new coordination request. Effectively,
+that is implemented by incrementing __coord_counter by 1 in the app databag and
+setting __coord_approved to "False".
+
+Each unit receives a relation-changed event, which is then re-issued as a
+coordinator_requested event. Once a unit has done its task, it should ack the request
+by setting its own __coord_counter to the app's value.
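+
+For illustration only (real key names carry the coordinator's name as a prefix,
+following the "_<relation><tag>_coord_counter" convention below; the values here
+are an example), a two-unit app that is half-way through a coordinated restart
+could look like this:
+
+    app databag:    _restart_coord_counter="3"  _restart_coord_approved="False"
+    unit/0 databag: _restart_coord_counter="3"   (already ack'ed)
+    unit/1 databag: _restart_coord_counter="2"   (still busy)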
+ +Once all units ack'ed the __coord_counter, then the leader switches the +__coord_approved to "True". All units then will process that new change as a +"coordinator-approved" event and execute the activity they have been waiting. + +If there is a need to coordinate several activities in sequence, e.g. coordinated stop and then +coordinated start, it is recommended that the leader unit publishes twice a _requested, as follows: + + + class MyCharm: + + def __init__(self, *args): + self.stop_coordinator = CoordinatedOpsManager(relation, tag="_stop_my_charm") + self.start_coordinator = CoordinatedOpsManager(relation, tag="_start_my_charm") + + self.framework.observe( + self.stop_coordinator.on.coordinator_requested, + self._on_coordinator_requested + ) + self.framework.observe( + self.stop_coordinator.on.coordinator_approved, + self._on_coordinator_approved + ) + self.framework.observe( + self.start_coordinator.on.coordinator_requested, + self._on_coordinator_requested + ) + self.framework.observe( + self.start_coordinator.on.coordinator_approved, + self._on_coordinator_approved + ) + + def _a_method(): + # A method that kick starts the restarting coordination + ...... + if self.charm.unit.is_leader(): + self.stop_coordinator.coordinate() + + def _on_coordinator_requested(self, event): + if self.service_is_running and event.tag == "_stop_my_charm": + # We are in the stop-phase + self.service.stop() + self.stop_coordinator.acknowledge(event) + elif event.tag == "_start_my_charm": + # we are in the starting-phase + self.service.start() + self.start_coordinator.acknowledge(event) + + def _on_coordinator_approved(self, event): + # All units have ack'ed the activity, which means we have stopped. + if self.charm.unit.is_leader() and event.tag == "_stop_my_charm": + # Now kickstart the restarting process + self.start_coordinator.coordinate() +""" + + +import logging +from typing import AnyStr + +from ops.charm import ( + CharmBase, + CharmEvents, + EventSource, + RelationChangedEvent, +) +from ops.framework import EventBase, Object + +logger = logging.getLogger(__name__) + + +class CoordinatorEventBase(EventBase): + """Base event for the coordination activities.""" + + def __init__(self, handle: 'Handle', tag: str): + super().__init__(handle) + self._tag = tag + + @property + def tag(self): + """Returns the tag representing this coordinator's controllers.""" + return self._tag + + +class CoordinatorRequestedEvent(CoordinatorEventBase): + """Event to signal that the leader requested the units to coordinate a new activity.""" + def __init__(self, handle: 'Handle', tag: str): + super().__init__(handle, tag) + + +class CoordinatorApprovedEvent(CoordinatorEventBase): + """Event to signal that all units ack'ed the coordination request and can proceed.""" + def __init__(self, handle: 'Handle', tag: str): + super().__init__(handle, tag) + + +class CoordinatorCharmEvents(CharmEvents): + """List of events that the TLS Certificates requirer charm can leverage.""" + + coordinator_approved = EventSource(CoordinatorApprovedEvent) + coordinator_requested = EventSource(CoordinatorRequestedEvent) + + +class CoordinatedOpsManager(Object): + """Coordinates activities that demand the entire peer group to act at once.""" + + on = CoordinatorCharmEvents() + + def __init__(self, charm: CharmBase, relation: AnyStr, tag: AnyStr = ""): + super().__init__(charm, relation) + self.tag = tag + self.relation = relation + self.app = charm.app + self.name = relation + tag # use the tag to separate multiple coordinator objects + # 
in the same charm class. + self.charm = charm # Maintain a reference to charm, so we can emit events. + self.framework.observe(charm.on[self.relation].relation_changed, self._on_relation_changed) + + @property + def under_coordination(self): + """Returns True if the _coord_approved == False.""" + return ( + self.model.get_relation(self.relation) + .data[self.app] + .get(f"_{self.name}_coord_approved", "True") + == "False" + ) + + def coordinate(self): + """Process a request to ask a new coordination activity. + + If we are the leader, fire off a coordinator requested event in the self.name. + """ + logger.info("coordinate: starting") + if self.charm.unit.is_leader(): + counter = ( + self.model.get_relation(self.relation) + .data[self.app] + .get(f"_{self.name}_coord_counter", 0) + ) + self.model.get_relation(self.relation).data[self.app][ + f"_{self.name}_coord_counter" + ] = str(counter + 1 if counter < 10000000 else 0) + self.model.get_relation(self.relation).data[self.app][ + f"_{self.name}_coord_approved" + ] = "False" + logger.info("coordinate: tasks executed") + + def acknowledge(self, event): + """Runs the ack of the latest requested coordination. + + Each unit will set their own _counter to the same value as app's. + """ + coord_counter = f"_{self.name}_coord_counter" + self.model.get_relation(self.relation).data[self.charm.unit][coord_counter] = str( + self.model.get_relation(self.relation).data[self.app].get(coord_counter, 0) + ) + logger.info("acknowledge: updated internal counter") + + if not self.charm.unit.is_leader(): + # Nothing to do anymore. + logger.info("acknowledge: this unit is not a leader") + return + + relation = self.model.get_relation(self.relation) + # Now, the leader must check if everyone has ack'ed + for unit in relation.units: + if relation.data[unit].get(coord_counter, "0") != relation.data[self.app].get( + coord_counter, "0" + ): + logger.info(f"acknowledge: {unit.name} still has a different coord_counter") + # We defer the event until _coord_approved == True. + # If we have _coord_counter differing, then we are not yet there. + event.defer() + return + logger.info("acknowledge: all units are set, set coord_approved == True") + # Just confirmed we have all units ack'ed. Now, set the approval. + relation.data[self.app][f"_{self.name}_coord_approved"] = "True" + + def _on_relation_changed(self: CharmBase, _: RelationChangedEvent): + """Process relation changed. + + First, determine whether this unit has received a new request for coordination. + + Then, if we are the leader, fire off a coordinator requested event. + """ + logger.info("coordinator: starting _on_relation_changed") + relation_data = self.model.get_relation(self.relation).data[self.app] + unit_data = self.model.get_relation(self.relation).data[self.charm.unit] + + if relation_data.get(f"_{self.name}_coord_approved", "False") == "True": + logger.info("coordinator: _on_relation_changed -- coordinator approved") + # We are approved to move on, issue the coordinator_approved event. 
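+            # The tag travels with the event, so a charm that wires several
+            # CoordinatedOpsManager instances (e.g. a stop and a start coordinator)
+            # to the same handler can tell which one reached approval.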
+ self.on.coordinator_approved.emit(self.tag) + return + coord_counter = f"_{self.name}_coord_counter" + if coord_counter in relation_data and relation_data.get( + coord_counter, "0" + ) != unit_data.get(coord_counter, "0"): + logger.info("coordinator: _on_relation_changed -- coordinator requested") + self.on.coordinator_requested.emit(self.tag) + return diff --git a/src/patroni.py b/src/patroni.py index 0187633b4f..f15d2ebfaf 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -333,6 +333,9 @@ def render_patroni_yml_file( # Open the template patroni.yml file. with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) + + primary = self._charm.async_manager.get_primary_data() + # Render the template file with the correct values. rendered = template.render( connectivity=connectivity, @@ -343,8 +346,12 @@ def render_patroni_yml_file( is_no_sync_member=is_no_sync_member, namespace=self._namespace, storage_path=self._storage_path, - superuser_password=self._superuser_password, - replication_password=self._replication_password, + superuser_password=primary["superuser-password"] + if primary + else self._superuser_password, + replication_password=primary["replication-password"] + if primary + else self._replication_password, rewind_user=REWIND_USER, rewind_password=self._rewind_password, enable_pgbackrest=stanza is not None, @@ -355,6 +362,10 @@ def render_patroni_yml_file( minority_count=self._members_count // 2, version=self.rock_postgresql_version.split(".")[0], pg_parameters=parameters, + standby_cluster_endpoint=primary["endpoint"] if primary else None, + extra_replication_endpoints={"{}/32".format(primary["endpoint"])} + if primary + else self._charm.async_manager.standby_endpoints(), ) self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644) diff --git a/src/relations/async_replication.py b/src/relations/async_replication.py new file mode 100644 index 0000000000..85ec2abece --- /dev/null +++ b/src/relations/async_replication.py @@ -0,0 +1,351 @@ +# Copyright 2023 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Implements the state-machine. + +1) First async replication relation is made: both units get blocked waiting for a leader +2) User runs the promote action against one of the clusters +3) The cluster moves leader and sets the async-replication data, marking itself as leader +4) The other units receive that new information and update themselves to become standby-leaders. +""" + +import json +import logging +from typing import Dict, Set + +from lightkube import Client +from lightkube.resources.core_v1 import Service +from ops.charm import ( + ActionEvent, + CharmBase, +) +from ops.framework import Object +from ops.model import ( + Unit, +) + +from coordinator_ops import CoordinatedOpsManager + +logger = logging.getLogger(__name__) + + +ASYNC_PRIMARY_RELATION = "async-primary" +ASYNC_REPLICA_RELATION = "async-replica" + + +class MoreThanOnePrimarySelectedError(Exception): + """Represents more than one primary has been selected.""" + + +def _get_pod_ip(): + """Reads some files to quickly figure out its own pod IP. 
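+
+    For illustration (addresses are hypothetical): in a Kubernetes pod, /etc/hosts
+    usually holds a tab-separated entry such as "10.1.17.23    postgresql-k8s-0";
+    the line matching /etc/hostname is picked and the address is read from the
+    first tab-separated field.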
+ + It should work for any Ubuntu-based image + """ + with open("/etc/hosts") as f: + hosts = f.read() + with open("/etc/hostname") as f: + hostname = f.read().replace("\n", "") + line = [ln for ln in hosts.split("\n") if ln.find(hostname) >= 0][0] + return line.split("\t")[0] + + +class PostgreSQLAsyncReplication(Object): + """Defines the async-replication management logic.""" + + def __init__(self, charm: CharmBase, relation_name: str = ASYNC_PRIMARY_RELATION) -> None: + super().__init__(charm, relation_name) + self.relation_name = relation_name + self.charm = charm + self.restart_coordinator = CoordinatedOpsManager(charm, "restart", tag="_asyncreplica") + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_changed, self._on_primary_changed + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_changed, self._on_standby_changed + ) + self.framework.observe( + self.restart_coordinator.on.coordinator_requested, self._on_coordination_request + ) + self.framework.observe( + self.restart_coordinator.on.coordinator_approved, self._on_coordination_approval + ) + + # Departure events + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_departed, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_departed, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_PRIMARY_RELATION].relation_broken, self._on_departure + ) + self.framework.observe( + self.charm.on[ASYNC_REPLICA_RELATION].relation_broken, self._on_departure + ) + + # Actions + self.framework.observe( + self.charm.on.promote_standby_cluster_action, self._on_promote_standby_cluster + ) + + # We treat both relations above as actually the same. + # The big difference appears only at promote/demote actions + self.relation_set = { + *set(self.charm.model.relations[ASYNC_PRIMARY_RELATION]), + *set(self.charm.model.relations[ASYNC_REPLICA_RELATION]), + } + self.container = self.charm.unit.get_container("postgresql") + + @property + def endpoint(self) -> str: + """Assumes the endpoint is the same, disregard if we are a primary or standby cluster.""" + for rel in self.relation_set: + return str(self.charm.model.get_binding(rel).network.ingress_address) + return None + + def standby_endpoints(self) -> Set[str]: + """Returns the set of IPs used by each standby unit with a /32 mask.""" + standby_endpoints = set() + for rel in self.relation_set: + for unit in self._all_units(rel): + if not rel.data[unit].get("elected", None): + standby_endpoints.add("{}/32".format(str(rel.data[unit]["ingress-address"]))) + if "pod-address" in rel.data[unit]: + standby_endpoints.add("{}/32".format(str(rel.data[unit]["pod-address"]))) + return standby_endpoints + + def get_primary_data(self) -> Dict[str, str]: + """Returns the primary info, if available and if the primary cluster is ready.""" + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit] and unit.name == self.charm.unit.name: + # If this unit is the leader, then return None + return None + if rel.data[unit].get("elected", None) and rel.data[unit].get( + "primary-cluster-ready", None + ): + elected_data = json.loads(rel.data[unit]["elected"]) + return { + "endpoint": str(elected_data["endpoint"]), + "replication-password": elected_data["replication-password"], + "superuser-password": elected_data["superuser-password"], + } + return None + + def _all_units(self, relation): + found_units = {*relation.units, self.charm.unit} + 
logger.debug(f"Units found: {found_units}") + return found_units + + def _all_replica_published_pod_ips(self) -> bool: + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit]: + # This is the leader unit, it will not publish its own pod address + continue + if "pod-address" not in rel.data[unit]: + return False + return True + + def _on_departure(self, _): + for rel in [ + self.model.get_relation(ASYNC_REPLICA_RELATION), + self.model.get_relation(ASYNC_PRIMARY_RELATION), + ]: + if not rel: # if no relation exits, then it rel == None + continue + if "pod-address" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["pod-address"] + if "elected" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["elected"] + if "primary-cluster-ready" in rel.data[self.charm.unit]: + del rel.data[self.charm.unit]["primary-cluster-ready"] + + self.container.stop(self.charm._postgresql_service) + self.charm.update_config() + self.container.start(self.charm._postgresql_service) + + def _on_primary_changed(self, event): + """Triggers a configuration change in the primary units.""" + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not primary_relation: + return + logger.info("_on_primary_changed: primary_relation exists") + + primary = self._check_if_primary_already_selected() + if not primary: + # primary may not be available because the action of promoting a cluster was + # executed way after the relation changes. + # Defer it until + event.defer() + return + logger.info("_on_primary_changed: primary cluster exists") + + if primary.name != self.charm.unit.name: + # this unit is not the system leader + return + logger.info("_on_primary_changed: unit is the primary's leader") + + if not self._all_replica_published_pod_ips(): + # We will have more events happening, no need for retrigger + event.defer() + return + logger.info("_on_primary_changed: all replicas published pod details") + + # This unit is the leader, generate a new configuration and leave. + # There is nothing to do for the leader. + self.container.stop(self.charm._postgresql_service) + self.charm.update_config() + self.container.start(self.charm._postgresql_service) + + # Retrigger the other units' async-replica-changed + primary_relation.data[self.charm.unit]["primary-cluster-ready"] = "True" + + def _on_standby_changed(self, event): # noqa C901 + """Triggers a configuration change.""" + replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION) + if not replica_relation: + return + logger.info("_on_standby_changed: replica relation available") + + primary = self._check_if_primary_already_selected() + if not primary: + return + logger.info("_on_standby_changed: primary is present") + + # Check if we have already published pod-address. If not, then we are waiting + # for the leader to catch all the pod ips and restart itself + if "pod-address" not in replica_relation.data[self.charm.unit]: + replica_relation.data[self.charm.unit]["pod-address"] = _get_pod_ip() + # Finish here and wait for the retrigger from the primary cluster + event.defer() + return + logger.info("_on_standby_changed: pod-address published in own replica databag") + + if not self.get_primary_data(): + # We've made thus far. + # However, the get_primary_data will return != None ONLY if the primary cluster + # is ready and configured. Until then, we wait. 
+            event.defer()
+            return
+        logger.info("_on_standby_changed: primary cluster is ready")
+
+        ################
+        # Initiate restart logic
+        ################
+
+        # We need to:
+        # 1) Stop all standby units
+        # 2) Delete the k8s service
+        # 3) Remove the pgdata folder
+        # 4) Start all standby units
+        # For that, the peer leader must first stop its own service and then issue a
+        # coordination request to all units. All units ack that request once they have
+        # their service stopped.
+        # Then, we get an approved coordination from the leader, which triggers
+        # steps 2-4.
+        if self.charm.unit.is_leader() and not self.restart_coordinator.under_coordination:
+            # The leader now requests an ack from each unit that they have stopped.
+            self.restart_coordinator.coordinate()
+
+    def _on_coordination_request(self, event):
+        # Stop the container.
+        # We need all replicas to be stopped, so we can remove the patroni-postgresql-k8s
+        # service from Kubernetes without getting it recreated!
+        # We will restart it once the cluster is ready.
+        self.container.stop(self.charm._postgresql_service)
+        self.restart_coordinator.acknowledge(event)
+
+    def _on_coordination_approval(self, event):
+        """Runs when the coordinator has confirmed that all units have stopped."""
+        if self.charm.unit.is_leader():
+            # Delete the K8s service that tracks the cluster information, including its id.
+            # This is the same as "patronictl remove patroni-postgresql-k8s", but the latter
+            # doesn't work after the database service is stopped on Pebble.
+            client = Client()
+            client.delete(
+                Service,
+                name=f"patroni-{self.charm._name}-config",
+                namespace=self.charm._namespace,
+            )
+
+        # Clean folder and generate configuration.
+        self.container.exec("rm -r /var/lib/postgresql/data/pgdata".split()).wait_output()
+        self.charm._create_pgdata(self.container)
+
+        self.charm.update_config()
+        logger.info("_on_coordination_approval: configuration done, waiting for service restart")
+
+        # We are ready to restart the service now: all peers have configured themselves.
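+        # update_config() re-rendered patroni.yml with the standby_cluster section
+        # pointing at the primary cluster's endpoint, so Patroni bootstraps this
+        # cluster as a standby (replicating via basebackup) instead of running initdb.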
+ self.container.start(self.charm._postgresql_service) + + def _get_primary_candidates(self): + rel = self.model.get_relation(ASYNC_PRIMARY_RELATION) + return rel.units if rel else [] + + def _check_if_primary_already_selected(self) -> Unit: + """Returns the unit if a primary is present.""" + result = None + if not self.relation_set: + return None + for rel in self.relation_set: + for unit in self._all_units(rel): + if "elected" in rel.data[unit] and not result: + result = unit + elif "elected" in rel.data[unit] and result: + raise MoreThanOnePrimarySelectedError + return result + + def _on_promote_standby_cluster(self, event: ActionEvent) -> None: + """Moves a standby cluster to a primary, if none is present.""" + if ( + "cluster_initialised" not in self.charm._peers.data[self.charm.app] + or not self.charm._patroni.member_started + ): + event.fail("Cluster not initialized yet.") + return + + if not self.charm.unit.is_leader(): + event.fail("Not the charm leader unit.") + return + + # Now, publish that this unit is the leader + if not self.endpoint: + event.fail("No relation found.") + return + primary_relation = self.model.get_relation(ASYNC_PRIMARY_RELATION) + if not primary_relation: + event.fail("No primary relation") + return + + # Check if this is a take over from a standby cluster + if event.params.get("force", False) and event.params.get( + "force-really-really-mean-it", False + ): + pass + + # Let the exception error the unit + unit = self._check_if_primary_already_selected() + if unit: + event.fail(f"Cannot promote - {unit.name} is already primary: demote it first") + return + + # If this is a standby-leader, then execute switchover logic + # TODO + primary_relation.data[self.charm.unit]["elected"] = json.dumps( + { + "endpoint": self.endpoint, + "replication-password": self.charm._patroni._replication_password, + "superuser-password": self.charm._patroni._superuser_password, + } + ) + + # Now, check if postgresql it had originally published its pod IP in the + # replica relation databag. Delete it, if yes. 
+        replica_relation = self.model.get_relation(ASYNC_REPLICA_RELATION)
+        if not replica_relation or "pod-address" not in replica_relation.data[self.charm.unit]:
+            return
+        del replica_relation.data[self.charm.unit]["pod-address"]
+        # event.set_result()
diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2
index bdf0515b76..c43e12fa3d 100644
--- a/templates/patroni.yml.j2
+++ b/templates/patroni.yml.j2
@@ -48,6 +48,11 @@ bootstrap:
       command: pgbackrest --stanza={{ restore_stanza }} --pg1-path={{ storage_path }}/pgdata --set={{ backup_id }} --type=immediate --target-action=promote restore
       no_params: True
       keep_existing_recovery_conf: True
+  {% elif standby_cluster_endpoint %}
+  standby_cluster:
+    host: {{ standby_cluster_endpoint }}
+    port: 5432
+    create_replica_methods: ["basebackup"]
   {% else %}
   initdb:
   - auth-host: md5
@@ -58,6 +63,9 @@ bootstrap:
   pg_hba:
   - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5
   - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5
+  {%- for endpoint in extra_replication_endpoints %}
+  - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }} md5
+  {%- endfor %}
 bypass_api_service: true
 log:
   dir: /var/log/postgresql
@@ -113,6 +121,9 @@ postgresql:
   - {{ 'hostssl' if enable_tls else 'host' }} all all 0.0.0.0/0 md5
   {%- endif %}
   - {{ 'hostssl' if enable_tls else 'host' }} replication replication 127.0.0.1/32 md5
+  {%- for e in extra_replication_endpoints %}
+  - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ e }} md5
+  {%- endfor -%}
   {%- for endpoint in endpoints %}
   - {{ 'hostssl' if enable_tls else 'host' }} replication replication {{ endpoint }}.{{ namespace }}.svc.cluster.local md5
   {%- endfor %}
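For reference, a minimal sketch of what these template changes render to on a standby cluster, assuming a hypothetical primary endpoint of 10.1.2.3 and TLS disabled (values are illustrative only):

    bootstrap:
      standby_cluster:
        host: 10.1.2.3
        port: 5432
        create_replica_methods: ["basebackup"]
      pg_hba:
      - host replication replication 127.0.0.1/32 md5
      - host replication replication 10.1.2.3/32 md5

On the primary side, standby_cluster_endpoint is None, so bootstrap falls back to the regular initdb path and extra_replication_endpoints instead carries the /32 pod addresses published by the standby units.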