From 1691e06db661a19a5e25276c18cd165386f027bb Mon Sep 17 00:00:00 2001 From: Taylor Cramer Date: Mon, 11 Mar 2019 16:56:00 -0700 Subject: [PATCH] Future-proof the Futures API --- src/liballoc/boxed.rs | 6 +- src/libcore/future/future.rs | 18 ++-- src/libcore/task/mod.rs | 2 +- src/libcore/task/wake.rs | 101 +++++++++++++++++- src/libstd/future.rs | 61 ++++++----- src/libstd/macros.rs | 2 +- src/libstd/panic.rs | 6 +- .../compile-fail/must_use-in-stdlib-traits.rs | 4 +- src/test/run-pass/async-await.rs | 15 ++- src/test/run-pass/auxiliary/arc_wake.rs | 14 +-- src/test/run-pass/futures-api.rs | 17 +-- 11 files changed, 176 insertions(+), 70 deletions(-) diff --git a/src/liballoc/boxed.rs b/src/liballoc/boxed.rs index b2315c6a73907..d75a0c298c278 100644 --- a/src/liballoc/boxed.rs +++ b/src/liballoc/boxed.rs @@ -81,7 +81,7 @@ use core::ops::{ CoerceUnsized, DispatchFromDyn, Deref, DerefMut, Receiver, Generator, GeneratorState }; use core::ptr::{self, NonNull, Unique}; -use core::task::{Waker, Poll}; +use core::task::{Context, Poll}; use crate::vec::Vec; use crate::raw_vec::RawVec; @@ -914,7 +914,7 @@ impl Generator for Pin> { impl Future for Box { type Output = F::Output; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - F::poll(Pin::new(&mut *self), waker) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + F::poll(Pin::new(&mut *self), cx) } } diff --git a/src/libcore/future/future.rs b/src/libcore/future/future.rs index a143b54a61f54..114a6b9336777 100644 --- a/src/libcore/future/future.rs +++ b/src/libcore/future/future.rs @@ -5,7 +5,7 @@ use marker::Unpin; use ops; use pin::Pin; -use task::{Poll, Waker}; +use task::{Context, Poll}; /// A future represents an asynchronous computation. /// @@ -44,8 +44,9 @@ pub trait Future { /// Once a future has finished, clients should not `poll` it again. /// /// When a future is not ready yet, `poll` returns `Poll::Pending` and - /// stores a clone of the [`Waker`] to be woken once the future can - /// make progress. For example, a future waiting for a socket to become + /// stores a clone of the [`Waker`] copied from the current [`Context`]. + /// This [`Waker`] is then woken once the future can make progress. + /// For example, a future waiting for a socket to become /// readable would call `.clone()` on the [`Waker`] and store it. /// When a signal arrives elsewhere indicating that the socket is readable, /// `[Waker::wake]` is called and the socket future's task is awoken. @@ -88,16 +89,17 @@ pub trait Future { /// /// [`Poll::Pending`]: ../task/enum.Poll.html#variant.Pending /// [`Poll::Ready(val)`]: ../task/enum.Poll.html#variant.Ready + /// [`Context`]: ../task/struct.Context.html /// [`Waker`]: ../task/struct.Waker.html /// [`Waker::wake`]: ../task/struct.Waker.html#method.wake - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll; } impl Future for &mut F { type Output = F::Output; - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll { - F::poll(Pin::new(&mut **self), waker) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + F::poll(Pin::new(&mut **self), cx) } } @@ -108,7 +110,7 @@ where { type Output = <

::Target as Future>::Output; - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { - Pin::get_mut(self).as_mut().poll(waker) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::get_mut(self).as_mut().poll(cx) } } diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs index 9b8f598116200..29bae69ea83c1 100644 --- a/src/libcore/task/mod.rs +++ b/src/libcore/task/mod.rs @@ -8,4 +8,4 @@ mod poll; pub use self::poll::Poll; mod wake; -pub use self::wake::{Waker, RawWaker, RawWakerVTable}; +pub use self::wake::{Context, Waker, RawWaker, RawWakerVTable}; diff --git a/src/libcore/task/wake.rs b/src/libcore/task/wake.rs index 12f812d3bed3e..979a0792608c1 100644 --- a/src/libcore/task/wake.rs +++ b/src/libcore/task/wake.rs @@ -3,7 +3,7 @@ issue = "50547")] use fmt; -use marker::Unpin; +use marker::{PhantomData, Unpin}; /// A `RawWaker` allows the implementor of a task executor to create a [`Waker`] /// which provides customized wakeup behavior. @@ -36,6 +36,10 @@ impl RawWaker { /// The `vtable` customizes the behavior of a `Waker` which gets created /// from a `RawWaker`. For each operation on the `Waker`, the associated /// function in the `vtable` of the underlying `RawWaker` will be called. + #[rustc_promotable] + #[unstable(feature = "futures_api", + reason = "futures in libcore are unstable", + issue = "50547")] pub const fn new(data: *const (), vtable: &'static RawWakerVTable) -> RawWaker { RawWaker { data, @@ -63,21 +67,105 @@ pub struct RawWakerVTable { /// required for this additional instance of a [`RawWaker`] and associated /// task. Calling `wake` on the resulting [`RawWaker`] should result in a wakeup /// of the same task that would have been awoken by the original [`RawWaker`]. - pub clone: unsafe fn(*const ()) -> RawWaker, + clone: unsafe fn(*const ()) -> RawWaker, /// This function will be called when `wake` is called on the [`Waker`]. /// It must wake up the task associated with this [`RawWaker`]. /// /// The implemention of this function must not consume the provided data /// pointer. - pub wake: unsafe fn(*const ()), + wake: unsafe fn(*const ()), + + /// This function gets called when a [`RawWaker`] gets dropped. + /// + /// The implementation of this function must make sure to release any + /// resources that are associated with this instance of a [`RawWaker`] and + /// associated task. + drop: unsafe fn(*const ()), +} +impl RawWakerVTable { + /// Creates a new `RawWakerVTable` from the provided `clone`, `wake`, and + /// `drop` functions. + /// + /// # `clone` + /// + /// This function will be called when the [`RawWaker`] gets cloned, e.g. when + /// the [`Waker`] in which the [`RawWaker`] is stored gets cloned. + /// + /// The implementation of this function must retain all resources that are + /// required for this additional instance of a [`RawWaker`] and associated + /// task. Calling `wake` on the resulting [`RawWaker`] should result in a wakeup + /// of the same task that would have been awoken by the original [`RawWaker`]. + /// + /// # `wake` + /// + /// This function will be called when `wake` is called on the [`Waker`]. + /// It must wake up the task associated with this [`RawWaker`]. + /// + /// The implemention of this function must not consume the provided data + /// pointer. + /// + /// # `drop` + /// /// This function gets called when a [`RawWaker`] gets dropped. /// /// The implementation of this function must make sure to release any /// resources that are associated with this instance of a [`RawWaker`] and /// associated task. - pub drop: unsafe fn(*const ()), + #[rustc_promotable] + #[unstable(feature = "futures_api", + reason = "futures in libcore are unstable", + issue = "50547")] + pub const fn new( + clone: unsafe fn(*const ()) -> RawWaker, + wake: unsafe fn(*const ()), + drop: unsafe fn(*const ()), + ) -> Self { + Self { + clone, + wake, + drop, + } + } +} + +/// The `Context` of an asynchronous task. +/// +/// Currently, `Context` only serves to provide access to a `&Waker` +/// which can be used to wake the current task. +pub struct Context<'a> { + waker: &'a Waker, + // Ensure we future-proof against variance changes by forcing + // the lifetime to be invariant (argument-position lifetimes + // are contravariant while return-position lifetimes are + // covariant). + _marker: PhantomData &'a ()>, +} + +impl<'a> Context<'a> { + /// Create a new `Context` from a `&Waker`. + #[inline] + pub fn from_waker(waker: &'a Waker) -> Self { + Context { + waker, + _marker: PhantomData, + } + } + + /// Returns a reference to the `Waker` for the current task. + #[inline] + pub fn waker(&self) -> &'a Waker { + &self.waker + } +} + +impl fmt::Debug for Context<'_> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Context") + .field("waker", &self.waker) + .finish() + } } /// A `Waker` is a handle for waking up a task by notifying its executor that it @@ -98,6 +186,7 @@ unsafe impl Sync for Waker {} impl Waker { /// Wake up the task associated with this `Waker`. + #[inline] pub fn wake(&self) { // The actual wakeup call is delegated through a virtual function call // to the implementation which is defined by the executor. @@ -115,6 +204,7 @@ impl Waker { /// returns `true`, it is guaranteed that the `Waker`s will awaken the same task. /// /// This function is primarily used for optimization purposes. + #[inline] pub fn will_wake(&self, other: &Waker) -> bool { self.waker == other.waker } @@ -124,6 +214,7 @@ impl Waker { /// The behavior of the returned `Waker` is undefined if the contract defined /// in [`RawWaker`]'s and [`RawWakerVTable`]'s documentation is not upheld. /// Therefore this method is unsafe. + #[inline] pub unsafe fn new_unchecked(waker: RawWaker) -> Waker { Waker { waker, @@ -132,6 +223,7 @@ impl Waker { } impl Clone for Waker { + #[inline] fn clone(&self) -> Self { Waker { // SAFETY: This is safe because `Waker::new_unchecked` is the only way @@ -143,6 +235,7 @@ impl Clone for Waker { } impl Drop for Waker { + #[inline] fn drop(&mut self) { // SAFETY: This is safe because `Waker::new_unchecked` is the only way // to initialize `drop` and `data` requiring the user to acknowledge diff --git a/src/libstd/future.rs b/src/libstd/future.rs index aa784746122db..898387cb9f56d 100644 --- a/src/libstd/future.rs +++ b/src/libstd/future.rs @@ -5,7 +5,7 @@ use core::marker::Unpin; use core::pin::Pin; use core::option::Option; use core::ptr::NonNull; -use core::task::{Waker, Poll}; +use core::task::{Context, Poll}; use core::ops::{Drop, Generator, GeneratorState}; #[doc(inline)] @@ -32,10 +32,10 @@ impl> !Unpin for GenFuture {} #[unstable(feature = "gen_future", issue = "50547")] impl> Future for GenFuture { type Output = T::Return; - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Safe because we're !Unpin + !Drop mapping to a ?Unpin value let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) }; - set_task_waker(waker, || match gen.resume() { + set_task_context(cx, || match gen.resume() { GeneratorState::Yielded(()) => Poll::Pending, GeneratorState::Complete(x) => Poll::Ready(x), }) @@ -43,61 +43,72 @@ impl> Future for GenFuture { } thread_local! { - static TLS_WAKER: Cell>> = Cell::new(None); + static TLS_CX: Cell>>> = Cell::new(None); } -struct SetOnDrop(Option>); +struct SetOnDrop(Option>>); impl Drop for SetOnDrop { fn drop(&mut self) { - TLS_WAKER.with(|tls_waker| { - tls_waker.set(self.0.take()); + TLS_CX.with(|tls_cx| { + tls_cx.set(self.0.take()); }); } } #[unstable(feature = "gen_future", issue = "50547")] /// Sets the thread-local task context used by async/await futures. -pub fn set_task_waker(waker: &Waker, f: F) -> R +pub fn set_task_context(cx: &mut Context<'_>, f: F) -> R where F: FnOnce() -> R { - let old_waker = TLS_WAKER.with(|tls_waker| { - tls_waker.replace(Some(NonNull::from(waker))) + // transmute the context's lifetime to 'static so we can store it. + let cx = unsafe { + core::mem::transmute::<&mut Context<'_>, &mut Context<'static>>(cx) + }; + let old_cx = TLS_CX.with(|tls_cx| { + tls_cx.replace(Some(NonNull::from(cx))) }); - let _reset_waker = SetOnDrop(old_waker); + let _reset = SetOnDrop(old_cx); f() } #[unstable(feature = "gen_future", issue = "50547")] -/// Retrieves the thread-local task waker used by async/await futures. +/// Retrieves the thread-local task context used by async/await futures. /// -/// This function acquires exclusive access to the task waker. +/// This function acquires exclusive access to the task context. /// -/// Panics if no waker has been set or if the waker has already been -/// retrieved by a surrounding call to get_task_waker. -pub fn get_task_waker(f: F) -> R +/// Panics if no context has been set or if the context has already been +/// retrieved by a surrounding call to get_task_context. +pub fn get_task_context(f: F) -> R where - F: FnOnce(&Waker) -> R + F: FnOnce(&mut Context<'_>) -> R { - let waker_ptr = TLS_WAKER.with(|tls_waker| { + let cx_ptr = TLS_CX.with(|tls_cx| { // Clear the entry so that nested `get_task_waker` calls // will fail or set their own value. - tls_waker.replace(None) + tls_cx.replace(None) }); - let _reset_waker = SetOnDrop(waker_ptr); + let _reset = SetOnDrop(cx_ptr); - let waker_ptr = waker_ptr.expect( - "TLS Waker not set. This is a rustc bug. \ + let mut cx_ptr = cx_ptr.expect( + "TLS Context not set. This is a rustc bug. \ Please file an issue on https://github.com/rust-lang/rust."); - unsafe { f(waker_ptr.as_ref()) } + + // Safety: we've ensured exclusive access to the context by + // removing the pointer from TLS, only to be replaced once + // we're done with it. + // + // The pointer that was inserted came from an `&mut Context<'_>`, + // so it is safe to treat as mutable. + unsafe { f(cx_ptr.as_mut()) } } #[unstable(feature = "gen_future", issue = "50547")] /// Polls a future in the current thread-local task waker. -pub fn poll_with_tls_waker(f: Pin<&mut F>) -> Poll +pub fn poll_with_tls_context(f: Pin<&mut F>) -> Poll where F: Future { - get_task_waker(|waker| F::poll(f, waker)) + get_task_context(|cx| F::poll(f, cx)) } diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs index be4db1f737d83..44ca56e261152 100644 --- a/src/libstd/macros.rs +++ b/src/libstd/macros.rs @@ -346,7 +346,7 @@ macro_rules! r#await { let mut pinned = $e; loop { if let $crate::task::Poll::Ready(x) = - $crate::future::poll_with_tls_waker(unsafe { + $crate::future::poll_with_tls_context(unsafe { $crate::pin::Pin::new_unchecked(&mut pinned) }) { diff --git a/src/libstd/panic.rs b/src/libstd/panic.rs index cc147d851de54..5a8101e230119 100644 --- a/src/libstd/panic.rs +++ b/src/libstd/panic.rs @@ -12,7 +12,7 @@ use crate::panicking; use crate::ptr::{Unique, NonNull}; use crate::rc::Rc; use crate::sync::{Arc, Mutex, RwLock, atomic}; -use crate::task::{Waker, Poll}; +use crate::task::{Context, Poll}; use crate::thread::Result; #[stable(feature = "panic_hooks", since = "1.10.0")] @@ -323,9 +323,9 @@ impl fmt::Debug for AssertUnwindSafe { impl Future for AssertUnwindSafe { type Output = F::Output; - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let pinned_field = unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) }; - F::poll(pinned_field, waker) + F::poll(pinned_field, cx) } } diff --git a/src/test/compile-fail/must_use-in-stdlib-traits.rs b/src/test/compile-fail/must_use-in-stdlib-traits.rs index b4f07ab33214c..503b39e181ab7 100644 --- a/src/test/compile-fail/must_use-in-stdlib-traits.rs +++ b/src/test/compile-fail/must_use-in-stdlib-traits.rs @@ -4,7 +4,7 @@ use std::iter::Iterator; use std::future::Future; -use std::task::{Poll, Waker}; +use std::task::{Context, Poll}; use std::pin::Pin; use std::unimplemented; @@ -13,7 +13,7 @@ struct MyFuture; impl Future for MyFuture { type Output = u32; - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { Poll::Pending } } diff --git a/src/test/run-pass/async-await.rs b/src/test/run-pass/async-await.rs index 72af51629927d..4f5f7724ad076 100644 --- a/src/test/run-pass/async-await.rs +++ b/src/test/run-pass/async-await.rs @@ -1,7 +1,7 @@ // edition:2018 // aux-build:arc_wake.rs -#![feature(arbitrary_self_types, async_await, await_macro, futures_api)] +#![feature(async_await, await_macro, futures_api)] extern crate arc_wake; @@ -11,9 +11,7 @@ use std::sync::{ Arc, atomic::{self, AtomicUsize}, }; -use std::task::{ - Poll, Waker, -}; +use std::task::{Context, Poll}; use arc_wake::ArcWake; struct Counter { @@ -32,11 +30,11 @@ fn wake_and_yield_once() -> WakeOnceThenComplete { WakeOnceThenComplete(false) } impl Future for WakeOnceThenComplete { type Output = (); - fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<()> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { if self.0 { Poll::Ready(()) } else { - waker.wake(); + cx.waker().wake(); self.0 = true; Poll::Pending } @@ -146,10 +144,11 @@ where let mut fut = Box::pin(f(9)); let counter = Arc::new(Counter { wakes: AtomicUsize::new(0) }); let waker = ArcWake::into_waker(counter.clone()); + let mut cx = Context::from_waker(&waker); assert_eq!(0, counter.wakes.load(atomic::Ordering::SeqCst)); - assert_eq!(Poll::Pending, fut.as_mut().poll(&waker)); + assert_eq!(Poll::Pending, fut.as_mut().poll(&mut cx)); assert_eq!(1, counter.wakes.load(atomic::Ordering::SeqCst)); - assert_eq!(Poll::Ready(9), fut.as_mut().poll(&waker)); + assert_eq!(Poll::Ready(9), fut.as_mut().poll(&mut cx)); } fn main() { diff --git a/src/test/run-pass/auxiliary/arc_wake.rs b/src/test/run-pass/auxiliary/arc_wake.rs index 034e378af7f19..74ec56f55171a 100644 --- a/src/test/run-pass/auxiliary/arc_wake.rs +++ b/src/test/run-pass/auxiliary/arc_wake.rs @@ -1,19 +1,19 @@ // edition:2018 -#![feature(arbitrary_self_types, futures_api)] +#![feature(futures_api)] use std::sync::Arc; use std::task::{ - Poll, Waker, RawWaker, RawWakerVTable, + Waker, RawWaker, RawWakerVTable, }; macro_rules! waker_vtable { ($ty:ident) => { - &RawWakerVTable { - clone: clone_arc_raw::<$ty>, - drop: drop_arc_raw::<$ty>, - wake: wake_arc_raw::<$ty>, - } + &RawWakerVTable::new( + clone_arc_raw::<$ty>, + wake_arc_raw::<$ty>, + drop_arc_raw::<$ty>, + ) }; } diff --git a/src/test/run-pass/futures-api.rs b/src/test/run-pass/futures-api.rs index fd4b585d34572..5d0b0db510f41 100644 --- a/src/test/run-pass/futures-api.rs +++ b/src/test/run-pass/futures-api.rs @@ -1,7 +1,6 @@ // aux-build:arc_wake.rs -#![feature(arbitrary_self_types, futures_api)] -#![allow(unused)] +#![feature(futures_api)] extern crate arc_wake; @@ -12,7 +11,7 @@ use std::sync::{ atomic::{self, AtomicUsize}, }; use std::task::{ - Poll, Waker, + Context, Poll, }; use arc_wake::ArcWake; @@ -30,8 +29,9 @@ struct MyFuture; impl Future for MyFuture { type Output = (); - fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Wake twice + let waker = cx.waker(); waker.wake(); waker.wake(); Poll::Ready(()) @@ -44,10 +44,11 @@ fn test_waker() { }); let waker = ArcWake::into_waker(counter.clone()); assert_eq!(2, Arc::strong_count(&counter)); - - assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(&waker)); - assert_eq!(2, counter.wakes.load(atomic::Ordering::SeqCst)); - + { + let mut context = Context::from_waker(&waker); + assert_eq!(Poll::Ready(()), Pin::new(&mut MyFuture).poll(&mut context)); + assert_eq!(2, counter.wakes.load(atomic::Ordering::SeqCst)); + } drop(waker); assert_eq!(1, Arc::strong_count(&counter)); }