Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async-await: [WIP] replace std await! macro with await syntax #1080

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions async-await/src/chat.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![feature(await_macro, async_await)]

use tokio::await;
use tokio::async_wait;
use tokio::codec::{LinesCodec, Decoder};
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;
Expand Down Expand Up @@ -33,7 +33,7 @@ async fn process(stream: TcpStream, state: Arc<Mutex<Shared>>) -> io::Result<()>
let mut lines = LinesCodec::new().framed(stream);

// Extract the peer's name
let name = match await!(lines.next()) {
let name = match async_wait!(lines.next()) {
Some(name) => name?,
None => {
// Disconnected early
Expand All @@ -56,15 +56,15 @@ async fn process(stream: TcpStream, state: Arc<Mutex<Shared>>) -> io::Result<()>
// Spawn a task that receives all lines broadcasted to us from other peers
// and writes it to the client.
tokio::spawn_async(async move {
while let Some(line) = await!(rx.next()) {
while let Some(line) = async_wait!(rx.next()) {
let line = line.unwrap();
await!(lines_tx.send_async(line)).unwrap();
async_wait!(lines_tx.send_async(line)).unwrap();
}
});

// Use the current task to read lines from the socket and broadcast them to
// other peers.
while let Some(message) = await!(lines_rx.next()) {
while let Some(message) = async_wait!(lines_rx.next()) {
// TODO: Error handling
let message = message.unwrap();

Expand Down Expand Up @@ -113,7 +113,7 @@ async fn main() {
// Start the Tokio runtime.
let mut incoming = listener.incoming();

while let Some(stream) = await!(incoming.next()) {
while let Some(stream) = async_wait!(incoming.next()) {
let stream = match stream {
Ok(stream) => stream,
Err(_) => continue,
Expand All @@ -122,7 +122,7 @@ async fn main() {
let state = state.clone();

tokio::spawn_async(async move {
if let Err(_) = await!(process(stream, state)) {
if let Err(_) = async_wait!(process(stream, state)) {
eprintln!("failed to process connection");
}
});
Expand Down
10 changes: 5 additions & 5 deletions async-await/src/echo_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![feature(await_macro, async_await)]

use tokio::await;
use tokio::async_wait;
use tokio::net::TcpStream;
use tokio::prelude::*;

Expand All @@ -14,7 +14,7 @@ const MESSAGES: &[&str] = &[
];

async fn run_client(addr: &SocketAddr) -> io::Result<()> {
let mut stream = await!(TcpStream::connect(addr))?;
let mut stream = async_wait!(TcpStream::connect(addr))?;

// Buffer to read into
let mut buf = [0; 128];
Expand All @@ -23,10 +23,10 @@ async fn run_client(addr: &SocketAddr) -> io::Result<()> {
println!(" > write = {:?}", msg);

// Write the message to the server
await!(stream.write_all_async(msg.as_bytes()))?;
async_wait!(stream.write_all_async(msg.as_bytes()))?;

// Read the message back from the server
await!(stream.read_exact_async(&mut buf[..msg.len()]))?;
async_wait!(stream.read_exact_async(&mut buf[..msg.len()]))?;

assert_eq!(&buf[..msg.len()], msg.as_bytes());
}
Expand All @@ -43,7 +43,7 @@ async fn main() {

// Connect to the echo serveer

match await!(run_client(&addr)) {
match async_wait!(run_client(&addr)) {
Ok(_) => println!("done."),
Err(e) => eprintln!("echo client failed; error = {:?}", e),
}
Expand Down
8 changes: 4 additions & 4 deletions async-await/src/echo_server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![feature(await_macro, async_await)]

use tokio::await;
use tokio::async_wait;
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::*;

Expand All @@ -11,11 +11,11 @@ fn handle(mut stream: TcpStream) {
let mut buf = [0; 1024];

loop {
match await!(stream.read_async(&mut buf)).unwrap() {
match async_wait!(stream.read_async(&mut buf)).unwrap() {
0 => break, // Socket closed
n => {
// Send the data back
await!(stream.write_all_async(&buf[0..n])).unwrap();
async_wait!(stream.write_all_async(&buf[0..n])).unwrap();
}
}
}
Expand All @@ -35,7 +35,7 @@ async fn main() {

let mut incoming = listener.incoming();

while let Some(stream) = await!(incoming.next()) {
while let Some(stream) = async_wait!(incoming.next()) {
let stream = stream.unwrap();
handle(stream);
}
Expand Down
6 changes: 3 additions & 3 deletions async-await/src/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![feature(await_macro, async_await)]

use tokio::await;
use tokio::async_wait;
use tokio::prelude::*;
use hyper::Client;

Expand All @@ -13,7 +13,7 @@ async fn main() {

let uri = "http://httpbin.org/ip".parse().unwrap();

let response = await!({
let response = async_wait!({
client.get(uri)
.timeout(Duration::from_secs(10))
}).unwrap();
Expand All @@ -22,7 +22,7 @@ async fn main() {

let mut body = response.into_body();

while let Some(chunk) = await!(body.next()) {
while let Some(chunk) = async_wait!(body.next()) {
let chunk = chunk.unwrap();
println!("chunk = {}", str::from_utf8(&chunk[..]).unwrap());
}
Expand Down
4 changes: 2 additions & 2 deletions async-await/tests/macros.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![feature(await_macro, async_await)]

use tokio::await;
use tokio::async_wait;
use tokio::timer::Delay;
use std::time::{Duration, Instant};

Expand All @@ -18,5 +18,5 @@ async fn fail_no_async() {
#[tokio::test]
async fn use_timer() {
let when = Instant::now() + Duration::from_millis(10);
await!(Delay::new(when));
async_wait!(Delay::new(when));
}
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
parameters:
name: test_nightly
displayName: Test Async / Await
rust: nightly-2019-04-25
rust: nightly-2019-05-09

# Try cross compiling
- template: ci/azure-cross-compile.yml
Expand Down
2 changes: 1 addition & 1 deletion tokio-futures/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Then, get started. In your application, add:

```rust
// The nightly features that are commonly needed with async / await
#![feature(await_macro, async_await)]
#![feature(async_await)]

// This pulls in the `tokio-futures` crate. While Rust 2018 doesn't require
// `extern crate`, we need to pull in the macros.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
/// Wait for a future to complete.
#[macro_export]
macro_rules! await {
macro_rules! async_wait {
($e:expr) => {{
#[allow(unused_imports)]
use $crate::compat::backward::IntoAwaitable as IntoAwaitableBackward;
#[allow(unused_imports)]
use $crate::compat::forward::IntoAwaitable as IntoAwaitableForward;
use $crate::std_await;

#[allow(unused_mut)]
let mut e = $e;
let e = e.into_awaitable();
std_await!(e)
e.await
}};
}
28 changes: 14 additions & 14 deletions tokio-futures/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub trait AsyncReadExt: AsyncRead {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro)]
/// #![feature(async_await)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
Expand All @@ -35,7 +35,7 @@ pub trait AsyncReadExt: AsyncRead {
/// let mut reader = Cursor::new([1, 2, 3, 4]);
/// let mut output = [0u8; 5];
///
/// let bytes = await!(reader.read_async(&mut output[..])).unwrap();
/// let bytes = reader.read_async(&mut output[..]).await.unwrap();
///
/// // This is only guaranteed to be 4 because `&[u8]` is a synchronous
/// // reader. In a real system you could get anywhere from 1 to
Expand All @@ -59,7 +59,7 @@ pub trait AsyncReadExt: AsyncRead {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro)]
/// #![feature(async_await)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
Expand All @@ -69,7 +69,7 @@ pub trait AsyncReadExt: AsyncRead {
/// let mut reader = Cursor::new([1, 2, 3, 4]);
/// let mut output = [0u8; 4];
///
/// await!(reader.read_exact_async(&mut output)).unwrap();
/// reader.read_exact_async(&mut output).await.unwrap();
///
/// assert_eq!(output, [1, 2, 3, 4]);
/// });
Expand All @@ -78,7 +78,7 @@ pub trait AsyncReadExt: AsyncRead {
/// ## EOF is hit before `buf` is filled
///
/// ```edition2018
/// #![feature(async_await, await_macro)]
/// #![feature(async_await)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
Expand All @@ -88,7 +88,7 @@ pub trait AsyncReadExt: AsyncRead {
/// let mut reader = Cursor::new([1, 2, 3, 4]);
/// let mut output = [0u8; 5];
///
/// let result = await!(reader.read_exact_async(&mut output));
/// let result = reader.read_exact_async(&mut output).await;
///
/// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
/// });
Expand All @@ -110,7 +110,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro)]
/// #![feature(async_await)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
Expand All @@ -120,7 +120,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// let mut buf = [0u8; 5];
/// let mut writer = Cursor::new(&mut buf[..]);
///
/// let n = await!(writer.write_async(&[1, 2, 3, 4])).unwrap();
/// let n = writer.write_async(&[1, 2, 3, 4]).await.unwrap();
///
/// assert_eq!(writer.into_inner()[..n], [1, 2, 3, 4, 0][..n]);
/// });
Expand All @@ -139,7 +139,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro)]
/// #![feature(async_await)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
Expand All @@ -149,7 +149,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// let mut buf = [0u8; 5];
/// let mut writer = Cursor::new(&mut buf[..]);
///
/// await!(writer.write_all_async(&[1, 2, 3, 4])).unwrap();
/// writer.write_all_async(&[1, 2, 3, 4]).await.unwrap();
///
/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
/// });
Expand All @@ -163,7 +163,7 @@ pub trait AsyncWriteExt: AsyncWrite {
/// # Examples
///
/// ```edition2018
/// #![feature(async_await, await_macro)]
/// #![feature(async_await)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
Expand All @@ -175,9 +175,9 @@ pub trait AsyncWriteExt: AsyncWrite {
/// {
/// let mut writer = Cursor::new(&mut output[..]);
/// let mut buffered = BufWriter::new(writer);
/// await!(buffered.write_all_async(&[1, 2])).unwrap();
/// await!(buffered.write_all_async(&[3, 4])).unwrap();
/// await!(buffered.flush_async()).unwrap();
/// buffered.write_all_async(&[1, 2]).await.unwrap();
/// buffered.write_all_async(&[3, 4]).await.unwrap();
/// buffered.flush_async().await.unwrap();
/// }
///
/// assert_eq!(output, [1, 2, 3, 4, 0]);
Expand Down
9 changes: 2 additions & 7 deletions tokio-futures/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![cfg(feature = "async-await-preview")]
#![feature(await_macro)]
#![feature(async_await, await_macro)]
#![doc(html_root_url = "https://docs.rs/tokio-futures/0.1.0")]
#![deny(missing_docs, missing_debug_implementations)]
#![cfg_attr(test, deny(warnings))]
Expand All @@ -23,13 +23,8 @@ macro_rules! try_ready {
}

#[macro_use]
mod await;
mod async_wait;
pub mod compat;
pub mod io;
pub mod sink;
pub mod stream;

// Rename the `await` macro in `std`. This is used by the redefined
// `await` macro in this crate.
#[doc(hidden)]
pub use std::await as std_await;
10 changes: 5 additions & 5 deletions tokio-futures/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ pub trait StreamExt: Stream {
/// # Examples
///
/// ```edition2018
/// #![feature(await_macro, async_await)]
/// #![feature(async_await)]
/// tokio::run_async(async {
/// // The extension trait can also be imported with
/// // `use tokio::prelude::*`.
/// use tokio::prelude::{stream, StreamAsyncExt};
///
/// let mut stream = stream::iter_ok::<_, ()>(1..3);
///
/// assert_eq!(await!(stream.next()), Some(Ok(1)));
/// assert_eq!(await!(stream.next()), Some(Ok(2)));
/// assert_eq!(await!(stream.next()), Some(Ok(3)));
/// assert_eq!(await!(stream.next()), None);
/// assert_eq!(stream.next().await, Some(Ok(1)));
/// assert_eq!(stream.next().await, Some(Ok(2)));
/// assert_eq!(stream.next().await, Some(Ok(3)));
/// assert_eq!(stream.next().await, None);
/// });
/// ```
fn next(&mut self) -> Next<Self>
Expand Down