Skip to content

Commit 7630fa1

Browse files
Freeze DB process test (canonical#39)
* Add alternative servers for primary and members retrieval * Test working * Test working * Cleanup the code * More cleanup * Small adjustments * Add unit tests * Improve comments * Use down unit * Improve alternative URL description * Add additional checks * Improve returns
1 parent 3bd77bf commit 7630fa1

File tree

5 files changed

+290
-28
lines changed

5 files changed

+290
-28
lines changed

src/cluster.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
)
2020
from jinja2 import Template
2121
from tenacity import (
22+
AttemptManager,
2223
RetryError,
2324
Retrying,
2425
retry,
@@ -161,14 +162,14 @@ def get_member_ip(self, member_name: str) -> str:
161162
Returns:
162163
IP address of the cluster member.
163164
"""
164-
ip = None
165165
# Request info from cluster endpoint (which returns all members of the cluster).
166-
cluster_status = requests.get(f"{self._patroni_url}/cluster", verify=self.verify)
167-
for member in cluster_status.json()["members"]:
168-
if member["name"] == member_name:
169-
ip = member["host"]
170-
break
171-
return ip
166+
for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)):
167+
with attempt:
168+
url = self._get_alternative_patroni_url(attempt)
169+
cluster_status = requests.get(f"{url}/cluster", verify=self.verify, timeout=10)
170+
for member in cluster_status.json()["members"]:
171+
if member["name"] == member_name:
172+
return member["host"]
172173

173174
def get_primary(self, unit_name_pattern=False) -> str:
174175
"""Get primary instance.
@@ -179,17 +180,32 @@ def get_primary(self, unit_name_pattern=False) -> str:
179180
Returns:
180181
primary pod or unit name.
181182
"""
182-
primary = None
183183
# Request info from cluster endpoint (which returns all members of the cluster).
184-
cluster_status = requests.get(f"{self._patroni_url}/cluster", verify=self.verify)
185-
for member in cluster_status.json()["members"]:
186-
if member["role"] == "leader":
187-
primary = member["name"]
188-
if unit_name_pattern:
189-
# Change the last dash to / in order to match unit name pattern.
190-
primary = "/".join(primary.rsplit("-", 1))
191-
break
192-
return primary
184+
for attempt in Retrying(stop=stop_after_attempt(len(self.peers_ips) + 1)):
185+
with attempt:
186+
url = self._get_alternative_patroni_url(attempt)
187+
cluster_status = requests.get(f"{url}/cluster", verify=self.verify, timeout=10)
188+
for member in cluster_status.json()["members"]:
189+
if member["role"] == "leader":
190+
primary = member["name"]
191+
if unit_name_pattern:
192+
# Change the last dash to / in order to match unit name pattern.
193+
primary = "/".join(primary.rsplit("-", 1))
194+
return primary
195+
196+
def _get_alternative_patroni_url(self, attempt: AttemptManager) -> str:
    """Pick the Patroni REST API URL to use for the given retry attempt.

    The first attempt uses this unit's own URL. Each later attempt swaps the
    unit IP in that URL for the IP of a different peer, so the operation can
    still be done when the Patroni process is not running in the current unit.

    Args:
        attempt: tenacity attempt whose ``retry_state.attempt_number``
            (1-based) selects which URL is returned.

    Returns:
        the Patroni REST API URL to use for this attempt.
    """
    attempt_number = attempt.retry_state.attempt_number
    if attempt_number <= 1:
        return self._patroni_url
    # Attempt 2 maps to the first peer, attempt 3 to the second, and so on.
    peer_ip = list(self.peers_ips)[attempt_number - 2]
    return self._patroni_url.replace(self.unit_ip, peer_ip)
193209

194210
def are_all_members_ready(self) -> bool:
195211
"""Check if all members are correctly running Patroni and PostgreSQL.

tests/integration/ha_tests/conftest.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ async def master_start_timeout(ops_test: OpsTest) -> None:
3737
"""Temporarily change the master start timeout configuration."""
3838
# Change the parameter that makes the primary reelection faster.
3939
initial_master_start_timeout = await get_master_start_timeout(ops_test)
40-
await change_master_start_timeout(ops_test, 0)
4140
yield
4241
# Rollback to the initial configuration.
4342
await change_master_start_timeout(ops_test, initial_master_start_timeout)

tests/integration/ha_tests/helpers.py

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@
1616
APP_NAME = METADATA["name"]
1717

1818

19+
class MemberNotListedOnClusterError(Exception):
20+
"""Raised when a member is not listed in the cluster."""
21+
22+
23+
class MemberNotUpdatedOnClusterError(Exception):
24+
"""Raised when a member is not yet updated in the cluster."""
25+
26+
1927
class ProcessError(Exception):
2028
pass
2129

@@ -54,11 +62,14 @@ async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int])
5462
)
5563

5664

57-
async def count_writes(ops_test: OpsTest) -> int:
65+
async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int:
5866
"""Count the number of writes in the database."""
5967
app = await app_name(ops_test)
60-
password = await get_password(ops_test, app)
61-
host = ops_test.model.applications[app].units[0].public_address
68+
password = await get_password(ops_test, app, down_unit)
69+
for unit in ops_test.model.applications[app].units:
70+
if unit.name != down_unit:
71+
host = unit.public_address
72+
break
6273
connection_string = (
6374
f"dbname='application' user='operator'"
6475
f" host='{host}' password='{password}' connect_timeout=10"
@@ -77,6 +88,27 @@ async def count_writes(ops_test: OpsTest) -> int:
7788
return count
7889

7990

91+
async def fetch_cluster_members(ops_test: OpsTest):
    """Fetch the IPs listed by Patroni as cluster members.

    Every unit of the application is asked for its own view of the cluster
    through its REST API, and all the views must agree.

    Args:
        ops_test: OpsTest instance.

    Returns:
        set with the host of each member reported by Patroni (empty when the
        application has no units).

    Raises:
        AssertionError: if units report different lists of cluster members.
    """
    app = await app_name(ops_test)
    # None means "no unit answered yet"; an empty set is a valid answer that
    # must still be compared against what the other units report.
    member_ips = None
    for unit in ops_test.model.applications[app].units:
        # The timeout keeps the test from hanging forever on an unresponsive unit.
        cluster_info = requests.get(f"http://{unit.public_address}:8008/cluster", timeout=10)
        reported_ips = {member["host"] for member in cluster_info.json()["members"]}
        if member_ips is None:
            member_ips = reported_ips
        else:
            assert (
                member_ips == reported_ips
            ), "members report different lists of cluster members."
    return member_ips if member_ips is not None else set()
110+
111+
80112
async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]:
81113
"""Get the master start timeout configuration.
82114
@@ -96,19 +128,52 @@ async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]:
96128
return int(master_start_timeout) if master_start_timeout is not None else None
97129

98130

99-
async def get_password(ops_test: OpsTest, app) -> str:
131+
async def get_password(ops_test: OpsTest, app: str, down_unit: str = None) -> str:
100132
"""Use the charm action to retrieve the password from provided application.
101133
102134
Returns:
103135
string with the password stored on the peer relation databag.
104136
"""
105137
# Can retrieve from any running unit, so we pick the first.
106-
unit_name = ops_test.model.applications[app].units[0].name
138+
for unit in ops_test.model.applications[app].units:
139+
if unit.name != down_unit:
140+
unit_name = unit.name
141+
break
107142
action = await ops_test.model.units.get(unit_name).run_action("get-password")
108143
action = await action.wait()
109144
return action.results["operator-password"]
110145

111146

147+
def is_replica(ops_test: OpsTest, unit_name: str) -> bool:
    """Return whether the unit is a replica in the cluster.

    Polls the unit's Patroni REST API cluster endpoint every 3 seconds, for
    up to 3 minutes, until the unit is listed with the "replica" role.

    Args:
        ops_test: OpsTest instance.
        unit_name: name of the unit to check.

    Returns:
        True once the unit is listed as a replica; False if that did not
        happen before the retries were exhausted.
    """
    unit_ip = get_unit_address(ops_test, unit_name)
    member_name = unit_name.replace("/", "-")

    try:
        for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
            with attempt:
                # The timeout turns a hung REST API into a failed attempt that
                # gets retried, instead of blocking the test forever.
                cluster_info = requests.get(f"http://{unit_ip}:8008/cluster", timeout=10)
                members = cluster_info.json()["members"]
                roles = [member["role"] for member in members if member["name"] == member_name]

                # The unit may take some time to be listed on Patroni REST API cluster endpoint.
                if not roles:
                    raise MemberNotListedOnClusterError()

                # A member that had its DB process stopped and then restarted
                # may take some time to know that a new primary was elected.
                if roles[-1] != "replica":
                    raise MemberNotUpdatedOnClusterError()
                return True
    except RetryError:
        return False
175+
176+
112177
async def get_primary(ops_test: OpsTest, app) -> str:
113178
"""Use the charm action to retrieve the primary from provided application.
114179
@@ -122,7 +187,9 @@ async def get_primary(ops_test: OpsTest, app) -> str:
122187
return action.results["primary"]
123188

124189

125-
async def kill_process(ops_test: OpsTest, unit_name: str, process: str, kill_code: str) -> None:
190+
async def send_signal_to_process(
191+
ops_test: OpsTest, unit_name: str, process: str, kill_code: str
192+
) -> None:
126193
"""Sends a signal to the process on the unit according to the provided kill code."""
127194
# Killing the only instance can be disastrous.
128195
app = await app_name(ops_test)

tests/integration/ha_tests/test_self_healing.py

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@
99
from tests.integration.ha_tests.helpers import (
1010
METADATA,
1111
app_name,
12+
change_master_start_timeout,
1213
count_writes,
14+
fetch_cluster_members,
15+
get_master_start_timeout,
1316
get_primary,
14-
kill_process,
17+
is_replica,
1518
postgresql_ready,
1619
secondary_up_to_date,
20+
send_signal_to_process,
1721
start_continuous_writes,
1822
stop_continuous_writes,
1923
)
@@ -52,8 +56,12 @@ async def test_kill_db_process(
5256
# Start an application that continuously writes data to the database.
5357
await start_continuous_writes(ops_test, app)
5458

59+
# Change the "master_start_timeout" parameter to speed up the fail-over.
60+
original_master_start_timeout = await get_master_start_timeout(ops_test)
61+
await change_master_start_timeout(ops_test, 0)
62+
5563
# Kill the database process.
56-
await kill_process(ops_test, primary_name, process, kill_code="SIGKILL")
64+
await send_signal_to_process(ops_test, primary_name, process, kill_code="SIGKILL")
5765

5866
async with ops_test.fast_forward():
5967
# Verify new writes are continuing by counting the number of writes before and after a
@@ -72,6 +80,83 @@ async def test_kill_db_process(
7280
new_primary_name = await get_primary(ops_test, app)
7381
assert new_primary_name != primary_name
7482

83+
# Revert the "master_start_timeout" parameter to avoid fail-over again.
84+
await change_master_start_timeout(ops_test, original_master_start_timeout)
85+
86+
# Verify that the old primary is now a replica.
87+
assert is_replica(ops_test, primary_name), "there are more than one primary in the cluster."
88+
89+
# Verify that all units are part of the same cluster.
90+
member_ips = await fetch_cluster_members(ops_test)
91+
ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units]
92+
assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster."
93+
94+
# Verify that no writes to the database were missed after stopping the writes.
95+
total_expected_writes = await stop_continuous_writes(ops_test)
96+
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
97+
with attempt:
98+
actual_writes = await count_writes(ops_test)
99+
assert total_expected_writes == actual_writes, "writes to the db were missed."
100+
101+
# Verify that old primary is up-to-date.
102+
assert await secondary_up_to_date(
103+
ops_test, primary_name, total_expected_writes
104+
), "secondary not up to date with the cluster after restarting."
105+
106+
107+
@pytest.mark.ha_self_healing_tests
108+
@pytest.mark.parametrize("process", DB_PROCESSES)
109+
async def test_freeze_db_process(
110+
ops_test: OpsTest, process: str, continuous_writes, master_start_timeout
111+
) -> None:
112+
# Locate primary unit.
113+
app = await app_name(ops_test)
114+
primary_name = await get_primary(ops_test, app)
115+
116+
# Start an application that continuously writes data to the database.
117+
await start_continuous_writes(ops_test, app)
118+
119+
# Change the "master_start_timeout" parameter to speed up the fail-over.
120+
original_master_start_timeout = await get_master_start_timeout(ops_test)
121+
await change_master_start_timeout(ops_test, 0)
122+
123+
# Freeze the database process.
124+
await send_signal_to_process(ops_test, primary_name, process, "SIGSTOP")
125+
126+
async with ops_test.fast_forward():
127+
# Verify new writes are continuing by counting the number of writes before and after a
128+
# 3 minutes wait (this is a little more than the loop wait configuration, that is
129+
# considered to trigger a fail-over after master_start_timeout is changed, and also
130+
# when freezing the DB process it takes some more time to trigger the fail-over).
131+
writes = await count_writes(ops_test, primary_name)
132+
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
133+
with attempt:
134+
more_writes = await count_writes(ops_test, primary_name)
135+
assert more_writes > writes, "writes not continuing to DB"
136+
137+
# Verify that a new primary gets elected (ie old primary is secondary).
138+
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
139+
with attempt:
140+
new_primary_name = await get_primary(ops_test, app)
141+
assert new_primary_name != primary_name
142+
143+
# Revert the "master_start_timeout" parameter to avoid fail-over again.
144+
await change_master_start_timeout(ops_test, original_master_start_timeout)
145+
146+
# Un-freeze the old primary.
147+
await send_signal_to_process(ops_test, primary_name, process, "SIGCONT")
148+
149+
# Verify that the database service got restarted and is ready in the old primary.
150+
assert await postgresql_ready(ops_test, primary_name)
151+
152+
# Verify that the old primary is now a replica.
153+
assert is_replica(ops_test, primary_name), "there are more than one primary in the cluster."
154+
155+
# Verify that all units are part of the same cluster.
156+
member_ips = await fetch_cluster_members(ops_test)
157+
ip_addresses = [unit.public_address for unit in ops_test.model.applications[app].units]
158+
assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster."
159+
75160
# Verify that no writes to the database were missed after stopping the writes.
76161
total_expected_writes = await stop_continuous_writes(ops_test)
77162
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):

0 commit comments

Comments
 (0)