Skip to content

Commit

Permalink
Refactor tokio-timer to use std::future (#1137)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar authored and carllerche committed Jun 6, 2019
1 parent 6192ea3 commit c5c379c
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 314 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ members = [
"tokio-sync",
"tokio-test",
# "tokio-threadpool",
# "tokio-timer",
"tokio-timer",
"tokio-tcp",
# "tokio-tls",
# "tokio-trace",
Expand Down
20 changes: 19 additions & 1 deletion tokio-timer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,32 @@ Timer facilities for Tokio
"""
publish = false

[features]
# individual `Stream` impls if you so desire
delay-queue = ["futures-core-preview"]
interval = ["futures-core-preview"]
timeout-stream = ["futures-core-preview"]
throttle = ["futures-core-preview"]

# easily enable all `Stream` impls
streams = [
"delay-queue",
"interval",
"timeout-stream",
"throttle",
]

[dependencies]
futures = "0.1.19"
tokio-executor = { version = "0.2.0", path = "../tokio-executor" }
tokio-sync = { version = "0.2.0", path = "../tokio-sync" }
crossbeam-utils = "0.6.0"

# Backs `DelayQueue`
slab = "0.4.1"

# optionals
futures-core-preview = { version = "0.3.0-alpha.16", optional = true }

[dev-dependencies]
rand = "0.6"
tokio-mock-task = "0.1.0"
Expand Down
26 changes: 20 additions & 6 deletions tokio-timer/src/delay.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::timer::{HandlePriv, Registration};
use crate::Error;
use futures::{Future, Poll};
use std::future::Future;
use std::pin::Pin;
use std::time::{Duration, Instant};
use std::task::{self, Poll};

/// A future that completes at a specified instant in time.
///
Expand Down Expand Up @@ -72,6 +73,8 @@ impl Delay {
self.registration.reset(deadline);
}

// Used by `Timeout<Stream>`
#[cfg(feature = "timeout-stream")]
pub(crate) fn reset_timeout(&mut self) {
self.registration.reset_timeout();
}
Expand All @@ -84,13 +87,24 @@ impl Delay {
}

impl Future for Delay {
type Item = ();
type Error = Error;
type Output = ();

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
// Ensure the `Delay` instance is associated with a timer.
self.register();

self.registration.poll_elapsed()
// `poll_elapsed` can return an error in two cases:
//
// - AtCapacity: this is a pathlogical case where far too many
// delays have been scheduled.
// - Shutdown: No timer has been setup, which is a mis-use error.
//
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
match ready!(self.registration.poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
}
}
}
56 changes: 32 additions & 24 deletions tokio-timer/src/delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ use crate::clock::now;
use crate::timer::Handle;
use crate::wheel::{self, Wheel};
use crate::{Delay, Error};
use futures::{try_ready, Future, Poll, Stream};
use futures_core::Stream;
use slab::Slab;
use std::cmp;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::{Duration, Instant};

/// A queue of delayed elements.
Expand Down Expand Up @@ -177,7 +180,7 @@ pub struct Key {
struct Stack<T> {
/// Head of the stack
head: Option<usize>,
_p: PhantomData<T>,
_p: PhantomData<fn() -> T>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -645,19 +648,19 @@ impl<T> DelayQueue<T> {
/// should be returned.
///
/// A slot should be returned when the associated deadline has been reached.
fn poll_idx(&mut self) -> Poll<Option<usize>, Error> {
fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<usize, Error>>> {
use self::wheel::Stack;

let expired = self.expired.pop(&mut self.slab);

if expired.is_some() {
return Ok(expired.into());
return Poll::Ready(expired.map(Ok));
}

loop {
if let Some(ref mut delay) = self.delay {
if !delay.is_elapsed() {
try_ready!(delay.poll());
ready!(Pin::new(&mut *delay).poll(cx));
}

let now = crate::ms(delay.deadline() - self.start, crate::Round::Down);
Expand All @@ -668,13 +671,13 @@ impl<T> DelayQueue<T> {
self.delay = None;

if let Some(idx) = self.wheel.poll(&mut self.poll, &mut self.slab) {
return Ok(Some(idx).into());
return Poll::Ready(Some(Ok(idx)));
}

if let Some(deadline) = self.next_deadline() {
self.delay = Some(self.handle.delay(deadline));
} else {
return Ok(None.into());
return Poll::Ready(None);
}
}
}
Expand All @@ -690,24 +693,29 @@ impl<T> DelayQueue<T> {
}
}

impl<T> Stream for DelayQueue<T> {
type Item = Expired<T>;
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Error> {
let item = try_ready!(self.poll_idx()).map(|idx| {
let data = self.slab.remove(idx);
debug_assert!(data.next.is_none());
debug_assert!(data.prev.is_none());

Expired {
key: Key::new(idx),
data: data.inner,
deadline: self.start + Duration::from_millis(data.when),
}
});
// We never put `T` in a `Pin`...
impl<T> Unpin for DelayQueue<T> {}

Ok(item.into())
impl<T> Stream for DelayQueue<T> {
// DelayQueue seems much more specific, where a user may care that it
// has reached capacity, so return those errors instead of panicking.
type Item = Result<Expired<T>, Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let item = ready!(self.poll_idx(cx));
Poll::Ready(item.map(|result| {
result.map(|idx| {
let data = self.slab.remove(idx);
debug_assert!(data.next.is_none());
debug_assert!(data.prev.is_none());

Expired {
key: Key::new(idx),
data: data.inner,
deadline: self.start + Duration::from_millis(data.when),
}
})
}))
}
}

Expand Down
15 changes: 9 additions & 6 deletions tokio-timer/src/interval.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::clock;
use crate::Delay;
use futures::{try_ready, Future, Poll, Stream};
use futures_core::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::{Duration, Instant};

/// A stream representing notifications at fixed interval
Expand Down Expand Up @@ -53,20 +56,20 @@ impl Interval {

impl Stream for Interval {
type Item = Instant;
type Error = crate::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
// Wait for the delay to be done
let _ = try_ready!(self.delay.poll());
ready!(Pin::new(&mut self.delay).poll(cx));

// Get the `now` by looking at the `delay` deadline
let now = self.delay.deadline();

// The next interval value is `duration` after the one that just
// yielded.
self.delay.reset(now + self.duration);
let next = now + self.duration;
self.delay.reset(next);

// Return the current instant
Ok(Some(now).into())
Poll::Ready(Some(now))
}
}
19 changes: 14 additions & 5 deletions tokio-timer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,36 @@
//! [`Interval`]: struct.Interval.html
//! [`Timer`]: timer/struct.Timer.html

macro_rules! ready {
($e:expr) => (
match $e {
::std::task::Poll::Ready(v) => v,
::std::task::Poll::Pending => return ::std::task::Poll::Pending,
}
)
}

pub mod clock;
#[cfg(feature = "delay-queue")]
pub mod delay_queue;
#[cfg(feature = "throttle")]
pub mod throttle;
pub mod timeout;
pub mod timer;

mod atomic;
mod deadline;
mod delay;
mod error;
#[cfg(feature = "interval")]
mod interval;
mod wheel;

#[deprecated(since = "0.2.6", note = "use Timeout instead")]
#[doc(hidden)]
#[allow(deprecated)]
pub use deadline::{Deadline, DeadlineError};
pub use delay::Delay;
#[cfg(feature = "delay-queue")]
#[doc(inline)]
pub use delay_queue::DelayQueue;
pub use error::Error;
#[cfg(feature = "interval")]
pub use interval::Interval;
#[doc(inline)]
pub use timeout::Timeout;
Expand Down
Loading

0 comments on commit c5c379c

Please sign in to comment.