Skip to content

Commit

Permalink
test-util: don't auto-advance time when a spawn_blocking task is ru…
Browse files Browse the repository at this point in the history
…nning (#5115)
  • Loading branch information
jorendorff authored Dec 17, 2022
1 parent 42db755 commit e14ca72
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 19 deletions.
2 changes: 0 additions & 2 deletions tokio/src/runtime/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ cfg_trace! {
mod schedule;
mod shutdown;
mod task;
#[cfg(all(test, not(tokio_wasm)))]
pub(crate) use schedule::NoopSchedule;
pub(crate) use task::BlockingTask;

use crate::runtime::Builder;
Expand Down
9 changes: 5 additions & 4 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::schedule::BlockingSchedule;
use crate::runtime::blocking::{shutdown, BlockingTask};
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
Expand Down Expand Up @@ -120,7 +120,7 @@ struct Shared {
}

pub(crate) struct Task {
task: task::UnownedTask<NoopSchedule>,
task: task::UnownedTask<BlockingSchedule>,
mandatory: Mandatory,
}

Expand Down Expand Up @@ -151,7 +151,7 @@ impl From<SpawnError> for io::Error {
}

impl Task {
pub(crate) fn new(task: task::UnownedTask<NoopSchedule>, mandatory: Mandatory) -> Task {
pub(crate) fn new(task: task::UnownedTask<BlockingSchedule>, mandatory: Mandatory) -> Task {
Task { task, mandatory }
}

Expand Down Expand Up @@ -379,7 +379,8 @@ impl Spawner {
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;

let (task, handle) = task::unowned(fut, NoopSchedule, id);
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);

let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);
(handle, spawned)
}
Expand Down
49 changes: 43 additions & 6 deletions tokio/src/runtime/blocking/schedule.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,52 @@
#[cfg(feature = "test-util")]
use crate::runtime::scheduler;
use crate::runtime::task::{self, Task};
use crate::runtime::Handle;

/// `task::Schedule` implementation that does nothing. This is unique to the
/// blocking scheduler as tasks scheduled are not really futures but blocking
/// operations.
/// `task::Schedule` implementation that does nothing (except some bookkeeping
/// in test-util builds). This is unique to the blocking scheduler as tasks
/// scheduled are not really futures but blocking operations.
///
/// We avoid storing the task by forgetting it in `bind` and re-materializing it
/// in `release.
pub(crate) struct NoopSchedule;
/// in `release`.
pub(crate) struct BlockingSchedule {
#[cfg(feature = "test-util")]
handle: Handle,
}

impl BlockingSchedule {
#[cfg_attr(not(feature = "test-util"), allow(unused_variables))]
pub(crate) fn new(handle: &Handle) -> Self {
#[cfg(feature = "test-util")]
{
match &handle.inner {
scheduler::Handle::CurrentThread(handle) => {
handle.driver.clock.inhibit_auto_advance();
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
}
}
BlockingSchedule {
#[cfg(feature = "test-util")]
handle: handle.clone(),
}
}
}

impl task::Schedule for NoopSchedule {
impl task::Schedule for BlockingSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
#[cfg(feature = "test-util")]
{
match &self.handle.inner {
scheduler::Handle::CurrentThread(handle) => {
handle.driver.clock.allow_auto_advance();
handle.driver.unpark();
}
#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))]
scheduler::Handle::MultiThread(_) => {}
}
}
None
}

Expand Down
21 changes: 21 additions & 0 deletions tokio/src/runtime/tests/loom_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,27 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread
});
}

#[test]
fn spawn_blocking_when_paused() {
use std::time::Duration;
loom::model(|| {
let rt = crate::runtime::Builder::new_current_thread()
.enable_time()
.start_paused(true)
.build()
.unwrap();
let handle = rt.handle();
let _enter = handle.enter();
let a = crate::task::spawn_blocking(|| {});
let b = crate::task::spawn_blocking(|| {});
rt.block_on(crate::time::timeout(Duration::from_millis(1), async move {
a.await.expect("blocking task should finish");
b.await.expect("blocking task should finish");
}))
.expect("timeout should not trigger");
});
}

fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/tests/loom_queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::runtime::blocking::NoopSchedule;
use crate::runtime::scheduler::multi_thread::queue;
use crate::runtime::task::Inject;
use crate::runtime::tests::NoopSchedule;
use crate::runtime::MetricsBatch;

use loom::thread;
Expand Down
20 changes: 19 additions & 1 deletion tokio/src/runtime/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,29 @@
// other code when running loom tests.
#![cfg_attr(loom, warn(dead_code, unreachable_pub))]

use self::noop_scheduler::NoopSchedule;
use self::unowned_wrapper::unowned;

mod noop_scheduler {
use crate::runtime::task::{self, Task};

/// `task::Schedule` implementation that does nothing, for testing.
pub(crate) struct NoopSchedule;

impl task::Schedule for NoopSchedule {
fn release(&self, _task: &Task<Self>) -> Option<Task<Self>> {
None
}

fn schedule(&self, _task: task::Notified<Self>) {
unreachable!();
}
}
}

mod unowned_wrapper {
use crate::runtime::blocking::NoopSchedule;
use crate::runtime::task::{Id, JoinHandle, Notified};
use crate::runtime::tests::NoopSchedule;

#[cfg(all(tokio_unstable, feature = "tracing"))]
pub(crate) fn unowned<T>(task: T) -> (Notified<NoopSchedule>, JoinHandle<T::Output>)
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/tests/task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::runtime::blocking::NoopSchedule;
use crate::runtime::task::{self, unowned, Id, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::tests::NoopSchedule;
use crate::util::TryLock;

use std::collections::VecDeque;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl Driver {
let handle = rt_handle.time();
let clock = &handle.time_source.clock;

if clock.is_paused() {
if clock.can_auto_advance() {
self.park.park_timeout(rt_handle, Duration::from_secs(0));

// If the time driver was woken, then the park completed
Expand Down
19 changes: 17 additions & 2 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ cfg_test_util! {

/// Instant at which the clock was last unfrozen.
unfrozen: Option<std::time::Instant>,

/// Number of `inhibit_auto_advance` calls still in effect.
auto_advance_inhibit_count: usize,
}

/// Pauses time.
Expand Down Expand Up @@ -187,6 +190,7 @@ cfg_test_util! {
enable_pausing,
base: now,
unfrozen: Some(now),
auto_advance_inhibit_count: 0,
})),
};

Expand All @@ -212,9 +216,20 @@ cfg_test_util! {
inner.unfrozen = None;
}

pub(crate) fn is_paused(&self) -> bool {
/// Temporarily stop auto-advancing the clock (see `tokio::time::pause`).
pub(crate) fn inhibit_auto_advance(&self) {
let mut inner = self.inner.lock();
inner.auto_advance_inhibit_count += 1;
}

pub(crate) fn allow_auto_advance(&self) {
let mut inner = self.inner.lock();
inner.auto_advance_inhibit_count -= 1;
}

pub(crate) fn can_auto_advance(&self) -> bool {
let inner = self.inner.lock();
inner.unfrozen.is_none()
inner.unfrozen.is_none() && inner.auto_advance_inhibit_count == 0
}

#[track_caller]
Expand Down
83 changes: 82 additions & 1 deletion tokio/tests/task_blocking.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(tokio_wasi)))] // Wasi doesn't support threads

use tokio::{runtime, task};
use tokio::{runtime, task, time};
use tokio_test::assert_ok;

use std::thread;
Expand Down Expand Up @@ -226,3 +226,84 @@ fn coop_disabled_in_block_in_place_in_block_on() {

done_rx.recv().unwrap().unwrap();
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_when_paused() {
// Do not auto-advance time when we have started a blocking task that has
// not yet finished.
time::timeout(
Duration::from_secs(3),
task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
)
.await
.expect("timeout should not trigger")
.expect("blocking task should finish");

// Really: Do not auto-advance time, even if the timeout is short and the
// blocking task runs for longer than that. It doesn't matter: Tokio time
// is paused; system time is not.
time::timeout(
Duration::from_millis(1),
task::spawn_blocking(|| thread::sleep(Duration::from_millis(50))),
)
.await
.expect("timeout should not trigger")
.expect("blocking task should finish");
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();
time::timeout(
Duration::from_secs(15),
task::spawn_blocking(|| thread::sleep(Duration::from_millis(1))),
)
.await
.expect("timeout should not trigger")
.expect("blocking task should finish");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn unawaited_blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();

// When this task finishes, time should auto-advance, even though the
// JoinHandle has not been awaited yet.
let a = task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(1));
});

crate::time::sleep(Duration::from_secs(15)).await;
a.await.expect("blocking task should finish");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}

#[cfg(feature = "test-util")]
#[tokio::test(start_paused = true)]
async fn panicking_blocking_task_wakes_paused_runtime() {
let t0 = std::time::Instant::now();
let result = time::timeout(
Duration::from_secs(15),
task::spawn_blocking(|| {
thread::sleep(Duration::from_millis(1));
panic!("blocking task panicked");
}),
)
.await
.expect("timeout should not trigger");
assert!(result.is_err(), "blocking task should have panicked");
assert!(
t0.elapsed() < Duration::from_secs(10),
"completing a spawn_blocking should wake the scheduler if it's parked while time is paused"
);
}

0 comments on commit e14ca72

Please sign in to comment.