Skip to content

Commit

Permalink
Merge pull request #7 from bcecchinato/feature/dynamics
Browse files Browse the repository at this point in the history
Adding dynamic rate and delay
  • Loading branch information
sfackler authored Mar 21, 2020
2 parents fd355bb + 0c700c5 commit 13d6f11
Showing 1 changed file with 135 additions and 0 deletions.
135 changes: 135 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ enum JobType {
f: Box<dyn FnMut() + Send + 'static>,
rate: Duration,
},
DynamicRate(Box<dyn FnMut() -> Option<Duration> + Send + 'static>),
FixedDelay {
f: Box<dyn FnMut() + Send + 'static>,
delay: Duration,
},
DynamicDelay(Box<dyn FnMut() -> Option<Duration> + Send + 'static>),
}

struct Job {
Expand Down Expand Up @@ -215,6 +217,33 @@ impl ScheduledThreadPool {
JobHandle(canceled)
}

/// Executes a closure after an initial delay at a dynamic rate in the pool.
///
/// The rate includes the time spent running the closure. For example, if
/// the return rate is 5 seconds and the closure takes 2 seconds to run, the
/// closure will be run again 3 seconds after it completes.
///
/// # Panics
///
/// If the closure panics, it will not be run again.
pub fn execute_at_dynamic_rate<F>(
&self,
initial_delay: Duration,
f: F,
) -> JobHandle
where
F: FnMut() -> Option<Duration> + Send + 'static
{
let canceled = Arc::new(AtomicBool::new(false));
let job = Job {
type_: JobType::DynamicRate(Box::new(f)),
time: Instant::now() + initial_delay,
canceled: canceled.clone(),
};
self.shared.run(job);
JobHandle(canceled)
}

/// Executes a closure after an initial delay at a fixed rate in the pool.
///
/// In contrast to `execute_at_fixed_rate`, the execution time of the
Expand Down Expand Up @@ -246,6 +275,34 @@ impl ScheduledThreadPool {
self.shared.run(job);
JobHandle(canceled)
}

/// Executes a closure after an initial delay at a dynamic rate in the pool.
///
/// In contrast to `execute_at_dynamic_rate`, the execution time of the
/// closure is not subtracted from the returned delay before it runs again. For
/// example, if the delay is 5 seconds and the closure takes 2 seconds to
/// run, the closure will run again 5 seconds after it completes.
///
/// # Panics
///
/// If the closure panics, it will not be run again.
pub fn execute_with_dynamic_delay<F>(
&self,
initial_delay: Duration,
f: F,
) -> JobHandle
where
F: FnMut() -> Option<Duration> + Send + 'static
{
let canceled = Arc::new(AtomicBool::new(false));
let job = Job {
type_: JobType::DynamicDelay(Box::new(f)),
time: Instant::now() + initial_delay,
canceled: canceled.clone(),
};
self.shared.run(job);
JobHandle(canceled)
}
}

struct Worker {
Expand Down Expand Up @@ -314,6 +371,16 @@ impl Worker {
};
self.shared.run(new_job)
}
JobType::DynamicRate(mut f) => {
if let Some(next_rate) = f() {
let new_job = Job {
type_: JobType::DynamicRate(f),
time: job.time + next_rate,
canceled: job.canceled,
};
self.shared.run(new_job)
}
}
JobType::FixedDelay { mut f, delay } => {
f();
let new_job = Job {
Expand All @@ -323,6 +390,16 @@ impl Worker {
};
self.shared.run(new_job)
}
JobType::DynamicDelay(mut f) => {
if let Some(next_delay) = f() {
let new_job = Job {
type_: JobType::DynamicDelay(f),
time: Instant::now() + next_delay,
canceled: job.canceled,
};
self.shared.run(new_job)
}
}
}
}
}
Expand Down Expand Up @@ -444,6 +521,64 @@ mod test {
assert!(rx.recv().is_err());
}

#[test]
fn test_dynamic_rate_jobs_stop_after_drop() {
let pool = Arc::new(ScheduledThreadPool::new(TEST_TASKS));
let (tx, rx) = channel();
let (tx2, rx2) = channel();

let mut pool2 = Some(pool.clone());
let mut i = 0i32;
pool.execute_with_dynamic_delay(
Duration::from_millis(500),
move || {
i += 1;
tx.send(i).unwrap();
rx2.recv().unwrap();
if i == 2 {
drop(pool2.take().unwrap());
}
Some(Duration::from_millis(500))
},
);
drop(pool);

assert_eq!(Ok(1), rx.recv());
tx2.send(()).unwrap();
assert_eq!(Ok(2), rx.recv());
tx2.send(()).unwrap();
assert!(rx.recv().is_err());
}

#[test]
fn test_dynamic_delay_jobs_stop_after_drop() {
let pool = Arc::new(ScheduledThreadPool::new(TEST_TASKS));
let (tx, rx) = channel();
let (tx2, rx2) = channel();

let mut pool2 = Some(pool.clone());
let mut i = 0i32;
pool.execute_at_dynamic_rate(
Duration::from_millis(500),
move || {
i += 1;
tx.send(i).unwrap();
rx2.recv().unwrap();
if i == 2 {
drop(pool2.take().unwrap());
}
Some(Duration::from_millis(500))
},
);
drop(pool);

assert_eq!(Ok(1), rx.recv());
tx2.send(()).unwrap();
assert_eq!(Ok(2), rx.recv());
tx2.send(()).unwrap();
assert!(rx.recv().is_err());
}

#[test]
fn cancellation() {
let pool = ScheduledThreadPool::new(TEST_TASKS);
Expand Down

0 comments on commit 13d6f11

Please sign in to comment.