diff --git a/Cargo.toml b/Cargo.toml index 4d8d332..a7eb250 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,4 @@ Timer facilities for Tokio [dependencies] futures = "0.1" slab = "0.3.0" +rand = "0.3" diff --git a/src/interval.rs b/src/interval.rs index 3997238..661fb56 100644 --- a/src/interval.rs +++ b/src/interval.rs @@ -4,23 +4,58 @@ use {Sleep, TimerError}; use std::time::Duration; -/// A stream representing notifications at fixed interval +use rand::{thread_rng, Rng}; + +/// A stream representing notifications at given interval /// /// Intervals are created through `Timer::interval`. #[derive(Debug)] pub struct Interval { sleep: Sleep, - duration: Duration, + min_duration: Duration, + max_duration: Duration, } /// Create a new interval -pub fn new(sleep: Sleep, dur: Duration) -> Interval { +pub fn new(sleep: Sleep, min_dur: Duration, max_dur: Duration) -> Interval { Interval { sleep: sleep, - duration: dur, + min_duration: min_dur, + max_duration: max_dur, } } +const NANOS_PER_SEC: u32 = 1_000_000_000; + +/// Returns the next duration for an interval +/// If `min` and `max` are equal, the duration is fixed. +/// If `min` and `max` are not equal, a duration in the range [`min`, `max`] is returned. +/// +/// # Panics +/// +/// Panics if `max < min`. +pub(crate) fn next_duration(min: Duration, max: Duration) -> Duration { + let mut rng = thread_rng(); + + let secs = if min.as_secs() == max.as_secs() { + min.as_secs() + } else { + rng.gen_range(min.as_secs(), max.as_secs() + 1) + }; + + let nsecs = if min.subsec_nanos() == max.subsec_nanos() { + min.subsec_nanos() + } else if secs == min.as_secs() { + rng.gen_range(min.subsec_nanos(), NANOS_PER_SEC) + } else if secs == max.as_secs() { + rng.gen_range(0, max.subsec_nanos() + 1) + } else { + rng.gen_range(0, NANOS_PER_SEC) + }; + + Duration::new(secs, nsecs) +} + impl Stream for Interval { type Item = (); type Error = TimerError; @@ -29,7 +64,7 @@ impl Stream for Interval { let _ = try_ready!(self.sleep.poll()); // Reset the timeout - self.sleep = self.sleep.timer().sleep(self.duration); + self.sleep = self.sleep.timer().sleep(next_duration(self.min_duration, self.max_duration)); Ok(Async::Ready(Some(()))) } diff --git a/src/lib.rs b/src/lib.rs index 9ac3557..2ac0a13 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -90,6 +90,7 @@ #[macro_use] extern crate futures; extern crate slab; +extern crate rand; mod interval; mod mpmc; diff --git a/src/timer.rs b/src/timer.rs index f95ab4c..efab0de 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -112,7 +112,7 @@ impl Timer { /// Creates a new interval which will fire at `dur` time into the future, /// and will repeat every `dur` interval after pub fn interval(&self, dur: Duration) -> Interval { - interval::new(self.sleep(dur), dur) + interval::new(self.sleep(dur), dur, dur) } /// Creates a new interval which will fire at the time specified by `at`, @@ -126,7 +126,17 @@ impl Timer { self.sleep(Duration::from_millis(0)) }; - interval::new(sleep, dur) + interval::new(sleep, dur, dur) + } + + /// Creates a new interval which will fire at a time in the range [`min`, `max`] into the + /// future, and will repeat every time with a new interval in the range [`min`, `max`] after. + /// + /// # Panics + /// + /// Panics if `max < min`. + pub fn interval_range(&self, min: Duration, max: Duration) -> Interval { + interval::new(self.sleep(interval::next_duration(min, max)), min, max) } }