rt(threaded): basic self-tuning of injection queue #5715

Closed
wants to merge 1 commit into from
116 changes: 112 additions & 4 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
@@ -68,7 +68,7 @@ use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::time::Duration;
use std::time::{Duration, Instant};

/// A scheduler worker
pub(super) struct Worker {
@@ -119,6 +119,24 @@ struct Core {

/// Fast random number generator.
rand: FastRand,

/// Instant at which the tuning routine last ran.
last_tuning_at: Instant,

/// Number of tasks polled since the last tuning run.
///
/// Note: polls of the LIFO slot are batched with the poll of the task
/// that scheduled a task into the LIFO slot.
tasks_polled_since_last_tuning: u32,

/// Mean task poll time (exponentially weighted moving average)
task_mean_poll_time: Duration,

/// How many ticks until the global queue should be checked
ticks_until_check_global_queue: u32,

/// How often to check the global queue
global_queue_interval: u32,
}

/// State shared across all workers
@@ -201,6 +219,18 @@ scoped_thread_local!(static CURRENT: Context);
/// improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;

/// Target injection queue check interval
const TARGET_INJECTION_QUEUE_INTERVAL: Duration = Duration::from_micros(500);

/// Maximum number of tasks we poll before checking the injection queue.
const MAX_TASKS_POLLED_PER_INJECTION_QUEUE_INTERVAL: u32 = 61;

const INIT_MEAN_POLL_TIME: Duration = Duration::from_micros(
// Workaround: there is no const division for `Duration`, so divide in micros.
TARGET_INJECTION_QUEUE_INTERVAL.as_micros() as u64
/ MAX_TASKS_POLLED_PER_INJECTION_QUEUE_INTERVAL as u64,
);
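For reference, with the constants above INIT_MEAN_POLL_TIME works out to 500 µs / 61 = 8 µs under integer division. A minimal standalone sketch of the same computation (not part of the patch, just the two constants from this hunk):

use std::time::Duration;

const TARGET_INJECTION_QUEUE_INTERVAL: Duration = Duration::from_micros(500);
const MAX_TASKS_POLLED_PER_INJECTION_QUEUE_INTERVAL: u32 = 61;

fn main() {
    // Same workaround as above: divide in integer micros.
    let init_mean = Duration::from_micros(
        TARGET_INJECTION_QUEUE_INTERVAL.as_micros() as u64
            / MAX_TASKS_POLLED_PER_INJECTION_QUEUE_INTERVAL as u64,
    );
    // 500 / 61 rounds down to 8 µs.
    assert_eq!(init_mean, Duration::from_micros(8));
}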

pub(super) fn create(
size: usize,
park: Parker,
@@ -231,6 +261,11 @@ pub(super) fn create(
park: Some(park),
metrics: MetricsBatch::new(&metrics),
rand: FastRand::new(config.seed_generator.next_seed()),
last_tuning_at: Instant::now(),
tasks_polled_since_last_tuning: 0,
task_mean_poll_time: INIT_MEAN_POLL_TIME,
global_queue_interval: config.global_queue_interval,
ticks_until_check_global_queue: config.global_queue_interval,
}));

remotes.push(Remote { steal, unpark });
@@ -433,6 +468,10 @@ fn run(worker: Arc<Worker>) {

impl Context {
fn run(&self, mut core: Box<Core>) -> RunResult {
// Initialize `last_tuning_at` here. This resets any stale values that
// may remain from before the core was stolen.
core.reset_tuning();

// Reset `lifo_enabled` here in case the core was previously stolen from
// a task that had the LIFO slot disabled.
self.reset_lifo_enabled(&mut core);
@@ -452,9 +491,15 @@ impl Context {
continue;
}

// No immediately available work. We may spend time trying to find
// more. This is a good time to tune our heuristics.
core.tune();

// There is no more **local** work to process, try to steal work
// from other workers.
if let Some(task) = core.steal_work(&self.worker) {
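// Stealing can take an arbitrary amount of time, so reset the tuning
// window so that time spent stealing is not measured as poll time.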
core.reset_tuning();

core = self.run_task(task, core)?;
} else {
// Wait for work
@@ -463,6 +508,8 @@ impl Context {
} else {
self.park(core)
};

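// Likewise, time spent parked must not count toward the measured
// mean poll time.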
core.reset_tuning();
}
}

@@ -482,8 +529,10 @@ impl Context {

self.assert_lifo_enabled_is_correct(&core);

// Make the core available to the runtime context
core.tasks_polled_since_last_tuning += 1;
core.metrics.start_poll();

// Make the core available to the runtime context
*self.core.borrow_mut() = Some(core);

// Run the task
@@ -576,12 +625,18 @@ impl Context {
if core.tick % self.worker.handle.shared.config.event_interval == 0 {
super::counters::inc_num_maintenance();

// Run tuning logic
core.tune();

// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
core = self.park_timeout(core, Some(Duration::from_millis(0)));

// Run regularly scheduled maintenance
core.maintenance(&self.worker);

// Reset tuning counters
core.reset_tuning();
}

core
@@ -662,19 +717,23 @@ impl Core {
/// Increment the tick
fn tick(&mut self) {
self.tick = self.tick.wrapping_add(1);
self.ticks_until_check_global_queue = self.ticks_until_check_global_queue.saturating_sub(1);
}

/// Return the next notified task available to this worker.
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
worker.inject().pop().or_else(|| self.next_local_task())
if self.ticks_until_check_global_queue == 0 {
self.next_global_task(worker)
.or_else(|| self.next_local_task())
} else {
let maybe_task = self.next_local_task();

if maybe_task.is_some() {
return maybe_task;
}

self.ticks_until_check_global_queue = self.global_queue_interval;

// Other threads can only **remove** tasks from the current worker's
// `run_queue`. So, we can be confident that by the time we call
// `run_queue.push_back` below, there will be *at least* `cap`
@@ -708,6 +767,11 @@ impl Core {
self.lifo_slot.take().or_else(|| self.run_queue.pop())
}

fn next_global_task(&mut self, worker: &Worker) -> Option<Notified> {
self.ticks_until_check_global_queue = self.global_queue_interval;
worker.inject().pop()
}
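To make the countdown's cadence concrete: `tick` decrements the counter, and once it hits zero `next_task` prefers the injection queue and the counter is reloaded from `global_queue_interval`. A self-contained sketch under hypothetical names (`Tuner` is not part of the patch):

struct Tuner {
    ticks_until_check_global_queue: u32,
    global_queue_interval: u32,
}

impl Tuner {
    fn tick(&mut self) {
        self.ticks_until_check_global_queue =
            self.ticks_until_check_global_queue.saturating_sub(1);
    }

    /// Returns true when this tick should check the injection queue first.
    fn check_global_first(&mut self) -> bool {
        if self.ticks_until_check_global_queue == 0 {
            self.ticks_until_check_global_queue = self.global_queue_interval;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut t = Tuner { ticks_until_check_global_queue: 3, global_queue_interval: 3 };
    let order: Vec<bool> = (0..6)
        .map(|_| {
            t.tick();
            t.check_global_first()
        })
        .collect();
    // With an interval of 3, every third tick prefers the injection queue.
    assert_eq!(order, vec![false, false, true, false, false, true]);
}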

/// Function responsible for stealing tasks from another worker
///
/// Note: Only if less than half the workers are searching for tasks to steal
@@ -842,6 +906,50 @@ impl Core {

park.shutdown(&handle.driver);
}

fn reset_tuning(&mut self) {
// Reset `last_tuning_at` and the poll counter here. This discards any
// stale values that may remain from before the core was stolen.
self.last_tuning_at = Instant::now();
self.tasks_polled_since_last_tuning = 0;
}

fn tune(&mut self) {
// Smoothing factor; 0.5 weighs newer measurements fairly heavily.
const ALPHA: f64 = 0.5;

if self.tasks_polled_since_last_tuning > 0 {
let now = Instant::now();
let elapsed = now - self.last_tuning_at;
let mean = (elapsed / self.tasks_polled_since_last_tuning).as_nanos() as f64;

let weighted = self.task_mean_poll_time.as_nanos() as f64;
let weighted = ALPHA * mean + (1.0 - ALPHA) * weighted;

self.task_mean_poll_time = Duration::from_nanos(weighted as u64);

let global_queue_interval = std::cmp::min(
(TARGET_INJECTION_QUEUE_INTERVAL.as_nanos() / self.task_mean_poll_time.as_nanos())
as u32,
MAX_TASKS_POLLED_PER_INJECTION_QUEUE_INTERVAL,
);

if global_queue_interval > self.global_queue_interval {
self.ticks_until_check_global_queue +=
global_queue_interval - self.global_queue_interval;
self.global_queue_interval = global_queue_interval;
} else if global_queue_interval < self.global_queue_interval {
// Remove extra ticks, but make sure not to wrap
self.ticks_until_check_global_queue = self
.ticks_until_check_global_queue
.saturating_sub(self.global_queue_interval - global_queue_interval);
self.global_queue_interval = global_queue_interval;
}

self.last_tuning_at = now;
self.tasks_polled_since_last_tuning = 0;
}
}
}
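The body of `tune` is a standard exponentially weighted moving average, new_mean = ALPHA * sample + (1 - ALPHA) * old_mean, with the derived interval capped at MAX_TASKS_POLLED_PER_INJECTION_QUEUE_INTERVAL. A self-contained sketch of the same arithmetic (a hypothetical free function, not the patch itself):

use std::time::Duration;

const TARGET_INJECTION_QUEUE_INTERVAL: Duration = Duration::from_micros(500);
const MAX_TASKS_POLLED_PER_INJECTION_QUEUE_INTERVAL: u32 = 61;
const ALPHA: f64 = 0.5;

fn tune(mean: Duration, elapsed: Duration, polls: u32) -> (Duration, u32) {
    // EWMA update: new_mean = ALPHA * sample + (1 - ALPHA) * old_mean.
    let sample = (elapsed / polls).as_nanos() as f64;
    let weighted = ALPHA * sample + (1.0 - ALPHA) * mean.as_nanos() as f64;
    let new_mean = Duration::from_nanos(weighted as u64);

    // Derive the check interval from the target and cap it, as in `Core::tune`.
    let interval = std::cmp::min(
        (TARGET_INJECTION_QUEUE_INTERVAL.as_nanos() / new_mean.as_nanos()) as u32,
        MAX_TASKS_POLLED_PER_INJECTION_QUEUE_INTERVAL,
    );
    (new_mean, interval)
}

fn main() {
    // 40 polls in 2 ms => 50 µs per poll this round; with an 8 µs prior the
    // EWMA lands at 29 µs, so check the injection queue every 17 ticks.
    let (mean, interval) = tune(Duration::from_micros(8), Duration::from_millis(2), 40);
    assert_eq!(mean, Duration::from_micros(29));
    assert_eq!(interval, 17);
}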

impl Worker {