diff --git a/hydroflow/tests/surface_async.rs b/hydroflow/tests/surface_async.rs index d6e8d3542f97..38175cec08ca 100644 --- a/hydroflow/tests/surface_async.rs +++ b/hydroflow/tests/surface_async.rs @@ -339,31 +339,38 @@ async fn asynctest_dest_asyncwrite_duplex() { #[multiplatform_test(hydroflow, env_tracing)] async fn asynctest_source_stream() { - LocalSet::new() - .run_until(async { - let (a_send, a_recv) = hydroflow::util::unbounded_channel::(); - let (b_send, b_recv) = hydroflow::util::unbounded_channel::(); - - tokio::task::spawn_local(async move { - let mut flow = hydroflow_syntax! { - source_stream(a_recv) -> for_each(|x| { b_send.send(x).unwrap(); }); - }; - flow.run_async().await.unwrap(); - }); - tokio::task::spawn_local(async move { - let mut flow = hydroflow_syntax! { - source_stream(b_recv) -> for_each(|x| println!("{}", x)); - }; - flow.run_async().await.unwrap(); - }); - - a_send.send(1).unwrap(); - a_send.send(2).unwrap(); - a_send.send(3).unwrap(); + let (a_send, a_recv) = hydroflow::util::unbounded_channel::(); + let (b_send, b_recv) = hydroflow::util::unbounded_channel::(); + let (c_send, c_recv) = hydroflow::util::unbounded_channel::(); - tokio::task::yield_now().await; - }) - .await; + let task_a = tokio::task::spawn_local(async move { + let mut flow = hydroflow_syntax! { + source_stream(a_recv) -> for_each(|x| { b_send.send(x).unwrap(); }); + }; + flow.run_async().await.unwrap(); + }); + let task_b = tokio::task::spawn_local(async move { + let mut flow = hydroflow_syntax! { + source_stream(b_recv) -> for_each(|x| { c_send.send(x).unwrap(); }); + }; + flow.run_async().await.unwrap(); + }); + + a_send.send(1).unwrap(); + a_send.send(2).unwrap(); + a_send.send(3).unwrap(); + + tokio::select! { + biased; + _ = task_a => unreachable!(), + _ = task_b => unreachable!(), + _ = tokio::task::yield_now() => (), + }; + + assert_eq!( + &[1, 2, 3], + &*collect_ready_async::, _>(&mut { c_recv }).await + ); } /// Check to make sure hf.run_async() does not hang due to replaying stateful operators saturating @@ -373,58 +380,47 @@ async fn asynctest_source_stream() { /// the send loop delay (task_a). #[multiplatform_test(hydroflow, env_tracing)] async fn asynctest_check_state_yielding() { - LocalSet::new() - .run_until(async { - let (a_send, a_recv) = hydroflow::util::unbounded_channel::(); - let (b_send, mut b_recv) = hydroflow::util::unbounded_channel::(); - - let task_a = tokio::task::spawn_local( - async move { - for a in 0..10 { - tokio::time::sleep(Duration::from_millis(100)).await; - tracing::debug!(a = a, "Sending."); - a_send.send(a).unwrap(); - } - } - .instrument(tracing::debug_span!("task_a")), - ); + let (a_send, a_recv) = hydroflow::util::unbounded_channel::(); + let (b_send, mut b_recv) = hydroflow::util::unbounded_channel::(); - let task_b = tokio::task::spawn_local( - async move { - let mut hf = hydroflow_syntax! { - source_stream(a_recv) - -> reduce::<'static>(|a: &mut _, b| *a += b) - -> for_each(|x| b_send.send(x).unwrap()); - }; - - // Run 100 millis longer than the sending task. - let done_sending = async { - task_a.await.unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - }; - - tokio::select! { - _ = done_sending => { - tracing::info!("`task_a` (sending) complete."); - }, - _ = hf.run_async() => { - panic!("`run_async()` should run forever."); - } - } - - assert_eq!( - [0, 1, 3, 6, 10, 15, 21, 28, 36, 45] - .into_iter() - .collect::>(), - collect_ready_async(&mut b_recv).await - ); - } - .instrument(tracing::debug_span!("task_b")), + let task_a = tokio::task::spawn_local( + async move { + tokio::time::sleep(Duration::from_millis(100)).await; + for a in 0..10 { + tracing::debug!(a = a, "Sending."); + a_send.send(a).unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + } + tokio::task::yield_now().await; + } + .instrument(tracing::debug_span!("task_a")), + ); + + let task_b = tokio::task::spawn_local( + async move { + let mut hf = hydroflow_syntax! { + source_stream(a_recv) + -> reduce::<'static>(|a: &mut _, b| *a += b) + -> for_each(|x| b_send.send(x).unwrap()); + }; + + tokio::select! { + biased; + _ = hf.run_async() => panic!("`run_async()` should run forever."), + _ = task_a => tracing::info!("`task_a` (sending) complete."), + } + + assert_eq!( + [0, 1, 3, 6, 10, 15, 21, 28, 36, 45] + .into_iter() + .collect::>(), + collect_ready_async(&mut b_recv).await ); + } + .instrument(tracing::debug_span!("task_b")), + ); - task_b.await.unwrap(); - }) - .await; + task_b.await.unwrap(); } #[multiplatform_test(hydroflow, env_tracing)] @@ -452,18 +448,22 @@ async fn asynctest_event_repeat_iter() { my_union = union() -> for_each(|x| b_send.send(x).unwrap()); }; - tokio::task::spawn( + let send_task = tokio::task::spawn({ async move { + // Wait, then send `10`. tokio::time::sleep(Duration::from_millis(100)).await; tracing::debug!("sending `10`."); a_send.send(10).unwrap(); } - .instrument(tracing::debug_span!("sender")), - ); + .instrument(tracing::debug_span!("sender")) + }); - tokio::time::timeout(Duration::from_millis(200), hf.run_async()) - .await - .expect_err("Expected timeout"); + // Run until barrier completes. + tokio::select! { + biased; // `run_async` needs to be polled to process the data first, otherwise the task may complete before data is processed. + _ = hf.run_async() => panic!("`run_async()` should run forever."), + _ = send_task => tracing::info!("sending complete"), + }; let seen: Vec<_> = collect_ready_async(b_recv).await; assert_eq!(&[0, 1, 2, 0, 1, 2, 10], &*seen);