From 82526fe04dab222b6307b2bfa6a90acf43b5f543 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Tue, 28 Jul 2020 17:52:10 -0400 Subject: [PATCH 1/3] Reset coop budget when blocking in block_on Previously, we would fail to reset the coop budget in this case, making it so that `coop::poll_proceed` would perpetually yield `Poll::Pending` in nested executers even when run in `block_in_place`. This is also a further improvement on #2645. --- tokio/src/runtime/thread_pool/worker.rs | 6 +-- tokio/tests/task_blocking.rs | 51 +++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index abe20da59cf..c53c9384bde 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -199,7 +199,6 @@ cfg_blocking! { } } - let mut had_core = false; let mut had_entered = false; CURRENT.with(|maybe_cx| { @@ -248,7 +247,6 @@ cfg_blocking! { // // First, move the core back into the worker's shared core slot. cx.worker.core.set(core); - had_core = true; // Next, clone the worker handle and send it to a new thread for // processing. @@ -259,13 +257,11 @@ cfg_blocking! { runtime::spawn_blocking(move || run(worker)); }); - if had_core { + if had_entered { // Unset the current task's budget. Blocking sections are not // constrained by task budgets. let _reset = Reset(coop::stop()); - crate::runtime::enter::exit(f) - } else if had_entered { crate::runtime::enter::exit(f) } else { f() diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index 50c070a355a..9ab1ce03587 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -176,3 +176,54 @@ fn can_shutdown_now_in_runtime() { rt.shutdown_background(); }); } + +#[test] +fn coop_disabled_in_block_in_place() { + let mut outer = tokio::runtime::Builder::new() + .threaded_scheduler() + .build() + .unwrap(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + for i in 0..200 { + tx.send(i).unwrap(); + } + drop(tx); + + outer + .block_on(async move { + tokio::spawn(async move { + tokio::task::block_in_place(move || { + futures::executor::block_on(async move { + use tokio::stream::StreamExt; + assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); + }) + }) + }) + .await + }) + .unwrap(); +} + +#[test] +fn coop_disabled_in_block_in_place_in_block_on() { + let mut outer = tokio::runtime::Builder::new() + .threaded_scheduler() + .build() + .unwrap(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + for i in 0..200 { + tx.send(i).unwrap(); + } + drop(tx); + + outer.block_on(async move { + tokio::task::block_in_place(move || { + futures::executor::block_on(async move { + use tokio::stream::StreamExt; + assert_eq!(rx.fold(0, |n, _| { dbg!(n + 1) }).await, 200); + }) + }) + }); +} From 7ef5c72d16976cbc178dc2ef6587714c314b4483 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Tue, 28 Jul 2020 18:09:29 -0400 Subject: [PATCH 2/3] Make tests time out nicely --- tokio/tests/task_blocking.rs | 68 ++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index 9ab1ce03587..d1fa192d31e 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -181,6 +181,7 @@ fn can_shutdown_now_in_runtime() { fn coop_disabled_in_block_in_place() { let mut outer = tokio::runtime::Builder::new() .threaded_scheduler() + .enable_time() .build() .unwrap(); @@ -190,40 +191,55 @@ fn coop_disabled_in_block_in_place() { } drop(tx); - outer - .block_on(async move { - tokio::spawn(async move { - tokio::task::block_in_place(move || { - futures::executor::block_on(async move { - use tokio::stream::StreamExt; - assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); - }) + outer.block_on(async move { + let jh = tokio::spawn(async move { + tokio::task::block_in_place(move || { + futures::executor::block_on(async move { + use tokio::stream::StreamExt; + assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); }) }) + }); + + tokio::time::timeout(Duration::from_secs(1), jh) .await - }) - .unwrap(); + .expect("timed out (probably hanging)") + .unwrap() + }); } #[test] fn coop_disabled_in_block_in_place_in_block_on() { - let mut outer = tokio::runtime::Builder::new() - .threaded_scheduler() - .build() - .unwrap(); - - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - for i in 0..200 { - tx.send(i).unwrap(); - } - drop(tx); + let (done_tx, done_rx) = std::sync::mpsc::channel(); + let done = done_tx.clone(); + thread::spawn(move || { + let mut outer = tokio::runtime::Builder::new() + .threaded_scheduler() + .build() + .unwrap(); - outer.block_on(async move { - tokio::task::block_in_place(move || { - futures::executor::block_on(async move { - use tokio::stream::StreamExt; - assert_eq!(rx.fold(0, |n, _| { dbg!(n + 1) }).await, 200); + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + for i in 0..200 { + tx.send(i).unwrap(); + } + drop(tx); + + let x = outer.block_on(async move { + tokio::task::block_in_place(move || { + futures::executor::block_on(async move { + use tokio::stream::StreamExt; + assert_eq!(rx.fold(0, |n, _| n + 1).await, 200); + }) }) - }) + }); + + let _ = done.send(Ok(x)); }); + + thread::spawn(move || { + thread::sleep(Duration::from_secs(1)); + let _ = done_tx.send(Err("timed out (probably hanging)")); + }); + + done_rx.recv().unwrap().unwrap(); } From 2ff68bdf8751622172928b6c68d4c48db9273f18 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Tue, 28 Jul 2020 18:43:37 -0400 Subject: [PATCH 3/3] Thanks clippy --- tokio/tests/task_blocking.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index d1fa192d31e..4ca1596e052 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -224,7 +224,7 @@ fn coop_disabled_in_block_in_place_in_block_on() { } drop(tx); - let x = outer.block_on(async move { + outer.block_on(async move { tokio::task::block_in_place(move || { futures::executor::block_on(async move { use tokio::stream::StreamExt; @@ -233,7 +233,7 @@ fn coop_disabled_in_block_in_place_in_block_on() { }) }); - let _ = done.send(Ok(x)); + let _ = done.send(Ok(())); }); thread::spawn(move || {