From 74f689560d1260c11515ba578aeb8a1245f83992 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 13 Feb 2023 10:47:04 +0000 Subject: [PATCH 1/9] support reading params --- dashboard/proto/gen/meta.ts | 60 ++++++- proto/meta.proto | 11 +- src/common/src/error.rs | 8 +- src/common/src/lib.rs | 1 + src/common/src/system_param.rs | 124 +++++++++++++++ src/frontend/src/handler/mod.rs | 2 +- src/frontend/src/handler/variable.rs | 45 +++++- src/frontend/src/meta_client.rs | 8 +- src/frontend/src/test_utils.rs | 6 + src/meta/src/manager/system_param/mod.rs | 4 +- src/meta/src/manager/system_param/model.rs | 149 ++---------------- src/meta/src/rpc/server.rs | 12 +- src/meta/src/rpc/service/cluster_service.rs | 15 +- src/meta/src/rpc/service/mod.rs | 1 + .../src/rpc/service/system_params_service.rs | 50 ++++++ src/rpc_client/src/meta_client.rs | 49 ++++-- 16 files changed, 362 insertions(+), 183 deletions(-) create mode 100644 src/common/src/system_param.rs create mode 100644 src/meta/src/rpc/service/system_params_service.rs diff --git a/dashboard/proto/gen/meta.ts b/dashboard/proto/gen/meta.ts index ed505ac413e65..058d3e7f9477a 100644 --- a/dashboard/proto/gen/meta.ts +++ b/dashboard/proto/gen/meta.ts @@ -338,7 +338,6 @@ export interface AddWorkerNodeRequest { export interface AddWorkerNodeResponse { status: Status | undefined; node: WorkerNode | undefined; - systemParams: SystemParams | undefined; } export interface ActivateWorkerNodeRequest { @@ -555,6 +554,13 @@ export interface SystemParams { backupStorageDirectory?: string | undefined; } +export interface GetSystemParamsRequest { +} + +export interface GetSystemParamsResponse { + params: SystemParams | undefined; +} + function createBaseHeartbeatRequest(): HeartbeatRequest { return { nodeId: 0, info: [] }; } @@ -1307,7 +1313,7 @@ export const AddWorkerNodeRequest = { }; function createBaseAddWorkerNodeResponse(): AddWorkerNodeResponse { - return { status: undefined, node: undefined, systemParams: undefined }; + return { status: undefined, node: undefined }; } export const AddWorkerNodeResponse = { @@ -1315,7 +1321,6 @@ export const AddWorkerNodeResponse = { return { status: isSet(object.status) ? Status.fromJSON(object.status) : undefined, node: isSet(object.node) ? WorkerNode.fromJSON(object.node) : undefined, - systemParams: isSet(object.systemParams) ? SystemParams.fromJSON(object.systemParams) : undefined, }; }, @@ -1323,8 +1328,6 @@ export const AddWorkerNodeResponse = { const obj: any = {}; message.status !== undefined && (obj.status = message.status ? Status.toJSON(message.status) : undefined); message.node !== undefined && (obj.node = message.node ? WorkerNode.toJSON(message.node) : undefined); - message.systemParams !== undefined && - (obj.systemParams = message.systemParams ? SystemParams.toJSON(message.systemParams) : undefined); return obj; }, @@ -1336,9 +1339,6 @@ export const AddWorkerNodeResponse = { message.node = (object.node !== undefined && object.node !== null) ? WorkerNode.fromPartial(object.node) : undefined; - message.systemParams = (object.systemParams !== undefined && object.systemParams !== null) - ? SystemParams.fromPartial(object.systemParams) - : undefined; return message; }, }; @@ -2403,6 +2403,50 @@ export const SystemParams = { }, }; +function createBaseGetSystemParamsRequest(): GetSystemParamsRequest { + return {}; +} + +export const GetSystemParamsRequest = { + fromJSON(_: any): GetSystemParamsRequest { + return {}; + }, + + toJSON(_: GetSystemParamsRequest): unknown { + const obj: any = {}; + return obj; + }, + + fromPartial, I>>(_: I): GetSystemParamsRequest { + const message = createBaseGetSystemParamsRequest(); + return message; + }, +}; + +function createBaseGetSystemParamsResponse(): GetSystemParamsResponse { + return { params: undefined }; +} + +export const GetSystemParamsResponse = { + fromJSON(object: any): GetSystemParamsResponse { + return { params: isSet(object.params) ? SystemParams.fromJSON(object.params) : undefined }; + }, + + toJSON(message: GetSystemParamsResponse): unknown { + const obj: any = {}; + message.params !== undefined && (obj.params = message.params ? SystemParams.toJSON(message.params) : undefined); + return obj; + }, + + fromPartial, I>>(object: I): GetSystemParamsResponse { + const message = createBaseGetSystemParamsResponse(); + message.params = (object.params !== undefined && object.params !== null) + ? SystemParams.fromPartial(object.params) + : undefined; + return message; + }, +}; + type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined; export type DeepPartial = T extends Builtin ? T diff --git a/proto/meta.proto b/proto/meta.proto index ea0c8ff319a2f..8ec4eaa84880f 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -147,7 +147,6 @@ message AddWorkerNodeRequest { message AddWorkerNodeResponse { common.Status status = 1; common.WorkerNode node = 2; - SystemParams system_params = 3; } message ActivateWorkerNodeRequest { @@ -324,3 +323,13 @@ message SystemParams { optional string backup_storage_url = 8; optional string backup_storage_directory = 9; } + +message GetSystemParamsRequest {} + +message GetSystemParamsResponse { + SystemParams params = 1; +} + +service SystemParamsService { + rpc GetSystemParams(GetSystemParamsRequest) returns (GetSystemParamsResponse); +} diff --git a/src/common/src/error.rs b/src/common/src/error.rs index 4a79d74211f3d..2a7ee56765790 100644 --- a/src/common/src/error.rs +++ b/src/common/src/error.rs @@ -128,16 +128,12 @@ pub enum ErrorCode { InvalidParameterValue(String), #[error("Sink error: {0}")] SinkError(BoxedError), - #[error("Permission denied: {0}")] PermissionDenied(String), - #[error("unrecognized configuration parameter \"{0}\"")] UnrecognizedConfigurationParameter(String), -} - -pub fn internal_err(msg: impl Into) -> RwError { - ErrorCode::InternalError(msg.into().to_string()).into() + #[error("SystemParams error: {0}")] + SystemParamsError(String), } pub fn internal_error(msg: impl Into) -> RwError { diff --git a/src/common/src/lib.rs b/src/common/src/lib.rs index 1de7ac46a90b0..601780436130f 100644 --- a/src/common/src/lib.rs +++ b/src/common/src/lib.rs @@ -52,6 +52,7 @@ pub mod hash; pub mod monitor; pub mod row; pub mod session_config; +pub mod system_param; #[cfg(test)] pub mod test_utils; pub mod types; diff --git a/src/common/src/system_param.rs b/src/common/src/system_param.rs new file mode 100644 index 0000000000000..f4ea0467c118d --- /dev/null +++ b/src/common/src/system_param.rs @@ -0,0 +1,124 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use risingwave_pb::meta::SystemParams; + +use crate::error::{ErrorCode, RwError}; + +pub const BARRIER_INTERVAL_MS_KEY: &str = "barrier_interval_ms"; +pub const CHECKPOINT_FREQUENCY_KEY: &str = "checkpoint_interval"; +pub const SSTABLE_SIZE_MB_KEY: &str = "sstable_size_mb"; +pub const BLOCK_SIZE_KB_KEY: &str = "block_size_kb"; +pub const BLOOM_FALSE_POSITIVE_KEY: &str = "bloom_false_positive"; +pub const STATE_STORE_KEY: &str = "state_store"; +pub const DATA_DIRECTORY_KEY: &str = "data_directory"; +pub const BACKUP_STORAGE_URL_KEY: &str = "backup_storage_url"; +pub const BACKUP_STORAGE_DIRECTORY_KEY: &str = "backup_storage_directory"; + +type Result = core::result::Result; + +macro_rules! for_all_undeprecated_params { + ($macro:ident) => { + $macro! { + { barrier_interval_ms, BARRIER_INTERVAL_MS_KEY }, + { checkpoint_frequency, CHECKPOINT_FREQUENCY_KEY }, + { sstable_size_mb, SSTABLE_SIZE_MB_KEY }, + { block_size_kb, BLOCK_SIZE_KB_KEY }, + { bloom_false_positive, BLOOM_FALSE_POSITIVE_KEY }, + { state_store, STATE_STORE_KEY }, + { data_directory, DATA_DIRECTORY_KEY }, + { backup_storage_url, BACKUP_STORAGE_URL_KEY }, + { backup_storage_directory, BACKUP_STORAGE_DIRECTORY_KEY }, + } + }; +} + +macro_rules! impl_system_params_to_kv { + ($({ $field:ident, $key:expr },)*) => { + /// All undeprecated fields are guaranteed to be contained in the returned map. + /// Return error if there are missing fields. + pub fn system_params_to_kv(params: &SystemParams) -> Result> { + let mut ret = Vec::with_capacity(9); + $(ret.push(( + $key.to_string(), + params + .$field.as_ref() + .ok_or::(ErrorCode::SystemParamsError(format!( + "missing system param {:?}", + $key + )).into())? + .to_string(), + ));)* + Ok(ret) + } + }; +} + +/// For each field in `SystemParams`, one of these rules apply: +/// - Up-to-date: Required. If it is not present, may try to derive it from previous versions of +/// this field. +/// - Deprecated: Optional. +/// - Unrecognized: Not allowed. +pub fn system_params_from_kv(kvs: Vec<(Vec, Vec)>) -> Result { + let mut ret = SystemParams::default(); + let mut expected_keys: HashSet<_> = [ + BARRIER_INTERVAL_MS_KEY, + CHECKPOINT_FREQUENCY_KEY, + SSTABLE_SIZE_MB_KEY, + BLOCK_SIZE_KB_KEY, + BLOOM_FALSE_POSITIVE_KEY, + STATE_STORE_KEY, + DATA_DIRECTORY_KEY, + BACKUP_STORAGE_URL_KEY, + BACKUP_STORAGE_DIRECTORY_KEY, + ] + .iter() + .cloned() + .collect(); + for (k, v) in kvs { + let k = String::from_utf8(k).unwrap(); + let v = String::from_utf8(v).unwrap(); + match k.as_str() { + BARRIER_INTERVAL_MS_KEY => ret.barrier_interval_ms = Some(v.parse().unwrap()), + CHECKPOINT_FREQUENCY_KEY => ret.checkpoint_frequency = Some(v.parse().unwrap()), + SSTABLE_SIZE_MB_KEY => ret.sstable_size_mb = Some(v.parse().unwrap()), + BLOCK_SIZE_KB_KEY => ret.block_size_kb = Some(v.parse().unwrap()), + BLOOM_FALSE_POSITIVE_KEY => ret.bloom_false_positive = Some(v.parse().unwrap()), + STATE_STORE_KEY => ret.state_store = Some(v), + DATA_DIRECTORY_KEY => ret.data_directory = Some(v), + BACKUP_STORAGE_URL_KEY => ret.backup_storage_url = Some(v), + BACKUP_STORAGE_DIRECTORY_KEY => ret.backup_storage_directory = Some(v), + _ => { + return Err(ErrorCode::SystemParamsError(format!( + "unrecognized system param {:?}", + k + )) + .into()); + } + } + expected_keys.remove(k.as_str()); + } + if !expected_keys.is_empty() { + return Err(ErrorCode::SystemParamsError(format!( + "missing system param {:?}", + expected_keys + )) + .into()); + } + Ok(ret) +} + +for_all_undeprecated_params!(impl_system_params_to_kv); diff --git a/src/frontend/src/handler/mod.rs b/src/frontend/src/handler/mod.rs index 05fed15a0f4ac..f9bafa77dc420 100644 --- a/src/frontend/src/handler/mod.rs +++ b/src/frontend/src/handler/mod.rs @@ -344,7 +344,7 @@ pub async fn handle( variable, value, } => variable::handle_set(handler_args, variable, value), - Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable), + Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await, Statement::CreateIndex { name, table_name, diff --git a/src/frontend/src/handler/variable.rs b/src/frontend/src/handler/variable.rs index 376ae0a27e2ea..ae5d8e232f7da 100644 --- a/src/frontend/src/handler/variable.rs +++ b/src/frontend/src/handler/variable.rs @@ -48,10 +48,17 @@ pub fn handle_set( Ok(PgResponse::empty_result(StatementType::SET_OPTION)) } -pub(super) fn handle_show(handler_args: HandlerArgs, variable: Vec) -> Result { - let config_reader = handler_args.session.config(); +pub(super) async fn handle_show( + handler_args: HandlerArgs, + variable: Vec, +) -> Result { // TODO: Verify that the name used in `show` command is indeed always case-insensitive. let name = variable.iter().map(|e| e.real_value()).join(" "); + if name.eq_ignore_ascii_case("PARAMETERS") { + return handle_show_system_params(handler_args).await; + } + // Show session config. + let config_reader = handler_args.session.config(); if name.eq_ignore_ascii_case("ALL") { return handle_show_all(handler_args.clone()); } @@ -69,7 +76,7 @@ pub(super) fn handle_show(handler_args: HandlerArgs, variable: Vec) -> Re )) } -pub(super) fn handle_show_all(handler_args: HandlerArgs) -> Result { +fn handle_show_all(handler_args: HandlerArgs) -> Result { let config_reader = handler_args.session.config(); let all_variables = config_reader.get_all(); @@ -108,3 +115,35 @@ pub(super) fn handle_show_all(handler_args: HandlerArgs) -> Result ], )) } + +async fn handle_show_system_params(handler_args: HandlerArgs) -> Result { + let params = handler_args + .session + .env() + .meta_client() + .get_system_params() + .await?; + let rows = params + .to_kv() + .into_iter() + .map(|(k, v)| Row::new(vec![Some(k.into()), Some(v.into())])) + .collect_vec(); + + Ok(RwPgResponse::new_for_stream( + StatementType::SHOW_COMMAND, + None, + rows.into(), + vec![ + PgFieldDescriptor::new( + "Name".to_string(), + DataType::VARCHAR.to_oid(), + DataType::VARCHAR.type_len(), + ), + PgFieldDescriptor::new( + "Value".to_string(), + DataType::VARCHAR.to_oid(), + DataType::VARCHAR.type_len(), + ), + ], + )) +} diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs index 908da1ff076dc..f255b3a22b34a 100644 --- a/src/frontend/src/meta_client.rs +++ b/src/frontend/src/meta_client.rs @@ -18,7 +18,7 @@ use risingwave_pb::backup_service::MetaSnapshotMetadata; use risingwave_pb::hummock::HummockSnapshot; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; use risingwave_rpc_client::error::Result; -use risingwave_rpc_client::{HummockMetaClient, MetaClient}; +use risingwave_rpc_client::{HummockMetaClient, MetaClient, SystemParamsReader}; /// A wrapper around the `MetaClient` that only provides a minor set of meta rpc. /// Most of the rpc to meta are delegated by other separate structs like `CatalogWriter`, @@ -43,6 +43,8 @@ pub trait FrontendMetaClient: Send + Sync { async fn unpin_snapshot_before(&self, epoch: u64) -> Result<()>; async fn list_meta_snapshots(&self) -> Result>; + + async fn get_system_params(&self) -> Result; } pub struct FrontendMetaClientImpl(pub MetaClient); @@ -80,4 +82,8 @@ impl FrontendMetaClient for FrontendMetaClientImpl { let manifest = self.0.get_meta_snapshot_manifest().await?; Ok(manifest.snapshot_metadata) } + + async fn get_system_params(&self) -> Result { + self.0.get_system_params().await + } } diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs index 131a7b04a9aed..91b0e937c927c 100644 --- a/src/frontend/src/test_utils.rs +++ b/src/frontend/src/test_utils.rs @@ -36,10 +36,12 @@ use risingwave_pb::catalog::{ }; use risingwave_pb::hummock::HummockSnapshot; use risingwave_pb::meta::list_table_fragments_response::TableFragmentInfo; +use risingwave_pb::meta::SystemParams; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_pb::user::update_user_request::UpdateField; use risingwave_pb::user::{GrantPrivilege, UserInfo}; use risingwave_rpc_client::error::Result as RpcResult; +use risingwave_rpc_client::SystemParamsReader; use tempfile::{Builder, NamedTempFile}; use crate::catalog::catalog_service::CatalogWriter; @@ -675,6 +677,10 @@ impl FrontendMetaClient for MockFrontendMetaClient { async fn list_meta_snapshots(&self) -> RpcResult> { Ok(vec![]) } + + async fn get_system_params(&self) -> RpcResult { + Ok(SystemParams::default().into()) + } } #[cfg(test)] diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index d0d8ab143108b..b0cabf8650019 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod model; +pub mod model; use std::sync::Arc; use risingwave_pb::meta::SystemParams; -use self::model::KvSingletonModel; +use self::model::SystemParamsModel; use super::MetaSrvEnv; use crate::storage::MetaStore; use crate::MetaResult; diff --git a/src/meta/src/manager/system_param/model.rs b/src/meta/src/manager/system_param/model.rs index 2f3dddc6d18a8..f03c5f8e541a0 100644 --- a/src/meta/src/manager/system_param/model.rs +++ b/src/meta/src/manager/system_param/model.rs @@ -12,41 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; - use async_trait::async_trait; +use risingwave_common::system_param::{system_params_from_kv, system_params_to_kv}; use risingwave_pb::meta::SystemParams; use crate::model::{MetadataModelError, MetadataModelResult}; use crate::storage::{MetaStore, Transaction}; - -const SYSTEM_PARAM_CF_NAME: &str = "cf/system_params"; - -const BARRIER_INTERVAL_MS_KEY: &str = "barrier_interval_ms"; -const CHECKPOINT_FREQUENCY_KEY: &str = "checkpoint_interval"; -const SSTABLE_SIZE_MB_KEY: &str = "sstable_size_mb"; -const BLOCK_SIZE_KB_KEY: &str = "block_size_kb"; -const BLOOM_FALSE_POSITIVE_KEY: &str = "bloom_false_positive"; -const STATE_STORE_KEY: &str = "state_store"; -const DATA_DIRECTORY_KEY: &str = "data_directory"; -const BACKUP_STORAGE_URL_KEY: &str = "backup_storage_url"; -const BACKUP_STORAGE_DIRECTORY_KEY: &str = "backup_storage_directory"; +const SYSTEM_PARAMS_CF_NAME: &str = "cf/system_params"; // A dummy trait to implement custom methods on `SystemParams`. #[async_trait] -pub trait KvSingletonModel: Sized { +pub trait SystemParamsModel: Sized { fn cf_name() -> String; async fn get(store: &S) -> MetadataModelResult>; async fn insert(&self, store: &S) -> MetadataModelResult<()>; } #[async_trait] -impl KvSingletonModel for SystemParams { +impl SystemParamsModel for SystemParams { fn cf_name() -> String { - SYSTEM_PARAM_CF_NAME.to_string() + SYSTEM_PARAMS_CF_NAME.to_string() } - /// The undeprecated fields are guaranteed to be `Some`. + /// All undeprecated fields are guaranteed to be `Some`. + /// Return error if there are missing or unrecognized fields. async fn get(store: &S) -> MetadataModelResult> where S: MetaStore, @@ -55,130 +44,22 @@ impl KvSingletonModel for SystemParams { if kvs.is_empty() { Ok(None) } else { - Ok(Some(system_param_from_kv(kvs)?)) + Ok(Some( + system_params_from_kv(kvs).map_err(MetadataModelError::internal)?, + )) } } + /// All undeprecated fields must be `Some`. + /// Return error if there are missing fields. async fn insert(&self, store: &S) -> MetadataModelResult<()> where S: MetaStore, { let mut txn = Transaction::default(); - self.barrier_interval_ms.inspect(|v| { - txn.put( - Self::cf_name(), - BARRIER_INTERVAL_MS_KEY.as_bytes().to_vec(), - v.to_string().into_bytes(), - ); - }); - self.checkpoint_frequency.inspect(|v| { - txn.put( - Self::cf_name(), - CHECKPOINT_FREQUENCY_KEY.as_bytes().to_vec(), - v.to_string().into_bytes(), - ); - }); - self.sstable_size_mb.inspect(|v| { - txn.put( - Self::cf_name(), - SSTABLE_SIZE_MB_KEY.as_bytes().to_vec(), - v.to_string().into_bytes(), - ); - }); - self.block_size_kb.inspect(|v| { - txn.put( - Self::cf_name(), - BLOCK_SIZE_KB_KEY.as_bytes().to_vec(), - v.to_string().into_bytes(), - ); - }); - self.bloom_false_positive.inspect(|v| { - txn.put( - Self::cf_name(), - BLOOM_FALSE_POSITIVE_KEY.as_bytes().to_vec(), - v.to_string().into_bytes(), - ); - }); - self.state_store.as_ref().inspect(|v| { - txn.put( - Self::cf_name(), - STATE_STORE_KEY.as_bytes().to_vec(), - v.as_bytes().to_vec(), - ); - }); - self.data_directory.as_ref().inspect(|v| { - txn.put( - Self::cf_name(), - DATA_DIRECTORY_KEY.as_bytes().to_vec(), - v.as_bytes().to_vec(), - ); - }); - self.backup_storage_url.as_ref().inspect(|v| { - txn.put( - Self::cf_name(), - BACKUP_STORAGE_URL_KEY.as_bytes().to_vec(), - v.as_bytes().to_vec(), - ); - }); - self.backup_storage_directory.as_ref().inspect(|v| { - txn.put( - Self::cf_name(), - BACKUP_STORAGE_DIRECTORY_KEY.as_bytes().to_vec(), - v.as_bytes().to_vec(), - ); - }); - Ok(store.txn(txn).await?) - } -} - -/// For each field in `SystemParams`, one of these rules apply: -/// - Up-to-date: Required. If it is not present, may try to derive it from previous versions of -/// this field. -/// - Deprecated: Optional. -/// - Unrecognized: Not allowed. -fn system_param_from_kv(kvs: Vec<(Vec, Vec)>) -> MetadataModelResult { - let mut ret = SystemParams::default(); - let mut expected_keys: HashSet<_> = [ - BARRIER_INTERVAL_MS_KEY, - CHECKPOINT_FREQUENCY_KEY, - SSTABLE_SIZE_MB_KEY, - BLOCK_SIZE_KB_KEY, - BLOOM_FALSE_POSITIVE_KEY, - STATE_STORE_KEY, - DATA_DIRECTORY_KEY, - BACKUP_STORAGE_URL_KEY, - BACKUP_STORAGE_DIRECTORY_KEY, - ] - .iter() - .cloned() - .collect(); - for (k, v) in kvs { - let k = String::from_utf8(k).unwrap(); - let v = String::from_utf8(v).unwrap(); - match k.as_str() { - BARRIER_INTERVAL_MS_KEY => ret.barrier_interval_ms = Some(v.parse().unwrap()), - CHECKPOINT_FREQUENCY_KEY => ret.checkpoint_frequency = Some(v.parse().unwrap()), - SSTABLE_SIZE_MB_KEY => ret.sstable_size_mb = Some(v.parse().unwrap()), - BLOCK_SIZE_KB_KEY => ret.block_size_kb = Some(v.parse().unwrap()), - BLOOM_FALSE_POSITIVE_KEY => ret.bloom_false_positive = Some(v.parse().unwrap()), - STATE_STORE_KEY => ret.state_store = Some(v), - DATA_DIRECTORY_KEY => ret.data_directory = Some(v), - BACKUP_STORAGE_URL_KEY => ret.backup_storage_url = Some(v), - BACKUP_STORAGE_DIRECTORY_KEY => ret.backup_storage_directory = Some(v), - _ => { - return Err(MetadataModelError::internal(format!( - "unrecognized system param {:?}", - k - ))); - } + for (k, v) in system_params_to_kv(self).map_err(MetadataModelError::internal)? { + txn.put(Self::cf_name(), k.into_bytes(), v.into_bytes()); } - expected_keys.remove(k.as_str()); - } - if !expected_keys.is_empty() { - return Err(MetadataModelError::internal(format!( - "missing system param {:?}", - expected_keys - ))); + Ok(store.txn(txn).await?) } - Ok(ret) } diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index 232fbfe30d9ce..bc3544d0e5a29 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -33,6 +33,7 @@ use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer; use risingwave_pb::meta::notification_service_server::NotificationServiceServer; use risingwave_pb::meta::scale_service_server::ScaleServiceServer; use risingwave_pb::meta::stream_manager_service_server::StreamManagerServiceServer; +use risingwave_pb::meta::system_params_service_server::SystemParamsServiceServer; use risingwave_pb::user::user_service_server::UserServiceServer; use tokio::sync::oneshot::{channel as OneChannel, Receiver as OneReceiver}; use tokio::sync::watch; @@ -59,6 +60,7 @@ use crate::rpc::service::heartbeat_service::HeartbeatServiceImpl; use crate::rpc::service::hummock_service::HummockServiceImpl; use crate::rpc::service::meta_member_service::MetaMemberServiceImpl; use crate::rpc::service::stream_service::StreamServiceImpl; +use crate::rpc::service::system_params_service::SystemParamsServiceImpl; use crate::rpc::service::user_service::UserServiceImpl; use crate::storage::{EtcdMetaStore, MemStore, MetaStore, WrappedEtcdClient as EtcdClient}; use crate::stream::{GlobalStreamManager, SourceManager}; @@ -305,7 +307,7 @@ pub async fn start_service_as_election_leader( ) -> MetaResult<()> { tracing::info!("Defining leader services"); let prometheus_endpoint = opts.prometheus_endpoint.clone(); - let init_system_param = opts.init_system_params(); + let init_system_params = opts.init_system_params(); let env = MetaSrvEnv::::new(opts, meta_store.clone()).await; let fragment_manager = Arc::new(FragmentManager::new(env.clone()).await.unwrap()); let meta_metrics = Arc::new(MetaMetrics::new()); @@ -440,8 +442,8 @@ pub async fn start_service_as_election_leader( backup_manager.clone(), compactor_manager.clone(), )); - let system_param_manager = - Arc::new(SystemParamManager::new(env.clone(), init_system_param).await?); + let system_params_manager = + Arc::new(SystemParamManager::new(env.clone(), init_system_params).await?); let ddl_srv = DdlServiceImpl::::new( env.clone(), @@ -464,7 +466,7 @@ pub async fn start_service_as_election_leader( stream_manager.clone(), ); - let cluster_srv = ClusterServiceImpl::::new(cluster_manager.clone(), system_param_manager); + let cluster_srv = ClusterServiceImpl::::new(cluster_manager.clone()); let stream_srv = StreamServiceImpl::::new( env.clone(), barrier_scheduler.clone(), @@ -486,6 +488,7 @@ pub async fn start_service_as_election_leader( ); let health_srv = HealthServiceImpl::new(); let backup_srv = BackupServiceImpl::new(backup_manager); + let system_params_srv = SystemParamsServiceImpl::new(system_params_manager); if let Some(prometheus_addr) = address_info.prometheus_addr { MetricsManager::boot_metrics_service( @@ -565,6 +568,7 @@ pub async fn start_service_as_election_leader( .add_service(ScaleServiceServer::new(scale_srv)) .add_service(HealthServer::new(health_srv)) .add_service(BackupServiceServer::new(backup_srv)) + .add_service(SystemParamsServiceServer::new(system_params_srv)) .serve_with_shutdown(address_info.listen_addr, async move { tokio::select! { res = svc_shutdown_rx.changed() => { diff --git a/src/meta/src/rpc/service/cluster_service.rs b/src/meta/src/rpc/service/cluster_service.rs index a81529ae438c9..d725c428a1b5d 100644 --- a/src/meta/src/rpc/service/cluster_service.rs +++ b/src/meta/src/rpc/service/cluster_service.rs @@ -20,27 +20,20 @@ use risingwave_pb::meta::{ }; use tonic::{Request, Response, Status}; -use crate::manager::{ClusterManagerRef, SystemParamManagerRef}; +use crate::manager::ClusterManagerRef; use crate::storage::MetaStore; #[derive(Clone)] pub struct ClusterServiceImpl { cluster_manager: ClusterManagerRef, - system_param_manager: SystemParamManagerRef, } impl ClusterServiceImpl where S: MetaStore, { - pub fn new( - cluster_manager: ClusterManagerRef, - system_param_manager: SystemParamManagerRef, - ) -> Self { - ClusterServiceImpl { - cluster_manager, - system_param_manager, - } + pub fn new(cluster_manager: ClusterManagerRef) -> Self { + ClusterServiceImpl { cluster_manager } } } @@ -61,11 +54,9 @@ where .cluster_manager .add_worker_node(worker_type, host, worker_node_parallelism) .await?; - let system_params = self.system_param_manager.get_params().clone(); Ok(Response::new(AddWorkerNodeResponse { status: None, node: Some(worker_node), - system_params: Some(system_params), })) } diff --git a/src/meta/src/rpc/service/mod.rs b/src/meta/src/rpc/service/mod.rs index f985862923f21..dd775588405d3 100644 --- a/src/meta/src/rpc/service/mod.rs +++ b/src/meta/src/rpc/service/mod.rs @@ -22,6 +22,7 @@ pub mod meta_member_service; pub mod notification_service; pub mod scale_service; pub mod stream_service; +pub mod system_params_service; pub mod user_service; use std::pin::Pin; diff --git a/src/meta/src/rpc/service/system_params_service.rs b/src/meta/src/rpc/service/system_params_service.rs new file mode 100644 index 0000000000000..b19bf0d213a6d --- /dev/null +++ b/src/meta/src/rpc/service/system_params_service.rs @@ -0,0 +1,50 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use async_trait::async_trait; +use risingwave_pb::meta::system_params_service_server::SystemParamsService; +use risingwave_pb::meta::{GetSystemParamsRequest, GetSystemParamsResponse}; +use tonic::{Request, Response, Status}; + +use crate::manager::SystemParamManagerRef; +use crate::storage::MetaStore; + +pub struct SystemParamsServiceImpl +where + S: MetaStore, +{ + system_params_manager: SystemParamManagerRef, +} + +impl SystemParamsServiceImpl { + pub fn new(system_params_manager: SystemParamManagerRef) -> Self { + Self { + system_params_manager, + } + } +} + +#[async_trait] +impl SystemParamsService for SystemParamsServiceImpl +where + S: MetaStore, +{ + async fn get_system_params( + &self, + _request: Request, + ) -> Result, Status> { + let params = Some(self.system_params_manager.get_params().clone()); + Ok(Response::new(GetSystemParamsResponse { params })) + } +} diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 9db5c41fd759b..413dd921aaebe 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -25,6 +25,7 @@ use futures::stream::BoxStream; use itertools::Itertools; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::MAX_CONNECTION_WINDOW_SIZE; +use risingwave_common::system_param::system_params_to_kv; use risingwave_common::util::addr::HostAddr; use risingwave_hummock_sdk::compact::CompactorRuntimeConfig; use risingwave_hummock_sdk::table_stats::to_prost_table_stats_map; @@ -55,6 +56,7 @@ use risingwave_pb::meta::notification_service_client::NotificationServiceClient; use risingwave_pb::meta::reschedule_request::Reschedule as ProstReschedule; use risingwave_pb::meta::scale_service_client::ScaleServiceClient; use risingwave_pb::meta::stream_manager_service_client::StreamManagerServiceClient; +use risingwave_pb::meta::system_params_service_client::SystemParamsServiceClient; use risingwave_pb::meta::{SystemParams as ProstSystemParams, *}; use risingwave_pb::stream_plan::StreamFragmentGraph; use risingwave_pb::user::update_user_request::UpdateField; @@ -166,19 +168,30 @@ impl MetaClient { let addr_strategy = Self::parse_meta_addr(meta_addr)?; let grpc_meta_client = GrpcMetaClient::new(addr_strategy).await?; - let request = AddWorkerNodeRequest { + + let add_worker_request = AddWorkerNodeRequest { worker_type: worker_type as i32, host: Some(addr.to_protobuf()), worker_node_parallelism: worker_node_parallelism as u64, }; - let retry_strategy = GrpcMetaClient::retry_strategy_for_request(); - let resp = tokio_retry::Retry::spawn(retry_strategy, || async { - let request = request.clone(); - grpc_meta_client.add_worker_node(request).await - }) - .await?; - let worker_node = resp.node.expect("AddWorkerNodeResponse::node is empty"); - let system_params = resp.system_params.unwrap(); + let add_worker_resp = + tokio_retry::Retry::spawn(GrpcMetaClient::retry_strategy_for_request(), || async { + let request = add_worker_request.clone(); + grpc_meta_client.add_worker_node(request).await + }) + .await?; + let worker_node = add_worker_resp + .node + .expect("AddWorkerNodeResponse::node is empty"); + + let system_params_request = GetSystemParamsRequest {}; + let system_params_resp = + tokio_retry::Retry::spawn(GrpcMetaClient::retry_strategy_for_request(), || async { + let request = system_params_request.clone(); + grpc_meta_client.get_system_params(request).await + }) + .await?; + Ok(( Self { worker_id: worker_node.id, @@ -186,7 +199,7 @@ impl MetaClient { host_addr: addr.clone(), inner: grpc_meta_client, }, - system_params.into(), + system_params_resp.params.unwrap().into(), )) } @@ -741,6 +754,12 @@ impl MetaClient { let resp = self.inner.get_meta_snapshot_manifest(req).await?; Ok(resp.manifest.expect("should exist")) } + + pub async fn get_system_params(&self) -> Result { + let req = GetSystemParamsRequest {}; + let resp = self.inner.get_system_params(req).await?; + Ok(resp.params.unwrap().into()) + } } #[async_trait] @@ -967,6 +986,10 @@ impl SystemParamsReader { pub fn backup_storage_directory(&self) -> &str { self.prost.backup_storage_directory.as_ref().unwrap() } + + pub fn to_kv(&self) -> Vec<(String, String)> { + system_params_to_kv(&self.prost).unwrap() + } } #[derive(Debug, Clone)] @@ -981,6 +1004,7 @@ struct GrpcMetaClientCore { user_client: UserServiceClient, scale_client: ScaleServiceClient, backup_client: BackupServiceClient, + system_params_client: SystemParamsServiceClient, } impl GrpcMetaClientCore { @@ -994,7 +1018,8 @@ impl GrpcMetaClientCore { let stream_client = StreamManagerServiceClient::new(channel.clone()); let user_client = UserServiceClient::new(channel.clone()); let scale_client = ScaleServiceClient::new(channel.clone()); - let backup_client = BackupServiceClient::new(channel); + let backup_client = BackupServiceClient::new(channel.clone()); + let system_params_client = SystemParamsServiceClient::new(channel); GrpcMetaClientCore { cluster_client, meta_member_client, @@ -1006,6 +1031,7 @@ impl GrpcMetaClientCore { user_client, scale_client, backup_client, + system_params_client, } } } @@ -1414,6 +1440,7 @@ macro_rules! for_all_meta_rpc { ,{ backup_client, get_backup_job_status, GetBackupJobStatusRequest, GetBackupJobStatusResponse } ,{ backup_client, delete_meta_snapshot, DeleteMetaSnapshotRequest, DeleteMetaSnapshotResponse} ,{ backup_client, get_meta_snapshot_manifest, GetMetaSnapshotManifestRequest, GetMetaSnapshotManifestResponse} + ,{ system_params_client, get_system_params, GetSystemParamsRequest, GetSystemParamsResponse } } }; } From ecc8c25f7fdfbcf1ddb843f7842f46ad0e68075a Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 13 Feb 2023 11:24:53 +0000 Subject: [PATCH 2/9] use macro to refactor from kv --- src/common/src/system_param.rs | 88 ++++++++++++++++------------------ 1 file changed, 40 insertions(+), 48 deletions(-) diff --git a/src/common/src/system_param.rs b/src/common/src/system_param.rs index f4ea0467c118d..58f1b7d3fe537 100644 --- a/src/common/src/system_param.rs +++ b/src/common/src/system_param.rs @@ -47,7 +47,7 @@ macro_rules! for_all_undeprecated_params { } macro_rules! impl_system_params_to_kv { - ($({ $field:ident, $key:expr },)*) => { + ($({ $field:ident, $key:path },)*) => { /// All undeprecated fields are guaranteed to be contained in the returned map. /// Return error if there are missing fields. pub fn system_params_to_kv(params: &SystemParams) -> Result> { @@ -67,58 +67,50 @@ macro_rules! impl_system_params_to_kv { }; } -/// For each field in `SystemParams`, one of these rules apply: -/// - Up-to-date: Required. If it is not present, may try to derive it from previous versions of -/// this field. -/// - Deprecated: Optional. -/// - Unrecognized: Not allowed. -pub fn system_params_from_kv(kvs: Vec<(Vec, Vec)>) -> Result { - let mut ret = SystemParams::default(); - let mut expected_keys: HashSet<_> = [ - BARRIER_INTERVAL_MS_KEY, - CHECKPOINT_FREQUENCY_KEY, - SSTABLE_SIZE_MB_KEY, - BLOCK_SIZE_KB_KEY, - BLOOM_FALSE_POSITIVE_KEY, - STATE_STORE_KEY, - DATA_DIRECTORY_KEY, - BACKUP_STORAGE_URL_KEY, - BACKUP_STORAGE_DIRECTORY_KEY, - ] - .iter() - .cloned() - .collect(); - for (k, v) in kvs { - let k = String::from_utf8(k).unwrap(); - let v = String::from_utf8(v).unwrap(); - match k.as_str() { - BARRIER_INTERVAL_MS_KEY => ret.barrier_interval_ms = Some(v.parse().unwrap()), - CHECKPOINT_FREQUENCY_KEY => ret.checkpoint_frequency = Some(v.parse().unwrap()), - SSTABLE_SIZE_MB_KEY => ret.sstable_size_mb = Some(v.parse().unwrap()), - BLOCK_SIZE_KB_KEY => ret.block_size_kb = Some(v.parse().unwrap()), - BLOOM_FALSE_POSITIVE_KEY => ret.bloom_false_positive = Some(v.parse().unwrap()), - STATE_STORE_KEY => ret.state_store = Some(v), - DATA_DIRECTORY_KEY => ret.data_directory = Some(v), - BACKUP_STORAGE_URL_KEY => ret.backup_storage_url = Some(v), - BACKUP_STORAGE_DIRECTORY_KEY => ret.backup_storage_directory = Some(v), - _ => { +macro_rules! impl_system_params_from_kv { + ($({ $field:ident, $key:path },)*) => { + /// For each field in `SystemParams`, one of these rules apply: + /// - Up-to-date: Guaranteed to be `Some`. If it is not present, may try to derive it from previous + /// versions of this field. + /// - Deprecated: Guaranteed to be `None`. + /// - Unrecognized: Not allowed. + pub fn system_params_from_kv(kvs: Vec<(Vec, Vec)>) -> Result { + let mut ret = SystemParams::default(); + let mut expected_keys: HashSet<_> = [ + $($key,)* + ] + .iter() + .cloned() + .collect(); + for (k, v) in kvs { + let k = String::from_utf8(k).unwrap(); + let v = String::from_utf8(v).unwrap(); + match k.as_str() { + $( + $key => ret.$field = Some(v.parse().unwrap()), + )* + _ => { + return Err(ErrorCode::SystemParamsError(format!( + "unrecognized system param {:?}", + k + )) + .into()); + } + } + expected_keys.remove(k.as_str()); + } + if !expected_keys.is_empty() { return Err(ErrorCode::SystemParamsError(format!( - "unrecognized system param {:?}", - k + "missing system param {:?}", + expected_keys )) .into()); } + Ok(ret) } - expected_keys.remove(k.as_str()); - } - if !expected_keys.is_empty() { - return Err(ErrorCode::SystemParamsError(format!( - "missing system param {:?}", - expected_keys - )) - .into()); - } - Ok(ret) + }; } +for_all_undeprecated_params!(impl_system_params_from_kv); + for_all_undeprecated_params!(impl_system_params_to_kv); From 45ccd4979057e7c050efdccf1dfd9c35ad1722f1 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 13 Feb 2023 12:26:27 +0000 Subject: [PATCH 3/9] add ut --- src/common/src/system_param.rs | 48 ++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/src/common/src/system_param.rs b/src/common/src/system_param.rs index 58f1b7d3fe537..481c394e9de5d 100644 --- a/src/common/src/system_param.rs +++ b/src/common/src/system_param.rs @@ -74,7 +74,7 @@ macro_rules! impl_system_params_from_kv { /// versions of this field. /// - Deprecated: Guaranteed to be `None`. /// - Unrecognized: Not allowed. - pub fn system_params_from_kv(kvs: Vec<(Vec, Vec)>) -> Result { + pub fn system_params_from_kv(kvs: Vec<(impl AsRef<[u8]>, impl AsRef<[u8]>)>) -> Result { let mut ret = SystemParams::default(); let mut expected_keys: HashSet<_> = [ $($key,)* @@ -83,9 +83,9 @@ macro_rules! impl_system_params_from_kv { .cloned() .collect(); for (k, v) in kvs { - let k = String::from_utf8(k).unwrap(); - let v = String::from_utf8(v).unwrap(); - match k.as_str() { + let k = std::str::from_utf8(k.as_ref()).unwrap(); + let v = std::str::from_utf8(v.as_ref()).unwrap(); + match k { $( $key => ret.$field = Some(v.parse().unwrap()), )* @@ -97,7 +97,7 @@ macro_rules! impl_system_params_from_kv { .into()); } } - expected_keys.remove(k.as_str()); + expected_keys.remove(k); } if !expected_keys.is_empty() { return Err(ErrorCode::SystemParamsError(format!( @@ -114,3 +114,41 @@ macro_rules! impl_system_params_from_kv { for_all_undeprecated_params!(impl_system_params_from_kv); for_all_undeprecated_params!(impl_system_params_to_kv); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_to_from_kv() { + // Include all fields (deprecated also). + let kvs = vec![ + (BARRIER_INTERVAL_MS_KEY, "1"), + (CHECKPOINT_FREQUENCY_KEY, "1"), + (SSTABLE_SIZE_MB_KEY, "1"), + (BLOCK_SIZE_KB_KEY, "1"), + (BLOOM_FALSE_POSITIVE_KEY, "1"), + (STATE_STORE_KEY, "a"), + (DATA_DIRECTORY_KEY, "a"), + (BACKUP_STORAGE_URL_KEY, "a"), + (BACKUP_STORAGE_DIRECTORY_KEY, "a"), + ]; + + // To kv - missing field. + let p = SystemParams::default(); + assert!(system_params_to_kv(&p).is_err()); + + // From kv - missing field. + assert!(system_params_from_kv(vec![(BARRIER_INTERVAL_MS_KEY, "1")]).is_err()); + + // From kv - unrecognized field. + assert!(system_params_from_kv(vec![("?", "?")]).is_err()); + + // Deser & ser. + let p = system_params_from_kv(kvs).unwrap(); + assert_eq!( + p, + system_params_from_kv(system_params_to_kv(&p).unwrap()).unwrap() + ); + } +} From eb4d3d5f4159d29396a01fcfea17dc2f59581f27 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 13 Feb 2023 12:28:57 +0000 Subject: [PATCH 4/9] misc --- src/meta/src/manager/system_param/mod.rs | 2 +- src/meta/src/manager/system_param/model.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/meta/src/manager/system_param/mod.rs b/src/meta/src/manager/system_param/mod.rs index b0cabf8650019..88ebc09b963e0 100644 --- a/src/meta/src/manager/system_param/mod.rs +++ b/src/meta/src/manager/system_param/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod model; +mod model; use std::sync::Arc; diff --git a/src/meta/src/manager/system_param/model.rs b/src/meta/src/manager/system_param/model.rs index f03c5f8e541a0..e2f8ada7c52a6 100644 --- a/src/meta/src/manager/system_param/model.rs +++ b/src/meta/src/manager/system_param/model.rs @@ -18,6 +18,7 @@ use risingwave_pb::meta::SystemParams; use crate::model::{MetadataModelError, MetadataModelResult}; use crate::storage::{MetaStore, Transaction}; + const SYSTEM_PARAMS_CF_NAME: &str = "cf/system_params"; // A dummy trait to implement custom methods on `SystemParams`. From 07578baf1f30df5ed3dd8f9700f829023f69ad00 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 13 Feb 2023 16:29:40 +0000 Subject: [PATCH 5/9] further simplify key declaration with macro --- src/common/src/system_param.rs | 81 +++++++++++++++++++++++----------- 1 file changed, 56 insertions(+), 25 deletions(-) diff --git a/src/common/src/system_param.rs b/src/common/src/system_param.rs index 481c394e9de5d..f032f0095676f 100644 --- a/src/common/src/system_param.rs +++ b/src/common/src/system_param.rs @@ -14,51 +14,82 @@ use std::collections::HashSet; +use paste::paste; use risingwave_pb::meta::SystemParams; use crate::error::{ErrorCode, RwError}; -pub const BARRIER_INTERVAL_MS_KEY: &str = "barrier_interval_ms"; -pub const CHECKPOINT_FREQUENCY_KEY: &str = "checkpoint_interval"; -pub const SSTABLE_SIZE_MB_KEY: &str = "sstable_size_mb"; -pub const BLOCK_SIZE_KB_KEY: &str = "block_size_kb"; -pub const BLOOM_FALSE_POSITIVE_KEY: &str = "bloom_false_positive"; -pub const STATE_STORE_KEY: &str = "state_store"; -pub const DATA_DIRECTORY_KEY: &str = "data_directory"; -pub const BACKUP_STORAGE_URL_KEY: &str = "backup_storage_url"; -pub const BACKUP_STORAGE_DIRECTORY_KEY: &str = "backup_storage_directory"; - type Result = core::result::Result; +// Includes deprecated params. Used to define key constans. +macro_rules! for_all_params { + ($macro:ident) => { + $macro! { + { barrier_interval_ms }, + { checkpoint_frequency }, + { sstable_size_mb }, + { block_size_kb }, + { bloom_false_positive }, + { state_store }, + { data_directory }, + { backup_storage_url }, + { backup_storage_directory }, + } + }; +} + +// Only includes undeprecated params. +// Macro input is { field identifier, mutability } macro_rules! for_all_undeprecated_params { ($macro:ident) => { $macro! { - { barrier_interval_ms, BARRIER_INTERVAL_MS_KEY }, - { checkpoint_frequency, CHECKPOINT_FREQUENCY_KEY }, - { sstable_size_mb, SSTABLE_SIZE_MB_KEY }, - { block_size_kb, BLOCK_SIZE_KB_KEY }, - { bloom_false_positive, BLOOM_FALSE_POSITIVE_KEY }, - { state_store, STATE_STORE_KEY }, - { data_directory, DATA_DIRECTORY_KEY }, - { backup_storage_url, BACKUP_STORAGE_URL_KEY }, - { backup_storage_directory, BACKUP_STORAGE_DIRECTORY_KEY }, + { barrier_interval_ms, true }, + { checkpoint_frequency, true }, + { sstable_size_mb, false }, + { block_size_kb, false }, + { bloom_false_positive, false }, + { state_store, false }, + { data_directory, false }, + { backup_storage_url, false }, + { backup_storage_directory, false }, + } + }; +} + +/// Convert field name to string. +macro_rules! key_of { + ($field:ident) => { + stringify!($field) + }; +} + +/// Define key constants for fields in `SystemParams` for use of other modules. +macro_rules! def_key { + ($({ $field:ident },)*) => { + paste! { + $( + pub const [<$field:upper _KEY>]: &str = key_of!($field); + )* } + }; } +for_all_params!(def_key); + macro_rules! impl_system_params_to_kv { - ($({ $field:ident, $key:path },)*) => { + ($({ $field:ident, $_:expr },)*) => { /// All undeprecated fields are guaranteed to be contained in the returned map. /// Return error if there are missing fields. pub fn system_params_to_kv(params: &SystemParams) -> Result> { let mut ret = Vec::with_capacity(9); $(ret.push(( - $key.to_string(), + key_of!($field).to_string(), params .$field.as_ref() .ok_or::(ErrorCode::SystemParamsError(format!( "missing system param {:?}", - $key + key_of!($field) )).into())? .to_string(), ));)* @@ -68,7 +99,7 @@ macro_rules! impl_system_params_to_kv { } macro_rules! impl_system_params_from_kv { - ($({ $field:ident, $key:path },)*) => { + ($({ $field:ident, $_:expr },)*) => { /// For each field in `SystemParams`, one of these rules apply: /// - Up-to-date: Guaranteed to be `Some`. If it is not present, may try to derive it from previous /// versions of this field. @@ -77,7 +108,7 @@ macro_rules! impl_system_params_from_kv { pub fn system_params_from_kv(kvs: Vec<(impl AsRef<[u8]>, impl AsRef<[u8]>)>) -> Result { let mut ret = SystemParams::default(); let mut expected_keys: HashSet<_> = [ - $($key,)* + $(key_of!($field),)* ] .iter() .cloned() @@ -87,7 +118,7 @@ macro_rules! impl_system_params_from_kv { let v = std::str::from_utf8(v.as_ref()).unwrap(); match k { $( - $key => ret.$field = Some(v.parse().unwrap()), + key_of!($field) => ret.$field = Some(v.parse().unwrap()), )* _ => { return Err(ErrorCode::SystemParamsError(format!( From b24629a1095e112c7037c7de2146d0aa112ba160 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Mon, 13 Feb 2023 17:42:01 +0000 Subject: [PATCH 6/9] fix typo --- src/common/src/system_param.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/system_param.rs b/src/common/src/system_param.rs index f032f0095676f..e6a0b62cd608c 100644 --- a/src/common/src/system_param.rs +++ b/src/common/src/system_param.rs @@ -21,7 +21,7 @@ use crate::error::{ErrorCode, RwError}; type Result = core::result::Result; -// Includes deprecated params. Used to define key constans. +// Includes deprecated params. Used to define key constants. macro_rules! for_all_params { ($macro:ident) => { $macro! { From 71b2f8820d5584dc0292245859425edcfcf7d476 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Tue, 14 Feb 2023 05:09:29 +0000 Subject: [PATCH 7/9] replace for_all_params with for_all_deprecated_params --- src/common/src/system_param.rs | 30 +++++++++++------------------- 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/src/common/src/system_param.rs b/src/common/src/system_param.rs index e6a0b62cd608c..5d23b044d4ebd 100644 --- a/src/common/src/system_param.rs +++ b/src/common/src/system_param.rs @@ -21,23 +21,6 @@ use crate::error::{ErrorCode, RwError}; type Result = core::result::Result; -// Includes deprecated params. Used to define key constants. -macro_rules! for_all_params { - ($macro:ident) => { - $macro! { - { barrier_interval_ms }, - { checkpoint_frequency }, - { sstable_size_mb }, - { block_size_kb }, - { bloom_false_positive }, - { state_store }, - { data_directory }, - { backup_storage_url }, - { backup_storage_directory }, - } - }; -} - // Only includes undeprecated params. // Macro input is { field identifier, mutability } macro_rules! for_all_undeprecated_params { @@ -56,6 +39,14 @@ macro_rules! for_all_undeprecated_params { }; } +// Only includes deprecated params. Used to define key constants. +// Macro input is { field identifier, mutability } +macro_rules! for_all_deprecated_params { + ($macro:ident) => { + $macro! {} + }; +} + /// Convert field name to string. macro_rules! key_of { ($field:ident) => { @@ -65,7 +56,7 @@ macro_rules! key_of { /// Define key constants for fields in `SystemParams` for use of other modules. macro_rules! def_key { - ($({ $field:ident },)*) => { + ($({ $field:ident, $_:expr },)*) => { paste! { $( pub const [<$field:upper _KEY>]: &str = key_of!($field); @@ -75,7 +66,8 @@ macro_rules! def_key { }; } -for_all_params!(def_key); +for_all_undeprecated_params!(def_key); +for_all_deprecated_params!(def_key); macro_rules! impl_system_params_to_kv { ($({ $field:ident, $_:expr },)*) => { From d11dff8d3e9658e69969d3149bb997464b54e483 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Tue, 14 Feb 2023 06:11:00 +0000 Subject: [PATCH 8/9] use constant to denote mutability --- src/common/src/system_param.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/common/src/system_param.rs b/src/common/src/system_param.rs index 5d23b044d4ebd..4d8596ac87880 100644 --- a/src/common/src/system_param.rs +++ b/src/common/src/system_param.rs @@ -21,20 +21,25 @@ use crate::error::{ErrorCode, RwError}; type Result = core::result::Result; +#[allow(dead_code)] +const MUTABLE: bool = true; +#[allow(dead_code)] +const IMMUTABLE: bool = false; + // Only includes undeprecated params. // Macro input is { field identifier, mutability } macro_rules! for_all_undeprecated_params { ($macro:ident) => { $macro! { - { barrier_interval_ms, true }, - { checkpoint_frequency, true }, - { sstable_size_mb, false }, - { block_size_kb, false }, - { bloom_false_positive, false }, - { state_store, false }, - { data_directory, false }, - { backup_storage_url, false }, - { backup_storage_directory, false }, + { barrier_interval_ms, MUTABLE }, + { checkpoint_frequency, MUTABLE }, + { sstable_size_mb, IMMUTABLE }, + { block_size_kb, IMMUTABLE }, + { bloom_false_positive, IMMUTABLE }, + { state_store, IMMUTABLE }, + { data_directory, IMMUTABLE }, + { backup_storage_url, IMMUTABLE }, + { backup_storage_directory, IMMUTABLE }, } }; } From 2ac54df29ba6a38558490c8588d8fdf9c59d00b5 Mon Sep 17 00:00:00 2001 From: Gun9niR Date: Tue, 14 Feb 2023 07:28:55 +0000 Subject: [PATCH 9/9] fix proto compatibility --- proto/meta.proto | 2 ++ 1 file changed, 2 insertions(+) diff --git a/proto/meta.proto b/proto/meta.proto index 8ec4eaa84880f..88cd2ad764f66 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -145,6 +145,8 @@ message AddWorkerNodeRequest { } message AddWorkerNodeResponse { + reserved 3; + reserved "system_params"; common.Status status = 1; common.WorkerNode node = 2; }