Merged
33 commits
f50bb2c
Update patroni configuration
dragomirp Nov 20, 2024
235e19d
Update test assertion
dragomirp Nov 20, 2024
30a7f56
Copy update_synchronous_node_count from VM
dragomirp Nov 21, 2024
c6339ce
Add unit test
dragomirp Nov 21, 2024
9bb565e
Set sync node count during upgrade
dragomirp Nov 21, 2024
6e60993
Fix tls test
dragomirp Nov 21, 2024
025bfb2
Switchover primary
dragomirp Nov 21, 2024
0aa9850
Merge branch 'main' into dpe-5827-all-sync
dragomirp Nov 21, 2024
f88c956
Add different helper to get leader
dragomirp Nov 21, 2024
39817f6
Add config boilerplate
dragomirp Nov 29, 2024
2d5ab35
Merge branch 'main' into dpe-5827-all-sync
dragomirp Nov 29, 2024
5752281
Merge branch 'dpe-5827-all-sync' into dpe-5827-all-sync-config
dragomirp Nov 29, 2024
43412d1
Use config value when setting sync node count
dragomirp Nov 29, 2024
0b1c289
Escape tuple
dragomirp Nov 29, 2024
707a014
Add policy values
dragomirp Dec 2, 2024
562dad5
Add integration test
dragomirp Dec 2, 2024
048e720
Fix casting
dragomirp Dec 2, 2024
1f65187
Fix test
dragomirp Dec 2, 2024
88541f1
Update to spec
dragomirp Dec 3, 2024
95c80de
Bump retry timout
dragomirp Dec 3, 2024
a74eae1
Switch to planned units
dragomirp Dec 3, 2024
b3e9684
Merge branch 'main' into dpe-5827-all-sync
dragomirp Dec 5, 2024
a5f8f2a
Merge branch 'main' into dpe-5827-all-sync
dragomirp Dec 19, 2024
0a5916b
Fix generator
dragomirp Dec 19, 2024
ac006d1
Merge branch 'main' into dpe-5827-all-sync
dragomirp Jan 8, 2025
26c536c
Merge branch 'main' into dpe-5827-all-sync
dragomirp Jan 8, 2025
91d533b
Merge branch 'main' into dpe-5827-all-sync
dragomirp Jan 15, 2025
372048d
Merge branch 'main' into dpe-5827-all-sync
dragomirp Jan 21, 2025
b63c5d1
Merge branch 'main' into dpe-5827-all-sync
dragomirp Feb 5, 2025
017fbb6
Merge branch 'main' into dpe-5827-all-sync
dragomirp Feb 14, 2025
faee124
Update conf description
dragomirp Feb 14, 2025
ee6093f
Spread task
dragomirp Feb 14, 2025
f628e2c
Pass the charm
dragomirp Feb 14, 2025
6 changes: 6 additions & 0 deletions config.yaml
@@ -2,6 +2,12 @@
# See LICENSE file for licensing details.

options:
synchronous_node_count:
description: |
Sets the number of synchronous nodes to be maintained in the cluster. Should be
either "all", "majority" or a positive integer value.
type: string
default: "all"
durability_synchronous_commit:
description: |
Sets the current transactions synchronization level. This charm allows only the
19 changes: 18 additions & 1 deletion src/charm.py
@@ -470,13 +470,22 @@ def get_unit_ip(self, unit: Unit) -> str | None:
else:
return None

def updated_synchronous_node_count(self) -> bool:
"""Tries to update synchronous_node_count configuration and reports the result."""
try:
self._patroni.update_synchronous_node_count()
return True
except RetryError:
logger.debug("Unable to set synchronous_node_count")
return False

def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
"""The leader removes the departing units from the list of cluster members."""
# Allow leader to update endpoints if it isn't leaving.
if not self.unit.is_leader() or event.departing_unit == self.unit:
return

if not self.is_cluster_initialised:
if not self.is_cluster_initialised or not self.updated_synchronous_node_count():
logger.debug(
"Deferring on_peer_relation_departed: Cluster must be initialized before members can leave"
)
@@ -680,6 +689,10 @@ def _on_config_changed(self, event) -> None:
self.unit.status = BlockedStatus("Configuration Error. Please check the logs")
logger.error("Invalid configuration: %s", str(e))
return
if not self.updated_synchronous_node_count():
logger.debug("Defer on_config_changed: unable to set synchronous node count")
event.defer()
return

if self.is_blocked and "Configuration Error" in self.unit.status.message:
self._set_active_status()
@@ -693,6 +706,9 @@ def _on_config_changed(self, event) -> None:
# Enable and/or disable the extensions.
self.enable_disable_extensions()

self._unblock_extensions()

def _unblock_extensions(self) -> None:
# Unblock the charm after extensions are enabled (only if it's blocked due to application
# charms requesting extensions).
if self.unit.status.message != EXTENSIONS_BLOCKING_MESSAGE:
@@ -803,6 +819,7 @@ def _add_members(self, event) -> None:
for member in self._hosts - self._patroni.cluster_members:
logger.debug("Adding %s to cluster", member)
self.add_cluster_member(member)
self._patroni.update_synchronous_node_count()
except NotReadyError:
logger.info("Deferring reconfigure: another member doing sync right now")
event.defer()
4 changes: 3 additions & 1 deletion src/config.py
@@ -5,16 +5,18 @@
"""Structured configuration for the PostgreSQL charm."""

import logging
from typing import Literal

from charms.data_platform_libs.v0.data_models import BaseConfigModel
from pydantic import validator
from pydantic import PositiveInt, validator

logger = logging.getLogger(__name__)


class CharmConfig(BaseConfigModel):
"""Manager for the structured configuration."""

synchronous_node_count: Literal["all", "majority"] | PositiveInt
durability_synchronous_commit: str | None
instance_default_text_search_config: str | None
instance_max_locks_per_transaction: int | None
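A note on validation: the `Literal["all", "majority"] | PositiveInt` annotation means the two policy strings and any positive integer are accepted, while zero, negative numbers and arbitrary strings are rejected before the charm ever applies the value. A minimal standalone sketch of that behaviour (plain pydantic, not the charm's BaseConfigModel wrapper; the model name here is made up):

from typing import Literal, Union

from pydantic import BaseModel, PositiveInt, ValidationError


class SyncCountSketch(BaseModel):
    synchronous_node_count: Union[Literal["all", "majority"], PositiveInt] = "all"


SyncCountSketch(synchronous_node_count="majority")  # accepted
SyncCountSketch(synchronous_node_count=2)           # accepted
try:
    SyncCountSketch(synchronous_node_count=0)       # rejected: neither a literal nor positive
except ValidationError as err:
    print(err)

Operators would change the value at runtime with something like `juju config <postgresql application> synchronous_node_count=majority`; test_config.py below exercises the rejection path.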
36 changes: 35 additions & 1 deletion src/patroni.py
@@ -53,6 +53,10 @@ class SwitchoverFailedError(Exception):
"""Raised when a switchover failed for some reason."""


class UpdateSyncNodeCountError(Exception):
"""Raised when updating synchronous_node_count failed for some reason."""


class Patroni:
"""This class handles the communication with Patroni API and configuration files."""

@@ -126,6 +130,36 @@ def _get_alternative_patroni_url(
url = self._patroni_url
return url

@property
def _synchronous_node_count(self) -> int:
planned_units = self._charm.app.planned_units()
if self._charm.config.synchronous_node_count == "all":
return planned_units - 1
elif self._charm.config.synchronous_node_count == "majority":
return planned_units // 2
return (
self._charm.config.synchronous_node_count
if self._charm.config.synchronous_node_count < self._members_count - 1
else planned_units - 1
)

def update_synchronous_node_count(self) -> None:
"""Update synchronous_node_count."""
# Try to update synchronous_node_count.
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
r = requests.patch(
f"{self._patroni_url}/config",
json={"synchronous_node_count": self._synchronous_node_count},
verify=self._verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
)

# Check whether the update was unsuccessful.
if r.status_code != 200:
raise UpdateSyncNodeCountError(f"received {r.status_code}")

def get_primary(
self, unit_name_pattern=False, alternative_endpoints: list[str] | None = None
) -> str:
@@ -525,7 +559,7 @@ def render_patroni_yml_file(
restore_to_latest=restore_to_latest,
stanza=stanza,
restore_stanza=restore_stanza,
minority_count=self._members_count // 2,
synchronous_node_count=self._synchronous_node_count,
version=self.rock_postgresql_version.split(".")[0],
pg_parameters=parameters,
primary_cluster_endpoint=self._charm.async_replication.get_primary_cluster_endpoint(),
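To make the policy arithmetic concrete, here is a standalone sketch that mirrors the `_synchronous_node_count` property above; `planned_units` and `members_count` stand in for the charm and Patroni lookups the real property performs:

def synchronous_node_count(config: str | int, planned_units: int, members_count: int) -> int:
    if config == "all":
        return planned_units - 1   # every other member becomes a synchronous standby
    if config == "majority":
        return planned_units // 2  # half of the cluster, rounded down
    # An integer setting is capped so it never exceeds the available members.
    return config if config < members_count - 1 else planned_units - 1


# For a three-unit cluster with three current members:
synchronous_node_count("all", 3, 3)       # 2, matches test_default_all below
synchronous_node_count("majority", 3, 3)  # 1, matches test_majority below
synchronous_node_count(2, 3, 3)           # 2, matches test_constant below
synchronous_node_count(5, 3, 3)           # 2, an oversized constant is capped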
1 change: 1 addition & 0 deletions src/upgrade.py
@@ -152,6 +152,7 @@ def _on_upgrade_changed(self, event) -> None:
return

self.charm.update_config()
self.charm.updated_synchronous_node_count()

def _on_upgrade_charm_check_legacy(self, event: UpgradeCharmEvent) -> None:
if not self.peer_relation:
2 changes: 1 addition & 1 deletion templates/patroni.yml.j2
@@ -2,7 +2,7 @@ bootstrap:
dcs:
synchronous_mode: true
failsafe_mode: true
synchronous_node_count: {{ minority_count }}
synchronous_node_count: {{ synchronous_node_count }}
postgresql:
use_pg_rewind: true
remove_data_directory_on_rewind_failure: true
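With the template change above, the rendered `synchronous_node_count` follows the charm config instead of the old hard-coded minority count. One way to confirm the value a running cluster actually uses is to read Patroni's dynamic configuration back from its REST API. A rough sketch, assuming the API is reachable on the default port 8008; the unit address is a placeholder, and as the charm's own `update_synchronous_node_count()` shows, TLS verification and Patroni credentials may also be required:

import requests

unit_ip = "10.1.42.7"  # placeholder; the charm resolves real unit IPs itself
response = requests.get(f"http://{unit_ip}:8008/config", timeout=10)
response.raise_for_status()
print(response.json().get("synchronous_node_count"))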
41 changes: 41 additions & 0 deletions tests/integration/ha_tests/helpers.py
@@ -501,6 +501,26 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> in
return parameter_value


async def get_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.

Args:
model: the model instance.
application_name: the name of the application to get the value for.

Returns:
the name of the leader.
"""
status = await model.get_status()
first_unit_ip = next(
unit for unit in status["applications"][application_name]["units"].values()
)["address"]
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "leader":
return member["name"]


async def get_standby_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.

@@ -1145,3 +1165,24 @@ async def remove_unit_force(ops_test: OpsTest, num_units: int):
timeout=1000,
wait_for_exact_units=scale,
)


async def get_cluster_roles(
ops_test: OpsTest, unit_name: str
) -> dict[str, str | list[str] | None]:
"""Returns whether the unit a replica in the cluster."""
unit_ip = await get_unit_address(ops_test, unit_name)
members = {"replicas": [], "primaries": [], "sync_standbys": []}
member_list = get_patroni_cluster(unit_ip)["members"]
logger.info(f"Cluster members are: {member_list}")
for member in member_list:
role = member["role"]
name = "/".join(member["name"].rsplit("-", 1))
if role == "leader":
members["primaries"].append(name)
elif role == "sync_standby":
members["sync_standbys"].append(name)
else:
members["replicas"].append(name)

return members
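For reference, `get_cluster_roles()` buckets the members reported by Patroni's `/cluster` endpoint by their `role` field. An illustrative, trimmed payload (names and states are examples, not captured output):

sample_cluster = {
    "members": [
        {"name": "postgresql-k8s-0", "role": "leader", "state": "running"},
        {"name": "postgresql-k8s-1", "role": "sync_standby", "state": "running"},
        {"name": "postgresql-k8s-2", "role": "replica", "state": "running"},
    ]
}
# With the default synchronous_node_count="all", a three-unit cluster should report one
# leader, two sync_standby members and no plain replicas, which is what
# test_default_all below asserts.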
16 changes: 8 additions & 8 deletions tests/integration/ha_tests/test_async_replication.py
@@ -30,8 +30,8 @@
from .helpers import (
are_writes_increasing,
check_writes,
get_leader,
get_standby_leader,
get_sync_standby,
start_continuous_writes,
)

@@ -406,11 +406,11 @@ async def test_async_replication_failover_in_main_cluster(
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test)

sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
logger.info(f"Sync-standby: {sync_standby}")
logger.info("deleting the sync-standby pod")
primary = await get_leader(first_model, DATABASE_APP_NAME)
logger.info(f"Primary: {primary}")
logger.info("deleting the primary pod")
client = Client(namespace=first_model.info.name)
client.delete(Pod, name=sync_standby.replace("/", "-"))
client.delete(Pod, name=primary.replace("/", "-"))

async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
await gather(
@@ -423,9 +423,9 @@
)

# Check that the primary unit is not the same as before.
new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME)
logger.info(f"New sync-standby: {new_sync_standby}")
assert new_sync_standby != sync_standby, "Sync-standby is the same as before"
Review comment from dragomirp (author) on lines -426 to -428: Sync standby will not change, since all the replicas are sync standbys.

new_primary = await get_leader(first_model, DATABASE_APP_NAME)
logger.info(f"New sync-standby: {new_primary}")
assert new_primary != primary, "Sync-standby is the same as before"

logger.info("Ensure continuous_writes after the crashed unit")
await are_writes_increasing(ops_test)
79 changes: 79 additions & 0 deletions tests/integration/ha_tests/test_synchronous_policy.py
@@ -0,0 +1,79 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
import pytest
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, stop_after_attempt, wait_fixed

from ..helpers import app_name, build_and_deploy
from .helpers import get_cluster_roles


@pytest.mark.abort_on_fail
async def test_build_and_deploy(ops_test: OpsTest, charm) -> None:
"""Build and deploy three unit of PostgreSQL."""
wait_for_apps = False
# It is possible for users to provide their own cluster for HA testing. Hence, check if there
# is a pre-existing cluster.
if not await app_name(ops_test):
wait_for_apps = True
await build_and_deploy(ops_test, charm, 3, wait_for_idle=False)

if wait_for_apps:
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000, raise_on_error=False)


async def test_default_all(ops_test: OpsTest) -> None:
app = await app_name(ops_test)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)

for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
roles = await get_cluster_roles(
ops_test, ops_test.model.applications[app].units[0].name
)

assert len(roles["primaries"]) == 1
assert len(roles["sync_standbys"]) == 2
assert len(roles["replicas"]) == 0


async def test_majority(ops_test: OpsTest) -> None:
app = await app_name(ops_test)

await ops_test.model.applications[app].set_config({"synchronous_node_count": "majority"})

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active")

for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
roles = await get_cluster_roles(
ops_test, ops_test.model.applications[app].units[0].name
)

assert len(roles["primaries"]) == 1
assert len(roles["sync_standbys"]) == 1
assert len(roles["replicas"]) == 1


async def test_constant(ops_test: OpsTest) -> None:
app = await app_name(ops_test)

await ops_test.model.applications[app].set_config({"synchronous_node_count": "2"})

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=300)

for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
roles = await get_cluster_roles(
ops_test, ops_test.model.applications[app].units[0].name
)

assert len(roles["primaries"]) == 1
assert len(roles["sync_standbys"]) == 2
assert len(roles["replicas"]) == 0
3 changes: 1 addition & 2 deletions tests/integration/helpers.py
@@ -754,15 +754,14 @@ async def switchover(
)
assert response.status_code == 200, f"Switchover status code is {response.status_code}"
app_name = current_primary.split("/")[0]
minority_count = len(ops_test.model.applications[app_name].units) // 2
for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True):
with attempt:
response = requests.get(f"http://{primary_ip}:8008/cluster")
assert response.status_code == 200
standbys = len([
member for member in response.json()["members"] if member["role"] == "sync_standby"
])
assert standbys >= minority_count
assert standbys == len(ops_test.model.applications[app_name].units) - 1


async def wait_for_idle_on_blocked(
4 changes: 4 additions & 0 deletions tests/integration/test_config.py
@@ -29,6 +29,10 @@ async def test_config_parameters(ops_test: OpsTest, charm) -> None:
test_string = "abcXYZ123"

configs = [
{"synchronous_node_count": ["0", "1"]}, # config option is greater than 0
{
"synchronous_node_count": [test_string, "all"]
}, # config option is one of `all`, `majority` or a positive integer
{
"durability_synchronous_commit": [test_string, "on"]
}, # config option is one of `on`, `remote_apply` or `remote_write`
2 changes: 1 addition & 1 deletion tests/integration/test_tls.py
@@ -97,7 +97,7 @@ async def test_tls(ops_test: OpsTest) -> None:
patroni_password = await get_password(ops_test, "patroni")
cluster_info = requests.get(f"https://{primary_address}:8008/cluster", verify=False)
for member in cluster_info.json()["members"]:
if member["role"] == "replica":
if member["role"] != "leader":
replica = "/".join(member["name"].rsplit("-", 1))

# Check if TLS enabled for replication
7 changes: 7 additions & 0 deletions tests/spread/test_synchronous_policy.py/task.yaml
@@ -0,0 +1,7 @@
summary: test_synchronous_policy.py
environment:
TEST_MODULE: ha_tests/test_synchronous_policy.py
execute: |
tox run -e integration -- "tests/integration/$TEST_MODULE" --model testing --alluredir="$SPREAD_TASK/allure-results"
artifacts:
- allure-results