44 changes: 25 additions & 19 deletions src/charm.py
@@ -884,6 +884,9 @@ def _create_pgdata(self, container: Container):

def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None:
"""Event handler for PostgreSQL container on PebbleReadyEvent."""
if self._endpoint in self._endpoints:
self._fix_pod()

# TODO: move this code to an "_update_layer" method in order to also utilize it in
# config-changed hook.
# Get the postgresql container so we can configure/manipulate it.
@@ -1033,25 +1036,7 @@ def is_blocked(self) -> bool:
return isinstance(self.unit.status, BlockedStatus)

def _on_upgrade_charm(self, _) -> None:
# Recreate k8s resources and add labels required for replication
# when the pod loses them (like when it's deleted).
if self.upgrade.idle:
try:
self._create_services()
except ApiError:
logger.exception("failed to create k8s services")
self.unit.status = BlockedStatus("failed to create k8s services")
return

try:
self._patch_pod_labels(self.unit.name)
except ApiError as e:
logger.error("failed to patch pod")
self.unit.status = BlockedStatus(f"failed to patch pod with error {e}")
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()
self._fix_pod()

def _patch_pod_labels(self, member: str) -> None:
"""Add labels required for replication to the current pod.
@@ -1263,6 +1248,27 @@ def _on_get_primary(self, event: ActionEvent) -> None:
except RetryError as e:
logger.error(f"failed to get primary with error {e}")

def _fix_pod(self) -> None:
# Recreate k8s resources and add labels required for replication
# when the pod loses them (like when it's deleted).
if self.upgrade.idle:
try:
self._create_services()
except ApiError:
logger.exception("failed to create k8s services")
self.unit.status = BlockedStatus("failed to create k8s services")
return

try:
self._patch_pod_labels(self.unit.name)
except ApiError as e:
logger.error("failed to patch pod")
self.unit.status = BlockedStatus(f"failed to patch pod with error {e}")
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _on_stop(self, _):
# Remove data from the drive when scaling down to zero to prevent
# the cluster from getting stuck when scaling back up.
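For orientation, a condensed sketch of the control flow this diff ends up with (not verbatim charm code; error handling is abbreviated): the pod-recovery logic that previously lived only in _on_upgrade_charm is factored into _fix_pod, which the pebble-ready handler now also calls, so a recreated pod regains its k8s services, replication labels, and sync-standby endpoint even when no upgrade-charm hook runs.

    # Condensed outline of the refactor above (error handling abbreviated):
    def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None:
        if self._endpoint in self._endpoints:
            self._fix_pod()  # pod may have been recreated: restore its resources
        ...

    def _on_upgrade_charm(self, _) -> None:
        self._fix_pod()

    def _fix_pod(self) -> None:
        if self.upgrade.idle:
            self._create_services()                 # recreate k8s services; BlockedStatus on ApiError
            self._patch_pod_labels(self.unit.name)  # re-add labels required for replication
            self.async_replication.update_async_replication_data()  # refresh sync-standby endpoint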
10 changes: 10 additions & 0 deletions tests/integration/ha_tests/helpers.py
@@ -771,6 +771,16 @@ async def is_secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_wr
return True


async def remove_charm_code(ops_test: OpsTest, unit_name: str) -> None:
"""Remove src/charm.py from the PostgreSQL unit."""
await run_command_on_unit(
ops_test,
unit_name,
f'rm /var/lib/juju/agents/unit-{unit_name.replace("/", "-")}/charm/src/charm.py',
"charm",
)


async def send_signal_to_process(
ops_test: OpsTest, unit_name: str, process: str, signal: str, use_ssh: bool = False
) -> None:
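A brief usage sketch for the new helper (the unit name below is hypothetical): for a unit such as postgresql-k8s/1, the helper removes /var/lib/juju/agents/unit-postgresql-k8s-1/charm/src/charm.py inside the "charm" container, so the unit's next hook dispatch fails and Juju marks the unit as errored, which is what the new restart test relies on.

    # Hypothetical call from a test; the unit name is illustrative only.
    await remove_charm_code(ops_test, "postgresql-k8s/1")
    # Path removed inside the "charm" container:
    #   /var/lib/juju/agents/unit-postgresql-k8s-1/charm/src/charm.py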
115 changes: 115 additions & 0 deletions tests/integration/ha_tests/test_restart.py
@@ -0,0 +1,115 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
import logging

import pytest
from kubernetes import config, dynamic
from kubernetes.client import api_client
from pytest_operator.plugin import OpsTest

from ..helpers import (
APPLICATION_NAME,
DATABASE_APP_NAME,
app_name,
build_and_deploy,
get_application_units,
get_cluster_members,
get_primary,
get_unit_address,
)
from .helpers import (
are_writes_increasing,
check_writes,
remove_charm_code,
start_continuous_writes,
)

logger = logging.getLogger(__name__)


CLUSTER_SIZE = 3


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_deploy(ops_test: OpsTest) -> None:
"""Build and deploy a PostgreSQL cluster and a test application."""
await build_and_deploy(ops_test, CLUSTER_SIZE, wait_for_idle=False)
if not await app_name(ops_test, APPLICATION_NAME):
await ops_test.model.deploy(APPLICATION_NAME, num_units=1)

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(
apps=[DATABASE_APP_NAME, APPLICATION_NAME],
status="active",
timeout=1000,
raise_on_error=False,
)


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_restart(ops_test: OpsTest, continuous_writes) -> None:
"""Test restart of all the units simultaneously."""
logger.info("starting continuous writes to the database")
await start_continuous_writes(ops_test, DATABASE_APP_NAME)

logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test)

logger.info(
"removing charm code from one non-primary unit to simulate a crash and prevent firing the update-charm hook"
)
primary = await get_primary(ops_test)
status = await ops_test.model.get_status()
for unit in status.applications[DATABASE_APP_NAME].units:
if unit != primary:
non_primary = unit
break
await remove_charm_code(ops_test, non_primary)
logger.info(f"removed charm code from {non_primary}")

logger.info("restarting all the units by deleting their pods")
client = dynamic.DynamicClient(api_client.ApiClient(configuration=config.load_kube_config()))
api = client.resources.get(api_version="v1", kind="Pod")
api.delete(
namespace=ops_test.model.info.name,
label_selector=f"app.kubernetes.io/name={DATABASE_APP_NAME}",
)
await ops_test.model.block_until(
lambda: all(
unit.workload_status == "error"
for unit in ops_test.model.units.values()
if unit.name == non_primary
)
)

# Resolve the error on the non-primary unit.
for unit in ops_test.model.units.values():
if unit.name == non_primary and unit.workload_status == "error":
logger.info(f"resolving {non_primary} error")
await unit.resolved(retry=False)
break

async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(
apps=[DATABASE_APP_NAME], status="active", raise_on_error=False, timeout=300
)

# Check that all replication slots are present in the primary
# (by checking the list of cluster members).
logger.info(
"checking that all the replication slots are present in the primary by checking the list of cluster members"
)
primary = await get_primary(ops_test)
address = await get_unit_address(ops_test, primary)
assert get_cluster_members(address) == get_application_units(ops_test, DATABASE_APP_NAME)

logger.info("ensure continuous_writes after the crashed unit")
await are_writes_increasing(ops_test)

# Verify that no writes to the database were missed after stopping the writes
# (check that all the units have all the writes).
logger.info("checking whether no writes were lost")
await check_writes(ops_test)
7 changes: 5 additions & 2 deletions tests/integration/helpers.py
@@ -666,18 +666,21 @@ def resource_exists(client: Client, resource: GenericNamespacedResource) -> bool:
return False


async def run_command_on_unit(ops_test: OpsTest, unit_name: str, command: str) -> str:
async def run_command_on_unit(
ops_test: OpsTest, unit_name: str, command: str, container: str = "postgresql"
) -> str:
"""Run a command on a specific unit.

Args:
ops_test: The ops test framework instance
unit_name: The name of the unit to run the command on
command: The command to run
container: The container to run the command in (default: postgresql)

Returns:
the command output if it succeeds, otherwise raises an exception.
"""
complete_command = f"ssh --container postgresql {unit_name} {command}"
complete_command = f"ssh --container {container} {unit_name} {command}"
return_code, stdout, stderr = await ops_test.juju(*complete_command.split())
if return_code != 0:
raise Exception(
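A hedged usage sketch for the widened run_command_on_unit signature (unit name and command are illustrative): the new container argument selects which container the juju ssh command targets, and it defaults to the postgresql workload container so existing callers keep their behaviour.

    # Illustrative calls; unit name and command are placeholders.
    await run_command_on_unit(ops_test, "postgresql-k8s/0", "whoami")           # default: postgresql container
    await run_command_on_unit(ops_test, "postgresql-k8s/0", "whoami", "charm")  # run in the charm container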