diff --git a/src/charm.py b/src/charm.py index 0ecf24cb45..200d2eddae 100755 --- a/src/charm.py +++ b/src/charm.py @@ -13,7 +13,7 @@ PostgreSQLUpdateUserPasswordError, ) from charms.postgresql_k8s.v0.postgresql_tls import PostgreSQLTLS -from charms.rolling_ops.v0.rollingops import RollingOpsManager +from charms.rolling_ops.v0.rollingops import RollingOpsManager, RunWithLock from lightkube import ApiError, Client, codecs from lightkube.models.core_v1 import ServicePort from lightkube.resources.core_v1 import Endpoints, Pod, Service @@ -76,7 +76,6 @@ def __init__(self, *args): self._context = {"namespace": self._namespace, "app_name": self._name} self.cluster_name = f"patroni-{self._name}" - self.framework.observe(self.on.install, self._on_install) self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.leader_elected, self._on_leader_elected) self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed) @@ -211,10 +210,6 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: self.postgresql_client_relation.update_read_only_endpoint() self._remove_from_endpoints(endpoints_to_remove) - # Update the replication configuration. - self._patroni.render_postgresql_conf_file() - self._patroni.reload_patroni_configuration() - def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None: """Reconfigure cluster members.""" # The cluster must be initialized first in the leader unit @@ -262,11 +257,6 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None: self.unit.status = ActiveStatus() - def _on_install(self, _) -> None: - """Event handler for InstallEvent.""" - # Creates custom postgresql.conf file. - self._patroni.render_postgresql_conf_file() - def _on_config_changed(self, _) -> None: """Handle the config-changed event.""" # TODO: placeholder method to implement logic specific to configuration change. @@ -384,13 +374,6 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: self._add_members(event) - # Update the replication configuration. - self._patroni.render_postgresql_conf_file() - try: - self._patroni.reload_patroni_configuration() - except RetryError: - pass # This error can happen in the first leader election, as Patroni is not running yet. - def _create_pgdata(self, container: Container): """Create the PostgreSQL data directory.""" path = f"{self._storage_path}/pgdata" @@ -410,9 +393,6 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: # where the volume is mounted with more restrictive permissions. self._create_pgdata(container) - # Create a new config layer. - new_layer = self._postgresql_layer() - self.unit.set_workload_version(self._patroni.rock_postgresql_version) # Defer the initialization of the workload in the replicas @@ -436,18 +416,8 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: event.defer() return - # Get the current layer. - current_layer = container.get_plan() - # Check if there are any changes to layer services. - if current_layer.services != new_layer.services: - # Changes were made, add the new layer. - container.add_layer(self._postgresql_service, new_layer, combine=True) - logging.info("Added updated layer 'postgresql' to Pebble plan") - # TODO: move this file generation to on config changed hook - # when adding configs to this charm. - # Restart it and report a new status to Juju. 
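# Illustrative sketch: the plan/layer bookkeeping removed here moves into a new
# _update_pebble_layers() helper (see the hunks that follow), and the layer it
# builds now also carries an HTTP "ready" check probing Patroni. A minimal
# Pebble layer of that shape looks roughly like this; the command and URL below
# are placeholders, not the charm's real values.
from ops.pebble import Layer

layer = Layer(
    {
        "summary": "postgresql layer",
        "services": {
            "postgresql": {
                "override": "replace",
                "command": "patroni /var/lib/postgresql/data/patroni.yml",
                "startup": "enabled",
            }
        },
        "checks": {
            "postgresql": {
                "override": "replace",
                "level": "ready",
                "http": {"url": "http://localhost:8008/health"},
            }
        },
    }
)
print(layer.to_dict()["checks"])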
- container.restart(self._postgresql_service) - logging.info("Restarted postgresql service") + # Start the database service. + self._update_pebble_layers() # Ensure the member is up and running before marking the cluster as initialised. if not self._patroni.member_started: @@ -840,6 +810,15 @@ def _postgresql_layer(self) -> Layer: "group": WORKLOAD_OS_GROUP, }, }, + "checks": { + self._postgresql_service: { + "override": "replace", + "level": "ready", + "http": { + "url": f"{self._patroni._patroni_url}/health", + }, + } + }, } return Layer(layer_config) @@ -878,6 +857,15 @@ def push_tls_files_to_workload(self, container: Container = None) -> None: user=WORKLOAD_OS_USER, group=WORKLOAD_OS_GROUP, ) + container.push( + "/usr/local/share/ca-certificates/ca.crt", + ca, + make_dirs=True, + permissions=0o400, + user=WORKLOAD_OS_USER, + group=WORKLOAD_OS_GROUP, + ) + container.exec(["update-ca-certificates"]).wait() if cert is not None: container.push( f"{self._storage_path}/{TLS_CERT_FILE}", @@ -890,8 +878,13 @@ def push_tls_files_to_workload(self, container: Container = None) -> None: self.update_config() - def _restart(self, _) -> None: + def _restart(self, event: RunWithLock) -> None: """Restart PostgreSQL.""" + if not self._patroni.are_all_members_ready(): + logger.debug("Early exit _restart: not all members ready yet") + event.defer() + return + try: self._patroni.restart_postgresql() except RetryError: @@ -900,6 +893,9 @@ def _restart(self, _) -> None: self.unit.status = BlockedStatus(error_message) return + # Update health check URL. + self._update_pebble_layers() + # Start or stop the pgBackRest TLS server service when TLS certificate change. self.backup.start_stop_pgbackrest_service() @@ -915,7 +911,6 @@ def update_config(self) -> None: backup_id=self.app_peer_data.get("restoring-backup"), stanza=self.app_peer_data.get("stanza"), ) - self._patroni.render_postgresql_conf_file() if not self._patroni.member_started: # If Patroni/PostgreSQL has not started yet and TLS relations was initialised, # then mark TLS as enabled. This commonly happens when the charm is deployed @@ -934,6 +929,28 @@ def update_config(self) -> None: if restart_postgresql: self.on[self.restart_manager.name].acquire_lock.emit() + def _update_pebble_layers(self) -> None: + """Update the pebble layers to keep the health check URL up-to-date.""" + container = self.unit.get_container("postgresql") + + # Get the current layer. + current_layer = container.get_plan() + + # Create a new config layer. + new_layer = self._postgresql_layer() + + # Check if there are any changes to layer services. + if current_layer.services != new_layer.services: + # Changes were made, add the new layer. + container.add_layer(self._postgresql_service, new_layer, combine=True) + logging.info("Added updated layer 'postgresql' to Pebble plan") + container.restart(self._postgresql_service) + logging.info("Restarted postgresql service") + if current_layer.checks != new_layer.checks: + # Changes were made, add the new layer. + container.add_layer(self._postgresql_service, new_layer, combine=True) + logging.info("Updated health checks") + def _unit_name_to_pod_name(self, unit_name: str) -> str: """Converts unit name to pod name. diff --git a/src/patroni.py b/src/patroni.py index 1708cdc9e4..10d8a80f4e 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -224,7 +224,7 @@ def render_patroni_yml_file( stanza: name of the stanza created by pgBackRest. backup_id: id of the backup that is being restored. """ - # Open the template postgresql.conf file. 
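# Illustrative sketch of the rolling-restart wiring that the _restart(event:
# RunWithLock) handler above plugs into, assuming the rolling_ops charm library
# is vendored (it is imported at the top of charm.py) and a "restart" peer
# relation is declared in metadata.yaml. The charm class here is a stand-in,
# not code from this patch.
from charms.rolling_ops.v0.rollingops import RollingOpsManager, RunWithLock
from ops.charm import CharmBase


class MiniCharm(CharmBase):
    def __init__(self, *args):
        super().__init__(*args)
        # The callback runs on each unit only while it holds the rolling lock.
        self.restart_manager = RollingOpsManager(self, "restart", self._restart)

    def request_restart(self) -> None:
        # Equivalent of update_config() deciding PostgreSQL must be restarted.
        self.on[self.restart_manager.name].acquire_lock.emit()

    def _restart(self, event: RunWithLock) -> None:
        # Restart the workload here; defer the event if members are not ready.
        ...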
+ # Open the template patroni.yml file. with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) # Render the template file with the correct values. @@ -244,24 +244,11 @@ def render_patroni_yml_file( restoring_backup=backup_id is not None, backup_id=backup_id, stanza=stanza, + minority_count=self._members_count // 2, version=self.rock_postgresql_version.split(".")[0], ) self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644) - def render_postgresql_conf_file(self) -> None: - """Render the PostgreSQL configuration file.""" - # Open the template postgresql.conf file. - with open("templates/postgresql.conf.j2", "r") as file: - template = Template(file.read()) - # Render the template file with the correct values. - # TODO: add extra configurations here later. - rendered = template.render( - logging_collector="on", - synchronous_commit="on" if self._members_count > 1 else "off", - synchronous_standby_names="*", - ) - self._render_file(f"{self._storage_path}/postgresql-k8s-operator.conf", rendered, 0o644) - @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def reload_patroni_configuration(self) -> None: """Reloads the configuration after it was updated in the file.""" diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index f6f27fee4f..af64a428ed 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -1,17 +1,24 @@ bootstrap: dcs: + synchronous_mode: true + synchronous_node_count: {{ minority_count }} postgresql: use_pg_rewind: true remove_data_directory_on_rewind_failure: true remove_data_directory_on_diverged_timelines: true bin_dir: /usr/lib/postgresql/{{ version }}/bin parameters: + synchronous_commit: on + synchronous_standby_names: "*" {%- if enable_pgbackrest %} archive_command: 'pgbackrest --stanza={{ stanza }} archive-push %p' {% else %} archive_command: /bin/true {%- endif %} archive_mode: {{ archive_mode }} + log_filename: 'postgresql.log' + log_directory: '/var/log/postgresql' + logging_collector: 'on' password_encryption: md5 wal_level: logical {%- if restoring_backup %} @@ -50,7 +57,6 @@ ctl: pod_ip: '{{ endpoint }}' postgresql: connect_address: '{{ endpoint }}:5432' - custom_conf: {{ storage_path }}/postgresql-k8s-operator.conf data_dir: {{ storage_path }}/pgdata bin_dir: /usr/lib/postgresql/{{ version }}/bin listen: 0.0.0.0:5432 diff --git a/templates/postgresql.conf.j2 b/templates/postgresql.conf.j2 deleted file mode 100644 index 213fdfe069..0000000000 --- a/templates/postgresql.conf.j2 +++ /dev/null @@ -1,7 +0,0 @@ -######################################################################################### -# [ WARNING ] -# postgresql configuration file maintained by the postgresql-k8s-operator -# local changes may be overwritten. 
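# Worked example of the minority_count value introduced above: it is the floor
# of half the planned members, handed to Patroni as synchronous_node_count,
# which keeps the primary plus its synchronous standbys at a majority of the
# planned units.
for members_count in (1, 2, 3, 5):
    print(f"{members_count} members -> synchronous_node_count={members_count // 2}")
# 1 members -> synchronous_node_count=0
# 2 members -> synchronous_node_count=1
# 3 members -> synchronous_node_count=1
# 5 members -> synchronous_node_count=2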
-######################################################################################### -synchronous_commit = '{{ synchronous_commit }}' -synchronous_standby_names = '{{ synchronous_standby_names }}' \ No newline at end of file diff --git a/tests/integration/ha_tests/application-charm/metadata.yaml b/tests/integration/ha_tests/application-charm/metadata.yaml index d9ba6ef6f3..ee4cb703e7 100644 --- a/tests/integration/ha_tests/application-charm/metadata.yaml +++ b/tests/integration/ha_tests/application-charm/metadata.yaml @@ -11,3 +11,7 @@ requires: database: interface: postgresql_client limit: 1 + +peers: + application-peers: + interface: application-peers diff --git a/tests/integration/ha_tests/application-charm/src/charm.py b/tests/integration/ha_tests/application-charm/src/charm.py index c4b136e249..7d96a1b59b 100755 --- a/tests/integration/ha_tests/application-charm/src/charm.py +++ b/tests/integration/ha_tests/application-charm/src/charm.py @@ -9,24 +9,41 @@ """ import logging +import os +import signal import subprocess -from typing import Optional +from typing import Dict, Optional import psycopg2 from charms.data_platform_libs.v0.data_interfaces import DatabaseRequires from ops.charm import ActionEvent, CharmBase -from ops.framework import StoredState from ops.main import main -from ops.model import ActiveStatus +from ops.model import ActiveStatus, Relation from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed logger = logging.getLogger(__name__) +PEER = "application-peers" +LAST_WRITTEN_FILE = "/tmp/last_written_value" +CONFIG_FILE = "/tmp/continuous_writes_config" +PROC_PID_KEY = "proc-pid" + class ApplicationCharm(CharmBase): """Application charm that connects to PostgreSQL charm.""" - _stored = StoredState() + @property + def _peers(self) -> Optional[Relation]: + """Retrieve the peer relation (`ops.model.Relation`).""" + return self.model.get_relation(PEER) + + @property + def app_peer_data(self) -> Dict: + """Application peer relation data object.""" + if self._peers is None: + return {} + + return self._peers.data[self.app] def __init__(self, *args): super().__init__(*args) @@ -37,8 +54,6 @@ def __init__(self, *args): # Events related to the database that is requested. self.database_name = "application" self.database = DatabaseRequires(self, "database", self.database_name) - self.framework.observe(self.database.on.database_created, self._on_database_created) - self.framework.observe(self.database.on.endpoints_changed, self._on_endpoints_changed) self.framework.observe( self.on.clear_continuous_writes_action, self._on_clear_continuous_writes_action ) @@ -49,9 +64,6 @@ def __init__(self, *args): self.on.stop_continuous_writes_action, self._on_stop_continuous_writes_action ) - # PID of the continuous writes OS process. 
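# Quick sketch of the peer-data bookkeeping that replaces the StoredState pid
# removed just below: Juju relation databags only hold strings, so later hunks
# in this file write the pid with str() and parse it back with int(). Modelling
# the databag as a plain dict makes the round-trip obvious.
peer_data = {}                     # stand-in for self._peers.data[self.app]
peer_data["proc-pid"] = str(4242)  # what _start_continuous_writes does
pid = int(peer_data["proc-pid"])   # what _stop_continuous_writes does
assert pid == 4242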
- self._stored.set_default(continuous_writes_pid=None) - @property def _connection_string(self) -> Optional[str]: """Returns the PostgreSQL connection string.""" @@ -63,6 +75,10 @@ def _connection_string(self) -> Optional[str]: return None host = endpoints.split(":")[0] + + if not host or host == "None": + return None + return ( f"dbname='{self.database_name}' user='{username}'" f" host='{host}' password='{password}' connect_timeout=5" @@ -72,15 +88,6 @@ def _on_start(self, _) -> None: """Only sets an Active status.""" self.unit.status = ActiveStatus() - def _on_database_created(self, _) -> None: - """Event triggered when a database was created for this application.""" - self._start_continuous_writes(1) - - def _on_endpoints_changed(self, _) -> None: - """Event triggered when the read/write endpoints of the database change.""" - count = self._count_writes() - self._start_continuous_writes(count + 1) - def _count_writes(self) -> int: """Count the number of records in the continuous_writes table.""" with psycopg2.connect( @@ -91,18 +98,64 @@ def _count_writes(self) -> int: connection.close() return count - def _on_clear_continuous_writes_action(self, _) -> None: + def _on_clear_continuous_writes_action(self, event: ActionEvent) -> None: """Clears database writes.""" - self._stop_continuous_writes() - with psycopg2.connect( - self._connection_string - ) as connection, connection.cursor() as cursor: - cursor.execute("DROP TABLE continuous_writes;") - connection.close() + if self._connection_string is None: + event.set_results({"result": "False"}) + return - def _on_start_continuous_writes_action(self, _) -> None: + try: + self._stop_continuous_writes() + except Exception as e: + event.set_results({"result": "False"}) + logger.exception("Unable to stop writes to drop table", exc_info=e) + return + + try: + with psycopg2.connect( + self._connection_string + ) as connection, connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS continuous_writes;") + event.set_results({"result": "True"}) + except Exception as e: + event.set_results({"result": "False"}) + logger.exception("Unable to drop table", exc_info=e) + finally: + connection.close() + + def _on_start_continuous_writes_action(self, event: ActionEvent) -> None: """Start the continuous writes process.""" + if self._connection_string is None: + event.set_results({"result": "False"}) + return + + try: + self._stop_continuous_writes() + except Exception as e: + event.set_results({"result": "False"}) + logger.exception("Unable to stop writes to create table", exc_info=e) + return + + try: + # Create the table to write records on and also a unique index to prevent duplicate + # writes. + with psycopg2.connect( + self._connection_string + ) as connection, connection.cursor() as cursor: + connection.autocommit = True + cursor.execute("CREATE TABLE IF NOT EXISTS continuous_writes(number INTEGER);") + cursor.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS number ON continuous_writes(number);" + ) + except Exception as e: + event.set_results({"result": "False"}) + logger.exception("Unable to create table", exc_info=e) + return + finally: + connection.close() + self._start_continuous_writes(1) + event.set_results({"result": "True"}) def _on_stop_continuous_writes_action(self, event: ActionEvent) -> None: """Stops the continuous writes process.""" @@ -117,47 +170,48 @@ def _start_continuous_writes(self, starting_number: int) -> None: # Stop any writes that might be going. 
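# The action handlers above close the connection in a finally: block because
# psycopg2's "with connection:" only scopes a transaction (commit/rollback); it
# does not close the connection. A minimal variant of the same idea, sketched
# here only to show how the close can be guarded when connect() itself fails:
import psycopg2


def drop_continuous_writes_table(connection_string: str) -> bool:
    connection = None
    try:
        connection = psycopg2.connect(connection_string)
        with connection, connection.cursor() as cursor:
            cursor.execute("DROP TABLE IF EXISTS continuous_writes;")
        return True
    except psycopg2.Error:
        return False
    finally:
        if connection is not None:
            connection.close()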
self._stop_continuous_writes() + with open(CONFIG_FILE, "w") as fd: + fd.write(self._connection_string) + os.fsync(fd) + # Run continuous writes in the background. popen = subprocess.Popen( [ "/usr/bin/python3", "src/continuous_writes.py", - self._connection_string, str(starting_number), ] ) # Store the continuous writes process ID to stop the process later. - self._stored.continuous_writes_pid = popen.pid + self.app_peer_data[PROC_PID_KEY] = str(popen.pid) - def _stop_continuous_writes(self) -> int: + def _stop_continuous_writes(self) -> Optional[int]: """Stops continuous writes to PostgreSQL and returns the last written value.""" - # If there is no process running, returns -1. - if self._stored.continuous_writes_pid is None: - return -1 + if not self.app_peer_data.get(PROC_PID_KEY): + return None # Stop the process. - proc = subprocess.Popen(["pkill", "--signal", "SIGKILL", "-f", "src/continuous_writes.py"]) - - # Wait for process to be killed. - proc.communicate() + try: + os.kill(int(self.app_peer_data[PROC_PID_KEY]), signal.SIGTERM) + except ProcessLookupError: + del self.app_peer_data[PROC_PID_KEY] + return None - self._stored.continuous_writes_pid = None + del self.app_peer_data[PROC_PID_KEY] # Return the max written value (or -1 if it was not possible to get that value). try: - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)): with attempt: - with psycopg2.connect( - self._connection_string - ) as connection, connection.cursor() as cursor: - cursor.execute("SELECT MAX(number) FROM continuous_writes;") - last_written_value = int(cursor.fetchone()[0]) - connection.close() + with open(LAST_WRITTEN_FILE, "r") as fd: + last_written_value = int(fd.read()) except RetryError as e: - logger.exception(e) + logger.exception("Unable to read result", exc_info=e) return -1 + os.remove(LAST_WRITTEN_FILE) + os.remove(CONFIG_FILE) return last_written_value diff --git a/tests/integration/ha_tests/application-charm/src/continuous_writes.py b/tests/integration/ha_tests/application-charm/src/continuous_writes.py index 83ab585749..c392e9221f 100644 --- a/tests/integration/ha_tests/application-charm/src/continuous_writes.py +++ b/tests/integration/ha_tests/application-charm/src/continuous_writes.py @@ -2,12 +2,28 @@ # See LICENSE file for licensing details. """This file is meant to run in the background continuously writing entries to PostgreSQL.""" +import os +import signal import sys import psycopg2 as psycopg2 +run = True +connection_string = None -def continuous_writes(connection_string: str, starting_number: int): + +def sigterm_handler(_signo, _stack_frame): + global run + run = False + + +def read_config_file(): + with open("/tmp/continuous_writes_config") as fd: + global connection_string + connection_string = fd.read().strip() + + +def continuous_writes(starting_number: int): """Continuously writes data do PostgreSQL database. Args: @@ -17,19 +33,10 @@ def continuous_writes(connection_string: str, starting_number: int): """ write_value = starting_number - try: - # Create the table to write records on and also a unique index to prevent duplicate writes. 
- with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: - connection.autocommit = True - cursor.execute("CREATE TABLE IF NOT EXISTS continuous_writes(number INTEGER);") - cursor.execute( - "CREATE UNIQUE INDEX IF NOT EXISTS number ON continuous_writes(number);" - ) - finally: - connection.close() + read_config_file() # Continuously write the record to the database (incrementing it at each iteration). - while True: + while run: try: with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: connection.autocommit = True @@ -53,12 +60,16 @@ def continuous_writes(connection_string: str, starting_number: int): write_value += 1 + with open("/tmp/last_written_value", "w") as fd: + fd.write(str(write_value - 1)) + os.fsync(fd) + def main(): - connection_string = sys.argv[1] - starting_number = int(sys.argv[2]) - continuous_writes(connection_string, starting_number) + starting_number = int(sys.argv[1]) + continuous_writes(starting_number) if __name__ == "__main__": + signal.signal(signal.SIGTERM, sigterm_handler) main() diff --git a/tests/integration/ha_tests/conftest.py b/tests/integration/ha_tests/conftest.py index d331e070fb..db7f902b0a 100644 --- a/tests/integration/ha_tests/conftest.py +++ b/tests/integration/ha_tests/conftest.py @@ -3,14 +3,12 @@ # See LICENSE file for licensing details. import pytest as pytest from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed from tests.integration.ha_tests.helpers import ( - app_name, - change_master_start_timeout, - get_master_start_timeout, - stop_continuous_writes, + change_primary_start_timeout, + get_primary_start_timeout, ) -from tests.integration.helpers import CHARM_SERIES APPLICATION_NAME = "application" @@ -18,48 +16,25 @@ @pytest.fixture() async def continuous_writes(ops_test: OpsTest) -> None: """Deploy the charm that makes continuous writes to PostgreSQL.""" - # Deploy the continuous writes application charm if it wasn't already deployed. - async with ops_test.fast_forward(): - if await app_name(ops_test, APPLICATION_NAME) is None: - charm = await ops_test.build_charm("tests/integration/ha_tests/application-charm") - await ops_test.model.deploy( - charm, application_name=APPLICATION_NAME, series=CHARM_SERIES - ) - await ops_test.model.wait_for_idle(status="active", timeout=1000) - - # Start the continuous writes process by relating the application to the database or - # by calling the action if the relation already exists. - database_app = await app_name(ops_test) - relations = [ - relation - for relation in ops_test.model.applications[database_app].relations - if not relation.is_peer - and f"{relation.requires.application_name}:{relation.requires.name}" - == "application:database" - ] - if not relations: - await ops_test.model.relate(database_app, APPLICATION_NAME) - await ops_test.model.wait_for_idle(status="active", timeout=1000) - else: - action = await ops_test.model.units.get(f"{APPLICATION_NAME}/0").run_action( - "start-continuous-writes" - ) - await action.wait() yield - # Stop the continuous writes process and clear the written data at the end. - await stop_continuous_writes(ops_test) - action = await ops_test.model.units.get(f"{APPLICATION_NAME}/0").run_action( - "clear-continuous-writes" - ) - await action.wait() + # Clear the written data at the end. 
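# Stripped-down sketch of the shutdown protocol the rewritten
# continuous_writes.py above follows: SIGTERM flips a module-level flag, the
# write loop exits, and the last value is persisted for the charm to read back.
# The sleep stands in for one database write.
import signal
import time

run = True


def _sigterm_handler(_signo, _frame):
    global run
    run = False


signal.signal(signal.SIGTERM, _sigterm_handler)

value = 0
while run:
    value += 1
    time.sleep(0.1)

with open("/tmp/last_written_value", "w") as fd:
    fd.write(str(value))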
+ for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await ops_test.model.applications[APPLICATION_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to clear up continuous_writes table" -@pytest.fixture() -async def master_start_timeout(ops_test: OpsTest) -> None: - """Temporary change the master start timeout configuration.""" +@pytest.fixture(scope="module") +async def primary_start_timeout(ops_test: OpsTest) -> None: + """Temporary change the primary start timeout configuration.""" # Change the parameter that makes the primary reelection faster. - initial_master_start_timeout = await get_master_start_timeout(ops_test) - await change_master_start_timeout(ops_test, 0) + initial_primary_start_timeout = await get_primary_start_timeout(ops_test) + await change_primary_start_timeout(ops_test, 0) yield # Rollback to the initial configuration. - await change_master_start_timeout(ops_test, initial_master_start_timeout) + await change_primary_start_timeout(ops_test, initial_primary_start_timeout) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 3c649ad0ea..3caa1c8448 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -1,7 +1,8 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. +import asyncio from pathlib import Path -from typing import Optional +from typing import Dict, Optional, Tuple import psycopg2 import requests @@ -37,12 +38,17 @@ class ProcessError(Exception): """Raised when a process fails.""" -async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) -> None: - """Change master start timeout configuration. +def get_patroni_cluster(unit_ip: str) -> Dict[str, str]: + resp = requests.get(f"http://{unit_ip}:8008/cluster") + return resp.json() + + +async def change_primary_start_timeout(ops_test: OpsTest, seconds: Optional[int]) -> None: + """Change primary start timeout configuration. Args: ops_test: ops_test instance. - seconds: number of seconds to set in master_start_timeout configuration. + seconds: number of seconds to set in primary_start_timeout configuration. """ for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): with attempt: @@ -51,35 +57,69 @@ async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) unit_ip = await get_unit_address(ops_test, primary_name) requests.patch( f"http://{unit_ip}:8008/config", - json={"master_start_timeout": seconds}, + json={"primary_start_timeout": seconds}, ) -async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int: +async def check_writes(ops_test) -> int: + """Gets the total writes from the test charm and compares to the writes from db.""" + total_expected_writes = await stop_continuous_writes(ops_test) + actual_writes, max_number_written = await count_writes(ops_test) + for member, count in actual_writes.items(): + assert ( + count == max_number_written[member] + ), f"{member}: writes to the db were missed: count of actual writes different from the max number written." + assert total_expected_writes == count, f"{member}: writes to the db were missed." 
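# Sketch of the Patroni REST calls the helpers above rely on: GET /cluster
# lists the members with their roles, and PATCH /config updates dynamic
# configuration such as primary_start_timeout. The IP below is a placeholder
# for any PostgreSQL unit address.
import requests

unit_ip = "10.1.2.3"

cluster = requests.get(f"http://{unit_ip}:8008/cluster").json()
for member in cluster["members"]:
    print(member["name"], member["role"], member["host"])

# Speed up failover in the tests by removing the primary start timeout.
requests.patch(f"http://{unit_ip}:8008/config", json={"primary_start_timeout": 0})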
+ return total_expected_writes + + +async def check_writes_are_increasing(ops_test, down_unit: str) -> None: + """Verify new writes are continuing by counting the number of writes.""" + writes, _ = await count_writes(ops_test, down_unit=down_unit) + for member, count in writes.items(): + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + more_writes, _ = await count_writes(ops_test, down_unit=down_unit) + assert more_writes[member] > count, f"{member}: writes not continuing to DB" + + +async def count_writes( + ops_test: OpsTest, down_unit: str = None +) -> Tuple[Dict[str, int], Dict[str, int]]: """Count the number of writes in the database.""" app = await app_name(ops_test) password = await get_password(ops_test, database_app_name=app, down_unit=down_unit) status = await ops_test.model.get_status() for unit_name, unit in status["applications"][app]["units"].items(): if unit_name != down_unit: - host = unit["address"] + cluster = get_patroni_cluster(unit["address"]) break - connection_string = ( - f"dbname='application' user='operator'" - f" host='{host}' password='{password}' connect_timeout=10" - ) - try: - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): - with attempt: - with psycopg2.connect( - connection_string - ) as connection, connection.cursor() as cursor: - cursor.execute("SELECT COUNT(number) FROM continuous_writes;") - count = cursor.fetchone()[0] - connection.close() - except RetryError: - return -1 - return count + count = {} + max = {} + for member in cluster["members"]: + if member["role"] != "replica" and member["host"].split(".")[0] != ( + down_unit or "" + ).replace("/", "-"): + host = member["host"] + + # Translate the service hostname to an IP address. + model = ops_test.model.info + client = Client(namespace=model.name) + service = client.get(Pod, name=host.split(".")[0]) + ip = service.status.podIP + + connection_string = ( + f"dbname='application' user='operator'" + f" host='{ip}' password='{password}' connect_timeout=10" + ) + + with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") + results = cursor.fetchone() + count[member["name"]] = results[0] + max[member["name"]] = results[1] + connection.close() + return count, max async def fetch_cluster_members(ops_test: OpsTest): @@ -112,14 +152,14 @@ def get_host_ip(host: str) -> str: return member_ips -async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]: - """Get the master start timeout configuration. +async def get_primary_start_timeout(ops_test: OpsTest) -> Optional[int]: + """Get the primary start timeout configuration. Args: ops_test: ops_test instance. Returns: - master start timeout in seconds or None if it's using the default value. + primary start timeout in seconds or None if it's using the default value. 
""" for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): with attempt: @@ -127,8 +167,8 @@ async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]: primary_name = await get_primary(ops_test, app) unit_ip = await get_unit_address(ops_test, primary_name) configuration_info = requests.get(f"http://{unit_ip}:8008/config") - master_start_timeout = configuration_info.json().get("master_start_timeout") - return int(master_start_timeout) if master_start_timeout is not None else None + primary_start_timeout = configuration_info.json().get("primary_start_timeout") + return int(primary_start_timeout) if primary_start_timeout is not None else None async def is_replica(ops_test: OpsTest, unit_name: str) -> bool: @@ -153,7 +193,7 @@ async def is_replica(ops_test: OpsTest, unit_name: str) -> bool: # A member that restarted has the DB process stopped may # take some time to know that a new primary was elected. - if role == "replica": + if role != "leader": return True else: raise MemberNotUpdatedOnClusterError() @@ -207,7 +247,7 @@ async def secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_write async def send_signal_to_process( - ops_test: OpsTest, unit_name: str, process: str, signal: str + ops_test: OpsTest, unit_name: str, process: str, signal: str, use_ssh: bool = False ) -> None: """Send a signal to an OS process on a specific unit. @@ -227,12 +267,24 @@ async def send_signal_to_process( await ops_test.model.applications[app].add_unit(count=1) await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + pod_name = unit_name.replace("/", "-") + command = f"pkill --signal {signal} -f {process}" + + if use_ssh: + kill_cmd = f"ssh {unit_name} {command}" + return_code, _, _ = await asyncio.wait_for(ops_test.juju(*kill_cmd.split()), 10) + if return_code != 0: + raise ProcessError( + "Expected command %s to succeed instead it failed: %s", + command, + return_code, + ) + return + # Load Kubernetes configuration to connect to the cluster. config.load_kube_config() # Send the signal. - pod_name = unit_name.replace("/", "-") - command = f"pkill --signal {signal} -f {process}" response = stream( core_v1_api.CoreV1Api().connect_get_namespaced_pod_exec, pod_name, @@ -256,6 +308,38 @@ async def send_signal_to_process( ) +async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: + """Start continuous writes to PostgreSQL.""" + # Start the process by relating the application to the database or + # by calling the action if the relation already exists. 
+ relations = [ + relation + for relation in ops_test.model.applications[app].relations + if not relation.is_peer + and f"{relation.requires.application_name}:{relation.requires.name}" + == "application:database" + ] + if not relations: + await ops_test.model.relate(app, "application") + await ops_test.model.wait_for_idle(status="active", timeout=1000) + else: + action = ( + await ops_test.model.applications["application"] + .units[0] + .run_action("start-continuous-writes") + ) + await action.wait() + for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await ops_test.model.applications["application"] + .units[0] + .run_action("start-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to create continuous_writes table" + + async def stop_continuous_writes(ops_test: OpsTest) -> int: """Stops continuous writes to PostgreSQL and returns the last written value.""" action = await ops_test.model.units.get("application/0").run_action("stop-continuous-writes") diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index a4e3012277..8cbd8536f8 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -1,61 +1,100 @@ #!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. +import logging +from time import sleep import pytest from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_fixed +from tests.integration.ha_tests.conftest import APPLICATION_NAME from tests.integration.ha_tests.helpers import ( - count_writes, + METADATA, + check_writes, + check_writes_are_increasing, fetch_cluster_members, get_primary, is_replica, postgresql_ready, secondary_up_to_date, send_signal_to_process, - stop_continuous_writes, + start_continuous_writes, ) -from tests.integration.helpers import app_name, build_and_deploy, get_unit_address +from tests.integration.helpers import ( + CHARM_SERIES, + app_name, + build_and_deploy, + get_unit_address, +) + +logger = logging.getLogger(__name__) -PATRONI_PROCESS = "/usr/local/bin/patroni" -POSTGRESQL_PROCESS = "postgres" +APP_NAME = METADATA["name"] +PATRONI_PROCESS = "/usr/bin/patroni" +POSTGRESQL_PROCESS = "/usr/lib/postgresql/14/bin/postgres" DB_PROCESSES = [POSTGRESQL_PROCESS, PATRONI_PROCESS] +MEDIAN_ELECTION_TIME = 10 @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy three unit of PostgreSQL.""" - await build_and_deploy(ops_test, 3) - - -@pytest.mark.unstable -@pytest.mark.parametrize("process", [POSTGRESQL_PROCESS]) + wait_for_apps = False + # It is possible for users to provide their own cluster for HA testing. Hence, check if there + # is a pre-existing cluster. + if not await app_name(ops_test): + wait_for_apps = True + await build_and_deploy(ops_test, 3, wait_for_idle=False) + # Deploy the continuous writes application charm if it wasn't already deployed. 
+ if not await app_name(ops_test, APPLICATION_NAME): + wait_for_apps = True + async with ops_test.fast_forward(): + charm = await ops_test.build_charm("tests/integration/ha_tests/application-charm") + await ops_test.model.deploy( + charm, application_name=APPLICATION_NAME, series=CHARM_SERIES + ) + + if wait_for_apps: + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=1000) + + +@pytest.mark.parametrize("process", [PATRONI_PROCESS]) async def test_freeze_db_process( - ops_test: OpsTest, process: str, continuous_writes, master_start_timeout + ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout ) -> None: # Locate primary unit. app = await app_name(ops_test) primary_name = await get_primary(ops_test, app) + # Start an application that continuously writes data to the database. + await start_continuous_writes(ops_test, app) + # Freeze the database process. await send_signal_to_process(ops_test, primary_name, process, "SIGSTOP") - async with ops_test.fast_forward(): - # Verify new writes are continuing by counting the number of writes before and after a - # 2 minutes wait (a db process freeze takes more time to trigger a fail-over). - writes = await count_writes(ops_test, primary_name) - for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): - with attempt: - more_writes = await count_writes(ops_test, primary_name) - assert more_writes > writes, "writes not continuing to DB" + # Wait some time to elect a new primary. + sleep(MEDIAN_ELECTION_TIME * 2) - # Verify that a new primary gets elected (ie old primary is secondary). - new_primary_name = await get_primary(ops_test, app, down_unit=primary_name) - assert new_primary_name != primary_name - - # Un-freeze the old primary. - await send_signal_to_process(ops_test, primary_name, process, "SIGCONT") + async with ops_test.fast_forward(): + try: + await check_writes_are_increasing(ops_test, primary_name) + + # Verify that a new primary gets elected (ie old primary is secondary). + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): + with attempt: + new_primary_name = await get_primary(ops_test, app, down_unit=primary_name) + assert new_primary_name != primary_name + finally: + # Un-freeze the old primary. + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): + with attempt: + use_ssh = (attempt.retry_state.attempt_number % 2) == 0 + logger.info(f"unfreezing {process}") + await send_signal_to_process( + ops_test, primary_name, process, "SIGCONT", use_ssh + ) # Verify that the database service got restarted and is ready in the old primary. assert await postgresql_ready(ops_test, primary_name) @@ -74,11 +113,7 @@ async def test_freeze_db_process( assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster." # Verify that no writes to the database were missed after stopping the writes. - total_expected_writes = await stop_continuous_writes(ops_test) - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): - with attempt: - actual_writes = await count_writes(ops_test) - assert total_expected_writes == actual_writes, "writes to the db were missed." + total_expected_writes = await check_writes(ops_test) # Verify that old primary is up-to-date. assert await secondary_up_to_date( @@ -86,26 +121,31 @@ async def test_freeze_db_process( ), "secondary not up to date with the cluster after restarting." 
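# Small self-contained demo of the retry alternation used in the finally block
# above: odd tenacity attempts go through the Kubernetes exec transport and
# even attempts fall back to "juju ssh", so a wedged transport cannot block the
# SIGCONT forever. The RuntimeError below just simulates two failed deliveries.
from tenacity import Retrying, stop_after_attempt, wait_fixed

calls = []
for attempt in Retrying(stop=stop_after_attempt(4), wait=wait_fixed(0), reraise=True):
    with attempt:
        use_ssh = (attempt.retry_state.attempt_number % 2) == 0
        calls.append("juju ssh" if use_ssh else "pod exec")
        if len(calls) < 3:
            raise RuntimeError("pretend the signal did not land yet")

print(calls)  # ['pod exec', 'juju ssh', 'pod exec']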
-@pytest.mark.unstable @pytest.mark.parametrize("process", DB_PROCESSES) async def test_restart_db_process( - ops_test: OpsTest, process: str, continuous_writes, master_start_timeout + ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout ) -> None: + # Set signal based on the process + if process == PATRONI_PROCESS: + signal = "SIGTERM" + else: + signal = "SIGINT" + # Locate primary unit. app = await app_name(ops_test) primary_name = await get_primary(ops_test, app) + # Start an application that continuously writes data to the database. + await start_continuous_writes(ops_test, app) + # Restart the database process. - await send_signal_to_process(ops_test, primary_name, process, "SIGTERM") + await send_signal_to_process(ops_test, primary_name, process, signal) + + # Wait some time to elect a new primary. + sleep(MEDIAN_ELECTION_TIME * 2) async with ops_test.fast_forward(): - # Verify new writes are continuing by counting the number of writes before and after a - # 2 minutes wait (a db process freeze takes more time to trigger a fail-over). - writes = await count_writes(ops_test, primary_name) - for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): - with attempt: - more_writes = await count_writes(ops_test, primary_name) - assert more_writes > writes, "writes not continuing to DB" + await check_writes_are_increasing(ops_test, primary_name) # Verify that the database service got restarted and is ready in the old primary. assert await postgresql_ready(ops_test, primary_name) @@ -128,11 +168,7 @@ async def test_restart_db_process( assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster." # Verify that no writes to the database were missed after stopping the writes. - total_expected_writes = await stop_continuous_writes(ops_test) - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): - with attempt: - actual_writes = await count_writes(ops_test) - assert total_expected_writes == actual_writes, "writes to the db were missed." + total_expected_writes = await check_writes(ops_test) # Verify that old primary is up-to-date. assert await secondary_up_to_date( diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 004d917de8..804ce2a416 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -446,6 +446,7 @@ def get_expected_k8s_resources(namespace: str, application: str) -> set: resources.update( [ f"Endpoints/patroni-{application}-config", + f"Endpoints/patroni-{application}-sync", f"Endpoints/patroni-{application}", f"Endpoints/{application}", f"Endpoints/{application}-primary", @@ -587,7 +588,7 @@ async def primary_changed(ops_test: OpsTest, old_primary: str) -> bool: """ application = old_primary.split("/")[0] primary = await get_primary(ops_test, application, down_unit=old_primary) - return primary != old_primary + return primary != old_primary and primary != "None" async def restart_patroni(ops_test: OpsTest, unit_name: str) -> None: diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py index 018a6fc5d4..2bb020f355 100644 --- a/tests/integration/test_tls.py +++ b/tests/integration/test_tls.py @@ -1,7 +1,10 @@ #!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. 
+import logging + import pytest as pytest +import requests from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_exponential @@ -22,6 +25,8 @@ run_command_on_unit, ) +logger = logging.getLogger(__name__) + MATTERMOST_APP_NAME = "mattermost" TLS_CERTIFICATES_APP_NAME = "tls-certificates-operator" APPLICATION_UNITS = 2 @@ -57,13 +62,15 @@ async def test_mattermost_db(ops_test: OpsTest) -> None: # Test TLS being used by pg_rewind. To accomplish that, get the primary unit # and a replica that will be promoted to primary (this should trigger a rewind - # operation when the old primary is started again). + # operation when the old primary is started again). 'verify=False' is used here + # because the unit IP that is used in the test doesn't match the certificate + # hostname (that is a k8s hostname). primary = await get_primary(ops_test) - replica = [ - unit.name - for unit in ops_test.model.applications[DATABASE_APP_NAME].units - if unit.name != primary - ][0] + primary_address = await get_unit_address(ops_test, primary) + cluster_info = requests.get(f"https://{primary_address}:8008/cluster", verify=False) + for member in cluster_info.json()["members"]: + if member["role"] == "replica": + replica = "/".join(member["name"].rsplit("-", 1)) # Enable additional logs on the PostgreSQL instance to check TLS # being used in a later step. @@ -103,17 +110,28 @@ async def test_mattermost_db(ops_test: OpsTest) -> None: connection.close() # Stop the initial primary. + logger.info(f"stopping database on {primary}") await run_command_on_unit(ops_test, primary, "/charm/bin/pebble stop postgresql") # Check that the primary changed. assert await primary_changed(ops_test, primary), "primary not changed" # Restart the initial primary and check the logs to ensure TLS is being used by pg_rewind. + logger.info(f"starting database on {primary}") await run_command_on_unit(ops_test, primary, "/charm/bin/pebble start postgresql") - logs = await run_command_on_unit(ops_test, replica, "/charm/bin/pebble logs") - assert ( - "connection authorized: user=rewind database=postgres SSL enabled" in logs - ), "TLS is not being used on pg_rewind connections" + for attempt in Retrying( + stop=stop_after_delay(60 * 3), wait=wait_exponential(multiplier=1, min=2, max=30) + ): + with attempt: + logger.info(f"checking if pg_rewind used TLS on {replica}") + logs = await run_command_on_unit( + ops_test, + replica, + "grep rewind /var/log/postgresql/postgresql.log", + ) + assert ( + "connection authorized: user=rewind database=postgres SSL enabled" in logs + ), "TLS is not being used on pg_rewind connections" # Deploy and check Mattermost user and database existence. 
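# The replica lookup above comes from Patroni's /cluster endpoint, whose member
# names are pod names; turning one back into a Juju unit name is a single
# rsplit on the last dash. Tiny worked example:
member_name = "postgresql-k8s-1"  # as reported by Patroni
unit_name = "/".join(member_name.rsplit("-", 1))
print(unit_name)  # postgresql-k8s/1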
relation_id = await deploy_and_relate_application_with_postgresql( diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 81979aedf4..4c7a72073a 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -56,20 +56,10 @@ def setUp(self): self.rel_id = self.harness.add_relation(self._peer_relation, self.charm.app.name) - @patch_network_get(private_address="1.1.1.1") - @patch("charm.Patroni.render_postgresql_conf_file") - def test_on_install( - self, - _render_postgresql_conf_file, - ): - self.charm.on.install.emit() - _render_postgresql_conf_file.assert_called_once() - @patch("charm.Patroni.reload_patroni_configuration") - @patch("charm.Patroni.render_postgresql_conf_file") @patch("charm.PostgresqlOperatorCharm._patch_pod_labels") @patch("charm.PostgresqlOperatorCharm._create_resources") - def test_on_leader_elected(self, _, __, _render_postgresql_conf_file, ___): + def test_on_leader_elected(self, _, __, ___): # Assert that there is no password in the peer relation. self.assertIsNone(self.charm._peers.data[self.charm.app].get("postgres-password", None)) self.assertIsNone(self.charm._peers.data[self.charm.app].get("replication-password", None)) @@ -83,7 +73,6 @@ def test_on_leader_elected(self, _, __, _render_postgresql_conf_file, ___): "replication-password", None ) self.assertIsNotNone(replication_password) - _render_postgresql_conf_file.assert_called_once() # Trigger a new leader election and check that the password is still the same. self.harness.set_leader(False) @@ -154,6 +143,7 @@ def test_on_postgresql_pebble_ready( expected = self.charm._postgresql_layer().to_dict() expected.pop("summary", "") expected.pop("description", "") + expected.pop("checks", "") # Check the plan is as expected. self.assertEqual(plan.to_dict(), expected) self.assertEqual(self.harness.model.unit.status, ActiveStatus()) @@ -416,10 +406,9 @@ def test_patch_pod_labels(self, _client): ) @patch("charm.Patroni.reload_patroni_configuration") - @patch("charm.Patroni.render_postgresql_conf_file") @patch("charm.PostgresqlOperatorCharm._patch_pod_labels") @patch("charm.PostgresqlOperatorCharm._create_resources") - def test_postgresql_layer(self, _, __, ___, ____): + def test_postgresql_layer(self, _, __, ___): # Test with the already generated password. self.harness.set_leader() plan = self.charm._postgresql_layer().to_dict() @@ -453,13 +442,21 @@ def test_postgresql_layer(self, _, __, ___, ____): "group": "postgres", }, }, + "checks": { + self._postgresql_service: { + "override": "replace", + "level": "ready", + "http": { + "url": "http://postgresql-k8s-0.postgresql-k8s-endpoints:8008/health", + }, + } + }, } self.assertDictEqual(plan, expected) @patch("charm.Patroni.reload_patroni_configuration") - @patch("charm.Patroni.render_postgresql_conf_file") @patch("charm.PostgresqlOperatorCharm._create_resources") - def test_get_secret(self, _, __, ___): + def test_get_secret(self, _, __): self.harness.set_leader() # Test application scope. @@ -477,9 +474,8 @@ def test_get_secret(self, _, __, ___): assert self.charm.get_secret("unit", "password") == "test-password" @patch("charm.Patroni.reload_patroni_configuration") - @patch("charm.Patroni.render_postgresql_conf_file") @patch("charm.PostgresqlOperatorCharm._create_resources") - def test_set_secret(self, _, __, ___): + def test_set_secret(self, _, __): self.harness.set_leader() # Test application scope. 
diff --git a/tests/unit/test_patroni.py b/tests/unit/test_patroni.py index 3d57c58db1..52b80a9cca 100644 --- a/tests/unit/test_patroni.py +++ b/tests/unit/test_patroni.py @@ -105,6 +105,7 @@ def test_render_patroni_yml_file(self, _render_file, _rock_postgresql_version): replication_password=self.patroni._replication_password, rewind_user=REWIND_USER, rewind_password=self.patroni._rewind_password, + minority_count=self.patroni._members_count // 2, version="14", ) @@ -139,6 +140,7 @@ def test_render_patroni_yml_file(self, _render_file, _rock_postgresql_version): replication_password=self.patroni._replication_password, rewind_user=REWIND_USER, rewind_password=self.patroni._rewind_password, + minority_count=self.patroni._members_count // 2, version="14", ) self.assertNotEqual(expected_content_with_tls, expected_content) @@ -164,66 +166,6 @@ def test_render_patroni_yml_file(self, _render_file, _rock_postgresql_version): ) self.assertIn("ssl_key_file: /var/lib/postgresql/data/key.pem", expected_content_with_tls) - @patch("charm.Patroni._render_file") - def test_render_postgresql_conf_file(self, _render_file): - # Get the expected content from a file. - with open("templates/postgresql.conf.j2") as file: - template = Template(file.read()) - expected_content = template.render( - logging_collector="on", - synchronous_commit="off", - synchronous_standby_names="*", - ) - - # Setup a mock for the `open` method, set returned data to postgresql.conf template. - with open("templates/postgresql.conf.j2", "r") as f: - mock = mock_open(read_data=f.read()) - - # Patch the `open` method with our mock. - with patch("builtins.open", mock, create=True): - # Call the method - self.patroni.render_postgresql_conf_file() - - # Check the template is opened read-only in the call to open. - self.assertEqual(mock.call_args_list[0][0], ("templates/postgresql.conf.j2", "r")) - # Ensure the correct rendered template is sent to _render_file method. - _render_file.assert_called_once_with( - f"{STORAGE_PATH}/postgresql-k8s-operator.conf", - expected_content, - 0o644, - ) - - # Also test with multiple planned units (synchronous_commit is turned on). - self.patroni = Patroni( - self.charm, - "postgresql-k8s-0", - ["postgresql-k8s-0", "postgresql-k8s-1"], - "postgresql-k8s-primary.dev.svc.cluster.local", - "test-model", - STORAGE_PATH, - "superuser-password", - "replication-password", - "rewind-password", - False, - ) - expected_content = template.render( - logging_collector="on", - synchronous_commit="on", - synchronous_standby_names="*", - ) - - # Patch the `open` method with our mock. - with patch("builtins.open", mock, create=True): - # Call the method - self.patroni.render_postgresql_conf_file() - - # Ensure the correct rendered template is sent to _render_file method. - _render_file.assert_called_with( - f"{STORAGE_PATH}/postgresql-k8s-operator.conf", - expected_content, - 0o644, - ) - @patch("requests.get") def test_primary_endpoint_ready(self, _get): # Test with an issue when trying to connect to the Patroni API.
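# End-to-end sketch of what the new template variables resolve to, using a
# cut-down fragment instead of the real patroni.yml.j2 (three planned members
# give minority_count = 1):
from jinja2 import Template

fragment = Template(
    "bootstrap:\n"
    "  dcs:\n"
    "    synchronous_mode: true\n"
    "    synchronous_node_count: {{ minority_count }}\n"
)
print(fragment.render(minority_count=3 // 2))
# bootstrap:
#   dcs:
#     synchronous_mode: true
#     synchronous_node_count: 1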