Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use SHOW PARAMETERS to list all system parameters #7882

Merged
merged 13 commits into from
Feb 14, 2023
60 changes: 52 additions & 8 deletions dashboard/proto/gen/meta.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ message AddWorkerNodeRequest {
message AddWorkerNodeResponse {
common.Status status = 1;
common.WorkerNode node = 2;
SystemParams system_params = 3;
}

message ActivateWorkerNodeRequest {
Expand Down Expand Up @@ -324,3 +323,13 @@ message SystemParams {
optional string backup_storage_url = 8;
optional string backup_storage_directory = 9;
}

message GetSystemParamsRequest {}

message GetSystemParamsResponse {
SystemParams params = 1;
}

service SystemParamsService {
rpc GetSystemParams(GetSystemParamsRequest) returns (GetSystemParamsResponse);
}
8 changes: 2 additions & 6 deletions src/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,12 @@ pub enum ErrorCode {
InvalidParameterValue(String),
#[error("Sink error: {0}")]
SinkError(BoxedError),

#[error("Permission denied: {0}")]
PermissionDenied(String),

#[error("unrecognized configuration parameter \"{0}\"")]
UnrecognizedConfigurationParameter(String),
}

pub fn internal_err(msg: impl Into<anyhow::Error>) -> RwError {
ErrorCode::InternalError(msg.into().to_string()).into()
#[error("SystemParams error: {0}")]
SystemParamsError(String),
}

pub fn internal_error(msg: impl Into<String>) -> RwError {
Expand Down
1 change: 1 addition & 0 deletions src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub mod hash;
pub mod monitor;
pub mod row;
pub mod session_config;
pub mod system_param;
#[cfg(test)]
pub mod test_utils;
pub mod types;
Expand Down
185 changes: 185 additions & 0 deletions src/common/src/system_param.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use paste::paste;
use risingwave_pb::meta::SystemParams;

use crate::error::{ErrorCode, RwError};

type Result<T> = core::result::Result<T, RwError>;

// Includes deprecated params. Used to define key constants.
macro_rules! for_all_params {
Gun9niR marked this conversation as resolved.
Show resolved Hide resolved
($macro:ident) => {
$macro! {
{ barrier_interval_ms },
{ checkpoint_frequency },
{ sstable_size_mb },
{ block_size_kb },
{ bloom_false_positive },
{ state_store },
{ data_directory },
{ backup_storage_url },
{ backup_storage_directory },
}
};
}

// Only includes undeprecated params.
// Macro input is { field identifier, mutability }
macro_rules! for_all_undeprecated_params {
($macro:ident) => {
$macro! {
{ barrier_interval_ms, true },
Gun9niR marked this conversation as resolved.
Show resolved Hide resolved
{ checkpoint_frequency, true },
{ sstable_size_mb, false },
{ block_size_kb, false },
{ bloom_false_positive, false },
{ state_store, false },
{ data_directory, false },
{ backup_storage_url, false },
{ backup_storage_directory, false },
}
};
}

/// Convert field name to string.
macro_rules! key_of {
($field:ident) => {
stringify!($field)
};
}

/// Define key constants for fields in `SystemParams` for use of other modules.
macro_rules! def_key {
($({ $field:ident },)*) => {
paste! {
$(
pub const [<$field:upper _KEY>]: &str = key_of!($field);
)*
}

};
}

for_all_params!(def_key);

macro_rules! impl_system_params_to_kv {
($({ $field:ident, $_:expr },)*) => {
/// All undeprecated fields are guaranteed to be contained in the returned map.
/// Return error if there are missing fields.
pub fn system_params_to_kv(params: &SystemParams) -> Result<Vec<(String, String)>> {
let mut ret = Vec::with_capacity(9);
$(ret.push((
key_of!($field).to_string(),
params
.$field.as_ref()
.ok_or::<RwError>(ErrorCode::SystemParamsError(format!(
"missing system param {:?}",
key_of!($field)
)).into())?
.to_string(),
));)*
Ok(ret)
}
};
}

macro_rules! impl_system_params_from_kv {
($({ $field:ident, $_:expr },)*) => {
/// For each field in `SystemParams`, one of these rules apply:
/// - Up-to-date: Guaranteed to be `Some`. If it is not present, may try to derive it from previous
/// versions of this field.
/// - Deprecated: Guaranteed to be `None`.
/// - Unrecognized: Not allowed.
pub fn system_params_from_kv(kvs: Vec<(impl AsRef<[u8]>, impl AsRef<[u8]>)>) -> Result<SystemParams> {
let mut ret = SystemParams::default();
let mut expected_keys: HashSet<_> = [
$(key_of!($field),)*
]
.iter()
.cloned()
.collect();
for (k, v) in kvs {
let k = std::str::from_utf8(k.as_ref()).unwrap();
let v = std::str::from_utf8(v.as_ref()).unwrap();
match k {
$(
key_of!($field) => ret.$field = Some(v.parse().unwrap()),
)*
_ => {
return Err(ErrorCode::SystemParamsError(format!(
"unrecognized system param {:?}",
k
))
.into());
}
}
expected_keys.remove(k);
}
if !expected_keys.is_empty() {
return Err(ErrorCode::SystemParamsError(format!(
"missing system param {:?}",
expected_keys
))
.into());
}
Ok(ret)
}
};
}

for_all_undeprecated_params!(impl_system_params_from_kv);

for_all_undeprecated_params!(impl_system_params_to_kv);

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_to_from_kv() {
// Include all fields (deprecated also).
let kvs = vec![
(BARRIER_INTERVAL_MS_KEY, "1"),
(CHECKPOINT_FREQUENCY_KEY, "1"),
(SSTABLE_SIZE_MB_KEY, "1"),
(BLOCK_SIZE_KB_KEY, "1"),
(BLOOM_FALSE_POSITIVE_KEY, "1"),
(STATE_STORE_KEY, "a"),
(DATA_DIRECTORY_KEY, "a"),
(BACKUP_STORAGE_URL_KEY, "a"),
(BACKUP_STORAGE_DIRECTORY_KEY, "a"),
];

// To kv - missing field.
let p = SystemParams::default();
assert!(system_params_to_kv(&p).is_err());

// From kv - missing field.
assert!(system_params_from_kv(vec![(BARRIER_INTERVAL_MS_KEY, "1")]).is_err());

// From kv - unrecognized field.
assert!(system_params_from_kv(vec![("?", "?")]).is_err());

// Deser & ser.
let p = system_params_from_kv(kvs).unwrap();
assert_eq!(
p,
system_params_from_kv(system_params_to_kv(&p).unwrap()).unwrap()
);
}
}
2 changes: 1 addition & 1 deletion src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ pub async fn handle(
variable,
value,
} => variable::handle_set(handler_args, variable, value),
Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable),
Statement::ShowVariable { variable } => variable::handle_show(handler_args, variable).await,
Statement::CreateIndex {
name,
table_name,
Expand Down
Loading