Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: add Tracing instrumentation to spawned tasks #2655

Merged
merged 14 commits into from
Jul 13, 2020
Merged
4 changes: 3 additions & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ edition = "2018"
# If you copy one of the examples into a new project, you should be using
# [dependencies] instead.
[dev-dependencies]
tokio = { version = "0.2.0", path = "../tokio", features = ["full"] }
tokio = { version = "0.2.0", path = "../tokio", features = ["full", "tracing"] }
tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
tokio-util = { version = "0.3.0", path = "../tokio-util", features = ["full"] }
bytes = "0.5"
futures = "0.3.0"
Expand Down
23 changes: 16 additions & 7 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ use std::task::{Context, Poll};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.with_target(false)
.with_timer(())
hawkw marked this conversation as resolved.
Show resolved Hide resolved
.init();

// Create the shared state. This is how all the peers communicate.
//
// The server task will hold a handle to this. For every new client, the
Expand All @@ -59,7 +66,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Note that this is the Tokio TcpListener, which is fully async.
let mut listener = TcpListener::bind(&addr).await?;

println!("server running on {}", addr);
tracing::info!("server running on {}", addr);

loop {
// Asynchronously wait for an inbound TcpStream.
Expand All @@ -70,8 +77,9 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Spawn our handler to be run asynchronously.
tokio::spawn(async move {
tracing::debug!("accepted connection");
if let Err(e) = process(state, stream, addr).await {
println!("an error occurred; error = {:?}", e);
tracing::info!("an error occurred; error = {:?}", e);
}
});
}
Expand Down Expand Up @@ -200,7 +208,7 @@ async fn process(
Some(Ok(line)) => line,
// We didn't get a line so we return early here.
_ => {
println!("Failed to get username from {}. Client disconnected.", addr);
tracing::error!("Failed to get username from {}. Client disconnected.", addr);
return Ok(());
}
};
Expand All @@ -212,7 +220,7 @@ async fn process(
{
let mut state = state.lock().await;
let msg = format!("{} has joined the chat", username);
println!("{}", msg);
tracing::info!("{}", msg);
state.broadcast(addr, &msg).await;
}

Expand All @@ -233,9 +241,10 @@ async fn process(
peer.lines.send(&msg).await?;
}
Err(e) => {
println!(
tracing::error!(
"an error occurred while processing messages for {}; error = {:?}",
username, e
username,
e
);
}
}
Expand All @@ -248,7 +257,7 @@ async fn process(
state.peers.remove(&addr);

let msg = format!("{} has left the chat", username);
println!("{}", msg);
tracing::info!("{}", msg);
state.broadcast(addr, &msg).await;
}

Expand Down
3 changes: 3 additions & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ version = "0.3.8"
default-features = false
optional = true

[target.'cfg(tokio_unstable)'.dependencies]
hawkw marked this conversation as resolved.
Show resolved Hide resolved
tracing = { version = "0.1.16", default-features = false, features = ["std"], optional = true }

[dev-dependencies]
tokio-test = { version = "0.2.0", path = "../tokio-test" }
futures = { version = "0.3.0", features = ["async-await"] }
Expand Down
10 changes: 10 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,16 @@ macro_rules! cfg_unstable {
}
}

macro_rules! cfg_trace {
($($item:item)*) => {
$(
#[cfg(all(feature = "tracing", tokio_unstable))]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We talked about this, but I don't think tracing merits being placed behind the tokio_unstable unstable flag. @carllerche will need to weigh in on this, but unlike the CancelationToken, changes to the tracing output aren't what the Tokio project has historically considered to be a "breaking change". An off-by-default (e.g., excluded from full) feature flag seems like a reasonable default.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think at one point, @carllerche suggested that any dependency on a pre-1.0 crate should require the tokio_unstable flag...but, in this case, we aren't exposing anything from tracing in Tokio's public APIs. AFAICT, it shouldn't be possible for the tracing dependency to cause any compile time breakage --- the worst failure mode, if there is eventually a breaking change to the tracing-core crate that we can't work around, is that the tracing diagnostics might silently fail to be collected.

IMO, you're right, and an off-by-default optional dep should be fine. That would be pretty similar to what we do for parking_lot, which is also pre-1.0 (and churns versions somewhat frequently!), since it's also not exposed publicly. Of course, if we were to expose tracing types (such as Span or Dispatch) in a public API later on, those APIs should require the unstable flag.

With all of that said: I made the more conservative choice of opening this PR with the tokio_unstable flag required, because I want to bias towards getting this merged sooner rather than later, and continuing to iterate once it merges. We can always remove the flag in a follow-up, but it's harder to add if these changes ship in a published release...

#[cfg_attr(docsrs, doc(cfg(all(feature = tokio_unstable))))]
$item
)*
}
}

macro_rules! cfg_coop {
($($item:item)*) => {
$(
Expand Down
14 changes: 14 additions & 0 deletions tokio/src/task/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,20 @@ cfg_blocking! {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
#[cfg(all(feature = "tracing", tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(all(feature = tokio_unstable))))]
let f = {
let span = tracing::trace_span!(
target: "tokio::task",
"task",
kind = %"blocking",
function = %std::any::type_name::<F>(),
);
move || {
let _g = span.enter();
f()
}
};
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
crate::runtime::spawn_blocking(f)
}
}
24 changes: 24 additions & 0 deletions tokio/src/task/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@ cfg_rt_util! {
F: Future + 'static,
F::Output: 'static,
{
#[cfg(all(feature = "tracing", tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(all(feature = tokio_unstable))))]
let future = {
let span = tracing::trace_span!(
target: "tokio::task",
"task",
future = %std::any::type_name::<F>(),
kind = %"local",
);
crate::util::Instrumented::new(future, span)
};

CURRENT.with(|maybe_cx| {
let cx = maybe_cx
.expect("`spawn_local` called from outside of a `task::LocalSet`");
Expand Down Expand Up @@ -277,6 +289,18 @@ impl LocalSet {
F: Future + 'static,
F::Output: 'static,
{
#[cfg(all(feature = "tracing", tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(all(feature = tokio_unstable))))]
let future = {
let span = tracing::trace_span!(
target: "tokio::task",
"task",
kind = %"local",
future = %std::any::type_name::<F>(),
);
crate::util::Instrumented::new(future, span)
};

let (task, handle) = unsafe { task::joinable_local(future) };
self.context.tasks.borrow_mut().queue.push_back(task);
handle
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/task/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ doc_rt_core! {
{
let spawn_handle = runtime::context::spawn_handle()
.expect("must be called from the context of Tokio runtime configured with either `basic_scheduler` or `threaded_scheduler`");
#[cfg(all(feature = "tracing", tokio_unstable))]
#[cfg_attr(docsrs, doc(cfg(all(feature = tokio_unstable))))]
let task = {
let span = tracing::trace_span!(
target: "tokio::task",
"task",
kind = %"task",
future = %std::any::type_name::<T>(),
);
crate::util::Instrumented::new(task, span)
};
spawn_handle.spawn(task)
}
}
5 changes: 5 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ cfg_rt_threaded! {
pub(crate) use try_lock::TryLock;
}

cfg_trace! {
mod trace;
pub(crate) use trace::Instrumented;
}

#[cfg(any(feature = "macros", feature = "stream"))]
#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))]
pub use rand::thread_rng_n;
Expand Down
31 changes: 31 additions & 0 deletions tokio/src/util/trace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use pin_project_lite::pin_project;
hawkw marked this conversation as resolved.
Show resolved Hide resolved
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tracing::Span;

// A future, stream, sink, or executor that has been instrumented with a `tracing` span.
pin_project! {
#[derive(Debug, Clone)]
pub(crate) struct Instrumented<T> {
#[pin]
inner: T,
span: Span,
}
}

impl<T: Future> Future for Instrumented<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let _enter = this.span.enter();
this.inner.poll(cx)
}
}

impl<T> Instrumented<T> {
pub(crate) fn new(inner: T, span: Span) -> Self {
Self { inner, span }
}
}