From e7d74b3119178b9b86f7b547774b6b121de2239a Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Tue, 8 Jun 2021 14:50:40 -0700 Subject: [PATCH] tracing: instrument task wakers (#3836) ## Motivation In support of tokio-rs/console#37, we want to understand when a specific task's waker has been interacted with, such as when it is awoken, or if it's forgotten (not cloned), etc. ## Solution When the tracing feature is enabled, a super trait of Future (InstrumentedFuture) is implemented for Instrumented that allows grabbing the task's ID (well, its span ID), and stores that in the raw task trailer. The waker vtable then emits events and includes that ID. --- tokio/src/future/mod.rs | 11 ++++++++++ tokio/src/future/trace.rs | 11 ++++++++++ tokio/src/runtime/basic_scheduler.rs | 2 +- tokio/src/runtime/blocking/pool.rs | 13 ------------ tokio/src/runtime/handle.rs | 12 +++++------ tokio/src/runtime/spawner.rs | 3 +-- tokio/src/runtime/task/core.rs | 10 ++++++++- tokio/src/runtime/task/harness.rs | 7 ++++++- tokio/src/runtime/task/mod.rs | 2 +- tokio/src/runtime/task/raw.rs | 2 +- tokio/src/runtime/task/waker.rs | 31 +++++++++++++++++++++++++++- tokio/src/runtime/thread_pool/mod.rs | 2 +- 12 files changed, 78 insertions(+), 28 deletions(-) create mode 100644 tokio/src/future/trace.rs diff --git a/tokio/src/future/mod.rs b/tokio/src/future/mod.rs index f7d93c9868c..96483acd7c4 100644 --- a/tokio/src/future/mod.rs +++ b/tokio/src/future/mod.rs @@ -22,3 +22,14 @@ cfg_sync! { mod block_on; pub(crate) use block_on::block_on; } + +cfg_trace! { + mod trace; + pub(crate) use trace::InstrumentedFuture as Future; +} + +cfg_not_trace! { + cfg_rt! { + pub(crate) use std::future::Future; + } +} diff --git a/tokio/src/future/trace.rs b/tokio/src/future/trace.rs new file mode 100644 index 00000000000..28789a604d2 --- /dev/null +++ b/tokio/src/future/trace.rs @@ -0,0 +1,11 @@ +use std::future::Future; + +pub(crate) trait InstrumentedFuture: Future { + fn id(&self) -> Option; +} + +impl InstrumentedFuture for tracing::instrument::Instrumented { + fn id(&self) -> Option { + self.span().id() + } +} diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index cbd8e58bb33..13dfb69739f 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -392,7 +392,7 @@ impl Spawner { /// Spawns a future onto the thread pool pub(crate) fn spawn(&self, future: F) -> JoinHandle where - F: Future + Send + 'static, + F: crate::future::Future + Send + 'static, F::Output: Send + 'static, { let (task, handle) = task::joinable(future); diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 5c9b8ede1a3..b7d725128d7 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -4,7 +4,6 @@ use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; use crate::runtime::blocking::schedule::NoopSchedule; use crate::runtime::blocking::shutdown; -use crate::runtime::blocking::task::BlockingTask; use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; @@ -86,18 +85,6 @@ where rt.spawn_blocking(func) } -#[allow(dead_code)] -pub(crate) fn try_spawn_blocking(func: F) -> Result<(), ()> -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let rt = context::current().expect(CONTEXT_MISSING_ERROR); - - let (task, _handle) = task::joinable(BlockingTask::new(func)); - rt.blocking_spawner.spawn(task, &rt) -} - // ===== impl BlockingPool ===== impl BlockingPool { diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 4f1b4c5795a..173f0ca61f1 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -174,8 +174,11 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { + let fut = BlockingTask::new(func); + #[cfg(all(tokio_unstable, feature = "tracing"))] - let func = { + let fut = { + use tracing::Instrument; #[cfg(tokio_track_caller)] let location = std::panic::Location::caller(); #[cfg(tokio_track_caller)] @@ -193,12 +196,9 @@ impl Handle { kind = %"blocking", function = %std::any::type_name::(), ); - move || { - let _g = span.enter(); - func() - } + fut.instrument(span) }; - let (task, handle) = task::joinable(BlockingTask::new(func)); + let (task, handle) = task::joinable(fut); let _ = self.blocking_spawner.spawn(task, &self); handle } diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index a37c66796b4..fbcde2cfaf5 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -1,8 +1,7 @@ cfg_rt! { + use crate::future::Future; use crate::runtime::basic_scheduler; use crate::task::JoinHandle; - - use std::future::Future; } cfg_rt_multi_thread! { diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index fb6dafda391..026a6dcb2f1 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -9,13 +9,13 @@ //! Make sure to consult the relevant safety section of each function before //! use. +use crate::future::Future; use crate::loom::cell::UnsafeCell; use crate::runtime::task::raw::{self, Vtable}; use crate::runtime::task::state::State; use crate::runtime::task::{Notified, Schedule, Task}; use crate::util::linked_list; -use std::future::Future; use std::pin::Pin; use std::ptr::NonNull; use std::task::{Context, Poll, Waker}; @@ -71,6 +71,10 @@ pub(crate) struct Header { /// Table of function pointers for executing actions on the task. pub(super) vtable: &'static Vtable, + + /// The tracing ID for this instrumented task. + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) id: Option, } unsafe impl Send for Header {} @@ -93,6 +97,8 @@ impl Cell { /// Allocates a new task cell, containing the header, trailer, and core /// structures. pub(super) fn new(future: T, state: State) -> Box> { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let id = future.id(); Box::new(Cell { header: Header { state, @@ -100,6 +106,8 @@ impl Cell { queue_next: UnsafeCell::new(None), stack_next: UnsafeCell::new(None), vtable: raw::vtable::(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + id, }, core: Core { scheduler: Scheduler { diff --git a/tokio/src/runtime/task/harness.rs b/tokio/src/runtime/task/harness.rs index 7d596e36e1a..47bbcc15ffc 100644 --- a/tokio/src/runtime/task/harness.rs +++ b/tokio/src/runtime/task/harness.rs @@ -1,9 +1,9 @@ +use crate::future::Future; use crate::runtime::task::core::{Cell, Core, CoreStage, Header, Scheduler, Trailer}; use crate::runtime::task::state::Snapshot; use crate::runtime::task::waker::waker_ref; use crate::runtime::task::{JoinError, Notified, Schedule, Task}; -use std::future::Future; use std::mem; use std::panic; use std::ptr::NonNull; @@ -146,6 +146,11 @@ where } } + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(super) fn id(&self) -> Option<&tracing::Id> { + self.header().id.as_ref() + } + /// Forcibly shutdown the task /// /// Attempt to transition to `Running` in order to forcibly shutdown the diff --git a/tokio/src/runtime/task/mod.rs b/tokio/src/runtime/task/mod.rs index 7b49e95abed..58b8c2a15e8 100644 --- a/tokio/src/runtime/task/mod.rs +++ b/tokio/src/runtime/task/mod.rs @@ -26,9 +26,9 @@ cfg_rt_multi_thread! { pub(crate) use self::stack::TransferStack; } +use crate::future::Future; use crate::util::linked_list; -use std::future::Future; use std::marker::PhantomData; use std::ptr::NonNull; use std::{fmt, mem}; diff --git a/tokio/src/runtime/task/raw.rs b/tokio/src/runtime/task/raw.rs index cae56d037da..a9cd4e6f4c7 100644 --- a/tokio/src/runtime/task/raw.rs +++ b/tokio/src/runtime/task/raw.rs @@ -1,6 +1,6 @@ +use crate::future::Future; use crate::runtime::task::{Cell, Harness, Header, Schedule, State}; -use std::future::Future; use std::ptr::NonNull; use std::task::{Poll, Waker}; diff --git a/tokio/src/runtime/task/waker.rs b/tokio/src/runtime/task/waker.rs index 5c2d478fbbc..ef5fd7438bb 100644 --- a/tokio/src/runtime/task/waker.rs +++ b/tokio/src/runtime/task/waker.rs @@ -1,7 +1,7 @@ +use crate::future::Future; use crate::runtime::task::harness::Harness; use crate::runtime::task::{Header, Schedule}; -use std::future::Future; use std::marker::PhantomData; use std::mem::ManuallyDrop; use std::ops; @@ -44,12 +44,38 @@ impl ops::Deref for WakerRef<'_, S> { } } +cfg_trace! { + macro_rules! trace { + ($harness:expr, $op:expr) => { + if let Some(id) = $harness.id() { + tracing::trace!( + target: "tokio::task::waker", + op = %$op, + task.id = id.into_u64(), + ); + } + } + } +} + +cfg_not_trace! { + macro_rules! trace { + ($harness:expr, $op:expr) => { + // noop + let _ = &$harness; + } + } +} + unsafe fn clone_waker(ptr: *const ()) -> RawWaker where T: Future, S: Schedule, { let header = ptr as *const Header; + let ptr = NonNull::new_unchecked(ptr as *mut Header); + let harness = Harness::::from_raw(ptr); + trace!(harness, "waker.clone"); (*header).state.ref_inc(); raw_waker::(header) } @@ -61,6 +87,7 @@ where { let ptr = NonNull::new_unchecked(ptr as *mut Header); let harness = Harness::::from_raw(ptr); + trace!(harness, "waker.drop"); harness.drop_reference(); } @@ -71,6 +98,7 @@ where { let ptr = NonNull::new_unchecked(ptr as *mut Header); let harness = Harness::::from_raw(ptr); + trace!(harness, "waker.wake"); harness.wake_by_val(); } @@ -82,6 +110,7 @@ where { let ptr = NonNull::new_unchecked(ptr as *mut Header); let harness = Harness::::from_raw(ptr); + trace!(harness, "waker.wake_by_ref"); harness.wake_by_ref(); } diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 47f8ee3454f..421b0786796 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -90,7 +90,7 @@ impl Spawner { /// Spawns a future onto the thread pool pub(crate) fn spawn(&self, future: F) -> JoinHandle where - F: Future + Send + 'static, + F: crate::future::Future + Send + 'static, F::Output: Send + 'static, { let (task, handle) = task::joinable(future);