Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor improvements to future::join!'s implementation #91721

Merged
merged 6 commits into from
Dec 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 100 additions & 54 deletions library/core/src/future/join.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(unused_imports)] // items are used by the macro
#![allow(unused_imports, unused_macros)] // items are used by the macro

use crate::cell::UnsafeCell;
use crate::future::{poll_fn, Future};
Expand Down Expand Up @@ -45,59 +45,104 @@ use crate::task::{Context, Poll};
/// # };
/// ```
#[unstable(feature = "future_join", issue = "91642")]
pub macro join {
( $($fut:expr),* $(,)?) => {
join! { @count: (), @futures: {}, @rest: ($($fut,)*) }
},
// Recurse until we have the position of each future in the tuple
pub macro join( $($fut:expr),+ $(,)? ) {
// Funnel through an internal macro not to leak implementation details.
join_internal! {
current_position: []
futures_and_positions: []
munching: [ $($fut)+ ]
}
}

// FIXME(danielhenrymantilla): a private macro should need no stability guarantee.
#[unstable(feature = "future_join", issue = "91642")]
/// To be able to *name* the i-th future in the tuple (say we want the .4-th),
/// the following trick will be used: `let (_, _, _, _, it, ..) = tuple;`
/// In order to do that, we need to generate a `i`-long repetition of `_`,
/// for each i-th fut. Hence the recursive muncher approach.
macro join_internal {
// Recursion step: map each future with its "position" (underscore count).
(
// A token for each future that has been expanded: "_ _ _"
@count: ($($count:tt)*),
// Futures and their positions in the tuple: "{ a => (_), b => (_ _)) }"
@futures: { $($fut:tt)* },
// Take a future from @rest to expand
@rest: ($current:expr, $($rest:tt)*)
) => {
join! {
@count: ($($count)* _),
@futures: { $($fut)* $current => ($($count)*), },
@rest: ($($rest)*)
// Accumulate a token for each future that has been expanded: "_ _ _".
current_position: [
$($underscores:tt)*
]
// Accumulate Futures and their positions in the tuple: `_0th () _1st ( _ ) …`.
futures_and_positions: [
$($acc:tt)*
]
// Munch one future.
munching: [
$current:tt
$($rest:tt)*
]
) => (
join_internal! {
current_position: [
$($underscores)*
_
]
futures_and_positions: [
$($acc)*
$current ( $($underscores)* )
]
munching: [
$($rest)*
]
}
},
// Now generate the output future
(
@count: ($($count:tt)*),
@futures: {
$( $(@$f:tt)? $fut:expr => ( $($pos:tt)* ), )*
},
@rest: ()
) => {
async move {
let mut futures = ( $( MaybeDone::Future($fut), )* );
),

// End of recursion: generate the output future.
(
current_position: $_:tt
futures_and_positions: [
$(
$fut_expr:tt ( $($pos:tt)* )
)*
]
// Nothing left to munch.
munching: []
) => (
match ( $( MaybeDone::Future($fut_expr), )* ) { futures => async {
let mut futures = futures;
// SAFETY: this is `pin_mut!`.
let mut futures = unsafe { Pin::new_unchecked(&mut futures) };
poll_fn(move |cx| {
let mut done = true;

// For each `fut`, pin-project to it, and poll it.
$(
let ( $($pos,)* fut, .. ) = &mut futures;

// SAFETY: The futures are never moved
done &= unsafe { Pin::new_unchecked(fut).poll(cx).is_ready() };
// SAFETY: pinning projection
let fut = unsafe {
futures.as_mut().map_unchecked_mut(|it| {
let ( $($pos,)* fut, .. ) = it;
fut
})
};
// Despite how tempting it may be to `let () = fut.poll(cx).ready()?;`
// doing so would defeat the point of `join!`: to start polling eagerly all
// of the futures, to allow parallelizing the waits.
done &= fut.poll(cx).is_ready();
)*

if done {
// Extract all the outputs
Poll::Ready(($({
let ( $($pos,)* fut, .. ) = &mut futures;

fut.take_output().unwrap()
}),*))
} else {
Poll::Pending
if !done {
return Poll::Pending;
}
// All ready; time to extract all the outputs.

// SAFETY: `.take_output()` does not break the `Pin` invariants for that `fut`.
let futures = unsafe {
futures.as_mut().get_unchecked_mut()
};
Poll::Ready(
($(
{
let ( $($pos,)* fut, .. ) = &mut *futures;
fut.take_output().unwrap()
}
),*) // <- no trailing comma since we don't want 1-tuples.
)
}).await
}
}
}}
),
}

/// Future used by `join!` that stores it's output to
Expand All @@ -109,14 +154,14 @@ pub macro join {
pub enum MaybeDone<F: Future> {
Future(F),
Done(F::Output),
Took,
Taken,
}

#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> MaybeDone<F> {
pub fn take_output(&mut self) -> Option<F::Output> {
match &*self {
MaybeDone::Done(_) => match mem::replace(self, Self::Took) {
match *self {
MaybeDone::Done(_) => match mem::replace(self, Self::Taken) {
MaybeDone::Done(val) => Some(val),
_ => unreachable!(),
},
Expand All @@ -132,13 +177,14 @@ impl<F: Future> Future for MaybeDone<F> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: pinning in structural for `f`
unsafe {
match self.as_mut().get_unchecked_mut() {
MaybeDone::Future(f) => match Pin::new_unchecked(f).poll(cx) {
Poll::Ready(val) => self.set(Self::Done(val)),
Poll::Pending => return Poll::Pending,
},
// Do not mix match ergonomics with unsafe.
match *self.as_mut().get_unchecked_mut() {
MaybeDone::Future(ref mut f) => {
let val = Pin::new_unchecked(f).poll(cx).ready()?;
self.set(Self::Done(val));
}
MaybeDone::Done(_) => {}
MaybeDone::Took => unreachable!(),
MaybeDone::Taken => unreachable!(),
}
}

Expand Down
34 changes: 34 additions & 0 deletions library/core/tests/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,40 @@ fn test_join() {
});
}

/// Tests that `join!(…)` behaves "like a function": evaluating its arguments
/// before applying any of its own logic.
///
/// _e.g._, `join!(async_fn(&borrowed), …)` does not consume `borrowed`;
/// and `join!(opt_fut?, …)` does let that `?` refer to the callsite scope.
mod test_join_function_like_value_arg_semantics {
use super::*;

async fn async_fn(_: impl Sized) {}

// no need to _run_ this test, just to compile it.
fn _join_does_not_unnecessarily_move_mentioned_bindings() {
let not_copy = vec![()];
let _ = join!(async_fn(&not_copy)); // should not move `not_copy`
let _ = &not_copy; // OK
}

#[test]
fn join_lets_control_flow_effects_such_as_try_flow_through() {
let maybe_fut = None;
if false {
*&mut { maybe_fut } = Some(async {});
loop {}
}
assert!(Option::is_none(&try { join!(maybe_fut?, async { unreachable!() }) }));
}

#[test]
fn join_is_able_to_handle_temporaries() {
let _ = join!(async_fn(&String::from("temporary")));
let () = block_on(join!(async_fn(&String::from("temporary"))));
}
}

fn block_on(fut: impl Future) {
struct Waker;
impl Wake for Waker {
Expand Down
1 change: 1 addition & 0 deletions library/core/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#![feature(str_internals)]
#![feature(test)]
#![feature(trusted_len)]
#![feature(try_blocks)]
#![feature(try_trait_v2)]
#![feature(slice_internals)]
#![feature(slice_partition_dedup)]
Expand Down