feat: support retrying connection errors #1629

Merged
Changes from all commits
177 changes: 153 additions & 24 deletions ethers-providers/src/transports/retry.rs
@@ -6,6 +6,7 @@ use crate::{provider::ProviderError, JsonRpcClient};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::{
error::Error,
fmt::Debug,
sync::atomic::{AtomicU32, Ordering},
time::Duration,
@@ -22,6 +23,27 @@ pub trait RetryPolicy<E>: Send + Sync + Debug {

/// [RetryClient] is a wrapper around a [JsonRpcClient] that retries requests with an
/// exponential backoff, using a [RetryPolicy] to decide which errors are worth retrying.
///
/// The `RetryPolicy`, which mainly targets rate-limiting errors, can be adjusted for specific
/// applications and endpoints. In addition to the `RetryPolicy`, errors due to connectivity
/// issues, such as timed out connections or responses in the `5xx` range, can be retried
/// separately.
///
/// # Example
///
/// ```
/// # async fn demo() {
/// use ethers_providers::{Http, RetryClient, RetryClientBuilder, HttpRateLimitRetryPolicy};
/// use std::time::Duration;
/// use url::Url;
///
/// let http = Http::new(Url::parse("http://localhost:8545").unwrap());
/// let client = RetryClientBuilder::default()
/// .rate_limit_retries(10)
/// .timeout_retries(3)
/// .initial_backoff(Duration::from_millis(500))
/// .build(http, Box::new(HttpRateLimitRetryPolicy::default()));
/// # }
/// ```
#[derive(Debug)]
pub struct RetryClient<T>
where
@@ -30,9 +52,14 @@ where
{
inner: T,
requests_enqueued: AtomicU32,
/// The policy to use to determine whether to retry a request due to rate limiting
policy: Box<dyn RetryPolicy<T::Error>>,
max_retry: u32,
initial_backoff: u64,
/// How many connection `TimedOut` errors should be retried.
timeout_retries: u32,
/// How many retries for rate limited responses
rate_limit_retries: u32,
/// How long to wait initially before retrying
initial_backoff: Duration,
/// Available compute units per second
compute_units_per_second: u64,
}
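
The `RetryPolicy` described in the doc comment above is what decides which errors count as rate-limit errors worth retrying. A rough, illustrative sketch of a custom policy follows, assuming the trait's only required method is `should_retry(&self, error: &E) -> bool`, which matches how the policy is called later in this diff:

```rust
use ethers_providers::RetryPolicy;

/// Illustrative policy that retries every error unconditionally.
#[derive(Debug, Default)]
struct AlwaysRetry;

impl<E> RetryPolicy<E> for AlwaysRetry {
    fn should_retry(&self, _error: &E) -> bool {
        true
    }
}
```

A policy like this would be passed to the builder in place of `HttpRateLimitRetryPolicy`.
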
@@ -67,15 +94,10 @@ where
// in milliseconds
initial_backoff: u64,
) -> Self {
Self {
inner,
requests_enqueued: AtomicU32::new(0),
policy,
max_retry,
initial_backoff,
// alchemy max cpus <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
compute_units_per_second: 330,
}
RetryClientBuilder::default()
.initial_backoff(Duration::from_millis(initial_backoff))
.rate_limit_retries(max_retry)
.build(inner, policy)
}

/// Sets the free compute units per second limit.
@@ -90,6 +112,87 @@
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RetryClientBuilder {
/// How many connection `TimedOut` errors should be retried.
timeout_retries: u32,
/// How many retries for rate limited responses
rate_limit_retries: u32,
/// How long to wait initially before retrying
initial_backoff: Duration,
/// Available compute units per second
compute_units_per_second: u64,
}

// === impl RetryClientBuilder ===

impl RetryClientBuilder {
/// Sets the number of retries after a connection times out
///
/// **Note:** this only applies to connectivity errors, e.g. timed out or failed connections and `5xx` responses
pub fn timeout_retries(mut self, timeout_retries: u32) -> Self {
self.timeout_retries = timeout_retries;
self
}

/// Sets the number of retries for rate-limited responses
pub fn rate_limit_retries(mut self, rate_limit_retries: u32) -> Self {
self.rate_limit_retries = rate_limit_retries;
self
}

/// Sets the number of assumed available compute units per second
///
/// See also, <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
pub fn compute_units_per_second(mut self, compute_units_per_second: u64) -> Self {
self.compute_units_per_second = compute_units_per_second;
self
}

/// Sets the duration to wait initially before retrying
pub fn initial_backoff(mut self, initial_backoff: Duration) -> Self {
self.initial_backoff = initial_backoff;
self
}

/// Creates the `RetryClient` with the configured settings
pub fn build<T>(self, client: T, policy: Box<dyn RetryPolicy<T::Error>>) -> RetryClient<T>
where
T: JsonRpcClient,
T::Error: Sync + Send + 'static,
{
let RetryClientBuilder {
timeout_retries,
rate_limit_retries,
initial_backoff,
compute_units_per_second,
} = self;
RetryClient {
inner: client,
requests_enqueued: AtomicU32::new(0),
policy,
timeout_retries,
rate_limit_retries,
initial_backoff,
compute_units_per_second,
}
}
}

// Some sensible defaults
impl Default for RetryClientBuilder {
fn default() -> Self {
Self {
timeout_retries: 3,
// this should be enough to even out heavy loads
rate_limit_retries: 10,
initial_backoff: Duration::from_millis(100),
// Alchemy max CUPS <https://github.com/alchemyplatform/alchemy-docs/blob/master/documentation/compute-units.md#rate-limits-cups>
compute_units_per_second: 330,
}
}
}
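
A usage sketch combining the builder setters and defaults above; the endpoint URL and the specific numbers are placeholders, not recommendations:

```rust
use ethers_providers::{Http, HttpRateLimitRetryPolicy, RetryClient, RetryClientBuilder};
use std::time::Duration;
use url::Url;

fn build_retry_client() -> RetryClient<Http> {
    // Placeholder endpoint; any JSON-RPC HTTP URL works here.
    let http = Http::new(Url::parse("http://localhost:8545").unwrap());

    RetryClientBuilder::default()
        .rate_limit_retries(10)        // retries for rate-limited responses
        .timeout_retries(3)            // retries for connectivity errors (timeouts, connect errors, 5xx)
        .initial_backoff(Duration::from_millis(500))
        .compute_units_per_second(660) // placeholder for a higher tier; the default assumes 330
        .build(http, Box::new(HttpRateLimitRetryPolicy::default()))
}
```
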

/// Error thrown when:
/// 1. Internal client throws an error we do not wish to try to recover from.
/// 2. Params serialization failed.
Expand Down Expand Up @@ -137,7 +240,7 @@ where

async fn request<A, R>(&self, method: &str, params: A) -> Result<R, Self::Error>
where
A: std::fmt::Debug + Serialize + Send + Sync,
A: Debug + Serialize + Send + Sync,
R: DeserializeOwned,
{
// Helper type that caches the `params` value across several retries
@@ -158,7 +261,8 @@

let ahead_in_queue = self.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;

let mut retry_number: u32 = 0;
let mut rate_limit_retry_number: u32 = 0;
let mut timeout_retries: u32 = 0;

loop {
let err;
@@ -179,20 +283,22 @@
}
}

retry_number += 1;
if retry_number > self.max_retry {
trace!("request timed out after {} retries", self.max_retry);
return Err(RetryClientError::TimeoutError)
}

let should_retry = self.policy.should_retry(&err);
if should_retry {
rate_limit_retry_number += 1;
if rate_limit_retry_number > self.rate_limit_retries {
trace!("request timed out after {} retries", self.rate_limit_retries);
return Err(RetryClientError::TimeoutError)
}

let current_queued_requests = self.requests_enqueued.load(Ordering::SeqCst) as u64;
// using `rate_limit_retry_number` to create back pressure because of already
// queued requests; the backoff increases exponentially with retries and adds a
// delay based on how many requests are currently queued
let mut next_backoff = self.initial_backoff * 2u64.pow(retry_number);
let mut next_backoff = Duration::from_millis(
self.initial_backoff.as_millis().pow(rate_limit_retry_number) as u64,
);

// requests are usually weighted and can vary from 10 CU to several 100 CU; cheaper
// requests are more common. Some example Alchemy weights:
@@ -209,12 +315,17 @@
current_queued_requests,
ahead_in_queue,
);
// backoff is measured in millis
next_backoff += seconds_to_wait_for_compute_budge * 1000;
next_backoff += Duration::from_secs(seconds_to_wait_for_compute_budge);

trace!("retrying and backing off for {}ms", next_backoff);
tokio::time::sleep(Duration::from_millis(next_backoff)).await;
trace!("retrying and backing off for {:?}", next_backoff);
tokio::time::sleep(next_backoff).await;
} else {
if timeout_retries < self.timeout_retries && maybe_connectivity(&err) {
timeout_retries += 1;
trace!(err = ?err, "retrying due to a spurious network error");
continue
}

trace!(err = ?err, "should not retry");
self.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
return Err(RetryClientError::ProviderError(err))
@@ -266,6 +377,24 @@ fn compute_unit_offset_in_secs(
}
}

/// Checks whether the `error` is the result of a connectivity issue, such as a request
/// timeout, a connection error, or a `5xx` response
fn maybe_connectivity(err: &(dyn Error + 'static)) -> bool {
if let Some(reqwest_err) = err.downcast_ref::<reqwest::Error>() {
if reqwest_err.is_timeout() || reqwest_err.is_connect() {
return true
}
// HTTP error codes in the 5xx range are considered connectivity issues and will prompt a retry
if let Some(status) = reqwest_err.status() {
let code = status.as_u16();
if (500..600).contains(&code) {
return true
}
}
}
false
}
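
To make the classification concrete, the hypothetical helper below mirrors the `5xx` check in `maybe_connectivity` (here `code` stands in for `reqwest::StatusCode::as_u16()`): a `503` goes down the connectivity retry path governed by `timeout_retries`, while a `429` is left to the `RetryPolicy`.

```rust
/// Hypothetical stand-in for the 5xx classification in `maybe_connectivity` above.
fn is_connectivity_status(code: u16) -> bool {
    (500..600).contains(&code)
}

fn main() {
    assert!(is_connectivity_status(503)); // connectivity issue: retried via `timeout_retries`
    assert!(!is_connectivity_status(429)); // rate limiting: handled by the `RetryPolicy` instead
}
```
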

#[cfg(test)]
mod tests {
use super::*;