Skip to content
This repository has been archived by the owner on Apr 2, 2018. It is now read-only.

Commit

Permalink
Adds new function interval_range
Browse files Browse the repository at this point in the history
The functions returns a `Stream` that fires at some random value, that is in the
given range, into the future. The next duration is calculated every time after
the Interval fired into the future, by choosing a random value in the given range.
  • Loading branch information
bkchr committed Oct 13, 2017
1 parent d7bd58f commit abe0056
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ Timer facilities for Tokio
[dependencies]
futures = "0.1"
slab = "0.3.0"
rand = "0.3"
45 changes: 40 additions & 5 deletions src/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,58 @@ use {Sleep, TimerError};

use std::time::Duration;

/// A stream representing notifications at fixed interval
use rand::{thread_rng, Rng};

/// A stream representing notifications at given interval
///
/// Intervals are created through `Timer::interval`.
#[derive(Debug)]
pub struct Interval {
sleep: Sleep,
duration: Duration,
min_duration: Duration,
max_duration: Duration,
}

/// Create a new interval
pub fn new(sleep: Sleep, dur: Duration) -> Interval {
pub fn new(sleep: Sleep, min_dur: Duration, max_dur: Duration) -> Interval {
Interval {
sleep: sleep,
duration: dur,
min_duration: min_dur,
max_duration: max_dur,
}
}

const NANOS_PER_SEC: u32 = 1_000_000_000;

/// Returns the next duration for an interval
/// If `min` and `max` are equal, the duration is fixed.
/// If `min` and `max` are not equal, a duration in the range [`min`, `max`] is returned.
///
/// # Panics
///
/// Panics if `max < min`.
pub(crate) fn next_duration(min: Duration, max: Duration) -> Duration {
let mut rng = thread_rng();

let secs = if min.as_secs() == max.as_secs() {
min.as_secs()
} else {
rng.gen_range(min.as_secs(), max.as_secs() + 1)
};

let nsecs = if min.subsec_nanos() == max.subsec_nanos() {
min.subsec_nanos()
} else if secs == min.as_secs() {
rng.gen_range(min.subsec_nanos(), NANOS_PER_SEC)
} else if secs == max.as_secs() {
rng.gen_range(0, max.subsec_nanos() + 1)
} else {
rng.gen_range(0, NANOS_PER_SEC)
};

Duration::new(secs, nsecs)
}

impl Stream for Interval {
type Item = ();
type Error = TimerError;
Expand All @@ -29,7 +64,7 @@ impl Stream for Interval {
let _ = try_ready!(self.sleep.poll());

// Reset the timeout
self.sleep = self.sleep.timer().sleep(self.duration);
self.sleep = self.sleep.timer().sleep(next_duration(self.min_duration, self.max_duration));

Ok(Async::Ready(Some(())))
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
#[macro_use]
extern crate futures;
extern crate slab;
extern crate rand;

mod interval;
mod mpmc;
Expand Down
14 changes: 12 additions & 2 deletions src/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Timer {
/// Creates a new interval which will fire at `dur` time into the future,
/// and will repeat every `dur` interval after
pub fn interval(&self, dur: Duration) -> Interval {
interval::new(self.sleep(dur), dur)
interval::new(self.sleep(dur), dur, dur)
}

/// Creates a new interval which will fire at the time specified by `at`,
Expand All @@ -126,7 +126,17 @@ impl Timer {
self.sleep(Duration::from_millis(0))
};

interval::new(sleep, dur)
interval::new(sleep, dur, dur)
}

/// Creates a new interval which will fire at a time in the range [`min`, `max`] into the
/// future, and will repeat every time with a new interval in the range [`min`, `max`] after.
///
/// # Panics
///
/// Panics if `max < min`.
pub fn interval_range(&self, min: Duration, max: Duration) -> Interval {
interval::new(self.sleep(interval::next_duration(min, max)), min, max)
}
}

Expand Down

0 comments on commit abe0056

Please sign in to comment.