From f01641efb111ef8d227228122c6dc59e2443daee Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Fri, 3 Sep 2021 11:58:08 -0400 Subject: [PATCH 1/3] reexport `alloc::task::Wake` --- futures-core/src/task/mod.rs | 4 + futures-executor/src/local_pool.rs | 14 ++-- futures-executor/src/thread_pool.rs | 14 ++-- futures-task/src/arc_wake.rs | 49 ------------ futures-task/src/lib.rs | 5 +- futures-task/src/waker.rs | 28 +++---- futures-task/src/waker_ref.rs | 8 +- futures-test/src/task/wake_counter.rs | 14 ++-- futures-util/benches_disabled/bilock.rs | 20 ++++- futures-util/src/compat/compat03as01.rs | 10 ++- futures-util/src/future/future/shared.rs | 12 ++- .../src/stream/futures_unordered/task.rs | 17 ++-- futures-util/src/task/mod.rs | 4 +- futures/tests/object_safety.rs | 2 +- futures/tests/sink.rs | 13 ++- futures/tests/task_arc_wake.rs | 79 ------------------- 16 files changed, 104 insertions(+), 189 deletions(-) delete mode 100644 futures-task/src/arc_wake.rs delete mode 100644 futures/tests/task_arc_wake.rs diff --git a/futures-core/src/task/mod.rs b/futures-core/src/task/mod.rs index 19e4eaecdd..1740b0ecec 100644 --- a/futures-core/src/task/mod.rs +++ b/futures-core/src/task/mod.rs @@ -8,3 +8,7 @@ pub mod __internal; #[doc(no_inline)] pub use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +#[doc(no_inline)] +#[cfg(feature = "alloc")] +pub use alloc::task::Wake; diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index bee96d8db9..b5e7295b9c 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -2,7 +2,7 @@ use crate::enter; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use futures_task::{waker_ref, ArcWake}; +use futures_task::{waker_ref, Wake}; use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; use futures_util::pin_mut; use futures_util::stream::FuturesUnordered; @@ -60,17 +60,21 @@ thread_local! { }); } -impl ArcWake for ThreadNotify { - fn wake_by_ref(arc_self: &Arc) { +impl Wake for ThreadNotify { + fn wake(self: Arc) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &Arc) { // Make sure the wakeup is remembered until the next `park()`. - let unparked = arc_self.unparked.swap(true, Ordering::Relaxed); + let unparked = self.unparked.swap(true, Ordering::Relaxed); if !unparked { // If the thread has not been unparked yet, it must be done // now. If it was actually parked, it will run again, // otherwise the token made available by `unpark` // may be consumed before reaching `park()`, but `unparked` // ensures it is not forgotten. - arc_self.thread.unpark(); + self.thread.unpark(); } } } diff --git a/futures-executor/src/thread_pool.rs b/futures-executor/src/thread_pool.rs index f2347dbbdf..f4a88de678 100644 --- a/futures-executor/src/thread_pool.rs +++ b/futures-executor/src/thread_pool.rs @@ -2,7 +2,7 @@ use crate::enter; use crate::unpark_mutex::UnparkMutex; use futures_core::future::Future; use futures_core::task::{Context, Poll}; -use futures_task::{waker_ref, ArcWake}; +use futures_task::{waker_ref, Wake}; use futures_task::{FutureObj, Spawn, SpawnError}; use futures_util::future::FutureExt; use std::cmp; @@ -344,10 +344,14 @@ impl fmt::Debug for Task { } } -impl ArcWake for WakeHandle { - fn wake_by_ref(arc_self: &Arc) { - match arc_self.mutex.notify() { - Ok(task) => arc_self.exec.state.send(Message::Run(task)), +impl Wake for WakeHandle { + fn wake(self: Arc) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &Arc) { + match self.mutex.notify() { + Ok(task) => self.exec.state.send(Message::Run(task)), Err(()) => {} } } diff --git a/futures-task/src/arc_wake.rs b/futures-task/src/arc_wake.rs deleted file mode 100644 index aa6de0fc43..0000000000 --- a/futures-task/src/arc_wake.rs +++ /dev/null @@ -1,49 +0,0 @@ -use alloc::sync::Arc; - -/// A way of waking up a specific task. -/// -/// By implementing this trait, types that are expected to be wrapped in an `Arc` -/// can be converted into [`Waker`] objects. -/// Those Wakers can be used to signal executors that a task it owns -/// is ready to be `poll`ed again. -/// -/// Currently, there are two ways to convert `ArcWake` into [`Waker`]: -/// -/// * [`waker`](super::waker()) converts `Arc` into [`Waker`]. -/// * [`waker_ref`](super::waker_ref()) converts `&Arc` into [`WakerRef`] that -/// provides access to a [`&Waker`][`Waker`]. -/// -/// [`Waker`]: std::task::Waker -/// [`WakerRef`]: super::WakerRef -// Note: Send + Sync required because `Arc` doesn't automatically imply -// those bounds, but `Waker` implements them. -pub trait ArcWake: Send + Sync { - /// Indicates that the associated task is ready to make progress and should - /// be `poll`ed. - /// - /// This function can be called from an arbitrary thread, including threads which - /// did not create the `ArcWake` based [`Waker`]. - /// - /// Executors generally maintain a queue of "ready" tasks; `wake` should place - /// the associated task onto this queue. - /// - /// [`Waker`]: std::task::Waker - fn wake(self: Arc) { - Self::wake_by_ref(&self) - } - - /// Indicates that the associated task is ready to make progress and should - /// be `poll`ed. - /// - /// This function can be called from an arbitrary thread, including threads which - /// did not create the `ArcWake` based [`Waker`]. - /// - /// Executors generally maintain a queue of "ready" tasks; `wake_by_ref` should place - /// the associated task onto this queue. - /// - /// This function is similar to [`wake`](ArcWake::wake), but must not consume the provided data - /// pointer. - /// - /// [`Waker`]: std::task::Waker - fn wake_by_ref(arc_self: &Arc); -} diff --git a/futures-task/src/lib.rs b/futures-task/src/lib.rs index 439af135af..c86ee79fcb 100644 --- a/futures-task/src/lib.rs +++ b/futures-task/src/lib.rs @@ -22,9 +22,8 @@ pub use crate::spawn::{LocalSpawn, Spawn, SpawnError}; cfg_target_has_atomic! { #[cfg(feature = "alloc")] - mod arc_wake; - #[cfg(feature = "alloc")] - pub use crate::arc_wake::ArcWake; + #[doc(no_inline)] + pub use alloc::task::Wake; #[cfg(feature = "alloc")] mod waker; diff --git a/futures-task/src/waker.rs b/futures-task/src/waker.rs index a7310a07af..23a5610a3a 100644 --- a/futures-task/src/waker.rs +++ b/futures-task/src/waker.rs @@ -1,9 +1,9 @@ -use super::arc_wake::ArcWake; +use super::Wake; use alloc::sync::Arc; use core::mem; use core::task::{RawWaker, RawWakerVTable, Waker}; -pub(super) fn waker_vtable() -> &'static RawWakerVTable { +pub(super) fn waker_vtable() -> &'static RawWakerVTable { &RawWakerVTable::new( clone_arc_raw::, wake_arc_raw::, @@ -12,24 +12,22 @@ pub(super) fn waker_vtable() -> &'static RawWakerVTable { ) } -/// Creates a [`Waker`] from an `Arc`. +/// Creates a [`Waker`] from an `Arc`. /// /// The returned [`Waker`] will call -/// [`ArcWake.wake()`](ArcWake::wake) if awoken. +/// [`Wake.wake()`](Wake::wake) if awoken. pub fn waker(wake: Arc) -> Waker where - W: ArcWake + 'static, + W: Wake + Send + Sync + 'static, { - let ptr = Arc::into_raw(wake) as *const (); - - unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::())) } + wake.into() } // FIXME: panics on Arc::clone / refcount changes could wreak havoc on the // code here. We should guard against this by aborting. #[allow(clippy::redundant_clone)] // The clone here isn't actually redundant. -unsafe fn increase_refcount(data: *const ()) { +unsafe fn increase_refcount(data: *const ()) { // Retain Arc, but don't touch refcount by wrapping in ManuallyDrop let arc = mem::ManuallyDrop::new(Arc::::from_raw(data as *const T)); // Now increase refcount, but don't drop new refcount either @@ -37,23 +35,23 @@ unsafe fn increase_refcount(data: *const ()) { } // used by `waker_ref` -unsafe fn clone_arc_raw(data: *const ()) -> RawWaker { +unsafe fn clone_arc_raw(data: *const ()) -> RawWaker { increase_refcount::(data); RawWaker::new(data, waker_vtable::()) } -unsafe fn wake_arc_raw(data: *const ()) { +unsafe fn wake_arc_raw(data: *const ()) { let arc: Arc = Arc::from_raw(data as *const T); - ArcWake::wake(arc); + Wake::wake(arc); } // used by `waker_ref` -unsafe fn wake_by_ref_arc_raw(data: *const ()) { +unsafe fn wake_by_ref_arc_raw(data: *const ()) { // Retain Arc, but don't touch refcount by wrapping in ManuallyDrop let arc = mem::ManuallyDrop::new(Arc::::from_raw(data as *const T)); - ArcWake::wake_by_ref(&arc); + Wake::wake_by_ref(&arc); } -unsafe fn drop_arc_raw(data: *const ()) { +unsafe fn drop_arc_raw(data: *const ()) { drop(Arc::::from_raw(data as *const T)) } diff --git a/futures-task/src/waker_ref.rs b/futures-task/src/waker_ref.rs index 791c690120..613f8d5211 100644 --- a/futures-task/src/waker_ref.rs +++ b/futures-task/src/waker_ref.rs @@ -1,5 +1,5 @@ -use super::arc_wake::ArcWake; use super::waker::waker_vtable; +use super::Wake; use alloc::sync::Arc; use core::marker::PhantomData; use core::mem::ManuallyDrop; @@ -44,14 +44,14 @@ impl Deref for WakerRef<'_> { } } -/// Creates a reference to a [`Waker`] from a reference to `Arc`. +/// Creates a reference to a [`Waker`] from a reference to `Arc`. /// /// The resulting [`Waker`] will call -/// [`ArcWake.wake()`](ArcWake::wake) if awoken. +/// [`Wake.wake()`](Wake::wake) if awoken. #[inline] pub fn waker_ref(wake: &Arc) -> WakerRef<'_> where - W: ArcWake, + W: Wake, { // simply copy the pointer instead of using Arc::into_raw, // as we don't actually keep a refcount by using ManuallyDrop.< diff --git a/futures-test/src/task/wake_counter.rs b/futures-test/src/task/wake_counter.rs index 52c63e1cc9..78c2ce9fff 100644 --- a/futures-test/src/task/wake_counter.rs +++ b/futures-test/src/task/wake_counter.rs @@ -1,5 +1,5 @@ -use futures_core::task::Waker; -use futures_util::task::{self, ArcWake}; +use futures_core::task::{Wake, Waker}; +use futures_util::task; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -29,9 +29,13 @@ struct WakerInner { count: AtomicUsize, } -impl ArcWake for WakerInner { - fn wake_by_ref(arc_self: &Arc) { - let _ = arc_self.count.fetch_add(1, Ordering::SeqCst); +impl Wake for WakerInner { + fn wake(self: Arc) { + self.wake_by_ref() + } + + fn wake_by_ref(self: &Arc) { + let _ = self.count.fetch_add(1, Ordering::SeqCst); } } diff --git a/futures-util/benches_disabled/bilock.rs b/futures-util/benches_disabled/bilock.rs index 417f75d31e..e5861bbbac 100644 --- a/futures-util/benches_disabled/bilock.rs +++ b/futures-util/benches_disabled/bilock.rs @@ -7,7 +7,7 @@ mod bench { use futures_util::lock::BiLock; use futures_util::lock::BiLockAcquire; use futures_util::lock::BiLockAcquired; - use futures_util::task::ArcWake; + use futures_core::Wake; use std::sync::Arc; use test::Bencher; @@ -15,11 +15,25 @@ mod bench { fn notify_noop() -> Waker { struct Noop; - impl ArcWake for Noop { + impl Wake for Noop { fn wake(_: &Arc) {} } - ArcWake::into_waker(Arc::new(Noop)) + Arc::new(Noop).into() + } +} + + +/// Pseudo-stream which simply calls `lock.poll()` on `poll` +struct LockStream { + lock: BiLockAcquire, +} + +impl LockStream { + fn new(lock: BiLock) -> Self { + Self { + lock: lock.lock() + } } /// Pseudo-stream which simply calls `lock.poll()` on `poll` diff --git a/futures-util/src/compat/compat03as01.rs b/futures-util/src/compat/compat03as01.rs index 2573fe7a74..254b1e7f03 100644 --- a/futures-util/src/compat/compat03as01.rs +++ b/futures-util/src/compat/compat03as01.rs @@ -1,4 +1,4 @@ -use crate::task::{self as task03, ArcWake as ArcWake03, WakerRef}; +use crate::task::{self as task03, Wake as ArcWake03, WakerRef}; use futures_01::{ task as task01, Async as Async01, Future as Future01, Poll as Poll01, Stream as Stream01, }; @@ -184,8 +184,12 @@ impl Current { } impl ArcWake03 for Current { - fn wake_by_ref(arc_self: &Arc) { - arc_self.0.notify(); + fn wake(self: Arc) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &Arc) { + self.0.notify(); } } diff --git a/futures-util/src/future/future/shared.rs b/futures-util/src/future/future/shared.rs index 9b31932fe3..b596f93b79 100644 --- a/futures-util/src/future/future/shared.rs +++ b/futures-util/src/future/future/shared.rs @@ -1,4 +1,4 @@ -use crate::task::{waker_ref, ArcWake}; +use crate::task::{waker_ref, Wake}; use futures_core::future::{FusedFuture, Future}; use futures_core::task::{Context, Poll, Waker}; use slab::Slab; @@ -347,9 +347,13 @@ where } } -impl ArcWake for Notifier { - fn wake_by_ref(arc_self: &Arc) { - let wakers = &mut *arc_self.wakers.lock().unwrap(); +impl Wake for Notifier { + fn wake(self: Arc) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &Arc) { + let wakers = &mut *self.wakers.lock().unwrap(); if let Some(wakers) = wakers.as_mut() { for (_key, opt_waker) in wakers { if let Some(waker) = opt_waker.take() { diff --git a/futures-util/src/stream/futures_unordered/task.rs b/futures-util/src/stream/futures_unordered/task.rs index da2cd67d97..c4b09f2ab4 100644 --- a/futures-util/src/stream/futures_unordered/task.rs +++ b/futures-util/src/stream/futures_unordered/task.rs @@ -2,10 +2,9 @@ use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; use core::sync::atomic::Ordering::{self, SeqCst}; use core::sync::atomic::{AtomicBool, AtomicPtr}; - use super::abort::abort; +use crate::task::{Wake, WakerRef, waker_ref}; use super::ReadyToRunQueue; -use crate::task::{waker_ref, ArcWake, WakerRef}; pub(super) struct Task { // The future @@ -41,9 +40,13 @@ pub(super) struct Task { unsafe impl Send for Task {} unsafe impl Sync for Task {} -impl ArcWake for Task { - fn wake_by_ref(arc_self: &Arc) { - let inner = match arc_self.ready_to_run_queue.upgrade() { +impl Wake for Task { + fn wake(self: Arc) { + self.wake_by_ref(); + } + + fn wake_by_ref(self: &Arc) { + let inner = match self.ready_to_run_queue.upgrade() { Some(inner) => inner, None => return, }; @@ -60,9 +63,9 @@ impl ArcWake for Task { // implementation guarantees that if we set the `queued` flag that // there's a reference count held by the main `FuturesUnordered` queue // still. - let prev = arc_self.queued.swap(true, SeqCst); + let prev = self.queued.swap(true, SeqCst); if !prev { - inner.enqueue(&**arc_self); + inner.enqueue(&**self); inner.waker.wake(); } } diff --git a/futures-util/src/task/mod.rs b/futures-util/src/task/mod.rs index c4afe308cd..3e058d08e8 100644 --- a/futures-util/src/task/mod.rs +++ b/futures-util/src/task/mod.rs @@ -20,8 +20,8 @@ pub use futures_task::noop_waker; pub use futures_task::noop_waker_ref; cfg_target_has_atomic! { - #[cfg(feature = "alloc")] - pub use futures_task::ArcWake; + #[doc(no_inline)] + pub use alloc::task::Wake; #[cfg(feature = "alloc")] pub use futures_task::waker; diff --git a/futures/tests/object_safety.rs b/futures/tests/object_safety.rs index 30c892f5e6..badc9e0cbd 100644 --- a/futures/tests/object_safety.rs +++ b/futures/tests/object_safety.rs @@ -41,7 +41,7 @@ fn io() { #[test] fn task() { - // `ArcWake`, `SpawnExt` and `LocalSpawnExt` are not object safe. + // ``SpawnExt` and `LocalSpawnExt` are not object safe. use futures::task::{LocalSpawn, Spawn}; assert_is_object_safe::<&dyn Spawn>(); diff --git a/futures/tests/sink.rs b/futures/tests/sink.rs index 423016c44e..6fe4f272d0 100644 --- a/futures/tests/sink.rs +++ b/futures/tests/sink.rs @@ -4,7 +4,8 @@ use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt}; use futures::ready; use futures::sink::{self, Sink, SinkErrInto, SinkExt}; use futures::stream::{self, Stream, StreamExt}; -use futures::task::{self, ArcWake, Context, Poll, Waker}; +use futures::task::{self, Context, Poll, Waker}; +use futures_core::task::Wake; use futures_test::task::panic_context; use std::cell::{Cell, RefCell}; use std::collections::VecDeque; @@ -53,9 +54,13 @@ impl Flag { } } -impl ArcWake for Flag { - fn wake_by_ref(arc_self: &Arc) { - arc_self.set(true) +impl Wake for Flag { + fn wake(self: Arc) { + self.wake_by_ref() + } + + fn wake_by_ref(self: &Arc) { + self.set(true) } } diff --git a/futures/tests/task_arc_wake.rs b/futures/tests/task_arc_wake.rs deleted file mode 100644 index aedc15bcb8..0000000000 --- a/futures/tests/task_arc_wake.rs +++ /dev/null @@ -1,79 +0,0 @@ -use futures::task::{self, ArcWake, Waker}; -use std::panic; -use std::sync::{Arc, Mutex}; - -struct CountingWaker { - nr_wake: Mutex, -} - -impl CountingWaker { - fn new() -> Self { - Self { nr_wake: Mutex::new(0) } - } - - fn wakes(&self) -> i32 { - *self.nr_wake.lock().unwrap() - } -} - -impl ArcWake for CountingWaker { - fn wake_by_ref(arc_self: &Arc) { - let mut lock = arc_self.nr_wake.lock().unwrap(); - *lock += 1; - } -} - -#[test] -fn create_from_arc() { - let some_w = Arc::new(CountingWaker::new()); - - let w1: Waker = task::waker(some_w.clone()); - assert_eq!(2, Arc::strong_count(&some_w)); - w1.wake_by_ref(); - assert_eq!(1, some_w.wakes()); - - let w2 = w1.clone(); - assert_eq!(3, Arc::strong_count(&some_w)); - - w2.wake_by_ref(); - assert_eq!(2, some_w.wakes()); - - drop(w2); - assert_eq!(2, Arc::strong_count(&some_w)); - drop(w1); - assert_eq!(1, Arc::strong_count(&some_w)); -} - -#[test] -fn ref_wake_same() { - let some_w = Arc::new(CountingWaker::new()); - - let w1: Waker = task::waker(some_w.clone()); - let w2 = task::waker_ref(&some_w); - let w3 = w2.clone(); - - assert!(w1.will_wake(&w2)); - assert!(w2.will_wake(&w3)); -} - -#[test] -fn proper_refcount_on_wake_panic() { - struct PanicWaker; - - impl ArcWake for PanicWaker { - fn wake_by_ref(_arc_self: &Arc) { - panic!("WAKE UP"); - } - } - - let some_w = Arc::new(PanicWaker); - - let w1: Waker = task::waker(some_w.clone()); - assert_eq!( - "WAKE UP", - *panic::catch_unwind(|| w1.wake_by_ref()).unwrap_err().downcast::<&str>().unwrap() - ); - assert_eq!(2, Arc::strong_count(&some_w)); // some_w + w1 - drop(w1); - assert_eq!(1, Arc::strong_count(&some_w)); // some_w -} From a190a7ec2eddaf37d9062f353fac8c7c769fee0c Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Fri, 3 Sep 2021 12:26:42 -0400 Subject: [PATCH 2/3] fix no-std test --- futures-executor/src/local_pool.rs | 2 +- futures/tests/no-std/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index b5e7295b9c..1031c58183 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -64,7 +64,7 @@ impl Wake for ThreadNotify { fn wake(self: Arc) { self.wake_by_ref(); } - + fn wake_by_ref(self: &Arc) { // Make sure the wakeup is remembered until the next `park()`. let unparked = self.unparked.swap(true, Ordering::Relaxed); diff --git a/futures/tests/no-std/src/lib.rs b/futures/tests/no-std/src/lib.rs index 308218d6b7..8907efac27 100644 --- a/futures/tests/no-std/src/lib.rs +++ b/futures/tests/no-std/src/lib.rs @@ -8,7 +8,7 @@ pub use futures_core::task::__internal::AtomicWaker as _; #[cfg(feature = "futures-task-alloc")] #[cfg(target_has_atomic = "ptr")] -pub use futures_task::ArcWake as _; +pub use futures_task::task::Wake as _; #[cfg(feature = "futures-channel-alloc")] #[cfg(target_has_atomic = "ptr")] From fea01cab63d0095c69ca46817b62b48e529740d3 Mon Sep 17 00:00:00 2001 From: ibraheemdev Date: Fri, 3 Sep 2021 12:55:59 -0400 Subject: [PATCH 3/3] format --- futures-util/src/stream/futures_unordered/task.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/task.rs b/futures-util/src/stream/futures_unordered/task.rs index c4b09f2ab4..b273077282 100644 --- a/futures-util/src/stream/futures_unordered/task.rs +++ b/futures-util/src/stream/futures_unordered/task.rs @@ -1,10 +1,10 @@ +use super::abort::abort; +use super::ReadyToRunQueue; +use crate::task::{waker_ref, Wake, WakerRef}; use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; use core::sync::atomic::Ordering::{self, SeqCst}; use core::sync::atomic::{AtomicBool, AtomicPtr}; -use super::abort::abort; -use crate::task::{Wake, WakerRef, waker_ref}; -use super::ReadyToRunQueue; pub(super) struct Task { // The future