Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coop: expose an unconstrained() opt-out #3547

Merged
merged 5 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 33 additions & 48 deletions tokio/src/coop.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,33 @@
#![cfg_attr(not(feature = "full"), allow(dead_code))]

//! Opt-in yield points for improved cooperative scheduling.
//! Yield points for improved cooperative scheduling.
//!
//! A single call to [`poll`] on a top-level task may potentially do a lot of
//! work before it returns `Poll::Pending`. If a task runs for a long period of
//! time without yielding back to the executor, it can starve other tasks
//! waiting on that executor to execute them, or drive underlying resources.
//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
//! long-running task. Instead, this module provides an opt-in mechanism for
//! futures to collaborate with the executor to avoid starvation.
//! Documentation for this can be found in the [`tokio::task`] module.
//!
//! Consider a future like this one:
//!
//! ```
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {}
//! }
//! ```
//!
//! It may look harmless, but consider what happens under heavy load if the
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
//! yield, and will starve other tasks and resources on the same executor. With
//! opt-in yield points, this problem is alleviated:
//!
//! ```ignore
krallin marked this conversation as resolved.
Show resolved Hide resolved
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {
//! tokio::coop::proceed().await;
//! }
//! }
//! ```
//!
//! The `proceed` future will coordinate with the executor to make sure that
//! every so often control is yielded back to the executor so it can run other
//! tasks.
//!
//! # Placing yield points
//!
//! Voluntary yield points should be placed _after_ at least some work has been
//! done. If they are not, a future sufficiently deep in the task hierarchy may
//! end up _never_ getting to run because of the number of yield points that
//! inevitably appear before it is reached. In general, you will want yield
//! points to only appear in "leaf" futures -- those that do not themselves poll
//! other futures. By doing this, you avoid double-counting each iteration of
//! the outer future against the cooperating budget.
//!
//! [`poll`]: method@std::future::Future::poll

// NOTE: The doctests in this module are ignored since the whole module is (currently) private.
//! [`tokio::task`]: crate::task.

// ```ignore
// # use tokio_stream::{Stream, StreamExt};
// async fn drop_all<I: Stream + Unpin>(mut input: I) {
// while let Some(_) = input.next().await {
// tokio::coop::proceed().await;
// }
// }
// ```
//
// The `proceed` future will coordinate with the executor to make sure that
// every so often control is yielded back to the executor so it can run other
// tasks.
//
// # Placing yield points
//
// Voluntary yield points should be placed _after_ at least some work has been
// done. If they are not, a future sufficiently deep in the task hierarchy may
// end up _never_ getting to run because of the number of yield points that
// inevitably appear before it is reached. In general, you will want yield
// points to only appear in "leaf" futures -- those that do not themselves poll
// other futures. By doing this, you avoid double-counting each iteration of
// the outer future against the cooperating budget.

use std::cell::Cell;

Expand Down Expand Up @@ -98,6 +76,13 @@ pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::initial(), f)
}

/// Run the given closure with an unconstrained task budget. When the function returns, the budget
/// is reset to the value prior to calling the function.
#[inline(always)]
pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::unconstrained(), f)
}

#[inline(always)]
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
struct ResetGuard<'a> {
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,3 +357,21 @@ macro_rules! cfg_coop {
)*
}
}

macro_rules! cfg_not_coop {
($($item:item)*) => {
$(
#[cfg(not(any(
feature = "fs",
feature = "io-std",
feature = "net",
feature = "process",
feature = "rt",
feature = "signal",
feature = "sync",
feature = "time",
)))]
$item
)*
}
}
58 changes: 58 additions & 0 deletions tokio/src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,66 @@
//! # }
//! ```
//!
//! ### Cooperative scheduling
//!
//! A single call to [`poll`] on a top-level task may potentially do a lot of
//! work before it returns `Poll::Pending`. If a task runs for a long period of
//! time without yielding back to the executor, it can starve other tasks
//! waiting on that executor to execute them, or drive underlying resources.
//! Since Rust does not have a runtime, it is difficult to forcibly preempt a
//! long-running task. Instead, this module provides an opt-in mechanism for
//! futures to collaborate with the executor to avoid starvation.
//!
//! Consider a future like this one:
//!
//! ```
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {}
//! }
Comment on lines +225 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure which version I prefer doc-wise, but this could also be written without the Unpin bound:

Suggested change
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {}
//! }
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream>(input: I) {
//! tokio::pin!(input);
//! while let Some(_) = input.next().await {}
//! }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really just ported this one from the existing docs, but I'm happy to change it, sure

//! ```
//!
//! It may look harmless, but consider what happens under heavy load if the
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
//! yield, and will starve other tasks and resources on the same executor.
//!
//! To account for this, Tokio has explicit yield points in a number of library
//! functions, which force tasks to return to the executor periodically.
//!
//!
//! #### unconstrained
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this being lowercase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pattern-matched from the other functions in there, e.g. yield_now:

tokio/tokio/src/task/mod.rs

Lines 185 to 192 in 704de8c

//! #### yield_now
//!
//! In addition, this module provides a [`task::yield_now`] async function
//! that is analogous to the standard library's [`thread::yield_now`]. Calling
//! and `await`ing this function will cause the current task to yield to the
//! Tokio runtime's scheduler, allowing other tasks to be
//! scheduled. Eventually, the yielding task will be polled again, allowing it
//! to execute. For example:

Should we change them all?

//!
//! If necessary, [`task::unconstrained`] lets you opt out a future of Tokio's cooperative
//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to
//! Tokio. For example:
//!
//! ```
//! # #[tokio::main]
//! # async fn main() {
//! use tokio::{task, sync::mpsc};
//!
//! let fut = async {
//! let (tx, mut rx) = mpsc::unbounded_channel();
//!
//! for i in 0..1000 {
//! let _ = tx.send(());
//! // This will always be ready. If coop was in effect, this code would be forced to yield
//! // periodically. However, if left unconstrained, then this code will never yield.
//! rx.recv().await;
//! }
//! };
//!
//! task::unconstrained(fut).await;
//! # }
//! ```
//!
//! [`task::spawn_blocking`]: crate::task::spawn_blocking
//! [`task::block_in_place`]: crate::task::block_in_place
//! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler
//! [`task::yield_now`]: crate::task::yield_now()
//! [`thread::yield_now`]: std::thread::yield_now
//! [`task::unconstrained`]: crate::task::unconstrained()
//! [`poll`]: method@std::future::Future::poll

cfg_rt! {
pub use crate::runtime::task::{JoinError, JoinHandle};
Expand All @@ -236,4 +291,7 @@ cfg_rt! {

mod task_local;
pub use task_local::LocalKey;

mod unconstrained;
pub use unconstrained::{unconstrained, Unconstrained};
}
43 changes: 43 additions & 0 deletions tokio/src/task/unconstrained.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
/// Future for the [`unconstrained`](unconstrained) method.
#[must_use = "Unconstrained does nothing unless polled"]
pub struct Unconstrained<F> {
#[pin]
inner: F,
}
}

impl<F> Future for Unconstrained<F>
where
F: Future,
{
type Output = <F as Future>::Output;

cfg_coop! {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
crate::coop::with_unconstrained(|| inner.poll(cx))
}
}

cfg_not_coop! {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
inner.poll(cx)
}
}
}

/// Turn off cooperative scheduling for a future. The future will never be forced to yield by
/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields
/// otherwise.
///
/// See also the usage example in the [task module](index.html#unconstrained).
pub fn unconstrained<F>(inner: F) -> Unconstrained<F> {
Unconstrained { inner }
}
26 changes: 26 additions & 0 deletions tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,32 @@ rt_test! {
});
}

#[test]
fn coop_unconstrained() {
use std::task::Poll::Ready;

let rt = rt();

rt.block_on(async {
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
// Create a bunch of tasks
let mut tasks = (0..1_000).map(|_| {
tokio::spawn(async { })
}).collect::<Vec<_>>();

// Hope that all the tasks complete...
time::sleep(Duration::from_millis(100)).await;

tokio::task::unconstrained(poll_fn(|cx| {
// All the tasks should be ready
for task in &mut tasks {
assert!(Pin::new(task).poll(cx).is_ready());
}

Ready(())
})).await;
});
}

// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
Expand Down