From dcd2a01d8525a90036aff177ee26aede07c2b114 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 27 Oct 2019 11:06:04 -0700 Subject: [PATCH 1/3] Add a minimal Futures executor This change adds `std::thread::block_on_future`, which represents a minimal Futures executor. It is modelled after futures-rs `futures::executor::block_on`, which blocks the current thread until the Future had been driven to completion. --- src/libstd/tests/block_on_future.rs | 229 ++++++++++++++++++++++++++ src/libstd/thread/block_on_future.rs | 236 +++++++++++++++++++++++++++ src/libstd/thread/mod.rs | 9 + 3 files changed, 474 insertions(+) create mode 100644 src/libstd/tests/block_on_future.rs create mode 100644 src/libstd/thread/block_on_future.rs diff --git a/src/libstd/tests/block_on_future.rs b/src/libstd/tests/block_on_future.rs new file mode 100644 index 0000000000000..d4c08a702fbb5 --- /dev/null +++ b/src/libstd/tests/block_on_future.rs @@ -0,0 +1,229 @@ +//! Tests for the block_on_future function + +#![feature(block_on_future)] + +use std::future::Future; +use std::task::{Context, Poll, Waker}; +use std::pin::Pin; +use std::sync::{Arc, Mutex}; +use std::thread::{block_on_future, JoinHandle, spawn}; + +struct WakeFromRemoteThreadFuture { + was_polled: bool, + wake_by_ref: bool, + join_handle: Option>, +} + +impl WakeFromRemoteThreadFuture { + fn new(wake_by_ref: bool) -> Self { + WakeFromRemoteThreadFuture { + was_polled: false, + wake_by_ref, + join_handle: None, + } + } +} + +impl Future for WakeFromRemoteThreadFuture { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if !self.was_polled { + self.was_polled = true; + let waker = cx.waker().clone(); + let wake_by_ref = self.wake_by_ref; + self.join_handle = Some(spawn(move || { + if wake_by_ref { + waker.wake(); + } else { + waker.wake_by_ref(); + } + })); + Poll::Pending + } else { + if let Some(handle) = self.join_handle.take() { + handle.join().unwrap(); + } + Poll::Ready(()) + } + } +} + +struct Yield { + iterations: usize, +} + +impl Yield { + fn new(iterations: usize) -> Self { + Yield { + iterations + } + } +} + +impl Future for Yield { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.iterations == 0 { + Poll::Ready(()) + } else { + self.iterations -= 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + } +} + +struct NeverReady { +} + +impl NeverReady { + fn new() -> Self { + NeverReady {} + } +} + +impl Future for NeverReady { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + Poll::Pending + } +} + +struct WakerStore { + waker: Option, +} + +struct StoreWakerFuture { + store: Arc>, +} + +impl StoreWakerFuture { + fn new(store: Arc>) -> Self { + StoreWakerFuture { + store + } + } +} + +impl Future for StoreWakerFuture { + type Output = (); + + fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + (*self.store.lock().unwrap()).waker = Some(cx.waker().clone()); + Poll::Ready(()) + } +} + +struct WakeFromPreviouslyStoredWakerFuture { + store: Arc>, + was_polled: bool, + join_handle: Option>, +} + +impl WakeFromPreviouslyStoredWakerFuture { + fn new(store: Arc>) -> Self { + WakeFromPreviouslyStoredWakerFuture { + store, + was_polled: false, + join_handle: None, + } + } +} + +impl Future for WakeFromPreviouslyStoredWakerFuture { + type Output = (); + + fn poll(mut self: core::pin::Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + if !self.was_polled { + self.was_polled = true; + // Don't take the waker from Context but from the side channel + let waker = self.store.lock().unwrap().waker.clone().take().unwrap(); + self.join_handle = Some(spawn(move || { + waker.wake(); + })); + Poll::Pending + } else { + if let Some(handle) = self.join_handle.take() { + handle.join().unwrap(); + } + Poll::Ready(()) + } + } +} + +#[test] +fn wake_from_local_thread() { + block_on_future(async { + Yield::new(10).await; + }); +} + +#[test] +fn wake_from_foreign_thread() { + block_on_future(async { + WakeFromRemoteThreadFuture::new(false).await; + }); +} + +#[test] +fn wake_by_ref_from_foreign_thread() { + block_on_future(async { + WakeFromRemoteThreadFuture::new(true).await; + }); +} + +#[test] +fn wake_from_multiple_threads() { + block_on_future(async { + WakeFromRemoteThreadFuture::new(false).await; + WakeFromRemoteThreadFuture::new(true).await; + }); +} + +#[test] +fn wake_local_remote_local() { + block_on_future(async { + Yield::new(10).await; + WakeFromRemoteThreadFuture::new(false).await; + Yield::new(20).await; + WakeFromRemoteThreadFuture::new(true).await; + }); +} + +#[test] +fn returns_result_from_task() { + let result = block_on_future(async { + let x = 42i32; + Yield::new(10).await; + x + }); + assert_eq!(42, result); +} + +#[test] +#[should_panic] +fn panics_if_waker_was_not_cloned_and_task_is_not_ready() { + block_on_future(async { + NeverReady::new().await; + }); +} + +#[test] +fn does_not_panic_if_waker_is_cloned_and_used_a_lot_later() { + let store = Arc::new(Mutex::new(WakerStore { + waker: None, + })); + + block_on_future(async { + StoreWakerFuture::new(store.clone()).await; + Yield::new(10).await; + // Multiple wakes from an outdated waker - because it can + // have been cloned multiple times. + WakeFromPreviouslyStoredWakerFuture::new(store.clone()).await; + WakeFromPreviouslyStoredWakerFuture::new(store.clone()).await; + WakeFromPreviouslyStoredWakerFuture::new(store).await; + }); +} diff --git a/src/libstd/thread/block_on_future.rs b/src/libstd/thread/block_on_future.rs new file mode 100644 index 0000000000000..7f430412745f6 --- /dev/null +++ b/src/libstd/thread/block_on_future.rs @@ -0,0 +1,236 @@ +//! Future/Task/Thread integration. +//! The method defined in this module allows to block a thread until an +//! async task completes. + +use crate::future::Future; +use crate::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use crate::mem; +use crate::pin::Pin; +use crate::sync::Arc; +use super::{current, park, Inner, Thread}; + +/// Carries a flag that is used to wakeup the executor. +/// A pointer to this struct is passed to the thread-local waker. +struct LocalWakeState { + is_woken: bool, + waker_was_cloned: bool, +} + +/// Returns the vtable that is used for waking up the executor +/// from another thread. +fn threadsafe_waker_vtable() -> &'static RawWakerVTable { + &RawWakerVTable::new( + clone_threadsafe_waker, + wake_threadsafe_waker, + wake_threadsafe_waker_by_ref, + drop_threadsafe_waker, + ) +} + +/// Returns the vtable that is used for waking up the executor +/// from inside it's execution on the current thread. +fn current_thread_waker_vtable() -> &'static RawWakerVTable { + &RawWakerVTable::new( + create_threadsafe_waker, + wake_current_thread, + wake_current_thread_by_ref, + |_| {}, + ) +} + +/// This method will be called when the waker reference gets cloned, +/// which makes it possible to transfer it to another thread. In this +/// case we have to create a threadsafe `Waker`. In order to to this +/// we retain the thread handle and store it in the new `RawWaker`s +/// data pointer. +unsafe fn create_threadsafe_waker(data: *const()) -> RawWaker { + let wake_state = data as *mut LocalWakeState; + (*wake_state).waker_was_cloned = true; + + // Get the `Arc` of a current thread handle and store into in + // the type erased pointer. + // + // This mechanism exploits the fact that `Thread` is already an `Arc`. + // Therefore in order to create clones of the thread handle we can just + // directly create clones of the Inner state, and recreate the `Thread` + // handle when necessary. + // + // If the implementation of `Thread` would be changed to something different, + // we would need to wrap the complete `Thread` object in another `Arc` by + // adopt the following line to: + // `let arc_thread = Arc::new(current());` + let arc_thread_inner = current().inner; + let ptr = Arc::into_raw(arc_thread_inner) as *const (); + RawWaker::new(ptr, threadsafe_waker_vtable()) +} + +unsafe fn clone_threadsafe_waker(data: *const()) -> RawWaker { + increase_refcount(data); + RawWaker::new(data, threadsafe_waker_vtable()) +} + +fn wake_current_thread(_data: *const()) { + unreachable!("A current thread waker can only be woken by reference"); +} + +unsafe fn wake_current_thread_by_ref(data: *const()) { + let wake_state = data as *mut LocalWakeState; + (*wake_state).is_woken = true; +} + +unsafe fn wake_threadsafe_waker(data: *const ()) { + let arc_thread_inner = Arc::from_raw(data as *const Inner); + let thread = Thread { + inner: arc_thread_inner, + }; + thread.unpark(); +} + +unsafe fn wake_threadsafe_waker_by_ref(data: *const ()) { + // Retain `Arc`, but don't touch refcount by wrapping in `ManuallyDrop` + let arc_thread_inner = Arc::from_raw(data as *const Inner); + let thread = mem::ManuallyDrop::new(Thread { + inner: arc_thread_inner, + }); + thread.unpark(); +} + +unsafe fn drop_threadsafe_waker(data: *const ()) { + drop(Thread { + inner: Arc::from_raw(data as *const Inner), + }) +} + +#[allow(clippy::redundant_clone)] // The clone here isn't actually redundant. +unsafe fn increase_refcount(data: *const ()) { + // Retain Arc, but don't touch refcount by wrapping in `ManuallyDrop` + let arc = mem::ManuallyDrop::new(Arc::::from_raw(data as *const Inner)); + // Now increase refcount, but don't drop new refcount either. + // Note: `Arc::clone()` will not panic but abort if the newrefcount is + // unrealistically high. Therefore this is safe, as long not more `Waker` + // clones are created than what is allowed for other `Arc` instances. + let _arc_clone: mem::ManuallyDrop<_> = arc.clone(); +} + +/// Runs a [`Future`] to completion on the current thread and returns its output +/// value. +/// +/// This method represents a minimal [`Future`]s executor. The executor will +/// drive the [`Future`] to completion on the current thread. The executor will +/// be providing a [`Waker`] to all polled Child-[`Future`]s which can be woken +/// from either the current thread or any other thread. +/// +/// # Examples +/// +/// ``` +/// #![feature(block_on_future)] +/// use std::thread::block_on_future; +/// +/// let result = block_on_future(async { +/// 5 +/// }); +/// assert_eq!(5, result); +/// ``` +#[unstable(feature = "block_on_future", issue = "0")] +pub fn block_on_future(mut future: F) -> F::Output { + // Pin the `Future` - which had been moved into this function - on the stack. + // Safety: This is safe since the `Future` gets aliased and will not be moved + // out of this function again. + let mut future = unsafe { Pin::new_unchecked(&mut future) }; + + let mut waker_state = LocalWakeState { + is_woken: true, + waker_was_cloned: false, + }; + + // Safety: The `Waker` that we create here is references data on the current + // callstack. This is safe, since the polled `Future` only gets a reference + // to this `Waker`. When it tries to clone the `Waker`, a threadsafe and owned + // version is created instead. + unsafe { + let waker = Waker::from_raw(RawWaker::new( + &waker_state as *const LocalWakeState as *const (), + current_thread_waker_vtable())); + + let mut cx = Context::from_waker(&waker); + loop { + while waker_state.is_woken { + // Reset is_woken, so that we do not spin if the poll does not + // directly wake us up. + waker_state.is_woken = false; + if let Poll::Ready(task_result) = future.as_mut().poll(&mut cx) { + return task_result; + } + } + + // The task is not ready, and the `Waker` had not been woken from the + // current thread. In order for us to proceed we wait until the + // thread gets unparked by another thread. If the `Waker` has not been + // cloned this will never happen and represents a deadlock, which + // gets reported here. + if !waker_state.waker_was_cloned { + panic!("Deadlock: Task is not ready, but the Waker had not been cloned"); + // Note: This flag is never reset, since a `Waker` that had been cloned + // once can be cloned more often to wakeup this executor. We don't + // have knowledge on how many clones are around - therefore the + // deadlock detection only works for the case the `Waker` never + // gets cloned. + } + park(); + // If thread::park has returned, we have been notified by another + // thread. Therefore we are woken. + // Remark: This flag can not be set by the other thread directly, + // because it may no longer be alive at the point of time when + // wake() is called. + waker_state.is_woken = true; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn current_thread_refcount() -> usize { + Arc::strong_count(¤t().inner) + } + + /// Check that we don't leak any thread::Inner instances to wrong refcount + /// handling. + #[test] + fn check_refcounts() { + let original = current_thread_refcount(); + + let waker_state = LocalWakeState { + is_woken: true, + waker_was_cloned: false, + }; + + let waker = unsafe { Waker::from_raw(RawWaker::new( + &waker_state as *const LocalWakeState as *const (), + current_thread_waker_vtable())) }; + + waker.wake_by_ref(); + assert_eq!(original, current_thread_refcount()); + + let clone1 = waker.clone(); + assert_eq!(original + 1, current_thread_refcount()); + let clone2 = waker.clone(); + assert_eq!(original + 2, current_thread_refcount()); + let clone3 = clone1.clone(); + assert_eq!(original + 3, current_thread_refcount()); + + drop(clone1); + assert_eq!(original + 2, current_thread_refcount()); + + clone2.wake_by_ref(); + assert_eq!(original + 2, current_thread_refcount()); + clone2.wake(); + assert_eq!(original + 1, current_thread_refcount()); + + clone3.wake_by_ref(); + assert_eq!(original + 1, current_thread_refcount()); + clone3.wake(); + assert_eq!(original, current_thread_refcount()); + } +} diff --git a/src/libstd/thread/mod.rs b/src/libstd/thread/mod.rs index 0ffa6ace2e4d2..8f1728450dc8a 100644 --- a/src/libstd/thread/mod.rs +++ b/src/libstd/thread/mod.rs @@ -1742,3 +1742,12 @@ mod tests { // NOTE: the corresponding test for stderr is in ui/thread-stderr, due // to the test harness apparently interfering with stderr configuration. } + +//////////////////////////////////////////////////////////////////////////////// +// Futures/task integration +//////////////////////////////////////////////////////////////////////////////// + +mod block_on_future; + +#[unstable(feature = "block_on_future", issue = "0")] +pub use self::block_on_future::block_on_future; From 7627b0fed7aa07814c287e6a421492252089bc17 Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Sun, 27 Oct 2019 23:31:11 -0700 Subject: [PATCH 2/3] Make the initial `Waker` provided by `block_on_future` `Sync` As it was pointed out in the review, the initial `Waker` for `block_on_future` was not holding up the `Sync` guarantees. If the `Waker` reference had been passed to another thread and cloned there, the cloned threadsafe `Waker` would have captured the wrong `Thread` handle. This change removes the optimization. A threadsafe `Waker` is now immediately created. --- src/libstd/tests/block_on_future.rs | 95 ++++++++++++----- src/libstd/thread/block_on_future.rs | 146 ++++++++------------------- 2 files changed, 111 insertions(+), 130 deletions(-) diff --git a/src/libstd/tests/block_on_future.rs b/src/libstd/tests/block_on_future.rs index d4c08a702fbb5..49d0d130e4495 100644 --- a/src/libstd/tests/block_on_future.rs +++ b/src/libstd/tests/block_on_future.rs @@ -75,23 +75,6 @@ impl Future for Yield { } } -struct NeverReady { -} - -impl NeverReady { - fn new() -> Self { - NeverReady {} - } -} - -impl Future for NeverReady { - type Output = (); - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - Poll::Pending - } -} - struct WakerStore { waker: Option, } @@ -203,14 +186,6 @@ fn returns_result_from_task() { assert_eq!(42, result); } -#[test] -#[should_panic] -fn panics_if_waker_was_not_cloned_and_task_is_not_ready() { - block_on_future(async { - NeverReady::new().await; - }); -} - #[test] fn does_not_panic_if_waker_is_cloned_and_used_a_lot_later() { let store = Arc::new(Mutex::new(WakerStore { @@ -227,3 +202,73 @@ fn does_not_panic_if_waker_is_cloned_and_used_a_lot_later() { WakeFromPreviouslyStoredWakerFuture::new(store).await; }); } + +struct WakeSynchronouslyFromOtherThreadFuture { + was_polled: bool, + use_clone: bool, +} + +impl WakeSynchronouslyFromOtherThreadFuture { + fn new(use_clone: bool) -> Self { + WakeSynchronouslyFromOtherThreadFuture { + was_polled: false, + use_clone, + } + } +} + +/// This is just a helper to transfer a waker by reference/pointer +/// to another thread without the availability of scoped threads. +struct WakerBox { + waker: *const Waker, +} + +unsafe impl Send for WakerBox {} + +impl Future for WakeSynchronouslyFromOtherThreadFuture { + type Output = (); + + fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if !self.was_polled { + self.was_polled = true; + // This captures the waker by pointer and passes it to the other thread, + // since we don't have a scoped thread API available here. + // The pointer is however guaranteed to be alive when we call it, due to + // joining the thread in this scope. + let waker_box = WakerBox { + waker: cx.waker() as *const Waker, + }; + let use_clone = self.use_clone; + spawn(move ||{ + let x = waker_box; + unsafe { + if !use_clone { + (*(x.waker as *mut Waker)).wake_by_ref(); + } else { + let cloned_waker = (*(x.waker as *mut Waker)).clone(); + cloned_waker.wake_by_ref(); + } + } + }).join().unwrap(); + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +#[test] +fn wake_synchronously_by_ref_from_other_thread() { + block_on_future(async { + WakeSynchronouslyFromOtherThreadFuture::new(false).await; + Yield::new(10).await; + }) +} + +#[test] +fn clone_and_wake_synchronously_from_other_thread() { + block_on_future(async { + WakeSynchronouslyFromOtherThreadFuture::new(true).await; + Yield::new(10).await; + }) +} diff --git a/src/libstd/thread/block_on_future.rs b/src/libstd/thread/block_on_future.rs index 7f430412745f6..b58a1fcf34738 100644 --- a/src/libstd/thread/block_on_future.rs +++ b/src/libstd/thread/block_on_future.rs @@ -9,44 +9,21 @@ use crate::pin::Pin; use crate::sync::Arc; use super::{current, park, Inner, Thread}; -/// Carries a flag that is used to wakeup the executor. -/// A pointer to this struct is passed to the thread-local waker. -struct LocalWakeState { - is_woken: bool, - waker_was_cloned: bool, -} - -/// Returns the vtable that is used for waking up the executor -/// from another thread. -fn threadsafe_waker_vtable() -> &'static RawWakerVTable { - &RawWakerVTable::new( - clone_threadsafe_waker, - wake_threadsafe_waker, - wake_threadsafe_waker_by_ref, - drop_threadsafe_waker, - ) -} - /// Returns the vtable that is used for waking up the executor -/// from inside it's execution on the current thread. -fn current_thread_waker_vtable() -> &'static RawWakerVTable { +/// from any thread. +fn waker_vtable() -> &'static RawWakerVTable { &RawWakerVTable::new( - create_threadsafe_waker, - wake_current_thread, - wake_current_thread_by_ref, - |_| {}, + clone_waker, + wake_waker, + wake_waker_by_ref, + drop_waker, ) } -/// This method will be called when the waker reference gets cloned, -/// which makes it possible to transfer it to another thread. In this -/// case we have to create a threadsafe `Waker`. In order to to this -/// we retain the thread handle and store it in the new `RawWaker`s -/// data pointer. -unsafe fn create_threadsafe_waker(data: *const()) -> RawWaker { - let wake_state = data as *mut LocalWakeState; - (*wake_state).waker_was_cloned = true; - +/// Creates a [`RawWaker`] which captures the current thread handle +/// and allows to wake up the [`block_on_future`] executor from any +/// thread by calling [`Thread::unpark()`]. +fn create_threadsafe_raw_waker() -> RawWaker { // Get the `Arc` of a current thread handle and store into in // the type erased pointer. // @@ -61,24 +38,15 @@ unsafe fn create_threadsafe_waker(data: *const()) -> RawWaker { // `let arc_thread = Arc::new(current());` let arc_thread_inner = current().inner; let ptr = Arc::into_raw(arc_thread_inner) as *const (); - RawWaker::new(ptr, threadsafe_waker_vtable()) + RawWaker::new(ptr, waker_vtable()) } -unsafe fn clone_threadsafe_waker(data: *const()) -> RawWaker { +unsafe fn clone_waker(data: *const()) -> RawWaker { increase_refcount(data); - RawWaker::new(data, threadsafe_waker_vtable()) -} - -fn wake_current_thread(_data: *const()) { - unreachable!("A current thread waker can only be woken by reference"); + RawWaker::new(data, waker_vtable()) } -unsafe fn wake_current_thread_by_ref(data: *const()) { - let wake_state = data as *mut LocalWakeState; - (*wake_state).is_woken = true; -} - -unsafe fn wake_threadsafe_waker(data: *const ()) { +unsafe fn wake_waker(data: *const ()) { let arc_thread_inner = Arc::from_raw(data as *const Inner); let thread = Thread { inner: arc_thread_inner, @@ -86,7 +54,7 @@ unsafe fn wake_threadsafe_waker(data: *const ()) { thread.unpark(); } -unsafe fn wake_threadsafe_waker_by_ref(data: *const ()) { +unsafe fn wake_waker_by_ref(data: *const ()) { // Retain `Arc`, but don't touch refcount by wrapping in `ManuallyDrop` let arc_thread_inner = Arc::from_raw(data as *const Inner); let thread = mem::ManuallyDrop::new(Thread { @@ -95,7 +63,7 @@ unsafe fn wake_threadsafe_waker_by_ref(data: *const ()) { thread.unpark(); } -unsafe fn drop_threadsafe_waker(data: *const ()) { +unsafe fn drop_waker(data: *const ()) { drop(Thread { inner: Arc::from_raw(data as *const Inner), }) @@ -138,52 +106,23 @@ pub fn block_on_future(mut future: F) -> F::Output { // out of this function again. let mut future = unsafe { Pin::new_unchecked(&mut future) }; - let mut waker_state = LocalWakeState { - is_woken: true, - waker_was_cloned: false, + // Safety: The `Waker` that we create upholds all guarantees that are expected + // from a `Waker` + let waker = unsafe { + Waker::from_raw(create_threadsafe_raw_waker()) }; - // Safety: The `Waker` that we create here is references data on the current - // callstack. This is safe, since the polled `Future` only gets a reference - // to this `Waker`. When it tries to clone the `Waker`, a threadsafe and owned - // version is created instead. - unsafe { - let waker = Waker::from_raw(RawWaker::new( - &waker_state as *const LocalWakeState as *const (), - current_thread_waker_vtable())); - - let mut cx = Context::from_waker(&waker); - loop { - while waker_state.is_woken { - // Reset is_woken, so that we do not spin if the poll does not - // directly wake us up. - waker_state.is_woken = false; - if let Poll::Ready(task_result) = future.as_mut().poll(&mut cx) { - return task_result; - } - } - - // The task is not ready, and the `Waker` had not been woken from the - // current thread. In order for us to proceed we wait until the - // thread gets unparked by another thread. If the `Waker` has not been - // cloned this will never happen and represents a deadlock, which - // gets reported here. - if !waker_state.waker_was_cloned { - panic!("Deadlock: Task is not ready, but the Waker had not been cloned"); - // Note: This flag is never reset, since a `Waker` that had been cloned - // once can be cloned more often to wakeup this executor. We don't - // have knowledge on how many clones are around - therefore the - // deadlock detection only works for the case the `Waker` never - // gets cloned. - } - park(); - // If thread::park has returned, we have been notified by another - // thread. Therefore we are woken. - // Remark: This flag can not be set by the other thread directly, - // because it may no longer be alive at the point of time when - // wake() is called. - waker_state.is_woken = true; + let mut cx = Context::from_waker(&waker); + loop { + if let Poll::Ready(task_result) = future.as_mut().poll(&mut cx) { + return task_result; } + + // The task is not ready. In order for us to proceed we wait until the + // thread gets unparked. If the `Waker` had been woken inside `.poll()`, + // then `park()` will immediately return, and we will call `.poll()` + // again without any wait period. + park(); } } @@ -201,31 +140,28 @@ mod tests { fn check_refcounts() { let original = current_thread_refcount(); - let waker_state = LocalWakeState { - is_woken: true, - waker_was_cloned: false, - }; - - let waker = unsafe { Waker::from_raw(RawWaker::new( - &waker_state as *const LocalWakeState as *const (), - current_thread_waker_vtable())) }; + let waker = unsafe { Waker::from_raw(create_threadsafe_raw_waker()) }; + assert_eq!(original + 1, current_thread_refcount()); waker.wake_by_ref(); - assert_eq!(original, current_thread_refcount()); + assert_eq!(original + 1, current_thread_refcount()); let clone1 = waker.clone(); - assert_eq!(original + 1, current_thread_refcount()); - let clone2 = waker.clone(); assert_eq!(original + 2, current_thread_refcount()); - let clone3 = clone1.clone(); + let clone2 = waker.clone(); assert_eq!(original + 3, current_thread_refcount()); + let clone3 = clone1.clone(); + assert_eq!(original + 4, current_thread_refcount()); drop(clone1); - assert_eq!(original + 2, current_thread_refcount()); + assert_eq!(original + 3, current_thread_refcount()); clone2.wake_by_ref(); - assert_eq!(original + 2, current_thread_refcount()); + assert_eq!(original + 3, current_thread_refcount()); clone2.wake(); + assert_eq!(original + 2, current_thread_refcount()); + + drop(waker); assert_eq!(original + 1, current_thread_refcount()); clone3.wake_by_ref(); From 8630ed829554df786f4a553d09a4ed9aa237975a Mon Sep 17 00:00:00 2001 From: Matthias Einwag Date: Mon, 28 Oct 2019 00:32:20 -0700 Subject: [PATCH 3/3] Fix test --- src/libstd/tests/block_on_future.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libstd/tests/block_on_future.rs b/src/libstd/tests/block_on_future.rs index 49d0d130e4495..dd0a8d1aab2c3 100644 --- a/src/libstd/tests/block_on_future.rs +++ b/src/libstd/tests/block_on_future.rs @@ -34,9 +34,9 @@ impl Future for WakeFromRemoteThreadFuture { let wake_by_ref = self.wake_by_ref; self.join_handle = Some(spawn(move || { if wake_by_ref { - waker.wake(); - } else { waker.wake_by_ref(); + } else { + waker.wake(); } })); Poll::Pending