diff --git a/core/benches/types/concurrent_tasks.rs b/core/benches/types/concurrent_tasks.rs index ca1dfeec551..c4acc070153 100644 --- a/core/benches/types/concurrent_tasks.rs +++ b/core/benches/types/concurrent_tasks.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -use criterion::black_box; use criterion::BatchSize; use criterion::Criterion; use once_cell::sync::Lazy; use opendal::raw::ConcurrentTasks; use opendal::Executor; +use std::time::Duration; pub static TOKIO: Lazy = Lazy::new(|| tokio::runtime::Runtime::new().expect("build tokio runtime")); @@ -34,15 +34,23 @@ pub fn bench_concurrent_tasks(c: &mut Criterion) { || { ConcurrentTasks::new(Executor::new(), *concurrent, |()| { Box::pin(async { - black_box(()); + tokio::time::sleep(Duration::from_millis(1)).await; ((), Ok(())) }) }) }, |mut tasks| async move { - let _ = tasks.execute(()).await; + for _ in 0..100 { + let _ = tasks.execute(()).await; + } + + loop { + if tasks.next().await.is_none() { + break; + } + } }, - BatchSize::NumIterations(1000), + BatchSize::PerIteration, ) }); } diff --git a/core/src/raw/futures_util.rs b/core/src/raw/futures_util.rs index 9ffac3c2f00..6fe4472d6eb 100644 --- a/core/src/raw/futures_util.rs +++ b/core/src/raw/futures_util.rs @@ -451,6 +451,7 @@ mod tests { use futures::future::BoxFuture; use futures::Stream; use rand::Rng; + use tokio::time::sleep; use super::*; @@ -506,4 +507,47 @@ mod tests { assert_eq!(expected, result, "concurrent futures failed: {}", name); } } + + #[tokio::test] + async fn test_concurrent_tasks() { + let executor = Executor::new(); + + let mut tasks = ConcurrentTasks::new(executor, 16, |(i, dur)| { + Box::pin(async move { + sleep(dur).await; + + // 5% rate to fail. + if rand::thread_rng().gen_range(0..100) > 90 { + return ( + (i, dur), + Err(Error::new(ErrorKind::Unexpected, "I'm lucky").set_temporary()), + ); + } + ((i, dur), Ok(i)) + }) + }); + + let mut ans = vec![]; + + for i in 0..10240 { + // Sleep up to 10ms + let dur = Duration::from_millis(rand::thread_rng().gen_range(0..10)); + loop { + let res = tasks.execute((i, dur)).await; + if res.is_ok() { + break; + } + } + } + + loop { + match tasks.next().await.transpose() { + Ok(Some(i)) => ans.push(i), + Ok(None) => break, + Err(_) => continue, + } + } + + assert_eq!(ans, (0..10240).collect::>()) + } } diff --git a/core/src/raw/oio/write/block_write.rs b/core/src/raw/oio/write/block_write.rs index 9d360539464..99c76562ca8 100644 --- a/core/src/raw/oio/write/block_write.rs +++ b/core/src/raw/oio/write/block_write.rs @@ -213,7 +213,7 @@ where let Some(result) = self.tasks.next().await.transpose()? else { break; }; - self.block_ids.push(result) + self.block_ids.push(result); } let block_ids = self.block_ids.clone(); diff --git a/core/tests/behavior/async_write.rs b/core/tests/behavior/async_write.rs index d646ebafd64..c95fdd5f825 100644 --- a/core/tests/behavior/async_write.rs +++ b/core/tests/behavior/async_write.rs @@ -347,7 +347,7 @@ pub async fn test_writer_sink(op: Operator) -> Result<()> { let mut w = op .writer_with(&path) - .chunk(5 * 1024 * 1024) + .chunk(4 * 1024 * 1024) .await? .into_bytes_sink(); w.send_all(&mut stream).await?; @@ -380,7 +380,7 @@ pub async fn test_writer_sink_with_concurrent(op: Operator) -> Result<()> { } let path = TEST_FIXTURE.new_file_path(); - let size = 5 * 1024 * 1024; // write file with 5 MiB + let size = 8 * 1024 * 1024; // write file with 8 MiB let content_a = gen_fixed_bytes(size); let content_b = gen_fixed_bytes(size); let mut stream = stream::iter(vec![