diff --git a/poetry.lock b/poetry.lock index e5436014db..74133126a0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "allure-pytest" diff --git a/src/charm.py b/src/charm.py index 5b4bc6a6f0..ccbce436f3 100755 --- a/src/charm.py +++ b/src/charm.py @@ -722,6 +722,12 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: # config-changed hook. # Get the postgresql container so we can configure/manipulate it. container = event.workload + if not container.can_connect(): + logger.debug( + "Defer on_postgresql_pebble_ready: Waiting for container to become available" + ) + event.defer() + return # Create the PostgreSQL data directory. This is needed on cloud environments # where the volume is mounted with more restrictive permissions. diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 4d1d50e11d..987daa7060 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -12,6 +12,7 @@ import kubernetes as kubernetes import psycopg2 import requests +from juju.model import Model from kubernetes import config from kubernetes.client.api import core_v1_api from kubernetes.stream import stream @@ -184,10 +185,10 @@ async def is_member_isolated( return True -async def check_writes(ops_test) -> int: +async def check_writes(ops_test, extra_model: Model = None) -> int: """Gets the total writes from the test charm and compares to the writes from db.""" total_expected_writes = await stop_continuous_writes(ops_test) - actual_writes, max_number_written = await count_writes(ops_test) + actual_writes, max_number_written = await count_writes(ops_test, extra_model=extra_model) for member, count in actual_writes.items(): assert ( count == max_number_written[member] @@ -196,13 +197,17 @@ async def check_writes(ops_test) -> int: return total_expected_writes -async def are_writes_increasing(ops_test, down_unit: str = None) -> None: +async def are_writes_increasing( + ops_test, down_unit: str = None, extra_model: Model = None +) -> None: """Verify new writes are continuing by counting the number of writes.""" - writes, _ = await count_writes(ops_test, down_unit=down_unit) + writes, _ = await count_writes(ops_test, down_unit=down_unit, extra_model=extra_model) for member, count in writes.items(): for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3), reraise=True): with attempt: - more_writes, _ = await count_writes(ops_test, down_unit=down_unit) + more_writes, _ = await count_writes( + ops_test, down_unit=down_unit, extra_model=extra_model + ) assert ( more_writes[member] > count ), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})" @@ -265,28 +270,34 @@ def copy_file_into_pod( async def count_writes( - ops_test: OpsTest, down_unit: str = None + ops_test: OpsTest, down_unit: str = None, extra_model: Model = None ) -> Tuple[Dict[str, int], Dict[str, int]]: """Count the number of writes in the database.""" app = await app_name(ops_test) password = await get_password(ops_test, database_app_name=app, down_unit=down_unit) - status = await ops_test.model.get_status() - for unit_name, unit in status["applications"][app]["units"].items(): - if unit_name != down_unit: - cluster = get_patroni_cluster(unit["address"]) - break + members = [] + for 
model in [ops_test.model, extra_model]: + if model is None: + continue + status = await model.get_status() + for unit_name, unit in status["applications"][app]["units"].items(): + if unit_name != down_unit: + members_data = get_patroni_cluster(unit["address"])["members"] + for index, member_data in enumerate(members_data): + members_data[index]["model"] = model.info.name + members.extend(members_data) + break count = {} maximum = {} - for member in cluster["members"]: + for member in members: if member["role"] != "replica" and member["host"].split(".")[0] != ( down_unit or "" ).replace("/", "-"): host = member["host"] # Translate the service hostname to an IP address. - model = ops_test.model.info - client = Client(namespace=model.name) + client = Client(namespace=member["model"]) service = client.get(Pod, name=host.split(".")[0]) ip = service.status.podIP @@ -295,12 +306,23 @@ async def count_writes( f" host='{ip}' password='{password}' connect_timeout=10" ) - with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: - cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") - results = cursor.fetchone() - count[member["name"]] = results[0] - maximum[member["name"]] = results[1] - connection.close() + member_name = f'{member["model"]}.{member["name"]}' + connection = None + try: + with psycopg2.connect( + connection_string + ) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") + results = cursor.fetchone() + count[member_name] = results[0] + maximum[member_name] = results[1] + except psycopg2.Error: + # Error raised when the connection is not possible. + count[member_name] = -1 + maximum[member_name] = -1 + finally: + if connection is not None: + connection.close() return count, maximum @@ -415,6 +437,42 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> Op return parameter_value +async def get_standby_leader(model: Model, application_name: str) -> str: + """Get the standby leader name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the standby leader. + """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "standby_leader": + return member["name"] + + +async def get_sync_standby(model: Model, application_name: str) -> str: + """Get the sync_standby name. + + Args: + model: the model instance. + application_name: the name of the application to get the value for. + + Returns: + the name of the sync standby. 
+ """ + status = await model.get_status() + first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"] + cluster = get_patroni_cluster(first_unit_ip) + for member in cluster["members"]: + if member["role"] == "sync_standby": + return member["name"] + + @retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True) async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool: """Test a connection to a PostgreSQL server.""" @@ -720,23 +778,25 @@ async def send_signal_to_process( ) -async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: +async def start_continuous_writes(ops_test: OpsTest, app: str, model: Model = None) -> None: """Start continuous writes to PostgreSQL.""" # Start the process by relating the application to the database or # by calling the action if the relation already exists. + if model is None: + model = ops_test.model relations = [ relation - for relation in ops_test.model.applications[app].relations + for relation in model.applications[app].relations if not relation.is_peer and f"{relation.requires.application_name}:{relation.requires.name}" == f"{APPLICATION_NAME}:first-database" ] if not relations: - await ops_test.model.relate(app, f"{APPLICATION_NAME}:first-database") - await ops_test.model.wait_for_idle(status="active", timeout=1000) + await model.relate(app, f"{APPLICATION_NAME}:first-database") + await model.wait_for_idle(status="active", timeout=1000) else: action = ( - await ops_test.model.applications[APPLICATION_NAME] + await model.applications[APPLICATION_NAME] .units[0] .run_action("start-continuous-writes") ) @@ -744,7 +804,7 @@ async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): with attempt: action = ( - await ops_test.model.applications[APPLICATION_NAME] + await model.applications[APPLICATION_NAME] .units[0] .run_action("start-continuous-writes") ) diff --git a/tests/integration/ha_tests/test_async_replication.py b/tests/integration/ha_tests/test_async_replication.py new file mode 100644 index 0000000000..c10611f797 --- /dev/null +++ b/tests/integration/ha_tests/test_async_replication.py @@ -0,0 +1,521 @@ +#!/usr/bin/env python3 +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
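+"""Integration tests for PostgreSQL async replication between two Juju models."""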
+import contextlib +import logging +from asyncio import gather +from typing import Optional + +import psycopg2 +import pytest as pytest +from juju.model import Model +from lightkube import Client +from lightkube.resources.core_v1 import Pod +from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed + +from tests.integration import markers +from tests.integration.ha_tests.helpers import ( + are_writes_increasing, + check_writes, + get_standby_leader, + get_sync_standby, + start_continuous_writes, +) +from tests.integration.helpers import ( + APPLICATION_NAME, + DATABASE_APP_NAME, + build_and_deploy, + get_leader_unit, + get_password, + get_primary, + get_unit_address, + scale_application, + wait_for_relation_removed_between, +) + +logger = logging.getLogger(__name__) + + +CLUSTER_SIZE = 3 +FAST_INTERVAL = "10s" +IDLE_PERIOD = 5 +TIMEOUT = 2000 + + +@contextlib.asynccontextmanager +async def fast_forward( + model: Model, fast_interval: str = "10s", slow_interval: Optional[str] = None +): + """Adaptation of OpsTest.fast_forward to work with different models.""" + update_interval_key = "update-status-hook-interval" + if slow_interval: + interval_after = slow_interval + else: + interval_after = (await model.get_config())[update_interval_key] + + await model.set_config({update_interval_key: fast_interval}) + yield + await model.set_config({update_interval_key: interval_after}) + + +@pytest.fixture(scope="module") +def first_model(ops_test: OpsTest) -> Model: + """Return the first model.""" + first_model = ops_test.model + return first_model + + +@pytest.fixture(scope="module") +async def second_model(ops_test: OpsTest, first_model, request) -> Model: + """Create and return the second model.""" + second_model_name = f"{first_model.info.name}-other" + if second_model_name not in await ops_test._controller.list_models(): + await ops_test._controller.add_model(second_model_name) + second_model = Model() + await second_model.connect(model_name=second_model_name) + yield second_model + if request.config.getoption("--keep-models"): + return + logger.info("Destroying second model") + await ops_test._controller.destroy_model(second_model_name, destroy_storage=True) + + +@pytest.fixture +async def second_model_continuous_writes(second_model) -> None: + """Cleans up continuous writes on the second model after a test run.""" + yield + # Clear the written data at the end. 
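+    # The clear action is retried for a short window, since it can fail transiently right after a test.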
+    for attempt in Retrying(stop=stop_after_delay(10), wait=wait_fixed(3), reraise=True):
+        with attempt:
+            action = (
+                await second_model.applications[APPLICATION_NAME]
+                .units[0]
+                .run_action("clear-continuous-writes")
+            )
+            await action.wait()
+            assert action.results["result"] == "True", "Unable to clear up continuous_writes table"
+
+
+@pytest.mark.group(1)
+@markers.juju3
+@pytest.mark.abort_on_fail
+async def test_deploy_async_replication_setup(
+    ops_test: OpsTest, first_model: Model, second_model: Model
+) -> None:
+    """Build and deploy two PostgreSQL clusters in separate models to test async replication."""
+    await build_and_deploy(ops_test, CLUSTER_SIZE, wait_for_idle=False)
+    await build_and_deploy(ops_test, CLUSTER_SIZE, wait_for_idle=False, model=second_model)
+    await ops_test.model.deploy(APPLICATION_NAME, num_units=1)
+    await second_model.deploy(APPLICATION_NAME, num_units=1)
+
+    async with ops_test.fast_forward(), fast_forward(second_model):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME, APPLICATION_NAME],
+                status="active",
+                timeout=TIMEOUT,
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME, APPLICATION_NAME],
+                status="active",
+                timeout=TIMEOUT,
+            ),
+        )
+
+
+@pytest.mark.group(1)
+@markers.juju3
+@pytest.mark.abort_on_fail
+async def test_async_replication(
+    ops_test: OpsTest,
+    first_model: Model,
+    second_model: Model,
+    continuous_writes,
+) -> None:
+    """Test async replication between two PostgreSQL clusters."""
+    logger.info("starting continuous writes to the database")
+    await start_continuous_writes(ops_test, DATABASE_APP_NAME)
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    first_offer_command = f"offer {DATABASE_APP_NAME}:async-primary async-primary"
+    await ops_test.juju(*first_offer_command.split())
+    first_consume_command = (
+        f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-primary"
+    )
+    await ops_test.juju(*first_consume_command.split())
+
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    await second_model.relate(DATABASE_APP_NAME, "async-primary")
+
+    async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
+        await gather(
+            first_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+            second_model.wait_for_idle(
+                apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT
+            ),
+        )
+
+    logger.info("checking whether writes are increasing")
+    await are_writes_increasing(ops_test)
+
+    # Run the promote action.
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-cluster") + await run_action.wait() + assert (run_action.results.get("return-code", None) == 0) or ( + run_action.results.get("Code", None) == "0" + ), "Promote action failed" + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_switchover( + ops_test: OpsTest, + first_model: Model, + second_model: Model, + second_model_continuous_writes, +): + """Test switching over to the second cluster.""" + second_offer_command = f"offer {DATABASE_APP_NAME}:async-replica async-replica" + await ops_test.juju(*second_offer_command.split()) + second_consume_command = ( + f"consume -m {second_model.info.name} admin/{first_model.info.name}.async-replica" + ) + await ops_test.juju(*second_consume_command.split()) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + # Run the promote action. 
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME, model=second_model) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the second cluster") + run_action = await leader_unit.run_action("promote-cluster", **{"force-promotion": True}) + await run_action.wait() + assert (run_action.results.get("return-code", None) == 0) or ( + run_action.results.get("Code", None) == "0" + ), "Promote action failed" + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME, model=second_model) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_promote_standby( + ops_test: OpsTest, + first_model: Model, + second_model: Model, + second_model_continuous_writes, +) -> None: + """Test promoting the standby cluster.""" + logger.info("breaking the relation") + await second_model.applications[DATABASE_APP_NAME].remove_relation( + "async-replica", "async-primary" + ) + wait_for_relation_removed_between(ops_test, "async-primary", "async-replica", second_model) + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], + status="blocked", + idle_period=IDLE_PERIOD, + timeout=TIMEOUT, + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + # Run the promote action. 
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-cluster") + await run_action.wait() + assert (run_action.results.get("return-code", None) == 0) or ( + run_action.results.get("Code", None) == "0" + ), "Promote action failed" + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("removing the previous data") + primary = await get_primary(ops_test) + address = await get_unit_address(ops_test, primary) + password = await get_password(ops_test) + database_name = f'{APPLICATION_NAME.replace("-", "_")}_first_database' + connection = None + try: + connection = psycopg2.connect( + f"dbname={database_name} user=operator password={password} host={address}" + ) + connection.autocommit = True + cursor = connection.cursor() + cursor.execute("DROP TABLE IF EXISTS continuous_writes;") + except psycopg2.Error as e: + assert False, f"Failed to drop continuous writes table: {e}" + finally: + if connection is not None: + connection.close() + + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + +@pytest.mark.group(1) +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_reestablish_relation( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that the relation can be broken and re-established.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + logger.info("reestablishing the relation") + await second_model.relate(DATABASE_APP_NAME, "async-primary") + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Run the promote action. 
+ logger.info("Get leader unit") + leader_unit = await get_leader_unit(ops_test, DATABASE_APP_NAME) + assert leader_unit is not None, "No leader unit found" + logger.info("promoting the first cluster") + run_action = await leader_unit.run_action("promote-cluster") + await run_action.wait() + assert (run_action.results.get("return-code", None) == 0) or ( + run_action.results.get("Code", None) == "0" + ), "Promote action failed" + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_async_replication_failover_in_main_cluster( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication fails over correctly.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) + logger.info(f"Sync-standby: {sync_standby}") + logger.info("deleting the sync-standby pod") + client = Client(namespace=first_model.info.name) + client.delete(Pod, name=sync_standby.replace("/", "-")) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + # Check that the sync-standby unit is not the same as before. + new_sync_standby = await get_sync_standby(first_model, DATABASE_APP_NAME) + logger.info(f"New sync-standby: {new_sync_standby}") + assert new_sync_standby != sync_standby, "Sync-standby is the same as before" + + logger.info("Ensure continuous_writes after the crashed unit") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). 
+ logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_async_replication_failover_in_secondary_cluster( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication fails back correctly.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + standby_leader = await get_standby_leader(second_model, DATABASE_APP_NAME) + logger.info(f"Standby leader: {standby_leader}") + logger.info("deleting the standby leader pod") + client = Client(namespace=second_model.info.name) + client.delete(Pod, name=standby_leader.replace("/", "-")) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + await gather( + first_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + second_model.wait_for_idle( + apps=[DATABASE_APP_NAME], status="active", idle_period=IDLE_PERIOD, timeout=TIMEOUT + ), + ) + + logger.info("Ensure continuous_writes after the crashed unit") + await are_writes_increasing(ops_test) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) + + +@pytest.mark.group(1) +@markers.juju3 +@pytest.mark.abort_on_fail +async def test_scaling( + ops_test: OpsTest, first_model: Model, second_model: Model, continuous_writes +) -> None: + """Test that async replication works when scaling the clusters.""" + logger.info("starting continuous writes to the database") + await start_continuous_writes(ops_test, DATABASE_APP_NAME) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test) + + async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL): + logger.info("scaling out the first cluster") + first_cluster_original_size = len(first_model.applications[DATABASE_APP_NAME].units) + await scale_application(ops_test, DATABASE_APP_NAME, first_cluster_original_size + 1) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + logger.info("scaling out the second cluster") + second_cluster_original_size = len(second_model.applications[DATABASE_APP_NAME].units) + await scale_application( + ops_test, DATABASE_APP_NAME, second_cluster_original_size + 1, model=second_model + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + logger.info("scaling in the first cluster") + await scale_application(ops_test, DATABASE_APP_NAME, first_cluster_original_size) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + logger.info("scaling in the second cluster") + await scale_application( + ops_test, DATABASE_APP_NAME, second_cluster_original_size, model=second_model + ) + + logger.info("checking whether writes are increasing") + await are_writes_increasing(ops_test, extra_model=second_model) + + # Verify that no writes to the database were missed after stopping the writes + # (check that all 
the units have all the writes). + logger.info("checking whether no writes were lost") + await check_writes(ops_test, extra_model=second_model) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 361f491484..9d88d75af6 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -12,6 +12,7 @@ import psycopg2 import requests import yaml +from juju.model import Model from juju.unit import Unit from lightkube.core.client import Client from lightkube.core.exceptions import ApiError @@ -39,7 +40,9 @@ charm = None -async def app_name(ops_test: OpsTest, application_name: str = "postgresql-k8s") -> Optional[str]: +async def app_name( + ops_test: OpsTest, application_name: str = "postgresql-k8s", model: Model = None +) -> Optional[str]: """Returns the name of the cluster running PostgreSQL. This is important since not all deployments of the PostgreSQL charm have the application name @@ -47,8 +50,10 @@ async def app_name(ops_test: OpsTest, application_name: str = "postgresql-k8s") Note: if multiple clusters are running PostgreSQL this will return the one first found. """ - status = await ops_test.model.get_status() - for app in ops_test.model.applications: + if model is None: + model = ops_test.model + status = await model.get_status() + for app in model.applications: if application_name in status["applications"][app]["charm"]: return app @@ -61,11 +66,15 @@ async def build_and_deploy( database_app_name: str = DATABASE_APP_NAME, wait_for_idle: bool = True, status: str = "active", + model: Model = None, ) -> None: """Builds the charm and deploys a specified number of units.""" + if model is None: + model = ops_test.model + # It is possible for users to provide their own cluster for testing. Hence, check if there # is a pre-existing cluster. - if await app_name(ops_test, database_app_name): + if await app_name(ops_test, database_app_name, model): return global charm @@ -75,7 +84,7 @@ async def build_and_deploy( "postgresql-image": METADATA["resources"]["postgresql-image"]["upstream-source"], } ( - await ops_test.model.deploy( + await model.deploy( charm, resources=resources, application_name=database_app_name, @@ -87,7 +96,7 @@ async def build_and_deploy( ) if wait_for_idle: # Wait until the PostgreSQL charm is successfully deployed. 
- await ops_test.model.wait_for_idle( + await model.wait_for_idle( apps=[database_app_name], status=status, raise_on_blocked=True, @@ -409,9 +418,11 @@ def get_expected_k8s_resources(application: str) -> set: } -async def get_leader_unit(ops_test: OpsTest, app: str) -> Optional[Unit]: +async def get_leader_unit(ops_test: OpsTest, app: str, model: Model = None) -> Optional[Unit]: leader_unit = None - for unit in ops_test.model.applications[app].units: + if model is None: + model = ops_test.model + for unit in model.applications[app].units: if await unit.is_leader_from_status(): leader_unit = unit break @@ -589,13 +600,16 @@ async def check_tls_patroni_api(ops_test: OpsTest, unit_name: str, enabled: bool return False -def has_relation_exited(ops_test: OpsTest, endpoint_one: str, endpoint_two: str) -> bool: +def has_relation_exited( + ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None +) -> bool: """Returns true if the relation between endpoint_one and endpoint_two has been removed.""" - for rel in ops_test.model.relations: + relations = model.relations if model is not None else ops_test.model.relations + for rel in relations: endpoints = [endpoint.name for endpoint in rel.endpoints] - if endpoint_one not in endpoints and endpoint_two not in endpoints: - return True - return False + if endpoint_one in endpoints and endpoint_two in endpoints: + return False + return True @retry( @@ -663,22 +677,27 @@ async def run_command_on_unit(ops_test: OpsTest, unit_name: str, command: str) - return stdout -async def scale_application(ops_test: OpsTest, application_name: str, scale: int) -> None: +async def scale_application( + ops_test: OpsTest, application_name: str, scale: int, model: Model = None +) -> None: """Scale a given application to a specific unit count. Args: ops_test: The ops test framework instance application_name: The name of the application scale: The number of units to scale to + model: The model to scale the application in """ - await ops_test.model.applications[application_name].scale(scale) + if model is None: + model = ops_test.model + await model.applications[application_name].scale(scale) if scale == 0: - await ops_test.model.block_until( - lambda: len(ops_test.model.applications[DATABASE_APP_NAME].units) == scale, + await model.block_until( + lambda: len(model.applications[DATABASE_APP_NAME].units) == scale, timeout=1000, ) else: - await ops_test.model.wait_for_idle( + await model.wait_for_idle( apps=[application_name], status="active", timeout=1000, @@ -747,7 +766,7 @@ async def wait_for_idle_on_blocked( def wait_for_relation_removed_between( - ops_test: OpsTest, endpoint_one: str, endpoint_two: str + ops_test: OpsTest, endpoint_one: str, endpoint_two: str, model: Model = None ) -> None: """Wait for relation to be removed before checking if it's waiting or idle. @@ -755,11 +774,12 @@ def wait_for_relation_removed_between( ops_test: running OpsTest instance endpoint_one: one endpoint of the relation. Doesn't matter if it's provider or requirer. endpoint_two: the other endpoint of the relation. + model: optional model to check for the relation. """ try: for attempt in Retrying(stop=stop_after_delay(3 * 60), wait=wait_fixed(3)): with attempt: - if has_relation_exited(ops_test, endpoint_one, endpoint_two): + if has_relation_exited(ops_test, endpoint_one, endpoint_two, model): break except RetryError: assert False, "Relation failed to exit after 3 minutes."
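
A minimal usage sketch of the multi-model helpers introduced above, assuming pytest-operator's ops_test and the second_model fixture from test_async_replication.py; the coroutine name and the scale value below are illustrative only:

    from tests.integration.ha_tests.helpers import (
        are_writes_increasing,
        check_writes,
        start_continuous_writes,
    )
    from tests.integration.helpers import DATABASE_APP_NAME, scale_application


    async def drive_both_clusters(ops_test, second_model):
        # Write through the application related in the first model.
        await start_continuous_writes(ops_test, DATABASE_APP_NAME)

        # Count writes on members of both models and check they keep increasing.
        await are_writes_increasing(ops_test, extra_model=second_model)

        # Scale the copy of the charm deployed in the second model.
        await scale_application(ops_test, DATABASE_APP_NAME, 3, model=second_model)

        # Stop the writes and verify none were lost on either cluster.
        await check_writes(ops_test, extra_model=second_model)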