diff --git a/futures-executor/src/lib.rs b/futures-executor/src/lib.rs index d16116a205..d4f18f206f 100644 --- a/futures-executor/src/lib.rs +++ b/futures-executor/src/lib.rs @@ -33,7 +33,4 @@ if_std! { mod enter; pub use crate::enter::{enter, Enter, EnterError}; - - mod spawn; - pub use crate::spawn::{spawn, Spawn, spawn_with_handle, SpawnWithHandle, JoinHandle}; } diff --git a/futures-executor/src/spawn.rs b/futures-executor/src/spawn.rs deleted file mode 100644 index 9b0b442a48..0000000000 --- a/futures-executor/src/spawn.rs +++ /dev/null @@ -1,186 +0,0 @@ -use futures_core::future::Future; -use futures_core::task::{self, Poll}; -use futures_channel::oneshot::{channel, Sender, Receiver}; -use futures_util::future::FutureExt; -use futures_util::task::ContextExt; -use std::thread; -use std::sync::Arc; -use std::sync::atomic::Ordering; -use std::panic::{self, AssertUnwindSafe}; -use std::sync::atomic::AtomicBool; -use std::mem::PinMut; -use std::boxed::Box; -use std::marker::Unpin; - -/// A future representing the completion of task spawning. -/// -/// See [`spawn`](spawn()) for details. -#[derive(Debug)] -pub struct Spawn { - future: Option -} - -impl Spawn { - unsafe_unpinned!(future: Option); -} - -/// Spawn a task onto the default executor. -/// -/// This function returns a future that will spawn the given future as a task -/// onto the default executor. It does *not* provide any way to wait on task -/// completion or extract a value from the task. That can either be done through -/// a channel, or by using [`spawn_with_handle`](spawn_with_handle). -pub fn spawn(future: F) -> Spawn - where F: Future + 'static + Send -{ - Spawn { future: Some(future) } -} - -impl + Send + 'static> Future for Spawn { - type Output = (); - - fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll<()> { - cx.spawn(self.future().take().unwrap()); - Poll::Ready(()) - } -} - -/// A future representing the completion of task spawning, yielding a -/// [`JoinHandle`](JoinHandle) to the spawned task. -/// -/// See [`spawn_with_handle`](spawn_with_handle) for details. -#[derive(Debug)] -pub struct SpawnWithHandle { - future: Option -} - -impl SpawnWithHandle { - unsafe_unpinned!(future: Option); -} - -/// Spawn a task onto the default executor, yielding a -/// [`JoinHandle`](JoinHandle) to the spawned task. -/// -/// This function returns a future that will spawn the given future as a task -/// onto the default executor. On completion, that future will yield a -/// [`JoinHandle`](JoinHandle) that can itself be used as a future -/// representing the completion of the spawned task. -/// -/// # Examples -/// -/// ``` -/// # #![feature(pin, arbitrary_self_types, futures_api)] -/// # extern crate futures; -/// # -/// use futures::prelude::*; -/// use futures::future; -/// use futures::executor::{block_on, spawn_with_handle}; -/// -/// let future = future::ready::(1); -/// let join_handle = block_on(spawn_with_handle(future)); -/// let output = block_on(join_handle); -/// assert_eq!(output, 1); -/// ``` -pub fn spawn_with_handle(f: F) -> SpawnWithHandle - where F: Future + 'static + Send, F::Output: Send -{ - SpawnWithHandle { future: Some(f) } -} - -impl Future for SpawnWithHandle - where F: Future + Send + 'static, - F::Output: Send, -{ - type Output = JoinHandle; - - fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll { - let (tx, rx) = channel(); - let keep_running_flag = Arc::new(AtomicBool::new(false)); - // AssertUnwindSafe is used here because `Send + 'static` is basically - // an alias for an implementation of the `UnwindSafe` trait but we can't - // express that in the standard library right now. - let sender = MySender { - future: AssertUnwindSafe(self.future().take().unwrap()).catch_unwind(), - tx: Some(tx), - keep_running_flag: keep_running_flag.clone(), - }; - - cx.spawn(sender); - Poll::Ready(JoinHandle { - inner: rx , - keep_running_flag: keep_running_flag.clone() - }) - } -} - -struct MySender { - future: F, - tx: Option>, - keep_running_flag: Arc, -} -impl Unpin for MySender {} - -impl MySender { - unsafe_pinned!(future: F); - unsafe_unpinned!(tx: Option>); - unsafe_unpinned!(keep_running_flag: Arc); -} - -/// The type of future returned from the `ThreadPool::spawn` function, which -/// proxies the futures running on the thread pool. -/// -/// This future will resolve in the same way as the underlying future, and it -/// will propagate panics. -#[must_use] -#[derive(Debug)] -pub struct JoinHandle { - inner: Receiver>, - keep_running_flag: Arc, -} - -impl JoinHandle { - /// Drop this handle *without* canceling the underlying future. - /// - /// When `JoinHandle` is dropped, `ThreadPool` will try to abort the associated - /// task. This function can be used when you want to drop the handle but keep - /// executing the task. - pub fn forget(self) { - self.keep_running_flag.store(true, Ordering::SeqCst); - } -} - -impl Future for JoinHandle { - type Output = T; - - fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll { // ToDo: This was weird! Double check! - match self.inner.poll_unpin(cx) { - Poll::Ready(Ok(Ok(output))) => Poll::Ready(output), - Poll::Ready(Ok(Err(e))) => panic::resume_unwind(e), - Poll::Ready(Err(e)) => panic::resume_unwind(Box::new(e)), - Poll::Pending => Poll::Pending, - } - } -} - -impl Future for MySender { - type Output = (); - - fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll<()> { - if let Poll::Ready(_) = self.tx().as_mut().unwrap().poll_cancel(cx) { - if !self.keep_running_flag().load(Ordering::SeqCst) { - // Cancelled, bail out - return Poll::Ready(()) - } - } - - let output = match self.future().poll(cx) { - Poll::Ready(output) => output, - Poll::Pending => return Poll::Pending, - }; - - // if the receiving end has gone away then that's ok, we just ignore the - // send error here. - drop(self.tx().take().unwrap().send(output)); - Poll::Ready(()) - } -} diff --git a/futures-util/src/async_await/mod.rs b/futures-util/src/async_await/mod.rs index 1dcc5f01b5..81bf6da25e 100644 --- a/futures-util/src/async_await/mod.rs +++ b/futures-util/src/async_await/mod.rs @@ -20,5 +20,8 @@ mod join; // Primary export is a macro mod select; +// Primary export is a macro +mod spawn; + #[doc(hidden)] pub fn assert_unpin(_: &T) {} diff --git a/futures-util/src/async_await/spawn.rs b/futures-util/src/async_await/spawn.rs new file mode 100644 index 0000000000..85b520155a --- /dev/null +++ b/futures-util/src/async_await/spawn.rs @@ -0,0 +1,61 @@ +/// Spawns a task onto the context's executor that polls the given future with +/// output `()` to completion. +/// +/// This macro returns a [`Result`] that contains a +/// [`SpawnError`](crate::task::SpawnError) if spawning fails. +/// +/// You can use [`spawn_with_handle!`] if you want to spawn a future +/// with output other than `()` or if you want to be able to await its +/// completion. +/// +/// ``` +/// #![feature(async_await, await_macro, futures_api)] +/// #[macro_use] extern crate futures; +/// # futures::executor::block_on(async { +/// +/// let future = async { /* ... */ }; +/// spawn!(future).unwrap(); +/// # }); +/// ``` +#[macro_export] +macro_rules! spawn { + ($future:expr) => { + await!($crate::future::lazy(|cx| { + use $crate::task::ExecutorExt; + cx.executor().spawn($future) + })) + } +} + +/// Spawns a task onto the context's executor that polls the given future to +/// completion and returns a future that resolves to the spawned future's +/// output. +/// +/// This macro returns a [`Result`] that contains a +/// [`JoinHandle`](crate::task::JoinHandle), or, if spawning fails, a +/// [`SpawnError`](crate::task::SpawnError). +/// [`JoinHandle`](crate::task::JoinHandle) is a future that resolves +/// to the output of the spawned future +/// +/// # Examples +/// +/// ``` +/// #![feature(async_await, await_macro, futures_api)] +/// #[macro_use] extern crate futures; +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let future = future::ready(1); +/// let join_handle = spawn_with_handle!(future).unwrap(); +/// assert_eq!(await!(join_handle), 1); +/// # }); +/// ``` +#[macro_export] +macro_rules! spawn_with_handle { + ($future:expr) => { + await!($crate::future::lazy(|cx| { + use $crate::task::ExecutorExt; + cx.executor().spawn_with_handle($future) + })) + } +} diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 6ef478c9e7..2d64c777d1 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -639,16 +639,31 @@ pub trait FutureExt: Future { /// /// ``` /// #![feature(async_await, await_macro, futures_api)] + /// #[macro_use] extern crate futures; /// # futures::executor::block_on(async { - /// use futures::executor::{spawn_with_handle, ThreadPool}; - /// use futures::future::{self, FutureExt}; - /// - /// let pool = ThreadPool::new().expect("unable to create threadpool"); - /// let future = spawn_with_handle(future::ready(3)) - /// .with_executor(pool) - /// .flatten(); - /// assert_eq!(await!(future), 3); - /// # }); + /// use futures::executor::ThreadPool; + /// use futures::future::FutureExt; + /// use std::thread; + /// # let (tx, rx) = futures::channel::oneshot::channel(); + /// + /// let pool = ThreadPool::builder() + /// .name_prefix("my-pool-") + /// .pool_size(1) + /// .create().unwrap(); + /// + /// let val = await!((async { + /// assert_ne!(thread::current().name(), Some("my-pool-0")); + /// + /// // Spawned task runs on the executor specified via `with_executor` + /// let future = async { + /// assert_eq!(thread::current().name(), Some("my-pool-0")); + /// # tx.send("ran").unwrap(); + /// }; + /// spawn!(future).unwrap(); + /// }).with_executor(pool)); + /// + /// # assert_eq!(await!(rx), Ok("ran")) + /// # }) /// ``` fn with_executor(self, executor: E) -> WithExecutor where Self: Sized, diff --git a/futures-util/src/task/context.rs b/futures-util/src/task/context.rs deleted file mode 100644 index 595cd67668..0000000000 --- a/futures-util/src/task/context.rs +++ /dev/null @@ -1,29 +0,0 @@ -use futures_core::task; - -if_std! { - use futures_core::future::{Future, FutureObj}; - use std::boxed::Box; -} - -/// Extension trait for `Context`, adding methods that require allocation. -pub trait ContextExt { - /// Spawn a future onto the default executor. - /// - /// # Panics - /// - /// This method will panic if the default executor is unable to spawn. - /// - /// To handle executor errors, use `Context::executor()` on instead. - #[cfg(feature = "std")] - fn spawn(&mut self, future: Fut) - where Fut: Future + 'static + Send; -} - -impl<'a> ContextExt for task::Context<'a> { - #[cfg(feature = "std")] - fn spawn(&mut self, future: Fut) - where Fut: Future + 'static + Send - { - self.executor().spawn_obj(FutureObj::new(Box::new(future))).unwrap() - } -} diff --git a/futures-util/src/task/executor/mod.rs b/futures-util/src/task/executor/mod.rs new file mode 100644 index 0000000000..97787efd9c --- /dev/null +++ b/futures-util/src/task/executor/mod.rs @@ -0,0 +1,84 @@ +use futures_core::future::Future; +use futures_core::task::Executor; + +mod spawn_error; +pub use self::spawn_error::SpawnError; + +if_std! { + mod spawn_with_handle; + use self::spawn_with_handle::spawn_with_handle; + pub use self::spawn_with_handle::JoinHandle; +} + +impl ExecutorExt for Ex where Ex: Executor {} + +/// Extension trait for `Executor` +pub trait ExecutorExt: Executor { + /// Spawns a task that polls the given future with output `()` to + /// completion. + /// + /// This method returns a [`Result`] that contains a [`SpawnError`] if + /// spawning fails. + /// + /// You can use [`spawn_with_handle`](ExecutorExt::spawn_with_handle) if + /// you want to spawn a future with output other than `()` or if you want + /// to be able to await its completion. + /// + /// Note this method will eventually be replaced with the upcoming + /// `Executor::spawn` method which will take a `dyn Future` as input. + /// Technical limitations prevent `Executor::spawn` from being implemented + /// today. Feel free to use this method in the meantime. + /// + /// ``` + /// #![feature(async_await, await_macro, futures_api)] + /// # futures::executor::block_on(async { + /// use futures::executor::ThreadPool; + /// use futures::task::ExecutorExt; + /// + /// let mut executor = ThreadPool::new().unwrap(); + /// + /// let future = async { /* ... */ }; + /// executor.spawn(future).unwrap(); + /// # }); + /// ``` + #[cfg(feature = "std")] + fn spawn(&mut self, future: Fut) -> Result<(), SpawnError> + where Fut: Future + Send + 'static, + { + let res = self.spawn_obj(Box::new(future).into()); + res.map_err(|err| SpawnError { kind: err.kind }) + } + + /// Spawns a task that polls the given future to completion and returns a + /// future that resolves to the spawned future's output. + /// + /// This method returns a [`Result`] that contains a [`JoinHandle`], or, if + /// spawning fails, a [`SpawnError`]. [`JoinHandle`] is a future that + /// resolves to the output of the spawned future. + /// + /// ``` + /// #![feature(async_await, await_macro, futures_api)] + /// # futures::executor::block_on(async { + /// use futures::executor::ThreadPool; + /// use futures::future; + /// use futures::task::ExecutorExt; + /// + /// let mut executor = ThreadPool::new().unwrap(); + /// + /// let future = future::ready(1); + /// let join_handle = executor.spawn_with_handle(future).unwrap(); + /// assert_eq!(await!(join_handle), 1); + /// # }); + /// ``` + #[cfg(feature = "std")] + fn spawn_with_handle( + &mut self, + future: Fut + ) -> Result, SpawnError> + where Fut: Future + Send + 'static, + Fut::Output: Send, + { + spawn_with_handle(self, future) + } +} + diff --git a/futures-util/src/task/executor/spawn_error.rs b/futures-util/src/task/executor/spawn_error.rs new file mode 100644 index 0000000000..211bc6bdd4 --- /dev/null +++ b/futures-util/src/task/executor/spawn_error.rs @@ -0,0 +1,8 @@ +use futures_core::task::SpawnErrorKind; + +/// The result of a failed spawn +#[derive(Debug)] +pub struct SpawnError { + /// The kind of error + pub kind: SpawnErrorKind, +} diff --git a/futures-util/src/task/executor/spawn_with_handle.rs b/futures-util/src/task/executor/spawn_with_handle.rs new file mode 100644 index 0000000000..cb246dca9c --- /dev/null +++ b/futures-util/src/task/executor/spawn_with_handle.rs @@ -0,0 +1,107 @@ +use crate::future::FutureExt; +use super::SpawnError; +use futures_channel::oneshot::{self, Sender, Receiver}; +use futures_core::future::Future; +use futures_core::task::{self, Poll, Executor, SpawnObjError}; +use std::marker::Unpin; +use std::mem::PinMut; +use std::panic::{self, AssertUnwindSafe}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::thread; + +/// The join handle returned by +/// [`spawn_with_handle`](crate::task::ExecutorExt::spawn_with_handle). +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub struct JoinHandle { + rx: Receiver>, + keep_running: Arc, +} + +impl JoinHandle { + /// Drops this handle *without* canceling the underlying future. + /// + /// This method can be used if you want to drop the handle, but let the + /// execution continue. + pub fn forget(self) { + self.keep_running.store(true, Ordering::SeqCst); + } +} + +impl Future for JoinHandle { + type Output = T; + + fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll { + match self.rx.poll_unpin(cx) { + Poll::Ready(Ok(Ok(output))) => Poll::Ready(output), + Poll::Ready(Ok(Err(e))) => panic::resume_unwind(e), + Poll::Ready(Err(e)) => panic::resume_unwind(Box::new(e)), + Poll::Pending => Poll::Pending, + } + } +} + +struct Wrapped { + tx: Option>, + keep_running: Arc, + future: Fut, +} + +impl Unpin for Wrapped {} + +impl Wrapped { + unsafe_pinned!(future: Fut); + unsafe_unpinned!(tx: Option>); + unsafe_unpinned!(keep_running: Arc); +} + +impl Future for Wrapped { + type Output = (); + + fn poll(mut self: PinMut, cx: &mut task::Context) -> Poll<()> { + if let Poll::Ready(_) = self.tx().as_mut().unwrap().poll_cancel(cx) { + if !self.keep_running().load(Ordering::SeqCst) { + // Cancelled, bail out + return Poll::Ready(()) + } + } + + let output = match self.future().poll(cx) { + Poll::Ready(output) => output, + Poll::Pending => return Poll::Pending, + }; + + // if the receiving end has gone away then that's ok, we just ignore the + // send error here. + drop(self.tx().take().unwrap().send(output)); + Poll::Ready(()) + } +} + +pub(super) fn spawn_with_handle( + executor: &mut Ex, + future: Fut, +) -> Result, SpawnError> +where Ex: Executor + ?Sized, + Fut: Future + Send + 'static, + Fut::Output: Send, +{ + let (tx, rx) = oneshot::channel(); + let keep_running = Arc::new(AtomicBool::new(false)); + + // AssertUnwindSafe is used here because `Send + 'static` is basically + // an alias for an implementation of the `UnwindSafe` trait but we can't + // express that in the standard library right now. + let wrapped = Wrapped { + future: AssertUnwindSafe(future).catch_unwind(), + tx: Some(tx), + keep_running: keep_running.clone(), + }; + + let res = executor.spawn_obj(Box::new(wrapped).into()); + match res { + Ok(()) => Ok(JoinHandle { rx, keep_running }), + Err(SpawnObjError { kind, .. }) => Err(SpawnError { kind }), + } +} diff --git a/futures-util/src/task/mod.rs b/futures-util/src/task/mod.rs index a1df2c3999..e043aacf4b 100644 --- a/futures-util/src/task/mod.rs +++ b/futures-util/src/task/mod.rs @@ -1,7 +1,11 @@ //! Task notification -mod context; -pub use self::context::ContextExt; +mod executor; +pub use self::executor::{ExecutorExt, SpawnError}; + +if_std! { + pub use self::executor::JoinHandle; +} #[cfg_attr( feature = "nightly", diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 21712582a9..de45545b1d 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -60,7 +60,7 @@ pub use futures_util::{ #[cfg(feature = "std")] pub use futures_util::{ // Async-await - join, select, pending, poll, + join, select, pending, poll, spawn, spawn_with_handle, }; #[cfg(feature = "std")] @@ -145,9 +145,8 @@ pub mod executor { BlockingStream, Enter, EnterError, LocalExecutor, LocalPool, - Spawn, SpawnWithHandle, - ThreadPool, ThreadPoolBuilder, JoinHandle, - block_on, block_on_stream, enter, spawn, spawn_with_handle + ThreadPool, ThreadPoolBuilder, + block_on, block_on_stream, enter, }; } @@ -241,7 +240,7 @@ pub mod prelude { pub use crate::future::{self, Future, TryFuture, FutureExt, TryFutureExt}; pub use crate::stream::{self, Stream, TryStream, StreamExt, TryStreamExt}; - pub use crate::task::{self, Poll, ContextExt}; + pub use crate::task::{self, Poll, ExecutorExt}; pub use crate::sink::{self, Sink, SinkExt}; #[cfg(feature = "std")] @@ -348,16 +347,19 @@ pub mod task { SpawnErrorKind, SpawnObjError, SpawnLocalObjError, }; - pub use futures_util::task::ContextExt; + #[cfg(feature = "std")] + pub use futures_core::task::{ + Wake, local_waker, local_waker_from_nonlocal + }; + + pub use futures_util::task::{ExecutorExt, SpawnError}; + + #[cfg(feature = "std")] + pub use futures_util::task::JoinHandle; #[cfg_attr( feature = "nightly", cfg(all(target_has_atomic = "cas", target_has_atomic = "ptr")) )] pub use futures_util::task::AtomicWaker; - - #[cfg(feature = "std")] - pub use futures_core::task::{ - Wake, local_waker, local_waker_from_nonlocal - }; }