Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LocalWaker and ContextBuilder types to core, and LocalWake trait to alloc. #118960

Merged
merged 16 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion library/alloc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
#![feature(iter_next_chunk)]
#![feature(iter_repeat_n)]
#![feature(layout_for_ptr)]
#![feature(local_waker)]
#![feature(maybe_uninit_slice)]
#![feature(maybe_uninit_uninit_array)]
#![feature(maybe_uninit_uninit_array_transpose)]
Expand Down Expand Up @@ -253,7 +254,7 @@ pub mod str;
pub mod string;
#[cfg(all(not(no_rc), not(no_sync), target_has_atomic = "ptr"))]
pub mod sync;
#[cfg(all(not(no_global_oom_handling), not(no_rc), not(no_sync), target_has_atomic = "ptr"))]
#[cfg(all(not(no_global_oom_handling), not(no_rc), not(no_sync)))]
pub mod task;
#[cfg(test)]
mod tests;
Expand Down
185 changes: 180 additions & 5 deletions library/alloc/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@

//! Types and Traits for working with asynchronous tasks.
//!
//! **Note**: This module is only available on platforms that support atomic
//! loads and stores of pointers. This may be detected at compile time using
//! **Note**: Some of the types in this module are only available
//! on platforms that support atomic loads and stores of pointers.
//! This may be detected at compile time using
//! `#[cfg(target_has_atomic = "ptr")]`.

use crate::rc::Rc;
use core::mem::ManuallyDrop;
use core::task::{RawWaker, RawWakerVTable, Waker};
use core::task::{LocalWaker, RawWaker, RawWakerVTable};

#[cfg(target_has_atomic = "ptr")]
use crate::sync::Arc;
#[cfg(target_has_atomic = "ptr")]
use core::task::Waker;

/// The implementation of waking a task on an executor.
///
Expand Down Expand Up @@ -73,6 +78,7 @@ use crate::sync::Arc;
/// println!("Hi from inside a future!");
/// });
/// ```
#[cfg(target_has_atomic = "ptr")]
#[stable(feature = "wake_trait", since = "1.51.0")]
pub trait Wake {
/// Wake this task.
Expand All @@ -91,7 +97,7 @@ pub trait Wake {
self.clone().wake();
}
}

#[cfg(target_has_atomic = "ptr")]
#[stable(feature = "wake_trait", since = "1.51.0")]
impl<W: Wake + Send + Sync + 'static> From<Arc<W>> for Waker {
/// Use a `Wake`-able type as a `Waker`.
Expand All @@ -103,7 +109,7 @@ impl<W: Wake + Send + Sync + 'static> From<Arc<W>> for Waker {
unsafe { Waker::from_raw(raw_waker(waker)) }
}
}

#[cfg(target_has_atomic = "ptr")]
#[stable(feature = "wake_trait", since = "1.51.0")]
impl<W: Wake + Send + Sync + 'static> From<Arc<W>> for RawWaker {
/// Use a `Wake`-able type as a `RawWaker`.
Expand All @@ -119,6 +125,7 @@ impl<W: Wake + Send + Sync + 'static> From<Arc<W>> for RawWaker {
// the safety of `From<Arc<W>> for Waker` does not depend on the correct
// trait dispatch - instead both impls call this function directly and
// explicitly.
#[cfg(target_has_atomic = "ptr")]
#[inline(always)]
fn raw_waker<W: Wake + Send + Sync + 'static>(waker: Arc<W>) -> RawWaker {
// Increment the reference count of the arc to clone it.
Expand Down Expand Up @@ -152,3 +159,171 @@ fn raw_waker<W: Wake + Send + Sync + 'static>(waker: Arc<W>) -> RawWaker {
&RawWakerVTable::new(clone_waker::<W>, wake::<W>, wake_by_ref::<W>, drop_waker::<W>),
)
}

/// An analogous trait to `Wake` but used to construct a `LocalWaker`. This API
/// works in exactly the same way as `Wake`, except that it uses an `Rc` instead
/// of an `Arc`, and the result is a `LocalWaker` instead of a `Waker`.
///
/// The benefits of using `LocalWaker` over `Waker` are that it allows the local waker
/// to hold data that does not implement `Send` and `Sync`. Additionally, it saves calls
/// to `Arc::clone`, which requires atomic synchronization.
///
///
/// # Examples
///
/// This is a simplified example of a `spawn` and a `block_on` function. The `spawn` function
/// is used to push new tasks onto the run queue, while the block on function will remove them
/// and poll them. When a task is woken, it will put itself back on the run queue to be polled
/// by the executor.
///
/// **Note:** This example trades correctness for simplicity. A real world example would interleave
/// poll calls with calls to an io reactor to wait for events instead of spinning on a loop.
///
/// ```rust
/// #![feature(local_waker)]
/// #![feature(noop_waker)]
/// use std::task::{LocalWake, ContextBuilder, LocalWaker, Waker};
/// use std::future::Future;
/// use std::pin::Pin;
/// use std::rc::Rc;
/// use std::cell::RefCell;
/// use std::collections::VecDeque;
///
///
/// thread_local! {
/// // A queue containing all tasks ready to do progress
/// static RUN_QUEUE: RefCell<VecDeque<Rc<Task>>> = RefCell::default();
/// }
///
/// type BoxedFuture = Pin<Box<dyn Future<Output = ()>>>;
///
/// struct Task(RefCell<BoxedFuture>);
///
/// impl LocalWake for Task {
/// fn wake(self: Rc<Self>) {
/// RUN_QUEUE.with_borrow_mut(|queue| {
/// queue.push_back(self)
/// })
/// }
/// }
///
/// fn spawn<F>(future: F)
/// where
/// F: Future<Output=()> + 'static + Send + Sync
/// {
/// let task = RefCell::new(Box::pin(future));
/// RUN_QUEUE.with_borrow_mut(|queue| {
/// queue.push_back(Rc::new(Task(task)));
/// });
/// }
///
/// fn block_on<F>(future: F)
/// where
/// F: Future<Output=()> + 'static + Sync + Send
/// {
/// spawn(future);
/// loop {
/// let Some(task) = RUN_QUEUE.with_borrow_mut(|queue| queue.pop_front()) else {
/// // we exit, since there are no more tasks remaining on the queue
/// return;
/// };
///
/// // cast the Rc<Task> into a `LocalWaker`
/// let local_waker: LocalWaker = task.clone().into();
/// // Build the context using `ContextBuilder`
/// let mut cx = ContextBuilder::from_waker(Waker::noop())
/// .local_waker(&local_waker)
/// .build();
///
/// // Poll the task
/// let _ = task.0
/// .borrow_mut()
/// .as_mut()
/// .poll(&mut cx);
/// }
/// }
///
/// block_on(async {
/// println!("hello world");
/// });
/// ```
///
#[unstable(feature = "local_waker", issue = "118959")]
pub trait LocalWake {
/// Wake this task.
#[unstable(feature = "local_waker", issue = "118959")]
fn wake(self: Rc<Self>);

/// Wake this task without consuming the local waker.
///
/// If an executor supports a cheaper way to wake without consuming the
/// waker, it should override this method. By default, it clones the
/// [`Rc`] and calls [`wake`] on the clone.
///
/// [`wake`]: LocalWaker::wake
#[unstable(feature = "local_waker", issue = "118959")]
fn wake_by_ref(self: &Rc<Self>) {
self.clone().wake();
}
}

#[unstable(feature = "local_waker", issue = "118959")]
impl<W: LocalWake + 'static> From<Rc<W>> for LocalWaker {
/// Use a `Wake`-able type as a `LocalWaker`.
///
/// No heap allocations or atomic operations are used for this conversion.
fn from(waker: Rc<W>) -> LocalWaker {
// SAFETY: This is safe because raw_waker safely constructs
// a RawWaker from Rc<W>.
unsafe { LocalWaker::from_raw(local_raw_waker(waker)) }
}
}
#[allow(ineffective_unstable_trait_impl)]
#[unstable(feature = "local_waker", issue = "118959")]
impl<W: LocalWake + 'static> From<Rc<W>> for RawWaker {
/// Use a `Wake`-able type as a `RawWaker`.
///
/// No heap allocations or atomic operations are used for this conversion.
fn from(waker: Rc<W>) -> RawWaker {
local_raw_waker(waker)
}
}

// NB: This private function for constructing a RawWaker is used, rather than
// inlining this into the `From<Rc<W>> for RawWaker` impl, to ensure that
// the safety of `From<Rc<W>> for Waker` does not depend on the correct
// trait dispatch - instead both impls call this function directly and
// explicitly.
#[inline(always)]
fn local_raw_waker<W: LocalWake + 'static>(waker: Rc<W>) -> RawWaker {
// Increment the reference count of the Rc to clone it.
unsafe fn clone_waker<W: LocalWake + 'static>(waker: *const ()) -> RawWaker {
unsafe { Rc::increment_strong_count(waker as *const W) };
RawWaker::new(
waker as *const (),
&RawWakerVTable::new(clone_waker::<W>, wake::<W>, wake_by_ref::<W>, drop_waker::<W>),
)
}

// Wake by value, moving the Rc into the LocalWake::wake function
unsafe fn wake<W: LocalWake + 'static>(waker: *const ()) {
let waker = unsafe { Rc::from_raw(waker as *const W) };
<W as LocalWake>::wake(waker);
}

// Wake by reference, wrap the waker in ManuallyDrop to avoid dropping it
unsafe fn wake_by_ref<W: LocalWake + 'static>(waker: *const ()) {
let waker = unsafe { ManuallyDrop::new(Rc::from_raw(waker as *const W)) };
<W as LocalWake>::wake_by_ref(&waker);
}

// Decrement the reference count of the Rc on drop
unsafe fn drop_waker<W: LocalWake + 'static>(waker: *const ()) {
unsafe { Rc::decrement_strong_count(waker as *const W) };
}

RawWaker::new(
Rc::into_raw(waker) as *const (),
&RawWakerVTable::new(clone_waker::<W>, wake::<W>, wake_by_ref::<W>, drop_waker::<W>),
)
}
2 changes: 1 addition & 1 deletion library/core/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use self::poll::Poll;

mod wake;
#[stable(feature = "futures_api", since = "1.36.0")]
pub use self::wake::{Context, RawWaker, RawWakerVTable, Waker};
pub use self::wake::{Context, ContextBuilder, LocalWaker, RawWaker, RawWakerVTable, Waker};

mod ready;
#[stable(feature = "ready_macro", since = "1.64.0")]
Expand Down
Loading
Loading