2 changes: 1 addition & 1 deletion tests/integration/ha_tests/helpers.py
@@ -75,7 +75,7 @@ async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int:
f" host='{host}' password='{password}' connect_timeout=10"
)
try:
-        for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)):
+        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
with psycopg2.connect(
connection_string
53 changes: 53 additions & 0 deletions tests/integration/ha_tests/test_self_healing.py
@@ -168,3 +168,56 @@ async def test_freeze_db_process(
assert await secondary_up_to_date(
ops_test, primary_name, total_expected_writes
), "secondary not up to date with the cluster after restarting."


@pytest.mark.ha_self_healing_tests
@pytest.mark.parametrize("process", DB_PROCESSES)
async def test_restart_db_process(
ops_test: OpsTest, process: str, continuous_writes, master_start_timeout
) -> None:
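    """Verify that a failover is triggered when the DB process on the primary is restarted with SIGTERM."""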
# Locate primary unit.
app = await app_name(ops_test)
primary_name = await get_primary(ops_test, app)

# Start an application that continuously writes data to the database.
await start_continuous_writes(ops_test, app)

# Restart the database process.
await send_signal_to_process(ops_test, primary_name, process, kill_code="SIGTERM")

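    # fast_forward temporarily shortens the update-status hook interval so the charm
    # reacts to the restart without waiting for the default schedule.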
async with ops_test.fast_forward():
        # Verify that writes are continuing by counting the number of writes before and
        # after a 60-second wait (a little longer than the loop wait configuration, which
        # is taken into account to trigger a failover after master_start_timeout is changed).
writes = await count_writes(ops_test)
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
more_writes = await count_writes(ops_test)
assert more_writes > writes, "writes not continuing to DB"

# Verify that the database service got restarted and is ready in the old primary.
assert await postgresql_ready(ops_test, primary_name)

    # Verify that a new primary gets elected (i.e., the old primary is now a secondary).
new_primary_name = await get_primary(ops_test, app)
assert new_primary_name != primary_name

# Verify that the old primary is now a replica.
    assert await is_replica(ops_test, primary_name), "there is more than one primary in the cluster."

# Verify that all units are part of the same cluster.
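    # (fetch_cluster_members is assumed to return the member IPs reported by the
    # Patroni REST API.)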
member_ips = await fetch_cluster_members(ops_test)
ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units]
assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster."

# Verify that no writes to the database were missed after stopping the writes.
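    # (stop_continuous_writes is expected to return the total number of writes issued.)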
total_expected_writes = await stop_continuous_writes(ops_test)
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
actual_writes = await count_writes(ops_test)
assert total_expected_writes == actual_writes, "writes to the db were missed."

# Verify that old primary is up-to-date.
assert await secondary_up_to_date(
ops_test, primary_name, total_expected_writes
), "secondary not up to date with the cluster after restarting."