Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time: report correct error for timers that exceed max duration #2023

Merged
merged 3 commits into from
Jul 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/src/time/driver/atomic_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl Drop for AtomicStackEntries {
fn drop(&mut self) {
for entry in self {
// Flag the entry as errored
entry.error();
entry.error(Error::shutdown());
}
}
}
28 changes: 20 additions & 8 deletions tokio/src/time/driver/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::time::{Duration, Error, Instant};

use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicU8};
use std::sync::{Arc, Weak};
use std::task::{self, Poll};
use std::u64;
Expand Down Expand Up @@ -45,6 +45,11 @@ pub(crate) struct Entry {
/// instant, this value is changed.
state: AtomicU64,

/// Stores the actual error. If `state` indicates that an error occurred,
/// this is guaranteed to be a non-zero value representing the first error
/// that occurred. Otherwise its value is undefined.
error: AtomicU8,

/// Task to notify once the deadline is reached.
waker: AtomicWaker,

Expand Down Expand Up @@ -110,8 +115,9 @@ impl Entry {
let entry: Entry;

// Increment the number of active timeouts
if inner.increment().is_err() {
entry = Entry::new2(deadline, duration, Weak::new(), ERROR)
if let Err(err) = inner.increment() {
entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
entry.error(err);
} else {
let when = inner.normalize_deadline(deadline);
let state = if when <= inner.elapsed() {
Expand All @@ -123,8 +129,8 @@ impl Entry {
}

let entry = Arc::new(entry);
if inner.queue(&entry).is_err() {
entry.error();
if let Err(err) = inner.queue(&entry) {
entry.error(err);
}

entry
Expand Down Expand Up @@ -190,7 +196,12 @@ impl Entry {
self.waker.wake();
}

pub(crate) fn error(&self) {
pub(crate) fn error(&self, error: Error) {
// Record the precise nature of the error, if there isn't already an
// error present. If we don't actually transition to the error state
// below, that's fine, as the error details we set here will be ignored.
self.error.compare_and_swap(0, error.as_u8(), SeqCst);

// Only transition to the error state if not currently elapsed
let mut curr = self.state.load(SeqCst);

Expand Down Expand Up @@ -235,7 +246,7 @@ impl Entry {

if is_elapsed(curr) {
return Poll::Ready(if curr == ERROR {
Err(Error::shutdown())
Err(Error::from_u8(self.error.load(SeqCst)))
} else {
Ok(())
});
Expand All @@ -247,7 +258,7 @@ impl Entry {

if is_elapsed(curr) {
return Poll::Ready(if curr == ERROR {
Err(Error::shutdown())
Err(Error::from_u8(self.error.load(SeqCst)))
} else {
Ok(())
});
Expand Down Expand Up @@ -310,6 +321,7 @@ impl Entry {
waker: AtomicWaker::new(),
state: AtomicU64::new(state),
queued: AtomicBool::new(false),
error: AtomicU8::new(0),
next_atomic: UnsafeCell::new(ptr::null_mut()),
when: UnsafeCell::new(None),
next_stack: UnsafeCell::new(None),
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/time/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ where
// The entry's deadline is invalid, so error it and update the
// internal state accordingly.
entry.set_when_internal(None);
entry.error();
entry.error(Error::invalid());
}
}
}
Expand Down Expand Up @@ -317,7 +317,7 @@ impl<T> Drop for Driver<T> {
let mut poll = wheel::Poll::new(u64::MAX);

while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
entry.error();
entry.error(Error::shutdown());
}
}
}
Expand Down
35 changes: 32 additions & 3 deletions tokio/src/time/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ use std::fmt;
#[derive(Debug)]
pub struct Error(Kind);

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
#[repr(u8)]
enum Kind {
Shutdown,
AtCapacity,
Shutdown = 1,
AtCapacity = 2,
Invalid = 3,
}

impl Error {
Expand Down Expand Up @@ -56,6 +58,32 @@ impl Error {
_ => false,
}
}

/// Create an error representing a misconfigured timer.
pub fn invalid() -> Error {
Error(Invalid)
}

/// Returns `true` if the error was caused by the timer being misconfigured.
pub fn is_invalid(&self) -> bool {
match self.0 {
Kind::Invalid => true,
_ => false,
}
}

pub(crate) fn as_u8(&self) -> u8 {
self.0 as u8
}

pub(crate) fn from_u8(n: u8) -> Self {
Error(match n {
1 => Shutdown,
2 => AtCapacity,
3 => Invalid,
_ => panic!("u8 does not correspond to any time error variant"),
})
}
}

impl error::Error for Error {}
Expand All @@ -66,6 +94,7 @@ impl fmt::Display for Error {
let descr = match self.0 {
Shutdown => "the timer is shutdown, must be called from the context of Tokio runtime",
AtCapacity => "timer is at capacity and cannot create a new entry",
Invalid => "timer duration exceeds maximum duration",
};
write!(fmt, "{}", descr)
}
Expand Down