Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

future: provide join! macro #2158

Merged
merged 3 commits into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tokio/src/future/maybe_done.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::task::{Context, Poll};

/// A future that may have completed.
#[derive(Debug)]
pub(crate) enum MaybeDone<Fut: Future> {
pub enum MaybeDone<Fut: Future> {
/// A not-yet-completed future
Future(Fut),
/// The output of the completed future
Expand All @@ -21,7 +21,7 @@ pub(crate) enum MaybeDone<Fut: Future> {
impl<Fut: Future + Unpin> Unpin for MaybeDone<Fut> {}

/// Wraps a future into a `MaybeDone`
pub(crate) fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
pub fn maybe_done<Fut: Future>(future: Fut) -> MaybeDone<Fut> {
MaybeDone::Future(future)
}

Expand All @@ -43,7 +43,7 @@ impl<Fut: Future> MaybeDone<Fut> {
/// Attempt to take the output of a `MaybeDone` without driving it
/// towards completion.
#[inline]
pub(crate) fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
pub fn take_output(self: Pin<&mut Self>) -> Option<Fut::Output> {
unsafe {
let this = self.get_unchecked_mut();
match this {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! Asynchronous values.

mod maybe_done;
pub(crate) use maybe_done::{maybe_done, MaybeDone};
pub use maybe_done::{maybe_done, MaybeDone};
carllerche marked this conversation as resolved.
Show resolved Hide resolved

mod poll_fn;
pub use poll_fn::poll_fn;
Expand Down
109 changes: 109 additions & 0 deletions tokio/src/macros/join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/// Wait on multiple concurrent branches, returning when **all** branches
carllerche marked this conversation as resolved.
Show resolved Hide resolved
/// complete.
///
/// The `join!` macro must be used inside of async functions, closures, and
/// blocks.
///
/// The `join!` macro takes a list of async expressions and evaluates them
/// concurrently on the same task. Each async expression evaluates to a future
/// and the futures from each expression are multiplexed on the current task.
///
/// # Notes
///
/// ### Runtime characteristics
///
/// By running all async expressions on the current task, the expressions are
/// able to run **concurrently** but not in **parallel**. This means all
/// expressions are run on the same thread and if one branch blocks the thread,
/// all other expressions will be unable to continue. If parallelism is
/// required, spawn each async expression using [`tokio::spawn`] and pass the
/// join handle to `join!`.
///
/// [`tokio::spawn`]: crate::spawn
///
/// # Examples
///
/// Basic join with two branches
///
/// ```
/// async fn do_stuff_async() {
/// // async work
/// }
///
/// async fn more_async_work() {
/// // more here
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let (first, second) = tokio::join!(
/// do_stuff_async(),
/// more_async_work());
///
/// // do something with the values
/// }
/// ```
#[macro_export]
macro_rules! join {
(@ {
// One `_` for each branch in the `join!` macro. This is not used once
// normalization is complete.
( $($count:tt)* )

// Normalized join! branches
$( ( $($skip:tt)* ) $e:expr, )*

}) => {{
use $crate::macros::support::{maybe_done, poll_fn, Future, Pin};
use $crate::macros::support::Poll::{Ready, Pending};

// Safety: nothing must be moved out of `futures`. This is to satisfy
// the requirement of `Pin::new_unchecked` called below.
let mut futures = ( $( maybe_done($e), )* );

poll_fn(move |cx| {
let mut is_pending = false;

$(
// Extract the future for this branch from the tuple.
let ( $($skip,)* fut, .. ) = &mut futures;

// Safety: future is stored on the stack above
// and never moved.
let mut fut = unsafe { Pin::new_unchecked(fut) };

// Try polling
if !fut.poll(cx).is_ready() {
carllerche marked this conversation as resolved.
Show resolved Hide resolved
is_pending = true;
}
)*

if is_pending {
Pending
} else {
Ready(($({
// Extract the future for this branch from the tuple.
let ( $($skip,)* fut, .. ) = &mut futures;

// Safety: future is stored on the stack above
// and never moved.
let mut fut = unsafe { Pin::new_unchecked(fut) };

fut.take_output().expect("expected completed future")
},)*))
}
}).await
}};

// ===== Normalize =====

(@ { ( $($s:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
$crate::join!(@{ ($($s)* _) $($t)* ($($s)*) $e, } $($r)*)
};

// ===== Entry point =====

( $($e:expr),* $(,)?) => {
$crate::join!(@{ () } $($e,)*)
};
}
3 changes: 3 additions & 0 deletions tokio/src/macros/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ mod assert;
#[macro_use]
mod cfg;

#[macro_use]
mod join;

#[macro_use]
mod loom;

Expand Down
4 changes: 3 additions & 1 deletion tokio/src/macros/select.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
/// Wait on multiple concurrent branches, returning when the **first** branch
/// completes, cancelling the remaining branches.
///
/// The `select!` macro must be used inside of async functions, closures, and
/// blocks.
///
/// The `select` macro accepts one or more branches with the following pattern:
///
/// ```text
Expand Down Expand Up @@ -159,7 +162,6 @@
/// }
/// };
/// }
///
/// ```
///
/// Basic stream selecting.
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/macros/support.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub use crate::future::poll_fn;
pub use crate::future::{maybe_done, poll_fn};
pub use crate::util::thread_rng_n;

pub use std::future::Future;
Expand Down
71 changes: 71 additions & 0 deletions tokio/tests/macros_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use tokio::sync::oneshot;
use tokio_test::{assert_pending, assert_ready, task};

#[tokio::test]
async fn sync_one_lit_expr_comma() {
let foo = tokio::join!(async { 1 },);

assert_eq!(foo, (1,));
}

#[tokio::test]
async fn sync_one_lit_expr_no_comma() {
let foo = tokio::join!(async { 1 });

assert_eq!(foo, (1,));
}

#[tokio::test]
async fn sync_two_lit_expr_comma() {
let foo = tokio::join!(async { 1 }, async { 2 },);

assert_eq!(foo, (1, 2));
}

#[tokio::test]
async fn sync_two_lit_expr_no_comma() {
let foo = tokio::join!(async { 1 }, async { 2 });

assert_eq!(foo, (1, 2));
}

#[tokio::test]
async fn sync_two_await() {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();

let mut join = task::spawn(async {
tokio::join!(async { rx1.await.unwrap() }, async { rx2.await.unwrap() })
});

assert_pending!(join.poll());

tx2.send(123).unwrap();
assert!(join.is_woken());
assert_pending!(join.poll());

tx1.send("hello").unwrap();
assert!(join.is_woken());
let res = assert_ready!(join.poll());

assert_eq!(("hello", 123), res);
}

#[test]
fn join_size() {
use futures::future;
use std::mem;

let fut = async {
let ready = future::ready(0i32);
tokio::join!(ready)
};
assert_eq!(mem::size_of_val(&fut), 16);

let fut = async {
let ready1 = future::ready(0i32);
let ready2 = future::ready(0i32);
tokio::join!(ready1, ready2)
};
assert_eq!(mem::size_of_val(&fut), 28);
}