Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
04223a8
Add SST test
marceloneppel Oct 17, 2022
ebc4814
Enable previous tests
marceloneppel Oct 17, 2022
84fca0c
fix early tls deployment by only reloading patroni config if it's alr…
Oct 18, 2022
d5d0ad8
Improve code
marceloneppel Oct 18, 2022
1e64d31
Remove duplicate check
marceloneppel Oct 18, 2022
4688a95
Remove unused import
marceloneppel Oct 18, 2022
2b76d2e
added unit test for reloading patroni
Oct 18, 2022
edcfb0c
lint
Oct 18, 2022
51cee09
removing postgres restart check
Oct 18, 2022
f05a540
Pin Juju agent version on CI
marceloneppel Oct 18, 2022
5a97730
adding series flags to test apps
Oct 18, 2022
072252a
adding series flags to test apps
Oct 18, 2022
f340ccf
made series into a list
Oct 18, 2022
b537e82
Update test_new_relations.py
WRFitch Oct 18, 2022
6c0db4b
Add retrying
marceloneppel Oct 18, 2022
60bc8e2
updating test to better emulate bundle deployment
Oct 18, 2022
dc3dd8f
Merge branch 'fix-early-tls-deployment' of https://github.com/canonic…
Oct 18, 2022
6772702
Merge remote-tracking branch 'origin/fix-early-tls-deployment' into s…
marceloneppel Oct 19, 2022
3348c2b
Merge branch 'main' into sst-test
marceloneppel Nov 16, 2022
f73aefc
Remove unused code
marceloneppel Nov 16, 2022
c539f78
Change processes list
marceloneppel Nov 16, 2022
ed987ed
Add logic for ensuring all units down
marceloneppel Nov 18, 2022
b1c8c2c
Change delay to only one unit
marceloneppel Nov 18, 2022
8e507a0
Add WAL switch
marceloneppel Nov 18, 2022
6fcdebd
Merge branch 'main' into sst-test
marceloneppel Nov 28, 2022
107e24c
Updates related to WAL removal
marceloneppel Nov 28, 2022
50721e1
Small improvements
marceloneppel Nov 28, 2022
addf716
Add comments
marceloneppel Nov 28, 2022
deb8d23
Change the way service is stopped
marceloneppel Nov 29, 2022
d8d58ac
Remove slot removal
marceloneppel Nov 29, 2022
730e9d4
Small fixes
marceloneppel Nov 29, 2022
3497cf3
Remove unused parameter
marceloneppel Nov 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tests/integration/ha_tests/clean-data-dir.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env bash

# Fail fast: -E propagates ERR traps into functions/subshells, -e exits on any
# error, -u treats unset variables as errors, -o pipefail fails a pipeline if
# any command in it fails.
set -Eeuo pipefail
# Wipe the contents of the PostgreSQL data directory. Used by the HA tests to
# simulate a storage failure on a unit; `*` expansion requires running this as
# a script rather than a single remote command.
rm -rf /var/lib/postgresql/data/pgdata/*
22 changes: 22 additions & 0 deletions tests/integration/ha_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
from tests.integration.ha_tests.helpers import (
app_name,
change_master_start_timeout,
change_wal_settings,
get_master_start_timeout,
get_postgresql_parameter,
)

APPLICATION_NAME = "application"
Expand Down Expand Up @@ -40,3 +42,23 @@ async def master_start_timeout(ops_test: OpsTest) -> None:
yield
# Rollback to the initial configuration.
await change_master_start_timeout(ops_test, initial_master_start_timeout)


@pytest.fixture()
async def wal_settings(ops_test: OpsTest) -> None:
    """Snapshot the WAL settings and restore them after the test finishes."""
    setting_names = ("max_wal_size", "min_wal_size", "wal_keep_segments")
    # Capture the current value of each WAL-related parameter, in order.
    original_values = [
        await get_postgresql_parameter(ops_test, name) for name in setting_names
    ]
    yield
    # Roll every unit of the application back to the captured values.
    app = await app_name(ops_test)
    for unit in ops_test.model.applications[app].units:
        await change_wal_settings(ops_test, unit.name, *original_values)
88 changes: 80 additions & 8 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.
from pathlib import Path
from typing import Dict, Optional, Set

import psycopg2
import requests
Expand Down Expand Up @@ -62,6 +62,35 @@ async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int])
)


async def change_wal_settings(
    ops_test: OpsTest,
    unit_name: str,
    max_wal_size: int,
    min_wal_size: int,
    wal_keep_segments: int,
) -> None:
    """Change WAL settings in the unit through the Patroni config API.

    Args:
        ops_test: ops_test instance.
        unit_name: name of the unit to change the WAL settings.
        max_wal_size: maximum amount of WAL to keep (MB).
        min_wal_size: minimum amount of WAL to keep (MB).
        wal_keep_segments: number of WAL segments to keep.
    """
    for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)):
        with attempt:
            unit_ip = get_unit_address(ops_test, unit_name)
            response = requests.patch(
                f"http://{unit_ip}:8008/config",
                json={
                    "postgresql": {
                        "parameters": {
                            "max_wal_size": max_wal_size,
                            "min_wal_size": min_wal_size,
                            "wal_keep_segments": wal_keep_segments,
                        }
                    }
                },
            )
            # Raise on HTTP error status so the Retrying loop actually retries;
            # requests does not raise on 4xx/5xx by itself, so without this the
            # first (possibly failed) PATCH would silently "succeed".
            response.raise_for_status()


async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int:
"""Count the number of writes in the database."""
app = await app_name(ops_test)
Expand Down Expand Up @@ -128,6 +157,32 @@ async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]:
return int(master_start_timeout) if master_start_timeout is not None else None


async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> Optional[int]:
    """Get the value of a PostgreSQL parameter from Patroni API.

    Args:
        ops_test: ops_test instance.
        parameter_name: the name of the parameter to get the value for.

    Returns:
        the value of the requested PostgreSQL parameter, or None when the
        Patroni config does not contain it.
    """
    for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
        with attempt:
            app = await app_name(ops_test)
            primary_name = await get_primary(ops_test, app)
            unit_ip = get_unit_address(ops_test, primary_name)
            configuration_info = requests.get(f"http://{unit_ip}:8008/config")
            # Walk config -> postgresql -> parameters -> <name>; a missing or
            # null level at any step yields None, as before.
            config = configuration_info.json()
            parameters = (config.get("postgresql") or {}).get("parameters") or {}
            return parameters.get(parameter_name)


async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> str:
"""Use the charm action to retrieve the password from provided application.

Expand Down Expand Up @@ -178,13 +233,30 @@ async def get_primary(ops_test: OpsTest, app) -> str:
"""Use the charm action to retrieve the primary from provided application.

Returns:
string with the password stored on the peer relation databag.
primary unit name.
"""
# Can retrieve from any unit running unit, so we pick the first.
unit_name = ops_test.model.applications[app].units[0].name
action = await ops_test.model.units.get(unit_name).run_action("get-primary")
action = await action.wait()
return action.results["primary"]
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
# Can retrieve from any unit running unit, so we pick the first.
unit_name = ops_test.model.applications[app].units[0].name
action = await ops_test.model.units.get(unit_name).run_action("get-primary")
action = await action.wait()
assert action.results["primary"] is not None
return action.results["primary"]


async def list_wal_files(ops_test: OpsTest, app: str) -> Dict[str, Set[str]]:
    """Return, per unit, the set of WAL segment files in the pg_wal directory.

    Args:
        ops_test: ops_test instance.
        app: name of the application whose units are inspected.

    Returns:
        a dict mapping each unit name to the set of its WAL segment file
        names (timeline .history files, blank lines and the archive_status
        directory entry are excluded).
    """
    command = "ls -1 /var/lib/postgresql/data/pgdata/pg_wal/"
    files = {}
    for unit in ops_test.model.applications[app].units:
        complete_command = f"run --unit {unit.name} -- {command}"
        return_code, stdout, stderr = await ops_test.juju(*complete_command.split())
        # Fail loudly instead of silently returning an empty set for the unit.
        assert return_code == 0, f"failed to list WAL files on {unit.name}: {stderr}"
        files[unit.name] = {
            line
            for line in stdout.splitlines()
            if line and ".history" not in line and line != "archive_status"
        }
    return files


async def send_signal_to_process(
Expand Down Expand Up @@ -238,7 +310,7 @@ async def secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_write
)

try:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
with attempt:
with psycopg2.connect(
connection_string
Expand Down
115 changes: 115 additions & 0 deletions tests/integration/ha_tests/test_self_healing.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,25 @@
METADATA,
app_name,
change_master_start_timeout,
change_wal_settings,
count_writes,
fetch_cluster_members,
get_master_start_timeout,
get_primary,
is_replica,
list_wal_files,
postgresql_ready,
secondary_up_to_date,
send_signal_to_process,
start_continuous_writes,
stop_continuous_writes,
)
from tests.integration.helpers import (
db_connect,
get_password,
get_unit_address,
run_command_on_unit,
)

APP_NAME = METADATA["name"]
PATRONI_PROCESS = "/usr/local/bin/patroni"
Expand Down Expand Up @@ -221,3 +229,110 @@ async def test_restart_db_process(
assert await secondary_up_to_date(
ops_test, primary_name, total_expected_writes
), "secondary not up to date with the cluster after restarting."


@pytest.mark.ha_self_healing_tests
async def test_forceful_restart_without_data_and_transaction_logs(
    ops_test: OpsTest,
    continuous_writes,
    master_start_timeout,
    wal_settings,
) -> None:
    """A forceful restart with deleted data and without transaction logs (forced clone)."""
    app = await app_name(ops_test)
    primary_name = await get_primary(ops_test, app)

    # Copy data dir content removal script.
    await ops_test.juju(
        "scp", "tests/integration/ha_tests/clean-data-dir.sh", f"{primary_name}:/tmp"
    )

    # Start an application that continuously writes data to the database.
    await start_continuous_writes(ops_test, app)

    # Change the "master_start_timeout" parameter to speed up the fail-over.
    original_master_start_timeout = await get_master_start_timeout(ops_test)
    await change_master_start_timeout(ops_test, 0)

    # Stop the systemd service on the primary unit.
    await run_command_on_unit(ops_test, primary_name, "systemctl stop patroni")

    # Data removal runs within a script, so it allows `*` expansion.
    return_code, _, _ = await ops_test.juju(
        "ssh",
        primary_name,
        "sudo",
        "/tmp/clean-data-dir.sh",
    )
    assert return_code == 0, "Failed to remove data directory"

    async with ops_test.fast_forward():
        # Verify that a new primary gets elected (ie old primary is secondary).
        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
            with attempt:
                new_primary_name = await get_primary(ops_test, app)
                assert new_primary_name is not None
                assert new_primary_name != primary_name

        # Revert the "master_start_timeout" parameter to avoid fail-over again.
        await change_master_start_timeout(ops_test, original_master_start_timeout)

        # Verify new writes are continuing by counting the number of writes before and after a
        # 60 seconds wait (this is a little more than the loop wait configuration, that is
        # considered to trigger a fail-over after master_start_timeout is changed).
        writes = await count_writes(ops_test, primary_name)
        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
            with attempt:
                more_writes = await count_writes(ops_test, primary_name)
                assert more_writes > writes, "writes not continuing to DB"

        # Change some settings to enable WAL rotation on the surviving units.
        for unit in ops_test.model.applications[app].units:
            if unit.name == primary_name:
                continue
            await change_wal_settings(ops_test, unit.name, 32, 32, 1)

        # Rotate the WAL segments (the PostgreSQL equivalent of rotating and
        # discarding the MySQL binlog, to simulate a long downtime).
        files = await list_wal_files(ops_test, app)
        host = get_unit_address(ops_test, new_primary_name)
        password = await get_password(ops_test, new_primary_name)
        with db_connect(host, password) as connection:
            connection.autocommit = True
            with connection.cursor() as cursor:
                # Run some commands to make PostgreSQL do WAL rotation.
                cursor.execute("SELECT pg_switch_wal();")
                cursor.execute("CHECKPOINT;")
                cursor.execute("SELECT pg_switch_wal();")
        connection.close()
        new_files = await list_wal_files(ops_test, app)
        # Check that the WAL was correctly rotated: compare each unit's old
        # segment set against the SAME unit's new segment set. (Intersecting
        # with the `new_files` dict itself would iterate its keys — unit
        # names — and the assertion would vacuously pass.)
        for unit_name in files:
            assert not files[unit_name].intersection(
                new_files[unit_name]
            ), "WAL segments weren't correctly rotated"

        # Start the systemd service in the old primary.
        await run_command_on_unit(ops_test, primary_name, "systemctl start patroni")

        # Verify that the database service got restarted and is ready in the old primary.
        assert await postgresql_ready(ops_test, primary_name)

        # Verify that the old primary is now a replica. The coroutine must be
        # awaited: asserting a bare coroutine object is always truthy.
        assert await is_replica(
            ops_test, primary_name
        ), "there are more than one primary in the cluster."

        # Verify that all units are part of the same cluster.
        member_ips = await fetch_cluster_members(ops_test)
        ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units]
        assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster."

        # Verify that no writes to the database were missed after stopping the writes.
        total_expected_writes = await stop_continuous_writes(ops_test)
        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
            with attempt:
                actual_writes = await count_writes(ops_test)
                assert total_expected_writes == actual_writes, "writes to the db were missed."

        # Verify that old primary is up-to-date.
        assert await secondary_up_to_date(
            ops_test, primary_name, total_expected_writes
        ), "secondary not up to date with the cluster after restarting."
16 changes: 16 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,22 @@ async def check_tls_patroni_api(ops_test: OpsTest, unit_name: str, enabled: bool
return False


async def run_command_on_unit(ops_test: OpsTest, unit_name: str, command: str) -> None:
    """Run a command on a specific unit.

    Args:
        ops_test: The ops test framework instance
        unit_name: The name of the unit to run the command on
        command: The command to run

    Raises:
        Exception: if the command exits with a non-zero return code.
    """
    # NOTE(review): str.split() breaks commands containing quoted arguments
    # with spaces; the commands used by these tests are simple enough for it.
    complete_command = f"run --unit {unit_name} -- {command}"
    return_code, stdout, stderr = await ops_test.juju(*complete_command.split())
    if return_code != 0:
        # Use an f-string: the previous logging-style "%s" placeholders were
        # never interpolated, so the exception message was a raw args tuple.
        raise Exception(
            f"Expected command {command} to succeed instead it failed: {return_code} ({stderr})"
        )


async def scale_application(ops_test: OpsTest, application_name: str, count: int) -> None:
"""Scale a given application to a specific unit count.

Expand Down