Skip to content

Commit

Permalink
feat: use SHOW PARAMETERS to list all system parameters (#7882)
Browse files Browse the repository at this point in the history
As title. Also refactored the code with macro to reduce the occurrence of hard coded keys such as `CHECKPOINT_FREQUENCY_KEY`.

```
dev=> show parameters;
Name           |    Value
--------------------------+-------------
barrier_interval_ms      | 1000
checkpoint_interval      | 10
sstable_size_mb          | 256
block_size_kb            | 64
bloom_false_positive     | 0.001
state_store              |
data_directory           | hummock_001
backup_storage_url       | memory
backup_storage_directory | backup
```

Approved-By: fuyufjh
Approved-By: BugenZhao

Co-Authored-By: Gun9niR <gun9nir.guo@gmail.com>
Co-Authored-By: Zhidong Guo <52783948+Gun9niR@users.noreply.github.com>
  • Loading branch information
Gun9niR and Gun9niR authored Feb 14, 2023
1 parent 680bc23 commit 7724305
Show file tree
Hide file tree
Showing 16 changed files with 421 additions and 181 deletions.
60 changes: 52 additions & 8 deletions dashboard/proto/gen/meta.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,10 @@ message AddWorkerNodeRequest {
}

message AddWorkerNodeResponse {
reserved 3;
reserved "system_params";
common.Status status = 1;
common.WorkerNode node = 2;
SystemParams system_params = 3;
}

message ActivateWorkerNodeRequest {
Expand Down Expand Up @@ -324,3 +325,13 @@ message SystemParams {
optional string backup_storage_url = 8;
optional string backup_storage_directory = 9;
}

message GetSystemParamsRequest {}

message GetSystemParamsResponse {
SystemParams params = 1;
}

service SystemParamsService {
rpc GetSystemParams(GetSystemParamsRequest) returns (GetSystemParamsResponse);
}
8 changes: 2 additions & 6 deletions src/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,12 @@ pub enum ErrorCode {
InvalidParameterValue(String),
#[error("Sink error: {0}")]
SinkError(BoxedError),

#[error("Permission denied: {0}")]
PermissionDenied(String),

#[error("unrecognized configuration parameter \"{0}\"")]
UnrecognizedConfigurationParameter(String),
}

pub fn internal_err(msg: impl Into<anyhow::Error>) -> RwError {
ErrorCode::InternalError(msg.into().to_string()).into()
#[error("SystemParams error: {0}")]
SystemParamsError(String),
}

pub fn internal_error(msg: impl Into<String>) -> RwError {
Expand Down
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub mod hash;
pub mod monitor;
pub mod row;
pub mod session_config;
pub mod system_param;
#[cfg(test)]
pub mod test_utils;
pub mod types;
Expand Down
182 changes: 182 additions & 0 deletions src/common/src/system_param.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use paste::paste;
use risingwave_pb::meta::SystemParams;

use crate::error::{ErrorCode, RwError};

type Result<T> = core::result::Result<T, RwError>;

#[allow(dead_code)]
const MUTABLE: bool = true;
#[allow(dead_code)]
const IMMUTABLE: bool = false;

// Only includes undeprecated params.
// Macro input is { field identifier, mutability }
macro_rules! for_all_undeprecated_params {
($macro:ident) => {
$macro! {
{ barrier_interval_ms, MUTABLE },
{ checkpoint_frequency, MUTABLE },
{ sstable_size_mb, IMMUTABLE },
{ block_size_kb, IMMUTABLE },
{ bloom_false_positive, IMMUTABLE },
{ state_store, IMMUTABLE },
{ data_directory, IMMUTABLE },
{ backup_storage_url, IMMUTABLE },
{ backup_storage_directory, IMMUTABLE },
}
};
}

// Only includes deprecated params. Used to define key constants.
// Macro input is { field identifier, mutability }
macro_rules! for_all_deprecated_params {
($macro:ident) => {
$macro! {}
};
}

/// Convert field name to string.
macro_rules! key_of {
($field:ident) => {
stringify!($field)
};
}

/// Define key constants for fields in `SystemParams` for use of other modules.
macro_rules! def_key {
($({ $field:ident, $_:expr },)*) => {
paste! {
$(
pub const [<$field:upper _KEY>]: &str = key_of!($field);
)*
}

};
}

for_all_undeprecated_params!(def_key);
for_all_deprecated_params!(def_key);

macro_rules! impl_system_params_to_kv {
($({ $field:ident, $_:expr },)*) => {
/// All undeprecated fields are guaranteed to be contained in the returned map.
/// Return error if there are missing fields.
pub fn system_params_to_kv(params: &SystemParams) -> Result<Vec<(String, String)>> {
let mut ret = Vec::with_capacity(9);
$(ret.push((
key_of!($field).to_string(),
params
.$field.as_ref()
.ok_or::<RwError>(ErrorCode::SystemParamsError(format!(
"missing system param {:?}",
key_of!($field)
)).into())?
.to_string(),
));)*
Ok(ret)
}
};
}

macro_rules! impl_system_params_from_kv {
($({ $field:ident, $_:expr },)*) => {
/// For each field in `SystemParams`, one of these rules apply:
/// - Up-to-date: Guaranteed to be `Some`. If it is not present, may try to derive it from previous
/// versions of this field.
/// - Deprecated: Guaranteed to be `None`.
/// - Unrecognized: Not allowed.
pub fn system_params_from_kv(kvs: Vec<(impl AsRef<[u8]>, impl AsRef<[u8]>)>) -> Result<SystemParams> {
let mut ret = SystemParams::default();
let mut expected_keys: HashSet<_> = [
$(key_of!($field),)*
]
.iter()
.cloned()
.collect();
for (k, v) in kvs {
let k = std::str::from_utf8(k.as_ref()).unwrap();
let v = std::str::from_utf8(v.as_ref()).unwrap();
match k {
$(
key_of!($field) => ret.$field = Some(v.parse().unwrap()),
)*
_ => {
return Err(ErrorCode::SystemParamsError(format!(
"unrecognized system param {:?}",
k
))
.into());
}
}
expected_keys.remove(k);
}
if !expected_keys.is_empty() {
return Err(ErrorCode::SystemParamsError(format!(
"missing system param {:?}",
expected_keys
))
.into());
}
Ok(ret)
}
};
}

for_all_undeprecated_params!(impl_system_params_from_kv);

for_all_undeprecated_params!(impl_system_params_to_kv);

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_to_from_kv() {
// Include all fields (deprecated also).
let kvs = vec![
(BARRIER_INTERVAL_MS_KEY, "1"),
(CHECKPOINT_FREQUENCY_KEY, "1"),
(SSTABLE_SIZE_MB_KEY, "1"),
(BLOCK_SIZE_KB_KEY, "1"),
(BLOOM_FALSE_POSITIVE_KEY, "1"),
(STATE_STORE_KEY, "a"),
(DATA_DIRECTORY_KEY, "a"),
(BACKUP_STORAGE_URL_KEY, "a"),
(BACKUP_STORAGE_DIRECTORY_KEY, "a"),
];

// To kv - missing field.
let p = SystemParams::default();
assert!(system_params_to_kv(&p).is_err());

// From kv - missing field.
assert!(system_params_from_kv(vec![(BARRIER_INTERVAL_MS_KEY, "1")]).is_err());

// From kv - unrecognized field.
assert!(system_params_from_kv(vec![("?", "?")]).is_err());

// Deser & ser.
let p = system_params_from_kv(kvs).unwrap();
assert_eq!(
p,
system_params_from_kv(system_params_to_kv(&p).unwrap()).unwrap()
);
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub async fn handle(
variable,
value,
} => variable::handle_set(handler_args, variable, value),
Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
Statement::CreateIndex {
name,
table_name,
Expand Down
Loading

0 comments on commit 7724305

Please sign in to comment.