Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@

create-backup:
description: Creates a backup to s3 storage in AWS.
params:
type:
type: string
description: The backup type, the default value is 'full'.
Full backup is a full copy of all data.
Differential backup is a copy only of changed data since the last full backup.
Possible values - full, differential.
get-primary:
description: Get the unit which is the primary/leader in the replication.
get-password:
Expand Down
78 changes: 66 additions & 12 deletions src/backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,20 @@ def _generate_backup_list_output(self) -> str:
output, _ = self._execute_command(["pgbackrest", "info", "--output=json"])
backups = json.loads(output)[0]["backup"]
for backup in backups:
backup_id = datetime.strftime(
datetime.strptime(backup["label"][:-1], "%Y%m%d-%H%M%S"), "%Y-%m-%dT%H:%M:%SZ"
)
backup_id, backup_type = self._parse_backup_id(backup["label"])
error = backup["error"]
backup_status = "finished"
if error:
backup_status = f"failed: {error}"
backup_list.append((backup_id, "physical", backup_status))
backup_list.append((backup_id, backup_type, backup_status))
return self._format_backup_list(backup_list)

def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
def _list_backups(self, show_failed: bool, parse=True) -> OrderedDict[str, str]:
"""Retrieve the list of backups.

Args:
show_failed: whether to also return the failed backups.
parse: whether to convert backup labels to their IDs or not.

Returns:
a dict of previously created backups (id + stanza name) or an empty list
Expand All @@ -286,15 +285,34 @@ def _list_backups(self, show_failed: bool) -> OrderedDict[str, str]:
stanza_name = repository_info["name"]
return OrderedDict[str, str](
(
datetime.strftime(
datetime.strptime(backup["label"][:-1], "%Y%m%d-%H%M%S"), "%Y-%m-%dT%H:%M:%SZ"
),
self._parse_backup_id(backup["label"])[0] if parse else backup["label"],
stanza_name,
)
for backup in backups
if show_failed or not backup["error"]
)

def _parse_backup_id(self, label) -> Tuple[str, str]:
"""Parse backup ID as a timestamp."""
if label[-1] == "F":
timestamp = label
backup_type = "full"
elif label[-1] == "D":
timestamp = label.split("_")[1]
backup_type = "differential"
elif label[-1] == "I":
timestamp = label.split("_")[1]
backup_type = "incremental"
else:
raise ValueError("Unknown label format for backup ID: %s", label)

return (
datetime.strftime(
datetime.strptime(timestamp[:-1], "%Y%m%d-%H%M%S"), "%Y-%m-%dT%H:%M:%SZ"
),
backup_type,
)

def _initialise_stanza(self) -> None:
"""Initialize the stanza.

Expand Down Expand Up @@ -454,8 +472,18 @@ def _on_s3_credential_changed(self, event: CredentialsChangedEvent):

self._initialise_stanza()

def _on_create_backup_action(self, event) -> None:
def _on_create_backup_action(self, event) -> None: # noqa: C901
"""Request that pgBackRest creates a backup."""
backup_type = event.params.get("type", "full").lower()[:4]
if backup_type not in ["full", "diff"]:
error_message = (
f"Invalid backup type: {backup_type}. Possible values: full, differential."
)
logger.error(f"Backup failed: {error_message}")
event.fail(error_message)
return

logger.info(f"A {backup_type} backup has been requested on unit")
can_unit_perform_backup, validation_message = self._can_unit_perform_backup()
if not can_unit_perform_backup:
logger.error(f"Backup failed: {validation_message}")
Expand Down Expand Up @@ -502,7 +530,7 @@ def _on_create_backup_action(self, event) -> None:
"pgbackrest",
f"--stanza={self.stanza_name}",
"--log-level-console=debug",
"--type=full",
f"--type={backup_type}",
"backup",
]
if self.charm.is_primary:
Expand All @@ -523,7 +551,7 @@ def _on_create_backup_action(self, event) -> None:
else:
# Generate a backup id from the current date and time if the backup failed before
# generating the backup label (our backup id).
backup_id = datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
backup_id = self._generate_fake_backup_id(backup_type)

# Upload the logs to S3.
logs = f"""Stdout:
Expand Down Expand Up @@ -664,7 +692,7 @@ def _on_restore_action(self, event):
# Mark the cluster as in a restoring backup state and update the Patroni configuration.
logger.info("Configuring Patroni to restore the backup")
self.charm.app_peer_data.update({
"restoring-backup": f'{datetime.strftime(datetime.strptime(backup_id, "%Y-%m-%dT%H:%M:%SZ"), "%Y%m%d-%H%M%S")}F',
"restoring-backup": self._fetch_backup_from_id(backup_id),
"restore-stanza": backups[backup_id],
})
self.charm.update_config()
Expand All @@ -675,6 +703,32 @@ def _on_restore_action(self, event):

event.set_results({"restore-status": "restore started"})

def _generate_fake_backup_id(self, backup_type: str) -> str:
"""Creates a backup id for failed backup operations (to store log file)."""
if backup_type == "F":
return datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SF")
if backup_type == "D":
backups = self._list_backups(show_failed=False, parse=False).keys()
last_full_backup = None
for label in backups[::-1]:
if label.endswith("F"):
last_full_backup = label
break

if last_full_backup is None:
raise TypeError("Differential backup requested but no previous full backup")
return f'{last_full_backup}_{datetime.strftime(datetime.now(), "%Y%m%d-%H%M%SD")}'

def _fetch_backup_from_id(self, backup_id: str) -> str:
"""Fetches backup's pgbackrest label from backup id."""
timestamp = f'{datetime.strftime(datetime.strptime(backup_id, "%Y-%m-%dT%H:%M:%SZ"), "%Y%m%d-%H%M%S")}'
backups = self._list_backups(show_failed=False, parse=False).keys()
for label in backups:
if timestamp in label:
return label

return None

def _pre_restore_checks(self, event: ActionEvent) -> bool:
"""Run some checks before starting the restore.

Expand Down
3 changes: 3 additions & 0 deletions templates/pgbackrest.conf.j2
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[global]
backup-standby=y
repo1-retention-full=9999999
repo1-retention-history=365
repo1-type=s3
repo1-path={{ path }}
repo1-s3-region={{ region }}
Expand All @@ -9,6 +10,8 @@ repo1-s3-bucket={{ bucket }}
repo1-s3-uri-style={{ s3_uri_style }}
repo1-s3-key={{ access_key }}
repo1-s3-key-secret={{ secret_key }}
repo1-block=y
repo1-bundle=y
start-fast=y
{%- if enable_tls %}
tls-server-address=*
Expand Down
99 changes: 91 additions & 8 deletions tests/integration/test_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
# (to be able to create backups from replicas).
database_app_name = f"{DATABASE_APP_NAME}-{cloud.lower()}"
await build_and_deploy(
ops_test, 2, database_app_name=database_app_name, wait_for_idle=False
ops_test, 2, database_app_name=database_app_name, wait_for_idle=True
)
await ops_test.model.relate(database_app_name, S3_INTEGRATOR_APP_NAME)
await ops_test.model.relate(database_app_name, TLS_CERTIFICATES_APP_NAME)
Expand Down Expand Up @@ -156,7 +156,8 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
assert backups, "backups not outputted"
# 2 lines for header output, 1 backup line ==> 3 total lines
assert len(backups.split("\n")) == 3, "full backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
Expand All @@ -166,18 +167,93 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
connection.cursor().execute("CREATE TABLE backup_table_2 (test_collumn INT );")
connection.close()

# Run the "create backup" action.
logger.info("creating a backup")
action = await ops_test.model.units.get(replica).run_action(
"create-backup", **{"type": "diff"}
)
await action.wait()
backup_status = action.results.get("backup-status")
assert backup_status, "backup hasn't succeeded"
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Run the "list backups" action.
logger.info("listing the available backups")
action = await ops_test.model.units.get(replica).run_action("list-backups")
await action.wait()
backups = action.results.get("backups")
# 2 lines for header output, 2 backup lines ==> 4 total lines
assert len(backups.split("\n")) == 4, "differential backup is not outputted"
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Write some data.
logger.info("creating a second table in the database")
with db_connect(host=address, password=password) as connection:
connection.autocommit = True
connection.cursor().execute("CREATE TABLE backup_table_3 (test_collumn INT );")
connection.close()
# Scale down to be able to restore.
async with ops_test.fast_forward(fast_interval="60s"):
await scale_application(ops_test, database_app_name, 1)

# Run the "restore backup" action.
# Run the "restore backup" action for differential backup.
for attempt in Retrying(
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
logger.info("restoring the backup")
most_recent_backup = backups.split("\n")[-1]
backup_id = most_recent_backup.split()[0]
last_diff_backup = backups.split("\n")[-1]
backup_id = last_diff_backup.split()[0]
action = await ops_test.model.units.get(f"{database_app_name}/0").run_action(
"restore", **{"backup-id": backup_id}
)
await action.wait()
restore_status = action.results.get("restore-status")
assert restore_status, "restore hasn't succeeded"

# Wait for the restore to complete.
async with ops_test.fast_forward():
await ops_test.model.wait_for_idle(status="active", timeout=1000)

# Check that the backup was correctly restored by having only the first created table.
logger.info("checking that the backup was correctly restored")
primary = await get_primary(ops_test, database_app_name)
address = await get_unit_address(ops_test, primary)
with db_connect(
host=address, password=password
) as connection, connection.cursor() as cursor:
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_1');"
)
assert cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_1' doesn't exist"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_2');"
)
assert cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_2' doesn't exist"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
)
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_3' exists"
connection.close()

# Run the "restore backup" action for full backup.
for attempt in Retrying(
stop=stop_after_attempt(10), wait=wait_exponential(multiplier=1, min=2, max=30)
):
with attempt:
logger.info("restoring the backup")
last_full_backup = backups.split("\n")[-2]
backup_id = last_full_backup.split()[0]
action = await ops_test.model.units.get(f"{database_app_name}/0").run_action(
"restore", **{"backup-id": backup_id}
)
Expand Down Expand Up @@ -210,6 +286,13 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_2' exists"
cursor.execute(
"SELECT EXISTS (SELECT FROM information_schema.tables"
" WHERE table_schema = 'public' AND table_name = 'backup_table_3');"
)
assert not cursor.fetchone()[
0
], "backup wasn't correctly restored: table 'backup_table_3' exists"
connection.close()

# Run the following steps only in one cloud (it's enough for those checks).
Expand All @@ -225,7 +308,7 @@ async def test_backup_and_restore(ops_test: OpsTest, cloud_configs: Tuple[Dict,
)

# Scale up to be able to test primary and leader being different.
async with ops_test.fast_forward():
async with ops_test.fast_forward(fast_interval="60s"):
await scale_application(ops_test, database_app_name, 2)

logger.info("ensuring that the replication is working correctly")
Expand Down Expand Up @@ -283,9 +366,9 @@ async def test_restore_on_new_cluster(ops_test: OpsTest, github_secrets) -> None
previous_database_app_name = f"{DATABASE_APP_NAME}-gcp"
database_app_name = f"new-{DATABASE_APP_NAME}"
await build_and_deploy(
ops_test, 1, database_app_name=previous_database_app_name, wait_for_idle=False
ops_test, 1, database_app_name=previous_database_app_name, wait_for_idle=True
)
await build_and_deploy(ops_test, 1, database_app_name=database_app_name, wait_for_idle=False)
await build_and_deploy(ops_test, 1, database_app_name=database_app_name, wait_for_idle=True)
await ops_test.model.relate(previous_database_app_name, S3_INTEGRATOR_APP_NAME)
await ops_test.model.relate(database_app_name, S3_INTEGRATOR_APP_NAME)
async with ops_test.fast_forward():
Expand Down
Loading