Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 54 additions & 8 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,21 @@ def _can_unit_perform_backup(self) -> Tuple[bool, Optional[str]]:

tls_enabled = "tls" in self.charm.unit_peer_data

# Check if this unit is the primary (if it was not possible to retrieve that information,
# then show that the unit cannot perform a backup, because possibly the database is offline).
try:
is_primary = self.charm.is_primary
except RetryError:
return False, "Unit cannot perform backups as the database seems to be offline"

# Block backups on the primary when there are replicas and TLS is enabled
# (in that case a replica can take the backup instead).
if self.charm.is_primary and self.charm.app.planned_units() > 1 and tls_enabled:
if is_primary and self.charm.app.planned_units() > 1 and tls_enabled:
return False, "Unit cannot perform backups as it is the cluster primary"

# Can create backups on replicas only if TLS is enabled (it's needed to enable
# pgBackRest to communicate with the primary to request that missing WAL files
# are pushed to the S3 repo before the backup action is triggered).
if not self.charm.is_primary and not tls_enabled:
if not is_primary and not tls_enabled:
return False, "Unit cannot perform backups as TLS is not enabled"

if not self.charm._patroni.member_started:
Expand Down Expand Up @@ -295,7 +302,7 @@ def _initialise_stanza(self) -> None:
located, how it will be backed up, archiving options, etc. (more info in
https://pgbackrest.org/user-guide.html#quickstart/configure-stanza).
"""
if not self.charm.unit.is_leader():
if not self.charm.is_primary:
return

# Enable stanza initialisation if the backup settings were fixed after being invalid
Expand All @@ -321,11 +328,18 @@ def _initialise_stanza(self) -> None:
self.start_stop_pgbackrest_service()

# Store the stanza name to be used in configurations updates.
self.charm.app_peer_data.update({"stanza": self.stanza_name, "init-pgbackrest": "True"})
if self.charm.unit.is_leader():
self.charm.app_peer_data.update(
{"stanza": self.stanza_name, "init-pgbackrest": "True"}
)
Comment on lines +331 to +334
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have to check the unit databags anyway, would it make sense to not set the app data at all? that way we should be able to have a single flow using the unit data, instead of having to check two places.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Let me try to recall if there is any reason for this approach.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remembered the reason I have chosen the approach with both application and unit relation databags: using only the unit relation databag, if we scale down the application and the primary unit is removed, we need to handle the transfer of the stanza name to one of the remaining units in something like the relation departed event or keep the stanza name in all the units, which seems not so good as keeping it in the application relation databag.

else:
self.charm.unit_peer_data.update(
{"stanza": self.stanza_name, "init-pgbackrest": "True"}
)

def check_stanza(self) -> None:
"""Runs the pgbackrest stanza validation."""
if not self.charm.unit.is_leader() or "init-pgbackrest" not in self.charm.app_peer_data:
if not self.charm.is_primary or "init-pgbackrest" not in self.charm.app_peer_data:
return

# Update the configuration to use pgBackRest as the archiving mechanism.
Expand All @@ -346,13 +360,38 @@ def check_stanza(self) -> None:
# and rollback the configuration.
self.charm.app_peer_data.update({"stanza": ""})
self.charm.app_peer_data.pop("init-pgbackrest", None)
self.charm.unit_peer_data.update({"stanza": "", "init-pgbackrest": ""})
self.charm.update_config()

logger.exception(e)
self.charm.unit.status = BlockedStatus(FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE)
return

self.charm.app_peer_data.pop("init-pgbackrest", None)
if self.charm.unit.is_leader():
self.charm.app_peer_data.pop("init-pgbackrest", None)
self.charm.unit_peer_data.pop("init-pgbackrest", None)

def coordinate_stanza_fields(self) -> None:
    """Coordinate the stanza name between the primary and the leader units."""
    for member, member_data in self.charm._peers.data.items():
        # Only members that already wrote a stanza name are relevant.
        if "stanza" not in member_data:
            continue

        # A unit (the primary) set the stanza, but the application databag has
        # none yet: the leader copies it there so every unit can read it.
        if self.charm.unit.is_leader() and "stanza" not in self.charm.app_peer_data:
            self.charm.app_peer_data.update(
                {"stanza": self.stanza_name, "init-pgbackrest": "True"}
            )
            break

        # The stanza on this member is still pending validation; wait for it.
        if "init-pgbackrest" in member_data:
            continue

        # Stanza already checked on this member: the leader drops the
        # application-level flag, and once that flag is gone this unit clears
        # its own unit-databag copy of the stanza name.
        if self.charm.unit.is_leader():
            self.charm.app_peer_data.pop("init-pgbackrest", None)
        if "init-pgbackrest" not in self.charm.app_peer_data and member == self.charm.unit:
            self.charm.unit_peer_data.update({"stanza": ""})
        break

@property
def _is_primary_pgbackrest_service_running(self) -> bool:
Expand All @@ -363,6 +402,10 @@ def _is_primary_pgbackrest_service_running(self) -> bool:
logger.error(f"failed to get primary with error {str(e)}")
return False

if primary is None:
logger.debug("the primary was not elected yet")
return False

primary_endpoint = self.charm._get_hostname_from_unit(primary)

try:
Expand All @@ -388,8 +431,8 @@ def _on_s3_credential_changed(self, event: CredentialsChangedEvent):
logger.debug("Cannot set pgBackRest configurations, missing configurations.")
return

# Verify the s3 relation only on the leader
if not self.charm.unit.is_leader():
# Verify the s3 relation only on the primary.
if not self.charm.is_primary:
return

try:
Expand Down Expand Up @@ -526,6 +569,9 @@ def _on_create_backup_action(self, event) -> None:
self.charm.unit.status = ActiveStatus()

def _on_s3_credential_gone(self, _) -> None:
    # The S3 relation is gone: reset the stanza coordination fields. The
    # leader clears the shared application databag; every unit clears its own
    # unit databag copy (the stanza name may live in either bag — see
    # coordinate_stanza_fields).
    if self.charm.unit.is_leader():
        self.charm.app_peer_data.update({"stanza": "", "init-pgbackrest": ""})
    self.charm.unit_peer_data.update({"stanza": "", "init-pgbackrest": ""})
    # If the unit was blocked by an S3-related misconfiguration, lift the
    # blocked status now that the credentials/relation no longer exist.
    if self.charm.is_blocked and self.charm.unit.status.message in S3_BLOCK_MESSAGES:
        self.charm.unit.status = ActiveStatus()

Expand Down
2 changes: 2 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,8 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None:

self.postgresql_client_relation.update_read_only_endpoint()

self.backup.coordinate_stanza_fields()

self.backup.check_stanza()

# Start or stop the pgBackRest TLS server service when TLS certificate change.
Expand Down
33 changes: 33 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,39 @@ async def set_password(
return result.results


async def switchover(ops_test: OpsTest, current_primary: str, candidate: str = None) -> None:
    """Trigger a switchover.

    Args:
        ops_test: The ops test framework instance.
        current_primary: The current primary unit.
        candidate: The unit that should be elected the new primary.

    Raises:
        AssertionError: if the switchover request fails or the cluster does not
            settle (a majority of the remaining units reporting as sync standbys)
            within the retry window.
    """
    primary_ip = await get_unit_address(ops_test, current_primary)
    # Patroni member names use dashes where Juju unit names use slashes.
    response = requests.post(
        f"http://{primary_ip}:8008/switchover",
        json={
            "leader": current_primary.replace("/", "-"),
            "candidate": candidate.replace("/", "-") if candidate else None,
        },
        # Without a timeout an unresponsive Patroni endpoint hangs the test forever.
        timeout=30,
    )
    assert (
        response.status_code == 200
    ), f"switchover request failed with {response.status_code}: {response.text}"
    # Wait until a majority of the remaining units report as synchronous
    # standbys, which indicates the switchover has settled.
    app_name = current_primary.split("/")[0]
    minority_count = len(ops_test.model.applications[app_name].units) // 2
    for attempt in Retrying(stop=stop_after_attempt(30), wait=wait_fixed(2), reraise=True):
        with attempt:
            response = requests.get(f"http://{primary_ip}:8008/cluster", timeout=10)
            assert response.status_code == 200
            standbys = len(
                [
                    member
                    for member in response.json()["members"]
                    if member["role"] == "sync_standby"
                ]
            )
            assert standbys >= minority_count


async def wait_for_idle_on_blocked(
ops_test: OpsTest,
database_app_name: str,
Expand Down
71 changes: 67 additions & 4 deletions tests/integration/test_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
get_primary,
get_unit_address,
scale_application,
switchover,
wait_for_idle_on_blocked,
)
from .juju_ import juju_major_version
Expand Down Expand Up @@ -73,6 +74,7 @@ async def cloud_configs(ops_test: OpsTest, github_secrets) -> None:
}
yield configs, credentials
# Delete the previously created objects.
logger.info("deleting the previously created backups")
for cloud, config in configs.items():
session = boto3.session.Session(
aws_access_key_id=credentials[cloud]["access-key"],
Expand Down Expand Up @@ -121,9 +123,10 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
**cloud_configs[1][cloud],
)
await action.wait()
await ops_test.model.wait_for_idle(
apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active", timeout=1000
)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[database_app_name, S3_INTEGRATOR_APP_NAME], status="active", timeout=1000
)

primary = await get_primary(ops_test, database_app_name)
for unit in ops_test.model.applications[database_app_name].units:
Expand Down Expand Up @@ -167,7 +170,8 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
connection.close()

# Scale down to be able to restore.
await scale_application(ops_test, database_app_name, 1)
async with ops_test.fast_forward(fast_interval="60s"):
await scale_application(ops_test, database_app_name, 1)

# Run the "restore backup" action.
for attempt in Retrying(
Expand Down Expand Up @@ -211,6 +215,65 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
], "backup wasn't correctly restored: table 'backup_table_2' exists"
connection.close()

# Run the following steps only in one cloud (it's enough for those checks).
if cloud == list(cloud_configs[0].keys())[0]:
async with ops_test.fast_forward():
logger.info("removing the TLS relation")
await ops_test.model.applications[database_app_name].remove_relation(
f"{database_app_name}:certificates",
f"{TLS_CERTIFICATES_APP_NAME}:certificates",
)
await ops_test.model.wait_for_idle(
apps=[database_app_name], status="active", timeout=1000
)

# Scale up to be able to test primary and leader being different.
async with ops_test.fast_forward():
await scale_application(ops_test, database_app_name, 2)

logger.info("ensuring that the replication is working correctly")
new_unit_name = f"{database_app_name}/1"
address = await get_unit_address(ops_test, new_unit_name)
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_1');"
)
assert cursor.fetchone()[
0
], f"replication isn't working correctly: table 'backup_table_1' doesn't exist in {new_unit_name}"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_2');"
)
assert not cursor.fetchone()[
0
], f"replication isn't working correctly: table 'backup_table_2' exists in {new_unit_name}"
connection.close()

logger.info(f"performing a switchover from {primary} to {new_unit_name}")
await switchover(ops_test, primary, new_unit_name)

logger.info("checking that the primary unit has changed")
primary = await get_primary(ops_test, database_app_name)
for attempt in Retrying(
stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
assert primary == new_unit_name

# Ensure stanza is working correctly.
logger.info(
"listing the available backups to ensure that the stanza is working correctly"
)
action = await ops_test.model.units.get(new_unit_name).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
assert backups, "backups not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Remove the database app.
await ops_test.model.remove_application(database_app_name, block_until_done=True)
# Remove the TLS operator.
Expand Down
Loading