Skip to content

Fix pinned boxed async functions #916

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions futures-macro-async/src/attribute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use self::Attribute::*;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Attribute {
Pinned,
PinnedBoxed,
PinnedBoxedSend,
Unpinned,
UnpinnedBoxed,
UnpinnedBoxedSend,
}

impl Attribute {
pub fn boxed(&self) -> bool {
match *self {
PinnedBoxed | PinnedBoxedSend | UnpinnedBoxed | UnpinnedBoxedSend => true,
Pinned | Unpinned => false,
}
}

pub fn pinned(&self) -> bool {
match *self {
Pinned | PinnedBoxed | PinnedBoxedSend => true,
Unpinned | UnpinnedBoxed | UnpinnedBoxedSend => false,
}
}
}
94 changes: 50 additions & 44 deletions futures-macro-async/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,17 @@ if_nightly! {
use syn::punctuated::Punctuated;
use syn::fold::Fold;

mod attribute;
mod elision;

use attribute::Attribute;

macro_rules! quote_cs {
($($t:tt)*) => (quote_spanned!(Span::call_site() => $($t)*))
}

fn async_inner<F>(
boxed: bool,
pinned: bool,
attribute: Attribute,
function: TokenStream,
gen_function: Tokens,
return_ty: F,
Expand Down Expand Up @@ -191,7 +193,7 @@ if_nightly! {
});
syn::token::Semi([block.brace_token.0]).to_tokens(&mut result);

let await = if pinned {
let await = if attribute.pinned() {
await()
} else {
await_move()
Expand Down Expand Up @@ -220,7 +222,7 @@ if_nightly! {
// sure if more errors will highlight this function call...
let output_span = first_last(&output);
let gen_function = respan(gen_function.into(), &output_span);
let body_inner = if pinned {
let body_inner = if attribute.pinned() {
quote_cs! {
#gen_function (static move || -> #output #gen_body)
}
Expand All @@ -229,9 +231,15 @@ if_nightly! {
#gen_function (move || -> #output #gen_body)
}
};
let body_inner = if boxed {
let body = quote_cs! { ::futures::__rt::std::boxed::Box::new(#body_inner) };
respan(body.into(), &output_span)
let body_inner = match attribute {
Attribute::PinnedBoxed => quote_cs! { (#body_inner).pin_local() },
Attribute::PinnedBoxedSend => quote_cs! { (#body_inner).pin() },
Attribute::UnpinnedBoxed | Attribute::UnpinnedBoxedSend =>
quote_cs! { ::futures::__rt::std::boxed::Box::new(#body_inner) },
Attribute::Pinned | Attribute::Unpinned => body_inner,
};
let body_inner = if attribute.boxed() {
respan(body_inner.into(), &output_span)
} else {
body_inner.into()
};
Expand All @@ -255,35 +263,34 @@ if_nightly! {

#[proc_macro_attribute]
pub fn async(attribute: TokenStream, function: TokenStream) -> TokenStream {
let (boxed, send) = match &attribute.to_string() as &str {
"( pinned )" => (true, false),
"( pinned_send )" => (true, true),
"" => (false, false),
let attribute = match &attribute.to_string() as &str {
"( pinned )" => Attribute::PinnedBoxed,
"( pinned_send )" => Attribute::PinnedBoxedSend,
"" => Attribute::Pinned,
_ => panic!("the #[async] attribute currently only takes `pinned` `pinned_send` as an arg"),
};

async_inner(boxed, true, function, quote_cs! { ::futures::__rt::gen_pinned }, |output, lifetimes| {
async_inner(attribute, function, quote_cs! { ::futures::__rt::gen_pinned }, |output, lifetimes| {
// TODO: can we lift the restriction that `futures` must be at the root of
// the crate?
let output_span = first_last(&output);
let return_ty = if boxed && !send {
quote_cs! {
::futures::__rt::boxed::PinBox<::futures::__rt::Future<
let return_ty = match attribute {
Attribute::PinnedBoxed => quote_cs! {
::futures::__rt::std::boxed::PinBox<::futures::__rt::Future<
Item = <! as ::futures::__rt::IsResult>::Ok,
Error = <! as ::futures::__rt::IsResult>::Err,
>>
}
} else if boxed && send {
quote_cs! {
::futures::__rt::boxed::PinBox<::futures::__rt::Future<
> + #(#lifetimes +)*>
},
Attribute::PinnedBoxedSend => quote_cs! {
::futures::__rt::std::boxed::PinBox<::futures::__rt::Future<
Item = <! as ::futures::__rt::IsResult>::Ok,
Error = <! as ::futures::__rt::IsResult>::Err,
> + Send>
}
} else {
quote_cs! {
impl ::futures::__rt::MyStableFuture<!> + #( #lifetimes + )*
}
> + Send + #(#lifetimes +)*>
},
Attribute::Pinned => quote_cs! {
impl ::futures::__rt::MyStableFuture<!> + #(#lifetimes +)*
},
_ => unreachable!(),
};
let return_ty = respan(return_ty.into(), &output_span);
replace_bang(return_ty, &output)
Expand All @@ -293,32 +300,30 @@ if_nightly! {
#[proc_macro_attribute]
pub fn async_move(attribute: TokenStream, function: TokenStream) -> TokenStream {
// Handle arguments to the #[async_move] attribute, if any
let (boxed, send) = match &attribute.to_string() as &str {
"( boxed )" => (true, false),
"( boxed_send )" => (true, true),
"" => (false, false),
let attribute = match &attribute.to_string() as &str {
"( boxed )" => Attribute::UnpinnedBoxed,
"( boxed_send )" => Attribute::UnpinnedBoxedSend,
"" => Attribute::Unpinned,
_ => panic!("the #[async_move] attribute currently only takes `boxed` or `boxed_send`, as an arg"),
};

async_inner(boxed, false, function, quote_cs! { ::futures::__rt::gen_move }, |output, lifetimes| {
async_inner(attribute, function, quote_cs! { ::futures::__rt::gen_move }, |output, lifetimes| {
// TODO: can we lift the restriction that `futures` must be at the root of
// the crate?
let output_span = first_last(&output);
let return_ty = if boxed && !send {
quote_cs! {
let return_ty = match attribute {
Attribute::UnpinnedBoxed => quote_cs! {
::futures::__rt::std::boxed::Box<::futures::__rt::Future<
Item = <! as ::futures::__rt::IsResult>::Ok,
Error = <! as ::futures::__rt::IsResult>::Err,
> + ::futures::__rt::std::marker::Unpin + #(#lifetimes +)*>
}
} else if boxed && send {
quote_cs! {
},
Attribute::UnpinnedBoxedSend => quote_cs! {
::futures::__rt::std::boxed::Box<::futures::__rt::Future<
Item = <! as ::futures::__rt::IsResult>::Ok,
Error = <! as ::futures::__rt::IsResult>::Err,
> + ::futures::__rt::std::marker::Unpin + Send + #(#lifetimes +)*>
}
} else {
},
// Dunno why this is buggy, hits weird typecheck errors in tests
//
// quote_cs! {
Expand All @@ -327,7 +332,8 @@ if_nightly! {
// Error = <#output as ::futures::__rt::MyTry>::MyError,
// >
// }
quote_cs! { impl ::futures::__rt::MyFuture<!> + #(#lifetimes +)* }
Attribute::Unpinned => quote_cs! { impl ::futures::__rt::MyFuture<!> + #(#lifetimes +)* },
_ => unreachable!(),
};
let return_ty = respan(return_ty.into(), &output_span);
replace_bang(return_ty, &output)
Expand Down Expand Up @@ -368,14 +374,14 @@ if_nightly! {
}
}

let boxed = boxed;
let attribute = if boxed { Attribute::PinnedBoxed } else { Attribute::Pinned };
let item_ty = item_ty.expect("#[async_stream] requires item type to be specified");

async_inner(boxed, true, function, quote_cs! { ::futures::__rt::gen_stream_pinned }, |output, lifetimes| {
async_inner(attribute, function, quote_cs! { ::futures::__rt::gen_stream_pinned }, |output, lifetimes| {
let output_span = first_last(&output);
let return_ty = if boxed {
quote_cs! {
::futures::__rt::boxed::PinBox<::futures::__rt::Stream<
::futures::__rt::std::boxed::PinBox<::futures::__rt::Stream<
Item = !,
Error = <! as ::futures::__rt::IsResult>::Err,
> + #(#lifetimes +)*>
Expand Down Expand Up @@ -422,10 +428,10 @@ if_nightly! {
}
}

let boxed = boxed;
let attribute = if boxed { Attribute::UnpinnedBoxed } else { Attribute::Unpinned };
let item_ty = item_ty.expect("#[async_stream_move] requires item type to be specified");

async_inner(boxed, false, function, quote_cs! { ::futures::__rt::gen_stream }, |output, lifetimes| {
async_inner(attribute, function, quote_cs! { ::futures::__rt::gen_stream }, |output, lifetimes| {
let output_span = first_last(&output);
let return_ty = if boxed {
quote_cs! {
Expand Down
9 changes: 9 additions & 0 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,15 @@ pub mod prelude {
#[cfg(feature = "std")]
pub use futures_core::executor::Executor;

#[cfg(feature = "nightly")]
pub use futures_stable::{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't these already exported through the stable submodule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to put them into futures::prelude as normal Future and Stream so that users don't need to use them every time in modules which use #[async] stuff otherwise they'll see some error messages about missing methods of pin() or pin_local().

StableFuture,
StableStream
};

#[cfg(all(feature = "nightly", feature = "std"))]
pub use futures_stable::StableExecutor;

pub use futures_sink::Sink;

#[cfg(feature = "std")]
Expand Down
45 changes: 44 additions & 1 deletion futures/tests/async_await/pinned.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::stable::{block_on_stable, StableFuture};
use futures::stable::block_on_stable;
use futures::executor::{block_on, ThreadPool};
use futures::prelude::*;

#[async]
Expand Down Expand Up @@ -36,6 +37,31 @@ fn qux2(x: i32) -> Result<i32, i32> {
await!(baz2(x).pin())
}

#[async(pinned)]
fn boxed(x: i32) -> Result<i32, i32> {
Ok(x)
}

#[async(pinned)]
fn boxed_borrow(x: &i32) -> Result<i32, i32> {
Ok(*x)
}

#[async(pinned_send)]
fn boxed_send(x: i32) -> Result<i32, i32> {
Ok(x)
}

#[async(pinned_send)]
fn boxed_send_borrow(x: &i32) -> Result<i32, i32> {
Ok(*x)
}

#[async(pinned_send)]
fn spawnable() -> Result<(), Never> {
Ok(())
}

#[async_stream(item = u64)]
fn _stream1() -> Result<(), i32> {
fn integer() -> u64 { 1 }
Expand All @@ -45,6 +71,15 @@ fn _stream1() -> Result<(), i32> {
Ok(())
}

#[async_stream(pinned, item = u64)]
fn _stream_boxed() -> Result<(), i32> {
fn integer() -> u64 { 1 }
let x = &integer();
stream_yield!(0);
stream_yield!(*x);
Ok(())
}

#[async]
pub fn uses_async_for() -> Result<Vec<u64>, i32> {
let mut v = vec![];
Expand All @@ -62,5 +97,13 @@ fn main() {
assert_eq!(block_on_stable(baz(17)), Ok(17));
assert_eq!(block_on_stable(qux(17)), Ok(17));
assert_eq!(block_on_stable(qux2(17)), Ok(17));
assert_eq!(block_on(boxed(17)), Ok(17));
assert_eq!(block_on(boxed_send(17)), Ok(17));
assert_eq!(block_on_stable(uses_async_for()), Ok(vec![0, 1]));
}

#[test]
fn run_pinned_future_in_thread_pool() {
let mut pool = ThreadPool::new().unwrap();
pool.spawn_pinned(spawnable()).unwrap();
}