Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coop: allow disabling cooperative scheduling #3516

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions tokio/src/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ impl Budget {
///
/// Note that as more yield points are added in the ecosystem, this value
/// will probably also have to be raised.
const fn initial() -> Budget {
pub(crate) const fn initial() -> Budget {
Budget(Some(128))
}

/// Returns an unconstrained budget. Operations will not be limited.
const fn unconstrained() -> Budget {
pub(crate) const fn unconstrained() -> Budget {
Budget(None)
}
}
Expand All @@ -89,13 +89,6 @@ cfg_rt_threaded! {
}
}

/// Run the given closure with a cooperative task budget. When the function
/// returns, the budget is reset to the value prior to calling the function.
#[inline(always)]
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::initial(), f)
}

cfg_rt_threaded! {
/// Set the current task's budget
#[cfg(feature = "blocking")]
Expand All @@ -105,7 +98,7 @@ cfg_rt_threaded! {
}

#[inline(always)]
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
pub(crate) fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
struct ResetGuard<'a> {
cell: &'a Cell<Budget>,
prev: Budget,
Expand Down Expand Up @@ -227,6 +220,10 @@ cfg_coop! {
mod test {
use super::*;

fn budget<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::initial(), f)
}

fn get() -> Budget {
CURRENT.with(|cell| cell.get())
}
Expand Down
17 changes: 13 additions & 4 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::coop::Budget;
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
Expand Down Expand Up @@ -31,6 +32,9 @@ where
/// Current tick
tick: u8,

/// The coop_budget used by this scheduler when blocking on tasks.
coop_budget: Budget,

/// Thread park handle
park: P,
}
Expand Down Expand Up @@ -84,7 +88,7 @@ impl<P> BasicScheduler<P>
where
P: Park,
{
pub(crate) fn new(park: P) -> BasicScheduler<P> {
pub(crate) fn new(park: P, coop_budget: Budget) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());

BasicScheduler {
Expand All @@ -99,6 +103,7 @@ where
}),
},
tick: 0,
coop_budget,
park,
}
}
Expand All @@ -120,15 +125,19 @@ where
where
F: Future,
{
let coop_budget = self.coop_budget;

enter(self, |scheduler, context| {
let _enter = runtime::enter(false);
let _enter = runtime::enter(false, coop_budget);
let waker = waker_ref(&scheduler.spawner.shared);
let mut cx = std::task::Context::from_waker(&waker);

pin!(future);

'outer: loop {
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
if let Ready(v) =
crate::coop::with_budget(coop_budget, || future.as_mut().poll(&mut cx))
{
return v;
}

Expand All @@ -152,7 +161,7 @@ where
};

match next {
Some(task) => crate::coop::budget(|| task.run()),
Some(task) => crate::coop::with_budget(coop_budget, || task.run()),
None => {
// Park until the thread is signaled
scheduler.park.park().ok().expect("failed to park");
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/blocking/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl Receiver {
return true;
}

let mut e = match try_enter(false) {
let mut e = match try_enter(false, crate::coop::Budget::unconstrained()) {
Some(enter) => enter,
_ => {
if std::thread::panicking() {
Expand Down
22 changes: 19 additions & 3 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::coop::Budget;
use crate::runtime::handle::Handle;
use crate::runtime::shell::Shell;
use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner};
Expand Down Expand Up @@ -55,6 +56,9 @@ pub struct Builder {
/// Cap on thread usage.
max_threads: usize,

/// Coop budget to be used by the runtime this creates.
coop_budget: Budget,

/// Name used for threads spawned by the runtime.
pub(super) thread_name: String,

Expand Down Expand Up @@ -98,6 +102,8 @@ impl Builder {

max_threads: 512,

coop_budget: Budget::initial(),

// Default thread name
thread_name: "tokio-runtime-worker".into(),

Expand Down Expand Up @@ -294,6 +300,13 @@ impl Builder {
self
}

/// Disable tokio's automated cooperative scheduling mechanism See this link for details:
/// <https://tokio.rs/blog/2020-04-preemption>
pub fn no_coop(&mut self) -> &mut Self {
self.coop_budget = Budget::unconstrained();
self
}
Comment on lines +303 to +308
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only had a quick look. Is this the only change in the public API?

In any case, it needs better docs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is this and a similar change in LocalSet. I agree this needs more docs — I'm happy to add the polish here as soon as we agree on the general direction :)

Any guidance on the kind of documentation this warrants? The underlying feature didn't have that much I could link to, but let me know what you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, making sure we agree on the general direction first is probably good.

Regarding the kind of doc, it could look something like this:

Disable the automated cooperative scheduling feature.

The coop feature limits the number of operations that can be performed before a task yields to the executor. You can read more about the coop feature in the blog post Reducing tail latencies with automatic cooperative task yielding.

Example

use tokio::runtime;

let rt = runtime::Builder::new_multi_thread()
    .enable_all()
    .disable_coop()
    .build()
    .unwrap();


/// Creates the configured `Runtime`.
///
/// The returned `ThreadPool` instance is ready to spawn tasks.
Expand Down Expand Up @@ -334,13 +347,14 @@ impl Builder {
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::Shell(Shell::new(driver)),
kind: Kind::Shell(Shell::new(driver, self.coop_budget)),
handle: Handle {
spawner,
io_handle,
time_handle,
clock,
blocking_spawner,
coop_budget: self.coop_budget,
},
blocking_pool,
})
Expand Down Expand Up @@ -425,7 +439,7 @@ cfg_rt_core! {
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler = BasicScheduler::new(driver);
let scheduler = BasicScheduler::new(driver, self.coop_budget);
let spawner = Spawner::Basic(scheduler.spawner().clone());

// Blocking pool
Expand All @@ -440,6 +454,7 @@ cfg_rt_core! {
time_handle,
clock,
blocking_spawner,
coop_budget: self.coop_budget,
},
blocking_pool,
})
Expand Down Expand Up @@ -473,7 +488,7 @@ cfg_rt_threaded! {

let (io_driver, io_handle) = io::create_driver(self.enable_io)?;
let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone());
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver));
let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.coop_budget);
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the blocking pool
Expand All @@ -487,6 +502,7 @@ cfg_rt_threaded! {
time_handle,
clock,
blocking_spawner,
coop_budget: self.coop_budget,
};

// Spawn the thread pool workers
Expand Down
19 changes: 13 additions & 6 deletions tokio/src/runtime/enter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::coop::Budget;
use std::cell::{Cell, RefCell};
use std::fmt;
use std::marker::PhantomData;
Expand Down Expand Up @@ -25,13 +26,16 @@ thread_local!(static ENTERED: Cell<EnterContext> = Cell::new(EnterContext::NotEn

/// Represents an executor context.
pub(crate) struct Enter {
/// Coop budget to be used when blocking on tasks.
coop_budget: Budget,

_p: PhantomData<RefCell<()>>,
}

/// Marks the current thread as being within the dynamic extent of an
/// executor.
pub(crate) fn enter(allow_blocking: bool) -> Enter {
if let Some(enter) = try_enter(allow_blocking) {
pub(crate) fn enter(allow_blocking: bool, coop_budget: Budget) -> Enter {
if let Some(enter) = try_enter(allow_blocking, coop_budget) {
return enter;
}

Expand All @@ -45,13 +49,16 @@ pub(crate) fn enter(allow_blocking: bool) -> Enter {

/// Tries to enter a runtime context, returns `None` if already in a runtime
/// context.
pub(crate) fn try_enter(allow_blocking: bool) -> Option<Enter> {
pub(crate) fn try_enter(allow_blocking: bool, coop_budget: Budget) -> Option<Enter> {
ENTERED.with(|c| {
if c.get().is_entered() {
None
} else {
c.set(EnterContext::Entered { allow_blocking });
Some(Enter { _p: PhantomData })
Some(Enter {
coop_budget,
_p: PhantomData,
})
}
})
}
Expand Down Expand Up @@ -157,7 +164,7 @@ cfg_block_on! {
pin!(f);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::coop::with_budget(self.coop_budget, || f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down Expand Up @@ -193,7 +200,7 @@ cfg_blocking_impl! {
let when = Instant::now() + timeout;

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::coop::with_budget(self.coop_budget, || f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down
6 changes: 5 additions & 1 deletion tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::coop::Budget;
use crate::runtime::{blocking, context, io, time, Spawner};
use std::{error, fmt};

Expand Down Expand Up @@ -33,6 +34,9 @@ pub struct Handle {

/// Blocking pool spawner
pub(super) blocking_spawner: blocking::Spawner,

/// The coop_budget used by this handle when entering the runtime and blocking on tasks.
pub(super) coop_budget: Budget,
}

impl Handle {
Expand Down Expand Up @@ -273,7 +277,7 @@ cfg_rt_core! {
///
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
self.enter(|| {
let mut enter = crate::runtime::enter(true);
let mut enter = crate::runtime::enter(true, self.coop_budget);
enter.block_on(future).expect("failed to park thread")
})
}
Expand Down
18 changes: 14 additions & 4 deletions tokio/src/runtime/shell.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![allow(clippy::redundant_clone)]

use crate::coop::Budget;
use crate::park::{Park, Unpark};
use crate::runtime::enter;
use crate::runtime::time;
Expand All @@ -16,31 +17,40 @@ pub(super) struct Shell {

/// TODO: don't store this
unpark: Arc<Handle>,

/// The coop_budget used by this shell when blocking on tasks.
coop_budget: Budget,
}

#[derive(Debug)]
struct Handle(<time::Driver as Park>::Unpark);

impl Shell {
pub(super) fn new(driver: time::Driver) -> Shell {
pub(super) fn new(driver: time::Driver, coop_budget: Budget) -> Shell {
let unpark = Arc::new(Handle(driver.unpark()));

Shell { driver, unpark }
Shell {
driver,
unpark,
coop_budget,
}
}

pub(super) fn block_on<F>(&mut self, f: F) -> F::Output
where
F: Future,
{
let _e = enter(true);
let _e = enter(true, self.coop_budget);

pin!(f);

let waker = waker_ref(&self.unpark);
let mut cx = Context::from_waker(&waker);

loop {
if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) =
crate::coop::with_budget(self.coop_budget, || f.as_mut().poll(&mut cx))
{
return v;
}

Expand Down
15 changes: 11 additions & 4 deletions tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ cfg_blocking! {
pub(crate) use worker::block_in_place;
}

use crate::coop::Budget;
use crate::loom::sync::Arc;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::Parker;
Expand All @@ -23,6 +24,9 @@ use std::future::Future;
/// Work-stealing based thread pool for executing futures.
pub(crate) struct ThreadPool {
spawner: Spawner,

/// The coop_budget used when blocking on tasks.
coop_budget: Budget,
}

/// Submit futures to the associated thread pool for execution.
Expand All @@ -45,10 +49,13 @@ pub(crate) struct Spawner {
// ===== impl ThreadPool =====

impl ThreadPool {
pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) {
let (shared, launch) = worker::create(size, parker);
pub(crate) fn new(size: usize, parker: Parker, coop_budget: Budget) -> (ThreadPool, Launch) {
let (shared, launch) = worker::create(size, parker, coop_budget);
let spawner = Spawner { shared };
let thread_pool = ThreadPool { spawner };
let thread_pool = ThreadPool {
spawner,
coop_budget,
};

(thread_pool, launch)
}
Expand Down Expand Up @@ -78,7 +85,7 @@ impl ThreadPool {
where
F: Future,
{
let mut enter = crate::runtime::enter(true);
let mut enter = crate::runtime::enter(true, self.coop_budget);
enter.block_on(future).expect("failed to park thread")
}
}
Expand Down
Loading