task: fix LocalSet having a single shared task budget #2462

Merged 4 commits on Apr 30, 2020

Changes from 3 commits

51 changes: 42 additions & 9 deletions tokio/src/coop.rs
@@ -85,15 +85,11 @@ where
return f();
}

struct Guard<'a>(&'a Cell<usize>);
impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
self.0.set(UNCONSTRAINED);
}
}

hits.set(BUDGET);
let _guard = Guard(hits);
let _guard = ResetGuard {
hits,
prev: UNCONSTRAINED,
};
f()
})
}
@@ -114,6 +110,32 @@ cfg_blocking_impl! {
}
}

cfg_rt_core! {
cfg_rt_util! {
/// Run the given closure with the current task budget cleared, restoring the
/// previous budget when the closure finishes.
///
/// This is intended for internal use by `LocalSet` and (potentially) other
/// similar schedulers which are themselves futures, and need a fresh budget
/// for each of their children.
#[inline(always)]
pub(crate) fn reset<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
HITS.with(move |hits| {
let prev = hits.get();
hits.set(UNCONSTRAINED);
let _guard = ResetGuard {
hits,
prev,
};
f()
})
}
}
}

/// Invoke `f` with a subset of the remaining budget.
///
/// This is useful if you have sub-futures that you need to poll, but that you want to restrict
@@ -225,7 +247,7 @@ pub fn limit<R>(bound: usize, f: impl FnOnce() -> R) -> R {
pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
HITS.with(|hits| {
let n = hits.get();
if n == UNCONSTRAINED {
// opted out of budgeting
Poll::Ready(())
} else if n == 0 {
@@ -289,6 +311,11 @@ pin_project_lite::pin_project! {
}
}

struct ResetGuard<'a> {
hits: &'a Cell<usize>,
prev: usize,
}

impl<F: Future> Future for CoopFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -327,6 +354,12 @@ cfg_sync! {
impl<F> CoopFutureExt for F where F: Future {}
}

impl<'a> Drop for ResetGuard<'a> {
fn drop(&mut self) {
self.hits.set(self.prev);
}
}

#[cfg(all(test, not(loom)))]
mod test {
use super::*;
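For readers outside the tokio codebase, here is a self-contained sketch of the save/replace/restore pattern that `ResetGuard` and the new `reset` function implement above. The `HITS` thread-local, the `UNCONSTRAINED` sentinel, and the budget values below are stand-ins for illustration, not tokio's actual constants.

```rust
use std::cell::Cell;

// Stand-in sentinel meaning "no budget is being enforced".
const UNCONSTRAINED: usize = usize::MAX;

thread_local! {
    // Stand-in for tokio's per-thread operation budget.
    static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
}

// Restores whatever budget was in place before `reset` ran.
struct ResetGuard<'a> {
    hits: &'a Cell<usize>,
    prev: usize,
}

impl<'a> Drop for ResetGuard<'a> {
    fn drop(&mut self) {
        self.hits.set(self.prev);
    }
}

// Same shape as the `coop::reset` added in this diff: clear the budget for
// the duration of `f`, then put the previous value back. Because the restore
// happens in `Drop`, it also runs if `f` panics.
fn reset<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    HITS.with(move |hits| {
        let prev = hits.get();
        hits.set(UNCONSTRAINED);
        let _guard = ResetGuard { hits, prev };
        f()
    })
}

fn main() {
    // Pretend an outer scheduler already installed a budget of 3.
    HITS.with(|hits| hits.set(3));

    reset(|| {
        // Inside the closure the budget is unconstrained, so a nested
        // scheduler can hand each of its own tasks a fresh budget.
        HITS.with(|hits| assert_eq!(hits.get(), UNCONSTRAINED));
    });

    // The outer budget is restored once the guard is dropped.
    HITS.with(|hits| assert_eq!(hits.get(), 3));
}
```
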
17 changes: 17 additions & 0 deletions tokio/src/macros/cfg.rs
@@ -35,6 +35,23 @@ macro_rules! cfg_blocking_impl {
}
}

/// Enables blocking API internals or the `task` feature
macro_rules! cfg_blocking_impl_or_task {
($($item:item)*) => {
$(
#[cfg(any(
feature = "blocking",
feature = "fs",
feature = "dns",
feature = "io-std",
feature = "rt-threaded",
feature = "task",
))]
$item
)*
}
}

/// Enables enter::block_on
macro_rules! cfg_block_on {
($($item:item)*) => {
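A minimal sketch of how the new macro would be consumed: each wrapped item compiles when any of the listed blocking-related features or the `task` feature is enabled. This assumes the macro is in scope via the crate's internal macros module, and `shared_budget_enabled` is an invented placeholder, not an actual tokio item.

```rust
// Hypothetical usage only: the macro expands to the item annotated with the
// combined #[cfg(any(...))] attribute shown in its definition above.
cfg_blocking_impl_or_task! {
    // Placeholder item for illustration.
    pub(crate) fn shared_budget_enabled() -> bool {
        true
    }
}
```
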
67 changes: 38 additions & 29 deletions tokio/src/task/local.rs
@@ -394,7 +394,7 @@ impl LocalSet {
/// notified again.
fn tick(&self) -> bool {
for _ in 0..MAX_TASKS_PER_TICK {
match self.next_task() {
// Run the task
//
// Safety: As spawned tasks are `!Send`, `run_unchecked` must be
@@ -416,9 +416,9 @@

fn next_task(&self) -> Option<task::Notified<Arc<Shared>>> {
let tick = self.tick.get();
self.tick.set(tick.wrapping_add(1));

if tick % REMOTE_FIRST_INTERVAL == 0 {
self.context
.shared
.queue
@@ -454,20 +454,24 @@ impl Future for LocalSet {
// Register the waker before starting to work
self.context.shared.waker.register_by_ref(cx.waker());

if self.with(|| self.tick()) {
// If `tick` returns true, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
Poll::Pending
} else if self.context.tasks.borrow().owned.is_empty() {
// If the scheduler has no remaining futures, we're done!
Poll::Ready(())
} else {
// There are still futures in the local set, but we've polled all the
// futures in the run queue. Therefore, we can just return Pending
// since the remaining futures will be woken from somewhere else.
Poll::Pending
}
// Reset any previous task budget while polling tasks spawned on the
// `LocalSet`, ensuring that each has its own separate budget.
crate::coop::reset(|| {
if self.with(|| self.tick()) {
// If `tick` returns true, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
Poll::Pending
} else if self.context.tasks.borrow().owned.is_empty() {
// If the scheduler has no remaining futures, we're done!
Poll::Ready(())
} else {
// There are still futures in the local set, but we've polled all the
// futures in the run queue. Therefore, we can just return Pending
// since the remaining futures will be woken from somewhere else.
Poll::Pending
}
})
}
}

@@ -521,18 +525,23 @@ impl<T: Future> Future for RunUntil<'_, T> {
.register_by_ref(cx.waker());

let _no_blocking = crate::runtime::enter::disallow_blocking();

if let Poll::Ready(output) = me.future.poll(cx) {
return Poll::Ready(output);
}

if me.local_set.tick() {
// If `tick` returns `true`, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
}

Poll::Pending
// Reset any previous task budget so that the future passed to
// `run_until` and any tasks spawned on the `LocalSet` have their
// own budgets.
crate::coop::reset(|| {
let f = me.future;
if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) {
return Poll::Ready(output);
}

if me.local_set.tick() {
// If `tick` returns `true`, we need to notify the local future again:
// there are still tasks remaining in the run queue.
cx.waker().wake_by_ref();
}

Poll::Pending
})
})
}
}
75 changes: 54 additions & 21 deletions tokio/tests/task_local_set.rs
@@ -312,28 +312,17 @@ fn drop_cancels_tasks() {
assert_eq!(1, Rc::strong_count(&rc1));
}

#[test]
fn drop_cancels_remote_tasks() {
// This test reproduces issue #1885.
/// Runs a test function in a separate thread, and panics if the test does not
/// complete within the specified timeout, or if the test function panics.
///
/// This is intended for running tests whose failure mode is a hang or infinite
/// loop that cannot be detected otherwise.
fn with_timeout(timeout: Duration, f: impl FnOnce() + Send + 'static) {
use std::sync::mpsc::RecvTimeoutError;

let (done_tx, done_rx) = std::sync::mpsc::channel();
let thread = std::thread::spawn(move || {
let (tx, mut rx) = mpsc::channel::<()>(1024);

let mut rt = rt();

let local = LocalSet::new();
local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
local.block_on(&mut rt, async {
time::delay_for(Duration::from_millis(1)).await;
});

drop(tx);

// This enters an infinite loop if the remote notified tasks are not
// properly cancelled.
drop(local);
f();

// Send a message on the channel so that the test thread can
// determine if we have entered an infinite loop:
@@ -349,10 +338,11 @@
//
// Note that it should definitely complete in under a minute, but just
// in case CI is slow, we'll give it a long timeout.
match done_rx.recv_timeout(Duration::from_secs(60)) {
match done_rx.recv_timeout(timeout) {
Err(RecvTimeoutError::Timeout) => panic!(
"test did not complete within 60 seconds, \
we have (probably) entered an infinite loop!"
"test did not complete within {:?} seconds, \
we have (probably) entered an infinite loop!",
timeout,
),
// Did the test thread panic? We'll find out for sure when we `join`
// with it.
@@ -366,6 +356,49 @@
thread.join().expect("test thread should not panic!")
}

#[test]
fn drop_cancels_remote_tasks() {
// This test reproduces issue #1885.
with_timeout(Duration::from_secs(60), || {
let (tx, mut rx) = mpsc::channel::<()>(1024);

let mut rt = rt();

let local = LocalSet::new();
local.spawn_local(async move { while let Some(_) = rx.recv().await {} });
local.block_on(&mut rt, async {
time::delay_for(Duration::from_millis(1)).await;
});

drop(tx);

// This enters an infinite loop if the remote notified tasks are not
// properly cancelled.
drop(local);
});
}

#[test]
fn local_tasks_wake_join_all() {
// This test reproduces issue #2460.
with_timeout(Duration::from_secs(60), || {
use futures::future::join_all;
use tokio::task::LocalSet;

let mut rt = rt();
let set = LocalSet::new();
let mut handles = Vec::new();

for _ in 1..=128 {
handles.push(set.spawn_local(async move {
tokio::task::spawn_local(async move {}).await.unwrap();
}));
}

rt.block_on(set.run_until(join_all(handles)));
});
}

#[tokio::test]
async fn local_tasks_are_polled_after_tick() {
// Reproduces issues #1899 and #1900