Skip to content

Commit

Permalink
fix(hydroflow): cleanup timing in some surface_async tests, #1078 (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel authored Mar 1, 2024
1 parent 39ab8b0 commit 18ee2ad
Showing 1 changed file with 79 additions and 79 deletions.
158 changes: 79 additions & 79 deletions hydroflow/tests/surface_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,31 +339,38 @@ async fn asynctest_dest_asyncwrite_duplex() {

#[multiplatform_test(hydroflow, env_tracing)]
async fn asynctest_source_stream() {
LocalSet::new()
.run_until(async {
let (a_send, a_recv) = hydroflow::util::unbounded_channel::<usize>();
let (b_send, b_recv) = hydroflow::util::unbounded_channel::<usize>();

tokio::task::spawn_local(async move {
let mut flow = hydroflow_syntax! {
source_stream(a_recv) -> for_each(|x| { b_send.send(x).unwrap(); });
};
flow.run_async().await.unwrap();
});
tokio::task::spawn_local(async move {
let mut flow = hydroflow_syntax! {
source_stream(b_recv) -> for_each(|x| println!("{}", x));
};
flow.run_async().await.unwrap();
});

a_send.send(1).unwrap();
a_send.send(2).unwrap();
a_send.send(3).unwrap();
let (a_send, a_recv) = hydroflow::util::unbounded_channel::<usize>();
let (b_send, b_recv) = hydroflow::util::unbounded_channel::<usize>();
let (c_send, c_recv) = hydroflow::util::unbounded_channel::<usize>();

tokio::task::yield_now().await;
})
.await;
let task_a = tokio::task::spawn_local(async move {
let mut flow = hydroflow_syntax! {
source_stream(a_recv) -> for_each(|x| { b_send.send(x).unwrap(); });
};
flow.run_async().await.unwrap();
});
let task_b = tokio::task::spawn_local(async move {
let mut flow = hydroflow_syntax! {
source_stream(b_recv) -> for_each(|x| { c_send.send(x).unwrap(); });
};
flow.run_async().await.unwrap();
});

a_send.send(1).unwrap();
a_send.send(2).unwrap();
a_send.send(3).unwrap();

tokio::select! {
biased;
_ = task_a => unreachable!(),
_ = task_b => unreachable!(),
_ = tokio::task::yield_now() => (),
};

assert_eq!(
&[1, 2, 3],
&*collect_ready_async::<Vec<_>, _>(&mut { c_recv }).await
);
}

/// Check to make sure hf.run_async() does not hang due to replaying stateful operators saturating
Expand All @@ -373,58 +380,47 @@ async fn asynctest_source_stream() {
/// the send loop delay (task_a).
#[multiplatform_test(hydroflow, env_tracing)]
async fn asynctest_check_state_yielding() {
LocalSet::new()
.run_until(async {
let (a_send, a_recv) = hydroflow::util::unbounded_channel::<usize>();
let (b_send, mut b_recv) = hydroflow::util::unbounded_channel::<usize>();

let task_a = tokio::task::spawn_local(
async move {
for a in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
tracing::debug!(a = a, "Sending.");
a_send.send(a).unwrap();
}
}
.instrument(tracing::debug_span!("task_a")),
);
let (a_send, a_recv) = hydroflow::util::unbounded_channel::<usize>();
let (b_send, mut b_recv) = hydroflow::util::unbounded_channel::<usize>();

let task_b = tokio::task::spawn_local(
async move {
let mut hf = hydroflow_syntax! {
source_stream(a_recv)
-> reduce::<'static>(|a: &mut _, b| *a += b)
-> for_each(|x| b_send.send(x).unwrap());
};

// Run 100 millis longer than the sending task.
let done_sending = async {
task_a.await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
};

tokio::select! {
_ = done_sending => {
tracing::info!("`task_a` (sending) complete.");
},
_ = hf.run_async() => {
panic!("`run_async()` should run forever.");
}
}

assert_eq!(
[0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
.into_iter()
.collect::<BTreeSet<_>>(),
collect_ready_async(&mut b_recv).await
);
}
.instrument(tracing::debug_span!("task_b")),
let task_a = tokio::task::spawn_local(
async move {
tokio::time::sleep(Duration::from_millis(100)).await;
for a in 0..10 {
tracing::debug!(a = a, "Sending.");
a_send.send(a).unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
}
tokio::task::yield_now().await;
}
.instrument(tracing::debug_span!("task_a")),
);

let task_b = tokio::task::spawn_local(
async move {
let mut hf = hydroflow_syntax! {
source_stream(a_recv)
-> reduce::<'static>(|a: &mut _, b| *a += b)
-> for_each(|x| b_send.send(x).unwrap());
};

tokio::select! {
biased;
_ = hf.run_async() => panic!("`run_async()` should run forever."),
_ = task_a => tracing::info!("`task_a` (sending) complete."),
}

assert_eq!(
[0, 1, 3, 6, 10, 15, 21, 28, 36, 45]
.into_iter()
.collect::<BTreeSet<_>>(),
collect_ready_async(&mut b_recv).await
);
}
.instrument(tracing::debug_span!("task_b")),
);

task_b.await.unwrap();
})
.await;
task_b.await.unwrap();
}

#[multiplatform_test(hydroflow, env_tracing)]
Expand Down Expand Up @@ -452,18 +448,22 @@ async fn asynctest_event_repeat_iter() {
my_union = union() -> for_each(|x| b_send.send(x).unwrap());
};

tokio::task::spawn(
let send_task = tokio::task::spawn({
async move {
// Wait, then send `10`.
tokio::time::sleep(Duration::from_millis(100)).await;
tracing::debug!("sending `10`.");
a_send.send(10).unwrap();
}
.instrument(tracing::debug_span!("sender")),
);
.instrument(tracing::debug_span!("sender"))
});

tokio::time::timeout(Duration::from_millis(200), hf.run_async())
.await
.expect_err("Expected timeout");
// Run until barrier completes.
tokio::select! {
biased; // `run_async` needs to be polled to process the data first, otherwise the task may complete before data is processed.
_ = hf.run_async() => panic!("`run_async()` should run forever."),
_ = send_task => tracing::info!("sending complete"),
};

let seen: Vec<_> = collect_ready_async(b_recv).await;
assert_eq!(&[0, 1, 2, 0, 1, 2, 10], &*seen);
Expand Down

0 comments on commit 18ee2ad

Please sign in to comment.