diff --git a/Cargo.lock b/Cargo.lock index f176072..d0c131e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1512,6 +1512,7 @@ dependencies = [ "hyper-util", "itertools", "log", + "parking_lot", "ratatui", "reqwest", "serde", diff --git a/Cargo.toml b/Cargo.toml index 76ba71f..66abbe8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ http = { version = "1.1", optional = true } tui-logger = { version = "0.11", optional = true } log = { version = "0.4", optional = true } cfg-if = "1" +parking_lot = "0.12" [dev-dependencies] tokio = { version = "1.36", features = ["rt-multi-thread"] } diff --git a/src/cli.rs b/src/cli.rs index 5e20896..df756d0 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -92,6 +92,7 @@ use tokio::{ use tokio_util::sync::CancellationToken; use crate::{ + clock::Clock, collector::{ReportCollector, SilentCollector, TuiCollector}, reporter::{BenchReporter, JsonReporter, TextReporter}, runner::{BenchOpts, BenchSuite, Runner}, @@ -144,9 +145,9 @@ pub struct BenchCli { } impl BenchCli { - pub(crate) fn bench_opts(&self, start: Instant) -> BenchOpts { + pub(crate) fn bench_opts(&self, clock: Clock) -> BenchOpts { BenchOpts { - start, + clock, concurrency: self.concurrency, iterations: self.iterations, duration: self.duration.map(|d| d.into()), @@ -193,8 +194,8 @@ where let (pause_tx, pause_rx) = watch::channel(false); let cancel = CancellationToken::new(); - let opts = cli.bench_opts(Instant::now()); - let runner = Runner::new(bench_suite, opts, res_tx, pause_rx, cancel.clone()); + let opts = cli.bench_opts(Clock::start_at(Instant::now())); + let runner = Runner::new(bench_suite, opts.clone(), res_tx, pause_rx, cancel.clone()); let mut collector: Box = match cli.collector() { Collector::Tui => Box::new(TuiCollector::new(opts, cli.fps, res_rx, pause_tx, cancel)?), diff --git a/src/clock.rs b/src/clock.rs new file mode 100644 index 0000000..90c646f --- /dev/null +++ b/src/clock.rs @@ -0,0 +1,99 @@ +use std::sync::Arc; + +use parking_lot::Mutex; +use tokio::time::{self, Duration, Instant}; + +/// A logical clock that can be paused +#[derive(Debug, Clone, Default)] +pub struct Clock { + inner: Arc>, +} + +#[derive(Debug, Clone, Default)] +pub(crate) struct InnerClock { + status: Status, + elapsed: Duration, +} + +#[derive(Debug, Clone, Copy, Default)] +pub(crate) enum Status { + #[default] + Paused, + Running(Instant), +} + +impl Clock { + pub fn start_at(instant: Instant) -> Self { + let inner = InnerClock { + status: Status::Running(instant), + elapsed: Duration::default(), + }; + Self { inner: Arc::new(Mutex::new(inner)) } + } + + pub fn resume(&mut self) { + let mut inner = self.inner.lock(); + if let Status::Paused = inner.status { + inner.status = Status::Running(Instant::now()); + } + } + + pub fn pause(&mut self) { + let mut inner = self.inner.lock(); + if let Status::Running(checkpoint) = inner.status { + inner.elapsed += checkpoint.elapsed(); + inner.status = Status::Paused; + } + } + + pub fn elapsed(&self) -> Duration { + let inner = self.inner.lock(); + match inner.status { + Status::Paused => inner.elapsed, + Status::Running(checkpoint) => inner.elapsed + checkpoint.elapsed(), + } + } + + pub async fn sleep(&self, mut duration: Duration) { + let wake_time = self.elapsed() + duration; + loop { + time::sleep(duration).await; + let elapsed = self.elapsed(); + if elapsed >= wake_time { + break; + } + duration = wake_time - elapsed; + } + } + + async fn sleep_until(&self, deadline: Duration) { + let now = self.elapsed(); + if deadline <= now { + return; + } + self.sleep(deadline - now).await; + } + + pub fn ticker(&self, duration: Duration) -> Ticker { + Ticker::new(self.clone(), duration) + } +} + +/// A ticker that ticks at a fixed logical interval +#[derive(Debug, Clone)] +pub struct Ticker { + clock: Clock, + interval: Duration, + next_tick: Duration, +} + +impl Ticker { + pub fn new(clock: Clock, duration: Duration) -> Self { + Self { clock, interval: duration, next_tick: duration } + } + + pub async fn tick(&mut self) { + self.clock.sleep_until(self.next_tick).await; + self.next_tick += self.interval; + } +} diff --git a/src/collector/silent.rs b/src/collector/silent.rs index 28aa577..26b49fc 100644 --- a/src/collector/silent.rs +++ b/src/collector/silent.rs @@ -54,7 +54,7 @@ impl super::ReportCollector for SilentCollector { } } - let elapsed = self.bench_opts.start.elapsed(); + let elapsed = self.bench_opts.clock.elapsed(); let concurrency = self.bench_opts.concurrency; Ok(BenchReport { concurrency, hist, stats, status_dist, error_dist, elapsed }) } diff --git a/src/collector/tui.rs b/src/collector/tui.rs index a36084b..8644310 100644 --- a/src/collector/tui.rs +++ b/src/collector/tui.rs @@ -40,6 +40,8 @@ use crate::{ util::{IntoAdjustedByte, TryIntoAdjustedByte}, }; +const SECOND: Duration = Duration::from_secs(1); + /// A report collector with real-time TUI support. pub struct TuiCollector { /// The benchmark options. @@ -127,16 +129,16 @@ impl ReportCollector for TuiCollector { let mut current_tw = TimeWindow::Second; let mut auto_tw = true; - let start = self.bench_opts.start; + let mut clock = self.bench_opts.clock.clone(); + + let mut latest_iters = RotateWindowGroup::new(60); + let mut latest_iters_ticker = clock.ticker(SECOND); - let mut latest_iters = RotateWindowGroup::new(start, 60); - const SECOND: Duration = Duration::from_secs(1); - let mut latest_iters_timer = tokio::time::interval_at(start + SECOND, SECOND); - latest_iters_timer.set_missed_tick_behavior(MissedTickBehavior::Burst); + let mut latest_stats = RotateDiffWindowGroup::new(self.fps); + let mut latest_stats_ticker = clock.ticker(SECOND / self.fps as u32); - let mut latest_stats = RotateDiffWindowGroup::new(start, self.fps); - let mut refresh_timer = tokio::time::interval(Duration::from_secs(1) / self.fps as u32); - refresh_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); + let mut ui_ticker = tokio::time::interval(SECOND / self.fps as u32); + ui_ticker.set_missed_tick_behavior(MissedTickBehavior::Burst); #[cfg(feature = "log")] let mut show_logs = false; @@ -146,9 +148,7 @@ impl ReportCollector for TuiCollector { loop { tokio::select! { biased; - t = refresh_timer.tick() => { - latest_stats.rotate(t, &stats); - + _ = ui_ticker.tick() => { while crossterm::event::poll(Duration::from_secs(0))? { use KeyCode::*; if let Event::Key(KeyEvent { code, modifiers, .. }) = crossterm::event::read()? { @@ -167,8 +167,12 @@ impl ReportCollector for TuiCollector { break 'outer; } (Char('p') | Pause, _) => { - // TODO: pause logical time instead of real time let pause = !*self.pause.borrow(); + if pause { + clock.pause(); + } else { + clock.resume(); + } self.pause.send_replace(pause); } #[cfg(feature = "log")] @@ -195,16 +199,20 @@ impl ReportCollector for TuiCollector { } } - elapsed = t - start; - current_tw = if auto_tw && !*self.pause.borrow() { + elapsed = clock.elapsed(); + current_tw = if auto_tw { *TimeWindow::variants().iter().rfind(|&&ts| elapsed > ts.into()).unwrap_or(&TimeWindow::Second) } else { current_tw }; break; } - t = latest_iters_timer.tick() => { - latest_iters.rotate(t); + _ = latest_stats_ticker.tick() => { + latest_stats.rotate(&stats); + continue; + } + _ = latest_iters_ticker.tick() => { + latest_iters.rotate(); continue; } r = self.res_rx.recv() => match r { @@ -271,7 +279,7 @@ impl ReportCollector for TuiCollector { })?; } - let elapsed = start.elapsed(); + let elapsed = clock.elapsed(); let concurrency = self.bench_opts.concurrency; Ok(BenchReport { concurrency, hist, stats, status_dist, error_dist, elapsed }) } diff --git a/src/lib.rs b/src/lib.rs index 6d7a1ed..b90e747 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,6 +52,7 @@ //! Stateful bench is also supported, see the [examples/http_reqwest](https://github.com/wfxr/rlt/blob/main/examples/http_reqwest.rs). #![deny(missing_docs)] +mod clock; mod duration; mod histogram; mod report; diff --git a/src/runner.rs b/src/runner.rs index e1d4173..104cd61 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -12,17 +12,17 @@ use tokio::{ select, sync::{mpsc, watch}, task::JoinSet, - time::{sleep_until, Instant, MissedTickBehavior}, + time::MissedTickBehavior, }; use tokio_util::sync::CancellationToken; -use crate::report::IterReport; +use crate::{clock::Clock, report::IterReport}; /// Core options for the benchmark runner. -#[derive(Copy, Clone, Debug)] +#[derive(Clone, Debug)] pub struct BenchOpts { /// Start time of the benchmark. - pub start: Instant, + pub clock: Clock, /// Number of concurrent workers. pub concurrency: u32, @@ -37,12 +37,6 @@ pub struct BenchOpts { pub rate: Option, } -impl BenchOpts { - pub(crate) fn endtime(&self) -> Option { - self.duration.map(|d| self.start + d) - } -} - /// A trait for benchmark suites. #[async_trait] pub trait BenchSuite: Clone { @@ -164,7 +158,6 @@ where async fn bench(self) -> Result<()> { let concurrency = self.opts.concurrency; let iterations = self.opts.iterations; - let endtime = self.opts.endtime(); let mut set: JoinSet> = JoinSet::new(); for worker in 0..concurrency { @@ -194,10 +187,10 @@ where }); } - if let Some(t) = endtime { + if let Some(t) = self.opts.duration { select! { _ = self.cancel.cancelled() => (), - _ = sleep_until(t) => self.cancel.cancel(), + _ = self.opts.clock.sleep(t) => self.cancel.cancel(), _ = join_all(&mut set) => (), } }; @@ -209,7 +202,8 @@ where async fn bench_with_rate(self, rate: u32) -> Result<()> { let concurrency = self.opts.concurrency; let iterations = self.opts.iterations; - let endtime = self.opts.endtime(); + let clock = self.opts.clock.clone(); + let duration = self.opts.duration; let (tx, rx) = flume::bounded(self.opts.concurrency as usize); let b = self.clone(); @@ -218,14 +212,14 @@ where timer.set_missed_tick_behavior(MissedTickBehavior::Burst); let mut iter = 0; loop { - let t = timer.tick().await; + timer.tick().await; if b.paused() { match b.cancel.is_cancelled() { false => continue, true => break, } } - if matches!(endtime, Some(endtime) if t >= endtime) { + if matches!(duration, Some(duration) if clock.elapsed() >= duration) { break; } if matches!(iterations, Some(iterations) if iter >= iterations) { diff --git a/src/stats/window.rs b/src/stats/window.rs index 58f6cfa..2a72305 100644 --- a/src/stats/window.rs +++ b/src/stats/window.rs @@ -1,9 +1,6 @@ -use std::{ - collections::VecDeque, - iter::{once, repeat_with}, -}; +use std::collections::VecDeque; -use tokio::time::{Duration, Instant}; +use tokio::time::Duration; use crate::report::IterReport; @@ -34,10 +31,6 @@ impl RotateWindow { self.buckets.push_front(bucket); } - fn rotate_multi(&mut self, buckets: impl Iterator) { - buckets.for_each(|bucket| self.rotate(bucket)); - } - fn len(&self) -> usize { self.buckets.len() } @@ -58,7 +51,7 @@ impl RotateWindow { } pub struct RotateWindowGroup { - frame: Instant, + pub counter: u64, pub stats_by_sec: RotateWindow, pub stats_by_10sec: RotateWindow, pub stats_by_min: RotateWindow, @@ -66,9 +59,9 @@ pub struct RotateWindowGroup { } impl RotateWindowGroup { - pub fn new(frame: Instant, buckets: usize) -> Self { + pub fn new(buckets: usize) -> Self { Self { - frame, + counter: 0, stats_by_sec: RotateWindow::new(buckets), stats_by_10sec: RotateWindow::new(buckets), stats_by_min: RotateWindow::new(buckets), @@ -83,26 +76,22 @@ impl RotateWindowGroup { self.stats_by_10min.push(stats); } - pub fn rotate(&mut self, now: Instant) { - let duration = now - self.frame; - if duration.as_secs() == 0 { - return; - } + pub fn rotate(&mut self) { + self.counter += 1; self.stats_by_sec.rotate(IterStats::new()); - if duration.as_secs() % 10 == 0 { + if self.counter % 10 == 0 { self.stats_by_10sec.rotate(IterStats::new()); } - if duration.as_secs() % 60 == 0 { + if self.counter % 60 == 0 { self.stats_by_min.rotate(IterStats::new()); } - if duration.as_secs() % 600 == 0 { + if self.counter % 600 == 0 { self.stats_by_10min.rotate(IterStats::new()); } } } pub struct RotateDiffWindowGroup { - frame: Instant, interval: Duration, stats_last_sec: RotateWindow, stats_last_10sec: RotateWindow, @@ -119,34 +108,24 @@ impl RotateDiffWindowGroup { &mut self.stats_last_10min, ] } - pub fn new(frame: Instant, fps: u8) -> Self { + pub fn new(fps: u8) -> Self { let fps = fps as usize; let interval = Duration::from_secs_f64(1.0 / fps as f64); - let frame = frame - interval; let mut group = Self { - frame, interval, stats_last_sec: RotateWindow::new(fps + 1), stats_last_10sec: RotateWindow::new(fps * 10 + 1), stats_last_min: RotateWindow::new(fps * 60 + 1), stats_last_10min: RotateWindow::new(fps * 600 + 1), }; - group.rotate(frame, &IterStats::new()); + group.rotate(&IterStats::new()); group } - pub fn rotate(&mut self, next_frame: Instant, stats: &IterStats) { - if next_frame < self.frame + self.interval { - return; - } - - let duration = next_frame - self.frame; - let frames = (duration.as_millis() / self.interval.as_millis()) as usize; - let buckets = repeat_with(IterStats::new).take(frames - 1).chain(once(stats.clone())); + pub fn rotate(&mut self, stats: &IterStats) { for s in self.all_stats().iter_mut() { - s.rotate_multi(buckets.clone()); + s.rotate(stats.clone()); } - self.frame += self.interval * frames as u32; } pub fn stats_last_sec(&self) -> (IterStats, Duration) {