Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: add Tracing instrumentation to spawned tasks #2655

Merged
merged 14 commits into from
Jul 13, 2020
Merged
4 changes: 3 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ edition = "2018"
# If you copy one of the examples into a new project, you should be using
# [dependencies] instead.
[dev-dependencies]
tokio = { version = "0.2.0", path = "../tokio", features = ["full"] }
tokio = { version = "0.2.0", path = "../tokio", features = ["full", "tracing"] }
tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] }
bytes = "0.5"
futures = "0.3.0"
Expand Down
36 changes: 29 additions & 7 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,26 @@ use std::task::{Context, Poll};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
// Configure a `tracing` subscriber that logs traces emitted by the chat
// server.
tracing_subscriber::fmt()
// Filter what traces are displayed based on the RUST_LOG environment
// variable.
//
// Traces emitted by the example code will always be displayed. You
// can set `RUST_LOG=tokio=trace` to enable additional traces emitted by
// Tokio itself.
.with_env_filter(EnvFilter::from_default_env().add_directive("chat=info".parse()?))
// Log events when `tracing` spans are created, entered, exited, or
// closed. When Tokio's internal tracing support is enabled (as
// described above), this can be used to track the lifecycle of spawned
// tasks on the Tokio runtime.
.with_span_events(FmtSpan::FULL)
// Set this subscriber as the default, to collect all traces emitted by
// the program.
.init();

// Create the shared state. This is how all the peers communicate.
//
// The server task will hold a handle to this. For every new client, the
Expand All @@ -59,7 +79,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Note that this is the Tokio TcpListener, which is fully async.
let mut listener = TcpListener::bind(&addr).await?;

println!("server running on {}", addr);
tracing::info!("server running on {}", addr);

loop {
// Asynchronously wait for an inbound TcpStream.
Expand All @@ -70,8 +90,9 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Spawn our handler to be run asynchronously.
tokio::spawn(async move {
tracing::debug!("accepted connection");
if let Err(e) = process(state, stream, addr).await {
println!("an error occurred; error = {:?}", e);
tracing::info!("an error occurred; error = {:?}", e);
}
});
}
Expand Down Expand Up @@ -200,7 +221,7 @@ async fn process(
Some(Ok(line)) => line,
// We didn't get a line so we return early here.
_ => {
println!("Failed to get username from {}. Client disconnected.", addr);
tracing::error!("Failed to get username from {}. Client disconnected.", addr);
return Ok(());
}
};
Expand All @@ -212,7 +233,7 @@ async fn process(
{
let mut state = state.lock().await;
let msg = format!("{} has joined the chat", username);
println!("{}", msg);
tracing::info!("{}", msg);
state.broadcast(addr, &msg).await;
}

Expand All @@ -233,9 +254,10 @@ async fn process(
peer.lines.send(&msg).await?;
}
Err(e) => {
println!(
tracing::error!(
"an error occurred while processing messages for {}; error = {:?}",
username, e
username,
e
);
}
}
Expand All @@ -248,7 +270,7 @@ async fn process(
state.peers.remove(&addr);

let msg = format!("{} has left the chat", username);
println!("{}", msg);
tracing::info!("{}", msg);
state.broadcast(addr, &msg).await;
}

Expand Down
1 change: 1 addition & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ iovec = { version = "0.1.4", optional = true }
num_cpus = { version = "1.8.0", optional = true }
parking_lot = { version = "0.10.0", optional = true } # Not in full
slab = { version = "0.4.1", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true } # Not in full

[target.'cfg(unix)'.dependencies]
mio-uds = { version = "0.6.5", optional = true }
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,25 @@ macro_rules! cfg_unstable {
}
}

macro_rules! cfg_trace {
($($item:item)*) => {
$(
#[cfg(feature = "tracing")]
#[cfg_attr(docsrs, doc(cfg(feature = "tracing")))]
$item
)*
}
}

macro_rules! cfg_not_trace {
($($item:item)*) => {
$(
#[cfg(not(feature = "tracing"))]
$item
)*
}
}

macro_rules! cfg_coop {
($($item:item)*) => {
$(
Expand Down
13 changes: 13 additions & 0 deletions tokio/src/task/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,19 @@ cfg_blocking! {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
#[cfg(feature = "tracing")]
let f = {
let span = tracing::trace_span!(
target: "tokio::task",
"task",
kind = %"blocking",
function = %std::any::type_name::<F>(),
);
move || {
let _g = span.enter();
f()
}
};
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
crate::runtime::spawn_blocking(f)
}
}
2 changes: 2 additions & 0 deletions tokio/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ cfg_rt_util! {
F: Future + 'static,
F::Output: 'static,
{
let future = crate::util::trace::task(future, "local");
CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");
Expand Down Expand Up @@ -277,6 +278,7 @@ impl LocalSet {
F: Future + 'static,
F::Output: 'static,
{
let future = crate::util::trace::task(future, "local");
let (task, handle) = unsafe { task::joinable_local(future) };
self.context.tasks.borrow_mut().queue.push_back(task);
handle
Expand Down
1 change: 1 addition & 0 deletions tokio/src/task/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ doc_rt_core! {
{
let spawn_handle = runtime::context::spawn_handle()
.expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`");
let task = crate::util::trace::task(task, "task");
spawn_handle.spawn(task)
}
}
2 changes: 2 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ cfg_rt_threaded! {
pub(crate) use try_lock::TryLock;
}

pub(crate) mod trace;

#[cfg(any(feature = "macros", feature = "stream"))]
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
pub use rand::thread_rng_n;
Expand Down
57 changes: 57 additions & 0 deletions tokio/src/util/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
cfg_trace! {
cfg_rt_core! {
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;

use tracing::Span;

hawkw marked this conversation as resolved.
Show resolved Hide resolved
pin_project! {
/// A future that has been instrumented with a `tracing` span.
#[derive(Debug, Clone)]
pub(crate) struct Instrumented<T> {
#[pin]
inner: T,
span: Span,
}
}

impl<T: Future> Future for Instrumented<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let _enter = this.span.enter();
this.inner.poll(cx)
}
}

impl<T> Instrumented<T> {
pub(crate) fn new(inner: T, span: Span) -> Self {
Self { inner, span }
}
}

#[inline]
pub(crate) fn task<F>(task: F, kind: &'static str) -> Instrumented<F> {
let span = tracing::trace_span!(
target: "tokio::task",
"task",
%kind,
future = %std::any::type_name::<F>(),
);
Instrumented::new(task, span)
}
}
}

cfg_not_trace! {
cfg_rt_core! {
#[inline]
pub(crate) fn task<F>(task: F, _: &'static str) -> F {
// nop
task
}
}
}