From a921c88b06c5e7529358781b125f542ac1deb42c Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Tue, 23 Feb 2021 11:37:15 +0000 Subject: [PATCH 1/5] coop: expose an unconstrained() opt-out This adds an opt-out for code that might not be compatible with Tokio coop (e.g. heavily nested `FuturesUnordered`). Notes to reviewer: - I made the coop module public for this. - I removed a few docs in said coop module that reference methods that don't exist yet. For more context, see the discussion in: https://github.com/tokio-rs/tokio/pull/3516 --- tokio/src/coop.rs | 75 ++++++++++++++++++++++++---------------- tokio/src/lib.rs | 2 +- tokio/tests/rt_common.rs | 26 ++++++++++++++ 3 files changed, 73 insertions(+), 30 deletions(-) diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 05b2ae83bdd..f823b12fdc5 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -1,6 +1,6 @@ #![cfg_attr(not(feature = "full"), allow(dead_code))] -//! Opt-in yield points for improved cooperative scheduling. +//! Yield points for improved cooperative scheduling. //! //! A single call to [`poll`] on a top-level task may potentially do a lot of //! work before it returns `Poll::Pending`. If a task runs for a long period of @@ -21,37 +21,18 @@ //! //! It may look harmless, but consider what happens under heavy load if the //! input stream is _always_ ready. If we spawn `drop_all`, the task will never -//! yield, and will starve other tasks and resources on the same executor. With -//! opt-in yield points, this problem is alleviated: +//! yield, and will starve other tasks and resources on the same executor. //! -//! ```ignore -//! # use tokio_stream::{Stream, StreamExt}; -//! async fn drop_all(mut input: I) { -//! while let Some(_) = input.next().await { -//! tokio::coop::proceed().await; -//! } -//! } -//! ``` -//! -//! The `proceed` future will coordinate with the executor to make sure that -//! every so often control is yielded back to the executor so it can run other -//! tasks. -//! -//! # Placing yield points -//! -//! Voluntary yield points should be placed _after_ at least some work has been -//! done. If they are not, a future sufficiently deep in the task hierarchy may -//! end up _never_ getting to run because of the number of yield points that -//! inevitably appear before it is reached. In general, you will want yield -//! points to only appear in "leaf" futures -- those that do not themselves poll -//! other futures. By doing this, you avoid double-counting each iteration of -//! the outer future against the cooperating budget. +//! To account for this, Tokio has explicit yield points in a number of library +//! functions, which force tasks to return to the executor periodically. //! //! [`poll`]: method@std::future::Future::poll -// NOTE: The doctests in this module are ignored since the whole module is (currently) private. - +use pin_project_lite::pin_project; use std::cell::Cell; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; thread_local! { static CURRENT: Cell = Cell::new(Budget::unconstrained()); @@ -147,9 +128,45 @@ cfg_rt! { } } -cfg_coop! { - use std::task::{Context, Poll}; +pin_project! { + /// Future for the [`unconstrained`](unconstrained) method. + #[must_use = "Unconstrained does nothing unless polled"] + pub struct Unconstrained { + #[pin] + inner: F, + } +} + +impl Future for Unconstrained +where + F: Future, +{ + type Output = ::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.project().inner; + with_budget(Budget::unconstrained(), || inner.poll(cx)) + } +} + +/// Turn off cooperative scheduling for a future. The future or stream will never yield. +/// +/// # Examples +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio::coop; +/// +/// let fut = async { () }; +/// coop::unconstrained(fut).await; +/// # } +/// ``` +pub fn unconstrained(inner: F) -> Unconstrained { + Unconstrained { inner } +} + +cfg_coop! { #[must_use] pub(crate) struct RestoreOnPending(Cell); diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index f6c502e56c0..eec13d86df8 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -372,7 +372,7 @@ cfg_rt! { pub mod runtime; } -pub(crate) mod coop; +pub mod coop; cfg_signal! { pub mod signal; diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 9aef4b9b909..7e0e5b41404 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -1017,6 +1017,32 @@ rt_test! { }); } + #[test] + fn coop_unconstrained() { + use std::task::Poll::Ready; + + let rt = rt(); + + rt.block_on(async { + // Create a bunch of tasks + let mut tasks = (0..1_000).map(|_| { + tokio::spawn(async { }) + }).collect::>(); + + // Hope that all the tasks complete... + time::sleep(Duration::from_millis(100)).await; + + tokio::coop::unconstrained(poll_fn(|cx| { + // All the tasks should be ready + for task in &mut tasks { + assert!(Pin::new(task).poll(cx).is_ready()); + } + + Ready(()) + })).await; + }); + } + // Tests that the "next task" scheduler optimization is not able to starve // other tasks. #[test] From 788056b7d3b94acbc33bd795b9339b40190ee5fb Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Mon, 1 Mar 2021 06:46:55 -0800 Subject: [PATCH 2/5] improve example + restore comments --- tokio/src/coop.rs | 41 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index f823b12fdc5..21980d63803 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -28,6 +28,29 @@ //! //! [`poll`]: method@std::future::Future::poll +// ```ignore +// # use tokio_stream::{Stream, StreamExt}; +// async fn drop_all(mut input: I) { +// while let Some(_) = input.next().await { +// tokio::coop::proceed().await; +// } +// } +// ``` +// +// The `proceed` future will coordinate with the executor to make sure that +// every so often control is yielded back to the executor so it can run other +// tasks. +// +// # Placing yield points +// +// Voluntary yield points should be placed _after_ at least some work has been +// done. If they are not, a future sufficiently deep in the task hierarchy may +// end up _never_ getting to run because of the number of yield points that +// inevitably appear before it is reached. In general, you will want yield +// points to only appear in "leaf" futures -- those that do not themselves poll +// other futures. By doing this, you avoid double-counting each iteration of +// the outer future against the cooperating budget. + use pin_project_lite::pin_project; use std::cell::Cell; use std::future::Future; @@ -149,16 +172,28 @@ where } } -/// Turn off cooperative scheduling for a future. The future or stream will never yield. +/// Turn off cooperative scheduling for a future. The future will never be forced to yield by +/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields +/// otherwise. /// /// # Examples /// /// ``` /// # #[tokio::main] /// # async fn main() { -/// use tokio::coop; +/// use tokio::{coop, sync::mpsc}; +/// +/// let fut = async { +/// let (tx, mut rx) = mpsc::unbounded_channel(); +/// +/// for i in 0..1000 { +/// let _ = tx.send(()); +/// // This will always be ready. If coop was in effect, this code would be forced to yield +/// // periodically. However, if left unconstrained, then this code will never yield. +/// rx.recv().await; +/// } +/// }; /// -/// let fut = async { () }; /// coop::unconstrained(fut).await; /// # } /// ``` From aa2cb7bbafe6f16ad8d7baf9906909f3d938ab30 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Wed, 3 Mar 2021 04:20:51 -0800 Subject: [PATCH 3/5] move tokio::coop::unconstrained to tokio::task --- tokio/src/coop.rs | 66 ++++++--------------------------- tokio/src/macros/cfg.rs | 18 +++++++++ tokio/src/task/mod.rs | 29 +++++++++++++++ tokio/src/task/unconstrained.rs | 43 +++++++++++++++++++++ tokio/tests/rt_common.rs | 2 +- 5 files changed, 103 insertions(+), 55 deletions(-) create mode 100644 tokio/src/task/unconstrained.rs diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 21980d63803..6628272b4ab 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -26,6 +26,9 @@ //! To account for this, Tokio has explicit yield points in a number of library //! functions, which force tasks to return to the executor periodically. //! +//! If necessary, you may use [`task::unconstrained`][crate::task::unconstrained] to opt out +//! specific futures of Tokio's cooperative scheduling. +//! //! [`poll`]: method@std::future::Future::poll // ```ignore @@ -51,11 +54,7 @@ // other futures. By doing this, you avoid double-counting each iteration of // the outer future against the cooperating budget. -use pin_project_lite::pin_project; use std::cell::Cell; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; thread_local! { static CURRENT: Cell = Cell::new(Budget::unconstrained()); @@ -102,6 +101,13 @@ pub(crate) fn budget(f: impl FnOnce() -> R) -> R { with_budget(Budget::initial(), f) } +/// Run the given closure with an unconstrained task budget. When the function returns, the budget +/// is reset to the value prior to calling the function. +#[inline(always)] +pub(crate) fn with_unconstrained(f: impl FnOnce() -> R) -> R { + with_budget(Budget::unconstrained(), f) +} + #[inline(always)] fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { struct ResetGuard<'a> { @@ -151,57 +157,9 @@ cfg_rt! { } } -pin_project! { - /// Future for the [`unconstrained`](unconstrained) method. - #[must_use = "Unconstrained does nothing unless polled"] - pub struct Unconstrained { - #[pin] - inner: F, - } -} - -impl Future for Unconstrained -where - F: Future, -{ - type Output = ::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let inner = self.project().inner; - with_budget(Budget::unconstrained(), || inner.poll(cx)) - } -} - -/// Turn off cooperative scheduling for a future. The future will never be forced to yield by -/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields -/// otherwise. -/// -/// # Examples -/// -/// ``` -/// # #[tokio::main] -/// # async fn main() { -/// use tokio::{coop, sync::mpsc}; -/// -/// let fut = async { -/// let (tx, mut rx) = mpsc::unbounded_channel(); -/// -/// for i in 0..1000 { -/// let _ = tx.send(()); -/// // This will always be ready. If coop was in effect, this code would be forced to yield -/// // periodically. However, if left unconstrained, then this code will never yield. -/// rx.recv().await; -/// } -/// }; -/// -/// coop::unconstrained(fut).await; -/// # } -/// ``` -pub fn unconstrained(inner: F) -> Unconstrained { - Unconstrained { inner } -} - cfg_coop! { + use std::task::{Context, Poll}; + #[must_use] pub(crate) struct RestoreOnPending(Cell); diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 9ae098fb072..3442612938b 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -357,3 +357,21 @@ macro_rules! cfg_coop { )* } } + +macro_rules! cfg_not_coop { + ($($item:item)*) => { + $( + #[cfg(not(any( + feature = "fs", + feature = "io-std", + feature = "net", + feature = "process", + feature = "rt", + feature = "signal", + feature = "sync", + feature = "time", + )))] + $item + )* + } +} diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index 5dc5e72c01e..5d4296f3e99 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -208,12 +208,38 @@ //! # .await; //! # } //! ``` +//! #### unconstrained +//! +//! Finally, this module provides [`task::unconstrained`], which lets you opt out a future of +//! tokio's [cooperative scheduling][crate::coop]. When a future is crapped with `unconstrained`, +//! it will never be forced to yield to Tokio. For example: +//! +//! ``` +//! # #[tokio::main] +//! # async fn main() { +//! use tokio::{task, sync::mpsc}; +//! +//! let fut = async { +//! let (tx, mut rx) = mpsc::unbounded_channel(); +//! +//! for i in 0..1000 { +//! let _ = tx.send(()); +//! // This will always be ready. If coop was in effect, this code would be forced to yield +//! // periodically. However, if left unconstrained, then this code will never yield. +//! rx.recv().await; +//! } +//! }; +//! +//! task::unconstrained(fut).await; +//! # } +//! ``` //! //! [`task::spawn_blocking`]: crate::task::spawn_blocking //! [`task::block_in_place`]: crate::task::block_in_place //! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler //! [`task::yield_now`]: crate::task::yield_now() //! [`thread::yield_now`]: std::thread::yield_now +//! [`task::unconstrained`]: crate::task::unconstrained cfg_rt! { pub use crate::runtime::task::{JoinError, JoinHandle}; @@ -236,4 +262,7 @@ cfg_rt! { mod task_local; pub use task_local::LocalKey; + + mod unconstrained; + pub use unconstrained::{unconstrained, Unconstrained}; } diff --git a/tokio/src/task/unconstrained.rs b/tokio/src/task/unconstrained.rs new file mode 100644 index 00000000000..4a62f819ffa --- /dev/null +++ b/tokio/src/task/unconstrained.rs @@ -0,0 +1,43 @@ +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Future for the [`unconstrained`](unconstrained) method. + #[must_use = "Unconstrained does nothing unless polled"] + pub struct Unconstrained { + #[pin] + inner: F, + } +} + +impl Future for Unconstrained +where + F: Future, +{ + type Output = ::Output; + + cfg_coop! { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.project().inner; + crate::coop::with_unconstrained(|| inner.poll(cx)) + } + } + + cfg_not_coop! { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.project().inner; + inner.poll(cx) + } + } +} + +/// Turn off cooperative scheduling for a future. The future will never be forced to yield by +/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields +/// otherwise. +/// +/// See also the usage example in the [task module](index.html#unconstrained). +pub fn unconstrained(inner: F) -> Unconstrained { + Unconstrained { inner } +} diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 7e0e5b41404..cb1d0f66152 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -1032,7 +1032,7 @@ rt_test! { // Hope that all the tasks complete... time::sleep(Duration::from_millis(100)).await; - tokio::coop::unconstrained(poll_fn(|cx| { + tokio::task::unconstrained(poll_fn(|cx| { // All the tasks should be ready for task in &mut tasks { assert!(Pin::new(task).poll(cx).is_ready()); From f75a46749a055041cbc0dbdddd114614fdd66c44 Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Wed, 3 Mar 2021 04:59:40 -0800 Subject: [PATCH 4/5] fix up ambiguous docs link --- tokio/src/task/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index 5d4296f3e99..a39af6bf3d7 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -239,7 +239,7 @@ //! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler //! [`task::yield_now`]: crate::task::yield_now() //! [`thread::yield_now`]: std::thread::yield_now -//! [`task::unconstrained`]: crate::task::unconstrained +//! [`task::unconstrained`]: crate::task::unconstrained() cfg_rt! { pub use crate::runtime::task::{JoinError, JoinHandle}; From 36c62340c2e3d35365cf8028ebf9dff17d2fac0f Mon Sep 17 00:00:00 2001 From: Thomas Orozco Date: Tue, 9 Mar 2021 04:50:16 -0800 Subject: [PATCH 5/5] make coop module private again --- tokio/src/coop.rs | 29 ++--------------------------- tokio/src/lib.rs | 2 +- tokio/src/task/mod.rs | 35 ++++++++++++++++++++++++++++++++--- 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 6628272b4ab..16d93fb7516 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -2,34 +2,9 @@ //! Yield points for improved cooperative scheduling. //! -//! A single call to [`poll`] on a top-level task may potentially do a lot of -//! work before it returns `Poll::Pending`. If a task runs for a long period of -//! time without yielding back to the executor, it can starve other tasks -//! waiting on that executor to execute them, or drive underlying resources. -//! Since Rust does not have a runtime, it is difficult to forcibly preempt a -//! long-running task. Instead, this module provides an opt-in mechanism for -//! futures to collaborate with the executor to avoid starvation. +//! Documentation for this can be found in the [`tokio::task`] module. //! -//! Consider a future like this one: -//! -//! ``` -//! # use tokio_stream::{Stream, StreamExt}; -//! async fn drop_all(mut input: I) { -//! while let Some(_) = input.next().await {} -//! } -//! ``` -//! -//! It may look harmless, but consider what happens under heavy load if the -//! input stream is _always_ ready. If we spawn `drop_all`, the task will never -//! yield, and will starve other tasks and resources on the same executor. -//! -//! To account for this, Tokio has explicit yield points in a number of library -//! functions, which force tasks to return to the executor periodically. -//! -//! If necessary, you may use [`task::unconstrained`][crate::task::unconstrained] to opt out -//! specific futures of Tokio's cooperative scheduling. -//! -//! [`poll`]: method@std::future::Future::poll +//! [`tokio::task`]: crate::task. // ```ignore // # use tokio_stream::{Stream, StreamExt}; diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index eec13d86df8..f6c502e56c0 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -372,7 +372,7 @@ cfg_rt! { pub mod runtime; } -pub mod coop; +pub(crate) mod coop; cfg_signal! { pub mod signal; diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index a39af6bf3d7..abae818b62d 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -208,11 +208,39 @@ //! # .await; //! # } //! ``` +//! +//! ### Cooperative scheduling +//! +//! A single call to [`poll`] on a top-level task may potentially do a lot of +//! work before it returns `Poll::Pending`. If a task runs for a long period of +//! time without yielding back to the executor, it can starve other tasks +//! waiting on that executor to execute them, or drive underlying resources. +//! Since Rust does not have a runtime, it is difficult to forcibly preempt a +//! long-running task. Instead, this module provides an opt-in mechanism for +//! futures to collaborate with the executor to avoid starvation. +//! +//! Consider a future like this one: +//! +//! ``` +//! # use tokio_stream::{Stream, StreamExt}; +//! async fn drop_all(mut input: I) { +//! while let Some(_) = input.next().await {} +//! } +//! ``` +//! +//! It may look harmless, but consider what happens under heavy load if the +//! input stream is _always_ ready. If we spawn `drop_all`, the task will never +//! yield, and will starve other tasks and resources on the same executor. +//! +//! To account for this, Tokio has explicit yield points in a number of library +//! functions, which force tasks to return to the executor periodically. +//! +//! //! #### unconstrained //! -//! Finally, this module provides [`task::unconstrained`], which lets you opt out a future of -//! tokio's [cooperative scheduling][crate::coop]. When a future is crapped with `unconstrained`, -//! it will never be forced to yield to Tokio. For example: +//! If necessary, [`task::unconstrained`] lets you opt out a future of Tokio's cooperative +//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to +//! Tokio. For example: //! //! ``` //! # #[tokio::main] @@ -240,6 +268,7 @@ //! [`task::yield_now`]: crate::task::yield_now() //! [`thread::yield_now`]: std::thread::yield_now //! [`task::unconstrained`]: crate::task::unconstrained() +//! [`poll`]: method@std::future::Future::poll cfg_rt! { pub use crate::runtime::task::{JoinError, JoinHandle};