
Commit a497159

[DPE-3473] Stabilise SST and network cut tests (canonical#385)
* Stabilise network cut test
* Fix forceful restart without data and transaction logs test
* Minor fixes + add unit tests

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>
1 parent fefad7c commit a497159

7 files changed: +339, -31 lines changed

poetry.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default.

src/charm.py

Lines changed: 17 additions & 1 deletion
@@ -398,7 +398,7 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None:
         ):
             self._patroni.reinitialize_postgresql()
             logger.debug("Deferring on_peer_relation_changed: reinitialising replica")
-            self.unit.status = WaitingStatus("reinitialising replica")
+            self.unit.status = MaintenanceStatus("reinitialising replica")
             event.defer()
             return
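For context on the one-line change above (a sketch, not code from this commit): in the ops framework, WaitingStatus is meant for a unit blocked on something outside the charm's control, while MaintenanceStatus indicates the charm itself is doing work, which better describes a replica being reinitialised by the charm.

from ops.model import MaintenanceStatus, WaitingStatus

# Charm-driven work in progress: the charm itself triggered the reinitialisation.
reinitialising = MaintenanceStatus("reinitialising replica")

# WaitingStatus, by contrast, is for waiting on another component (illustrative message).
waiting = WaitingStatus("awaiting Patroni to elect a primary")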

@@ -1079,6 +1079,22 @@ def _handle_processes_failures(self) -> bool:
                 return False
             return True

+        if (
+            not self.is_primary
+            and self._patroni.member_started
+            and not self._patroni.member_streaming
+        ):
+            try:
+                self._patroni.reinitialize_postgresql()
+                logger.info("restarted the replica because it was not streaming from primary")
+                self.unit.status = MaintenanceStatus("reinitialising replica")
+            except RetryError:
+                logger.error(
+                    "failed to reinitialise replica after checking that it was not streaming from primary"
+                )
+                return False
+            return True
+
         return False

     def _set_primary_status_message(self) -> None:
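A minimal standalone sketch of the recovery rule added above, assuming only what the hunk shows (a running, non-streaming replica is reinitialised); the function name and signature are illustrative, not the charm's API.

from tenacity import RetryError


def handle_stuck_replica(patroni, is_primary: bool) -> bool:
    """Return True when a non-streaming replica was reinitialised successfully."""
    if is_primary or not patroni.member_started or patroni.member_streaming:
        return False  # primary, still starting, or already streaming: nothing to do
    try:
        patroni.reinitialize_postgresql()
    except RetryError:
        return False  # reinitialisation kept failing; report it so the caller can react
    return True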

src/patroni.py

Lines changed: 17 additions & 0 deletions
@@ -268,6 +268,23 @@ def member_started(self) -> bool:

         return r.json()["state"] in RUNNING_STATES

+    @property
+    def member_streaming(self) -> bool:
+        """Has the member started to stream data from the primary.
+
+        Returns:
+            True if it's streaming, False otherwise. Retries over a period of 60 seconds to
+            allow the server time to start up.
+        """
+        try:
+            for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
+                with attempt:
+                    r = requests.get(f"{self._patroni_url}/health", verify=self._verify)
+        except RetryError:
+            return False
+
+        return r.json().get("replication_state") == "streaming"
+
     @property
     def is_database_running(self) -> bool:
         """Returns whether the PostgreSQL database process is running (and isn't frozen)."""

tests/integration/ha_tests/helpers.py

Lines changed: 23 additions & 14 deletions
@@ -155,7 +155,7 @@ async def is_cluster_updated(ops_test: OpsTest, primary_name: str) -> None:
     # Verify that old primary is up-to-date.
     assert await is_secondary_up_to_date(
         ops_test, primary_name, total_expected_writes
-    ), "secondary not up to date with the cluster after restarting."
+    ), f"secondary ({primary_name}) not up to date with the cluster after restarting."


 def get_member_lag(cluster: Dict, member_name: str) -> int:
@@ -191,7 +191,7 @@ async def check_writes(ops_test) -> int:
     for member, count in actual_writes.items():
         assert (
             count == max_number_written[member]
-        ), f"{member}: writes to the db were missed: count of actual writes different from the max number written."
+        ), f"{member}: writes to the db were missed: count of actual writes ({count}) on {member} different from the max number written ({max_number_written[member]})."
         assert total_expected_writes == count, f"{member}: writes to the db were missed."
     return total_expected_writes

@@ -200,10 +200,12 @@ async def are_writes_increasing(ops_test, down_unit: str = None) -> None:
     """Verify new writes are continuing by counting the number of writes."""
     writes, _ = await count_writes(ops_test, down_unit=down_unit)
     for member, count in writes.items():
-        for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
+        for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3), reraise=True):
             with attempt:
                 more_writes, _ = await count_writes(ops_test, down_unit=down_unit)
-                assert more_writes[member] > count, f"{member}: writes not continuing to DB"
+                assert (
+                    more_writes[member] > count
+                ), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})"


 def copy_file_into_pod(
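The notable addition in this hunk is reraise=True. A small sketch of the difference it makes (short attempt counts for brevity): without it, tenacity raises RetryError once the retries are exhausted, hiding the assertion message; with it, the original AssertionError and its message reach pytest.

from tenacity import Retrying, RetryError, stop_after_attempt, wait_fixed


def failing_check():
    assert False, "writes not continuing to DB"


try:
    for attempt in Retrying(stop=stop_after_attempt(2), wait=wait_fixed(0)):
        with attempt:
            failing_check()
except RetryError as error:
    print(type(error).__name__)  # RetryError: the assertion message is wrapped away

try:
    for attempt in Retrying(stop=stop_after_attempt(2), wait=wait_fixed(0), reraise=True):
        with attempt:
            failing_check()
except AssertionError as error:
    print(error)  # "writes not continuing to DB" surfaces directly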
@@ -422,14 +424,16 @@ async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool:
     password = await get_password(ops_test, database_app_name=app, down_unit=unit_name)
     address = await get_unit_address(ops_test, unit_name)
     try:
-        with db_connect(
-            host=address, password=password
-        ) as connection, connection.cursor() as cursor:
-            cursor.execute("SELECT 1;")
-            success = cursor.fetchone()[0] == 1
-        connection.close()
-        return success
-    except psycopg2.Error:
+        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
+            with attempt:
+                with db_connect(
+                    host=address, password=password
+                ) as connection, connection.cursor() as cursor:
+                    cursor.execute("SELECT 1;")
+                    success = cursor.fetchone()[0] == 1
+                connection.close()
+                return success
+    except (psycopg2.Error, RetryError):
         # Error raised when the connection is not possible.
         return False

@@ -614,14 +618,19 @@ async def is_secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_wr
     )

     try:
-        for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
+        for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
             with attempt:
                 with psycopg2.connect(
                     connection_string
                 ) as connection, connection.cursor() as cursor:
                     cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;")
                     results = cursor.fetchone()
-                    assert results[0] == expected_writes and results[1] == expected_writes
+                    if results[0] != expected_writes or results[1] != expected_writes:
+                        async with ops_test.fast_forward(fast_interval="30s"):
+                            await ops_test.model.wait_for_idle(
+                                apps=[unit_name.split("/")[0]], idle_period=15, timeout=1000
+                            )
+                        raise Exception
     except RetryError:
         return False
     finally:
tests/integration/ha_tests/test_self_healing.py

Lines changed: 11 additions & 3 deletions
@@ -101,6 +101,7 @@ async def test_kill_db_process(
     await are_writes_increasing(ops_test, primary_name)

     # Verify that the database service got restarted and is ready in the old primary.
+    logger.info(f"waiting for the database service to restart on {primary_name}")
     assert await is_postgresql_ready(ops_test, primary_name)

     # Verify that a new primary gets elected (ie old primary is secondary).
@@ -147,6 +148,7 @@ async def test_freeze_db_process(
         ops_test, primary_name, process, "SIGCONT", use_ssh
     )
     # Verify that the database service got restarted and is ready in the old primary.
+    logger.info(f"waiting for the database service to restart on {primary_name}")
     assert await is_postgresql_ready(ops_test, primary_name)

     await is_cluster_updated(ops_test, primary_name)
@@ -174,6 +176,7 @@ async def test_restart_db_process(
     await are_writes_increasing(ops_test, primary_name)

     # Verify that the database service got restarted and is ready in the old primary.
+    logger.info(f"waiting for the database service to restart on {primary_name}")
     assert await is_postgresql_ready(ops_test, primary_name)

     # Verify that a new primary gets elected (ie old primary is secondary).
@@ -288,6 +291,7 @@ async def test_forceful_restart_without_data_and_transaction_logs(
     sleep(MEDIAN_ELECTION_TIME * 2)

     async with ops_test.fast_forward():
+        logger.info("checking whether writes are increasing")
         await are_writes_increasing(ops_test, primary_name)

         # Verify that a new primary gets elected (ie old primary is secondary).
@@ -305,6 +309,7 @@ async def test_forceful_restart_without_data_and_transaction_logs(
             await change_wal_settings(ops_test, unit.name, 32, 32, 1)

         # Rotate the WAL segments.
+        logger.info(f"rotating WAL segments on {new_primary_name}")
         files = await list_wal_files(ops_test, app)
         host = await get_unit_address(ops_test, new_primary_name)
         password = await get_password(ops_test, down_unit=primary_name)
@@ -317,17 +322,20 @@ async def test_forceful_restart_without_data_and_transaction_logs(
             cursor.execute("SELECT pg_switch_wal();")
         connection.close()
         new_files = await list_wal_files(ops_test, app)
+
         # Check that the WAL was correctly rotated.
+
         for unit_name in files:
             assert not files[unit_name].intersection(
                 new_files
-            ), "WAL segments weren't correctly rotated"
+            ), f"WAL segments weren't correctly rotated on {unit_name}"

         # Start the systemd service in the old primary.
         logger.info(f"starting database on {primary_name}")
         await run_command_on_unit(ops_test, primary_name, "/charm/bin/pebble start postgresql")

         # Verify that the database service got restarted and is ready in the old primary.
+        logger.info(f"waiting for the database service to restart on {primary_name}")
         assert await is_postgresql_ready(ops_test, primary_name)

         await is_cluster_updated(ops_test, primary_name)
@@ -385,14 +393,14 @@ async def test_network_cut(
     remove_instance_isolation(ops_test)

     # Verify that the database service got restarted and is ready in the old primary.
-    logger.info("waiting for the database service to restart")
+    logger.info(f"waiting for the database service to restart on {primary_name}")
     assert await is_postgresql_ready(ops_test, primary_name)

     # Verify that connection is possible.
     logger.info("checking whether the connectivity to the database is working")
     assert await is_connection_possible(
         ops_test, primary_name
-    ), "Connection is not possible after network restore"
+    ), f"Connection is not possible to {primary_name} after network restore"

     await is_cluster_updated(ops_test, primary_name)
