diff --git a/console-api/proto/common.proto b/console-api/proto/common.proto index 5e4a8ec86..f36785c38 100644 --- a/console-api/proto/common.proto +++ b/console-api/proto/common.proto @@ -177,10 +177,13 @@ message PollStats { // its poll method has completed. optional google.protobuf.Timestamp last_poll_ended = 5; // The total duration this object was being *actively polled*, summed across - // all polls. Note that this includes only polls that have completed and is - // not reflecting any inprogress polls. Subtracting `busy_time` from the + // all polls. + // + // Note that this includes only polls that have completed, and does not + // reflect any in-progress polls. Subtracting `busy_time` from the // total lifetime of the polled object results in the amount of time it - // has spent *waiting* to be polled. + // has spent *waiting* to be polled (including the `scheduled_time` value + // from `TaskStats`, if this is a task). google.protobuf.Duration busy_time = 6; } diff --git a/console-api/proto/tasks.proto b/console-api/proto/tasks.proto index 8c00c3d1c..6d7c58d07 100644 --- a/console-api/proto/tasks.proto +++ b/console-api/proto/tasks.proto @@ -130,6 +130,16 @@ message Stats { common.PollStats poll_stats = 7; // The total number of times this task has woken itself. uint64 self_wakes = 8; + // The total duration this task was scheduled prior to being polled, summed + // across all poll cycles. + // + // Note that this includes only polls that have started, and does not + // reflect any scheduled state where the task hasn't yet been polled. + // Subtracting both `busy_time` (from the task's `PollStats`) and + // `scheduled_time` from the total lifetime of the task results in the + // amount of time it spent unable to progress because it was waiting on + // some resource. + google.protobuf.Duration scheduled_time = 9; } diff --git a/console-api/src/generated/rs.tokio.console.common.rs b/console-api/src/generated/rs.tokio.console.common.rs index d651148e3..315d7825a 100644 --- a/console-api/src/generated/rs.tokio.console.common.rs +++ b/console-api/src/generated/rs.tokio.console.common.rs @@ -253,10 +253,13 @@ pub struct PollStats { #[prost(message, optional, tag="5")] pub last_poll_ended: ::core::option::Option<::prost_types::Timestamp>, /// The total duration this object was being *actively polled*, summed across - /// all polls. Note that this includes only polls that have completed and is - /// not reflecting any inprogress polls. Subtracting `busy_time` from the + /// all polls. + /// + /// Note that this includes only polls that have completed, and does not + /// reflect any in-progress polls. Subtracting `busy_time` from the /// total lifetime of the polled object results in the amount of time it - /// has spent *waiting* to be polled. + /// has spent *waiting* to be polled (including the `scheduled_time` value + /// from `TaskStats`, if this is a task). #[prost(message, optional, tag="6")] pub busy_time: ::core::option::Option<::prost_types::Duration>, } diff --git a/console-api/src/generated/rs.tokio.console.tasks.rs b/console-api/src/generated/rs.tokio.console.tasks.rs index 6ff543bc9..8bdf62c3f 100644 --- a/console-api/src/generated/rs.tokio.console.tasks.rs +++ b/console-api/src/generated/rs.tokio.console.tasks.rs @@ -167,6 +167,17 @@ pub struct Stats { /// The total number of times this task has woken itself. #[prost(uint64, tag="8")] pub self_wakes: u64, + /// The total duration this task was scheduled prior to being polled, summed + /// across all poll cycles. + /// + /// Note that this includes only polls that have started, and does not + /// reflect any scheduled state where the task hasn't yet been polled. + /// Subtracting both `busy_time` (from the task's `PollStats`) and + /// `scheduled_time` from the total lifetime of the task results in the + /// amount of time it spent unable to progress because it was waiting on + /// some resource. + #[prost(message, optional, tag="9")] + pub scheduled_time: ::core::option::Option<::prost_types::Duration>, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct DurationHistogram { diff --git a/console-subscriber/examples/long_scheduled.rs b/console-subscriber/examples/long_scheduled.rs new file mode 100644 index 000000000..d8cf79144 --- /dev/null +++ b/console-subscriber/examples/long_scheduled.rs @@ -0,0 +1,78 @@ +//! Long scheduled time +//! +//! This example shows an application with a task that has an excessive +//! time between being woken and being polled. +//! +//! It consists of a channel where a sender task sends a message +//! through the channel and then immediately does a lot of work +//! (simulated in this case by a call to `std::thread::sleep`). +//! +//! As soon as the sender task calls `send()` the receiver task gets +//! woken, but because there's only a single worker thread, it doesn't +//! get polled until after the sender task has finished "working" and +//! yields (via `tokio::time::sleep`). +//! +//! In the console, this is visible in the `rx` task, which has very +//! high scheduled times - in the detail screen you will see that around +//! it is scheduled around 98% of the time. The `tx` task, on the other +//! hand, is busy most of the time. +use std::time::Duration; + +use console_subscriber::ConsoleLayer; +use tokio::{sync::mpsc, task}; +use tracing::info; + +#[tokio::main(flavor = "multi_thread", worker_threads = 1)] +async fn main() -> Result<(), Box> { + ConsoleLayer::builder() + .with_default_env() + .publish_interval(Duration::from_millis(100)) + .init(); + + let (tx, rx) = mpsc::channel::(1); + let count = 10000; + + let jh_rx = task::Builder::new() + .name("rx") + .spawn(receiver(rx, count)) + .unwrap(); + let jh_tx = task::Builder::new() + .name("tx") + .spawn(sender(tx, count)) + .unwrap(); + + let res_tx = jh_tx.await; + let res_rx = jh_rx.await; + info!( + "main: Joined sender: {:?} and receiver: {:?}", + res_tx, res_rx, + ); + + tokio::time::sleep(Duration::from_millis(200)).await; + + Ok(()) +} + +async fn sender(tx: mpsc::Sender, count: u32) { + info!("tx: started"); + + for idx in 0..count { + let msg: u32 = idx; + let res = tx.send(msg).await; + info!("tx: sent msg '{}' result: {:?}", msg, res); + + std::thread::sleep(Duration::from_millis(5000)); + info!("tx: work done"); + + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + +async fn receiver(mut rx: mpsc::Receiver, count: u32) { + info!("rx: started"); + + for _ in 0..count { + let msg = rx.recv().await; + info!("rx: Received message: '{:?}'", msg); + } +} diff --git a/console-subscriber/src/stats.rs b/console-subscriber/src/stats.rs index 2bce3c085..0e6995260 100644 --- a/console-subscriber/src/stats.rs +++ b/console-subscriber/src/stats.rs @@ -56,7 +56,7 @@ pub(crate) struct TaskStats { is_dropped: AtomicBool, // task stats pub(crate) created_at: Instant, - timestamps: Mutex, + dropped_at: Mutex>, // waker stats wakes: AtomicUsize, @@ -100,12 +100,6 @@ pub(crate) struct ResourceStats { pub(crate) parent_id: Option, } -#[derive(Debug, Default)] -struct TaskTimestamps { - dropped_at: Option, - last_wake: Option, -} - #[derive(Debug, Default)] struct PollStats { /// The number of polls in progress @@ -118,9 +112,11 @@ struct PollStats { #[derive(Debug, Default)] struct PollTimestamps { first_poll: Option, + last_wake: Option, last_poll_started: Option, last_poll_ended: Option, busy_time: Duration, + scheduled_time: Duration, histogram: H, } @@ -162,14 +158,16 @@ impl TaskStats { is_dirty: AtomicBool::new(true), is_dropped: AtomicBool::new(false), created_at, - timestamps: Mutex::new(TaskTimestamps::default()), + dropped_at: Mutex::new(None), poll_stats: PollStats { timestamps: Mutex::new(PollTimestamps { histogram: Histogram::new(poll_duration_max), first_poll: None, + last_wake: None, last_poll_started: None, last_poll_ended: None, busy_time: Duration::new(0, 0), + scheduled_time: Duration::new(0, 0), }), current_polls: AtomicUsize::new(0), polls: AtomicUsize::new(0), @@ -209,13 +207,14 @@ impl TaskStats { } fn wake(&self, at: Instant, self_wake: bool) { - let mut timestamps = self.timestamps.lock(); - timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at)); - self.wakes.fetch_add(1, Release); + self.poll_stats.wake(at); + self.wakes.fetch_add(1, Release); if self_wake { self.wakes.fetch_add(1, Release); } + + self.make_dirty(); } pub(crate) fn start_poll(&self, at: Instant) { @@ -235,8 +234,7 @@ impl TaskStats { return; } - let mut timestamps = self.timestamps.lock(); - let _prev = timestamps.dropped_at.replace(dropped_at); + let _prev = self.dropped_at.lock().replace(dropped_at); debug_assert_eq!(_prev, None, "tried to drop a task twice; this is a bug!"); self.make_dirty(); } @@ -257,16 +255,28 @@ impl ToProto for TaskStats { fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output { let poll_stats = Some(self.poll_stats.to_proto(base_time)); - let timestamps = self.timestamps.lock(); + let timestamps = self.poll_stats.timestamps.lock(); proto::tasks::Stats { poll_stats, created_at: Some(base_time.to_timestamp(self.created_at)), - dropped_at: timestamps.dropped_at.map(|at| base_time.to_timestamp(at)), + dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)), wakes: self.wakes.load(Acquire) as u64, waker_clones: self.waker_clones.load(Acquire) as u64, self_wakes: self.self_wakes.load(Acquire) as u64, waker_drops: self.waker_drops.load(Acquire) as u64, last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)), + scheduled_time: Some( + timestamps + .scheduled_time + .try_into() + .unwrap_or_else(|error| { + eprintln!( + "failed to convert `scheduled_time` to protobuf duration: {}", + error + ); + Default::default() + }), + ), } } } @@ -287,7 +297,7 @@ impl DroppedAt for TaskStats { // avoid acquiring the lock if we know we haven't tried to drop this // thing yet if self.is_dropped.load(Acquire) { - return self.timestamps.lock().dropped_at; + return *self.dropped_at.lock(); } None @@ -466,18 +476,46 @@ impl ToProto for ResourceStats { // === impl PollStats === impl PollStats { - fn start_poll(&self, at: Instant) { - if self.current_polls.fetch_add(1, AcqRel) == 0 { - // We are starting the first poll - let mut timestamps = self.timestamps.lock(); - if timestamps.first_poll.is_none() { - timestamps.first_poll = Some(at); - } + fn wake(&self, at: Instant) { + let mut timestamps = self.timestamps.lock(); + timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at)); + } - timestamps.last_poll_started = Some(at); + fn start_poll(&self, at: Instant) { + if self.current_polls.fetch_add(1, AcqRel) > 0 { + return; + } - self.polls.fetch_add(1, Release); + // We are starting the first poll + let mut timestamps = self.timestamps.lock(); + if timestamps.first_poll.is_none() { + timestamps.first_poll = Some(at); } + + timestamps.last_poll_started = Some(at); + + self.polls.fetch_add(1, Release); + + // If the last poll ended after the last wake then it was likely + // a self-wake, so we measure from the end of the last poll instead. + // This also ensures that `busy_time` and `scheduled_time` don't overlap. + let scheduled = match std::cmp::max(timestamps.last_wake, timestamps.last_poll_ended) { + Some(scheduled) => scheduled, + None => return, // Async operations record polls, but not wakes + }; + + let elapsed = match at.checked_duration_since(scheduled) { + Some(elapsed) => elapsed, + None => { + eprintln!( + "possible Instant clock skew detected: a poll's start timestamp \ + was before the wake time/last poll end timestamp\nwake = {:?}\n start = {:?}", + scheduled, at + ); + return; + } + }; + timestamps.scheduled_time += elapsed; } fn end_poll(&self, at: Instant) { @@ -534,7 +572,7 @@ impl ToProto for PollStats { .map(|at| base_time.to_timestamp(at)), busy_time: Some(timestamps.busy_time.try_into().unwrap_or_else(|error| { eprintln!( - "failed to convert busy time to protobuf duration: {}", + "failed to convert `busy_time` to protobuf duration: {}", error ); Default::default() diff --git a/tokio-console/src/state/tasks.rs b/tokio-console/src/state/tasks.rs index ad362f518..1dce737e5 100644 --- a/tokio-console/src/state/tasks.rs +++ b/tokio-console/src/state/tasks.rs @@ -43,10 +43,11 @@ pub(crate) enum SortBy { Name = 3, Total = 4, Busy = 5, - Idle = 6, - Polls = 7, - Target = 8, - Location = 9, + Scheduled = 6, + Idle = 7, + Polls = 8, + Target = 9, + Location = 10, } #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] @@ -54,6 +55,7 @@ pub(crate) enum TaskState { Completed, Idle, Running, + Scheduled, } pub(crate) type TaskRef = store::Ref; @@ -100,6 +102,7 @@ struct TaskStats { created_at: SystemTime, dropped_at: Option, busy: Duration, + scheduled: Duration, last_poll_started: Option, last_poll_ended: Option, idle: Option, @@ -297,6 +300,10 @@ impl Task { self.stats.last_poll_started > self.stats.last_poll_ended } + pub(crate) fn is_scheduled(&self) -> bool { + self.stats.last_wake > self.stats.last_poll_started + } + pub(crate) fn is_completed(&self) -> bool { self.stats.total.is_some() } @@ -310,6 +317,10 @@ impl Task { return TaskState::Running; } + if self.is_scheduled() { + return TaskState::Scheduled; + } + TaskState::Idle } @@ -331,10 +342,24 @@ impl Task { self.stats.busy } + pub(crate) fn scheduled(&self, since: SystemTime) -> Duration { + if let Some(wake) = self.stats.last_wake { + if self.stats.last_wake > self.stats.last_poll_started { + // In this case the task is scheduled, but has not yet been polled + let current_time_since_wake = since.duration_since(wake).unwrap_or_default(); + return self.stats.scheduled + current_time_since_wake; + } + } + self.stats.scheduled + } + pub(crate) fn idle(&self, since: SystemTime) -> Duration { self.stats .idle - .or_else(|| self.total(since).checked_sub(self.busy(since))) + .or_else(|| { + self.total(since) + .checked_sub(self.busy(since) + self.scheduled(since)) + }) .unwrap_or_default() } @@ -429,11 +454,14 @@ impl From for TaskStats { let poll_stats = pb.poll_stats.expect("task should have poll stats"); let busy = poll_stats.busy_time.map(pb_duration).unwrap_or_default(); - let idle = total.map(|total| total.checked_sub(busy).unwrap_or_default()); + let scheduled = pb.scheduled_time.map(pb_duration).unwrap_or_default(); + let idle = total.map(|total| total.checked_sub(busy + scheduled).unwrap_or_default()); Self { total, idle, + scheduled, busy, + last_wake: pb.last_wake.map(|v| v.try_into().unwrap()), last_poll_started: poll_stats.last_poll_started.map(|v| v.try_into().unwrap()), last_poll_ended: poll_stats.last_poll_ended.map(|v| v.try_into().unwrap()), polls: poll_stats.polls, @@ -442,7 +470,6 @@ impl From for TaskStats { wakes: pb.wakes, waker_clones: pb.waker_clones, waker_drops: pb.waker_drops, - last_wake: pb.last_wake.map(|v| v.try_into().unwrap()), self_wakes: pb.self_wakes, } } @@ -474,6 +501,9 @@ impl SortBy { Self::Idle => { tasks.sort_unstable_by_key(|task| task.upgrade().map(|t| t.borrow().idle(now))) } + Self::Scheduled => { + tasks.sort_unstable_by_key(|task| task.upgrade().map(|t| t.borrow().scheduled(now))) + } Self::Busy => { tasks.sort_unstable_by_key(|task| task.upgrade().map(|t| t.borrow().busy(now))) } @@ -505,6 +535,7 @@ impl TryFrom for SortBy { idx if idx == Self::Name as usize => Ok(Self::Name), idx if idx == Self::Total as usize => Ok(Self::Total), idx if idx == Self::Busy as usize => Ok(Self::Busy), + idx if idx == Self::Scheduled as usize => Ok(Self::Scheduled), idx if idx == Self::Idle as usize => Ok(Self::Idle), idx if idx == Self::Polls as usize => Ok(Self::Polls), idx if idx == Self::Target as usize => Ok(Self::Target), @@ -517,6 +548,7 @@ impl TryFrom for SortBy { impl TaskState { pub(crate) fn render(self, styles: &crate::view::Styles) -> Span<'static> { const RUNNING_UTF8: &str = "\u{25B6}"; + const SCHEDULED_UTF8: &str = "\u{23EB}"; const IDLE_UTF8: &str = "\u{23F8}"; const COMPLETED_UTF8: &str = "\u{23F9}"; match self { @@ -524,6 +556,7 @@ impl TaskState { styles.if_utf8(RUNNING_UTF8, "BUSY"), styles.fg(Color::Green), ), + Self::Scheduled => Span::raw(styles.if_utf8(SCHEDULED_UTF8, "SCHED")), Self::Idle => Span::raw(styles.if_utf8(IDLE_UTF8, "IDLE")), Self::Completed => Span::raw(styles.if_utf8(COMPLETED_UTF8, "DONE")), } diff --git a/tokio-console/src/view/mod.rs b/tokio-console/src/view/mod.rs index 3d35350b8..160419adb 100644 --- a/tokio-console/src/view/mod.rs +++ b/tokio-console/src/view/mod.rs @@ -39,7 +39,7 @@ pub struct View { /// details view), we want to leave the task list's state the way we left it /// --- e.g., if the user previously selected a particular sorting, we want /// it to remain sorted that way when we return to it. - tasks_list: TableListState, + tasks_list: TableListState, resources_list: TableListState, state: ViewState, pub(crate) styles: Styles, @@ -93,7 +93,7 @@ impl View { pub fn new(styles: Styles) -> Self { Self { state: ViewState::TasksList, - tasks_list: TableListState::::default(), + tasks_list: TableListState::::default(), resources_list: TableListState::::default(), styles, } diff --git a/tokio-console/src/view/task.rs b/tokio-console/src/view/task.rs index 93edd13b7..c9e021a10 100644 --- a/tokio-console/src/view/task.rs +++ b/tokio-console/src/view/task.rs @@ -69,7 +69,7 @@ impl TaskView { // controls layout::Constraint::Length(1), // task stats - layout::Constraint::Length(8), + layout::Constraint::Length(10), // poll duration layout::Constraint::Length(9), // fields @@ -89,7 +89,7 @@ impl TaskView { // warnings (add 2 for top and bottom borders) layout::Constraint::Length(warnings.len() as u16 + 2), // task stats - layout::Constraint::Length(8), + layout::Constraint::Length(10), // poll duration layout::Constraint::Length(9), // fields @@ -122,7 +122,7 @@ impl TaskView { ]); // Just preallocate capacity for ID, name, target, total, busy, and idle. - let mut overview = Vec::with_capacity(7); + let mut overview = Vec::with_capacity(8); overview.push(Spans::from(vec![ bold("ID: "), Span::raw(format!("{} ", task.id())), @@ -159,6 +159,7 @@ impl TaskView { styles.time_units(total, view::DUR_LIST_PRECISION, None), ])); overview.push(dur_percent("Busy: ", task.busy(now))); + overview.push(dur_percent("Scheduled: ", task.scheduled(now))); overview.push(dur_percent("Idle: ", task.idle(now))); let mut waker_stats = vec![Spans::from(vec![ diff --git a/tokio-console/src/view/tasks.rs b/tokio-console/src/view/tasks.rs index d103779ab..b537e08e1 100644 --- a/tokio-console/src/view/tasks.rs +++ b/tokio-console/src/view/tasks.rs @@ -19,17 +19,17 @@ use tui::{ #[derive(Debug, Default)] pub(crate) struct TasksTable {} -impl TableList<11> for TasksTable { +impl TableList<12> for TasksTable { type Row = Task; type Sort = SortBy; type Context = (); - const HEADER: &'static [&'static str; 11] = &[ - "Warn", "ID", "State", "Name", "Total", "Busy", "Idle", "Polls", "Target", "Location", - "Fields", + const HEADER: &'static [&'static str; 12] = &[ + "Warn", "ID", "State", "Name", "Total", "Busy", "Sched", "Idle", "Polls", "Target", + "Location", "Fields", ]; - const WIDTHS: &'static [usize; 11] = &[ + const WIDTHS: &'static [usize; 12] = &[ Self::HEADER[0].len() + 1, Self::HEADER[1].len() + 1, Self::HEADER[2].len() + 1, @@ -41,10 +41,11 @@ impl TableList<11> for TasksTable { Self::HEADER[8].len() + 1, Self::HEADER[9].len() + 1, Self::HEADER[10].len() + 1, + Self::HEADER[11].len() + 1, ]; fn render( - table_list_state: &mut TableListState, + table_list_state: &mut TableListState, styles: &view::Styles, frame: &mut tui::terminal::Frame, area: layout::Rect, @@ -129,6 +130,7 @@ impl TableList<11> for TasksTable { Cell::from(name_width.update_str(task.name().unwrap_or("")).to_string()), dur_cell(task.total(now)), dur_cell(task.busy(now)), + dur_cell(task.scheduled(now)), dur_cell(task.idle(now)), Cell::from(polls_width.update_str(task.total_polls().to_string())), Cell::from(target_width.update_str(task.target()).to_owned()), @@ -252,6 +254,7 @@ impl TableList<11> for TasksTable { layout::Constraint::Length(DUR_LEN as u16), layout::Constraint::Length(DUR_LEN as u16), layout::Constraint::Length(DUR_LEN as u16), + layout::Constraint::Length(DUR_LEN as u16), polls_width.constraint(), target_width.constraint(), location_width.constraint(),