Skip to content

Commit

Permalink
chore(core): Add unit and bench tests for concurrent tasks (#4695)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed Jun 5, 2024
1 parent d4e4163 commit 63ed080
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 7 deletions.
16 changes: 12 additions & 4 deletions core/benches/types/concurrent_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
// specific language governing permissions and limitations
// under the License.

use criterion::black_box;
use criterion::BatchSize;
use criterion::Criterion;
use once_cell::sync::Lazy;
use opendal::raw::ConcurrentTasks;
use opendal::Executor;
use std::time::Duration;

pub static TOKIO: Lazy<tokio::runtime::Runtime> =
Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime"));
Expand All @@ -34,15 +34,23 @@ pub fn bench_concurrent_tasks(c: &mut Criterion) {
|| {
ConcurrentTasks::new(Executor::new(), *concurrent, |()| {
Box::pin(async {
black_box(());
tokio::time::sleep(Duration::from_millis(1)).await;
((), Ok(()))
})
})
},
|mut tasks| async move {
let _ = tasks.execute(()).await;
for _ in 0..100 {
let _ = tasks.execute(()).await;
}

loop {
if tasks.next().await.is_none() {
break;
}
}
},
BatchSize::NumIterations(1000),
BatchSize::PerIteration,
)
});
}
Expand Down
44 changes: 44 additions & 0 deletions core/src/raw/futures_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ mod tests {
use futures::future::BoxFuture;
use futures::Stream;
use rand::Rng;
use tokio::time::sleep;

use super::*;

Expand Down Expand Up @@ -506,4 +507,47 @@ mod tests {
assert_eq!(expected, result, "concurrent futures failed: {}", name);
}
}

#[tokio::test]
async fn test_concurrent_tasks() {
let executor = Executor::new();

let mut tasks = ConcurrentTasks::new(executor, 16, |(i, dur)| {
Box::pin(async move {
sleep(dur).await;

// 5% rate to fail.
if rand::thread_rng().gen_range(0..100) > 90 {
return (
(i, dur),
Err(Error::new(ErrorKind::Unexpected, "I'm lucky").set_temporary()),
);
}
((i, dur), Ok(i))
})
});

let mut ans = vec![];

for i in 0..10240 {
// Sleep up to 10ms
let dur = Duration::from_millis(rand::thread_rng().gen_range(0..10));
loop {
let res = tasks.execute((i, dur)).await;
if res.is_ok() {
break;
}
}
}

loop {
match tasks.next().await.transpose() {
Ok(Some(i)) => ans.push(i),
Ok(None) => break,
Err(_) => continue,
}
}

assert_eq!(ans, (0..10240).collect::<Vec<_>>())
}
}
2 changes: 1 addition & 1 deletion core/src/raw/oio/write/block_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ where
let Some(result) = self.tasks.next().await.transpose()? else {
break;
};
self.block_ids.push(result)
self.block_ids.push(result);
}

let block_ids = self.block_ids.clone();
Expand Down
4 changes: 2 additions & 2 deletions core/tests/behavior/async_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> {

let mut w = op
.writer_with(&path)
.chunk(5 * 1024 * 1024)
.chunk(4 * 1024 * 1024)
.await?
.into_bytes_sink();
w.send_all(&mut stream).await?;
Expand Down Expand Up @@ -380,7 +380,7 @@ pub async fn test_writer_sink_with_concurrent(op: Operator) -> Result<()> {
}

let path = TEST_FIXTURE.new_file_path();
let size = 5 * 1024 * 1024; // write file with 5 MiB
let size = 8 * 1024 * 1024; // write file with 8 MiB
let content_a = gen_fixed_bytes(size);
let content_b = gen_fixed_bytes(size);
let mut stream = stream::iter(vec![
Expand Down

0 comments on commit 63ed080

Please sign in to comment.