-
Notifications
You must be signed in to change notification settings - Fork 131
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add retry utility in prep for s3_store
- Loading branch information
Showing
4 changed files
with
316 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
// Copyright 2021 Nathan (Blaise) Bruer. All rights reserved. | ||
|
||
use futures::future::Future; | ||
use futures::stream::StreamExt; | ||
use std::pin::Pin; | ||
use std::time::Duration; | ||
|
||
use error::{make_err, Code, Error}; | ||
|
||
pub struct ExponentialBackoff { | ||
current: Duration, | ||
} | ||
|
||
impl ExponentialBackoff { | ||
pub fn new(base: Duration) -> Self { | ||
ExponentialBackoff { current: base } | ||
} | ||
} | ||
|
||
impl Iterator for ExponentialBackoff { | ||
type Item = Duration; | ||
|
||
fn next(&mut self) -> Option<Duration> { | ||
self.current = self.current * 2; | ||
Some(self.current) | ||
} | ||
} | ||
|
||
type SleepFn = Box<dyn Fn(Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> + Sync + Send>; | ||
|
||
#[derive(PartialEq, Eq, Debug)] | ||
pub enum RetryResult<T> { | ||
Ok(T), | ||
Retry(Error), | ||
Err(Error), | ||
} | ||
|
||
/// Class used to retry a job with a sleep function in between each retry. | ||
pub struct Retrier { | ||
sleep_fn: SleepFn, | ||
} | ||
|
||
impl Retrier { | ||
pub fn new(sleep_fn: SleepFn) -> Self { | ||
Retrier { sleep_fn } | ||
} | ||
|
||
pub fn retry<'a, T, Fut, I>( | ||
&'a self, | ||
duration_iter: I, | ||
operation: Fut, | ||
) -> Pin<Box<dyn Future<Output = Result<T, Error>> + 'a + Send>> | ||
where | ||
Fut: futures::stream::Stream<Item = RetryResult<T>> + Send + 'a, | ||
I: IntoIterator<Item = Duration> + Send + 'a, | ||
<I as IntoIterator>::IntoIter: Send, | ||
T: Send, | ||
{ | ||
Box::pin(async move { | ||
let mut iter = duration_iter.into_iter(); | ||
let mut operation = Box::pin(operation); | ||
loop { | ||
match operation.next().await { | ||
None => return Err(make_err!(Code::Internal, "Retry stream ended abruptly",)), | ||
Some(RetryResult::Ok(value)) => return Ok(value), | ||
Some(RetryResult::Err(e)) => return Err(e), | ||
Some(RetryResult::Retry(e)) => (self.sleep_fn)(iter.next().ok_or_else(|| e)?).await, | ||
} | ||
} | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
// Copyright 2021 Nathan (Blaise) Bruer. All rights reserved. | ||
use std::pin::Pin; | ||
use std::sync::atomic::{AtomicI32, Ordering}; | ||
use std::sync::Arc; | ||
|
||
use error::{make_err, Code, Error}; | ||
|
||
use tokio::time::Duration; | ||
|
||
use futures::future::ready; | ||
use futures::stream::repeat_with; | ||
|
||
use retry::{Retrier, RetryResult}; | ||
|
||
struct MockDurationIterator { | ||
duration: Duration, | ||
} | ||
|
||
impl MockDurationIterator { | ||
pub fn new(duration: Duration) -> Self { | ||
MockDurationIterator { duration: duration } | ||
} | ||
} | ||
|
||
impl Iterator for MockDurationIterator { | ||
type Item = Duration; | ||
|
||
fn next(&mut self) -> Option<Duration> { | ||
Some(self.duration) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod retry_tests { | ||
use super::*; | ||
use pretty_assertions::assert_eq; // Must be declared in every module. | ||
|
||
#[tokio::test] | ||
async fn retry_simple_success() -> Result<(), Error> { | ||
let retrier = Retrier::new(Box::new(|_duration| Box::pin(ready(())))); | ||
let retry_config = MockDurationIterator::new(Duration::from_millis(1)); | ||
let run_count = Arc::new(AtomicI32::new(0)); | ||
|
||
let result = Pin::new(&retrier) | ||
.retry( | ||
retry_config, | ||
repeat_with(|| { | ||
run_count.fetch_add(1, Ordering::Relaxed); | ||
RetryResult::Ok(true) | ||
}), | ||
) | ||
.await?; | ||
assert_eq!( | ||
run_count.load(Ordering::Relaxed), | ||
1, | ||
"Expected function to be called once" | ||
); | ||
assert_eq!(result, true, "Expected result to succeed"); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn retry_fails_after_3_runs() -> Result<(), Error> { | ||
let retrier = Retrier::new(Box::new(|_duration| Box::pin(ready(())))); | ||
let retry_config = MockDurationIterator::new(Duration::from_millis(1)).take(2); // .take() will run X times + 1. | ||
let run_count = Arc::new(AtomicI32::new(0)); | ||
|
||
let result = Pin::new(&retrier) | ||
.retry( | ||
retry_config, | ||
repeat_with(|| { | ||
run_count.fetch_add(1, Ordering::Relaxed); | ||
RetryResult::<bool>::Retry(make_err!(Code::Unavailable, "Dummy failure",)) | ||
}), | ||
) | ||
.await; | ||
assert_eq!(run_count.load(Ordering::Relaxed), 3, "Expected function to be called"); | ||
assert_eq!(result.is_err(), true, "Expected result to error"); | ||
assert_eq!( | ||
result.unwrap_err().to_string(), | ||
"Error { code: Unavailable, messages: [\"Dummy failure\"] }" | ||
); | ||
|
||
Ok(()) | ||
} | ||
|
||
#[tokio::test] | ||
async fn retry_success_after_2_runs() -> Result<(), Error> { | ||
let retrier = Retrier::new(Box::new(|_duration| Box::pin(ready(())))); | ||
let retry_config = MockDurationIterator::new(Duration::from_millis(1)).take(5); // .take() will run X times + 1. | ||
let run_count = Arc::new(AtomicI32::new(0)); | ||
|
||
let result = Pin::new(&retrier) | ||
.retry( | ||
retry_config, | ||
repeat_with(|| { | ||
run_count.fetch_add(1, Ordering::Relaxed); | ||
if run_count.load(Ordering::Relaxed) == 2 { | ||
return RetryResult::Ok(true); | ||
} | ||
RetryResult::<bool>::Retry(make_err!(Code::Unavailable, "Dummy failure",)) | ||
}), | ||
) | ||
.await?; | ||
assert_eq!(run_count.load(Ordering::Relaxed), 2, "Expected function to be called"); | ||
assert_eq!(result, true, "Expected result to succeed"); | ||
|
||
Ok(()) | ||
} | ||
|
||
// test-prefix/987cc59b2d364596f94bd82f250d02aebb716c3a100163ca784b54a50e0dfde2-142 | ||
#[tokio::test] | ||
async fn retry_calls_sleep_fn() -> Result<(), Error> { | ||
const EXPECTED_MS: u64 = 71; | ||
let sleep_fn_run_count = Arc::new(AtomicI32::new(0)); | ||
let sleep_fn_run_count_copy = sleep_fn_run_count.clone(); | ||
let retrier = Retrier::new(Box::new(move |duration| { | ||
// Note: Need to make another copy to make the compiler happy. | ||
let sleep_fn_run_count_copy = sleep_fn_run_count_copy.clone(); | ||
Box::pin(async move { | ||
// Remember: This function is called only on retries, not the first run. | ||
sleep_fn_run_count_copy.fetch_add(1, Ordering::Relaxed); | ||
assert_eq!(duration, Duration::from_millis(EXPECTED_MS)); | ||
() | ||
}) | ||
})); | ||
|
||
// s3://blaisebruer-cas-store/test-prefix/987cc59b2d364596f94bd82f250d02aebb716c3a100163ca784b54a50e0dfde2-142 | ||
{ | ||
// Try with retry limit hit. | ||
let retry_config = MockDurationIterator::new(Duration::from_millis(EXPECTED_MS)).take(5); | ||
let result = Pin::new(&retrier) | ||
.retry( | ||
retry_config, | ||
repeat_with(|| RetryResult::<bool>::Retry(make_err!(Code::Unavailable, "Dummy failure",))), | ||
) | ||
.await; | ||
|
||
assert_eq!(result.is_err(), true, "Expected the retry to fail"); | ||
assert_eq!( | ||
sleep_fn_run_count.load(Ordering::Relaxed), | ||
5, | ||
"Expected the sleep_fn to be called twice" | ||
); | ||
} | ||
sleep_fn_run_count.store(0, Ordering::Relaxed); // Reset our counter. | ||
{ | ||
// Try with 3 retries. | ||
let retry_config = MockDurationIterator::new(Duration::from_millis(EXPECTED_MS)).take(5); | ||
let run_count = Arc::new(AtomicI32::new(0)); | ||
let result = Pin::new(&retrier) | ||
.retry( | ||
retry_config, | ||
repeat_with(|| { | ||
run_count.fetch_add(1, Ordering::Relaxed); | ||
// Remember: This function is only called every time, not just retries. | ||
// We run the first time, then retry 2 additional times meaning 3 runs. | ||
if run_count.load(Ordering::Relaxed) == 3 { | ||
return RetryResult::Ok(true); | ||
} | ||
RetryResult::<bool>::Retry(make_err!(Code::Unavailable, "Dummy failure",)) | ||
}), | ||
) | ||
.await?; | ||
|
||
assert_eq!(result, true, "Expected results to pass"); | ||
assert_eq!( | ||
sleep_fn_run_count.load(Ordering::Relaxed), | ||
2, | ||
"Expected the sleep_fn to be called twice" | ||
); | ||
} | ||
|
||
Ok(()) | ||
} | ||
} |