From 79daddd59d89054c0c5e9f3920d4a80899359216 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 7 Oct 2022 14:49:46 -0300 Subject: [PATCH 01/12] Add alternative servers for primary and members retrieval --- src/cluster.py | 44 ++++++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/src/cluster.py b/src/cluster.py index e8f4891187..4b1e90845c 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -163,12 +163,15 @@ def get_member_ip(self, member_name: str) -> str: """ ip = None # Request info from cluster endpoint (which returns all members of the cluster). - cluster_status = requests.get(f"{self._patroni_url}/cluster", verify=self.verify) - for member in cluster_status.json()["members"]: - if member["name"] == member_name: - ip = member["host"] - break - return ip + for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)): + with attempt: + url = self._get_alternative_server_url(attempt) + cluster_status = requests.get(f"{url}/cluster", verify=self.verify, timeout=10) + for member in cluster_status.json()["members"]: + if member["name"] == member_name: + ip = member["host"] + break + return ip def get_primary(self, unit_name_pattern=False) -> str: """Get primary instance. @@ -181,16 +184,29 @@ def get_primary(self, unit_name_pattern=False) -> str: """ primary = None # Request info from cluster endpoint (which returns all members of the cluster). - cluster_status = requests.get(f"{self._patroni_url}/cluster", verify=self.verify) - for member in cluster_status.json()["members"]: - if member["role"] == "leader": - primary = member["name"] - if unit_name_pattern: - # Change the last dash to / in order to match unit name pattern. - primary = "/".join(primary.rsplit("-", 1)) - break + for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)): + with attempt: + url = self._get_alternative_server_url(attempt) + cluster_status = requests.get(f"{url}/cluster", verify=self.verify, timeout=10) + for member in cluster_status.json()["members"]: + if member["role"] == "leader": + primary = member["name"] + if unit_name_pattern: + # Change the last dash to / in order to match unit name pattern. + primary = "/".join(primary.rsplit("-", 1)) + break return primary + def _get_alternative_server_url(self, attempt) -> str: + if attempt.retry_state.attempt_number > 1: + url = self._patroni_url.replace( + self.unit_ip, list(self.peers_ips)[attempt.retry_state.attempt_number - 2] + ) + else: + url = self._patroni_url + logger.warning(f"url for get primary: {url}") + return url + def are_all_members_ready(self) -> bool: """Check if all members are correctly running Patroni and PostgreSQL. 
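The core idea of this first patch is a small client-side failover for Patroni's REST API: make one attempt per known server, and on each retry swap the host in the URL for the next peer's IP, so the /cluster endpoint can still be queried when the local Patroni process is down. Below is a minimal, self-contained sketch of that pattern under the same assumptions the patch makes (a plain http://<ip>:8008 endpoint, a 10-second timeout, one extra attempt per peer); the function and parameter names such as fetch_cluster_status and peer_ips are illustrative, not the charm's exact code.

import requests
from tenacity import Retrying, stop_after_attempt


def fetch_cluster_status(unit_ip: str, peer_ips: list, verify: bool = True) -> dict:
    """Query the /cluster endpoint, falling back to peer endpoints on failure."""
    # One attempt for the local unit plus one per peer; tenacity raises a
    # RetryError only after every endpoint has failed.
    for attempt in Retrying(stop=stop_after_attempt(len(peer_ips) + 1)):
        with attempt:
            number = attempt.retry_state.attempt_number
            # Attempt 1 targets the local unit; attempt N targets peer N-2.
            host = unit_ip if number == 1 else peer_ips[number - 2]
            response = requests.get(
                f"http://{host}:8008/cluster", verify=verify, timeout=10
            )
            return response.json()

The same structure backs both get_member_ip and get_primary above; the helper that maps an attempt number to a URL is the one a later commit in this series renames to _get_alternative_patroni_url.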
From 7811a0ba7bbd766407f9ddb03445bccd751640b6 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 7 Oct 2022 18:17:20 -0300 Subject: [PATCH 02/12] Test working --- src/relations/postgresql_provider.py | 3 + .../ha_tests/application-charm/src/charm.py | 36 ++++++++ .../src/continuous_writes.py | 42 +++++++++- tests/integration/ha_tests/conftest.py | 12 +-- tests/integration/ha_tests/helpers.py | 55 ++++++++++--- .../integration/ha_tests/test_self_healing.py | 82 ++++++++++++++++++- 6 files changed, 208 insertions(+), 22 deletions(-) diff --git a/src/relations/postgresql_provider.py b/src/relations/postgresql_provider.py index 959cb7b794..1cde10c4bd 100644 --- a/src/relations/postgresql_provider.py +++ b/src/relations/postgresql_provider.py @@ -179,9 +179,11 @@ def oversee_users(self) -> None: def update_endpoints(self, event: DatabaseRequestedEvent = None) -> None: """Set the read/write and read-only endpoints.""" + logger.error("update_endpoints before leader check") if not self.charm.unit.is_leader(): return + logger.error("update_endpoints after leader check") # Get the current relation or all the relations # if this is triggered by another type of event. relations = [event.relation] if event else self.model.relations[self.relation_name] @@ -194,6 +196,7 @@ def update_endpoints(self, event: DatabaseRequestedEvent = None) -> None: else "" ) + logger.error(f"update_endpoints relations: {relations}") for relation in relations: # Set the read/write endpoint. self.database_provides.set_endpoints( diff --git a/tests/integration/ha_tests/application-charm/src/charm.py b/tests/integration/ha_tests/application-charm/src/charm.py index 6540091eb6..9a5a0c463d 100755 --- a/tests/integration/ha_tests/application-charm/src/charm.py +++ b/tests/integration/ha_tests/application-charm/src/charm.py @@ -9,6 +9,8 @@ """ import logging +import os +import signal import subprocess from typing import Optional @@ -74,15 +76,24 @@ def _on_start(self, _) -> None: def _on_database_created(self, _) -> None: """Event triggered when a database was created for this application.""" + logger.error( + "--------------------------------- _on_database_created ---------------------------------" + ) self._start_continuous_writes(1) def _on_endpoints_changed(self, _) -> None: """Event triggered when the read/write endpoints of the database change.""" + logger.error( + "--------------------------------- _on_endpoints_changed ---------------------------------" + ) count = self._count_writes() self._start_continuous_writes(count + 1) def _count_writes(self) -> int: """Count the number of records in the continuous_writes table.""" + logger.error( + "--------------------------------- _count_writes ---------------------------------" + ) with psycopg2.connect( self._connection_string ) as connection, connection.cursor() as cursor: @@ -93,6 +104,9 @@ def _count_writes(self) -> int: def _on_clear_continuous_writes_action(self, _) -> None: """Clears database writes.""" + logger.error( + "--------------------------------- _on_clear_continuous_writes_action ---------------------------------" + ) self._stop_continuous_writes() with psycopg2.connect( self._connection_string @@ -101,15 +115,24 @@ def _on_clear_continuous_writes_action(self, _) -> None: connection.close() def _on_start_continuous_writes_action(self, _) -> None: + logger.error( + "--------------------------------- _on_start_continuous_writes_action ---------------------------------" + ) """Start the continuous writes process.""" self._start_continuous_writes(1) 
def _on_stop_continuous_writes_action(self, event: ActionEvent) -> None: """Stops the continuous writes process.""" + logger.error( + "--------------------------------- _on_stop_continuous_writes_action ---------------------------------" + ) writes = self._stop_continuous_writes() event.set_results({"writes": writes}) def _start_continuous_writes(self, starting_number: int) -> None: + logger.error( + "--------------------------------- _start_continuous_writes ---------------------------------" + ) """Starts continuous writes to PostgreSQL instance.""" if self._connection_string is None: return @@ -134,12 +157,22 @@ def _stop_continuous_writes(self) -> Optional[int]: """Stops continuous writes to PostgreSQL and returns the last written value.""" if self._stored.continuous_writes_pid is None: return None + logger.error( + "--------------------------------- _stop_continuous_writes ---------------------------------" + ) # Stop the process. proc = subprocess.Popen(["pkill", "--signal", "SIGKILL", "-f", "src/continuous_writes.py"]) + # os.kill(self._stored.continuous_writes_pid, signal.SIGINT) + logger.error( + "--------------------------------- after kill ---------------------------------" + ) # Wait for process to be killed. proc.communicate() + logger.error( + "--------------------------------- after communicate ---------------------------------" + ) self._stored.continuous_writes_pid = None @@ -156,6 +189,9 @@ def _stop_continuous_writes(self) -> Optional[int]: except RetryError as e: logger.exception(e) return -1 + logger.error( + "--------------------------------- after retry ---------------------------------" + ) return last_written_value diff --git a/tests/integration/ha_tests/application-charm/src/continuous_writes.py b/tests/integration/ha_tests/application-charm/src/continuous_writes.py index 83ab585749..e0c0377234 100644 --- a/tests/integration/ha_tests/application-charm/src/continuous_writes.py +++ b/tests/integration/ha_tests/application-charm/src/continuous_writes.py @@ -2,10 +2,13 @@ # See LICENSE file for licensing details. """This file is meant to run in the background continuously writing entries to PostgreSQL.""" +import logging import sys import psycopg2 as psycopg2 +logger = logging.getLogger(__name__) + def continuous_writes(connection_string: str, starting_number: int): """Continuously writes data do PostgreSQL database. @@ -30,6 +33,11 @@ def continuous_writes(connection_string: str, starting_number: int): # Continuously write the record to the database (incrementing it at each iteration). while True: + logger.error("starting loop again...") + f = open("/tmp/demofile0.txt", "a") + f.write(str(write_value)) + f.write("\n\n") + f.close() try: with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: connection.autocommit = True @@ -38,19 +46,49 @@ def continuous_writes(connection_string: str, starting_number: int): psycopg2.InterfaceError, psycopg2.OperationalError, psycopg2.errors.ReadOnlySqlTransaction, - ): + ) as e: # We should not raise any of those exceptions that can happen when a connection failure # happens, for example, when a primary is being reelected after a failure on the old # primary. 
+ f = open("/tmp/demofile1.txt", "a") + f.write(str(write_value)) + f.write("\n") + f.write(str(e)) + f.write("\n") + f.write(str(type(e))) + f.write("\n\n") + f.close() continue - except psycopg2.Error: + except psycopg2.Error as e: # If another error happens, like writing a duplicate number when a connection failed # in a previous iteration (but the transaction was already committed), just increment # the number. + f = open("/tmp/demofile2.txt", "a") + f.write(str(write_value)) + f.write("\n") + f.write(str(e)) + f.write("\n") + f.write(str(type(e))) + f.write("\n\n") + f.close() + pass + except Exception as e: + f = open("/tmp/demofile3.txt", "a") + f.write(str(write_value)) + f.write("\n") + f.write(str(e)) + f.write("\n") + f.write(str(type(e))) + f.write("\n\n") + f.close() pass finally: connection.close() + f = open("/tmp/demofile4.txt", "a") + f.write(str(write_value)) + f.write("\n\n") + f.close() write_value += 1 diff --git a/tests/integration/ha_tests/conftest.py b/tests/integration/ha_tests/conftest.py index 15d909c27d..87e5fa17be 100644 --- a/tests/integration/ha_tests/conftest.py +++ b/tests/integration/ha_tests/conftest.py @@ -24,12 +24,12 @@ async def continuous_writes(ops_test: OpsTest) -> None: await ops_test.model.wait_for_idle(status="active", timeout=1000) yield # Clear the written data at the end. - action = ( - await ops_test.model.applications[APPLICATION_NAME] - .units[0] - .run_action("clear-continuous-writes") - ) - await action.wait() + # action = ( + # await ops_test.model.applications[APPLICATION_NAME] + # .units[0] + # .run_action("clear-continuous-writes") + # ) + # await action.wait() @pytest.fixture() diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 239d1886e6..f1a63c88c1 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -1,5 +1,6 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. 
+import asyncio from pathlib import Path from typing import Optional @@ -7,7 +8,13 @@ import requests import yaml from pytest_operator.plugin import OpsTest -from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed +from tenacity import ( + RetryError, + Retrying, + stop_after_attempt, + stop_after_delay, + wait_fixed, +) from tests.integration.helpers import get_unit_address @@ -57,23 +64,40 @@ async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) async def count_writes(ops_test: OpsTest) -> int: """Count the number of writes in the database.""" app = await app_name(ops_test) + print(11) password = await get_password(ops_test, app) - host = ops_test.model.applications[app].units[0].public_address - connection_string = ( - f"dbname='application' user='operator'" - f" host='{host}' password='{password}' connect_timeout=10" - ) + print(12) try: - for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): + for attempt in Retrying( + stop=stop_after_attempt(len(ops_test.model.applications[app].units)) + ): with attempt: + host = ( + ops_test.model.applications[app] + .units[attempt.retry_state.attempt_number - 1] + .public_address + ) + print(13) + connection_string = ( + f"dbname='application' user='operator'" + f" host='{host}' password='{password}' connect_timeout=10" + ) + print(14) with psycopg2.connect( connection_string ) as connection, connection.cursor() as cursor: + print(15) cursor.execute("SELECT COUNT(number) FROM continuous_writes;") + print(16) count = cursor.fetchone()[0] + print(17) connection.close() + print(18) except RetryError: return -1 + except Exception as e: + print(str(e)) + return -1 return count @@ -103,10 +127,15 @@ async def get_password(ops_test: OpsTest, app) -> str: string with the password stored on the peer relation databag. """ # Can retrieve from any unit running unit, so we pick the first. - unit_name = ops_test.model.applications[app].units[0].name - action = await ops_test.model.units.get(unit_name).run_action("get-password") - action = await action.wait() - return action.results["operator-password"] + for attempt in Retrying(stop=stop_after_attempt(len(ops_test.model.applications[app].units))): + with attempt: + unit_name = ( + ops_test.model.applications[app].units[attempt.retry_state.attempt_number - 1].name + ) + print(f"unit_name: {unit_name}") + action = await ops_test.model.units.get(unit_name).run_action("get-password") + action = await asyncio.wait_for(action.wait(), 10) + return action.results["operator-password"] async def get_primary(ops_test: OpsTest, app) -> str: @@ -122,7 +151,9 @@ async def get_primary(ops_test: OpsTest, app) -> str: return action.results["primary"] -async def kill_process(ops_test: OpsTest, unit_name: str, process: str, kill_code: str) -> None: +async def send_signal_to_process( + ops_test: OpsTest, unit_name: str, process: str, kill_code: str +) -> None: """Kills process on the unit according to the provided kill code.""" # Killing the only instance can be disastrous. app = await app_name(ops_test) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 90dbbbd9f8..e1ba377f4b 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. 
+from time import sleep import pytest from pytest_operator.plugin import OpsTest @@ -11,9 +12,9 @@ app_name, count_writes, get_primary, - kill_process, postgresql_ready, secondary_up_to_date, + send_signal_to_process, start_continuous_writes, stop_continuous_writes, ) @@ -51,35 +52,112 @@ async def test_kill_db_process( # Start an application that continuously writes data to the database. await start_continuous_writes(ops_test, app) + print(1) # Kill the database process. - await kill_process(ops_test, primary_name, process, kill_code="SIGKILL") + await send_signal_to_process(ops_test, primary_name, process, kill_code="SIGKILL") + print(2) async with ops_test.fast_forward(): # Verify new writes are continuing by counting the number of writes before and after a # 60 seconds wait (this is a little more than the loop wait configuration, that is # considered to trigger a fail-over after master_start_timeout is changed). writes = await count_writes(ops_test) + print(3) for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): with attempt: more_writes = await count_writes(ops_test) + print(4) assert more_writes > writes, "writes not continuing to DB" + print(4.1) + print(4.2) # Verify that the database service got restarted and is ready in the old primary. assert await postgresql_ready(ops_test, primary_name) + print(5) # Verify that a new primary gets elected (ie old primary is secondary). new_primary_name = await get_primary(ops_test, app) + print(6) assert new_primary_name != primary_name + print(7) # Verify that no writes to the database were missed after stopping the writes. total_expected_writes = await stop_continuous_writes(ops_test) + print(8) for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): with attempt: actual_writes = await count_writes(ops_test) + print(9) assert total_expected_writes == actual_writes, "writes to the db were missed." + print(9.1) # Verify that old primary is up-to-date. + print(9.2) assert await secondary_up_to_date( ops_test, primary_name, total_expected_writes ), "secondary not up to date with the cluster after restarting." + print(10) + + +@pytest.mark.ha_self_healing_tests +@pytest.mark.parametrize("process", DB_PROCESSES) +async def test_freeze_db_process( + ops_test: OpsTest, process: str, continuous_writes, master_start_timeout +) -> None: + # Locate primary unit. + app = await app_name(ops_test) + primary_name = await get_primary(ops_test, app) + print(1) + + # Start an application that continuously writes data to the database. + await start_continuous_writes(ops_test, app) + print(2) + + # Freeze the database process. + await send_signal_to_process(ops_test, primary_name, process, "SIGSTOP") + print(3) + sleep(30) + + async with ops_test.fast_forward(): + # Verify new writes are continuing by counting the number of writes before and after a + # 60 seconds wait (this is a little more than the loop wait configuration, that is + # considered to trigger a fail-over after master_start_timeout is changed). + writes = await count_writes(ops_test) + print(4) + for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): + with attempt: + more_writes = await count_writes(ops_test) + print(4.1) + assert more_writes > writes, "writes not continuing to DB" + + # Un-freeze the old primary. + print(4.2) + await send_signal_to_process(ops_test, primary_name, process, "SIGCONT") + print(5) + + # Verify that the database service got restarted and is ready in the old primary. 
+ assert await postgresql_ready(ops_test, primary_name) + print(6) + + # Verify that a new primary gets elected (ie old primary is secondary). + new_primary_name = await get_primary(ops_test, app) + print(7) + assert new_primary_name != primary_name + print(8) + + # Verify that no writes to the database were missed after stopping the writes. + total_expected_writes = await stop_continuous_writes(ops_test) + print(9) + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + actual_writes = await count_writes(ops_test) + print(9.1) + assert total_expected_writes == actual_writes, "writes to the db were missed." + + print(9.2) + # Verify that old primary is up-to-date. + assert await secondary_up_to_date( + ops_test, primary_name, total_expected_writes + ), "secondary not up to date with the cluster after restarting." + print(10) From a4b76c86209c08bde1a73e647c280ced8410d9b9 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Sun, 9 Oct 2022 11:26:28 -0300 Subject: [PATCH 03/12] Test working --- tests/integration/ha_tests/application-charm/src/charm.py | 2 -- tests/integration/ha_tests/helpers.py | 2 +- tests/integration/ha_tests/test_self_healing.py | 6 +++--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/tests/integration/ha_tests/application-charm/src/charm.py b/tests/integration/ha_tests/application-charm/src/charm.py index 9a5a0c463d..214433fd06 100755 --- a/tests/integration/ha_tests/application-charm/src/charm.py +++ b/tests/integration/ha_tests/application-charm/src/charm.py @@ -9,8 +9,6 @@ """ import logging -import os -import signal import subprocess from typing import Optional diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index f1a63c88c1..7cc6971eb6 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -134,7 +134,7 @@ async def get_password(ops_test: OpsTest, app) -> str: ) print(f"unit_name: {unit_name}") action = await ops_test.model.units.get(unit_name).run_action("get-password") - action = await asyncio.wait_for(action.wait(), 10) + action = await asyncio.wait_for(action.wait(), 30) return action.results["operator-password"] diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index e1ba377f4b..cebcbf7090 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -64,7 +64,7 @@ async def test_kill_db_process( # considered to trigger a fail-over after master_start_timeout is changed). writes = await count_writes(ops_test) print(3) - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: more_writes = await count_writes(ops_test) print(4) @@ -121,11 +121,11 @@ async def test_freeze_db_process( async with ops_test.fast_forward(): # Verify new writes are continuing by counting the number of writes before and after a - # 60 seconds wait (this is a little more than the loop wait configuration, that is + # 3 minutes wait (this is a little more than the loop wait configuration, that is # considered to trigger a fail-over after master_start_timeout is changed). 
writes = await count_writes(ops_test) print(4) - for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: more_writes = await count_writes(ops_test) print(4.1) From ba63682273a51717c1b07a4c3f23683daf8a0c55 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 10 Oct 2022 11:11:25 -0300 Subject: [PATCH 04/12] Cleanup the code --- tests/integration/ha_tests/helpers.py | 12 -------- .../integration/ha_tests/test_self_healing.py | 28 ------------------- 2 files changed, 40 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 7cc6971eb6..94fdde90cf 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -64,9 +64,7 @@ async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) async def count_writes(ops_test: OpsTest) -> int: """Count the number of writes in the database.""" app = await app_name(ops_test) - print(11) password = await get_password(ops_test, app) - print(12) try: for attempt in Retrying( stop=stop_after_attempt(len(ops_test.model.applications[app].units)) @@ -77,27 +75,18 @@ async def count_writes(ops_test: OpsTest) -> int: .units[attempt.retry_state.attempt_number - 1] .public_address ) - print(13) connection_string = ( f"dbname='application' user='operator'" f" host='{host}' password='{password}' connect_timeout=10" ) - print(14) with psycopg2.connect( connection_string ) as connection, connection.cursor() as cursor: - print(15) cursor.execute("SELECT COUNT(number) FROM continuous_writes;") - print(16) count = cursor.fetchone()[0] - print(17) connection.close() - print(18) except RetryError: return -1 - except Exception as e: - print(str(e)) - return -1 return count @@ -132,7 +121,6 @@ async def get_password(ops_test: OpsTest, app) -> str: unit_name = ( ops_test.model.applications[app].units[attempt.retry_state.attempt_number - 1].name ) - print(f"unit_name: {unit_name}") action = await ops_test.model.units.get(unit_name).run_action("get-password") action = await asyncio.wait_for(action.wait(), 30) return action.results["operator-password"] diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index cebcbf7090..a2209beb9f 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -52,52 +52,38 @@ async def test_kill_db_process( # Start an application that continuously writes data to the database. await start_continuous_writes(ops_test, app) - print(1) # Kill the database process. await send_signal_to_process(ops_test, primary_name, process, kill_code="SIGKILL") - print(2) async with ops_test.fast_forward(): # Verify new writes are continuing by counting the number of writes before and after a # 60 seconds wait (this is a little more than the loop wait configuration, that is # considered to trigger a fail-over after master_start_timeout is changed). writes = await count_writes(ops_test) - print(3) for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: more_writes = await count_writes(ops_test) - print(4) assert more_writes > writes, "writes not continuing to DB" - print(4.1) - print(4.2) # Verify that the database service got restarted and is ready in the old primary. 
assert await postgresql_ready(ops_test, primary_name) - print(5) # Verify that a new primary gets elected (ie old primary is secondary). new_primary_name = await get_primary(ops_test, app) - print(6) assert new_primary_name != primary_name - print(7) # Verify that no writes to the database were missed after stopping the writes. total_expected_writes = await stop_continuous_writes(ops_test) - print(8) for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): with attempt: actual_writes = await count_writes(ops_test) - print(9) assert total_expected_writes == actual_writes, "writes to the db were missed." - print(9.1) # Verify that old primary is up-to-date. - print(9.2) assert await secondary_up_to_date( ops_test, primary_name, total_expected_writes ), "secondary not up to date with the cluster after restarting." - print(10) @pytest.mark.ha_self_healing_tests @@ -108,15 +94,12 @@ async def test_freeze_db_process( # Locate primary unit. app = await app_name(ops_test) primary_name = await get_primary(ops_test, app) - print(1) # Start an application that continuously writes data to the database. await start_continuous_writes(ops_test, app) - print(2) # Freeze the database process. await send_signal_to_process(ops_test, primary_name, process, "SIGSTOP") - print(3) sleep(30) async with ops_test.fast_forward(): @@ -124,40 +107,29 @@ async def test_freeze_db_process( # 3 minutes wait (this is a little more than the loop wait configuration, that is # considered to trigger a fail-over after master_start_timeout is changed). writes = await count_writes(ops_test) - print(4) for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: more_writes = await count_writes(ops_test) - print(4.1) assert more_writes > writes, "writes not continuing to DB" # Un-freeze the old primary. - print(4.2) await send_signal_to_process(ops_test, primary_name, process, "SIGCONT") - print(5) # Verify that the database service got restarted and is ready in the old primary. assert await postgresql_ready(ops_test, primary_name) - print(6) # Verify that a new primary gets elected (ie old primary is secondary). new_primary_name = await get_primary(ops_test, app) - print(7) assert new_primary_name != primary_name - print(8) # Verify that no writes to the database were missed after stopping the writes. total_expected_writes = await stop_continuous_writes(ops_test) - print(9) for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): with attempt: actual_writes = await count_writes(ops_test) - print(9.1) assert total_expected_writes == actual_writes, "writes to the db were missed." - print(9.2) # Verify that old primary is up-to-date. assert await secondary_up_to_date( ops_test, primary_name, total_expected_writes ), "secondary not up to date with the cluster after restarting." 
- print(10) From b48a3bdef2a4a24b96b804d9c0863d194dd3d690 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 10 Oct 2022 12:52:58 -0300 Subject: [PATCH 05/12] More cleanup --- src/relations/postgresql_provider.py | 3 -- .../ha_tests/application-charm/src/charm.py | 33 --------------- .../src/continuous_writes.py | 42 +------------------ tests/integration/ha_tests/conftest.py | 12 +++--- 4 files changed, 8 insertions(+), 82 deletions(-) diff --git a/src/relations/postgresql_provider.py b/src/relations/postgresql_provider.py index 1cde10c4bd..959cb7b794 100644 --- a/src/relations/postgresql_provider.py +++ b/src/relations/postgresql_provider.py @@ -179,11 +179,9 @@ def oversee_users(self) -> None: def update_endpoints(self, event: DatabaseRequestedEvent = None) -> None: """Set the read/write and read-only endpoints.""" - logger.error("update_endpoints before leader check") if not self.charm.unit.is_leader(): return - logger.error("update_endpoints after leader check") # Get the current relation or all the relations # if this is triggered by another type of event. relations = [event.relation] if event else self.model.relations[self.relation_name] @@ -196,7 +194,6 @@ def update_endpoints(self, event: DatabaseRequestedEvent = None) -> None: else "" ) - logger.error(f"update_endpoints relations: {relations}") for relation in relations: # Set the read/write endpoint. self.database_provides.set_endpoints( diff --git a/tests/integration/ha_tests/application-charm/src/charm.py b/tests/integration/ha_tests/application-charm/src/charm.py index 214433fd06..2db74fbd0f 100755 --- a/tests/integration/ha_tests/application-charm/src/charm.py +++ b/tests/integration/ha_tests/application-charm/src/charm.py @@ -74,24 +74,15 @@ def _on_start(self, _) -> None: def _on_database_created(self, _) -> None: """Event triggered when a database was created for this application.""" - logger.error( - "--------------------------------- _on_database_created ---------------------------------" - ) self._start_continuous_writes(1) def _on_endpoints_changed(self, _) -> None: """Event triggered when the read/write endpoints of the database change.""" - logger.error( - "--------------------------------- _on_endpoints_changed ---------------------------------" - ) count = self._count_writes() self._start_continuous_writes(count + 1) def _count_writes(self) -> int: """Count the number of records in the continuous_writes table.""" - logger.error( - "--------------------------------- _count_writes ---------------------------------" - ) with psycopg2.connect( self._connection_string ) as connection, connection.cursor() as cursor: @@ -102,9 +93,6 @@ def _count_writes(self) -> int: def _on_clear_continuous_writes_action(self, _) -> None: """Clears database writes.""" - logger.error( - "--------------------------------- _on_clear_continuous_writes_action ---------------------------------" - ) self._stop_continuous_writes() with psycopg2.connect( self._connection_string @@ -113,24 +101,15 @@ def _on_clear_continuous_writes_action(self, _) -> None: connection.close() def _on_start_continuous_writes_action(self, _) -> None: - logger.error( - "--------------------------------- _on_start_continuous_writes_action ---------------------------------" - ) """Start the continuous writes process.""" self._start_continuous_writes(1) def _on_stop_continuous_writes_action(self, event: ActionEvent) -> None: """Stops the continuous writes process.""" - logger.error( - "--------------------------------- _on_stop_continuous_writes_action 
---------------------------------" - ) writes = self._stop_continuous_writes() event.set_results({"writes": writes}) def _start_continuous_writes(self, starting_number: int) -> None: - logger.error( - "--------------------------------- _start_continuous_writes ---------------------------------" - ) """Starts continuous writes to PostgreSQL instance.""" if self._connection_string is None: return @@ -155,22 +134,13 @@ def _stop_continuous_writes(self) -> Optional[int]: """Stops continuous writes to PostgreSQL and returns the last written value.""" if self._stored.continuous_writes_pid is None: return None - logger.error( - "--------------------------------- _stop_continuous_writes ---------------------------------" - ) # Stop the process. proc = subprocess.Popen(["pkill", "--signal", "SIGKILL", "-f", "src/continuous_writes.py"]) # os.kill(self._stored.continuous_writes_pid, signal.SIGINT) - logger.error( - "--------------------------------- after kill ---------------------------------" - ) # Wait for process to be killed. proc.communicate() - logger.error( - "--------------------------------- after communicate ---------------------------------" - ) self._stored.continuous_writes_pid = None @@ -187,9 +157,6 @@ def _stop_continuous_writes(self) -> Optional[int]: except RetryError as e: logger.exception(e) return -1 - logger.error( - "--------------------------------- after retry ---------------------------------" - ) return last_written_value diff --git a/tests/integration/ha_tests/application-charm/src/continuous_writes.py b/tests/integration/ha_tests/application-charm/src/continuous_writes.py index e0c0377234..83ab585749 100644 --- a/tests/integration/ha_tests/application-charm/src/continuous_writes.py +++ b/tests/integration/ha_tests/application-charm/src/continuous_writes.py @@ -2,13 +2,10 @@ # See LICENSE file for licensing details. """This file is meant to run in the background continuously writing entries to PostgreSQL.""" -import logging import sys import psycopg2 as psycopg2 -logger = logging.getLogger(__name__) - def continuous_writes(connection_string: str, starting_number: int): """Continuously writes data do PostgreSQL database. @@ -33,11 +30,6 @@ def continuous_writes(connection_string: str, starting_number: int): # Continuously write the record to the database (incrementing it at each iteration). while True: - logger.error("starting loop again...") - f = open("/tmp/demofile0.txt", "a") - f.write(str(write_value)) - f.write("\n\n") - f.close() try: with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: connection.autocommit = True @@ -46,49 +38,19 @@ def continuous_writes(connection_string: str, starting_number: int): psycopg2.InterfaceError, psycopg2.OperationalError, psycopg2.errors.ReadOnlySqlTransaction, - ) as e: + ): # We should not raise any of those exceptions that can happen when a connection failure # happens, for example, when a primary is being reelected after a failure on the old # primary. - f = open("/tmp/demofile1.txt", "a") - f.write(str(write_value)) - f.write("\n") - f.write(str(e)) - f.write("\n") - f.write(str(type(e))) - f.write("\n\n") - f.close() continue - except psycopg2.Error as e: + except psycopg2.Error: # If another error happens, like writing a duplicate number when a connection failed # in a previous iteration (but the transaction was already committed), just increment # the number. 
- f = open("/tmp/demofile2.txt", "a") - f.write(str(write_value)) - f.write("\n") - f.write(str(e)) - f.write("\n") - f.write(str(type(e))) - f.write("\n\n") - f.close() - pass - except Exception as e: - f = open("/tmp/demofile3.txt", "a") - f.write(str(write_value)) - f.write("\n") - f.write(str(e)) - f.write("\n") - f.write(str(type(e))) - f.write("\n\n") - f.close() pass finally: connection.close() - f = open("/tmp/demofile4.txt", "a") - f.write(str(write_value)) - f.write("\n\n") - f.close() write_value += 1 diff --git a/tests/integration/ha_tests/conftest.py b/tests/integration/ha_tests/conftest.py index 87e5fa17be..15d909c27d 100644 --- a/tests/integration/ha_tests/conftest.py +++ b/tests/integration/ha_tests/conftest.py @@ -24,12 +24,12 @@ async def continuous_writes(ops_test: OpsTest) -> None: await ops_test.model.wait_for_idle(status="active", timeout=1000) yield # Clear the written data at the end. - # action = ( - # await ops_test.model.applications[APPLICATION_NAME] - # .units[0] - # .run_action("clear-continuous-writes") - # ) - # await action.wait() + action = ( + await ops_test.model.applications[APPLICATION_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() @pytest.fixture() From 800ad8fb7c1c0489d4fb1e386b709b1bd0f9bf7f Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 10 Oct 2022 16:55:29 -0300 Subject: [PATCH 06/12] Small adjustments --- tests/integration/ha_tests/test_self_healing.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index a2209beb9f..6fc6125a20 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. -from time import sleep import pytest from pytest_operator.plugin import OpsTest @@ -61,7 +60,7 @@ async def test_kill_db_process( # 60 seconds wait (this is a little more than the loop wait configuration, that is # considered to trigger a fail-over after master_start_timeout is changed). writes = await count_writes(ops_test) - for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): with attempt: more_writes = await count_writes(ops_test) assert more_writes > writes, "writes not continuing to DB" @@ -100,7 +99,6 @@ async def test_freeze_db_process( # Freeze the database process. await send_signal_to_process(ops_test, primary_name, process, "SIGSTOP") - sleep(30) async with ops_test.fast_forward(): # Verify new writes are continuing by counting the number of writes before and after a @@ -112,16 +110,18 @@ async def test_freeze_db_process( more_writes = await count_writes(ops_test) assert more_writes > writes, "writes not continuing to DB" + # Verify that a new primary gets elected (ie old primary is secondary). + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): + with attempt: + new_primary_name = await get_primary(ops_test, app) + assert new_primary_name != primary_name + # Un-freeze the old primary. await send_signal_to_process(ops_test, primary_name, process, "SIGCONT") # Verify that the database service got restarted and is ready in the old primary. assert await postgresql_ready(ops_test, primary_name) - # Verify that a new primary gets elected (ie old primary is secondary). 
- new_primary_name = await get_primary(ops_test, app) - assert new_primary_name != primary_name - # Verify that no writes to the database were missed after stopping the writes. total_expected_writes = await stop_continuous_writes(ops_test) for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): From 9ab9bf3d56c71e64d677d809290bb1bc45f5080f Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Oct 2022 10:05:21 -0300 Subject: [PATCH 07/12] Add unit tests --- src/cluster.py | 5 +- tests/unit/test_cluster.py | 99 +++++++++++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 4 deletions(-) diff --git a/src/cluster.py b/src/cluster.py index 4b1e90845c..ecd2c9a795 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -19,6 +19,7 @@ ) from jinja2 import Template from tenacity import ( + AttemptManager, RetryError, Retrying, retry, @@ -197,14 +198,14 @@ def get_primary(self, unit_name_pattern=False) -> str: break return primary - def _get_alternative_server_url(self, attempt) -> str: + def _get_alternative_server_url(self, attempt: AttemptManager) -> str: + """Get an alternative URL from another member each time.""" if attempt.retry_state.attempt_number > 1: url = self._patroni_url.replace( self.unit_ip, list(self.peers_ips)[attempt.retry_state.attempt_number - 2] ) else: url = self._patroni_url - logger.warning(f"url for get primary: {url}") return url def are_all_members_ready(self) -> bool: diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index 73a0446b38..ca96b82ae1 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -2,8 +2,11 @@ # See LICENSE file for licensing details. import unittest +from unittest import mock from unittest.mock import mock_open, patch +import requests as requests +import tenacity as tenacity from jinja2 import Template from cluster import Patroni @@ -13,10 +16,30 @@ PATRONI_SERVICE = "patroni" +# This method will be used by the mock to replace requests.get +def mocked_requests_get(*args, **kwargs): + class MockResponse: + def __init__(self, json_data): + self.json_data = json_data + + def json(self): + return self.json_data + + data = { + "http://server1/cluster": { + "members": [{"name": "postgresql-0", "host": "1.1.1.1", "role": "leader"}] + }, + } + if args[0] in data: + return MockResponse(data[args[0]]) + + raise requests.exceptions.Timeout() + + class TestCharm(unittest.TestCase): def setUp(self): # Setup a cluster. - self.peers_ips = peers_ips = ["2.2.2.2", "3.3.3.3"] + self.peers_ips = {"2.2.2.2", "3.3.3.3"} self.patroni = Patroni( "1.1.1.1", @@ -24,12 +47,57 @@ def setUp(self): "postgresql", "postgresql-0", 1, - peers_ips, + self.peers_ips, "fake-superuser-password", "fake-replication-password", False, ) + def test_get_alternative_server_url(self): + # Mock tenacity attempt. + retry = tenacity.Retrying() + retry_state = tenacity.RetryCallState(retry, None, None, None) + attempt = tenacity.AttemptManager(retry_state) + + # Test the first URL that is returned (it should have the current unit IP). + url = self.patroni._get_alternative_server_url(attempt) + self.assertEqual(url, f"http://{self.patroni.unit_ip}:8008") + + # Test returning the other servers URLs. 
+ for attempt_number in range( + attempt.retry_state.attempt_number + 1, len(self.peers_ips) + 2 + ): + attempt.retry_state.attempt_number = attempt_number + url = self.patroni._get_alternative_server_url(attempt) + self.assertIn(url.split("http://")[1].split(":8008")[0], self.peers_ips) + + @mock.patch("requests.get", side_effect=mocked_requests_get) + @patch("charm.Patroni._get_alternative_server_url") + def test_get_member_ip(self, _get_alternative_server_url, _get): + # Test error on trying to get the member IP. + _get_alternative_server_url.side_effect = "http://server2" + with self.assertRaises(tenacity.RetryError): + self.patroni.get_member_ip(self.patroni.member_name) + + # Test using an alternative server URL. + _get_alternative_server_url.side_effect = [ + "http://server3", + "http://server2", + "http://server1", + ] + ip = self.patroni.get_member_ip(self.patroni.member_name) + self.assertEqual(ip, "1.1.1.1") + + # Test using the current server URL. + _get_alternative_server_url.side_effect = ["http://server1"] + ip = self.patroni.get_member_ip(self.patroni.member_name) + self.assertEqual(ip, "1.1.1.1") + + # Test when not having that specific member in the cluster. + _get_alternative_server_url.side_effect = ["http://server1"] + ip = self.patroni.get_member_ip("other-member-name") + self.assertIsNone(ip) + @patch("charms.operator_libs_linux.v0.apt.DebianPackage.from_system") def test_get_postgresql_version(self, _from_system): # Mock the package returned by from_system call. @@ -40,6 +108,33 @@ def test_get_postgresql_version(self, _from_system): _from_system.assert_called_once_with("postgresql") self.assertEqual(version, "12") + @mock.patch("requests.get", side_effect=mocked_requests_get) + @patch("charm.Patroni._get_alternative_server_url") + def test_get_primary(self, _get_alternative_server_url, _get): + # Test error on trying to get the member IP. + _get_alternative_server_url.side_effect = "http://server2" + with self.assertRaises(tenacity.RetryError): + self.patroni.get_primary(self.patroni.member_name) + + # Test using an alternative server URL. + _get_alternative_server_url.side_effect = [ + "http://server3", + "http://server2", + "http://server1", + ] + primary = self.patroni.get_primary() + self.assertEqual(primary, "postgresql-0") + + # Test using the current server URL. + _get_alternative_server_url.side_effect = ["http://server1"] + primary = self.patroni.get_primary() + self.assertEqual(primary, "postgresql-0") + + # Test requesting the primary in the unit name pattern. + _get_alternative_server_url.side_effect = ["http://server1"] + primary = self.patroni.get_primary(unit_name_pattern=True) + self.assertEqual(primary, "postgresql/0") + @patch("os.chmod") @patch("os.chown") @patch("pwd.getpwnam") From 1c0d12d3f99fc38c73dffb7ed67559e64d5d2f1b Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Oct 2022 10:14:19 -0300 Subject: [PATCH 08/12] Improve comments --- tests/integration/ha_tests/application-charm/src/charm.py | 1 - tests/integration/ha_tests/test_self_healing.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/ha_tests/application-charm/src/charm.py b/tests/integration/ha_tests/application-charm/src/charm.py index 2db74fbd0f..6540091eb6 100755 --- a/tests/integration/ha_tests/application-charm/src/charm.py +++ b/tests/integration/ha_tests/application-charm/src/charm.py @@ -137,7 +137,6 @@ def _stop_continuous_writes(self) -> Optional[int]: # Stop the process. 
proc = subprocess.Popen(["pkill", "--signal", "SIGKILL", "-f", "src/continuous_writes.py"]) - # os.kill(self._stored.continuous_writes_pid, signal.SIGINT) # Wait for process to be killed. proc.communicate() diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 6fc6125a20..974a1a1b48 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -103,7 +103,8 @@ async def test_freeze_db_process( async with ops_test.fast_forward(): # Verify new writes are continuing by counting the number of writes before and after a # 3 minutes wait (this is a little more than the loop wait configuration, that is - # considered to trigger a fail-over after master_start_timeout is changed). + # considered to trigger a fail-over after master_start_timeout is changed, and also + # when freezing the DB process it take some more time to trigger the fail-over). writes = await count_writes(ops_test) for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: From ec6705d7949f53737a5c66519211492899d6c594 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Oct 2022 10:52:54 -0300 Subject: [PATCH 09/12] Use down unit --- tests/integration/ha_tests/helpers.py | 51 ++++++++----------- .../integration/ha_tests/test_self_healing.py | 4 +- 2 files changed, 22 insertions(+), 33 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 94fdde90cf..558b17d417 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -1,6 +1,5 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. -import asyncio from pathlib import Path from typing import Optional @@ -8,13 +7,7 @@ import requests import yaml from pytest_operator.plugin import OpsTest -from tenacity import ( - RetryError, - Retrying, - stop_after_attempt, - stop_after_delay, - wait_fixed, -) +from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed from tests.integration.helpers import get_unit_address @@ -61,24 +54,21 @@ async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) ) -async def count_writes(ops_test: OpsTest) -> int: +async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int: """Count the number of writes in the database.""" app = await app_name(ops_test) - password = await get_password(ops_test, app) + password = await get_password(ops_test, app, down_unit) + for unit in ops_test.model.applications[app].units: + if unit.name != down_unit: + host = unit.public_address + break + connection_string = ( + f"dbname='application' user='operator'" + f" host='{host}' password='{password}' connect_timeout=10" + ) try: - for attempt in Retrying( - stop=stop_after_attempt(len(ops_test.model.applications[app].units)) - ): + for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): with attempt: - host = ( - ops_test.model.applications[app] - .units[attempt.retry_state.attempt_number - 1] - .public_address - ) - connection_string = ( - f"dbname='application' user='operator'" - f" host='{host}' password='{password}' connect_timeout=10" - ) with psycopg2.connect( connection_string ) as connection, connection.cursor() as cursor: @@ -109,21 +99,20 @@ async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]: return int(master_start_timeout) if master_start_timeout is not None else None -async def get_password(ops_test: 
OpsTest, app) -> str: +async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> str: """Use the charm action to retrieve the password from provided application. Returns: string with the password stored on the peer relation databag. """ # Can retrieve from any unit running unit, so we pick the first. - for attempt in Retrying(stop=stop_after_attempt(len(ops_test.model.applications[app].units))): - with attempt: - unit_name = ( - ops_test.model.applications[app].units[attempt.retry_state.attempt_number - 1].name - ) - action = await ops_test.model.units.get(unit_name).run_action("get-password") - action = await asyncio.wait_for(action.wait(), 30) - return action.results["operator-password"] + for unit in ops_test.model.applications[app].units: + if unit.name != down_unit: + unit_name = unit.name + break + action = await ops_test.model.units.get(unit_name).run_action("get-password") + action = await action.wait() + return action.results["operator-password"] async def get_primary(ops_test: OpsTest, app) -> str: diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 974a1a1b48..ad8a109148 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -105,10 +105,10 @@ async def test_freeze_db_process( # 3 minutes wait (this is a little more than the loop wait configuration, that is # considered to trigger a fail-over after master_start_timeout is changed, and also # when freezing the DB process it take some more time to trigger the fail-over). - writes = await count_writes(ops_test) + writes = await count_writes(ops_test, primary_name) for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: - more_writes = await count_writes(ops_test) + more_writes = await count_writes(ops_test, primary_name) assert more_writes > writes, "writes not continuing to DB" # Verify that a new primary gets elected (ie old primary is secondary). From 2108c4aff35ceb5707f9f71c3666d45bd1f0e79c Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Oct 2022 14:32:49 -0300 Subject: [PATCH 10/12] Improve alternative URL description --- src/cluster.py | 12 ++++++++---- tests/unit/test_cluster.py | 38 +++++++++++++++++++------------------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/cluster.py b/src/cluster.py index ecd2c9a795..00698a3c0c 100644 --- a/src/cluster.py +++ b/src/cluster.py @@ -166,7 +166,7 @@ def get_member_ip(self, member_name: str) -> str: # Request info from cluster endpoint (which returns all members of the cluster). for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_server_url(attempt) + url = self._get_alternative_patroni_url(attempt) cluster_status = requests.get(f"{url}/cluster", verify=self.verify, timeout=10) for member in cluster_status.json()["members"]: if member["name"] == member_name: @@ -187,7 +187,7 @@ def get_primary(self, unit_name_pattern=False) -> str: # Request info from cluster endpoint (which returns all members of the cluster). 
for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)): with attempt: - url = self._get_alternative_server_url(attempt) + url = self._get_alternative_patroni_url(attempt) cluster_status = requests.get(f"{url}/cluster", verify=self.verify, timeout=10) for member in cluster_status.json()["members"]: if member["role"] == "leader": @@ -198,8 +198,12 @@ def get_primary(self, unit_name_pattern=False) -> str: break return primary - def _get_alternative_server_url(self, attempt: AttemptManager) -> str: - """Get an alternative URL from another member each time.""" + def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str: + """Get an alternative REST API URL from another member each time. + + When the Patroni process is not running in the current unit it's needed + to use a URL from another cluster member REST API to do some operations. + """ if attempt.retry_state.attempt_number > 1: url = self._patroni_url.replace( self.unit_ip, list(self.peers_ips)[attempt.retry_state.attempt_number - 2] diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index ca96b82ae1..19310ae3a4 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -53,14 +53,14 @@ def setUp(self): False, ) - def test_get_alternative_server_url(self): + def test_get_alternative_patroni_url(self): # Mock tenacity attempt. retry = tenacity.Retrying() retry_state = tenacity.RetryCallState(retry, None, None, None) attempt = tenacity.AttemptManager(retry_state) # Test the first URL that is returned (it should have the current unit IP). - url = self.patroni._get_alternative_server_url(attempt) + url = self.patroni._get_alternative_patroni_url(attempt) self.assertEqual(url, f"http://{self.patroni.unit_ip}:8008") # Test returning the other servers URLs. @@ -68,19 +68,19 @@ def test_get_alternative_server_url(self): attempt.retry_state.attempt_number + 1, len(self.peers_ips) + 2 ): attempt.retry_state.attempt_number = attempt_number - url = self.patroni._get_alternative_server_url(attempt) + url = self.patroni._get_alternative_patroni_url(attempt) self.assertIn(url.split("http://")[1].split(":8008")[0], self.peers_ips) @mock.patch("requests.get", side_effect=mocked_requests_get) - @patch("charm.Patroni._get_alternative_server_url") - def test_get_member_ip(self, _get_alternative_server_url, _get): + @patch("charm.Patroni._get_alternative_patroni_url") + def test_get_member_ip(self, _get_alternative_patroni_url, _get): # Test error on trying to get the member IP. - _get_alternative_server_url.side_effect = "http://server2" + _get_alternative_patroni_url.side_effect = "http://server2" with self.assertRaises(tenacity.RetryError): self.patroni.get_member_ip(self.patroni.member_name) - # Test using an alternative server URL. - _get_alternative_server_url.side_effect = [ + # Test using an alternative Patroni URL. + _get_alternative_patroni_url.side_effect = [ "http://server3", "http://server2", "http://server1", @@ -88,13 +88,13 @@ def test_get_member_ip(self, _get_alternative_server_url, _get): ip = self.patroni.get_member_ip(self.patroni.member_name) self.assertEqual(ip, "1.1.1.1") - # Test using the current server URL. - _get_alternative_server_url.side_effect = ["http://server1"] + # Test using the current Patroni URL. + _get_alternative_patroni_url.side_effect = ["http://server1"] ip = self.patroni.get_member_ip(self.patroni.member_name) self.assertEqual(ip, "1.1.1.1") # Test when not having that specific member in the cluster. 
- _get_alternative_server_url.side_effect = ["http://server1"] + _get_alternative_patroni_url.side_effect = ["http://server1"] ip = self.patroni.get_member_ip("other-member-name") self.assertIsNone(ip) @@ -109,15 +109,15 @@ def test_get_postgresql_version(self, _from_system): self.assertEqual(version, "12") @mock.patch("requests.get", side_effect=mocked_requests_get) - @patch("charm.Patroni._get_alternative_server_url") - def test_get_primary(self, _get_alternative_server_url, _get): + @patch("charm.Patroni._get_alternative_patroni_url") + def test_get_primary(self, _get_alternative_patroni_url, _get): # Test error on trying to get the member IP. - _get_alternative_server_url.side_effect = "http://server2" + _get_alternative_patroni_url.side_effect = "http://server2" with self.assertRaises(tenacity.RetryError): self.patroni.get_primary(self.patroni.member_name) - # Test using an alternative server URL. - _get_alternative_server_url.side_effect = [ + # Test using an alternative Patroni URL. + _get_alternative_patroni_url.side_effect = [ "http://server3", "http://server2", "http://server1", @@ -125,13 +125,13 @@ def test_get_primary(self, _get_alternative_server_url, _get): primary = self.patroni.get_primary() self.assertEqual(primary, "postgresql-0") - # Test using the current server URL. - _get_alternative_server_url.side_effect = ["http://server1"] + # Test using the current Patroni URL. + _get_alternative_patroni_url.side_effect = ["http://server1"] primary = self.patroni.get_primary() self.assertEqual(primary, "postgresql-0") # Test requesting the primary in the unit name pattern. - _get_alternative_server_url.side_effect = ["http://server1"] + _get_alternative_patroni_url.side_effect = ["http://server1"] primary = self.patroni.get_primary(unit_name_pattern=True) self.assertEqual(primary, "postgresql/0") From 6a1e39bf4d695470e165b6417d9e39ddc4e5f318 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Oct 2022 17:29:08 -0300 Subject: [PATCH 11/12] Add additional checks --- tests/integration/ha_tests/conftest.py | 1 - tests/integration/ha_tests/helpers.py | 59 +++++++++++++++++++ .../integration/ha_tests/test_self_healing.py | 34 +++++++++++ 3 files changed, 93 insertions(+), 1 deletion(-) diff --git a/tests/integration/ha_tests/conftest.py b/tests/integration/ha_tests/conftest.py index 15d909c27d..5bc999784b 100644 --- a/tests/integration/ha_tests/conftest.py +++ b/tests/integration/ha_tests/conftest.py @@ -37,7 +37,6 @@ async def master_start_timeout(ops_test: OpsTest) -> None: """Temporary change the master start timeout configuration.""" # Change the parameter that makes the primary reelection faster. initial_master_start_timeout = await get_master_start_timeout(ops_test) - await change_master_start_timeout(ops_test, 0) yield # Rollback to the initial configuration. 
     await change_master_start_timeout(ops_test, initial_master_start_timeout)
diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index 558b17d417..c9d7ad31a4 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -16,6 +16,14 @@
 APP_NAME = METADATA["name"]
 
 
+class MemberNotListedOnClusterError(Exception):
+    """Raised when a member is not listed in the cluster."""
+
+
+class MemberNotUpdatedOnClusterError(Exception):
+    """Raised when a member is not yet updated in the cluster."""
+
+
 class ProcessError(Exception):
     pass
 
@@ -80,6 +88,27 @@ async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int:
     return count
 
 
+async def fetch_cluster_members(ops_test: OpsTest):
+    """Fetches the IPs listed by Patroni as cluster members.
+
+    Args:
+        ops_test: OpsTest instance.
+    """
+    app = await app_name(ops_test)
+    member_ips = set()
+    for unit in ops_test.model.applications[app].units:
+        cluster_info = requests.get(f"http://{unit.public_address}:8008/cluster")
+        if len(member_ips) > 0:
+            # If the list of member IPs was already fetched, also compare it with the
+            # list provided by the other members.
+            assert member_ips == {
+                member["host"] for member in cluster_info.json()["members"]
+            }, "members report different lists of cluster members."
+        else:
+            member_ips = {member["host"] for member in cluster_info.json()["members"]}
+    return member_ips
+
+
 async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]:
     """Get the master start timeout configuration.
 
@@ -115,6 +144,36 @@ async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> st
     return action.results["operator-password"]
 
 
+def is_replica(ops_test: OpsTest, unit_name: str) -> bool:
+    """Returns whether the unit is a replica in the cluster."""
+    unit_ip = get_unit_address(ops_test, unit_name)
+    member_name = unit_name.replace("/", "-")
+
+    try:
+        for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
+            with attempt:
+                cluster_info = requests.get(f"http://{unit_ip}:8008/cluster")
+
+                # The unit may take some time to be listed on Patroni REST API cluster endpoint.
+                if member_name not in {
+                    member["name"] for member in cluster_info.json()["members"]
+                }:
+                    raise MemberNotListedOnClusterError()
+
+                for member in cluster_info.json()["members"]:
+                    if member["name"] == member_name:
+                        role = member["role"]
+
+                # A member that restarted or had its DB process stopped may
+                # take some time to know that a new primary was elected.
+                if role == "replica":
+                    return True
+                else:
+                    raise MemberNotUpdatedOnClusterError()
+    except RetryError:
+        return False
+
+
 async def get_primary(ops_test: OpsTest, app) -> str:
     """Use the charm action to retrieve the primary from provided application.
 
diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py
index ad8a109148..c817deb113 100644
--- a/tests/integration/ha_tests/test_self_healing.py
+++ b/tests/integration/ha_tests/test_self_healing.py
@@ -9,8 +9,12 @@
 from tests.integration.ha_tests.helpers import (
     METADATA,
     app_name,
+    change_master_start_timeout,
     count_writes,
+    fetch_cluster_members,
+    get_master_start_timeout,
     get_primary,
+    is_replica,
     postgresql_ready,
     secondary_up_to_date,
     send_signal_to_process,
@@ -52,6 +56,10 @@ async def test_kill_db_process(
     # Start an application that continuously writes data to the database.
     await start_continuous_writes(ops_test, app)
 
+    # Change the "master_start_timeout" parameter to speed up the fail-over.
+    original_master_start_timeout = await get_master_start_timeout(ops_test)
+    await change_master_start_timeout(ops_test, 0)
+
     # Kill the database process.
     await send_signal_to_process(ops_test, primary_name, process, kill_code="SIGKILL")
 
@@ -72,6 +80,17 @@ async def test_kill_db_process(
     new_primary_name = await get_primary(ops_test, app)
     assert new_primary_name != primary_name
 
+    # Revert the "master_start_timeout" parameter to avoid another fail-over.
+    await change_master_start_timeout(ops_test, original_master_start_timeout)
+
+    # Verify that the old primary is now a replica.
+    assert is_replica(ops_test, primary_name), "there is more than one primary in the cluster."
+
+    # Verify that all units are part of the same cluster.
+    member_ips = await fetch_cluster_members(ops_test)
+    ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units]
+    assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster."
+
     # Verify that no writes to the database were missed after stopping the writes.
     total_expected_writes = await stop_continuous_writes(ops_test)
     for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
@@ -97,6 +116,10 @@ async def test_freeze_db_process(
     # Start an application that continuously writes data to the database.
     await start_continuous_writes(ops_test, app)
 
+    # Change the "master_start_timeout" parameter to speed up the fail-over.
+    original_master_start_timeout = await get_master_start_timeout(ops_test)
+    await change_master_start_timeout(ops_test, 0)
+
     # Freeze the database process.
     await send_signal_to_process(ops_test, primary_name, process, "SIGSTOP")
 
@@ -117,12 +140,23 @@ async def test_freeze_db_process(
     new_primary_name = await get_primary(ops_test, app)
     assert new_primary_name != primary_name
 
+    # Revert the "master_start_timeout" parameter to avoid another fail-over.
+    await change_master_start_timeout(ops_test, original_master_start_timeout)
+
     # Un-freeze the old primary.
     await send_signal_to_process(ops_test, primary_name, process, "SIGCONT")
 
     # Verify that the database service got restarted and is ready in the old primary.
     assert await postgresql_ready(ops_test, primary_name)
 
+    # Verify that the old primary is now a replica.
+    assert is_replica(ops_test, primary_name), "there is more than one primary in the cluster."
+
+    # Verify that all units are part of the same cluster.
+    member_ips = await fetch_cluster_members(ops_test)
+    ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units]
+    assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster."
+
     # Verify that no writes to the database were missed after stopping the writes.
     total_expected_writes = await stop_continuous_writes(ops_test)
     for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):

From 29eae5b6bfaa92227a43f1e77569d9ade31559f5 Mon Sep 17 00:00:00 2001
From: Marcelo Henrique Neppel
Date: Tue, 11 Oct 2022 19:03:04 -0300
Subject: [PATCH 12/12] Improve returns

---
 src/cluster.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/src/cluster.py b/src/cluster.py
index 00698a3c0c..5c2a82543a 100644
--- a/src/cluster.py
+++ b/src/cluster.py
@@ -162,7 +162,6 @@ def get_member_ip(self, member_name: str) -> str:
         Returns:
             IP address of the cluster member.
         """
-        ip = None
         # Request info from cluster endpoint (which returns all members of the cluster).
for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)): with attempt: @@ -170,9 +169,7 @@ def get_member_ip(self, member_name: str) -> str: cluster_status = requests.get(f"{url}/cluster", verify=self.verify, timeout=10) for member in cluster_status.json()["members"]: if member["name"] == member_name: - ip = member["host"] - break - return ip + return member["host"] def get_primary(self, unit_name_pattern=False) -> str: """Get primary instance. @@ -183,7 +180,6 @@ def get_primary(self, unit_name_pattern=False) -> str: Returns: primary pod or unit name. """ - primary = None # Request info from cluster endpoint (which returns all members of the cluster). for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)): with attempt: @@ -195,8 +191,7 @@ def get_primary(self, unit_name_pattern=False) -> str: if unit_name_pattern: # Change the last dash to / in order to match unit name pattern. primary = "/".join(primary.rsplit("-", 1)) - break - return primary + return primary def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str: """Get an alternative REST API URL from another member each time.
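
The fallback logic that these patches introduce and then refine boils down to one pattern: query the Patroni REST API on the current unit first and, if that request fails, retry the same call against each peer's REST API, one peer per retry attempt. The sketch below restates that pattern as a standalone script for reference only; the IP addresses, module-level constants and function names are hypothetical placeholders rather than the charm's own attributes, while the tenacity and requests calls are the same ones used in the diffs above.

    # Minimal sketch of the retry/fallback pattern, assuming hypothetical IPs.
    import requests
    from tenacity import AttemptManager, Retrying, stop_after_attempt

    UNIT_IP = "10.0.0.1"                   # placeholder: IP of the current unit
    PEERS_IPS = ["10.0.0.2", "10.0.0.3"]   # placeholder: IPs of the peer units
    PATRONI_URL = f"http://{UNIT_IP}:8008"

    def alternative_patroni_url(attempt: AttemptManager) -> str:
        """Return the local URL on attempt 1, then one peer URL per retry."""
        if attempt.retry_state.attempt_number > 1:
            # Attempt 2 maps to the first peer, attempt 3 to the second, and so on.
            return PATRONI_URL.replace(
                UNIT_IP, PEERS_IPS[attempt.retry_state.attempt_number - 2]
            )
        return PATRONI_URL

    def get_primary() -> str:
        """Ask the Patroni cluster endpoint for the leader, falling back to peers."""
        # One attempt for the local unit plus one per peer.
        for attempt in Retrying(stop=stop_after_attempt(len(PEERS_IPS) + 1)):
            with attempt:
                url = alternative_patroni_url(attempt)
                cluster_status = requests.get(f"{url}/cluster", timeout=10)
                for member in cluster_status.json()["members"]:
                    if member["role"] == "leader":
                        return member["name"]

When a request succeeds and a leader is listed, the name is returned directly, mirroring the PATCH 12 change; when every attempt fails, tenacity raises RetryError, which is what the updated unit tests in tests/unit/test_cluster.py assert once the mocked URLs are exhausted.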