diff --git a/Cargo.lock b/Cargo.lock index 1123fc6693227..f5e8d842ffd67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6478,6 +6478,7 @@ dependencies = [ "madsim-rdkafka", "madsim-tokio", "paste", + "pin-project", "pretty_assertions", "rand 0.8.5", "risingwave_common", diff --git a/src/tests/simulation/Cargo.toml b/src/tests/simulation/Cargo.toml index 55e1c6ba4fd73..6fd353dfb56c5 100644 --- a/src/tests/simulation/Cargo.toml +++ b/src/tests/simulation/Cargo.toml @@ -23,6 +23,7 @@ itertools = "0.10" lru = { git = "https://github.com/risingwavelabs/lru-rs.git", branch = "evict_by_timestamp" } madsim = "0.2.17" paste = "1" +pin-project = "1.0" pretty_assertions = "1" rand = "0.8" rdkafka = { package = "madsim-rdkafka", version = "=0.2.14-alpha", features = ["cmake-build"] } diff --git a/src/tests/simulation/src/slt.rs b/src/tests/simulation/src/slt.rs index d85d1d6bf7b3e..ec965eedfbfac 100644 --- a/src/tests/simulation/src/slt.rs +++ b/src/tests/simulation/src/slt.rs @@ -21,6 +21,7 @@ use sqllogictest::ParallelTestError; use crate::client::RisingWave; use crate::cluster::{Cluster, KillOpts}; +use crate::utils::TimedExt; fn is_create_table_as(sql: &str) -> bool { let parts: Vec = sql @@ -112,7 +113,13 @@ pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { // For normal records. if !kill { - match tester.run_async(record).await { + match tester + .run_async(record.clone()) + .timed(|_res, elapsed| { + println!("Record {:?} finished in {:?}", record, elapsed) + }) + .await + { Ok(_) => continue, Err(e) => panic!("{}", e), } @@ -128,7 +135,13 @@ pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { if cmd.ignore_kill() { for i in 0usize.. { let delay = Duration::from_secs(1 << i); - if let Err(err) = tester.run_async(record.clone()).await { + if let Err(err) = tester + .run_async(record.clone()) + .timed(|_res, elapsed| { + println!("Record {:?} finished in {:?}", record, elapsed) + }) + .await + { // cluster could be still under recovering if killed before, retry if // meets `no reader for dml in table with id {}`. let should_retry = @@ -162,7 +175,13 @@ pub async fn run_slt_task(cluster: Arc, glob: &str, opts: &KillOpts) { // retry up to 5 times until it succeed for i in 0usize.. { let delay = Duration::from_secs(1 << i); - match tester.run_async(record.clone()).await { + match tester + .run_async(record.clone()) + .timed(|_res, elapsed| { + println!("Record {:?} finished in {:?}", record, elapsed) + }) + .await + { Ok(_) => break, // allow 'table exists' error when retry CREATE statement Err(e) diff --git a/src/tests/simulation/src/utils.rs b/src/tests/simulation/src/utils/assert_result.rs similarity index 100% rename from src/tests/simulation/src/utils.rs rename to src/tests/simulation/src/utils/assert_result.rs diff --git a/src/tests/simulation/src/utils/mod.rs b/src/tests/simulation/src/utils/mod.rs new file mode 100644 index 0000000000000..b3b726467e613 --- /dev/null +++ b/src/tests/simulation/src/utils/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod assert_result; +pub use assert_result::*; + +mod timed_future; +pub use timed_future::*; diff --git a/src/tests/simulation/src/utils/timed_future.rs b/src/tests/simulation/src/utils/timed_future.rs new file mode 100644 index 0000000000000..b9003552a0ac8 --- /dev/null +++ b/src/tests/simulation/src/utils/timed_future.rs @@ -0,0 +1,76 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; +use std::pin::{pin, Pin}; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +use pin_project::pin_project; + +/// Inspired by https://stackoverflow.com/a/59935743/2990323 +/// A wrapper around a Future which adds timing data. +#[pin_project] +pub struct Timed +where + Fut: Future, + F: Fn(&Fut::Output, Duration), +{ + #[pin] + inner: Fut, + f: F, + start: Option, +} + +impl Future for Timed +where + Fut: Future, + F: Fn(&Fut::Output, Duration), +{ + type Output = Fut::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + let start = this.start.get_or_insert_with(Instant::now); + + match this.inner.poll(cx) { + // If the inner future is still pending, this wrapper is still pending. + Poll::Pending => Poll::Pending, + + // If the inner future is done, measure the elapsed time and finish this wrapper future. + Poll::Ready(v) => { + let elapsed = start.elapsed(); + (this.f)(&v, elapsed); + + Poll::Ready(v) + } + } + } +} + +pub trait TimedExt: Sized + Future { + fn timed(self, f: F) -> Timed + where + F: Fn(&Self::Output, Duration), + { + Timed { + inner: self, + f, + start: None, + } + } +} + +// All futures can use the `.timed` method defined above +impl TimedExt for F {}