Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Enhance backup logic #105

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ file is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and
this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Introduced a new module `postgres`. This module was added to allow the
existing backup `create`/`list`/`restore`/`recover` function to also support
postgresql DBs.
- Added `backup::purge_old_backups` for apply immediately after
`num_backups_to_keep` is changed.

### Changed

- Enhanced to also support postgresql for backup `create`/`list`/`restore`
/`recover`.

## [0.15.2] - 2023-07-06

### Added
Expand Down Expand Up @@ -422,6 +437,7 @@ leading to a more streamlined system.

- An initial version.

[Unreleased]: https://github.com/petabi/review-database/compare/0.15.2...main
[0.15.2]: https://github.com/petabi/review-database/compare/0.15.1...0.15.2
[0.15.1]: https://github.com/petabi/review-database/compare/0.15.0...0.15.1
[0.15.0]: https://github.com/petabi/review-database/compare/0.14.1...0.15.0
Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ ipnet = { version = "2", features = ["serde"] }
num-derive = "0.3"
num-traits = "0.2"
postgres-protocol = "0.6"
regex = "1"
rand = "0.8"
ring = { version = "0.16", features = ["std"] }
rocksdb = "0.21"
Expand All @@ -39,6 +40,8 @@ serde_json = "1"
structured = "0.13"
strum = "0.24"
strum_macros = "0.24"
tar = "0.4"
tempfile = "3"
thiserror = "1"
tokio = { version = "1", features = ["macros"] }
tokio-postgres-rustls = "0.10"
Expand Down
222 changes: 162 additions & 60 deletions src/backup.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
//! Database backup utilities.
mod postgresql;

use crate::Store;
use anyhow::Result;
#[allow(clippy::module_name_repetitions)]
pub use self::postgresql::BackupConfig;
use self::postgresql::{create_backup_path, purge_old_postgres_backups};
use crate::{
backup::postgresql::{postgres_backup, postgres_backup_list, postgres_restore},
Store,
};
use anyhow::{anyhow, Result};
use chrono::{DateTime, TimeZone, Utc};
use rocksdb::backup::BackupEngineInfo;
use std::{sync::Arc, time::Duration};
use tokio::sync::{Notify, RwLock};
use tracing::{info, warn};
use tracing::info;

#[derive(Debug, Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct BackupInfo {
pub id: u32,
Expand All @@ -19,7 +27,10 @@ impl From<BackupEngineInfo> for BackupInfo {
fn from(backup: BackupEngineInfo) -> Self {
Self {
id: backup.backup_id,
timestamp: Utc.timestamp_nanos(backup.timestamp),
timestamp: Utc
.timestamp_opt(backup.timestamp, 0)
.single()
.expect("Invalid timestamp value"),
size: backup.size,
}
}
Expand All @@ -29,8 +40,8 @@ impl From<BackupEngineInfo> for BackupInfo {
#[allow(clippy::module_name_repetitions)]
pub async fn schedule_periodic(
store: Arc<RwLock<Store>>,
backup_cfg: Arc<RwLock<BackupConfig>>,
schedule: (Duration, Duration),
backups_to_keep: u32,
stop: Arc<Notify>,
) {
use tokio::time::{sleep, Instant};
Expand All @@ -43,11 +54,10 @@ pub async fn schedule_periodic(
tokio::select! {
() = &mut sleep => {
sleep.as_mut().reset(Instant::now() + duration);
let _res = create(&store, false, backups_to_keep);
let _res = create(&store, false, &backup_cfg);
kimhanbeom marked this conversation as resolved.
Show resolved Hide resolved
}
_ = stop.notified() => {
info!("creating a database backup before shutdown");
let _res = create(&store, false, backups_to_keep);
let _res = create(&store, false, &backup_cfg);
stop.notify_one();
return;
}
Expand All @@ -61,110 +71,198 @@ pub async fn schedule_periodic(
/// # Errors
///
/// Returns an error if backup fails.
pub async fn create(store: &Arc<RwLock<Store>>, flush: bool, backups_to_keep: u32) -> Result<()> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
pub async fn create(
store: &Arc<RwLock<Store>>,
flush: bool,
backup_cfg: &Arc<RwLock<BackupConfig>>,
) -> Result<()> {
info!("backing up database...");
let res = {
let mut store = store.write().await;
store.backup(flush, backups_to_keep)
};
match res {
Ok(_) => {
info!("backing up database completed");
Ok(())
}
Err(e) => {
warn!("database backup failed: {:?}", e);
Err(e)
{
let num_of_backups = { backup_cfg.read().await.num_of_backups };
let mut backup_store = store.write().await;
if let Err(e) = backup_store.backup(flush, num_of_backups) {
return Err(anyhow!("failed to create key-value database backup: {e:?}"));
}
}

let backup_id_list = backup_id_list(store).await?;
let backup_cfg = backup_cfg.read().await;
if let Err(e) = postgres_backup(&backup_cfg, backup_id_list) {
return Err(anyhow!(
"failed to create relational database backup: {e:?}"
));
}
Ok(())
}

/// Lists the backup information of the database.
///
/// # Errors
///
/// Returns an error if backup list fails to create
pub async fn list(store: &Arc<RwLock<Store>>) -> Result<Vec<BackupInfo>> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
pub async fn list(
store: &Arc<RwLock<Store>>,
backup_cfg: &Arc<RwLock<BackupConfig>>,
) -> Result<Vec<BackupInfo>> {
let res = {
let store = store.read().await;
store.get_backup_info()
};
match res {
let mut backup_list: Vec<BackupInfo> = match res {
Ok(backup_list) => {
info!("generate database backup list");
Ok(backup_list
backup_list
.into_iter()
.map(std::convert::Into::into)
.collect())
.collect()
}
Err(e) => {
warn!("failed to generate backup list: {:?}", e);
Err(e)
return Err(anyhow!("failed to generate key-value backup list: {e:?}"));
}
};

let backup_cfg = &backup_cfg.read().await;
if let Err(e) = postgres_backup_list(&backup_cfg.backup_path, &mut backup_list) {
return Err(anyhow!(
"failed to add list information from a relational database: {e:?}"
));
}
Ok(backup_list)
}

/// Restores the database from a backup with the specified ID.
/// Restores the database from a backup. If a backup file ID is not provided,
/// restore based on the latest backup.
///
/// # Errors
///
/// Returns an error if the restore operation fails.
pub async fn restore(store: &Arc<RwLock<Store>>, backup_id: Option<u32>) -> Result<()> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
info!("restoring database from {:?}", backup_id);
pub async fn restore(
store: &Arc<RwLock<Store>>,
backup_cfg: &Arc<RwLock<BackupConfig>>,
backup_id: Option<u32>,
) -> Result<u32> {
let backup_id_list = backup_id_list(store).await?;
let backup_id = if let Some(id) = backup_id {
if !backup_id_list.contains(&id) {
return Err(anyhow!("backup {id} is not exist"));
}
info!("start database restore {}", id);
id
} else {
let Some(id) = backup_id_list.last() else {
return Err(anyhow!("backup is not exist"));
};
info!("start database restore from latest backup");
*id
};

let res = {
let mut store = store.write().await;
match &backup_id {
Some(id) => store.restore_from_backup(*id),
None => store.restore_from_latest_backup(),
}
store.restore_from_backup(backup_id)
};
if let Err(e) = res {
return Err(anyhow!(
"failed to restore key-value database from {backup_id}: {e:?}"
));
}

match res {
Ok(_) => {
info!("database restored from backup {:?}", backup_id);
Ok(())
}
Err(e) => {
warn!(
"failed to restore database from backup {:?}: {:?}",
backup_id, e
);
Err(e)
let backup_cfg = backup_cfg.read().await;
if let Err(e) = postgres_restore(&backup_cfg, backup_id) {
return Err(anyhow!(
"failed to restore relational database from {backup_id}: {e:?}"
));
}
Ok(backup_id)
}

/// Returns the number of backups in the backup list.
///
/// # Errors
///
/// Returns an error if getting the number of backup lists fails.
pub async fn count(store: &Arc<RwLock<Store>>) -> Result<usize> {
let store = store.write().await;
Ok(store.get_backup_info()?.len())
}

/// Remove older backups based on the number of backups retained.
///
/// # Errors
///
/// Returns an error if removing old backup fails.
pub async fn purge_old_backups(
store: &Arc<RwLock<Store>>,
backup_cfg: &Arc<RwLock<BackupConfig>>,
) -> Result<()> {
{
let num_of_backups = { backup_cfg.read().await.num_of_backups };
let mut backup_store = store.write().await;
if let Err(e) = backup_store.purge_old_backups(num_of_backups) {
return Err(anyhow!("failed to purge key-value database: {e:?}"));
}
}

let backup_id_list = backup_id_list(store).await?;
let data_backup_path = {
let cfg = backup_cfg.read().await;
create_backup_path(&cfg)?
};
if let Err(e) = purge_old_postgres_backups(&data_backup_path, backup_id_list) {
return Err(anyhow!("failed to purge relational database: {e:?}"));
}
Ok(())
}

/// Restores the database from a backup with the specified ID.
///
/// # Errors
///
/// Returns an error if the restore operation fails.
pub async fn recover(store: &Arc<RwLock<Store>>) -> Result<()> {
// TODO: This function should be expanded to support PostgreSQL backups as well.
pub async fn recover(
store: &Arc<RwLock<Store>>,
backup_cfg: &Arc<RwLock<BackupConfig>>,
) -> Result<u32> {
info!("recovering database from latest valid backup");

let res = {
let mut store = store.write().await;
store.recover()
};

match res {
Ok(_) => {
info!("database recovered from backup");
Ok(())
}
let recovery_id = match res {
Ok(id) => id,
Err(e) => {
warn!("failed to recover database from backup: {e:?}");
Err(e)
return Err(anyhow!(
"failed to recover key-value database from backup: {e:?}"
));
}
};

let backup_cfg = backup_cfg.read().await;
if let Err(e) = postgres_restore(&backup_cfg, recovery_id) {
return Err(anyhow!(
"failed to recover relational database from backup {e:?}"
));
}
Ok(recovery_id)
}

/// Lists the backup id.
///
/// # Errors
///
/// Returns an error if backup id list fails to create
#[allow(clippy::module_name_repetitions)]
pub async fn backup_id_list(store: &Arc<RwLock<Store>>) -> Result<Vec<u32>> {
let store = store.read().await;
match store.get_backup_info() {
Ok(backup) => Ok(backup.into_iter().map(|b| b.backup_id).collect()),
Err(e) => Err(anyhow!("failed to generate backup id list: {e:?}")),
}
}

#[cfg(test)]
mod tests {
use crate::{event::DnsEventFields, EventKind, EventMessage, Store};
use crate::{backup::BackupConfig, event::DnsEventFields, EventKind, EventMessage, Store};
use bincode::Options;
use chrono::Utc;
use std::{
Expand Down Expand Up @@ -211,6 +309,10 @@ mod tests {
let db_dir = tempfile::tempdir().unwrap();
let backup_dir = tempfile::tempdir().unwrap();

let backup_cfg = BackupConfig::builder()
.backup_path(backup_dir.path())
.build();
let backup_cfg = Arc::new(RwLock::new(backup_cfg));
let store = Arc::new(RwLock::new(
Store::new(db_dir.path(), backup_dir.path()).unwrap(),
));
Expand Down Expand Up @@ -250,7 +352,7 @@ mod tests {
}

// get backup list
let backup_list = list(&store).await.unwrap();
let backup_list = list(&store, &backup_cfg).await.unwrap();
assert_eq!(backup_list.len(), 3);
assert_eq!(backup_list.get(0).unwrap().id, 1);
assert_eq!(backup_list.get(1).unwrap().id, 2);
Expand Down
Loading