Skip to content

Commit

Permalink
coop: expose an unconstrained() opt-out
Browse files Browse the repository at this point in the history
This adds an opt-out for code that might not be compatible with Tokio
coop (e.g. heavily nested `FuturesUnordered`).

Notes to reviewer:

- I made the coop module public for this.
- I removed a few docs in said coop module that reference methods that
  don't exist yet.

For more context, see the discussion in:
tokio-rs#3516
  • Loading branch information
krallin committed Feb 23, 2021
1 parent 7de18af commit a921c88
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 30 deletions.
75 changes: 46 additions & 29 deletions tokio/src/coop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg_attr(not(feature = "full"), allow(dead_code))]

//! Opt-in yield points for improved cooperative scheduling.
//! Yield points for improved cooperative scheduling.
//!
//! A single call to [`poll`] on a top-level task may potentially do a lot of
//! work before it returns `Poll::Pending`. If a task runs for a long period of
Expand All @@ -21,37 +21,18 @@
//!
//! It may look harmless, but consider what happens under heavy load if the
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
//! yield, and will starve other tasks and resources on the same executor. With
//! opt-in yield points, this problem is alleviated:
//! yield, and will starve other tasks and resources on the same executor.
//!
//! ```ignore
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {
//! tokio::coop::proceed().await;
//! }
//! }
//! ```
//!
//! The `proceed` future will coordinate with the executor to make sure that
//! every so often control is yielded back to the executor so it can run other
//! tasks.
//!
//! # Placing yield points
//!
//! Voluntary yield points should be placed _after_ at least some work has been
//! done. If they are not, a future sufficiently deep in the task hierarchy may
//! end up _never_ getting to run because of the number of yield points that
//! inevitably appear before it is reached. In general, you will want yield
//! points to only appear in "leaf" futures -- those that do not themselves poll
//! other futures. By doing this, you avoid double-counting each iteration of
//! the outer future against the cooperating budget.
//! To account for this, Tokio has explicit yield points in a number of library
//! functions, which force tasks to return to the executor periodically.
//!
//! [`poll`]: method@std::future::Future::poll

// NOTE: The doctests in this module are ignored since the whole module is (currently) private.

use pin_project_lite::pin_project;
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

thread_local! {
static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
Expand Down Expand Up @@ -147,9 +128,45 @@ cfg_rt! {
}
}

cfg_coop! {
use std::task::{Context, Poll};
pin_project! {
/// Future for the [`unconstrained`](unconstrained) method.
#[must_use = "Unconstrained does nothing unless polled"]
pub struct Unconstrained<F> {
#[pin]
inner: F,
}
}

impl<F> Future for Unconstrained<F>
where
F: Future,
{
type Output = <F as Future>::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
with_budget(Budget::unconstrained(), || inner.poll(cx))
}
}

/// Turn off cooperative scheduling for a future. The future or stream will never yield.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::coop;
///
/// let fut = async { () };
/// coop::unconstrained(fut).await;
/// # }
/// ```
pub fn unconstrained<F>(inner: F) -> Unconstrained<F> {
Unconstrained { inner }
}

cfg_coop! {
#[must_use]
pub(crate) struct RestoreOnPending(Cell<Budget>);

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ cfg_rt! {
pub mod runtime;
}

pub(crate) mod coop;
pub mod coop;

cfg_signal! {
pub mod signal;
Expand Down
26 changes: 26 additions & 0 deletions tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,32 @@ rt_test! {
});
}

#[test]
fn coop_unconstrained() {
use std::task::Poll::Ready;

let rt = rt();

rt.block_on(async {
// Create a bunch of tasks
let mut tasks = (0..1_000).map(|_| {
tokio::spawn(async { })
}).collect::<Vec<_>>();

// Hope that all the tasks complete...
time::sleep(Duration::from_millis(100)).await;

tokio::coop::unconstrained(poll_fn(|cx| {
// All the tasks should be ready
for task in &mut tasks {
assert!(Pin::new(task).poll(cx).is_ready());
}

Ready(())
})).await;
});
}

// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
Expand Down

0 comments on commit a921c88

Please sign in to comment.