From ea25baf0996d79f6d2e8e67111d0b8b04d051127 Mon Sep 17 00:00:00 2001 From: Jason Orendorff Date: Wed, 26 Oct 2022 10:19:04 -0500 Subject: [PATCH] spawn_blocking: Re-enable auto-advance if the task panics. This uses a destructor, so it will also work if tokio machinery panics while trying to e.g. spawn a thread. --- tokio/src/runtime/blocking/pool.rs | 6 ++--- tokio/src/runtime/blocking/task.rs | 7 +---- tokio/src/time/clock.rs | 42 ++++++++++++++++-------------- tokio/src/time/mod.rs | 4 +-- tokio/tests/task_blocking.rs | 20 ++++++++++++++ 5 files changed, 49 insertions(+), 30 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 73b4fd8b9b0..d681ed28458 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -337,10 +337,10 @@ impl Spawner { #[cfg(not(all(tokio_unstable, feature = "tracing")))] let _ = name; - let (task, handle) = task::unowned(fut, NoopSchedule, id); - #[cfg(feature = "test-util")] - crate::time::inhibit_auto_advance(); + let fut = crate::time::inhibit_auto_advance(fut); + + let (task, handle) = task::unowned(fut, NoopSchedule, id); let spawned = self.spawn_task(Task::new(task, is_mandatory), rt); (handle, spawned) diff --git a/tokio/src/runtime/blocking/task.rs b/tokio/src/runtime/blocking/task.rs index 54b64baa244..0b7803a6c0b 100644 --- a/tokio/src/runtime/blocking/task.rs +++ b/tokio/src/runtime/blocking/task.rs @@ -39,11 +39,6 @@ where // we want it to start without any budgeting. crate::coop::stop(); - let r = func(); - - #[cfg(feature = "test-util")] - crate::time::allow_auto_advance(); - - Poll::Ready(r) + Poll::Ready(func()) } } diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 2a12c7b8758..55e6b3eea9d 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -28,6 +28,8 @@ cfg_not_test_util! { } cfg_test_util! { + use std::future::Future; + use crate::time::{Duration, Instant}; use crate::loom::sync::{Arc, Mutex}; @@ -126,28 +128,30 @@ cfg_test_util! { inner.unfrozen = Some(std::time::Instant::now()); } - /// Stop auto-advancing the clock (see `tokio::time::pause`) until - /// `allow_auto_advance` is called. - /// - /// # Panics - /// - /// Panics if called from outsie of a `current_thread` Tokio runtime. - #[track_caller] - pub(crate) fn inhibit_auto_advance() { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - clock.inhibit_auto_advance(); + struct AutoAdvanceInhibit(Clock); + + impl Drop for AutoAdvanceInhibit { + fn drop(&mut self) { + self.0.allow_auto_advance(); + } } - /// Resume auto-advance. This should only be called to balance out a previous - /// call to `inhibit_auto_advance`. - /// - /// # Panics + /// Temporarily stop auto-advancing the clock (see `tokio::time::pause`) + /// and decorate the given future with code to re-enable auto-advance when + /// it returns `Ready` or is dropped. /// - /// Panics if called from outsie of a `current_thread` Tokio runtime. - #[track_caller] - pub(crate) fn allow_auto_advance() { - let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - clock.allow_auto_advance(); + /// This is a no-op when called from outside the Tokio runtime. + pub(crate) fn inhibit_auto_advance(fut: F) -> impl Future { + // Bump the inhibit count immediately, not inside the async block, to + // avoid a race condition when used by spawn_blocking. + let guard = clock().map(|clock| { + clock.inhibit_auto_advance(); + AutoAdvanceInhibit(clock) + }); + async move { + let _guard = guard; + fut.await + } } /// Advances time. diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index e15b674943f..42715de90f6 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -87,9 +87,9 @@ mod clock; pub(crate) use self::clock::Clock; #[cfg(feature = "test-util")] -pub use clock::{advance, pause, resume}; +pub(crate) use clock::inhibit_auto_advance; #[cfg(feature = "test-util")] -pub(crate) use clock::{allow_auto_advance, inhibit_auto_advance}; +pub use clock::{advance, pause, resume}; pub mod error; diff --git a/tokio/tests/task_blocking.rs b/tokio/tests/task_blocking.rs index a5c3b3a7701..5859368b4cd 100644 --- a/tokio/tests/task_blocking.rs +++ b/tokio/tests/task_blocking.rs @@ -268,3 +268,23 @@ async fn blocking_task_wakes_paused_runtime() { "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" ); } + +#[cfg(feature = "test-util")] +#[tokio::test(start_paused = true)] +async fn panicking_blocking_task_wakes_paused_runtime() { + let t0 = std::time::Instant::now(); + let result = time::timeout( + Duration::from_secs(15), + task::spawn_blocking(|| { + thread::sleep(Duration::from_millis(250)); + panic!("blocking task panicked"); + }), + ) + .await + .expect("timeout should not trigger"); + assert!(result.is_err(), "blocking task should have panicked"); + assert!( + t0.elapsed() < Duration::from_secs(10), + "completing a spawn_blocking should wake the scheduler if it's parked while time is paused" + ); +}