diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 2a88b766c0a..5aa5c399d27 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -46,7 +46,6 @@ io-util = ["memchr", "bytes"] # stdin, stdout, stderr io-std = [] macros = ["tokio-macros"] -stats = [] net = [ "libc", "mio/os-poll", @@ -85,6 +84,11 @@ sync = [] test-util = ["rt", "sync", "time"] time = [] +# Technically, removing this is a breaking change even though it only ever did +# anything with the unstable flag on. It is probably safe to get rid of it after +# a few releases. +stats = [] + [dependencies] tokio-macros = { version = "1.7.0", path = "../tokio-macros", optional = true } diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 01878524e13..35295d837a6 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -346,7 +346,6 @@ //! `RUSTFLAGS="--cfg tokio_unstable"`. //! //! - `tracing`: Enables tracing events. -//! - `stats`: Enables runtime stats collection. ([RFC](https://github.com/tokio-rs/tokio/pull/3845)) //! //! [feature flags]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 9fa30ca27d6..3b8fd3a7890 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -174,20 +174,22 @@ macro_rules! cfg_macros { } } -macro_rules! cfg_stats { +macro_rules! cfg_metrics { ($($item:item)*) => { $( - #[cfg(all(tokio_unstable, feature = "stats"))] - #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "stats"))))] + // For now, metrics is always enabled. When stabilized, it might + // have a dedicated feature flag. + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] $item )* } } -macro_rules! cfg_not_stats { +macro_rules! cfg_not_metrics { ($($item:item)*) => { $( - #[cfg(not(all(tokio_unstable, feature = "stats")))] + #[cfg(not(tokio_unstable))] $item )* } diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index f70fa656925..00193329137 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -4,9 +4,9 @@ use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; use crate::runtime::context::EnterGuard; use crate::runtime::driver::Driver; -use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::runtime::Callback; +use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::sync::notify::Notify; use crate::util::atomic_cell::AtomicCell; use crate::util::{waker_ref, Wake, WakerRef}; @@ -56,8 +56,8 @@ struct Core { /// The driver is removed before starting to park the thread driver: Option, - /// Stats batcher - stats: WorkerStatsBatcher, + /// Metrics batch + metrics: MetricsBatch, } #[derive(Clone)] @@ -98,8 +98,11 @@ struct Shared { /// Callback for a worker unparking itself after_unpark: Option, - /// Keeps track of various runtime stats. - stats: RuntimeStats, + /// Keeps track of various runtime metrics. + scheduler_metrics: SchedulerMetrics, + + /// This scheduler only has one worker. + worker_metrics: WorkerMetrics, } /// Thread-local context. 
@@ -143,7 +146,8 @@ impl BasicScheduler { woken: AtomicBool::new(false), before_park, after_unpark, - stats: RuntimeStats::new(1), + scheduler_metrics: SchedulerMetrics::new(), + worker_metrics: WorkerMetrics::new(), }), }; @@ -152,7 +156,7 @@ impl BasicScheduler { spawner: spawner.clone(), tick: 0, driver: Some(driver), - stats: WorkerStatsBatcher::new(0), + metrics: MetricsBatch::new(), }))); BasicScheduler { @@ -219,11 +223,89 @@ impl BasicScheduler { } } +impl Drop for BasicScheduler { + fn drop(&mut self) { + // Avoid a double panic if we are currently panicking and + // the lock may be poisoned. + + let core = match self.take_core() { + Some(core) => core, + None if std::thread::panicking() => return, + None => panic!("Oh no! We never placed the Core back, this is a bug!"), + }; + + core.enter(|mut core, context| { + // Drain the OwnedTasks collection. This call also closes the + // collection, ensuring that no tasks are ever pushed after this + // call returns. + context.spawner.shared.owned.close_and_shutdown_all(); + + // Drain local queue + // We already shut down every task, so we just need to drop the task. + while let Some(task) = core.pop_task() { + drop(task); + } + + // Drain remote queue and set it to None + let remote_queue = core.spawner.shared.queue.lock().take(); + + // Using `Option::take` to replace the shared queue with `None`. + // We already shut down every task, so we just need to drop the task. + if let Some(remote_queue) = remote_queue { + for entry in remote_queue { + match entry { + RemoteMsg::Schedule(task) => { + drop(task); + } + } + } + } + + assert!(context.spawner.shared.owned.is_empty()); + + // Submit metrics + core.metrics.submit(&core.spawner.shared.worker_metrics); + + (core, ()) + }); + } +} + +impl fmt::Debug for BasicScheduler { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("BasicScheduler").finish() + } +} + +// ===== impl Core ===== + +impl Core { + fn pop_task(&mut self) -> Option>> { + let ret = self.tasks.pop_front(); + self.spawner + .shared + .worker_metrics + .set_queue_depth(self.tasks.len()); + ret + } + + fn push_task(&mut self, task: task::Notified>) { + self.tasks.push_back(task); + self.metrics.inc_local_schedule_count(); + self.spawner + .shared + .worker_metrics + .set_queue_depth(self.tasks.len()); + } +} + +// ===== impl Context ===== + impl Context { /// Execute the closure with the given scheduler core stored in the /// thread-local context. 
fn run_task(&self, mut core: Box, f: impl FnOnce() -> R) -> (Box, R) { - core.stats.incr_poll_count(); + core.metrics.incr_poll_count(); self.enter(core, || crate::coop::budget(f)) } @@ -244,15 +326,15 @@ impl Context { // instead of parking the thread if core.tasks.is_empty() { // Park until the thread is signaled - core.stats.about_to_park(); - core.stats.submit(&core.spawner.shared.stats); + core.metrics.about_to_park(); + core.metrics.submit(&core.spawner.shared.worker_metrics); let (c, _) = self.enter(core, || { driver.park().expect("failed to park"); }); core = c; - core.stats.returned_from_park(); + core.metrics.returned_from_park(); } if let Some(f) = &self.spawner.shared.after_unpark { @@ -271,7 +353,7 @@ impl Context { fn park_yield(&self, mut core: Box) -> Box { let mut driver = core.driver.take().expect("driver missing"); - core.stats.submit(&core.spawner.shared.stats); + core.metrics.submit(&core.spawner.shared.worker_metrics); let (mut core, _) = self.enter(core, || { driver .park_timeout(Duration::from_millis(0)) @@ -297,57 +379,6 @@ impl Context { } } -impl Drop for BasicScheduler { - fn drop(&mut self) { - // Avoid a double panic if we are currently panicking and - // the lock may be poisoned. - - let core = match self.take_core() { - Some(core) => core, - None if std::thread::panicking() => return, - None => panic!("Oh no! We never placed the Core back, this is a bug!"), - }; - - core.enter(|mut core, context| { - // Drain the OwnedTasks collection. This call also closes the - // collection, ensuring that no tasks are ever pushed after this - // call returns. - context.spawner.shared.owned.close_and_shutdown_all(); - - // Drain local queue - // We already shut down every task, so we just need to drop the task. - while let Some(task) = core.tasks.pop_front() { - drop(task); - } - - // Drain remote queue and set it to None - let remote_queue = core.spawner.shared.queue.lock().take(); - - // Using `Option::take` to replace the shared queue with `None`. - // We already shut down every task, so we just need to drop the task. - if let Some(remote_queue) = remote_queue { - for entry in remote_queue { - match entry { - RemoteMsg::Schedule(task) => { - drop(task); - } - } - } - } - - assert!(context.spawner.shared.owned.is_empty()); - - (core, ()) - }); - } -} - -impl fmt::Debug for BasicScheduler { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("BasicScheduler").finish() - } -} - // ===== impl Spawner ===== impl Spawner { @@ -366,10 +397,6 @@ impl Spawner { handle } - pub(crate) fn stats(&self) -> &RuntimeStats { - &self.shared.stats - } - fn pop(&self) -> Option { match self.shared.queue.lock().as_mut() { Some(queue) => queue.pop_front(), @@ -390,6 +417,28 @@ impl Spawner { } } +cfg_metrics! { + impl Spawner { + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + &self.shared.scheduler_metrics + } + + pub(crate) fn injection_queue_depth(&self) -> usize { + // TODO: avoid having to lock. The multi-threaded injection queue + // could probably be used here. 
+ self.shared.queue.lock() + .as_ref() + .map(|queue| queue.len()) + .unwrap_or(0) + } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + assert_eq!(0, worker); + &self.shared.worker_metrics + } + } +} + impl fmt::Debug for Spawner { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Spawner").finish() @@ -411,10 +460,13 @@ impl Schedule for Arc { // If `None`, the runtime is shutting down, so there is no need // to schedule the task. if let Some(core) = core.as_mut() { - core.tasks.push_back(task); + core.push_task(task); } } _ => { + // Track that a task was scheduled from **outside** of the runtime. + self.scheduler_metrics.inc_remote_schedule_count(); + // If the queue is None, then the runtime has shut down. We // don't need to do anything with the notification in that case. let mut guard = self.queue.lock(); @@ -460,7 +512,9 @@ impl CoreGuard<'_> { 'outer: loop { if core.spawner.reset_woken() { - let (c, res) = context.run_task(core, || future.as_mut().poll(&mut cx)); + let (c, res) = context.enter(core, || { + crate::coop::budget(|| future.as_mut().poll(&mut cx)) + }); core = c; diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 612205cccfa..3481a2552f3 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -126,14 +126,6 @@ impl Handle { context::try_current() } - cfg_stats! { - /// Returns a view that lets you get information about how the runtime - /// is performing. - pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats { - self.spawner.stats() - } - } - /// Spawns a future onto the Tokio runtime. /// /// This spawns the given future onto the runtime's executor, usually a @@ -327,6 +319,18 @@ impl Handle { } } +cfg_metrics! { + use crate::runtime::RuntimeMetrics; + + impl Handle { + /// Returns a view that lets you get information about how the runtime + /// is performing. + pub fn metrics(&self) -> RuntimeMetrics { + RuntimeMetrics::new(self.clone()) + } + } +} + /// Error returned by `try_current` when no Runtime has been started #[derive(Debug)] pub struct TryCurrentError { diff --git a/tokio/src/runtime/metrics/batch.rs b/tokio/src/runtime/metrics/batch.rs new file mode 100644 index 00000000000..f1c3fa6b747 --- /dev/null +++ b/tokio/src/runtime/metrics/batch.rs @@ -0,0 +1,105 @@ +use crate::runtime::WorkerMetrics; + +use std::convert::TryFrom; +use std::sync::atomic::Ordering::Relaxed; +use std::time::Instant; + +pub(crate) struct MetricsBatch { + /// Number of times the worker parked. + park_count: u64, + + /// Number of times the worker woke w/o doing work. + noop_count: u64, + + /// Number of times stolen. + steal_count: u64, + + /// Number of tasks that were polled by the worker. + poll_count: u64, + + /// Number of tasks polled when the worker entered park. This is used to + /// track the noop count. + poll_count_on_last_park: u64, + + /// Number of tasks that were scheduled locally on this worker. + local_schedule_count: u64, + + /// Number of tasks moved to the global queue to make space in the local + /// queue + overflow_count: u64, + + /// The total busy duration in nanoseconds. 
+ busy_duration_total: u64, + last_resume_time: Instant, +} + +impl MetricsBatch { + pub(crate) fn new() -> MetricsBatch { + MetricsBatch { + park_count: 0, + noop_count: 0, + steal_count: 0, + poll_count: 0, + poll_count_on_last_park: 0, + local_schedule_count: 0, + overflow_count: 0, + busy_duration_total: 0, + last_resume_time: Instant::now(), + } + } + + pub(crate) fn submit(&mut self, worker: &WorkerMetrics) { + worker.park_count.store(self.park_count, Relaxed); + worker.noop_count.store(self.noop_count, Relaxed); + worker.steal_count.store(self.steal_count, Relaxed); + worker.poll_count.store(self.poll_count, Relaxed); + + worker + .busy_duration_total + .store(self.busy_duration_total, Relaxed); + + worker + .local_schedule_count + .store(self.local_schedule_count, Relaxed); + worker.overflow_count.store(self.overflow_count, Relaxed); + } + + /// The worker is about to park. + pub(crate) fn about_to_park(&mut self) { + self.park_count += 1; + + if self.poll_count_on_last_park == self.poll_count { + self.noop_count += 1; + } else { + self.poll_count_on_last_park = self.poll_count; + } + + let busy_duration = self.last_resume_time.elapsed(); + let busy_duration = u64::try_from(busy_duration.as_nanos()).unwrap_or(u64::MAX); + self.busy_duration_total += busy_duration; + } + + pub(crate) fn returned_from_park(&mut self) { + self.last_resume_time = Instant::now(); + } + + pub(crate) fn inc_local_schedule_count(&mut self) { + self.local_schedule_count += 1; + } + + pub(crate) fn incr_poll_count(&mut self) { + self.poll_count += 1; + } +} + +cfg_rt_multi_thread! { + impl MetricsBatch { + pub(crate) fn incr_steal_count(&mut self, by: u16) { + self.steal_count += by as u64; + } + + pub(crate) fn incr_overflow_count(&mut self) { + self.overflow_count += 1; + } + } +} diff --git a/tokio/src/runtime/metrics/mock.rs b/tokio/src/runtime/metrics/mock.rs new file mode 100644 index 00000000000..6b9cf704f42 --- /dev/null +++ b/tokio/src/runtime/metrics/mock.rs @@ -0,0 +1,43 @@ +//! This file contains mocks of the types in src/runtime/metrics + +pub(crate) struct SchedulerMetrics {} + +pub(crate) struct WorkerMetrics {} + +pub(crate) struct MetricsBatch {} + +impl SchedulerMetrics { + pub(crate) fn new() -> Self { + Self {} + } + + /// Increment the number of tasks scheduled externally + pub(crate) fn inc_remote_schedule_count(&self) {} +} + +impl WorkerMetrics { + pub(crate) fn new() -> Self { + Self {} + } + + pub(crate) fn set_queue_depth(&self, _len: usize) {} +} + +impl MetricsBatch { + pub(crate) fn new() -> Self { + Self {} + } + + pub(crate) fn submit(&mut self, _to: &WorkerMetrics) {} + pub(crate) fn about_to_park(&mut self) {} + pub(crate) fn returned_from_park(&mut self) {} + pub(crate) fn incr_poll_count(&mut self) {} + pub(crate) fn inc_local_schedule_count(&mut self) {} +} + +cfg_rt_multi_thread! { + impl MetricsBatch { + pub(crate) fn incr_steal_count(&mut self, _by: u16) {} + pub(crate) fn incr_overflow_count(&mut self) {} + } +} diff --git a/tokio/src/runtime/metrics/mod.rs b/tokio/src/runtime/metrics/mod.rs new file mode 100644 index 00000000000..ca643a59047 --- /dev/null +++ b/tokio/src/runtime/metrics/mod.rs @@ -0,0 +1,30 @@ +//! This module contains information need to view information about how the +//! runtime is performing. +//! +//! **Note**: This is an [unstable API][unstable]. The public API of types in +//! this module may break in 1.x releases. See [the documentation on unstable +//! features][unstable] for details. +//! +//! 
[unstable]: crate#unstable-features +#![allow(clippy::module_inception)] + +cfg_metrics! { + mod batch; + pub(crate) use batch::MetricsBatch; + + mod runtime; + #[allow(unreachable_pub)] // rust-lang/rust#57411 + pub use runtime::RuntimeMetrics; + + mod scheduler; + pub(crate) use scheduler::SchedulerMetrics; + + mod worker; + pub(crate) use worker::WorkerMetrics; +} + +cfg_not_metrics! { + mod mock; + + pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch}; +} diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs new file mode 100644 index 00000000000..0f8055907f5 --- /dev/null +++ b/tokio/src/runtime/metrics/runtime.rs @@ -0,0 +1,449 @@ +use crate::runtime::Handle; + +use std::sync::atomic::Ordering::Relaxed; +use std::time::Duration; + +/// Handle to the runtime's metrics. +/// +/// This handle is internally reference-counted and can be freely cloned. A +/// `RuntimeMetrics` handle is obtained using the [`Runtime::metrics`] method. +/// +/// [`Runtime::metrics`]: crate::runtime::Runtime::metrics() +#[derive(Clone, Debug)] +pub struct RuntimeMetrics { + handle: Handle, +} + +impl RuntimeMetrics { + pub(crate) fn new(handle: Handle) -> RuntimeMetrics { + RuntimeMetrics { handle } + } + + /// Returns the number of worker threads used by the runtime. + /// + /// The number of workers is set by configuring `worker_threads` on + /// `runtime::Builder`. When using the `current_thread` runtime, the return + /// value is always `1`. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.num_workers(); + /// println!("Runtime is using {} workers", n); + /// } + /// ``` + pub fn num_workers(&self) -> usize { + self.handle.spawner.num_workers() + } + + /// Returns the number of tasks scheduled from **outside** of the runtime. + /// + /// The remote schedule count starts at zero when the runtime is created and + /// increases by one each time a task is woken from **outside** of the + /// runtime. This usually means that a task is spawned or notified from a + /// non-runtime thread and must be queued using the Runtime's injection + /// queue, which tends to be slower. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let metrics = Handle::current().metrics(); + /// + /// let n = metrics.remote_schedule_count(); + /// println!("{} tasks were scheduled from outside the runtime", n); + /// } + /// ``` + pub fn remote_schedule_count(&self) -> u64 { + self.handle + .spawner + .scheduler_metrics() + .remote_schedule_count + .load(Relaxed) + } + + /// Returns the total number of times the given worker thread has parked. + /// + /// The worker park count starts at zero when the runtime is created and + /// increases by one each time the worker parks the thread waiting for new + /// inbound events to process. This usually means the worker has processed + /// all pending work and is currently idle. + /// + /// The counter is monotonically increasing. It is never decremented or + /// reset to zero. + /// + /// # Arguments + /// + /// `worker` is the index of the worker being queried. The given value must + /// be between 0 and `num_workers()`. 
The index uniquely identifies a single
+    /// worker and will continue to identify the worker throughout the lifetime
+    /// of the runtime instance.
+    ///
+    /// # Panics
+    ///
+    /// The method panics when `worker` represents an invalid worker, i.e. is
+    /// greater than or equal to `num_workers()`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.worker_park_count(0);
+    ///     println!("worker 0 parked {} times", n);
+    /// }
+    /// ```
+    pub fn worker_park_count(&self, worker: usize) -> u64 {
+        self.handle
+            .spawner
+            .worker_metrics(worker)
+            .park_count
+            .load(Relaxed)
+    }
+
+    /// Returns the number of times the given worker thread unparked but
+    /// performed no work before parking again.
+    ///
+    /// The worker no-op count starts at zero when the runtime is created and
+    /// increases by one each time the worker unparks the thread but finds no
+    /// new work and goes back to sleep. This indicates a false-positive wake up.
+    ///
+    /// The counter is monotonically increasing. It is never decremented or
+    /// reset to zero.
+    ///
+    /// # Arguments
+    ///
+    /// `worker` is the index of the worker being queried. The given value must
+    /// be between 0 and `num_workers()`. The index uniquely identifies a single
+    /// worker and will continue to identify the worker throughout the lifetime
+    /// of the runtime instance.
+    ///
+    /// # Panics
+    ///
+    /// The method panics when `worker` represents an invalid worker, i.e. is
+    /// greater than or equal to `num_workers()`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.worker_noop_count(0);
+    ///     println!("worker 0 had {} no-op unparks", n);
+    /// }
+    /// ```
+    pub fn worker_noop_count(&self, worker: usize) -> u64 {
+        self.handle
+            .spawner
+            .worker_metrics(worker)
+            .noop_count
+            .load(Relaxed)
+    }
+
+    /// Returns the number of times the given worker thread stole tasks from
+    /// another worker thread.
+    ///
+    /// This metric only applies to the **multi-threaded** runtime and will always return `0` when using the current thread runtime.
+    ///
+    /// The worker steal count starts at zero when the runtime is created and
+    /// increases by one each time the worker has processed its scheduled queue
+    /// and successfully steals more pending tasks from another worker.
+    ///
+    /// The counter is monotonically increasing. It is never decremented or
+    /// reset to zero.
+    ///
+    /// # Arguments
+    ///
+    /// `worker` is the index of the worker being queried. The given value must
+    /// be between 0 and `num_workers()`. The index uniquely identifies a single
+    /// worker and will continue to identify the worker throughout the lifetime
+    /// of the runtime instance.
+    ///
+    /// # Panics
+    ///
+    /// The method panics when `worker` represents an invalid worker, i.e. is
+    /// greater than or equal to `num_workers()`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.worker_steal_count(0);
+    ///     println!("worker 0 has stolen tasks {} times", n);
+    /// }
+    /// ```
+    pub fn worker_steal_count(&self, worker: usize) -> u64 {
+        self.handle
+            .spawner
+            .worker_metrics(worker)
+            .steal_count
+            .load(Relaxed)
+    }
+
+    /// Returns the number of tasks the given worker thread has polled.
+    ///
+    /// The worker poll count starts at zero when the runtime is created and
+    /// increases by one each time the worker polls a scheduled task.
+    ///
+    /// The counter is monotonically increasing. It is never decremented or
+    /// reset to zero.
+    ///
+    /// # Arguments
+    ///
+    /// `worker` is the index of the worker being queried. The given value must
+    /// be between 0 and `num_workers()`. The index uniquely identifies a single
+    /// worker and will continue to identify the worker throughout the lifetime
+    /// of the runtime instance.
+    ///
+    /// # Panics
+    ///
+    /// The method panics when `worker` represents an invalid worker, i.e. is
+    /// greater than or equal to `num_workers()`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.worker_poll_count(0);
+    ///     println!("worker 0 has polled {} tasks", n);
+    /// }
+    /// ```
+    pub fn worker_poll_count(&self, worker: usize) -> u64 {
+        self.handle
+            .spawner
+            .worker_metrics(worker)
+            .poll_count
+            .load(Relaxed)
+    }
+
+    /// Returns the amount of time the given worker thread has been busy.
+    ///
+    /// The worker busy duration starts at zero when the runtime is created and
+    /// increases whenever the worker is spending time processing work. Using
+    /// this value can indicate the load of the given worker. If a lot of time
+    /// is spent busy, then the worker is under load and will check for inbound
+    /// events less often.
+    ///
+    /// The timer is monotonically increasing. It is never decremented or reset
+    /// to zero.
+    ///
+    /// # Arguments
+    ///
+    /// `worker` is the index of the worker being queried. The given value must
+    /// be between 0 and `num_workers()`. The index uniquely identifies a single
+    /// worker and will continue to identify the worker throughout the lifetime
+    /// of the runtime instance.
+    ///
+    /// # Panics
+    ///
+    /// The method panics when `worker` represents an invalid worker, i.e. is
+    /// greater than or equal to `num_workers()`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.worker_total_busy_duration(0);
+    ///     println!("worker 0 was busy for a total of {:?}", n);
+    /// }
+    /// ```
+    pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
+        let nanos = self
+            .handle
+            .spawner
+            .worker_metrics(worker)
+            .busy_duration_total
+            .load(Relaxed);
+        Duration::from_nanos(nanos)
+    }
+
+    /// Returns the number of tasks scheduled from **within** the runtime on the
+    /// given worker's local queue.
+    ///
+    /// The local schedule count starts at zero when the runtime is created and
+    /// increases by one each time a task is woken from **inside** of the
+    /// runtime on the given worker.
This usually means that a task is spawned
+    /// or notified from within a runtime thread and will be queued on the
+    /// worker-local queue.
+    ///
+    /// The counter is monotonically increasing. It is never decremented or
+    /// reset to zero.
+    ///
+    /// # Arguments
+    ///
+    /// `worker` is the index of the worker being queried. The given value must
+    /// be between 0 and `num_workers()`. The index uniquely identifies a single
+    /// worker and will continue to identify the worker throughout the lifetime
+    /// of the runtime instance.
+    ///
+    /// # Panics
+    ///
+    /// The method panics when `worker` represents an invalid worker, i.e. is
+    /// greater than or equal to `num_workers()`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.worker_local_schedule_count(0);
+    ///     println!("{} tasks were scheduled on the worker's local queue", n);
+    /// }
+    /// ```
+    pub fn worker_local_schedule_count(&self, worker: usize) -> u64 {
+        self.handle
+            .spawner
+            .worker_metrics(worker)
+            .local_schedule_count
+            .load(Relaxed)
+    }
+
+    /// Returns the number of times the given worker thread saturated its local
+    /// queue.
+    ///
+    /// This metric only applies to the **multi-threaded** scheduler.
+    ///
+    /// The worker overflow count starts at zero when the runtime is created and
+    /// increases by one each time the worker attempts to schedule a task
+    /// locally, but its local queue is full. When this happens, half of the
+    /// local queue is moved to the injection queue.
+    ///
+    /// The counter is monotonically increasing. It is never decremented or
+    /// reset to zero.
+    ///
+    /// # Arguments
+    ///
+    /// `worker` is the index of the worker being queried. The given value must
+    /// be between 0 and `num_workers()`. The index uniquely identifies a single
+    /// worker and will continue to identify the worker throughout the lifetime
+    /// of the runtime instance.
+    ///
+    /// # Panics
+    ///
+    /// The method panics when `worker` represents an invalid worker, i.e. is
+    /// greater than or equal to `num_workers()`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.worker_overflow_count(0);
+    ///     println!("worker 0 has overflowed its queue {} times", n);
+    /// }
+    /// ```
+    pub fn worker_overflow_count(&self, worker: usize) -> u64 {
+        self.handle
+            .spawner
+            .worker_metrics(worker)
+            .overflow_count
+            .load(Relaxed)
+    }
+
+    /// Returns the number of tasks currently scheduled in the runtime's
+    /// injection queue.
+    ///
+    /// Tasks that are spawned or notified from a non-runtime thread are
+    /// scheduled using the runtime's injection queue. This metric returns the
+    /// **current** number of tasks pending in the injection queue. As such, the
+    /// returned value may increase or decrease as new tasks are scheduled and
+    /// processed.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.injection_queue_depth();
+    ///     println!("{} tasks currently pending in the runtime's injection queue", n);
+    /// }
+    /// ```
+    pub fn injection_queue_depth(&self) -> usize {
+        self.handle.spawner.injection_queue_depth()
+    }
+
+    /// Returns the number of tasks currently scheduled in the given worker's
+    /// local queue.
+    ///
+    /// Tasks that are spawned or notified from within a runtime thread are
+    /// scheduled using that worker's local queue. This metric returns the
+    /// **current** number of tasks pending in the worker's local queue. As
+    /// such, the returned value may increase or decrease as new tasks are
+    /// scheduled and processed.
+    ///
+    /// # Arguments
+    ///
+    /// `worker` is the index of the worker being queried. The given value must
+    /// be between 0 and `num_workers()`. The index uniquely identifies a single
+    /// worker and will continue to identify the worker throughout the lifetime
+    /// of the runtime instance.
+    ///
+    /// # Panics
+    ///
+    /// The method panics when `worker` represents an invalid worker, i.e. is
+    /// greater than or equal to `num_workers()`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use tokio::runtime::Handle;
+    ///
+    /// #[tokio::main]
+    /// async fn main() {
+    ///     let metrics = Handle::current().metrics();
+    ///
+    ///     let n = metrics.worker_local_queue_depth(0);
+    ///     println!("{} tasks currently pending in worker 0's local queue", n);
+    /// }
+    /// ```
+    pub fn worker_local_queue_depth(&self, worker: usize) -> usize {
+        self.handle.spawner.worker_local_queue_depth(worker)
+    }
+}
diff --git a/tokio/src/runtime/metrics/scheduler.rs b/tokio/src/runtime/metrics/scheduler.rs
new file mode 100644
index 00000000000..d1ba3b64420
--- /dev/null
+++ b/tokio/src/runtime/metrics/scheduler.rs
@@ -0,0 +1,27 @@
+use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
+
+/// Retrieves metrics from the Tokio runtime.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [unstable]: crate#unstable-features
+#[derive(Debug)]
+pub(crate) struct SchedulerMetrics {
+    /// Number of tasks that are scheduled from outside the runtime.
+    pub(super) remote_schedule_count: AtomicU64,
+}
+
+impl SchedulerMetrics {
+    pub(crate) fn new() -> SchedulerMetrics {
+        SchedulerMetrics {
+            remote_schedule_count: AtomicU64::new(0),
+        }
+    }
+
+    /// Increment the number of tasks scheduled externally
+    pub(crate) fn inc_remote_schedule_count(&self) {
+        self.remote_schedule_count.fetch_add(1, Relaxed);
+    }
+}
diff --git a/tokio/src/runtime/metrics/worker.rs b/tokio/src/runtime/metrics/worker.rs
new file mode 100644
index 00000000000..ec59ce04730
--- /dev/null
+++ b/tokio/src/runtime/metrics/worker.rs
@@ -0,0 +1,61 @@
+use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::{AtomicU64, AtomicUsize};
+
+/// Retrieve runtime worker metrics.
+///
+/// **Note**: This is an [unstable API][unstable]. The public API of this type
+/// may break in 1.x releases. See [the documentation on unstable
+/// features][unstable] for details.
+///
+/// [unstable]: crate#unstable-features
+#[derive(Debug)]
+#[repr(align(128))]
+pub(crate) struct WorkerMetrics {
+    /// Number of times the worker parked.
+ pub(crate) park_count: AtomicU64, + + /// Number of times the worker woke then parked again without doing work. + pub(crate) noop_count: AtomicU64, + + /// Number of times the worker attempted to steal. + pub(crate) steal_count: AtomicU64, + + /// Number of tasks the worker polled. + pub(crate) poll_count: AtomicU64, + + /// Amount of time the worker spent doing work vs. parking. + pub(crate) busy_duration_total: AtomicU64, + + /// Number of tasks scheduled for execution on the worker's local queue. + pub(crate) local_schedule_count: AtomicU64, + + /// Number of tasks moved from the local queue to the global queue to free space. + pub(crate) overflow_count: AtomicU64, + + /// Number of tasks currently in the local queue. Used only by the + /// current-thread scheduler. + pub(crate) queue_depth: AtomicUsize, +} + +impl WorkerMetrics { + pub(crate) fn new() -> WorkerMetrics { + WorkerMetrics { + park_count: AtomicU64::new(0), + noop_count: AtomicU64::new(0), + steal_count: AtomicU64::new(0), + poll_count: AtomicU64::new(0), + overflow_count: AtomicU64::new(0), + busy_duration_total: AtomicU64::new(0), + local_schedule_count: AtomicU64::new(0), + queue_depth: AtomicUsize::new(0), + } + } + + pub(crate) fn queue_depth(&self) -> usize { + self.queue_depth.load(Relaxed) + } + + pub(crate) fn set_queue_depth(&self, len: usize) { + self.queue_depth.store(len, Relaxed); + } +} diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index e77c5e3a0f8..c8d97e1b19a 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -181,11 +181,16 @@ pub(crate) mod enter; pub(crate) mod task; -cfg_stats! { - pub mod stats; +cfg_metrics! { + mod metrics; + pub use metrics::RuntimeMetrics; + + pub(crate) use metrics::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; } -cfg_not_stats! { - pub(crate) mod stats; + +cfg_not_metrics! { + pub(crate) mod metrics; + pub(crate) use metrics::{SchedulerMetrics, WorkerMetrics, MetricsBatch}; } cfg_rt! { @@ -597,4 +602,13 @@ cfg_rt! { } } } + + cfg_metrics! { + impl Runtime { + /// TODO + pub fn metrics(&self) -> RuntimeMetrics { + self.handle.metrics() + } + } + } } diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs index a88dffcaf5d..ad9085a6545 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -3,8 +3,8 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicU16, AtomicU32}; use crate::loom::sync::Arc; -use crate::runtime::stats::WorkerStatsBatcher; use crate::runtime::task::{self, Inject}; +use crate::runtime::MetricsBatch; use std::mem::MaybeUninit; use std::ptr; @@ -102,7 +102,12 @@ impl Local { } /// Pushes a task to the back of the local queue, skipping the LIFO slot. - pub(super) fn push_back(&mut self, mut task: task::Notified, inject: &Inject) { + pub(super) fn push_back( + &mut self, + mut task: task::Notified, + inject: &Inject, + metrics: &mut MetricsBatch, + ) { let tail = loop { let head = self.inner.head.load(Acquire); let (steal, real) = unpack(head); @@ -121,7 +126,7 @@ impl Local { } else { // Push the current task and half of the queue into the // inject queue. - match self.push_overflow(task, real, tail, inject) { + match self.push_overflow(task, real, tail, inject, metrics) { Ok(_) => return, // Lost the race, try again Err(v) => { @@ -163,6 +168,7 @@ impl Local { head: u16, tail: u16, inject: &Inject, + metrics: &mut MetricsBatch, ) -> Result<(), task::Notified> { /// How many elements are we taking from the local queue. 
/// @@ -246,6 +252,9 @@ impl Local { }; inject.push_batch(batch_iter.chain(std::iter::once(task))); + // Add 1 to factor in the task currently being scheduled. + metrics.incr_overflow_count(); + Ok(()) } @@ -300,7 +309,7 @@ impl Steal { pub(super) fn steal_into( &self, dst: &mut Local, - stats: &mut WorkerStatsBatcher, + dst_metrics: &mut MetricsBatch, ) -> Option> { // Safety: the caller is the only thread that mutates `dst.tail` and // holds a mutable reference. @@ -320,13 +329,14 @@ impl Steal { // Steal the tasks into `dst`'s buffer. This does not yet expose the // tasks in `dst`. let mut n = self.steal_into2(dst, dst_tail); - stats.incr_steal_count(n); if n == 0 { // No tasks were stolen return None; } + dst_metrics.incr_steal_count(n); + // We are returning a task here n -= 1; @@ -446,6 +456,14 @@ impl Steal { } } +cfg_metrics! { + impl Steal { + pub(crate) fn len(&self) -> usize { + self.0.len() as _ + } + } +} + impl Clone for Steal { fn clone(&self) -> Steal { Steal(self.0.clone()) @@ -461,11 +479,15 @@ impl Drop for Local { } impl Inner { - fn is_empty(&self) -> bool { + fn len(&self) -> u16 { let (_, head) = unpack(self.head.load(Acquire)); let tail = self.tail.load(Acquire); - head == tail + tail.wrapping_sub(head) + } + + fn is_empty(&self) -> bool { + self.len() == 0 } } diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index 9a3d465aef9..d81a806cb59 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -1,6 +1,5 @@ use crate::future::Future; use crate::runtime::basic_scheduler; -use crate::runtime::stats::RuntimeStats; use crate::task::JoinHandle; cfg_rt_multi_thread! { @@ -35,13 +34,50 @@ impl Spawner { Spawner::ThreadPool(spawner) => spawner.spawn(future), } } +} - #[cfg_attr(not(all(tokio_unstable, feature = "stats")), allow(dead_code))] - pub(crate) fn stats(&self) -> &RuntimeStats { - match self { - Spawner::Basic(spawner) => spawner.stats(), - #[cfg(feature = "rt-multi-thread")] - Spawner::ThreadPool(spawner) => spawner.stats(), +cfg_metrics! 
{ + use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + + impl Spawner { + pub(crate) fn num_workers(&self) -> usize { + match self { + Spawner::Basic(_) => 1, + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.num_workers(), + } + } + + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + match self { + Spawner::Basic(spawner) => spawner.scheduler_metrics(), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.scheduler_metrics(), + } + } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + match self { + Spawner::Basic(spawner) => spawner.worker_metrics(worker), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.worker_metrics(worker), + } + } + + pub(crate) fn injection_queue_depth(&self) -> usize { + match self { + Spawner::Basic(spawner) => spawner.injection_queue_depth(), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.injection_queue_depth(), + } + } + + pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { + match self { + Spawner::Basic(spawner) => spawner.worker_metrics(worker).queue_depth(), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.worker_local_queue_depth(worker), + } } } } diff --git a/tokio/src/runtime/stats/mock.rs b/tokio/src/runtime/stats/mock.rs deleted file mode 100644 index 3bda8bffde6..00000000000 --- a/tokio/src/runtime/stats/mock.rs +++ /dev/null @@ -1,27 +0,0 @@ -//! This file contains mocks of the types in src/runtime/stats/stats.rs - -pub(crate) struct RuntimeStats {} - -impl RuntimeStats { - pub(crate) fn new(_worker_threads: usize) -> Self { - Self {} - } -} - -pub(crate) struct WorkerStatsBatcher {} - -impl WorkerStatsBatcher { - pub(crate) fn new(_my_index: usize) -> Self { - Self {} - } - - pub(crate) fn submit(&mut self, _to: &RuntimeStats) {} - - pub(crate) fn about_to_park(&mut self) {} - pub(crate) fn returned_from_park(&mut self) {} - - #[cfg(feature = "rt-multi-thread")] - pub(crate) fn incr_steal_count(&mut self, _by: u16) {} - - pub(crate) fn incr_poll_count(&mut self) {} -} diff --git a/tokio/src/runtime/stats/mod.rs b/tokio/src/runtime/stats/mod.rs deleted file mode 100644 index 355e400602d..00000000000 --- a/tokio/src/runtime/stats/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -//! This module contains information need to view information about how the -//! runtime is performing. -//! -//! **Note**: This is an [unstable API][unstable]. The public API of types in -//! this module may break in 1.x releases. See [the documentation on unstable -//! features][unstable] for details. -//! -//! [unstable]: crate#unstable-features -#![allow(clippy::module_inception)] - -cfg_stats! { - mod stats; - - pub use self::stats::{RuntimeStats, WorkerStats}; - pub(crate) use self::stats::WorkerStatsBatcher; -} - -cfg_not_stats! { - #[path = "mock.rs"] - mod stats; - - pub(crate) use self::stats::{RuntimeStats, WorkerStatsBatcher}; -} diff --git a/tokio/src/runtime/stats/stats.rs b/tokio/src/runtime/stats/stats.rs deleted file mode 100644 index 375786300e7..00000000000 --- a/tokio/src/runtime/stats/stats.rs +++ /dev/null @@ -1,134 +0,0 @@ -//! This file contains the types necessary to collect various types of stats. -use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed}; - -use std::convert::TryFrom; -use std::time::{Duration, Instant}; - -/// This type contains methods to retrieve stats from a Tokio runtime. 
-/// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [unstable]: crate#unstable-features -#[derive(Debug)] -pub struct RuntimeStats { - workers: Box<[WorkerStats]>, -} - -/// This type contains methods to retrieve stats from a worker thread on a Tokio runtime. -/// -/// **Note**: This is an [unstable API][unstable]. The public API of this type -/// may break in 1.x releases. See [the documentation on unstable -/// features][unstable] for details. -/// -/// [unstable]: crate#unstable-features -#[derive(Debug)] -#[repr(align(128))] -pub struct WorkerStats { - park_count: AtomicU64, - steal_count: AtomicU64, - poll_count: AtomicU64, - busy_duration_total: AtomicU64, -} - -impl RuntimeStats { - pub(crate) fn new(worker_threads: usize) -> Self { - let mut workers = Vec::with_capacity(worker_threads); - for _ in 0..worker_threads { - workers.push(WorkerStats { - park_count: AtomicU64::new(0), - steal_count: AtomicU64::new(0), - poll_count: AtomicU64::new(0), - busy_duration_total: AtomicU64::new(0), - }); - } - - Self { - workers: workers.into_boxed_slice(), - } - } - - /// Returns a slice containing the worker stats for each worker thread. - pub fn workers(&self) -> impl Iterator { - self.workers.iter() - } -} - -impl WorkerStats { - /// Returns the total number of times this worker thread has parked. - pub fn park_count(&self) -> u64 { - self.park_count.load(Relaxed) - } - - /// Returns the number of tasks this worker has stolen from other worker - /// threads. - pub fn steal_count(&self) -> u64 { - self.steal_count.load(Relaxed) - } - - /// Returns the number of times this worker has polled a task. - pub fn poll_count(&self) -> u64 { - self.poll_count.load(Relaxed) - } - - /// Returns the total amount of time this worker has been busy for. - pub fn total_busy_duration(&self) -> Duration { - Duration::from_nanos(self.busy_duration_total.load(Relaxed)) - } -} - -pub(crate) struct WorkerStatsBatcher { - my_index: usize, - park_count: u64, - steal_count: u64, - poll_count: u64, - /// The total busy duration in nanoseconds. 
- busy_duration_total: u64, - last_resume_time: Instant, -} - -impl WorkerStatsBatcher { - pub(crate) fn new(my_index: usize) -> Self { - Self { - my_index, - park_count: 0, - steal_count: 0, - poll_count: 0, - busy_duration_total: 0, - last_resume_time: Instant::now(), - } - } - pub(crate) fn submit(&mut self, to: &RuntimeStats) { - let worker = &to.workers[self.my_index]; - - worker.park_count.store(self.park_count, Relaxed); - worker.steal_count.store(self.steal_count, Relaxed); - worker.poll_count.store(self.poll_count, Relaxed); - - worker - .busy_duration_total - .store(self.busy_duration_total, Relaxed); - } - - pub(crate) fn about_to_park(&mut self) { - self.park_count += 1; - - let busy_duration = self.last_resume_time.elapsed(); - let busy_duration = u64::try_from(busy_duration.as_nanos()).unwrap_or(u64::MAX); - self.busy_duration_total += busy_duration; - } - - pub(crate) fn returned_from_park(&mut self) { - self.last_resume_time = Instant::now(); - } - - #[cfg(feature = "rt-multi-thread")] - pub(crate) fn incr_steal_count(&mut self, by: u16) { - self.steal_count += u64::from(by); - } - - pub(crate) fn incr_poll_count(&mut self) { - self.poll_count += 1; - } -} diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index 2cbb0a18b53..b5f78d7ebe7 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,7 +1,6 @@ use crate::runtime::blocking::NoopSchedule; -use crate::runtime::queue; -use crate::runtime::stats::WorkerStatsBatcher; use crate::runtime::task::Inject; +use crate::runtime::{queue, MetricsBatch}; use loom::thread; @@ -10,14 +9,15 @@ fn basic() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..3 { - if steal.steal_into(&mut local, &mut stats).is_some() { + if steal.steal_into(&mut local, &mut metrics).is_some() { n += 1; } @@ -34,7 +34,7 @@ fn basic() { for _ in 0..2 { for _ in 0..2 { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); } if local.pop().is_some() { @@ -43,7 +43,7 @@ fn basic() { // Push another task let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); while local.pop().is_some() { n += 1; @@ -65,13 +65,14 @@ fn steal_overflow() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); let mut n = 0; - if steal.steal_into(&mut local, &mut stats).is_some() { + if steal.steal_into(&mut local, &mut metrics).is_some() { n += 1; } @@ -86,7 +87,7 @@ fn steal_overflow() { // push a task, pop a task let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); if local.pop().is_some() { n += 1; @@ -94,7 +95,7 @@ fn steal_overflow() { for _ in 0..6 { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); } n += th.join().unwrap(); @@ -116,10 +117,10 @@ fn multi_stealer() { const NUM_TASKS: usize = 5; fn steal_tasks(steal: queue::Steal) 
-> usize { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); - if steal.steal_into(&mut local, &mut stats).is_none() { + if steal.steal_into(&mut local, &mut metrics).is_none() { return 0; } @@ -135,11 +136,12 @@ fn multi_stealer() { loom::model(|| { let (steal, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); // Push work for _ in 0..NUM_TASKS { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); } let th1 = { @@ -169,7 +171,7 @@ fn multi_stealer() { #[test] fn chained_steal() { loom::model(|| { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (s1, mut l1) = queue::local(); let (s2, mut l2) = queue::local(); let inject = Inject::new(); @@ -177,17 +179,17 @@ fn chained_steal() { // Load up some tasks for _ in 0..4 { let (task, _) = super::unowned(async {}); - l1.push_back(task, &inject); + l1.push_back(task, &inject, &mut metrics); let (task, _) = super::unowned(async {}); - l2.push_back(task, &inject); + l2.push_back(task, &inject, &mut metrics); } // Spawn a task to steal from **our** queue let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); - s1.steal_into(&mut local, &mut stats); + s1.steal_into(&mut local, &mut metrics); while local.pop().is_some() {} }); @@ -195,7 +197,7 @@ fn chained_steal() { // Drain our tasks, then attempt to steal while l1.pop().is_some() {} - s2.steal_into(&mut l1, &mut stats); + s2.steal_into(&mut l1, &mut metrics); th.join().unwrap(); diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 47f1b01d6a6..2de17b9f4c4 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,18 +1,39 @@ use crate::runtime::queue; -use crate::runtime::stats::WorkerStatsBatcher; use crate::runtime::task::{self, Inject, Schedule, Task}; +use crate::runtime::MetricsBatch; use std::thread; use std::time::Duration; +#[allow(unused)] +macro_rules! assert_metrics { + ($metrics:ident, $field:ident == $v:expr) => {{ + use crate::runtime::WorkerMetrics; + use std::sync::atomic::Ordering::Relaxed; + + let worker = WorkerMetrics::new(); + $metrics.submit(&worker); + + let expect = $v; + let actual = worker.$field.load(Relaxed); + + assert!(actual == expect, "expect = {}; actual = {}", expect, actual) + }}; +} + #[test] fn fits_256() { let (_, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); for _ in 0..256 { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); + } + + cfg_metrics! { + assert_metrics!(metrics, overflow_count == 0); } assert!(inject.pop().is_none()); @@ -24,10 +45,15 @@ fn fits_256() { fn overflow() { let (_, mut local) = queue::local(); let inject = Inject::new(); + let mut metrics = MetricsBatch::new(); for _ in 0..257 { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); + } + + cfg_metrics! 
{ + assert_metrics!(metrics, overflow_count == 1); } let mut n = 0; @@ -45,7 +71,7 @@ fn overflow() { #[test] fn steal_batch() { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (steal1, mut local1) = queue::local(); let (_, mut local2) = queue::local(); @@ -53,10 +79,14 @@ fn steal_batch() { for _ in 0..4 { let (task, _) = super::unowned(async {}); - local1.push_back(task, &inject); + local1.push_back(task, &inject, &mut metrics); } - assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); + assert!(steal1.steal_into(&mut local2, &mut metrics).is_some()); + + cfg_metrics! { + assert_metrics!(metrics, steal_count == 2); + } for _ in 0..1 { assert!(local2.pop().is_some()); @@ -79,17 +109,19 @@ fn stress1() { const NUM_PUSH: usize = 500; const NUM_POP: usize = 250; + let mut metrics = MetricsBatch::new(); + for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut metrics = MetricsBatch::new(); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local, &mut stats).is_some() { + if steal.steal_into(&mut local, &mut metrics).is_some() { n += 1; } @@ -100,6 +132,10 @@ fn stress1() { thread::yield_now(); } + cfg_metrics! { + assert_metrics!(metrics, steal_count == n as _); + } + n }); @@ -108,7 +144,7 @@ fn stress1() { for _ in 0..NUM_LOCAL { for _ in 0..NUM_PUSH { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); } for _ in 0..NUM_POP { @@ -136,12 +172,14 @@ fn stress2() { const NUM_TASKS: usize = 1_000_000; const NUM_STEAL: usize = 1_000; + let mut metrics = MetricsBatch::new(); + for _ in 0..NUM_ITER { let (steal, mut local) = queue::local(); let inject = Inject::new(); let th = thread::spawn(move || { - let mut stats = WorkerStatsBatcher::new(0); + let mut stats = MetricsBatch::new(); let (_, mut local) = queue::local(); let mut n = 0; @@ -164,7 +202,7 @@ fn stress2() { for i in 0..NUM_TASKS { let (task, _) = super::unowned(async {}); - local.push_back(task, &inject); + local.push_back(task, &inject, &mut metrics); if i % 128 == 0 && local.pop().is_some() { num_pop += 1; diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 3e1ce448215..d3f46517cb0 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -9,7 +9,6 @@ pub(crate) use worker::Launch; pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; -use crate::runtime::stats::RuntimeStats; use crate::runtime::task::JoinHandle; use crate::runtime::{Callback, Parker}; @@ -102,9 +101,31 @@ impl Spawner { pub(crate) fn shutdown(&mut self) { self.shared.close(); } +} + +cfg_metrics! 
{ + use crate::runtime::{SchedulerMetrics, WorkerMetrics}; + + impl Spawner { + pub(crate) fn num_workers(&self) -> usize { + self.shared.worker_metrics.len() + } + + pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics { + &self.shared.scheduler_metrics + } + + pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics { + &self.shared.worker_metrics[worker] + } + + pub(crate) fn injection_queue_depth(&self) -> usize { + self.shared.injection_queue_depth() + } - pub(crate) fn stats(&self) -> &RuntimeStats { - self.shared.stats() + pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize { + self.shared.worker_local_queue_depth(worker) + } } } diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 27d0d5e7d32..60706cad356 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -64,10 +64,9 @@ use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::park::{Parker, Unparker}; -use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; use crate::runtime::thread_pool::Idle; -use crate::runtime::{queue, task, Callback}; +use crate::runtime::{queue, task, Callback, MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::util::atomic_cell::AtomicCell; use crate::util::FastRand; @@ -114,8 +113,8 @@ struct Core { /// borrow checker happy. park: Option, - /// Batching stats so they can be submitted to RuntimeStats. - stats: WorkerStatsBatcher, + /// Batching metrics so they can be submitted to RuntimeMetrics. + metrics: MetricsBatch, /// Fast random number generator. rand: FastRand, @@ -148,8 +147,10 @@ pub(super) struct Shared { /// Callback for a worker unparking itself after_unpark: Option, - /// Collects stats from the runtime. - stats: RuntimeStats, + /// Collects metrics from the runtime. + pub(super) scheduler_metrics: SchedulerMetrics, + + pub(super) worker_metrics: Box<[WorkerMetrics]>, } /// Used to communicate with a worker from other threads. 
@@ -195,9 +196,10 @@ pub(super) fn create( ) -> (Arc, Launch) { let mut cores = vec![]; let mut remotes = vec![]; + let mut worker_metrics = vec![]; // Create the local queues - for i in 0..size { + for _ in 0..size { let (steal, run_queue) = queue::local(); let park = park.clone(); @@ -210,11 +212,12 @@ pub(super) fn create( is_searching: false, is_shutdown: false, park: Some(park), - stats: WorkerStatsBatcher::new(i), + metrics: MetricsBatch::new(), rand: FastRand::new(seed()), })); remotes.push(Remote { steal, unpark }); + worker_metrics.push(WorkerMetrics::new()); } let shared = Arc::new(Shared { @@ -225,7 +228,8 @@ pub(super) fn create( shutdown_cores: Mutex::new(vec![]), before_park, after_unpark, - stats: RuntimeStats::new(size), + scheduler_metrics: SchedulerMetrics::new(), + worker_metrics: worker_metrics.into_boxed_slice(), }); let mut launch = Launch(vec![]); @@ -413,7 +417,7 @@ impl Context { core.transition_from_searching(&self.worker); // Make the core available to the runtime context - core.stats.incr_poll_count(); + core.metrics.incr_poll_count(); *self.core.borrow_mut() = Some(core); // Run the task @@ -438,14 +442,15 @@ impl Context { if coop::has_budget_remaining() { // Run the LIFO task, then loop - core.stats.incr_poll_count(); + core.metrics.incr_poll_count(); *self.core.borrow_mut() = Some(core); let task = self.worker.shared.owned.assert_owner(task); task.run(); } else { // Not enough budget left to run the LIFO task, push it to // the back of the queue and return. - core.run_queue.push_back(task, self.worker.inject()); + core.run_queue + .push_back(task, self.worker.inject(), &mut core.metrics); return Ok(core); } } @@ -472,7 +477,9 @@ impl Context { if core.transition_to_parked(&self.worker) { while !core.is_shutdown { + core.metrics.about_to_park(); core = self.park_timeout(core, None); + core.metrics.returned_from_park(); // Run regularly scheduled maintenance core.maintenance(&self.worker); @@ -493,8 +500,6 @@ impl Context { // Take the parker out of core let mut park = core.park.take().expect("park missing"); - core.stats.about_to_park(); - // Store `core` in context *self.core.borrow_mut() = Some(core); @@ -516,8 +521,6 @@ impl Context { self.worker.shared.notify_parked(); } - core.stats.returned_from_park(); - core } } @@ -561,7 +564,7 @@ impl Core { let target = &worker.shared.remotes[i]; if let Some(task) = target .steal - .steal_into(&mut self.run_queue, &mut self.stats) + .steal_into(&mut self.run_queue, &mut self.metrics) { return Some(task); } @@ -637,7 +640,8 @@ impl Core { /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { - self.stats.submit(&worker.shared.stats); + self.metrics + .submit(&worker.shared.worker_metrics[worker.index]); if !self.is_shutdown { // Check if the scheduler has been shutdown @@ -651,7 +655,8 @@ impl Core { // Signal to all tasks to shut down. worker.shared.owned.close_and_shutdown_all(); - self.stats.submit(&worker.shared.stats); + self.metrics + .submit(&worker.shared.worker_metrics[worker.index]); } /// Shuts down the core. @@ -702,10 +707,6 @@ impl Shared { handle } - pub(crate) fn stats(&self) -> &RuntimeStats { - &self.stats - } - pub(super) fn schedule(&self, task: Notified, is_yield: bool) { CURRENT.with(|maybe_cx| { if let Some(cx) = maybe_cx { @@ -721,17 +722,21 @@ impl Shared { // Otherwise, use the inject queue. 
             self.inject.push(task);
+            self.scheduler_metrics.inc_remote_schedule_count();
             self.notify_parked();
         })
     }
 
     fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
+        core.metrics.inc_local_schedule_count();
+
         // Spawning from the worker thread. If scheduling a "yield" then the
         // task must always be pushed to the back of the queue, enabling other
         // tasks to be executed. If **not** a yield, then there is more
         // flexibility and the task may go to the front of the queue.
         let should_notify = if is_yield {
-            core.run_queue.push_back(task, &self.inject);
+            core.run_queue
+                .push_back(task, &self.inject, &mut core.metrics);
             true
         } else {
             // Push to the LIFO slot
@@ -739,7 +744,8 @@ impl Shared {
             let ret = prev.is_some();
 
             if let Some(prev) = prev {
-                core.run_queue.push_back(prev, &self.inject);
+                core.run_queue
+                    .push_back(prev, &self.inject, &mut core.metrics);
             }
 
             core.lifo_slot = Some(task);
@@ -824,3 +830,15 @@ impl Shared {
         std::ptr::eq(self, other)
     }
 }
+
+cfg_metrics! {
+    impl Shared {
+        pub(super) fn injection_queue_depth(&self) -> usize {
+            self.inject.len()
+        }
+
+        pub(super) fn worker_local_queue_depth(&self, worker: usize) -> usize {
+            self.remotes[worker].steal.len()
+        }
+    }
+}
diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs
index 149b3bfaad7..cc6ac677280 100644
--- a/tokio/tests/rt_basic.rs
+++ b/tokio/tests/rt_basic.rs
@@ -16,6 +16,15 @@ mod support {
     pub(crate) mod mpsc_stream;
 }
 
+macro_rules! cfg_metrics {
+    ($($t:tt)*) => {
+        #[cfg(tokio_unstable)]
+        {
+            $( $t )*
+        }
+    }
+}
+
 #[test]
 fn spawned_task_does_not_progress_without_block_on() {
     let (tx, mut rx) = oneshot::channel();
@@ -197,6 +206,73 @@ fn wake_in_drop_after_panic() {
     });
 }
 
+#[test]
+fn spawn_two() {
+    let rt = rt();
+
+    let out = rt.block_on(async {
+        let (tx, rx) = oneshot::channel();
+
+        tokio::spawn(async move {
+            tokio::spawn(async move {
+                tx.send("ZOMG").unwrap();
+            });
+        });
+
+        assert_ok!(rx.await)
+    });
+
+    assert_eq!(out, "ZOMG");
+
+    cfg_metrics! {
+        let metrics = rt.metrics();
+        drop(rt);
+        assert_eq!(0, metrics.remote_schedule_count());
+
+        let mut local = 0;
+        for i in 0..metrics.num_workers() {
+            local += metrics.worker_local_schedule_count(i);
+        }
+
+        assert_eq!(2, local);
+    }
+}
+
+#[test]
+fn spawn_remote() {
+    let rt = rt();
+
+    let out = rt.block_on(async {
+        let (tx, rx) = oneshot::channel();
+
+        let handle = tokio::spawn(async move {
+            std::thread::spawn(move || {
+                std::thread::sleep(Duration::from_millis(10));
+                tx.send("ZOMG").unwrap();
+            });
+
+            rx.await.unwrap()
+        });
+
+        handle.await.unwrap()
+    });
+
+    assert_eq!(out, "ZOMG");
+
+    cfg_metrics! {
+        let metrics = rt.metrics();
+        drop(rt);
+        assert_eq!(1, metrics.remote_schedule_count());
+
+        let mut local = 0;
+        for i in 0..metrics.num_workers() {
+            local += metrics.worker_local_schedule_count(i);
+        }
+
+        assert_eq!(1, local);
+    }
+}
+
 #[test]
 #[should_panic(
     expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs
new file mode 100644
index 00000000000..0a26b80285d
--- /dev/null
+++ b/tokio/tests/rt_metrics.rs
@@ -0,0 +1,385 @@
+#![warn(rust_2018_idioms)]
+#![cfg(all(feature = "full", tokio_unstable))]
+
+use tokio::runtime::Runtime;
+use tokio::time::{self, Duration};
+
+#[test]
+fn num_workers() {
+    let rt = basic();
+    assert_eq!(1, rt.metrics().num_workers());
+
+    let rt = threaded();
+    assert_eq!(2, rt.metrics().num_workers());
+}
+
+#[test]
+fn remote_schedule_count() {
+    use std::thread;
+
+    let rt = basic();
+    let handle = rt.handle().clone();
+    let task = thread::spawn(move || {
+        handle.spawn(async {
+            // Do nothing
+        })
+    })
+    .join()
+    .unwrap();
+
+    rt.block_on(task).unwrap();
+
+    assert_eq!(1, rt.metrics().remote_schedule_count());
+
+    let rt = threaded();
+    let handle = rt.handle().clone();
+    let task = thread::spawn(move || {
+        handle.spawn(async {
+            // Do nothing
+        })
+    })
+    .join()
+    .unwrap();
+
+    rt.block_on(task).unwrap();
+
+    assert_eq!(1, rt.metrics().remote_schedule_count());
+}
+
+#[test]
+fn worker_park_count() {
+    let rt = basic();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        time::sleep(Duration::from_millis(1)).await;
+    });
+    drop(rt);
+    assert!(2 <= metrics.worker_park_count(0));
+
+    let rt = threaded();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        time::sleep(Duration::from_millis(1)).await;
+    });
+    drop(rt);
+    assert!(1 <= metrics.worker_park_count(0));
+    assert!(1 <= metrics.worker_park_count(1));
+}
+
+#[test]
+fn worker_noop_count() {
+    // There isn't really a great way to generate no-op parks as they happen as
+    // false-positive events under concurrency.
+
+    let rt = basic();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        time::sleep(Duration::from_millis(1)).await;
+    });
+    drop(rt);
+    assert!(2 <= metrics.worker_noop_count(0));
+
+    let rt = threaded();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        time::sleep(Duration::from_millis(1)).await;
+    });
+    drop(rt);
+    assert!(1 <= metrics.worker_noop_count(0));
+    assert!(1 <= metrics.worker_noop_count(1));
+}
+
+#[test]
+fn worker_steal_count() {
+    // This metric only applies to the multi-threaded runtime.
+    //
+    // We use a blocking channel to backup one worker thread.
+    use std::sync::mpsc::channel;
+
+    let rt = threaded();
+    let metrics = rt.metrics();
+
+    rt.block_on(async {
+        let (tx, rx) = channel();
+
+        // Move to the runtime.
+        tokio::spawn(async move {
+            // Spawn the task that sends to the channel
+            tokio::spawn(async move {
+                tx.send(()).unwrap();
+            });
+
+            // Spawn a task that bumps the previous task out of the "next
+            // scheduled" slot.
+            tokio::spawn(async {});
+
+            // Blocking receive on the channel.
+            rx.recv().unwrap();
+        })
+        .await
+        .unwrap();
+    });
+
+    drop(rt);
+
+    let n: u64 = (0..metrics.num_workers())
+        .map(|i| metrics.worker_steal_count(i))
+        .sum();
+
+    assert_eq!(1, n);
+}
+
+#[test]
+fn worker_poll_count() {
+    const N: u64 = 5;
+
+    let rt = basic();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        for _ in 0..N {
+            tokio::spawn(async {}).await.unwrap();
+        }
+    });
+    drop(rt);
+    assert_eq!(N, metrics.worker_poll_count(0));
+
+    let rt = threaded();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        for _ in 0..N {
+            tokio::spawn(async {}).await.unwrap();
+        }
+    });
+    drop(rt);
+    // Account for the `block_on` task
+    let n = (0..metrics.num_workers())
+        .map(|i| metrics.worker_poll_count(i))
+        .sum();
+
+    assert_eq!(N, n);
+}
+
+#[test]
+fn worker_total_busy_duration() {
+    const N: usize = 5;
+
+    let zero = Duration::from_millis(0);
+
+    let rt = basic();
+    let metrics = rt.metrics();
+
+    rt.block_on(async {
+        for _ in 0..N {
+            tokio::spawn(async {
+                tokio::task::yield_now().await;
+            })
+            .await
+            .unwrap();
+        }
+    });
+
+    drop(rt);
+
+    assert!(zero < metrics.worker_total_busy_duration(0));
+
+    let rt = threaded();
+    let metrics = rt.metrics();
+
+    rt.block_on(async {
+        for _ in 0..N {
+            tokio::spawn(async {
+                tokio::task::yield_now().await;
+            })
+            .await
+            .unwrap();
+        }
+    });
+
+    drop(rt);
+
+    for i in 0..metrics.num_workers() {
+        assert!(zero < metrics.worker_total_busy_duration(i));
+    }
+}
+
+#[test]
+fn worker_local_schedule_count() {
+    let rt = basic();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        tokio::spawn(async {}).await.unwrap();
+    });
+    drop(rt);
+
+    assert_eq!(1, metrics.worker_local_schedule_count(0));
+    assert_eq!(0, metrics.remote_schedule_count());
+
+    let rt = threaded();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        // Move to the runtime
+        tokio::spawn(async {
+            tokio::spawn(async {}).await.unwrap();
+        })
+        .await
+        .unwrap();
+    });
+    drop(rt);
+
+    let n: u64 = (0..metrics.num_workers())
+        .map(|i| metrics.worker_local_schedule_count(i))
+        .sum();
+
+    assert_eq!(2, n);
+    assert_eq!(1, metrics.remote_schedule_count());
+}
+
+#[test]
+fn worker_overflow_count() {
+    // Only applies to the threaded worker
+    let rt = threaded();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        // Move to the runtime
+        tokio::spawn(async {
+            let (tx1, rx1) = std::sync::mpsc::channel();
+            let (tx2, rx2) = std::sync::mpsc::channel();
+
+            // First, we need to block the other worker until all tasks have
+            // been spawned.
+            tokio::spawn(async move {
+                tx1.send(()).unwrap();
+                rx2.recv().unwrap();
+            });
+
+            // Bump the next-run spawn
+            tokio::spawn(async {});
+
+            rx1.recv().unwrap();
+
+            // Spawn many tasks
+            for _ in 0..300 {
+                tokio::spawn(async {});
+            }
+
+            tx2.send(()).unwrap();
+        })
+        .await
+        .unwrap();
+    });
+    drop(rt);
+
+    let n: u64 = (0..metrics.num_workers())
+        .map(|i| metrics.worker_overflow_count(i))
+        .sum();
+
+    assert_eq!(1, n);
+}
+
+#[test]
+fn injection_queue_depth() {
+    use std::thread;
+
+    let rt = basic();
+    let handle = rt.handle().clone();
+    let metrics = rt.metrics();
+
+    thread::spawn(move || {
+        handle.spawn(async {});
+    })
+    .join()
+    .unwrap();
+
+    assert_eq!(1, metrics.injection_queue_depth());
+
+    let rt = threaded();
+    let handle = rt.handle().clone();
+    let metrics = rt.metrics();
+
+    // First we need to block the runtime workers
+    let (tx1, rx1) = std::sync::mpsc::channel();
+    let (tx2, rx2) = std::sync::mpsc::channel();
+
+    rt.spawn(async move { rx1.recv().unwrap() });
+    rt.spawn(async move { rx2.recv().unwrap() });
+
+    thread::spawn(move || {
+        handle.spawn(async {});
+    })
+    .join()
+    .unwrap();
+
+    let n = metrics.injection_queue_depth();
+    assert!(1 <= n, "{}", n);
+    assert!(3 >= n, "{}", n);
+
+    tx1.send(()).unwrap();
+    tx2.send(()).unwrap();
+}
+
+#[test]
+fn worker_local_queue_depth() {
+    const N: usize = 100;
+
+    let rt = basic();
+    let metrics = rt.metrics();
+    rt.block_on(async {
+        for _ in 0..N {
+            tokio::spawn(async {});
+        }
+
+        assert_eq!(N, metrics.worker_local_queue_depth(0));
+    });
+
+    let rt = threaded();
+    let metrics = rt.metrics();
+    rt.block_on(async move {
+        // Move to the runtime
+        tokio::spawn(async move {
+            let (tx1, rx1) = std::sync::mpsc::channel();
+            let (tx2, rx2) = std::sync::mpsc::channel();
+
+            // First, we need to block the other worker until all tasks have
+            // been spawned.
+            tokio::spawn(async move {
+                tx1.send(()).unwrap();
+                rx2.recv().unwrap();
+            });
+
+            // Bump the next-run spawn
+            tokio::spawn(async {});
+
+            rx1.recv().unwrap();
+
+            // Spawn some tasks
+            for _ in 0..100 {
+                tokio::spawn(async {});
+            }
+
+            let n: usize = (0..metrics.num_workers())
+                .map(|i| metrics.worker_local_queue_depth(i))
+                .sum();
+
+            assert_eq!(n, N);
+
+            tx2.send(()).unwrap();
+        })
+        .await
+        .unwrap();
+    });
+}
+
+fn basic() -> Runtime {
+    tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+}
+
+fn threaded() -> Runtime {
+    tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(2)
+        .enable_all()
+        .build()
+        .unwrap()
+}
diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs
index 5f047a7962d..b2f84fd33ff 100644
--- a/tokio/tests/rt_threaded.rs
+++ b/tokio/tests/rt_threaded.rs
@@ -15,6 +15,15 @@ use std::sync::atomic::Ordering::Relaxed;
 use std::sync::{mpsc, Arc, Mutex};
 use std::task::{Context, Poll, Waker};
 
+macro_rules! cfg_metrics {
+    ($($t:tt)*) => {
+        #[cfg(tokio_unstable)]
+        {
+            $( $t )*
+        }
+    }
+}
+
 #[test]
 fn single_thread() {
     // No panic when starting a runtime w/ a single thread
@@ -55,6 +64,38 @@ fn many_oneshot_futures() {
     }
 }
 
+#[test]
+fn spawn_two() {
+    let rt = rt();
+
+    let out = rt.block_on(async {
+        let (tx, rx) = oneshot::channel();
+
+        tokio::spawn(async move {
+            tokio::spawn(async move {
+                tx.send("ZOMG").unwrap();
+            });
+        });
+
+        assert_ok!(rx.await)
+    });
+
+    assert_eq!(out, "ZOMG");
+
+    cfg_metrics! {
+        let metrics = rt.metrics();
+        drop(rt);
+        assert_eq!(1, metrics.remote_schedule_count());
+
+        let mut local = 0;
+        for i in 0..metrics.num_workers() {
+            local += metrics.worker_local_schedule_count(i);
+        }
+
+        assert_eq!(1, local);
+    }
+}
+
 #[test]
 fn many_multishot_futures() {
     const CHAIN: usize = 200;
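
For reference, a minimal sketch of how the metrics surface introduced by this change might be consumed from user code. It only uses the accessors exercised by the tests above (`Runtime::metrics()`, `num_workers()`, `remote_schedule_count()`, and the per-worker counters), and it assumes a build with `RUSTFLAGS="--cfg tokio_unstable"`; exact return types and anything not shown in the diff are assumptions.

// Hedged usage sketch, not part of the change itself.
fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap();

    // The tests above rely on the metrics handle staying usable
    // after the runtime is dropped.
    let metrics = rt.metrics();

    rt.block_on(async {
        for _ in 0..100 {
            tokio::spawn(async {}).await.unwrap();
        }
    });
    drop(rt);

    println!("workers:          {}", metrics.num_workers());
    println!("remote schedules: {}", metrics.remote_schedule_count());
    for worker in 0..metrics.num_workers() {
        println!(
            "worker {}: polls={} local_schedules={} steals={} parks={} busy={:?}",
            worker,
            metrics.worker_poll_count(worker),
            metrics.worker_local_schedule_count(worker),
            metrics.worker_steal_count(worker),
            metrics.worker_park_count(worker),
            metrics.worker_total_busy_duration(worker),
        );
    }
}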