Skip to content

Commit

Permalink
Rollup merge of #91645 - ibraheemdev:future-join, r=joshtriplett
Browse files Browse the repository at this point in the history
Implement `core::future::join!`

`join!` polls multiple futures concurrently and returns their outputs.

```rust
async fn run() {
    let (a, b) = join!(async { 0 }, async { 1 });
}
```

cc `@rust-lang/wg-async-foundations`
  • Loading branch information
matthiaskrgr authored Dec 9, 2021
2 parents f820496 + 5478f43 commit 90c3e9a
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 0 deletions.
147 changes: 147 additions & 0 deletions library/core/src/future/join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#![allow(unused_imports)] // items are used by the macro

use crate::cell::UnsafeCell;
use crate::future::{poll_fn, Future};
use crate::mem;
use crate::pin::Pin;
use crate::task::{Context, Poll};

/// Polls multiple futures simultaneously, returning a tuple
/// of all results once complete.
///
/// While `join!(a, b)` is similar to `(a.await, b.await)`,
/// `join!` polls both futures concurrently and is therefore more efficient.
///
/// # Examples
///
/// ```
/// #![feature(future_join, future_poll_fn)]
///
/// use std::future::join;
///
/// async fn one() -> usize { 1 }
/// async fn two() -> usize { 2 }
///
/// # let _ = async {
/// let x = join!(one(), two()).await;
/// assert_eq!(x, (1, 2));
/// # };
/// ```
///
/// `join!` is variadic, so you can pass any number of futures:
///
/// ```
/// #![feature(future_join, future_poll_fn)]
///
/// use std::future::join;
///
/// async fn one() -> usize { 1 }
/// async fn two() -> usize { 2 }
/// async fn three() -> usize { 3 }
///
/// # let _ = async {
/// let x = join!(one(), two(), three()).await;
/// assert_eq!(x, (1, 2, 3));
/// # };
/// ```
#[unstable(feature = "future_join", issue = "91642")]
pub macro join {
( $($fut:expr),* $(,)?) => {
join! { @count: (), @futures: {}, @rest: ($($fut,)*) }
},
// Recurse until we have the position of each future in the tuple
(
// A token for each future that has been expanded: "_ _ _"
@count: ($($count:tt)*),
// Futures and their positions in the tuple: "{ a => (_), b => (_ _)) }"
@futures: { $($fut:tt)* },
// Take a future from @rest to expand
@rest: ($current:expr, $($rest:tt)*)
) => {
join! {
@count: ($($count)* _),
@futures: { $($fut)* $current => ($($count)*), },
@rest: ($($rest)*)
}
},
// Now generate the output future
(
@count: ($($count:tt)*),
@futures: {
$( $(@$f:tt)? $fut:expr => ( $($pos:tt)* ), )*
},
@rest: ()
) => {
async move {
let mut futures = ( $( MaybeDone::Future($fut), )* );

poll_fn(move |cx| {
let mut done = true;

$(
let ( $($pos,)* fut, .. ) = &mut futures;

// SAFETY: The futures are never moved
done &= unsafe { Pin::new_unchecked(fut).poll(cx).is_ready() };
)*

if done {
// Extract all the outputs
Poll::Ready(($({
let ( $($pos,)* fut, .. ) = &mut futures;

fut.take_output().unwrap()
}),*))
} else {
Poll::Pending
}
}).await
}
}
}

/// Future used by `join!` that stores it's output to
/// be later taken and doesn't panic when polled after ready.
///
/// This type is public in a private module for use by the macro.
#[allow(missing_debug_implementations)]
#[unstable(feature = "future_join", issue = "91642")]
pub enum MaybeDone<F: Future> {
Future(F),
Done(F::Output),
Took,
}

#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> MaybeDone<F> {
pub fn take_output(&mut self) -> Option<F::Output> {
match &*self {
MaybeDone::Done(_) => match mem::replace(self, Self::Took) {
MaybeDone::Done(val) => Some(val),
_ => unreachable!(),
},
_ => None,
}
}
}

#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> Future for MaybeDone<F> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: pinning in structural for `f`
unsafe {
match self.as_mut().get_unchecked_mut() {
MaybeDone::Future(f) => match Pin::new_unchecked(f).poll(cx) {
Poll::Ready(val) => self.set(Self::Done(val)),
Poll::Pending => return Poll::Pending,
},
MaybeDone::Done(_) => {}
MaybeDone::Took => unreachable!(),
}
}

Poll::Ready(())
}
}
4 changes: 4 additions & 0 deletions library/core/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ use crate::{

mod future;
mod into_future;
mod join;
mod pending;
mod poll_fn;
mod ready;

#[stable(feature = "futures_api", since = "1.36.0")]
pub use self::future::Future;

#[unstable(feature = "future_join", issue = "91642")]
pub use self::join::join;

#[unstable(feature = "into_future", issue = "67644")]
pub use into_future::IntoFuture;

Expand Down
85 changes: 85 additions & 0 deletions library/core/tests/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::future::{join, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake};
use std::thread;

struct PollN {
val: usize,
polled: usize,
num: usize,
}

impl Future for PollN {
type Output = usize;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.polled += 1;

if self.polled == self.num {
return Poll::Ready(self.val);
}

cx.waker().wake_by_ref();
Poll::Pending
}
}

fn poll_n(val: usize, num: usize) -> PollN {
PollN { val, num, polled: 0 }
}

#[test]
fn test_join() {
block_on(async move {
let x = join!(async { 0 }).await;
assert_eq!(x, 0);

let x = join!(async { 0 }, async { 1 }).await;
assert_eq!(x, (0, 1));

let x = join!(async { 0 }, async { 1 }, async { 2 }).await;
assert_eq!(x, (0, 1, 2));

let x = join!(
poll_n(0, 1),
poll_n(1, 5),
poll_n(2, 2),
poll_n(3, 1),
poll_n(4, 2),
poll_n(5, 3),
poll_n(6, 4),
poll_n(7, 1)
)
.await;
assert_eq!(x, (0, 1, 2, 3, 4, 5, 6, 7));

let y = String::new();
let x = join!(async {
println!("{}", &y);
1
})
.await;
assert_eq!(x, 1);
});
}

fn block_on(fut: impl Future) {
struct Waker;
impl Wake for Waker {
fn wake(self: Arc<Self>) {
thread::current().unpark()
}
}

let waker = Arc::new(Waker).into();
let mut cx = Context::from_waker(&waker);
let mut fut = Box::pin(fut);

loop {
match fut.as_mut().poll(&mut cx) {
Poll::Ready(_) => break,
Poll::Pending => thread::park(),
}
}
}
3 changes: 3 additions & 0 deletions library/core/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#![feature(flt2dec)]
#![feature(fmt_internals)]
#![feature(float_minimum_maximum)]
#![feature(future_join)]
#![feature(future_poll_fn)]
#![feature(array_from_fn)]
#![feature(hashmap_internals)]
#![feature(try_find)]
Expand Down Expand Up @@ -94,6 +96,7 @@ mod clone;
mod cmp;
mod const_ptr;
mod fmt;
mod future;
mod hash;
mod intrinsics;
mod iter;
Expand Down

0 comments on commit 90c3e9a

Please sign in to comment.