Skip to content

Commit

Permalink
task: also instrument streams (tokio-rs#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
duarten authored Dec 14, 2022
1 parent a2609d1 commit 84cbb53
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 92 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ default = ["rt"]
rt = ["tokio"]

[dependencies]
tokio-stream = "0.1.11"
futures-util = "0.3.19"
pin-project-lite = "0.2.7"
tokio = { version = "1.15.0", features = ["rt", "stats", "time"], optional = true }
Expand Down
37 changes: 37 additions & 0 deletions examples/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::time::Duration;

use futures::{stream::FuturesUnordered, StreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let metrics_monitor = tokio_metrics::TaskMonitor::new();

// print task metrics every 500ms
{
let metrics_monitor = metrics_monitor.clone();
tokio::spawn(async move {
for deltas in metrics_monitor.intervals() {
// pretty-print the metric deltas
println!("{:?}", deltas);
// wait 500ms
tokio::time::sleep(Duration::from_millis(500)).await;
}
})
};

// instrument a stream and await it
let mut stream =
metrics_monitor.instrument((0..3).map(|_| do_work()).collect::<FuturesUnordered<_>>());
while stream.next().await.is_some() {}

println!("{:?}", metrics_monitor.cumulative());

Ok(())
}

async fn do_work() {
for _ in 0..25 {
tokio::task::yield_now().await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
189 changes: 97 additions & 92 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering::SeqCst};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio_stream::Stream;

#[cfg(any(feature = "rt"))]
use tokio::time::{Duration, Instant};
Expand Down Expand Up @@ -1545,7 +1546,7 @@ impl TaskMonitor {
/// assert_eq!(monitor.cumulative().first_poll_count, 2);
/// }
/// ```
pub fn instrument<F: Future>(&self, task: F) -> Instrumented<F> {
pub fn instrument<F>(&self, task: F) -> Instrumented<F> {
self.metrics.instrumented_count.fetch_add(1, SeqCst);
Instrumented {
task,
Expand Down Expand Up @@ -2287,113 +2288,117 @@ impl<T: Future> Future for Instrumented<T> {
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let poll_start = Instant::now();
let this = self.project();
let idled_at = this.idled_at;
let state = this.state;
let instrumented_at = state.instrumented_at;
let metrics = &state.metrics;

/* accounting for time-to-first-poll and tasks-count */
// is this the first time this task has been polled?
if !*this.did_poll_once {
// if so, we need to do three things:
/* 1. note that this task *has* been polled */
*this.did_poll_once = true;

/* 2. account for the time-to-first-poll of this task */
// if the time-to-first-poll of this task exceeds `u64::MAX` ns,
// round down to `u64::MAX` nanoseconds
let elapsed = (poll_start - instrumented_at)
.as_nanos()
.try_into()
.unwrap_or(u64::MAX);
// add this duration to `time_to_first_poll_ns_total`
metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);

/* 3. increment the count of tasks that have been polled at least once */
state.metrics.first_poll_count.fetch_add(1, SeqCst);
}

/* accounting for time-idled and time-scheduled */
// 1. note (and reset) the instant this task was last awoke
let woke_at = state.woke_at.swap(0, SeqCst);

// The state of a future is *idling* in the interim between the instant
// it completes a `poll`, and the instant it is next awoken.
if *idled_at < woke_at {
// increment the counter of how many idles occured
metrics.total_idled_count.fetch_add(1, SeqCst);

// compute the duration of the idle
let idle_ns = woke_at - *idled_at;

// adjust the total elasped time monitored tasks spent idling
metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
}

// if this task spent any time in the scheduled state after instrumentation,
// and after first poll, `woke_at` will be greater than 0.
if woke_at > 0 {
// increment the counter of how many schedules occured
metrics.total_scheduled_count.fetch_add(1, SeqCst);

// recall that the `woke_at` field is internally represented as
// nanoseconds-since-instrumentation. here, for accounting purposes,
// we need to instead represent it as a proper `Instant`.
let woke_instant = instrumented_at + Duration::from_nanos(woke_at);

// the duration this task spent scheduled is time time elapsed between
// when this task was awoke, and when it was polled.
let scheduled_ns = (poll_start - woke_instant)
.as_nanos()
.try_into()
.unwrap_or(u64::MAX);

// add `scheduled_ns` to the Monitor's total
metrics
.total_scheduled_duration_ns
.fetch_add(scheduled_ns, SeqCst);
}

// Register the waker
state.waker.register(cx.waker());
instrument_poll(cx, self, Future::poll)
}
}

// Get the instrumented waker
let waker_ref = futures_util::task::waker_ref(state);
let mut cx = Context::from_waker(&waker_ref);
impl<T: Stream> Stream for Instrumented<T> {
type Item = T::Item;

// Poll the task
let inner_poll_start = Instant::now();
let ret = Future::poll(this.task, &mut cx);
let inner_poll_end = Instant::now();
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
instrument_poll(cx, self, Stream::poll_next)
}
}

/* idle time starts now */
*idled_at = (inner_poll_end - instrumented_at)
fn instrument_poll<T, Out>(
cx: &mut Context,
instrumented: Pin<&mut Instrumented<T>>,
poll_fn: impl FnOnce(Pin<&mut T>, &mut Context) -> Poll<Out>,
) -> Poll<Out> {
let poll_start = Instant::now();
let this = instrumented.project();
let idled_at = this.idled_at;
let state = this.state;
let instrumented_at = state.instrumented_at;
let metrics = &state.metrics;
/* accounting for time-to-first-poll and tasks-count */
// is this the first time this task has been polled?
if !*this.did_poll_once {
// if so, we need to do three things:
/* 1. note that this task *has* been polled */
*this.did_poll_once = true;

/* 2. account for the time-to-first-poll of this task */
// if the time-to-first-poll of this task exceeds `u64::MAX` ns,
// round down to `u64::MAX` nanoseconds
let elapsed = (poll_start - instrumented_at)
.as_nanos()
.try_into()
.unwrap_or(u64::MAX);
// add this duration to `time_to_first_poll_ns_total`
metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst);

/* accounting for poll time */
let inner_poll_duration = inner_poll_end - inner_poll_start;
let inner_poll_ns: u64 = inner_poll_duration
/* 3. increment the count of tasks that have been polled at least once */
state.metrics.first_poll_count.fetch_add(1, SeqCst);
}
/* accounting for time-idled and time-scheduled */
// 1. note (and reset) the instant this task was last awoke
let woke_at = state.woke_at.swap(0, SeqCst);
// The state of a future is *idling* in the interim between the instant
// it completes a `poll`, and the instant it is next awoken.
if *idled_at < woke_at {
// increment the counter of how many idles occurred
metrics.total_idled_count.fetch_add(1, SeqCst);

// compute the duration of the idle
let idle_ns = woke_at - *idled_at;

// adjust the total elapsed time monitored tasks spent idling
metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst);
}
// if this task spent any time in the scheduled state after instrumentation,
// and after first poll, `woke_at` will be greater than 0.
if woke_at > 0 {
// increment the counter of how many schedules occurred
metrics.total_scheduled_count.fetch_add(1, SeqCst);

// recall that the `woke_at` field is internally represented as
// nanoseconds-since-instrumentation. here, for accounting purposes,
// we need to instead represent it as a proper `Instant`.
let woke_instant = instrumented_at + Duration::from_nanos(woke_at);

// the duration this task spent scheduled is time time elapsed between
// when this task was awoke, and when it was polled.
let scheduled_ns = (poll_start - woke_instant)
.as_nanos()
.try_into()
.unwrap_or(u64::MAX);

let (count_bucket, duration_bucket) = // was this a slow or fast poll?
// add `scheduled_ns` to the Monitor's total
metrics
.total_scheduled_duration_ns
.fetch_add(scheduled_ns, SeqCst);
}
// Register the waker
state.waker.register(cx.waker());
// Get the instrumented waker
let waker_ref = futures_util::task::waker_ref(state);
let mut cx = Context::from_waker(&waker_ref);
// Poll the task
let inner_poll_start = Instant::now();
let ret = poll_fn(this.task, &mut cx);
let inner_poll_end = Instant::now();
/* idle time starts now */
*idled_at = (inner_poll_end - instrumented_at)
.as_nanos()
.try_into()
.unwrap_or(u64::MAX);
/* accounting for poll time */
let inner_poll_duration = inner_poll_end - inner_poll_start;
let inner_poll_ns: u64 = inner_poll_duration
.as_nanos()
.try_into()
.unwrap_or(u64::MAX);
let (count_bucket, duration_bucket) = // was this a slow or fast poll?
if inner_poll_duration >= metrics.slow_poll_threshold {
(&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration)
} else {
(&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns)
};

// update the appropriate bucket
count_bucket.fetch_add(1, SeqCst);
duration_bucket.fetch_add(inner_poll_ns, SeqCst);

ret
}
// update the appropriate bucket
count_bucket.fetch_add(1, SeqCst);
duration_bucket.fetch_add(inner_poll_ns, SeqCst);
ret
}

impl State {
Expand Down

0 comments on commit 84cbb53

Please sign in to comment.