Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: add worker thread id #6695

Merged
merged 10 commits into from
Jul 23, 2024
3 changes: 3 additions & 0 deletions tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! This file contains mocks of the types in src/runtime/metrics

use std::thread::ThreadId;

/// Mock of the scheduler metrics type; it has no fields and stores nothing.
pub(crate) struct SchedulerMetrics {}

/// Mock of the per-worker metrics type; it has no fields and stores nothing.
pub(crate) struct WorkerMetrics {}
Expand Down Expand Up @@ -30,6 +32,7 @@ impl WorkerMetrics {
}

/// No-op in the mock: the queue depth is ignored.
pub(crate) fn set_queue_depth(&self, _len: usize) {}
/// No-op in the mock: the thread id is ignored.
pub(crate) fn set_thread_id(&self, _thread_id: ThreadId) {}
}

impl MetricsBatch {
Expand Down
44 changes: 44 additions & 0 deletions tokio/src/runtime/metrics/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::runtime::Handle;

cfg_unstable_metrics! {
use std::ops::Range;
use std::thread::ThreadId;
cfg_64bit_metrics! {
use std::sync::atomic::Ordering::Relaxed;
}
Expand Down Expand Up @@ -127,6 +128,49 @@ impl RuntimeMetrics {
self.handle.inner.num_idle_blocking_threads()
}

/// Returns the thread id of the given worker thread.
///
/// The returned value is `None` if the worker thread has not yet finished
/// starting up.
///
/// If additional information about the thread, such as its native id, is
/// required, it can be collected in [`on_thread_start`] and correlated
/// using the thread id.
Comment on lines +136 to +138
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is enough for your purposes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for now we just want to send SIGABRT to the hanging thread and for that we collect the pthread_t id of each worker thread in on_thread_start.

///
/// [`on_thread_start`]: crate::runtime::Builder::on_thread_start
///
/// # Arguments
///
/// `worker` is the index of the worker being queried. The given value must
/// be in the range `0..num_workers()`, i.e. at least 0 and strictly less
/// than `num_workers()`. The index uniquely identifies a single worker and
/// will continue to identify the worker throughout the lifetime of the
/// runtime instance.
///
/// # Panics
///
/// The method panics when `worker` represents an invalid worker, i.e. is
/// greater than or equal to `num_workers()`.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Handle;
///
/// #[tokio::main]
/// async fn main() {
/// let metrics = Handle::current().metrics();
///
/// let id = metrics.worker_thread_id(0);
/// println!("worker 0 has id {:?}", id);
/// }
/// ```
pub fn worker_thread_id(&self, worker: usize) -> Option<ThreadId> {
    // Look up the metrics slot for the requested worker, then read the
    // thread id stored in it.
    let worker_metrics = self.handle.inner.worker_metrics(worker);
    worker_metrics.thread_id()
}

cfg_64bit_metrics! {
/// Returns the number of tasks spawned in this runtime since it was created.
///
Expand Down
13 changes: 13 additions & 0 deletions tokio/src/runtime/metrics/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use crate::runtime::metrics::Histogram;
use crate::runtime::Config;
use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Mutex;
use std::thread::ThreadId;

/// Retrieve runtime worker metrics.
///
Expand Down Expand Up @@ -49,6 +51,9 @@ pub(crate) struct WorkerMetrics {

/// If `Some`, tracks the number of polls by duration range.
pub(super) poll_count_histogram: Option<Histogram>,

/// Thread id of worker thread.
thread_id: Mutex<Option<ThreadId>>,
}

impl WorkerMetrics {
Expand All @@ -72,4 +77,12 @@ impl WorkerMetrics {
/// Stores `len` into the `queue_depth` metric using a relaxed atomic store.
pub(crate) fn set_queue_depth(&self, len: usize) {
self.queue_depth.store(len, Relaxed);
}

/// Returns a copy of the thread id currently held in the `thread_id` slot,
/// or `None` if the slot holds no id.
///
/// # Panics
///
/// Panics if the mutex guarding the slot is poisoned.
pub(crate) fn thread_id(&self) -> Option<ThreadId> {
    let guard = self.thread_id.lock().unwrap();
    *guard
}

/// Replaces the contents of the `thread_id` slot with `Some(thread_id)`.
///
/// # Panics
///
/// Panics if the mutex guarding the slot is poisoned.
pub(crate) fn set_thread_id(&self, thread_id: ThreadId) {
    let mut slot = self.thread_id.lock().unwrap();
    *slot = Some(thread_id);
}
}
7 changes: 6 additions & 1 deletion tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::Ordering::{AcqRel, Release};
use std::task::Poll::{Pending, Ready};
use std::task::Waker;
use std::time::Duration;
use std::{fmt, thread};

/// Executes tasks on the current thread
pub(crate) struct CurrentThread {
Expand Down Expand Up @@ -123,6 +123,7 @@ impl CurrentThread {
config: Config,
) -> (CurrentThread, Arc<Handle>) {
let worker_metrics = WorkerMetrics::from_config(&config);
worker_metrics.set_thread_id(thread::current().id());

// Get the configured global queue interval, or use the default.
let global_queue_interval = config
Expand Down Expand Up @@ -172,6 +173,10 @@ impl CurrentThread {
// available or the future is complete.
loop {
if let Some(core) = self.take_core(handle) {
handle
.shared
.worker_metrics
.set_thread_id(thread::current().id());
return core.block_on(future);
} else {
let notified = self.notify.notified();
Expand Down
9 changes: 9 additions & 0 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::task::Waker;
use std::thread;
use std::time::Duration;

cfg_unstable_metrics! {
Expand Down Expand Up @@ -334,6 +335,12 @@ where
if let Some(cx) = maybe_cx {
if self.take_core {
let core = cx.worker.core.take();

if core.is_some() {
cx.worker.handle.shared.worker_metrics[cx.worker.index]
.set_thread_id(thread::current().id());
}

let mut cx_core = cx.core.borrow_mut();
assert!(cx_core.is_none());
*cx_core = core;
Expand Down Expand Up @@ -482,6 +489,8 @@ fn run(worker: Arc<Worker>) {
None => return,
};

worker.handle.shared.worker_metrics[worker.index].set_thread_id(thread::current().id());

let handle = scheduler::Handle::MultiThread(worker.handle.clone());

crate::runtime::context::enter_runtime(&handle, true, |_| {
Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::{Cell, RefCell};
use std::cmp;
use std::task::Waker;
use std::time::Duration;
use std::{cmp, thread};

cfg_unstable_metrics! {
mod metrics;
Expand Down Expand Up @@ -569,6 +569,7 @@ impl Worker {
}
};

cx.shared().worker_metrics[core.index].set_thread_id(thread::current().id());
core.stats.start_processing_scheduled_tasks(&mut self.stats);

if let Some(task) = maybe_task {
Expand Down
63 changes: 63 additions & 0 deletions tokio/tests/rt_unstable_metrics.rs
surban marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use std::future::Future;
use std::sync::{Arc, Barrier, Mutex};
use std::task::Poll;
use std::thread;
use tokio::macros::support::poll_fn;

use tokio::runtime::Runtime;
Expand Down Expand Up @@ -150,6 +151,68 @@ fn remote_schedule_count() {
assert_eq!(1, rt.metrics().remote_schedule_count());
}

#[test]
fn worker_thread_id_current_thread() {
    let rt = current_thread();
    let metrics = rt.metrics();
    let main_thread_id = thread::current().id();

    // Driving the runtime from the test thread must report the test thread
    // as worker 0.
    rt.block_on(async {});
    assert_eq!(Some(main_thread_id), metrics.worker_thread_id(0));

    // Drive the runtime from a scoped helper thread; worker 0 must now
    // report the helper's id.
    let helper_thread_id = thread::scope(|s| {
        let helper = s.spawn(|| rt.block_on(async {}));
        helper.thread().id()
    });
    assert_eq!(Some(helper_thread_id), metrics.worker_thread_id(0));

    // Driving it from the test thread again must switch the id back.
    rt.block_on(async {});
    assert_eq!(Some(main_thread_id), metrics.worker_thread_id(0));
}

/// Verifies that `worker_thread_id` follows a multi-thread worker when
/// `block_in_place` hands the worker off to a different OS thread.
#[test]
fn worker_thread_id_threaded() {
    let rt = threaded();
    let metrics = rt.metrics();

    rt.block_on(rt.spawn(async move {
        // Check that we are running on a worker thread and determine
        // the index of our worker. Scan every worker the runtime reports
        // instead of assuming a fixed worker count.
        let thread_id = std::thread::current().id();
        let this_worker = (0..metrics.num_workers())
            .position(|w| metrics.worker_thread_id(w) == Some(thread_id))
            .expect("task not running on any worker thread");

        // Force worker to another thread.
        let moved_thread_id = tokio::task::block_in_place(|| {
            assert_eq!(thread_id, std::thread::current().id());

            // Wait for worker to move to another thread.
            for _ in 0..100 {
                let new_id = metrics.worker_thread_id(this_worker).unwrap();
                if thread_id != new_id {
                    return new_id;
                }
                std::thread::sleep(Duration::from_millis(100));
            }

            panic!("worker did not move to new thread");
        });

        // After blocking task worker either stays on new thread or
        // is moved back to current thread. Read the metric once so both
        // comparisons are made against the same value.
        let final_id = metrics.worker_thread_id(this_worker);
        assert!(final_id == Some(moved_thread_id) || final_id == Some(thread_id));
    }))
    .unwrap()
}

#[test]
fn worker_park_count() {
let rt = current_thread();
Expand Down
Loading