Skip to content

Commit

Permalink
windsock: better support external benchers (#1367)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 20, 2023
1 parent dceca4a commit 312f287
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 53 deletions.
2 changes: 1 addition & 1 deletion windsock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod report;
mod tables;

pub use bench::{Bench, BenchParameters, BenchTask, Profiling};
pub use report::{Metric, Report, ReportArchive};
pub use report::{ExternalReport, Metric, OperationsReport, PubSubReport, Report, ReportArchive};
pub use tables::Goal;

use anyhow::{anyhow, Result};
Expand Down
135 changes: 83 additions & 52 deletions windsock/src/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ pub enum Report {
SecondPassed(Duration),
/// contains the time that the test ran for
FinishedIn(Duration),

/// Ignore all other reports and use the ManualReport as the only source of benchmark metrics.
/// Do not use this under normal circumstances.
/// Instead this should only be used if you have an independent benchmarker that you want to call from windsock and include in windsocks results.
ExternalBenchmark(Box<ExternalReport>),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ExternalReport {
pub bench_started_at: OffsetDateTime,
pub operations_report: Option<OperationsReport>,
pub pubsub_report: Option<PubSubReport>,
pub error_messages: Vec<String>,
}

#[derive(EnumIter, EnumCount)]
Expand Down Expand Up @@ -89,7 +102,7 @@ impl Percentile {
}
}

type Percentiles = [Duration; Percentile::COUNT];
pub type Percentiles = [Duration; Percentile::COUNT];

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ReportArchive {
Expand All @@ -103,34 +116,34 @@ pub struct ReportArchive {
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub(crate) struct OperationsReport {
pub(crate) total: u64,
pub(crate) total_errors: u64,
pub(crate) requested_operations_per_second: Option<u64>,
pub(crate) total_operations_per_second: u32,
pub(crate) total_errors_per_second: u32,
pub(crate) mean_time: Duration,
pub(crate) time_percentiles: Percentiles,
pub(crate) total_each_second: Vec<u64>,
pub struct OperationsReport {
pub total: u64,
pub total_errors: u64,
pub requested_operations_per_second: Option<u64>,
pub total_operations_per_second: u32,
pub total_errors_per_second: u32,
pub mean_time: Duration,
pub time_percentiles: Percentiles,
pub total_each_second: Vec<u64>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Default)]
pub(crate) struct PubSubReport {
pub(crate) total_produce: u64,
pub(crate) total_produce_error: u64,
pub(crate) total_consume: u64,
pub(crate) total_consume_error: u64,
pub(crate) total_backlog: i64,
pub(crate) requested_produce_per_second: Option<u64>,
pub(crate) produce_per_second: u32,
pub(crate) produce_errors_per_second: u32,
pub(crate) consume_per_second: u32,
pub(crate) consume_errors_per_second: u32,
pub(crate) produce_mean_time: Duration,
pub(crate) produce_time_percentiles: Percentiles,
pub(crate) produce_each_second: Vec<u64>,
pub(crate) consume_each_second: Vec<u64>,
pub(crate) backlog_each_second: Vec<i64>,
pub struct PubSubReport {
pub total_produce: u64,
pub total_produce_error: u64,
pub total_consume: u64,
pub total_consume_error: u64,
pub total_backlog: i64,
pub requested_produce_per_second: Option<u64>,
pub produce_per_second: u32,
pub produce_errors_per_second: u32,
pub consume_per_second: u32,
pub consume_errors_per_second: u32,
pub produce_mean_time: Duration,
pub produce_time_percentiles: Percentiles,
pub produce_each_second: Vec<u64>,
pub consume_each_second: Vec<u64>,
pub backlog_each_second: Vec<i64>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -284,6 +297,7 @@ pub(crate) async fn report_builder(
requested_ops: Option<u64>,
running_in_release: bool,
) -> ReportArchive {
let mut external_report = None;
let mut finished_in = None;
let mut started = None;
let mut pubsub_report = None;
Expand Down Expand Up @@ -387,36 +401,53 @@ pub(crate) async fn report_builder(
std::mem::drop(rx);
break;
}
Report::ExternalBenchmark(report) => {
// immediately drop rx so the benchmarks tasks stop trying to bench, logic doesnt rely on this it just saves resources
std::mem::drop(rx);

external_report = Some(report);
break;
}
}
}
let finished_in = match finished_in {
Some(x) => x,
None => panic!("The bench never returned Report::FinishedIn(..)"),
};

if let Some(report) = operations_report.as_mut() {
report.requested_operations_per_second = requested_ops;
report.mean_time = mean_time(&operation_times, total_operation_time);
report.total_operations_per_second = calculate_ops(report.total, finished_in);
report.total_errors_per_second = calculate_ops(report.total_errors, finished_in);
report.time_percentiles = calculate_percentiles(operation_times);

// This is not a complete result so discard it.
report.total_each_second.pop();
}
if let Some(external_report) = external_report {
started = Some(external_report.bench_started_at);
operations_report = external_report.operations_report;
pubsub_report = external_report.pubsub_report;
error_messages = external_report.error_messages;
} else {
let finished_in = match finished_in {
Some(x) => x,
None => panic!("The bench never returned Report::FinishedIn(..)"),
};

if let Some(report) = operations_report.as_mut() {
report.requested_operations_per_second = requested_ops;
report.mean_time = mean_time(&operation_times, total_operation_time);
report.total_operations_per_second = calculate_ops(report.total, finished_in);
report.total_errors_per_second = calculate_ops(report.total_errors, finished_in);
report.time_percentiles = calculate_percentiles(operation_times);

// This is not a complete result so discard it.
report.total_each_second.pop();
}

if let Some(report) = pubsub_report.as_mut() {
report.requested_produce_per_second = requested_ops;
report.produce_mean_time = mean_time(&produce_times, total_produce_time);
report.produce_per_second = calculate_ops(report.total_produce, finished_in);
report.produce_errors_per_second = calculate_ops(report.total_produce_error, finished_in);
report.consume_per_second = calculate_ops(report.total_consume, finished_in);
report.consume_errors_per_second = calculate_ops(report.total_consume_error, finished_in);
report.produce_time_percentiles = calculate_percentiles(produce_times);

// This is not a complete result so discard it.
report.produce_each_second.pop();
report.consume_each_second.pop();
if let Some(report) = pubsub_report.as_mut() {
report.requested_produce_per_second = requested_ops;
report.produce_mean_time = mean_time(&produce_times, total_produce_time);
report.produce_per_second = calculate_ops(report.total_produce, finished_in);
report.produce_errors_per_second =
calculate_ops(report.total_produce_error, finished_in);
report.consume_per_second = calculate_ops(report.total_consume, finished_in);
report.consume_errors_per_second =
calculate_ops(report.total_consume_error, finished_in);
report.produce_time_percentiles = calculate_percentiles(produce_times);

// This is not a complete result so discard it.
report.produce_each_second.pop();
report.consume_each_second.pop();
}
}

let archive = ReportArchive {
Expand Down

0 comments on commit 312f287

Please sign in to comment.