Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time: fix time::advance() with sub-ms durations #3852

Merged
merged 7 commits into from
Jun 14, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,9 @@ cfg_test_util! {
/// runtime.
pub async fn advance(duration: Duration) {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let until = clock.now() + duration;
clock.advance(duration);

crate::time::sleep_until(until).await;
crate::task::yield_now().await;
}

/// Return the current instant, factoring in frozen time.
Expand Down
93 changes: 75 additions & 18 deletions tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ pub(crate) struct Driver<P: Park + 'static> {

/// Parker to delegate to
park: P,

// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
//
// While it may look racy, it only has any effect when the clock is paused
// and pausing the clock is restricted to a single-threaded runtime.
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}

/// A structure which handles conversion from Instants to u64 timestamps.
Expand Down Expand Up @@ -178,6 +187,8 @@ where
time_source,
handle: Handle::new(Arc::new(inner)),
park,
#[cfg(feature = "test-util")]
did_wake: Arc::new(AtomicBool::new(false)),
}
}

Expand All @@ -192,8 +203,6 @@ where
}

fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
let clock = &self.time_source.clock;

let mut lock = self.handle.get().state.lock();

assert!(!self.handle.is_shutdown());
Expand All @@ -217,26 +226,14 @@ where
duration = std::cmp::min(limit, duration);
}

if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;

// Simulate advancing time
clock.advance(duration);
} else {
self.park.park_timeout(duration)?;
}
self.park_timeout(duration)?;
} else {
self.park.park_timeout(Duration::from_secs(0))?;
}
}
None => {
if let Some(duration) = limit {
if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;
clock.advance(duration);
} else {
self.park.park_timeout(duration)?;
}
self.park_timeout(duration)?;
} else {
self.park.park()?;
}
Expand All @@ -248,6 +245,39 @@ where

Ok(())
}

cfg_test_util! {
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
let clock = &self.time_source.clock;

if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;

// If the time driver was woken, then the park completed
// before the "duration" elapsed (usually caused by a
// yield in `Runtime::block_on`). In this case, we don't
// advance the clock.
if !self.did_wake() {
// Simulate advancing time
clock.advance(duration);
}
} else {
self.park.park_timeout(duration)?;
}

Ok(())
}

fn did_wake(&self) -> bool {
self.did_wake.swap(false, Ordering::SeqCst)
}
}

cfg_not_test_util! {
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
self.park.park_timeout(duration)
}
}
}

impl Handle {
Expand Down Expand Up @@ -387,11 +417,11 @@ impl<P> Park for Driver<P>
where
P: Park + 'static,
{
type Unpark = P::Unpark;
type Unpark = TimerUnpark<P>;
type Error = P::Error;

fn unpark(&self) -> Self::Unpark {
self.park.unpark()
TimerUnpark::new(self)
}

fn park(&mut self) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -426,6 +456,33 @@ where
}
}

pub(crate) struct TimerUnpark<P: Park + 'static> {
inner: P::Unpark,

#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}

impl<P: Park + 'static> TimerUnpark<P> {
fn new(driver: &Driver<P>) -> TimerUnpark<P> {
TimerUnpark {
inner: driver.park.unpark(),

#[cfg(feature = "test-util")]
did_wake: driver.did_wake.clone(),
}
}
}

impl<P: Park + 'static> Unpark for TimerUnpark<P> {
fn unpark(&self) {
#[cfg(feature = "test-util")]
self.did_wake.store(true, Ordering::SeqCst);

self.inner.unpark();
}
}

// ===== impl Inner =====

impl Inner {
Expand Down
104 changes: 103 additions & 1 deletion tokio/tests/time_pause.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use rand::SeedableRng;
use rand::{rngs::StdRng, Rng};
use tokio::time::{self, Duration, Instant, Sleep};
use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready_eq, task};
use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready, assert_ready_eq, task};

use std::{
future::Future,
Expand Down Expand Up @@ -215,6 +215,108 @@ async fn interval() {
assert_pending!(poll_next(&mut i));
}

#[tokio::test(start_paused = true)]
async fn test_time_advance_sub_ms() {
let now = Instant::now();

let dur = Duration::from_micros(51_592);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);

let now = Instant::now();
let dur = Duration::from_micros(1);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);
}

#[tokio::test(start_paused = true)]
async fn test_time_advance_3ms_and_change() {
let now = Instant::now();

let dur = Duration::from_micros(3_141_592);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);

let now = Instant::now();
let dur = Duration::from_micros(3_123_456);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);
}

carllerche marked this conversation as resolved.
Show resolved Hide resolved
#[tokio::test(start_paused = true)]
async fn regression_3710_with_submillis_advance() {
let start = Instant::now();

time::advance(Duration::from_millis(1)).await;

let mut sleep = task::spawn(time::sleep_until(start + Duration::from_secs(60)));

assert_pending!(sleep.poll());

let before = Instant::now();
let dur = Duration::from_micros(51_592);
time::advance(dur).await;
assert_eq!(before.elapsed(), dur);

assert_pending!(sleep.poll());
}

#[tokio::test(start_paused = true)]
async fn exact_1ms_advance() {
let now = Instant::now();

let dur = Duration::from_millis(1);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);

let now = Instant::now();
let dur = Duration::from_millis(1);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);
}

#[tokio::test(start_paused = true)]
async fn advance_once_with_timer() {
let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(1500)).await;

assert!(sleep.is_woken());
assert_ready!(sleep.poll());
}

#[tokio::test(start_paused = true)]
async fn advance_multi_with_timer() {
// Round to the nearest ms
// time::sleep(Duration::from_millis(1)).await;

let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert!(sleep.is_woken());
assert_ready!(sleep.poll());
}

fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
interval.enter(|cx, mut interval| interval.poll_tick(cx))
}
Expand Down