From 0d897c96bd0f2b27dfbd16b88607b3b0afc8b8c1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?=
Date: Sat, 10 Aug 2024 18:41:29 +0200
Subject: [PATCH] Add option --drop-sampling-log to not keep the sampling log

Fixes #67

(cherry picked from commit 48f3c8e16731cc30073fd9c960c8de50ba14c1ff)

Update:
- Remove the new '--drop-sampling-log' option. Its role is played by the
  existing '--generate-report' option, which has the opposite default value
  and behavior.
---
 src/config.rs |  15 ++++-
 src/exec.rs   |   4 +-
 src/main.rs   |   6 +-
 src/report.rs | 166 ++++++++++++++++++++++----------------------
 src/stats.rs  |  27 ++++----
 5 files changed, 106 insertions(+), 112 deletions(-)

diff --git a/src/config.rs b/src/config.rs
index e7a349c..fad46b8 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -421,7 +421,20 @@ pub struct RunCommand {
     #[clap(short('p'), long, default_value = "128", value_name = "COUNT")]
     pub concurrency: NonZeroUsize,
 
-    /// Throughput sampling period, in seconds.
+    /// Sampling period, in seconds.
+    ///
+    /// While the workload is running, a snapshot of the statistics is taken periodically
+    /// and recorded as a separate data point in the log. At the end, the log is saved to
+    /// the final report. The sampling log can be used later for generating plots
+    /// or HDR histogram logs for further detailed data analysis.
+    ///
+    /// The sampling period does not affect the values of the final statistics computed
+    /// for the whole run. You won't get more accurate measurements by sampling more
+    /// frequently (assuming the same total length of the run).
+    ///
+    /// The sampling log is used for analyzing throughput fluctuations, however, so the
+    /// number of samples does affect the accuracy of the throughput error estimate.
+    /// That estimate may be inaccurate if you collect fewer than 10 samples.
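The doc comment above ties the quality of the throughput error estimate to the number of collected samples. A minimal sketch of the statistical reasoning behind the "fewer than 10 samples" warning (illustrative only; the function name and shape are hypothetical, and latte's real estimator has to be more careful because consecutive samples are autocorrelated):

```rust
/// Standard error of the mean over per-sample throughput values.
/// Illustrative only -- not latte's actual estimator.
fn throughput_error(samples_per_sec: &[f64]) -> Option<f64> {
    let n = samples_per_sec.len();
    if n < 2 {
        return None; // spread cannot be estimated from a single sample
    }
    let mean = samples_per_sec.iter().sum::<f64>() / n as f64;
    // Unbiased sample variance:
    let var = samples_per_sec
        .iter()
        .map(|x| (x - mean).powi(2))
        .sum::<f64>()
        / (n - 1) as f64;
    // The error of the mean shrinks like 1/sqrt(n), and for small n the
    // variance estimate itself is very noisy -- hence the advice to
    // collect at least about 10 samples.
    Some((var / n as f64).sqrt())
}
```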
     #[clap(
         short('s'),
         long("sampling"),
diff --git a/src/exec.rs b/src/exec.rs
index 46d29ce..6f3f287 100644
--- a/src/exec.rs
+++ b/src/exec.rs
@@ -177,9 +177,9 @@ pub async fn par_execute(
     name: &str,
     exec_options: &ExecutionOptions,
     sampling: Interval,
-    store_samples: bool,
     workload: Workload,
     show_progress: bool,
+    keep_log: bool,
 ) -> Result<BenchmarkStats> {
     if exec_options.cycle_range.1 <= exec_options.cycle_range.0 {
         return Err(LatteError::Configuration(format!(
@@ -203,7 +203,7 @@ pub async fn par_execute(
     let progress = Arc::new(StatusLine::with_options(progress, progress_opts));
     let deadline = BoundedCycleCounter::new(exec_options.duration, exec_options.cycle_range);
     let mut streams = Vec::with_capacity(thread_count);
-    let mut stats = Recorder::start(rate, concurrency, store_samples);
+    let mut stats = Recorder::start(rate, concurrency, keep_log);
 
     for _ in 0..thread_count {
         let s = spawn_stream(
diff --git a/src/main.rs b/src/main.rs
index 0e03c53..0e8c4f1 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -205,9 +205,9 @@ async fn load(conf: LoadCommand) -> Result<()> {
         "Loading...",
         &load_options,
         config::Interval::Unbounded,
-        false,
         loader,
         !conf.quiet,
+        false,
     )
     .await?;
 
@@ -268,9 +268,9 @@ async fn run(conf: RunCommand) -> Result<()> {
             "Warming up...",
             &warmup_options,
             Interval::Unbounded,
-            conf.generate_report,
             runner.clone()?,
             !conf.quiet,
+            false,
         )
         .await?;
     }
@@ -298,9 +298,9 @@ async fn run(conf: RunCommand) -> Result<()> {
         "Running...",
         &exec_options,
         conf.sampling_interval,
-        conf.generate_report,
         runner,
         !conf.quiet,
+        conf.generate_report,
     )
     .await
     {
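All three main.rs hunks follow the same pattern: the sampling-log flag moves to the last parameter of `par_execute`, and only the measured run forwards `conf.generate_report`; the load and warm-up phases now always pass `false`, so a warm-up no longer retains a sampling log even when a report is requested. A hypothetical helper (not part of latte's code) summarizing which phases keep the log after this change:

```rust
// Mirrors the three call sites above; the Phase enum and keep_log
// function are hypothetical, for illustration only.
enum Phase {
    Load,
    WarmUp,
    Run,
}

fn keep_log(phase: &Phase, generate_report: bool) -> bool {
    match phase {
        // Load and warm-up samples never reach the final report:
        Phase::Load | Phase::WarmUp => false,
        // The measured run keeps its log only if a report will be generated:
        Phase::Run => generate_report,
    }
}

fn main() {
    assert!(!keep_log(&Phase::Load, true));
    assert!(!keep_log(&Phase::WarmUp, true)); // dropped even with --generate-report
    assert!(keep_log(&Phase::Run, true));
    assert!(!keep_log(&Phase::Run, false));
}
```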
diff --git a/src/report.rs b/src/report.rs
index 8966395..33ceec7 100644
--- a/src/report.rs
+++ b/src/report.rs
@@ -656,7 +656,7 @@ impl<'a> Display for BenchmarkCmp<'a> {
             writeln!(f, "{}", fmt_cmp_header(true))?;
         }
 
-        let mut summary: Vec<Box<dyn Display>> = vec![
+        let summary: Vec<Box<dyn Display>> = vec![
             self.line("Elapsed time", "s", |s| {
                 Quantity::from(s.elapsed_time_s).with_precision(3)
             }),
@@ -683,104 +683,84 @@ impl<'a> Display for BenchmarkCmp<'a> {
             self.line("└─", "row/req", |s| {
                 Quantity::from(s.row_count_per_req).with_precision(1)
             }),
-            // self.line("Samples", "", |s| Quantity::from(s.samples_count)),
+            self.line("Concurrency", "req", |s| {
+                Quantity::from(s.concurrency).with_precision(0)
+            }),
+            self.line("└─", "%", |s| {
+                Quantity::from(s.concurrency_ratio).with_precision(0)
+            }),
+            self.line("Throughput", "op/s", |s| {
+                Quantity::from(s.cycle_throughput).with_precision(0)
+            })
+            .with_significance(self.cmp_cycle_throughput())
+            .with_orientation(1)
+            .into_box(),
+            self.line("├─", "req/s", |s| {
+                Quantity::from(s.req_throughput).with_precision(0)
+            })
+            .with_significance(self.cmp_req_throughput())
+            .with_orientation(1)
+            .into_box(),
+            self.line("└─", "row/s", |s| {
+                Quantity::from(s.row_throughput).with_precision(0)
+            })
+            .with_significance(self.cmp_row_throughput())
+            .with_orientation(1)
+            .into_box(),
+            self.line("Cycle latency", "ms", |s| {
+                Quantity::from(s.cycle_latency.mean).with_precision(3)
+            })
+            .with_significance(self.cmp_mean_resp_time())
+            .with_orientation(-1)
+            .into_box(),
+            self.line("Request latency", "ms", |s| {
+                Quantity::from(s.request_latency.as_ref().map(|rt| rt.mean)).with_precision(3)
+            })
+            .with_significance(self.cmp_mean_resp_time())
+            .with_orientation(-1)
+            .into_box(),
         ];
-        if self.v1.log.len() > 1 {
-            let summary_part2: Vec<Box<dyn Display>> = vec![
-                // self.line("Mean sample size", "op", |s| {
-                //     Quantity::from(s.log.iter().map(|s| s.cycle_count as f64).mean())
-                //         .with_precision(0)
-                // }),
-                // self.line("└─", "req", |s| {
-                //     Quantity::from(s.log.iter().map(|s| s.request_count as f64).mean())
-                //         .with_precision(0)
-                // }),
-                self.line("Concurrency", "req", |s| {
-                    Quantity::from(s.concurrency).with_precision(0)
-                }),
-                self.line("└─", "%", |s| {
-                    Quantity::from(s.concurrency_ratio).with_precision(0)
-                }),
-                self.line("Throughput", "op/s", |s| {
-                    Quantity::from(s.cycle_throughput).with_precision(0)
-                })
-                .with_significance(self.cmp_cycle_throughput())
-                .with_orientation(1)
-                .into_box(),
-                self.line("├─", "req/s", |s| {
-                    Quantity::from(s.req_throughput).with_precision(0)
-                })
-                .with_significance(self.cmp_req_throughput())
-                .with_orientation(1)
-                .into_box(),
-                self.line("└─", "row/s", |s| {
-                    Quantity::from(s.row_throughput).with_precision(0)
-                })
-                .with_significance(self.cmp_row_throughput())
-                .with_orientation(1)
-                .into_box(),
-                // self.line("Mean cycle time", "ms", |s| {
-                self.line("Cycle latency", "ms", |s| {
-                    // Quantity::from(&s.cycle_time_ms).with_precision(3)
-                    Quantity::from(s.cycle_latency.mean).with_precision(3)
-                })
-                .with_significance(self.cmp_mean_resp_time())
-                .with_orientation(-1)
-                .into_box(),
-                // self.line("Mean resp. time", "ms", |s| {
-                self.line("Request latency", "ms", |s| {
-                    // Quantity::from(s.resp_time_ms.as_ref().map(|rt| rt.mean)).with_precision(3)
-                    Quantity::from(s.request_latency.as_ref().map(|rt| rt.mean)).with_precision(3)
-                })
-                .with_significance(self.cmp_mean_resp_time())
-                .with_orientation(-1)
-                .into_box(),
-            ];
-            summary.extend(summary_part2);
-        }
 
         for l in summary {
             writeln!(f, "{l}")?;
         }
 
-        if self.v1.log.len() > 1 {
-            let resp_time_percentiles = [
-                Percentile::Min,
-                Percentile::P25,
-                Percentile::P50,
-                Percentile::P75,
-                Percentile::P90,
-                Percentile::P95,
-                Percentile::P98,
-                Percentile::P99,
-                Percentile::P99_9,
-                Percentile::P99_99,
-                Percentile::Max,
-            ];
-
-            for fn_name in self.v1.cycle_latency_by_fn.keys() {
-                writeln!(f)?;
-                writeln!(
-                    f,
-                    "{}",
-                    fmt_section_header(format!("CYCLE LATENCY for {fn_name} [ms] ").as_str())
-                )?;
-                if self.v2.is_some() {
-                    writeln!(f, "{}", fmt_cmp_header(true))?;
-                }
+        let resp_time_percentiles = [
+            Percentile::Min,
+            Percentile::P25,
+            Percentile::P50,
+            Percentile::P75,
+            Percentile::P90,
+            Percentile::P95,
+            Percentile::P98,
+            Percentile::P99,
+            Percentile::P99_9,
+            Percentile::P99_99,
+            Percentile::Max,
+        ];
 
-                for p in resp_time_percentiles.iter() {
-                    let l = self
-                        .line(p.name(), "", |s| {
-                            let rt = s
-                                .cycle_latency_by_fn
-                                .get(fn_name)
-                                .map(|l| l.percentiles.get(*p));
-                            Quantity::from(rt).with_precision(3)
-                        })
-                        .with_orientation(-1)
-                        .with_significance(self.cmp_resp_time_percentile(*p));
-                    writeln!(f, "{l}")?;
-                }
+        for fn_name in self.v1.cycle_latency_by_fn.keys() {
+            writeln!(f)?;
+            writeln!(
+                f,
+                "{}",
+                fmt_section_header(format!("CYCLE LATENCY for {fn_name} [ms] ").as_str())
+            )?;
+            if self.v2.is_some() {
+                writeln!(f, "{}", fmt_cmp_header(true))?;
+            }
+
+            for p in resp_time_percentiles.iter() {
+                let l = self
+                    .line(p.name(), "", |s| {
+                        let rt = s
+                            .cycle_latency_by_fn
+                            .get(fn_name)
+                            .map(|l| l.percentiles.get(*p));
+                        Quantity::from(rt).with_precision(3)
+                    })
+                    .with_orientation(-1)
+                    .with_significance(self.cmp_resp_time_percentile(*p));
+                writeln!(f, "{l}")?;
             }
         }
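Besides merging the two summary vectors, the report.rs hunk stops gating the summary and percentile sections on `self.v1.log.len() > 1`, so they are printed even when only a single sample was recorded. The summary is a list of trait objects, which is what lets lines built from different closure types be printed by one loop. A minimal standalone illustration of that pattern (simplified types, not latte's actual `Line` machinery):

```rust
use std::fmt::Display;

// Heterogeneous report lines unified behind one trait object,
// so a single loop can print them all.
fn main() {
    let summary: Vec<Box<dyn Display>> = vec![
        Box::new("Elapsed time    60.000 s"),
        Box::new(format!("Throughput   {} op/s", 12_345)),
    ];
    for l in summary {
        println!("{l}");
    }
}
```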
diff --git a/src/stats.rs b/src/stats.rs
index 183d878..03f9f53 100644
--- a/src/stats.rs
+++ b/src/stats.rs
@@ -202,7 +202,6 @@ pub struct BenchmarkStats {
     pub errors_ratio: Option<f64>,
     pub row_count: u64,
     pub row_count_per_req: Option<f64>,
-    pub samples_count: u64,
     pub cycle_throughput: Mean,
     pub cycle_throughput_ratio: Option<f64>,
     pub req_throughput: Mean,
@@ -297,17 +296,20 @@ pub struct Recorder {
     pub request_latency: LatencyDistributionRecorder,
     pub concurrency_meter: TimeSeriesStats,
     log: Vec<Sample>,
-    store_samples: bool,
-    samples_counter: u64,
     rate_limit: Option<f64>,
     concurrency_limit: NonZeroUsize,
+    keep_log: bool,
 }
 
 impl Recorder {
     /// Creates a new recorder.
     /// The `rate_limit` and `concurrency_limit` parameters are used only as the
     /// reference levels for relative throughput and relative parallelism.
-    pub fn start(rate_limit: Option<f64>, concurrency_limit: NonZeroUsize, store_samples: bool) -> Recorder {
+    pub fn start(
+        rate_limit: Option<f64>,
+        concurrency_limit: NonZeroUsize,
+        keep_log: bool,
+    ) -> Recorder {
         let start_time = SystemTime::now();
         let start_instant = Instant::now();
         Recorder {
@@ -318,8 +320,6 @@ impl Recorder {
             start_cpu_time: ProcessTime::now(),
             end_cpu_time: ProcessTime::now(),
             log: Vec::new(),
-            store_samples: store_samples,
-            samples_counter: 0,
             rate_limit,
             concurrency_limit,
             cycle_count: 0,
@@ -334,6 +334,7 @@ impl Recorder {
             request_latency: LatencyDistributionRecorder::default(),
             throughput_meter: ThroughputMeter::default(),
             concurrency_meter: TimeSeriesStats::default(),
+            keep_log,
         }
     }
@@ -365,13 +366,10 @@ impl Recorder {
         if self.errors.len() < MAX_KEPT_ERRORS {
             self.errors.extend(sample.req_errors.iter().cloned());
         }
-
-        if self.store_samples || self.log.is_empty() {
-            self.log.push(sample);
-        } else {
-            self.log[0] = sample;
+        if !self.keep_log {
+            self.log.clear();
         }
-        self.samples_counter += 1;
+        self.log.push(sample);
         self.log.last().unwrap()
     }
@@ -396,6 +394,10 @@ impl Recorder {
         let concurrency = self.concurrency_meter.mean();
         let concurrency_ratio = 100.0 * concurrency.value / self.concurrency_limit.get() as f64;
 
+        if !self.keep_log {
+            self.log.clear();
+        }
+
         BenchmarkStats {
             start_time: self.start_time.into(),
             end_time: self.end_time.into(),
@@ -414,7 +416,6 @@ impl Recorder {
             requests_per_cycle: self.request_count as f64 / self.cycle_count as f64,
             row_count: self.row_count,
             row_count_per_req: not_nan(self.row_count as f64 / self.request_count as f64),
-            samples_count: self.samples_counter,
             cycle_throughput,
             cycle_throughput_ratio,
             req_throughput,
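Taken together, the last two stats.rs hunks implement "drop the log" as "keep only the latest sample": the recording path clears the log before each push when `keep_log` is false, so the most recent sample stays reachable through `self.log.last()` for live progress reporting, and the path that builds the final `BenchmarkStats` clears it once more so the report carries no per-sample data. A standalone sketch of the same pattern (hypothetical simplified type, not latte's `Recorder`):

```rust
// With keep_log == false, at most one sample -- the most recent -- is ever
// retained, so memory use stays constant regardless of run length.
struct Log<T> {
    keep_log: bool,
    samples: Vec<T>,
}

impl<T> Log<T> {
    fn record(&mut self, sample: T) -> &T {
        if !self.keep_log {
            self.samples.clear(); // drop older samples; keep only the latest
        }
        self.samples.push(sample);
        self.samples.last().unwrap()
    }

    fn finish(&mut self) -> Vec<T> {
        if !self.keep_log {
            self.samples.clear(); // the final report carries no sampling log
        }
        std::mem::take(&mut self.samples)
    }
}

fn main() {
    let mut log = Log { keep_log: false, samples: Vec::new() };
    log.record(1);
    log.record(2);
    assert_eq!(log.samples.len(), 1); // only the latest sample retained
    assert!(log.finish().is_empty()); // nothing reaches the final report
}
```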