1 change: 0 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

57 changes: 48 additions & 9 deletions src/backups.py
@@ -102,10 +102,14 @@ def _can_unit_perform_backup(self) -> Tuple[bool, Optional[str]]:

tls_enabled = "tls" in self.charm.unit_peer_data

# Check if this unit is the primary (if that information could not be retrieved,
# report that the unit cannot perform a backup, because the database may be offline).
try:
is_primary = self.charm.is_primary
except RetryError:
return False, "Unit cannot perform backups as the database seems to be offline"

# Allow backups on the primary only if there are no replicas or TLS is disabled.
is_primary = self.charm.unit.name == self.charm._patroni.get_primary(
unit_name_pattern=True
)
if is_primary and self.charm.app.planned_units() > 1 and tls_enabled:
return False, "Unit cannot perform backups as it is the cluster primary"

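The new primary check delegates to a charm property instead of querying Patroni inline. A minimal sketch of what that property could look like, assuming `is_primary` wraps the Patroni query in a tenacity retry and lets `RetryError` escape when the database is unreachable (the retry parameters here are assumptions, not from the diff):

```python
from tenacity import Retrying, stop_after_delay, wait_fixed

class PostgreSQLCharm:  # illustrative shell; the real charm defines much more
    @property
    def is_primary(self) -> bool:
        """Whether this unit is the cluster primary.

        Raises tenacity.RetryError if Patroni stays unreachable,
        e.g. because the database is offline.
        """
        primary = None
        for attempt in Retrying(stop=stop_after_delay(30), wait=wait_fixed(3)):
            with attempt:
                # get_primary() raises while Patroni's REST API is down;
                # tenacity retries and eventually raises RetryError.
                primary = self._patroni.get_primary(unit_name_pattern=True)
        return self.unit.name == primary
```

Callers such as `_can_unit_perform_backup` catch `RetryError` and translate it into a user-facing message instead of crashing the hook.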
@@ -363,7 +367,7 @@ def _initialise_stanza(self) -> None:
located, how it will be backed up, archiving options, etc. (more info in
https://pgbackrest.org/user-guide.html#quickstart/configure-stanza).
"""
if not self.charm.unit.is_leader():
if not self.charm.is_primary:
return

# Enable stanza initialisation if the backup settings were fixed after being invalid
@@ -403,11 +407,18 @@ def _initialise_stanza(self) -> None:
self.start_stop_pgbackrest_service()

# Store the stanza name to be used in configurations updates.
self.charm.app_peer_data.update({"stanza": self.stanza_name, "init-pgbackrest": "True"})
if self.charm.unit.is_leader():
self.charm.app_peer_data.update(
{"stanza": self.stanza_name, "init-pgbackrest": "True"}
)
else:
self.charm.unit_peer_data.update(
{"stanza": self.stanza_name, "init-pgbackrest": "True"}
)

def check_stanza(self) -> None:
"""Runs the pgbackrest stanza validation."""
if not self.charm.unit.is_leader() or "init-pgbackrest" not in self.charm.app_peer_data:
if not self.charm.is_primary or "init-pgbackrest" not in self.charm.app_peer_data:
return

# Update the configuration to use pgBackRest as the archiving mechanism.
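Because `_initialise_stanza` now runs on the primary, which may not be the leader, the stanza fields are staged in whichever databag the unit is allowed to write: the application databag for the leader, the unit's own databag otherwise. A condensed sketch of that split (the helper name is hypothetical; `app_peer_data`/`unit_peer_data` are as in the diff):

```python
def _store_stanza_fields(self) -> None:
    """Stage the stanza name wherever this unit is allowed to write it."""
    fields = {"stanza": self.stanza_name, "init-pgbackrest": "True"}
    if self.charm.unit.is_leader():
        # The leader writes straight to the application databag.
        self.charm.app_peer_data.update(fields)
    else:
        # A non-leader primary stages the fields in its own unit databag;
        # coordinate_stanza_fields() later promotes them.
        self.charm.unit_peer_data.update(fields)
```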
@@ -437,12 +448,37 @@ def check_stanza(self) -> None:
# and rollback the configuration.
self.charm.app_peer_data.update({"stanza": ""})
self.charm.app_peer_data.pop("init-pgbackrest", None)
self.charm.unit_peer_data.update({"stanza": "", "init-pgbackrest": ""})
self.charm.update_config()

logger.exception(e)
self.charm.unit.status = BlockedStatus(FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE)

self.charm.app_peer_data.pop("init-pgbackrest", None)
if self.charm.unit.is_leader():
self.charm.app_peer_data.pop("init-pgbackrest", None)
self.charm.unit_peer_data.pop("init-pgbackrest", None)

def coordinate_stanza_fields(self) -> None:
"""Coordinate the stanza name between the primary and the leader units."""
for unit, unit_data in self.charm._peers.data.items():
if "stanza" not in unit_data:
continue
# If the stanza name is not set in the application databag, then the primary is
# not the leader unit, and the stanza name needs to be copied into the
# application databag.
if "stanza" not in self.charm.app_peer_data and self.charm.unit.is_leader():
self.charm.app_peer_data.update(
{"stanza": self.stanza_name, "init-pgbackrest": "True"}
)
break
# If the stanza was already checked and its name is still in the unit databag, mark
# the stanza as already checked in the application databag and remove it from the
# unit databag.
if "init-pgbackrest" not in unit_data:
if self.charm.unit.is_leader():
self.charm.app_peer_data.pop("init-pgbackrest", None)
if "init-pgbackrest" not in self.charm.app_peer_data and unit == self.charm.unit:
self.charm.unit_peer_data.update({"stanza": ""})
break

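`coordinate_stanza_fields` is the other half of that handoff. A self-contained model of the coordination rules, with the peer databags reduced to plain dicts (all names here are illustrative, not the charm's API):

```python
def coordinate(app_data: dict, unit_data_by_unit: dict, is_leader: bool, this_unit: str) -> None:
    """Mirror of the coordination rules, over plain dicts."""
    for unit, unit_data in unit_data_by_unit.items():
        if "stanza" not in unit_data:
            continue
        if "stanza" not in app_data and is_leader:
            # The primary was not the leader: promote its staged stanza.
            app_data.update({"stanza": unit_data["stanza"], "init-pgbackrest": "True"})
            break
        if "init-pgbackrest" not in unit_data:
            # The stanza was already checked: drop the flag and the staged copy.
            if is_leader:
                app_data.pop("init-pgbackrest", None)
            if "init-pgbackrest" not in app_data and unit == this_unit:
                unit_data["stanza"] = ""
            break

# The leader promotes a stanza staged by a non-leader primary.
app: dict = {}
units = {"postgresql/1": {"stanza": "test-app", "init-pgbackrest": "True"}}
coordinate(app, units, is_leader=True, this_unit="postgresql/0")
assert app == {"stanza": "test-app", "init-pgbackrest": "True"}
```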
@property
def _is_primary_pgbackrest_service_running(self) -> bool:
@@ -469,8 +505,8 @@ def _on_s3_credential_changed(self, event: CredentialsChangedEvent):
logger.debug("Cannot set pgBackRest configurations, missing configurations.")
return

# Verify the s3 relation only on the leader
if not self.charm.unit.is_leader():
# Verify the s3 relation only on the primary.
if not self.charm.is_primary:
return

try:
@@ -487,6 +523,9 @@ def _on_s3_credential_changed(self, event: CredentialsChangedEvent):
self._initialise_stanza()

def _on_s3_credential_gone(self, _) -> None:
if self.charm.unit.is_leader():
self.charm.app_peer_data.update({"stanza": "", "init-pgbackrest": ""})
self.charm.unit_peer_data.update({"stanza": "", "init-pgbackrest": ""})
if self.charm.is_blocked and self.charm.unit.status.message in S3_BLOCK_MESSAGES:
self.charm.unit.status = ActiveStatus()

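When the S3 credentials go away, the same fields are cleared on both sides, mirroring the staging split above. A minimal sketch with an assumed helper name:

```python
def _clear_stanza_fields(self) -> None:
    """Reset the backup coordination state once the S3 relation is gone."""
    empty = {"stanza": "", "init-pgbackrest": ""}
    if self.charm.unit.is_leader():
        self.charm.app_peer_data.update(empty)  # leader-only write
    self.charm.unit_peer_data.update(empty)  # every unit clears its own copy
```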
14 changes: 14 additions & 0 deletions src/charm.py
@@ -477,6 +477,18 @@ def _on_peer_relation_changed(self, event: HookEvent):
event.defer()
return

# Restart the workload if it's stuck in the starting state after a timeline
# divergence caused by a restored backup.
if not self.is_primary and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
):
self._patroni.reinitialize_postgresql()
logger.debug("Deferring on_peer_relation_changed: reinitialising replica")
self.unit.status = MaintenanceStatus("reinitialising replica")
event.defer()
return

# Start or stop the pgBackRest TLS server service when TLS certificate change.
if not self.backup.start_stop_pgbackrest_service():
logger.debug(
@@ -485,6 +497,8 @@ def _on_peer_relation_changed(self, event: HookEvent):
event.defer()
return

self.backup.coordinate_stanza_fields()

self.backup.check_stanza()

if "exporter-started" not in self.unit_peer_data:
65 changes: 62 additions & 3 deletions tests/integration/test_backups.py
@@ -18,6 +18,8 @@
get_password,
get_primary,
get_unit_address,
scale_application,
switchover,
wait_for_idle_on_blocked,
)
from .juju_ import juju_major_version
@@ -72,6 +74,7 @@ async def cloud_configs(ops_test: OpsTest, github_secrets) -> None:
}
yield configs, credentials
# Delete the previously created objects.
logger.info("deleting the previously created backups")
for cloud, config in configs.items():
session = boto3.session.Session(
aws_access_key_id=credentials[cloud]["access-key"],
@@ -128,9 +131,10 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict]) -> None:
**cloud_configs[1][cloud],
)
await action.wait()
await ops_test.model.wait_for_idle(
apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active", timeout=1000
)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active", timeout=1000
)

primary = await get_primary(ops_test, f"{database_app_name}/0")
for unit in ops_test.model.applications[database_app_name].units:
@@ -225,6 +229,61 @@ async def test_backup(ops_test: OpsTest, cloud_configs: Tuple[Dict, Dict]) -> None:
], "backup wasn't correctly restored: table 'backup_table_2' exists"
connection.close()

# Run the following steps only in one cloud (it's enough for those checks).
if cloud == list(cloud_configs[0].keys())[0]:
# Remove the relation to the TLS certificates operator.
await ops_test.model.applications[database_app_name].remove_relation(
f"{database_app_name}:certificates", f"{TLS_CERTIFICATES_APP_NAME}:certificates"
)
await ops_test.model.wait_for_idle(
apps=[database_app_name], status="active", timeout=1000
)

# Scale up to be able to test primary and leader being different.
async with ops_test.fast_forward():
await scale_application(ops_test, database_app_name, 2)

# Ensure replication is working correctly.
new_unit_name = f"{database_app_name}/2"
address = get_unit_address(ops_test, new_unit_name)
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_1');"
)
assert cursor.fetchone()[
0
], f"replication isn't working correctly: table 'backup_table_1' doesn't exist in {new_unit_name}"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_2');"
)
assert not cursor.fetchone()[
0
], f"replication isn't working correctly: table 'backup_table_2' exists in {new_unit_name}"
connection.close()

switchover(ops_test, primary, new_unit_name)

# Check that the primary changed, retrying while the switchover completes.
for attempt in Retrying(
stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
primary = await get_primary(ops_test, new_unit_name)
assert primary == new_unit_name

# Ensure stanza is working correctly.
logger.info("listing the available backups")
action = await ops_test.model.units.get(new_unit_name).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
assert backups, "no backups listed in the action output"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Remove the database app.
await ops_test.model.remove_application(database_app_name, block_until_done=True)
# Remove the TLS operator.
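The test relies on a new `switchover` helper (imported at the top of the file alongside `scale_application`). A plausible implementation, assuming Patroni's REST API is reachable on port 8008 and that Patroni member names replace `/` with `-` (both assumptions), reusing the `get_unit_address` helper already imported in this module:

```python
import requests

def switchover(ops_test, current_primary: str, candidate: str) -> None:
    """Ask Patroni to hand the primary role to the candidate unit."""
    primary_address = get_unit_address(ops_test, current_primary)
    response = requests.post(
        f"http://{primary_address}:8008/switchover",
        json={
            "leader": current_primary.replace("/", "-"),
            "candidate": candidate.replace("/", "-"),
        },
    )
    assert response.status_code == 200, f"switchover failed: {response.text}"
```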