Skip to content

Commit

Permalink
Add option --drop-sampling-log to not keep the sampling log
Browse files Browse the repository at this point in the history
Fixes pkolaczk#67

(cherry picked from commit 48f3c8e)

Upd:
- Remove the '--drop-sampling-log' new option.
  It's role is played by the existing '--generate-report' option
  which has opposite default value and behavior.
  • Loading branch information
pkolaczk authored and vponomaryov committed Oct 28, 2024
1 parent 826270b commit 0d897c9
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 112 deletions.
15 changes: 14 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,20 @@ pub struct RunCommand {
#[clap(short('p'), long, default_value = "128", value_name = "COUNT")]
pub concurrency: NonZeroUsize,

/// Throughput sampling period, in seconds.
/// Sampling period, in seconds.
///
/// While running the workload, periodically takes a snapshot of the statistics
/// and records it as a separate data point in the log. At the end, the log gets saved to
/// the final report. The sampling log can be used later for generating plots
/// or HDR histogram logs for further detailed data analysis.
///
/// Sampling period does not affect the value of the final statistics computed
/// for the whole run. You'll not get more accurate measurements by sampling more frequently
/// (assuming the same total length of the run).
///
/// The sampling log is used for analyzing throughput fluctuations and the number of samples
/// does affect the accuracy of estimating the throughput error to some degree.
/// The throughput error estimate may be inaccurate if you collect less than 10 samples.
#[clap(
short('s'),
long("sampling"),
Expand Down
4 changes: 2 additions & 2 deletions src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,9 @@ pub async fn par_execute(
name: &str,
exec_options: &ExecutionOptions,
sampling: Interval,
store_samples: bool,
workload: Workload,
show_progress: bool,
keep_log: bool,
) -> Result<BenchmarkStats> {
if exec_options.cycle_range.1 <= exec_options.cycle_range.0 {
return Err(LatteError::Configuration(format!(
Expand All @@ -203,7 +203,7 @@ pub async fn par_execute(
let progress = Arc::new(StatusLine::with_options(progress, progress_opts));
let deadline = BoundedCycleCounter::new(exec_options.duration, exec_options.cycle_range);
let mut streams = Vec::with_capacity(thread_count);
let mut stats = Recorder::start(rate, concurrency, store_samples);
let mut stats = Recorder::start(rate, concurrency, keep_log);

for _ in 0..thread_count {
let s = spawn_stream(
Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,9 @@ async fn load(conf: LoadCommand) -> Result<()> {
"Loading...",
&load_options,
config::Interval::Unbounded,
false,
loader,
!conf.quiet,
false,
)
.await?;

Expand Down Expand Up @@ -268,9 +268,9 @@ async fn run(conf: RunCommand) -> Result<()> {
"Warming up...",
&warmup_options,
Interval::Unbounded,
conf.generate_report,
runner.clone()?,
!conf.quiet,
false,
)
.await?;
}
Expand Down Expand Up @@ -298,9 +298,9 @@ async fn run(conf: RunCommand) -> Result<()> {
"Running...",
&exec_options,
conf.sampling_interval,
conf.generate_report,
runner,
!conf.quiet,
conf.generate_report,
)
.await
{
Expand Down
166 changes: 73 additions & 93 deletions src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ impl<'a> Display for BenchmarkCmp<'a> {
writeln!(f, "{}", fmt_cmp_header(true))?;
}

let mut summary: Vec<Box<dyn Display>> = vec![
let summary: Vec<Box<dyn Display>> = vec![
self.line("Elapsed time", "s", |s| {
Quantity::from(s.elapsed_time_s).with_precision(3)
}),
Expand All @@ -683,104 +683,84 @@ impl<'a> Display for BenchmarkCmp<'a> {
self.line("└─", "row/req", |s| {
Quantity::from(s.row_count_per_req).with_precision(1)
}),
// self.line("Samples", "", |s| Quantity::from(s.samples_count)),
self.line("Concurrency", "req", |s| {
Quantity::from(s.concurrency).with_precision(0)
}),
self.line("└─", "%", |s| {
Quantity::from(s.concurrency_ratio).with_precision(0)
}),
self.line("Throughput", "op/s", |s| {
Quantity::from(s.cycle_throughput).with_precision(0)
})
.with_significance(self.cmp_cycle_throughput())
.with_orientation(1)
.into_box(),
self.line("├─", "req/s", |s| {
Quantity::from(s.req_throughput).with_precision(0)
})
.with_significance(self.cmp_req_throughput())
.with_orientation(1)
.into_box(),
self.line("└─", "row/s", |s| {
Quantity::from(s.row_throughput).with_precision(0)
})
.with_significance(self.cmp_row_throughput())
.with_orientation(1)
.into_box(),
self.line("Cycle latency", "ms", |s| {
Quantity::from(s.cycle_latency.mean).with_precision(3)
})
.with_significance(self.cmp_mean_resp_time())
.with_orientation(-1)
.into_box(),
self.line("Request latency", "ms", |s| {
Quantity::from(s.request_latency.as_ref().map(|rt| rt.mean)).with_precision(3)
})
.with_significance(self.cmp_mean_resp_time())
.with_orientation(-1)
.into_box(),
];
if self.v1.log.len() > 1 {
let summary_part2: Vec<Box<dyn Display>> = vec![
// self.line("Mean sample size", "op", |s| {
// Quantity::from(s.log.iter().map(|s| s.cycle_count as f64).mean())
// .with_precision(0)
// }),
// self.line("└─", "req", |s| {
// Quantity::from(s.log.iter().map(|s| s.request_count as f64).mean())
// .with_precision(0)
// }),
self.line("Concurrency", "req", |s| {
Quantity::from(s.concurrency).with_precision(0)
}),
self.line("└─", "%", |s| {
Quantity::from(s.concurrency_ratio).with_precision(0)
}),
self.line("Throughput", "op/s", |s| {
Quantity::from(s.cycle_throughput).with_precision(0)
})
.with_significance(self.cmp_cycle_throughput())
.with_orientation(1)
.into_box(),
self.line("├─", "req/s", |s| {
Quantity::from(s.req_throughput).with_precision(0)
})
.with_significance(self.cmp_req_throughput())
.with_orientation(1)
.into_box(),
self.line("└─", "row/s", |s| {
Quantity::from(s.row_throughput).with_precision(0)
})
.with_significance(self.cmp_row_throughput())
.with_orientation(1)
.into_box(),
// self.line("Mean cycle time", "ms", |s| {
self.line("Cycle latency", "ms", |s| {
// Quantity::from(&s.cycle_time_ms).with_precision(3)
Quantity::from(s.cycle_latency.mean).with_precision(3)
})
.with_significance(self.cmp_mean_resp_time())
.with_orientation(-1)
.into_box(),
// self.line("Mean resp. time", "ms", |s| {
self.line("Request latency", "ms", |s| {
// Quantity::from(s.resp_time_ms.as_ref().map(|rt| rt.mean)).with_precision(3)
Quantity::from(s.request_latency.as_ref().map(|rt| rt.mean)).with_precision(3)
})
.with_significance(self.cmp_mean_resp_time())
.with_orientation(-1)
.into_box(),
];
summary.extend(summary_part2);
}
for l in summary {
writeln!(f, "{l}")?;
}

if self.v1.log.len() > 1 {
let resp_time_percentiles = [
Percentile::Min,
Percentile::P25,
Percentile::P50,
Percentile::P75,
Percentile::P90,
Percentile::P95,
Percentile::P98,
Percentile::P99,
Percentile::P99_9,
Percentile::P99_99,
Percentile::Max,
];

for fn_name in self.v1.cycle_latency_by_fn.keys() {
writeln!(f)?;
writeln!(
f,
"{}",
fmt_section_header(format!("CYCLE LATENCY for {fn_name} [ms] ").as_str())
)?;
if self.v2.is_some() {
writeln!(f, "{}", fmt_cmp_header(true))?;
}
let resp_time_percentiles = [
Percentile::Min,
Percentile::P25,
Percentile::P50,
Percentile::P75,
Percentile::P90,
Percentile::P95,
Percentile::P98,
Percentile::P99,
Percentile::P99_9,
Percentile::P99_99,
Percentile::Max,
];

for p in resp_time_percentiles.iter() {
let l = self
.line(p.name(), "", |s| {
let rt = s
.cycle_latency_by_fn
.get(fn_name)
.map(|l| l.percentiles.get(*p));
Quantity::from(rt).with_precision(3)
})
.with_orientation(-1)
.with_significance(self.cmp_resp_time_percentile(*p));
writeln!(f, "{l}")?;
}
for fn_name in self.v1.cycle_latency_by_fn.keys() {
writeln!(f)?;
writeln!(
f,
"{}",
fmt_section_header(format!("CYCLE LATENCY for {fn_name} [ms] ").as_str())
)?;
if self.v2.is_some() {
writeln!(f, "{}", fmt_cmp_header(true))?;
}

for p in resp_time_percentiles.iter() {
let l = self
.line(p.name(), "", |s| {
let rt = s
.cycle_latency_by_fn
.get(fn_name)
.map(|l| l.percentiles.get(*p));
Quantity::from(rt).with_precision(3)
})
.with_orientation(-1)
.with_significance(self.cmp_resp_time_percentile(*p));
writeln!(f, "{l}")?;
}
}

Expand Down
27 changes: 14 additions & 13 deletions src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ pub struct BenchmarkStats {
pub errors_ratio: Option<f64>,
pub row_count: u64,
pub row_count_per_req: Option<f64>,
pub samples_count: u64,
pub cycle_throughput: Mean,
pub cycle_throughput_ratio: Option<f64>,
pub req_throughput: Mean,
Expand Down Expand Up @@ -297,17 +296,20 @@ pub struct Recorder {
pub request_latency: LatencyDistributionRecorder,
pub concurrency_meter: TimeSeriesStats,
log: Vec<Sample>,
store_samples: bool,
samples_counter: u64,
rate_limit: Option<f64>,
concurrency_limit: NonZeroUsize,
keep_log: bool,
}

impl Recorder {
/// Creates a new recorder.
/// The `rate_limit` and `concurrency_limit` parameters are used only as the
/// reference levels for relative throughput and relative parallelism.
pub fn start(rate_limit: Option<f64>, concurrency_limit: NonZeroUsize, store_samples: bool) -> Recorder {
pub fn start(
rate_limit: Option<f64>,
concurrency_limit: NonZeroUsize,
keep_log: bool,
) -> Recorder {
let start_time = SystemTime::now();
let start_instant = Instant::now();
Recorder {
Expand All @@ -318,8 +320,6 @@ impl Recorder {
start_cpu_time: ProcessTime::now(),
end_cpu_time: ProcessTime::now(),
log: Vec::new(),
store_samples: store_samples,
samples_counter: 0,
rate_limit,
concurrency_limit,
cycle_count: 0,
Expand All @@ -334,6 +334,7 @@ impl Recorder {
request_latency: LatencyDistributionRecorder::default(),
throughput_meter: ThroughputMeter::default(),
concurrency_meter: TimeSeriesStats::default(),
keep_log,
}
}

Expand Down Expand Up @@ -365,13 +366,10 @@ impl Recorder {
if self.errors.len() < MAX_KEPT_ERRORS {
self.errors.extend(sample.req_errors.iter().cloned());
}

if self.store_samples || self.log.is_empty() {
self.log.push(sample);
} else {
self.log[0] = sample;
if !self.keep_log {
self.log.clear();
}
self.samples_counter += 1;
self.log.push(sample);
self.log.last().unwrap()
}

Expand All @@ -396,6 +394,10 @@ impl Recorder {
let concurrency = self.concurrency_meter.mean();
let concurrency_ratio = 100.0 * concurrency.value / self.concurrency_limit.get() as f64;

if !self.keep_log {
self.log.clear();
}

BenchmarkStats {
start_time: self.start_time.into(),
end_time: self.end_time.into(),
Expand All @@ -414,7 +416,6 @@ impl Recorder {
requests_per_cycle: self.request_count as f64 / self.cycle_count as f64,
row_count: self.row_count,
row_count_per_req: not_nan(self.row_count as f64 / self.request_count as f64),
samples_count: self.samples_counter,
cycle_throughput,
cycle_throughput_ratio,
req_throughput,
Expand Down

0 comments on commit 0d897c9

Please sign in to comment.