-
-
Notifications
You must be signed in to change notification settings - Fork 35
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: Add blocking retry with context support (#80)
Signed-off-by: Xuanwo <github@xuanwo.io>
- Loading branch information
Showing
2 changed files
with
198 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
use std::thread; | ||
use std::time::Duration; | ||
|
||
use crate::backoff::BackoffBuilder; | ||
use crate::Backoff; | ||
|
||
/// BlockingRetryableWithContext will add retry support for functions. | ||
pub trait BlockingRetryableWithContext< | ||
B: BackoffBuilder, | ||
T, | ||
E, | ||
Ctx, | ||
F: FnMut(Ctx) -> (Ctx, Result<T, E>), | ||
> | ||
{ | ||
/// Generate a new retry | ||
fn retry(self, builder: &B) -> BlockingRetry<B::Backoff, T, E, Ctx, F>; | ||
} | ||
|
||
impl<B, T, E, Ctx, F> BlockingRetryableWithContext<B, T, E, Ctx, F> for F | ||
where | ||
B: BackoffBuilder, | ||
F: FnMut(Ctx) -> (Ctx, Result<T, E>), | ||
{ | ||
fn retry(self, builder: &B) -> BlockingRetry<B::Backoff, T, E, Ctx, F> { | ||
BlockingRetry::new(self, builder.build()) | ||
} | ||
} | ||
|
||
/// Retry struct generated by [`Retryable`]. | ||
pub struct BlockingRetry< | ||
B: Backoff, | ||
T, | ||
E, | ||
Ctx, | ||
F: FnMut(Ctx) -> (Ctx, Result<T, E>), | ||
RF = fn(&E) -> bool, | ||
NF = fn(&E, Duration), | ||
> { | ||
backoff: B, | ||
retryable: RF, | ||
notify: NF, | ||
f: F, | ||
ctx: Option<Ctx>, | ||
} | ||
|
||
impl<B, T, E, Ctx, F> BlockingRetry<B, T, E, Ctx, F> | ||
where | ||
B: Backoff, | ||
F: FnMut(Ctx) -> (Ctx, Result<T, E>), | ||
{ | ||
/// Create a new retry. | ||
fn new(f: F, backoff: B) -> Self { | ||
BlockingRetry { | ||
backoff, | ||
retryable: |_: &E| true, | ||
notify: |_: &E, _: Duration| {}, | ||
f, | ||
ctx: None, | ||
} | ||
} | ||
} | ||
|
||
impl<B, T, E, Ctx, F, RF, NF> BlockingRetry<B, T, E, Ctx, F, RF, NF> | ||
where | ||
B: Backoff, | ||
F: FnMut(Ctx) -> (Ctx, Result<T, E>), | ||
RF: FnMut(&E) -> bool, | ||
NF: FnMut(&E, Duration), | ||
{ | ||
/// Set the context for retrying. | ||
pub fn context(self, context: Ctx) -> BlockingRetry<B, T, E, Ctx, F, RF, NF> { | ||
BlockingRetry { | ||
backoff: self.backoff, | ||
retryable: self.retryable, | ||
notify: self.notify, | ||
f: self.f, | ||
ctx: Some(context), | ||
} | ||
} | ||
|
||
/// Set the conditions for retrying. | ||
/// | ||
/// If not specified, we treat all errors as retryable. | ||
pub fn when<RN: FnMut(&E) -> bool>( | ||
self, | ||
retryable: RN, | ||
) -> BlockingRetry<B, T, E, Ctx, F, RN, NF> { | ||
BlockingRetry { | ||
backoff: self.backoff, | ||
retryable, | ||
notify: self.notify, | ||
f: self.f, | ||
ctx: self.ctx, | ||
} | ||
} | ||
|
||
/// Set to notify for everything retrying. | ||
/// | ||
/// If not specified, this is a no-op. | ||
pub fn notify<NN: FnMut(&E, Duration)>( | ||
self, | ||
notify: NN, | ||
) -> BlockingRetry<B, T, E, Ctx, F, RF, NN> { | ||
BlockingRetry { | ||
backoff: self.backoff, | ||
retryable: self.retryable, | ||
notify, | ||
f: self.f, | ||
ctx: self.ctx, | ||
} | ||
} | ||
|
||
/// Call the retried function. | ||
/// | ||
/// TODO: implment [`std::ops::FnOnce`] after it stable. | ||
pub fn call(mut self) -> (Ctx, Result<T, E>) { | ||
let mut ctx = self.ctx.take().expect("context must be valid"); | ||
loop { | ||
let (xctx, result) = (self.f)(ctx); | ||
// return ctx ownership back | ||
ctx = xctx; | ||
|
||
match result { | ||
Ok(v) => return (ctx, Ok(v)), | ||
Err(err) => { | ||
if !(self.retryable)(&err) { | ||
return (ctx, Err(err)); | ||
} | ||
|
||
match self.backoff.next() { | ||
None => return (ctx, Err(err)), | ||
Some(dur) => { | ||
(self.notify)(&err, dur); | ||
thread::sleep(dur); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use std::time::Duration; | ||
|
||
use anyhow::anyhow; | ||
use std::sync::Mutex; | ||
|
||
use super::*; | ||
use crate::exponential::ExponentialBuilder; | ||
use anyhow::Result; | ||
|
||
struct Test; | ||
|
||
impl Test { | ||
fn hello(&mut self) -> Result<usize> { | ||
Err(anyhow!("not retryable")) | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_retry_with_not_retryable_error() -> Result<()> { | ||
let error_times = Mutex::new(0); | ||
|
||
let test = Test; | ||
|
||
let backoff = ExponentialBuilder::default().with_min_delay(Duration::from_millis(1)); | ||
|
||
let (_, result) = { | ||
|mut v: Test| { | ||
let mut x = error_times.lock().unwrap(); | ||
*x += 1; | ||
|
||
let res = v.hello(); | ||
(v, res) | ||
} | ||
} | ||
.retry(&backoff) | ||
.context(test) | ||
// Only retry If error message is `retryable` | ||
.when(|e| e.to_string() == "retryable") | ||
.call(); | ||
|
||
assert!(result.is_err()); | ||
assert_eq!("not retryable", result.unwrap_err().to_string()); | ||
// `f` always returns error "not retryable", so it should be executed | ||
// only once. | ||
assert_eq!(*error_times.lock().unwrap(), 1); | ||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters