From ca58bee42f523535753b081d3824646dbfb7248a Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Mon, 11 Jul 2022 11:52:07 -0700 Subject: [PATCH 1/3] task: make Builder::spawn* methods fallible Making the `task::Builder::spawn*` methods fallible allows applications to gracefully handle spawn errors (e.g. due to resource exhaustion) without tokio panicking internally. This change is also a good analogue for `std::thread::Builder` which has fallible spawn methods (whereas `std::thread::spawn` internally panics) --- tokio/src/task/builder.rs | 25 +++++++++++++------------ tokio/src/task/join_set.rs | 20 +++++++++++--------- tokio/tests/task_builder.rs | 20 +++++++++++++++++--- 3 files changed, 41 insertions(+), 24 deletions(-) diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index ddb5c430ef1..1c779c82386 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -3,7 +3,7 @@ use crate::{ runtime::{context, Handle}, task::{JoinHandle, LocalSet}, }; -use std::future::Future; +use std::{future::Future, io}; /// Factory which is used to configure the properties of a new task. /// @@ -83,12 +83,12 @@ impl<'a> Builder<'a> { /// See [`task::spawn`](crate::task::spawn) for /// more details. #[track_caller] - pub fn spawn(self, future: Fut) -> JoinHandle + pub fn spawn(self, future: Fut) -> io::Result> where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - super::spawn::spawn_inner(future, self.name) + Ok(super::spawn::spawn_inner(future, self.name)) } /// Spawn a task with this builder's settings on the provided [runtime @@ -99,12 +99,12 @@ impl<'a> Builder<'a> { /// [runtime handle]: crate::runtime::Handle /// [`Handle::spawn`]: crate::runtime::Handle::spawn #[track_caller] - pub fn spawn_on(&mut self, future: Fut, handle: &Handle) -> JoinHandle + pub fn spawn_on(&mut self, future: Fut, handle: &Handle) -> io::Result> where Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - handle.spawn_named(future, self.name) + Ok(handle.spawn_named(future, self.name)) } /// Spawns `!Send` a task on the current [`LocalSet`] with this builder's @@ -122,12 +122,12 @@ impl<'a> Builder<'a> { /// [`task::spawn_local`]: crate::task::spawn_local /// [`LocalSet`]: crate::task::LocalSet #[track_caller] - pub fn spawn_local(self, future: Fut) -> JoinHandle + pub fn spawn_local(self, future: Fut) -> io::Result> where Fut: Future + 'static, Fut::Output: 'static, { - super::local::spawn_local_inner(future, self.name) + Ok(super::local::spawn_local_inner(future, self.name)) } /// Spawns `!Send` a task on the provided [`LocalSet`] with this builder's @@ -138,12 +138,12 @@ impl<'a> Builder<'a> { /// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local /// [`LocalSet`]: crate::task::LocalSet #[track_caller] - pub fn spawn_local_on(self, future: Fut, local_set: &LocalSet) -> JoinHandle + pub fn spawn_local_on(self, future: Fut, local_set: &LocalSet) -> io::Result> where Fut: Future + 'static, Fut::Output: 'static, { - local_set.spawn_named(future, self.name) + Ok(local_set.spawn_named(future, self.name)) } /// Spawns blocking code on the blocking threadpool. @@ -155,7 +155,7 @@ impl<'a> Builder<'a> { /// See [`task::spawn_blocking`](crate::task::spawn_blocking) /// for more details. #[track_caller] - pub fn spawn_blocking(self, function: Function) -> JoinHandle + pub fn spawn_blocking(self, function: Function) -> io::Result> where Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, @@ -174,7 +174,7 @@ impl<'a> Builder<'a> { self, function: Function, handle: &Handle, - ) -> JoinHandle + ) -> io::Result> where Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, @@ -186,6 +186,7 @@ impl<'a> Builder<'a> { self.name, handle, ); - join_handle + + Ok(join_handle) } } diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index cb08e4df065..beacf682f63 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -6,6 +6,7 @@ //! details. use std::fmt; use std::future::Future; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -377,13 +378,13 @@ impl<'a, T: 'static> Builder<'a, T> { /// /// [`AbortHandle`]: crate::task::AbortHandle #[track_caller] - pub fn spawn(self, future: F) -> AbortHandle + pub fn spawn(self, future: F) -> io::Result where F: Future, F: Send + 'static, T: Send, { - self.joinset.insert(self.builder.spawn(future)) + Ok(self.joinset.insert(self.builder.spawn(future)?)) } /// Spawn the provided task on the provided [runtime handle] with this @@ -397,13 +398,13 @@ impl<'a, T: 'static> Builder<'a, T> { /// [`AbortHandle`]: crate::task::AbortHandle /// [runtime handle]: crate::runtime::Handle #[track_caller] - pub fn spawn_on(mut self, future: F, handle: &Handle) -> AbortHandle + pub fn spawn_on(mut self, future: F, handle: &Handle) -> io::Result where F: Future, F: Send + 'static, T: Send, { - self.joinset.insert(self.builder.spawn_on(future, handle)) + Ok(self.joinset.insert(self.builder.spawn_on(future, handle)?)) } /// Spawn the provided task on the current [`LocalSet`] with this builder's @@ -420,12 +421,12 @@ impl<'a, T: 'static> Builder<'a, T> { /// [`LocalSet`]: crate::task::LocalSet /// [`AbortHandle`]: crate::task::AbortHandle #[track_caller] - pub fn spawn_local(self, future: F) -> AbortHandle + pub fn spawn_local(self, future: F) -> io::Result where F: Future, F: 'static, { - self.joinset.insert(self.builder.spawn_local(future)) + Ok(self.joinset.insert(self.builder.spawn_local(future)?)) } /// Spawn the provided task on the provided [`LocalSet`] with this builder's @@ -438,13 +439,14 @@ impl<'a, T: 'static> Builder<'a, T> { /// [`LocalSet`]: crate::task::LocalSet /// [`AbortHandle`]: crate::task::AbortHandle #[track_caller] - pub fn spawn_local_on(self, future: F, local_set: &LocalSet) -> AbortHandle + pub fn spawn_local_on(self, future: F, local_set: &LocalSet) -> io::Result where F: Future, F: 'static, { - self.joinset - .insert(self.builder.spawn_local_on(future, local_set)) + Ok(self + .joinset + .insert(self.builder.spawn_local_on(future, local_set)?)) } } diff --git a/tokio/tests/task_builder.rs b/tokio/tests/task_builder.rs index 1499abf19e4..78329ff26a4 100644 --- a/tokio/tests/task_builder.rs +++ b/tokio/tests/task_builder.rs @@ -11,6 +11,7 @@ mod tests { let result = Builder::new() .name("name") .spawn(async { "task executed" }) + .unwrap() .await; assert_eq!(result.unwrap(), "task executed"); @@ -21,6 +22,7 @@ mod tests { let result = Builder::new() .name("name") .spawn_blocking(|| "task executed") + .unwrap() .await; assert_eq!(result.unwrap(), "task executed"); @@ -34,6 +36,7 @@ mod tests { Builder::new() .name("name") .spawn_local(async move { unsend_data }) + .unwrap() .await }) .await; @@ -43,14 +46,20 @@ mod tests { #[test] async fn spawn_without_name() { - let result = Builder::new().spawn(async { "task executed" }).await; + let result = Builder::new() + .spawn(async { "task executed" }) + .unwrap() + .await; assert_eq!(result.unwrap(), "task executed"); } #[test] async fn spawn_blocking_without_name() { - let result = Builder::new().spawn_blocking(|| "task executed").await; + let result = Builder::new() + .spawn_blocking(|| "task executed") + .unwrap() + .await; assert_eq!(result.unwrap(), "task executed"); } @@ -59,7 +68,12 @@ mod tests { async fn spawn_local_without_name() { let unsend_data = Rc::new("task executed"); let result = LocalSet::new() - .run_until(async move { Builder::new().spawn_local(async move { unsend_data }).await }) + .run_until(async move { + Builder::new() + .spawn_local(async move { unsend_data }) + .unwrap() + .await + }) .await; assert_eq!(*result.unwrap(), "task executed"); From c09b90ef5f4e02587cff494de3334dc87133f36e Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Mon, 11 Jul 2022 12:59:13 -0700 Subject: [PATCH 2/3] task: surface spawn_blocking errors through the Builder Using `tokio::task::spawn_blocking` continues to exhibit the previous behavior (panic if there aren't any worker threads available to accept the task, but return a dummy handle if the runtime is shutting down) --- tokio/src/runtime/blocking/mod.rs | 2 +- tokio/src/runtime/blocking/pool.rs | 27 ++++++++++++++++++++++++--- tokio/src/runtime/handle.rs | 19 +++++++++++++------ tokio/src/task/builder.rs | 21 ++++++++++++++++----- 4 files changed, 54 insertions(+), 15 deletions(-) diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 88d5e6b6a99..7633299b302 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -4,7 +4,7 @@ //! compilation. mod pool; -pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, Spawner, Task}; +pub(crate) use pool::{spawn_blocking, BlockingPool, Mandatory, SpawnError, Spawner, Task}; cfg_fs! { pub(crate) use pool::spawn_mandatory_blocking; diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index f73868ee9e7..009b3226e28 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -11,6 +11,7 @@ use crate::runtime::{Builder, Callback, ToHandle}; use std::collections::{HashMap, VecDeque}; use std::fmt; +use std::io; use std::time::Duration; pub(crate) struct BlockingPool { @@ -82,6 +83,26 @@ pub(crate) enum Mandatory { NonMandatory, } +pub(crate) enum SpawnError { + /// Pool is shutting down and the task was not scheduled + ShuttingDown, + /// There are no worker threads available to take the task + /// and the OS failed to spawn a new one + NoThreads(io::Error), +} + +#[allow(clippy::from_over_into)] // Orphan rules +impl Into for SpawnError { + fn into(self) -> io::Error { + match self { + Self::ShuttingDown => { + io::Error::new(io::ErrorKind::Other, "blocking pool shutting down") + } + Self::NoThreads(e) => e, + } + } +} + impl Task { pub(crate) fn new(task: task::UnownedTask, mandatory: Mandatory) -> Task { Task { task, mandatory } @@ -220,7 +241,7 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== impl Spawner { - pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), ()> { + pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), SpawnError> { let mut shared = self.inner.shared.lock(); if shared.shutdown { @@ -230,7 +251,7 @@ impl Spawner { task.task.shutdown(); // no need to even push this task; it would never get picked up - return Err(()); + return Err(SpawnError::ShuttingDown); } shared.queue.push_back(task); @@ -261,7 +282,7 @@ impl Spawner { Err(e) => { // The OS refused to spawn the thread and there is no thread // to pick up the task that has just been pushed to the queue. - panic!("OS can't spawn worker thread: {}", e) + return Err(SpawnError::NoThreads(e)); } } } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 14101070cd3..961668c0a74 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -341,7 +341,7 @@ impl HandleInner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, _was_spawned) = if cfg!(debug_assertions) + let (join_handle, spawn_result) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None, rt) @@ -349,7 +349,14 @@ impl HandleInner { self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None, rt) }; - join_handle + match spawn_result { + Ok(()) => join_handle, + // Compat: do not panic here, return the join_handle even though it will never resolve + Err(blocking::SpawnError::ShuttingDown) => join_handle, + Err(blocking::SpawnError::NoThreads(e)) => { + panic!("OS can't spawn worker thread: {}", e) + } + } } cfg_fs! { @@ -363,7 +370,7 @@ impl HandleInner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { + let (join_handle, spawn_result) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { self.spawn_blocking_inner( Box::new(func), blocking::Mandatory::Mandatory, @@ -379,7 +386,7 @@ impl HandleInner { ) }; - if was_spawned { + if spawn_result.is_ok() { Some(join_handle) } else { None @@ -394,7 +401,7 @@ impl HandleInner { is_mandatory: blocking::Mandatory, name: Option<&str>, rt: &dyn ToHandle, - ) -> (JoinHandle, bool) + ) -> (JoinHandle, Result<(), blocking::SpawnError>) where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -424,7 +431,7 @@ impl HandleInner { let spawned = self .blocking_spawner .spawn(blocking::Task::new(task, is_mandatory), rt); - (handle, spawned.is_ok()) + (handle, spawned) } } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 1c779c82386..dd5d4845692 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -99,7 +99,11 @@ impl<'a> Builder<'a> { /// [runtime handle]: crate::runtime::Handle /// [`Handle::spawn`]: crate::runtime::Handle::spawn #[track_caller] - pub fn spawn_on(&mut self, future: Fut, handle: &Handle) -> io::Result> + pub fn spawn_on( + &mut self, + future: Fut, + handle: &Handle, + ) -> io::Result> where Fut: Future + Send + 'static, Fut::Output: Send + 'static, @@ -138,7 +142,11 @@ impl<'a> Builder<'a> { /// [`LocalSet::spawn_local`]: crate::task::LocalSet::spawn_local /// [`LocalSet`]: crate::task::LocalSet #[track_caller] - pub fn spawn_local_on(self, future: Fut, local_set: &LocalSet) -> io::Result> + pub fn spawn_local_on( + self, + future: Fut, + local_set: &LocalSet, + ) -> io::Result> where Fut: Future + 'static, Fut::Output: 'static, @@ -155,7 +163,10 @@ impl<'a> Builder<'a> { /// See [`task::spawn_blocking`](crate::task::spawn_blocking) /// for more details. #[track_caller] - pub fn spawn_blocking(self, function: Function) -> io::Result> + pub fn spawn_blocking( + self, + function: Function, + ) -> io::Result> where Function: FnOnce() -> Output + Send + 'static, Output: Send + 'static, @@ -180,13 +191,13 @@ impl<'a> Builder<'a> { Output: Send + 'static, { use crate::runtime::Mandatory; - let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner( + let (join_handle, spawn_result) = handle.as_inner().spawn_blocking_inner( function, Mandatory::NonMandatory, self.name, handle, ); - Ok(join_handle) + spawn_result.map(|()| join_handle).map_err(Into::into) } } From cf181e8b1b29802cf9ef89d2777b653c04417351 Mon Sep 17 00:00:00 2001 From: Ivan Petkov Date: Mon, 11 Jul 2022 13:26:51 -0700 Subject: [PATCH 3/3] fix build --- tokio/src/runtime/blocking/pool.rs | 11 +++++------ tokio/src/task/builder.rs | 5 +++-- tokio/src/task/join_set.rs | 15 ++++++++------- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 009b3226e28..0a058496556 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -91,14 +91,13 @@ pub(crate) enum SpawnError { NoThreads(io::Error), } -#[allow(clippy::from_over_into)] // Orphan rules -impl Into for SpawnError { - fn into(self) -> io::Error { - match self { - Self::ShuttingDown => { +impl From for io::Error { + fn from(e: SpawnError) -> Self { + match e { + SpawnError::ShuttingDown => { io::Error::new(io::ErrorKind::Other, "blocking pool shutting down") } - Self::NoThreads(e) => e, + SpawnError::NoThreads(e) => e, } } } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index dd5d4845692..5635c1a430a 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -48,7 +48,7 @@ use std::{future::Future, io}; /// .spawn(async move { /// // Process each socket concurrently. /// process(socket).await -/// }); +/// })?; /// } /// } /// ``` @@ -198,6 +198,7 @@ impl<'a> Builder<'a> { handle, ); - spawn_result.map(|()| join_handle).map_err(Into::into) + spawn_result?; + Ok(join_handle) } } diff --git a/tokio/src/task/join_set.rs b/tokio/src/task/join_set.rs index beacf682f63..c3767c99d86 100644 --- a/tokio/src/task/join_set.rs +++ b/tokio/src/task/join_set.rs @@ -6,7 +6,6 @@ //! details. use std::fmt; use std::future::Future; -use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -102,13 +101,15 @@ impl JoinSet { /// use tokio::task::JoinSet; /// /// #[tokio::main] - /// async fn main() { + /// async fn main() -> std::io::Result<()> { /// let mut set = JoinSet::new(); /// /// // Use the builder to configure a task's name before spawning it. /// set.build_task() /// .name("my_task") - /// .spawn(async { /* ... */ }); + /// .spawn(async { /* ... */ })?; + /// + /// Ok(()) /// } /// ``` #[cfg(all(tokio_unstable, feature = "tracing"))] @@ -378,7 +379,7 @@ impl<'a, T: 'static> Builder<'a, T> { /// /// [`AbortHandle`]: crate::task::AbortHandle #[track_caller] - pub fn spawn(self, future: F) -> io::Result + pub fn spawn(self, future: F) -> std::io::Result where F: Future, F: Send + 'static, @@ -398,7 +399,7 @@ impl<'a, T: 'static> Builder<'a, T> { /// [`AbortHandle`]: crate::task::AbortHandle /// [runtime handle]: crate::runtime::Handle #[track_caller] - pub fn spawn_on(mut self, future: F, handle: &Handle) -> io::Result + pub fn spawn_on(mut self, future: F, handle: &Handle) -> std::io::Result where F: Future, F: Send + 'static, @@ -421,7 +422,7 @@ impl<'a, T: 'static> Builder<'a, T> { /// [`LocalSet`]: crate::task::LocalSet /// [`AbortHandle`]: crate::task::AbortHandle #[track_caller] - pub fn spawn_local(self, future: F) -> io::Result + pub fn spawn_local(self, future: F) -> std::io::Result where F: Future, F: 'static, @@ -439,7 +440,7 @@ impl<'a, T: 'static> Builder<'a, T> { /// [`LocalSet`]: crate::task::LocalSet /// [`AbortHandle`]: crate::task::AbortHandle #[track_caller] - pub fn spawn_local_on(self, future: F, local_set: &LocalSet) -> io::Result + pub fn spawn_local_on(self, future: F, local_set: &LocalSet) -> std::io::Result where F: Future, F: 'static,