diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 27e969c59d4..5f14858beb3 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -71,12 +71,12 @@ impl Budget { /// /// Note that as more yield points are added in the ecosystem, this value /// will probably also have to be raised. - const fn initial() -> Budget { + pub(crate) const fn initial() -> Budget { Budget(Some(128)) } /// Returns an unconstrained budget. Operations will not be limited. - const fn unconstrained() -> Budget { + pub(crate) const fn unconstrained() -> Budget { Budget(None) } } @@ -89,13 +89,6 @@ cfg_rt_threaded! { } } -/// Run the given closure with a cooperative task budget. When the function -/// returns, the budget is reset to the value prior to calling the function. -#[inline(always)] -pub(crate) fn budget(f: impl FnOnce() -> R) -> R { - with_budget(Budget::initial(), f) -} - cfg_rt_threaded! { /// Set the current task's budget #[cfg(feature = "blocking")] @@ -105,7 +98,7 @@ cfg_rt_threaded! { } #[inline(always)] -fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { +pub(crate) fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { struct ResetGuard<'a> { cell: &'a Cell, prev: Budget, @@ -227,6 +220,10 @@ cfg_coop! { mod test { use super::*; + fn budget(f: impl FnOnce() -> R) -> R { + with_budget(Budget::initial(), f) + } + fn get() -> Budget { CURRENT.with(|cell| cell.get()) } diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 7e1c257cc86..d95b2daa5eb 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,3 +1,4 @@ +use crate::coop::Budget; use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; @@ -31,6 +32,9 @@ where /// Current tick tick: u8, + /// The coop_budget used by this scheduler when blocking on tasks. + coop_budget: Budget, + /// Thread park handle park: P, } @@ -84,7 +88,7 @@ impl

BasicScheduler

where P: Park, { - pub(crate) fn new(park: P) -> BasicScheduler

{ + pub(crate) fn new(park: P, coop_budget: Budget) -> BasicScheduler

{ let unpark = Box::new(park.unpark()); BasicScheduler { @@ -99,6 +103,7 @@ where }), }, tick: 0, + coop_budget, park, } } @@ -120,15 +125,19 @@ where where F: Future, { + let coop_budget = self.coop_budget; + enter(self, |scheduler, context| { - let _enter = runtime::enter(false); + let _enter = runtime::enter(false, coop_budget); let waker = waker_ref(&scheduler.spawner.shared); let mut cx = std::task::Context::from_waker(&waker); pin!(future); 'outer: loop { - if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { + if let Ready(v) = + crate::coop::with_budget(coop_budget, || future.as_mut().poll(&mut cx)) + { return v; } @@ -152,7 +161,7 @@ where }; match next { - Some(task) => crate::coop::budget(|| task.run()), + Some(task) => crate::coop::with_budget(coop_budget, || task.run()), None => { // Park until the thread is signaled scheduler.park.park().ok().expect("failed to park"); diff --git a/tokio/src/runtime/blocking/shutdown.rs b/tokio/src/runtime/blocking/shutdown.rs index 3b6cc5930dc..fbf7422d84b 100644 --- a/tokio/src/runtime/blocking/shutdown.rs +++ b/tokio/src/runtime/blocking/shutdown.rs @@ -41,7 +41,7 @@ impl Receiver { return true; } - let mut e = match try_enter(false) { + let mut e = match try_enter(false, crate::coop::Budget::unconstrained()) { Some(enter) => enter, _ => { if std::thread::panicking() { diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index fad72c7ad94..665f466b579 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,3 +1,4 @@ +use crate::coop::Budget; use crate::runtime::handle::Handle; use crate::runtime::shell::Shell; use crate::runtime::{blocking, io, time, Callback, Runtime, Spawner}; @@ -55,6 +56,9 @@ pub struct Builder { /// Cap on thread usage. max_threads: usize, + /// Coop budget to be used by the runtime this creates. + coop_budget: Budget, + /// Name used for threads spawned by the runtime. pub(super) thread_name: String, @@ -98,6 +102,8 @@ impl Builder { max_threads: 512, + coop_budget: Budget::initial(), + // Default thread name thread_name: "tokio-runtime-worker".into(), @@ -294,6 +300,13 @@ impl Builder { self } + /// Disable tokio's automated cooperative scheduling mechanism See this link for details: + /// + pub fn no_coop(&mut self) -> &mut Self { + self.coop_budget = Budget::unconstrained(); + self + } + /// Creates the configured `Runtime`. /// /// The returned `ThreadPool` instance is ready to spawn tasks. @@ -334,13 +347,14 @@ impl Builder { let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Shell(Shell::new(driver)), + kind: Kind::Shell(Shell::new(driver, self.coop_budget)), handle: Handle { spawner, io_handle, time_handle, clock, blocking_spawner, + coop_budget: self.coop_budget, }, blocking_pool, }) @@ -425,7 +439,7 @@ cfg_rt_core! { // there are no futures ready to do something, it'll let the timer or // the reactor to generate some new stimuli for the futures to continue // in their life. - let scheduler = BasicScheduler::new(driver); + let scheduler = BasicScheduler::new(driver, self.coop_budget); let spawner = Spawner::Basic(scheduler.spawner().clone()); // Blocking pool @@ -440,6 +454,7 @@ cfg_rt_core! { time_handle, clock, blocking_spawner, + coop_budget: self.coop_budget, }, blocking_pool, }) @@ -473,7 +488,7 @@ cfg_rt_threaded! { let (io_driver, io_handle) = io::create_driver(self.enable_io)?; let (driver, time_handle) = time::create_driver(self.enable_time, io_driver, clock.clone()); - let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); + let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.coop_budget); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool @@ -487,6 +502,7 @@ cfg_rt_threaded! { time_handle, clock, blocking_spawner, + coop_budget: self.coop_budget, }; // Spawn the thread pool workers diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index 56a7c57b6c6..cb2beaf19c8 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -1,3 +1,4 @@ +use crate::coop::Budget; use std::cell::{Cell, RefCell}; use std::fmt; use std::marker::PhantomData; @@ -25,13 +26,16 @@ thread_local!(static ENTERED: Cell = Cell::new(EnterContext::NotEn /// Represents an executor context. pub(crate) struct Enter { + /// Coop budget to be used when blocking on tasks. + coop_budget: Budget, + _p: PhantomData>, } /// Marks the current thread as being within the dynamic extent of an /// executor. -pub(crate) fn enter(allow_blocking: bool) -> Enter { - if let Some(enter) = try_enter(allow_blocking) { +pub(crate) fn enter(allow_blocking: bool, coop_budget: Budget) -> Enter { + if let Some(enter) = try_enter(allow_blocking, coop_budget) { return enter; } @@ -45,13 +49,16 @@ pub(crate) fn enter(allow_blocking: bool) -> Enter { /// Tries to enter a runtime context, returns `None` if already in a runtime /// context. -pub(crate) fn try_enter(allow_blocking: bool) -> Option { +pub(crate) fn try_enter(allow_blocking: bool, coop_budget: Budget) -> Option { ENTERED.with(|c| { if c.get().is_entered() { None } else { c.set(EnterContext::Entered { allow_blocking }); - Some(Enter { _p: PhantomData }) + Some(Enter { + coop_budget, + _p: PhantomData, + }) } }) } @@ -157,7 +164,7 @@ cfg_block_on! { pin!(f); loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + if let Ready(v) = crate::coop::with_budget(self.coop_budget, || f.as_mut().poll(&mut cx)) { return Ok(v); } @@ -193,7 +200,7 @@ cfg_blocking_impl! { let when = Instant::now() + timeout; loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + if let Ready(v) = crate::coop::with_budget(self.coop_budget, || f.as_mut().poll(&mut cx)) { return Ok(v); } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 92c08d617b2..2ae80425376 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -1,3 +1,4 @@ +use crate::coop::Budget; use crate::runtime::{blocking, context, io, time, Spawner}; use std::{error, fmt}; @@ -33,6 +34,9 @@ pub struct Handle { /// Blocking pool spawner pub(super) blocking_spawner: blocking::Spawner, + + /// The coop_budget used by this handle when entering the runtime and blocking on tasks. + pub(super) coop_budget: Budget, } impl Handle { @@ -273,7 +277,7 @@ cfg_rt_core! { /// pub fn block_on(&self, future: F) -> F::Output { self.enter(|| { - let mut enter = crate::runtime::enter(true); + let mut enter = crate::runtime::enter(true, self.coop_budget); enter.block_on(future).expect("failed to park thread") }) } diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs index a65869d0de2..723b54918aa 100644 --- a/tokio/src/runtime/shell.rs +++ b/tokio/src/runtime/shell.rs @@ -1,5 +1,6 @@ #![allow(clippy::redundant_clone)] +use crate::coop::Budget; use crate::park::{Park, Unpark}; use crate::runtime::enter; use crate::runtime::time; @@ -16,23 +17,30 @@ pub(super) struct Shell { /// TODO: don't store this unpark: Arc, + + /// The coop_budget used by this shell when blocking on tasks. + coop_budget: Budget, } #[derive(Debug)] struct Handle(::Unpark); impl Shell { - pub(super) fn new(driver: time::Driver) -> Shell { + pub(super) fn new(driver: time::Driver, coop_budget: Budget) -> Shell { let unpark = Arc::new(Handle(driver.unpark())); - Shell { driver, unpark } + Shell { + driver, + unpark, + coop_budget, + } } pub(super) fn block_on(&mut self, f: F) -> F::Output where F: Future, { - let _e = enter(true); + let _e = enter(true, self.coop_budget); pin!(f); @@ -40,7 +48,9 @@ impl Shell { let mut cx = Context::from_waker(&waker); loop { - if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { + if let Ready(v) = + crate::coop::with_budget(self.coop_budget, || f.as_mut().poll(&mut cx)) + { return v; } diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index d30e8d456ca..300f3200a43 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -13,6 +13,7 @@ cfg_blocking! { pub(crate) use worker::block_in_place; } +use crate::coop::Budget; use crate::loom::sync::Arc; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::Parker; @@ -23,6 +24,9 @@ use std::future::Future; /// Work-stealing based thread pool for executing futures. pub(crate) struct ThreadPool { spawner: Spawner, + + /// The coop_budget used when blocking on tasks. + coop_budget: Budget, } /// Submit futures to the associated thread pool for execution. @@ -45,10 +49,13 @@ pub(crate) struct Spawner { // ===== impl ThreadPool ===== impl ThreadPool { - pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) { - let (shared, launch) = worker::create(size, parker); + pub(crate) fn new(size: usize, parker: Parker, coop_budget: Budget) -> (ThreadPool, Launch) { + let (shared, launch) = worker::create(size, parker, coop_budget); let spawner = Spawner { shared }; - let thread_pool = ThreadPool { spawner }; + let thread_pool = ThreadPool { + spawner, + coop_budget, + }; (thread_pool, launch) } @@ -78,7 +85,7 @@ impl ThreadPool { where F: Future, { - let mut enter = crate::runtime::enter(true); + let mut enter = crate::runtime::enter(true, self.coop_budget); enter.block_on(future).expect("failed to park thread") } } diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index ac05285478d..37a5c3e5f86 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -4,7 +4,7 @@ //! "core" is handed off to a new thread allowing the scheduler to continue to //! make progress while the originating thread blocks. -use crate::coop; +use crate::coop::{self, Budget}; use crate::loom::rand::seed; use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; @@ -63,6 +63,9 @@ struct Core { /// Fast random number generator. rand: FastRand, + + /// The coop_budget used by this handle when running tasks. + coop_budget: Budget, } /// State shared across all workers @@ -123,7 +126,7 @@ type Notified = task::Notified>; // Tracks thread-local state scoped_thread_local!(static CURRENT: Context); -pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { +pub(super) fn create(size: usize, park: Parker, coop_budget: Budget) -> (Arc, Launch) { let mut cores = vec![]; let mut remotes = vec![]; @@ -143,6 +146,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { tasks: LinkedList::new(), park: Some(park), rand: FastRand::new(seed()), + coop_budget, })); remotes.push(Remote { @@ -297,7 +301,7 @@ fn run(worker: Arc) { core: RefCell::new(None), }; - let _enter = crate::runtime::enter(true); + let _enter = crate::runtime::enter(true, core.coop_budget); CURRENT.set(&cx, || { // This should always be an error. It only returns a `Result` to support @@ -341,11 +345,13 @@ impl Context { // another idle worker to try to steal work. core.transition_from_searching(&self.worker); + let coop_budget = core.coop_budget; + // Make the core available to the runtime context *self.core.borrow_mut() = Some(core); // Run the task - coop::budget(|| { + coop::with_budget(coop_budget, || { task.run(); // As long as there is budget remaining and a task exists in the diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 3c409edfb90..549a32186d8 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -2,6 +2,7 @@ use crate::runtime::task::{self, JoinHandle, Task}; use crate::sync::AtomicWaker; use crate::util::linked_list::LinkedList; +use crate::coop::Budget; use std::cell::{Cell, RefCell}; use std::collections::VecDeque; @@ -116,6 +117,9 @@ cfg_rt_util! { /// State available from thread-local context: Context, + /// The coop_budget used by this LocalSet when pollin tasks tasks. + coop_budget: Budget, + /// This type should not be Send. _not_send: PhantomData<*const ()>, } @@ -233,10 +237,17 @@ impl LocalSet { waker: AtomicWaker::new(), }), }, + coop_budget: Budget::initial(), _not_send: PhantomData, } } + /// Disable tokio's automated cooperative scheduling mechanism See this link for details: + /// + pub fn no_coop(&mut self) { + self.coop_budget = Budget::unconstrained(); + } + /// Spawns a `!Send` task onto the local task set. /// /// This task is guaranteed to be run on the current thread. @@ -405,7 +416,7 @@ impl LocalSet { // task initially. Because `LocalSet` itself is `!Send`, and // `spawn_local` spawns into the `LocalSet` on the current // thread, the invariant is maintained. - Some(task) => crate::coop::budget(|| task.run()), + Some(task) => crate::coop::with_budget(Budget::initial(), || task.run()), // We have fully drained the queue of notified tasks, so the // local future doesn't need to be notified again — it can wait // until something else wakes a task in the local set. @@ -525,7 +536,7 @@ impl Future for RunUntil<'_, T> { let _no_blocking = crate::runtime::enter::disallow_blocking(); let f = me.future; - if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) { + if let Poll::Ready(output) = crate::coop::with_budget(Budget::initial(), || f.poll(cx)) { return Poll::Ready(output); }