Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Future-proof the Futures API #59119

Merged
merged 1 commit into from
Apr 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/liballoc/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ use core::ops::{
CoerceUnsized, DispatchFromDyn, Deref, DerefMut, Receiver, Generator, GeneratorState
};
use core::ptr::{self, NonNull, Unique};
use core::task::{Waker, Poll};
use core::task::{Context, Poll};

use crate::vec::Vec;
use crate::raw_vec::RawVec;
Expand Down Expand Up @@ -914,7 +914,7 @@ impl<G: ?Sized + Generator> Generator for Pin<Box<G>> {
impl<F: ?Sized + Future + Unpin> Future for Box<F> {
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
F::poll(Pin::new(&mut *self), waker)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
F::poll(Pin::new(&mut *self), cx)
}
}
18 changes: 10 additions & 8 deletions src/libcore/future/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use marker::Unpin;
use ops;
use pin::Pin;
use task::{Poll, Waker};
use task::{Context, Poll};

/// A future represents an asynchronous computation.
///
Expand Down Expand Up @@ -44,8 +44,9 @@ pub trait Future {
/// Once a future has finished, clients should not `poll` it again.
///
/// When a future is not ready yet, `poll` returns `Poll::Pending` and
/// stores a clone of the [`Waker`] to be woken once the future can
/// make progress. For example, a future waiting for a socket to become
/// stores a clone of the [`Waker`] copied from the current [`Context`].
/// This [`Waker`] is then woken once the future can make progress.
/// For example, a future waiting for a socket to become
/// readable would call `.clone()` on the [`Waker`] and store it.
/// When a signal arrives elsewhere indicating that the socket is readable,
/// `[Waker::wake]` is called and the socket future's task is awoken.
Expand Down Expand Up @@ -88,16 +89,17 @@ pub trait Future {
///
/// [`Poll::Pending`]: ../task/enum.Poll.html#variant.Pending
/// [`Poll::Ready(val)`]: ../task/enum.Poll.html#variant.Ready
/// [`Context`]: ../task/struct.Context.html
/// [`Waker`]: ../task/struct.Waker.html
/// [`Waker::wake`]: ../task/struct.Waker.html#method.wake
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

impl<F: ?Sized + Future + Unpin> Future for &mut F {
type Output = F::Output;

fn poll(mut self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
F::poll(Pin::new(&mut **self), waker)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
F::poll(Pin::new(&mut **self), cx)
}
}

Expand All @@ -108,7 +110,7 @@ where
{
type Output = <<P as ops::Deref>::Target as Future>::Output;

fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
Pin::get_mut(self).as_mut().poll(waker)
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::get_mut(self).as_mut().poll(cx)
}
}
2 changes: 1 addition & 1 deletion src/libcore/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ mod poll;
pub use self::poll::Poll;

mod wake;
pub use self::wake::{Waker, RawWaker, RawWakerVTable};
pub use self::wake::{Context, Waker, RawWaker, RawWakerVTable};
101 changes: 97 additions & 4 deletions src/libcore/task/wake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
issue = "50547")]

use fmt;
use marker::Unpin;
use marker::{PhantomData, Unpin};

/// A `RawWaker` allows the implementor of a task executor to create a [`Waker`]
/// which provides customized wakeup behavior.
Expand Down Expand Up @@ -36,6 +36,10 @@ impl RawWaker {
/// The `vtable` customizes the behavior of a `Waker` which gets created
/// from a `RawWaker`. For each operation on the `Waker`, the associated
/// function in the `vtable` of the underlying `RawWaker` will be called.
#[rustc_promotable]
#[unstable(feature = "futures_api",
reason = "futures in libcore are unstable",
issue = "50547")]
pub const fn new(data: *const (), vtable: &'static RawWakerVTable) -> RawWaker {
RawWaker {
data,
Expand Down Expand Up @@ -63,21 +67,105 @@ pub struct RawWakerVTable {
/// required for this additional instance of a [`RawWaker`] and associated
/// task. Calling `wake` on the resulting [`RawWaker`] should result in a wakeup
/// of the same task that would have been awoken by the original [`RawWaker`].
pub clone: unsafe fn(*const ()) -> RawWaker,
clone: unsafe fn(*const ()) -> RawWaker,

/// This function will be called when `wake` is called on the [`Waker`].
/// It must wake up the task associated with this [`RawWaker`].
///
/// The implemention of this function must not consume the provided data
/// pointer.
pub wake: unsafe fn(*const ()),
wake: unsafe fn(*const ()),

/// This function gets called when a [`RawWaker`] gets dropped.
///
/// The implementation of this function must make sure to release any
/// resources that are associated with this instance of a [`RawWaker`] and
/// associated task.
drop: unsafe fn(*const ()),
}

impl RawWakerVTable {
/// Creates a new `RawWakerVTable` from the provided `clone`, `wake`, and
/// `drop` functions.
///
/// # `clone`
///
/// This function will be called when the [`RawWaker`] gets cloned, e.g. when
/// the [`Waker`] in which the [`RawWaker`] is stored gets cloned.
///
/// The implementation of this function must retain all resources that are
/// required for this additional instance of a [`RawWaker`] and associated
/// task. Calling `wake` on the resulting [`RawWaker`] should result in a wakeup
/// of the same task that would have been awoken by the original [`RawWaker`].
///
/// # `wake`
///
/// This function will be called when `wake` is called on the [`Waker`].
/// It must wake up the task associated with this [`RawWaker`].
///
/// The implemention of this function must not consume the provided data
/// pointer.
///
/// # `drop`
///
/// This function gets called when a [`RawWaker`] gets dropped.
///
/// The implementation of this function must make sure to release any
/// resources that are associated with this instance of a [`RawWaker`] and
/// associated task.
pub drop: unsafe fn(*const ()),
#[rustc_promotable]
#[unstable(feature = "futures_api",
reason = "futures in libcore are unstable",
issue = "50547")]
pub const fn new(
cramertj marked this conversation as resolved.
Show resolved Hide resolved
clone: unsafe fn(*const ()) -> RawWaker,
wake: unsafe fn(*const ()),
drop: unsafe fn(*const ()),
) -> Self {
Self {
clone,
wake,
drop,
}
}
}

/// The `Context` of an asynchronous task.
///
/// Currently, `Context` only serves to provide access to a `&Waker`
/// which can be used to wake the current task.
pub struct Context<'a> {
waker: &'a Waker,
// Ensure we future-proof against variance changes by forcing
// the lifetime to be invariant (argument-position lifetimes
// are contravariant while return-position lifetimes are
// covariant).
_marker: PhantomData<fn(&'a ()) -> &'a ()>,
}

impl<'a> Context<'a> {
/// Create a new `Context` from a `&Waker`.
#[inline]
pub fn from_waker(waker: &'a Waker) -> Self {
Context {
waker,
_marker: PhantomData,
}
}

/// Returns a reference to the `Waker` for the current task.
#[inline]
pub fn waker(&self) -> &'a Waker {
&self.waker
}
}

impl fmt::Debug for Context<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Context")
.field("waker", &self.waker)
.finish()
}
}

/// A `Waker` is a handle for waking up a task by notifying its executor that it
Expand All @@ -98,6 +186,7 @@ unsafe impl Sync for Waker {}

impl Waker {
/// Wake up the task associated with this `Waker`.
#[inline]
pub fn wake(&self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
Expand All @@ -115,6 +204,7 @@ impl Waker {
/// returns `true`, it is guaranteed that the `Waker`s will awaken the same task.
///
/// This function is primarily used for optimization purposes.
#[inline]
pub fn will_wake(&self, other: &Waker) -> bool {
self.waker == other.waker
}
Expand All @@ -124,6 +214,7 @@ impl Waker {
/// The behavior of the returned `Waker` is undefined if the contract defined
/// in [`RawWaker`]'s and [`RawWakerVTable`]'s documentation is not upheld.
/// Therefore this method is unsafe.
#[inline]
pub unsafe fn new_unchecked(waker: RawWaker) -> Waker {
Waker {
waker,
Expand All @@ -132,6 +223,7 @@ impl Waker {
}

impl Clone for Waker {
#[inline]
fn clone(&self) -> Self {
Waker {
// SAFETY: This is safe because `Waker::new_unchecked` is the only way
Expand All @@ -143,6 +235,7 @@ impl Clone for Waker {
}

impl Drop for Waker {
#[inline]
fn drop(&mut self) {
// SAFETY: This is safe because `Waker::new_unchecked` is the only way
// to initialize `drop` and `data` requiring the user to acknowledge
Expand Down
61 changes: 36 additions & 25 deletions src/libstd/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use core::marker::Unpin;
use core::pin::Pin;
use core::option::Option;
use core::ptr::NonNull;
use core::task::{Waker, Poll};
use core::task::{Context, Poll};
use core::ops::{Drop, Generator, GeneratorState};

#[doc(inline)]
Expand All @@ -32,72 +32,83 @@ impl<T: Generator<Yield = ()>> !Unpin for GenFuture<T> {}
#[unstable(feature = "gen_future", issue = "50547")]
impl<T: Generator<Yield = ()>> Future for GenFuture<T> {
type Output = T::Return;
fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Safe because we're !Unpin + !Drop mapping to a ?Unpin value
let gen = unsafe { Pin::map_unchecked_mut(self, |s| &mut s.0) };
set_task_waker(waker, || match gen.resume() {
set_task_context(cx, || match gen.resume() {
GeneratorState::Yielded(()) => Poll::Pending,
GeneratorState::Complete(x) => Poll::Ready(x),
})
}
}

thread_local! {
static TLS_WAKER: Cell<Option<NonNull<Waker>>> = Cell::new(None);
static TLS_CX: Cell<Option<NonNull<Context<'static>>>> = Cell::new(None);
}

struct SetOnDrop(Option<NonNull<Waker>>);
struct SetOnDrop(Option<NonNull<Context<'static>>>);

impl Drop for SetOnDrop {
fn drop(&mut self) {
TLS_WAKER.with(|tls_waker| {
tls_waker.set(self.0.take());
TLS_CX.with(|tls_cx| {
tls_cx.set(self.0.take());
});
}
}

#[unstable(feature = "gen_future", issue = "50547")]
/// Sets the thread-local task context used by async/await futures.
pub fn set_task_waker<F, R>(waker: &Waker, f: F) -> R
pub fn set_task_context<F, R>(cx: &mut Context<'_>, f: F) -> R
where
F: FnOnce() -> R
{
let old_waker = TLS_WAKER.with(|tls_waker| {
tls_waker.replace(Some(NonNull::from(waker)))
// transmute the context's lifetime to 'static so we can store it.
let cx = unsafe {
core::mem::transmute::<&mut Context<'_>, &mut Context<'static>>(cx)
};
let old_cx = TLS_CX.with(|tls_cx| {
tls_cx.replace(Some(NonNull::from(cx)))
});
let _reset_waker = SetOnDrop(old_waker);
let _reset = SetOnDrop(old_cx);
f()
}

#[unstable(feature = "gen_future", issue = "50547")]
/// Retrieves the thread-local task waker used by async/await futures.
/// Retrieves the thread-local task context used by async/await futures.
///
/// This function acquires exclusive access to the task waker.
/// This function acquires exclusive access to the task context.
///
/// Panics if no waker has been set or if the waker has already been
/// retrieved by a surrounding call to get_task_waker.
pub fn get_task_waker<F, R>(f: F) -> R
/// Panics if no context has been set or if the context has already been
/// retrieved by a surrounding call to get_task_context.
pub fn get_task_context<F, R>(f: F) -> R
where
F: FnOnce(&Waker) -> R
F: FnOnce(&mut Context<'_>) -> R
{
let waker_ptr = TLS_WAKER.with(|tls_waker| {
let cx_ptr = TLS_CX.with(|tls_cx| {
// Clear the entry so that nested `get_task_waker` calls
// will fail or set their own value.
tls_waker.replace(None)
tls_cx.replace(None)
});
let _reset_waker = SetOnDrop(waker_ptr);
let _reset = SetOnDrop(cx_ptr);

let waker_ptr = waker_ptr.expect(
"TLS Waker not set. This is a rustc bug. \
let mut cx_ptr = cx_ptr.expect(
"TLS Context not set. This is a rustc bug. \
Please file an issue on https://github.com/rust-lang/rust.");
unsafe { f(waker_ptr.as_ref()) }

// Safety: we've ensured exclusive access to the context by
// removing the pointer from TLS, only to be replaced once
// we're done with it.
//
// The pointer that was inserted came from an `&mut Context<'_>`,
// so it is safe to treat as mutable.
unsafe { f(cx_ptr.as_mut()) }
}

#[unstable(feature = "gen_future", issue = "50547")]
/// Polls a future in the current thread-local task waker.
pub fn poll_with_tls_waker<F>(f: Pin<&mut F>) -> Poll<F::Output>
pub fn poll_with_tls_context<F>(f: Pin<&mut F>) -> Poll<F::Output>
where
F: Future
{
get_task_waker(|waker| F::poll(f, waker))
get_task_context(|cx| F::poll(f, cx))
}
2 changes: 1 addition & 1 deletion src/libstd/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ macro_rules! r#await {
let mut pinned = $e;
loop {
if let $crate::task::Poll::Ready(x) =
$crate::future::poll_with_tls_waker(unsafe {
$crate::future::poll_with_tls_context(unsafe {
$crate::pin::Pin::new_unchecked(&mut pinned)
})
{
Expand Down
6 changes: 3 additions & 3 deletions src/libstd/panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::panicking;
use crate::ptr::{Unique, NonNull};
use crate::rc::Rc;
use crate::sync::{Arc, Mutex, RwLock, atomic};
use crate::task::{Waker, Poll};
use crate::task::{Context, Poll};
use crate::thread::Result;

#[stable(feature = "panic_hooks", since = "1.10.0")]
Expand Down Expand Up @@ -323,9 +323,9 @@ impl<T: fmt::Debug> fmt::Debug for AssertUnwindSafe<T> {
impl<F: Future> Future for AssertUnwindSafe<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, waker: &Waker) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pinned_field = unsafe { Pin::map_unchecked_mut(self, |x| &mut x.0) };
F::poll(pinned_field, waker)
F::poll(pinned_field, cx)
}
}

Expand Down
Loading