Skip to content

Commit

Permalink
feat(backup): support mutating backup config (#8505)
Browse files Browse the repository at this point in the history
Co-authored-by: Zhidong Guo <52783948+Gun9niR@users.noreply.github.com>
  • Loading branch information
zwang28 and Gun9niR authored Mar 13, 2023
1 parent 85e450d commit 428354d
Show file tree
Hide file tree
Showing 11 changed files with 341 additions and 80 deletions.
10 changes: 10 additions & 0 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,16 @@ impl ValidateOnSet for OverrideValidateOnSet {
fn checkpoint_frequency(v: &u64) -> Result<()> {
Self::expect_range(*v, 1..)
}

fn backup_storage_directory(_v: &String) -> Result<()> {
// TODO
Ok(())
}

fn backup_storage_url(_v: &String) -> Result<()> {
// TODO
Ok(())
}
}

for_all_undeprecated_params!(impl_default_from_other_params);
Expand Down
20 changes: 13 additions & 7 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@ pub async fn compute_node_serve(
.await
.unwrap();

// Initialize observer manager.
let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone());
let observer_manager =
ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
observer_manager.start().await;

let mut extra_info_sources: Vec<ExtraInfoSourceRef> = vec![];
if let Some(storage) = state_store.as_hummock_trait() {
extra_info_sources.push(storage.sstable_id_manager().clone());
Expand Down Expand Up @@ -206,6 +213,12 @@ pub async fn compute_node_serve(
memory_limiter,
));
monitor_cache(memory_collector, &registry).unwrap();
let backup_reader = storage.backup_reader();
tokio::spawn(async move {
backup_reader
.watch_config_change(system_params_manager.watch_params())
.await;
});
}

sub_tasks.push(MetaClient::start_heartbeat_loop(
Expand Down Expand Up @@ -253,13 +266,6 @@ pub async fn compute_node_serve(
// of lru manager.
stream_mgr.set_watermark_epoch(watermark_epoch).await;

// Initialize observer manager.
let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params));
let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone());
let observer_manager =
ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
observer_manager.start().await;

let grpc_await_tree_reg = await_tree_config
.map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into()));
let dml_mgr = Arc::new(DmlManager::default());
Expand Down
149 changes: 136 additions & 13 deletions src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,24 @@
use std::sync::Arc;
use std::time::Instant;

use arc_swap::ArcSwap;
use itertools::Itertools;
use prometheus::Registry;
use risingwave_backup::error::BackupError;
use risingwave_backup::storage::MetaSnapshotStorageRef;
use risingwave_backup::storage::{BoxedMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest};
use risingwave_common::bail;
use risingwave_hummock_sdk::HummockSstableId;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use tokio::task::JoinHandle;

use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder;
use crate::backup_restore::metrics::BackupManagerMetrics;
use crate::hummock::{HummockManagerRef, HummockVersionSafePoint};
use crate::manager::{IdCategory, MetaSrvEnv};
use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv};
use crate::storage::MetaStore;
use crate::MetaResult;

Expand Down Expand Up @@ -57,40 +60,118 @@ impl BackupJobHandle {
}

pub type BackupManagerRef<S> = Arc<BackupManager<S>>;
/// (url, dir)
type StoreConfig = (String, String);

/// `BackupManager` manages lifecycle of all existent backups and the running backup job.
pub struct BackupManager<S: MetaStore> {
env: MetaSrvEnv<S>,
hummock_manager: HummockManagerRef<S>,
backup_store: MetaSnapshotStorageRef,
backup_store: ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>,
/// Tracks the running backup job. Concurrent jobs is not supported.
running_backup_job: tokio::sync::Mutex<Option<BackupJobHandle>>,
metrics: BackupManagerMetrics,
}

impl<S: MetaStore> BackupManager<S> {
pub fn new(
pub async fn new(
env: MetaSrvEnv<S>,
hummock_manager: HummockManagerRef<S>,
backup_store: MetaSnapshotStorageRef,
registry: Registry,
store_url: &str,
store_dir: &str,
) -> MetaResult<Arc<Self>> {
let store_config = (store_url.to_string(), store_dir.to_string());
let store = create_snapshot_store(&store_config).await?;
tracing::info!(
"backup manager initialized: url={}, dir={}",
store_config.0,
store_config.1
);
let instance = Arc::new(Self::with_store(
env.clone(),
hummock_manager,
registry,
(store, store_config),
));
let (local_notification_tx, mut local_notification_rx) =
tokio::sync::mpsc::unbounded_channel();
env.notification_manager()
.insert_local_sender(local_notification_tx)
.await;
let this = instance.clone();
tokio::spawn(async move {
loop {
match local_notification_rx.recv().await {
Some(notification) => {
if let LocalNotification::SystemParamsChange(p) = notification {
let new_config = (
p.backup_storage_url().to_string(),
p.backup_storage_directory().to_string(),
);
this.handle_new_config(new_config).await;
}
}
None => {
return;
}
}
}
});
Ok(instance)
}

async fn handle_new_config(&self, new_config: StoreConfig) {
if self.backup_store.load().1 == new_config {
return;
}
if let Err(e) = self.set_store(new_config.clone()).await {
// Retry is driven by periodic system params notification.
tracing::warn!(
"failed to apply new backup config: url={}, dir={}, {:#?}",
new_config.0,
new_config.1,
e
);
}
}

fn with_store(
env: MetaSrvEnv<S>,
hummock_manager: HummockManagerRef<S>,
registry: Registry,
backup_store: (BoxedMetaSnapshotStorage, StoreConfig),
) -> Self {
Self {
env,
hummock_manager,
backup_store,
backup_store: ArcSwap::from_pointee(backup_store),
running_backup_job: tokio::sync::Mutex::new(None),
metrics: BackupManagerMetrics::new(registry),
}
}

pub async fn set_store(&self, config: StoreConfig) -> MetaResult<()> {
let new_store = create_snapshot_store(&config).await?;
tracing::info!(
"new backup config is applied: url={}, dir={}",
config.0,
config.1
);
self.backup_store.store(Arc::new((new_store, config)));
Ok(())
}

#[cfg(test)]
pub fn for_test(env: MetaSrvEnv<S>, hummock_manager: HummockManagerRef<S>) -> Self {
Self::new(
Self::with_store(
env,
hummock_manager,
Arc::new(risingwave_backup::storage::DummyMetaSnapshotStorage::default()),
Registry::new(),
(
Box::<risingwave_backup::storage::DummyMetaSnapshotStorage>::default(),
StoreConfig::default(),
),
)
}

Expand All @@ -104,6 +185,26 @@ impl<S: MetaStore> BackupManager<S> {
job.job_id
));
}
// The reasons to limit number of meta snapshot are:
// 1. limit size of `MetaSnapshotManifest`, which is kept in memory by
// `ObjectStoreMetaSnapshotStorage`.
// 2. limit number of pinned SSTs returned by
// `list_pinned_ssts`, which subsequently is used by GC.
const MAX_META_SNAPSHOT_NUM: usize = 100;
let current_number = self
.backup_store
.load()
.0
.manifest()
.snapshot_metadata
.len();
if current_number > MAX_META_SNAPSHOT_NUM {
bail!(format!(
"too many existent meta snapshots, expect at most {}",
MAX_META_SNAPSHOT_NUM
))
}

let job_id = self
.env
.id_gen_manager()
Expand Down Expand Up @@ -134,6 +235,8 @@ impl<S: MetaStore> BackupManager<S> {
}
if self
.backup_store
.load()
.0
.manifest()
.snapshot_metadata
.iter()
Expand All @@ -160,7 +263,7 @@ impl<S: MetaStore> BackupManager<S> {
.notify_hummock_without_version(
Operation::Update,
Info::MetaBackupManifestId(MetaBackupManifestId {
id: self.backup_store.manifest().manifest_id,
id: self.backup_store.load().0.manifest().manifest_id,
}),
);
}
Expand Down Expand Up @@ -188,13 +291,13 @@ impl<S: MetaStore> BackupManager<S> {

/// Deletes existent backups from backup storage.
pub async fn delete_backups(&self, ids: &[MetaSnapshotId]) -> MetaResult<()> {
self.backup_store.delete(ids).await?;
self.backup_store.load().0.delete(ids).await?;
self.env
.notification_manager()
.notify_hummock_without_version(
Operation::Update,
Info::MetaBackupManifestId(MetaBackupManifestId {
id: self.backup_store.manifest().manifest_id,
id: self.backup_store.load().0.manifest().manifest_id,
}),
);
Ok(())
Expand All @@ -203,6 +306,8 @@ impl<S: MetaStore> BackupManager<S> {
/// List all `SSTables` required by backups.
pub fn list_pinned_ssts(&self) -> Vec<HummockSstableId> {
self.backup_store
.load()
.0
.manifest()
.snapshot_metadata
.iter()
Expand All @@ -212,7 +317,7 @@ impl<S: MetaStore> BackupManager<S> {
}

pub fn manifest(&self) -> Arc<MetaSnapshotManifest> {
self.backup_store.manifest()
self.backup_store.load().0.manifest()
}
}

Expand All @@ -234,7 +339,12 @@ impl<S: MetaStore> BackupWorker<S> {
// Reuse job id as snapshot id.
snapshot_builder.build(job_id).await?;
let snapshot = snapshot_builder.finish()?;
backup_manager_clone.backup_store.create(&snapshot).await?;
backup_manager_clone
.backup_store
.load()
.0
.create(&snapshot)
.await?;
Ok(BackupJobResult::Succeeded)
};
tokio::spawn(async move {
Expand All @@ -245,3 +355,16 @@ impl<S: MetaStore> BackupWorker<S> {
})
}
}

async fn create_snapshot_store(config: &StoreConfig) -> MetaResult<BoxedMetaSnapshotStorage> {
let object_store = Arc::new(
parse_remote_object_store(
&config.0,
Arc::new(ObjectStoreMetrics::unused()),
"Meta Backup",
)
.await,
);
let store = ObjectStoreMetaSnapshotStorage::new(&config.1, object_store).await?;
Ok(Box::new(store))
}
29 changes: 6 additions & 23 deletions src/meta/src/rpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@ use std::time::Duration;

use either::Either;
use etcd_client::ConnectOptions;
use risingwave_backup::storage::ObjectStoreMetaSnapshotStorage;
use risingwave_common::monitor::process_linux::monitor_process;
use risingwave_common_service::metrics_manager::MetricsManager;
use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::parse_remote_object_store;
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
use risingwave_pb::health::health_server::HealthServer;
Expand Down Expand Up @@ -426,31 +423,17 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
.await
.expect("list_table_fragments"),
)
.await
.unwrap();
.await?;

// Initialize services.
let backup_object_store = Arc::new(
parse_remote_object_store(
system_params_reader.backup_storage_url(),
Arc::new(ObjectStoreMetrics::unused()),
"Meta Backup",
)
.await,
);
let backup_storage = Arc::new(
ObjectStoreMetaSnapshotStorage::new(
system_params_reader.backup_storage_directory(),
backup_object_store,
)
.await?,
);
let backup_manager = Arc::new(BackupManager::new(
let backup_manager = BackupManager::new(
env.clone(),
hummock_manager.clone(),
backup_storage,
meta_metrics.registry().clone(),
));
system_params_reader.backup_storage_url(),
system_params_reader.backup_storage_directory(),
)
.await?;
let vacuum_manager = Arc::new(hummock::VacuumManager::new(
env.clone(),
hummock_manager.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/storage/backup/integration_tests/run_all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ tests=( \
"test_basic.sh" \
"test_pin_sst.sh" \
"test_query_backup.sh" \
"test_set_config.sh" \
)
for t in "${tests[@]}"
do
Expand Down
Loading

0 comments on commit 428354d

Please sign in to comment.