1 change: 0 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion src/charm.py
@@ -516,12 +516,14 @@ def _reconfigure_cluster(self, event: HookEvent):
and event.relation.data[event.unit].get("ip-to-remove") is not None
):
ip_to_remove = event.relation.data[event.unit].get("ip-to-remove")
logger.info("Removing %s from the cluster due to IP change", ip_to_remove)
try:
self._patroni.remove_raft_member(ip_to_remove)
except RemoveRaftMemberFailedError:
logger.debug("Deferring on_peer_relation_changed: failed to remove raft member")
return False
self._remove_from_members_ips(ip_to_remove)
if ip_to_remove in self.members_ips:
self._remove_from_members_ips(ip_to_remove)
self._add_members(event)
return True
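
The added guard keeps the removal idempotent: if the hook runs again for the same departing address (for example after being deferred), the IP may already have been dropped from the tracked members, and removing it unconditionally would fail. A minimal standalone sketch of the same pattern, using hypothetical names rather than the charm's actual API:

def drop_stale_ip(ip_to_remove, members_ips, remove_raft_member) -> bool:
    # Remove the stale address from the raft cluster first; a failure here means
    # the caller should defer the hook and retry later.
    try:
        remove_raft_member(ip_to_remove)
    except Exception:
        return False
    # Drop the address from the tracked set only if it is still present, so a
    # second pass over the same event does not fail.
    if ip_to_remove in members_ips:
        members_ips.discard(ip_to_remove)
    return True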

@@ -804,6 +806,7 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None:

# Remove departing units when the leader changes.
for ip in self._get_ips_to_remove():
logger.info("Removing %s from the cluster", ip)
self._remove_from_members_ips(ip)

self.update_config()
135 changes: 101 additions & 34 deletions tests/integration/ha_tests/helpers.py
@@ -1,5 +1,6 @@
# Copyright 2022 Canonical Ltd.
# See LICENSE file for licensing details.
import logging
import os
import random
import subprocess
@@ -22,6 +23,8 @@

from ..helpers import APPLICATION_NAME, db_connect, get_unit_address, run_command_on_unit

logger = logging.getLogger(__name__)

METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
PORT = 5432
APP_NAME = METADATA["name"]
@@ -74,13 +77,19 @@ async def are_all_db_processes_down(ops_test: OpsTest, process: str) -> bool:
return True


async def are_writes_increasing(ops_test, down_unit: str = None) -> None:
async def are_writes_increasing(
ops_test, down_unit: str = None, use_ip_from_inside: bool = False
) -> None:
"""Verify new writes are continuing by counting the number of writes."""
writes, _ = await count_writes(ops_test, down_unit=down_unit)
writes, _ = await count_writes(
ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside
)
for member, count in writes.items():
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
with attempt:
more_writes, _ = await count_writes(ops_test, down_unit=down_unit)
more_writes, _ = await count_writes(
ops_test, down_unit=down_unit, use_ip_from_inside=use_ip_from_inside
)
assert more_writes[member] > count, f"{member}: writes not continuing to DB"


@@ -161,33 +170,46 @@ async def change_wal_settings(
)


async def is_cluster_updated(ops_test: OpsTest, primary_name: str) -> None:
async def is_cluster_updated(
ops_test: OpsTest, primary_name: str, use_ip_from_inside: bool = False
) -> None:
# Verify that the old primary is now a replica.
logger.info("checking that the former primary is now a replica")
assert await is_replica(
ops_test, primary_name
ops_test, primary_name, use_ip_from_inside
), "there are more than one primary in the cluster."

# Verify that all units are part of the same cluster.
member_ips = await fetch_cluster_members(ops_test)
logger.info("checking that all units are part of the same cluster")
member_ips = await fetch_cluster_members(ops_test, use_ip_from_inside)
app = primary_name.split("/")[0]
ip_addresses = [
await get_unit_ip(ops_test, unit.name) for unit in ops_test.model.applications[app].units
await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
for unit in ops_test.model.applications[app].units
]
assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster."

# Verify that no writes to the database were missed after stopping the writes.
total_expected_writes = await check_writes(ops_test)
logger.info("checking that no writes to the database were missed after stopping the writes")
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)

# Verify that old primary is up-to-date.
logger.info("checking that the former primary is up to date with the cluster after restarting")
assert await is_secondary_up_to_date(
ops_test, primary_name, total_expected_writes
ops_test, primary_name, total_expected_writes, use_ip_from_inside
), "secondary not up to date with the cluster after restarting."


async def check_writes(ops_test) -> int:
async def check_writes(ops_test, use_ip_from_inside: bool = False) -> int:
"""Gets the total writes from the test charm and compares to the writes from db."""
total_expected_writes = await stop_continuous_writes(ops_test)
actual_writes, max_number_written = await count_writes(ops_test)
actual_writes, max_number_written = await count_writes(
ops_test, use_ip_from_inside=use_ip_from_inside
)
for member, count in actual_writes.items():
assert (
count == max_number_written[member]
@@ -197,14 +219,20 @@ async def check_writes(ops_test) -> int:


async def count_writes(
ops_test: OpsTest, down_unit: str = None
ops_test: OpsTest, down_unit: str = None, use_ip_from_inside: bool = False
) -> Tuple[Dict[str, int], Dict[str, int]]:
"""Count the number of writes in the database."""
app = await app_name(ops_test)
password = await get_password(ops_test, app, down_unit)
for unit in ops_test.model.applications[app].units:
if unit.name != down_unit:
cluster = get_patroni_cluster(await get_unit_ip(ops_test, unit.name))
cluster = get_patroni_cluster(
await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
)
break
down_ips = []
if down_unit:
@@ -263,16 +291,21 @@ def cut_network_from_unit_without_ip_change(machine_name: str) -> None:
subprocess.check_call(limit_set_command.split())


async def fetch_cluster_members(ops_test: OpsTest):
async def fetch_cluster_members(ops_test: OpsTest, use_ip_from_inside: bool = False):
"""Fetches the IPs listed by Patroni as cluster members.

Args:
ops_test: OpsTest instance.
use_ip_from_inside: whether to use the IP from inside the unit.
"""
app = await app_name(ops_test)
member_ips = {}
for unit in ops_test.model.applications[app].units:
unit_ip = await get_unit_ip(ops_test, unit.name)
unit_ip = await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
cluster_info = requests.get(f"http://{unit_ip}:8008/cluster")
if len(member_ips) > 0:
# If the list of members IPs was already fetched, also compare the
@@ -304,6 +337,16 @@ async def get_controller_machine(ops_test: OpsTest) -> str:
][0]


async def get_ip_from_inside_the_unit(ops_test: OpsTest, unit_name: str) -> str:
command = f"exec --unit {unit_name} -- hostname -I"
return_code, stdout, stderr = await ops_test.juju(*command.split())
if return_code != 0:
raise ProcessError(
"Expected command %s to succeed instead it failed: %s %s", command, return_code, stderr
)
return stdout.splitlines()[0].strip()
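
get_ip_from_inside_the_unit asks the unit itself for its address (hostname -I) instead of trusting the machine address Juju reports, which can lag after a network change. The same lookup can be reproduced outside the test harness; a small sketch assuming the juju CLI is on PATH and the unit name exists:

import subprocess

def ip_reported_by_unit(unit_name: str) -> str:
    # Ask the unit for its own addresses; hostname -I prints them space-separated
    # on one line, primary interface first.
    output = subprocess.check_output(
        ["juju", "exec", "--unit", unit_name, "--", "hostname", "-I"],
        text=True,
    )
    return output.split()[0]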


async def get_patroni_setting(ops_test: OpsTest, setting: str) -> Optional[int]:
"""Get the value of one of the integer Patroni settings.

@@ -388,20 +431,28 @@ async def get_unit_ip(ops_test: OpsTest, unit_name: str) -> str:


@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool:
async def is_connection_possible(
ops_test: OpsTest, unit_name: str, use_ip_from_inside: bool = False
) -> bool:
"""Test a connection to a PostgreSQL server."""
app = unit_name.split("/")[0]
password = await get_password(ops_test, app, unit_name)
address = await get_unit_ip(ops_test, unit_name)
address = await (
get_ip_from_inside_the_unit(ops_test, unit_name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit_name)
)
try:
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT 1;")
success = cursor.fetchone()[0] == 1
connection.close()
return success
except psycopg2.Error:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT 1;")
success = cursor.fetchone()[0] == 1
connection.close()
return success
except (psycopg2.Error, RetryError):
# Error raised when the connection is not possible.
return False
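
The reworked check no longer gives up on the first psycopg2 error: the probe is retried every few seconds for up to a minute, and failure is reported only once the retries are exhausted. The retry shape in isolation, sketched with a generic callable rather than the helper's signature:

from tenacity import Retrying, RetryError, stop_after_delay, wait_fixed

def probe_with_retries(probe) -> bool:
    # Re-run the probe every 3 seconds for up to 60 seconds; an exception inside
    # the attempt block triggers another try, and RetryError means time ran out.
    try:
        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
            with attempt:
                return probe()
    except RetryError:
        return False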

@@ -420,9 +471,13 @@ def is_machine_reachable_from(origin_machine: str, target_machine: str) -> bool:
return False


async def is_replica(ops_test: OpsTest, unit_name: str) -> bool:
async def is_replica(ops_test: OpsTest, unit_name: str, use_ip_from_inside: bool = False) -> bool:
"""Returns whether the unit a replica in the cluster."""
unit_ip = await get_unit_ip(ops_test, unit_name)
unit_ip = await (
get_ip_from_inside_the_unit(ops_test, unit_name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit_name)
)
member_name = unit_name.replace("/", "-")

try:
@@ -532,9 +587,13 @@ async def send_signal_to_process(
)


async def is_postgresql_ready(ops_test, unit_name: str) -> bool:
async def is_postgresql_ready(ops_test, unit_name: str, use_ip_from_inside: bool = False) -> bool:
"""Verifies a PostgreSQL instance is running and available."""
unit_ip = get_unit_address(ops_test, unit_name)
unit_ip = (
(await get_ip_from_inside_the_unit(ops_test, unit_name))
if use_ip_from_inside
else get_unit_address(ops_test, unit_name)
)
try:
for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3)):
with attempt:
@@ -571,15 +630,21 @@ def restore_network_for_unit_without_ip_change(machine_name: str) -> None:
subprocess.check_call(limit_set_command.split())


async def is_secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_writes: int) -> bool:
async def is_secondary_up_to_date(
ops_test: OpsTest, unit_name: str, expected_writes: int, use_ip_from_inside: bool = False
) -> bool:
"""Checks if secondary is up-to-date with the cluster.

Retries over the period of one minute to give secondary adequate time to copy over data.
"""
app = await app_name(ops_test)
password = await get_password(ops_test, app)
host = [
await get_unit_ip(ops_test, unit.name)
await (
get_ip_from_inside_the_unit(ops_test, unit.name)
if use_ip_from_inside
else get_unit_ip(ops_test, unit.name)
)
for unit in ops_test.model.applications[app].units
if unit.name == unit_name
][0]
@@ -679,15 +744,17 @@ async def update_restart_condition(ops_test: OpsTest, unit, condition: str):


@retry(stop=stop_after_attempt(20), wait=wait_fixed(30))
async def wait_network_restore(ops_test: OpsTest, hostname: str, old_ip: str) -> None:
async def wait_network_restore(ops_test: OpsTest, unit_name: str, old_ip: str) -> None:
"""Wait until network is restored.

Args:
ops_test: pytest plugin helper
hostname: The name of the instance
unit_name: name of the unit
old_ip: old registered IP address
"""
if await instance_ip(ops_test, hostname) == old_ip:
# Retrieve the unit IP from inside the unit because the address reported in the
# Juju status may not be updated quickly enough.
if (await get_ip_from_inside_the_unit(ops_test, unit_name)) == old_ip:
raise Exception


20 changes: 13 additions & 7 deletions tests/integration/ha_tests/test_self_healing.py
@@ -383,7 +383,6 @@ async def test_forceful_restart_without_data_and_transaction_logs(


@pytest.mark.group(1)
@pytest.mark.unstable
async def test_network_cut(ops_test: OpsTest, continuous_writes, primary_start_timeout):
"""Completely cut and restore network."""
# Locate primary unit.
@@ -456,19 +455,22 @@ async def test_network_cut(ops_test: OpsTest, continuous_writes, primary_start_t

# Wait until the LXD unit has its IP updated.
logger.info("waiting for IP address to be updated on Juju unit")
await wait_network_restore(ops_test, primary_hostname, primary_ip)
await wait_network_restore(ops_test, primary_name, primary_ip)

# Verify that the database service got restarted and is ready in the old primary.
logger.info(f"waiting for the database service to be ready on {primary_name}")
assert await is_postgresql_ready(ops_test, primary_name, use_ip_from_inside=True)

# Verify that connection is possible.
logger.info("checking whether the connectivity to the database is working")
assert await is_connection_possible(
ops_test, primary_name
ops_test, primary_name, use_ip_from_inside=True
), "Connection is not possible after network restore"

await is_cluster_updated(ops_test, primary_name)
await is_cluster_updated(ops_test, primary_name, use_ip_from_inside=True)


@pytest.mark.group(1)
@pytest.mark.unstable
async def test_network_cut_without_ip_change(
ops_test: OpsTest, continuous_writes, primary_start_timeout
):
@@ -516,7 +518,7 @@ async def test_network_cut_without_ip_change(

async with ops_test.fast_forward():
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test, primary_name)
await are_writes_increasing(ops_test, primary_name, use_ip_from_inside=True)

logger.info("checking whether a new primary was elected")
# Verify that a new primary gets elected (ie old primary is secondary).
@@ -533,10 +535,14 @@
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(apps=[app], status="active")

# Verify that the database service got restarted and is ready in the old primary.
logger.info(f"waiting for the database service to be ready on {primary_name}")
assert await is_postgresql_ready(ops_test, primary_name)

# Verify that connection is possible.
logger.info("checking whether the connectivity to the database is working")
assert await is_connection_possible(
ops_test, primary_name
), "Connection is not possible after network restore"

await is_cluster_updated(ops_test, primary_name)
await is_cluster_updated(ops_test, primary_name, use_ip_from_inside=True)