From 3ceb716068f6c3fc79ff1cfee2147c19f9ebd2cc Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Tue, 1 Oct 2024 12:18:44 +0200 Subject: [PATCH 1/6] task: add task size to tracing instrumentation In Tokio, the futures for tasks are stored on the stack unless they are explicitly boxed (caveat in debug mode, see below). Having very large futures can be problematic as it can cause a stack overflow. This change adds the size of the future driving an async task or the function driving a blocking task to the tracing instrumentation. This will make this information immediately available in Tokio Console, and will enable a new lint which will warn users if they have large futures (just for async tasks). --- tokio/src/runtime/blocking/pool.rs | 1 + tokio/src/util/trace.rs | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index f8466a19bd9..7cf3919ac41 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -375,6 +375,7 @@ impl Spawner { task.name = %name.unwrap_or_default(), task.id = id.as_u64(), "fn" = %std::any::type_name::(), + size.bytes = std::mem::size_of::(), loc.file = location.file(), loc.line = location.line(), loc.col = location.column(), diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index e1827686ca9..b75fd4cd5ac 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -5,6 +5,7 @@ cfg_trace! { task::{Context, Poll}, }; use pin_project_lite::pin_project; + use std::mem; use std::future::Future; pub(crate) use tracing::instrument::Instrumented; @@ -12,7 +13,7 @@ cfg_trace! { #[track_caller] pub(crate) fn task(task: F, kind: &'static str, name: Option<&str>, id: u64) -> Instrumented { #[track_caller] - fn get_span(kind: &'static str, name: Option<&str>, id: u64) -> tracing::Span { + fn get_span(kind: &'static str, name: Option<&str>, id: u64, task_size: usize) -> tracing::Span { let location = std::panic::Location::caller(); tracing::trace_span!( target: "tokio::task", @@ -21,13 +22,14 @@ cfg_trace! { %kind, task.name = %name.unwrap_or_default(), task.id = id, + size.bytes = task_size, loc.file = location.file(), loc.line = location.line(), loc.col = location.column(), ) } use tracing::instrument::Instrument; - let span = get_span(kind, name, id); + let span = get_span(kind, name, id, mem::size_of::()); task.instrument(span) } From 648d906008c2a14f8b359178dafbf1adc8d7c9c0 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Tue, 1 Oct 2024 17:34:06 +0200 Subject: [PATCH 2/6] record original future/function size when auto-boxing --- tokio/src/runtime/blocking/pool.rs | 22 ++++++++------ tokio/src/runtime/handle.rs | 25 ++++++++------- tokio/src/runtime/runtime.rs | 20 +++++++----- tokio/src/task/builder.rs | 41 ++++++++++++++++--------- tokio/src/task/local.rs | 37 +++++++++++----------- tokio/src/task/spawn.rs | 14 ++++----- tokio/src/util/trace.rs | 49 +++++++++++++++++++++++++++--- 7 files changed, 136 insertions(+), 72 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 7cf3919ac41..7096de7c8ad 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD}; use crate::util::metric_atomics::MetricAtomicUsize; +use crate::util::trace::SpawnMeta; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -299,10 +300,11 @@ impl Spawner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, spawn_result) = if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt) + let fn_size = std::mem::size_of::(); + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { + self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, SpawnMeta::new_unnamed(fn_size), rt) } else { - self.spawn_blocking_inner(func, Mandatory::NonMandatory, None, rt) + self.spawn_blocking_inner(func, Mandatory::NonMandatory, SpawnMeta::new_unnamed(fn_size), rt) }; match spawn_result { @@ -326,18 +328,19 @@ impl Spawner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, spawn_result) = if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { + let fn_size = std::mem::size_of::(); + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { self.spawn_blocking_inner( Box::new(func), Mandatory::Mandatory, - None, + SpawnMeta::new_unnamed(fn_size), rt, ) } else { self.spawn_blocking_inner( func, Mandatory::Mandatory, - None, + SpawnMeta::new_unnamed(fn_size), rt, ) }; @@ -355,7 +358,7 @@ impl Spawner { &self, func: F, is_mandatory: Mandatory, - name: Option<&str>, + spawn_meta: SpawnMeta<'_>, rt: &Handle, ) -> (JoinHandle, Result<(), SpawnError>) where @@ -372,9 +375,10 @@ impl Spawner { target: "tokio::task::blocking", "runtime.spawn", kind = %"blocking", - task.name = %name.unwrap_or_default(), + task.name = %spawn_meta.name.unwrap_or_default(), task.id = id.as_u64(), "fn" = %std::any::type_name::(), + original_size.bytes = spawn_meta.original_size, size.bytes = std::mem::size_of::(), loc.file = location.file(), loc.line = location.line(), @@ -384,7 +388,7 @@ impl Spawner { }; #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let _ = name; + let _ = spawn_meta; let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 7e3cd1504e5..9026e8773a0 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -18,10 +18,11 @@ pub struct Handle { use crate::runtime::task::JoinHandle; use crate::runtime::BOX_FUTURE_THRESHOLD; use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; +use crate::util::trace::SpawnMeta; use std::future::Future; use std::marker::PhantomData; -use std::{error, fmt}; +use std::{error, fmt, mem}; /// Runtime context guard. /// @@ -189,10 +190,11 @@ impl Handle { F: Future + Send + 'static, F::Output: Send + 'static, { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.spawn_named(Box::pin(future), None) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.spawn_named(future, None) + self.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) } } @@ -296,15 +298,16 @@ impl Handle { /// [`tokio::time`]: crate::time #[track_caller] pub fn block_on(&self, future: F) -> F::Output { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.block_on_inner(Box::pin(future)) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.block_on_inner(future) + self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - fn block_on_inner(&self, future: F) -> F::Output { + fn block_on_inner(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output { #[cfg(all( tokio_unstable, tokio_taskdump, @@ -316,7 +319,7 @@ impl Handle { #[cfg(all(tokio_unstable, feature = "tracing"))] let future = - crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64()); + crate::util::trace::task(future, "block_on", _meta, super::task::Id::next().as_u64()); // Enter the runtime context. This sets the current driver handles and // prevents blocking an existing runtime. @@ -326,7 +329,7 @@ impl Handle { } #[track_caller] - pub(crate) fn spawn_named(&self, future: F, _name: Option<&str>) -> JoinHandle + pub(crate) fn spawn_named(&self, future: F, _meta: SpawnMeta<'_>) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, @@ -341,7 +344,7 @@ impl Handle { ))] let future = super::task::trace::Trace::root(future); #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task(future, "task", _name, id.as_u64()); + let future = crate::util::trace::task(future, "task", _meta, id.as_u64()); self.inner.spawn(future, id) } diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 74061d24ce8..24c772ff9fe 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -3,8 +3,10 @@ use crate::runtime::blocking::BlockingPool; use crate::runtime::scheduler::CurrentThread; use crate::runtime::{context, EnterGuard, Handle}; use crate::task::JoinHandle; +use crate::util::trace::SpawnMeta; use std::future::Future; +use std::mem; use std::time::Duration; cfg_rt_multi_thread! { @@ -241,10 +243,11 @@ impl Runtime { F: Future + Send + 'static, F::Output: Send + 'static, { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.handle.spawn_named(Box::pin(future), None) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.handle.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.handle.spawn_named(future, None) + self.handle.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) } } @@ -329,15 +332,16 @@ impl Runtime { /// [handle]: fn@Handle::block_on #[track_caller] pub fn block_on(&self, future: F) -> F::Output { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.block_on_inner(Box::pin(future)) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.block_on_inner(future) + self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - fn block_on_inner(&self, future: F) -> F::Output { + fn block_on_inner(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output { #[cfg(all( tokio_unstable, tokio_taskdump, @@ -351,7 +355,7 @@ impl Runtime { let future = crate::util::trace::task( future, "block_on", - None, + _meta, crate::runtime::task::Id::next().as_u64(), ); diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index c98849b2746..2cff1c6bcff 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -1,9 +1,9 @@ #![allow(unreachable_pub)] use crate::{ runtime::{Handle, BOX_FUTURE_THRESHOLD}, - task::{JoinHandle, LocalSet}, + task::{JoinHandle, LocalSet}, util::trace::SpawnMeta, }; -use std::{future::Future, io}; +use std::{future::Future, io, mem}; /// Factory which is used to configure the properties of a new task. /// @@ -88,10 +88,11 @@ impl<'a> Builder<'a> { Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - Ok(if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - super::spawn::spawn_inner(Box::pin(future), self.name) + let fut_size = mem::size_of::(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + super::spawn::spawn_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size)) } else { - super::spawn::spawn_inner(future, self.name) + super::spawn::spawn_inner(future, SpawnMeta::new(self.name, fut_size)) }) } @@ -108,10 +109,11 @@ impl<'a> Builder<'a> { Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - Ok(if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - handle.spawn_named(Box::pin(future), self.name) + let fut_size = mem::size_of::(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + handle.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size)) } else { - handle.spawn_named(future, self.name) + handle.spawn_named(future, SpawnMeta::new(self.name, fut_size)) }) } @@ -135,10 +137,12 @@ impl<'a> Builder<'a> { Fut: Future + 'static, Fut::Output: 'static, { - Ok(if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - super::local::spawn_local_inner(Box::pin(future), self.name) + + let fut_size = mem::size_of::(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + super::local::spawn_local_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size)) } else { - super::local::spawn_local_inner(future, self.name) + super::local::spawn_local_inner(future, SpawnMeta::new(self.name, fut_size)) }) } @@ -159,7 +163,13 @@ impl<'a> Builder<'a> { Fut: Future + 'static, Fut::Output: 'static, { - Ok(local_set.spawn_named(future, self.name)) + let fut_size = mem::size_of::(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + local_set.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size)) + } else { + local_set.spawn_named(future, SpawnMeta::new(self.name, fut_size)) + }) + } /// Spawns blocking code on the blocking threadpool. @@ -200,19 +210,20 @@ impl<'a> Builder<'a> { Output: Send + 'static, { use crate::runtime::Mandatory; - let (join_handle, spawn_result) = if std::mem::size_of::() > BOX_FUTURE_THRESHOLD + let fn_size = mem::size_of::(); + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { handle.inner.blocking_spawner().spawn_blocking_inner( Box::new(function), Mandatory::NonMandatory, - self.name, + SpawnMeta::new(self.name, fn_size), handle, ) } else { handle.inner.blocking_spawner().spawn_blocking_inner( function, Mandatory::NonMandatory, - self.name, + SpawnMeta::new(self.name, fn_size), handle, ) }; diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 90d4d3612e8..c264be095af 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -6,11 +6,12 @@ use crate::runtime; use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task, TaskHarnessScheduleHooks}; use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD}; use crate::sync::AtomicWaker; +use crate::util::trace::SpawnMeta; use crate::util::RcCell; use std::cell::Cell; use std::collections::VecDeque; -use std::fmt; +use std::{fmt, mem}; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -367,22 +368,23 @@ cfg_rt! { F: Future + 'static, F::Output: 'static, { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - spawn_local_inner(Box::pin(future), None) + let fut_size = std::mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - spawn_local_inner(future, None) + spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - pub(super) fn spawn_local_inner(future: F, name: Option<&str>) -> JoinHandle + pub(super) fn spawn_local_inner(future: F, meta: SpawnMeta<'_>) -> JoinHandle where F: Future + 'static, F::Output: 'static { match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) { None => panic!("`spawn_local` called from outside of a `task::LocalSet`"), - Some(cx) => cx.spawn(future, name) + Some(cx) => cx.spawn(future, meta) } } } @@ -521,7 +523,12 @@ impl LocalSet { F: Future + 'static, F::Output: 'static, { - self.spawn_named(future, None) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) + } else { + self.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) + } } /// Runs a future to completion on the provided runtime, driving any local @@ -643,26 +650,22 @@ impl LocalSet { pub(in crate::task) fn spawn_named( &self, future: F, - name: Option<&str>, + meta: SpawnMeta<'_>, ) -> JoinHandle where F: Future + 'static, F::Output: 'static, { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.spawn_named_inner(Box::pin(future), name) - } else { - self.spawn_named_inner(future, name) - } + self.spawn_named_inner(future, meta) } #[track_caller] - fn spawn_named_inner(&self, future: F, name: Option<&str>) -> JoinHandle + fn spawn_named_inner(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle where F: Future + 'static, F::Output: 'static, { - let handle = self.context.spawn(future, name); + let handle = self.context.spawn(future, meta); // Because a task was spawned from *outside* the `LocalSet`, wake the // `LocalSet` future to execute the new task, if it hasn't been woken. @@ -949,13 +952,13 @@ impl Drop for LocalSet { impl Context { #[track_caller] - fn spawn(&self, future: F, name: Option<&str>) -> JoinHandle + fn spawn(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle where F: Future + 'static, F::Output: 'static, { let id = crate::runtime::task::Id::next(); - let future = crate::util::trace::task(future, "local", name, id.as_u64()); + let future = crate::util::trace::task(future, "local", meta, id.as_u64()); // Safety: called from the thread that owns the `LocalSet` let (handle, notified) = { diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 4208ac6e0c6..7c748226121 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -1,5 +1,6 @@ use crate::runtime::BOX_FUTURE_THRESHOLD; use crate::task::JoinHandle; +use crate::util::trace::SpawnMeta; use std::future::Future; @@ -167,17 +168,16 @@ cfg_rt! { F: Future + Send + 'static, F::Output: Send + 'static, { - // preventing stack overflows on debug mode, by quickly sending the - // task to the heap. - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - spawn_inner(Box::pin(future), None) + let fut_size = std::mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + spawn_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - spawn_inner(future, None) + spawn_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - pub(super) fn spawn_inner(future: T, name: Option<&str>) -> JoinHandle + pub(super) fn spawn_inner(future: T, meta: SpawnMeta<'_>) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, @@ -197,7 +197,7 @@ cfg_rt! { ))] let future = task::trace::Trace::root(future); let id = task::Id::next(); - let task = crate::util::trace::task(future, "task", name, id.as_u64()); + let task = crate::util::trace::task(future, "task", meta, id.as_u64()); match context::with_current(|handle| handle.spawn(task, id)) { Ok(join_handle) => join_handle, diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index b75fd4cd5ac..b94233385a1 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -1,3 +1,41 @@ +use std::marker::PhantomData; + +pub(crate) struct SpawnMeta<'a> { + /// The name of the task + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) name: Option<&'a str>, + /// The original size of the future or function being spawned + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) original_size: usize, + _pd: PhantomData<&'a ()>, +} + +impl<'a> SpawnMeta<'a> { + /// Create new spawn meta with a name and original size (before possible auto-boxing) + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) fn new(name: Option<&'a str>, original_size: usize) -> Self { + Self { + name, + original_size, + _pd: PhantomData, + } + } + + /// Create a new unnamed spawn meta with the original size (before possible auto-boxing) + pub(crate) fn new_unnamed(original_size: usize) -> Self { + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let _original_size = original_size; + + Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + name: None, + #[cfg(all(tokio_unstable, feature = "tracing"))] + original_size, + _pd: PhantomData, + } + } +} + cfg_trace! { cfg_rt! { use core::{ @@ -11,17 +49,18 @@ cfg_trace! { #[inline] #[track_caller] - pub(crate) fn task(task: F, kind: &'static str, name: Option<&str>, id: u64) -> Instrumented { + pub(crate) fn task(task: F, kind: &'static str, meta: SpawnMeta<'_>, id: u64) -> Instrumented { #[track_caller] - fn get_span(kind: &'static str, name: Option<&str>, id: u64, task_size: usize) -> tracing::Span { + fn get_span(kind: &'static str, spawn_meta: SpawnMeta<'_>, id: u64, task_size: usize) -> tracing::Span { let location = std::panic::Location::caller(); tracing::trace_span!( target: "tokio::task", parent: None, "runtime.spawn", %kind, - task.name = %name.unwrap_or_default(), + task.name = %spawn_meta.name.unwrap_or_default(), task.id = id, + original_size.bytes = spawn_meta.original_size, size.bytes = task_size, loc.file = location.file(), loc.line = location.line(), @@ -29,7 +68,7 @@ cfg_trace! { ) } use tracing::instrument::Instrument; - let span = get_span(kind, name, id, mem::size_of::()); + let span = get_span(kind, meta, id, mem::size_of::()); task.instrument(span) } @@ -99,7 +138,7 @@ cfg_time! { cfg_not_trace! { cfg_rt! { #[inline] - pub(crate) fn task(task: F, _: &'static str, _name: Option<&str>, _: u64) -> F { + pub(crate) fn task(task: F, _kind: &'static str, _meta: SpawnMeta<'_>, _id: u64) -> F { // nop task } From 469a9271c0b91d5a2194218b61cc9b84ec24a62c Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 2 Oct 2024 14:13:39 +0200 Subject: [PATCH 3/6] add tests and format --- tokio/src/runtime/blocking/pool.rs | 14 +++- tokio/src/runtime/runtime.rs | 6 +- tokio/src/task/builder.rs | 8 +- tokio/src/task/local.rs | 3 +- .../tracing-instrumentation/tests/task.rs | 79 +++++++++++++++++++ 5 files changed, 100 insertions(+), 10 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 7096de7c8ad..b27f0b58ab7 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -302,9 +302,19 @@ impl Spawner { { let fn_size = std::mem::size_of::(); let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { - self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, SpawnMeta::new_unnamed(fn_size), rt) + self.spawn_blocking_inner( + Box::new(func), + Mandatory::NonMandatory, + SpawnMeta::new_unnamed(fn_size), + rt, + ) } else { - self.spawn_blocking_inner(func, Mandatory::NonMandatory, SpawnMeta::new_unnamed(fn_size), rt) + self.spawn_blocking_inner( + func, + Mandatory::NonMandatory, + SpawnMeta::new_unnamed(fn_size), + rt, + ) }; match spawn_result { diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 24c772ff9fe..4b22ae9747c 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -245,9 +245,11 @@ impl Runtime { { let fut_size = mem::size_of::(); if fut_size > BOX_FUTURE_THRESHOLD { - self.handle.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) + self.handle + .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.handle.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) + self.handle + .spawn_named(future, SpawnMeta::new_unnamed(fut_size)) } } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 2cff1c6bcff..6053352a01c 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -1,7 +1,8 @@ #![allow(unreachable_pub)] use crate::{ runtime::{Handle, BOX_FUTURE_THRESHOLD}, - task::{JoinHandle, LocalSet}, util::trace::SpawnMeta, + task::{JoinHandle, LocalSet}, + util::trace::SpawnMeta, }; use std::{future::Future, io, mem}; @@ -137,7 +138,6 @@ impl<'a> Builder<'a> { Fut: Future + 'static, Fut::Output: 'static, { - let fut_size = mem::size_of::(); Ok(if fut_size > BOX_FUTURE_THRESHOLD { super::local::spawn_local_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size)) @@ -169,7 +169,6 @@ impl<'a> Builder<'a> { } else { local_set.spawn_named(future, SpawnMeta::new(self.name, fut_size)) }) - } /// Spawns blocking code on the blocking threadpool. @@ -211,8 +210,7 @@ impl<'a> Builder<'a> { { use crate::runtime::Mandatory; let fn_size = mem::size_of::(); - let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD - { + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { handle.inner.blocking_spawner().spawn_blocking_inner( Box::new(function), Mandatory::NonMandatory, diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index c264be095af..d5341937893 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -11,9 +11,10 @@ use crate::util::RcCell; use std::cell::Cell; use std::collections::VecDeque; -use std::{fmt, mem}; +use std::fmt; use std::future::Future; use std::marker::PhantomData; +use std::mem; use std::pin::Pin; use std::rc::Rc; use std::task::Poll; diff --git a/tokio/tests/tracing-instrumentation/tests/task.rs b/tokio/tests/tracing-instrumentation/tests/task.rs index 7bdb078e32c..e67349cb261 100644 --- a/tokio/tests/tracing-instrumentation/tests/task.rs +++ b/tokio/tests/tracing-instrumentation/tests/task.rs @@ -3,6 +3,8 @@ //! These tests ensure that the instrumentation for task spawning and task //! lifecycles is correct. +use std::{mem, time::Duration}; + use tokio::task; use tracing_mock::{expect, span::NewSpan, subscriber}; @@ -93,6 +95,83 @@ async fn task_builder_loc_file_recorded() { handle.assert_finished(); } +#[tokio::test] +async fn task_spawn_sizes_recorded() { + let future = futures::future::ready(()); + let size = mem::size_of_val(&future) as u64; + + let task_span = expect::span() + .named("runtime.spawn") + .with_target("tokio::task") + .with_field( + expect::field("size.bytes") + .with_value(&size) + .and(expect::field("original_size.bytes").with_value(&size)), + ); + + let (subscriber, handle) = subscriber::mock().new_span(task_span).run_with_handle(); + + { + let _guard = tracing::subscriber::set_default(subscriber); + + task::Builder::new() + .spawn(future) + .unwrap() + .await + .expect("failed to await join handle"); + } + + handle.assert_finished(); +} + +#[tokio::test] +async fn task_big_spawn_sizes_recorded() { + let future = { + async fn big() { + let mut a = [0_u8; N]; + for idx in 0..N { + a[idx] = (idx % 256) as u8; + } + tokio::time::sleep(Duration::from_millis(10)).await; + for idx in 0..N { + assert_eq!(a[idx], (idx % 256) as u8); + } + } + + // This is larger than the release auto-boxing threshold + big::<20_000>() + }; + + fn boxed_size(_: &T) -> usize { + mem::size_of::>() + } + let size = mem::size_of_val(&future) as u64; + let boxed_size = boxed_size(&future); + + let task_span = expect::span() + .named("runtime.spawn") + .with_target("tokio::task") + .with_field( + expect::field("size.bytes") + .with_value(&boxed_size) + .and(expect::field("original_size.bytes").with_value(&size)), + ); + + let (subscriber, handle) = subscriber::mock().new_span(task_span).run_with_handle(); + + { + let _guard = tracing::subscriber::set_default(subscriber); + + task::Builder::new() + .spawn(future) + .unwrap() + .await + .expect("failed to await join handle"); + } + + handle.assert_finished(); +} + /// Expect a task with name /// /// This is a convenience function to create the expectation for a new task From d926d823c4e6d3c4785aa94815db617f5ad551d4 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 2 Oct 2024 16:32:48 +0200 Subject: [PATCH 4/6] fix SpawnMeta feature visibility --- tokio/src/util/trace.rs | 83 ++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 42 deletions(-) diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index b94233385a1..a272545b353 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -1,43 +1,43 @@ -use std::marker::PhantomData; +cfg_rt! { + use std::marker::PhantomData; -pub(crate) struct SpawnMeta<'a> { - /// The name of the task - #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(crate) name: Option<&'a str>, - /// The original size of the future or function being spawned - #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(crate) original_size: usize, - _pd: PhantomData<&'a ()>, -} + pub(crate) struct SpawnMeta<'a> { + /// The name of the task + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) name: Option<&'a str>, + /// The original size of the future or function being spawned + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) original_size: usize, + _pd: PhantomData<&'a ()>, + } -impl<'a> SpawnMeta<'a> { - /// Create new spawn meta with a name and original size (before possible auto-boxing) - #[cfg(all(tokio_unstable, feature = "tracing"))] - pub(crate) fn new(name: Option<&'a str>, original_size: usize) -> Self { - Self { - name, - original_size, - _pd: PhantomData, + impl<'a> SpawnMeta<'a> { + /// Create new spawn meta with a name and original size (before possible auto-boxing) + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) fn new(name: Option<&'a str>, original_size: usize) -> Self { + Self { + name, + original_size, + _pd: PhantomData, + } } - } - /// Create a new unnamed spawn meta with the original size (before possible auto-boxing) - pub(crate) fn new_unnamed(original_size: usize) -> Self { - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let _original_size = original_size; + /// Create a new unnamed spawn meta with the original size (before possible auto-boxing) + pub(crate) fn new_unnamed(original_size: usize) -> Self { + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let _original_size = original_size; - Self { - #[cfg(all(tokio_unstable, feature = "tracing"))] - name: None, - #[cfg(all(tokio_unstable, feature = "tracing"))] - original_size, - _pd: PhantomData, + Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + name: None, + #[cfg(all(tokio_unstable, feature = "tracing"))] + original_size, + _pd: PhantomData, + } } } -} -cfg_trace! { - cfg_rt! { + cfg_trace! { use core::{ pin::Pin, task::{Context, Poll}, @@ -124,7 +124,16 @@ cfg_trace! { } } } + + cfg_not_trace! { + #[inline] + pub(crate) fn task(task: F, _kind: &'static str, _meta: SpawnMeta<'_>, _id: u64) -> F { + // nop + task + } + } } + cfg_time! { #[track_caller] pub(crate) fn caller_location() -> Option<&'static std::panic::Location<'static>> { @@ -134,13 +143,3 @@ cfg_time! { None } } - -cfg_not_trace! { - cfg_rt! { - #[inline] - pub(crate) fn task(task: F, _kind: &'static str, _meta: SpawnMeta<'_>, _id: u64) -> F { - // nop - task - } - } -} From d358a3b8144e56a878aac7491a9471bd60a533c7 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 2 Oct 2024 17:42:47 +0200 Subject: [PATCH 5/6] made original_size.bytes field optional No point setting it if it's the same as the size.bytes field. --- tokio/src/runtime/blocking/pool.rs | 27 ++---------------- tokio/src/util/trace.rs | 44 +++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index b27f0b58ab7..7eec91d23d9 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,7 +8,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD}; use crate::util::metric_atomics::MetricAtomicUsize; -use crate::util::trace::SpawnMeta; +use crate::util::trace::{blocking_task, SpawnMeta}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -375,30 +375,9 @@ impl Spawner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let fut = BlockingTask::new(func); let id = task::Id::next(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let fut = { - use tracing::Instrument; - let location = std::panic::Location::caller(); - let span = tracing::trace_span!( - target: "tokio::task::blocking", - "runtime.spawn", - kind = %"blocking", - task.name = %spawn_meta.name.unwrap_or_default(), - task.id = id.as_u64(), - "fn" = %std::any::type_name::(), - original_size.bytes = spawn_meta.original_size, - size.bytes = std::mem::size_of::(), - loc.file = location.file(), - loc.line = location.line(), - loc.col = location.column(), - ); - fut.instrument(span) - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let _ = spawn_meta; + let fut = + blocking_task::>(BlockingTask::new(func), spawn_meta, id.as_u64()); let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index a272545b353..97006df474e 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -45,6 +45,7 @@ cfg_rt! { use pin_project_lite::pin_project; use std::mem; use std::future::Future; + use tracing::instrument::Instrument; pub(crate) use tracing::instrument::Instrumented; #[inline] @@ -53,6 +54,11 @@ cfg_rt! { #[track_caller] fn get_span(kind: &'static str, spawn_meta: SpawnMeta<'_>, id: u64, task_size: usize) -> tracing::Span { let location = std::panic::Location::caller(); + let original_size = if spawn_meta.original_size != task_size { + Some(spawn_meta.original_size) + } else { + None + }; tracing::trace_span!( target: "tokio::task", parent: None, @@ -60,7 +66,7 @@ cfg_rt! { %kind, task.name = %spawn_meta.name.unwrap_or_default(), task.id = id, - original_size.bytes = spawn_meta.original_size, + original_size.bytes = original_size, size.bytes = task_size, loc.file = location.file(), loc.line = location.line(), @@ -72,6 +78,35 @@ cfg_rt! { task.instrument(span) } + #[inline] + #[track_caller] + pub(crate) fn blocking_task(task: Fut, spawn_meta: SpawnMeta<'_>, id: u64) -> Instrumented { + let location = std::panic::Location::caller(); + + let fn_size = mem::size_of::(); + let original_size = if spawn_meta.original_size != fn_size { + Some(spawn_meta.original_size) + } else { + None + }; + + let span = tracing::trace_span!( + target: "tokio::task::blocking", + "runtime.spawn", + kind = %"blocking", + task.name = %spawn_meta.name.unwrap_or_default(), + task.id = id, + "fn" = %std::any::type_name::(), + original_size.bytes = original_size, + size.bytes = fn_size, + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + task.instrument(span) + + } + pub(crate) fn async_op(inner: P, resource_span: tracing::Span, source: &str, poll_op_name: &'static str, inherits_child_attrs: bool) -> InstrumentedAsyncOp where P: FnOnce() -> F { resource_span.in_scope(|| { @@ -131,6 +166,13 @@ cfg_rt! { // nop task } + + #[inline] + pub(crate) fn blocking_task(task: Fut, _spawn_meta: SpawnMeta<'_>, _id: u64) -> Fut { + let _ = PhantomData::<&Fn>; + // nop + task + } } } From 1719b4098a253f02fa0a29597319f5f15c978d35 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 2 Oct 2024 17:54:19 +0200 Subject: [PATCH 6/6] bump minimum tracing from 0.1.25 to 0.1.29 This is needed to get have Value implemented for Option. Also fix tracing instrumentation tests now that original_size.bytes won't be recorded if it's the same as size.bytes. --- tokio-util/Cargo.toml | 2 +- tokio/Cargo.toml | 2 +- .../tests/tracing-instrumentation/tests/task.rs | 16 +++++++--------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index a73eec8799a..12f70be862e 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -42,7 +42,7 @@ futures-io = { version = "0.3.0", optional = true } futures-util = { version = "0.3.0", optional = true } pin-project-lite = "0.2.11" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` -tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } +tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } [target.'cfg(tokio_unstable)'.dependencies] hashbrown = { version = "0.14.0", default-features = false, optional = true } diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 4c5f7d46acb..b07f50150b7 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -101,7 +101,7 @@ socket2 = { version = "0.5.5", optional = true, features = [ "all" ] } # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. [target.'cfg(tokio_unstable)'.dependencies] -tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full +tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } # Not in full # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. diff --git a/tokio/tests/tracing-instrumentation/tests/task.rs b/tokio/tests/tracing-instrumentation/tests/task.rs index e67349cb261..fb215ca7ce0 100644 --- a/tokio/tests/tracing-instrumentation/tests/task.rs +++ b/tokio/tests/tracing-instrumentation/tests/task.rs @@ -103,11 +103,9 @@ async fn task_spawn_sizes_recorded() { let task_span = expect::span() .named("runtime.spawn") .with_target("tokio::task") - .with_field( - expect::field("size.bytes") - .with_value(&size) - .and(expect::field("original_size.bytes").with_value(&size)), - ); + // TODO(hds): check that original_size.bytes is NOT recorded when this can be done in + // tracing-mock without listing every other field. + .with_field(expect::field("size.bytes").with_value(&size)); let (subscriber, handle) = subscriber::mock().new_span(task_span).run_with_handle(); @@ -129,12 +127,12 @@ async fn task_big_spawn_sizes_recorded() { let future = { async fn big() { let mut a = [0_u8; N]; - for idx in 0..N { - a[idx] = (idx % 256) as u8; + for (idx, item) in a.iter_mut().enumerate() { + *item = (idx % 256) as u8; } tokio::time::sleep(Duration::from_millis(10)).await; - for idx in 0..N { - assert_eq!(a[idx], (idx % 256) as u8); + for (idx, item) in a.iter_mut().enumerate() { + assert_eq!(*item, (idx % 256) as u8); } }