diff --git a/Cargo.lock b/Cargo.lock index ea4bb4000753..d1462aa18cae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1808,6 +1808,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "clocksource" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "129026dd5a8a9592d96916258f3a5379589e513ea5e86aeb0bd2530286e44e9e" +dependencies = [ + "libc", + "time", + "winapi", +] + [[package]] name = "cmake" version = "0.1.51" @@ -2332,16 +2343,24 @@ name = "common-runtime" version = "0.9.5" dependencies = [ "async-trait", + "clap 4.5.19", "common-error", "common-macro", "common-telemetry", + "futures", "lazy_static", "num_cpus", "once_cell", + "parking_lot 0.12.3", "paste", + "pin-project", "prometheus", + "rand", + "ratelimit", "serde", + "serde_json", "snafu 0.8.5", + "tempfile", "tokio", "tokio-metrics", "tokio-metrics-collector", @@ -9195,6 +9214,17 @@ dependencies = [ "rand", ] +[[package]] +name = "ratelimit" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c1bb13e2dcfa2232ac6887157aad8d9b3fe4ca57f7c8d4938ff5ea9be742300" +dependencies = [ + "clocksource", + "parking_lot 0.12.3", + "thiserror", +] + [[package]] name = "raw-cpuid" version = "11.2.0" diff --git a/Cargo.toml b/Cargo.toml index 846ab2d9383f..58dd837284e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,6 +140,7 @@ opentelemetry-proto = { version = "0.5", features = [ "with-serde", "logs", ] } +parking_lot = "0.12" parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] } paste = "1.0" pin-project = "1.0" @@ -148,6 +149,7 @@ promql-parser = { version = "0.4.1" } prost = "0.12" raft-engine = { version = "0.4.1", default-features = false } rand = "0.8" +ratelimit = "0.9" regex = "1.8" regex-automata = { version = "0.4" } reqwest = { version = "0.12", default-features = false, features = [ diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 469d7d1a7e2b..501c2f82d80a 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -28,7 +28,7 @@ enum_dispatch = "0.3" futures-util.workspace = true lazy_static.workspace = true moka = { workspace = true, features = ["future"] } -parking_lot = "0.12" +parking_lot.workspace = true prometheus.workspace = true prost.workspace = true query.workspace = true diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml index e5fa276c4bf1..c249ba221ecd 100644 --- a/src/common/runtime/Cargo.toml +++ b/src/common/runtime/Cargo.toml @@ -4,21 +4,36 @@ version.workspace = true edition.workspace = true license.workspace = true +[lib] +path = "src/lib.rs" + +[[bin]] +name = "common-runtime-bin" +path = "src/bin.rs" + [lints] workspace = true [dependencies] async-trait.workspace = true +clap.workspace = true common-error.workspace = true common-macro.workspace = true common-telemetry.workspace = true +futures.workspace = true lazy_static.workspace = true num_cpus.workspace = true once_cell.workspace = true +parking_lot.workspace = true paste.workspace = true +pin-project.workspace = true prometheus.workspace = true +rand.workspace = true +ratelimit.workspace = true serde.workspace = true +serde_json.workspace = true snafu.workspace = true +tempfile.workspace = true tokio.workspace = true tokio-metrics = "0.3" tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" } diff --git a/src/common/runtime/README.md b/src/common/runtime/README.md new file mode 100644 index 000000000000..a4214350e42e --- /dev/null +++ b/src/common/runtime/README.md @@ -0,0 +1,60 @@ +# Greptime Runtime + +## Run performance test for different priority & workload type + +``` +# workspace is at this subcrate +cargo run --release -- --loop-cnt 500 +``` + +## Related PRs & issues + +- Preliminary support cpu limitation + + ISSUE: https://github.com/GreptimeTeam/greptimedb/issues/3685 + + PR: https://github.com/GreptimeTeam/greptimedb/pull/4782 + +## CPU resource constraints (ThrottleableRuntime) + + +To achieve CPU resource constraints, we adopt the concept of rate limiting. When creating a future, we first wrap it with another layer of future to intercept the poll operation during runtime. By using the ratelimit library, we can simply implement a mechanism that allows only a limited number of polls for a batch of tasks under a certain priority within a specific time frame (the current token generation interval is set to 10ms). + +The default used runtime can be switched by +``` rust +pub type Runtime = DefaultRuntime; +``` +in `runtime.rs`. + +We tested four type of workload with 5 priorities, whose setup are as follows: + +``` rust +impl Priority { + fn ratelimiter_count(&self) -> Result> { + let max = 8000; + let gen_per_10ms = match self { + Priority::VeryLow => Some(2000), + Priority::Low => Some(4000), + Priority::Middle => Some(6000), + Priority::High => Some(8000), + Priority::VeryHigh => None, + }; + if let Some(gen_per_10ms) = gen_per_10ms { + Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // generate poll count per 10ms + .max_tokens(max) // reserved token for batch request + .build() + .context(BuildRuntimeRateLimiterSnafu) + .map(Some) + } else { + Ok(None) + } + } +} +``` + +This is the preliminary experimental effect so far: + +![](resources/rdme-exp.png) + +## TODO +- Introduce PID to achieve more accurate limitation. diff --git a/src/common/runtime/resources/rdme-exp.png b/src/common/runtime/resources/rdme-exp.png new file mode 100644 index 000000000000..3bf0aa2dc0ea Binary files /dev/null and b/src/common/runtime/resources/rdme-exp.png differ diff --git a/src/common/runtime/src/bin.rs b/src/common/runtime/src/bin.rs new file mode 100644 index 000000000000..913a875c3722 --- /dev/null +++ b/src/common/runtime/src/bin.rs @@ -0,0 +1,205 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use clap::Parser; + +#[derive(Debug, Default, Parser)] +pub struct Command { + #[clap(long)] + loop_cnt: usize, +} + +fn main() { + common_telemetry::init_default_ut_logging(); + let cmd = Command::parse(); + + test_diff_priority_cpu::test_diff_workload_priority(cmd.loop_cnt); +} + +mod test_diff_priority_cpu { + use std::path::PathBuf; + + use common_runtime::runtime::{BuilderBuild, Priority, RuntimeTrait}; + use common_runtime::{Builder, Runtime}; + use common_telemetry::debug; + use tempfile::TempDir; + + fn compute_pi_str(precision: usize) -> String { + let mut pi = 0.0; + let mut sign = 1.0; + + for i in 0..precision { + pi += sign / (2 * i + 1) as f64; + sign *= -1.0; + } + + pi *= 4.0; + format!("{:.prec$}", pi, prec = precision) + } + + macro_rules! def_workload_enum { + ($($variant:ident),+) => { + #[derive(Debug)] + enum WorkloadType { + $($variant),+ + } + + /// array of workloads for iteration + const WORKLOADS: &'static [WorkloadType] = &[ + $( WorkloadType::$variant ),+ + ]; + }; + } + + def_workload_enum!( + ComputeHeavily, + ComputeHeavily2, + WriteFile, + SpawnBlockingWriteFile + ); + + async fn workload_compute_heavily() { + let prefix = 10; + + for _ in 0..3000 { + let _ = compute_pi_str(prefix); + tokio::task::yield_now().await; + } + } + async fn workload_compute_heavily2() { + let prefix = 30; + for _ in 0..2000 { + let _ = compute_pi_str(prefix); + tokio::task::yield_now().await; + } + } + async fn workload_write_file(_idx: u64, tempdir: PathBuf) { + use tokio::io::AsyncWriteExt; + let prefix = 50; + + let mut file = tokio::fs::OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(tempdir.join(format!("pi_{}", prefix))) + .await + .unwrap(); + for i in 0..200 { + let pi = compute_pi_str(prefix); + + if i % 2 == 0 { + file.write_all(pi.as_bytes()).await.unwrap(); + } + } + } + async fn workload_spawn_blocking_write_file(tempdir: PathBuf) { + use std::io::Write; + let prefix = 100; + let mut file = Some( + std::fs::OpenOptions::new() + .append(true) + .create(true) + .open(tempdir.join(format!("pi_{}", prefix))) + .unwrap(), + ); + for i in 0..100 { + let pi = compute_pi_str(prefix); + if i % 2 == 0 { + let mut file1 = file.take().unwrap(); + file = Some( + tokio::task::spawn_blocking(move || { + file1.write_all(pi.as_bytes()).unwrap(); + file1 + }) + .await + .unwrap(), + ); + } + } + } + + pub fn test_diff_workload_priority(loop_cnt: usize) { + let tempdir = tempfile::tempdir().unwrap(); + let priorities = [ + Priority::VeryLow, + Priority::Low, + Priority::Middle, + Priority::High, + Priority::VeryHigh, + ]; + for wl in WORKLOADS { + for p in priorities.iter() { + let runtime: Runtime = Builder::default() + .runtime_name("test") + .thread_name("test") + .worker_threads(8) + .priority(*p) + .build() + .expect("Fail to create runtime"); + let runtime2 = runtime.clone(); + runtime.block_on(test_spec_priority_and_workload( + *p, runtime2, wl, &tempdir, loop_cnt, + )); + } + } + } + + async fn test_spec_priority_and_workload( + priority: Priority, + runtime: Runtime, + workload_id: &WorkloadType, + tempdir: &TempDir, + loop_cnt: usize, + ) { + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + debug!( + "testing cpu usage for priority {:?} workload_id {:?}", + priority, workload_id, + ); + // start monitor thread + let mut tasks = vec![]; + let start = std::time::Instant::now(); + for i in 0..loop_cnt { + // persist cpu usage in json: {priority}.{workload_id} + match *workload_id { + WorkloadType::ComputeHeavily => { + tasks.push(runtime.spawn(workload_compute_heavily())); + } + WorkloadType::ComputeHeavily2 => { + tasks.push(runtime.spawn(workload_compute_heavily2())); + } + WorkloadType::SpawnBlockingWriteFile => { + tasks.push(runtime.spawn(workload_spawn_blocking_write_file( + tempdir.path().to_path_buf(), + ))); + } + WorkloadType::WriteFile => { + tasks.push( + runtime.spawn(workload_write_file(i as u64, tempdir.path().to_path_buf())), + ); + } + } + } + for task in tasks { + task.await.unwrap(); + } + let elapsed = start.elapsed(); + debug!( + "test cpu usage for priority {:?} workload_id {:?} elapsed {}ms", + priority, + workload_id, + elapsed.as_millis() + ); + } +} diff --git a/src/common/runtime/src/error.rs b/src/common/runtime/src/error.rs index 4c553bbcd5e2..9d687edff389 100644 --- a/src/common/runtime/src/error.rs +++ b/src/common/runtime/src/error.rs @@ -33,6 +33,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to build runtime rate limiter"))] + BuildRuntimeRateLimiter { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: ratelimit::Error, + }, + #[snafu(display("Repeated task {} is already started", name))] IllegalState { name: String, diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index b7d78badeb41..5cd008fa8cec 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -21,6 +21,7 @@ use once_cell::sync::Lazy; use paste::paste; use serde::{Deserialize, Serialize}; +use crate::runtime::{BuilderBuild, RuntimeTrait}; use crate::{Builder, JoinHandle, Runtime}; const GLOBAL_WORKERS: usize = 8; diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index 4429f6fa71ab..d1effcfa4e41 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -17,6 +17,8 @@ pub mod global; mod metrics; mod repeated_task; pub mod runtime; +pub mod runtime_default; +pub mod runtime_throttleable; pub use global::{ block_on_compact, block_on_global, compact_runtime, create_runtime, global_runtime, diff --git a/src/common/runtime/src/repeated_task.rs b/src/common/runtime/src/repeated_task.rs index 2431a2ee17fb..6cc26e0545e6 100644 --- a/src/common/runtime/src/repeated_task.rs +++ b/src/common/runtime/src/repeated_task.rs @@ -23,6 +23,7 @@ use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use crate::error::{IllegalStateSnafu, Result, WaitGcTaskStopSnafu}; +use crate::runtime::RuntimeTrait; use crate::Runtime; /// Task to execute repeatedly. diff --git a/src/common/runtime/src/runtime.rs b/src/common/runtime/src/runtime.rs index 0ea041578e10..aeba46f24fe1 100644 --- a/src/common/runtime/src/runtime.rs +++ b/src/common/runtime/src/runtime.rs @@ -19,23 +19,19 @@ use std::thread; use std::time::Duration; use snafu::ResultExt; -use tokio::runtime::{Builder as RuntimeBuilder, Handle}; +use tokio::runtime::Builder as RuntimeBuilder; use tokio::sync::oneshot; pub use tokio::task::{JoinError, JoinHandle}; use crate::error::*; use crate::metrics::*; +use crate::runtime_default::DefaultRuntime; +use crate::runtime_throttleable::ThrottleableRuntime; -static RUNTIME_ID: AtomicUsize = AtomicUsize::new(0); +// configurations +pub type Runtime = DefaultRuntime; -/// A runtime to run future tasks -#[derive(Clone, Debug)] -pub struct Runtime { - name: String, - handle: Handle, - // Used to receive a drop signal when dropper is dropped, inspired by databend - _dropper: Arc, -} +static RUNTIME_ID: AtomicUsize = AtomicUsize::new(0); /// Dropping the dropper will cause runtime to shutdown. #[derive(Debug)] @@ -50,45 +46,42 @@ impl Drop for Dropper { } } -impl Runtime { - pub fn builder() -> Builder { +pub trait RuntimeTrait { + /// Get a runtime builder + fn builder() -> Builder { Builder::default() } /// Spawn a future and execute it in this thread pool /// /// Similar to tokio::runtime::Runtime::spawn() - pub fn spawn(&self, future: F) -> JoinHandle + fn spawn(&self, future: F) -> JoinHandle where F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.handle.spawn(future) - } + F::Output: Send + 'static; /// Run the provided function on an executor dedicated to blocking /// operations. - pub fn spawn_blocking(&self, func: F) -> JoinHandle + fn spawn_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - self.handle.spawn_blocking(func) - } + R: Send + 'static; /// Run a future to complete, this is the runtime's entry point - pub fn block_on(&self, future: F) -> F::Output { - self.handle.block_on(future) - } + fn block_on(&self, future: F) -> F::Output; - pub fn name(&self) -> &str { - &self.name - } + /// Get the name of the runtime + fn name(&self) -> &str; +} + +pub trait BuilderBuild { + fn build(&mut self) -> Result; } pub struct Builder { runtime_name: String, thread_name: String, + priority: Priority, builder: RuntimeBuilder, } @@ -98,11 +91,17 @@ impl Default for Builder { runtime_name: format!("runtime-{}", RUNTIME_ID.fetch_add(1, Ordering::Relaxed)), thread_name: "default-worker".to_string(), builder: RuntimeBuilder::new_multi_thread(), + priority: Priority::VeryHigh, } } } impl Builder { + pub fn priority(&mut self, priority: Priority) -> &mut Self { + self.priority = priority; + self + } + /// Sets the number of worker threads the Runtime will use. /// /// This can be any number above 0. The default value is the number of cores available to the system. @@ -139,8 +138,10 @@ impl Builder { self.thread_name = val.into(); self } +} - pub fn build(&mut self) -> Result { +impl BuilderBuild for Builder { + fn build(&mut self) -> Result { let runtime = self .builder .enable_all() @@ -163,13 +164,48 @@ impl Builder { #[cfg(tokio_unstable)] register_collector(name.clone(), &handle); - Ok(Runtime { - name, + Ok(DefaultRuntime::new( + &name, handle, - _dropper: Arc::new(Dropper { + Arc::new(Dropper { close: Some(send_stop), }), - }) + )) + } +} + +impl BuilderBuild for Builder { + fn build(&mut self) -> Result { + let runtime = self + .builder + .enable_all() + .thread_name(self.thread_name.clone()) + .on_thread_start(on_thread_start(self.thread_name.clone())) + .on_thread_stop(on_thread_stop(self.thread_name.clone())) + .on_thread_park(on_thread_park(self.thread_name.clone())) + .on_thread_unpark(on_thread_unpark(self.thread_name.clone())) + .build() + .context(BuildRuntimeSnafu)?; + + let name = self.runtime_name.clone(); + let handle = runtime.handle().clone(); + let (send_stop, recv_stop) = oneshot::channel(); + // Block the runtime to shutdown. + let _ = thread::Builder::new() + .name(format!("{}-blocker", self.thread_name)) + .spawn(move || runtime.block_on(recv_stop)); + + #[cfg(tokio_unstable)] + register_collector(name.clone(), &handle); + + ThrottleableRuntime::new( + &name, + self.priority, + handle, + Arc::new(Dropper { + close: Some(send_stop), + }), + ) } } @@ -213,8 +249,18 @@ fn on_thread_unpark(thread_name: String) -> impl Fn() + 'static { } } +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +pub enum Priority { + VeryLow = 0, + Low = 1, + Middle = 2, + High = 3, + VeryHigh = 4, +} + #[cfg(test)] mod tests { + use std::sync::Arc; use std::thread; use std::time::Duration; @@ -235,12 +281,12 @@ mod tests { #[test] fn test_metric() { - let runtime = Builder::default() + let runtime: Runtime = Builder::default() .worker_threads(5) .thread_name("test_runtime_metric") .build() .unwrap(); - // wait threads created + // wait threads create thread::sleep(Duration::from_millis(50)); let _handle = runtime.spawn(async { diff --git a/src/common/runtime/src/runtime_default.rs b/src/common/runtime/src/runtime_default.rs new file mode 100644 index 000000000000..ea0b5c27e3b7 --- /dev/null +++ b/src/common/runtime/src/runtime_default.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::sync::Arc; + +use tokio::runtime::Handle; +pub use tokio::task::JoinHandle; + +use crate::runtime::{Dropper, RuntimeTrait}; +use crate::Builder; + +/// A runtime to run future tasks +#[derive(Clone, Debug)] +pub struct DefaultRuntime { + name: String, + handle: Handle, + // Used to receive a drop signal when dropper is dropped, inspired by databend + _dropper: Arc, +} + +impl DefaultRuntime { + pub(crate) fn new(name: &str, handle: Handle, dropper: Arc) -> Self { + Self { + name: name.to_string(), + handle, + _dropper: dropper, + } + } +} + +impl RuntimeTrait for DefaultRuntime { + fn builder() -> Builder { + Builder::default() + } + + /// Spawn a future and execute it in this thread pool + /// + /// Similar to tokio::runtime::Runtime::spawn() + fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle.spawn(future) + } + + /// Run the provided function on an executor dedicated to blocking + /// operations. + fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.handle.spawn_blocking(func) + } + + /// Run a future to complete, this is the runtime's entry point + fn block_on(&self, future: F) -> F::Output { + self.handle.block_on(future) + } + + fn name(&self) -> &str { + &self.name + } +} diff --git a/src/common/runtime/src/runtime_throttleable.rs b/src/common/runtime/src/runtime_throttleable.rs new file mode 100644 index 000000000000..ea51270987a8 --- /dev/null +++ b/src/common/runtime/src/runtime_throttleable.rs @@ -0,0 +1,285 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use futures::FutureExt; +use ratelimit::Ratelimiter; +use snafu::ResultExt; +use tokio::runtime::Handle; +pub use tokio::task::JoinHandle; +use tokio::time::Sleep; + +use crate::error::{BuildRuntimeRateLimiterSnafu, Result}; +use crate::runtime::{Dropper, Priority, RuntimeTrait}; +use crate::Builder; + +struct RuntimeRateLimiter { + pub ratelimiter: Option, +} + +impl Debug for RuntimeRateLimiter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RuntimeThrottleShareWithFuture") + .field( + "ratelimiter_max_tokens", + &self.ratelimiter.as_ref().map(|v| v.max_tokens()), + ) + .field( + "ratelimiter_refill_amount", + &self.ratelimiter.as_ref().map(|v| v.refill_amount()), + ) + .finish() + } +} + +/// A runtime to run future tasks +#[derive(Clone, Debug)] +pub struct ThrottleableRuntime { + name: String, + handle: Handle, + shared_with_future: Arc, + // Used to receive a drop signal when dropper is dropped, inspired by databend + _dropper: Arc, +} + +impl ThrottleableRuntime { + pub(crate) fn new( + name: &str, + priority: Priority, + handle: Handle, + dropper: Arc, + ) -> Result { + Ok(Self { + name: name.to_string(), + handle, + shared_with_future: Arc::new(RuntimeRateLimiter { + ratelimiter: priority.ratelimiter_count()?, + }), + _dropper: dropper, + }) + } +} + +impl RuntimeTrait for ThrottleableRuntime { + fn builder() -> Builder { + Builder::default() + } + + /// Spawn a future and execute it in this thread pool + /// + /// Similar to tokio::runtime::Runtime::spawn() + fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle + .spawn(ThrottleFuture::new(self.shared_with_future.clone(), future)) + } + + /// Run the provided function on an executor dedicated to blocking + /// operations. + fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.handle.spawn_blocking(func) + } + + /// Run a future to complete, this is the runtime's entry point + fn block_on(&self, future: F) -> F::Output { + self.handle.block_on(future) + } + + fn name(&self) -> &str { + &self.name + } +} + +enum State { + Pollable, + Throttled(Pin>), +} + +impl State { + fn unwrap_backoff(&mut self) -> &mut Pin> { + match self { + State::Throttled(sleep) => sleep, + _ => panic!("unwrap_backoff failed"), + } + } +} + +#[pin_project::pin_project] +pub struct ThrottleFuture { + #[pin] + future: F, + + /// RateLimiter of this future + handle: Arc, + + state: State, +} + +impl ThrottleFuture +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + fn new(handle: Arc, future: F) -> Self { + Self { + future, + handle, + state: State::Pollable, + } + } +} + +impl Future for ThrottleFuture +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + type Output = F::Output; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + match this.state { + State::Pollable => {} + State::Throttled(ref mut sleep) => match sleep.poll_unpin(cx) { + Poll::Ready(_) => { + *this.state = State::Pollable; + } + Poll::Pending => return Poll::Pending, + }, + }; + + if let Some(ratelimiter) = &this.handle.ratelimiter { + if let Err(wait) = ratelimiter.try_wait() { + *this.state = State::Throttled(Box::pin(tokio::time::sleep(wait))); + match this.state.unwrap_backoff().poll_unpin(cx) { + Poll::Ready(_) => { + *this.state = State::Pollable; + } + Poll::Pending => { + return Poll::Pending; + } + } + } + } + + let poll_res = this.future.poll(cx); + + match poll_res { + Poll::Ready(r) => Poll::Ready(r), + Poll::Pending => Poll::Pending, + } + } +} + +impl Priority { + fn ratelimiter_count(&self) -> Result> { + let max = 8000; + let gen_per_10ms = match self { + Priority::VeryLow => Some(2000), + Priority::Low => Some(4000), + Priority::Middle => Some(6000), + Priority::High => Some(8000), + Priority::VeryHigh => None, + }; + if let Some(gen_per_10ms) = gen_per_10ms { + Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // generate poll count per 10ms + .max_tokens(max) // reserved token for batch request + .build() + .context(BuildRuntimeRateLimiterSnafu) + .map(Some) + } else { + Ok(None) + } + } +} + +#[cfg(test)] +mod tests { + + use tokio::fs::File; + use tokio::io::AsyncWriteExt; + use tokio::time::Duration; + + use super::*; + use crate::runtime::BuilderBuild; + + #[tokio::test] + async fn test_throttleable_runtime_spawn_simple() { + for p in [ + Priority::VeryLow, + Priority::Low, + Priority::Middle, + Priority::High, + Priority::VeryHigh, + ] { + let runtime: ThrottleableRuntime = Builder::default() + .runtime_name("test") + .thread_name("test") + .worker_threads(8) + .priority(p) + .build() + .expect("Fail to create runtime"); + + // Spawn a simple future that returns 42 + let handle = runtime.spawn(async { + tokio::time::sleep(Duration::from_millis(10)).await; + 42 + }); + let result = handle.await.expect("Task panicked"); + assert_eq!(result, 42); + } + } + + #[tokio::test] + async fn test_throttleable_runtime_spawn_complex() { + let tempdir = tempfile::tempdir().unwrap(); + for p in [ + Priority::VeryLow, + Priority::Low, + Priority::Middle, + Priority::High, + Priority::VeryHigh, + ] { + let runtime: ThrottleableRuntime = Builder::default() + .runtime_name("test") + .thread_name("test") + .worker_threads(8) + .priority(p) + .build() + .expect("Fail to create runtime"); + let tempdirpath = tempdir.path().to_path_buf(); + let handle = runtime.spawn(async move { + let mut file = File::create(tempdirpath.join("test.txt")).await.unwrap(); + file.write_all(b"Hello, world!").await.unwrap(); + 42 + }); + let result = handle.await.expect("Task panicked"); + assert_eq!(result, 42); + } + } +} diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index 2b4023cf7cdd..da044436c84f 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -26,7 +26,7 @@ opentelemetry = { version = "0.21.0", default-features = false, features = [ opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] } opentelemetry-semantic-conventions = "0.13.0" opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] } -parking_lot = { version = "0.12" } +parking_lot.workspace = true prometheus.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 2acc66a5927d..8b48dd4258cc 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -23,6 +23,7 @@ use common_function::function::FunctionRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::prelude::ScalarUdf; use common_query::Output; +use common_runtime::runtime::{BuilderBuild, RuntimeTrait}; use common_runtime::Runtime; use datafusion_expr::LogicalPlan; use query::dataframe::DataFrame; diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 2e3216d075d0..6d051c2eeeeb 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -42,7 +42,7 @@ humantime-serde.workspace = true itertools.workspace = true lazy_static.workspace = true once_cell.workspace = true -parking_lot = "0.12" +parking_lot.workspace = true prometheus.workspace = true prost.workspace = true rand.workspace = true diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index c4e1688de07c..51db562a3235 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -24,6 +24,7 @@ pub mod mock { use client::Client; use common_grpc::channel_manager::ChannelManager; use common_meta::peer::Peer; + use common_runtime::runtime::BuilderBuild; use common_runtime::{Builder as RuntimeBuilder, Runtime}; use servers::grpc::region_server::{RegionServerHandler, RegionServerRequestHandler}; use tokio::sync::mpsc; diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index c41df95c2521..9ab2be00ac27 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use async_walkdir::{Filtering, WalkDir}; use base64::prelude::BASE64_URL_SAFE; use base64::Engine; +use common_runtime::runtime::RuntimeTrait; use common_telemetry::{info, warn}; use futures::{FutureExt, StreamExt}; use moka::future::Cache; diff --git a/src/script/src/python/utils.rs b/src/script/src/python/utils.rs index 4662922f14dc..8838fe8a61ba 100644 --- a/src/script/src/python/utils.rs +++ b/src/script/src/python/utils.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_runtime::runtime::RuntimeTrait; use common_runtime::JoinHandle; use futures::Future; use rustpython_vm::builtins::PyBaseExceptionRef; diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 354bdf642f5c..a2803ae03572 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -72,7 +72,7 @@ openmetrics-parser = "0.4" # opensrv-mysql = "0.7.0" opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "6bbc3b65e6b19212c4f7fc4f40c20daf6f452deb" } opentelemetry-proto.workspace = true -parking_lot = "0.12" +parking_lot.workspace = true pgwire = { version = "0.25.0", default-features = false, features = ["server-api-ring"] } pin-project = "1.0" pipeline.workspace = true diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index f6bafde16d8d..b032ffc84722 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -25,6 +25,7 @@ use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; +use common_runtime::runtime::RuntimeTrait; use common_runtime::Runtime; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{debug, error, tracing}; diff --git a/src/servers/src/grpc/region_server.rs b/src/servers/src/grpc/region_server.rs index 366d90151f77..e3eb87467a6e 100644 --- a/src/servers/src/grpc/region_server.rs +++ b/src/servers/src/grpc/region_server.rs @@ -18,6 +18,7 @@ use api::v1::region::region_server::Region as RegionServer; use api::v1::region::{region_request, RegionRequest, RegionResponse}; use async_trait::async_trait; use common_error::ext::ErrorExt; +use common_runtime::runtime::RuntimeTrait; use common_runtime::Runtime; use common_telemetry::tracing::info_span; use common_telemetry::tracing_context::{FutureExt, TracingContext}; diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index 146295bbaf34..dae01b3f1a41 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use async_trait::async_trait; use auth::UserProviderRef; +use common_runtime::runtime::RuntimeTrait; use common_runtime::Runtime; use common_telemetry::{debug, warn}; use futures::StreamExt; diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index e904845547d6..70f74a32ece3 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use ::auth::UserProviderRef; use async_trait::async_trait; +use common_runtime::runtime::RuntimeTrait; use common_runtime::Runtime; use common_telemetry::{debug, warn}; use futures::StreamExt; diff --git a/src/servers/tests/grpc/mod.rs b/src/servers/tests/grpc/mod.rs index 021144745fad..30bd168bc18a 100644 --- a/src/servers/tests/grpc/mod.rs +++ b/src/servers/tests/grpc/mod.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use auth::tests::MockUserProvider; use auth::UserProviderRef; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_runtime::runtime::BuilderBuild; use common_runtime::{Builder as RuntimeBuilder, Runtime}; use servers::error::{Result, StartGrpcSnafu, TcpBindSnafu}; use servers::grpc::flight::FlightCraftWrapper; diff --git a/src/servers/tests/mysql/mysql_server_test.rs b/src/servers/tests/mysql/mysql_server_test.rs index ba2cdbdab27d..a9f7f8309aa8 100644 --- a/src/servers/tests/mysql/mysql_server_test.rs +++ b/src/servers/tests/mysql/mysql_server_test.rs @@ -19,6 +19,7 @@ use std::time::Duration; use auth::tests::{DatabaseAuthInfo, MockUserProvider}; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_recordbatch::RecordBatch; +use common_runtime::runtime::BuilderBuild; use common_runtime::Builder as RuntimeBuilder; use datatypes::prelude::VectorRef; use datatypes::schema::{ColumnSchema, Schema}; diff --git a/src/servers/tests/postgres/mod.rs b/src/servers/tests/postgres/mod.rs index f3ff827db435..6ff659fec9c6 100644 --- a/src/servers/tests/postgres/mod.rs +++ b/src/servers/tests/postgres/mod.rs @@ -19,6 +19,7 @@ use std::time::Duration; use auth::tests::{DatabaseAuthInfo, MockUserProvider}; use auth::UserProviderRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_runtime::runtime::BuilderBuild; use common_runtime::Builder as RuntimeBuilder; use pgwire::api::Type; use rand::rngs::StdRng; diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 769dad4c1e6b..55daa9681cc4 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -35,6 +35,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_meta::DatanodeId; +use common_runtime::runtime::BuilderBuild; use common_runtime::Builder as RuntimeBuilder; use common_test_util::temp_dir::create_temp_dir; use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig}; diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index a055527e2b65..b3a7269ae003 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -25,7 +25,8 @@ use common_base::secrets::ExposeSecret; use common_config::Configurable; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; -use common_runtime::Builder as RuntimeBuilder; +use common_runtime::runtime::BuilderBuild; +use common_runtime::{Builder as RuntimeBuilder, Runtime}; use common_telemetry::warn; use common_test_util::ports; use common_test_util::temp_dir::{create_temp_dir, TempDir}; @@ -494,7 +495,7 @@ pub async fn setup_grpc_server_with( ) -> (String, TestGuard, Arc) { let instance = setup_standalone_instance(name, store_type).await; - let runtime = RuntimeBuilder::default() + let runtime: Runtime = RuntimeBuilder::default() .worker_threads(2) .thread_name("grpc-handlers") .build() diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 55ccdb258d96..26c0385c330a 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -25,6 +25,7 @@ use common_catalog::consts::MITO_ENGINE; use common_grpc::channel_manager::ClientTlsOption; use common_query::Output; use common_recordbatch::RecordBatches; +use common_runtime::runtime::{BuilderBuild, RuntimeTrait}; use common_runtime::Runtime; use common_test_util::find_workspace_path; use servers::grpc::builder::GrpcServerBuilder;