Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

select & join #28

Merged
merged 3 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/01_02_why_async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2018"
[lib]

[dev-dependencies]
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
2 changes: 1 addition & 1 deletion examples/01_04_async_await_primer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2018"
[lib]

[dev-dependencies]
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
2 changes: 1 addition & 1 deletion examples/01_05_http_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ edition = "2018"
# for writing async code. Enable the "compat" feature to include the
# functions for using futures 0.3 and async/await with the Hyper library,
# which use futures 0.1.
futures-preview = { version = "=0.3.0-alpha.16", features = ["compat"] }
futures-preview = { version = "=0.3.0-alpha.17", features = ["compat"] }

# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP
# server and to make HTTP requests.
Expand Down
2 changes: 1 addition & 1 deletion examples/02_03_timer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2018"
[lib]

[dependencies]
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
2 changes: 1 addition & 1 deletion examples/02_04_executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ edition = "2018"
[lib]

[dependencies]
futures-preview = { version = "=0.3.0-alpha.16" }
futures-preview = { version = "=0.3.0-alpha.17" }
timer_future = { package = "02_03_timer", path = "../02_03_timer" }
2 changes: 1 addition & 1 deletion examples/03_01_async_await/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2018"
[lib]

[dev-dependencies]
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
2 changes: 1 addition & 1 deletion examples/05_01_streams/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2018"
[lib]

[dev-dependencies]
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
2 changes: 1 addition & 1 deletion examples/05_02_iteration_and_concurrency/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2018"
[lib]

[dev-dependencies]
futures-preview = { version = "=0.3.0-alpha.16", features = ["async-await", "nightly"] }
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
10 changes: 10 additions & 0 deletions examples/06_02_join/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "06_02_join"
version = "0.1.0"
authors = ["Taylor Cramer <cramertj@google.com>"]
edition = "2018"

[lib]

[dev-dependencies]
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
78 changes: 78 additions & 0 deletions examples/06_02_join/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#![cfg(test)]
#![feature(async_await)]

struct Book;
struct Music;
async fn get_book() -> Book { Book }
async fn get_music() -> Music { Music }

mod naiive {
use super::*;
// ANCHOR: naiive
async fn get_book_and_music() -> (Book, Music) {
let book = get_book().await;
let music = get_music().await;
(book, music)
}
// ANCHOR_END: naiive
}

mod other_langs {
use super::*;
// ANCHOR: other_langs
// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
let book_future = get_book();
let music_future = get_music();
(book_future.await, music_future.await)
}
// ANCHOR_END: other_langs
}

mod join {
use super::*;
// ANCHOR: join
use futures::join;

async fn get_book_and_music() -> (Book, Music) {
let book_fut = get_book();
let music_fut = get_music();
join!(book_fut, music_fut)
Copy link
Member

@taiki-e taiki-e Aug 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Off-topic: Should we rewrite join!/try_join! with proc-macro to allow passing expressions directly here?

join!(get_book(), get_music())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be nice! I was actually just thinking the same thing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
// ANCHOR_END: join
}

mod try_join {
use super::{Book, Music};
// ANCHOR: try_join
use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book();
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
// ANCHOR_END: try_join
}

mod mismatched_err {
use super::{Book, Music};
// ANCHOR: try_join_map_err
use futures::{
future::TryFutureExt,
try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
// ANCHOR_END: try_join_map_err
}
10 changes: 10 additions & 0 deletions examples/06_03_select/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[package]
name = "06_03_select"
version = "0.1.0"
authors = ["Taylor Cramer <cramertj@google.com>"]
edition = "2018"

[lib]

[dev-dependencies]
futures-preview = { version = "=0.3.0-alpha.17", features = ["async-await", "nightly"] }
184 changes: 184 additions & 0 deletions examples/06_03_select/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#![cfg(test)]
#![feature(async_await)]
#![recursion_limit="128"]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems a recursion limit error has occurred in this crate. https://travis-ci.com/rust-lang/async-book/builds/122145149#L961

mod example {
// ANCHOR: example
use futures::{
future::FutureExt, // for `.fuse()`
pin_mut,
select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
let t1 = task_one().fuse();
let t2 = task_two().fuse();

pin_mut!(t1, t2);

select! {
() = t1 => println!("task one completed first"),
() = t2 => println!("task two completed first"),
}
}
// ANCHOR_END: example
}

mod default_and_complete {
// ANCHOR: default_and_complete
use futures::{future, select};

async fn count() {
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;

loop {
select! {
a = a_fut => total += a,
b = b_fut => total += b,
complete => break,
default => unreachable!(), // never runs (futures are ready, then complete)
};
}
assert_eq!(total, 10);
}
// ANCHOR_END: default_and_complete

#[test]
fn run_count() {
futures::executor::block_on(count());
}
}

mod fused_stream {
// ANCHOR: fused_stream
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};

async fn add_two_streams(
mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
let mut total = 0;

loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
};
if let Some(next_num) = item {
total += next_num;
}
}

total
}
// ANCHOR_END: fused_stream
}

mod fuse_terminated {
// ANCHOR: fuse_terminated
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, Stream, StreamExt},
pin_mut,
select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
let get_new_num_fut = Fuse::terminated();
pin_mut!(run_on_new_num_fut, get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived-- start a new `run_on_new_num_fut`,
// dropping the old one.
run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
},
// Run the `run_on_new_num_fut`
() = run_on_new_num_fut => {},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
// ANCHOR_END: fuse_terminated
}

mod futures_unordered {
// ANCHOR: futures_unordered
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
pin_mut,
select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let mut run_on_new_num_futs = FuturesUnordered::new();
run_on_new_num_futs.push(run_on_new_num(starting_num));
let get_new_num_fut = Fuse::terminated();
pin_mut!(get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived-- start a new `run_on_new_num_fut`.
run_on_new_num_futs.push(run_on_new_num(new_num));
},
// Run the `run_on_new_num_futs` and check if any have completed
res = run_on_new_num_futs.select_next_some() => {
println!("run_on_new_num_fut returned {:?}", res);
},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}

// ANCHOR_END: futures_unordered
}
2 changes: 2 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ members = [
"03_01_async_await",
"05_01_streams",
"05_02_iteration_and_concurrency",
"06_02_join",
"06_03_select",
]
2 changes: 1 addition & 1 deletion src/02_execution/04_executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ authors = ["XYZ Author"]
edition = "2018"

[dependencies]
futures-preview = "=0.3.0-alpha.16"
futures-preview = "=0.3.0-alpha.17"
```

Next, we need the following imports at the top of `src/main.rs`:
Expand Down
2 changes: 1 addition & 1 deletion src/02_execution/05_io.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,4 @@ task, allowing the executor to drive more tasks to completion before returning
to check for more IO events (and the cycle continues...).

[The `Future` Trait]: ./02_future.md
[`mio`]: https://github.com/carllerche/mio
[`mio`]: https://github.com/tokio-rs/mio
14 changes: 14 additions & 0 deletions src/06_multiple_futures/01_chapter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Executing Multiple Futures at a Time

Up until now, we've mostly executed futures by using `.await`, which blocks
the current task until a particular `Future` completes. However, real
asynchronous applications often need to execute several different
operations concurrently.

In this chapter, we'll cover some ways to execute multiple asynchronous
operations at the same time:

- `join!`: waits for futures to all complete
- `select!`: waits for one of several futures to complete
- Spawning: creates a top-level task which ambiently runs a future to completion
- `FuturesUnordered`: a group of futures which yields the result of each subfuture
Loading