Skip to content

Commit

Permalink
Improve error reporting when using retry_on_retryable_error
Browse files Browse the repository at this point in the history
To improve the error reporting, this commit introduces a RetryError decorator
which adds information whether the retry_on_retryable_error failed because the
error was not retryable or whether the retry attempts have been exhausted.
  • Loading branch information
tillrohrmann committed Jan 31, 2025
1 parent 4676291 commit c3e4ac5
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 17 deletions.
7 changes: 4 additions & 3 deletions crates/bifrost/src/bifrost_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl<'a> BifrostAdmin<'a> {
})
})
.await
.map_err(|e| e.transpose())?;
.map_err(|e| e.into_inner().transpose())?;

self.inner.metadata_writer.update(Arc::new(logs)).await?;
Ok(())
Expand Down Expand Up @@ -287,7 +287,7 @@ impl<'a> BifrostAdmin<'a> {
)
})
.await
.map_err(|e| e.transpose())?;
.map_err(|e| e.into_inner().transpose())?;

self.inner.metadata_writer.update(Arc::new(logs)).await?;
Ok(())
Expand All @@ -310,7 +310,8 @@ impl<'a> BifrostAdmin<'a> {
Logs::from_configuration(&Configuration::pinned())
})
})
.await?;
.await
.map_err(|err| err.into_inner())?;

self.inner.metadata_writer.update(Arc::new(logs)).await?;
Ok(())
Expand Down
57 changes: 46 additions & 11 deletions crates/core/src/metadata_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,22 +535,57 @@ impl From<WriteError> for ReadWriteError {

static_assertions::assert_impl_all!(MetadataStoreClient: Send, Sync, Clone);

pub async fn retry_on_retryable_error<Fn, Fut, T, E, P>(retry_policy: P, action: Fn) -> Result<T, E>
pub async fn retry_on_retryable_error<Fn, Fut, T, E, P>(
retry_policy: P,
mut action: Fn,
) -> Result<T, RetryError<E>>
where
P: Into<RetryPolicy>,
Fn: FnMut() -> Fut,
Fut: Future<Output = Result<T, E>>,
E: MaybeRetryableError,
{
retry_policy
.into()
.retry_if(action, |err: &E| {
if err.retryable() {
debug!(%err, "Operation failed. Retrying it.");
true
} else {
false
let retry_policy = retry_policy.into();
let mut retry_iter = retry_policy.iter();

loop {
match action().await {
Ok(value) => return Ok(value),
Err(err) => {
if err.retryable() {
if let Some(delay) = retry_iter.next() {
tokio::time::sleep(delay).await;
} else {
return Err(RetryError::RetriesExhausted(err));
}
} else {
return Err(RetryError::NotRetryable(err));
}
}
})
.await
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum RetryError<E> {
#[error("retries exhausted: {0}")]
RetriesExhausted(E),
#[error(transparent)]
NotRetryable(E),
}

impl<E> RetryError<E> {
pub fn into_inner(self) -> E {
match self {
RetryError::RetriesExhausted(err) => err,
RetryError::NotRetryable(err) => err,
}
}

pub fn map<F>(self, mapper: impl Fn(E) -> F) -> RetryError<F> {
match self {
RetryError::RetriesExhausted(err) => RetryError::RetriesExhausted(mapper(err)),
RetryError::NotRetryable(err) => RetryError::NotRetryable(mapper(err)),
}
}
}
11 changes: 8 additions & 3 deletions crates/log-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use anyhow::Context;
use tonic::codec::CompressionEncoding;
use tracing::{debug, info, instrument};

use restate_core::metadata_store::{retry_on_retryable_error, ReadWriteError};
use restate_core::metadata_store::{retry_on_retryable_error, ReadWriteError, RetryError};
use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
use restate_core::network::{MessageRouterBuilder, NetworkServerBuilder};
use restate_core::{Metadata, MetadataWriter, TaskCenter, TaskKind};
Expand Down Expand Up @@ -240,10 +240,15 @@ impl LogServerService {
)
})
.await
.map_err(|e| e.transpose())
.map_err(|e| e.map(|err| err.transpose()))
{
Ok(nodes_config) => nodes_config,
Err(MarkNodeAsWriteableError::PreviousAttemptSucceeded(nodes_config)) => nodes_config,
Err(RetryError::NotRetryable(MarkNodeAsWriteableError::PreviousAttemptSucceeded(
nodes_config,
)))
| Err(RetryError::RetriesExhausted(
MarkNodeAsWriteableError::PreviousAttemptSucceeded(nodes_config),
)) => nodes_config,
Err(err) => {
return Err(err).context("failed to mark this node as writeable");
}
Expand Down

0 comments on commit c3e4ac5

Please sign in to comment.