diff --git a/config.yaml b/config.yaml
index c7f715ed15..e835ce66e3 100644
--- a/config.yaml
+++ b/config.yaml
@@ -2,6 +2,12 @@
 # See LICENSE file for licensing details.
 options:
+  synchronous_node_count:
+    description: |
+      Sets the number of synchronous nodes to be maintained in the cluster. Should be
+      either "all", "majority" or a positive integer value.
+    type: string
+    default: "all"
   durability_synchronous_commit:
     description: |
       Sets the current transactions synchronization level. This charm allows only the
diff --git a/src/charm.py b/src/charm.py
index 3d21f19f51..fa21a1ec7b 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -470,13 +470,22 @@ def get_unit_ip(self, unit: Unit) -> str | None:
         else:
             return None

+    def updated_synchronous_node_count(self) -> bool:
+        """Tries to update synchronous_node_count configuration and reports the result."""
+        try:
+            self._patroni.update_synchronous_node_count()
+            return True
+        except RetryError:
+            logger.debug("Unable to set synchronous_node_count")
+            return False
+
     def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
         """The leader removes the departing units from the list of cluster members."""
         # Allow leader to update endpoints if it isn't leaving.
         if not self.unit.is_leader() or event.departing_unit == self.unit:
             return

-        if not self.is_cluster_initialised:
+        if not self.is_cluster_initialised or not self.updated_synchronous_node_count():
             logger.debug(
                 "Deferring on_peer_relation_departed: Cluster must be initialized before members can leave"
             )
@@ -680,6 +689,10 @@ def _on_config_changed(self, event) -> None:
             self.unit.status = BlockedStatus("Configuration Error. Please check the logs")
             logger.error("Invalid configuration: %s", str(e))
             return
+        if not self.updated_synchronous_node_count():
+            logger.debug("Defer on_config_changed: unable to set synchronous node count")
+            event.defer()
+            return

         if self.is_blocked and "Configuration Error" in self.unit.status.message:
             self._set_active_status()
@@ -693,6 +706,9 @@ def _on_config_changed(self, event) -> None:

         # Enable and/or disable the extensions.
         self.enable_disable_extensions()
+        self._unblock_extensions()
+
+    def _unblock_extensions(self) -> None:
         # Unblock the charm after extensions are enabled (only if it's blocked due to application
         # charms requesting extensions).
         if self.unit.status.message != EXTENSIONS_BLOCKING_MESSAGE:
@@ -803,6 +819,7 @@ def _add_members(self, event) -> None:
             for member in self._hosts - self._patroni.cluster_members:
                 logger.debug("Adding %s to cluster", member)
                 self.add_cluster_member(member)
+                self._patroni.update_synchronous_node_count()
         except NotReadyError:
             logger.info("Deferring reconfigure: another member doing sync right now")
             event.defer()
diff --git a/src/config.py b/src/config.py
index 61018420ad..5b8098268d 100644
--- a/src/config.py
+++ b/src/config.py
@@ -5,9 +5,10 @@
 """Structured configuration for the PostgreSQL charm."""

 import logging
+from typing import Literal

 from charms.data_platform_libs.v0.data_models import BaseConfigModel
-from pydantic import validator
+from pydantic import PositiveInt, validator

 logger = logging.getLogger(__name__)

@@ -15,6 +16,7 @@ class CharmConfig(BaseConfigModel):
     """Manager for the structured configuration."""

+    synchronous_node_count: Literal["all", "majority"] | PositiveInt
     durability_synchronous_commit: str | None
     instance_default_text_search_config: str | None
     instance_max_locks_per_transaction: int | None
diff --git a/src/patroni.py b/src/patroni.py
index 148c77f865..342d97f4a6 100644
--- a/src/patroni.py
+++ b/src/patroni.py
@@ -53,6 +53,10 @@ class SwitchoverFailedError(Exception):
     """Raised when a switchover failed for some reason."""


+class UpdateSyncNodeCountError(Exception):
+    """Raised when updating synchronous_node_count failed for some reason."""
+
+
 class Patroni:
     """This class handles the communication with Patroni API and configuration files."""

@@ -126,6 +130,36 @@ def _get_alternative_patroni_url(
             url = self._patroni_url
         return url

+    @property
+    def _synchronous_node_count(self) -> int:
+        planned_units = self._charm.app.planned_units()
+        if self._charm.config.synchronous_node_count == "all":
+            return planned_units - 1
+        elif self._charm.config.synchronous_node_count == "majority":
+            return planned_units // 2
+        return (
+            self._charm.config.synchronous_node_count
+            if self._charm.config.synchronous_node_count < self._members_count - 1
+            else planned_units - 1
+        )
+
+    def update_synchronous_node_count(self) -> None:
+        """Update synchronous_node_count."""
+        # Try to update synchronous_node_count.
+        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
+            with attempt:
+                r = requests.patch(
+                    f"{self._patroni_url}/config",
+                    json={"synchronous_node_count": self._synchronous_node_count},
+                    verify=self._verify,
+                    auth=self._patroni_auth,
+                    timeout=PATRONI_TIMEOUT,
+                )
+
+                # Check whether the update was unsuccessful.
+                if r.status_code != 200:
+                    raise UpdateSyncNodeCountError(f"received {r.status_code}")
+
     def get_primary(
         self, unit_name_pattern=False, alternative_endpoints: list[str] | None = None
     ) -> str:
@@ -525,7 +559,7 @@ def render_patroni_yml_file(
             restore_to_latest=restore_to_latest,
             stanza=stanza,
             restore_stanza=restore_stanza,
-            minority_count=self._members_count // 2,
+            synchronous_node_count=self._synchronous_node_count,
             version=self.rock_postgresql_version.split(".")[0],
             pg_parameters=parameters,
             primary_cluster_endpoint=self._charm.async_replication.get_primary_cluster_endpoint(),
diff --git a/src/upgrade.py b/src/upgrade.py
index 5e0068944d..92dc307dc5 100644
--- a/src/upgrade.py
+++ b/src/upgrade.py
@@ -152,6 +152,7 @@ def _on_upgrade_changed(self, event) -> None:
             return

         self.charm.update_config()
+        self.charm.updated_synchronous_node_count()

     def _on_upgrade_charm_check_legacy(self, event: UpgradeCharmEvent) -> None:
         if not self.peer_relation:
diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2
index 0921fcfda5..ef7ffbd6d5 100644
--- a/templates/patroni.yml.j2
+++ b/templates/patroni.yml.j2
@@ -2,7 +2,7 @@ bootstrap:
   dcs:
     synchronous_mode: true
     failsafe_mode: true
-    synchronous_node_count: {{ minority_count }}
+    synchronous_node_count: {{ synchronous_node_count }}
     postgresql:
       use_pg_rewind: true
       remove_data_directory_on_rewind_failure: true
diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index a67bde151a..4f242b0b78 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -501,6 +501,26 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> in
         return parameter_value


+async def get_leader(model: Model, application_name: str) -> str:
+    """Get the leader name.
+
+    Args:
+        model: the model instance.
+        application_name: the name of the application to get the value for.
+
+    Returns:
+        the name of the leader.
+    """
+    status = await model.get_status()
+    first_unit_ip = next(
+        unit for unit in status["applications"][application_name]["units"].values()
+    )["address"]
+    cluster = get_patroni_cluster(first_unit_ip)
+    for member in cluster["members"]:
+        if member["role"] == "leader":
+            return member["name"]
+
+
 async def get_standby_leader(model: Model, application_name: str) -> str:
     """Get the standby leader name.
@@ -1145,3 +1165,24 @@ async def remove_unit_force(ops_test: OpsTest, num_units: int):
         timeout=1000,
         wait_for_exact_units=scale,
     )
+
+
+async def get_cluster_roles(
+    ops_test: OpsTest, unit_name: str
+) -> dict[str, str | list[str] | None]:
+    """Returns the roles of the current cluster members."""
+    unit_ip = await get_unit_address(ops_test, unit_name)
+    members = {"replicas": [], "primaries": [], "sync_standbys": []}
+    member_list = get_patroni_cluster(unit_ip)["members"]
+    logger.info(f"Cluster members are: {member_list}")
+    for member in member_list:
+        role = member["role"]
+        name = "/".join(member["name"].rsplit("-", 1))
+        if role == "leader":
+            members["primaries"].append(name)
+        elif role == "sync_standby":
+            members["sync_standbys"].append(name)
+        else:
+            members["replicas"].append(name)
+
+    return members
diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py
index bec58350a0..7facd6b61f 100644
--- a/tests/integration/ha_tests/test_async_replication.py
+++ b/tests/integration/ha_tests/test_async_replication.py
@@ -30,8 +30,8 @@
 from .helpers import (
     are_writes_increasing,
     check_writes,
+    get_leader,
     get_standby_leader,
-    get_sync_standby,
     start_continuous_writes,
 )

@@ -406,11 +406,11 @@ async def test_async_replication_failover_in_main_cluster(
     logger.info("checking whether writes are increasing")
     await are_writes_increasing(ops_test)

-    sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
-    logger.info(f"Sync-standby: {sync_standby}")
-    logger.info("deleting the sync-standby pod")
+    primary = await get_leader(first_model, DATABASE_APP_NAME)
+    logger.info(f"Primary: {primary}")
+    logger.info("deleting the primary pod")
     client = Client(namespace=first_model.info.name)
-    client.delete(Pod, name=sync_standby.replace("/", "-"))
+    client.delete(Pod, name=primary.replace("/", "-"))

     async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
         await gather(
@@ -423,9 +423,9 @@ async def test_async_replication_failover_in_main_cluster(
     )

     # Check that the sync-standby unit is not the same as before.
-    new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
-    logger.info(f"New sync-standby: {new_sync_standby}")
-    assert new_sync_standby != sync_standby, "Sync-standby is the same as before"
+    new_primary = await get_leader(first_model, DATABASE_APP_NAME)
+    logger.info(f"New primary: {new_primary}")
+    assert new_primary != primary, "Primary is the same as before"

     logger.info("Ensure continuous_writes after the crashed unit")
     await are_writes_increasing(ops_test)
diff --git a/tests/integration/ha_tests/test_synchronous_policy.py b/tests/integration/ha_tests/test_synchronous_policy.py
new file mode 100644
index 0000000000..4214a4ae11
--- /dev/null
+++ b/tests/integration/ha_tests/test_synchronous_policy.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python3
+# Copyright 2024 Canonical Ltd.
+# See LICENSE file for licensing details.
+import pytest
+from pytest_operator.plugin import OpsTest
+from tenacity import Retrying, stop_after_attempt, wait_fixed
+
+from ..helpers import app_name, build_and_deploy
+from .helpers import get_cluster_roles
+
+
+@pytest.mark.abort_on_fail
+async def test_build_and_deploy(ops_test: OpsTest, charm) -> None:
+    """Build and deploy three units of PostgreSQL."""
+    wait_for_apps = False
+    # It is possible for users to provide their own cluster for HA testing. Hence, check if there
+    # is a pre-existing cluster.
+    if not await app_name(ops_test):
+        wait_for_apps = True
+        await build_and_deploy(ops_test, charm, 3, wait_for_idle=False)
+
+    if wait_for_apps:
+        async with ops_test.fast_forward():
+            await ops_test.model.wait_for_idle(status="active", timeout=1000, raise_on_error=False)
+
+
+async def test_default_all(ops_test: OpsTest) -> None:
+    app = await app_name(ops_test)
+
+    async with ops_test.fast_forward():
+        await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)
+
+    for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
+        with attempt:
+            roles = await get_cluster_roles(
+                ops_test, ops_test.model.applications[app].units[0].name
+            )
+
+            assert len(roles["primaries"]) == 1
+            assert len(roles["sync_standbys"]) == 2
+            assert len(roles["replicas"]) == 0
+
+
+async def test_majority(ops_test: OpsTest) -> None:
+    app = await app_name(ops_test)
+
+    await ops_test.model.applications[app].set_config({"synchronous_node_count": "majority"})
+
+    async with ops_test.fast_forward():
+        await ops_test.model.wait_for_idle(apps=[app], status="active")
+
+    for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
+        with attempt:
+            roles = await get_cluster_roles(
+                ops_test, ops_test.model.applications[app].units[0].name
+            )
+
+            assert len(roles["primaries"]) == 1
+            assert len(roles["sync_standbys"]) == 1
+            assert len(roles["replicas"]) == 1
+
+
+async def test_constant(ops_test: OpsTest) -> None:
+    app = await app_name(ops_test)
+
+    await ops_test.model.applications[app].set_config({"synchronous_node_count": "2"})
+
+    async with ops_test.fast_forward():
+        await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)
+
+    for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
+        with attempt:
+            roles = await get_cluster_roles(
+                ops_test, ops_test.model.applications[app].units[0].name
+            )
+
+            assert len(roles["primaries"]) == 1
+            assert len(roles["sync_standbys"]) == 2
+            assert len(roles["replicas"]) == 0
diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py
index c3f695b6bd..e359457acc 100644
--- a/tests/integration/helpers.py
+++ b/tests/integration/helpers.py
@@ -754,7 +754,6 @@ async def switchover(
     )
     assert response.status_code == 200, f"Switchover status code is {response.status_code}"
     app_name = current_primary.split("/")[0]
-    minority_count = len(ops_test.model.applications[app_name].units) // 2
     for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True):
         with attempt:
             response = requests.get(f"http://{primary_ip}:8008/cluster")
@@ -762,7 +761,7 @@
             standbys = len([
                 member for member in response.json()["members"] if member["role"] == "sync_standby"
             ])
-            assert standbys >= minority_count
+            assert standbys == len(ops_test.model.applications[app_name].units) - 1


 async def wait_for_idle_on_blocked(
diff --git a/tests/integration/test_config.py b/tests/integration/test_config.py
index 89e26ec13f..70f20f15d9 100644
--- a/tests/integration/test_config.py
+++ b/tests/integration/test_config.py
@@ -29,6 +29,10 @@ async def test_config_parameters(ops_test: OpsTest, charm) -> None:
     test_string = "abcXYZ123"

     configs = [
+        {"synchronous_node_count": ["0", "1"]},  # config option is greater than 0
+        {
+            "synchronous_node_count": [test_string, "all"]
+        },  # config option is one of `all` or `majority`, or a positive integer
         {
             "durability_synchronous_commit": [test_string, "on"]
         },  # config option is one of `on`, `remote_apply` or `remote_write`
diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py
index cf11a0d864..3ff11bcb60 100644
--- a/tests/integration/test_tls.py
+++ b/tests/integration/test_tls.py
@@ -97,7 +97,7 @@ async def test_tls(ops_test: OpsTest) -> None:
     patroni_password = await get_password(ops_test, "patroni")
     cluster_info = requests.get(f"https://{primary_address}:8008/cluster", verify=False)
     for member in cluster_info.json()["members"]:
-        if member["role"] == "replica":
+        if member["role"] != "leader":
             replica = "/".join(member["name"].rsplit("-", 1))

             # Check if TLS enabled for replication
diff --git a/tests/spread/test_synchronous_policy.py/task.yaml b/tests/spread/test_synchronous_policy.py/task.yaml
new file mode 100644
index 0000000000..fada7cb4fb
--- /dev/null
+++ b/tests/spread/test_synchronous_policy.py/task.yaml
@@ -0,0 +1,7 @@
+summary: test_synchronous_policy.py
+environment:
+  TEST_MODULE: ha_tests/test_synchronous_policy.py
+execute: |
+  tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results"
+artifacts:
+  - allure-results
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index 619ace7e63..06aff155dc 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -290,6 +290,9 @@ def test_on_config_changed(harness):
             "charm.PostgreSQLUpgrade.idle", return_value=False, new_callable=PropertyMock
         ) as _idle,
         patch("charm.PostgresqlOperatorCharm.update_config") as _update_config,
+        patch(
+            "charm.PostgresqlOperatorCharm.updated_synchronous_node_count", return_value=True
+        ) as _updated_synchronous_node_count,
         patch("charm.Patroni.member_started", return_value=True, new_callable=PropertyMock),
         patch("charm.Patroni.get_primary"),
         patch(
@@ -332,6 +335,14 @@ def test_on_config_changed(harness):
     harness.charm._on_config_changed(mock_event)
     assert isinstance(harness.charm.unit.status, ActiveStatus)
     assert not _enable_disable_extensions.called
+    _updated_synchronous_node_count.assert_called_once_with()
+
+    # Defers on update sync nodes failure
+    _updated_synchronous_node_count.return_value = False
+    harness.charm._on_config_changed(mock_event)
+    mock_event.defer.assert_called_once_with()
+    mock_event.defer.reset_mock()
+    _updated_synchronous_node_count.return_value = True

     # Leader enables extensions
     with harness.hooks_disabled():
@@ -681,6 +692,7 @@ def test_on_peer_relation_departed(harness):
            "charm.PostgresqlOperatorCharm._get_endpoints_to_remove", return_value=sentinel.units
        ) as _get_endpoints_to_remove,
        patch("charm.PostgresqlOperatorCharm._remove_from_endpoints") as _remove_from_endpoints,
+       patch("charm.PostgresqlOperatorCharm.updated_synchronous_node_count"),
    ):
        # Early exit if not leader
        event = Mock()
diff --git a/tests/unit/test_patroni.py b/tests/unit/test_patroni.py
index 408b0367a1..249db26c1f 100644
--- a/tests/unit/test_patroni.py
+++ b/tests/unit/test_patroni.py
@@ -214,7 +214,7 @@ def test_render_patroni_yml_file(harness, patroni):
             replication_password=patroni._replication_password,
             rewind_user=REWIND_USER,
             rewind_password=patroni._rewind_password,
-            minority_count=patroni._members_count // 2,
+            synchronous_node_count=0,
             version="14",
             patroni_password=patroni._patroni_password,
         )
@@ -249,7 +249,7 @@ def test_render_patroni_yml_file(harness, patroni):
             replication_password=patroni._replication_password,
             rewind_user=REWIND_USER,
             rewind_password=patroni._rewind_password,
-            minority_count=patroni._members_count // 2,
+            synchronous_node_count=0,
             version="14",
             patroni_password=patroni._patroni_password,
         )
@@ -463,3 +463,29 @@ def test_last_postgresql_logs(harness, patroni):
     (root / "var" / "log" / "postgresql" / "postgresql.3.log").unlink()
     (root / "var" / "log" / "postgresql").rmdir()
     assert patroni.last_postgresql_logs() == ""
+
+
+def test_update_synchronous_node_count(harness, patroni):
+    with (
+        patch("patroni.stop_after_delay", return_value=stop_after_delay(0)) as _stop_after_delay,
+        patch("patroni.wait_fixed", return_value=wait_fixed(0)) as _wait_fixed,
+        patch("requests.patch") as _patch,
+    ):
+        response = _patch.return_value
+        response.status_code = 200
+
+        patroni.update_synchronous_node_count()
+
+        _patch.assert_called_once_with(
+            "http://postgresql-k8s-0:8008/config",
+            json={"synchronous_node_count": 0},
+            verify=True,
+            auth=patroni._patroni_auth,
+            timeout=10,
+        )
+
+        # Test when the request fails.
+        response.status_code = 500
+        with pytest.raises(RetryError):
+            patroni.update_synchronous_node_count()
+            assert False
diff --git a/tests/unit/test_upgrade.py b/tests/unit/test_upgrade.py
index 33623048fe..c25a54620f 100644
--- a/tests/unit/test_upgrade.py
+++ b/tests/unit/test_upgrade.py
@@ -155,6 +155,9 @@ def test_on_upgrade_changed(harness):
     with (
         patch("charm.PostgresqlOperatorCharm.update_config") as _update_config,
         patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started,
+        patch(
+            "charm.PostgresqlOperatorCharm.updated_synchronous_node_count"
+        ) as _updated_synchronous_node_count,
     ):
         harness.set_can_connect(POSTGRESQL_CONTAINER, True)
         _member_started.return_value = False
@@ -165,6 +168,7 @@ def test_on_upgrade_changed(harness):
         _member_started.return_value = True
         harness.charm.on.upgrade_relation_changed.emit(relation)
         _update_config.assert_called_once()
+        _updated_synchronous_node_count.assert_called_once_with()


 def test_pre_upgrade_check(harness):
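
Reviewer note: a minimal, self-contained sketch of how the new config option maps to the value written to Patroni, condensed from the `_synchronous_node_count` property this patch adds to `src/patroni.py`. The free function and its `planned_units`/`members_count` parameters are illustrative stand-ins for `self._charm.app.planned_units()` and `self._members_count`; they are not part of the patch.

from typing import Literal


def synchronous_node_count(
    config: Literal["all", "majority"] | int, planned_units: int, members_count: int
) -> int:
    """Translate the charm config value into Patroni's synchronous_node_count."""
    if config == "all":
        # Every planned unit except the primary replicates synchronously.
        return planned_units - 1
    if config == "majority":
        # Half of the planned units (rounded down) replicate synchronously.
        return planned_units // 2
    # A positive integer is used as-is, capped so it never exceeds the available standbys.
    return config if config < members_count - 1 else planned_units - 1


# With three planned units (the scenario exercised by test_synchronous_policy.py):
assert synchronous_node_count("all", 3, 3) == 2       # test_default_all expects 2 sync standbys
assert synchronous_node_count("majority", 3, 3) == 1  # test_majority expects 1 sync standby
assert synchronous_node_count(2, 3, 3) == 2           # test_constant expects 2 sync standbys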
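And a hedged sketch of the update path the hook changes rely on: Patroni's `/config` endpoint is PATCHed with retries, and the charm-side wrapper converts an exhausted retry into `False` so that hooks such as `_on_config_changed` and `_on_peer_relation_departed` can defer. The URL, timeout and function arguments below are illustrative placeholders, not the charm's actual values.

import logging

import requests
from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed

logger = logging.getLogger(__name__)


class UpdateSyncNodeCountError(Exception):
    """Raised when updating synchronous_node_count failed for some reason."""


def update_synchronous_node_count(patroni_url: str, count: int) -> None:
    """PATCH Patroni's dynamic configuration, retrying for up to a minute."""
    for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
        with attempt:
            response = requests.patch(
                f"{patroni_url}/config",
                json={"synchronous_node_count": count},
                timeout=10,
            )
            if response.status_code != 200:
                # Raising inside the attempt makes tenacity retry the request.
                raise UpdateSyncNodeCountError(f"received {response.status_code}")


def updated_synchronous_node_count(patroni_url: str, count: int) -> bool:
    """Report success or failure so the calling hook can defer on False."""
    try:
        update_synchronous_node_count(patroni_url, count)
        return True
    except RetryError:
        logger.debug("Unable to set synchronous_node_count")
        return False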