Skip to content

Commit

Permalink
Enhance backup logic
Browse files Browse the repository at this point in the history
- Added new module 'postgres' to support postgresql.
- Enhance `backup::list`/`backup::create`/`backup::restore`/`backup::recovery`
  function to support postgresql.
- Added `backup::purge_old_backups` for apply immediately after
  `num_backups_to_keep` is changed.

Closes: petabi#71, petabi#76, petabi#80
  • Loading branch information
Hanbeom kim committed Jul 6, 2023
1 parent a675389 commit 6aa6b1a
Show file tree
Hide file tree
Showing 6 changed files with 665 additions and 67 deletions.
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ file is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and
this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Introduced a new module `postgres`. This module was added to allow the
existing backup `create`/`list`/`restore`/`recover` function to also support
postgresql DBs.
- Added `backup::purge_old_backups` for apply immediately after
`num_backups_to_keep` is changed.

### Changed

- Enhanced to also support postgresql for backup `create`/`list`/`restore`
/`recover`.

## [0.15.2] - 2023-07-06

### Added
Expand Down Expand Up @@ -422,6 +437,7 @@ leading to a more streamlined system.

- An initial version.

[Unreleased]: https://github.com/petabi/review-database/compare/0.15.2...main
[0.15.2]: https://github.com/petabi/review-database/compare/0.15.1...0.15.2
[0.15.1]: https://github.com/petabi/review-database/compare/0.15.0...0.15.1
[0.15.0]: https://github.com/petabi/review-database/compare/0.14.1...0.15.0
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ ipnet = { version = "2", features = ["serde"] }
num-derive = "0.3"
num-traits = "0.2"
postgres-protocol = "0.6"
regex = "1"
rand = "0.8"
ring = { version = "0.16", features = ["std"] }
rocksdb = "0.21"
Expand All @@ -39,6 +40,8 @@ serde_json = "1"
structured = "0.13"
strum = "0.24"
strum_macros = "0.24"
tar = "0.4"
tempfile = "3"
thiserror = "1"
tokio = { version = "1", features = ["macros"] }
tokio-postgres-rustls = "0.10"
Expand Down
222 changes: 162 additions & 60 deletions src/backup.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
//! Database backup utilities.
mod postgresql;

use crate::Store;
use anyhow::Result;
#[allow(clippy::module_name_repetitions)]
pub use self::postgresql::BackupConfig;
use self::postgresql::{create_backup_path, purge_old_postgres_backups};
use crate::{
backup::postgresql::{postgres_backup, postgres_backup_list, postgres_restore},
Store,
};
use anyhow::{anyhow, Result};
use chrono::{DateTime, TimeZone, Utc};
use rocksdb::backup::BackupEngineInfo;
use std::{sync::Arc, time::Duration};
use tokio::sync::{Notify, RwLock};
use tracing::{info, warn};
use tracing::info;

#[derive(Debug, Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct BackupInfo {
pub id: u32,
Expand All @@ -19,7 +27,10 @@ impl From<BackupEngineInfo> for BackupInfo {
fn from(backup: BackupEngineInfo) -> Self {
Self {
id: backup.backup_id,
timestamp: Utc.timestamp_nanos(backup.timestamp),
timestamp: Utc
.timestamp_opt(backup.timestamp, 0)
.single()
.expect("Invalid timestamp value"),
size: backup.size,
}
}
Expand All @@ -29,8 +40,8 @@ impl From<BackupEngineInfo> for BackupInfo {
#[allow(clippy::module_name_repetitions)]
pub async fn schedule_periodic(
store: Arc<RwLock<Store>>,
backup_cfg: Arc<RwLock<BackupConfig>>,
schedule: (Duration, Duration),
backups_to_keep: u32,
stop: Arc<Notify>,
) {
use tokio::time::{sleep, Instant};
Expand All @@ -43,11 +54,10 @@ pub async fn schedule_periodic(
tokio::select! {
() = &mut sleep => {
sleep.as_mut().reset(Instant::now() + duration);
let _res = create(&store, false, backups_to_keep);
let _res = create(&store, false, &backup_cfg);
}
_ = stop.notified() => {
info!("creating a database backup before shutdown");
let _res = create(&store, false, backups_to_keep);
let _res = create(&store, false, &backup_cfg);
stop.notify_one();
return;
}
Expand All @@ -61,110 +71,198 @@ pub async fn schedule_periodic(
/// # Errors
///
/// Returns an error if backup fails.
pub async fn create(store: &Arc<RwLock<Store>>, flush: bool, backups_to_keep: u32) -> Result<()> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
pub async fn create(
store: &Arc<RwLock<Store>>,
flush: bool,
backup_cfg: &Arc<RwLock<BackupConfig>>,
) -> Result<()> {
info!("backing up database...");
let res = {
let mut store = store.write().await;
store.backup(flush, backups_to_keep)
};
match res {
Ok(_) => {
info!("backing up database completed");
Ok(())
}
Err(e) => {
warn!("database backup failed: {:?}", e);
Err(e)
{
let num_of_backups = { backup_cfg.read().await.num_of_backups };
let mut backup_store = store.write().await;
if let Err(e) = backup_store.backup(flush, num_of_backups) {
return Err(anyhow!("failed to create key-value database backup: {e:?}"));
}
}

let backup_id_list = backup_id_list(store).await?;
let backup_cfg = backup_cfg.read().await;
if let Err(e) = postgres_backup(&backup_cfg, backup_id_list) {
return Err(anyhow!(
"failed to create relational database backup: {e:?}"
));
}
Ok(())
}

/// Lists the backup information of the database.
///
/// # Errors
///
/// Returns an error if backup list fails to create
pub async fn list(store: &Arc<RwLock<Store>>) -> Result<Vec<BackupInfo>> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
pub async fn list(
store: &Arc<RwLock<Store>>,
backup_cfg: &Arc<RwLock<BackupConfig>>,
) -> Result<Vec<BackupInfo>> {
let res = {
let store = store.read().await;
store.get_backup_info()
};
match res {
let mut backup_list: Vec<BackupInfo> = match res {
Ok(backup_list) => {
info!("generate database backup list");
Ok(backup_list
backup_list
.into_iter()
.map(std::convert::Into::into)
.collect())
.collect()
}
Err(e) => {
warn!("failed to generate backup list: {:?}", e);
Err(e)
return Err(anyhow!("failed to generate key-value backup list: {e:?}"));
}
};

let backup_cfg = &backup_cfg.read().await;
if let Err(e) = postgres_backup_list(&backup_cfg.backup_path, &mut backup_list) {
return Err(anyhow!(
"failed to add list information from a relational database: {e:?}"
));
}
Ok(backup_list)
}

/// Restores the database from a backup with the specified ID.
/// Restores the database from a backup. If a backup file ID is not provided,
/// restore based on the latest backup.
///
/// # Errors
///
/// Returns an error if the restore operation fails.
pub async fn restore(store: &Arc<RwLock<Store>>, backup_id: Option<u32>) -> Result<()> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
info!("restoring database from {:?}", backup_id);
pub async fn restore(
store: &Arc<RwLock<Store>>,
backup_cfg: &Arc<RwLock<BackupConfig>>,
backup_id: Option<u32>,
) -> Result<u32> {
let backup_id_list = backup_id_list(store).await?;
let backup_id = if let Some(id) = backup_id {
if !backup_id_list.contains(&id) {
return Err(anyhow!("backup {id} is not exist"));
}
info!("start database restore {}", id);
id
} else {
let Some(id) = backup_id_list.last() else {
return Err(anyhow!("backup is not exist"));
};
info!("start database restore from latest backup");
*id
};

let res = {
let mut store = store.write().await;
match &backup_id {
Some(id) => store.restore_from_backup(*id),
None => store.restore_from_latest_backup(),
}
store.restore_from_backup(backup_id)
};
if let Err(e) = res {
return Err(anyhow!(
"failed to restore key-value database from {backup_id}: {e:?}"
));
}

match res {
Ok(_) => {
info!("database restored from backup {:?}", backup_id);
Ok(())
}
Err(e) => {
warn!(
"failed to restore database from backup {:?}: {:?}",
backup_id, e
);
Err(e)
let backup_cfg = backup_cfg.read().await;
if let Err(e) = postgres_restore(&backup_cfg, backup_id) {
return Err(anyhow!(
"failed to restore relational database from {backup_id}: {e:?}"
));
}
Ok(backup_id)
}

/// Returns the number of backups in the backup list.
///
/// # Errors
///
/// Returns an error if getting the number of backup lists fails.
pub async fn count(store: &Arc<RwLock<Store>>) -> Result<usize> {
let store = store.write().await;
Ok(store.get_backup_info()?.len())
}

/// Remove older backups based on the number of backups retained.
///
/// # Errors
///
/// Returns an error if removing old backup fails.
pub async fn purge_old_backups(
store: &Arc<RwLock<Store>>,
backup_cfg: &Arc<RwLock<BackupConfig>>,
) -> Result<()> {
{
let num_of_backups = { backup_cfg.read().await.num_of_backups };
let mut backup_store = store.write().await;
if let Err(e) = backup_store.purge_old_backups(num_of_backups) {
return Err(anyhow!("failed to purge key-value database: {e:?}"));
}
}

let backup_id_list = backup_id_list(store).await?;
let data_backup_path = {
let cfg = backup_cfg.read().await;
create_backup_path(&cfg)?
};
if let Err(e) = purge_old_postgres_backups(&data_backup_path, backup_id_list) {
return Err(anyhow!("failed to purge relational database: {e:?}"));
}
Ok(())
}

/// Restores the database from a backup with the specified ID.
///
/// # Errors
///
/// Returns an error if the restore operation fails.
pub async fn recover(store: &Arc<RwLock<Store>>) -> Result<()> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
pub async fn recover(
store: &Arc<RwLock<Store>>,
backup_cfg: &Arc<RwLock<BackupConfig>>,
) -> Result<u32> {
info!("recovering database from latest valid backup");

let res = {
let mut store = store.write().await;
store.recover()
};

match res {
Ok(_) => {
info!("database recovered from backup");
Ok(())
}
let recovery_id = match res {
Ok(id) => id,
Err(e) => {
warn!("failed to recover database from backup: {e:?}");
Err(e)
return Err(anyhow!(
"failed to recover key-value database from backup: {e:?}"
));
}
};

let backup_cfg = backup_cfg.read().await;
if let Err(e) = postgres_restore(&backup_cfg, recovery_id) {
return Err(anyhow!(
"failed to recover relational database from backup {e:?}"
));
}
Ok(recovery_id)
}

/// Lists the backup id.
///
/// # Errors
///
/// Returns an error if backup id list fails to create
#[allow(clippy::module_name_repetitions)]
pub async fn backup_id_list(store: &Arc<RwLock<Store>>) -> Result<Vec<u32>> {
let store = store.read().await;
match store.get_backup_info() {
Ok(backup) => Ok(backup.into_iter().map(|b| b.backup_id).collect()),
Err(e) => Err(anyhow!("failed to generate backup id list: {e:?}")),
}
}

#[cfg(test)]
mod tests {
use crate::{event::DnsEventFields, EventKind, EventMessage, Store};
use crate::{backup::BackupConfig, event::DnsEventFields, EventKind, EventMessage, Store};
use bincode::Options;
use chrono::Utc;
use std::{
Expand Down Expand Up @@ -211,6 +309,10 @@ mod tests {
let db_dir = tempfile::tempdir().unwrap();
let backup_dir = tempfile::tempdir().unwrap();

let backup_cfg = BackupConfig::builder()
.backup_path(backup_dir.path())
.build();
let backup_cfg = Arc::new(RwLock::new(backup_cfg));
let store = Arc::new(RwLock::new(
Store::new(db_dir.path(), backup_dir.path()).unwrap(),
));
Expand Down Expand Up @@ -250,7 +352,7 @@ mod tests {
}

// get backup list
let backup_list = list(&store).await.unwrap();
let backup_list = list(&store, &backup_cfg).await.unwrap();
assert_eq!(backup_list.len(), 3);
assert_eq!(backup_list.get(0).unwrap().id, 1);
assert_eq!(backup_list.get(1).unwrap().id, 2);
Expand Down
Loading

0 comments on commit 6aa6b1a

Please sign in to comment.