Merged

51 commits
bcafc87
Set all nodes to synchronous replicas
dragomirp Nov 14, 2024
bd81f55
Fix template var
dragomirp Nov 14, 2024
026d946
Also change config patching
dragomirp Nov 15, 2024
21e0b13
Merge branch 'main' into dpe-5827-all-sync
dragomirp Nov 19, 2024
5082317
Update sync nodes during upgrade
dragomirp Nov 19, 2024
e2480e2
Merge branch 'main' into dpe-5827-all-sync
dragomirp Nov 22, 2024
12c3e88
Merge branch 'main' into dpe-5827-all-sync
dragomirp Nov 25, 2024
6687255
Merge branch 'main' into dpe-5827-all-sync
dragomirp Nov 28, 2024
df8a191
Revert are_writes_increasing changes
dragomirp Nov 28, 2024
ad8e216
Add back logging
dragomirp Nov 28, 2024
f6842a0
Try without logs
dragomirp Nov 28, 2024
98fcae3
Tactical sleep
dragomirp Nov 29, 2024
19bfc88
Log removal error
dragomirp Nov 29, 2024
e0fb263
Remove logs
dragomirp Nov 29, 2024
3cdf0ba
Tweak replication test
dragomirp Dec 1, 2024
4ded767
Pass down unit
dragomirp Dec 2, 2024
724c514
Wait for test app to idle
dragomirp Dec 2, 2024
40a0948
Add comment
dragomirp Dec 3, 2024
b675d6f
Merge branch 'main' into dpe-5827-all-sync
dragomirp Jan 22, 2025
8965f0b
Port config changes
dragomirp Jan 22, 2025
300cdcc
Copy policy test
dragomirp Jan 22, 2025
2a5bd2c
Merge branch 'main' into dpe-5827-all-sync
dragomirp Jan 23, 2025
4163fef
Merge branch 'dpe-5827-all-sync' into dpe-5827-all-sync-config
dragomirp Jan 23, 2025
c4a6ceb
Fix import
dragomirp Jan 23, 2025
0e5a5eb
Missed param removal
dragomirp Jan 23, 2025
0aae726
Unit test
dragomirp Jan 23, 2025
f188310
Merge branch 'main' into dpe-5827-all-sync-config
dragomirp Jan 23, 2025
549192b
Missing attr
dragomirp Jan 23, 2025
f99a0f3
Merge branch 'main' into dpe-5827-all-sync
dragomirp Jan 29, 2025
bbb5c40
Merge branch 'dpe-5827-all-sync' into dpe-5827-all-sync-config
dragomirp Jan 29, 2025
afadf3c
Merge branch 'main' into dpe-5827-all-sync-config
dragomirp Jan 30, 2025
995627e
Merge branch 'main' into dpe-5827-all-sync
dragomirp Jan 30, 2025
dc79593
Merge branch 'dpe-5827-all-sync' into dpe-5827-all-sync-config
dragomirp Jan 30, 2025
46b0746
Add logs
dragomirp Jan 30, 2025
356faad
Merge branch 'main' into dpe-5827-all-sync-config
dragomirp Feb 4, 2025
69b352b
Add timeout to connection
dragomirp Feb 4, 2025
5a85899
Merge branch 'main' into dpe-5827-all-sync
dragomirp Feb 4, 2025
d07fa5d
Merge branch 'dpe-5827-all-sync' into dpe-5827-all-sync-config
dragomirp Feb 4, 2025
642c3a0
Log conn str
dragomirp Feb 6, 2025
a570abb
Fix num of standbys
dragomirp Feb 6, 2025
004ba0e
Merge branch 'main' into dpe-5827-all-sync
dragomirp Feb 11, 2025
a5593b1
Merge branch 'main' into dpe-5827-all-sync
dragomirp Feb 11, 2025
df5ab55
Merge branch 'dpe-5827-all-sync' into dpe-5827-all-sync-config
dragomirp Feb 11, 2025
4433909
Charm fixture
dragomirp Feb 11, 2025
500b4ab
Remove stepdown hook
dragomirp Feb 11, 2025
878182e
Config description
dragomirp Feb 11, 2025
753853a
Revert conn str
dragomirp Feb 11, 2025
d10f98e
Add async scaling test
dragomirp Feb 12, 2025
7abbc83
Typo
dragomirp Feb 12, 2025
3efdb67
Don't remove standby and primary
dragomirp Feb 12, 2025
acb98f8
Merge branch 'main' into dpe-5827-all-sync
dragomirp Feb 13, 2025
6 changes: 6 additions & 0 deletions config.yaml
@@ -2,6 +2,12 @@
# See LICENSE file for licensing details.

options:
synchronous_node_count:
description: |
Sets the number of synchronous nodes to be maintained in the cluster. Should be
either "all", "majority" or a positive integer value.
type: string
default: "all"
durability_synchronous_commit:
description: |
Sets the current transactions synchronization level. This charm allows only the
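The new option accepts the literal strings "all" and "majority" as well as a positive integer, keeping "all" as the default. As a usage note (assuming the application is deployed under the name postgresql), an operator would switch to a bare majority with juju config postgresql synchronous_node_count=majority, or pin an explicit count with juju config postgresql synchronous_node_count=2.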
62 changes: 10 additions & 52 deletions src/charm.py
@@ -182,7 +182,6 @@ def __init__(self, *args):
self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed)
self.framework.observe(self.on.secret_changed, self._on_peer_relation_changed)
self.framework.observe(self.on[PEER].relation_departed, self._on_peer_relation_departed)
self.framework.observe(self.on.pgdata_storage_detaching, self._on_pgdata_storage_detaching)
self.framework.observe(self.on.start, self._on_start)
self.framework.observe(self.on.get_password_action, self._on_get_password)
self.framework.observe(self.on.set_password_action, self._on_set_password)
@@ -432,10 +431,10 @@ def _on_get_primary(self, event: ActionEvent) -> None:
except RetryError as e:
logger.error(f"failed to get primary with error {e}")

def _updated_synchronous_node_count(self, num_units: int | None = None) -> bool:
def updated_synchronous_node_count(self) -> bool:
"""Tries to update synchronous_node_count configuration and reports the result."""
try:
self._patroni.update_synchronous_node_count(num_units)
self._patroni.update_synchronous_node_count()
return True
except RetryError:
logger.debug("Unable to set synchronous_node_count")
@@ -473,9 +472,7 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
if not self.unit.is_leader():
return

if not self.is_cluster_initialised or not self._updated_synchronous_node_count(
len(self._units_ips)
):
if not self.is_cluster_initialised or not self.updated_synchronous_node_count():
logger.debug("Deferring on_peer_relation_departed: cluster not initialized")
event.defer()
return
@@ -501,52 +498,6 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _on_pgdata_storage_detaching(self, _) -> None:
# Change the primary if it's the unit that is being removed.
try:
primary = self._patroni.get_primary(unit_name_pattern=True)
except RetryError:
# Ignore the event if the primary couldn't be retrieved.
# If a switchover is needed, an automatic failover will be triggered
# when the unit is removed.
logger.debug("Early exit on_pgdata_storage_detaching: primary cannot be retrieved")
return

if self.unit.name != primary:
return

if not self._patroni.are_all_members_ready():
logger.warning(
"could not switchover because not all members are ready"
" - an automatic failover will be triggered"
)
return

# Try to switchover to another member and raise an exception if it doesn't succeed.
# If it doesn't happen on time, Patroni will automatically run a fail-over.
try:
# Get the current primary to check if it has changed later.
current_primary = self._patroni.get_primary()

# Trigger the switchover.
self._patroni.switchover()

# Wait for the switchover to complete.
self._patroni.primary_changed(current_primary)

logger.info("successful switchover")
except (RetryError, SwitchoverFailedError) as e:
logger.warning(
f"switchover failed with reason: {e} - an automatic failover will be triggered"
)
return

# Only update the connection endpoints if there is a primary.
# A cluster can have all members as replicas for some time after
# a failed switchover, so wait until the primary is elected.
if self.primary_endpoint:
self._update_relation_endpoints()

def _stuck_raft_cluster_check(self) -> None:
"""Check for stuck raft cluster and reinitialise if safe."""
raft_stuck = False
@@ -1184,6 +1135,11 @@ def _on_config_changed(self, event) -> None:
logger.error("Invalid configuration: %s", str(e))
return

if not self.updated_synchronous_node_count():
logger.debug("Defer on_config_changed: unable to set synchronous node count")
event.defer()
return

if self.is_blocked and "Configuration Error" in self.unit.status.message:
self.unit.status = ActiveStatus()

@@ -1195,7 +1151,9 @@

# Enable and/or disable the extensions.
self.enable_disable_extensions()
self._unblock_extensions()

def _unblock_extensions(self) -> None:
# Unblock the charm after extensions are enabled (only if it's blocked due to application
# charms requesting extensions).
if self.unit.status.message != EXTENSIONS_BLOCKING_MESSAGE:
23 changes: 18 additions & 5 deletions src/cluster.py
@@ -672,7 +672,7 @@ def render_patroni_yml_file(
stanza=stanza,
restore_stanza=restore_stanza,
version=self.get_postgresql_version().split(".")[0],
minority_count=self.planned_units // 2,
synchronous_node_count=self._synchronous_node_count,
pg_parameters=parameters,
primary_cluster_endpoint=self.charm.async_replication.get_primary_cluster_endpoint(),
extra_replication_endpoints=self.charm.async_replication.get_standby_endpoints(),
@@ -926,6 +926,7 @@ def remove_raft_member(self, member_ip: str) -> None:
raise RemoveRaftMemberFailedError() from None

if not result.startswith("SUCCESS"):
logger.debug("Remove raft member: Remove call not successful")
raise RemoveRaftMemberFailedError()

@retry(stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=10))
@@ -988,16 +989,28 @@ def bulk_update_parameters_controller_by_patroni(self, parameters: dict[str, Any
timeout=PATRONI_TIMEOUT,
)

def update_synchronous_node_count(self, units: int | None = None) -> None:
@property
def _synchronous_node_count(self) -> int:
planned_units = self.charm.app.planned_units()
if self.charm.config.synchronous_node_count == "all":
return planned_units - 1
elif self.charm.config.synchronous_node_count == "majority":
return planned_units // 2
# -1 for leader
return (
self.charm.config.synchronous_node_count
if self.charm.config.synchronous_node_count < planned_units - 1
else planned_units - 1
)

def update_synchronous_node_count(self) -> None:
"""Update synchronous_node_count to the minority of the planned cluster."""
if units is None:
units = self.planned_units
# Try to update synchronous_node_count.
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
r = requests.patch(
f"{self._patroni_url}/config",
json={"synchronous_node_count": units // 2},
json={"synchronous_node_count": self._synchronous_node_count},
verify=self.verify,
auth=self._patroni_auth,
timeout=PATRONI_TIMEOUT,
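The new _synchronous_node_count property translates the charm option into the value Patroni expects. A minimal standalone sketch of that mapping, with illustrative values (the real property reads self.charm.app.planned_units() and self.charm.config.synchronous_node_count):

def synchronous_node_count(config: str | int, planned_units: int) -> int:
    # "all": every planned unit except the primary is a synchronous standby.
    if config == "all":
        return planned_units - 1
    # "majority": half of the cluster, rounded down.
    if config == "majority":
        return planned_units // 2
    # Explicit integer: honour it, but cap at planned_units - 1 because the
    # primary cannot be one of its own synchronous standbys.
    return config if config < planned_units - 1 else planned_units - 1

# For a five-unit cluster:
# synchronous_node_count("all", 5)       -> 4
# synchronous_node_count("majority", 5)  -> 2
# synchronous_node_count(3, 5)           -> 3
# synchronous_node_count(10, 5)          -> 4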
3 changes: 2 additions & 1 deletion src/config.py
@@ -8,7 +8,7 @@
from typing import Literal

from charms.data_platform_libs.v0.data_models import BaseConfigModel
from pydantic import validator
from pydantic import PositiveInt, validator

from locales import SNAP_LOCALES

@@ -18,6 +18,7 @@
class CharmConfig(BaseConfigModel):
"""Manager for the structured configuration."""

synchronous_node_count: Literal["all", "majority"] | PositiveInt
durability_synchronous_commit: str | None
instance_default_text_search_config: str | None
instance_max_locks_per_transaction: int | None
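With the pydantic model extended, anything other than "all", "majority" or a positive integer is rejected when the config is validated. A small sketch of the expected behaviour, using plain pydantic outside the charm's BaseConfigModel wrapper (SketchConfig is a hypothetical stand-in for the charm's model):

from typing import Literal

from pydantic import BaseModel, PositiveInt, ValidationError


class SketchConfig(BaseModel):
    synchronous_node_count: Literal["all", "majority"] | PositiveInt


SketchConfig(synchronous_node_count="all")       # accepted
SketchConfig(synchronous_node_count="majority")  # accepted
SketchConfig(synchronous_node_count=3)           # accepted
try:
    SketchConfig(synchronous_node_count=0)       # rejected: not a positive integer
except ValidationError as error:
    print(error)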
1 change: 1 addition & 0 deletions src/upgrade.py
@@ -146,6 +146,7 @@ def _on_upgrade_granted(self, event: UpgradeGrantedEvent) -> None:
# Update the configuration.
self.charm.unit.status = MaintenanceStatus("updating configuration")
self.charm.update_config()
self.charm.updated_synchronous_node_count()

self.charm.unit.status = MaintenanceStatus("refreshing the snap")
self.charm._install_snap_packages(packages=SNAP_PACKAGES, refresh=True)
2 changes: 1 addition & 1 deletion templates/patroni.yml.j2
@@ -59,7 +59,7 @@ bootstrap:
retry_timeout: 10
maximum_lag_on_failover: 1048576
synchronous_mode: true
synchronous_node_count: {{ minority_count }}
synchronous_node_count: {{ synchronous_node_count }}
postgresql:
use_pg_rewind: true
remove_data_directory_on_rewind_failure: true
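Patroni reads this value together with synchronous_mode: true and keeps that many standbys listed in synchronous_standby_names, instead of the previously hard-coded minority (planned units // 2). For a five-unit cluster left on the default "all", the rendered line would become synchronous_node_count: 4.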
2 changes: 1 addition & 1 deletion tests/integration/ha_tests/helpers.py
@@ -106,7 +106,7 @@ async def are_writes_increasing(
with attempt:
more_writes, _ = await count_writes(
ops_test,
down_unit=down_unit,
down_unit=down_units[0],
use_ip_from_inside=use_ip_from_inside,
extra_model=extra_model,
)
4 changes: 1 addition & 3 deletions tests/integration/ha_tests/test_replication.py
@@ -60,9 +60,7 @@ async def test_reelection(ops_test: OpsTest, continuous_writes, primary_start_ti

# Remove the primary unit.
primary_name = await get_primary(ops_test, app)
await ops_test.model.destroy_units(
primary_name,
)
await ops_test.model.destroy_units(primary_name)

# Wait and get the primary again (which can be any unit, including the previous primary).
async with ops_test.fast_forward():
72 changes: 6 additions & 66 deletions tests/integration/ha_tests/test_scaling.py
@@ -158,72 +158,15 @@ async def test_removing_raft_majority(ops_test: OpsTest, continuous_writes) -> N
ops_test.model.destroy_unit(
original_roles["primaries"][0], force=True, destroy_storage=False, max_wait=1500
),
ops_test.model.destroy_unit(
original_roles["replicas"][0], force=True, destroy_storage=False, max_wait=1500
),
ops_test.model.destroy_unit(
original_roles["sync_standbys"][0], force=True, destroy_storage=False, max_wait=1500
),
)

left_unit = ops_test.model.units[original_roles["sync_standbys"][1]]
await ops_test.model.block_until(
lambda: left_unit.workload_status == "blocked"
and left_unit.workload_status_message == "Raft majority loss, run: promote-to-primary",
timeout=600,
)

run_action = await left_unit.run_action("promote-to-primary", scope="unit", force=True)
await run_action.wait()

await ops_test.model.wait_for_idle(status="active", timeout=900, idle_period=45)

await are_writes_increasing(
ops_test,
[
original_roles["primaries"][0],
original_roles["replicas"][0],
original_roles["sync_standbys"][0],
],
)

logger.info("Scaling back up")
await ops_test.model.applications[DATABASE_APP_NAME].add_unit(count=3)
await ops_test.model.wait_for_idle(status="active", timeout=1500)

await check_writes(ops_test)
new_roles = await get_cluster_roles(
ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name
)
assert len(new_roles["primaries"]) == 1
assert len(new_roles["sync_standbys"]) == 2
assert new_roles["primaries"][0] == original_roles["sync_standbys"][1]


@markers.juju3
@pytest.mark.abort_on_fail
async def test_removing_raft_majority_async(ops_test: OpsTest, continuous_writes) -> None:
# Start an application that continuously writes data to the database.
app = await app_name(ops_test)
original_roles = await get_cluster_roles(
ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name
)

await start_continuous_writes(ops_test, app)
logger.info("Deleting primary")
await gather(
ops_test.model.destroy_unit(
original_roles["primaries"][0], force=True, destroy_storage=False, max_wait=1500
),
ops_test.model.destroy_unit(
original_roles["replicas"][0], force=True, destroy_storage=False, max_wait=1500
),
ops_test.model.destroy_unit(
original_roles["replicas"][1], force=True, destroy_storage=False, max_wait=1500
original_roles["sync_standbys"][1], force=True, destroy_storage=False, max_wait=1500
),
)

left_unit = ops_test.model.units[original_roles["sync_standbys"][0]]
left_unit = ops_test.model.units[original_roles["sync_standbys"][2]]
await ops_test.model.block_until(
lambda: left_unit.workload_status == "blocked"
and left_unit.workload_status_message == "Raft majority loss, run: promote-to-primary",
@@ -239,8 +182,8 @@ async def test_removing_raft_majority_async(ops_test: OpsTest, continuous_writes
ops_test,
[
original_roles["primaries"][0],
original_roles["replicas"][0],
original_roles["replicas"][1],
original_roles["sync_standbys"][0],
original_roles["sync_standbys"][1],
],
)

@@ -253,8 +196,5 @@ async def test_removing_raft_majority_async(ops_test: OpsTest, continuous_writes
ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name
)
assert len(new_roles["primaries"]) == 1
assert len(new_roles["sync_standbys"]) == 2
assert (
new_roles["primaries"][0] == original_roles["sync_standbys"][0]
or new_roles["primaries"][0] == original_roles["sync_standbys"][1]
)
assert len(new_roles["sync_standbys"]) == 4
assert new_roles["primaries"][0] == original_roles["sync_standbys"][2]
15 changes: 8 additions & 7 deletions tests/integration/ha_tests/test_scaling_three_units.py
@@ -3,6 +3,7 @@
# See LICENSE file for licensing details.
import logging
from asyncio import exceptions, gather, sleep
from copy import deepcopy

import pytest
from pytest_operator.plugin import OpsTest
@@ -60,9 +61,8 @@ async def test_build_and_deploy(ops_test: OpsTest, charm) -> None:
[
["primaries"],
["sync_standbys"],
["replicas"],
["primaries", "replicas"],
["sync_standbys", "replicas"],
["primaries", "sync_standbys"],
["sync_standbys", "sync_standbys"],
],
)
@pytest.mark.abort_on_fail
@@ -73,8 +73,9 @@ async def test_removing_unit(ops_test: OpsTest, roles: list[str], continuous_wri
original_roles = await get_cluster_roles(
ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name
)
copied_roles = deepcopy(original_roles)
await start_continuous_writes(ops_test, app)
units = [original_roles[role][0] for role in roles]
units = [copied_roles[role].pop(0) for role in roles]
for unit in units:
logger.info(f"Stopping unit {unit}")
await stop_machine(ops_test, await get_machine_from_unit(ops_test, unit))
@@ -121,10 +122,10 @@ async def test_removing_unit(ops_test: OpsTest, roles: list[str], continuous_wri
ops_test, ops_test.model.applications[DATABASE_APP_NAME].units[0].name
)
assert len(new_roles["primaries"]) == 1
assert len(new_roles["sync_standbys"]) == 1
assert len(new_roles["replicas"]) == 1
assert len(new_roles["sync_standbys"]) == 2
assert len(new_roles["replicas"]) == 0
if "primaries" in roles:
assert new_roles["primaries"][0] == original_roles["sync_standbys"][0]
assert new_roles["primaries"][0] in original_roles["sync_standbys"]
else:
assert new_roles["primaries"][0] == original_roles["primaries"][0]

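The switch to deepcopy plus pop(0) lets the same role appear twice in the parametrised list without selecting the same unit twice. A minimal sketch of the difference, using made-up unit names:

from copy import deepcopy

original_roles = {"sync_standbys": ["postgresql/1", "postgresql/2"]}
roles = ["sync_standbys", "sync_standbys"]

# Indexing with [0] always returns the same unit for a repeated role.
same_unit_twice = [original_roles[role][0] for role in roles]    # ['postgresql/1', 'postgresql/1']

# Popping from a deep copy yields distinct units and leaves
# original_roles intact for the assertions at the end of the test.
copied_roles = deepcopy(original_roles)
distinct_units = [copied_roles[role].pop(0) for role in roles]   # ['postgresql/1', 'postgresql/2']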