Commit 615751a
[DPE-3349] Handle S3 relation in primary non-leader unit (canonical#375)
* Handle S3 relation in primary non-leader unit
* Fix relation data removal
* Update integration test
* Fix existing unit tests
* Add extra unit tests
* Add unit test for peer relation changed event handler
* Handle pgBackRest service check when the primary was not elected yet
* Speed up the new check on test
* Add log call
* Speed up events

Signed-off-by: Marcelo Henrique Neppel <marcelo.neppel@canonical.com>
1 parent: a497159

6 files changed: +410 -32 lines

src/backups.py

54 additions, 8 deletions

@@ -87,14 +87,21 @@ def _can_unit_perform_backup(self) -> Tuple[bool, Optional[str]]:
 
         tls_enabled = "tls" in self.charm.unit_peer_data
 
+        # Check if this unit is the primary (if it was not possible to retrieve that information,
+        # then show that the unit cannot perform a backup, because possibly the database is offline).
+        try:
+            is_primary = self.charm.is_primary
+        except RetryError:
+            return False, "Unit cannot perform backups as the database seems to be offline"
+
         # Only enable backups on primary if there are replicas but TLS is not enabled.
-        if self.charm.is_primary and self.charm.app.planned_units() > 1 and tls_enabled:
+        if is_primary and self.charm.app.planned_units() > 1 and tls_enabled:
             return False, "Unit cannot perform backups as it is the cluster primary"
 
         # Can create backups on replicas only if TLS is enabled (it's needed to enable
         # pgBackRest to communicate with the primary to request that missing WAL files
         # are pushed to the S3 repo before the backup action is triggered).
-        if not self.charm.is_primary and not tls_enabled:
+        if not is_primary and not tls_enabled:
             return False, "Unit cannot perform backups as TLS is not enabled"
 
         if not self.charm._patroni.member_started:
@@ -295,7 +302,7 @@ def _initialise_stanza(self) -> None:
         located, how it will be backed up, archiving options, etc. (more info in
         https://pgbackrest.org/user-guide.html#quickstart/configure-stanza).
         """
-        if not self.charm.unit.is_leader():
+        if not self.charm.is_primary:
             return
 
         # Enable stanza initialisation if the backup settings were fixed after being invalid
@@ -321,11 +328,18 @@ def _initialise_stanza(self) -> None:
         self.start_stop_pgbackrest_service()
 
         # Store the stanza name to be used in configurations updates.
-        self.charm.app_peer_data.update({"stanza": self.stanza_name, "init-pgbackrest": "True"})
+        if self.charm.unit.is_leader():
+            self.charm.app_peer_data.update(
+                {"stanza": self.stanza_name, "init-pgbackrest": "True"}
+            )
+        else:
+            self.charm.unit_peer_data.update(
+                {"stanza": self.stanza_name, "init-pgbackrest": "True"}
+            )
 
     def check_stanza(self) -> None:
         """Runs the pgbackrest stanza validation."""
-        if not self.charm.unit.is_leader() or "init-pgbackrest" not in self.charm.app_peer_data:
+        if not self.charm.is_primary or "init-pgbackrest" not in self.charm.app_peer_data:
             return
 
         # Update the configuration to use pgBackRest as the archiving mechanism.
@@ -346,13 +360,38 @@ def check_stanza(self) -> None:
             # and rollback the configuration.
             self.charm.app_peer_data.update({"stanza": ""})
             self.charm.app_peer_data.pop("init-pgbackrest", None)
+            self.charm.unit_peer_data.update({"stanza": "", "init-pgbackrest": ""})
             self.charm.update_config()
 
             logger.exception(e)
             self.charm.unit.status = BlockedStatus(FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE)
             return
 
-        self.charm.app_peer_data.pop("init-pgbackrest", None)
+        if self.charm.unit.is_leader():
+            self.charm.app_peer_data.pop("init-pgbackrest", None)
+        self.charm.unit_peer_data.pop("init-pgbackrest", None)
+
+    def coordinate_stanza_fields(self) -> None:
+        """Coordinate the stanza name between the primary and the leader units."""
+        for unit, unit_data in self.charm._peers.data.items():
+            if "stanza" not in unit_data:
+                continue
+            # If the stanza name is not set in the application databag, then the primary is not
+            # the leader unit, and it's needed to set the stanza name in the application databag.
+            if "stanza" not in self.charm.app_peer_data and self.charm.unit.is_leader():
+                self.charm.app_peer_data.update(
+                    {"stanza": self.stanza_name, "init-pgbackrest": "True"}
+                )
+                break
+            # If the stanza was already checked and its name is still in the unit databag, mark
+            # the stanza as already checked in the application databag and remove it from the
+            # unit databag.
+            if "init-pgbackrest" not in unit_data:
+                if self.charm.unit.is_leader():
+                    self.charm.app_peer_data.pop("init-pgbackrest", None)
+                if "init-pgbackrest" not in self.charm.app_peer_data and unit == self.charm.unit:
+                    self.charm.unit_peer_data.update({"stanza": ""})
+                break
 
     @property
     def _is_primary_pgbackrest_service_running(self) -> bool:
@@ -363,6 +402,10 @@ def _is_primary_pgbackrest_service_running(self) -> bool:
             logger.error(f"failed to get primary with error {str(e)}")
             return False
 
+        if primary is None:
+            logger.debug("the primary was not elected yet")
+            return False
+
         primary_endpoint = self.charm._get_hostname_from_unit(primary)
 
         try:
@@ -388,8 +431,8 @@ def _on_s3_credential_changed(self, event: CredentialsChangedEvent):
             logger.debug("Cannot set pgBackRest configurations, missing configurations.")
             return
 
-        # Verify the s3 relation only on the leader
-        if not self.charm.unit.is_leader():
+        # Verify the s3 relation only on the primary.
+        if not self.charm.is_primary:
             return
 
         try:
@@ -526,6 +569,9 @@ def _on_create_backup_action(self, event) -> None:
         self.charm.unit.status = ActiveStatus()
 
     def _on_s3_credential_gone(self, _) -> None:
+        if self.charm.unit.is_leader():
+            self.charm.app_peer_data.update({"stanza": "", "init-pgbackrest": ""})
+        self.charm.unit_peer_data.update({"stanza": "", "init-pgbackrest": ""})
         if self.charm.is_blocked and self.charm.unit.status.message in S3_BLOCK_MESSAGES:
             self.charm.unit.status = ActiveStatus()
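
The recurring pattern in this file is that the unit doing the pgBackRest work (the primary) is not necessarily the unit allowed to write application-wide data (the leader): a non-leader primary stores "stanza" and "init-pgbackrest" in its own unit databag, and the leader later promotes them to the application databag. Below is a minimal, self-contained sketch of that promotion step, with plain dicts standing in for the Juju databags; the names and values are illustrative, not the charm's real objects.

def coordinate_stanza_fields(app_data: dict, peer_unit_data: dict, is_leader: bool, stanza_name: str) -> None:
    """Sketch: promote a stanza written by a non-leader primary into the app databag."""
    for unit_name, unit_data in peer_unit_data.items():
        if "stanza" not in unit_data:
            continue
        # The primary is not the leader: the leader copies the stanza into the
        # application databag so every unit reconfigures from it.
        if "stanza" not in app_data and is_leader:
            app_data.update({"stanza": stanza_name, "init-pgbackrest": "True"})
            break
        # The stanza was already validated on the primary: drop the transient flag.
        if "init-pgbackrest" not in unit_data:
            if is_leader:
                app_data.pop("init-pgbackrest", None)
            break

# Example: "db/1" is the primary but not the leader; the leader runs the
# coordination on peer-relation-changed and promotes the fields.
app_data = {}
peers = {"db/1": {"stanza": "db.patroni-db", "init-pgbackrest": "True"}}
coordinate_stanza_fields(app_data, peers, is_leader=True, stanza_name="db.patroni-db")
assert app_data == {"stanza": "db.patroni-db", "init-pgbackrest": "True"}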

src/charm.py

2 additions, 0 deletions

@@ -404,6 +404,8 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None:
 
         self.postgresql_client_relation.update_read_only_endpoint()
 
+        self.backup.coordinate_stanza_fields()
+
         self.backup.check_stanza()
 
         # Start or stop the pgBackRest TLS server service when TLS certificate change.
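
Ordering is the point of this hunk: coordinate_stanza_fields runs before check_stanza, so a leader that has just promoted the stanza fields from a unit databag validates them within the same peer-relation-changed dispatch. A condensed, illustrative excerpt of the handler tail (only the two calls touched here; the real handler does more):

    def _on_peer_relation_changed(self, event: HookEvent) -> None:
        # ...client endpoint updates elided...
        self.backup.coordinate_stanza_fields()  # first: sync stanza between leader and primary
        self.backup.check_stanza()  # then: validate the (possibly just promoted) stanza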

tests/integration/helpers.py

33 additions, 0 deletions

@@ -667,6 +667,39 @@ async def set_password(
     return result.results
 
 
+async def switchover(ops_test: OpsTest, current_primary: str, candidate: str = None) -> None:
+    """Trigger a switchover.
+
+    Args:
+        ops_test: The ops test framework instance.
+        current_primary: The current primary unit.
+        candidate: The unit that should be elected the new primary.
+    """
+    primary_ip = await get_unit_address(ops_test, current_primary)
+    response = requests.post(
+        f"http://{primary_ip}:8008/switchover",
+        json={
+            "leader": current_primary.replace("/", "-"),
+            "candidate": candidate.replace("/", "-") if candidate else None,
+        },
+    )
+    assert response.status_code == 200
+    app_name = current_primary.split("/")[0]
+    minority_count = len(ops_test.model.applications[app_name].units) // 2
+    for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True):
+        with attempt:
+            response = requests.get(f"http://{primary_ip}:8008/cluster")
+            assert response.status_code == 200
+            standbys = len(
+                [
+                    member
+                    for member in response.json()["members"]
+                    if member["role"] == "sync_standby"
+                ]
+            )
+            assert standbys >= minority_count
+
+
 async def wait_for_idle_on_blocked(
     ops_test: OpsTest,
     database_app_name: str,
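
A sketch of how the new helper might be driven from a test, assuming the standard pytest-operator ops_test fixture; the application and unit names are illustrative:

import pytest
from pytest_operator.plugin import OpsTest

from .helpers import get_primary, switchover


@pytest.mark.abort_on_fail
async def test_switchover_moves_primary(ops_test: OpsTest) -> None:
    primary = await get_primary(ops_test, "postgresql")  # e.g. "postgresql/0"
    await switchover(ops_test, current_primary=primary, candidate="postgresql/1")
    assert await get_primary(ops_test, "postgresql") == "postgresql/1"

The helper itself blocks until Patroni reports enough sync standbys (at least len(units) // 2), so the primary assertion can follow it directly.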

tests/integration/test_backups.py

67 additions, 4 deletions

@@ -19,6 +19,7 @@
     get_primary,
     get_unit_address,
     scale_application,
+    switchover,
     wait_for_idle_on_blocked,
 )
 from .juju_ import juju_major_version
@@ -73,6 +74,7 @@ async def cloud_configs(ops_test: OpsTest, github_secrets) -> None:
     }
     yield configs, credentials
     # Delete the previously created objects.
+    logger.info("deleting the previously created backups")
    for cloud, config in configs.items():
         session = boto3.session.Session(
             aws_access_key_id=credentials[cloud]["access-key"],
@@ -121,9 +123,10 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
             **cloud_configs[1][cloud],
         )
         await action.wait()
-        await ops_test.model.wait_for_idle(
-            apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active", timeout=1000
-        )
+        async with ops_test.fast_forward(fast_interval="60s"):
+            await ops_test.model.wait_for_idle(
+                apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active", timeout=1000
+            )
 
         primary = await get_primary(ops_test, database_app_name)
         for unit in ops_test.model.applications[database_app_name].units:
@@ -167,7 +170,8 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
         connection.close()
 
         # Scale down to be able to restore.
-        await scale_application(ops_test, database_app_name, 1)
+        async with ops_test.fast_forward(fast_interval="60s"):
+            await scale_application(ops_test, database_app_name, 1)
 
         # Run the "restore backup" action.
         for attempt in Retrying(
@@ -211,6 +215,65 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
             ], "backup wasn't correctly restored: table 'backup_table_2' exists"
             connection.close()
 
+        # Run the following steps only in one cloud (it's enough for those checks).
+        if cloud == list(cloud_configs[0].keys())[0]:
+            async with ops_test.fast_forward():
+                logger.info("removing the TLS relation")
+                await ops_test.model.applications[database_app_name].remove_relation(
+                    f"{database_app_name}:certificates",
+                    f"{TLS_CERTIFICATES_APP_NAME}:certificates",
+                )
+                await ops_test.model.wait_for_idle(
+                    apps=[database_app_name], status="active", timeout=1000
+                )
+
+            # Scale up to be able to test primary and leader being different.
+            async with ops_test.fast_forward():
+                await scale_application(ops_test, database_app_name, 2)
+
+            logger.info("ensuring that the replication is working correctly")
+            new_unit_name = f"{database_app_name}/1"
+            address = await get_unit_address(ops_test, new_unit_name)
+            with db_connect(
+                host=address, password=password
+            ) as connection, connection.cursor() as cursor:
+                cursor.execute(
+                    "SELECT EXISTS (SELECT FROM information_schema.tables"
+                    " WHERE table_schema = 'public' AND table_name = 'backup_table_1');"
+                )
+                assert cursor.fetchone()[
+                    0
+                ], f"replication isn't working correctly: table 'backup_table_1' doesn't exist in {new_unit_name}"
+                cursor.execute(
+                    "SELECT EXISTS (SELECT FROM information_schema.tables"
+                    " WHERE table_schema = 'public' AND table_name = 'backup_table_2');"
+                )
+                assert not cursor.fetchone()[
+                    0
+                ], f"replication isn't working correctly: table 'backup_table_2' exists in {new_unit_name}"
+            connection.close()
+
+            logger.info(f"performing a switchover from {primary} to {new_unit_name}")
+            await switchover(ops_test, primary, new_unit_name)
+
+            logger.info("checking that the primary unit has changed")
+            primary = await get_primary(ops_test, database_app_name)
+            for attempt in Retrying(
+                stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=30)
+            ):
+                with attempt:
+                    assert primary == new_unit_name
+
+            # Ensure stanza is working correctly.
+            logger.info(
+                "listing the available backups to ensure that the stanza is working correctly"
+            )
+            action = await ops_test.model.units.get(new_unit_name).run_action("list-backups")
+            await action.wait()
+            backups = action.results.get("backups")
+            assert backups, "backups not outputted"
+            await ops_test.model.wait_for_idle(status="active", timeout=1000)
+
     # Remove the database app.
     await ops_test.model.remove_application(database_app_name, block_until_done=True)
     # Remove the TLS operator.
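
For reference, the post-switchover wait in the switchover helper parses Patroni's /cluster endpoint; a payload of roughly this shape (abridged, member names and values invented for the example) is what the sync_standby count is computed from:

# Abridged, illustrative Patroni /cluster payload; real responses carry more
# fields and the cluster's real member names.
cluster = {
    "members": [
        {"name": "postgresql-1", "role": "leader", "state": "running"},
        {"name": "postgresql-0", "role": "sync_standby", "state": "running"},
    ]
}

# Same computation as the helper: a two-unit application must keep at least one
# sync standby (minority_count = 2 // 2 = 1) before the switchover is considered done.
minority_count = 2 // 2
standbys = len([m for m in cluster["members"] if m["role"] == "sync_standby"])
assert standbys >= minority_count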
