Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(meta): refuse to start cluster if data directory is used by another instance #9642

Merged
merged 11 commits into from
May 9, 2023
27 changes: 19 additions & 8 deletions src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,32 @@ macro_rules! def_default {

for_all_undeprecated_params!(def_default);

macro_rules! impl_check_missing_fields {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr },)*) => {
/// Check if any undeprecated fields are missing.
pub fn check_missing_params(params: &SystemParams) -> Result<()> {
$(
if params.$field.is_none() {
return Err(format!("missing system param {:?}", key_of!($field)));
}
)*
Ok(())
}
};
}

/// Derive serialization to kv pairs.
macro_rules! impl_system_params_to_kv {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr },)*) => {
/// The returned map only contains undeprecated fields.
/// Return error if there are missing fields.
#[allow(clippy::vec_init_then_push)]
pub fn system_params_to_kv(params: &SystemParams) -> Result<Vec<(String, String)>> {
let mut ret = Vec::with_capacity(9);
check_missing_params(params)?;
let mut ret = Vec::new();
$(ret.push((
key_of!($field).to_string(),
params
.$field.as_ref()
.ok_or_else(||format!(
"missing system param {:?}",
key_of!($field)
))?
.to_string(),
params.$field.as_ref().unwrap().to_string(),
));)*
Ok(ret)
}
Expand Down Expand Up @@ -292,6 +302,7 @@ macro_rules! impl_system_params_for_test {
for_all_params!(impl_system_params_from_kv);
for_all_params!(impl_is_mutable);
for_all_undeprecated_params!(impl_derive_missing_fields);
for_all_undeprecated_params!(impl_check_missing_fields);
for_all_undeprecated_params!(impl_system_params_to_kv);
for_all_undeprecated_params!(impl_set_system_param);
for_all_undeprecated_params!(impl_default_validation_on_set);
Expand Down
18 changes: 8 additions & 10 deletions src/meta/src/backup_restore/meta_snapshot_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ use risingwave_pb::meta::SystemParams;
use risingwave_pb::user::UserInfo;

use crate::manager::model::SystemParamsModel;
use crate::model::MetadataModel;
use crate::model::{ClusterId, MetadataModel};
use crate::storage::{MetaStore, Snapshot, DEFAULT_COLUMN_FAMILY};
use crate::telemetry::TrackingId;

const VERSION: u32 = 1;

Expand Down Expand Up @@ -117,9 +116,9 @@ impl<S: MetaStore> MetaSnapshotBuilder<S> {
.ok_or_else(|| anyhow!("system params not found in meta store"))?;

// tracking_id is always created in meta store
let tracking_id = TrackingId::from_snapshot::<S>(&meta_store_snapshot)
.await
.map_err(|_| anyhow!("tracking id not found in meta store"))?
let cluster_id = ClusterId::from_snapshot::<S>(&meta_store_snapshot)
.await?
.ok_or_else(|| anyhow!("cluster id not found in meta store"))?
.into();

self.snapshot.metadata = ClusterMetadata {
Expand All @@ -139,7 +138,7 @@ impl<S: MetaStore> MetaSnapshotBuilder<S> {
function,
connection,
system_param,
tracking_id,
cluster_id,
};
Ok(())
}
Expand Down Expand Up @@ -175,9 +174,8 @@ mod tests {

use crate::backup_restore::meta_snapshot_builder::MetaSnapshotBuilder;
use crate::manager::model::SystemParamsModel;
use crate::model::MetadataModel;
use crate::model::{ClusterId, MetadataModel};
use crate::storage::{MemStore, MetaStore, DEFAULT_COLUMN_FAMILY};
use crate::telemetry::TrackingId;

#[tokio::test]
async fn test_snapshot_builder() {
Expand Down Expand Up @@ -228,9 +226,9 @@ mod tests {
.await
.unwrap_err();
let err = assert_matches!(err, BackupError::Other(e) => e);
assert_eq!("tracking id not found in meta store", err.to_error_str());
assert_eq!("cluster id not found in meta store", err.to_error_str());

TrackingId::new()
ClusterId::new()
.put_at_meta_store(&meta_store)
.await
.unwrap();
Expand Down
49 changes: 45 additions & 4 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant};

use arc_swap::ArcSwap;
use bytes::Bytes;
use fail::fail_point;
use function_name::named;
use itertools::Itertools;
Expand All @@ -46,6 +47,7 @@ use risingwave_pb::hummock::{
IntraLevelDelta, LevelType, TableOption,
};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_sqlparser::keywords::CLUSTER;
use tokio::sync::oneshot::Sender;
use tokio::sync::{Notify, RwLockWriteGuard};
use tokio::task::JoinHandle;
Expand All @@ -65,7 +67,8 @@ use crate::manager::{
CatalogManagerRef, ClusterManagerRef, IdCategory, LocalNotification, MetaSrvEnv, META_NODE_ID,
};
use crate::model::{
BTreeMapEntryTransaction, BTreeMapTransaction, MetadataModel, ValTransaction, VarTransaction,
BTreeMapEntryTransaction, BTreeMapTransaction, ClusterId, MetadataModel, ValTransaction,
VarTransaction,
};
use crate::rpc::metrics::MetaMetrics;
use crate::storage::{MetaStore, Transaction};
Expand Down Expand Up @@ -166,7 +169,7 @@ use risingwave_hummock_sdk::compaction_group::{StateTableId, StaticCompactionGro
use risingwave_hummock_sdk::table_stats::{
add_prost_table_stats_map, purge_prost_table_stats, PbTableStatsMap,
};
use risingwave_object_store::object::{parse_remote_object_store, ObjectStoreRef};
use risingwave_object_store::object::{parse_remote_object_store, ObjectError, ObjectStoreRef};
use risingwave_pb::catalog::Table;
use risingwave_pb::hummock::level_handler::RunningCompactTask;
use risingwave_pb::hummock::version_update_payload::Payload;
Expand Down Expand Up @@ -284,9 +287,10 @@ where
compaction_group_manager: tokio::sync::RwLock<CompactionGroupManager>,
catalog_manager: CatalogManagerRef<S>,
) -> Result<HummockManagerRef<S>> {
let sys_params = env.system_params_manager().get_params().await;
let sys_params_manager = env.system_params_manager();
let sys_params = sys_params_manager.get_params().await;
let state_store_url = sys_params.state_store();
let state_store_dir = sys_params.data_directory();
let state_store_dir: &str = sys_params.data_directory();
let object_store = Arc::new(
parse_remote_object_store(
state_store_url.strip_prefix("hummock+").unwrap_or("memory"),
Expand All @@ -295,6 +299,16 @@ where
)
.await,
);

// Make sure data dir is not used by another cluster.
if env.cluster_first_launch() {
write_exclusive_cluster_id(
state_store_dir,
env.cluster_id().clone(),
object_store.clone(),
)
.await?;
}
let checkpoint_path = version_checkpoint_path(state_store_dir);
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let instance = HummockManager {
Expand Down Expand Up @@ -2265,3 +2279,30 @@ fn gen_version_delta<'a>(

version_delta
}

async fn write_exclusive_cluster_id(
state_store_dir: &str,
cluster_id: ClusterId,
object_store: ObjectStoreRef,
) -> Result<()> {
const CLUSTER_ID_DIR: &str = "cluster_id";
const CLUSTER_ID_NAME: &str = "0";

let cluster_id_dir = format!("{}/{}/", state_store_dir, CLUSTER_ID_DIR);
let cluster_id_full_path = format!("{}{}", cluster_id_dir, CLUSTER_ID_NAME);
let metadata = object_store.list(&cluster_id_dir).await?;

if metadata.is_empty() {
object_store
.upload(&cluster_id_full_path, Bytes::from(String::from(cluster_id)))
.await?;
Ok(())
} else {
let cluster_id = object_store.read(&cluster_id_full_path, None).await?;
Err(ObjectError::internal(format!(
"data directory is already used by another cluster with id {:?}",
String::from_utf8(cluster_id.to_vec()).unwrap()
))
.into())
}
}
29 changes: 28 additions & 1 deletion src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::manager::{
IdGeneratorManager, IdGeneratorManagerRef, IdleManager, IdleManagerRef, NotificationManager,
NotificationManagerRef,
};
use crate::model::ClusterId;
#[cfg(any(test, feature = "test"))]
use crate::storage::MemStore;
use crate::storage::MetaStore;
Expand Down Expand Up @@ -53,6 +54,12 @@ where
/// system param manager.
system_params_manager: SystemParamsManagerRef<S>,

/// Unique identifier of the cluster.
cluster_id: ClusterId,

/// Whether the cluster is launched for the first time.
cluster_first_launch: bool,

/// options read by all services
pub opts: Arc<MetaOpts>,
}
Expand Down Expand Up @@ -169,22 +176,30 @@ where
let stream_client_pool = Arc::new(StreamClientPool::default());
let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await);
let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms));
let (cluster_id, cluster_first_launch) =
if let Some(id) = ClusterId::from_meta_store(&meta_store).await? {
(id, false)
} else {
(ClusterId::new(), true)
};
let system_params_manager = Arc::new(
SystemParamsManager::new(
meta_store.clone(),
notification_manager.clone(),
init_system_params,
cluster_first_launch,
)
.await?,
);

Ok(Self {
id_gen_manager,
meta_store,
notification_manager,
stream_client_pool,
idle_manager,
system_params_manager,
cluster_id,
cluster_first_launch,
opts: opts.into(),
})
}
Expand Down Expand Up @@ -236,6 +251,14 @@ where
pub fn stream_client_pool(&self) -> &StreamClientPool {
self.stream_client_pool.deref()
}

pub fn cluster_id(&self) -> &ClusterId {
&self.cluster_id
}

pub fn cluster_first_launch(&self) -> bool {
self.cluster_first_launch
}
}

#[cfg(any(test, feature = "test"))]
Expand All @@ -252,11 +275,13 @@ impl MetaSrvEnv<MemStore> {
let notification_manager = Arc::new(NotificationManager::new(meta_store.clone()).await);
let stream_client_pool = Arc::new(StreamClientPool::default());
let idle_manager = Arc::new(IdleManager::disabled());
let (cluster_id, cluster_first_launch) = (ClusterId::new(), true);
let system_params_manager = Arc::new(
SystemParamsManager::new(
meta_store.clone(),
notification_manager.clone(),
risingwave_common::system_param::system_params_for_test(),
true,
)
.await
.unwrap(),
Expand All @@ -269,6 +294,8 @@ impl MetaSrvEnv<MemStore> {
stream_client_pool,
idle_manager,
system_params_manager,
cluster_id,
cluster_first_launch,
opts,
}
}
Expand Down
43 changes: 32 additions & 11 deletions src/meta/src/manager/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@

pub mod model;

use std::ops::DerefMut;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use risingwave_common::system_param::reader::SystemParamsReader;
use risingwave_common::system_param::set_system_param;
use risingwave_common::system_param::{check_missing_params, set_system_param};
use risingwave_common::{for_all_undeprecated_params, key_of};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::SystemParams;
use tokio::sync::oneshot::Sender;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::info;

use self::model::SystemParamsModel;
use super::NotificationManagerRef;
Expand All @@ -37,7 +39,9 @@ pub type SystemParamsManagerRef<S> = Arc<SystemParamsManager<S>>;

pub struct SystemParamsManager<S: MetaStore> {
meta_store: Arc<S>,
// Notify workers and local subscribers of parameter change.
notification_manager: NotificationManagerRef<S>,
// Cached parameters.
params: RwLock<SystemParams>,
}

Expand All @@ -47,16 +51,20 @@ impl<S: MetaStore> SystemParamsManager<S> {
meta_store: Arc<S>,
notification_manager: NotificationManagerRef<S>,
init_params: SystemParams,
cluster_first_launch: bool,
) -> MetaResult<Self> {
let persisted = SystemParams::get(meta_store.as_ref()).await?;

let params = if let Some(persisted) = persisted {
let params = if cluster_first_launch {
init_params
} else if let Some(persisted) = SystemParams::get(meta_store.as_ref()).await? {
merge_params(persisted, init_params)
} else {
init_params
return Err(MetaError::system_param(
"cluster is not newly created but no system parameters can be found",
));
};

SystemParams::insert(&params, meta_store.as_ref()).await?;
info!("system parameters: {:?}", params);
check_missing_params(&params).map_err(|e| anyhow!(e))?;

Ok(Self {
meta_store,
Expand Down Expand Up @@ -99,6 +107,14 @@ impl<S: MetaStore> SystemParamsManager<S> {
Ok(())
}

/// Flush the cached params to meta store.
pub async fn flush_params(&self) -> MetaResult<()> {
Ok(
SystemParams::insert(self.params.read().await.deref(), self.meta_store.as_ref())
.await?,
)
}

// Periodically sync params to worker nodes.
pub async fn start_params_notifier(
system_params_manager: Arc<Self>,
Expand Down Expand Up @@ -141,23 +157,28 @@ impl<S: MetaStore> SystemParamsManager<S> {
}

// For each field in `persisted` and `init`
// 1. Some, None: Params not from CLI need not be validated. Use persisted value.
// 1. Some, None: The persisted field is deprecated, so just ignore it.
// 2. Some, Some: Check equality and warn if they differ.
// 3. None, Some: A new version of RW cluster is launched for the first time and newly introduced
// params are not set. Use init value.
// 4. None, None: Impossible.
// 4. None, None: A new version of RW cluster is launched for the first time and newly introduced
// params are not set. The new field is not initialized either, just leave it as `None`.
macro_rules! impl_merge_params {
($({ $field:ident, $type:ty, $default:expr, $is_mutable:expr },)*) => {
fn merge_params(mut persisted: SystemParams, init: SystemParams) -> SystemParams {
$(
match (persisted.$field.as_ref(), init.$field) {
(Some(persisted), Some(init)) => {
if persisted != &init {
tracing::warn!("System parameters \"{:?}\" from CLI and config file ({}) differ from persisted ({})", key_of!($field), init, persisted);
tracing::warn!(
"The initializing value of \"{:?}\" ({}) differ from persisted ({}), using persisted value",
key_of!($field),
init,
persisted
);
}
},
(None, Some(init)) => persisted.$field = Some(init),
(None, None) => unreachable!(),
_ => {},
}
)*
Expand Down
Loading