diff --git a/src/libstd/tests/block_on_future.rs b/src/libstd/tests/block_on_future.rs
new file mode 100644
index 0000000000000..dd0a8d1aab2c3
--- /dev/null
+++ b/src/libstd/tests/block_on_future.rs
@@ -0,0 +1,274 @@
+//! Tests for the `block_on_future` function.
+
+#![feature(block_on_future)]
+
+use std::future::Future;
+use std::task::{Context, Poll, Waker};
+use std::pin::Pin;
+use std::sync::{Arc, Mutex};
+use std::thread::{block_on_future, JoinHandle, spawn};
+
+struct WakeFromRemoteThreadFuture {
+    was_polled: bool,
+    wake_by_ref: bool,
+    join_handle: Option<JoinHandle<()>>,
+}
+
+impl WakeFromRemoteThreadFuture {
+    fn new(wake_by_ref: bool) -> Self {
+        WakeFromRemoteThreadFuture {
+            was_polled: false,
+            wake_by_ref,
+            join_handle: None,
+        }
+    }
+}
+
+impl Future for WakeFromRemoteThreadFuture {
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+        if !self.was_polled {
+            self.was_polled = true;
+            let waker = cx.waker().clone();
+            let wake_by_ref = self.wake_by_ref;
+            self.join_handle = Some(spawn(move || {
+                if wake_by_ref {
+                    waker.wake_by_ref();
+                } else {
+                    waker.wake();
+                }
+            }));
+            Poll::Pending
+        } else {
+            if let Some(handle) = self.join_handle.take() {
+                handle.join().unwrap();
+            }
+            Poll::Ready(())
+        }
+    }
+}
+
+struct Yield {
+    iterations: usize,
+}
+
+impl Yield {
+    fn new(iterations: usize) -> Self {
+        Yield {
+            iterations
+        }
+    }
+}
+
+impl Future for Yield {
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+        if self.iterations == 0 {
+            Poll::Ready(())
+        } else {
+            self.iterations -= 1;
+            cx.waker().wake_by_ref();
+            Poll::Pending
+        }
+    }
+}
+
+struct WakerStore {
+    waker: Option<Waker>,
+}
+
+struct StoreWakerFuture {
+    store: Arc<Mutex<WakerStore>>,
+}
+
+impl StoreWakerFuture {
+    fn new(store: Arc<Mutex<WakerStore>>) -> Self {
+        StoreWakerFuture {
+            store
+        }
+    }
+}
+
+impl Future for StoreWakerFuture {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+        (*self.store.lock().unwrap()).waker = Some(cx.waker().clone());
+        Poll::Ready(())
+    }
+}
+
+struct WakeFromPreviouslyStoredWakerFuture {
+    store: Arc<Mutex<WakerStore>>,
+    was_polled: bool,
+    join_handle: Option<JoinHandle<()>>,
+}
+
+impl WakeFromPreviouslyStoredWakerFuture {
+    fn new(store: Arc<Mutex<WakerStore>>) -> Self {
+        WakeFromPreviouslyStoredWakerFuture {
+            store,
+            was_polled: false,
+            join_handle: None,
+        }
+    }
+}
+
+impl Future for WakeFromPreviouslyStoredWakerFuture {
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
+        if !self.was_polled {
+            self.was_polled = true;
+            // Don't take the waker from the `Context` but from the side channel.
+            let waker = self.store.lock().unwrap().waker.clone().take().unwrap();
+            self.join_handle = Some(spawn(move || {
+                waker.wake();
+            }));
+            Poll::Pending
+        } else {
+            if let Some(handle) = self.join_handle.take() {
+                handle.join().unwrap();
+            }
+            Poll::Ready(())
+        }
+    }
+}
+
+#[test]
+fn wake_from_local_thread() {
+    block_on_future(async {
+        Yield::new(10).await;
+    });
+}
+
+#[test]
+fn wake_from_foreign_thread() {
+    block_on_future(async {
+        WakeFromRemoteThreadFuture::new(false).await;
+    });
+}
+
+#[test]
+fn wake_by_ref_from_foreign_thread() {
+    block_on_future(async {
+        WakeFromRemoteThreadFuture::new(true).await;
+    });
+}
+
+#[test]
+fn wake_from_multiple_threads() {
+    block_on_future(async {
+        WakeFromRemoteThreadFuture::new(false).await;
+        WakeFromRemoteThreadFuture::new(true).await;
+    });
+}
+
+#[test]
+fn wake_local_remote_local() {
+    block_on_future(async {
+        Yield::new(10).await;
+        WakeFromRemoteThreadFuture::new(false).await;
+        Yield::new(20).await;
+        WakeFromRemoteThreadFuture::new(true).await;
+    });
+}
+
+#[test]
+fn returns_result_from_task() {
+    let result = block_on_future(async {
+        let x = 42i32;
+        Yield::new(10).await;
+        x
+    });
+    assert_eq!(42, result);
+}
+
+#[test]
+fn does_not_panic_if_waker_is_cloned_and_used_a_lot_later() {
+    let store = Arc::new(Mutex::new(WakerStore {
+        waker: None,
+    }));
+
+    block_on_future(async {
+        StoreWakerFuture::new(store.clone()).await;
+        Yield::new(10).await;
+        // Multiple wakes from an outdated waker - because it might
+        // have been cloned multiple times.
+        WakeFromPreviouslyStoredWakerFuture::new(store.clone()).await;
+        WakeFromPreviouslyStoredWakerFuture::new(store.clone()).await;
+        WakeFromPreviouslyStoredWakerFuture::new(store).await;
+    });
+}
+
+struct WakeSynchronouslyFromOtherThreadFuture {
+    was_polled: bool,
+    use_clone: bool,
+}
+
+impl WakeSynchronouslyFromOtherThreadFuture {
+    fn new(use_clone: bool) -> Self {
+        WakeSynchronouslyFromOtherThreadFuture {
+            was_polled: false,
+            use_clone,
+        }
+    }
+}
+
+/// This is just a helper to transfer a waker by reference/pointer
+/// to another thread, since no scoped thread API is available here.
+struct WakerBox {
+    waker: *const Waker,
+}
+
+unsafe impl Send for WakerBox {}
+
+impl Future for WakeSynchronouslyFromOtherThreadFuture {
+    type Output = ();
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+        if !self.was_polled {
+            self.was_polled = true;
+            // This captures the waker by pointer and passes it to the other thread,
+            // since we don't have a scoped thread API available here.
+            // The pointer is guaranteed to still be alive when it is used, because
+            // the spawned thread is joined within this scope.
+            let waker_box = WakerBox {
+                waker: cx.waker() as *const Waker,
+            };
+            let use_clone = self.use_clone;
+            spawn(move || {
+                let x = waker_box;
+                unsafe {
+                    if !use_clone {
+                        (*x.waker).wake_by_ref();
+                    } else {
+                        let cloned_waker = (*x.waker).clone();
+                        cloned_waker.wake_by_ref();
+                    }
+                }
+            }).join().unwrap();
+            Poll::Pending
+        } else {
+            Poll::Ready(())
+        }
+    }
+}
+
+#[test]
+fn wake_synchronously_by_ref_from_other_thread() {
+    block_on_future(async {
+        WakeSynchronouslyFromOtherThreadFuture::new(false).await;
+        Yield::new(10).await;
+    })
+}
+
+#[test]
+fn clone_and_wake_synchronously_from_other_thread() {
+    block_on_future(async {
+        WakeSynchronouslyFromOtherThreadFuture::new(true).await;
+        Yield::new(10).await;
+    })
+}
diff --git a/src/libstd/thread/block_on_future.rs b/src/libstd/thread/block_on_future.rs
new file mode 100644
index 0000000000000..b58a1fcf34738
--- /dev/null
+++ b/src/libstd/thread/block_on_future.rs
@@ -0,0 +1,172 @@
+//! Future/task/thread integration.
+//! The function defined in this module allows blocking a thread until an
+//! async task completes.
+
+use crate::future::Future;
+use crate::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+use crate::mem;
+use crate::pin::Pin;
+use crate::sync::Arc;
+use super::{current, park, Inner, Thread};
+
+/// Returns the vtable that is used for waking up the executor
+/// from any thread.
+fn waker_vtable() -> &'static RawWakerVTable {
+    &RawWakerVTable::new(
+        clone_waker,
+        wake_waker,
+        wake_waker_by_ref,
+        drop_waker,
+    )
+}
+
+/// Creates a [`RawWaker`] which captures the current thread handle
+/// and allows waking up the [`block_on_future`] executor from any
+/// thread by calling [`Thread::unpark()`].
+fn create_threadsafe_raw_waker() -> RawWaker {
+    // Get the `Arc` of the current thread handle and store it in
+    // the type-erased pointer.
+    //
+    // This mechanism exploits the fact that `Thread` is already an `Arc`.
+    // Therefore, in order to create clones of the thread handle, we can just
+    // directly create clones of the `Inner` state and recreate the `Thread`
+    // handle when necessary.
+    //
+    // If the implementation of `Thread` were changed to something different,
+    // we would need to wrap the complete `Thread` object in another `Arc` by
+    // adapting the following line to:
+    // `let arc_thread = Arc::new(current());`
+    let arc_thread_inner = current().inner;
+    let ptr = Arc::into_raw(arc_thread_inner) as *const ();
+    RawWaker::new(ptr, waker_vtable())
+}
+
+unsafe fn clone_waker(data: *const ()) -> RawWaker {
+    increase_refcount(data);
+    RawWaker::new(data, waker_vtable())
+}
+
+unsafe fn wake_waker(data: *const ()) {
+    // Take ownership of the `Arc`: dropping the recreated `Thread` at the end
+    // of this function consumes the waker's reference.
+    let arc_thread_inner = Arc::from_raw(data as *const Inner);
+    let thread = Thread {
+        inner: arc_thread_inner,
+    };
+    thread.unpark();
+}
+
+unsafe fn wake_waker_by_ref(data: *const ()) {
+    // Retain the `Arc`, but don't touch the refcount by wrapping it in `ManuallyDrop`.
+    let arc_thread_inner = Arc::from_raw(data as *const Inner);
+    let thread = mem::ManuallyDrop::new(Thread {
+        inner: arc_thread_inner,
+    });
+    thread.unpark();
+}
+
+unsafe fn drop_waker(data: *const ()) {
+    drop(Thread {
+        inner: Arc::from_raw(data as *const Inner),
+    })
+}
+
+#[allow(clippy::redundant_clone)] // The clone here isn't actually redundant.
+unsafe fn increase_refcount(data: *const ()) {
+    // Retain the `Arc`, but don't touch the refcount by wrapping it in `ManuallyDrop`.
+    let arc = mem::ManuallyDrop::new(Arc::<Inner>::from_raw(data as *const Inner));
+    // Now increase the refcount, but don't drop the new reference either.
+    // Note: `Arc::clone()` will not panic but abort if the new refcount is
+    // unrealistically high. Therefore this is safe, as long as no more `Waker`
+    // clones are created than are allowed for other `Arc` instances.
+    let _arc_clone: mem::ManuallyDrop<_> = arc.clone();
+}
+
+/// Runs a [`Future`] to completion on the current thread and returns its output
+/// value.
+///
+/// This function represents a minimal [`Future`] executor. The executor will
+/// drive the [`Future`] to completion on the current thread. The executor
+/// provides a [`Waker`] to all polled child [`Future`]s, and this waker can be
+/// used from either the current thread or any other thread.
+///
+/// # Examples
+///
+/// ```
+/// #![feature(block_on_future)]
+/// use std::thread::block_on_future;
+///
+/// let result = block_on_future(async {
+///     5
+/// });
+/// assert_eq!(5, result);
+/// ```
+#[unstable(feature = "block_on_future", issue = "0")]
+pub fn block_on_future<F: Future>(mut future: F) -> F::Output {
+    // Pin the `Future` - which has been moved into this function - on the stack.
+    // Safety: This is safe since the original `future` binding is shadowed by the
+    // pinned reference and the value can not be moved out of this function again.
+    let mut future = unsafe { Pin::new_unchecked(&mut future) };
+
+    // Safety: The `Waker` that we create upholds all guarantees that are expected
+    // from a `Waker`.
+    let waker = unsafe {
+        Waker::from_raw(create_threadsafe_raw_waker())
+    };
+
+    let mut cx = Context::from_waker(&waker);
+    loop {
+        if let Poll::Ready(task_result) = future.as_mut().poll(&mut cx) {
+            return task_result;
+        }
+
+        // The task is not ready. In order to proceed we wait until the thread
+        // gets unparked. If the `Waker` was already woken inside `.poll()`,
+        // then `park()` will return immediately, and we will call `.poll()`
+        // again without any wait period.
+        park();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn current_thread_refcount() -> usize {
+        Arc::strong_count(&current().inner)
+    }
+
+    /// Check that we don't leak any `thread::Inner` instances due to wrong
+    /// refcount handling.
+    #[test]
+    fn check_refcounts() {
+        let original = current_thread_refcount();
+
+        let waker = unsafe { Waker::from_raw(create_threadsafe_raw_waker()) };
+        assert_eq!(original + 1, current_thread_refcount());
+
+        waker.wake_by_ref();
+        assert_eq!(original + 1, current_thread_refcount());
+
+        let clone1 = waker.clone();
+        assert_eq!(original + 2, current_thread_refcount());
+        let clone2 = waker.clone();
+        assert_eq!(original + 3, current_thread_refcount());
+        let clone3 = clone1.clone();
+        assert_eq!(original + 4, current_thread_refcount());
+
+        drop(clone1);
+        assert_eq!(original + 3, current_thread_refcount());
+
+        clone2.wake_by_ref();
+        assert_eq!(original + 3, current_thread_refcount());
+        clone2.wake();
+        assert_eq!(original + 2, current_thread_refcount());
+
+        drop(waker);
+        assert_eq!(original + 1, current_thread_refcount());
+
+        clone3.wake_by_ref();
+        assert_eq!(original + 1, current_thread_refcount());
+        clone3.wake();
+        assert_eq!(original, current_thread_refcount());
+    }
+}
diff --git a/src/libstd/thread/mod.rs b/src/libstd/thread/mod.rs
index 0ffa6ace2e4d2..8f1728450dc8a 100644
--- a/src/libstd/thread/mod.rs
+++ b/src/libstd/thread/mod.rs
@@ -1742,3 +1742,12 @@ mod tests {
     // NOTE: the corresponding test for stderr is in ui/thread-stderr, due
     // to the test harness apparently interfering with stderr configuration.
 }
+
+////////////////////////////////////////////////////////////////////////////////
+// Futures/task integration
+////////////////////////////////////////////////////////////////////////////////
+
+mod block_on_future;
+
+#[unstable(feature = "block_on_future", issue = "0")]
+pub use self::block_on_future::block_on_future;
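Note (not part of the patch): the patch reuses `Thread`'s private `Arc<Inner>` as the waker's data pointer, which only works from inside libstd. The following rough sketch shows the same park/unpark technique against the stable public API by wrapping the `Thread` handle in an extra `Arc`; the names (`VTABLE`, `block_on`, the vtable functions) are made up for illustration and do not appear in the patch.

```rust
use std::future::Future;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::thread::{self, Thread};

// One vtable shared by all wakers; the data pointer is an `Arc<Thread>`.
static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop_waker);

unsafe fn clone(data: *const ()) -> RawWaker {
    // Bump the refcount without giving up the existing reference.
    let arc = ManuallyDrop::new(Arc::from_raw(data as *const Thread));
    let _clone: ManuallyDrop<_> = arc.clone();
    RawWaker::new(data, &VTABLE)
}

unsafe fn wake(data: *const ()) {
    // Consumes the waker's reference: the `Arc` is dropped after unparking.
    Arc::from_raw(data as *const Thread).unpark();
}

unsafe fn wake_by_ref(data: *const ()) {
    // Unpark without decreasing the refcount.
    let arc = ManuallyDrop::new(Arc::from_raw(data as *const Thread));
    arc.unpark();
}

unsafe fn drop_waker(data: *const ()) {
    drop(Arc::from_raw(data as *const Thread));
}

/// Minimal `block_on`: poll the future, then park until a waker unparks us.
fn block_on<F: Future>(mut future: F) -> F::Output {
    // Safety: `future` is shadowed and never moved again after being pinned.
    let mut future = unsafe { Pin::new_unchecked(&mut future) };
    let data = Arc::into_raw(Arc::new(thread::current())) as *const ();
    // Safety: the vtable functions above uphold the `RawWaker` contract.
    let waker = unsafe { Waker::from_raw(RawWaker::new(data, &VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
        // A wake that happened during `poll` makes `park` return immediately.
        thread::park();
    }
}

fn main() {
    assert_eq!(block_on(async { 21 + 21 }), 42);
}
```

The design choice is the same in both versions: a wake is just `Thread::unpark`, so a wake that races with `poll` is not lost, because `park` returns immediately when an unpark token is already pending.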