Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(console): add warning for tasks that never yield #439

Merged
merged 17 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions console-subscriber/examples/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ OPTIONS:
blocks Includes a (misbehaving) blocking task
burn Includes a (misbehaving) task that spins CPU with self-wakes
coma Includes a (misbehaving) task that forgets to register a waker
noyield Includes a (misbehaving) task that spawns tasks that never yield
"#;

#[tokio::main]
Expand Down Expand Up @@ -38,6 +39,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.spawn(burn(1, 10))
.unwrap();
}
"noyield" => {
tokio::task::Builder::new()
.name("noyield")
.spawn(no_yield(20))
.unwrap();
}
"help" | "-h" => {
eprintln!("{}", HELP);
return Ok(());
Expand Down Expand Up @@ -114,3 +121,17 @@ async fn burn(min: u64, max: u64) {
}
}
}

#[tracing::instrument]
async fn no_yield(seconds: u64) {
loop {
let handle = tokio::task::Builder::new()
.name("greedy")
.spawn(async move {
std::thread::sleep(Duration::from_secs(seconds));
})
.expect("Couldn't spawn greedy task");

_ = handle.await;
}
}
1 change: 1 addition & 0 deletions tokio-console/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async fn main() -> color_eyre::Result<()> {
.with_task_linters(vec![
warnings::Linter::new(warnings::SelfWakePercent::default()),
warnings::Linter::new(warnings::LostWaker),
warnings::Linter::new(warnings::NeverYielded::default()),
])
.with_retain_for(retain_for);
let mut input = input::EventStream::new();
Expand Down
45 changes: 37 additions & 8 deletions tokio-console/src/state/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use crate::{
},
util::Percentage,
view,
warnings::Linter,
warnings::{Lint, Linter},
};
use console_api as proto;
use ratatui::{style::Color, text::Span};
use std::{
cell::RefCell,
collections::HashMap,
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
rc::{Rc, Weak},
time::{Duration, SystemTime},
Expand All @@ -24,6 +24,7 @@ use std::{
#[derive(Default, Debug)]
pub(crate) struct TasksState {
tasks: Store<Task>,
pending_lint: HashSet<Id<Task>>,
pub(crate) linters: Vec<Linter<Task>>,
dropped_events: u64,
}
Expand Down Expand Up @@ -217,16 +218,38 @@ impl TasksState {
warnings: Vec::new(),
location,
};
task.lint(linters);
let recheck = task.lint(linters);
if recheck {
self.pending_lint.insert(task.id);
jefftt marked this conversation as resolved.
Show resolved Hide resolved
}
Some((id, task))
});

let mut checked = HashSet::new();
for (stats, mut task) in self.tasks.updated(stats_update) {
tracing::trace!(?task, ?stats, "processing stats update for");
task.stats = stats.into();
task.lint(linters);
let recheck = task.lint(linters);
if !recheck {
self.pending_lint.remove(&task.id);
}
checked.insert(task.id);
jefftt marked this conversation as resolved.
Show resolved Hide resolved
}

self.pending_lint.retain(|id| {
jefftt marked this conversation as resolved.
Show resolved Hide resolved
if checked.contains(id) {
true
} else {
let recheck = self
.tasks
.get(*id)
.expect("task pending lint is not in task store")
jefftt marked this conversation as resolved.
Show resolved Hide resolved
.borrow_mut()
.lint(linters);
recheck
}
});

self.dropped_events += update.dropped_events;
}

Expand Down Expand Up @@ -430,15 +453,21 @@ impl Task {
&self.warnings[..]
}

fn lint(&mut self, linters: &[Linter<Task>]) {
fn lint(&mut self, linters: &[Linter<Task>]) -> bool {
jefftt marked this conversation as resolved.
Show resolved Hide resolved
self.warnings.clear();
let mut recheck = false;
for lint in linters {
tracing::debug!(?lint, task = ?self, "checking...");
if let Some(warning) = lint.check(self) {
tracing::info!(?warning, task = ?self, "found a warning!");
self.warnings.push(warning)
match lint.check(self) {
Lint::Warning(warning) => {
tracing::info!(?warning, task = ?self, "found a warning!");
self.warnings.push(warning);
}
Lint::Ok => {}
Lint::Recheck => recheck = true,
}
}
recheck
}

pub(crate) fn location(&self) -> &str {
Expand Down
126 changes: 106 additions & 20 deletions tokio-console/src/warnings.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use crate::state::tasks::Task;
use std::{fmt::Debug, rc::Rc};
use crate::state::tasks::{Task, TaskState};
use std::{
fmt::Debug,
rc::Rc,
time::{Duration, SystemTime},
};

/// A warning for a particular type of monitored entity (e.g. task or resource).
///
/// This trait implements the logic for detecting a particular warning, and
/// generating a warning message describing it. The [`Linter`] type wraps an
/// instance of this trait to track active instances of the warning.
pub trait Warn<T>: Debug {
/// Returns `true` if the warning applies to `val`.
fn check(&self, val: &T) -> bool;
/// Returns if the warning applies to `val`.
fn check(&self, val: &T) -> Warning;

/// Formats a description of the warning detected for a *specific* `val`.
///
Expand Down Expand Up @@ -46,6 +50,31 @@ pub trait Warn<T>: Debug {
fn summary(&self) -> &str;
}

/// A result for a linter check
jefftt marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) enum Lint<T> {
/// No warning applies to the entity
Ok,

/// The cloned instance of `Self` should be held by the entity that
/// generated the warning, so that it can be formatted. Holding the clone of
/// `Self` will increment the warning count for that entity.
Warning(Linter<T>),

/// The lint should be rechecked as the conditions to allow for checking are
/// not satisfied yet
Recheck,
}

/// A result for a warning check
pub enum Warning {
/// Set `true` if the warning applies or `false` otherwise
Check(bool),
jefftt marked this conversation as resolved.
Show resolved Hide resolved

/// The warning should be rechecked as the conditions to allow for checking
/// are not satisfied yet
Recheck,
}

#[derive(Debug)]
pub(crate) struct Linter<T>(Rc<dyn Warn<T>>);

Expand All @@ -57,17 +86,12 @@ impl<T> Linter<T> {
Self(Rc::new(warning))
}

/// Checks if the warning applies to a particular entity, returning a clone
/// of `Self` if it does.
///
/// The cloned instance of `Self` should be held by the entity that
/// generated the warning, so that it can be formatted. Holding the clone of
/// `Self` will increment the warning count for that entity.
pub(crate) fn check(&self, val: &T) -> Option<Self> {
if self.0.check(val) {
Some(Self(self.0.clone()))
} else {
None
/// Checks if the warning applies to a particular entity
pub(crate) fn check(&self, val: &T) -> Lint<T> {
match self.0.check(val) {
Warning::Check(false) => Lint::Ok,
Warning::Check(true) => Lint::Warning(Self(self.0.clone())),
Warning::Recheck => Lint::Recheck,
}
}

Expand All @@ -78,7 +102,7 @@ impl<T> Linter<T> {

pub(crate) fn format(&self, val: &T) -> String {
debug_assert!(
self.0.check(val),
matches!(self.0.check(val), Warning::Check(true)),
"tried to format a warning for a {} that did not have that warning!",
std::any::type_name::<T>()
);
Expand Down Expand Up @@ -120,9 +144,9 @@ impl Warn<Task> for SelfWakePercent {
self.description.as_str()
}

fn check(&self, task: &Task) -> bool {
fn check(&self, task: &Task) -> Warning {
let self_wakes = task.self_wake_percent();
self_wakes > self.min_percent
Warning::Check(self_wakes > self.min_percent)
}

fn format(&self, task: &Task) -> String {
Expand All @@ -142,11 +166,73 @@ impl Warn<Task> for LostWaker {
"tasks have lost their waker"
}

fn check(&self, task: &Task) -> bool {
!task.is_completed() && task.waker_count() == 0 && !task.is_running() && !task.is_awakened()
fn check(&self, task: &Task) -> Warning {
Warning::Check(
!task.is_completed()
&& task.waker_count() == 0
&& !task.is_running()
&& !task.is_awakened(),
)
}

fn format(&self, _: &Task) -> String {
"This task has lost its waker, and will never be woken again.".into()
}
}

/// Warning for if a task has never yielded
#[derive(Clone, Debug)]
pub(crate) struct NeverYielded {
min_duration: Duration,
description: String,
}

impl NeverYielded {
pub(crate) const DEFAULT_DURATION: Duration = Duration::from_secs(1);
pub(crate) fn new(min_duration: Duration) -> Self {
Self {
min_duration,
description: format!(
"tasks have never yielded (threshold {}ms)",
min_duration.as_millis()
),
}
}
}

impl Default for NeverYielded {
fn default() -> Self {
Self::new(Self::DEFAULT_DURATION)
}
}

impl Warn<Task> for NeverYielded {
fn summary(&self) -> &str {
self.description.as_str()
}

fn check(&self, task: &Task) -> Warning {
// Don't fire warning for tasks that are waiting to run
if task.state() != TaskState::Running {
return Warning::Check(false);
}

if task.total_polls() > 1 {
return Warning::Check(false);
}

// Avoid short-lived task false positives
if task.busy(SystemTime::now()) >= self.min_duration {
Warning::Check(true)
} else {
Warning::Recheck
}
}

fn format(&self, task: &Task) -> String {
format!(
"This task has never yielded ({:?})",
task.busy(SystemTime::now()),
)
}
}