Skip to content

Commit

Permalink
feat: add rate limiter feature
Browse files Browse the repository at this point in the history
Signed-off-by: Wenxuan Zhang <wenxuangm@gmail.com>
  • Loading branch information
wfxr committed Apr 23, 2024
1 parent 1b9a62c commit 3ca6a6e
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 13 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ name = "rlt"
path = "src/lib.rs"

[features]
default = ["http"]
default = ["http", "rate_limit"]
http = ["dep:http"]
log = ["dep:log", "dep:tui-logger"]
rate_limit = ["dep:governor"]

[dependencies]
anyhow = "1"
Expand All @@ -46,7 +47,7 @@ tui-logger = { version = "0.11", optional = true }
log = { version = "0.4", optional = true }
cfg-if = "1"
parking_lot = "0.12"
governor = "0.6"
governor = { version = "0.6", optional = true }
nonzero_ext = "0.3"

[dev-dependencies]
Expand Down
2 changes: 2 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub struct BenchCli {
#[clap(long, short = 'd')]
pub duration: Option<humantime::Duration>,

#[cfg(feature = "rate_limit")]
/// Rate limit for benchmarking, in iterations per second (ips)
///
/// When set, benchmark will try to run at the specified rate.
Expand Down Expand Up @@ -154,6 +155,7 @@ impl BenchCli {
concurrency: self.concurrency.get(),
iterations: self.iterations.map(|n| n.get()),
duration: self.duration.map(|d| d.into()),
#[cfg(feature = "rate_limit")]
rate: self.rate,
}
}
Expand Down
13 changes: 11 additions & 2 deletions src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::time::{self, Duration, Instant};
/// A logical clock that can be paused
#[derive(Debug, Clone)]
pub struct Clock {
#[cfg(feature = "rate_limit")]
start: Instant,
inner: Arc<Mutex<InnerClock>>,
}
Expand All @@ -26,7 +27,14 @@ pub(crate) enum Status {
impl Clock {
pub fn start_at(start: Instant) -> Self {
let inner = InnerClock { status: Status::Running(start), elapsed: Duration::default() };
Self { start, inner: Arc::new(Mutex::new(inner)) }

cfg_if::cfg_if! {
if #[cfg(feature = "rate_limit")] {
Self { start, inner: Arc::new(Mutex::new(inner)) }
} else {
Self { inner: Arc::new(Mutex::new(inner)) }
}
}
}

pub fn resume(&mut self) {
Expand Down Expand Up @@ -77,6 +85,7 @@ impl Clock {
}
}

#[cfg(feature = "rate_limit")]
impl governor::clock::Clock for Clock {
type Instant = std::time::Instant;

Expand All @@ -85,7 +94,7 @@ impl governor::clock::Clock for Clock {
self.start.into_std() + elapsed
}
}

#[cfg(feature = "rate_limit")]
impl governor::clock::ReasonablyRealtime for Clock {}

/// A ticker that ticks at a fixed logical interval
Expand Down
24 changes: 15 additions & 9 deletions src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
//! This module defines traits for stateful and stateless benchmark suites.
use anyhow::Result;
use async_trait::async_trait;
use governor::{Quota, RateLimiter};
use nonzero_ext::nonzero;
use std::{
num::NonZeroU32,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand All @@ -18,6 +15,14 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;

cfg_if::cfg_if! {
if #[cfg(feature = "rate_limit")] {
use std::num::NonZeroU32;
use governor::{Quota, RateLimiter};
use nonzero_ext::nonzero;
}
}

use crate::{
clock::Clock,
// rate_limiter::{self, RateLimiter},
Expand All @@ -39,6 +44,7 @@ pub struct BenchOpts {
/// Duration to run the benchmark.
pub duration: Option<Duration>,

#[cfg(feature = "rate_limit")]
/// Rate limit for benchmarking, in iterations per second (ips).
pub rate: Option<NonZeroU32>,
}
Expand Down Expand Up @@ -144,6 +150,7 @@ where
async fn iteration(&mut self, state: &mut BS::WorkerState, info: &IterInfo) {
self.wait_if_paused().await;
let res = self.suite.bench(state, info).await;

#[cfg(feature = "log")]
if let Err(e) = &res {
log::error!("Error in iteration({info:?}): {:?}", e);
Expand All @@ -157,6 +164,7 @@ where
let concurrency = self.opts.concurrency;
let iterations = self.opts.iterations;

#[cfg(feature = "rate_limit")]
let buckets = self.opts.rate.map(|r| {
let quota = Quota::per_second(r).allow_burst(nonzero!(1u32));
let clock = &self.opts.clock;
Expand All @@ -165,8 +173,9 @@ where

let mut set: JoinSet<Result<()>> = JoinSet::new();
for worker in 0..concurrency {
let mut b = self.clone();
#[cfg(feature = "rate_limit")]
let buckets = buckets.clone();
let mut b = self.clone();
set.spawn(async move {
let mut state = b.suite.state(worker).await?;
let mut info = IterInfo::new(worker);
Expand All @@ -181,6 +190,7 @@ where
}
}

#[cfg(feature = "rate_limit")]
if let Some(buckets) = &buckets {
select! {
biased;
Expand Down Expand Up @@ -214,12 +224,8 @@ where
join_all(&mut set).await
}

fn paused(&self) -> bool {
*self.pause.borrow()
}

async fn wait_if_paused(&mut self) {
while self.paused() {
while *self.pause.borrow() {
if self.pause.changed().await.is_err() {
return;
}
Expand Down

0 comments on commit 3ca6a6e

Please sign in to comment.