From 18779aa2e292c4c060a35c2882601373c36e7af0 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 14 Jun 2021 16:36:04 -0700 Subject: [PATCH] time: fix time::advance() with sub-ms durations (#3852) Instead of using sleep in time::advance, this fixes the root of the issue. When futures passed to `Runtime::block_on` are woken, it bypassed all the machinery around advancing time. By intercepting wakes in the time driver, we know when the block_on task is woken and skip advancing time in that case. Fixes #3837 --- tokio/src/time/clock.rs | 13 +---- tokio/src/time/driver/mod.rs | 93 +++++++++++++++++++++++++------ tokio/tests/time_pause.rs | 104 ++++++++++++++++++++++++++++++++++- 3 files changed, 180 insertions(+), 30 deletions(-) diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index c5ef86be4e0..a0ff62139d3 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -7,7 +7,7 @@ //! configurable. cfg_not_test_util! { - use crate::time::{Duration, Instant}; + use crate::time::{Instant}; #[derive(Debug, Clone)] pub(crate) struct Clock {} @@ -24,14 +24,6 @@ cfg_not_test_util! { pub(crate) fn now(&self) -> Instant { now() } - - pub(crate) fn is_paused(&self) -> bool { - false - } - - pub(crate) fn advance(&self, _dur: Duration) { - unreachable!(); - } } } @@ -121,10 +113,9 @@ cfg_test_util! { /// runtime. pub async fn advance(duration: Duration) { let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - let until = clock.now() + duration; clock.advance(duration); - crate::time::sleep_until(until).await; + crate::task::yield_now().await; } /// Return the current instant, factoring in frozen time. diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 3eb100412fb..37d2231c34f 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -91,6 +91,15 @@ pub(crate) struct Driver { /// Parker to delegate to park: P, + + // When `true`, a call to `park_timeout` should immediately return and time + // should not advance. One reason for this to be `true` is if the task + // passed to `Runtime::block_on` called `task::yield_now()`. + // + // While it may look racy, it only has any effect when the clock is paused + // and pausing the clock is restricted to a single-threaded runtime. + #[cfg(feature = "test-util")] + did_wake: Arc, } /// A structure which handles conversion from Instants to u64 timestamps. @@ -178,6 +187,8 @@ where time_source, handle: Handle::new(Arc::new(inner)), park, + #[cfg(feature = "test-util")] + did_wake: Arc::new(AtomicBool::new(false)), } } @@ -192,8 +203,6 @@ where } fn park_internal(&mut self, limit: Option) -> Result<(), P::Error> { - let clock = &self.time_source.clock; - let mut lock = self.handle.get().state.lock(); assert!(!self.handle.is_shutdown()); @@ -217,26 +226,14 @@ where duration = std::cmp::min(limit, duration); } - if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - - // Simulate advancing time - clock.advance(duration); - } else { - self.park.park_timeout(duration)?; - } + self.park_timeout(duration)?; } else { self.park.park_timeout(Duration::from_secs(0))?; } } None => { if let Some(duration) = limit { - if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - clock.advance(duration); - } else { - self.park.park_timeout(duration)?; - } + self.park_timeout(duration)?; } else { self.park.park()?; } @@ -248,6 +245,39 @@ where Ok(()) } + + cfg_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + let clock = &self.time_source.clock; + + if clock.is_paused() { + self.park.park_timeout(Duration::from_secs(0))?; + + // If the time driver was woken, then the park completed + // before the "duration" elapsed (usually caused by a + // yield in `Runtime::block_on`). In this case, we don't + // advance the clock. + if !self.did_wake() { + // Simulate advancing time + clock.advance(duration); + } + } else { + self.park.park_timeout(duration)?; + } + + Ok(()) + } + + fn did_wake(&self) -> bool { + self.did_wake.swap(false, Ordering::SeqCst) + } + } + + cfg_not_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + self.park.park_timeout(duration) + } + } } impl Handle { @@ -387,11 +417,11 @@ impl

Park for Driver

where P: Park + 'static, { - type Unpark = P::Unpark; + type Unpark = TimerUnpark

; type Error = P::Error; fn unpark(&self) -> Self::Unpark { - self.park.unpark() + TimerUnpark::new(self) } fn park(&mut self) -> Result<(), Self::Error> { @@ -426,6 +456,33 @@ where } } +pub(crate) struct TimerUnpark { + inner: P::Unpark, + + #[cfg(feature = "test-util")] + did_wake: Arc, +} + +impl TimerUnpark

{ + fn new(driver: &Driver

) -> TimerUnpark

{ + TimerUnpark { + inner: driver.park.unpark(), + + #[cfg(feature = "test-util")] + did_wake: driver.did_wake.clone(), + } + } +} + +impl Unpark for TimerUnpark

{ + fn unpark(&self) { + #[cfg(feature = "test-util")] + self.did_wake.store(true, Ordering::SeqCst); + + self.inner.unpark(); + } +} + // ===== impl Inner ===== impl Inner { diff --git a/tokio/tests/time_pause.rs b/tokio/tests/time_pause.rs index d1834af2157..02e050a2dc4 100644 --- a/tokio/tests/time_pause.rs +++ b/tokio/tests/time_pause.rs @@ -4,7 +4,7 @@ use rand::SeedableRng; use rand::{rngs::StdRng, Rng}; use tokio::time::{self, Duration, Instant, Sleep}; -use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready_eq, task}; +use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready, assert_ready_eq, task}; use std::{ future::Future, @@ -215,6 +215,108 @@ async fn interval() { assert_pending!(poll_next(&mut i)); } +#[tokio::test(start_paused = true)] +async fn test_time_advance_sub_ms() { + let now = Instant::now(); + + let dur = Duration::from_micros(51_592); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); + + let now = Instant::now(); + let dur = Duration::from_micros(1); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); +} + +#[tokio::test(start_paused = true)] +async fn test_time_advance_3ms_and_change() { + let now = Instant::now(); + + let dur = Duration::from_micros(3_141_592); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); + + let now = Instant::now(); + let dur = Duration::from_micros(3_123_456); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); +} + +#[tokio::test(start_paused = true)] +async fn regression_3710_with_submillis_advance() { + let start = Instant::now(); + + time::advance(Duration::from_millis(1)).await; + + let mut sleep = task::spawn(time::sleep_until(start + Duration::from_secs(60))); + + assert_pending!(sleep.poll()); + + let before = Instant::now(); + let dur = Duration::from_micros(51_592); + time::advance(dur).await; + assert_eq!(before.elapsed(), dur); + + assert_pending!(sleep.poll()); +} + +#[tokio::test(start_paused = true)] +async fn exact_1ms_advance() { + let now = Instant::now(); + + let dur = Duration::from_millis(1); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); + + let now = Instant::now(); + let dur = Duration::from_millis(1); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); +} + +#[tokio::test(start_paused = true)] +async fn advance_once_with_timer() { + let mut sleep = task::spawn(time::sleep(Duration::from_millis(1))); + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(1500)).await; + + assert!(sleep.is_woken()); + assert_ready!(sleep.poll()); +} + +#[tokio::test(start_paused = true)] +async fn advance_multi_with_timer() { + // Round to the nearest ms + // time::sleep(Duration::from_millis(1)).await; + + let mut sleep = task::spawn(time::sleep(Duration::from_millis(1))); + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert!(sleep.is_woken()); + assert_ready!(sleep.poll()); +} + fn poll_next(interval: &mut task::Spawn) -> Poll { interval.enter(|cx, mut interval| interval.poll_tick(cx)) }