-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
runtime: expand on runtime metrics #4373
Changes from all commits
7c71ee1
82d4467
5895905
5bff844
19de22d
ea4a303
fc8f28a
b60a16a
b0098f0
852406b
7ed15d1
66cf82a
b62f724
2bae9e8
24ebd65
5f58cf6
f1d0dcf
0192ace
6ea5437
61f1621
43f58f3
47ef235
db9e139
3fe79b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,9 +4,9 @@ use crate::loom::sync::Mutex; | |
use crate::park::{Park, Unpark}; | ||
use crate::runtime::context::EnterGuard; | ||
use crate::runtime::driver::Driver; | ||
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; | ||
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; | ||
use crate::runtime::Callback; | ||
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; | ||
use crate::sync::notify::Notify; | ||
use crate::util::atomic_cell::AtomicCell; | ||
use crate::util::{waker_ref, Wake, WakerRef}; | ||
|
@@ -56,8 +56,8 @@ struct Core { | |
/// The driver is removed before starting to park the thread | ||
driver: Option<Driver>, | ||
|
||
/// Stats batcher | ||
stats: WorkerStatsBatcher, | ||
/// Metrics batch | ||
metrics: MetricsBatch, | ||
} | ||
|
||
#[derive(Clone)] | ||
|
@@ -98,8 +98,11 @@ struct Shared { | |
/// Callback for a worker unparking itself | ||
after_unpark: Option<Callback>, | ||
|
||
/// Keeps track of various runtime stats. | ||
stats: RuntimeStats, | ||
/// Keeps track of various runtime metrics. | ||
scheduler_metrics: SchedulerMetrics, | ||
|
||
/// This scheduler only has one worker. | ||
worker_metrics: WorkerMetrics, | ||
} | ||
|
||
/// Thread-local context. | ||
|
@@ -143,7 +146,8 @@ impl BasicScheduler { | |
woken: AtomicBool::new(false), | ||
before_park, | ||
after_unpark, | ||
stats: RuntimeStats::new(1), | ||
scheduler_metrics: SchedulerMetrics::new(), | ||
worker_metrics: WorkerMetrics::new(), | ||
}), | ||
}; | ||
|
||
|
@@ -152,7 +156,7 @@ impl BasicScheduler { | |
spawner: spawner.clone(), | ||
tick: 0, | ||
driver: Some(driver), | ||
stats: WorkerStatsBatcher::new(0), | ||
metrics: MetricsBatch::new(), | ||
}))); | ||
|
||
BasicScheduler { | ||
|
@@ -219,11 +223,89 @@ impl BasicScheduler { | |
} | ||
} | ||
|
||
impl Drop for BasicScheduler { | ||
fn drop(&mut self) { | ||
// Avoid a double panic if we are currently panicking and | ||
// the lock may be poisoned. | ||
|
||
let core = match self.take_core() { | ||
Some(core) => core, | ||
None if std::thread::panicking() => return, | ||
None => panic!("Oh no! We never placed the Core back, this is a bug!"), | ||
}; | ||
|
||
core.enter(|mut core, context| { | ||
// Drain the OwnedTasks collection. This call also closes the | ||
// collection, ensuring that no tasks are ever pushed after this | ||
// call returns. | ||
context.spawner.shared.owned.close_and_shutdown_all(); | ||
|
||
// Drain local queue | ||
// We already shut down every task, so we just need to drop the task. | ||
while let Some(task) = core.pop_task() { | ||
drop(task); | ||
} | ||
|
||
// Drain remote queue and set it to None | ||
let remote_queue = core.spawner.shared.queue.lock().take(); | ||
|
||
// Using `Option::take` to replace the shared queue with `None`. | ||
// We already shut down every task, so we just need to drop the task. | ||
if let Some(remote_queue) = remote_queue { | ||
for entry in remote_queue { | ||
match entry { | ||
RemoteMsg::Schedule(task) => { | ||
drop(task); | ||
} | ||
} | ||
} | ||
} | ||
|
||
assert!(context.spawner.shared.owned.is_empty()); | ||
|
||
// Submit metrics | ||
core.metrics.submit(&core.spawner.shared.worker_metrics); | ||
|
||
(core, ()) | ||
}); | ||
} | ||
} | ||
|
||
impl fmt::Debug for BasicScheduler { | ||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
fmt.debug_struct("BasicScheduler").finish() | ||
} | ||
} | ||
|
||
// ===== impl Core ===== | ||
|
||
impl Core { | ||
fn pop_task(&mut self) -> Option<task::Notified<Arc<Shared>>> { | ||
let ret = self.tasks.pop_front(); | ||
self.spawner | ||
.shared | ||
.worker_metrics | ||
.set_queue_depth(self.tasks.len()); | ||
ret | ||
} | ||
|
||
fn push_task(&mut self, task: task::Notified<Arc<Shared>>) { | ||
self.tasks.push_back(task); | ||
self.metrics.inc_local_schedule_count(); | ||
self.spawner | ||
.shared | ||
.worker_metrics | ||
.set_queue_depth(self.tasks.len()); | ||
} | ||
} | ||
|
||
// ===== impl Context ===== | ||
|
||
impl Context { | ||
/// Execute the closure with the given scheduler core stored in the | ||
/// thread-local context. | ||
fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) { | ||
core.stats.incr_poll_count(); | ||
core.metrics.incr_poll_count(); | ||
self.enter(core, || crate::coop::budget(f)) | ||
} | ||
|
||
|
@@ -244,15 +326,15 @@ impl Context { | |
// instead of parking the thread | ||
if core.tasks.is_empty() { | ||
// Park until the thread is signaled | ||
core.stats.about_to_park(); | ||
core.stats.submit(&core.spawner.shared.stats); | ||
core.metrics.about_to_park(); | ||
core.metrics.submit(&core.spawner.shared.worker_metrics); | ||
|
||
let (c, _) = self.enter(core, || { | ||
driver.park().expect("failed to park"); | ||
}); | ||
|
||
core = c; | ||
core.stats.returned_from_park(); | ||
core.metrics.returned_from_park(); | ||
} | ||
|
||
if let Some(f) = &self.spawner.shared.after_unpark { | ||
|
@@ -271,7 +353,7 @@ impl Context { | |
fn park_yield(&self, mut core: Box<Core>) -> Box<Core> { | ||
let mut driver = core.driver.take().expect("driver missing"); | ||
|
||
core.stats.submit(&core.spawner.shared.stats); | ||
core.metrics.submit(&core.spawner.shared.worker_metrics); | ||
let (mut core, _) = self.enter(core, || { | ||
driver | ||
.park_timeout(Duration::from_millis(0)) | ||
|
@@ -297,57 +379,6 @@ impl Context { | |
} | ||
} | ||
|
||
impl Drop for BasicScheduler { | ||
fn drop(&mut self) { | ||
// Avoid a double panic if we are currently panicking and | ||
// the lock may be poisoned. | ||
|
||
let core = match self.take_core() { | ||
Some(core) => core, | ||
None if std::thread::panicking() => return, | ||
None => panic!("Oh no! We never placed the Core back, this is a bug!"), | ||
}; | ||
|
||
core.enter(|mut core, context| { | ||
// Drain the OwnedTasks collection. This call also closes the | ||
// collection, ensuring that no tasks are ever pushed after this | ||
// call returns. | ||
context.spawner.shared.owned.close_and_shutdown_all(); | ||
|
||
// Drain local queue | ||
// We already shut down every task, so we just need to drop the task. | ||
while let Some(task) = core.tasks.pop_front() { | ||
drop(task); | ||
} | ||
|
||
// Drain remote queue and set it to None | ||
let remote_queue = core.spawner.shared.queue.lock().take(); | ||
|
||
// Using `Option::take` to replace the shared queue with `None`. | ||
// We already shut down every task, so we just need to drop the task. | ||
if let Some(remote_queue) = remote_queue { | ||
for entry in remote_queue { | ||
match entry { | ||
RemoteMsg::Schedule(task) => { | ||
drop(task); | ||
} | ||
} | ||
} | ||
} | ||
|
||
assert!(context.spawner.shared.owned.is_empty()); | ||
|
||
(core, ()) | ||
}); | ||
} | ||
} | ||
|
||
impl fmt::Debug for BasicScheduler { | ||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
fmt.debug_struct("BasicScheduler").finish() | ||
} | ||
} | ||
|
||
// ===== impl Spawner ===== | ||
|
||
impl Spawner { | ||
|
@@ -366,10 +397,6 @@ impl Spawner { | |
handle | ||
} | ||
|
||
pub(crate) fn stats(&self) -> &RuntimeStats { | ||
&self.shared.stats | ||
} | ||
|
||
fn pop(&self) -> Option<RemoteMsg> { | ||
match self.shared.queue.lock().as_mut() { | ||
Some(queue) => queue.pop_front(), | ||
|
@@ -390,6 +417,28 @@ impl Spawner { | |
} | ||
} | ||
|
||
cfg_metrics! { | ||
impl Spawner { | ||
pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { | ||
&self.shared.scheduler_metrics | ||
} | ||
|
||
pub(crate) fn injection_queue_depth(&self) -> usize { | ||
// TODO: avoid having to lock. The multi-threaded injection queue | ||
// could probably be used here. | ||
self.shared.queue.lock() | ||
.as_ref() | ||
.map(|queue| queue.len()) | ||
.unwrap_or(0) | ||
} | ||
|
||
pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { | ||
assert_eq!(0, worker); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the assert here? Is there a better way to express this invariant maybe? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The index is a low level API. Do you have a suggestion for a better way to express it? |
||
&self.shared.worker_metrics | ||
} | ||
} | ||
} | ||
|
||
impl fmt::Debug for Spawner { | ||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
fmt.debug_struct("Spawner").finish() | ||
|
@@ -411,10 +460,13 @@ impl Schedule for Arc<Shared> { | |
// If `None`, the runtime is shutting down, so there is no need | ||
// to schedule the task. | ||
if let Some(core) = core.as_mut() { | ||
core.tasks.push_back(task); | ||
core.push_task(task); | ||
} | ||
} | ||
_ => { | ||
// Track that a task was scheduled from **outside** of the runtime. | ||
self.scheduler_metrics.inc_remote_schedule_count(); | ||
|
||
// If the queue is None, then the runtime has shut down. We | ||
// don't need to do anything with the notification in that case. | ||
let mut guard = self.queue.lock(); | ||
|
@@ -460,7 +512,9 @@ impl CoreGuard<'_> { | |
|
||
'outer: loop { | ||
if core.spawner.reset_woken() { | ||
let (c, res) = context.run_task(core, || future.as_mut().poll(&mut cx)); | ||
let (c, res) = context.enter(core, || { | ||
crate::coop::budget(|| future.as_mut().poll(&mut cx)) | ||
}); | ||
|
||
core = c; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why shouldn't this continue to be a feature flag? For machines without 64-bit atomics, the stats involve locking a bunch of mutexes quite a lot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
metrics
we will need to change the feature flag. While unstable, I'm not worried about it since technically adding a feature flag is part of the public API.