runtime: expand on runtime metrics (#4373)
This patch adds more runtime metrics. The API is still unstable.
carllerche authored Jan 22, 2022
1 parent 4eed411 commit 24f4ee3
Showing 24 changed files with 1,591 additions and 344 deletions.
tokio/Cargo.toml (6 changes: 5 additions, 1 deletion)
@@ -46,7 +46,6 @@ io-util = ["memchr", "bytes"]
 # stdin, stdout, stderr
 io-std = []
 macros = ["tokio-macros"]
-stats = []
 net = [
     "libc",
     "mio/os-poll",
@@ -85,6 +84,11 @@ sync = []
 test-util = ["rt", "sync", "time"]
 time = []
 
+# Technically, removing this is a breaking change even though it only ever did
+# anything with the unstable flag on. It is probably safe to get rid of it after
+# a few releases.
+stats = []
+
 [dependencies]
 tokio-macros = { version = "1.7.0", path = "../tokio-macros", optional = true }
tokio/src/lib.rs (1 change: 0 additions, 1 deletion)
@@ -346,7 +346,6 @@
 //! `RUSTFLAGS="--cfg tokio_unstable"`.
 //!
 //! - `tracing`: Enables tracing events.
-//! - `stats`: Enables runtime stats collection. ([RFC](https://github.com/tokio-rs/tokio/pull/3845))
 //!
 //! [feature flags]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section

tokio/src/macros/cfg.rs (12 changes: 7 additions, 5 deletions)
@@ -174,20 +174,22 @@ macro_rules! cfg_macros {
     }
 }
 
-macro_rules! cfg_stats {
+macro_rules! cfg_metrics {
     ($($item:item)*) => {
         $(
-            #[cfg(all(tokio_unstable, feature = "stats"))]
-            #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "stats"))))]
+            // For now, metrics is always enabled. When stabilized, it might
+            // have a dedicated feature flag.
+            #[cfg(tokio_unstable)]
+            #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
             $item
         )*
     }
 }
 
-macro_rules! cfg_not_stats {
+macro_rules! cfg_not_metrics {
     ($($item:item)*) => {
         $(
-            #[cfg(not(all(tokio_unstable, feature = "stats")))]
+            #[cfg(not(tokio_unstable))]
             $item
         )*
     }
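Aside: the two macros act as a paired conditional-compilation gate. An item wrapped in `cfg_metrics!` is compiled only with `--cfg tokio_unstable`, and `cfg_not_metrics!` supplies the fallback, so callers can use the item unconditionally. A minimal sketch of that pattern as used inside the crate; the `metrics_enabled` helper is hypothetical and not part of this patch:

```rust
// Hypothetical helper, for illustration only (not part of this patch).
// With `--cfg tokio_unstable`, the first definition is compiled;
// without the flag, the second one is. Callers never need a cfg.
cfg_metrics! {
    pub(crate) fn metrics_enabled() -> bool {
        true
    }
}

cfg_not_metrics! {
    pub(crate) fn metrics_enabled() -> bool {
        false
    }
}
```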
tokio/src/runtime/basic_scheduler.rs (192 changes: 123 additions, 69 deletions)
@@ -4,9 +4,9 @@ use crate::loom::sync::Mutex;
 use crate::park::{Park, Unpark};
 use crate::runtime::context::EnterGuard;
 use crate::runtime::driver::Driver;
-use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
 use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
 use crate::runtime::Callback;
+use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
 use crate::sync::notify::Notify;
 use crate::util::atomic_cell::AtomicCell;
 use crate::util::{waker_ref, Wake, WakerRef};
@@ -56,8 +56,8 @@ struct Core {
     /// The driver is removed before starting to park the thread
     driver: Option<Driver>,
 
-    /// Stats batcher
-    stats: WorkerStatsBatcher,
+    /// Metrics batch
+    metrics: MetricsBatch,
 }
 
 #[derive(Clone)]
@@ -98,8 +98,11 @@ struct Shared {
     /// Callback for a worker unparking itself
     after_unpark: Option<Callback>,
 
-    /// Keeps track of various runtime stats.
-    stats: RuntimeStats,
+    /// Keeps track of various runtime metrics.
+    scheduler_metrics: SchedulerMetrics,
+
+    /// This scheduler only has one worker.
+    worker_metrics: WorkerMetrics,
 }
 
 /// Thread-local context.
@@ -143,7 +146,8 @@ impl BasicScheduler {
                 woken: AtomicBool::new(false),
                 before_park,
                 after_unpark,
-                stats: RuntimeStats::new(1),
+                scheduler_metrics: SchedulerMetrics::new(),
+                worker_metrics: WorkerMetrics::new(),
             }),
         };
@@ -152,7 +156,7 @@
             spawner: spawner.clone(),
             tick: 0,
             driver: Some(driver),
-            stats: WorkerStatsBatcher::new(0),
+            metrics: MetricsBatch::new(),
         })));
 
         BasicScheduler {
@@ -219,11 +223,89 @@
     }
 }
 
+impl Drop for BasicScheduler {
+    fn drop(&mut self) {
+        // Avoid a double panic if we are currently panicking and
+        // the lock may be poisoned.
+
+        let core = match self.take_core() {
+            Some(core) => core,
+            None if std::thread::panicking() => return,
+            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
+        };
+
+        core.enter(|mut core, context| {
+            // Drain the OwnedTasks collection. This call also closes the
+            // collection, ensuring that no tasks are ever pushed after this
+            // call returns.
+            context.spawner.shared.owned.close_and_shutdown_all();
+
+            // Drain local queue
+            // We already shut down every task, so we just need to drop the task.
+            while let Some(task) = core.pop_task() {
+                drop(task);
+            }
+
+            // Drain remote queue and set it to None
+            let remote_queue = core.spawner.shared.queue.lock().take();
+
+            // Using `Option::take` to replace the shared queue with `None`.
+            // We already shut down every task, so we just need to drop the task.
+            if let Some(remote_queue) = remote_queue {
+                for entry in remote_queue {
+                    match entry {
+                        RemoteMsg::Schedule(task) => {
+                            drop(task);
+                        }
+                    }
+                }
+            }
+
+            assert!(context.spawner.shared.owned.is_empty());
+
+            // Submit metrics
+            core.metrics.submit(&core.spawner.shared.worker_metrics);
+
+            (core, ())
+        });
+    }
+}
+
+impl fmt::Debug for BasicScheduler {
+    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
+        fmt.debug_struct("BasicScheduler").finish()
+    }
+}
+
+// ===== impl Core =====
+
+impl Core {
+    fn pop_task(&mut self) -> Option<task::Notified<Arc<Shared>>> {
+        let ret = self.tasks.pop_front();
+        self.spawner
+            .shared
+            .worker_metrics
+            .set_queue_depth(self.tasks.len());
+        ret
+    }
+
+    fn push_task(&mut self, task: task::Notified<Arc<Shared>>) {
+        self.tasks.push_back(task);
+        self.metrics.inc_local_schedule_count();
+        self.spawner
+            .shared
+            .worker_metrics
+            .set_queue_depth(self.tasks.len());
+    }
+}
+
 // ===== impl Context =====
 
 impl Context {
     /// Execute the closure with the given scheduler core stored in the
     /// thread-local context.
     fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
-        core.stats.incr_poll_count();
+        core.metrics.incr_poll_count();
         self.enter(core, || crate::coop::budget(f))
     }
 
@@ -244,15 +326,15 @@ impl Context {
         // instead of parking the thread
         if core.tasks.is_empty() {
             // Park until the thread is signaled
-            core.stats.about_to_park();
-            core.stats.submit(&core.spawner.shared.stats);
+            core.metrics.about_to_park();
+            core.metrics.submit(&core.spawner.shared.worker_metrics);
 
             let (c, _) = self.enter(core, || {
                 driver.park().expect("failed to park");
             });
 
             core = c;
-            core.stats.returned_from_park();
+            core.metrics.returned_from_park();
         }
 
         if let Some(f) = &self.spawner.shared.after_unpark {
@@ -271,7 +353,7 @@
     fn park_yield(&self, mut core: Box<Core>) -> Box<Core> {
         let mut driver = core.driver.take().expect("driver missing");
 
-        core.stats.submit(&core.spawner.shared.stats);
+        core.metrics.submit(&core.spawner.shared.worker_metrics);
         let (mut core, _) = self.enter(core, || {
             driver
                 .park_timeout(Duration::from_millis(0))
@@ -297,57 +379,6 @@
     }
 }
 
-impl Drop for BasicScheduler {
-    fn drop(&mut self) {
-        // Avoid a double panic if we are currently panicking and
-        // the lock may be poisoned.
-
-        let core = match self.take_core() {
-            Some(core) => core,
-            None if std::thread::panicking() => return,
-            None => panic!("Oh no! We never placed the Core back, this is a bug!"),
-        };
-
-        core.enter(|mut core, context| {
-            // Drain the OwnedTasks collection. This call also closes the
-            // collection, ensuring that no tasks are ever pushed after this
-            // call returns.
-            context.spawner.shared.owned.close_and_shutdown_all();
-
-            // Drain local queue
-            // We already shut down every task, so we just need to drop the task.
-            while let Some(task) = core.tasks.pop_front() {
-                drop(task);
-            }
-
-            // Drain remote queue and set it to None
-            let remote_queue = core.spawner.shared.queue.lock().take();
-
-            // Using `Option::take` to replace the shared queue with `None`.
-            // We already shut down every task, so we just need to drop the task.
-            if let Some(remote_queue) = remote_queue {
-                for entry in remote_queue {
-                    match entry {
-                        RemoteMsg::Schedule(task) => {
-                            drop(task);
-                        }
-                    }
-                }
-            }
-
-            assert!(context.spawner.shared.owned.is_empty());
-
-            (core, ())
-        });
-    }
-}
-
-impl fmt::Debug for BasicScheduler {
-    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
-        fmt.debug_struct("BasicScheduler").finish()
-    }
-}
-
 // ===== impl Spawner =====
 
 impl Spawner {
@@ -366,10 +397,6 @@ impl Spawner {
         handle
     }
 
-    pub(crate) fn stats(&self) -> &RuntimeStats {
-        &self.shared.stats
-    }
-
     fn pop(&self) -> Option<RemoteMsg> {
         match self.shared.queue.lock().as_mut() {
             Some(queue) => queue.pop_front(),
@@ -390,6 +417,28 @@
     }
 }
 
+cfg_metrics! {
+    impl Spawner {
+        pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
+            &self.shared.scheduler_metrics
+        }
+
+        pub(crate) fn injection_queue_depth(&self) -> usize {
+            // TODO: avoid having to lock. The multi-threaded injection queue
+            // could probably be used here.
+            self.shared.queue.lock()
+                .as_ref()
+                .map(|queue| queue.len())
+                .unwrap_or(0)
+        }
+
+        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
+            assert_eq!(0, worker);
+            &self.shared.worker_metrics
+        }
+    }
+}
+
 impl fmt::Debug for Spawner {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         fmt.debug_struct("Spawner").finish()
@@ -411,10 +460,13 @@ impl Schedule for Arc<Shared> {
                 // If `None`, the runtime is shutting down, so there is no need
                 // to schedule the task.
                 if let Some(core) = core.as_mut() {
-                    core.tasks.push_back(task);
+                    core.push_task(task);
                 }
             }
             _ => {
+                // Track that a task was scheduled from **outside** of the runtime.
+                self.scheduler_metrics.inc_remote_schedule_count();
+
                 // If the queue is None, then the runtime has shut down. We
                 // don't need to do anything with the notification in that case.
                 let mut guard = self.queue.lock();
@@ -460,7 +512,9 @@ impl CoreGuard<'_> {
 
         'outer: loop {
             if core.spawner.reset_woken() {
-                let (c, res) = context.run_task(core, || future.as_mut().poll(&mut cx));
+                let (c, res) = context.enter(core, || {
+                    crate::coop::budget(|| future.as_mut().poll(&mut cx))
+                });
 
                 core = c;
 
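The scheduler changes above follow a batch-then-submit pattern: while polling tasks, the worker bumps plain, unsynchronized counters on its local `MetricsBatch`, and only flushes them to the shared `WorkerMetrics` at quiescent points (before parking, on yield, and at shutdown). A minimal sketch of that pattern under assumed type shapes; the real `MetricsBatch`/`WorkerMetrics` live in `tokio::runtime` and their fields are not shown in this diff:

```rust
// Sketch only: assumed field names, illustrating batch-then-submit.
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct WorkerMetrics {
    // Shared with readers (e.g. a metrics exporter on another thread).
    poll_count: AtomicU64,
}

#[derive(Default)]
struct MetricsBatch {
    // Worker-local: a plain integer, no synchronization on the hot path.
    poll_count: u64,
}

impl MetricsBatch {
    fn incr_poll_count(&mut self) {
        self.poll_count += 1;
    }

    // Flush at a quiescent point, e.g. right before the worker parks.
    fn submit(&self, worker: &WorkerMetrics) {
        worker.poll_count.store(self.poll_count, Ordering::Relaxed);
    }
}

fn main() {
    let shared = WorkerMetrics::default();
    let mut batch = MetricsBatch::default();
    for _ in 0..3 {
        batch.incr_poll_count(); // hot path: no atomic operations
    }
    batch.submit(&shared); // cold path: one relaxed atomic store
    assert_eq!(3, shared.poll_count.load(Ordering::Relaxed));
}
```

Keeping the per-poll increment non-atomic is what makes the accounting cheap; readers only need eventual visibility, hence the relaxed store.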
tokio/src/runtime/handle.rs (20 changes: 12 additions, 8 deletions)
@@ -126,14 +126,6 @@ impl Handle {
         context::try_current()
     }
 
-    cfg_stats! {
-        /// Returns a view that lets you get information about how the runtime
-        /// is performing.
-        pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats {
-            self.spawner.stats()
-        }
-    }
-
     /// Spawns a future onto the Tokio runtime.
     ///
     /// This spawns the given future onto the runtime's executor, usually a
@@ -327,6 +319,18 @@
     }
 }
 
+cfg_metrics! {
+    use crate::runtime::RuntimeMetrics;
+
+    impl Handle {
+        /// Returns a view that lets you get information about how the runtime
+        /// is performing.
+        pub fn metrics(&self) -> RuntimeMetrics {
+            RuntimeMetrics::new(self.clone())
+        }
+    }
+}
+
 /// Error returned by `try_current` when no Runtime has been started
 #[derive(Debug)]
 pub struct TryCurrentError {
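With this in place, user code reaches the new counters through `Handle::metrics()`. A usage sketch, assuming a build with `RUSTFLAGS="--cfg tokio_unstable"`: `Handle::metrics` is added by this patch, while the `injection_queue_depth` accessor below mirrors the internal `Spawner` method and is an assumption about the public `RuntimeMetrics` surface, which this diff does not show:

```rust
// Build with: RUSTFLAGS="--cfg tokio_unstable" cargo run
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    // `Handle::metrics` is confirmed by this patch.
    let metrics = Handle::current().metrics();

    // Assumed accessor, mirroring `Spawner::injection_queue_depth` above;
    // the public `RuntimeMetrics` methods are not shown in this diff.
    println!("injection queue depth: {}", metrics.injection_queue_depth());
}
```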