From d8a90edece58110b19521faaab3307b16975c5d0 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 28 Jun 2022 17:00:15 -0300 Subject: [PATCH 01/21] Add k8s resources cleanup --- src/charm.py | 66 ++++- src/resources.yaml | 29 -- tests/integration/helpers.py | 136 +++++++++ tests/integration/test_charm.py | 484 +++++++++++++++++--------------- 4 files changed, 456 insertions(+), 259 deletions(-) create mode 100644 tests/integration/helpers.py diff --git a/src/charm.py b/src/charm.py index 73a0048474..acd081a321 100755 --- a/src/charm.py +++ b/src/charm.py @@ -9,7 +9,8 @@ import string from lightkube import ApiError, Client, codecs -from lightkube.resources.core_v1 import Pod +from lightkube.resources.core_v1 import Endpoints, Pod, Service +from lightkube.resources.rbac_authorization_v1 import ClusterRole, ClusterRoleBinding from ops.charm import ActionEvent, CharmBase, WorkloadEvent from ops.main import main from ops.model import ( @@ -46,6 +47,7 @@ def __init__(self, *args): self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.leader_elected, self._on_leader_elected) self.framework.observe(self.on.postgresql_pebble_ready, self._on_postgresql_pebble_ready) + self.framework.observe(self.on.stop, self._on_stop) self.framework.observe(self.on.upgrade_charm, self._on_upgrade_charm) self.framework.observe( self.on.get_postgres_password_action, self._on_get_postgres_password @@ -131,14 +133,20 @@ def _patch_pod_labels(self) -> None: def _create_resources(self): """Create kubernetes resources needed for Patroni.""" + client = Client() try: - client = Client() with open("src/resources.yaml") as f: - for obj in codecs.load_all_yaml(f, context=self._context): - client.create(obj) + for resource in codecs.load_all_yaml(f, context=self._context): + client.create(resource) + logger.info(f"created {str(resource)}") except ApiError as e: - logger.error("failed to create resources") - self.unit.status = 
BlockedStatus(f"failed to create resources with error {e}") + if e.status.code == 409: + logger.info("replacing resource: %s.", str(resource.to_dict())) + client.replace(resource) + else: + logger.error("failed to create resource: %s.", str(resource.to_dict())) + self.unit.status = BlockedStatus(f"failed to create services {e}") + return def _on_get_postgres_password(self, event: ActionEvent) -> None: """Returns the password for the postgres user as an action response.""" @@ -152,6 +160,51 @@ def _on_get_primary(self, event: ActionEvent) -> None: except RetryError as e: logger.error(f"failed to get primary with error {e}") + def _on_stop(self, _) -> None: + """Handle the stop event.""" + # Check to run the teardown actions only once. + if not self.unit.is_leader(): + return + + client = Client() + resources_to_delete = [] + + # Get the k8s resources created by the charm. + with open("src/resources.yaml") as f: + resources = codecs.load_all_yaml(f, context=self._context) + # Ignore the cluster role and its binding that were created together with the + # application and also the service resources, which will be retrieved in the next step. + resources_to_delete.extend( + list( + filter( + lambda x: not isinstance(x, (ClusterRole, ClusterRoleBinding, Service)), + resources, + ) + ) + ) + + # Get the k8s resources created by Patroni. + for kind in [Endpoints, Service]: + resources_to_delete.extend( + client.list( + kind, + namespace=self._namespace, + labels={"app.juju.is/created-by": f"{self._name}"}, + ) + ) + + # Delete the resources. + for resource in resources_to_delete: + try: + client.delete( + type(resource), + name=resource.metadata.name, + namespace=resource.metadata.namespace, + ) + except ApiError: + # Only log a message, as the charm is being stopped. 
+ logger.error(f"failed to delete resource: {resource}.") + def _on_update_status(self, _) -> None: # Until https://github.com/canonical/pebble/issues/6 is fixed, # we need to use the logic below to restart the leader @@ -209,6 +262,7 @@ def _postgresql_layer(self) -> Layer: "environment": { "PATRONI_KUBERNETES_LABELS": f"{{application: patroni, cluster-name: {self._name}}}", "PATRONI_KUBERNETES_NAMESPACE": self._namespace, + "PATRONI_KUBERNETES_USE_ENDPOINTS": "true", "PATRONI_NAME": pod_name, "PATRONI_SCOPE": self._namespace, "PATRONI_REPLICATION_USERNAME": "replication", diff --git a/src/resources.yaml b/src/resources.yaml index 219a7dedf3..739ee3756a 100644 --- a/src/resources.yaml +++ b/src/resources.yaml @@ -54,32 +54,3 @@ roleRef: subjects: - kind: ServiceAccount name: {{ app_name }} - ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: patroni-k8s-ep-access -rules: -- apiGroups: - - "" - resources: - - endpoints - resourceNames: - - kubernetes - verbs: - - get - ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRoleBinding -metadata: - name: patroni-k8s-ep-access -roleRef: - apiGroup: rbac.authorization.k8s.io - kind: ClusterRole - name: patroni-k8s-ep-access -subjects: -- kind: ServiceAccount - name: {{ app_name }} - namespace: {{ namespace }} \ No newline at end of file diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py new file mode 100644 index 0000000000..8c6ea52b91 --- /dev/null +++ b/tests/integration/helpers.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +# Copyright 2022 Canonical Ltd. +# See LICENSE file for licensing details. 
+from typing import List + +from lightkube import codecs +from lightkube.core.client import Client +from lightkube.core.exceptions import ApiError +from lightkube.resources.core_v1 import Endpoints, Service +from lightkube.resources.rbac_authorization_v1 import ClusterRole, ClusterRoleBinding +from pytest_operator.plugin import OpsTest + + +def get_charm_resources(namespace: str, application: str): + context = {"namespace": namespace, "app_name": application} + + # Check if any resource created by the charm still exists. + with open("src/resources.yaml") as f: + # Load the list of the resources that should be created by the charm. + charm_resources = list( + filter( + lambda x: not isinstance(x, (ClusterRole, ClusterRoleBinding, Service)), + codecs.load_all_yaml(f, context=context), + ) + ) + return charm_resources + + +def get_expected_patroni_k8s_resources(application: str, namespace: str) -> set[str]: + resources = set() + + # Define the context needed for the k8s resources lists load. + context = {"namespace": namespace, "app_name": application} + + with open("src/resources.yaml") as f: + # Load the list of the resources that should be created by the charm. + charm_resources = codecs.load_all_yaml(f, context=context) + resources = set( + map( + lambda x: f"{type(x).__name__}/{x.metadata.name}", + charm_resources, + ) + ) + + # Include the resources that Patroni creates when it starts. + patroni_resources = [ + f'Endpoints/{namespace}-config', f'Endpoints/{namespace}', f'Service/{namespace}-config' + ] + resources.update(patroni_resources) + + return resources + + +def get_existing_patroni_k8s_resources( + ops_test: OpsTest, application: str, namespace: str +) -> set[str]: + """Count the k8s resources that were created by the charm or by Patroni. + + Args: + ops_test: ops_test instance. + application: application name. + namespace: namespace related to the model where + the charm was deployed. 
+ + Returns: + count of existing charm/Patroni specific k8s resources. + """ + resources = set() + + # Define the context needed for the k8s resources lists load. + client = Client(namespace=namespace) + + # Define the context needed for the k8s resources lists load. + context = {"namespace": namespace, "app_name": application} + + with open("src/resources.yaml") as f: + # Load the list of the resources that should be created by the charm. + charm_resources = codecs.load_all_yaml(f, context=context) + + existing_charm_resources = list( + map( + lambda x: f"{type(x).__name__}/{x.metadata.name}", + filter( + lambda x: (resource_exists(client, x)), + charm_resources, + ) + ) + ) + # Add only the existing resources to the list. + resources.update( + set( + map( + lambda x: f"{x.split('/')[0]}/{x.split('/')[1]}", + existing_charm_resources, + ) + ) + ) + + # List the resources created by Patroni. + for kind in [Endpoints, Service]: + patroni_resources = client.list( + kind, + namespace=namespace, + labels={"app.juju.is/created-by": application}, + ) + mapped_patroni_resources = set( + map( + lambda x: f"{kind.__name__}/{x.metadata.name}", + patroni_resources, + ) + ) + resources.update( + mapped_patroni_resources + ) + + return resources + + +async def get_model_name(ops_test: OpsTest) -> str: + """Get the name of the current model. + + Args: + ops_test: ops_test instance. + Returns: + model name. 
+ """ + model = await ops_test.model.get_info() + return model.name + + +def resource_exists(client: Client, resource) -> bool: + try: + client.get(type(resource), name=resource.metadata.name) + return True + except ApiError as e: + return False diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index bc3a645e4d..7450ad6642 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -14,6 +14,11 @@ from tenacity import retry, retry_if_result, stop_after_attempt, wait_exponential from tests.helpers import METADATA, STORAGE_PATH +from tests.integration.helpers import ( + get_existing_patroni_k8s_resources, + get_expected_patroni_k8s_resources, + get_model_name, +) logger = logging.getLogger(__name__) @@ -45,229 +50,260 @@ async def test_build_and_deploy(ops_test: OpsTest): assert ops_test.model.applications[APP_NAME].units[0].workload_status == "active" -@pytest.mark.parametrize("unit_id", UNIT_IDS) -async def test_labels_consistency_across_pods(ops_test: OpsTest, unit_id: int) -> None: - model = await ops_test.model.get_info() - client = AsyncClient(namespace=model.name) - pod = await client.get(Pod, name=f"postgresql-k8s-{unit_id}") - # Ensures that the correct kubernetes labels are set - # (these ones guarantee the correct working of replication). - assert pod.metadata.labels["application"] == "patroni" - assert pod.metadata.labels["cluster-name"] == model.name - - -async def test_database_is_up(ops_test: OpsTest): - password = await get_postgres_password(ops_test) - - # Testing the connection to each PostgreSQL instance. 
- status = await ops_test.model.get_status() # noqa: F821 - for unit in status["applications"][APP_NAME]["units"].values(): - host = unit["address"] - logger.info("connecting to the database host: %s", host) - connection = db_connect(host=host, password=password) - assert connection.status == psycopg2.extensions.STATUS_READY - connection.close() - - -@pytest.mark.parametrize("unit_id", UNIT_IDS) -async def test_config_files_are_correct(ops_test: OpsTest, unit_id: int): - unit_name = f"postgresql-k8s/{unit_id}" - - # Retrieve the pod IP. - status = await ops_test.model.get_status() # noqa: F821 - for _, unit in status["applications"][APP_NAME]["units"].items(): - if unit["provider-id"] == unit_name.replace("/", "-"): - pod_ip = unit["address"] - break - - # Get the expected contents from files. - with open("templates/patroni.yml.j2") as file: - template = Template(file.read()) - expected_patroni_yml = template.render(pod_ip=pod_ip, storage_path=STORAGE_PATH) - with open("tests/data/postgresql.conf") as file: - expected_postgresql_conf = file.read() - - unit = ops_test.model.units[unit_name] - - # Check whether Patroni configuration is correctly set up. - patroni_yml_data = await pull_content_from_unit_file(unit, f"{STORAGE_PATH}/patroni.yml") - assert patroni_yml_data == expected_patroni_yml - - # Check that the PostgreSQL settings are as expected. - postgresql_conf_data = await pull_content_from_unit_file( - unit, f"{STORAGE_PATH}/postgresql-k8s-operator.conf" - ) - assert postgresql_conf_data == expected_postgresql_conf - - -async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: - """Tests that the cluster maintains a primary after the primary is deleted.""" - # Find the current primary unit. - primary = await get_primary(ops_test) - - # Delete the primary pod. 
- model = await ops_test.model.get_info() - client = AsyncClient(namespace=model.name) - await client.delete(Pod, name=primary.replace("/", "-")) - logger.info(f"deleted pod {primary}") - - # Wait and get the primary again (which can be any unit, including the previous primary). - await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 - ) - primary = await get_primary(ops_test) - - # We also need to check that a replica can see the leader - # to make sure that the cluster is stable again. - other_unit_id = 1 if primary.split("/")[1] == 0 else 0 - assert await get_primary(ops_test, other_unit_id) != "None" - - -async def test_persist_data_through_graceful_restart(ops_test: OpsTest): - """Test data persists through a graceful restart.""" - primary = await get_primary(ops_test) - password = await get_postgres_password(ops_test) - status = await ops_test.model.get_status() - address = status["applications"][APP_NAME].units[primary]["address"] - - # Write data to primary IP. - logger.info(f"connecting to primary {primary} on {address}") - with db_connect(host=address, password=password) as connection: - connection.autocommit = True - connection.cursor().execute("CREATE TABLE gracetest (testcol INT );") - - # Restart all nodes by scaling to 0, then back up - # These have to run sequentially for the test to be valid/stable. 
- await ops_test.model.applications[APP_NAME].scale(0) - await ops_test.model.applications[APP_NAME].scale(3) - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) - - # Testing write occurred to every postgres instance by reading from them - status = await ops_test.model.get_status() # noqa: F821 - for unit in status["applications"][APP_NAME]["units"].values(): - host = unit["address"] - logger.info("connecting to the database host: %s", host) - with db_connect(host=host, password=password) as connection: - # Ensure we can read from "gracetest" table - connection.cursor().execute("SELECT * FROM gracetest;") - - -async def test_persist_data_through_failure(ops_test: OpsTest): - """Test data persists through a failure.""" - primary = await get_primary(ops_test) - password = await get_postgres_password(ops_test) - status = await ops_test.model.get_status() - address = status["applications"][APP_NAME].units[primary]["address"] - - # Write data to primary IP. - logger.info(f"connecting to primary {primary} on {address}") - with db_connect(host=address, password=password) as connection: - connection.autocommit = True - connection.cursor().execute("CREATE TABLE failtest (testcol INT );") - - # Cause a machine failure by killing a unit in k8s - model = await ops_test.model.get_info() - client = AsyncClient(namespace=model.name) - await client.delete(Pod, name=primary.replace("/", "-")) - logger.info("primary pod deleted") - - # Wait for juju to notice one of the pods is gone and fix it - logger.info("wait for juju to reset postgres container") - await ops_test.model.wait_for_idle( - apps=[APP_NAME], - status="active", - timeout=1000, - wait_for_exact_units=3, - check_freq=2, - idle_period=45, - ) - logger.info("juju has reset postgres container") - - # Testing write occurred to every postgres instance by reading from them - status = await ops_test.model.get_status() # noqa: F821 - for unit in 
status["applications"][APP_NAME]["units"].values(): - host = unit["address"] - logger.info("connecting to the database host: %s", host) - with db_connect(host=host, password=password) as connection: - # Ensure we can read from "failtest" table - connection.cursor().execute("SELECT * FROM failtest;") - - -async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: - """Tests that an automatic failover is triggered after an issue happens in the leader.""" - # Find the current primary unit. - primary = await get_primary(ops_test) - - # Crash PostgreSQL by removing the data directory. - await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") - - # Wait for charm to stabilise - await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 - ) - - # Primary doesn't have to be different, but it does have to exist. - assert await get_primary(ops_test) != "None" - - -@retry( - retry=retry_if_result(lambda x: not x), - stop=stop_after_attempt(10), - wait=wait_exponential(multiplier=1, min=2, max=30), -) -async def primary_changed(ops_test: OpsTest, old_primary: str) -> bool: - """Checks whether or not the primary unit has changed.""" - primary = await get_primary(ops_test) - return primary != old_primary - - -async def get_primary(ops_test: OpsTest, unit_id=0) -> str: - """Get the primary unit. - - Args: - ops_test: ops_test instance. - unit_id: the number of the unit. - - Returns: - the current primary unit. 
- """ - action = await ops_test.model.units.get(f"{APP_NAME}/{unit_id}").run_action("get-primary") - action = await action.wait() - return action.results["primary"] - - -async def get_postgres_password(ops_test: OpsTest): - """Retrieve the postgres user password using the action.""" - unit = ops_test.model.units.get(f"{APP_NAME}/0") - action = await unit.run_action("get-postgres-password") - result = await action.wait() - return result.results["postgres-password"] - - -async def pull_content_from_unit_file(unit, path: str) -> str: - """Pull the content of a file from one unit. - - Args: - unit: the Juju unit instance. - path: the path of the file to get the contents from. - - Returns: - the entire content of the file. - """ - action = await unit.run(f"cat {path}") - return action.results.get("Stdout", None) - - -def db_connect(host: str, password: str): - """Returns psycopg2 connection object linked to postgres db in the given host. +@pytest.mark.abort_on_fail +async def test_application_created_required_resources(ops_test: OpsTest) -> None: + # TODO: move namespace to inside the get resources function. + namespace = await get_model_name(ops_test) + existing_resources = get_existing_patroni_k8s_resources(ops_test, APP_NAME, namespace) + expected_resources = get_expected_patroni_k8s_resources(APP_NAME, namespace) + print(f'existing_resources: {existing_resources}') + print(f'expected_resources: {expected_resources}') + assert set(existing_resources) == set(expected_resources) - Args: - host: the IP of the postgres host container - password: postgres password - Returns: - psycopg2 connection object linked to postgres db, under "postgres" user. - """ - return psycopg2.connect( - f"dbname='postgres' user='postgres' host='{host}' password='{password}' connect_timeout=10" - ) +@pytest.mark.abort_on_fail +async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: + # TODO: move namespace to inside the get resources function. 
+ await ops_test.model.applications[APP_NAME].remove() + await ops_test.model.block_until(lambda: APP_NAME not in ops_test.model.applications) + namespace = await get_model_name(ops_test) + existing_resources = get_existing_patroni_k8s_resources(ops_test, APP_NAME, namespace) + print(f'existing_resources: {existing_resources}') + assert set(existing_resources) == set() + + +# @pytest.mark.parametrize("unit_id", UNIT_IDS) +# async def test_labels_consistency_across_pods(ops_test: OpsTest, unit_id: int) -> None: +# model = await ops_test.model.get_info() +# client = AsyncClient(namespace=model.name) +# pod = await client.get(Pod, name=f"postgresql-k8s-{unit_id}") +# # Ensures that the correct kubernetes labels are set +# # (these ones guarantee the correct working of replication). +# assert pod.metadata.labels["application"] == "patroni" +# assert pod.metadata.labels["cluster-name"] == model.name +# +# +# async def test_database_is_up(ops_test: OpsTest): +# password = await get_postgres_password(ops_test) +# +# # Testing the connection to each PostgreSQL instance. +# status = await ops_test.model.get_status() # noqa: F821 +# for unit in status["applications"][APP_NAME]["units"].values(): +# host = unit["address"] +# logger.info("connecting to the database host: %s", host) +# connection = db_connect(host=host, password=password) +# assert connection.status == psycopg2.extensions.STATUS_READY +# connection.close() +# +# +# @pytest.mark.parametrize("unit_id", UNIT_IDS) +# async def test_config_files_are_correct(ops_test: OpsTest, unit_id: int): +# unit_name = f"postgresql-k8s/{unit_id}" +# +# # Retrieve the pod IP. +# status = await ops_test.model.get_status() # noqa: F821 +# for _, unit in status["applications"][APP_NAME]["units"].items(): +# if unit["provider-id"] == unit_name.replace("/", "-"): +# pod_ip = unit["address"] +# break +# +# # Get the expected contents from files. 
+# with open("templates/patroni.yml.j2") as file: +# template = Template(file.read()) +# expected_patroni_yml = template.render(pod_ip=pod_ip, storage_path=STORAGE_PATH) +# with open("tests/data/postgresql.conf") as file: +# expected_postgresql_conf = file.read() +# +# unit = ops_test.model.units[unit_name] +# +# # Check whether Patroni configuration is correctly set up. +# patroni_yml_data = await pull_content_from_unit_file(unit, f"{STORAGE_PATH}/patroni.yml") +# assert patroni_yml_data == expected_patroni_yml +# +# # Check that the PostgreSQL settings are as expected. +# postgresql_conf_data = await pull_content_from_unit_file( +# unit, f"{STORAGE_PATH}/postgresql-k8s-operator.conf" +# ) +# assert postgresql_conf_data == expected_postgresql_conf +# +# +# async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: +# """Tests that the cluster maintains a primary after the primary is deleted.""" +# # Find the current primary unit. +# primary = await get_primary(ops_test) +# +# # Delete the primary pod. +# model = await ops_test.model.get_info() +# client = AsyncClient(namespace=model.name) +# await client.delete(Pod, name=primary.replace("/", "-")) +# logger.info(f"deleted pod {primary}") +# +# # Wait and get the primary again (which can be any unit, including the previous primary). +# await ops_test.model.wait_for_idle( +# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 +# ) +# primary = await get_primary(ops_test) +# +# # We also need to check that a replica can see the leader +# # to make sure that the cluster is stable again. 
+# other_unit_id = 1 if primary.split("/")[1] == 0 else 0 +# assert await get_primary(ops_test, other_unit_id) != "None" +# +# +# async def test_persist_data_through_graceful_restart(ops_test: OpsTest): +# """Test data persists through a graceful restart.""" +# primary = await get_primary(ops_test) +# password = await get_postgres_password(ops_test) +# status = await ops_test.model.get_status() +# address = status["applications"][APP_NAME].units[primary]["address"] +# +# # Write data to primary IP. +# logger.info(f"connecting to primary {primary} on {address}") +# with db_connect(host=address, password=password) as connection: +# connection.autocommit = True +# connection.cursor().execute("CREATE TABLE gracetest (testcol INT );") +# +# # Restart all nodes by scaling to 0, then back up +# # These have to run sequentially for the test to be valid/stable. +# await ops_test.model.applications[APP_NAME].scale(0) +# await ops_test.model.applications[APP_NAME].scale(3) +# await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) +# +# # Testing write occurred to every postgres instance by reading from them +# status = await ops_test.model.get_status() # noqa: F821 +# for unit in status["applications"][APP_NAME]["units"].values(): +# host = unit["address"] +# logger.info("connecting to the database host: %s", host) +# with db_connect(host=host, password=password) as connection: +# # Ensure we can read from "gracetest" table +# connection.cursor().execute("SELECT * FROM gracetest;") +# +# +# async def test_persist_data_through_failure(ops_test: OpsTest): +# """Test data persists through a failure.""" +# primary = await get_primary(ops_test) +# password = await get_postgres_password(ops_test) +# status = await ops_test.model.get_status() +# address = status["applications"][APP_NAME].units[primary]["address"] +# +# # Write data to primary IP. 
+# logger.info(f"connecting to primary {primary} on {address}") +# with db_connect(host=address, password=password) as connection: +# connection.autocommit = True +# connection.cursor().execute("CREATE TABLE failtest (testcol INT );") +# +# # Cause a machine failure by killing a unit in k8s +# model = await ops_test.model.get_info() +# client = AsyncClient(namespace=model.name) +# await client.delete(Pod, name=primary.replace("/", "-")) +# logger.info("primary pod deleted") +# +# # Wait for juju to notice one of the pods is gone and fix it +# logger.info("wait for juju to reset postgres container") +# await ops_test.model.wait_for_idle( +# apps=[APP_NAME], +# status="active", +# timeout=1000, +# wait_for_exact_units=3, +# check_freq=2, +# idle_period=45, +# ) +# logger.info("juju has reset postgres container") +# +# # Testing write occurred to every postgres instance by reading from them +# status = await ops_test.model.get_status() # noqa: F821 +# for unit in status["applications"][APP_NAME]["units"].values(): +# host = unit["address"] +# logger.info("connecting to the database host: %s", host) +# with db_connect(host=host, password=password) as connection: +# # Ensure we can read from "failtest" table +# connection.cursor().execute("SELECT * FROM failtest;") +# +# +# async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: +# """Tests that an automatic failover is triggered after an issue happens in the leader.""" +# # Find the current primary unit. +# primary = await get_primary(ops_test) +# +# # Crash PostgreSQL by removing the data directory. +# await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") +# +# # Wait for charm to stabilise +# await ops_test.model.wait_for_idle( +# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 +# ) +# +# # Primary doesn't have to be different, but it does have to exist. 
+# assert await get_primary(ops_test) != "None" +# +# +# async def test_remove_application_remove_all_created_k8s_resources(ops_test: OpsTest) -> None: +# # Check the existing resources. +# +# +# # Remove the application and check that the resources were correctly deleted. +# await ops_test.model.applications[APP_NAME].remove() +# await ops_test.model.wait_for_idle(timeout=1000) +# +# +# @retry( +# retry=retry_if_result(lambda x: not x), +# stop=stop_after_attempt(10), +# wait=wait_exponential(multiplier=1, min=2, max=30), +# ) +# async def primary_changed(ops_test: OpsTest, old_primary: str) -> bool: +# """Checks whether or not the primary unit has changed.""" +# primary = await get_primary(ops_test) +# return primary != old_primary +# +# +# async def get_primary(ops_test: OpsTest, unit_id=0) -> str: +# """Get the primary unit. +# +# Args: +# ops_test: ops_test instance. +# unit_id: the number of the unit. +# +# Returns: +# the current primary unit. +# """ +# action = await ops_test.model.units.get(f"{APP_NAME}/{unit_id}").run_action("get-primary") +# action = await action.wait() +# return action.results["primary"] +# +# +# async def get_postgres_password(ops_test: OpsTest): +# """Retrieve the postgres user password using the action.""" +# unit = ops_test.model.units.get(f"{APP_NAME}/0") +# action = await unit.run_action("get-postgres-password") +# result = await action.wait() +# return result.results["postgres-password"] +# +# +# async def pull_content_from_unit_file(unit, path: str) -> str: +# """Pull the content of a file from one unit. +# +# Args: +# unit: the Juju unit instance. +# path: the path of the file to get the contents from. +# +# Returns: +# the entire content of the file. +# """ +# action = await unit.run(f"cat {path}") +# return action.results.get("Stdout", None) +# +# +# def db_connect(host: str, password: str): +# """Returns psycopg2 connection object linked to postgres db in the given host. 
+# +# Args: +# host: the IP of the postgres host container +# password: postgres password +# +# Returns: +# psycopg2 connection object linked to postgres db, under "postgres" user. +# """ +# return psycopg2.connect( +# f"dbname='postgres' user='postgres' host='{host}' password='{password}' connect_timeout=10" +# ) From ef8819ba49ceb99b7ea346fd7d5293c25a77a9cc Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 29 Jun 2022 21:04:26 -0300 Subject: [PATCH 02/21] Improve code --- src/charm.py | 2 + tests/integration/helpers.py | 143 ++++++++++++++++++-------------- tests/integration/test_charm.py | 32 +++---- tests/unit/test_charm.py | 1 + 4 files changed, 100 insertions(+), 78 deletions(-) diff --git a/src/charm.py b/src/charm.py index acd081a321..36afe7f104 100755 --- a/src/charm.py +++ b/src/charm.py @@ -140,6 +140,8 @@ def _create_resources(self): client.create(resource) logger.info(f"created {str(resource)}") except ApiError as e: + # The 409 error code means that the resource was already created or has a higher version. + # This can happen if Patroni creates some resource that the charm is expected to create. if e.status.code == 409: logger.info("replacing resource: %s.", str(resource.to_dict())) client.replace(resource) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 8c6ea52b91..aff7cc6491 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -1,118 +1,127 @@ #!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. 
-from typing import List from lightkube import codecs from lightkube.core.client import Client from lightkube.core.exceptions import ApiError +from lightkube.core.resource import NamespacedResourceG from lightkube.resources.core_v1 import Endpoints, Service from lightkube.resources.rbac_authorization_v1 import ClusterRole, ClusterRoleBinding from pytest_operator.plugin import OpsTest def get_charm_resources(namespace: str, application: str): - context = {"namespace": namespace, "app_name": application} - - # Check if any resource created by the charm still exists. - with open("src/resources.yaml") as f: - # Load the list of the resources that should be created by the charm. - charm_resources = list( - filter( - lambda x: not isinstance(x, (ClusterRole, ClusterRoleBinding, Service)), - codecs.load_all_yaml(f, context=context), - ) - ) - return charm_resources - + """Return the list of k8s resources from resources.yaml file. -def get_expected_patroni_k8s_resources(application: str, namespace: str) -> set[str]: - resources = set() + Args: + namespace: namespace related to the model where + the charm was deployed. + application: application name. + Returns: + list of existing charm/Patroni specific k8s resources. + """ # Define the context needed for the k8s resources lists load. context = {"namespace": namespace, "app_name": application} + # Load the list of the resources from resources.yaml. with open("src/resources.yaml") as f: - # Load the list of the resources that should be created by the charm. - charm_resources = codecs.load_all_yaml(f, context=context) - resources = set( - map( - lambda x: f"{type(x).__name__}/{x.metadata.name}", - charm_resources, + return list( + filter( + lambda x: not isinstance(x, (ClusterRole, ClusterRoleBinding, Service)), + codecs.load_all_yaml(f, context=context), ) ) - # Include the resources that Patroni creates when it starts. 
- patroni_resources = [ - f'Endpoints/{namespace}-config', f'Endpoints/{namespace}', f'Service/{namespace}-config' - ] - resources.update(patroni_resources) - - return resources - -def get_existing_patroni_k8s_resources( - ops_test: OpsTest, application: str, namespace: str -) -> set[str]: - """Count the k8s resources that were created by the charm or by Patroni. +def get_existing_patroni_k8s_resources(namespace: str, application: str) -> set[str]: + """Return the list of k8s resources that were created by the charm and Patroni. Args: - ops_test: ops_test instance. - application: application name. namespace: namespace related to the model where the charm was deployed. + application: application name. Returns: - count of existing charm/Patroni specific k8s resources. + list of existing charm/Patroni specific k8s resources. """ - resources = set() - - # Define the context needed for the k8s resources lists load. + # Create a k8s API client instance. client = Client(namespace=namespace) - # Define the context needed for the k8s resources lists load. - context = {"namespace": namespace, "app_name": application} - - with open("src/resources.yaml") as f: - # Load the list of the resources that should be created by the charm. - charm_resources = codecs.load_all_yaml(f, context=context) + # Retrieve the k8s resources the charm should create. + charm_resources = get_charm_resources(namespace, application) + # Check the k8s API for the resources that currently exist. existing_charm_resources = list( map( lambda x: f"{type(x).__name__}/{x.metadata.name}", filter( lambda x: (resource_exists(client, x)), charm_resources, - ) + ), ) ) + # Add only the existing resources to the list. - resources.update( - set( - map( - lambda x: f"{x.split('/')[0]}/{x.split('/')[1]}", - existing_charm_resources, - ) + resources = set( + map( + lambda x: f"{x.split('/')[0]}/{x.split('/')[1]}", + existing_charm_resources, ) ) - # List the resources created by Patroni. 
+ # Include the resources created by Patroni. for kind in [Endpoints, Service]: patroni_resources = client.list( kind, namespace=namespace, labels={"app.juju.is/created-by": application}, ) + + # Build an identifier for each resource (using its type and name). mapped_patroni_resources = set( - map( - lambda x: f"{kind.__name__}/{x.metadata.name}", - patroni_resources, - ) + map( + lambda x: f"{kind.__name__}/{x.metadata.name}", + patroni_resources, ) - resources.update( - mapped_patroni_resources ) + resources.update(mapped_patroni_resources) + + return resources + + +def get_expected_patroni_k8s_resources(namespace: str, application: str) -> set[str]: + """Return the list of expected k8s resources when the charm is deployed. + + Args: + namespace: namespace related to the model where + the charm was deployed. + application: application name. + + Returns: + list of existing charm/Patroni specific k8s resources. + """ + # Retrieve the k8s resources created by the charm. + charm_resources = get_charm_resources(namespace, application) + + # Build an identifier for each resource (using its type and name). + resources = set( + map( + lambda x: f"{type(x).__name__}/{x.metadata.name}", + charm_resources, + ) + ) + + # Include the resources created by Patroni. + patroni_resources = [ + f"Endpoints/{namespace}-config", + f"Endpoints/{namespace}", + f"Service/{namespace}-config", + ] + resources.update(patroni_resources) + return resources @@ -121,6 +130,7 @@ async def get_model_name(ops_test: OpsTest) -> str: Args: ops_test: ops_test instance. + Returns: model name. """ @@ -128,9 +138,18 @@ async def get_model_name(ops_test: OpsTest) -> str: return model.name -def resource_exists(client: Client, resource) -> bool: +def resource_exists(client: Client, resource: NamespacedResourceG) -> bool: + """Get the name of the current model. + + Args: + client: k8s API client instance. + resource: k8s resource. + + Returns: + whether the resource exists. 
+ """ try: client.get(type(resource), name=resource.metadata.name) return True - except ApiError as e: + except ApiError: return False diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 7450ad6642..a4fc6aa19d 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -52,26 +52,14 @@ async def test_build_and_deploy(ops_test: OpsTest): @pytest.mark.abort_on_fail async def test_application_created_required_resources(ops_test: OpsTest) -> None: - # TODO: move namespace to inside the get resources function. + # Compare the k8s resources that the charm and Patroni should create with + # the currently created k8s resources. namespace = await get_model_name(ops_test) - existing_resources = get_existing_patroni_k8s_resources(ops_test, APP_NAME, namespace) - expected_resources = get_expected_patroni_k8s_resources(APP_NAME, namespace) - print(f'existing_resources: {existing_resources}') - print(f'expected_resources: {expected_resources}') + existing_resources = get_existing_patroni_k8s_resources(namespace, APP_NAME) + expected_resources = get_expected_patroni_k8s_resources(namespace, APP_NAME) assert set(existing_resources) == set(expected_resources) -@pytest.mark.abort_on_fail -async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: - # TODO: move namespace to inside the get resources function. 
- await ops_test.model.applications[APP_NAME].remove() - await ops_test.model.block_until(lambda: APP_NAME not in ops_test.model.applications) - namespace = await get_model_name(ops_test) - existing_resources = get_existing_patroni_k8s_resources(ops_test, APP_NAME, namespace) - print(f'existing_resources: {existing_resources}') - assert set(existing_resources) == set() - - # @pytest.mark.parametrize("unit_id", UNIT_IDS) # async def test_labels_consistency_across_pods(ops_test: OpsTest, unit_id: int) -> None: # model = await ops_test.model.get_info() @@ -307,3 +295,15 @@ async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: # return psycopg2.connect( # f"dbname='postgres' user='postgres' host='{host}' password='{password}' connect_timeout=10" # ) + + +@pytest.mark.abort_on_fail +async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: + # Remove the application and wait until it's gone. + await ops_test.model.applications[APP_NAME].remove() + await ops_test.model.block_until(lambda: APP_NAME not in ops_test.model.applications) + + # Check that all k8s resources created by the charm and Patroni were removed. 
+ namespace = await get_model_name(ops_test) + existing_resources = get_existing_patroni_k8s_resources(namespace, APP_NAME) + assert set(existing_resources) == set() diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 6220e29415..5e2536cd09 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -264,6 +264,7 @@ def test_postgresql_layer(self): "PATRONI_KUBERNETES_NAMESPACE": self.charm._namespace, "PATRONI_NAME": "postgresql-k8s-0", "PATRONI_SCOPE": self.charm._namespace, + "PATRONI_KUBERNETES_USE_ENDPOINTS": "true", "PATRONI_REPLICATION_USERNAME": "replication", "PATRONI_REPLICATION_PASSWORD": self.charm._replication_password, "PATRONI_SUPERUSER_USERNAME": "postgres", From f355c9abc390737c3337ec53491331f85cbb991b Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 29 Jun 2022 21:09:50 -0300 Subject: [PATCH 03/21] Uncomment previous tests --- src/charm.py | 5 +- tests/integration/test_charm.py | 469 ++++++++++++++++---------------- 2 files changed, 237 insertions(+), 237 deletions(-) diff --git a/src/charm.py b/src/charm.py index 36afe7f104..61a57ec8a0 100755 --- a/src/charm.py +++ b/src/charm.py @@ -140,8 +140,9 @@ def _create_resources(self): client.create(resource) logger.info(f"created {str(resource)}") except ApiError as e: - # The 409 error code means that the resource was already created or has a higher version. - # This can happen if Patroni creates some resource that the charm is expected to create. + # The 409 error code means that the resource was already created + # or has a higher version. This can happen if Patroni creates a + # resource that the charm is expected to create. 
if e.status.code == 409: logger.info("replacing resource: %s.", str(resource.to_dict())) client.replace(resource) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index a4fc6aa19d..3d4487c456 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -60,241 +60,240 @@ async def test_application_created_required_resources(ops_test: OpsTest) -> None assert set(existing_resources) == set(expected_resources) -# @pytest.mark.parametrize("unit_id", UNIT_IDS) -# async def test_labels_consistency_across_pods(ops_test: OpsTest, unit_id: int) -> None: -# model = await ops_test.model.get_info() -# client = AsyncClient(namespace=model.name) -# pod = await client.get(Pod, name=f"postgresql-k8s-{unit_id}") -# # Ensures that the correct kubernetes labels are set -# # (these ones guarantee the correct working of replication). -# assert pod.metadata.labels["application"] == "patroni" -# assert pod.metadata.labels["cluster-name"] == model.name -# -# -# async def test_database_is_up(ops_test: OpsTest): -# password = await get_postgres_password(ops_test) -# -# # Testing the connection to each PostgreSQL instance. -# status = await ops_test.model.get_status() # noqa: F821 -# for unit in status["applications"][APP_NAME]["units"].values(): -# host = unit["address"] -# logger.info("connecting to the database host: %s", host) -# connection = db_connect(host=host, password=password) -# assert connection.status == psycopg2.extensions.STATUS_READY -# connection.close() -# -# -# @pytest.mark.parametrize("unit_id", UNIT_IDS) -# async def test_config_files_are_correct(ops_test: OpsTest, unit_id: int): -# unit_name = f"postgresql-k8s/{unit_id}" -# -# # Retrieve the pod IP. -# status = await ops_test.model.get_status() # noqa: F821 -# for _, unit in status["applications"][APP_NAME]["units"].items(): -# if unit["provider-id"] == unit_name.replace("/", "-"): -# pod_ip = unit["address"] -# break -# -# # Get the expected contents from files. 
-# with open("templates/patroni.yml.j2") as file: -# template = Template(file.read()) -# expected_patroni_yml = template.render(pod_ip=pod_ip, storage_path=STORAGE_PATH) -# with open("tests/data/postgresql.conf") as file: -# expected_postgresql_conf = file.read() -# -# unit = ops_test.model.units[unit_name] -# -# # Check whether Patroni configuration is correctly set up. -# patroni_yml_data = await pull_content_from_unit_file(unit, f"{STORAGE_PATH}/patroni.yml") -# assert patroni_yml_data == expected_patroni_yml -# -# # Check that the PostgreSQL settings are as expected. -# postgresql_conf_data = await pull_content_from_unit_file( -# unit, f"{STORAGE_PATH}/postgresql-k8s-operator.conf" -# ) -# assert postgresql_conf_data == expected_postgresql_conf -# -# -# async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: -# """Tests that the cluster maintains a primary after the primary is deleted.""" -# # Find the current primary unit. -# primary = await get_primary(ops_test) -# -# # Delete the primary pod. -# model = await ops_test.model.get_info() -# client = AsyncClient(namespace=model.name) -# await client.delete(Pod, name=primary.replace("/", "-")) -# logger.info(f"deleted pod {primary}") -# -# # Wait and get the primary again (which can be any unit, including the previous primary). -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 -# ) -# primary = await get_primary(ops_test) -# -# # We also need to check that a replica can see the leader -# # to make sure that the cluster is stable again. 
-# other_unit_id = 1 if primary.split("/")[1] == 0 else 0 -# assert await get_primary(ops_test, other_unit_id) != "None" -# -# -# async def test_persist_data_through_graceful_restart(ops_test: OpsTest): -# """Test data persists through a graceful restart.""" -# primary = await get_primary(ops_test) -# password = await get_postgres_password(ops_test) -# status = await ops_test.model.get_status() -# address = status["applications"][APP_NAME].units[primary]["address"] -# -# # Write data to primary IP. -# logger.info(f"connecting to primary {primary} on {address}") -# with db_connect(host=address, password=password) as connection: -# connection.autocommit = True -# connection.cursor().execute("CREATE TABLE gracetest (testcol INT );") -# -# # Restart all nodes by scaling to 0, then back up -# # These have to run sequentially for the test to be valid/stable. -# await ops_test.model.applications[APP_NAME].scale(0) -# await ops_test.model.applications[APP_NAME].scale(3) -# await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) -# -# # Testing write occurred to every postgres instance by reading from them -# status = await ops_test.model.get_status() # noqa: F821 -# for unit in status["applications"][APP_NAME]["units"].values(): -# host = unit["address"] -# logger.info("connecting to the database host: %s", host) -# with db_connect(host=host, password=password) as connection: -# # Ensure we can read from "gracetest" table -# connection.cursor().execute("SELECT * FROM gracetest;") -# -# -# async def test_persist_data_through_failure(ops_test: OpsTest): -# """Test data persists through a failure.""" -# primary = await get_primary(ops_test) -# password = await get_postgres_password(ops_test) -# status = await ops_test.model.get_status() -# address = status["applications"][APP_NAME].units[primary]["address"] -# -# # Write data to primary IP. 
-# logger.info(f"connecting to primary {primary} on {address}") -# with db_connect(host=address, password=password) as connection: -# connection.autocommit = True -# connection.cursor().execute("CREATE TABLE failtest (testcol INT );") -# -# # Cause a machine failure by killing a unit in k8s -# model = await ops_test.model.get_info() -# client = AsyncClient(namespace=model.name) -# await client.delete(Pod, name=primary.replace("/", "-")) -# logger.info("primary pod deleted") -# -# # Wait for juju to notice one of the pods is gone and fix it -# logger.info("wait for juju to reset postgres container") -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], -# status="active", -# timeout=1000, -# wait_for_exact_units=3, -# check_freq=2, -# idle_period=45, -# ) -# logger.info("juju has reset postgres container") -# -# # Testing write occurred to every postgres instance by reading from them -# status = await ops_test.model.get_status() # noqa: F821 -# for unit in status["applications"][APP_NAME]["units"].values(): -# host = unit["address"] -# logger.info("connecting to the database host: %s", host) -# with db_connect(host=host, password=password) as connection: -# # Ensure we can read from "failtest" table -# connection.cursor().execute("SELECT * FROM failtest;") -# -# -# async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: -# """Tests that an automatic failover is triggered after an issue happens in the leader.""" -# # Find the current primary unit. -# primary = await get_primary(ops_test) -# -# # Crash PostgreSQL by removing the data directory. -# await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") -# -# # Wait for charm to stabilise -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 -# ) -# -# # Primary doesn't have to be different, but it does have to exist. 
-# assert await get_primary(ops_test) != "None" -# -# -# async def test_remove_application_remove_all_created_k8s_resources(ops_test: OpsTest) -> None: -# # Check the existing resources. -# -# -# # Remove the application and check that the resources were correctly deleted. -# await ops_test.model.applications[APP_NAME].remove() -# await ops_test.model.wait_for_idle(timeout=1000) -# -# -# @retry( -# retry=retry_if_result(lambda x: not x), -# stop=stop_after_attempt(10), -# wait=wait_exponential(multiplier=1, min=2, max=30), -# ) -# async def primary_changed(ops_test: OpsTest, old_primary: str) -> bool: -# """Checks whether or not the primary unit has changed.""" -# primary = await get_primary(ops_test) -# return primary != old_primary -# -# -# async def get_primary(ops_test: OpsTest, unit_id=0) -> str: -# """Get the primary unit. -# -# Args: -# ops_test: ops_test instance. -# unit_id: the number of the unit. -# -# Returns: -# the current primary unit. -# """ -# action = await ops_test.model.units.get(f"{APP_NAME}/{unit_id}").run_action("get-primary") -# action = await action.wait() -# return action.results["primary"] -# -# -# async def get_postgres_password(ops_test: OpsTest): -# """Retrieve the postgres user password using the action.""" -# unit = ops_test.model.units.get(f"{APP_NAME}/0") -# action = await unit.run_action("get-postgres-password") -# result = await action.wait() -# return result.results["postgres-password"] -# -# -# async def pull_content_from_unit_file(unit, path: str) -> str: -# """Pull the content of a file from one unit. -# -# Args: -# unit: the Juju unit instance. -# path: the path of the file to get the contents from. -# -# Returns: -# the entire content of the file. -# """ -# action = await unit.run(f"cat {path}") -# return action.results.get("Stdout", None) -# -# -# def db_connect(host: str, password: str): -# """Returns psycopg2 connection object linked to postgres db in the given host. 
-# -# Args: -# host: the IP of the postgres host container -# password: postgres password -# -# Returns: -# psycopg2 connection object linked to postgres db, under "postgres" user. -# """ -# return psycopg2.connect( -# f"dbname='postgres' user='postgres' host='{host}' password='{password}' connect_timeout=10" -# ) +@pytest.mark.parametrize("unit_id", UNIT_IDS) +async def test_labels_consistency_across_pods(ops_test: OpsTest, unit_id: int) -> None: + model = await ops_test.model.get_info() + client = AsyncClient(namespace=model.name) + pod = await client.get(Pod, name=f"postgresql-k8s-{unit_id}") + # Ensures that the correct kubernetes labels are set + # (these ones guarantee the correct working of replication). + assert pod.metadata.labels["application"] == "patroni" + assert pod.metadata.labels["cluster-name"] == model.name + + +async def test_database_is_up(ops_test: OpsTest): + password = await get_postgres_password(ops_test) + + # Testing the connection to each PostgreSQL instance. + status = await ops_test.model.get_status() # noqa: F821 + for unit in status["applications"][APP_NAME]["units"].values(): + host = unit["address"] + logger.info("connecting to the database host: %s", host) + connection = db_connect(host=host, password=password) + assert connection.status == psycopg2.extensions.STATUS_READY + connection.close() + + +@pytest.mark.parametrize("unit_id", UNIT_IDS) +async def test_config_files_are_correct(ops_test: OpsTest, unit_id: int): + unit_name = f"postgresql-k8s/{unit_id}" + + # Retrieve the pod IP. + status = await ops_test.model.get_status() # noqa: F821 + for _, unit in status["applications"][APP_NAME]["units"].items(): + if unit["provider-id"] == unit_name.replace("/", "-"): + pod_ip = unit["address"] + break + + # Get the expected contents from files. 
+ with open("templates/patroni.yml.j2") as file: + template = Template(file.read()) + expected_patroni_yml = template.render(pod_ip=pod_ip, storage_path=STORAGE_PATH) + with open("tests/data/postgresql.conf") as file: + expected_postgresql_conf = file.read() + + unit = ops_test.model.units[unit_name] + + # Check whether Patroni configuration is correctly set up. + patroni_yml_data = await pull_content_from_unit_file(unit, f"{STORAGE_PATH}/patroni.yml") + assert patroni_yml_data == expected_patroni_yml + + # Check that the PostgreSQL settings are as expected. + postgresql_conf_data = await pull_content_from_unit_file( + unit, f"{STORAGE_PATH}/postgresql-k8s-operator.conf" + ) + assert postgresql_conf_data == expected_postgresql_conf + + +async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: + """Tests that the cluster maintains a primary after the primary is deleted.""" + # Find the current primary unit. + primary = await get_primary(ops_test) + + # Delete the primary pod. + model = await ops_test.model.get_info() + client = AsyncClient(namespace=model.name) + await client.delete(Pod, name=primary.replace("/", "-")) + logger.info(f"deleted pod {primary}") + + # Wait and get the primary again (which can be any unit, including the previous primary). + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 + ) + primary = await get_primary(ops_test) + + # We also need to check that a replica can see the leader + # to make sure that the cluster is stable again. 
+ other_unit_id = 1 if primary.split("/")[1] == 0 else 0 + assert await get_primary(ops_test, other_unit_id) != "None" + + +async def test_persist_data_through_graceful_restart(ops_test: OpsTest): + """Test data persists through a graceful restart.""" + primary = await get_primary(ops_test) + password = await get_postgres_password(ops_test) + status = await ops_test.model.get_status() + address = status["applications"][APP_NAME].units[primary]["address"] + + # Write data to primary IP. + logger.info(f"connecting to primary {primary} on {address}") + with db_connect(host=address, password=password) as connection: + connection.autocommit = True + connection.cursor().execute("CREATE TABLE gracetest (testcol INT );") + + # Restart all nodes by scaling to 0, then back up + # These have to run sequentially for the test to be valid/stable. + await ops_test.model.applications[APP_NAME].scale(0) + await ops_test.model.applications[APP_NAME].scale(3) + await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) + + # Testing write occurred to every postgres instance by reading from them + status = await ops_test.model.get_status() # noqa: F821 + for unit in status["applications"][APP_NAME]["units"].values(): + host = unit["address"] + logger.info("connecting to the database host: %s", host) + with db_connect(host=host, password=password) as connection: + # Ensure we can read from "gracetest" table + connection.cursor().execute("SELECT * FROM gracetest;") + + +async def test_persist_data_through_failure(ops_test: OpsTest): + """Test data persists through a failure.""" + primary = await get_primary(ops_test) + password = await get_postgres_password(ops_test) + status = await ops_test.model.get_status() + address = status["applications"][APP_NAME].units[primary]["address"] + + # Write data to primary IP. 
+ logger.info(f"connecting to primary {primary} on {address}") + with db_connect(host=address, password=password) as connection: + connection.autocommit = True + connection.cursor().execute("CREATE TABLE failtest (testcol INT );") + + # Cause a machine failure by killing a unit in k8s + model = await ops_test.model.get_info() + client = AsyncClient(namespace=model.name) + await client.delete(Pod, name=primary.replace("/", "-")) + logger.info("primary pod deleted") + + # Wait for juju to notice one of the pods is gone and fix it + logger.info("wait for juju to reset postgres container") + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + timeout=1000, + wait_for_exact_units=3, + check_freq=2, + idle_period=45, + ) + logger.info("juju has reset postgres container") + + # Testing write occurred to every postgres instance by reading from them + status = await ops_test.model.get_status() # noqa: F821 + for unit in status["applications"][APP_NAME]["units"].values(): + host = unit["address"] + logger.info("connecting to the database host: %s", host) + with db_connect(host=host, password=password) as connection: + # Ensure we can read from "failtest" table + connection.cursor().execute("SELECT * FROM failtest;") + + +async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: + """Tests that an automatic failover is triggered after an issue happens in the leader.""" + # Find the current primary unit. + primary = await get_primary(ops_test) + + # Crash PostgreSQL by removing the data directory. + await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") + + # Wait for charm to stabilise + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 + ) + + # Primary doesn't have to be different, but it does have to exist. 
+ assert await get_primary(ops_test) != "None" + + +async def test_remove_application_remove_all_created_k8s_resources(ops_test: OpsTest) -> None: + # Check the existing resources. + + # Remove the application and check that the resources were correctly deleted. + await ops_test.model.applications[APP_NAME].remove() + await ops_test.model.wait_for_idle(timeout=1000) + + +@retry( + retry=retry_if_result(lambda x: not x), + stop=stop_after_attempt(10), + wait=wait_exponential(multiplier=1, min=2, max=30), +) +async def primary_changed(ops_test: OpsTest, old_primary: str) -> bool: + """Checks whether or not the primary unit has changed.""" + primary = await get_primary(ops_test) + return primary != old_primary + + +async def get_primary(ops_test: OpsTest, unit_id=0) -> str: + """Get the primary unit. + + Args: + ops_test: ops_test instance. + unit_id: the number of the unit. + + Returns: + the current primary unit. + """ + action = await ops_test.model.units.get(f"{APP_NAME}/{unit_id}").run_action("get-primary") + action = await action.wait() + return action.results["primary"] + + +async def get_postgres_password(ops_test: OpsTest): + """Retrieve the postgres user password using the action.""" + unit = ops_test.model.units.get(f"{APP_NAME}/0") + action = await unit.run_action("get-postgres-password") + result = await action.wait() + return result.results["postgres-password"] + + +async def pull_content_from_unit_file(unit, path: str) -> str: + """Pull the content of a file from one unit. + + Args: + unit: the Juju unit instance. + path: the path of the file to get the contents from. + + Returns: + the entire content of the file. + """ + action = await unit.run(f"cat {path}") + return action.results.get("Stdout", None) + + +def db_connect(host: str, password: str): + """Returns psycopg2 connection object linked to postgres db in the given host. 
+
+    Args:
+        host: the IP of the postgres host container
+        password: postgres password
+
+    Returns:
+        psycopg2 connection object linked to postgres db, under "postgres" user.
+    """
+    return psycopg2.connect(
+        f"dbname='postgres' user='postgres' host='{host}' password='{password}' connect_timeout=10"
+    )


@pytest.mark.abort_on_fail

From 6058bcc245329f2dea032563e0a88a6dc904ddfe Mon Sep 17 00:00:00 2001
From: Marcelo Henrique Neppel
Date: Wed, 29 Jun 2022 21:13:09 -0300
Subject: [PATCH 04/21] Remove duplicated and incomplete test

---
 tests/integration/test_charm.py | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py
index 3d4487c456..1f002bfc66 100644
--- a/tests/integration/test_charm.py
+++ b/tests/integration/test_charm.py
@@ -225,14 +225,6 @@ async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None:
     assert await get_primary(ops_test) != "None"


-async def test_remove_application_remove_all_created_k8s_resources(ops_test: OpsTest) -> None:
-    # Check the existing resources.
-
-    # Remove the application and check that the resources were correctly deleted.
- await ops_test.model.applications[APP_NAME].remove() - await ops_test.model.wait_for_idle(timeout=1000) - - @retry( retry=retry_if_result(lambda x: not x), stop=stop_after_attempt(10), From aec03e575ef6aaabe24d4eecfa18682ab2713362 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 29 Jun 2022 21:42:56 -0300 Subject: [PATCH 05/21] Fix imports order --- tests/integration/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 6f19af9d0b..c1232f3c0a 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -4,13 +4,13 @@ from typing import List +import requests from lightkube import codecs from lightkube.core.client import Client from lightkube.core.exceptions import ApiError from lightkube.core.resource import NamespacedResourceG from lightkube.resources.core_v1 import Endpoints, Service from lightkube.resources.rbac_authorization_v1 import ClusterRole, ClusterRoleBinding -import requests from pytest_operator.plugin import OpsTest From f8af877c7aaa0680b80f103f3c7b7bdbee592a0f Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 30 Jun 2022 17:25:47 -0300 Subject: [PATCH 06/21] Minor fixes --- .github/workflows/ci.yaml | 1 + src/charm.py | 19 ++++++++----------- tests/integration/helpers.py | 13 ++++++------- tests/integration/test_charm.py | 2 -- 4 files changed, 15 insertions(+), 20 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4f63e29f41..0725c3290e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -64,6 +64,7 @@ jobs: - name: Setup operator environment uses: charmed-kubernetes/actions-operator@main with: + bootstrap-options: --agent-version 2.9.29 # patch until https://bugs.launchpad.net/juju/+bug/1977582 is resolved provider: microk8s # These two steps (download and load image) are needed until the OCI image is published to ROCKS. 
- name: Download image built in build oci image step diff --git a/src/charm.py b/src/charm.py index 13d17466c7..eef61ff80f 100755 --- a/src/charm.py +++ b/src/charm.py @@ -11,7 +11,6 @@ from lightkube import ApiError, Client, codecs from lightkube.resources.core_v1 import Endpoints, Pod, Service -from lightkube.resources.rbac_authorization_v1 import ClusterRole, ClusterRoleBinding from ops.charm import ( ActionEvent, CharmBase, @@ -348,22 +347,17 @@ def _on_get_primary(self, event: ActionEvent) -> None: def _on_stop(self, _) -> None: """Handle the stop event.""" - # Check to run the teardown actions only once. - if not self.unit.is_leader(): - return - client = Client() resources_to_delete = [] # Get the k8s resources created by the charm. with open("src/resources.yaml") as f: resources = codecs.load_all_yaml(f, context=self._context) - # Ignore the cluster role and its binding that were created together with the - # application and also the service resources, which will be retrieved in the next step. + # Ignore the service resources, which will be retrieved in the next step. resources_to_delete.extend( list( filter( - lambda x: not isinstance(x, (ClusterRole, ClusterRoleBinding, Service)), + lambda x: not isinstance(x, Service), resources, ) ) @@ -387,9 +381,12 @@ def _on_stop(self, _) -> None: name=resource.metadata.name, namespace=resource.metadata.namespace, ) - except ApiError: - # Only log a message, as the charm is being stopped. - logger.error(f"failed to delete resource: {resource}.") + except ApiError as e: + if ( + e.status.code != 404 + ): # 404 means that the resource was already deleted by other unit. + # Only log a message, as the charm is being stopped. 
+ logger.error(f"failed to delete resource: {resource}.") def _on_update_status(self, _) -> None: # Until https://github.com/canonical/pebble/issues/6 is fixed, diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index c1232f3c0a..746714312f 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -8,9 +8,8 @@ from lightkube import codecs from lightkube.core.client import Client from lightkube.core.exceptions import ApiError -from lightkube.core.resource import NamespacedResourceG +from lightkube.generic_resource import GenericNamespacedResource from lightkube.resources.core_v1 import Endpoints, Service -from lightkube.resources.rbac_authorization_v1 import ClusterRole, ClusterRoleBinding from pytest_operator.plugin import OpsTest @@ -51,7 +50,7 @@ def get_application_units(ops_test: OpsTest, application_name: str) -> List[str] ] -def get_charm_resources(namespace: str, application: str): +def get_charm_resources(namespace: str, application: str) -> List[GenericNamespacedResource]: """Return the list of k8s resources from resources.yaml file. Args: @@ -69,13 +68,13 @@ def get_charm_resources(namespace: str, application: str): with open("src/resources.yaml") as f: return list( filter( - lambda x: not isinstance(x, (ClusterRole, ClusterRoleBinding, Service)), + lambda x: not isinstance(x, Service), codecs.load_all_yaml(f, context=context), ) ) -def get_existing_patroni_k8s_resources(namespace: str, application: str) -> set[str]: +def get_existing_patroni_k8s_resources(namespace: str, application: str) -> set: """Return the list of k8s resources that were created by the charm and Patroni. 
Args: @@ -132,7 +131,7 @@ def get_existing_patroni_k8s_resources(namespace: str, application: str) -> set[ return resources -def get_expected_patroni_k8s_resources(namespace: str, application: str) -> set[str]: +def get_expected_patroni_k8s_resources(namespace: str, application: str) -> set: """Return the list of expected k8s resources when the charm is deployed. Args: @@ -193,7 +192,7 @@ async def get_unit_address(ops_test: OpsTest, application_name: str, unit_name: return status["applications"][application_name].units[unit_name]["address"] -def resource_exists(client: Client, resource: NamespacedResourceG) -> bool: +def resource_exists(client: Client, resource: GenericNamespacedResource) -> bool: """Get the name of the current model. Args: diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 04d0c6c0d1..65db62fedd 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -56,7 +56,6 @@ async def test_build_and_deploy(ops_test: OpsTest): assert ops_test.model.applications[APP_NAME].units[unit_id].workload_status == "active" -@pytest.mark.abort_on_fail async def test_application_created_required_resources(ops_test: OpsTest) -> None: # Compare the k8s resources that the charm and Patroni should create with # the currently created k8s resources. @@ -307,7 +306,6 @@ def db_connect(host: str, password: str): ) -@pytest.mark.abort_on_fail async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: # Remove the application and wait until it's gone. 
await ops_test.model.applications[APP_NAME].remove() From fceb8bcc92d0dd866027bc3364472fae10ca3b39 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 1 Jul 2022 11:45:09 -0300 Subject: [PATCH 07/21] Test idle period --- tests/integration/test_charm.py | 229 ++++++++++++++++---------------- 1 file changed, 114 insertions(+), 115 deletions(-) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 65db62fedd..7c9e4c1c01 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -14,15 +14,12 @@ from tenacity import retry, retry_if_result, stop_after_attempt, wait_exponential from tests.helpers import METADATA, STORAGE_PATH -from tests.integration.helpers import ( +from tests.integration.helpers import ( # get_application_units,; get_cluster_members,; scale_application, convert_records_to_dict, - get_application_units, - get_cluster_members, get_existing_patroni_k8s_resources, get_expected_patroni_k8s_resources, get_model_name, get_unit_address, - scale_application, ) logger = logging.getLogger(__name__) @@ -124,52 +121,52 @@ async def test_settings_are_correct(ops_test: OpsTest, unit_id: int): assert settings["postgresql"]["use_pg_rewind"] -async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: - """Tests that the cluster maintains a primary after the primary is deleted.""" - # Find the current primary unit. - primary = await get_primary(ops_test) - - # Delete the primary pod. - model = await ops_test.model.get_info() - client = AsyncClient(namespace=model.name) - await client.delete(Pod, name=primary.replace("/", "-")) - logger.info(f"deleted pod {primary}") - - # Wait and get the primary again (which can be any unit, including the previous primary). 
- await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 - ) - primary = await get_primary(ops_test) - - # We also need to check that a replica can see the leader - # to make sure that the cluster is stable again. - other_unit_id = 1 if primary.split("/")[1] == 0 else 0 - assert await get_primary(ops_test, other_unit_id) != "None" - - -async def test_scale_down_and_up(ops_test: OpsTest): - """Test data is replicated to new units after a scale up.""" - # Ensure the initial number of units in the application. - initial_scale = len(UNIT_IDS) - await scale_application(ops_test, APP_NAME, initial_scale) - - # Scale down the application. - await scale_application(ops_test, APP_NAME, initial_scale - 1) - - # Ensure the member was correctly removed from the cluster - # (by comparing the cluster members and the current units). - primary = await get_primary(ops_test) - address = await get_unit_address(ops_test, APP_NAME, primary) - assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) - - # Scale up the application (2 more units than the current scale). - await scale_application(ops_test, APP_NAME, initial_scale + 1) - - # Ensure the new members were added to the cluster. - assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) - - # Scale the application to the initial scale. - await scale_application(ops_test, APP_NAME, initial_scale) +# async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: +# """Tests that the cluster maintains a primary after the primary is deleted.""" +# # Find the current primary unit. +# primary = await get_primary(ops_test) +# +# # Delete the primary pod. 
+# model = await ops_test.model.get_info() +# client = AsyncClient(namespace=model.name) +# await client.delete(Pod, name=primary.replace("/", "-")) +# logger.info(f"deleted pod {primary}") +# +# # Wait and get the primary again (which can be any unit, including the previous primary). +# await ops_test.model.wait_for_idle( +# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 +# ) +# primary = await get_primary(ops_test) +# +# # We also need to check that a replica can see the leader +# # to make sure that the cluster is stable again. +# other_unit_id = 1 if primary.split("/")[1] == 0 else 0 +# assert await get_primary(ops_test, other_unit_id) != "None" +# +# +# async def test_scale_down_and_up(ops_test: OpsTest): +# """Test data is replicated to new units after a scale up.""" +# # Ensure the initial number of units in the application. +# initial_scale = len(UNIT_IDS) +# await scale_application(ops_test, APP_NAME, initial_scale) +# +# # Scale down the application. +# await scale_application(ops_test, APP_NAME, initial_scale - 1) +# +# # Ensure the member was correctly removed from the cluster +# # (by comparing the cluster members and the current units). +# primary = await get_primary(ops_test) +# address = await get_unit_address(ops_test, APP_NAME, primary) +# assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) +# +# # Scale up the application (2 more units than the current scale). +# await scale_application(ops_test, APP_NAME, initial_scale + 1) +# +# # Ensure the new members were added to the cluster. +# assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) +# +# # Scale the application to the initial scale. 
+# await scale_application(ops_test, APP_NAME, initial_scale) async def test_persist_data_through_graceful_restart(ops_test: OpsTest): @@ -188,47 +185,9 @@ async def test_persist_data_through_graceful_restart(ops_test: OpsTest): # These have to run sequentially for the test to be valid/stable. await ops_test.model.applications[APP_NAME].scale(0) await ops_test.model.applications[APP_NAME].scale(3) - await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) - - # Testing write occurred to every postgres instance by reading from them - status = await ops_test.model.get_status() # noqa: F821 - for unit in status["applications"][APP_NAME]["units"].values(): - host = unit["address"] - logger.info("connecting to the database host: %s", host) - with db_connect(host=host, password=password) as connection: - # Ensure we can read from "gracetest" table - connection.cursor().execute("SELECT * FROM gracetest;") - - -async def test_persist_data_through_failure(ops_test: OpsTest): - """Test data persists through a failure.""" - primary = await get_primary(ops_test) - password = await get_postgres_password(ops_test) - address = await get_unit_address(ops_test, APP_NAME, primary) - - # Write data to primary IP. 
- logger.info(f"connecting to primary {primary} on {address}") - with db_connect(host=address, password=password) as connection: - connection.autocommit = True - connection.cursor().execute("CREATE TABLE failtest (testcol INT );") - - # Cause a machine failure by killing a unit in k8s - model = await ops_test.model.get_info() - client = AsyncClient(namespace=model.name) - await client.delete(Pod, name=primary.replace("/", "-")) - logger.info("primary pod deleted") - - # Wait for juju to notice one of the pods is gone and fix it - logger.info("wait for juju to reset postgres container") await ops_test.model.wait_for_idle( - apps=[APP_NAME], - status="active", - timeout=1000, - wait_for_exact_units=3, - check_freq=2, - idle_period=45, + apps=[APP_NAME], status="active", timeout=1000, idle_period=45, check_freq=2 ) - logger.info("juju has reset postgres container") # Testing write occurred to every postgres instance by reading from them status = await ops_test.model.get_status() # noqa: F821 @@ -236,25 +195,65 @@ async def test_persist_data_through_failure(ops_test: OpsTest): host = unit["address"] logger.info("connecting to the database host: %s", host) with db_connect(host=host, password=password) as connection: - # Ensure we can read from "failtest" table - connection.cursor().execute("SELECT * FROM failtest;") - - -async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: - """Tests that an automatic failover is triggered after an issue happens in the leader.""" - # Find the current primary unit. - primary = await get_primary(ops_test) - - # Crash PostgreSQL by removing the data directory. 
- await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") + # Ensure we can read from "gracetest" table + connection.cursor().execute("SELECT * FROM gracetest;") - # Wait for charm to stabilise - await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 - ) - # Primary doesn't have to be different, but it does have to exist. - assert await get_primary(ops_test) != "None" +# async def test_persist_data_through_failure(ops_test: OpsTest): +# """Test data persists through a failure.""" +# primary = await get_primary(ops_test) +# password = await get_postgres_password(ops_test) +# address = await get_unit_address(ops_test, APP_NAME, primary) +# +# # Write data to primary IP. +# logger.info(f"connecting to primary {primary} on {address}") +# with db_connect(host=address, password=password) as connection: +# connection.autocommit = True +# connection.cursor().execute("CREATE TABLE failtest (testcol INT );") +# +# # Cause a machine failure by killing a unit in k8s +# model = await ops_test.model.get_info() +# client = AsyncClient(namespace=model.name) +# await client.delete(Pod, name=primary.replace("/", "-")) +# logger.info("primary pod deleted") +# +# # Wait for juju to notice one of the pods is gone and fix it +# logger.info("wait for juju to reset postgres container") +# await ops_test.model.wait_for_idle( +# apps=[APP_NAME], +# status="active", +# timeout=1000, +# wait_for_exact_units=3, +# check_freq=2, +# idle_period=45, +# ) +# logger.info("juju has reset postgres container") +# +# # Testing write occurred to every postgres instance by reading from them +# status = await ops_test.model.get_status() # noqa: F821 +# for unit in status["applications"][APP_NAME]["units"].values(): +# host = unit["address"] +# logger.info("connecting to the database host: %s", host) +# with db_connect(host=host, password=password) as connection: +# # Ensure we can read from "failtest" table +# 
connection.cursor().execute("SELECT * FROM failtest;") +# +# +# async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: +# """Tests that an automatic failover is triggered after an issue happens in the leader.""" +# # Find the current primary unit. +# primary = await get_primary(ops_test) +# +# # Crash PostgreSQL by removing the data directory. +# await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") +# +# # Wait for charm to stabilise +# await ops_test.model.wait_for_idle( +# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 +# ) +# +# # Primary doesn't have to be different, but it does have to exist. +# assert await get_primary(ops_test) != "None" @retry( @@ -306,12 +305,12 @@ def db_connect(host: str, password: str): ) -async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: - # Remove the application and wait until it's gone. - await ops_test.model.applications[APP_NAME].remove() - await ops_test.model.block_until(lambda: APP_NAME not in ops_test.model.applications) - - # Check that all k8s resources created by the charm and Patroni were removed. - namespace = await get_model_name(ops_test) - existing_resources = get_existing_patroni_k8s_resources(namespace, APP_NAME) - assert set(existing_resources) == set() +# async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: +# # Remove the application and wait until it's gone. +# await ops_test.model.applications[APP_NAME].remove() +# await ops_test.model.block_until(lambda: APP_NAME not in ops_test.model.applications) +# +# # Check that all k8s resources created by the charm and Patroni were removed. 
+# namespace = await get_model_name(ops_test) +# existing_resources = get_existing_patroni_k8s_resources(namespace, APP_NAME) +# assert set(existing_resources) == set() From 7448f8e5f908063a6f6a16af49a37216f4302b93 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 1 Jul 2022 12:19:42 -0300 Subject: [PATCH 08/21] Test lib versions --- tox.ini | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tox.ini b/tox.ini index 61791238be..7886175e0f 100644 --- a/tox.ini +++ b/tox.ini @@ -67,9 +67,9 @@ commands = [testenv:integration] description = Run integration tests deps = - git+https://github.com/juju/python-libjuju.git + git+https://github.com/juju/python-libjuju.git@2.9.9 pytest - pytest-operator + pytest-operator==0.20.0 psycopg2-binary -r{toxinidir}/requirements.txt commands = From 159caac75eb7b39225c59083683bf08dde67e7aa Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 1 Jul 2022 12:40:41 -0300 Subject: [PATCH 09/21] Change --- tests/integration/test_charm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 7c9e4c1c01..e3b6ebf14a 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -170,7 +170,7 @@ async def test_settings_are_correct(ops_test: OpsTest, unit_id: int): async def test_persist_data_through_graceful_restart(ops_test: OpsTest): - """Test data persists through a graceful restart.""" + """Test data persists through a graceful restart.2""" primary = await get_primary(ops_test) password = await get_postgres_password(ops_test) address = await get_unit_address(ops_test, APP_NAME, primary) From 193c9dd304e626ba9ac4cfe96c9c5f50d54987b7 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 1 Jul 2022 13:04:17 -0300 Subject: [PATCH 10/21] Remove version test --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 7886175e0f..2aa1da76f4 
100644 --- a/tox.ini +++ b/tox.ini @@ -67,7 +67,7 @@ commands = [testenv:integration] description = Run integration tests deps = - git+https://github.com/juju/python-libjuju.git@2.9.9 + git+https://github.com/juju/python-libjuju.git pytest pytest-operator==0.20.0 psycopg2-binary From 2e5849e475dfe9536ae8f49f6b2754c0247bd429 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 1 Jul 2022 13:07:02 -0300 Subject: [PATCH 11/21] Revert change --- tests/integration/test_charm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index e3b6ebf14a..7c9e4c1c01 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -170,7 +170,7 @@ async def test_settings_are_correct(ops_test: OpsTest, unit_id: int): async def test_persist_data_through_graceful_restart(ops_test: OpsTest): - """Test data persists through a graceful restart.2""" + """Test data persists through a graceful restart.""" primary = await get_primary(ops_test) password = await get_postgres_password(ops_test) address = await get_unit_address(ops_test, APP_NAME, primary) From a07af7d1e708c113fdb7ff2edfb25e910dd2f0d6 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 1 Jul 2022 13:58:55 -0300 Subject: [PATCH 12/21] Revert change --- tox.ini | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tox.ini b/tox.ini index 2aa1da76f4..db4ce8870f 100644 --- a/tox.ini +++ b/tox.ini @@ -67,9 +67,9 @@ commands = [testenv:integration] description = Run integration tests deps = - git+https://github.com/juju/python-libjuju.git + git+https://github.com/juju/python-libjuju.git@2.9.9 pytest - pytest-operator==0.20.0 + pytest-operator psycopg2-binary -r{toxinidir}/requirements.txt commands = From b29113d75acd314c0eb4f0ddf06068f9b9fce423 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 1 Jul 2022 14:22:03 -0300 Subject: [PATCH 13/21] Revert change --- tox.ini | 2 +- 1 
file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index db4ce8870f..61791238be 100644 --- a/tox.ini +++ b/tox.ini @@ -67,7 +67,7 @@ commands = [testenv:integration] description = Run integration tests deps = - git+https://github.com/juju/python-libjuju.git@2.9.9 + git+https://github.com/juju/python-libjuju.git pytest pytest-operator psycopg2-binary From 3223c9f2773ce44513813ae8648f1d5d70b7c4cd Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 1 Jul 2022 14:44:24 -0300 Subject: [PATCH 14/21] Uncomment tests --- tests/integration/test_charm.py | 225 ++++++++++++++++---------------- 1 file changed, 114 insertions(+), 111 deletions(-) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 7c9e4c1c01..c4f0f5ce89 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -14,12 +14,15 @@ from tenacity import retry, retry_if_result, stop_after_attempt, wait_exponential from tests.helpers import METADATA, STORAGE_PATH -from tests.integration.helpers import ( # get_application_units,; get_cluster_members,; scale_application, +from tests.integration.helpers import ( convert_records_to_dict, + get_application_units, + get_cluster_members, get_existing_patroni_k8s_resources, get_expected_patroni_k8s_resources, get_model_name, get_unit_address, + scale_application, ) logger = logging.getLogger(__name__) @@ -121,52 +124,52 @@ async def test_settings_are_correct(ops_test: OpsTest, unit_id: int): assert settings["postgresql"]["use_pg_rewind"] -# async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: -# """Tests that the cluster maintains a primary after the primary is deleted.""" -# # Find the current primary unit. -# primary = await get_primary(ops_test) -# -# # Delete the primary pod. 
-# model = await ops_test.model.get_info() -# client = AsyncClient(namespace=model.name) -# await client.delete(Pod, name=primary.replace("/", "-")) -# logger.info(f"deleted pod {primary}") -# -# # Wait and get the primary again (which can be any unit, including the previous primary). -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 -# ) -# primary = await get_primary(ops_test) -# -# # We also need to check that a replica can see the leader -# # to make sure that the cluster is stable again. -# other_unit_id = 1 if primary.split("/")[1] == 0 else 0 -# assert await get_primary(ops_test, other_unit_id) != "None" -# -# -# async def test_scale_down_and_up(ops_test: OpsTest): -# """Test data is replicated to new units after a scale up.""" -# # Ensure the initial number of units in the application. -# initial_scale = len(UNIT_IDS) -# await scale_application(ops_test, APP_NAME, initial_scale) -# -# # Scale down the application. -# await scale_application(ops_test, APP_NAME, initial_scale - 1) -# -# # Ensure the member was correctly removed from the cluster -# # (by comparing the cluster members and the current units). -# primary = await get_primary(ops_test) -# address = await get_unit_address(ops_test, APP_NAME, primary) -# assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) -# -# # Scale up the application (2 more units than the current scale). -# await scale_application(ops_test, APP_NAME, initial_scale + 1) -# -# # Ensure the new members were added to the cluster. -# assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) -# -# # Scale the application to the initial scale. -# await scale_application(ops_test, APP_NAME, initial_scale) +async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: + """Tests that the cluster maintains a primary after the primary is deleted.""" + # Find the current primary unit. 
+ primary = await get_primary(ops_test) + + # Delete the primary pod. + model = await ops_test.model.get_info() + client = AsyncClient(namespace=model.name) + await client.delete(Pod, name=primary.replace("/", "-")) + logger.info(f"deleted pod {primary}") + + # Wait and get the primary again (which can be any unit, including the previous primary). + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 + ) + primary = await get_primary(ops_test) + + # We also need to check that a replica can see the leader + # to make sure that the cluster is stable again. + other_unit_id = 1 if primary.split("/")[1] == 0 else 0 + assert await get_primary(ops_test, other_unit_id) != "None" + + +async def test_scale_down_and_up(ops_test: OpsTest): + """Test data is replicated to new units after a scale up.""" + # Ensure the initial number of units in the application. + initial_scale = len(UNIT_IDS) + await scale_application(ops_test, APP_NAME, initial_scale) + + # Scale down the application. + await scale_application(ops_test, APP_NAME, initial_scale - 1) + + # Ensure the member was correctly removed from the cluster + # (by comparing the cluster members and the current units). + primary = await get_primary(ops_test) + address = await get_unit_address(ops_test, APP_NAME, primary) + assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) + + # Scale up the application (2 more units than the current scale). + await scale_application(ops_test, APP_NAME, initial_scale + 1) + + # Ensure the new members were added to the cluster. + assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) + + # Scale the application to the initial scale. 
+ await scale_application(ops_test, APP_NAME, initial_scale) async def test_persist_data_through_graceful_restart(ops_test: OpsTest): @@ -199,61 +202,61 @@ async def test_persist_data_through_graceful_restart(ops_test: OpsTest): connection.cursor().execute("SELECT * FROM gracetest;") -# async def test_persist_data_through_failure(ops_test: OpsTest): -# """Test data persists through a failure.""" -# primary = await get_primary(ops_test) -# password = await get_postgres_password(ops_test) -# address = await get_unit_address(ops_test, APP_NAME, primary) -# -# # Write data to primary IP. -# logger.info(f"connecting to primary {primary} on {address}") -# with db_connect(host=address, password=password) as connection: -# connection.autocommit = True -# connection.cursor().execute("CREATE TABLE failtest (testcol INT );") -# -# # Cause a machine failure by killing a unit in k8s -# model = await ops_test.model.get_info() -# client = AsyncClient(namespace=model.name) -# await client.delete(Pod, name=primary.replace("/", "-")) -# logger.info("primary pod deleted") -# -# # Wait for juju to notice one of the pods is gone and fix it -# logger.info("wait for juju to reset postgres container") -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], -# status="active", -# timeout=1000, -# wait_for_exact_units=3, -# check_freq=2, -# idle_period=45, -# ) -# logger.info("juju has reset postgres container") -# -# # Testing write occurred to every postgres instance by reading from them -# status = await ops_test.model.get_status() # noqa: F821 -# for unit in status["applications"][APP_NAME]["units"].values(): -# host = unit["address"] -# logger.info("connecting to the database host: %s", host) -# with db_connect(host=host, password=password) as connection: -# # Ensure we can read from "failtest" table -# connection.cursor().execute("SELECT * FROM failtest;") -# -# -# async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: -# """Tests that an automatic 
failover is triggered after an issue happens in the leader.""" -# # Find the current primary unit. -# primary = await get_primary(ops_test) -# -# # Crash PostgreSQL by removing the data directory. -# await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") -# -# # Wait for charm to stabilise -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 -# ) -# -# # Primary doesn't have to be different, but it does have to exist. -# assert await get_primary(ops_test) != "None" +async def test_persist_data_through_failure(ops_test: OpsTest): + """Test data persists through a failure.""" + primary = await get_primary(ops_test) + password = await get_postgres_password(ops_test) + address = await get_unit_address(ops_test, APP_NAME, primary) + + # Write data to primary IP. + logger.info(f"connecting to primary {primary} on {address}") + with db_connect(host=address, password=password) as connection: + connection.autocommit = True + connection.cursor().execute("CREATE TABLE failtest (testcol INT );") + + # Cause a machine failure by killing a unit in k8s + model = await ops_test.model.get_info() + client = AsyncClient(namespace=model.name) + await client.delete(Pod, name=primary.replace("/", "-")) + logger.info("primary pod deleted") + + # Wait for juju to notice one of the pods is gone and fix it + logger.info("wait for juju to reset postgres container") + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + timeout=1000, + wait_for_exact_units=3, + check_freq=2, + idle_period=45, + ) + logger.info("juju has reset postgres container") + + # Testing write occurred to every postgres instance by reading from them + status = await ops_test.model.get_status() # noqa: F821 + for unit in status["applications"][APP_NAME]["units"].values(): + host = unit["address"] + logger.info("connecting to the database host: %s", host) + with db_connect(host=host, password=password) as connection: + 
# Ensure we can read from "failtest" table + connection.cursor().execute("SELECT * FROM failtest;") + + +async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: + """Tests that an automatic failover is triggered after an issue happens in the leader.""" + # Find the current primary unit. + primary = await get_primary(ops_test) + + # Crash PostgreSQL by removing the data directory. + await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") + + # Wait for charm to stabilise + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 + ) + + # Primary doesn't have to be different, but it does have to exist. + assert await get_primary(ops_test) != "None" @retry( @@ -305,12 +308,12 @@ def db_connect(host: str, password: str): ) -# async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: -# # Remove the application and wait until it's gone. -# await ops_test.model.applications[APP_NAME].remove() -# await ops_test.model.block_until(lambda: APP_NAME not in ops_test.model.applications) -# -# # Check that all k8s resources created by the charm and Patroni were removed. -# namespace = await get_model_name(ops_test) -# existing_resources = get_existing_patroni_k8s_resources(namespace, APP_NAME) -# assert set(existing_resources) == set() +async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: + # Remove the application and wait until it's gone. + await ops_test.model.applications[APP_NAME].remove() + await ops_test.model.block_until(lambda: APP_NAME not in ops_test.model.applications) + + # Check that all k8s resources created by the charm and Patroni were removed. 
+ namespace = await get_model_name(ops_test) + existing_resources = get_existing_patroni_k8s_resources(namespace, APP_NAME) + assert set(existing_resources) == set() From 42cf0d5c0ec6edd37d098411f5823f888049e6ef Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 20 Jul 2022 16:41:29 -0300 Subject: [PATCH 15/21] Fix CI workflow --- .github/workflows/ci.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index cb11a97de5..d8ec8036c8 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -35,7 +35,6 @@ jobs: - name: Setup operator environment uses: charmed-kubernetes/actions-operator@main with: - bootstrap-options: --agent-version 2.9.29 # patch until https://bugs.launchpad.net/juju/+bug/1977582 is resolved provider: microk8s # This is needed until https://bugs.launchpad.net/juju/+bug/1977582 is fixed. bootstrap-options: "--agent-version 2.9.29" From 8667ef4fedef6024cb699de4bddffae81578379c Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Thu, 25 Aug 2022 16:55:55 -0300 Subject: [PATCH 16/21] Fix tests --- tests/integration/helpers.py | 15 +- tests/integration/test_charm.py | 269 ++++++++++++++++---------------- 2 files changed, 139 insertions(+), 145 deletions(-) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 6c131da077..9821eed9cd 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -217,12 +217,7 @@ def get_charm_resources(namespace: str, application: str) -> List[GenericNamespa # Load the list of the resources from resources.yaml. 
with open("src/resources.yaml") as f: - return list( - filter( - lambda x: not isinstance(x, Service), - codecs.load_all_yaml(f, context=context), - ) - ) + return codecs.load_all_yaml(f, context=context) def get_existing_patroni_k8s_resources(namespace: str, application: str) -> set: @@ -306,9 +301,11 @@ def get_expected_patroni_k8s_resources(namespace: str, application: str) -> set: # Include the resources created by Patroni. patroni_resources = [ - f"Endpoints/{namespace}-config", - f"Endpoints/{namespace}", - f"Service/{namespace}-config", + f"Endpoints/patroni-{application}-config", + f"Endpoints/patroni-{application}", + f"Endpoints/{application}-primary", + f"Endpoints/{application}-replicas", + f"Service/patroni-{application}-config", ] resources.update(patroni_resources) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index ba7f0dbb7c..35107ad98d 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -60,9 +60,6 @@ async def test_application_created_required_resources(ops_test: OpsTest) -> None namespace = ops_test.model.info.name existing_resources = get_existing_patroni_k8s_resources(namespace, APP_NAME) expected_resources = get_expected_patroni_k8s_resources(namespace, APP_NAME) - print(existing_resources) - print(1) - print(expected_resources) assert set(existing_resources) == set(expected_resources) @@ -125,139 +122,139 @@ async def test_settings_are_correct(ops_test: OpsTest, unit_id: int): assert settings["postgresql"]["use_pg_rewind"] -# async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: -# """Tests that the cluster maintains a primary after the primary is deleted.""" -# # Find the current primary unit. -# primary = await get_primary(ops_test) -# -# # Delete the primary pod. 
-# model = ops_test.model.info -# client = AsyncClient(namespace=model.name) -# await client.delete(Pod, name=primary.replace("/", "-")) -# logger.info(f"deleted pod {primary}") -# -# # Wait and get the primary again (which can be any unit, including the previous primary). -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 -# ) -# primary = await get_primary(ops_test) -# -# # We also need to check that a replica can see the leader -# # to make sure that the cluster is stable again. -# other_unit_id = 1 if primary.split("/")[1] == 0 else 0 -# assert await get_primary(ops_test, other_unit_id) != "None" -# -# -# async def test_scale_down_and_up(ops_test: OpsTest): -# """Test data is replicated to new units after a scale up.""" -# # Ensure the initial number of units in the application. -# initial_scale = len(UNIT_IDS) -# await scale_application(ops_test, APP_NAME, initial_scale) -# -# # Scale down the application. -# await scale_application(ops_test, APP_NAME, initial_scale - 1) -# -# # Ensure the member was correctly removed from the cluster -# # (by comparing the cluster members and the current units). -# primary = await get_primary(ops_test) -# address = await get_unit_address(ops_test, primary) -# assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) -# -# # Scale up the application (2 more units than the current scale). -# await scale_application(ops_test, APP_NAME, initial_scale + 1) -# -# # Ensure the new members were added to the cluster. -# assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) -# -# # Scale the application to the initial scale. 
-# await scale_application(ops_test, APP_NAME, initial_scale) -# -# -# async def test_persist_data_through_graceful_restart(ops_test: OpsTest): -# """Test data persists through a graceful restart.""" -# primary = await get_primary(ops_test) -# password = await get_operator_password(ops_test) -# address = await get_unit_address(ops_test, primary) -# -# # Write data to primary IP. -# logger.info(f"connecting to primary {primary} on {address}") -# with db_connect(host=address, password=password) as connection: -# connection.autocommit = True -# connection.cursor().execute("CREATE TABLE gracetest (testcol INT );") -# -# # Restart all nodes by scaling to 0, then back up -# # These have to run sequentially for the test to be valid/stable. -# await ops_test.model.applications[APP_NAME].scale(0) -# await ops_test.model.applications[APP_NAME].scale(3) -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], status="active", timeout=1000, idle_period=45, check_freq=2 -# ) -# -# # Testing write occurred to every postgres instance by reading from them -# status = await ops_test.model.get_status() # noqa: F821 -# for unit in status["applications"][APP_NAME]["units"].values(): -# host = unit["address"] -# logger.info("connecting to the database host: %s", host) -# with db_connect(host=host, password=password) as connection: -# # Ensure we can read from "gracetest" table -# connection.cursor().execute("SELECT * FROM gracetest;") -# -# -# async def test_persist_data_through_failure(ops_test: OpsTest): -# """Test data persists through a failure.""" -# primary = await get_primary(ops_test) -# password = await get_operator_password(ops_test) -# address = await get_unit_address(ops_test, primary) -# -# # Write data to primary IP. 
-# logger.info(f"connecting to primary {primary} on {address}") -# with db_connect(host=address, password=password) as connection: -# connection.autocommit = True -# connection.cursor().execute("CREATE TABLE failtest (testcol INT );") -# -# # Cause a machine failure by killing a unit in k8s -# model = ops_test.model.info -# client = AsyncClient(namespace=model.name) -# await client.delete(Pod, name=primary.replace("/", "-")) -# logger.info("primary pod deleted") -# -# # Wait for juju to notice one of the pods is gone and fix it -# logger.info("wait for juju to reset postgres container") -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], -# status="active", -# timeout=1000, -# wait_for_exact_units=3, -# check_freq=2, -# idle_period=45, -# ) -# logger.info("juju has reset postgres container") -# -# # Testing write occurred to every postgres instance by reading from them -# status = await ops_test.model.get_status() # noqa: F821 -# for unit in status["applications"][APP_NAME]["units"].values(): -# host = unit["address"] -# logger.info("connecting to the database host: %s", host) -# with db_connect(host=host, password=password) as connection: -# # Ensure we can read from "failtest" table -# connection.cursor().execute("SELECT * FROM failtest;") -# -# -# async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: -# """Tests that an automatic failover is triggered after an issue happens in the leader.""" -# # Find the current primary unit. -# primary = await get_primary(ops_test) -# -# # Crash PostgreSQL by removing the data directory. -# await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") -# -# # Wait for charm to stabilise -# await ops_test.model.wait_for_idle( -# apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 -# ) -# -# # Primary doesn't have to be different, but it does have to exist. 
-# assert await get_primary(ops_test) != "None" +async def test_cluster_is_stable_after_leader_deletion(ops_test: OpsTest) -> None: + """Tests that the cluster maintains a primary after the primary is deleted.""" + # Find the current primary unit. + primary = await get_primary(ops_test) + + # Delete the primary pod. + model = ops_test.model.info + client = AsyncClient(namespace=model.name) + await client.delete(Pod, name=primary.replace("/", "-")) + logger.info(f"deleted pod {primary}") + + # Wait and get the primary again (which can be any unit, including the previous primary). + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 + ) + primary = await get_primary(ops_test) + + # We also need to check that a replica can see the leader + # to make sure that the cluster is stable again. + other_unit_id = 1 if primary.split("/")[1] == 0 else 0 + assert await get_primary(ops_test, other_unit_id) != "None" + + +async def test_scale_down_and_up(ops_test: OpsTest): + """Test data is replicated to new units after a scale up.""" + # Ensure the initial number of units in the application. + initial_scale = len(UNIT_IDS) + await scale_application(ops_test, APP_NAME, initial_scale) + + # Scale down the application. + await scale_application(ops_test, APP_NAME, initial_scale - 1) + + # Ensure the member was correctly removed from the cluster + # (by comparing the cluster members and the current units). + primary = await get_primary(ops_test) + address = await get_unit_address(ops_test, primary) + assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) + + # Scale up the application (2 more units than the current scale). + await scale_application(ops_test, APP_NAME, initial_scale + 1) + + # Ensure the new members were added to the cluster. + assert get_cluster_members(address) == get_application_units(ops_test, APP_NAME) + + # Scale the application to the initial scale. 
+ await scale_application(ops_test, APP_NAME, initial_scale) + + +async def test_persist_data_through_graceful_restart(ops_test: OpsTest): + """Test data persists through a graceful restart.""" + primary = await get_primary(ops_test) + password = await get_operator_password(ops_test) + address = await get_unit_address(ops_test, primary) + + # Write data to primary IP. + logger.info(f"connecting to primary {primary} on {address}") + with db_connect(host=address, password=password) as connection: + connection.autocommit = True + connection.cursor().execute("CREATE TABLE gracetest (testcol INT );") + + # Restart all nodes by scaling to 0, then back up + # These have to run sequentially for the test to be valid/stable. + await ops_test.model.applications[APP_NAME].scale(0) + await ops_test.model.applications[APP_NAME].scale(3) + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=1000, idle_period=45, check_freq=2 + ) + + # Testing write occurred to every postgres instance by reading from them + status = await ops_test.model.get_status() # noqa: F821 + for unit in status["applications"][APP_NAME]["units"].values(): + host = unit["address"] + logger.info("connecting to the database host: %s", host) + with db_connect(host=host, password=password) as connection: + # Ensure we can read from "gracetest" table + connection.cursor().execute("SELECT * FROM gracetest;") + + +async def test_persist_data_through_failure(ops_test: OpsTest): + """Test data persists through a failure.""" + primary = await get_primary(ops_test) + password = await get_operator_password(ops_test) + address = await get_unit_address(ops_test, primary) + + # Write data to primary IP. 
+ logger.info(f"connecting to primary {primary} on {address}") + with db_connect(host=address, password=password) as connection: + connection.autocommit = True + connection.cursor().execute("CREATE TABLE failtest (testcol INT );") + + # Cause a machine failure by killing a unit in k8s + model = ops_test.model.info + client = AsyncClient(namespace=model.name) + await client.delete(Pod, name=primary.replace("/", "-")) + logger.info("primary pod deleted") + + # Wait for juju to notice one of the pods is gone and fix it + logger.info("wait for juju to reset postgres container") + await ops_test.model.wait_for_idle( + apps=[APP_NAME], + status="active", + timeout=1000, + wait_for_exact_units=3, + check_freq=2, + idle_period=45, + ) + logger.info("juju has reset postgres container") + + # Testing write occurred to every postgres instance by reading from them + status = await ops_test.model.get_status() # noqa: F821 + for unit in status["applications"][APP_NAME]["units"].values(): + host = unit["address"] + logger.info("connecting to the database host: %s", host) + with db_connect(host=host, password=password) as connection: + # Ensure we can read from "failtest" table + connection.cursor().execute("SELECT * FROM failtest;") + + +async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: + """Tests that an automatic failover is triggered after an issue happens in the leader.""" + # Find the current primary unit. + primary = await get_primary(ops_test) + + # Crash PostgreSQL by removing the data directory. + await ops_test.model.units.get(primary).run(f"rm -rf {STORAGE_PATH}/pgdata") + + # Wait for charm to stabilise + await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 + ) + + # Primary doesn't have to be different, but it does have to exist. 
+ assert await get_primary(ops_test) != "None" async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: From 3268258c699fc29b7d24d0b3d1e3b48dca08583d Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 26 Aug 2022 11:13:46 -0300 Subject: [PATCH 17/21] Add test about redeployment --- tests/integration/test_charm.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 35107ad98d..ae4bbbefe4 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -200,6 +200,7 @@ async def test_persist_data_through_graceful_restart(ops_test: OpsTest): connection.cursor().execute("SELECT * FROM gracetest;") +@pytest.mark.abort_on_fail async def test_persist_data_through_failure(ops_test: OpsTest): """Test data persists through a failure.""" primary = await get_primary(ops_test) @@ -282,6 +283,26 @@ async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: assert APP_NAME not in ops_test.model.applications +async def test_redeploy_charm_in_the_same_model(ops_test: OpsTest): + """Redeploy the charm in the same model to test that it works.""" + charm = await ops_test.build_charm(".") + async with ops_test.fast_forward(): + await ops_test.model.deploy( + charm, + resources={ + "postgresql-image": METADATA["resources"]["postgresql-image"]["upstream-source"] + }, + application_name=APP_NAME, + num_units=3, + trust=True, + ) + + # This check is enough to ensure that the charm/workload is working for this specific test. 
+ await ops_test.model.wait_for_idle( + apps=[APP_NAME], status="active", timeout=1000, wait_for_exact_units=3 + ) + + @retry( retry=retry_if_result(lambda x: not x), stop=stop_after_attempt(10), From 7038d437392886a8371b03d427952ea4ad5962a0 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 26 Aug 2022 13:38:20 -0300 Subject: [PATCH 18/21] Revert some test changes --- tests/integration/test_charm.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index ae4bbbefe4..f58daab00d 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -186,9 +186,7 @@ async def test_persist_data_through_graceful_restart(ops_test: OpsTest): # These have to run sequentially for the test to be valid/stable. await ops_test.model.applications[APP_NAME].scale(0) await ops_test.model.applications[APP_NAME].scale(3) - await ops_test.model.wait_for_idle( - apps=[APP_NAME], status="active", timeout=1000, idle_period=45, check_freq=2 - ) + await ops_test.model.wait_for_idle(apps=[APP_NAME], status="active", timeout=1000) # Testing write occurred to every postgres instance by reading from them status = await ops_test.model.get_status() # noqa: F821 @@ -200,7 +198,6 @@ async def test_persist_data_through_graceful_restart(ops_test: OpsTest): connection.cursor().execute("SELECT * FROM gracetest;") -@pytest.mark.abort_on_fail async def test_persist_data_through_failure(ops_test: OpsTest): """Test data persists through a failure.""" primary = await get_primary(ops_test) @@ -258,7 +255,7 @@ async def test_automatic_failover_after_leader_issue(ops_test: OpsTest) -> None: assert await get_primary(ops_test) != "None" -async def test_application_removal_cleanup_resources(ops_test: OpsTest) -> None: +async def test_application_removal(ops_test: OpsTest) -> None: # Remove the application to trigger some hooks (like peer relation departed). 
await ops_test.model.applications[APP_NAME].remove() From 040f56c9730509d603f66af0e48b5a1584ddec2b Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 26 Aug 2022 14:13:13 -0300 Subject: [PATCH 19/21] Improve code --- src/charm.py | 15 ++++----- tests/integration/helpers.py | 55 ++++++++++++++------------------- tests/integration/test_charm.py | 12 +++---- 3 files changed, 36 insertions(+), 46 deletions(-) diff --git a/src/charm.py b/src/charm.py index 1479b29d86..73e7b325a4 100755 --- a/src/charm.py +++ b/src/charm.py @@ -439,24 +439,21 @@ def _on_get_primary(self, event: ActionEvent) -> None: logger.error(f"failed to get primary with error {e}") def _on_stop(self, _) -> None: - """Handle the stop event.""" + """Remove k8s resources created by the charm and Patroni.""" client = Client() - resources_to_delete = [] # Get the k8s resources created by the charm. with open("src/resources.yaml") as f: resources = codecs.load_all_yaml(f, context=self._context) # Ignore the service resources, which will be retrieved in the next step. - resources_to_delete.extend( - list( - filter( - lambda x: not isinstance(x, Service), - resources, - ) + resources_to_delete = list( + filter( + lambda x: not isinstance(x, Service), + resources, ) ) - # Get the k8s resources created by Patroni. + # Get the k8s resources created by the charm and Patroni. 
for kind in [Endpoints, Service]: resources_to_delete.extend( client.list( diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 9821eed9cd..3e455e61d0 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -220,7 +220,7 @@ def get_charm_resources(namespace: str, application: str) -> List[GenericNamespa return codecs.load_all_yaml(f, context=context) -def get_existing_patroni_k8s_resources(namespace: str, application: str) -> set: +def get_existing_k8s_resources(namespace: str, application: str) -> set: """Return the list of k8s resources that were created by the charm and Patroni. Args: @@ -237,9 +237,10 @@ def get_existing_patroni_k8s_resources(namespace: str, application: str) -> set: # Retrieve the k8s resources the charm should create. charm_resources = get_charm_resources(namespace, application) - # Check the k8s API for the resources that currently exist. - existing_charm_resources = list( + # Add only the resources that currently exist. + resources = set( map( + # Build an identifier for each resource (using its type and name). lambda x: f"{type(x).__name__}/{x.metadata.name}", filter( lambda x: (resource_exists(client, x)), @@ -248,36 +249,27 @@ def get_existing_patroni_k8s_resources(namespace: str, application: str) -> set: ) ) - # Add only the existing resources to the list. - resources = set( - map( - lambda x: f"{x.split('/')[0]}/{x.split('/')[1]}", - existing_charm_resources, - ) - ) - - # Include the resources created by Patroni. + # Include the resources created by the charm and Patroni. for kind in [Endpoints, Service]: - patroni_resources = client.list( + extra_resources = client.list( kind, namespace=namespace, labels={"app.juju.is/created-by": application}, ) - - # Build an identifier for each resource (using its type and name). 
- mapped_patroni_resources = set( - map( - lambda x: f"{kind.__name__}/{x.metadata.name}", - patroni_resources, + resources.update( + set( + map( + # Build an identifier for each resource (using its type and name). + lambda x: f"{kind.__name__}/{x.metadata.name}", + extra_resources, + ) ) ) - resources.update(mapped_patroni_resources) - return resources -def get_expected_patroni_k8s_resources(namespace: str, application: str) -> set: +def get_expected_k8s_resources(namespace: str, application: str) -> set: """Return the list of expected k8s resources when the charm is deployed. Args: @@ -299,15 +291,16 @@ def get_expected_patroni_k8s_resources(namespace: str, application: str) -> set: ) ) - # Include the resources created by Patroni. - patroni_resources = [ - f"Endpoints/patroni-{application}-config", - f"Endpoints/patroni-{application}", - f"Endpoints/{application}-primary", - f"Endpoints/{application}-replicas", - f"Service/patroni-{application}-config", - ] - resources.update(patroni_resources) + # Include the resources created by the charm and Patroni. 
+ resources.update( + [ + f"Endpoints/patroni-{application}-config", + f"Endpoints/patroni-{application}", + f"Endpoints/{application}-primary", + f"Endpoints/{application}-replicas", + f"Service/patroni-{application}-config", + ] + ) return resources diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index f58daab00d..2dae345709 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -18,8 +18,8 @@ convert_records_to_dict, get_application_units, get_cluster_members, - get_existing_patroni_k8s_resources, - get_expected_patroni_k8s_resources, + get_existing_k8s_resources, + get_expected_k8s_resources, get_operator_password, get_unit_address, scale_application, @@ -58,8 +58,8 @@ async def test_application_created_required_resources(ops_test: OpsTest) -> None # Compare the k8s resources that the charm and Patroni should create with # the currently created k8s resources. namespace = ops_test.model.info.name - existing_resources = get_existing_patroni_k8s_resources(namespace, APP_NAME) - expected_resources = get_expected_patroni_k8s_resources(namespace, APP_NAME) + existing_resources = get_existing_k8s_resources(namespace, APP_NAME) + expected_resources = get_expected_k8s_resources(namespace, APP_NAME) assert set(existing_resources) == set(expected_resources) @@ -272,7 +272,7 @@ async def test_application_removal(ops_test: OpsTest) -> None: # Check that all k8s resources created by the charm and Patroni were removed. 
namespace = ops_test.model.info.name - existing_resources = get_existing_patroni_k8s_resources(namespace, APP_NAME) + existing_resources = get_existing_k8s_resources(namespace, APP_NAME) assert set(existing_resources) == set() # Check whether the application is gone @@ -280,7 +280,7 @@ async def test_application_removal(ops_test: OpsTest) -> None: assert APP_NAME not in ops_test.model.applications -async def test_redeploy_charm_in_the_same_model(ops_test: OpsTest): +async def test_redeploy_charm_same_model(ops_test: OpsTest): """Redeploy the charm in the same model to test that it works.""" charm = await ops_test.build_charm(".") async with ops_test.fast_forward(): From a178eccd66e81960cf242ac47bc4455c7d5883dd Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 26 Aug 2022 15:42:44 -0300 Subject: [PATCH 20/21] Fix docstring --- tests/integration/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 3e455e61d0..5a024e8c7c 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -345,7 +345,7 @@ async def get_unit_address(ops_test: OpsTest, unit_name: str) -> str: def resource_exists(client: Client, resource: GenericNamespacedResource) -> bool: - """Get the name of the current model. + """Check whether a specific resource exists. Args: client: k8s API client instance. 
From d7fcce66981be09f9509b7e4774d528a03a59338 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Sat, 27 Aug 2022 11:27:06 -0300 Subject: [PATCH 21/21] Add pytest marks --- tests/integration/test_charm.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index 2dae345709..5e04f54429 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -54,6 +54,7 @@ async def test_build_and_deploy(ops_test: OpsTest): assert ops_test.model.applications[APP_NAME].units[unit_id].workload_status == "active" +@pytest.mark.charm async def test_application_created_required_resources(ops_test: OpsTest) -> None: # Compare the k8s resources that the charm and Patroni should create with # the currently created k8s resources. @@ -280,6 +281,7 @@ async def test_application_removal(ops_test: OpsTest) -> None: assert APP_NAME not in ops_test.model.applications +@pytest.mark.charm async def test_redeploy_charm_same_model(ops_test: OpsTest): """Redeploy the charm in the same model to test that it works.""" charm = await ops_test.build_charm(".")