diff --git a/Cargo.lock b/Cargo.lock index 3caf15ecb0..513e3fd9cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1286,6 +1286,7 @@ dependencies = [ "serde", "server", "signal-hook", + "size_ext", "table_engine", "toml 0.7.3", "toml_ext", diff --git a/components/runtime/src/lib.rs b/components/runtime/src/lib.rs index 42461e161e..755f4d3e69 100644 --- a/components/runtime/src/lib.rs +++ b/components/runtime/src/lib.rs @@ -186,6 +186,15 @@ impl Builder { self } + /// Sets the size of the stack allocated to the worker threads the Runtime + /// will use. + /// + /// This can be any number above 0. + pub fn stack_size(&mut self, val: usize) -> &mut Self { + self.builder.thread_stack_size(val); + self + } + /// Sets name of threads spawned by the Runtime thread pool pub fn thread_name(&mut self, val: impl Into) -> &mut Self { self.thread_name = val.into(); diff --git a/src/ceresdb/Cargo.toml b/src/ceresdb/Cargo.toml index c16f7ea437..2c554cf67f 100644 --- a/src/ceresdb/Cargo.toml +++ b/src/ceresdb/Cargo.toml @@ -27,38 +27,48 @@ workspace = true [features] default = ["wal-rocksdb", "wal-table-kv", "wal-message-queue"] wal-table-kv = ["wal/wal-table-kv", "analytic_engine/wal-table-kv"] -wal-message-queue = ["wal/wal-message-queue", "analytic_engine/wal-message-queue"] +wal-message-queue = [ + "wal/wal-message-queue", + "analytic_engine/wal-message-queue", +] wal-rocksdb = ["wal/wal-rocksdb", "analytic_engine/wal-rocksdb"] [dependencies] analytic_engine = { workspace = true } -catalog = { workspace = true } -catalog_impls = { workspace = true } -clap = { workspace = true } -cluster = { workspace = true } -datafusion = { workspace = true } -df_operator = { workspace = true } -etcd-client = { workspace = true } -interpreters = { workspace = true } -logger = { workspace = true } -meta_client = { workspace = true } -moka = { version = "0.10", features = ["future"] } -panic_ext = { workspace = true } -proxy = { workspace = true } -query_engine = { workspace = true } -router = { workspace = true } -runtime = { workspace = true } -serde = { workspace = true } -server = { workspace = true } -signal-hook = "0.3" -table_engine = { workspace = true } -toml = { workspace = true } -toml_ext = { workspace = true } -tracing_util = { workspace = true } -wal = { workspace = true } +catalog = { workspace = true } +catalog_impls = { workspace = true } +clap = { workspace = true } +cluster = { workspace = true } +datafusion = { workspace = true } +df_operator = { workspace = true } +etcd-client = { workspace = true } +interpreters = { workspace = true } +logger = { workspace = true } +meta_client = { workspace = true } +moka = { version = "0.10", features = ["future"] } +panic_ext = { workspace = true } +proxy = { workspace = true } +query_engine = { workspace = true } +router = { workspace = true } +runtime = { workspace = true } +serde = { workspace = true } +server = { workspace = true } +signal-hook = "0.3" +size_ext = { workspace = true } +table_engine = { workspace = true } +toml = { workspace = true } +toml_ext = { workspace = true } +tracing_util = { workspace = true } +wal = { workspace = true } [build-dependencies] -vergen = { version = "8", default-features = false, features = ["build", "cargo", "git", "gitcl", "rustc"] } +vergen = { version = "8", default-features = false, features = [ + "build", + "cargo", + "git", + "gitcl", + "rustc", +] } [[bin]] name = "ceresdb-server" diff --git a/src/ceresdb/src/config.rs b/src/ceresdb/src/config.rs index 9d855cdca3..7463f798f6 100644 --- a/src/ceresdb/src/config.rs +++ b/src/ceresdb/src/config.rs @@ -18,6 +18,7 @@ use cluster::config::ClusterConfig; use proxy::limiter::LimiterConfig; use serde::{Deserialize, Serialize}; use server::config::{ServerConfig, StaticRouteConfig}; +use size_ext::ReadableSize; #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(default)] @@ -92,6 +93,12 @@ pub enum ClusterDeployment { pub struct RuntimeConfig { /// Runtime for reading data pub read_thread_num: usize, + /// The size of the stack used by the read thread + /// + /// The size should be a set as a large number if the complex query exists. + /// TODO: this config may be removed in the future when the complex query + /// won't overflow the stack. + pub read_thread_stack_size: ReadableSize, /// Runtime for writing data pub write_thread_num: usize, /// Runtime for communicating with meta cluster @@ -108,6 +115,7 @@ impl Default for RuntimeConfig { fn default() -> Self { Self { read_thread_num: 8, + read_thread_stack_size: ReadableSize::mb(16), write_thread_num: 8, meta_thread_num: 2, compact_thread_num: 4, diff --git a/src/ceresdb/src/setup.rs b/src/ceresdb/src/setup.rs index ad9dab5b40..58729bf6ad 100644 --- a/src/ceresdb/src/setup.rs +++ b/src/ceresdb/src/setup.rs @@ -62,8 +62,18 @@ pub fn setup_tracing(config: &Config) -> WorkerGuard { tracing_util::init_tracing_with_file(&config.tracing, &config.node.addr, Rotation::NEVER) } -fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime { - runtime::Builder::default() +fn build_runtime_with_stack_size( + name: &str, + threads_num: usize, + stack_size: Option, +) -> runtime::Runtime { + let mut builder = runtime::Builder::default(); + + if let Some(stack_size) = stack_size { + builder.stack_size(stack_size); + } + + builder .worker_threads(threads_num) .thread_name(name) .enable_all() @@ -71,9 +81,17 @@ fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime { .expect("Failed to create runtime") } +fn build_runtime(name: &str, threads_num: usize) -> runtime::Runtime { + build_runtime_with_stack_size(name, threads_num, None) +} + fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes { EngineRuntimes { - read_runtime: Arc::new(build_runtime("ceres-read", config.read_thread_num)), + read_runtime: Arc::new(build_runtime_with_stack_size( + "ceres-read", + config.read_thread_num, + Some(config.read_thread_stack_size.as_byte() as usize), + )), write_runtime: Arc::new(build_runtime("ceres-write", config.write_thread_num)), compact_runtime: Arc::new(build_runtime("ceres-compact", config.compact_thread_num)), meta_runtime: Arc::new(build_runtime("ceres-meta", config.meta_thread_num)),