Skip to content

Commit

Permalink
rt: add tokio::task::Builder (#3881)
Browse files Browse the repository at this point in the history
Adds a builder API for spawning tasks. Initially, this enables the caller to name the spawned
task in order to provide better visibility into all tasks in the system.
  • Loading branch information
jbr authored Jun 29, 2021
1 parent b521cc2 commit 8fa29cb
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 12 deletions.
17 changes: 16 additions & 1 deletion tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Handle {
F::Output: Send + 'static,
{
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task");
let future = crate::util::trace::task(future, "task", None);
self.spawner.spawn(future)
}

Expand All @@ -170,6 +170,15 @@ impl Handle {
/// # }
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
self.spawn_blocking_inner(func, None)
}

#[cfg_attr(tokio_track_caller, track_caller)]
pub(crate) fn spawn_blocking_inner<F, R>(&self, func: F, name: Option<&str>) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
Expand All @@ -187,17 +196,23 @@ impl Handle {
"task",
kind = %"blocking",
function = %std::any::type_name::<F>(),
task.name = %name.unwrap_or_default(),
spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
);
#[cfg(not(tokio_track_caller))]
let span = tracing::trace_span!(
target: "tokio::task",
"task",
kind = %"blocking",
task.name = %name.unwrap_or_default(),
function = %std::any::type_name::<F>(),
);
fut.instrument(span)
};

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;

let (task, handle) = task::joinable(fut);
let _ = self.blocking_spawner.spawn(task, &self);
handle
Expand Down
105 changes: 105 additions & 0 deletions tokio/src/task/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#![allow(unreachable_pub)]
use crate::util::error::CONTEXT_MISSING_ERROR;
use crate::{runtime::context, task::JoinHandle};
use std::future::Future;

/// Factory which is used to configure the properties of a new task.
///
/// Methods can be chained in order to configure it.
///
/// Currently, there is only one configuration option:
///
/// - [`name`], which specifies an associated name for
/// the task
///
/// There are three types of task that can be spawned from a Builder:
/// - [`spawn_local`] for executing futures on the current thread
/// - [`spawn`] for executing [`Send`] futures on the runtime
/// - [`spawn_blocking`] for executing blocking code in the
/// blocking thread pool.
///
/// ## Example
///
/// ```no_run
/// use tokio::net::{TcpListener, TcpStream};
///
/// use std::io;
///
/// async fn process(socket: TcpStream) {
/// // ...
/// # drop(socket);
/// }
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let listener = TcpListener::bind("127.0.0.1:8080").await?;
///
/// loop {
/// let (socket, _) = listener.accept().await?;
///
/// tokio::task::Builder::new()
/// .name("tcp connection handler")
/// .spawn(async move {
/// // Process each socket concurrently.
/// process(socket).await
/// });
/// }
/// }
/// ```
#[derive(Default, Debug)]
pub struct Builder<'a> {
name: Option<&'a str>,
}

impl<'a> Builder<'a> {
/// Creates a new task builder.
pub fn new() -> Self {
Self::default()
}

/// Assigns a name to the task which will be spawned.
pub fn name(&self, name: &'a str) -> Self {
Self { name: Some(name) }
}

/// Spawns a task on the executor.
///
/// See [`task::spawn`](crate::task::spawn) for
/// more details.
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
super::spawn::spawn_inner(future, self.name)
}

/// Spawns a task on the current thread.
///
/// See [`task::spawn_local`](crate::task::spawn_local)
/// for more details.
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_local<Fut>(self, future: Fut) -> JoinHandle<Fut::Output>
where
Fut: Future + 'static,
Fut::Output: 'static,
{
super::local::spawn_local_inner(future, self.name)
}

/// Spawns blocking code on the blocking threadpool.
///
/// See [`task::spawn_blocking`](crate::task::spawn_blocking)
/// for more details.
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn_blocking<Function, Output>(self, function: Function) -> JoinHandle<Output>
where
Function: FnOnce() -> Output + Send + 'static,
Output: Send + 'static,
{
context::current()
.expect(CONTEXT_MISSING_ERROR)
.spawn_blocking_inner(function, self.name)
}
}
11 changes: 9 additions & 2 deletions tokio/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,14 @@ cfg_rt! {
F: Future + 'static,
F::Output: 'static,
{
let future = crate::util::trace::task(future, "local");
spawn_local_inner(future, None)
}

pub(super) fn spawn_local_inner<F>(future: F, name: Option<&str>) -> JoinHandle<F::Output>
where F: Future + 'static,
F::Output: 'static
{
let future = crate::util::trace::task(future, "local", name);
CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");
Expand Down Expand Up @@ -381,7 +388,7 @@ impl LocalSet {
F: Future + 'static,
F::Output: 'static,
{
let future = crate::util::trace::task(future, "local");
let future = crate::util::trace::task(future, "local", None);
let (task, handle) = unsafe { task::joinable_local(future) };
self.context.tasks.borrow_mut().queue.push_back(task);
self.context.shared.waker.wake();
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,9 @@ cfg_rt! {

mod unconstrained;
pub use unconstrained::{unconstrained, Unconstrained};

cfg_trace! {
mod builder;
pub use builder::Builder;
}
}
20 changes: 13 additions & 7 deletions tokio/src/task/spawn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use crate::runtime;
use crate::task::JoinHandle;
use crate::util::error::CONTEXT_MISSING_ERROR;
use crate::{task::JoinHandle, util::error::CONTEXT_MISSING_ERROR};

use std::future::Future;

Expand Down Expand Up @@ -124,14 +122,22 @@ cfg_rt! {
/// error[E0391]: cycle detected when processing `main`
/// ```
#[cfg_attr(tokio_track_caller, track_caller)]
pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let spawn_handle = runtime::context::spawn_handle()
.expect(CONTEXT_MISSING_ERROR);
let task = crate::util::trace::task(task, "task");
spawn_inner(future, None)
}

#[cfg_attr(tokio_track_caller, track_caller)]
pub(super) fn spawn_inner<T>(future: T, name: Option<&str>) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
let spawn_handle = crate::runtime::context::spawn_handle().expect(CONTEXT_MISSING_ERROR);
let task = crate::util::trace::task(future, "task", name);
spawn_handle.spawn(task)
}
}
6 changes: 4 additions & 2 deletions tokio/src/util/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ cfg_trace! {

#[inline]
#[cfg_attr(tokio_track_caller, track_caller)]
pub(crate) fn task<F>(task: F, kind: &'static str) -> Instrumented<F> {
pub(crate) fn task<F>(task: F, kind: &'static str, name: Option<&str>) -> Instrumented<F> {
use tracing::instrument::Instrument;
#[cfg(tokio_track_caller)]
let location = std::panic::Location::caller();
Expand All @@ -14,12 +14,14 @@ cfg_trace! {
"task",
%kind,
spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
task.name = %name.unwrap_or_default()
);
#[cfg(not(tokio_track_caller))]
let span = tracing::trace_span!(
target: "tokio::task",
"task",
%kind,
task.name = %name.unwrap_or_default()
);
task.instrument(span)
}
Expand All @@ -29,7 +31,7 @@ cfg_trace! {
cfg_not_trace! {
cfg_rt! {
#[inline]
pub(crate) fn task<F>(task: F, _: &'static str) -> F {
pub(crate) fn task<F>(task: F, _: &'static str, _name: Option<&str>) -> F {
// nop
task
}
Expand Down
67 changes: 67 additions & 0 deletions tokio/tests/task_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#[cfg(all(tokio_unstable, feature = "tracing"))]
mod tests {
use std::rc::Rc;
use tokio::{
task::{Builder, LocalSet},
test,
};

#[test]
async fn spawn_with_name() {
let result = Builder::new()
.name("name")
.spawn(async { "task executed" })
.await;

assert_eq!(result.unwrap(), "task executed");
}

#[test]
async fn spawn_blocking_with_name() {
let result = Builder::new()
.name("name")
.spawn_blocking(|| "task executed")
.await;

assert_eq!(result.unwrap(), "task executed");
}

#[test]
async fn spawn_local_with_name() {
let unsend_data = Rc::new("task executed");
let result = LocalSet::new()
.run_until(async move {
Builder::new()
.name("name")
.spawn_local(async move { unsend_data })
.await
})
.await;

assert_eq!(*result.unwrap(), "task executed");
}

#[test]
async fn spawn_without_name() {
let result = Builder::new().spawn(async { "task executed" }).await;

assert_eq!(result.unwrap(), "task executed");
}

#[test]
async fn spawn_blocking_without_name() {
let result = Builder::new().spawn_blocking(|| "task executed").await;

assert_eq!(result.unwrap(), "task executed");
}

#[test]
async fn spawn_local_without_name() {
let unsend_data = Rc::new("task executed");
let result = LocalSet::new()
.run_until(async move { Builder::new().spawn_local(async move { unsend_data }).await })
.await;

assert_eq!(*result.unwrap(), "task executed");
}
}

1 comment on commit 8fa29cb

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'sync_mpsc'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 8fa29cb Previous: b521cc2 Ratio
send_large 57841 ns/iter (± 6266) 25250 ns/iter (± 273) 2.29

This comment was automatically generated by workflow using github-action-benchmark.

CC: @tokio-rs/maintainers

Please sign in to comment.