feat(backup): support mutating backup config #8505

Merged Mar 13, 2023 · 3 commits · the diff below shows the changes from 1 commit.
10 changes: 10 additions & 0 deletions src/common/src/system_param/mod.rs

```diff
@@ -281,6 +281,16 @@ impl ValidateOnSet for OverrideValidateOnSet {
     fn checkpoint_frequency(v: &u64) -> Result<()> {
         Self::expect_range(*v, 1..)
     }
+
+    fn backup_storage_directory(_v: &String) -> Result<()> {
+        // TODO
+        Ok(())
+    }
+
+    fn backup_storage_url(_v: &String) -> Result<()> {
+        // TODO
+        Ok(())
+    }
 }
 
 for_all_undeprecated_params!(impl_default_from_other_params);
```
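The two new validators accept everything for now. For context, a minimal, self-contained sketch of what they might later enforce — the scheme allowlist and the `Result<T, String>` alias are illustrative assumptions, not part of this PR:

```rust
// Hypothetical validators; the accepted URL schemes are an assumption.
type Result<T> = std::result::Result<T, String>;

fn backup_storage_url(v: &String) -> Result<()> {
    // Illustrative allowlist; the real set of supported stores may differ.
    const SCHEMES: &[&str] = &["s3://", "minio://", "memory"];
    if SCHEMES.iter().any(|s| v.starts_with(s)) {
        Ok(())
    } else {
        Err(format!("unsupported backup_storage_url: {v}"))
    }
}

fn backup_storage_directory(v: &String) -> Result<()> {
    // A directory should be non-empty and must not escape the store root.
    if v.is_empty() || v.split('/').any(|seg| seg == "..") {
        return Err(format!("invalid backup_storage_directory: {v}"));
    }
    Ok(())
}

fn main() {
    assert!(backup_storage_url(&"s3://bucket".to_string()).is_ok());
    assert!(backup_storage_directory(&"a/../b".to_string()).is_err());
}
```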
20 changes: 13 additions & 7 deletions src/compute/src/server.rs

```diff
@@ -166,6 +166,13 @@ pub async fn compute_node_serve(
         .await
         .unwrap();
 
+    // Initialize observer manager.
+    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params.clone()));
+    let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone());
+    let observer_manager =
+        ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
+    observer_manager.start().await;
+
     let mut extra_info_sources: Vec<ExtraInfoSourceRef> = vec![];
     if let Some(storage) = state_store.as_hummock_trait() {
         extra_info_sources.push(storage.sstable_id_manager().clone());
@@ -202,6 +209,12 @@ pub async fn compute_node_serve(
             memory_limiter,
         ));
         monitor_cache(memory_collector, &registry).unwrap();
+        let backup_reader = storage.backup_reader();
+        tokio::spawn(async move {
+            backup_reader
+                .watch_config_change(system_params_manager.watch_params())
+                .await;
+        });
     }
 
     sub_tasks.push(MetaClient::start_heartbeat_loop(
@@ -249,13 +262,6 @@ pub async fn compute_node_serve(
     // of lru manager.
     stream_mgr.set_watermark_epoch(watermark_epoch).await;
 
-    // Initialize observer manager.
-    let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params));
-    let compute_observer_node = ComputeObserverNode::new(system_params_manager.clone());
-    let observer_manager =
-        ObserverManager::new_with_meta_client(meta_client.clone(), compute_observer_node).await;
-    observer_manager.start().await;
-
     let grpc_await_tree_reg = await_tree_config
         .map(|config| AwaitTreeRegistryRef::new(await_tree::Registry::new(config).into()));
     let dml_mgr = Arc::new(DmlManager::default());
```
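On the compute side, the observer manager is moved up so that `LocalSystemParamsManager` exists before the backup reader starts watching it, and the new `watch_config_change` task hangs off a `tokio::sync::watch` channel exposed by `watch_params()`. A self-contained sketch of that pattern — the `BackupConfig` type and the function body are illustrative, not the actual `BackupReader` API:

```rust
use tokio::sync::watch;

// Illustrative stand-in for the system-params payload.
#[derive(Clone, PartialEq, Debug)]
struct BackupConfig {
    url: String,
    directory: String,
}

// Wait for config updates and apply only real changes.
async fn watch_config_change(mut rx: watch::Receiver<BackupConfig>) {
    let mut current = rx.borrow().clone();
    while rx.changed().await.is_ok() {
        let new = rx.borrow().clone();
        if new != current {
            // The real task would rebuild the backup store handle here.
            println!("backup config changed: {new:?}");
            current = new;
        }
    }
    // All senders dropped: the watcher task exits.
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(BackupConfig {
        url: "memory".into(),
        directory: "backup".into(),
    });
    let watcher = tokio::spawn(watch_config_change(rx));
    tx.send(BackupConfig {
        url: "memory".into(),
        directory: "backup_v2".into(),
    })
    .unwrap();
    drop(tx); // close the channel so the watcher terminates
    watcher.await.unwrap();
}
```

Using `changed()` rather than polling keeps the task idle between updates, and re-checking equality guards against spurious wake-ups — the same reason the meta node's `handle_new_config` below compares against the currently applied config first.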
149 changes: 136 additions & 13 deletions src/meta/src/backup_restore/backup_manager.rs

```diff
@@ -15,21 +15,24 @@
 use std::sync::Arc;
 use std::time::Instant;
 
+use arc_swap::ArcSwap;
 use itertools::Itertools;
 use prometheus::Registry;
 use risingwave_backup::error::BackupError;
-use risingwave_backup::storage::MetaSnapshotStorageRef;
+use risingwave_backup::storage::{BoxedMetaSnapshotStorage, ObjectStoreMetaSnapshotStorage};
 use risingwave_backup::{MetaBackupJobId, MetaSnapshotId, MetaSnapshotManifest};
 use risingwave_common::bail;
 use risingwave_hummock_sdk::HummockSstableId;
+use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
+use risingwave_object_store::object::parse_remote_object_store;
 use risingwave_pb::backup_service::{BackupJobStatus, MetaBackupManifestId};
 use risingwave_pb::meta::subscribe_response::{Info, Operation};
 use tokio::task::JoinHandle;
 
 use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder;
 use crate::backup_restore::metrics::BackupManagerMetrics;
 use crate::hummock::{HummockManagerRef, HummockVersionSafePoint};
-use crate::manager::{IdCategory, MetaSrvEnv};
+use crate::manager::{IdCategory, LocalNotification, MetaSrvEnv};
 use crate::storage::MetaStore;
 use crate::MetaResult;
 
@@ -57,40 +60,118 @@ impl BackupJobHandle {
 }
 
 pub type BackupManagerRef<S> = Arc<BackupManager<S>>;
+/// (url, dir)
+type StoreConfig = (String, String);
 
 /// `BackupManager` manages lifecycle of all existent backups and the running backup job.
 pub struct BackupManager<S: MetaStore> {
     env: MetaSrvEnv<S>,
     hummock_manager: HummockManagerRef<S>,
-    backup_store: MetaSnapshotStorageRef,
+    backup_store: ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>,
     /// Tracks the running backup job. Concurrent jobs is not supported.
     running_backup_job: tokio::sync::Mutex<Option<BackupJobHandle>>,
     metrics: BackupManagerMetrics,
 }
 
 impl<S: MetaStore> BackupManager<S> {
-    pub fn new(
+    pub async fn new(
         env: MetaSrvEnv<S>,
         hummock_manager: HummockManagerRef<S>,
-        backup_store: MetaSnapshotStorageRef,
         registry: Registry,
+        store_url: &str,
+        store_dir: &str,
+    ) -> MetaResult<Arc<Self>> {
+        let store_config = (store_url.to_string(), store_dir.to_string());
+        let store = create_snapshot_store(&store_config).await?;
+        tracing::info!(
+            "backup manager initialized: url={}, dir={}",
+            store_config.0,
+            store_config.1
+        );
+        let instance = Arc::new(Self::with_store(
+            env.clone(),
+            hummock_manager,
+            registry,
+            (store, store_config),
+        ));
+        let (local_notification_tx, mut local_notification_rx) =
+            tokio::sync::mpsc::unbounded_channel();
+        env.notification_manager()
+            .insert_local_sender(local_notification_tx)
+            .await;
+        let this = instance.clone();
+        tokio::spawn(async move {
+            loop {
+                match local_notification_rx.recv().await {
+                    Some(notification) => {
+                        if let LocalNotification::SystemParamsChange(p) = notification {
+                            let new_config = (
+                                p.backup_storage_url().to_string(),
+                                p.backup_storage_directory().to_string(),
+                            );
+                            this.handle_new_config(new_config).await;
+                        }
+                    }
+                    None => {
+                        return;
+                    }
+                }
+            }
+        });
+        Ok(instance)
+    }
+
+    async fn handle_new_config(&self, new_config: StoreConfig) {
+        if self.backup_store.load().1 == new_config {
+            return;
+        }
+        if let Err(e) = self.set_store(new_config.clone()).await {
+            // Retry is driven by periodic system params notification.
+            tracing::warn!(
+                "failed to apply new backup config: url={}, dir={}, {:#?}",
+                new_config.0,
+                new_config.1,
+                e
+            );
+        }
+    }
+
+    fn with_store(
+        env: MetaSrvEnv<S>,
+        hummock_manager: HummockManagerRef<S>,
+        registry: Registry,
+        backup_store: (BoxedMetaSnapshotStorage, StoreConfig),
     ) -> Self {
         Self {
             env,
             hummock_manager,
-            backup_store,
+            backup_store: ArcSwap::from_pointee(backup_store),
             running_backup_job: tokio::sync::Mutex::new(None),
             metrics: BackupManagerMetrics::new(registry),
         }
     }
 
+    pub async fn set_store(&self, config: StoreConfig) -> MetaResult<()> {
+        let new_store = create_snapshot_store(&config).await?;
+        tracing::info!(
+            "new backup config is applied: url={}, dir={}",
+            config.0,
+            config.1
+        );
+        self.backup_store.store(Arc::new((new_store, config)));
+        Ok(())
+    }
+
     #[cfg(test)]
     pub fn for_test(env: MetaSrvEnv<S>, hummock_manager: HummockManagerRef<S>) -> Self {
-        Self::new(
+        Self::with_store(
             env,
             hummock_manager,
-            Arc::new(risingwave_backup::storage::DummyMetaSnapshotStorage::default()),
            Registry::new(),
+            (
+                Box::<risingwave_backup::storage::DummyMetaSnapshotStorage>::default(),
+                StoreConfig::default(),
+            ),
         )
     }
 
@@ -104,6 +185,26 @@ impl<S: MetaStore> BackupManager<S> {
                 job.job_id
             ));
         }
+        // The reasons to limit number of meta snapshot are:
+        // 1. limit size of `MetaSnapshotManifest`, which is kept in memory by
+        // `ObjectStoreMetaSnapshotStorage`.
+        // 2. limit number of pinned SSTs returned by
+        // `list_pinned_ssts`, which subsequently is used by GC.
+        const MAX_META_SNAPSHOT_NUM: usize = 100;
+        let current_number = self
+            .backup_store
+            .load()
+            .0
+            .manifest()
+            .snapshot_metadata
+            .len();
+        if current_number > MAX_META_SNAPSHOT_NUM {
+            bail!(format!(
+                "too many existent meta snapshots, expect at most {}",
+                MAX_META_SNAPSHOT_NUM
+            ))
+        }
+
         let job_id = self
             .env
             .id_gen_manager()
@@ -134,6 +235,8 @@ impl<S: MetaStore> BackupManager<S> {
         }
         if self
             .backup_store
+            .load()
+            .0
             .manifest()
             .snapshot_metadata
             .iter()
@@ -160,7 +263,7 @@ impl<S: MetaStore> BackupManager<S> {
             .notify_hummock_without_version(
                 Operation::Update,
                 Info::MetaBackupManifestId(MetaBackupManifestId {
-                    id: self.backup_store.manifest().manifest_id,
+                    id: self.backup_store.load().0.manifest().manifest_id,
                 }),
             );
     }
@@ -188,13 +291,13 @@ impl<S: MetaStore> BackupManager<S> {
 
     /// Deletes existent backups from backup storage.
     pub async fn delete_backups(&self, ids: &[MetaSnapshotId]) -> MetaResult<()> {
-        self.backup_store.delete(ids).await?;
+        self.backup_store.load().0.delete(ids).await?;
         self.env
             .notification_manager()
             .notify_hummock_without_version(
                 Operation::Update,
                 Info::MetaBackupManifestId(MetaBackupManifestId {
-                    id: self.backup_store.manifest().manifest_id,
+                    id: self.backup_store.load().0.manifest().manifest_id,
                 }),
             );
         Ok(())
@@ -203,6 +306,8 @@ impl<S: MetaStore> BackupManager<S> {
     /// List all `SSTables` required by backups.
     pub fn list_pinned_ssts(&self) -> Vec<HummockSstableId> {
         self.backup_store
+            .load()
+            .0
             .manifest()
             .snapshot_metadata
             .iter()
@@ -212,7 +317,7 @@ impl<S: MetaStore> BackupManager<S> {
     }
 
     pub fn manifest(&self) -> Arc<MetaSnapshotManifest> {
-        self.backup_store.manifest()
+        self.backup_store.load().0.manifest()
     }
 }
 
@@ -234,7 +339,12 @@ impl<S: MetaStore> BackupWorker<S> {
             // Reuse job id as snapshot id.
             snapshot_builder.build(job_id).await?;
             let snapshot = snapshot_builder.finish()?;
-            backup_manager_clone.backup_store.create(&snapshot).await?;
+            backup_manager_clone
+                .backup_store
+                .load()
+                .0
+                .create(&snapshot)
+                .await?;
             Ok(BackupJobResult::Succeeded)
         };
         tokio::spawn(async move {
@@ -245,3 +355,16 @@ impl<S: MetaStore> BackupWorker<S> {
         })
     }
 }
+
+async fn create_snapshot_store(config: &StoreConfig) -> MetaResult<BoxedMetaSnapshotStorage> {
+    let object_store = Arc::new(
+        parse_remote_object_store(
+            &config.0,
+            Arc::new(ObjectStoreMetrics::unused()),
+            "Meta Backup",
+        )
+        .await,
+    );
+    let store = ObjectStoreMetaSnapshotStorage::new(&config.1, object_store).await?;
+    Ok(Box::new(store))
+}
```
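The heart of the meta-side change is swapping the snapshot store at runtime: `backup_store` becomes an `ArcSwap<(BoxedMetaSnapshotStorage, StoreConfig)>`, so every reader takes a cheap atomic snapshot via `load()` while `set_store` publishes a replacement without locking. A minimal sketch of the pattern with stand-in types (not the meta-service code itself):

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

/// (url, dir) — mirrors the PR's alias.
type StoreConfig = (String, String);

/// Stand-in for BoxedMetaSnapshotStorage.
struct Store {
    #[allow(dead_code)]
    root: String,
}

struct Manager {
    // Readers and the writer never block each other; each load() yields
    // an Arc snapshot of the (store, config) pair current at that moment.
    backup_store: ArcSwap<(Store, StoreConfig)>,
}

impl Manager {
    fn new(config: StoreConfig) -> Self {
        let store = Store { root: config.1.clone() };
        Self { backup_store: ArcSwap::from_pointee((store, config)) }
    }

    // Mirrors set_store: build the new store first, then swap atomically.
    fn set_store(&self, config: StoreConfig) {
        let store = Store { root: config.1.clone() };
        self.backup_store.store(Arc::new((store, config)));
    }

    fn current_config(&self) -> StoreConfig {
        self.backup_store.load().1.clone()
    }
}

fn main() {
    let m = Manager::new(("memory".into(), "backup".into()));
    m.set_store(("memory".into(), "backup_v2".into()));
    assert_eq!(m.current_config().1, "backup_v2");
}
```

One consequence, visible throughout the diff, is that every access becomes `backup_store.load().0`; a caller that already holds a loaded `Arc` keeps a valid handle to the old store even after a swap, so an in-flight backup is never yanked out from under the worker.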
29 changes: 6 additions & 23 deletions src/meta/src/rpc/server.rs

```diff
@@ -18,11 +18,8 @@ use std::time::Duration;
 
 use either::Either;
 use etcd_client::ConnectOptions;
-use risingwave_backup::storage::ObjectStoreMetaSnapshotStorage;
 use risingwave_common::monitor::process_linux::monitor_process;
 use risingwave_common_service::metrics_manager::MetricsManager;
-use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
-use risingwave_object_store::object::parse_remote_object_store;
 use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
 use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
 use risingwave_pb::health::health_server::HealthServer;
@@ -426,31 +423,17 @@ pub async fn start_service_as_election_leader<S: MetaStore>(
             .await
             .expect("list_table_fragments"),
     )
-    .await
-    .unwrap();
+    .await?;
 
     // Initialize services.
-    let backup_object_store = Arc::new(
-        parse_remote_object_store(
-            system_params_reader.backup_storage_url(),
-            Arc::new(ObjectStoreMetrics::unused()),
-            "Meta Backup",
-        )
-        .await,
-    );
-    let backup_storage = Arc::new(
-        ObjectStoreMetaSnapshotStorage::new(
-            system_params_reader.backup_storage_directory(),
-            backup_object_store,
-        )
-        .await?,
-    );
-    let backup_manager = Arc::new(BackupManager::new(
+    let backup_manager = BackupManager::new(
         env.clone(),
         hummock_manager.clone(),
-        backup_storage,
         meta_metrics.registry().clone(),
-    ));
+        system_params_reader.backup_storage_url(),
+        system_params_reader.backup_storage_directory(),
+    )
+    .await?;
     let vacuum_manager = Arc::new(hummock::VacuumManager::new(
         env.clone(),
         hummock_manager.clone(),
```
1 change: 1 addition & 0 deletions src/storage/backup/integration_tests/run_all.sh

```diff
@@ -6,6 +6,7 @@ tests=( \
 "test_basic.sh" \
 "test_pin_sst.sh" \
 "test_query_backup.sh" \
+"test_set_config.sh" \
 )
 for t in "${tests[@]}"
 do
```