From 3d9077b07a8318cc0d64b25b904f0e66e1bf5c86 Mon Sep 17 00:00:00 2001 From: Zhidong Guo <52783948+Gun9niR@users.noreply.github.com> Date: Wed, 1 Feb 2023 22:10:12 +0800 Subject: [PATCH] feat: override config with cli opts (#7613) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Define a proc macro `OverrideConfig` which will override fields in `RwConfig`. It supports overriding any field in the config file. Its usage is as bellow: ```rust pub struct ComputeNodeOpts { // Other items ... #[clap(flatten)] override_config: OverrideConfigOpts, } /// Command-line arguments for compute-node that overrides the config file. #[derive(Parser, Clone, Debug, OverrideConfig)] struct OverrideConfigOpts { #[clap(long)] #[override(path = storage.state_store)] pub state_store: Option, /// Enable reporting tracing information to jaeger. #[clap(parse(from_flag), long)] #[override(path = streaming.enable_jaeger_tracing)] pub enable_jaeger_tracing: Flag, } fn compute_node_serve(opts: ComputeNodeOpts) { // `override_config` should not be usable afterwards let config = load_config(&opts.config_path, Some(opts.override_config)); } ``` I plan to keep only the addresses, hardware-related items and credential-related items in CLI. Thus more options are available in the config file. Please check the release notes. This PR keeps backward compatibility, as the newly added fields in the config file can still be overridden from CLI. I did not use serfig because it [does not support overriding with default values](https://github.com/Xuanwo/serfig/issues/23#issuecomment-1409809442). The user might not do that, but as risedev users, we might accidentally have modified risingwave.toml and forgot to change it back, which conflicts with the values generated by risedev and causing confusing behaviours 🥵 Approved-By: fuyufjh Approved-By: xxchan Co-Authored-By: Gun9niR Co-Authored-By: Zhidong Guo <52783948+Gun9niR@users.noreply.github.com> --- Cargo.lock | 57 +++++++- src/cmd_all/src/playground.rs | 4 - src/common/Cargo.toml | 1 + src/common/proc_macro/Cargo.toml | 21 +++ src/common/proc_macro/src/config.rs | 64 +++++++++ src/common/proc_macro/src/lib.rs | 57 ++++++++ src/common/src/config.rs | 123 +++++++++++++++++- src/compute/Cargo.toml | 1 + src/compute/src/lib.rs | 81 ++++++------ src/compute/src/server.rs | 22 ++-- src/frontend/Cargo.toml | 1 + src/frontend/src/lib.rs | 27 ++-- src/frontend/src/session.rs | 13 +- src/meta/Cargo.toml | 1 + src/meta/src/backup_restore/restore.rs | 7 +- src/meta/src/backup_restore/utils.rs | 7 +- src/meta/src/hummock/manager/mod.rs | 2 +- src/meta/src/hummock/vacuum.rs | 4 +- src/meta/src/lib.rs | 38 +++--- src/risedevtool/src/config/use_expander.rs | 2 +- src/storage/compactor/Cargo.toml | 1 + src/storage/compactor/src/lib.rs | 44 ++++--- src/storage/compactor/src/server.rs | 14 +- .../src/compaction_test_runner.rs | 9 +- .../src/delete_range_runner.rs | 4 +- 25 files changed, 470 insertions(+), 135 deletions(-) create mode 100644 src/common/proc_macro/Cargo.toml create mode 100644 src/common/proc_macro/src/config.rs create mode 100644 src/common/proc_macro/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 0162b196ed425..ce916c7cbd90d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -936,6 +936,19 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "bae" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33b8de67cc41132507eeece2584804efcb15f85ba516e34c944b7667f480397a" +dependencies = [ + "heck 0.3.3", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "base64" version = "0.13.1" @@ -1402,7 +1415,7 @@ version = "3.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro-error", "proc-macro2", "quote", @@ -1415,7 +1428,7 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "684a277d672e91966334af371f1a7b5833f9aa00b07c84e92fbce95e00208ce8" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro-error", "proc-macro2", "quote", @@ -2121,7 +2134,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9720bba047d567ffc8a3cba48bf19126600e249ab7f128e9233e6376976a116" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro2", "quote", "syn", @@ -2838,6 +2851,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "heck" version = "0.4.0" @@ -4565,7 +4587,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bdbb7b706f2afc610f3853550cdbbf6372fd324824a087806bd4480ea4996e24" dependencies = [ - "heck", + "heck 0.4.0", "itertools", "prost 0.11.6", "prost-types", @@ -5039,7 +5061,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e" dependencies = [ "bytes", - "heck", + "heck 0.4.0", "itertools", "lazy_static", "log", @@ -5723,6 +5745,7 @@ dependencies = [ "bytes", "chrono", "chrono-tz", + "clap 3.2.23", "comfy-table", "crc32fast", "criterion", @@ -5778,6 +5801,18 @@ dependencies = [ "workspace-hack", ] +[[package]] +name = "risingwave_common_proc_macro" +version = "0.2.0-alpha" +dependencies = [ + "bae", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", + "workspace-hack", +] + [[package]] name = "risingwave_common_service" version = "0.2.0-alpha" @@ -5839,6 +5874,7 @@ dependencies = [ "parking_lot 0.12.1", "prometheus", "risingwave_common", + "risingwave_common_proc_macro", "risingwave_common_service", "risingwave_hummock_sdk", "risingwave_object_store", @@ -5887,6 +5923,7 @@ dependencies = [ "rand 0.8.5", "risingwave_batch", "risingwave_common", + "risingwave_common_proc_macro", "risingwave_common_service", "risingwave_connector", "risingwave_hummock_sdk", @@ -6105,6 +6142,7 @@ dependencies = [ "rand 0.8.5", "risingwave_batch", "risingwave_common", + "risingwave_common_proc_macro", "risingwave_common_service", "risingwave_connector", "risingwave_expr", @@ -6235,6 +6273,7 @@ dependencies = [ "reqwest", "risingwave_backup", "risingwave_common", + "risingwave_common_proc_macro", "risingwave_common_service", "risingwave_connector", "risingwave_hummock_sdk", @@ -7427,7 +7466,7 @@ version = "0.24.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59" dependencies = [ - "heck", + "heck 0.4.0", "proc-macro2", "quote", "rustversion", @@ -8242,6 +8281,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fdbf052a0783de01e944a6ce7a8cb939e295b1e7be835a1112c3b9a7f047a5a" + [[package]] name = "unicode-width" version = "0.1.10" diff --git a/src/cmd_all/src/playground.rs b/src/cmd_all/src/playground.rs index d05446246b5be..5dbb65c3a87dc 100644 --- a/src/cmd_all/src/playground.rs +++ b/src/cmd_all/src/playground.rs @@ -18,7 +18,6 @@ use std::sync::LazyLock; use anyhow::Result; use clap::StructOpt; -use risingwave_common::config::load_config; use tempfile::TempPath; use tokio::signal; @@ -154,9 +153,6 @@ pub async fn playground() -> Result<()> { opts.insert(0, "meta-node".into()); tracing::info!("starting meta-node thread with cli args: {:?}", opts); let opts = risingwave_meta::MetaNodeOpts::parse_from(opts); - - let _config = load_config(&opts.config_path); - tracing::info!("opts: {:#?}", opts); let _meta_handle = tokio::spawn(async move { risingwave_meta::start(opts).await; diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index e07d0e5ab5145..a76c0941c4e80 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -19,6 +19,7 @@ byteorder = "1" bytes = "1" chrono = { version = "0.4", default-features = false, features = ["clock", "std"] } chrono-tz = { version = "0.7", features = ["case-insensitive"] } +clap = { version = "3", features = ["derive"] } comfy-table = "6" crc32fast = "1" derivative = "2" diff --git a/src/common/proc_macro/Cargo.toml b/src/common/proc_macro/Cargo.toml new file mode 100644 index 0000000000000..a8017141f2fff --- /dev/null +++ b/src/common/proc_macro/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "risingwave_common_proc_macro" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +[lib] +proc-macro = true + +[dependencies] +proc-macro-error = "1.0" +quote = "1" +proc-macro2 = { version = "1", default-features = false } +syn = "1" +bae = "0.1.7" + +[target.'cfg(not(madsim))'.dependencies] +workspace-hack = { path = "../../workspace-hack" } \ No newline at end of file diff --git a/src/common/proc_macro/src/config.rs b/src/common/proc_macro/src/config.rs new file mode 100644 index 0000000000000..55f5c98559be5 --- /dev/null +++ b/src/common/proc_macro/src/config.rs @@ -0,0 +1,64 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use bae::FromAttributes; +use proc_macro2::TokenStream; +use proc_macro_error::ResultExt; +use quote::quote; +use syn::DeriveInput; + +#[derive(FromAttributes)] +pub struct OverrideOpts { + pub path: Option, + pub optional_in_config: Option<()>, +} + +#[cfg_attr(coverage, no_coverage)] +pub fn produce_override_config(input: DeriveInput) -> TokenStream { + let struct_ident = input.ident; + let mut override_stmts = Vec::new(); + + if let syn::Data::Struct(syn::DataStruct { fields, .. }) = input.data { + for field in fields { + let override_opts = OverrideOpts::from_attributes(&field.attrs) + .expect_or_abort("Failed to parse `override_opts` attribute"); + let path = override_opts.path.expect("`path` must exist"); + let field_ident = field.ident; + + let override_stmt = if override_opts.optional_in_config.is_some() { + quote! { + if self.#field_ident.is_some() { + config.#path = self.#field_ident; + } + } + } else { + quote! { + if let Some(v) = self.#field_ident { + config.#path = v; + } + } + }; + + override_stmts.push(override_stmt); + } + } + + quote! { + impl risingwave_common::config::OverrideConfig for #struct_ident { + fn r#override(self, config: &mut risingwave_common::config::RwConfig) { + #(#override_stmts)* + } + } + } +} diff --git a/src/common/proc_macro/src/lib.rs b/src/common/proc_macro/src/lib.rs new file mode 100644 index 0000000000000..33808a2c496a4 --- /dev/null +++ b/src/common/proc_macro/src/lib.rs @@ -0,0 +1,57 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#![cfg_attr(coverage, feature(no_coverage))] + +use proc_macro_error::proc_macro_error; +use syn::parse_macro_input; + +mod config; + +/// Sections in the configuration file can use `#[derive(OverrideConfig)]` to generate the +/// implementation of overwriting configs from the file. +/// +/// In the struct definition, use #[override_opts(path = ...)] on a field to indicate the field in +/// `RwConfig` to override. +/// +/// An example: +/// +/// ```ignore +/// #[derive(OverrideConfig)] +/// struct Opts { +/// #[override_opts(path = meta.listen_addr)] +/// listen_addr: Option, +/// } +/// ``` +/// +/// will generate +/// +/// impl OverrideConfig for Opts { +/// fn r#override(self, config: &mut RwConfig) { +/// if let Some(v) = self.required_str { +/// config.meta.listen_addr = v; +/// } +/// } +/// } +/// ``` +#[cfg_attr(coverage, no_coverage)] +#[proc_macro_derive(OverrideConfig, attributes(override_opts))] +#[proc_macro_error] +pub fn override_config(input: proc_macro::TokenStream) -> proc_macro::TokenStream { + let input = parse_macro_input!(input); + + let gen = config::produce_override_config(input); + + gen.into() +} diff --git a/src/common/src/config.rs b/src/common/src/config.rs index c0423472a3dff..ba9ccf70da8cb 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -19,6 +19,7 @@ use std::fs; +use clap::ArgEnum; use serde::{Deserialize, Serialize}; /// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed @@ -27,17 +28,49 @@ pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1; /// Use a large value for HTTP/2 stream window size to improve the performance of remote exchange, /// as we don't rely on this for back-pressure. pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; // 32 MB +/// For non-user-facing components where the CLI arguments do not override the config file. +pub const NO_OVERRIDE: Option = None; -pub fn load_config(path: &str) -> RwConfig +/// A workaround for a bug in clap where the attribute `from_flag` on `Option` results in +/// compilation error. +pub type Flag = Option; + +pub fn load_config(path: &str, cli_override: Option) -> RwConfig where { - if path.is_empty() { + let mut config = if path.is_empty() { tracing::warn!("risingwave.toml not found, using default config."); - return RwConfig::default(); + RwConfig::default() + } else { + let config_str = fs::read_to_string(path) + .unwrap_or_else(|e| panic!("failed to open config file '{}': {}", path, e)); + toml::from_str(config_str.as_str()).unwrap_or_else(|e| panic!("parse error {}", e)) + }; + if let Some(cli_override) = cli_override { + cli_override.r#override(&mut config); } - let config_str = fs::read_to_string(path) - .unwrap_or_else(|e| panic!("failed to open config file '{}': {}", path, e)); - toml::from_str(config_str.as_str()).unwrap_or_else(|e| panic!("parse error {}", e)) + config +} + +/// Map command line flag to `Flag`. Should only be used in `#[derive(OverrideConfig)]`. +pub fn true_if_present(b: bool) -> Flag { + if b { + Some(true) + } else { + None + } +} + +pub trait OverrideConfig { + fn r#override(self, config: &mut RwConfig); +} + +/// A dummy struct for `NO_OVERRIDE`. Do NOT use it directly. +#[derive(Clone, Copy)] +pub struct NoOverride {} + +impl OverrideConfig for NoOverride { + fn r#override(self, _config: &mut RwConfig) {} } /// [`RwConfig`] corresponds to the whole config file `risingwave.toml`. Each field corresponds to a @@ -63,6 +96,13 @@ pub struct RwConfig { pub backup: BackupConfig, } +#[derive(Copy, Clone, Debug, Default, ArgEnum, Serialize, Deserialize)] +pub enum MetaBackend { + #[default] + Mem, + Etcd, +} + /// The section `[meta]` in `risingwave.toml`. #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] @@ -111,6 +151,9 @@ pub struct MetaConfig { #[serde(default = "default::meta::node_num_monitor_interval_sec")] pub node_num_monitor_interval_sec: u64, + + #[serde(default = "default::meta::backend")] + pub backend: MetaBackend, } impl Default for MetaConfig { @@ -133,6 +176,12 @@ pub struct ServerConfig { #[serde(default = "default::server::connection_pool_size")] pub connection_pool_size: u16, + + #[serde(default = "default::server::metrics_level")] + /// Used for control the metrics level, similar to log level. + /// 0 = close metrics + /// >0 = open metrics + pub metrics_level: u32, } impl Default for ServerConfig { @@ -181,6 +230,14 @@ pub struct StreamingConfig { #[serde(default)] pub actor_runtime_worker_threads_num: Option, + /// Enable reporting tracing information to jaeger. + #[serde(default = "default::streaming::enable_jaegar_tracing")] + pub enable_jaeger_tracing: bool, + + /// Enable async stack tracing for risectl. + #[serde(default = "default::streaming::async_stack_trace")] + pub async_stack_trace: AsyncStackTraceOption, + #[serde(default)] pub developer: DeveloperConfig, } @@ -221,6 +278,10 @@ pub struct StorageConfig { #[serde(default = "default::storage::shared_buffer_capacity_mb")] pub shared_buffer_capacity_mb: u32, + /// State store url. + #[serde(default = "default::storage::state_store")] + pub state_store: String, + /// Remote directory for storing data and metadata objects. #[serde(default = "default::storage::data_directory")] pub data_directory: String, @@ -273,6 +334,9 @@ pub struct StorageConfig { #[serde(default = "default::storage::object_store_use_batch_delete")] pub object_store_use_batch_delete: bool, + #[serde(default = "default::storage::max_concurrent_compaction_task_number")] + pub max_concurrent_compaction_task_number: u64, + /// Whether to enable state_store_v1 for hummock #[serde(default = "default::storage::enable_state_store_v1")] pub enable_state_store_v1: bool, @@ -290,6 +354,9 @@ impl Default for StorageConfig { #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct FileCacheConfig { + #[serde(default = "default::file_cache::dir")] + pub dir: String, + #[serde(default = "default::file_cache::capacity_mb")] pub capacity_mb: usize, @@ -312,6 +379,14 @@ impl Default for FileCacheConfig { } } +#[derive(Debug, Default, Clone, ArgEnum, Serialize, Deserialize)] +pub enum AsyncStackTraceOption { + Off, + #[default] + On, + Verbose, +} + /// The subsections `[batch.developer]` and `[streaming.developer]`. /// /// It is put at [`BatchConfig::developer`] and [`StreamingConfig::developer`]. @@ -382,6 +457,8 @@ impl Default for BackupConfig { mod default { pub mod meta { + use crate::config::MetaBackend; + pub fn min_sst_retention_time_sec() -> u64 { 604800 } @@ -409,6 +486,10 @@ mod default { pub fn node_num_monitor_interval_sec() -> u64 { 10 } + + pub fn backend() -> MetaBackend { + MetaBackend::Mem + } } pub mod server { @@ -424,6 +505,10 @@ mod default { pub fn connection_pool_size() -> u16 { 16 } + + pub fn metrics_level() -> u32 { + 0 + } } pub mod storage { @@ -452,6 +537,12 @@ mod default { 1024 } + pub fn state_store() -> String { + // May be problematic for multi-node deployment, but since we override it with CLI and + // it will be removed soon, it won't be a problem. + "hummock+memory".to_string() + } + pub fn data_directory() -> String { "hummock_001".to_string() } @@ -504,12 +595,19 @@ mod default { pub fn object_store_use_batch_delete() -> bool { true } + + pub fn max_concurrent_compaction_task_number() -> u64 { + 16 + } + pub fn enable_state_store_v1() -> bool { false } } pub mod streaming { + use crate::config::AsyncStackTraceOption; + pub fn barrier_interval_ms() -> u32 { 1000 } @@ -523,10 +621,22 @@ mod default { pub fn checkpoint_frequency() -> usize { 10 } + + pub fn enable_jaegar_tracing() -> bool { + false + } + + pub fn async_stack_trace() -> AsyncStackTraceOption { + AsyncStackTraceOption::On + } } pub mod file_cache { + pub fn dir() -> String { + "".to_string() + } + pub fn capacity_mb() -> usize { 1024 } @@ -549,6 +659,7 @@ mod default { } pub mod developer { + pub fn batch_output_channel_size() -> usize { 64 } diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 9731fb96ab248..f305de0b3ec78 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -37,6 +37,7 @@ prometheus = { version = "0.13" } prost = "0.11" risingwave_batch = { path = "../batch" } risingwave_common = { path = "../common" } +risingwave_common_proc_macro = { path = "../common/proc_macro" } risingwave_common_service = { path = "../common/common_service" } risingwave_connector = { path = "../connector" } risingwave_hummock_sdk = { path = "../storage/hummock_sdk" } diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index 8d9c20dbe608b..74b9048f5f4bf 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -28,17 +28,11 @@ pub mod memory_management; pub mod rpc; pub mod server; -use clap::clap_derive::ArgEnum; use clap::Parser; +use risingwave_common::config::{true_if_present, AsyncStackTraceOption, Flag}; use risingwave_common::util::resource_util::cpu::total_cpu_available; use risingwave_common::util::resource_util::memory::total_memory_available_bytes; - -#[derive(Debug, Clone, ArgEnum)] -pub enum AsyncStackTraceOption { - Off, - On, // default - Verbose, -} +use risingwave_common_proc_macro::OverrideConfig; /// Command-line arguments for compute-node. #[derive(Parser, Clone, Debug)] @@ -53,40 +47,12 @@ pub struct ComputeNodeOpts { #[clap(long)] pub client_address: Option, - /// One of: - /// 1. `hummock+{object_store}` where `object_store` - /// is one of `s3://{path}`, `s3-compatible://{path}`, `minio://{path}`, `disk://{path}`, - /// `memory` or `memory-shared`. - /// 2. `in-memory` - /// 3. `sled://{path}` - #[clap(long, default_value = "hummock+memory")] - pub state_store: String, - #[clap(long, default_value = "127.0.0.1:1222")] pub prometheus_listener_addr: String, - /// Used for control the metrics level, similar to log level. - /// 0 = close metrics - /// >0 = open metrics - #[clap(long, default_value = "0")] - pub metrics_level: u32, - #[clap(long, default_value = "http://127.0.0.1:5690")] pub meta_address: String, - /// Enable reporting tracing information to jaeger. - #[clap(long)] - pub enable_jaeger_tracing: bool, - - /// Enable async stack tracing for risectl. - #[clap(long, arg_enum, default_value_t = AsyncStackTraceOption::On)] - pub async_stack_trace: AsyncStackTraceOption, - - /// Path to file cache data directory. - /// Left empty to disable file cache. - #[clap(long, default_value = "")] - pub file_cache_dir: String, - /// Endpoint of the connector node #[clap(long, env = "CONNECTOR_RPC_ENDPOINT")] pub connector_rpc_endpoint: Option, @@ -94,9 +60,6 @@ pub struct ComputeNodeOpts { /// The path of `risingwave.toml` configuration file. /// /// If empty, default configuration values will be used. - /// - /// Note that internal system parameters should be defined in the configuration file at - /// [`risingwave_common::config`] instead of command line arguments. #[clap(long, default_value = "")] pub config_path: String, @@ -107,6 +70,46 @@ pub struct ComputeNodeOpts { /// The parallelism that the compute node will register to the scheduler of the meta service. #[clap(long, default_value_t = default_parallelism())] pub parallelism: usize, + + #[clap(flatten)] + override_config: OverrideConfigOpts, +} + +/// Command-line arguments for compute-node that overrides the config file. +#[derive(Parser, Clone, Debug, OverrideConfig)] +struct OverrideConfigOpts { + /// One of: + /// 1. `hummock+{object_store}` where `object_store` + /// is one of `s3://{path}`, `s3-compatible://{path}`, `minio://{path}`, `disk://{path}`, + /// `memory` or `memory-shared`. + /// 2. `in-memory` + /// 3. `sled://{path}` + #[clap(long)] + #[override_opts(path = storage.state_store)] + pub state_store: Option, + + /// Used for control the metrics level, similar to log level. + /// 0 = close metrics + /// >0 = open metrics + #[clap(long)] + #[override_opts(path = server.metrics_level)] + pub metrics_level: Option, + + /// Path to file cache data directory. + /// Left empty to disable file cache. + #[clap(long)] + #[override_opts(path = storage.file_cache.dir)] + pub file_cache_dir: Option, + + /// Enable reporting tracing information to jaeger. + #[clap(parse(from_flag = true_if_present), long)] + #[override_opts(path = streaming.enable_jaeger_tracing)] + pub enable_jaeger_tracing: Flag, + + /// Enable async stack tracing for risectl. + #[clap(long, arg_enum)] + #[override_opts(path = streaming.async_stack_trace)] + pub async_stack_trace: Option, } fn validate_opts(opts: &ComputeNodeOpts) { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index a8614758b3759..938f7ca07c2bb 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -20,7 +20,9 @@ use async_stack_trace::StackTraceManager; use risingwave_batch::executor::BatchTaskMetrics; use risingwave_batch::rpc::service::task_service::BatchServiceImpl; use risingwave_batch::task::{BatchEnvironment, BatchManager}; -use risingwave_common::config::{load_config, MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE}; +use risingwave_common::config::{ + load_config, AsyncStackTraceOption, MAX_CONNECTION_WINDOW_SIZE, STREAM_WINDOW_SIZE, +}; use risingwave_common::monitor::process_linux::monitor_process; use risingwave_common::util::addr::HostAddr; use risingwave_common_service::metrics_manager::MetricsManager; @@ -59,7 +61,7 @@ use crate::rpc::service::monitor_service::{ GrpcStackTraceManagerRef, MonitorServiceImpl, StackTraceMiddlewareLayer, }; use crate::rpc::service::stream_service::StreamServiceImpl; -use crate::{AsyncStackTraceOption, ComputeNodeOpts}; +use crate::ComputeNodeOpts; /// Bootstraps the compute-node. pub async fn compute_node_serve( @@ -68,7 +70,7 @@ pub async fn compute_node_serve( opts: ComputeNodeOpts, ) -> (Vec>, Sender<()>) { // Load the configuration. - let config = load_config(&opts.config_path); + let config = load_config(&opts.config_path, Some(opts.override_config)); info!( "Starting compute node with config {:?} with debug assertions {}", config, @@ -116,14 +118,14 @@ pub async fn compute_node_serve( let mut join_handle_vec = vec![]; let state_store = StateStoreImpl::new( - &opts.state_store, - &opts.file_cache_dir, + &config.storage.state_store, + &config.storage.file_cache.dir, &config, hummock_meta_client.clone(), state_store_metrics.clone(), object_store_metrics, TieredCacheMetricsBuilder::new(registry.clone()), - if opts.enable_jaeger_tracing { + if config.streaming.enable_jaeger_tracing { Arc::new( risingwave_tracing::RwTracingService::new(risingwave_tracing::TracingConfig::new( "127.0.0.1:6831".to_string(), @@ -144,8 +146,8 @@ pub async fn compute_node_serve( extra_info_sources.push(storage.sstable_id_manager().clone()); // Note: we treat `hummock+memory-shared` as a shared storage, so we won't start the // compactor along with compute node. - if opts.state_store == "hummock+memory" - || opts.state_store.starts_with("hummock+disk") + if config.storage.state_store == "hummock+memory" + || config.storage.state_store.starts_with("hummock+disk") || storage_config.disable_remote_compactor { tracing::info!("start embedded compactor"); @@ -189,7 +191,7 @@ pub async fn compute_node_serve( extra_info_sources, )); - let async_stack_trace_config = match opts.async_stack_trace { + let async_stack_trace_config = match &config.streaming.async_stack_trace { AsyncStackTraceOption::Off => None, c => Some(async_stack_trace::TraceConfig { report_detached: true, @@ -311,7 +313,7 @@ pub async fn compute_node_serve( join_handle_vec.push(join_handle); // Boot metrics service. - if opts.metrics_level > 0 { + if config.server.metrics_level > 0 { MetricsManager::boot_metrics_service( opts.prometheus_listener_addr.clone(), registry.clone(), diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index a991a210059c3..c2b4246867baa 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -40,6 +40,7 @@ prost = "0.11" rand = "0.8" risingwave_batch = { path = "../batch" } risingwave_common = { path = "../common" } +risingwave_common_proc_macro = { path = "../common/proc_macro" } risingwave_common_service = { path = "../common/common_service" } risingwave_connector = { path = "../connector" } risingwave_expr = { path = "../expr" } diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index 147c960369264..6b5d18153078e 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -47,6 +47,7 @@ pub use planner::Planner; mod scheduler; pub mod session; mod stream_fragmenter; +use risingwave_common_proc_macro::OverrideConfig; pub use stream_fragmenter::build_graph; mod utils; pub use utils::{explain_stream_graph, WithOptions}; @@ -65,6 +66,7 @@ use clap::Parser; use pgwire::pg_server::pg_serve; use session::SessionManagerImpl; +/// Command-line arguments for frontend-node. #[derive(Parser, Clone, Debug)] pub struct FrontendOpts { // TODO: rename to listen_address and separate out the port. @@ -88,12 +90,6 @@ pub struct FrontendOpts { #[clap(long, default_value = "127.0.0.1:6786")] pub health_check_listener_addr: String, - /// Used for control the metrics level, similar to log level. - /// 0 = close metrics - /// >0 = open metrics - #[clap(long, default_value = "0")] - pub metrics_level: u32, - /// The path of `risingwave.toml` configuration file. /// /// If empty, default configuration values will be used. @@ -102,6 +98,20 @@ pub struct FrontendOpts { /// [`risingwave_common::config`] instead of command line arguments. #[clap(long, default_value = "")] pub config_path: String, + + #[clap(flatten)] + override_opts: OverrideConfigOpts, +} + +/// Command-line arguments for frontend-node that overrides the config file. +#[derive(Parser, Clone, Debug, OverrideConfig)] +struct OverrideConfigOpts { + /// Used for control the metrics level, similar to log level. + /// 0 = close metrics + /// >0 = open metrics + #[clap(long)] + #[override_opts(path = server.metrics_level)] + pub metrics_level: Option, } impl Default for FrontendOpts { @@ -120,8 +130,9 @@ pub fn start(opts: FrontendOpts) -> Pin + Send>> { // WARNING: don't change the function signature. Making it `async fn` will cause // slow compile in release mode. Box::pin(async move { - let session_mgr = Arc::new(SessionManagerImpl::new(&opts).await.unwrap()); - pg_serve(&opts.host, session_mgr, Some(TlsConfig::new_default())) + let addr = opts.host.clone(); + let session_mgr = Arc::new(SessionManagerImpl::new(opts).await.unwrap()); + pg_serve(&addr, session_mgr, Some(TlsConfig::new_default())) .await .unwrap(); }) diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs index 748848542db18..f74ff09d4ddf8 100644 --- a/src/frontend/src/session.rs +++ b/src/frontend/src/session.rs @@ -144,12 +144,13 @@ impl FrontendEnv { } pub async fn init( - opts: &FrontendOpts, + opts: FrontendOpts, ) -> Result<(Self, JoinHandle<()>, JoinHandle<()>, Sender<()>)> { - let config = load_config(&opts.config_path); + let config = load_config(&opts.config_path, Some(opts.override_opts)); tracing::info!( - "Starting frontend node with\nfrontend config {:?}", - config.server + "Starting frontend node with config {:?} with debug assertions {}", + config, + if cfg!(debug_assertions) { "on" } else { "off" } ); let batch_config = config.batch; @@ -232,7 +233,7 @@ impl FrontendEnv { let frontend_metrics = Arc::new(FrontendMetrics::new(registry.clone())); let source_metrics = Arc::new(SourceMetrics::new(registry.clone())); - if opts.metrics_level > 0 { + if config.server.metrics_level > 0 { MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone(), registry); } @@ -624,7 +625,7 @@ impl SessionManager for SessionManagerImpl { } impl SessionManagerImpl { - pub async fn new(opts: &FrontendOpts) -> Result { + pub async fn new(opts: FrontendOpts) -> Result { let (env, join_handle, heartbeat_join_handle, heartbeat_shutdown_sender) = FrontendEnv::init(opts).await?; Ok(Self { diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index b28d42bee1220..e9fab3004788d 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -42,6 +42,7 @@ rand = "0.8" reqwest = "0.11" risingwave_backup = { path = "../storage/backup" } risingwave_common = { path = "../common" } +risingwave_common_proc_macro = { path = "../common/proc_macro" } risingwave_common_service = { path = "../common/common_service" } risingwave_connector = { path = "../connector" } risingwave_hummock_sdk = { path = "../storage/hummock_sdk" } diff --git a/src/meta/src/backup_restore/restore.rs b/src/meta/src/backup_restore/restore.rs index 4565b5ef8021d..f3476019e7c4b 100644 --- a/src/meta/src/backup_restore/restore.rs +++ b/src/meta/src/backup_restore/restore.rs @@ -17,12 +17,13 @@ use itertools::Itertools; use risingwave_backup::error::{BackupError, BackupResult}; use risingwave_backup::meta_snapshot::MetaSnapshot; use risingwave_backup::storage::MetaSnapshotStorageRef; +use risingwave_common::config::MetaBackend; use crate::backup_restore::utils::{get_backup_store, get_meta_store, MetaStoreBackendImpl}; +use crate::dispatch_meta_store; use crate::hummock::compaction_group::CompactionGroup; use crate::model::{MetadataModel, TableFragments}; use crate::storage::{MetaStore, DEFAULT_COLUMN_FAMILY}; -use crate::{dispatch_meta_store, Backend}; /// Command-line arguments for restore. #[derive(Parser, Debug, Clone)] @@ -32,8 +33,8 @@ pub struct RestoreOpts { #[clap(long)] pub meta_snapshot_id: u64, /// Type of meta store to restore. - #[clap(long, arg_enum, default_value_t = Backend::Etcd)] - pub meta_store_type: Backend, + #[clap(long, arg_enum, default_value_t = MetaBackend::Etcd)] + pub meta_store_type: MetaBackend, /// Etcd endpoints. #[clap(long, default_value_t = String::from(""))] pub etcd_endpoints: String, diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index 4c7a2ba47a97f..a040214f72ad9 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -18,12 +18,13 @@ use std::time::Duration; use etcd_client::ConnectOptions; use risingwave_backup::error::BackupResult; use risingwave_backup::storage::{MetaSnapshotStorageRef, ObjectStoreMetaSnapshotStorage}; +use risingwave_common::config::MetaBackend; use risingwave_object_store::object::object_metrics::ObjectStoreMetrics; use risingwave_object_store::object::parse_remote_object_store; use crate::backup_restore::RestoreOpts; use crate::storage::{EtcdMetaStore, MemStore, WrappedEtcdClient as EtcdClient}; -use crate::{Backend, MetaStoreBackend}; +use crate::MetaStoreBackend; #[derive(Clone)] pub enum MetaStoreBackendImpl { @@ -44,7 +45,7 @@ macro_rules! dispatch_meta_store { // Code is copied from src/meta/src/rpc/server.rs. TODO #6482: extract method. pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult { let meta_store_backend = match opts.meta_store_type { - Backend::Etcd => MetaStoreBackend::Etcd { + MetaBackend::Etcd => MetaStoreBackend::Etcd { endpoints: opts .etcd_endpoints .split(',') @@ -55,7 +56,7 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult None, }, }, - Backend::Mem => MetaStoreBackend::Mem, + MetaBackend::Mem => MetaStoreBackend::Mem, }; match meta_store_backend { MetaStoreBackend::Etcd { diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index f59bcc597fcac..126c0c2efea96 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -2198,7 +2198,7 @@ fn gen_version_delta<'a>( version_delta.safe_epoch = std::cmp::max(old_version.safe_epoch, compact_task.watermark); version_delta.id = old_version.id + 1; // Don't persist version delta generated by compaction to meta store in deterministic mode. - // Because it will overwrite existing version delta that has same ID generated in the data + // Because it will override existing version delta that has same ID generated in the data // ingestion phase. if !deterministic_mode { txn.insert(version_delta.id, version_delta.clone()); diff --git a/src/meta/src/hummock/vacuum.rs b/src/meta/src/hummock/vacuum.rs index 3ed62edfc88a9..86912823a3187 100644 --- a/src/meta/src/hummock/vacuum.rs +++ b/src/meta/src/hummock/vacuum.rs @@ -461,7 +461,7 @@ mod tests { panic!() } }; - // min_sst_retention_time_sec overwrite user provided value. + // min_sst_retention_time_sec override user provided value. assert_eq!( vacuum.env.opts.min_sst_retention_time_sec, full_scan_task.sst_retention_time_sec @@ -479,7 +479,7 @@ mod tests { panic!() } }; - // min_sst_retention_time_sec doesn't overwrite user provided value. + // min_sst_retention_time_sec doesn't override user provided value. assert_eq!( vacuum.env.opts.min_sst_retention_time_sec + 1, full_scan_task.sst_retention_time_sec diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 429c52e95e159..07fdfe5a2d8f5 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -48,18 +48,13 @@ mod stream; use std::time::Duration; -use clap::{ArgEnum, Parser}; +use clap::Parser; pub use error::{MetaError, MetaResult}; +use risingwave_common_proc_macro::OverrideConfig; use crate::manager::MetaOpts; use crate::rpc::server::{rpc_serve, AddressInfo, MetaStoreBackend}; -#[derive(Copy, Clone, Debug, ArgEnum)] -pub enum Backend { - Mem, - Etcd, -} - #[derive(Debug, Clone, Parser)] pub struct MetaNodeOpts { // TODO: rename to listen_address and separate out the port. @@ -81,9 +76,6 @@ pub struct MetaNodeOpts { #[clap(long)] prometheus_host: Option, - #[clap(long, arg_enum, default_value_t = Backend::Mem)] - backend: Backend, - #[clap(long, default_value_t = String::from(""))] etcd_endpoints: String, @@ -116,27 +108,35 @@ pub struct MetaNodeOpts { /// The path of `risingwave.toml` configuration file. /// /// If empty, default configuration values will be used. - /// - /// Note that internal system parameters should be defined in the configuration file at - /// [`risingwave_common::config`] instead of command line arguments. #[clap(long, default_value = "")] pub config_path: String, + + #[clap(flatten)] + pub override_opts: OverrideConfigOpts, +} + +/// Command-line arguments for compute-node that overrides the config file. +#[derive(Parser, Clone, Debug, OverrideConfig)] +pub struct OverrideConfigOpts { + #[clap(long, arg_enum)] + #[override_opts(path = meta.backend)] + backend: Option, } use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; -use risingwave_common::config::load_config; +use risingwave_common::config::{load_config, MetaBackend}; /// Start meta node pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { // WARNING: don't change the function signature. Making it `async fn` will cause // slow compile in release mode. Box::pin(async move { - let config = load_config(&opts.config_path); - tracing::info!("Starting meta node with config {:?}", config); tracing::info!("Starting meta node with options {:?}", opts); + let config = load_config(&opts.config_path, Some(opts.override_opts)); + tracing::info!("Starting meta node with config {:?}", config); let listen_addr: SocketAddr = opts.listen_addr.parse().unwrap(); let meta_addr = opts.host.unwrap_or_else(|| opts.listen_addr.clone()); let dashboard_addr = opts.dashboard_host.map(|x| x.parse().unwrap()); @@ -144,8 +144,8 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { let meta_endpoint = opts .meta_endpoint .unwrap_or_else(|| format!("{}:{}", meta_addr, listen_addr.port())); - let backend = match opts.backend { - Backend::Etcd => MetaStoreBackend::Etcd { + let backend = match config.meta.backend { + MetaBackend::Etcd => MetaStoreBackend::Etcd { endpoints: opts .etcd_endpoints .split(',') @@ -156,7 +156,7 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { false => None, }, }, - Backend::Mem => MetaStoreBackend::Mem, + MetaBackend::Mem => MetaStoreBackend::Mem, }; let max_heartbeat_interval = diff --git a/src/risedevtool/src/config/use_expander.rs b/src/risedevtool/src/config/use_expander.rs index 96b6306d2a464..bdf201eabaea1 100644 --- a/src/risedevtool/src/config/use_expander.rs +++ b/src/risedevtool/src/config/use_expander.rs @@ -41,7 +41,7 @@ impl UseExpander { Ok(Self { template }) } - /// Overwrites values in `default` with values from `provided`. + /// Overrides values in `default` with values from `provided`. fn merge(use_id: &str, default: &yaml::Hash, provided: &yaml::Hash) -> yaml::Hash { let mut result = yaml::Hash::new(); // put `use` as the first element to make the generated yaml more readable. diff --git a/src/storage/compactor/Cargo.toml b/src/storage/compactor/Cargo.toml index 379280854ca09..8d0b77fdf5c62 100644 --- a/src/storage/compactor/Cargo.toml +++ b/src/storage/compactor/Cargo.toml @@ -14,6 +14,7 @@ clap = { version = "3", features = ["derive"] } parking_lot = "0.12" prometheus = { version = "0.13" } risingwave_common = { path = "../../common" } +risingwave_common_proc_macro = { path = "../../common/proc_macro" } risingwave_common_service = { path = "../../common/common_service" } risingwave_hummock_sdk = { path = "../hummock_sdk" } risingwave_object_store = { path = "../../object_store" } diff --git a/src/storage/compactor/src/lib.rs b/src/storage/compactor/src/lib.rs index 5819106da3afd..119c73fb39c17 100644 --- a/src/storage/compactor/src/lib.rs +++ b/src/storage/compactor/src/lib.rs @@ -17,6 +17,7 @@ mod rpc; mod server; use clap::Parser; +use risingwave_common_proc_macro::OverrideConfig; use crate::server::compactor_serve; @@ -35,36 +36,46 @@ pub struct CompactorOpts { #[clap(long)] pub port: Option, - /// Of the form `hummock+{object_store}` where `object_store` - /// is one of `s3://{path}`, `s3-compatible://{path}`, `minio://{path}`, `disk://{path}`, - /// `memory` or `memory-shared`. - #[clap(long, default_value = "")] - pub state_store: String, - #[clap(long, default_value = "127.0.0.1:1260")] pub prometheus_listener_addr: String, - #[clap(long, default_value = "0")] - pub metrics_level: u32, - #[clap(long, default_value = "http://127.0.0.1:5690")] pub meta_address: String, - /// It's a hint used by meta node. - #[clap(long, default_value = "16")] - pub max_concurrent_task_number: u64, - #[clap(long)] pub compaction_worker_threads_number: Option, /// The path of `risingwave.toml` configuration file. /// /// If empty, default configuration values will be used. - /// - /// Note that internal system parameters should be defined in the configuration file at - /// [`risingwave_common::config`] instead of command line arguments. #[clap(long, default_value = "")] pub config_path: String, + + #[clap(flatten)] + override_config: OverrideConfigOpts, +} + +/// Command-line arguments for compactor-node that overrides the config file. +#[derive(Parser, Clone, Debug, OverrideConfig)] +struct OverrideConfigOpts { + /// Of the form `hummock+{object_store}` where `object_store` + /// is one of `s3://{path}`, `s3-compatible://{path}`, `minio://{path}`, `disk://{path}`, + /// `memory` or `memory-shared`. + #[clap(long)] + #[override_opts(path = storage.state_store)] + pub state_store: Option, + + /// Used for control the metrics level, similar to log level. + /// 0 = close metrics + /// >0 = open metrics + #[clap(long)] + #[override_opts(path = server.metrics_level)] + pub metrics_level: Option, + + /// It's a hint used by meta node. + #[clap(long)] + #[override_opts(path = storage.max_concurrent_compaction_task_number)] + pub max_concurrent_task_number: Option, } use std::future::Future; @@ -74,6 +85,7 @@ pub fn start(opts: CompactorOpts) -> Pin + Send>> { // WARNING: don't change the function signature. Making it `async fn` will cause // slow compile in release mode. Box::pin(async move { + tracing::info!("Compactor node options: {:?}", opts); tracing::info!("meta address: {}", opts.meta_address.clone()); let listen_address = opts.host.parse().unwrap(); diff --git a/src/storage/compactor/src/server.rs b/src/storage/compactor/src/server.rs index 2c9ff0675c44c..85d08c7220c16 100644 --- a/src/storage/compactor/src/server.rs +++ b/src/storage/compactor/src/server.rs @@ -48,11 +48,11 @@ pub async fn compactor_serve( client_addr: HostAddr, opts: CompactorOpts, ) -> (JoinHandle<()>, JoinHandle<()>, Sender<()>) { - let config = load_config(&opts.config_path); + let config = load_config(&opts.config_path, Some(opts.override_config)); tracing::info!( - "Starting compactor with config {:?} and opts {:?}", + "Starting compactor node with config {:?} with debug assertions {}", config, - opts + if cfg!(debug_assertions) { "on" } else { "off" } ); // Register to the cluster. @@ -80,7 +80,8 @@ pub async fn compactor_serve( let storage_config = Arc::new(config.storage); let object_store = Arc::new( parse_remote_object_store( - opts.state_store + storage_config + .state_store .strip_prefix("hummock+") .expect("object store must be hummock for compactor server"), object_metrics, @@ -105,6 +106,7 @@ pub async fn compactor_serve( let output_limit_mb = storage_config.compactor_memory_limit_mb as u64 / 2; let memory_limiter = Arc::new(MemoryLimiter::new(output_limit_mb << 20)); let input_limit_mb = storage_config.compactor_memory_limit_mb as u64 / 2; + let max_concurrent_task_number = storage_config.max_concurrent_compaction_task_number; let memory_collector = Arc::new(CompactorMemoryCollector::new( memory_limiter.clone(), sstable_store.clone(), @@ -129,7 +131,7 @@ pub async fn compactor_serve( sstable_id_manager: sstable_id_manager.clone(), task_progress_manager: Default::default(), compactor_runtime_config: Arc::new(tokio::sync::Mutex::new(CompactorRuntimeConfig { - max_concurrent_task_number: opts.max_concurrent_task_number, + max_concurrent_task_number, })), }); let sub_tasks = vec![ @@ -173,7 +175,7 @@ pub async fn compactor_serve( }); // Boot metrics service. - if opts.metrics_level > 0 { + if config.server.metrics_level > 0 { MetricsManager::boot_metrics_service( opts.prometheus_listener_addr.clone(), registry.clone(), diff --git a/src/tests/compaction_test/src/compaction_test_runner.rs b/src/tests/compaction_test/src/compaction_test_runner.rs index 00a7532bd50e1..04bf3b2552583 100644 --- a/src/tests/compaction_test/src/compaction_test_runner.rs +++ b/src/tests/compaction_test/src/compaction_test_runner.rs @@ -26,7 +26,7 @@ use clap::Parser; use futures::TryStreamExt; use itertools::Itertools; use risingwave_common::catalog::TableId; -use risingwave_common::config::{load_config, RwConfig, StorageConfig}; +use risingwave_common::config::{load_config, RwConfig, StorageConfig, NO_OVERRIDE}; use risingwave_common::util::addr::HostAddr; use risingwave_hummock_sdk::{CompactionGroupId, HummockEpoch, FIRST_VERSION_ID}; use risingwave_pb::common::WorkerType; @@ -133,7 +133,10 @@ pub async fn start_meta_node(listen_addr: String, config_path: String) { "--config-path", &config_path, ]); - let config = load_config(&meta_opts.config_path); + let config = load_config( + &meta_opts.config_path, + Some(meta_opts.override_opts.clone()), + ); assert!( config.meta.enable_compaction_deterministic, "enable_compaction_deterministic should be set" @@ -307,7 +310,7 @@ async fn start_replay( ); let mut metric = CompactionTestMetrics::new(); - let config = load_config(&opts.config_path_for_meta); + let config = load_config(&opts.config_path_for_meta, NO_OVERRIDE); tracing::info!( "Starting replay with config {:?} and opts {:?}", config, diff --git a/src/tests/compaction_test/src/delete_range_runner.rs b/src/tests/compaction_test/src/delete_range_runner.rs index a22cf8acb70e1..e9f53469c1b8c 100644 --- a/src/tests/compaction_test/src/delete_range_runner.rs +++ b/src/tests/compaction_test/src/delete_range_runner.rs @@ -25,7 +25,7 @@ use itertools::Itertools; use rand::rngs::StdRng; use rand::{RngCore, SeedableRng}; use risingwave_common::catalog::TableId; -use risingwave_common::config::{load_config, StorageConfig}; +use risingwave_common::config::{load_config, StorageConfig, NO_OVERRIDE}; use risingwave_hummock_sdk::compact::CompactorRuntimeConfig; use risingwave_hummock_sdk::filter_key_extractor::{ FilterKeyExtractorImpl, FilterKeyExtractorManager, FullKeyFilterKeyExtractor, @@ -83,7 +83,7 @@ pub fn start_delete_range(opts: CompactionTestOpts) -> Pin anyhow::Result<()> { - let config = load_config(&opts.config_path); + let config = load_config(&opts.config_path, NO_OVERRIDE); let mut storage_config = config.storage; storage_config.enable_state_store_v1 = false; let compaction_config = CompactionConfigBuilder::new().build();