diff --git a/.github/workflows/windsock_benches.yaml b/.github/workflows/windsock_benches.yaml
index a3595f3cf..b2e5cc5e1 100644
--- a/.github/workflows/windsock_benches.yaml
+++ b/.github/workflows/windsock_benches.yaml
@@ -41,9 +41,6 @@ jobs:
         cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers samply name=cassandra,compression=none,connection_count=1,driver=scylla,operation=read_i64,protocol=v4,shotover=standard,topology=single
         cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers sys_monitor name=kafka,shotover=standard,size=1B,topology=single
         cargo windsock local-run --bench-length-seconds 5 --operations-per-second 100 --profilers shotover_metrics name=redis,encryption=none,operation=get,shotover=standard,topology=single
-
-        # windsock/examples/cassandra.rs - this can stay here until windsock is moved to its own repo
-        cargo run --release --example cassandra -- local-run --bench-length-seconds 5 --operations-per-second 100
     - name: Ensure that tests did not create or modify any files that arent .gitignore'd
       run: |
         if [ -n "$(git status --porcelain)" ]; then
diff --git a/Cargo.lock b/Cargo.lock
index cc6efe129..c559f0b64 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5639,6 +5639,8 @@ checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
 [[package]]
 name = "windsock"
 version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5730a5bb3421d2422b41fc14e51e8dd191294f3de9eeccc6568eebf9644ee027"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -5646,8 +5648,6 @@ dependencies = [
  "clap",
  "console",
  "copy_dir",
- "docker-compose-runner",
- "scylla",
  "serde",
  "strum 0.26.1",
  "time",
diff --git a/Cargo.toml b/Cargo.toml
index 75519017b..8f60f193d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,5 @@
 [workspace]
 members = [
-    "windsock",
     "shotover",
     "shotover-proxy",
     "test-helpers",
diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml
index c36e4d72f..7a7ee8d94 100644
--- a/shotover-proxy/Cargo.toml
+++ b/shotover-proxy/Cargo.toml
@@ -47,7 +47,7 @@ tokio-bin-process.workspace = true
 rustls-pemfile = "2.0.0"
 rustls-pki-types = "1.1.0"
 aws-throwaway.workspace = true
-windsock = { path = "../windsock" }
+windsock = "0.1.0"
 regex = "1.7.0"
 cql-ws = { git = "https://github.com/shotover/cql-ws" }
 opensearch = "2.1.0"
diff --git a/windsock/Cargo.toml b/windsock/Cargo.toml
deleted file mode 100644
index 178da7444..000000000
--- a/windsock/Cargo.toml
+++ /dev/null
@@ -1,23 +0,0 @@
-[package]
-name = "windsock"
-version = "0.1.0"
-edition = "2021"
-license = "Apache-2.0"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
-anyhow.workspace = true
-async-trait = "0.1.68"
-bincode.workspace = true
-clap.workspace = true
-console = "0.15.5"
-copy_dir = "0.1.2"
-serde = { workspace = true, features = ["derive"] }
-strum = { version = "0.26.0", features = ["derive"] }
-time = { version = "0.3.25", features = ["serde"] }
-tokio.workspace = true
-
-[dev-dependencies]
-scylla = { version = "0.12.0", features = ["ssl"] }
-docker-compose-runner = "0.3.0"
diff --git a/windsock/build.rs b/windsock/build.rs
deleted file mode 100644
index a566c2b4c..000000000
--- a/windsock/build.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-use std::env;
-
-fn main() {
-    let profile = env::var("PROFILE").unwrap();
-    println!("cargo:rustc-env=PROFILE={profile}");
-    println!("cargo:rerun-if-changed=build.rs");
-}
diff --git 
a/windsock/examples/cassandra-docker-compose.yaml b/windsock/examples/cassandra-docker-compose.yaml deleted file mode 100644 index 6993ff13c..000000000 --- a/windsock/examples/cassandra-docker-compose.yaml +++ /dev/null @@ -1,26 +0,0 @@ -version: "3.3" - -networks: - cassandra_subnet: - name: cassandra_subnet - driver: bridge - ipam: - driver: default - config: - - subnet: 172.16.1.0/24 - gateway: 172.16.1.1 - -services: - cassandra-one: - image: bitnami/cassandra:4.0.6 - networks: - cassandra_subnet: - ipv4_address: 172.16.1.2 - environment: - &environment - MAX_HEAP_SIZE: "400M" - MIN_HEAP_SIZE: "400M" - HEAP_NEWSIZE: "48M" - volumes: - - type: tmpfs - target: /var/lib/cassandra diff --git a/windsock/examples/cassandra.rs b/windsock/examples/cassandra.rs deleted file mode 100644 index 9b28183d6..000000000 --- a/windsock/examples/cassandra.rs +++ /dev/null @@ -1,163 +0,0 @@ -use anyhow::Result; -use async_trait::async_trait; -use docker_compose_runner::{DockerCompose, Image}; -use scylla::SessionBuilder; -use scylla::{transport::Compression, Session}; -use std::{ - collections::HashMap, - path::Path, - sync::Arc, - time::{Duration, Instant}, -}; -use tokio::sync::mpsc::UnboundedSender; -use windsock::cloud::NoCloud; -use windsock::{Bench, BenchParameters, BenchTask, Profiling, Report, Windsock}; - -fn main() { - set_working_dir(); - Windsock::new( - vec![ - Box::new(CassandraBench::new(Some(Compression::Lz4))), - Box::new(CassandraBench::new(None)), - ], - NoCloud::new_boxed(), - &["release"], - ) - .run(); -} - -struct CassandraBench { - compression: Option, -} - -impl CassandraBench { - fn new(compression: Option) -> Self { - CassandraBench { compression } - } -} - -#[async_trait] -impl Bench for CassandraBench { - type CloudResourcesRequired = (); - type CloudResources = (); - fn tags(&self) -> HashMap { - [ - ("name".to_owned(), "cassandra".to_owned()), - ("topology".to_owned(), "single".to_owned()), - ("message_type".to_owned(), "write1000bytes".to_owned()), - ( - "compression".to_owned(), - match &self.compression { - Some(Compression::Lz4) => "LZ4".to_owned(), - Some(Compression::Snappy) => "Snappy".to_owned(), - None => "None".to_owned(), - }, - ), - ] - .into_iter() - .collect() - } - - async fn orchestrate_cloud( - &self, - _resources: (), - _running_in_release: bool, - _profiling: Profiling, - _bench_parameters: BenchParameters, - ) -> Result<()> { - todo!() - } - - async fn orchestrate_local( - &self, - _running_in_release: bool, - _profiling: Profiling, - parameters: BenchParameters, - ) -> Result<()> { - let _docker_compose = docker_compose("examples/cassandra-docker-compose.yaml"); - let address = "127.0.0.1:9042"; - - self.execute_run(address, ¶meters).await; - - Ok(()) - } - - async fn run_bencher( - &self, - _resources: &str, - parameters: BenchParameters, - reporter: UnboundedSender, - ) { - let session = Arc::new( - SessionBuilder::new() - .known_nodes(["172.16.1.2:9042"]) - // We do not need to refresh metadata as there is nothing else fiddling with the topology or schema. 
- // By default the metadata refreshes every 60s and that can cause performance issues so we disable it by using an absurdly high refresh interval - .cluster_metadata_refresh_interval(Duration::from_secs(10000000000)) - .user("cassandra", "cassandra") - .compression(self.compression) - .build() - .await - .unwrap(), - ); - - let tasks = BenchTaskCassandra { session } - .spawn_tasks(reporter.clone(), parameters.operations_per_second) - .await; - - let start = Instant::now(); - reporter.send(Report::Start).unwrap(); - - for _ in 0..parameters.runtime_seconds { - let second = Instant::now(); - tokio::time::sleep(Duration::from_secs(1)).await; - reporter - .send(Report::SecondPassed(second.elapsed())) - .unwrap(); - } - - reporter.send(Report::FinishedIn(start.elapsed())).unwrap(); - - // make sure the tasks complete before we drop the database they are connecting to - for task in tasks { - task.await.unwrap(); - } - } -} - -#[derive(Clone)] -struct BenchTaskCassandra { - session: Arc, -} - -#[async_trait] -impl BenchTask for BenchTaskCassandra { - async fn run_one_operation(&self) -> Result<(), String> { - self.session - .query("SELECT * FROM system.peers", ()) - .await - .map_err(|err| format!("{err:?}")) - .map(|_| ()) - } -} - -fn docker_compose(file_path: &str) -> DockerCompose { - DockerCompose::new(&IMAGE_WAITERS, |_| {}, file_path) -} - -static IMAGE_WAITERS: [Image; 1] = [Image { - name: "bitnami/cassandra:4.0.6", - log_regex_to_wait_for: r"Startup complete", - timeout: Duration::from_secs(120), -}]; - -fn set_working_dir() { - // tests and benches will set the directory to the directory of the crate, we are acting as a benchmark so we do the same - std::env::set_current_dir( - Path::new(env!("CARGO_MANIFEST_DIR")) - .parent() - .unwrap() - .join(env!("CARGO_PKG_NAME")), - ) - .unwrap(); -} diff --git a/windsock/readme.md b/windsock/readme.md deleted file mode 100644 index 8d072d031..000000000 --- a/windsock/readme.md +++ /dev/null @@ -1,206 +0,0 @@ -# Windsock - A DB benchmarking framework - -Windsock is a generic DB benchmarking framework. - -What you do: - -* Bring your own rust async compatible DB driver -* Define your benchmark logic which reports some simple stats back to windsock -* Define your pool of benchmarks - -What windsock does: - -* Provides a CLI from which you can: - * Query available benchmarks - * Selectively run benchmarks - * Process benchmark results into readable tables -* Generates a webpage from which you can explore graphed results - -Windsock is suitable for: - -* Iteratively testing performance during development of a database or service (for microbenchmarks you will need a different tool though) -* Investigating performance of different workloads on a database you intend to use. -* Generating a webpage of graphs to show off the performance of your released database. (not yet implemented) - -## Define benches - -To use windsock create a rust crate that imports windsock: - -```toml -windsock = { git = "https://github.com/shotover/shotover-proxy" } -``` - -And then implement the crate like this (simplified): - -```rust -fn main() { - // Define our benchmarks and give them to windsock - Windsock::new(vec![ - Box::new(CassandraBench { topology: Topology::Cluster3 }), - Box::new(CassandraBench { topology: Topology::Single }) - ]) - // Hand control of the app over to windsock - // Windsock processes CLI args, possibly running benchmarks and then terminates. 
- .run(); -} - -pub struct CassandraBench { topology: Topology } - -#[async_trait] -impl Bench for CassandraBench { - // define tags that windsock will use to filter and name the benchmark instance - fn tags(&self) -> HashMap { - [ - ("name".to_owned(), "cassandra".to_owned()), - ( - "topology".to_owned(), - match &self.topology { - Topology::Single => "single".to_owned(), - Topology::Cluster3 => "cluster3".to_owned(), - }, - ), - ] - .into_iter() - .collect() - } - - // the benchmark logic for this benchmark instance - async fn run(&self, runtime_seconds: usize, operations_per_second: Option, reporter: UnboundedSender) { - // bring up the DB - let _handle = init_cassandra(); - - // create the DB driver session - let session = init_session().await; - - // spawn tokio tasks to concurrently hit the database - // The exact query is defined in `run_one_operation` below - BenchTaskCassandra { session }.spawn_tasks(reporter.clone(), operations_per_second).await; - - // tell windsock to begin benchmarking - reporter.send(Report::Start).unwrap(); - let start = Instant::now(); - - // run the bench for the time requested by the user on the CLI (defaults to 15s) - tokio::time::sleep(Duration::from_secs(runtime_seconds)).await; - - // tell windsock to finalize the benchmark - reporter.send(Report::FinishedIn(start.elapsed())).unwrap(); - } -} - -// This struct is cloned once for each tokio task it will be run in. -#[derive(Clone)] -struct BenchTaskCassandra { - session: Arc, -} - -#[async_trait] -impl BenchTask for BenchTaskCassandra { - async fn run_one_operation(&self) -> Result<(), String> { - self.session.query("SELECT * FROM table").await - } -} -``` - -**TODO:** document running windsock as both a standalone crate and as a cargo bench. - -This example is simplified for demonstration purposes, refer to `examples/cassandra.rs` for a full working example. - -## Running benches - -Then we run our crate to run the benchmarks and view results like: - -```none -> cargo windsock run-local -... 
benchmark running logs -> cargo windsock results -Results for cassandra - topology ──single ─cluster3 -Measurements ═══════════════════════════ - Operations Total 750762 372624 - Operations Per Sec 83418 41403 - Min 0.255ms 0.229ms - 1 0.389ms 0.495ms - 2 0.411ms 0.571ms - 5 0.460ms 0.714ms - 10 0.567ms 0.876ms - 25 1.131ms 1.210ms - 50 1.306ms 1.687ms - 75 1.519ms 2.600ms - 90 1.763ms 4.881ms - 95 2.132ms 7.542ms - 98 2.588ms 14.008ms - 99 2.951ms 19.297ms - 99.9 7.952ms 40.896ms - 99.99 25.559ms 80.692ms -``` - -TODO: make this into a comparison to make it more flashy and use an image to include the coloring - -and graphs: TODO - -## How to perform various tasks in windsock - -### Just run every bench - -```shell -> cargo windsock run-local -``` - -### Run benches with matching tags and view all the results in one table - -```shell -> cargo windsock run-local db=kafka OPS=1000 topology=single # run benchmarks matching some tags -> cargo windsock results # view the results of the benchmarks with the same tags in a single table -``` - -### Iteratively compare results against a previous implementation - -```shell -> git checkout main # checkout original implementation -> cargo windsock run-local # run all benchmarks -> cargo windsock baseline-set # set the last benchmark run as the baseline -> vim src/main.rs # modify implementation -> cargo windsock run-local # run all benchmarks, every result is compared against the baseline -> cargo windsock results # view those results in a nice table -> vim src/main.rs # modify implementation again -> cargo windsock run-local # run all benchmarks, every result is compared against the baseline -``` - -### Run benchmarks in the cloud (simple) - -```shell -# create cloud resources, run benchmarks and then cleanup - all in one command -> cargo windsock cloud-setup-run-cleanup -``` - -### Iteratively compare results against a previous implementation (running in a remote cloud) - -```shell -# Setup the cloud resources and then form a baseline -> git checkout main # checkout original implementation -> cargo windsock cloud-setup db=kafka # setup the cloud resources required to run all kafka benchmarks -> cargo windsock cloud-run db=kafka # run all the kafka benchmarks in the cloud -> cargo windsock baseline-set # set the last benchmark run as the baseline - -# Make a change and and measure the effect -> vim src/main.rs # modify implementation -> cargo windsock cloud-run db=kafka # run all benchmarks, every result is compared against the baseline -> cargo windsock results # view those results in a nice table, compared against the baseline - -# And again -> vim src/main.rs # modify implementation again -> cargo windsock cloud-run db=kafka # run all benchmarks, every result is compared against the baseline - -# And finally... 
-> cargo windsock cloud-cleanup # Terminate all the cloud resources now that we are done -``` - -### Generate graph webpage - -TODO: not yet implemented - -```shell -> cargo windsock local-run # run all benches -> cargo windsock generate-webpage # generate a webpage from the results -``` diff --git a/windsock/src/bench.rs b/windsock/src/bench.rs deleted file mode 100644 index fa8fe1ce4..000000000 --- a/windsock/src/bench.rs +++ /dev/null @@ -1,343 +0,0 @@ -use crate::cli::RunArgs; -use crate::report::{report_builder, Report, ReportArchive}; -use crate::tables::ReportColumn; -use anyhow::Result; -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, HashSet}; -use std::fmt::Write; -use std::path::PathBuf; -use std::time::{Duration, Instant}; -use tokio::sync::mpsc::UnboundedSender; -use tokio::task::JoinHandle; - -pub struct BenchState { - bench: Box>, - pub(crate) tags: Tags, - pub(crate) supported_profilers: Vec, -} - -impl BenchState { - pub fn new( - bench: Box< - dyn Bench, - >, - ) -> Self { - let tags = Tags(bench.tags()); - let supported_profilers = bench.supported_profilers(); - BenchState { - bench, - tags, - supported_profilers, - } - } - - pub async fn orchestrate( - &mut self, - args: &RunArgs, - running_in_release: bool, - cloud_resources: Option, - ) { - let name = self.tags.get_name(); - println!("Running {:?}", name); - - let profilers_to_use = args.profilers.clone(); - let results_path = if !profilers_to_use.is_empty() { - let path = crate::data::windsock_path() - .join("profiler_results") - .join(&name); - std::fs::create_dir_all(&path).unwrap(); - path - } else { - PathBuf::new() - }; - - if let Some(cloud_resources) = cloud_resources { - self.bench - .orchestrate_cloud( - cloud_resources, - running_in_release, - Profiling { - results_path, - profilers_to_use, - }, - BenchParameters::from_args(args), - ) - .await - .unwrap(); - } else { - self.bench - .orchestrate_local( - running_in_release, - Profiling { - results_path, - profilers_to_use, - }, - BenchParameters::from_args(args), - ) - .await - .unwrap(); - } - - crate::tables::display_results_table(&[ReportColumn { - baseline: ReportArchive::load_baseline(&name).unwrap(), - current: ReportArchive::load(&name).unwrap(), - }]); - } - - pub async fn run(&mut self, args: &RunArgs, running_in_release: bool, resources: &str) { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let process = tokio::spawn(report_builder( - self.tags.clone(), - rx, - args.operations_per_second, - running_in_release, - )); - - self.bench - .run_bencher(resources, BenchParameters::from_args(args), tx) - .await; - - process.await.unwrap(); - } - - // TODO: will return None when running in non-local setup - pub fn cores_required(&self) -> Option { - Some(self.bench.cores_required()) - } - - pub fn required_cloud_resources(&self) -> ResourcesRequired { - self.bench.required_cloud_resources() - } -} - -/// Implement this to define your benchmarks -/// A single implementation of `Bench` can represent multiple benchmarks by initializing it multiple times with different state that returns unique tags. 
-#[async_trait] -pub trait Bench { - type CloudResourcesRequired; - type CloudResources; - - /// Returns tags that are used for forming comparisons, graphs and naming the benchmark - fn tags(&self) -> HashMap; - - /// Returns the names of profilers that this bench can be run with - fn supported_profilers(&self) -> Vec { - vec![] - } - - /// Specifies the cloud resources that should be provided to this bench - fn required_cloud_resources(&self) -> Self::CloudResourcesRequired { - unimplemented!("To support running in cloud this bench needs to implement `Bench::required_cloud_resources`"); - } - - /// How many cores to assign the async runtime in which the bench runs. - fn cores_required(&self) -> usize { - 1 - } - - /// Windsock will call this method to orchestrate the bench in cloud mode. - /// It must setup cloud resources to run the bench in a cloud and then start the bench returning the results on conclusion - async fn orchestrate_cloud( - &self, - cloud: Self::CloudResources, - running_in_release: bool, - profiling: Profiling, - bench_parameters: BenchParameters, - ) -> Result<()>; - - /// Windsock will call this method to orchestrate the bench in local mode. - /// It must setup local resources to run the bench locally and then start the bench returning the results on conclusion - async fn orchestrate_local( - &self, - running_in_release: bool, - profiling: Profiling, - bench_parameters: BenchParameters, - ) -> Result<()>; - - /// Windsock will call this method to run the bencher. - /// But the implementation of `orchestrate_local` or `orchestrate_cloud` must run the bencher through windsock in some way. - /// This will be: - /// * In the case of `orchestrate_cloud`, the windsock binary must be uploaded to a cloud VM and `windsock --internal-run` executed there. - /// * In the case of `orchestrate_local`, call `Bench::execute_run` to indirectly call this method by executing another instance of the windsock executable. - /// - /// The `resources` arg is a string that is passed in from the argument to `--internal-run` or at a higher level the argument to `Bench::execute_run`. - /// Use this string to instruct the bencher where to find the resources it needs. e.g. the IP address of the DB to benchmark. - /// To pass in multiple resources it is recommended to use a serialization method such as `serde-json`. 
- async fn run_bencher( - &self, - resources: &str, - bench_parameters: BenchParameters, - reporter: UnboundedSender, - ); - - /// Call within `Bench::orchestrate_local` to call `Bench::run` - async fn execute_run(&self, resources: &str, bench_parameters: &BenchParameters) { - let name_and_resources = format!("{} {}", self.name(), resources); - let output = tokio::process::Command::new(std::env::current_exe().unwrap().as_os_str()) - .args(run_args_vec(name_and_resources, bench_parameters)) - .output() - .await - .unwrap(); - if !output.status.success() { - let stdout = String::from_utf8(output.stdout).unwrap(); - let stderr = String::from_utf8(output.stderr).unwrap(); - panic!("Bench run failed:\nstdout:\n{stdout}\nstderr:\n{stderr}") - } - } - - /// Call within `Bench::orchestrate_cloud` to determine how to invoke the uploaded windsock executable - fn run_args(&self, resources: &str, bench_parameters: &BenchParameters) -> String { - let name_and_resources = format!("\"{} {}\"", self.name(), resources); - run_args_vec(name_and_resources, bench_parameters).join(" ") - } - - fn name(&self) -> String { - Tags(self.tags()).get_name() - } -} - -fn run_args_vec(name_and_resources: String, bench_parameters: &BenchParameters) -> Vec { - let mut args = vec![]; - args.push("internal-run".to_owned()); - args.push("--bench-length-seconds".to_owned()); - args.push(bench_parameters.runtime_seconds.to_string()); - - if let Some(ops) = bench_parameters.operations_per_second { - args.push("--operations-per-second".to_owned()); - args.push(ops.to_string()); - }; - - args.push(name_and_resources); - - args -} - -pub struct BenchParameters { - pub runtime_seconds: u32, - pub operations_per_second: Option, -} - -impl BenchParameters { - fn from_args(args: &RunArgs) -> Self { - BenchParameters { - runtime_seconds: args.bench_length_seconds.unwrap_or(15), - operations_per_second: args.operations_per_second, - } - } -} - -pub struct Profiling { - pub results_path: PathBuf, - pub profilers_to_use: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct Tags(pub HashMap); - -impl Tags { - pub fn get_name(&self) -> String { - let mut result = String::new(); - - let mut tags: Vec<(&String, &String)> = self.0.iter().collect(); - tags.sort_by_key(|x| x.0); - for (key, value) in tags { - if !result.is_empty() { - write!(result, ",").unwrap(); - } - write!(result, "{key}={value}").unwrap(); - } - result - } - - /// Does not handle invalid names, only use on internally generated names - pub fn from_name(name: &str) -> Self { - let mut map = HashMap::new(); - for tag in name.split(',') { - if tag.contains('=') { - let mut pair = tag.split('='); - let key = pair.next().unwrap().to_owned(); - let value = pair.next().unwrap().to_owned(); - map.insert(key, value); - } else { - panic!("tag without an '=' was found") - } - } - Tags(map) - } - - /// returns the set wise intersection of two `Tags`s - pub(crate) fn intersection(&self, other: &Tags) -> Self { - let mut intersection = HashMap::new(); - for (key, value) in &self.0 { - if other.0.get(key).map(|x| x == value).unwrap_or(false) { - intersection.insert(key.clone(), value.clone()); - } - } - Tags(intersection) - } - - pub(crate) fn keys(&self) -> HashSet { - self.0.keys().cloned().collect() - } -} - -/// An optional helper trait for defining benchmarks. -/// Usually you have an async rust DB driver that you need to call across multiple tokio tasks -/// This helper will spawn these tasks and send the required `Report::QueryCompletedIn`. 
-/// -/// To use this helper: -/// 1. implement `BenchTask` for a struct that contains the required db resources -/// 2. have run_one_operation use those resources to perform a single operation -/// 3. call spawn_tasks on an instance of BenchTask, it will clone your BenchTask instance once for each task it generates -#[async_trait] -pub trait BenchTask: Clone + Send + Sync + 'static { - async fn run_one_operation(&self) -> Result<(), String>; - - async fn spawn_tasks( - &self, - reporter: UnboundedSender, - operations_per_second: Option, - ) -> Vec> { - let mut tasks = vec![]; - // 100 is a generally nice amount of tasks to have, but if we have more tasks than OPS the throughput is very unstable - let task_count = operations_per_second.map(|x| x.min(100)).unwrap_or(100); - - let allocated_time_per_op = operations_per_second - .map(|ops| (Duration::from_secs(1) * task_count as u32) / ops as u32); - for i in 0..task_count { - let task = self.clone(); - let reporter = reporter.clone(); - tasks.push(tokio::spawn(async move { - // spread load out over a second - tokio::time::sleep(Duration::from_nanos((1_000_000_000 / task_count) * i)).await; - - let mut interval = allocated_time_per_op.map(tokio::time::interval); - - loop { - if let Some(interval) = &mut interval { - interval.tick().await; - } - - let operation_start = Instant::now(); - let report = match task.run_one_operation().await { - Ok(()) => Report::QueryCompletedIn(operation_start.elapsed()), - Err(message) => Report::QueryErrored { - completed_in: operation_start.elapsed(), - message, - }, - }; - if reporter.send(report).is_err() { - // The benchmark has completed and the reporter no longer wants to receive reports so just shutdown - return; - } - } - })); - } - - // sleep until all tasks have started running - tokio::time::sleep(Duration::from_secs(1)).await; - - tasks - } -} diff --git a/windsock/src/cli.rs b/windsock/src/cli.rs deleted file mode 100644 index a5e09e32b..000000000 --- a/windsock/src/cli.rs +++ /dev/null @@ -1,198 +0,0 @@ -use anyhow::{anyhow, Error}; -use clap::{Args, Parser, Subcommand}; - -const ABOUT: &str = r#"Bench Names: - Each benchmark has a unique name, this name is used by many options listed below. - The name is derived from an alphabetical sorting of its tags so you wont find it directly in the bench - implementation but it will be listed in the `list` command. - -Tag Filters: - Many options below take tag filters that specify which benches to include. - Tag filters specify which benches to include and the filter results are unioned. - - So: - * The filter "foo=some_value" will include only benches with the tag key `foo` and the tag value `some_value` - * The filter "foo=some_value bar=another_value" will include only benches that match "foo=some_value" and "bar=another_value" - * The filter "" will include all benches - - A filters tags can also be separated by commas allowing names to function as filters. - So: foo=some_value,bar=another_value is a name but it can also be used where a filter is accepted."#; - -#[derive(Subcommand, Clone)] -pub enum Command { - /// List the name of every bench. - #[clap(verbatim_doc_comment)] - List, - - /// Create cloud resources for running benches - #[clap(verbatim_doc_comment)] - CloudSetup { - /// e.g. 
"db=kafka connection_count=100" - #[clap(verbatim_doc_comment)] - filter: String, - }, - - /// Run benches in the cloud using the resources created by cloud-setup - #[clap(verbatim_doc_comment)] - CloudRun(RunArgs), - - /// cleanup cloud resources created by cloud-setup - /// Make sure to call this when your benchmarking session is finished! - #[clap(verbatim_doc_comment)] - CloudCleanup, - - /// cloud-setup, cloud-run and cloud-cleanup combined into a single command. - /// Convenient for getting a quick understanding of performance. - /// However, if you are performing optimization work prefer the individual commands as you will get: - /// * more stable results (same cloud instance) - /// * faster results (skip recreating and destroying cloud resources) - #[clap(verbatim_doc_comment)] - CloudSetupRunCleanup(RunArgs), - - /// Run benches entirely on your local machine - #[clap(verbatim_doc_comment)] - LocalRun(RunArgs), - - /// The results of the last benchmarks run becomes the new baseline from which future benchmark runs will be compared. - #[clap(verbatim_doc_comment)] - BaselineSet, - - /// Removes the stored baseline. Following runs will no longer compare against a baseline. - #[clap(verbatim_doc_comment)] - BaselineClear, - - /// Generate graphs webpage from the last benchmarks run. - #[clap(verbatim_doc_comment)] - GenerateWebpage, - - /// Display results from the last benchmark run by: - /// Listing bench results matching tag filters. - /// - /// Usage: Provide tag filters - #[clap(verbatim_doc_comment)] - // TODO: get trailing_var_arg(true) working so user can avoid having to wrap in "" - Results { - /// Do not compare against the set baseline. - #[clap(long, verbatim_doc_comment)] - ignore_baseline: bool, - - /// e.g. "db=kafka connection_count=100" - #[clap(verbatim_doc_comment)] - filter: Option, - }, - - /// Display results from the last benchmark run by: - /// Comparing various benches against a specific base bench. - /// - /// Usage: First provide the base benchmark name then provide benchmark names to compare against the base. - /// "base_name other_name1 other_name2" - #[clap(verbatim_doc_comment)] - CompareByName { filter: String }, - - /// Display results from the last benchmark run by: - /// Comparing benches matching tag filters against a specific base bench. - /// - /// Usage: First provide the base benchmark name then provide tag filters - /// "base_name db=kafka connection_count=10" - #[clap(verbatim_doc_comment)] - CompareByTags { filter: String }, - - /// Not for human use. Call this from your bench orchestration method to launch your bencher. - #[clap(verbatim_doc_comment)] - InternalRun(RunArgs), -} - -#[derive(Args, Clone)] -pub struct RunArgs { - /// Instruct benches to profile the application under test with the specified profilers. - /// Benches that do not support the specified profilers will be skipped. - #[clap(long, verbatim_doc_comment, value_delimiter = ',')] - pub profilers: Vec, - - /// How long in seconds to run each bench for. - /// By default benches will run for 15 seconds. - #[clap(long, verbatim_doc_comment)] - pub bench_length_seconds: Option, - - /// Instruct the benches to cap their operations per second to the specified amount. - /// By default the benches will run with unlimited operations per second. - #[clap(long, verbatim_doc_comment)] - pub operations_per_second: Option, - - /// Run all benches that match the specified tag key/values. 
- /// `tag_key=tag_value foo=bar` - #[clap(verbatim_doc_comment)] - pub filter: Option, -} - -impl RunArgs { - pub fn filter(&self) -> String { - match &self.filter { - // convert a name into a filter by swapping commas for spaces - Some(filter) => filter.replace(',', " "), - // If not provided use the empty filter - None => String::new(), - } - } -} - -#[derive(Parser)] -#[clap(about=ABOUT)] -pub struct WindsockArgs { - #[command(subcommand)] - pub command: Option, - - #[clap(long, hide(true))] - list: bool, - - #[clap(long, hide(true))] - format: Option, - - #[clap(long, hide(true))] - ignored: bool, - - #[clap(long, hide(true))] - pub exact: Option, - - #[clap(long, hide(true))] - nocapture: bool, -} - -#[derive(clap::ValueEnum, Clone, Copy)] -enum NextestFormat { - Terse, -} - -impl WindsockArgs { - pub fn nextest_list(&self) -> bool { - self.list - } - - pub fn nextest_list_all(&self) -> bool { - self.list && matches!(&self.format, Some(NextestFormat::Terse)) && !self.ignored - } - - pub fn nextest_list_ignored(&self) -> bool { - self.list && matches!(&self.format, Some(NextestFormat::Terse)) && self.ignored - } - - pub fn nextest_run_by_name(&self) -> Option<&str> { - if self.nocapture { - self.exact.as_deref() - } else { - None - } - } - - pub fn nextest_invalid_args(&self) -> Option { - if self.format.is_some() && self.list { - Some(anyhow!("`--format` only exists for nextest compatibility and is not supported without `--list`")) - } else if self.nocapture && self.exact.is_none() { - Some(anyhow!("`--nocapture` only exists for nextest compatibility and is not supported without `--exact`")) - } else if self.exact.is_some() && !self.nocapture { - Some(anyhow!("`--exact` only exists for nextest compatibility and is not supported without `--nocapture`")) - } else { - None - } - } -} diff --git a/windsock/src/cloud.rs b/windsock/src/cloud.rs deleted file mode 100644 index 82b4dd379..000000000 --- a/windsock/src/cloud.rs +++ /dev/null @@ -1,102 +0,0 @@ -use std::path::Path; - -use async_trait::async_trait; - -/// Implement this to give windsock some control over your cloud. -/// Currently the only thing windsock needs is the ability to cleanup resources since resource creation should happen within your own benches. -#[async_trait(?Send)] -pub trait Cloud { - /// Each bench creates an instance of this to provide its resource requirements - type CloudResourcesRequired; - /// This is given to the benches and contains methods or data required to access the instances. - type CloudResources; - - /// Cleanup all cloud resources created by windsock. - /// You should destroy not just resources created during this bench run but also resources created in past bench runs that might have missed cleanup due to a panic. - async fn cleanup_resources(&mut self); - - /// This is called once at start up before running any benches. - /// The implementation must return an object containing all the requested cloud resources. - /// The `required_resources` contains the `CloudResourcesRequired` returned by each bench that will be executed in this run. - /// - /// benches_will_run: - /// * true - the benches will be run, ensure they have everything they need to complete succesfully. 
- /// * false - the benches will not be run, due to `--store-cloud-resources-file`, you can skip uploading anything that will be reuploaded when restoring the resources - async fn create_resources( - &mut self, - required_resources: Vec, - benches_will_run: bool, - ) -> Self::CloudResources; - - /// Construct a file at the provided path that will allow restoring the passed resources - /// - /// It is gauranteed this will be called after all the benches have completed. - async fn store_resources_file(&mut self, path: &Path, resources: Self::CloudResources); - - /// Restore the resources from the data in the passed file. - /// It is the same file path that was passed to [`Cloud::store_resources_file`] - /// - /// The implementation should panic when the loaded messages cannot meet the requirements of the passed `required_sources`. - /// This is done rather than loading the required resources from disk as this case usually represents a user error. - /// Loading from disk is used for more consistent results across benches but the user cannot hope to get consistent results while changing the benches that will be run. - /// They are better off recreating the resources from scratch in this case. - async fn load_resources_file( - &mut self, - path: &Path, - required_resources: Vec, - ) -> Self::CloudResources; - - /// This is called once at start up before running any benches. - /// The returned Vec specifies the order in which to run benches. - fn order_benches( - &mut self, - benches: Vec>, - ) -> Vec> { - benches - } - - /// This is called before running each bench. - /// Use it to destroy or create resources as needed. - /// However, this method will not be called when `--save-resources-file` or `--load-resources-file` is set. - /// - /// It is recommended to create all resources within create_resources for faster completion time, but it may be desirable in some circumstances to create some of them here. - /// It is recommended to always destroy resources that will never be used again here. - async fn adjust_resources( - &mut self, - _benches: &[BenchInfo], - _bench_index: usize, - _resources: &mut Self::CloudResources, - ) { - } -} - -pub struct BenchInfo { - pub name: String, - pub resources: CloudResourceRequest, -} - -/// A dummy cloud instance for when the user isnt using windsock cloud functionality -pub struct NoCloud; -impl NoCloud { - pub fn new_boxed() -> Box> - where - Self: Sized, - { - Box::new(NoCloud) - } -} - -#[async_trait(?Send)] -impl Cloud for NoCloud { - type CloudResourcesRequired = (); - type CloudResources = (); - async fn cleanup_resources(&mut self) {} - async fn create_resources(&mut self, _requests: Vec<()>, _benches_will_run: bool) {} - async fn store_resources_file(&mut self, _path: &Path, _resources: ()) {} - async fn load_resources_file( - &mut self, - _path: &Path, - _required_resources: Vec, - ) { - } -} diff --git a/windsock/src/data.rs b/windsock/src/data.rs deleted file mode 100644 index 76242e92c..000000000 --- a/windsock/src/data.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::path::PathBuf; - -pub fn windsock_path() -> PathBuf { - // If we are run via cargo (we are in a target directory) use the target directory for storage. - // Otherwise just fallback to the current working directory. 
- let mut path = std::env::current_exe().unwrap(); - while path.pop() { - if path.file_name().map(|x| x == "target").unwrap_or(false) { - return path.join("windsock_data"); - } - } - - PathBuf::from("windsock_data") -} - -pub fn cloud_resources_path() -> PathBuf { - windsock_path().join("cloud_resources") -} diff --git a/windsock/src/filter.rs b/windsock/src/filter.rs deleted file mode 100644 index 5d3e730c9..000000000 --- a/windsock/src/filter.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::bench::Tags; -use anyhow::{anyhow, Result}; - -struct FilterTag { - key: String, - values: Vec, -} - -pub(crate) struct Filter { - filter: Vec, -} - -impl Filter { - pub(crate) fn from_query(query: &str) -> Result { - let mut filter = vec![]; - for pair in query.split_whitespace() { - let mut iter = pair.split('='); - let key = iter.next().unwrap().to_owned(); - let values = match iter.next() { - Some(rhs) => rhs.split('|').map(|x| x.to_owned()).collect(), - None => { - return Err(anyhow!( - "Expected exactly one '=' but found no '=' in tag {pair:?}" - )) - } - }; - if iter.next().is_some() { - return Err(anyhow!( - "Expected exactly one '=' but found multiple '=' in tag {pair:?}" - )); - } - filter.push(FilterTag { key, values }) - } - Ok(Filter { filter }) - } - - pub(crate) fn matches(&self, tags: &Tags) -> bool { - for FilterTag { key, values } in &self.filter { - match tags.0.get(key) { - Some(check_value) => { - if !values.contains(check_value) { - return false; - } - } - None => return false, - } - } - true - } -} diff --git a/windsock/src/lib.rs b/windsock/src/lib.rs deleted file mode 100644 index 1096e309c..000000000 --- a/windsock/src/lib.rs +++ /dev/null @@ -1,328 +0,0 @@ -mod bench; -mod cli; -pub mod cloud; -mod data; -mod filter; -mod list; -mod report; -mod tables; - -pub use bench::{Bench, BenchParameters, BenchTask, Profiling}; -use data::cloud_resources_path; -pub use report::{ - ExternalReport, LatencyPercentile, Metric, OperationsReport, PubSubReport, Report, - ReportArchive, -}; -pub use tables::Goal; - -use anyhow::{anyhow, Result}; -use bench::BenchState; -use clap::{CommandFactory, Parser}; -use cli::{Command, RunArgs, WindsockArgs}; -use cloud::{BenchInfo, Cloud}; -use filter::Filter; -use std::process::exit; -use tokio::runtime::Runtime; - -pub struct Windsock { - benches: Vec>, - cloud: Box>, - running_in_release: bool, -} - -impl Windsock { - /// The benches will be run and filtered out according to the CLI arguments - /// - /// Run order: - /// * Locally: The benches that are run will always be done so in the order they are listed, this allows tricks to avoid recreating DB's for every bench. - /// e.g. the database handle can be put behind a mutex and only resetup when actually neccessary - /// * Cloud: The benches will be run in an order optimized according to its required cloud resources. - /// - /// `release_profiles` specifies which cargo profiles Windsock will run under, if a different profile is used windsock will refuse to run. - pub fn new( - benches: Vec< - Box>, - >, - cloud: Box< - dyn Cloud, - >, - release_profiles: &[&str], - ) -> Self { - let running_in_release = release_profiles.contains(&env!("PROFILE")); - - Windsock { - benches: benches.into_iter().map(BenchState::new).collect(), - cloud, - running_in_release, - } - } - - // Hands control of the process over to windsock, this method will never return - // Windsock processes CLI arguments and then runs benchmarks as instructed by the user. - pub fn run(self) -> ! 
{ - match self.run_inner() { - Ok(()) => exit(0), - Err(err) => { - eprintln!("{:?}", err); - exit(1); - } - } - } - - fn run_inner(mut self) -> Result<()> { - let args = WindsockArgs::parse(); - - let running_in_release = self.running_in_release; - if let Some(command) = args.command { - match command { - Command::List => list::list(&self.benches), - Command::BaselineSet => { - ReportArchive::set_baseline(); - println!("Baseline set"); - } - Command::BaselineClear => { - ReportArchive::clear_baseline(); - println!("Baseline cleared"); - } - Command::GenerateWebpage => { - println!("Webpage generation is not implemented yet!") - } - Command::Results { - ignore_baseline, - filter, - } => tables::results( - ignore_baseline, - &filter.unwrap_or_default().replace(',', " "), - )?, - Command::CompareByName { filter } => tables::compare_by_name(&filter)?, - Command::CompareByTags { filter } => tables::compare_by_tags(&filter)?, - Command::CloudSetup { filter } => { - create_runtime(None).block_on(self.cloud_setup(filter))? - } - Command::CloudRun(args) => { - create_runtime(None).block_on(self.cloud_run(args, running_in_release))?; - } - Command::CloudCleanup => { - create_runtime(None).block_on(self.cloud_cleanup()); - } - Command::CloudSetupRunCleanup(args) => { - create_runtime(None) - .block_on(self.cloud_setup_run_cleanup(args, running_in_release))?; - } - Command::LocalRun(args) => { - create_runtime(None).block_on(self.local_run(args, running_in_release))?; - } - Command::InternalRun(args) => self.internal_run(&args, running_in_release)?, - } - } else if args.nextest_list() { - list::nextest_list(&args, &self.benches); - } else if let Some(name) = args.nextest_run_by_name() { - create_runtime(None).block_on(self.run_nextest(name, running_in_release))?; - } else if let Some(err) = args.nextest_invalid_args() { - return Err(err); - } else { - WindsockArgs::command().print_help().unwrap(); - } - - Ok(()) - } - - async fn cloud_run(&mut self, args: RunArgs, running_in_release: bool) -> Result<()> { - let bench_infos = self.bench_infos(&args.filter(), &args.profilers)?; - let resources = self.load_cloud_from_disk(&bench_infos).await?; - self.run_filtered_benches_cloud(args, running_in_release, bench_infos, resources) - .await?; - println!("Cloud resources have not been cleaned up."); - println!("Make sure to use `cloud-cleanup` when you are finished with them."); - Ok(()) - } - - async fn cloud_setup_run_cleanup( - &mut self, - args: RunArgs, - running_in_release: bool, - ) -> Result<()> { - let bench_infos = self.bench_infos(&args.filter(), &args.profilers)?; - let resources = self.temp_setup_cloud(&bench_infos).await?; - self.run_filtered_benches_cloud(args, running_in_release, bench_infos, resources) - .await?; - self.cloud_cleanup().await; - Ok(()) - } - - fn internal_run(&mut self, args: &RunArgs, running_in_release: bool) -> Result<()> { - let name_and_resources = args - .filter - .as_ref() - .expect("Filter arg must be provided for internal-run"); - let (name, resources) = - name_and_resources.split_at(name_and_resources.find(' ').unwrap() + 1); - let name = name.trim(); - match self.benches.iter_mut().find(|x| x.tags.get_name() == name) { - Some(bench) => { - if args - .profilers - .iter() - .all(|x| bench.supported_profilers.contains(x)) - { - create_runtime(bench.cores_required()).block_on(async { - bench.run(args, running_in_release, resources).await; - }); - Ok(()) - } else { - Err(anyhow!("Specified bench {name:?} was requested to run with the profilers {:?} but it only supports 
the profilers {:?}", args.profilers, bench.supported_profilers)) - } - } - None => Err(anyhow!("Specified bench {name:?} does not exist.")), - } - } - - async fn run_nextest(&mut self, name: &str, running_in_release: bool) -> Result<()> { - let args = RunArgs { - profilers: vec![], - // This is not a real bench we are just testing that it works, - // so set some really minimal runtime values - bench_length_seconds: Some(2), - operations_per_second: Some(100), - filter: Some(name.to_string()), - }; - - self.local_run(args, running_in_release).await - } - - fn bench_infos( - &mut self, - filter: &str, - profilers_enabled: &[String], - ) -> Result>> { - let filter = Filter::from_query(filter) - .map_err(|err| anyhow!("Failed to parse FILTER {filter:?}\n{err}"))?; - let mut bench_infos = vec![]; - for bench in &mut self.benches { - if filter.matches(&bench.tags) - && profilers_enabled - .iter() - .all(|x| bench.supported_profilers.contains(x)) - { - bench_infos.push(BenchInfo { - resources: bench.required_cloud_resources(), - name: bench.tags.get_name(), - }); - } - } - Ok(self.cloud.order_benches(bench_infos)) - } - - async fn load_cloud_from_disk( - &mut self, - bench_infos: &[BenchInfo], - ) -> Result { - if !bench_infos.is_empty() { - let resources = bench_infos.iter().map(|x| x.resources.clone()).collect(); - Ok(self - .cloud - .load_resources_file(&cloud_resources_path(), resources) - .await) - } else { - Err(anyhow!("No benches found with the specified filter")) - } - } - - async fn cloud_setup(&mut self, filter: String) -> Result<()> { - let bench_infos = self.bench_infos(&filter, &[])?; - - let resources = if !bench_infos.is_empty() { - let resources = bench_infos.iter().map(|x| x.resources.clone()).collect(); - self.cloud.create_resources(resources, false).await - } else { - return Err(anyhow!("No benches found with the specified filter")); - }; - - self.cloud - .store_resources_file(&cloud_resources_path(), resources) - .await; - - println!( - "Cloud resources have been created in preparation for running the following benches:" - ); - for bench in bench_infos { - println!(" {}", bench.name); - } - println!("Make sure to use `cloud-cleanup` when you are finished with these resources"); - - Ok(()) - } - - async fn temp_setup_cloud( - &mut self, - bench_infos: &[BenchInfo], - ) -> Result { - let resources = if !bench_infos.is_empty() { - let resources = bench_infos.iter().map(|x| x.resources.clone()).collect(); - self.cloud.create_resources(resources, true).await - } else { - return Err(anyhow!("No benches found with the specified filter")); - }; - - Ok(resources) - } - - async fn run_filtered_benches_cloud( - &mut self, - args: RunArgs, - running_in_release: bool, - bench_infos: Vec>, - mut resources: Resources, - ) -> Result<()> { - ReportArchive::clear_last_run(); - - for (i, bench_info) in bench_infos.iter().enumerate() { - for bench in &mut self.benches { - if bench.tags.get_name() == bench_info.name { - self.cloud - .adjust_resources(&bench_infos, i, &mut resources) - .await; - bench - .orchestrate(&args, running_in_release, Some(resources.clone())) - .await; - break; - } - } - } - - Ok(()) - } - - async fn cloud_cleanup(&mut self) { - std::fs::remove_file(cloud_resources_path()).ok(); - self.cloud.cleanup_resources().await; - } - - async fn local_run(&mut self, args: RunArgs, running_in_release: bool) -> Result<()> { - ReportArchive::clear_last_run(); - let filter = args.filter(); - let filter = Filter::from_query(&filter) - .map_err(|err| anyhow!("Failed to parse FILTER 
{:?}\n{err}", filter))?; - - for bench in &mut self.benches { - if filter.matches(&bench.tags) - && args - .profilers - .iter() - .all(|x| bench.supported_profilers.contains(x)) - { - bench.orchestrate(&args, running_in_release, None).await; - } - } - Ok(()) - } -} - -fn create_runtime(worker_threads: Option) -> Runtime { - let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); - runtime_builder.enable_all().thread_name("Windsock-Thread"); - if let Some(worker_threads) = worker_threads { - runtime_builder.worker_threads(worker_threads); - } - runtime_builder.build().unwrap() -} diff --git a/windsock/src/list.rs b/windsock/src/list.rs deleted file mode 100644 index 109297f65..000000000 --- a/windsock/src/list.rs +++ /dev/null @@ -1,26 +0,0 @@ -use crate::{bench::BenchState, cli::WindsockArgs}; - -pub fn list(benches: &[BenchState]) { - // regular usage - println!("Benches:"); - for bench in benches { - println!("{}", bench.tags.get_name()); - } -} - -pub fn nextest_list( - args: &WindsockArgs, - benches: &[BenchState], -) { - if args.nextest_list_all() { - // list all windsock benches in nextest format - for bench in benches { - println!("{}: benchmark", bench.tags.get_name()); - } - } else if args.nextest_list_ignored() { - // windsock does not support ignored tests - } else { - // in case the user accidentally runs `--list` just give them the regular `list` output. - list(benches); - } -} diff --git a/windsock/src/report.rs b/windsock/src/report.rs deleted file mode 100644 index f31e04228..000000000 --- a/windsock/src/report.rs +++ /dev/null @@ -1,549 +0,0 @@ -use crate::{bench::Tags, data::windsock_path, Goal}; -use anyhow::{anyhow, Result}; -use serde::{Deserialize, Serialize}; -use std::{io::ErrorKind, path::PathBuf, time::Duration}; -use strum::{EnumCount, EnumIter, IntoEnumIterator}; -use time::OffsetDateTime; -use tokio::sync::mpsc::UnboundedReceiver; - -#[derive(Debug, Serialize, Deserialize)] -pub enum Report { - /// Indicates the warmup is over and the benchmark has begun. - /// Any Completed/Errored Events received before this are considered warmups and discarded. - Start, - - /// Indicates a response came back from the service. - /// The Duration should be the time between the request being sent and the response being received - QueryCompletedIn(Duration), - - /// Indicates an an error response came back from the service. - QueryErrored { - /// The time between the request being sent and the response being received - completed_in: Duration, - /// The error message received from the service or the local error that occured while trying to communicate with the service. - message: String, - }, - - /// Indicates a pubsub produce ack came back from the service. - /// The Duration should be the time between the request being sent and the response being received - ProduceCompletedIn(Duration), - - /// Indicates a pubsub produce error response came back from the service. - ProduceErrored { - completed_in: Duration, - message: String, - }, - /// Indicates a pubsub consume response comes back from the service. - ConsumeCompleted, - - /// Indicates pubsub consume error response came back from the service. - ConsumeErrored { message: String }, - - /// Indicates a second has passed for the benchmarker - SecondPassed(Duration), - - /// Contains the time that the test ran for - FinishedIn(Duration), - - /// Adds a note that will be visible to the user when viewing the benchmark results. 
- AddInfoMessage(String), - - /// Ignore all other reports and use the ManualReport as the only source of benchmark metrics. - /// Do not use this under normal circumstances. - /// Instead this should only be used if you have an independent benchmarker that you want to call from windsock and include in windsocks results. - ExternalBenchmark(Box), -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ExternalReport { - pub bench_started_at: OffsetDateTime, - pub operations_report: Option, - pub pubsub_report: Option, - pub error_messages: Vec, -} - -#[derive(EnumIter, EnumCount)] -pub enum Percentile { - Min = 0, - P1, - P2, - P5, - P10, - P25, - P50, - P75, - P90, - P95, - P98, - P99, - P99_9, - P99_99, - Max, -} - -impl Percentile { - pub fn value(&self) -> f64 { - match self { - Percentile::Min => 0.0, - Percentile::P1 => 0.01, - Percentile::P2 => 0.02, - Percentile::P5 => 0.05, - Percentile::P10 => 0.10, - Percentile::P25 => 0.25, - Percentile::P50 => 0.50, - Percentile::P75 => 0.75, - Percentile::P90 => 0.90, - Percentile::P95 => 0.95, - Percentile::P98 => 0.98, - Percentile::P99 => 0.99, - Percentile::P99_9 => 0.999, - Percentile::P99_99 => 0.9999, - Percentile::Max => 1.0, - } - } - - pub fn name(&self) -> &'static str { - match self { - Percentile::Min => "Min ", - Percentile::P1 => "1 ", - Percentile::P2 => "2 ", - Percentile::P5 => "5 ", - Percentile::P10 => "10 ", - Percentile::P25 => "25 ", - Percentile::P50 => "50 ", - Percentile::P75 => "75 ", - Percentile::P90 => "90 ", - Percentile::P95 => "95 ", - Percentile::P98 => "98 ", - Percentile::P99 => "99 ", - Percentile::P99_9 => "99.9 ", - Percentile::P99_99 => "99.99", - Percentile::Max => "Max ", - } - } -} - -pub type Percentiles = [Duration; Percentile::COUNT]; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ReportArchive { - pub(crate) running_in_release: bool, - pub(crate) tags: Tags, - pub bench_started_at: OffsetDateTime, - pub(crate) operations_report: Option, - pub(crate) pubsub_report: Option, - pub metrics: Vec, - pub error_messages: Vec, - pub info_messages: Vec, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Default)] -pub struct OperationsReport { - pub total: u64, - pub total_errors: u64, - pub requested_operations_per_second: Option, - pub total_operations_per_second: u32, - pub total_errors_per_second: u32, - pub mean_time: Duration, - pub time_percentiles: Percentiles, - pub total_each_second: Vec, -} - -#[derive(Clone, Debug, Serialize, Deserialize, Default)] -pub struct PubSubReport { - pub total_produce: u64, - pub total_produce_error: u64, - pub total_consume: u64, - pub total_consume_error: u64, - pub total_backlog: i64, - pub requested_produce_per_second: Option, - pub produce_per_second: u32, - pub produce_errors_per_second: u32, - pub consume_per_second: u32, - pub consume_errors_per_second: u32, - pub produce_mean_time: Duration, - pub produce_time_percentiles: Percentiles, - pub produce_each_second: Vec, - pub consume_each_second: Vec, - pub backlog_each_second: Vec, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum Metric { - Total { - name: String, - compare: f64, - value: String, - goal: Goal, - }, - EachSecond { - name: String, - values: Vec<(f64, String, Goal)>, - }, - LatencyPercentiles { - name: String, - values: Vec, - }, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct LatencyPercentile { - pub quantile: String, - pub value: f64, - pub value_display: String, -} - -impl LatencyPercentile { - pub(crate) fn to_measurement(&self) -> 
(f64, String, Goal) { - ( - self.value, - self.value_display.clone(), - Goal::SmallerIsBetter, - ) - } -} - -impl Metric { - pub fn name(&self) -> &str { - match self { - Metric::Total { name, .. } => name, - Metric::EachSecond { name, .. } => name, - Metric::LatencyPercentiles { name, .. } => name, - } - } - - pub(crate) fn identifier(&self) -> MetricIdentifier { - match self { - Metric::Total { name, .. } => MetricIdentifier::Total { - name: name.to_owned(), - }, - Metric::EachSecond { name, .. } => MetricIdentifier::EachSecond { - name: name.to_owned(), - }, - Metric::LatencyPercentiles { name, .. } => MetricIdentifier::LatencyPercentiles { - name: name.to_owned(), - }, - } - } - - #[allow(clippy::len_without_is_empty)] - pub(crate) fn len(&self) -> usize { - match self { - Metric::Total { .. } => 1, - Metric::EachSecond { values, .. } => values.len(), - Metric::LatencyPercentiles { values, .. } => values.len(), - } - } -} - -#[derive(PartialEq)] -pub enum MetricIdentifier { - Total { name: String }, - EachSecond { name: String }, - LatencyPercentiles { name: String }, -} - -fn error_message_insertion(messages: &mut Vec, new_message: String) { - if !messages.contains(&new_message) { - if messages.len() <= 5 { - messages.push(new_message); - } else if messages.len() == 6 { - messages.push("more than 5 unique error messages encountered, most likely they are actually small variants of the the same error. Only the first 5 error messages have been logged".to_owned()); - } - } -} - -impl ReportArchive { - fn path(&self) -> PathBuf { - Self::last_run_path().join(self.tags.get_name()) - } - - pub fn load(name: &str) -> Result { - match std::fs::read(Self::last_run_path().join(name)) { - Ok(bytes) => bincode::deserialize(&bytes).map_err(|e| - anyhow!(e).context("The bench archive from the previous run is not a valid archive, maybe the format changed since the last run") - ), - Err(err) if err.kind() == ErrorKind::NotFound => Err(anyhow!("The bench {name:?} does not exist or was not run in the previous run")), - Err(err) => Err(anyhow!("The bench {name:?} encountered a file read error {err:?}")) - } - } - - pub fn load_baseline(name: &str) -> Result> { - match std::fs::read(Self::baseline_path().join(name)) { - Ok(bytes) => bincode::deserialize(&bytes) - .map_err(|e| - anyhow!(e).context("The bench archive from the baseline is not a valid archive, maybe the format changed since the baseline was set") - ) - .map(Some), - Err(err) if err.kind() == ErrorKind::NotFound => Ok(None), - Err(err) => Err(anyhow!("The bench {name:?} encountered a file read error {err:?}")) - } - } - - pub fn reports_in_last_run() -> Vec { - let report_dir = Self::last_run_path(); - std::fs::create_dir_all(&report_dir).unwrap(); - - let mut reports: Vec = std::fs::read_dir(report_dir) - .unwrap() - .map(|x| { - x.unwrap() - .path() - .file_name() - .unwrap() - .to_str() - .unwrap() - .to_owned() - }) - .collect(); - reports.sort(); - reports - } - - pub fn save(&self) { - let path = self.path(); - std::fs::create_dir_all(path.parent().unwrap()).unwrap(); - std::fs::write(&path, bincode::serialize(self).unwrap()) - .map_err(|e| panic!("Failed to write to {path:?} {e}")) - .unwrap() - } - - pub(crate) fn clear_last_run() { - let path = Self::last_run_path(); - if path.exists() { - // Just an extra sanity check that we truly are deleting a last_run directory - assert_eq!(path.file_name().unwrap(), "last_run"); - std::fs::remove_dir_all(path).unwrap(); - } - } - - pub fn set_baseline() { - Self::clear_baseline(); - - let 
last_run_path = Self::last_run_path(); - let baseline_path = Self::baseline_path(); - if last_run_path.exists() { - copy_dir::copy_dir(last_run_path, baseline_path).unwrap(); - } - } - - pub fn clear_baseline() { - let path = Self::baseline_path(); - if path.exists() { - // Just an extra sanity check that we truly are deleting a baseline directory - assert_eq!(path.file_name().unwrap(), "baseline"); - std::fs::remove_dir_all(path).unwrap(); - } - } - - pub fn last_run_path() -> PathBuf { - let path = windsock_path().join("last_run"); - std::fs::create_dir_all(&path).unwrap(); - path - } - - pub fn baseline_path() -> PathBuf { - windsock_path().join("baseline") - } -} - -pub(crate) async fn report_builder( - tags: Tags, - mut rx: UnboundedReceiver, - requested_ops: Option, - running_in_release: bool, -) -> ReportArchive { - let mut external_report = None; - let mut finished_in = None; - let mut started = None; - let mut pubsub_report = None; - let mut operations_report = None; - let mut operation_times = vec![]; - let mut produce_times = vec![]; - let mut total_operation_time = Duration::from_secs(0); - let mut total_produce_time = Duration::from_secs(0); - let mut error_messages = vec![]; - let mut info_messages = vec![]; - - while let Some(report) = rx.recv().await { - match report { - Report::Start => { - started = Some(OffsetDateTime::now_utc()); - } - Report::AddInfoMessage(message) => info_messages.push(message), - Report::QueryCompletedIn(duration) => { - let report = operations_report.get_or_insert_with(OperationsReport::default); - if started.is_some() { - report.total += 1; - total_operation_time += duration; - operation_times.push(duration); - match report.total_each_second.last_mut() { - Some(last) => *last += 1, - None => report.total_each_second.push(0), - } - } - } - Report::QueryErrored { - completed_in, - message, - } => { - let report = operations_report.get_or_insert_with(OperationsReport::default); - if started.is_some() { - error_message_insertion(&mut error_messages, message); - report.total_errors += 1; - total_operation_time += completed_in; - } - } - Report::ProduceCompletedIn(duration) => { - let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started.is_some() { - report.total_backlog += 1; - report.total_produce += 1; - total_produce_time += duration; - produce_times.push(duration); - match report.produce_each_second.last_mut() { - Some(last) => *last += 1, - None => report.produce_each_second.push(0), - } - } - } - Report::ProduceErrored { - completed_in, - message, - } => { - let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started.is_some() { - error_message_insertion(&mut error_messages, message); - report.total_produce_error += 1; - total_produce_time += completed_in; - } - } - Report::ConsumeCompleted => { - let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started.is_some() { - report.total_backlog -= 1; - report.total_consume += 1; - match report.consume_each_second.last_mut() { - Some(last) => *last += 1, - None => report.consume_each_second.push(0), - } - } - } - Report::ConsumeErrored { message } => { - let report = pubsub_report.get_or_insert_with(PubSubReport::default); - if started.is_some() { - error_message_insertion(&mut error_messages, message); - report.total_consume_error += 1; - } - } - Report::SecondPassed(duration) => { - assert!( - duration >= Duration::from_secs(1) && duration < Duration::from_millis(1050), - "Expected duration to be within 50ms of a second but was 
{duration:?}" - ); - if let Some(report) = operations_report.as_mut() { - report.total_each_second.push(0); - } - if let Some(report) = pubsub_report.as_mut() { - report.produce_each_second.push(0); - report.consume_each_second.push(0); - report.backlog_each_second.push(report.total_backlog); - } - } - Report::FinishedIn(duration) => { - if started.is_none() { - panic!("The bench never returned Report::Start") - } - finished_in = Some(duration); - // immediately drop rx so the benchmarks tasks stop trying to bench, logic doesnt rely on this it just saves resources - std::mem::drop(rx); - break; - } - Report::ExternalBenchmark(report) => { - // immediately drop rx so the benchmarks tasks stop trying to bench, logic doesnt rely on this it just saves resources - std::mem::drop(rx); - - external_report = Some(report); - break; - } - } - } - - if let Some(external_report) = external_report { - started = Some(external_report.bench_started_at); - operations_report = external_report.operations_report; - pubsub_report = external_report.pubsub_report; - error_messages = external_report.error_messages; - } else { - let finished_in = match finished_in { - Some(x) => x, - None => panic!("The bench never returned Report::FinishedIn(..)"), - }; - - if let Some(report) = operations_report.as_mut() { - report.requested_operations_per_second = requested_ops; - report.mean_time = mean_time(&operation_times, total_operation_time); - report.total_operations_per_second = calculate_ops(report.total, finished_in); - report.total_errors_per_second = calculate_ops(report.total_errors, finished_in); - report.time_percentiles = calculate_percentiles(operation_times); - - // This is not a complete result so discard it. - report.total_each_second.pop(); - } - - if let Some(report) = pubsub_report.as_mut() { - report.requested_produce_per_second = requested_ops; - report.produce_mean_time = mean_time(&produce_times, total_produce_time); - report.produce_per_second = calculate_ops(report.total_produce, finished_in); - report.produce_errors_per_second = - calculate_ops(report.total_produce_error, finished_in); - report.consume_per_second = calculate_ops(report.total_consume, finished_in); - report.consume_errors_per_second = - calculate_ops(report.total_consume_error, finished_in); - report.produce_time_percentiles = calculate_percentiles(produce_times); - - // This is not a complete result so discard it. 
- report.produce_each_second.pop(); - report.consume_each_second.pop(); - } - } - - let archive = ReportArchive { - bench_started_at: started.unwrap(), - running_in_release, - tags, - pubsub_report, - error_messages, - info_messages, - operations_report, - metrics: vec![], - }; - archive.save(); - archive -} - -fn mean_time(times: &[Duration], total_time: Duration) -> Duration { - if !times.is_empty() { - total_time / times.len() as u32 - } else { - Duration::from_secs(0) - } -} - -fn calculate_ops(total: u64, finished_in: Duration) -> u32 { - (total as u128 / (finished_in.as_nanos() / 1_000_000_000)) as u32 -} - -fn calculate_percentiles(mut times: Vec) -> Percentiles { - let mut percentiles = [Duration::ZERO; Percentile::COUNT]; - times.sort(); - if !times.is_empty() { - for (i, p) in Percentile::iter().enumerate() { - let percentile_index = (p.value() * times.len() as f64) as usize; - // Need to cap at last index, otherwise the MAX percentile will overflow by 1 - let index = percentile_index.min(times.len() - 1); - percentiles[i] = times[index]; - } - } - percentiles -} diff --git a/windsock/src/tables.rs b/windsock/src/tables.rs deleted file mode 100644 index 21faca8c7..000000000 --- a/windsock/src/tables.rs +++ /dev/null @@ -1,915 +0,0 @@ -use crate::{ - bench::Tags, - filter::Filter, - report::{MetricIdentifier, Percentile, ReportArchive}, - Metric, -}; -use anyhow::{Context, Result}; -use console::{pad_str, pad_str_with, style, Alignment}; -use serde::{Deserialize, Serialize}; -use std::{collections::HashSet, time::Duration}; -use strum::IntoEnumIterator; - -pub(crate) struct ReportColumn { - pub(crate) baseline: Option, - pub(crate) current: ReportArchive, -} - -impl ReportColumn { - pub fn load(name: &str) -> Result { - Ok(ReportColumn { - baseline: None, - current: ReportArchive::load(name)?, - }) - } - - pub fn load_with_baseline(name: &str) -> Result { - Ok(ReportColumn { - baseline: ReportArchive::load_baseline(name)?, - current: ReportArchive::load(name)?, - }) - } -} - -pub fn compare_by_name(names: &str) -> Result<()> { - let columns: Result> = - names.split_whitespace().map(ReportColumn::load).collect(); - let mut columns = columns?; - - let baseline = columns.first().map(|x| x.current.clone()); - for column in &mut columns.iter_mut().skip(1) { - column.baseline = baseline.clone(); - } - - display_compare_table(&columns); - Ok(()) -} - -pub fn compare_by_tags(arg: &str) -> Result<()> { - let mut split = arg.split_whitespace(); - let base_name = split.next().unwrap().to_owned(); - let base = ReportArchive::load(&base_name)?; - - let tag_args: Vec<_> = split.collect(); - let tag_args = tag_args.join(" "); - - let filter = Filter::from_query(&tag_args) - .with_context(|| format!("Failed to parse tag filter from {:?}", tag_args))?; - let archives: Result> = ReportArchive::reports_in_last_run() - .iter() - .filter(|name| **name != base_name && filter.matches(&Tags::from_name(name))) - .map(|x| { - Ok(ReportColumn { - baseline: Some(base.clone()), - current: ReportArchive::load(x)?, - }) - }) - .collect(); - let mut archives = archives?; - - archives.insert( - 0, - ReportColumn { - baseline: None, - current: base, - }, - ); - - display_compare_table(&archives); - - Ok(()) -} - -pub fn results(ignore_baseline: bool, filter: &str) -> Result<()> { - let filter = Filter::from_query(filter) - .with_context(|| format!("Failed to parse tag filter from {:?}", filter))?; - let archives: Result> = ReportArchive::reports_in_last_run() - .iter() - .filter(|name| 
filter.matches(&Tags::from_name(name))) - .map(|x| { - if ignore_baseline { - ReportColumn::load(x) - } else { - ReportColumn::load_with_baseline(x) - } - }) - .collect(); - let archives = archives?; - if archives.iter().any(|x| x.baseline.is_some()) { - // If there are any baselines then compare against baselines - display_baseline_compare_table(&archives); - } else { - // Otherwise display just results without any comparison - display_results_table(&archives); - } - - Ok(()) -} - -pub(crate) fn display_baseline_compare_table(reports: &[ReportColumn]) { - if reports.is_empty() { - println!("Need at least one report to display baseline comparison"); - return; - } - - base(reports, "Comparison against baseline"); -} - -pub(crate) fn display_compare_table(reports: &[ReportColumn]) { - if reports.len() < 2 { - println!("Need at least two reports to display a comparison against first column"); - return; - } - - base(reports, "Comparison against first column"); -} - -pub(crate) fn display_results_table(reports: &[ReportColumn]) { - if reports.is_empty() { - println!("Need at least one report to display results"); - return; - } - - base(reports, "Results"); -} - -fn base(reports: &[ReportColumn], table_type: &str) { - // if the user has set CARGO_TERM_COLOR to force cargo to use colors then they probably want us to use colors too - if std::env::var("CARGO_TERM_COLOR") - .map(|x| x.to_lowercase() == "always") - .unwrap_or(false) - { - console::set_colors_enabled(true); - } - - let mut intersection = reports[0].current.tags.clone(); - for report in reports { - intersection = intersection.intersection(&report.current.tags); - } - - let mut rows = vec![]; - rows.push(Row::Heading(format!( - "{} for {}", - table_type, - intersection.get_name() - ))); - - let intersection_keys = intersection.keys(); - let mut nonintersecting_keys: Vec = reports - .iter() - .fold(HashSet::new(), |acc, x| { - acc.union( - &x.current - .tags - .keys() - .difference(&intersection_keys) - .cloned() - .collect(), - ) - .cloned() - .collect() - }) - .into_iter() - .collect(); - nonintersecting_keys.sort(); - if !nonintersecting_keys.is_empty() { - rows.push(Row::Heading("Unique Tags".to_owned())); - } - for key in nonintersecting_keys { - rows.push(Row::ColumnNames { - names: reports - .iter() - .map(|x| x.current.tags.0.get(&key).cloned().unwrap_or("".to_owned())) - .collect(), - legend: key, - }); - } - - if reports - .iter() - .any(|x| x.current.operations_report.is_some()) - { - rows.push(Row::Heading("Opns (Operations)".to_owned())); - rows.push(Row::measurements(reports, "Total Opns", |report| { - report.operations_report.as_ref().map(|report| { - ( - report.total as f64, - report.total.to_string(), - Goal::BiggerIsBetter, - ) - }) - })); - rows.push(Row::measurements(reports, "Total Errors", |report| { - report.operations_report.as_ref().map(|report| { - ( - report.total_errors as f64, - report.total_errors.to_string(), - Goal::SmallerIsBetter, - ) - }) - })); - rows.push(Row::measurements( - reports, - "Target Opns Per Sec", - |report| { - report.operations_report.as_ref().map(|report| { - ( - report - .requested_operations_per_second - .map(|x| x as f64) - .unwrap_or(f64::INFINITY), - report - .requested_operations_per_second - .map(|x| x.to_string()) - .unwrap_or("MAX".to_owned()), - Goal::BiggerIsBetter, - ) - }) - }, - )); - rows.push(Row::measurements(reports, "Opns Per Sec", |report| { - report.operations_report.as_ref().map(|report| { - ( - report.total_operations_per_second as f64, - format!("{:.0}", 
report.total_operations_per_second), - Goal::BiggerIsBetter, - ) - }) - })); - rows.push(Row::measurements(reports, "Errors Per Sec", |report| { - report.operations_report.as_ref().map(|report| { - ( - report.total_errors_per_second as f64, - format!("{:.0}", report.total_errors_per_second), - Goal::SmallerIsBetter, - ) - }) - })); - - rows.push(Row::measurements(reports, "Opn Time Mean", |report| { - report.operations_report.as_ref().map(|report| { - ( - report.mean_time.as_secs_f64(), - duration_ms(report.mean_time), - Goal::SmallerIsBetter, - ) - }) - })); - - rows.push(Row::Heading("Opn Time Percentiles".to_owned())); - for (i, p) in Percentile::iter().enumerate() { - rows.push(Row::measurements(reports, p.name(), |report| { - report.operations_report.as_ref().map(|report| { - ( - report.time_percentiles[i].as_secs_f64(), - duration_ms(report.time_percentiles[i]), - Goal::SmallerIsBetter, - ) - }) - })); - } - - rows.push(Row::Heading("Opns Each Second".to_owned())); - for i in 0..reports - .iter() - .map(|x| { - x.current - .operations_report - .as_ref() - .map(|report| report.total_each_second.len()) - .unwrap_or(0) - }) - .max() - .unwrap() - { - rows.push(Row::measurements(reports, &i.to_string(), |report| { - report.operations_report.as_ref().and_then(|report| { - report - .total_each_second - .get(i) - .map(|value| (*value as f64, value.to_string(), Goal::BiggerIsBetter)) - }) - })); - } - } - - if reports.iter().any(|x| x.current.pubsub_report.is_some()) { - rows.push(Row::Heading("Produce/Consume".to_owned())); - rows.push(Row::measurements(reports, "Total Produce", |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.total_produce as f64, - report.total_produce.to_string(), - Goal::BiggerIsBetter, - ) - }) - })); - rows.push(Row::measurements( - reports, - "Errors Total Produce", - |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.total_produce_error as f64, - report.total_produce_error.to_string(), - Goal::SmallerIsBetter, - ) - }) - }, - )); - rows.push(Row::measurements(reports, "Total Consume", |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.total_consume as f64, - report.total_consume.to_string(), - Goal::BiggerIsBetter, - ) - }) - })); - rows.push(Row::measurements( - reports, - "Errors Total Consume", - |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.total_consume_error as f64, - report.total_consume_error.to_string(), - Goal::SmallerIsBetter, - ) - }) - }, - )); - rows.push(Row::measurements(reports, "Total Backlog", |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.total_backlog as f64, - report.total_backlog.to_string(), - Goal::SmallerIsBetter, - ) - }) - })); - - rows.push(Row::measurements( - reports, - "Target Produce Per Sec", - |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report - .requested_produce_per_second - .map(|x| x as f64) - .unwrap_or(f64::INFINITY), - report - .requested_produce_per_second - .map(|x| x.to_string()) - .unwrap_or("MAX".to_owned()), - Goal::BiggerIsBetter, - ) - }) - }, - )); - rows.push(Row::measurements(reports, "Produce Per Sec", |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.produce_per_second as f64, - format!("{:.0}", report.produce_per_second), - Goal::BiggerIsBetter, - ) - }) - })); - rows.push(Row::measurements( - reports, - "Errors Produce Per Sec", - |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.produce_errors_per_second as f64, - format!("{:.0}", 
report.produce_errors_per_second), - Goal::SmallerIsBetter, - ) - }) - }, - )); - rows.push(Row::measurements(reports, "Consume Per Sec", |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.consume_per_second as f64, - format!("{:.0}", report.consume_per_second), - Goal::BiggerIsBetter, - ) - }) - })); - rows.push(Row::measurements( - reports, - "Errors Consume Per Sec", - |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.consume_errors_per_second as f64, - format!("{:.0}", report.consume_errors_per_second), - Goal::SmallerIsBetter, - ) - }) - }, - )); - - rows.push(Row::measurements(reports, "Produce Time Mean", |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.produce_mean_time.as_secs_f64(), - duration_ms(report.produce_mean_time), - Goal::SmallerIsBetter, - ) - }) - })); - - rows.push(Row::Heading("Produce Time Percentiles".to_owned())); - for (i, p) in Percentile::iter().enumerate() { - rows.push(Row::measurements(reports, p.name(), |report| { - report.pubsub_report.as_ref().map(|report| { - ( - report.produce_time_percentiles[i].as_secs_f64(), - duration_ms(report.produce_time_percentiles[i]), - Goal::SmallerIsBetter, - ) - }) - })); - } - - rows.push(Row::Heading("Produce Each Second".to_owned())); - for i in 0..reports - .iter() - .map(|x| { - x.current - .pubsub_report - .as_ref() - .map(|report| report.produce_each_second.len()) - .unwrap_or(0) - }) - .max() - .unwrap() - { - rows.push(Row::measurements(reports, &i.to_string(), |report| { - report.pubsub_report.as_ref().and_then(|report| { - report - .produce_each_second - .get(i) - .map(|value| (*value as f64, value.to_string(), Goal::BiggerIsBetter)) - }) - })); - } - - rows.push(Row::Heading("Consume Each Second".to_owned())); - for i in 0..reports - .iter() - .map(|x| { - x.current - .pubsub_report - .as_ref() - .map(|report| report.consume_each_second.len()) - .unwrap_or(0) - }) - .max() - .unwrap() - { - rows.push(Row::measurements(reports, &i.to_string(), |report| { - report.pubsub_report.as_ref().and_then(|report| { - report - .consume_each_second - .get(i) - .map(|value| (*value as f64, value.to_string(), Goal::BiggerIsBetter)) - }) - })); - } - - rows.push(Row::Heading("Total Backlog Each Second".to_owned())); - for i in 0..reports - .iter() - .map(|x| { - x.current - .pubsub_report - .as_ref() - .map(|report| report.backlog_each_second.len()) - .unwrap_or(0) - }) - .max() - .unwrap() - { - rows.push(Row::measurements(reports, &i.to_string(), |report| { - report.pubsub_report.as_ref().and_then(|report| { - report - .backlog_each_second - .get(i) - .map(|value| (*value as f64, value.to_string(), Goal::SmallerIsBetter)) - }) - })); - } - } - - let mut metrics_to_display = vec![]; - for report in reports { - for metric in &report.current.metrics { - if !metrics_to_display.contains(&metric.identifier()) { - metrics_to_display.push(metric.identifier()) - } - } - } - for metric_identifier in metrics_to_display { - match &metric_identifier { - MetricIdentifier::Total { name } => { - rows.push(Row::measurements(reports, name, |report| { - report - .metrics - .iter() - .find(|metric| metric.identifier() == metric_identifier) - .map(|metric| match metric { - Metric::Total { - compare, - value, - goal, - .. 
- } => (*compare, value.to_owned(), *goal), - _ => unreachable!(), - }) - })); - } - MetricIdentifier::EachSecond { name } => { - rows.push(Row::Heading(format!("{name} Each Second"))); - for i in 0..reports - .iter() - .map(|x| { - x.current - .metrics - .iter() - .find(|x| x.identifier() == metric_identifier) - .map(|metric| metric.len()) - .unwrap_or(0) - }) - .max() - .unwrap() - { - rows.push(Row::measurements(reports, &i.to_string(), |report| { - report - .metrics - .iter() - .find(|x| x.identifier() == metric_identifier) - .and_then(|metric| match metric { - Metric::EachSecond { values, .. } => values.get(i).cloned(), - _ => unreachable!(), - }) - })); - } - } - MetricIdentifier::LatencyPercentiles { name } => { - rows.push(Row::Heading(format!("{name} Percentiles"))); - for (i, largest_col) in reports - .iter() - .map(|x| { - x.current - .metrics - .iter() - .find(|x| x.identifier() == metric_identifier) - .map(|metric| match metric { - Metric::LatencyPercentiles { values, .. } => values.clone(), - _ => unreachable!(), - }) - .unwrap_or(vec![]) - }) - .max_by_key(|x| x.len()) - .unwrap() - .into_iter() - .enumerate() - { - rows.push(Row::measurements( - reports, - &largest_col.quantile, - |report| { - report - .metrics - .iter() - .find(|x| x.identifier() == metric_identifier) - .and_then(|metric| match metric { - Metric::LatencyPercentiles { values, .. } => { - values.get(i).map(|x| x.to_measurement()) - } - _ => unreachable!(), - }) - }, - )); - } - } - } - } - - // the width of the legend column - let legend_width: usize = rows - .iter() - .skip(1) // skip the main heading because its big and its alignment doesnt matter - .map(|x| match x { - Row::Heading(heading) => heading.len(), - Row::ColumnNames { legend, .. } => legend.len(), - Row::Measurements { legend, .. } => legend.len(), - }) - .max() - .unwrap_or(10); - // the width of the comparison component of each column - let comparison_widths: Vec = reports - .iter() - .enumerate() - .map(|(i, _)| { - rows.iter() - .map(|x| match x { - Row::Heading(_) => 0, - Row::ColumnNames { .. } => 0, - Row::Measurements { measurements, .. } => measurements[i].comparison.len() + 1, // + 1 ensures we get separation from the previous column - }) - .max() - .unwrap() - }) - .collect(); - // the width of each entire column - let column_widths: Vec = reports - .iter() - .enumerate() - .map(|(i, _)| { - rows.iter() - .map(|x| match x { - Row::Heading(_) => 0, // Ignore these - Row::ColumnNames { names, .. } => names[i].len() + 1, // + 1 ensures we get separation from the previous column - Row::Measurements { measurements, .. 
} => { - measurements[i].value.len() + 1 // ensures we get seperation from the previous column - + comparison_widths[i] - } - }) - .max() - .unwrap() - }) - .collect(); - let total_width = legend_width + column_widths.iter().sum::(); - - for row in rows { - match row { - Row::Heading(heading) => { - println!( - "{}", - style(pad_str_with( - &format!("{} ", heading), - total_width, - Alignment::Left, - None, - '═' - )) - .yellow() - .bold() - ) - } - Row::ColumnNames { legend, names } => { - print!( - "{}", - style(pad_str(&legend, legend_width, Alignment::Right, None)) - .yellow() - .bold() - ); - for (i, name) in names.into_iter().enumerate() { - print!( - " {}", - style(pad_str_with( - &name, - column_widths[i] - 1, - Alignment::Center, - None, - '─', - )) - ) - } - println!() - } - Row::Measurements { - legend, - measurements, - } => { - print!( - "{}", - style(pad_str(&legend, legend_width, Alignment::Right, None)) - .yellow() - .bold() - ); - for (i, measurement) in measurements.into_iter().enumerate() { - let colorer = match measurement.color { - Color::Good => |x| style(x).green(), - Color::Bad => |x| style(x).red(), - Color::Neutral => |x| style(x).dim(), - }; - let contents = format!( - "{}{}", - measurement.value, - colorer(pad_str( - &measurement.comparison, - comparison_widths[i], - Alignment::Right, - None - )), - ); - print!( - "{}", - pad_str(&contents, column_widths[i], Alignment::Right, None), - ); - } - println!() - } - } - } - - for report in reports { - if !report.current.error_messages.is_empty() { - let error = format!( - "Bench encountered errors: {}", - report.current.tags.get_name() - ); - println!("{}", style(error).red().bold()); - for (i, message) in report.current.error_messages.iter().enumerate() { - println!(" {i}. {message}"); - } - } - - if let Some(baseline) = &report.baseline { - if !baseline.error_messages.is_empty() { - let error = format!( - "Bench baseline encountered errors: {}", - report.current.tags.get_name() - ); - println!("{}", style(error).red().bold()); - for (i, message) in report.current.error_messages.iter().enumerate() { - println!(" {i}. {message}"); - } - } - } - } - - let errors_found = reports.iter().any(|x| { - !x.current.error_messages.is_empty() - || x.baseline - .as_ref() - .map(|x| !x.error_messages.is_empty()) - .unwrap_or(false) - }); - let not_running_in_release_found = reports.iter().any(|x| { - !x.current.running_in_release - || x.baseline - .as_ref() - .map(|x| !x.running_in_release) - .unwrap_or(false) - }); - let info_found = reports.iter().any(|x| { - !x.current.info_messages.is_empty() - || x.baseline - .as_ref() - .map(|x| !x.info_messages.is_empty()) - .unwrap_or(false) - }); - - if errors_found && not_running_in_release_found { - // ensure these two sections are kept apart - println!(); - } - - for report in reports { - if !report.current.running_in_release { - let error = format!( - "Bench results invalid! Bench compiled with non-release profile: {}", - report.current.tags.get_name() - ); - println!("{}", style(error).red().bold()); - } - - if let Some(baseline) = &report.baseline { - if !baseline.running_in_release { - let error = format!( - "Baseline bench results invalid! 
Baseline bench compiled with non-release profile: {}", - baseline.tags.get_name() - ); - println!("{}", style(error).red().bold()); - } - } - } - - #[allow(clippy::nonminimal_bool)] - if info_found - && (not_running_in_release_found || (errors_found && !not_running_in_release_found)) - { - // ensure these two sections are kept apart - println!(); - } - - for report in reports { - if !report.current.info_messages.is_empty() { - let error = format!("notes for {}", report.current.tags.get_name()); - println!("{}", style(error).blue().bold()); - for (i, message) in report.current.info_messages.iter().enumerate() { - println!(" {i}. {message}"); - } - } - - if let Some(baseline) = &report.baseline { - if !baseline.info_messages.is_empty() { - let error = format!("notes for baseline {}", report.current.tags.get_name()); - println!("{}", style(error).blue().bold()); - for (i, message) in report.current.info_messages.iter().enumerate() { - println!(" {i}. {message}"); - } - } - } - } -} - -fn duration_ms(duration: Duration) -> String { - format!("{:.3}ms", duration.as_micros() as f32 / 1000.0) -} - -enum Row { - Heading(String), - ColumnNames { - legend: String, - names: Vec, - }, - Measurements { - legend: String, - measurements: Vec, - }, -} - -struct Measurement { - value: String, - comparison: String, - color: Color, -} - -#[derive(Clone, Copy, Debug, Serialize, Deserialize)] -pub enum Goal { - BiggerIsBetter, - SmallerIsBetter, - None, -} - -enum Color { - Good, - Bad, - Neutral, -} - -impl Row { - fn measurements Option<(f64, String, Goal)>>( - reports: &[ReportColumn], - legend: &str, - f: F, - ) -> Row { - let legend = legend.to_owned(); - let measurements = reports - .iter() - .map(|x| { - let (value, comparison, comparison_raw, goal) = - if let Some((compare, value, goal)) = f(&x.current) { - if let Some((base, _, _)) = x.baseline.as_ref().and_then(&f) { - let comparison_raw: f64 = (compare - base) / base * 100.0; - let comparison = if comparison_raw.is_nan() { - "-".into() - } else { - format!("{:+.1}%", comparison_raw) - }; - - (value, comparison, comparison_raw, goal) - } else { - (value, "".to_owned(), 0.0, Goal::BiggerIsBetter) - } - } else { - ("".to_owned(), "".to_owned(), 0.0, Goal::BiggerIsBetter) - }; - - let color = if comparison_raw > 5.0 { - if let Goal::BiggerIsBetter = goal { - Color::Good - } else { - Color::Bad - } - } else if comparison_raw < -5.0 { - if let Goal::SmallerIsBetter = goal { - Color::Good - } else { - Color::Bad - } - } else { - Color::Neutral - }; - Measurement { - value, - comparison, - color, - } - }) - .collect(); - Row::Measurements { - legend, - measurements, - } - } -}
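
A minimal standalone sketch of the ±5% comparison rule implemented by Row::measurements in the deleted tables.rs above, for illustration only: the Goal enum here is simplified to two variants and comparison_cell is a hypothetical helper, not an API of the crate.

// Sketch of how a comparison cell is derived from a current value, a baseline
// value, and the metric's Goal: the delta is rendered as a signed percentage
// and colored Good/Bad only when it moves more than 5% in either direction.
#[derive(Clone, Copy)]
enum Goal {
    BiggerIsBetter,
    SmallerIsBetter,
}

#[derive(Debug, PartialEq)]
enum Color {
    Good,
    Bad,
    Neutral,
}

fn comparison_cell(current: f64, baseline: f64, goal: Goal) -> (String, Color) {
    // Percentage change relative to the baseline, as in Row::measurements.
    let delta = (current - baseline) / baseline * 100.0;
    let text = if delta.is_nan() {
        "-".to_owned()
    } else {
        format!("{:+.1}%", delta)
    };
    let color = if delta > 5.0 {
        match goal {
            Goal::BiggerIsBetter => Color::Good,
            Goal::SmallerIsBetter => Color::Bad,
        }
    } else if delta < -5.0 {
        match goal {
            Goal::SmallerIsBetter => Color::Good,
            Goal::BiggerIsBetter => Color::Bad,
        }
    } else {
        Color::Neutral
    };
    (text, color)
}

fn main() {
    // Throughput rose from 1000 ops/s (baseline) to 1100 ops/s: "+10.0%", Good.
    assert_eq!(
        comparison_cell(1100.0, 1000.0, Goal::BiggerIsBetter),
        ("+10.0%".to_owned(), Color::Good)
    );
    // Latency regressed from 2.0ms to 2.2ms: also "+10.0%", but Bad because smaller is better.
    assert_eq!(
        comparison_cell(2.2, 2.0, Goal::SmallerIsBetter),
        ("+10.0%".to_owned(), Color::Bad)
    );
}
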