Skip to content

Commit

Permalink
time: refactor to avoid double validation
Browse files Browse the repository at this point in the history
The fix in the last commit was surgical, but resulted in duplicating the
valid-duration check. This commit adds a new field to Entry that tracks
the precise nature of the error that occurred, so that the correct error
can be returned when `poll_elapsed` is called.
  • Loading branch information
benesch committed Dec 24, 2019
1 parent 4f273e8 commit 990fe88
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 18 deletions.
2 changes: 1 addition & 1 deletion tokio/src/time/driver/atomic_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl Drop for AtomicStackEntries {
fn drop(&mut self) {
for entry in self {
// Flag the entry as errored
entry.error();
entry.error(Error::shutdown());
}
}
}
31 changes: 20 additions & 11 deletions tokio/src/time/driver/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use crate::loom::sync::atomic::AtomicU64;
use crate::sync::AtomicWaker;
use crate::time::driver::{Handle, Inner};
use crate::time::{Duration, Error, Instant};
use crate::time::wheel::MAX_DURATION;

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::{Arc, Weak};
use std::task::{self, Poll};
use std::u64;
Expand Down Expand Up @@ -45,6 +44,11 @@ pub(crate) struct Entry {
/// instant, this value is changed.
state: AtomicU64,

/// Stores the actual error. If `state` indicates that an error occurred,
/// this is guaranteed to be a non-zero value representing the first error
/// that occurred. Otherwise its value is undefined.
error: AtomicU8,

/// Task to notify once the deadline is reached.
waker: AtomicWaker,

Expand Down Expand Up @@ -110,23 +114,22 @@ impl Entry {
let entry: Entry;

// Increment the number of active timeouts
if inner.increment().is_err() {
entry = Entry::new2(deadline, duration, Weak::new(), ERROR)
if let Err(err) = inner.increment() {
entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
entry.error(err);
} else {
let when = inner.normalize_deadline(deadline);
let state = if when <= inner.elapsed() {
ELAPSED
} else if when - inner.elapsed() > MAX_DURATION {
panic!("timer duration exceeds maximum duration");
} else {
when
};
entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state);
}

let entry = Arc::new(entry);
if inner.queue(&entry).is_err() {
entry.error();
if let Err(err) = inner.queue(&entry) {
entry.error(err);
}

entry
Expand Down Expand Up @@ -192,7 +195,12 @@ impl Entry {
self.waker.wake();
}

pub(crate) fn error(&self) {
pub(crate) fn error(&self, error: Error) {
// Record the precise nature of the error, if there isn't already an
// error present. If we don't actually transition to the error state
// below, that's fine, as the error details we set here will be ignored.
self.error.compare_and_swap(0, error.as_u8(), SeqCst);

// Only transition to the error state if not currently elapsed
let mut curr = self.state.load(SeqCst);

Expand Down Expand Up @@ -237,7 +245,7 @@ impl Entry {

if is_elapsed(curr) {
return Poll::Ready(if curr == ERROR {
Err(Error::shutdown())
Err(Error::from_u8(self.error.load(SeqCst)))
} else {
Ok(())
});
Expand All @@ -249,7 +257,7 @@ impl Entry {

if is_elapsed(curr) {
return Poll::Ready(if curr == ERROR {
Err(Error::shutdown())
Err(Error::from_u8(self.error.load(SeqCst)))
} else {
Ok(())
});
Expand Down Expand Up @@ -312,6 +320,7 @@ impl Entry {
waker: AtomicWaker::new(),
state: AtomicU64::new(state),
queued: AtomicBool::new(false),
error: AtomicU8::new(0),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ where
// The entry's deadline is invalid, so error it and update the
// internal state accordingly.
entry.set_when_internal(None);
entry.error();
entry.error(Error::invalid());
}
}
}
Expand Down Expand Up @@ -297,7 +297,7 @@ impl<T> Drop for Driver<T> {
let mut poll = wheel::Poll::new(u64::MAX);

while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
entry.error();
entry.error(Error::shutdown());
}
}
}
Expand Down
35 changes: 32 additions & 3 deletions tokio/src/time/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ use std::fmt;
#[derive(Debug)]
pub struct Error(Kind);

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
#[repr(u8)]
enum Kind {
Shutdown,
AtCapacity,
Shutdown = 1,
AtCapacity = 2,
Invalid = 3,
}

impl Error {
Expand Down Expand Up @@ -56,6 +58,32 @@ impl Error {
_ => false,
}
}

/// Create an error representing a misconfigured timer.
pub fn invalid() -> Error {
Error(Invalid)
}

/// Returns `true` if the error was caused by the timer being misconfigured.
pub fn is_invalid(&self) -> bool {
match self.0 {
Kind::Invalid => true,
_ => false,
}
}

pub(crate) fn as_u8(&self) -> u8 {
self.0 as u8
}

pub(crate) fn from_u8(n: u8) -> Self {
Error(match n {
1 => Shutdown,
2 => AtCapacity,
3 => Invalid,
_ => panic!("u8 does not correspond to any time error variant"),
})
}
}

impl error::Error for Error {}
Expand All @@ -66,6 +94,7 @@ impl fmt::Display for Error {
let descr = match self.0 {
Shutdown => "timer is shutdown",
AtCapacity => "timer is at capacity and cannot create a new entry",
Invalid => "timer duration exceeds maximum duration",
};
write!(fmt, "{}", descr)
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/time/wheel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) struct Wheel<T> {
const NUM_LEVELS: usize = 6;

/// The maximum duration of a delay
pub(crate) const MAX_DURATION: u64 = 1 << (6 * NUM_LEVELS);
const MAX_DURATION: u64 = 1 << (6 * NUM_LEVELS);

#[derive(Debug)]
pub(crate) enum InsertError {
Expand Down

0 comments on commit 990fe88

Please sign in to comment.