Skip to content

Commit

Permalink
coop: expose an unconstrained() opt-out (#3547)
Browse files Browse the repository at this point in the history
  • Loading branch information
krallin authored Mar 9, 2021
1 parent f70b9b8 commit 05eeea5
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 48 deletions.
81 changes: 33 additions & 48 deletions tokio/src/coop.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,33 @@
#![cfg_attr(not(feature = "full"), allow(dead_code))]

//! Opt-in yield points for improved cooperative scheduling.
//! Yield points for improved cooperative scheduling.
//!
//! A single call to [`poll`] on a top-level task may potentially do a lot of
//! work before it returns `Poll::Pending`. If a task runs for a long period of
//! time without yielding back to the executor, it can starve other tasks
//! waiting on that executor to execute them, or drive underlying resources.
//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
//! long-running task. Instead, this module provides an opt-in mechanism for
//! futures to collaborate with the executor to avoid starvation.
//! Documentation for this can be found in the [`tokio::task`] module.
//!
//! Consider a future like this one:
//!
//! ```
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {}
//! }
//! ```
//!
//! It may look harmless, but consider what happens under heavy load if the
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
//! yield, and will starve other tasks and resources on the same executor. With
//! opt-in yield points, this problem is alleviated:
//!
//! ```ignore
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {
//! tokio::coop::proceed().await;
//! }
//! }
//! ```
//!
//! The `proceed` future will coordinate with the executor to make sure that
//! every so often control is yielded back to the executor so it can run other
//! tasks.
//!
//! # Placing yield points
//!
//! Voluntary yield points should be placed _after_ at least some work has been
//! done. If they are not, a future sufficiently deep in the task hierarchy may
//! end up _never_ getting to run because of the number of yield points that
//! inevitably appear before it is reached. In general, you will want yield
//! points to only appear in "leaf" futures -- those that do not themselves poll
//! other futures. By doing this, you avoid double-counting each iteration of
//! the outer future against the cooperating budget.
//!
//! [`poll`]: method@std::future::Future::poll

// NOTE: The doctests in this module are ignored since the whole module is (currently) private.
//! [`tokio::task`]: crate::task.

// ```ignore
// # use tokio_stream::{Stream, StreamExt};
// async fn drop_all<I: Stream + Unpin>(mut input: I) {
// while let Some(_) = input.next().await {
// tokio::coop::proceed().await;
// }
// }
// ```
//
// The `proceed` future will coordinate with the executor to make sure that
// every so often control is yielded back to the executor so it can run other
// tasks.
//
// # Placing yield points
//
// Voluntary yield points should be placed _after_ at least some work has been
// done. If they are not, a future sufficiently deep in the task hierarchy may
// end up _never_ getting to run because of the number of yield points that
// inevitably appear before it is reached. In general, you will want yield
// points to only appear in "leaf" futures -- those that do not themselves poll
// other futures. By doing this, you avoid double-counting each iteration of
// the outer future against the cooperating budget.

use std::cell::Cell;

Expand Down Expand Up @@ -98,6 +76,13 @@ pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::initial(), f)
}

/// Run the given closure with an unconstrained task budget. When the function returns, the budget
/// is reset to the value prior to calling the function.
#[inline(always)]
pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::unconstrained(), f)
}

#[inline(always)]
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
struct ResetGuard<'a> {
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,21 @@ macro_rules! cfg_coop {
)*
}
}

macro_rules! cfg_not_coop {
($($item:item)*) => {
$(
#[cfg(not(any(
feature = "fs",
feature = "io-std",
feature = "net",
feature = "process",
feature = "rt",
feature = "signal",
feature = "sync",
feature = "time",
)))]
$item
)*
}
}
58 changes: 58 additions & 0 deletions tokio/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,66 @@
//! # }
//! ```
//!
//! ### Cooperative scheduling
//!
//! A single call to [`poll`] on a top-level task may potentially do a lot of
//! work before it returns `Poll::Pending`. If a task runs for a long period of
//! time without yielding back to the executor, it can starve other tasks
//! waiting on that executor to execute them, or drive underlying resources.
//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
//! long-running task. Instead, this module provides an opt-in mechanism for
//! futures to collaborate with the executor to avoid starvation.
//!
//! Consider a future like this one:
//!
//! ```
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {}
//! }
//! ```
//!
//! It may look harmless, but consider what happens under heavy load if the
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
//! yield, and will starve other tasks and resources on the same executor.
//!
//! To account for this, Tokio has explicit yield points in a number of library
//! functions, which force tasks to return to the executor periodically.
//!
//!
//! #### unconstrained
//!
//! If necessary, [`task::unconstrained`] lets you opt out a future of Tokio's cooperative
//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to
//! Tokio. For example:
//!
//! ```
//! # #[tokio::main]
//! # async fn main() {
//! use tokio::{task, sync::mpsc};
//!
//! let fut = async {
//! let (tx, mut rx) = mpsc::unbounded_channel();
//!
//! for i in 0..1000 {
//! let _ = tx.send(());
//! // This will always be ready. If coop was in effect, this code would be forced to yield
//! // periodically. However, if left unconstrained, then this code will never yield.
//! rx.recv().await;
//! }
//! };
//!
//! task::unconstrained(fut).await;
//! # }
//! ```
//!
//! [`task::spawn_blocking`]: crate::task::spawn_blocking
//! [`task::block_in_place`]: crate::task::block_in_place
//! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler
//! [`task::yield_now`]: crate::task::yield_now()
//! [`thread::yield_now`]: std::thread::yield_now
//! [`task::unconstrained`]: crate::task::unconstrained()
//! [`poll`]: method@std::future::Future::poll

cfg_rt! {
pub use crate::runtime::task::{JoinError, JoinHandle};
Expand All @@ -236,4 +291,7 @@ cfg_rt! {

mod task_local;
pub use task_local::LocalKey;

mod unconstrained;
pub use unconstrained::{unconstrained, Unconstrained};
}
43 changes: 43 additions & 0 deletions tokio/src/task/unconstrained.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
/// Future for the [`unconstrained`](unconstrained) method.
#[must_use = "Unconstrained does nothing unless polled"]
pub struct Unconstrained<F> {
#[pin]
inner: F,
}
}

impl<F> Future for Unconstrained<F>
where
F: Future,
{
type Output = <F as Future>::Output;

cfg_coop! {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
crate::coop::with_unconstrained(|| inner.poll(cx))
}
}

cfg_not_coop! {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
inner.poll(cx)
}
}
}

/// Turn off cooperative scheduling for a future. The future will never be forced to yield by
/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields
/// otherwise.
///
/// See also the usage example in the [task module](index.html#unconstrained).
pub fn unconstrained<F>(inner: F) -> Unconstrained<F> {
Unconstrained { inner }
}
26 changes: 26 additions & 0 deletions tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,32 @@ rt_test! {
});
}

#[test]
fn coop_unconstrained() {
use std::task::Poll::Ready;

let rt = rt();

rt.block_on(async {
// Create a bunch of tasks
let mut tasks = (0..1_000).map(|_| {
tokio::spawn(async { })
}).collect::<Vec<_>>();

// Hope that all the tasks complete...
time::sleep(Duration::from_millis(100)).await;

tokio::task::unconstrained(poll_fn(|cx| {
// All the tasks should be ready
for task in &mut tasks {
assert!(Pin::new(task).poll(cx).is_ready());
}

Ready(())
})).await;
});
}

// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
Expand Down

0 comments on commit 05eeea5

Please sign in to comment.