Skip to content

Commit

Permalink
feat(hydroflow): Added poll_futures and poll_futures_async operators. (
Browse files Browse the repository at this point in the history
…hydro-project#1143)

Co-authored-by: Ryan Alameddine <rhalameddin@gmail.com>
  • Loading branch information
RyanAlameddine and Ryan Alameddine authored Apr 22, 2024
1 parent c3f5a37 commit 997d90a
Show file tree
Hide file tree
Showing 13 changed files with 690 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ license = "Apache-2.0"
criterion = { version = "0.5", features = [ "async_tokio", "html_reports" ] }
hydroflow = { path = "../hydroflow" }
lazy_static = "1.4.0"
futures = "0.3"
# pprof = { version = "0.6", features = [ "flamegraph", "criterion" ] }
rand = "0.8.4"
rand_distr = "0.4.3"
Expand Down Expand Up @@ -61,3 +62,7 @@ harness = false
[[bench]]
name = "symmetric_hash_join"
harness = false

[[bench]]
name = "futures"
harness = false
175 changes: 175 additions & 0 deletions benches/benches/futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};
use std::time::Duration;

use criterion::{criterion_group, criterion_main, Criterion};
use hydroflow::hydroflow_syntax;
use hydroflow::scheduled::graph::Hydroflow;

const NUM_ELEMS: u32 = 3000;

/// A future which returns () after it manually woken
pub struct ManualFut {
done: Rc<RefCell<bool>>,
waker: Rc<RefCell<Option<Waker>>>,
}
impl ManualFut {
pub fn new(done: Rc<RefCell<bool>>, waker: Rc<RefCell<Option<Waker>>>) -> ManualFut {
ManualFut { done, waker }
}
}

impl Future for ManualFut {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match *self.done.borrow() {
true => Poll::Ready(()),
false => {
self.waker.replace(Some(cx.waker().clone()));
Poll::Pending
}
}
}
}

fn benchmark_immediately_available(c: &mut Criterion) {
c.bench_function("futures/immediately_available", |b| {
b.iter_batched(
|| {
let mut df = hydroflow_syntax! {
source_iter(0..NUM_ELEMS)
-> map(|x| async move {
x
})
-> defer_tick()
-> poll_futures()
-> for_each(|_| {});
};

df.run_tick(); // skip loading and mapping to future
df
},
|mut df| {
df.run_tick();
},
criterion::BatchSize::SmallInput,
);
});
}

type WakeInfo = (Rc<RefCell<bool>>, Vec<Rc<RefCell<Option<Waker>>>>);

fn benchmark_delayed(c: &mut Criterion) {
fn setup<'a>(count: u32, wake_one: bool) -> (Hydroflow<'a>, WakeInfo) {
let done = Rc::new(RefCell::new(false));
let mut wakers = Vec::new();

let range = 0..count;
let futs = range
.map(|i| {
let waker = Rc::new(RefCell::new(None));
let d = if !wake_one || i == 0 {
wakers.push(waker.clone());
done.clone()
} else {
Rc::new(RefCell::new(false))
};
ManualFut::new(d, waker)
})
.collect::<Vec<_>>();

let df = {
hydroflow_syntax! {
source_iter(futs)
-> poll_futures()
-> for_each(|_| {});
}
};

(df, (done, wakers))
}

fn wake_all((done, wakers): WakeInfo) {
*done.borrow_mut() = true;
wakers.into_iter().for_each(|waker| {
if let Some(waker) = waker.borrow_mut().take() {
waker.wake();
} else {
panic!("waker not found but future should have been polled")
}
})
}

// Tick with the initial poll
c.bench_function("futures/delayed/initial", |b| {
b.iter_batched(
|| setup(NUM_ELEMS, false).0,
|mut df| {
df.run_tick();
},
criterion::BatchSize::SmallInput,
);
});

// Tick when no results are available
c.bench_function("futures/delayed/waiting", |b| {
b.iter_batched(
|| {
let (mut df, wakes) = setup(NUM_ELEMS, true);
df.run_tick();
df.run_tick();
df.run_tick();
wake_all(wakes);
df
},
|mut df| {
df.run_tick();
},
criterion::BatchSize::SmallInput,
);
});

// Tick when results became available
c.bench_function("futures/delayed/ready", |b| {
b.iter_batched(
|| {
let (mut df, wakes) = setup(NUM_ELEMS, false);
df.run_tick();
wake_all(wakes);
df
},
|mut df| {
df.run_tick();
},
criterion::BatchSize::SmallInput,
);
});
// Tick after all results have been consumed
c.bench_function("futures/delayed/done", |b| {
b.iter_batched(
|| {
let (mut df, wakes) = setup(NUM_ELEMS, false);
df.run_tick();
wake_all(wakes);
df.run_tick();
df
},
|mut df| {
df.run_tick();
},
criterion::BatchSize::SmallInput,
);
});
}

criterion_group!(
name=futures;
config=Criterion::default().measurement_time(Duration::from_secs(30));
targets=benchmark_immediately_available,
benchmark_delayed
);
criterion_main!(futures);
9 changes: 6 additions & 3 deletions hydroflow/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,13 @@ where
C: Default + Extend<S::Item>,
S: Stream,
{
let any = std::cell::Cell::new(true);
let mut unfused_iter = ready_iter(stream).inspect(|_| any.set(true));
use std::sync::atomic::Ordering;

let got_any_items = std::sync::atomic::AtomicBool::new(true);
let mut unfused_iter =
ready_iter(stream).inspect(|_| got_any_items.store(true, Ordering::Relaxed));
let mut out = C::default();
while any.replace(false) {
while got_any_items.swap(false, Ordering::Relaxed) {
out.extend(unfused_iter.by_ref());
// Tokio unbounded channel returns items in lenght-128 chunks, so we have to be careful
// that everything gets returned. That is why we yield here and loop.
Expand Down
93 changes: 93 additions & 0 deletions hydroflow/tests/surface_poll_futures.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::collections::HashSet;

use hydroflow::hydroflow_syntax;
use hydroflow::util::collect_ready_async;
use tokio::time::{sleep, Duration};

#[hydroflow::test]
async fn single_batch_test() {
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<u32>();

let mut df = hydroflow_syntax! {
source_iter(0..10)
-> map(|x| async move {
sleep(Duration::from_millis(100)).await;
x
})
-> poll_futures()
-> for_each(|x| result_send.send(x).unwrap());
};

let handle = tokio::task::spawn(async move {
sleep(Duration::from_secs(1)).await;
assert_eq!(
HashSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
collect_ready_async::<HashSet<_>, _>(&mut result_recv).await
);
});

tokio::time::timeout(Duration::from_secs(2), df.run_async())
.await
.expect_err("Expected time out");

handle.await.unwrap();
}

#[hydroflow::test]
async fn multi_batch_test() {
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<u64>();

let mut df = hydroflow_syntax! {
source_iter([2, 3, 1, 9, 6, 5, 4, 7, 8])
-> map(|x| async move {
sleep(Duration::from_millis(10*x)).await;
x
})
-> poll_futures()
-> for_each(|x| result_send.send(x).unwrap());
};

let handle = tokio::task::spawn(async move {
sleep(Duration::from_secs(1)).await;
assert_eq!(
HashSet::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]),
collect_ready_async::<HashSet<_>, _>(&mut result_recv).await
);
});

tokio::time::timeout(Duration::from_secs(2), df.run_async())
.await
.expect_err("Expected time out");

handle.await.unwrap();
}

#[hydroflow::test]
async fn pusherator_test() {
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<u64>();

let mut df = hydroflow_syntax! {
ins = source_iter([2, 3, 1, 9, 6, 5, 4, 7, 8])
-> tee();

ins -> for_each(|_| {});
ins -> map(|x| async move {
sleep(Duration::from_millis(10*x)).await;
x
}) -> poll_futures() -> for_each(|x| result_send.send(x).unwrap());
};

let handle = tokio::task::spawn(async move {
sleep(Duration::from_secs(1)).await;
assert_eq!(
HashSet::from_iter([1, 2, 3, 4, 5, 6, 7, 8, 9]),
collect_ready_async::<HashSet<_>, _>(&mut result_recv).await
);
});

tokio::time::timeout(Duration::from_secs(2), df.run_async())
.await
.expect_err("Expected time out");

handle.await.unwrap();
}
Loading

0 comments on commit 997d90a

Please sign in to comment.