diff --git a/Cargo.lock b/Cargo.lock index 991829a75813..5d31a7649e05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1752,6 +1752,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "clocksource" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "129026dd5a8a9592d96916258f3a5379589e513ea5e86aeb0bd2530286e44e9e" +dependencies = [ + "libc", + "time", + "winapi", +] + [[package]] name = "cmake" version = "0.1.51" @@ -2263,16 +2274,23 @@ name = "common-runtime" version = "0.9.3" dependencies = [ "async-trait", + "clocksource", "common-error", "common-macro", "common-telemetry", + "futures", "lazy_static", "num_cpus", "once_cell", + "parking_lot 0.12.3", "paste", + "pin-project", "prometheus", + "rand", "serde", + "serde_json", "snafu 0.8.4", + "sysinfo", "tokio", "tokio-metrics", "tokio-metrics-collector", diff --git a/Cargo.toml b/Cargo.toml index 44b2cda1227f..df914e82ce90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,6 +104,7 @@ chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.4", features = ["derive"] } config = "0.13.0" crossbeam-utils = "0.8" +clocksource = "0.8.1" dashmap = "5.4" datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" } datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "7823ef2f63663907edab46af0d51359900f608d6" } @@ -154,6 +155,7 @@ reqwest = { version = "0.12", default-features = false, features = [ "stream", "multipart", ] } +parking_lot = "0.12" rskafka = { git = "https://github.com/influxdata/rskafka.git", rev = "75535b5ad9bae4a5dbb582c82e44dfd81ec10105", features = [ "transport-tls", ] } @@ -245,6 +247,7 @@ store-api = { path = "src/store-api" } substrait = { path = "src/common/substrait" } table = { path = "src/table" } + [patch.crates-io] # change all rustls dependencies to use our fork to default to `ring` to make it "just work" hyper-rustls = { git = 
"https://github.com/GreptimeTeam/hyper-rustls" } diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 469d7d1a7e2b..501c2f82d80a 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -28,7 +28,7 @@ enum_dispatch = "0.3" futures-util.workspace = true lazy_static.workspace = true moka = { workspace = true, features = ["future"] } -parking_lot = "0.12" +parking_lot.workspace = true prometheus.workspace = true prost.workspace = true query.workspace = true diff --git a/src/common/runtime/.gitignore b/src/common/runtime/.gitignore new file mode 100644 index 000000000000..dca1cb747431 --- /dev/null +++ b/src/common/runtime/.gitignore @@ -0,0 +1,2 @@ +test_generate_dir +priority_workload_cpu_usage.json \ No newline at end of file diff --git a/src/common/runtime/Cargo.toml b/src/common/runtime/Cargo.toml index e5fa276c4bf1..cb2633d86290 100644 --- a/src/common/runtime/Cargo.toml +++ b/src/common/runtime/Cargo.toml @@ -23,6 +23,13 @@ tokio.workspace = true tokio-metrics = "0.3" tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" } tokio-util.workspace = true +serde_json.workspace = true +sysinfo.workspace = true +parking_lot.workspace = true +clocksource.workspace = true +rand.workspace = true +pin-project.workspace = true +futures.workspace = true [dev-dependencies] tokio-test = "0.4" diff --git a/src/common/runtime/README b/src/common/runtime/README new file mode 100644 index 000000000000..61bc0058a7e0 --- /dev/null +++ b/src/common/runtime/README @@ -0,0 +1,5 @@ +# Run performance test for different priority & workload type + +``` +cargo test --package common-runtime --lib -- test_metrics::test_all_cpu_usage --nocapture +``` \ No newline at end of file diff --git a/src/common/runtime/scripts/draw.py b/src/common/runtime/scripts/draw.py new file mode 100644 index 000000000000..c6e4d56b66ee --- /dev/null +++ b/src/common/runtime/scripts/draw.py @@ -0,0 
"""Plot CPU usage per priority and workload type as a grouped bar chart.

The input is ``../priority_workload_cpu_usage.json`` (relative to this
script): a JSON object whose keys are ``"<priority>,<workload>"`` strings and
whose values are the measured CPU usage.  One bar group is drawn per workload
type, with one bar per priority inside each group.
"""
import json
from pathlib import Path

import numpy as np

# Resolved from the script's own location so the script works from any
# working directory and never has to change the process-wide cwd.
DATA_FILE = Path(__file__).resolve().parent.parent / "priority_workload_cpu_usage.json"

# Bar colors, reused cyclically when there are more priorities than colors.
BAR_COLORS = ("b", "g", "r", "c", "m")


def build_usage_matrix(data, n_priorities=5, n_load_types=5):
    """Return a ``(priority, workload)`` matrix built from *data*.

    Args:
        data: Mapping of ``"<priority>,<workload>"`` keys to numeric values.
        n_priorities: Minimum number of rows in the result.
        n_load_types: Minimum number of columns in the result.

    Returns:
        A float array that is at least ``n_priorities x n_load_types`` and
        grows as needed to hold every index found in *data*.  Cells without a
        measurement are ``0``.

    Raises:
        ValueError: If a key is not two comma-separated non-negative integers.
    """
    cells = []
    for key, value in data.items():
        try:
            priority, load_type = (int(part) for part in key.split(","))
        except ValueError as err:
            raise ValueError(
                f"malformed key {key!r}; expected '<priority>,<workload>'"
            ) from err
        if priority < 0 or load_type < 0:
            # Negative indices would silently wrap around in numpy.
            raise ValueError(f"negative index in key {key!r}")
        cells.append((priority, load_type, value))

    rows = max([n_priorities, *(p + 1 for p, _, _ in cells)])
    cols = max([n_load_types, *(lt + 1 for _, lt, _ in cells)])
    values = np.zeros((rows, cols))
    for priority, load_type, value in cells:
        values[priority, load_type] = value
    return values


def plot_usage(values):
    """Show *values* (rows = priorities, columns = workloads) as grouped bars."""
    # Imported lazily so the data helpers stay usable without matplotlib.
    import matplotlib.pyplot as plt

    n_priorities, n_load_types = values.shape
    # 0.15 is the width used for five priorities; shrink it for larger grids
    # so that neighbouring groups do not overlap.
    bar_width = min(0.15, 0.8 / max(n_priorities, 1))
    index = np.arange(n_load_types)

    _, ax = plt.subplots()
    for priority in range(n_priorities):
        rects = ax.bar(
            index + priority * bar_width,
            values[priority],
            bar_width,
            label=f'Priority {priority}',
            color=BAR_COLORS[priority % len(BAR_COLORS)],
        )
        # Write each bar's value just above it, rotated to avoid overlaps.
        for rect in rects:
            height = rect.get_height()
            ax.annotate('{}'.format(height),
                        xy=(rect.get_x() + rect.get_width() / 2, height),
                        xytext=(0, 3),  # 3 points vertical offset
                        textcoords="offset points",
                        rotation=90,
                        ha='center', va='bottom')

    ax.set_title('CPU Usage by Priority and Load Type')
    ax.set_xlabel('Workload Type')
    ax.set_ylabel('CPU Usage')
    # Center each tick under its group of bars.
    ax.set_xticks(index + bar_width * (n_priorities - 1) / 2)
    ax.set_xticklabels(list(range(n_load_types)))
    ax.legend()

    plt.show()


def main():
    """Load the measurement file and display the chart."""
    with open(DATA_FILE, encoding="utf-8") as f:
        data = json.load(f)
    plot_usage(build_usage_matrix(data))


if __name__ == "__main__":
    main()
\ No newline at end of file diff --git a/src/common/runtime/scripts/priority_workload_cpu_usage_time.json b/src/common/runtime/scripts/priority_workload_cpu_usage_time.json new file mode 100644 index 000000000000..7bacb4b16168 --- /dev/null +++ b/src/common/runtime/scripts/priority_workload_cpu_usage_time.json @@ -0,0 +1,22 @@ +{ + "0,0": 262.3619, + "0,1": 256.2622, + "0,2": 516.4156, + "0,3": 522.557, + "1,0": 342.16656, + "1,1": 389.12814, + "1,2": 523.02625, + "1,3": 551.30365, + "2,0": 437.40067, + "2,1": 449.49277, + "2,2": 523.3377, + "2,3": 560.76587, + "3,0": 543.88837, + "3,1": 534.6068, + "3,2": 522.6544, + "3,3": 571.3978, + "4,0": 592.19385, + "4,1": 595.3774, + "4,2": 529.2562, + "4,3": 551.40424 +} \ No newline at end of file diff --git a/src/common/runtime/src/future_throttle_count_mode.rs b/src/common/runtime/src/future_throttle_count_mode.rs new file mode 100644 index 000000000000..504001eb8062 --- /dev/null +++ b/src/common/runtime/src/future_throttle_count_mode.rs @@ -0,0 +1,112 @@ +// use core::panicking::panic; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use futures::FutureExt; +use tokio::time::Sleep; + +use crate::runtime_throttle_count_mode::RuntimeThrottleShareWithFuture; + +// static ref EACH_FUTURE_RECORD: parking_lot::RwLock>> = RwLock::new(None); + +enum State { + Common, + Backoff(Pin>), +} +impl State { + fn unwrap_backoff(&mut self) -> &mut Pin> { + match self { + State::Backoff(sleep) => sleep, + _ => panic!("unwrap_backoff failed"), + } + } +} + +#[pin_project::pin_project] +pub struct ThrottleFuture { + #[pin] + future: F, + /// priority of this future + handle: Arc, + /// count of pendings + pub pend_cnt: u32, // track the pending count for test + /// count of inserted pendings + // pub inserted_pend_cnt: u32, + // sche_time: u32, + // poll_time: u32, + state: State, +} + +impl ThrottleFuture +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + pub fn 
new(handle: Arc, future: F) -> Self { + Self { + future, + handle, + pend_cnt: 0, + // inserted_pend_cnt: 0, + state: State::Common, + // poll_time: 0, + // sche_time: 0, + } + } +} + +impl Future for ThrottleFuture +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + type Output = F::Output; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + // let sche_begin = Instant::now(); + match this.state { + State::Common => {} + State::Backoff(ref mut sleep) => match sleep.poll_unpin(cx) { + Poll::Ready(_) => { + *this.state = State::Common; + } + Poll::Pending => return Poll::Pending, + }, + }; + + // let inter = 5; + + // if (*this.pend_cnt + 1) % inter == 0 { + if let Some(ratelimiter) = &this.handle.ratelimiter { + *this.pend_cnt += 1; + // println!("try wait for {}", avg_poll_time); + if let Err(wait) = ratelimiter.try_wait() { + *this.state = State::Backoff(Box::pin(tokio::time::sleep(wait))); + match this.state.unwrap_backoff().poll_unpin(cx) { + Poll::Ready(_) => { + *this.state = State::Common; + cx.waker().clone().wake(); + return Poll::Pending; + } + Poll::Pending => { + return Poll::Pending; + } + } + } + } + + // } + let poll_res = this.future.poll(cx); + + match poll_res { + Poll::Ready(r) => Poll::Ready(r), + Poll::Pending => { + *this.pend_cnt += 1; + Poll::Pending + } + } + } +} diff --git a/src/common/runtime/src/future_throttle_time_mode.rs b/src/common/runtime/src/future_throttle_time_mode.rs new file mode 100644 index 000000000000..2b8780748de3 --- /dev/null +++ b/src/common/runtime/src/future_throttle_time_mode.rs @@ -0,0 +1,147 @@ +// use core::panicking::panic; +use std::any::TypeId; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Instant; + +use futures::FutureExt; +use tokio::time::Sleep; + +use crate::runtime_throttle_time_mode::RuntimeThrottleShareWithFuture; 
+ +// static ref EACH_FUTURE_RECORD: parking_lot::RwLock>> = RwLock::new(None); + +enum State { + Common, + Backoff(Pin>), +} +impl State { + fn unwrap_backoff(&mut self) -> &mut Pin> { + match self { + State::Backoff(sleep) => sleep, + _ => panic!("unwrap_backoff failed"), + } + } +} + +#[pin_project::pin_project] +pub struct ThrottleFuture { + #[pin] + future: F, + /// priority of this future + handle: Arc, + /// count of pendings + pub pend_cnt: u32, // track the pending count for test + /// count of inserted pendings + // pub inserted_pend_cnt: u32, + // sche_time: u32, + // poll_time: u32, + state: State, +} + +impl ThrottleFuture +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + pub fn new(handle: Arc, future: F) -> Self { + Self { + future, + handle, + pend_cnt: 0, + // inserted_pend_cnt: 0, + state: State::Common, + // poll_time: 0, + // sche_time: 0, + } + } +} + +impl Future for ThrottleFuture +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + type Output = F::Output; + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + // let sche_begin = Instant::now(); + match this.state { + State::Common => {} + State::Backoff(ref mut sleep) => match sleep.poll_unpin(cx) { + Poll::Ready(_) => { + *this.state = State::Common; + } + Poll::Pending => return Poll::Pending, + }, + }; + + // let inter = 5; + + // if (*this.pend_cnt + 1) % inter == 0 { + if let Some(ratelimiter) = &this.handle.ratelimiter { + *this.pend_cnt += 1; + let future_id = TypeId::of::(); + let avg_poll_time = + if let Some(v) = this.handle.each_future_avg_poll_time.read().get(&future_id) { + v.load(Ordering::Relaxed) + } else { + 1 + }; + // println!("try wait for {}", avg_poll_time); + if let Err(wait) = ratelimiter.try_wait_for(avg_poll_time) { + *this.state = State::Backoff(Box::pin(tokio::time::sleep(wait))); + match this.state.unwrap_backoff().poll_unpin(cx) { + Poll::Ready(_) => { + *this.state = 
State::Common; + cx.waker().clone().wake(); + return Poll::Pending; + } + Poll::Pending => { + return Poll::Pending; + } + } + } + } + + // } + let poll_begin = Instant::now(); + let poll_res = this.future.poll(cx); + let poll_time = poll_begin.elapsed().as_micros() as u64; + if this.handle.ratelimiter.is_some() { + let future_id = TypeId::of::(); + let read_avg_time = this.handle.each_future_avg_poll_time.read(); + if let Some(v) = read_avg_time.get(&future_id) { + v.store( + (v.load(Ordering::Relaxed) + poll_time) / 2, + Ordering::Relaxed, + ) + } else { + drop(read_avg_time); + this.handle + .each_future_avg_poll_time + .write() + .entry(future_id) + .and_modify(|v| { + v.store( + (v.load(Ordering::Relaxed) + poll_time) / 2, + Ordering::Relaxed, + ) + }) + .or_insert(AtomicU64::new(poll_time)); + } + } + + match poll_res { + Poll::Ready(r) => Poll::Ready(r), + Poll::Pending => { + *this.pend_cnt += 1; + Poll::Pending + } + } + } +} diff --git a/src/common/runtime/src/global.rs b/src/common/runtime/src/global.rs index b7d78badeb41..5cd008fa8cec 100644 --- a/src/common/runtime/src/global.rs +++ b/src/common/runtime/src/global.rs @@ -21,6 +21,7 @@ use once_cell::sync::Lazy; use paste::paste; use serde::{Deserialize, Serialize}; +use crate::runtime::{BuilderBuild, RuntimeTrait}; use crate::{Builder, JoinHandle, Runtime}; const GLOBAL_WORKERS: usize = 8; diff --git a/src/common/runtime/src/lib.rs b/src/common/runtime/src/lib.rs index 4429f6fa71ab..a1900ffacdd1 100644 --- a/src/common/runtime/src/lib.rs +++ b/src/common/runtime/src/lib.rs @@ -13,10 +13,19 @@ // limitations under the License. 
pub mod error; +mod future_throttle_count_mode; +mod future_throttle_time_mode; pub mod global; mod metrics; +mod ratelimit; mod repeated_task; pub mod runtime; +mod runtime_default; +mod runtime_throttle_count_mode; +mod runtime_throttle_time_mode; +#[cfg(test)] +mod test_metrics; +mod throttle_priority; pub use global::{ block_on_compact, block_on_global, compact_runtime, create_runtime, global_runtime, diff --git a/src/common/runtime/src/ratelimit.rs b/src/common/runtime/src/ratelimit.rs new file mode 100644 index 000000000000..5eb25c93c4f7 --- /dev/null +++ b/src/common/runtime/src/ratelimit.rs @@ -0,0 +1,608 @@ +//! This crate provides a simple implementation of a ratelimiter that can be +//! shared between threads. +//! +//! ``` +//! use ratelimit::Ratelimiter; +//! use std::time::Duration; +//! +//! // Constructs a ratelimiter that generates 1 tokens/s with no burst. This +//! // can be used to produce a steady rate of requests. The ratelimiter starts +//! // with no tokens available, which means across application restarts, we +//! // cannot exceed the configured ratelimit. +//! let ratelimiter = Ratelimiter::builder(1, Duration::from_secs(1)) +//! .build() +//! .unwrap(); +//! +//! // Another use case might be admission control, where we start with some +//! // initial budget and replenish it periodically. In this example, our +//! // ratelimiter allows 1000 tokens/hour. For every hour long sliding window, +//! // no more than 1000 tokens can be acquired. But all tokens can be used in +//! // a single burst. Additional calls to `try_wait()` will return an error +//! // until the next token addition. +//! // +//! // This is popular approach with public API ratelimits. +//! let ratelimiter = Ratelimiter::builder(1000, Duration::from_secs(3600)) +//! .max_tokens(1000) +//! .initial_available(1000) +//! .build() +//! .unwrap(); +//! +//! // For very high rates, we should avoid using too short of an interval due +//! // to limits of system clock resolution. 
Instead, it's better to allow some +//! // burst and add multiple tokens per interval. The resulting ratelimiter +//! // here generates 50 million tokens/s and allows no more than 50 tokens to +//! // be acquired in any 1 microsecond long window. +//! let ratelimiter = Ratelimiter::builder(50, Duration::from_micros(1)) +//! .max_tokens(50) +//! .build() +//! .unwrap(); +//! +//! // constructs a ratelimiter that generates 100 tokens/s with no burst +//! let ratelimiter = Ratelimiter::builder(1, Duration::from_millis(10)) +//! .build() +//! .unwrap(); +//! +//! for _ in 0..10 { +//! // a simple sleep-wait +//! if let Err(sleep) = ratelimiter.try_wait() { +//! std::thread::sleep(sleep); +//! continue; +//! } +//! +//! // do some ratelimited action here +//! } +//! ``` + +use core::sync::atomic::{AtomicU64, Ordering}; +use std::fmt::{Debug, Formatter}; + +use clocksource::precise::{AtomicInstant, Duration, Instant}; +use parking_lot::RwLock; +use snafu::Snafu; + +// #[derive(Error, Debug, PartialEq, Eq)] +// pub enum Error { +// #[error("available tokens cannot be set higher than max tokens")] +// AvailableTokensTooHigh, +// #[error("max tokens cannot be less than the refill amount")] +// MaxTokensTooLow, +// #[error("refill amount cannot exceed the max tokens")] +// RefillAmountTooHigh, +// #[error("refill interval in nanoseconds exceeds maximum u64")] +// RefillIntervalTooLong, +// } +#[derive(Debug, Snafu, PartialEq, Eq)] +pub enum Error { + #[snafu(display("available tokens cannot be set higher than max tokens"))] + AvailableTokensTooHigh, + + #[snafu(display("max tokens cannot be less than the refill amount"))] + MaxTokensTooLow, + + #[snafu(display("refill amount cannot exceed the max tokens"))] + RefillAmountTooHigh, + + #[snafu(display("refill interval in nanoseconds exceeds maximum u64"))] + RefillIntervalTooLong, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +struct Parameters { + capacity: u64, + refill_amount: u64, + refill_interval: Duration, +} + 
+pub struct Ratelimiter { + available: AtomicU64, + dropped: AtomicU64, + parameters: RwLock, + refill_at: AtomicInstant, +} + +impl Ratelimiter { + /// Initialize a builder that will construct a `Ratelimiter` that adds the + /// specified `amount` of tokens to the token bucket after each `interval` + /// has elapsed. + /// + /// Note: In practice, the system clock resolution imposes a lower bound on + /// the `interval`. To be safe, it is recommended to set the interval to be + /// no less than 1 microsecond. This also means that the number of tokens + /// per interval should be > 1 to achieve rates beyond 1 million tokens/s. + pub fn builder(amount: u64, interval: core::time::Duration) -> Builder { + Builder::new(amount, interval) + } + + /// Return the current effective rate of the Ratelimiter in tokens/second + pub fn rate(&self) -> f64 { + let parameters = self.parameters.read(); + + parameters.refill_amount as f64 * 1_000_000_000.0 + / parameters.refill_interval.as_nanos() as f64 + } + + /// Return the current interval between refills. + pub fn refill_interval(&self) -> core::time::Duration { + let parameters = self.parameters.read(); + + core::time::Duration::from_nanos(parameters.refill_interval.as_nanos()) + } + + /// Allows for changing the interval between refills at runtime. + pub fn set_refill_interval(&self, duration: core::time::Duration) -> Result<(), Error> { + if duration.as_nanos() > u64::MAX as u128 { + return Err(Error::RefillIntervalTooLong); + } + + let mut parameters = self.parameters.write(); + + parameters.refill_interval = Duration::from_nanos(duration.as_nanos() as u64); + Ok(()) + } + + /// Return the current number of tokens to be added on each refill. + pub fn refill_amount(&self) -> u64 { + let parameters = self.parameters.read(); + + parameters.refill_amount + } + + /// Allows for changing the number of tokens to be added on each refill. 
+ pub fn set_refill_amount(&self, amount: u64) -> Result<(), Error> { + let mut parameters = self.parameters.write(); + + if amount > parameters.capacity { + Err(Error::RefillAmountTooHigh) + } else { + parameters.refill_amount = amount; + Ok(()) + } + } + + /// Returns the maximum number of tokens that can + pub fn max_tokens(&self) -> u64 { + let parameters = self.parameters.read(); + + parameters.capacity + } + + /// Allows for changing the maximum number of tokens that can be held by the + /// ratelimiter for immediate use. This effectively sets the burst size. The + /// configured value must be greater than or equal to the refill amount. + pub fn set_max_tokens(&self, amount: u64) -> Result<(), Error> { + let mut parameters = self.parameters.write(); + + if amount < parameters.refill_amount { + Err(Error::MaxTokensTooLow) + } else { + parameters.capacity = amount; + loop { + let available = self.available(); + if amount > available { + if self + .available + .compare_exchange(available, amount, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + break; + } + } else { + break; + } + } + Ok(()) + } + } + + /// Returns the number of tokens currently available. + pub fn available(&self) -> u64 { + self.available.load(Ordering::Relaxed) + } + + /// Sets the number of tokens available to some amount. Returns an error if + /// the amount exceeds the bucket capacity. + pub fn set_available(&self, amount: u64) -> Result<(), Error> { + let parameters = self.parameters.read(); + if amount > parameters.capacity { + Err(Error::AvailableTokensTooHigh) + } else { + self.available.store(amount, Ordering::Release); + Ok(()) + } + } + + /// Returns the number of tokens that have been dropped due to bucket + /// overflowing. + pub fn dropped(&self) -> u64 { + self.dropped.load(Ordering::Relaxed) + } + + /// Internal function to refill the token bucket. 
Called as part of + /// `try_wait()` + fn refill(&self, time: Instant) -> Result<(), core::time::Duration> { + // will hold the number of elapsed refill intervals + let mut intervals; + // will hold a read lock for the refill parameters + let mut parameters; + + loop { + // determine when next refill should occur + let refill_at = self.refill_at.load(Ordering::Relaxed); + + // if this time is before the next refill is due, return + if time < refill_at { + return Err(core::time::Duration::from_nanos( + (refill_at - time).as_nanos(), + )); + } + + // acquire read lock for refill parameters + parameters = self.parameters.read(); + + intervals = (time - refill_at).as_nanos() / parameters.refill_interval.as_nanos() + 1; + + // calculate when the following refill would be + let next_refill = + refill_at + Duration::from_nanos(intervals * parameters.refill_interval.as_nanos()); + + // compare/exchange, if race, loop and check if we still need to + // refill before trying again + if self + .refill_at + .compare_exchange(refill_at, next_refill, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + break; + } + } + + // figure out how many tokens we might add + let amount = intervals * parameters.refill_amount; + + let available = self.available.load(Ordering::Acquire); + + if available + amount >= parameters.capacity { + // we will fill the bucket up to the capacity + let to_add = parameters.capacity - available; + self.available.fetch_add(to_add, Ordering::Release); + + // and increment the number of tokens dropped + self.dropped.fetch_add(amount - to_add, Ordering::Relaxed); + } else { + self.available.fetch_add(amount, Ordering::Release); + } + + Ok(()) + } + + /// Non-blocking function to "wait" for a single token. On success, a single + /// token has been acquired. On failure, a `Duration` hinting at when the + /// next refill would occur is returned. 
+ pub fn try_wait(&self) -> Result<(), core::time::Duration> { + // We have an outer loop that drives the refilling of the token bucket. + // This will only be repeated if we refill successfully, but somebody + // else takes the newly available token(s) before we can attempt to + // acquire one. + loop { + // Attempt to refill the bucket. This makes sure we are moving the + // time forward, issuing new tokens, hitting our max capacity, etc. + let refill_result = self.refill(Instant::now()); + + // Note: right now it doesn't matter if refill succeeded or failed. + // We might already have tokens available. Even if refill failed we + // check if there are tokens and attempt to acquire one. + + // Our inner loop deals with acquiring a token. It will only repeat + // if there is a race on the available tokens. This can occur + // between: + // - the refill in the outer loop and the load in the inner loop + // - the load and the compare exchange, both in the inner loop + // + // Both these cases mean that somebody has taken a token we had + // hoped to acquire. However, the handling of these cases differs. + loop { + // load the count of available tokens + let available = self.available.load(Ordering::Acquire); + + // Two cases if there are no available tokens, we have: + // - Failed to refill and the bucket was empty. This means we + // should early return with an error that provides the caller + // with the duration until next refill. + // - Succeeded to refill but there are now no tokens. This is + // only hit if somebody else took the token between refill and + // load. In this case, we break the inner loop and repeat from + // the top of the outer loop. + // + // Note: this is when it matters if the refill was successful. + // We use the success or failure to determine if there was a + // race. + if available == 0 { + match refill_result { + Ok(_) => { + // This means we raced. Refill succeeded but another + // caller has taken the token. 
We break the inner + // loop and try to refill again. + break; + } + Err(e) => { + // Refill failed and there were no tokens already + // available. We return the error which contains a + // duration until the next refill. + return Err(e); + } + } + } + + // If we made it here, available is > 0 and so we can attempt to + // acquire a token by doing a simple compare exchange on + // available with the new value. + let new = available - 1; + + if self + .available + .compare_exchange(available, new, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + // We have acquired a token and can return successfully + return Ok(()); + } + + // If we raced on the compare exchange, we need to repeat the + // token acquisition. Either there will be another token we can + // try to acquire, or we will break and attempt a refill again. + } + } + } + + pub fn try_wait_for(&self, count: u64) -> Result<(), core::time::Duration> { + // We have an outer loop that drives the refilling of the token bucket. + // This will only be repeated if we refill successfully, but somebody + // else takes the newly available token(s) before we can attempt to + // acquire one. + loop { + // Attempt to refill the bucket. This makes sure we are moving the + // time forward, issuing new tokens, hitting our max capacity, etc. + let refill_result = self.refill(Instant::now()); + + // Note: right now it doesn't matter if refill succeeded or failed. + // We might already have tokens available. Even if refill failed we + // check if there are tokens and attempt to acquire one. + + // Our inner loop deals with acquiring a token. It will only repeat + // if there is a race on the available tokens. This can occur + // between: + // - the refill in the outer loop and the load in the inner loop + // - the load and the compare exchange, both in the inner loop + // + // Both these cases mean that somebody has taken a token we had + // hoped to acquire. However, the handling of these cases differs. 
+ loop { + // load the count of available tokens + let available = self.available.load(Ordering::Acquire); + + // Two cases if there are no available tokens, we have: + // - Failed to refill and the bucket was empty. This means we + // should early return with an error that provides the caller + // with the duration until next refill. + // - Succeeded to refill but there are now no tokens. This is + // only hit if somebody else took the token between refill and + // load. In this case, we break the inner loop and repeat from + // the top of the outer loop. + // + // Note: this is when it matters if the refill was successful. + // We use the success or failure to determine if there was a + // race. + if available < count { + match refill_result { + Ok(_) => { + // This means we raced. Refill succeeded but another + // caller has taken the token. We break the inner + // loop and try to refill again. + break; + } + Err(e) => { + // Refill failed and there were no tokens already + // available. We return the error which contains a + // duration until the next refill. + return Err(e); + } + } + } + + // If we made it here, available is > 0 and so we can attempt to + // acquire a token by doing a simple compare exchange on + // available with the new value. + let new = available - count; + + if self + .available + .compare_exchange(available, new, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + // We have acquired a token and can return successfully + return Ok(()); + } + + // If we raced on the compare exchange, we need to repeat the + // token acquisition. Either there will be another token we can + // try to acquire, or we will break and attempt a refill again. 
+ } + } + } +} + +impl Debug for Ratelimiter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Ratelimiter") + .field("available", &self.available.load(Ordering::Acquire)) + .field("max_tokens", &self.max_tokens()) + .field("refill_amount", &self.refill_amount()) + .field("refill_interval", &self.refill_interval()) + .finish() + } +} + +pub struct Builder { + initial_available: u64, + max_tokens: u64, + refill_amount: u64, + refill_interval: core::time::Duration, +} + +impl Builder { + /// Initialize a new builder that will add `amount` tokens after each + /// `interval` has elapsed. + fn new(amount: u64, interval: core::time::Duration) -> Self { + Self { + // default of zero tokens initially + initial_available: 0, + // default of one to prohibit bursts + max_tokens: 1, + refill_amount: amount, + refill_interval: interval, + } + } + + /// Set the max tokens that can be held in the the `Ratelimiter` at any + /// time. This limits the size of any bursts by placing an upper bound on + /// the number of tokens available for immediate use. + /// + /// By default, the max_tokens will be set to one unless the refill amount + /// requires a higher value. + /// + /// The selected value cannot be lower than the refill amount. + pub fn max_tokens(mut self, tokens: u64) -> Self { + self.max_tokens = tokens; + self + } + + /// Set the number of tokens that are initially available. For admission + /// control scenarios, you may wish for there to be some tokens initially + /// available to avoid delays or discards until the ratelimit is hit. When + /// using it to enforce a ratelimit on your own process, for example when + /// generating outbound requests, you may want there to be zero tokens + /// availble initially to make your application more well-behaved in event + /// of process restarts. + /// + /// The default is that no tokens are initially available. 
+ pub fn initial_available(mut self, tokens: u64) -> Self { + self.initial_available = tokens; + self + } + + /// Consumes this `Builder` and attempts to construct a `Ratelimiter`. + pub fn build(self) -> Result { + if self.max_tokens < self.refill_amount { + return Err(Error::MaxTokensTooLow); + } + + if self.refill_interval.as_nanos() > u64::MAX as u128 { + return Err(Error::RefillIntervalTooLong); + } + + let available = AtomicU64::new(self.initial_available); + + let parameters = Parameters { + capacity: self.max_tokens, + refill_amount: self.refill_amount, + refill_interval: Duration::from_nanos(self.refill_interval.as_nanos() as u64), + }; + + let refill_at = AtomicInstant::new(Instant::now() + self.refill_interval); + + Ok(Ratelimiter { + available, + dropped: AtomicU64::new(0), + parameters: parameters.into(), + refill_at, + }) + } +} + +#[cfg(test)] +mod tests { + use std::time::{Duration, Instant}; + + use ratelimit::Ratelimiter; + + use crate::*; + + macro_rules! approx_eq { + ($value:expr, $target:expr) => { + let value: f64 = $value; + let target: f64 = $target; + assert!(value >= target * 0.999, "{value} >= {}", target * 0.999); + assert!(value <= target * 1.001, "{value} <= {}", target * 1.001); + }; + } + + // test that the configured rate and calculated effective rate are close + #[test] + pub fn rate() { + // amount + interval + let rl = Ratelimiter::builder(4, Duration::from_nanos(333)) + .max_tokens(4) + .build() + .unwrap(); + + approx_eq!(rl.rate(), 12012012.0); + } + + // quick test that a ratelimiter yields tokens at the desired rate + #[test] + pub fn wait() { + let rl = Ratelimiter::builder(1, Duration::from_micros(10)) + .build() + .unwrap(); + + let mut count = 0; + + let now = Instant::now(); + let end = now + Duration::from_millis(10); + while Instant::now() < end { + if rl.try_wait().is_ok() { + count += 1; + } + } + + assert!(count >= 600); + assert!(count <= 1400); + } + + // quick test that an idle ratelimiter doesn't build up 
excess capacity + #[test] + pub fn idle() { + let rl = Ratelimiter::builder(1, Duration::from_millis(1)) + .initial_available(1) + .build() + .unwrap(); + + std::thread::sleep(Duration::from_millis(10)); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_err()); + assert!(rl.dropped() >= 8); + } + + // quick test that capacity acts as expected + #[test] + pub fn capacity() { + let rl = Ratelimiter::builder(1, Duration::from_millis(10)) + .max_tokens(10) + .initial_available(0) + .build() + .unwrap(); + + std::thread::sleep(Duration::from_millis(100)); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_ok()); + assert!(rl.try_wait().is_err()); + } +} diff --git a/src/common/runtime/src/repeated_task.rs b/src/common/runtime/src/repeated_task.rs index 2431a2ee17fb..6cc26e0545e6 100644 --- a/src/common/runtime/src/repeated_task.rs +++ b/src/common/runtime/src/repeated_task.rs @@ -23,6 +23,7 @@ use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use crate::error::{IllegalStateSnafu, Result, WaitGcTaskStopSnafu}; +use crate::runtime::RuntimeTrait; use crate::Runtime; /// Task to execute repeatedly. 
diff --git a/src/common/runtime/src/runtime.rs b/src/common/runtime/src/runtime.rs index 0ea041578e10..cee660ddebae 100644 --- a/src/common/runtime/src/runtime.rs +++ b/src/common/runtime/src/runtime.rs @@ -19,23 +19,23 @@ use std::thread; use std::time::Duration; use snafu::ResultExt; -use tokio::runtime::{Builder as RuntimeBuilder, Handle}; +use tokio::runtime::Builder as RuntimeBuilder; use tokio::sync::oneshot; pub use tokio::task::{JoinError, JoinHandle}; use crate::error::*; use crate::metrics::*; +use crate::runtime_default::DefaultRuntime; +use crate::runtime_throttle_count_mode::RuntimeThrottleCount; +use crate::runtime_throttle_time_mode::RuntimeThrottleTime; +use crate::throttle_priority::Priority; -static RUNTIME_ID: AtomicUsize = AtomicUsize::new(0); +// configurations +// pub type Runtime =crate::runtime_default::DefaultRuntime; +// pub type Runtime = crate::runtime_throttle_time_mode::RuntimeThrottleTime; +pub type Runtime = RuntimeThrottleCount; -/// A runtime to run future tasks -#[derive(Clone, Debug)] -pub struct Runtime { - name: String, - handle: Handle, - // Used to receive a drop signal when dropper is dropped, inspired by databend - _dropper: Arc, -} +static RUNTIME_ID: AtomicUsize = AtomicUsize::new(0); /// Dropping the dropper will cause runtime to shutdown. #[derive(Debug)] @@ -50,45 +50,42 @@ impl Drop for Dropper { } } -impl Runtime { - pub fn builder() -> Builder { +pub trait RuntimeTrait { + // fn new(name: &str, handle: Handle, dropper: Arc) -> Self; + + fn builder() -> Builder { Builder::default() } /// Spawn a future and execute it in this thread pool /// /// Similar to tokio::runtime::Runtime::spawn() - pub fn spawn(&self, future: F) -> JoinHandle + fn spawn(&self, future: F) -> JoinHandle where F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.handle.spawn(future) - } + F::Output: Send + 'static; /// Run the provided function on an executor dedicated to blocking /// operations. 
- pub fn spawn_blocking(&self, func: F) -> JoinHandle + fn spawn_blocking(&self, func: F) -> JoinHandle where F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - self.handle.spawn_blocking(func) - } + R: Send + 'static; /// Run a future to complete, this is the runtime's entry point - pub fn block_on(&self, future: F) -> F::Output { - self.handle.block_on(future) - } + fn block_on(&self, future: F) -> F::Output; - pub fn name(&self) -> &str { - &self.name - } + fn name(&self) -> &str; +} + +pub trait BuilderBuild { + fn build(&mut self) -> Result; } pub struct Builder { runtime_name: String, thread_name: String, + priority: Priority, builder: RuntimeBuilder, } @@ -98,11 +95,17 @@ impl Default for Builder { runtime_name: format!("runtime-{}", RUNTIME_ID.fetch_add(1, Ordering::Relaxed)), thread_name: "default-worker".to_string(), builder: RuntimeBuilder::new_multi_thread(), + priority: Priority::VeryHigh, } } } impl Builder { + pub fn priority(&mut self, priority: Priority) -> &mut Self { + self.priority = priority; + self + } + /// Sets the number of worker threads the Runtime will use. /// /// This can be any number above 0. The default value is the number of cores available to the system. @@ -139,8 +142,78 @@ impl Builder { self.thread_name = val.into(); self } +} + +impl BuilderBuild for Builder { + fn build(&mut self) -> Result { + let runtime = self + .builder + .enable_all() + .thread_name(self.thread_name.clone()) + .on_thread_start(on_thread_start(self.thread_name.clone())) + .on_thread_stop(on_thread_stop(self.thread_name.clone())) + .on_thread_park(on_thread_park(self.thread_name.clone())) + .on_thread_unpark(on_thread_unpark(self.thread_name.clone())) + .build() + .context(BuildRuntimeSnafu)?; + + let name = self.runtime_name.clone(); + let handle = runtime.handle().clone(); + let (send_stop, recv_stop) = oneshot::channel(); + // Block the runtime to shutdown. 
+ let _ = thread::Builder::new() + .name(format!("{}-blocker", self.thread_name)) + .spawn(move || runtime.block_on(recv_stop)); + + #[cfg(tokio_unstable)] + register_collector(name.clone(), &handle); + + Ok(DefaultRuntime::new( + &name, + handle, + Arc::new(Dropper { + close: Some(send_stop), + }), + )) + } +} +impl BuilderBuild for Builder { + fn build(&mut self) -> Result { + let runtime = self + .builder + .enable_all() + .thread_name(self.thread_name.clone()) + .on_thread_start(on_thread_start(self.thread_name.clone())) + .on_thread_stop(on_thread_stop(self.thread_name.clone())) + .on_thread_park(on_thread_park(self.thread_name.clone())) + .on_thread_unpark(on_thread_unpark(self.thread_name.clone())) + .build() + .context(BuildRuntimeSnafu)?; - pub fn build(&mut self) -> Result { + let name = self.runtime_name.clone(); + let handle = runtime.handle().clone(); + let (send_stop, recv_stop) = oneshot::channel(); + // Block the runtime to shutdown. + let _ = thread::Builder::new() + .name(format!("{}-blocker", self.thread_name)) + .spawn(move || runtime.block_on(recv_stop)); + + #[cfg(tokio_unstable)] + register_collector(name.clone(), &handle); + + Ok(RuntimeThrottleTime::new( + &name, + self.priority, + handle, + Arc::new(Dropper { + close: Some(send_stop), + }), + )) + } +} + +impl BuilderBuild for Builder { + fn build(&mut self) -> Result { let runtime = self .builder .enable_all() @@ -163,13 +236,14 @@ impl Builder { #[cfg(tokio_unstable)] register_collector(name.clone(), &handle); - Ok(Runtime { - name, + Ok(RuntimeThrottleCount::new( + &name, + self.priority, handle, - _dropper: Arc::new(Dropper { + Arc::new(Dropper { close: Some(send_stop), }), - }) + )) } } @@ -215,6 +289,7 @@ fn on_thread_unpark(thread_name: String) -> impl Fn() + 'static { #[cfg(test)] mod tests { + use std::sync::Arc; use std::thread; use std::time::Duration; @@ -235,7 +310,7 @@ mod tests { #[test] fn test_metric() { - let runtime = Builder::default() + let runtime: Runtime = 
Builder::default() .worker_threads(5) .thread_name("test_runtime_metric") .build() diff --git a/src/common/runtime/src/runtime_default.rs b/src/common/runtime/src/runtime_default.rs new file mode 100644 index 000000000000..49462c0f89db --- /dev/null +++ b/src/common/runtime/src/runtime_default.rs @@ -0,0 +1,77 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::sync::Arc; + +use tokio::runtime::Handle; +pub use tokio::task::JoinHandle; + +use crate::runtime::{Dropper, RuntimeTrait}; +use crate::Builder; + +/// A runtime to run future tasks +#[derive(Clone, Debug)] +pub struct DefaultRuntime { + name: String, + handle: Handle, + // Used to receive a drop signal when dropper is dropped, inspired by databend + _dropper: Arc, +} + +impl DefaultRuntime { + pub fn new(name: &str, handle: Handle, dropper: Arc) -> Self { + Self { + name: name.to_string(), + handle, + _dropper: dropper, + } + } +} + +impl RuntimeTrait for DefaultRuntime { + fn builder() -> Builder { + Builder::default() + } + + /// Spawn a future and execute it in this thread pool + /// + /// Similar to tokio::runtime::Runtime::spawn() + fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle.spawn(future) + } + + /// Run the provided function on an executor dedicated to blocking + /// operations. 
+ fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.handle.spawn_blocking(func) + } + + /// Run a future to complete, this is the runtime's entry point + fn block_on(&self, future: F) -> F::Output { + self.handle.block_on(future) + } + + fn name(&self) -> &str { + &self.name + } +} diff --git a/src/common/runtime/src/runtime_throttle_count_mode.rs b/src/common/runtime/src/runtime_throttle_count_mode.rs new file mode 100644 index 000000000000..f837fa7614f3 --- /dev/null +++ b/src/common/runtime/src/runtime_throttle_count_mode.rs @@ -0,0 +1,91 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::fmt::Debug; +use std::future::Future; +use std::sync::Arc; + +use tokio::runtime::Handle; +pub use tokio::task::JoinHandle; + +use crate::future_throttle_count_mode::ThrottleFuture; +use crate::ratelimit::Ratelimiter; +use crate::runtime::{Dropper, RuntimeTrait}; +use crate::throttle_priority::Priority; +use crate::Builder; + +#[derive(Debug)] +pub struct RuntimeThrottleShareWithFuture { + pub ratelimiter: Option, +} + +/// A runtime to run future tasks +#[derive(Clone, Debug)] +pub struct RuntimeThrottleCount { + name: String, + handle: Handle, + share_with_future: Arc, + // Used to receive a drop signal when dropper is dropped, inspired by databend + _dropper: Arc, +} + +impl RuntimeThrottleCount { + pub fn new(name: &str, priority: Priority, handle: Handle, dropper: Arc) -> Self { + Self { + name: name.to_string(), + handle, + share_with_future: Arc::new(RuntimeThrottleShareWithFuture { + ratelimiter: priority.ratelimiter_count(), + }), + _dropper: dropper, + } + } +} + +impl RuntimeTrait for RuntimeThrottleCount { + fn builder() -> Builder { + Builder::default() + } + + /// Spawn a future and execute it in this thread pool + /// + /// Similar to tokio::runtime::Runtime::spawn() + fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle + .spawn(ThrottleFuture::new(self.share_with_future.clone(), future)) + } + + /// Run the provided function on an executor dedicated to blocking + /// operations. 
+ fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.handle.spawn_blocking(func) + } + + /// Run a future to complete, this is the runtime's entry point + fn block_on(&self, future: F) -> F::Output { + self.handle.block_on(future) + } + + fn name(&self) -> &str { + &self.name + } +} diff --git a/src/common/runtime/src/runtime_throttle_time_mode.rs b/src/common/runtime/src/runtime_throttle_time_mode.rs new file mode 100644 index 000000000000..989b47ce6966 --- /dev/null +++ b/src/common/runtime/src/runtime_throttle_time_mode.rs @@ -0,0 +1,97 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::any::TypeId; +use std::collections::HashMap; +use std::fmt::Debug; +use std::future::Future; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +use parking_lot::RwLock; +use tokio::runtime::Handle; +pub use tokio::task::JoinHandle; + +use crate::future_throttle_time_mode::ThrottleFuture; +use crate::ratelimit::Ratelimiter; +use crate::runtime::{Dropper, RuntimeTrait}; +use crate::throttle_priority::Priority; +use crate::Builder; + +#[derive(Debug)] +pub struct RuntimeThrottleShareWithFuture { + pub ratelimiter: Option, + pub each_future_avg_poll_time: RwLock>, +} + +/// A runtime to run future tasks +#[derive(Clone, Debug)] +pub struct RuntimeThrottleTime { + name: String, + handle: Handle, + share_with_future: Arc, + // Used to receive a drop signal when dropper is dropped, inspired by databend + _dropper: Arc, +} + +impl RuntimeThrottleTime { + pub fn new(name: &str, priority: Priority, handle: Handle, dropper: Arc) -> Self { + Self { + name: name.to_string(), + handle, + share_with_future: Arc::new(RuntimeThrottleShareWithFuture { + ratelimiter: priority.ratelimiter_time(), + each_future_avg_poll_time: HashMap::new().into(), + }), + _dropper: dropper, + } + } +} + +impl RuntimeTrait for RuntimeThrottleTime { + fn builder() -> Builder { + Builder::default() + } + + /// Spawn a future and execute it in this thread pool + /// + /// Similar to tokio::runtime::Runtime::spawn() + fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + self.handle + .spawn(ThrottleFuture::new(self.share_with_future.clone(), future)) + } + + /// Run the provided function on an executor dedicated to blocking + /// operations. 
+ fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.handle.spawn_blocking(func) + } + + /// Run a future to complete, this is the runtime's entry point + fn block_on(&self, future: F) -> F::Output { + self.handle.block_on(future) + } + + fn name(&self) -> &str { + &self.name + } +} diff --git a/src/common/runtime/src/test_metrics.rs b/src/common/runtime/src/test_metrics.rs new file mode 100644 index 000000000000..21dab9d550eb --- /dev/null +++ b/src/common/runtime/src/test_metrics.rs @@ -0,0 +1,234 @@ +use std::collections::BTreeMap; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::thread; + +use sysinfo::{ProcessRefreshKind, RefreshKind}; + +use crate::runtime::{BuilderBuild, RuntimeTrait}; +use crate::throttle_priority::Priority; +use crate::{Builder, Runtime}; + +// configurations +const TEST_GENERATE_DIR: &'static str = "test_generate_dir"; +const WORKLOAD_TYPE_COUNT: u64 = 4; + +#[derive(Debug)] +pub struct ProcessMetric { + pub cpu_usage: f32, +} + +// return (stop flag, stop receiver) +pub fn start_monitor_thread() -> ( + Arc, + tokio::sync::oneshot::Receiver, +) { + let (tx, rx) = tokio::sync::oneshot::channel(); + let stop = Arc::new(AtomicBool::new(false)); + let stop_clone = stop.clone(); + // let pid = std::process::id(); + thread::spawn(move || { + // monitor cpu usage + let mut metric = sysinfo::System::new_with_specifics( + RefreshKind::new().with_processes(ProcessRefreshKind::everything()), + ); + let mut cpu_usage = 0.0; + let mut metric_count = 0; + while !stop_clone.load(std::sync::atomic::Ordering::Relaxed) { + metric.refresh_cpu(); + // let process = metric.process(Pid::from(pid as usize)).unwrap(); + cpu_usage += metric.cpus().iter().map(|cpu| cpu.cpu_usage()).sum::(); + metric_count += 1; + + // thread::sleep(std::time::Duration::from_millis(1)); + thread::yield_now(); + } + let cpu_usage = if metric_count == 0 { + 0.0 + } else { + cpu_usage / 
metric_count as f32 + }; + tx.send(ProcessMetric { cpu_usage }).unwrap(); + }); + + (stop, rx) +} + +fn compute_pi_str(precision: usize) -> String { + let mut pi = 0.0; + let mut sign = 1.0; + + for i in 0..precision { + pi += sign / (2 * i + 1) as f64; + sign *= -1.0; + } + + pi *= 4.0; + + // format the result as a string with `precision` digits after the decimal point + format!("{:.prec$}", pi, prec = precision) +} +#[test] +fn test_compute_pi_str_time() { + let start = std::time::Instant::now(); + compute_pi_str(10); + println!("elapsed {}", start.elapsed().as_micros()); +} + +async fn workload_compute_heavily() { + let prefix = 10; + // let mut file = tokio::fs::OpenOptions::new() + // .write(true) + // .append(true) + // .create(true) + // .open(format!("{}/pi_{}", TEST_GENERATE_DIR, prefix)) + // .await + // .unwrap(); + for _i in 0..3000 { + let _pi = compute_pi_str(prefix); + tokio::task::yield_now().await; + std::thread::yield_now(); + // println!("{} {}", i, pi); + // file.write_all(pi.as_bytes()).await.unwrap(); + } +} +async fn workload_compute_heavily2() { + let prefix = 30; + // let mut file = tokio::fs::OpenOptions::new() + // .write(true) + // .append(true) + // .create(true) + // .open(format!("{}/pi_{}", TEST_GENERATE_DIR, prefix)) + // .await + // .unwrap(); + for _ in 0..2000 { + let _ = compute_pi_str(prefix); + tokio::task::yield_now().await; + std::thread::yield_now(); + // println!("{} {}", i, pi); + // file.write_all(pi.as_bytes()).await.unwrap(); + } +} +async fn workload_write_file(_idx: u64) { + use tokio::io::AsyncWriteExt; + let prefix = 100; + let mut file = tokio::fs::OpenOptions::new() + .write(true) + .append(true) + .create(true) + .open(format!("{}/pi_{}", TEST_GENERATE_DIR, prefix)) + .await + .unwrap(); + for _ in 0..100 { + let pi = compute_pi_str(prefix); + + file.write_all(pi.as_bytes()).await.unwrap(); + } +} +async fn workload_spawn_blocking_write_file() { + use std::io::Write; + let prefix = 100; + let mut file = Some( + std::fs::OpenOptions::new() + .write(true) + 
.append(true) + .create(true) + .open(format!("{}/pi_{}", TEST_GENERATE_DIR, prefix)) + .unwrap(), + ); + for _ in 0..100 { + let pi = compute_pi_str(prefix); + let mut file1 = file.take().unwrap(); + file = Some( + tokio::task::spawn_blocking(move || { + file1.write_all(pi.as_bytes()).unwrap(); + file1 + }) + .await + .unwrap(), + ); + // file.write_all(pi.as_bytes()).await.unwrap(); + } +} + +#[test] +fn test_all_cpu_usage() { + let _ = std::fs::create_dir_all(TEST_GENERATE_DIR); + let mut collect: BTreeMap = BTreeMap::new(); + let priorities = [ + Priority::VeryLow, + Priority::Low, + Priority::Middle, + Priority::High, + Priority::VeryHigh, + ]; + for i in 0..WORKLOAD_TYPE_COUNT { + for p in priorities.iter() { + let runtime: Runtime = Builder::default() + .runtime_name("test") + .thread_name("test") + .worker_threads(8) + .priority(*p) + .build() + .expect("Fail to create runtime"); + let runtime2 = runtime.clone(); + let metric = runtime.block_on(test_spec_priority_and_workload(*p, runtime2, i)); + collect.insert(format!("{},{}", *p as u64, i), metric.cpu_usage); + } + } + // write to json + let writer = std::fs::File::create("priority_workload_cpu_usage.json").unwrap(); + + serde_json::to_writer(writer, &collect).unwrap(); +} + +pub async fn test_spec_priority_and_workload( + priority: Priority, + runtime: Runtime, + workload_id: u64, +) -> ProcessMetric { + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + println!( + "testing cpu usage for priority {:?} workload_id {}", + priority, workload_id, + ); + // start monitor thread + let (stop, rx) = start_monitor_thread(); + let mut tasks = vec![]; + let start = std::time::Instant::now(); + for i in 0..500 { + // persist cpu usage in file: {priority}.{workload_id} + match workload_id { + 0 => { + tasks.push(runtime.spawn(workload_compute_heavily())); + } + 1 => { + tasks.push(runtime.spawn(workload_compute_heavily2())); + } + 2 => { + 
tasks.push(runtime.spawn(workload_spawn_blocking_write_file())); + } + 3 => { + tasks.push(runtime.spawn(workload_write_file(i))); + } + id => { + panic!("invalid workload_id {}", id); + } + } + // if i % 100 == 0 { + // tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // } + } + for task in tasks { + task.await.unwrap(); + } + let elapsed = start.elapsed(); + println!( + "test cpu usage for priority {:?} workload_id {} elapsed {}ms", + priority, + workload_id, + elapsed.as_millis() + ); + stop.store(true, std::sync::atomic::Ordering::SeqCst); + rx.await.unwrap() +} diff --git a/src/common/runtime/src/throttle_priority.rs b/src/common/runtime/src/throttle_priority.rs new file mode 100644 index 000000000000..565b12c04cda --- /dev/null +++ b/src/common/runtime/src/throttle_priority.rs @@ -0,0 +1,54 @@ +use std::time::Duration; + +use crate::ratelimit::Ratelimiter; + +#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] +pub enum Priority { + VeryLow = 0, + Low = 1, + Middle = 2, + High = 3, + VeryHigh = 4, +} + +impl Priority { + pub fn ratelimiter_time(&self) -> Option { + let max = 200000; + let gen_per_10ms = match self { + Priority::VeryLow => Some(8000), + Priority::Low => Some(10000), + Priority::Middle => Some(15000), + Priority::High => Some(20000), + Priority::VeryHigh => None, + }; + if let Some(gen_per_10ms) = gen_per_10ms { + Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // e.g. 100 tokens per 10ms would admit only 100 polls per 10ms (actual amount: gen_per_10ms) + .max_tokens(max) // reserve capacity to guard against bursts + .build() + .unwrap() + .into() + } else { + None + } + } + + pub fn ratelimiter_count(&self) -> Option { + let max = 8000; + let gen_per_10ms = match self { + Priority::VeryLow => Some(2000), + Priority::Low => Some(4000), + Priority::Middle => Some(6000), + Priority::High => Some(8000), + Priority::VeryHigh => None, + }; + if let Some(gen_per_10ms) = gen_per_10ms { + Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // e.g. 100 tokens per 10ms would admit only 100 polls per 10ms (actual amount: gen_per_10ms) + .max_tokens(max) // 
reserve capacity to guard against bursts + .build() + .unwrap() + .into() + } else { + None + } + } +} diff --git a/src/common/telemetry/Cargo.toml b/src/common/telemetry/Cargo.toml index 20fc52a763f7..dc5544458d0d 100644 --- a/src/common/telemetry/Cargo.toml +++ b/src/common/telemetry/Cargo.toml @@ -25,7 +25,7 @@ opentelemetry = { version = "0.21.0", default-features = false, features = [ opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] } opentelemetry-semantic-conventions = "0.13.0" opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] } -parking_lot = { version = "0.12" } +parking_lot.workspace = true prometheus.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 2e3216d075d0..6d051c2eeeeb 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -42,7 +42,7 @@ humantime-serde.workspace = true itertools.workspace = true lazy_static.workspace = true once_cell.workspace = true -parking_lot = "0.12" +parking_lot.workspace = true prometheus.workspace = true prost.workspace = true rand.workspace = true diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index b94fa17d44c0..7c9978af7dc9 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -70,7 +70,7 @@ openmetrics-parser = "0.4" # opensrv-mysql = "0.7.0" opensrv-mysql = { git = "https://github.com/datafuselabs/opensrv", rev = "6bbc3b65e6b19212c4f7fc4f40c20daf6f452deb" } opentelemetry-proto.workspace = true -parking_lot = "0.12" +parking_lot.workspace = true pgwire = { version = "0.24.2", default-features = false, features = ["server-api-ring"] } pin-project = "1.0" pipeline.workspace = true