Skip to content

Commit

Permalink
rt: extract basic_scheduler::Config (#4935)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche authored Aug 24, 2022
1 parent d720770 commit df28ac0
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 63 deletions.
20 changes: 1 addition & 19 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::park::{Park, Unpark};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::Driver;
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::{Callback, HandleInner};
use crate::runtime::{Config, HandleInner};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
Expand Down Expand Up @@ -68,24 +68,6 @@ pub(crate) struct Spawner {
shared: Arc<Shared>,
}

pub(crate) struct Config {
/// How many ticks before pulling a task from the global/remote queue?
pub(crate) global_queue_interval: u32,

/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,

/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,

/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,

#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}

/// Scheduler state shared between threads.
struct Shared {
/// Remote run queue. None if the `Runtime` has been dropped.
Expand Down
17 changes: 10 additions & 7 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,8 +784,7 @@ impl Builder {
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::basic_scheduler::Config;
use crate::runtime::{BasicScheduler, HandleInner, Kind};
use crate::runtime::{BasicScheduler, Config, HandleInner, Kind};

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

Expand Down Expand Up @@ -903,7 +902,7 @@ cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{HandleInner, Kind, ThreadPool};
use crate::runtime::{Config, HandleInner, Kind, ThreadPool};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

Expand All @@ -926,10 +925,14 @@ cfg_rt_multi_thread! {
core_threads,
driver,
handle_inner,
self.before_park.clone(),
self.after_unpark.clone(),
self.global_queue_interval,
self.event_interval,
Config {
before_park: self.before_park.clone(),
after_unpark: self.after_unpark.clone(),
global_queue_interval: self.global_queue_interval,
event_interval: self.event_interval,
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
},
);
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use crate::runtime::Callback;

pub(crate) struct Config {
/// How many ticks before pulling a task from the global/remote queue?
pub(crate) global_queue_interval: u32,

/// How many ticks before yielding to the driver for timer and I/O events?
pub(crate) event_interval: u32,

/// Callback for a worker parking itself
pub(crate) before_park: Option<Callback>,

/// Callback for a worker unparking itself
pub(crate) after_unpark: Option<Callback>,

#[cfg(tokio_unstable)]
/// How to respond to unhandled task panics.
pub(crate) unhandled_panic: crate::runtime::UnhandledPanic,
}
3 changes: 3 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ cfg_rt! {
mod basic_scheduler;
use basic_scheduler::BasicScheduler;

mod config;
use config::Config;

mod blocking;
use blocking::BlockingPool;
#[cfg_attr(tokio_wasi, allow(unused_imports))]
Expand Down
17 changes: 3 additions & 14 deletions tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub(crate) use worker::block_in_place;

use crate::loom::sync::Arc;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Callback, Driver, HandleInner};
use crate::runtime::{Config, Driver, HandleInner};

use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -49,21 +49,10 @@ impl ThreadPool {
size: usize,
driver: Driver,
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u32,
event_interval: u32,
config: Config,
) -> (ThreadPool, Launch) {
let parker = Parker::new(driver);
let (shared, launch) = worker::create(
size,
parker,
handle_inner,
before_park,
after_unpark,
global_queue_interval,
event_interval,
);
let (shared, launch) = worker::create(size, parker, handle_inner, config);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };

Expand Down
32 changes: 9 additions & 23 deletions tokio/src/runtime/thread_pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{queue, Idle, Parker, Unparker};
use crate::runtime::{task, Callback, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{task, Config, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::util::atomic_cell::AtomicCell;
use crate::util::FastRand;

Expand Down Expand Up @@ -117,12 +117,6 @@ struct Core {

/// Fast random number generator.
rand: FastRand,

/// How many ticks before pulling a task from the global/remote queue?
global_queue_interval: u32,

/// How many ticks before yielding to the driver for timer and I/O events?
event_interval: u32,
}

/// State shared across all workers
Expand Down Expand Up @@ -152,10 +146,8 @@ pub(super) struct Shared {
#[allow(clippy::vec_box)] // we're moving an already-boxed value
shutdown_cores: Mutex<Vec<Box<Core>>>,

/// Callback for a worker parking itself
before_park: Option<Callback>,
/// Callback for a worker unparking itself
after_unpark: Option<Callback>,
/// Scheduler configuration options
config: Config,

/// Collects metrics from the runtime.
pub(super) scheduler_metrics: SchedulerMetrics,
Expand Down Expand Up @@ -202,10 +194,7 @@ pub(super) fn create(
size: usize,
park: Parker,
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
global_queue_interval: u32,
event_interval: u32,
config: Config,
) -> (Arc<Shared>, Launch) {
let mut cores = Vec::with_capacity(size);
let mut remotes = Vec::with_capacity(size);
Expand All @@ -227,8 +216,6 @@ pub(super) fn create(
park: Some(park),
metrics: MetricsBatch::new(),
rand: FastRand::new(seed()),
global_queue_interval,
event_interval,
}));

remotes.push(Remote { steal, unpark });
Expand All @@ -242,8 +229,7 @@ pub(super) fn create(
idle: Idle::new(size),
owned: OwnedTasks::new(),
shutdown_cores: Mutex::new(vec![]),
before_park,
after_unpark,
config,
scheduler_metrics: SchedulerMetrics::new(),
worker_metrics: worker_metrics.into_boxed_slice(),
});
Expand Down Expand Up @@ -468,7 +454,7 @@ impl Context {
}

fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % core.event_interval == 0 {
if core.tick % self.worker.shared.config.event_interval == 0 {
// Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
// to run without actually putting the thread to sleep.
core = self.park_timeout(core, Some(Duration::from_millis(0)));
Expand All @@ -492,7 +478,7 @@ impl Context {
/// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
/// after all the IOs get dispatched
fn park(&self, mut core: Box<Core>) -> Box<Core> {
if let Some(f) = &self.worker.shared.before_park {
if let Some(f) = &self.worker.shared.config.before_park {
f();
}

Expand All @@ -511,7 +497,7 @@ impl Context {
}
}

if let Some(f) = &self.worker.shared.after_unpark {
if let Some(f) = &self.worker.shared.config.after_unpark {
f();
}
core
Expand Down Expand Up @@ -555,7 +541,7 @@ impl Core {

/// Return the next notified task available to this worker.
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % self.global_queue_interval == 0 {
if self.tick % worker.shared.config.global_queue_interval == 0 {
worker.inject().pop().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| worker.inject().pop())
Expand Down

0 comments on commit df28ac0

Please sign in to comment.