From 8fa29cb00a486c0ffc28f295c749573cb58a8967 Mon Sep 17 00:00:00 2001 From: Jacob Rothstein Date: Tue, 29 Jun 2021 10:47:30 -0700 Subject: [PATCH] rt: add tokio::task::Builder (#3881) Adds a builder API for spawning tasks. Initially, this enables the caller to name the spawned task in order to provide better visibility into all tasks in the system. --- tokio/src/runtime/handle.rs | 17 +++++- tokio/src/task/builder.rs | 105 ++++++++++++++++++++++++++++++++++++ tokio/src/task/local.rs | 11 +++- tokio/src/task/mod.rs | 5 ++ tokio/src/task/spawn.rs | 20 ++++--- tokio/src/util/trace.rs | 6 ++- tokio/tests/task_builder.rs | 67 +++++++++++++++++++++++ 7 files changed, 219 insertions(+), 12 deletions(-) create mode 100644 tokio/src/task/builder.rs create mode 100644 tokio/tests/task_builder.rs diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 173f0ca61f1..7dff91448f1 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -145,7 +145,7 @@ impl Handle { F::Output: Send + 'static, { #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task(future, "task"); + let future = crate::util::trace::task(future, "task", None); self.spawner.spawn(future) } @@ -170,6 +170,15 @@ impl Handle { /// # } #[cfg_attr(tokio_track_caller, track_caller)] pub fn spawn_blocking(&self, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.spawn_blocking_inner(func, None) + } + + #[cfg_attr(tokio_track_caller, track_caller)] + pub(crate) fn spawn_blocking_inner(&self, func: F, name: Option<&str>) -> JoinHandle where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -187,6 +196,7 @@ impl Handle { "task", kind = %"blocking", function = %std::any::type_name::(), + task.name = %name.unwrap_or_default(), spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), ); #[cfg(not(tokio_track_caller))] @@ -194,10 +204,15 @@ impl Handle { target: "tokio::task", "task", kind = %"blocking", + task.name = %name.unwrap_or_default(), function = %std::any::type_name::(), ); fut.instrument(span) }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let _ = name; + let (task, handle) = task::joinable(fut); let _ = self.blocking_spawner.spawn(task, &self); handle diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs new file mode 100644 index 00000000000..e46bdefe99a --- /dev/null +++ b/tokio/src/task/builder.rs @@ -0,0 +1,105 @@ +#![allow(unreachable_pub)] +use crate::util::error::CONTEXT_MISSING_ERROR; +use crate::{runtime::context, task::JoinHandle}; +use std::future::Future; + +/// Factory which is used to configure the properties of a new task. +/// +/// Methods can be chained in order to configure it. +/// +/// Currently, there is only one configuration option: +/// +/// - [`name`], which specifies an associated name for +/// the task +/// +/// There are three types of task that can be spawned from a Builder: +/// - [`spawn_local`] for executing futures on the current thread +/// - [`spawn`] for executing [`Send`] futures on the runtime +/// - [`spawn_blocking`] for executing blocking code in the +/// blocking thread pool. +/// +/// ## Example +/// +/// ```no_run +/// use tokio::net::{TcpListener, TcpStream}; +/// +/// use std::io; +/// +/// async fn process(socket: TcpStream) { +/// // ... +/// # drop(socket); +/// } +/// +/// #[tokio::main] +/// async fn main() -> io::Result<()> { +/// let listener = TcpListener::bind("127.0.0.1:8080").await?; +/// +/// loop { +/// let (socket, _) = listener.accept().await?; +/// +/// tokio::task::Builder::new() +/// .name("tcp connection handler") +/// .spawn(async move { +/// // Process each socket concurrently. +/// process(socket).await +/// }); +/// } +/// } +/// ``` +#[derive(Default, Debug)] +pub struct Builder<'a> { + name: Option<&'a str>, +} + +impl<'a> Builder<'a> { + /// Creates a new task builder. + pub fn new() -> Self { + Self::default() + } + + /// Assigns a name to the task which will be spawned. + pub fn name(&self, name: &'a str) -> Self { + Self { name: Some(name) } + } + + /// Spawns a task on the executor. + /// + /// See [`task::spawn`](crate::task::spawn) for + /// more details. + #[cfg_attr(tokio_track_caller, track_caller)] + pub fn spawn(self, future: Fut) -> JoinHandle + where + Fut: Future + Send + 'static, + Fut::Output: Send + 'static, + { + super::spawn::spawn_inner(future, self.name) + } + + /// Spawns a task on the current thread. + /// + /// See [`task::spawn_local`](crate::task::spawn_local) + /// for more details. + #[cfg_attr(tokio_track_caller, track_caller)] + pub fn spawn_local(self, future: Fut) -> JoinHandle + where + Fut: Future + 'static, + Fut::Output: 'static, + { + super::local::spawn_local_inner(future, self.name) + } + + /// Spawns blocking code on the blocking threadpool. + /// + /// See [`task::spawn_blocking`](crate::task::spawn_blocking) + /// for more details. + #[cfg_attr(tokio_track_caller, track_caller)] + pub fn spawn_blocking(self, function: Function) -> JoinHandle + where + Function: FnOnce() -> Output + Send + 'static, + Output: Send + 'static, + { + context::current() + .expect(CONTEXT_MISSING_ERROR) + .spawn_blocking_inner(function, self.name) + } +} diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 64f1ac57c10..49b0ec6c4d4 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -297,7 +297,14 @@ cfg_rt! { F: Future + 'static, F::Output: 'static, { - let future = crate::util::trace::task(future, "local"); + spawn_local_inner(future, None) + } + + pub(super) fn spawn_local_inner(future: F, name: Option<&str>) -> JoinHandle + where F: Future + 'static, + F::Output: 'static + { + let future = crate::util::trace::task(future, "local", name); CURRENT.with(|maybe_cx| { let cx = maybe_cx .expect("`spawn_local` called from outside of a `task::LocalSet`"); @@ -381,7 +388,7 @@ impl LocalSet { F: Future + 'static, F::Output: 'static, { - let future = crate::util::trace::task(future, "local"); + let future = crate::util::trace::task(future, "local", None); let (task, handle) = unsafe { task::joinable_local(future) }; self.context.tasks.borrow_mut().queue.push_back(task); self.context.shared.waker.wake(); diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index 25dab0cedfc..ae4c35c9ce8 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -299,4 +299,9 @@ cfg_rt! { mod unconstrained; pub use unconstrained::{unconstrained, Unconstrained}; + + cfg_trace! { + mod builder; + pub use builder::Builder; + } } diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index d846fb4a816..3c577b82d86 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -1,6 +1,4 @@ -use crate::runtime; -use crate::task::JoinHandle; -use crate::util::error::CONTEXT_MISSING_ERROR; +use crate::{task::JoinHandle, util::error::CONTEXT_MISSING_ERROR}; use std::future::Future; @@ -124,14 +122,22 @@ cfg_rt! { /// error[E0391]: cycle detected when processing `main` /// ``` #[cfg_attr(tokio_track_caller, track_caller)] - pub fn spawn(task: T) -> JoinHandle + pub fn spawn(future: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { - let spawn_handle = runtime::context::spawn_handle() - .expect(CONTEXT_MISSING_ERROR); - let task = crate::util::trace::task(task, "task"); + spawn_inner(future, None) + } + + #[cfg_attr(tokio_track_caller, track_caller)] + pub(super) fn spawn_inner(future: T, name: Option<&str>) -> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + let spawn_handle = crate::runtime::context::spawn_handle().expect(CONTEXT_MISSING_ERROR); + let task = crate::util::trace::task(future, "task", name); spawn_handle.spawn(task) } } diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index 96a9db91d1f..c51a5a72bce 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -4,7 +4,7 @@ cfg_trace! { #[inline] #[cfg_attr(tokio_track_caller, track_caller)] - pub(crate) fn task(task: F, kind: &'static str) -> Instrumented { + pub(crate) fn task(task: F, kind: &'static str, name: Option<&str>) -> Instrumented { use tracing::instrument::Instrument; #[cfg(tokio_track_caller)] let location = std::panic::Location::caller(); @@ -14,12 +14,14 @@ cfg_trace! { "task", %kind, spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + task.name = %name.unwrap_or_default() ); #[cfg(not(tokio_track_caller))] let span = tracing::trace_span!( target: "tokio::task", "task", %kind, + task.name = %name.unwrap_or_default() ); task.instrument(span) } @@ -29,7 +31,7 @@ cfg_trace! { cfg_not_trace! { cfg_rt! { #[inline] - pub(crate) fn task(task: F, _: &'static str) -> F { + pub(crate) fn task(task: F, _: &'static str, _name: Option<&str>) -> F { // nop task } diff --git a/tokio/tests/task_builder.rs b/tokio/tests/task_builder.rs new file mode 100644 index 00000000000..1499abf19e4 --- /dev/null +++ b/tokio/tests/task_builder.rs @@ -0,0 +1,67 @@ +#[cfg(all(tokio_unstable, feature = "tracing"))] +mod tests { + use std::rc::Rc; + use tokio::{ + task::{Builder, LocalSet}, + test, + }; + + #[test] + async fn spawn_with_name() { + let result = Builder::new() + .name("name") + .spawn(async { "task executed" }) + .await; + + assert_eq!(result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_blocking_with_name() { + let result = Builder::new() + .name("name") + .spawn_blocking(|| "task executed") + .await; + + assert_eq!(result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_local_with_name() { + let unsend_data = Rc::new("task executed"); + let result = LocalSet::new() + .run_until(async move { + Builder::new() + .name("name") + .spawn_local(async move { unsend_data }) + .await + }) + .await; + + assert_eq!(*result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_without_name() { + let result = Builder::new().spawn(async { "task executed" }).await; + + assert_eq!(result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_blocking_without_name() { + let result = Builder::new().spawn_blocking(|| "task executed").await; + + assert_eq!(result.unwrap(), "task executed"); + } + + #[test] + async fn spawn_local_without_name() { + let unsend_data = Rc::new("task executed"); + let result = LocalSet::new() + .run_until(async move { Builder::new().spawn_local(async move { unsend_data }).await }) + .await; + + assert_eq!(*result.unwrap(), "task executed"); + } +}