Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coop: expose an unconstrained() opt-out #3547

Merged
merged 5 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 81 additions & 29 deletions tokio/src/coop.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg_attr(not(feature = "full"), allow(dead_code))]

//! Opt-in yield points for improved cooperative scheduling.
//! Yield points for improved cooperative scheduling.
//!
//! A single call to [`poll`] on a top-level task may potentially do a lot of
//! work before it returns `Poll::Pending`. If a task runs for a long period of
Expand All @@ -21,37 +21,41 @@
//!
//! It may look harmless, but consider what happens under heavy load if the
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
//! yield, and will starve other tasks and resources on the same executor. With
//! opt-in yield points, this problem is alleviated:
//! yield, and will starve other tasks and resources on the same executor.
//!
//! ```ignore
krallin marked this conversation as resolved.
Show resolved Hide resolved
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {
//! tokio::coop::proceed().await;
//! }
//! }
//! ```
//!
//! The `proceed` future will coordinate with the executor to make sure that
//! every so often control is yielded back to the executor so it can run other
//! tasks.
//!
//! # Placing yield points
//!
//! Voluntary yield points should be placed _after_ at least some work has been
//! done. If they are not, a future sufficiently deep in the task hierarchy may
//! end up _never_ getting to run because of the number of yield points that
//! inevitably appear before it is reached. In general, you will want yield
//! points to only appear in "leaf" futures -- those that do not themselves poll
//! other futures. By doing this, you avoid double-counting each iteration of
//! the outer future against the cooperating budget.
//! To account for this, Tokio has explicit yield points in a number of library
//! functions, which force tasks to return to the executor periodically.
//!
//! [`poll`]: method@std::future::Future::poll

// NOTE: The doctests in this module are ignored since the whole module is (currently) private.

// ```ignore
// # use tokio_stream::{Stream, StreamExt};
// async fn drop_all<I: Stream + Unpin>(mut input: I) {
// while let Some(_) = input.next().await {
// tokio::coop::proceed().await;
// }
// }
// ```
//
// The `proceed` future will coordinate with the executor to make sure that
// every so often control is yielded back to the executor so it can run other
// tasks.
//
// # Placing yield points
//
// Voluntary yield points should be placed _after_ at least some work has been
// done. If they are not, a future sufficiently deep in the task hierarchy may
// end up _never_ getting to run because of the number of yield points that
// inevitably appear before it is reached. In general, you will want yield
// points to only appear in "leaf" futures -- those that do not themselves poll
// other futures. By doing this, you avoid double-counting each iteration of
// the outer future against the cooperating budget.

use pin_project_lite::pin_project;
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

thread_local! {
static CURRENT: Cell<Budget> = Cell::new(Budget::unconstrained());
Expand Down Expand Up @@ -147,9 +151,57 @@ cfg_rt! {
}
}

cfg_coop! {
use std::task::{Context, Poll};
pin_project! {
/// Future for the [`unconstrained`](unconstrained) method.
#[must_use = "Unconstrained does nothing unless polled"]
pub struct Unconstrained<F> {
#[pin]
inner: F,
}
}

impl<F> Future for Unconstrained<F>
where
F: Future,
{
type Output = <F as Future>::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
with_budget(Budget::unconstrained(), || inner.poll(cx))
}
}

/// Turn off cooperative scheduling for a future. The future will never be forced to yield by
/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields
/// otherwise.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() {
/// use tokio::{coop, sync::mpsc};
///
/// let fut = async {
/// let (tx, mut rx) = mpsc::unbounded_channel();
///
/// for i in 0..1000 {
/// let _ = tx.send(());
/// // This will always be ready. If coop was in effect, this code would be forced to yield
/// // periodically. However, if left unconstrained, then this code will never yield.
/// rx.recv().await;
/// }
/// };
///
/// coop::unconstrained(fut).await;
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
/// # }
/// ```
pub fn unconstrained<F>(inner: F) -> Unconstrained<F> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this only be available under cfg_coop! @carllerche ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't that mean that if you need to use this in a library you'd force your users to enable coop? (granted, I'm not sure familiar with features in Cargo, as I mentioned in that other thread we don't really use Cargo at $work :) )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I remember now. cfg_coop! is actually a short-hand for "if any of these features are enabled". It's not a feature in and of itself. The idea is that if you don't bring in a runtime or any resource types, then there's no reason to build the coop stuff. So I think cfg_coop! should also apply to these types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a relatively complicated feature list. I might just prefer always exposing it, having it be a no-op without coop in the runtime.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this was at the request of @carllerche, though his opinions may have changed since?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My gut is that this should live in the task module for the public API. I like what @Darksonn suggests, expose the fn anytime the task module is enabled but the impl changes based on cfg_coop. Either it does something or it is a noop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My gut is that this should live in the task module for the public API. I like what @Darksonn suggests, expose the fn anytime the task module is enabled but the impl changes based on cfg_coop. Either it does something or it is a noop.

Should I be making this update in this PR then? I'm not certain exactly what y'all would like me to change to get this moving forward :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please go ahead and make this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good; thanks; I just updated the PR.

Unconstrained { inner }
}

cfg_coop! {
#[must_use]
pub(crate) struct RestoreOnPending(Cell<Budget>);

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ cfg_rt! {
pub mod runtime;
}

pub(crate) mod coop;
pub mod coop;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we moved the function to tokio::task, this shouldn't be made public.

Copy link
Contributor Author

@krallin krallin Mar 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept it public so we could have access to some documentation for the coop module. Otherwise, there isn't much to explain what the wrapper does, and documenting the coop mechanism in the wrapper to disable it seems a bit counter-intuitive.

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, good point. I think I would still prefer to put it on the module docs of an existing module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind — where should it live then?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should probably go on tokio::task or tokio::runtime is my thought.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this into tokio::task and made tokio::coop pub(crate) again.


cfg_signal! {
pub mod signal;
Expand Down
26 changes: 26 additions & 0 deletions tokio/tests/rt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,32 @@ rt_test! {
});
}

#[test]
fn coop_unconstrained() {
use std::task::Poll::Ready;

let rt = rt();

rt.block_on(async {
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
// Create a bunch of tasks
let mut tasks = (0..1_000).map(|_| {
tokio::spawn(async { })
}).collect::<Vec<_>>();

// Hope that all the tasks complete...
time::sleep(Duration::from_millis(100)).await;

tokio::coop::unconstrained(poll_fn(|cx| {
// All the tasks should be ready
for task in &mut tasks {
assert!(Pin::new(task).poll(cx).is_ready());
}

Ready(())
})).await;
});
}

// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
Expand Down