Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reexport alloc::task::Wake #2494

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions futures-core/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ pub mod __internal;

#[doc(no_inline)]
pub use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

#[doc(no_inline)]
#[cfg(feature = "alloc")]
pub use alloc::task::Wake;
14 changes: 9 additions & 5 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::enter;
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use futures_task::{waker_ref, ArcWake};
use futures_task::{waker_ref, Wake};
use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
use futures_util::pin_mut;
use futures_util::stream::FuturesUnordered;
Expand Down Expand Up @@ -60,17 +60,21 @@ thread_local! {
});
}

impl ArcWake for ThreadNotify {
fn wake_by_ref(arc_self: &Arc<Self>) {
impl Wake for ThreadNotify {
fn wake(self: Arc<Self>) {
self.wake_by_ref();
}

fn wake_by_ref(self: &Arc<Self>) {
// Make sure the wakeup is remembered until the next `park()`.
let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
let unparked = self.unparked.swap(true, Ordering::Relaxed);
if !unparked {
// If the thread has not been unparked yet, it must be done
// now. If it was actually parked, it will run again,
// otherwise the token made available by `unpark`
// may be consumed before reaching `park()`, but `unparked`
// ensures it is not forgotten.
arc_self.thread.unpark();
self.thread.unpark();
}
}
}
Expand Down
14 changes: 9 additions & 5 deletions futures-executor/src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::enter;
use crate::unpark_mutex::UnparkMutex;
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_task::{waker_ref, ArcWake};
use futures_task::{waker_ref, Wake};
use futures_task::{FutureObj, Spawn, SpawnError};
use futures_util::future::FutureExt;
use std::cmp;
Expand Down Expand Up @@ -344,10 +344,14 @@ impl fmt::Debug for Task {
}
}

impl ArcWake for WakeHandle {
fn wake_by_ref(arc_self: &Arc<Self>) {
match arc_self.mutex.notify() {
Ok(task) => arc_self.exec.state.send(Message::Run(task)),
impl Wake for WakeHandle {
fn wake(self: Arc<Self>) {
self.wake_by_ref();
}

fn wake_by_ref(self: &Arc<Self>) {
match self.mutex.notify() {
Ok(task) => self.exec.state.send(Message::Run(task)),
Err(()) => {}
}
}
Expand Down
49 changes: 0 additions & 49 deletions futures-task/src/arc_wake.rs

This file was deleted.

5 changes: 1 addition & 4 deletions futures-task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ pub use crate::spawn::{LocalSpawn, Spawn, SpawnError};

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
mod arc_wake;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
pub use crate::arc_wake::ArcWake;
pub use alloc::task::Wake;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
Expand Down
28 changes: 13 additions & 15 deletions futures-task/src/waker.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::arc_wake::ArcWake;
use super::Wake;
use alloc::sync::Arc;
use core::mem;
use core::task::{RawWaker, RawWakerVTable, Waker};

pub(super) fn waker_vtable<W: ArcWake>() -> &'static RawWakerVTable {
pub(super) fn waker_vtable<W: Wake>() -> &'static RawWakerVTable {
&RawWakerVTable::new(
clone_arc_raw::<W>,
wake_arc_raw::<W>,
Expand All @@ -12,48 +12,46 @@ pub(super) fn waker_vtable<W: ArcWake>() -> &'static RawWakerVTable {
)
}

/// Creates a [`Waker`] from an `Arc<impl ArcWake>`.
/// Creates a [`Waker`] from an `Arc<impl Wake>`.
///
/// The returned [`Waker`] will call
/// [`ArcWake.wake()`](ArcWake::wake) if awoken.
/// [`Wake.wake()`](Wake::wake) if awoken.
pub fn waker<W>(wake: Arc<W>) -> Waker
where
W: ArcWake + 'static,
W: Wake + Send + Sync + 'static,
{
let ptr = Arc::into_raw(wake) as *const ();

unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>())) }
wake.into()
}

// FIXME: panics on Arc::clone / refcount changes could wreak havoc on the
// code here. We should guard against this by aborting.

#[allow(clippy::redundant_clone)] // The clone here isn't actually redundant.
unsafe fn increase_refcount<T: ArcWake>(data: *const ()) {
unsafe fn increase_refcount<T: Wake>(data: *const ()) {
// Retain Arc, but don't touch refcount by wrapping in ManuallyDrop
let arc = mem::ManuallyDrop::new(Arc::<T>::from_raw(data as *const T));
// Now increase refcount, but don't drop new refcount either
let _arc_clone: mem::ManuallyDrop<_> = arc.clone();
}

// used by `waker_ref`
unsafe fn clone_arc_raw<T: ArcWake>(data: *const ()) -> RawWaker {
unsafe fn clone_arc_raw<T: Wake>(data: *const ()) -> RawWaker {
increase_refcount::<T>(data);
RawWaker::new(data, waker_vtable::<T>())
}

unsafe fn wake_arc_raw<T: ArcWake>(data: *const ()) {
unsafe fn wake_arc_raw<T: Wake>(data: *const ()) {
let arc: Arc<T> = Arc::from_raw(data as *const T);
ArcWake::wake(arc);
Wake::wake(arc);
}

// used by `waker_ref`
unsafe fn wake_by_ref_arc_raw<T: ArcWake>(data: *const ()) {
unsafe fn wake_by_ref_arc_raw<T: Wake>(data: *const ()) {
// Retain Arc, but don't touch refcount by wrapping in ManuallyDrop
let arc = mem::ManuallyDrop::new(Arc::<T>::from_raw(data as *const T));
ArcWake::wake_by_ref(&arc);
Wake::wake_by_ref(&arc);
}

unsafe fn drop_arc_raw<T: ArcWake>(data: *const ()) {
unsafe fn drop_arc_raw<T: Wake>(data: *const ()) {
drop(Arc::<T>::from_raw(data as *const T))
}
8 changes: 4 additions & 4 deletions futures-task/src/waker_ref.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::arc_wake::ArcWake;
use super::waker::waker_vtable;
use super::Wake;
use alloc::sync::Arc;
use core::marker::PhantomData;
use core::mem::ManuallyDrop;
Expand Down Expand Up @@ -44,14 +44,14 @@ impl Deref for WakerRef<'_> {
}
}

/// Creates a reference to a [`Waker`] from a reference to `Arc<impl ArcWake>`.
/// Creates a reference to a [`Waker`] from a reference to `Arc<impl Wake>`.
///
/// The resulting [`Waker`] will call
/// [`ArcWake.wake()`](ArcWake::wake) if awoken.
/// [`Wake.wake()`](Wake::wake) if awoken.
#[inline]
pub fn waker_ref<W>(wake: &Arc<W>) -> WakerRef<'_>
where
W: ArcWake,
W: Wake,
{
// simply copy the pointer instead of using Arc::into_raw,
// as we don't actually keep a refcount by using ManuallyDrop.<
Expand Down
14 changes: 9 additions & 5 deletions futures-test/src/task/wake_counter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures_core::task::Waker;
use futures_util::task::{self, ArcWake};
use futures_core::task::{Wake, Waker};
use futures_util::task;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

Expand Down Expand Up @@ -29,9 +29,13 @@ struct WakerInner {
count: AtomicUsize,
}

impl ArcWake for WakerInner {
fn wake_by_ref(arc_self: &Arc<Self>) {
let _ = arc_self.count.fetch_add(1, Ordering::SeqCst);
impl Wake for WakerInner {
fn wake(self: Arc<Self>) {
self.wake_by_ref()
}

fn wake_by_ref(self: &Arc<Self>) {
let _ = self.count.fetch_add(1, Ordering::SeqCst);
}
}

Expand Down
20 changes: 17 additions & 3 deletions futures-util/benches_disabled/bilock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,33 @@ mod bench {
use futures_util::lock::BiLock;
use futures_util::lock::BiLockAcquire;
use futures_util::lock::BiLockAcquired;
use futures_util::task::ArcWake;
use futures_core::Wake;

use std::sync::Arc;
use test::Bencher;

fn notify_noop() -> Waker {
struct Noop;

impl ArcWake for Noop {
impl Wake for Noop {
fn wake(_: &Arc<Self>) {}
}

ArcWake::into_waker(Arc::new(Noop))
Arc::new(Noop).into()
}
}


/// Pseudo-stream which simply calls `lock.poll()` on `poll`
struct LockStream {
lock: BiLockAcquire<u32>,
}

impl LockStream {
fn new(lock: BiLock<u32>) -> Self {
Self {
lock: lock.lock()
}
}

/// Pseudo-stream which simply calls `lock.poll()` on `poll`
Expand Down
10 changes: 7 additions & 3 deletions futures-util/src/compat/compat03as01.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef};
use crate::task::{self as task03, Wake as ArcWake03, WakerRef};
use futures_01::{
task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01,
};
Expand Down Expand Up @@ -184,8 +184,12 @@ impl Current {
}

impl ArcWake03 for Current {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.0.notify();
fn wake(self: Arc<Self>) {
self.wake_by_ref();
}

fn wake_by_ref(self: &Arc<Self>) {
self.0.notify();
}
}

Expand Down
12 changes: 8 additions & 4 deletions futures-util/src/future/future/shared.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::task::{waker_ref, ArcWake};
use crate::task::{waker_ref, Wake};
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll, Waker};
use slab::Slab;
Expand Down Expand Up @@ -347,9 +347,13 @@ where
}
}

impl ArcWake for Notifier {
fn wake_by_ref(arc_self: &Arc<Self>) {
let wakers = &mut *arc_self.wakers.lock().unwrap();
impl Wake for Notifier {
fn wake(self: Arc<Self>) {
self.wake_by_ref();
}

fn wake_by_ref(self: &Arc<Self>) {
let wakers = &mut *self.wakers.lock().unwrap();
if let Some(wakers) = wakers.as_mut() {
for (_key, opt_waker) in wakers {
if let Some(waker) = opt_waker.take() {
Expand Down
21 changes: 12 additions & 9 deletions futures-util/src/stream/futures_unordered/task.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use super::abort::abort;
use super::ReadyToRunQueue;
use crate::task::{waker_ref, Wake, WakerRef};
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::sync::atomic::Ordering::{self, SeqCst};
use core::sync::atomic::{AtomicBool, AtomicPtr};

use super::abort::abort;
use super::ReadyToRunQueue;
use crate::task::{waker_ref, ArcWake, WakerRef};

pub(super) struct Task<Fut> {
// The future
pub(super) future: UnsafeCell<Option<Fut>>,
Expand Down Expand Up @@ -41,9 +40,13 @@ pub(super) struct Task<Fut> {
unsafe impl<Fut> Send for Task<Fut> {}
unsafe impl<Fut> Sync for Task<Fut> {}

impl<Fut> ArcWake for Task<Fut> {
fn wake_by_ref(arc_self: &Arc<Self>) {
let inner = match arc_self.ready_to_run_queue.upgrade() {
impl<Fut> Wake for Task<Fut> {
fn wake(self: Arc<Self>) {
self.wake_by_ref();
}

fn wake_by_ref(self: &Arc<Self>) {
let inner = match self.ready_to_run_queue.upgrade() {
Some(inner) => inner,
None => return,
};
Expand All @@ -60,9 +63,9 @@ impl<Fut> ArcWake for Task<Fut> {
// implementation guarantees that if we set the `queued` flag that
// there's a reference count held by the main `FuturesUnordered` queue
// still.
let prev = arc_self.queued.swap(true, SeqCst);
let prev = self.queued.swap(true, SeqCst);
if !prev {
inner.enqueue(&**arc_self);
inner.enqueue(&**self);
inner.waker.wake();
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use futures_task::noop_waker_ref;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
pub use futures_task::ArcWake;
pub use futures_core::task::Wake;

#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
Expand Down
2 changes: 1 addition & 1 deletion futures/tests/no-std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use futures_core::task::__internal::AtomicWaker as _;

#[cfg(feature = "futures-task-alloc")]
#[cfg(target_has_atomic = "ptr")]
pub use futures_task::ArcWake as _;
pub use futures_core::task::Wake as _;

#[cfg(feature = "futures-channel-alloc")]
#[cfg(target_has_atomic = "ptr")]
Expand Down
Loading