runtime: expand on runtime metrics #4373

Merged · 24 commits · Jan 22, 2022
6 changes: 5 additions & 1 deletion tokio/Cargo.toml
@@ -46,7 +46,6 @@ io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
macros = ["tokio-macros"]
stats = []
net = [
"libc",
"mio/os-poll",
@@ -85,6 +84,11 @@ sync = []
test-util = ["rt", "sync", "time"]
time = []

# Technically, removing this is a breaking change even though it only ever did
# anything with the unstable flag on. It is probably safe to get rid of it after
# a few releases.
stats = []
Comment on lines +87 to +90
Contributor:
Why shouldn't this continue to be a feature flag? For machines without 64-bit atomics, the stats involve locking a bunch of mutexes quite a lot.

Member Author:
  • If we rename to metrics we will need to change the feature flag. While unstable, I'm not worried about it since technically adding a feature flag is part of the public API.
  • We can always do runtime enabling / disabling of metrics collection.
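
The trade-off raised in this exchange is easiest to see in code. Below is an editorial sketch, not part of the PR, of a counter that is lock-free where 64-bit atomics exist and falls back to a mutex elsewhere; the `MetricCounter` name and module layout are hypothetical.

#[cfg(target_has_atomic = "64")]
mod counter {
    use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

    /// Lock-free counter on targets with native 64-bit atomics.
    pub struct MetricCounter(AtomicU64);

    impl MetricCounter {
        pub fn new() -> Self {
            MetricCounter(AtomicU64::new(0))
        }

        pub fn add(&self, n: u64) {
            self.0.fetch_add(n, Relaxed);
        }

        pub fn get(&self) -> u64 {
            self.0.load(Relaxed)
        }
    }
}

#[cfg(not(target_has_atomic = "64"))]
mod counter {
    use std::sync::Mutex;

    /// Fallback counter: every update takes a short critical section, which
    /// is the locking cost the comment above is concerned about.
    pub struct MetricCounter(Mutex<u64>);

    impl MetricCounter {
        pub fn new() -> Self {
            MetricCounter(Mutex::new(0))
        }

        pub fn add(&self, n: u64) {
            *self.0.lock().unwrap() += n;
        }

        pub fn get(&self) -> u64 {
            *self.0.lock().unwrap()
        }
    }
}

Gating collection behind a feature flag, or toggling it at runtime as suggested above, are two ways to keep that fallback cost off the hot path.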


[dependencies]
tokio-macros = { version = "1.7.0", path = "../tokio-macros", optional = true }

1 change: 0 additions & 1 deletion tokio/src/lib.rs
@@ -346,7 +346,6 @@
//! `RUSTFLAGS="--cfg tokio_unstable"`.
//!
//! - `tracing`: Enables tracing events.
//! - `stats`: Enables runtime stats collection. ([RFC](https://github.com/tokio-rs/tokio/pull/3845))
//!
//! [feature flags]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section

12 changes: 7 additions & 5 deletions tokio/src/macros/cfg.rs
@@ -174,20 +174,22 @@ macro_rules! cfg_macros {
}
}

macro_rules! cfg_stats {
macro_rules! cfg_metrics {
($($item:item)*) => {
$(
#[cfg(all(tokio_unstable, feature = "stats"))]
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "stats"))))]
// For now, metrics is always enabled. When stabilized, it might
// have a dedicated feature flag.
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
$item
)*
}
}

macro_rules! cfg_not_stats {
macro_rules! cfg_not_metrics {
($($item:item)*) => {
$(
#[cfg(not(all(tokio_unstable, feature = "stats")))]
#[cfg(not(tokio_unstable))]
$item
)*
}
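
For context, the renamed macros gate items purely on the `tokio_unstable` cfg rather than on the `stats` feature. A minimal usage sketch follows; `metrics_enabled` is a hypothetical function used only to show the expansion.

// Wrapping an item in the macros above...
cfg_metrics! {
    pub(crate) fn metrics_enabled() -> bool { true }
}

cfg_not_metrics! {
    pub(crate) fn metrics_enabled() -> bool { false }
}

// ...conceptually expands to:
//
// #[cfg(tokio_unstable)]
// #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
// pub(crate) fn metrics_enabled() -> bool { true }
//
// #[cfg(not(tokio_unstable))]
// pub(crate) fn metrics_enabled() -> bool { false }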
192 changes: 123 additions & 69 deletions tokio/src/runtime/basic_scheduler.rs
@@ -4,9 +4,9 @@ use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::Driver;
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::Callback;
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
use crate::util::{waker_ref, Wake, WakerRef};
@@ -56,8 +56,8 @@ struct Core {
/// The driver is removed before starting to park the thread
driver: Option<Driver>,

/// Stats batcher
stats: WorkerStatsBatcher,
/// Metrics batch
metrics: MetricsBatch,
}

#[derive(Clone)]
@@ -98,8 +98,11 @@ struct Shared {
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,

/// Keeps track of various runtime stats.
stats: RuntimeStats,
/// Keeps track of various runtime metrics.
scheduler_metrics: SchedulerMetrics,

/// This scheduler only has one worker
worker_metrics: WorkerMetrics,
}

/// Thread-local context.
@@ -143,7 +146,8 @@ impl BasicScheduler {
woken: AtomicBool::new(false),
before_park,
after_unpark,
stats: RuntimeStats::new(1),
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: WorkerMetrics::new(),
}),
};

@@ -152,7 +156,7 @@
spawner: spawner.clone(),
tick: 0,
driver: Some(driver),
stats: WorkerStatsBatcher::new(0),
metrics: MetricsBatch::new(),
})));

BasicScheduler {
@@ -219,11 +223,89 @@
}
}

impl Drop for BasicScheduler {
fn drop(&mut self) {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.

let core = match self.take_core() {
Some(core) => core,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Core back, this is a bug!"),
};

core.enter(|mut core, context| {
// Drain the OwnedTasks collection. This call also closes the
// collection, ensuring that no tasks are ever pushed after this
// call returns.
context.spawner.shared.owned.close_and_shutdown_all();

// Drain local queue
// We already shut down every task, so we just need to drop the task.
while let Some(task) = core.pop_task() {
drop(task);
}

// Drain remote queue and set it to None
let remote_queue = core.spawner.shared.queue.lock().take();

// Using `Option::take` to replace the shared queue with `None`.
// We already shut down every task, so we just need to drop the task.
if let Some(remote_queue) = remote_queue {
for entry in remote_queue {
match entry {
RemoteMsg::Schedule(task) => {
drop(task);
}
}
}
}

assert!(context.spawner.shared.owned.is_empty());

// Submit metrics
core.metrics.submit(&core.spawner.shared.worker_metrics);

(core, ())
});
}
}

impl fmt::Debug for BasicScheduler {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("BasicScheduler").finish()
}
}

// ===== impl Core =====

impl Core {
fn pop_task(&mut self) -> Option<task::Notified<Arc<Shared>>> {
let ret = self.tasks.pop_front();
self.spawner
.shared
.worker_metrics
.set_queue_depth(self.tasks.len());
ret
}

fn push_task(&mut self, task: task::Notified<Arc<Shared>>) {
self.tasks.push_back(task);
self.metrics.inc_local_schedule_count();
self.spawner
.shared
.worker_metrics
.set_queue_depth(self.tasks.len());
}
}
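
The pattern in this file is that each worker increments plain counters in its private `MetricsBatch` on the hot path and only flushes them into the shared `WorkerMetrics` when it parks or yields. A stripped-down editorial sketch of that flow is below; the fields and the delta-flush strategy are illustrative, not the PR's exact definitions.

use std::sync::atomic::{AtomicU64, Ordering::Relaxed};

// Worker-local: plain integers, no synchronization while polling tasks.
struct MetricsBatch {
    poll_count: u64,
    local_schedule_count: u64,
}

// Shared: atomics that a metrics reader can load at any time.
struct WorkerMetrics {
    poll_count: AtomicU64,
    local_schedule_count: AtomicU64,
}

impl MetricsBatch {
    fn incr_poll_count(&mut self) {
        self.poll_count += 1;
    }

    fn inc_local_schedule_count(&mut self) {
        self.local_schedule_count += 1;
    }

    // Called before parking (and on yield): publish the accumulated deltas
    // to the shared counters, then reset the local batch.
    fn submit(&mut self, shared: &WorkerMetrics) {
        shared.poll_count.fetch_add(self.poll_count, Relaxed);
        shared
            .local_schedule_count
            .fetch_add(self.local_schedule_count, Relaxed);
        self.poll_count = 0;
        self.local_schedule_count = 0;
    }
}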

// ===== impl Context =====

impl Context {
/// Execute the closure with the given scheduler core stored in the
/// thread-local context.
fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
core.stats.incr_poll_count();
core.metrics.incr_poll_count();
self.enter(core, || crate::coop::budget(f))
}

@@ -244,15 +326,15 @@ impl Context {
// instead of parking the thread
if core.tasks.is_empty() {
// Park until the thread is signaled
core.stats.about_to_park();
core.stats.submit(&core.spawner.shared.stats);
core.metrics.about_to_park();
core.metrics.submit(&core.spawner.shared.worker_metrics);

let (c, _) = self.enter(core, || {
driver.park().expect("failed to park");
});

core = c;
core.stats.returned_from_park();
core.metrics.returned_from_park();
}

if let Some(f) = &self.spawner.shared.after_unpark {
@@ -271,7 +353,7 @@
fn park_yield(&self, mut core: Box<Core>) -> Box<Core> {
let mut driver = core.driver.take().expect("driver missing");

core.stats.submit(&core.spawner.shared.stats);
core.metrics.submit(&core.spawner.shared.worker_metrics);
let (mut core, _) = self.enter(core, || {
driver
.park_timeout(Duration::from_millis(0))
@@ -297,57 +379,6 @@
}
}

impl Drop for BasicScheduler {
fn drop(&mut self) {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.

let core = match self.take_core() {
Some(core) => core,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Core back, this is a bug!"),
};

core.enter(|mut core, context| {
// Drain the OwnedTasks collection. This call also closes the
// collection, ensuring that no tasks are ever pushed after this
// call returns.
context.spawner.shared.owned.close_and_shutdown_all();

// Drain local queue
// We already shut down every task, so we just need to drop the task.
while let Some(task) = core.tasks.pop_front() {
drop(task);
}

// Drain remote queue and set it to None
let remote_queue = core.spawner.shared.queue.lock().take();

// Using `Option::take` to replace the shared queue with `None`.
// We already shut down every task, so we just need to drop the task.
if let Some(remote_queue) = remote_queue {
for entry in remote_queue {
match entry {
RemoteMsg::Schedule(task) => {
drop(task);
}
}
}
}

assert!(context.spawner.shared.owned.is_empty());

(core, ())
});
}
}

impl fmt::Debug for BasicScheduler {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("BasicScheduler").finish()
}
}

// ===== impl Spawner =====

impl Spawner {
@@ -366,10 +397,6 @@ impl Spawner {
handle
}

pub(crate) fn stats(&self) -> &RuntimeStats {
&self.shared.stats
}

fn pop(&self) -> Option<RemoteMsg> {
match self.shared.queue.lock().as_mut() {
Some(queue) => queue.pop_front(),
@@ -390,6 +417,28 @@
}
}

cfg_metrics! {
impl Spawner {
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
&self.shared.scheduler_metrics
}

pub(crate) fn remote_queue_depth(&self) -> usize {
Member:
is it possible for a user to contend this by accident?

Member Author:
Currently, yes w/ the current-thread runtime, but it isn't inherent. The injection queue should be improved in a later PR.

// TODO: avoid having to lock. The multi-threaded injection queue
// could probably be used here.
self.shared.queue.lock()
.as_ref()
.map(|queue| queue.len())
.unwrap_or(0)
}

pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
assert_eq!(0, worker);
Member:
why the assert here? Is there a better way to express this invariant maybe?

Member Author:
The index is a low level API. Do you have a suggestion for a better way to express it?

&self.shared.worker_metrics
}
}
}
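
On the TODO in `remote_queue_depth`: a hedged sketch of one way to report the depth without taking the queue lock is to mirror the length in an atomic that push/pop keep in sync. The names below are hypothetical and this is not what the PR implements.

use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::Mutex;

// Hypothetical injection queue that mirrors its length in an atomic so that
// metrics readers never contend on the mutex.
struct InjectQueue<T> {
    inner: Mutex<VecDeque<T>>,
    len: AtomicUsize,
}

impl<T> InjectQueue<T> {
    fn push(&self, value: T) {
        let mut queue = self.inner.lock().unwrap();
        queue.push_back(value);
        self.len.store(queue.len(), Relaxed);
    }

    fn pop(&self) -> Option<T> {
        let mut queue = self.inner.lock().unwrap();
        let value = queue.pop_front();
        self.len.store(queue.len(), Relaxed);
        value
    }

    // What a metrics accessor could read: a single atomic load, no lock. The
    // value is a point-in-time snapshot, which is fine for a metric.
    fn depth(&self) -> usize {
        self.len.load(Relaxed)
    }
}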

impl fmt::Debug for Spawner {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Spawner").finish()
Expand All @@ -411,10 +460,13 @@ impl Schedule for Arc<Shared> {
// If `None`, the runtime is shutting down, so there is no need
// to schedule the task.
if let Some(core) = core.as_mut() {
core.tasks.push_back(task);
core.push_task(task);
}
}
_ => {
// Track that a task was scheduled from **outside** of the runtime.
self.scheduler_metrics.inc_remote_schedule_count();

// If the queue is None, then the runtime has shut down. We
// don't need to do anything with the notification in that case.
let mut guard = self.queue.lock();
Expand Down Expand Up @@ -460,7 +512,9 @@ impl CoreGuard<'_> {

'outer: loop {
if core.spawner.reset_woken() {
let (c, res) = context.run_task(core, || future.as_mut().poll(&mut cx));
let (c, res) = context.enter(core, || {
crate::coop::budget(|| future.as_mut().poll(&mut cx))
});

core = c;

20 changes: 12 additions & 8 deletions tokio/src/runtime/handle.rs
@@ -126,14 +126,6 @@ impl Handle {
context::try_current()
}

cfg_stats! {
/// Returns a view that lets you get information about how the runtime
/// is performing.
pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats {
self.spawner.stats()
}
}

/// Spawns a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
@@ -327,6 +319,18 @@
}
}

cfg_metrics! {
use crate::runtime::RuntimeMetrics;

impl Handle {
/// Returns a view that lets you get information about how the runtime
/// is performing.
pub fn metrics(&self) -> RuntimeMetrics {
RuntimeMetrics::new(self.clone())
}
}
}
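
A usage sketch for the new entry point follows. It assumes a build with RUSTFLAGS="--cfg tokio_unstable", and the accessor names on `RuntimeMetrics` (`remote_schedule_count`, `worker_poll_count`) are assumptions that mirror the counters added in this PR rather than a confirmed API surface.

use tokio::runtime::Handle;

async fn report_metrics() {
    let metrics = Handle::current().metrics();

    // Tasks scheduled from outside the runtime; see
    // `SchedulerMetrics::inc_remote_schedule_count` in the scheduler diff.
    println!("remote schedules: {}", metrics.remote_schedule_count());

    // Per-worker counters; the basic (current-thread) scheduler only exposes
    // worker index 0, matching the assert discussed above.
    println!("worker 0 polls: {}", metrics.worker_poll_count(0));
}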

/// Error returned by `try_current` when no Runtime has been started
#[derive(Debug)]
pub struct TryCurrentError {