diff --git a/Cargo.lock b/Cargo.lock index 34e5527a2ad04..235b1f4035963 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7395,6 +7395,7 @@ dependencies = [ "tempfile", "test-utils", "tokio", + "tokio-util 0.7.4", "tracing", "tracing-subscriber 0.3.15", "workspace-hack 0.1.0", diff --git a/crates/sui-benchmark/Cargo.toml b/crates/sui-benchmark/Cargo.toml index ceb485f63847c..ad72bf1110201 100644 --- a/crates/sui-benchmark/Cargo.toml +++ b/crates/sui-benchmark/Cargo.toml @@ -35,6 +35,7 @@ duration-str = "0.4.0" hdrhistogram = "7.5.1" comfy-table = "6.1.0" bcs = "0.1.3" +tokio-util = "0.7.4" sui-core = { path = "../sui-core" } sui-config = { path = "../sui-config" } sui-types = { path = "../sui-types" } diff --git a/crates/sui-benchmark/src/drivers/bench_driver.rs b/crates/sui-benchmark/src/drivers/bench_driver.rs index 9784f8b71c2bc..4cc44875db336 100644 --- a/crates/sui-benchmark/src/drivers/bench_driver.rs +++ b/crates/sui-benchmark/src/drivers/bench_driver.rs @@ -18,6 +18,7 @@ use prometheus::IntCounterVec; use prometheus::Registry; use sui_core::authority_aggregator::AuthorityAggregator; use tokio::sync::OnceCell; +use tokio_util::sync::CancellationToken; use crate::drivers::driver::Driver; use crate::drivers::HistogramWrapper; @@ -143,6 +144,7 @@ pub struct BenchWorker { pub struct BenchDriver { pub stat_collection_interval: u64, pub start_time: Instant, + pub token: CancellationToken, } impl BenchDriver { @@ -150,8 +152,12 @@ impl BenchDriver { BenchDriver { stat_collection_interval, start_time: Instant::now(), + token: CancellationToken::new(), } } + pub fn terminate(&self) { + self.token.cancel() + } pub fn update_progress( start_time: Instant, interval: Interval, @@ -242,6 +248,7 @@ impl Driver for BenchDriver { }); for (i, worker) in bench_workers.into_iter().enumerate() { let committee = committee.clone(); + let cloned_token = self.token.clone(); let request_delay_micros = 1_000_000 / worker.target_qps; let mut free_pool = worker.payload; let progress = progress.clone(); @@ -272,7 +279,7 @@ impl Driver for BenchDriver { let mut stat_start_time: Instant = Instant::now(); loop { tokio::select! { - _ = tokio::signal::ctrl_c() => { + _ = cloned_token.cancelled() => { break; } _ = stat_interval.tick() => { @@ -515,7 +522,14 @@ impl Driver for BenchDriver { benchmark_stat }); drop(tx); - let _res: Vec<_> = try_join_all(tasks).await.unwrap().into_iter().collect(); + let all_tasks = try_join_all(tasks); + let _res = tokio::select! { + _ = tokio::signal::ctrl_c() => { + self.terminate(); + vec![] + } + res = all_tasks => res.unwrap().into_iter().collect() + }; let benchmark_stat = stat_task.await.unwrap(); Ok(benchmark_stat) }