Skip to content

Commit

Permalink
chore: Add time for each slt record in simulation test (risingwavelab…
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 authored Mar 23, 2023
1 parent ea72ed7 commit 3e8ce31
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ itertools = "0.10"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", branch = "evict_by_timestamp" }
madsim = "0.2.17"
paste = "1"
pin-project = "1.0"
pretty_assertions = "1"
rand = "0.8"
rdkafka = { package = "madsim-rdkafka", version = "=0.2.14-alpha", features = ["cmake-build"] }
Expand Down
25 changes: 22 additions & 3 deletions src/tests/simulation/src/slt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use sqllogictest::ParallelTestError;

use crate::client::RisingWave;
use crate::cluster::{Cluster, KillOpts};
use crate::utils::TimedExt;

fn is_create_table_as(sql: &str) -> bool {
let parts: Vec<String> = sql
Expand Down Expand Up @@ -112,7 +113,13 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {

// For normal records.
if !kill {
match tester.run_async(record).await {
match tester
.run_async(record.clone())
.timed(|_res, elapsed| {
println!("Record {:?} finished in {:?}", record, elapsed)
})
.await
{
Ok(_) => continue,
Err(e) => panic!("{}", e),
}
Expand All @@ -128,7 +135,13 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
if cmd.ignore_kill() {
for i in 0usize.. {
let delay = Duration::from_secs(1 << i);
if let Err(err) = tester.run_async(record.clone()).await {
if let Err(err) = tester
.run_async(record.clone())
.timed(|_res, elapsed| {
println!("Record {:?} finished in {:?}", record, elapsed)
})
.await
{
// cluster could be still under recovering if killed before, retry if
// meets `no reader for dml in table with id {}`.
let should_retry =
Expand Down Expand Up @@ -162,7 +175,13 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
// retry up to 5 times until it succeed
for i in 0usize.. {
let delay = Duration::from_secs(1 << i);
match tester.run_async(record.clone()).await {
match tester
.run_async(record.clone())
.timed(|_res, elapsed| {
println!("Record {:?} finished in {:?}", record, elapsed)
})
.await
{
Ok(_) => break,
// allow 'table exists' error when retry CREATE statement
Err(e)
Expand Down
File renamed without changes.
19 changes: 19 additions & 0 deletions src/tests/simulation/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod assert_result;
pub use assert_result::*;

mod timed_future;
pub use timed_future::*;
76 changes: 76 additions & 0 deletions src/tests/simulation/src/utils/timed_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use pin_project::pin_project;

/// Inspired by https://stackoverflow.com/a/59935743/2990323
/// A wrapper around a Future which adds timing data.
#[pin_project]
pub struct Timed<Fut, F>
where
Fut: Future,
F: Fn(&Fut::Output, Duration),
{
#[pin]
inner: Fut,
f: F,
start: Option<Instant>,
}

impl<Fut, F> Future for Timed<Fut, F>
where
Fut: Future,
F: Fn(&Fut::Output, Duration),
{
type Output = Fut::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
let start = this.start.get_or_insert_with(Instant::now);

match this.inner.poll(cx) {
// If the inner future is still pending, this wrapper is still pending.
Poll::Pending => Poll::Pending,

// If the inner future is done, measure the elapsed time and finish this wrapper future.
Poll::Ready(v) => {
let elapsed = start.elapsed();
(this.f)(&v, elapsed);

Poll::Ready(v)
}
}
}
}

pub trait TimedExt: Sized + Future {
fn timed<F>(self, f: F) -> Timed<Self, F>
where
F: Fn(&Self::Output, Duration),
{
Timed {
inner: self,
f,
start: None,
}
}
}

// All futures can use the `.timed` method defined above
impl<F: Future> TimedExt for F {}

0 comments on commit 3e8ce31

Please sign in to comment.