Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transport): retry layer #849

Merged
merged 19 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions crates/json-rpc/src/response/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,53 @@ pub struct ErrorPayload<ErrData = Box<RawValue>> {
pub data: Option<ErrData>,
}

impl<E> ErrorPayload<E> {
/// Analyzes the [ErrorPayload] and decides if the request should be retried based on the
/// error code or the message.
pub fn is_retry_err(&self) -> bool {
// alchemy throws it this way
if self.code == 429 {
return true;
}

// This is an infura error code for `exceeded project rate limit`
if self.code == -32005 {
return true;
}

// alternative alchemy error for specific IPs
if self.code == -32016 && self.message.contains("rate limit") {
return true;
}

// quick node error `"credits limited to 6000/sec"`
// <https://github.com/foundry-rs/foundry/pull/6712#issuecomment-1951441240>
if self.code == -32012 && self.message.contains("credits") {
return true;
}

// quick node rate limit error: `100/second request limit reached - reduce calls per second
// or upgrade your account at quicknode.com` <https://github.com/foundry-rs/foundry/issues/4894>
if self.code == -32007 && self.message.contains("request limit reached") {
return true;
}

match self.message.as_str() {
// this is commonly thrown by infura and is apparently a load balancer issue, see also <https://github.com/MetaMask/metamask-extension/issues/7234>
"header not found" => true,
// also thrown by infura if out of budget for the day and ratelimited
"daily request count exceeded, request rate limited" => true,
msg => {
msg.contains("rate limit")
|| msg.contains("rate exceeded")
|| msg.contains("too many requests")
|| msg.contains("credits limited")
|| msg.contains("request limit")
}
}
}
}

impl<ErrData> fmt::Display for ErrorPayload<ErrData> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "error code {}: {}", self.code, self.message)
Expand Down
6 changes: 3 additions & 3 deletions crates/transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ serde.workspace = true
thiserror.workspace = true
tower.workspace = true
url.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = { version = "0.4", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["rt"] }

[features]

wasm-bindgen = ["dep:wasm-bindgen-futures"]
65 changes: 64 additions & 1 deletion crates/transport/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloy_json_rpc::{Id, RpcError, RpcResult};
use alloy_json_rpc::{ErrorPayload, Id, RpcError, RpcResult};
use serde::Deserialize;
use serde_json::value::RawValue;
use std::{error::Error as StdError, fmt::Debug};
use thiserror::Error;
Expand Down Expand Up @@ -110,3 +111,65 @@ impl HttpError {
false
}
}

/// Extension trait to implement methods for [`RpcError<TransportErrorKind, E>`].
pub(crate) trait RpcErrorExt {
/// Analyzes whether to retry the request depending on the error.
fn is_retryable(&self) -> bool;

/// Fetches the backoff hint from the error message if present
fn backoff_hint(&self) -> Option<std::time::Duration>;
}

impl RpcErrorExt for RpcError<TransportErrorKind> {
fn is_retryable(&self) -> bool {
match self {
// There was a transport-level error. This is either a non-retryable error,
// or a server error that should be retried.
Self::Transport(err) => err.is_retry_err(),
// The transport could not serialize the error itself. The request was malformed from
// the start.
Self::SerError(_) => false,
Self::DeserError { text, .. } => {
if let Ok(resp) = serde_json::from_str::<ErrorPayload>(text) {
return resp.is_retry_err();
}

// some providers send invalid JSON RPC in the error case (no `id:u64`), but the
// text should be a `JsonRpcError`
#[derive(Deserialize)]
struct Resp {
error: ErrorPayload,
}

if let Ok(resp) = serde_json::from_str::<Resp>(text) {
return resp.error.is_retry_err();
}

false
}
Self::ErrorResp(err) => err.is_retry_err(),
Self::NullResp => true,
_ => false,
}
}

fn backoff_hint(&self) -> Option<std::time::Duration> {
if let Self::ErrorResp(resp) = self {
let data = resp.try_data_as::<serde_json::Value>();
if let Some(Ok(data)) = data {
// if daily rate limit exceeded, infura returns the requested backoff in the error
// response
let backoff_seconds = &data["rate"]["backoff_seconds"];
// infura rate limit error
if let Some(seconds) = backoff_seconds.as_u64() {
return Some(std::time::Duration::from_secs(seconds));
}
if let Some(seconds) = backoff_seconds.as_f64() {
return Some(std::time::Duration::from_secs(seconds as u64 + 1));
}
}
}
None
}
}
6 changes: 6 additions & 0 deletions crates/transport/src/layers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
//! Module for housing transport layers.

mod retry;

/// RetryBackoffLayer
pub use retry::{RateLimitRetryPolicy, RetryBackoffLayer, RetryBackoffService, RetryPolicy};
228 changes: 228 additions & 0 deletions crates/transport/src/layers/retry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
use crate::{
error::{RpcErrorExt, TransportError, TransportErrorKind},
TransportFut,
};
use alloy_json_rpc::{RequestPacket, ResponsePacket};
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
use tower::{Layer, Service};
use tracing::trace;

/// A Transport Layer that is responsible for retrying requests based on the
/// error type. See [`TransportError`].
///
/// TransportError: crate::error::TransportError
#[derive(Debug, Clone)]
pub struct RetryBackoffLayer {
/// The maximum number of retries for rate limit errors
max_rate_limit_retries: u32,
/// The initial backoff in milliseconds
initial_backoff: u64,
/// The number of compute units per second for this provider
compute_units_per_second: u64,
}

impl RetryBackoffLayer {
/// Creates a new retry layer with the given parameters.
pub const fn new(
max_rate_limit_retries: u32,
initial_backoff: u64,
compute_units_per_second: u64,
) -> Self {
Self { max_rate_limit_retries, initial_backoff, compute_units_per_second }
}
}

/// [RateLimitRetryPolicy] implements [RetryPolicy] to determine whether to retry depending on the
/// err.
#[derive(Debug, Copy, Clone, Default)]
#[non_exhaustive]
pub struct RateLimitRetryPolicy;
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved

/// [RetryPolicy] defines logic for which [TransportError] instances should
/// the client retry the request and try to recover from.
pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
/// Whether to retry the request based on the given `error`
fn should_retry(&self, error: &TransportError) -> bool;

/// Providers may include the `backoff` in the error response directly
fn backoff_hint(&self, error: &TransportError) -> Option<std::time::Duration>;
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved
}

impl RetryPolicy for RateLimitRetryPolicy {
fn should_retry(&self, error: &TransportError) -> bool {
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved
error.is_retryable()
}

/// Provides a backoff hint if the error response contains it
fn backoff_hint(&self, error: &TransportError) -> Option<std::time::Duration> {
error.backoff_hint()
}
}

impl<S> Layer<S> for RetryBackoffLayer {
type Service = RetryBackoffService<S>;

fn layer(&self, inner: S) -> Self::Service {
RetryBackoffService {
inner,
policy: RateLimitRetryPolicy,
max_rate_limit_retries: self.max_rate_limit_retries,
initial_backoff: self.initial_backoff,
compute_units_per_second: self.compute_units_per_second,
requests_enqueued: Arc::new(AtomicU32::new(0)),
}
}
}

/// A Tower Service used by the RetryBackoffLayer that is responsible for retrying requests based
/// on the error type. See [TransportError] and [RateLimitRetryPolicy].
#[derive(Debug, Clone)]
pub struct RetryBackoffService<S> {
/// The inner service
inner: S,
/// The retry policy
policy: RateLimitRetryPolicy,
/// The maximum number of retries for rate limit errors
max_rate_limit_retries: u32,
/// The initial backoff in milliseconds
initial_backoff: u64,
/// The number of compute units per second for this service
compute_units_per_second: u64,
/// The number of requests currently enqueued
requests_enqueued: Arc<AtomicU32>,
}

impl<S> RetryBackoffService<S> {
const fn initial_backoff(&self) -> Duration {
Duration::from_millis(self.initial_backoff)
}
}

impl<S> Service<RequestPacket> for RetryBackoffService<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError>
+ Send
+ 'static
+ Clone,
S::Future: Send + 'static,
{
type Response = ResponsePacket;
type Error = TransportError;
type Future = TransportFut<'static>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Our middleware doesn't care about backpressure, so it's ready as long
// as the inner service is ready.
self.inner.poll_ready(cx)
}

fn call(&mut self, request: RequestPacket) -> Self::Future {
let inner = self.inner.clone();
let this = self.clone();
let mut inner = std::mem::replace(&mut self.inner, inner);
Box::pin(async move {
let ahead_in_queue = this.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;
let mut rate_limit_retry_number: u32 = 0;
loop {
let err;
let res = inner.call(request.clone()).await;

match res {
Ok(res) => {
if let Some(e) = res.as_error() {
mattsse marked this conversation as resolved.
Show resolved Hide resolved
err = TransportError::ErrorResp(e.clone())
} else {
this.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
return Ok(res);
}
}
Err(e) => err = e,
}

let should_retry = this.policy.should_retry(&err);
if should_retry {
rate_limit_retry_number += 1;
if rate_limit_retry_number > this.max_rate_limit_retries {
return Err(TransportErrorKind::custom_str(&format!(
"Max retries exceeded {}",
err
)));
}
trace!(%err, "retrying request");

let current_queued_reqs = this.requests_enqueued.load(Ordering::SeqCst) as u64;

// try to extract the requested backoff from the error or compute the next
// backoff based on retry count
let backoff_hint = this.policy.backoff_hint(&err);
let next_backoff = backoff_hint.unwrap_or_else(|| this.initial_backoff());

// requests are usually weighted and can vary from 10 CU to several 100 CU,
// cheaper requests are more common some example alchemy
// weights:
// - `eth_getStorageAt`: 17
// - `eth_getBlockByNumber`: 16
// - `eth_newFilter`: 20
//
// (coming from forking mode) assuming here that storage request will be the
// driver for Rate limits we choose `17` as the average cost
// of any request
const AVG_COST: u64 = 17u64;
let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs(
AVG_COST,
this.compute_units_per_second,
current_queued_reqs,
ahead_in_queue,
);
let total_backoff = next_backoff
+ std::time::Duration::from_secs(seconds_to_wait_for_compute_budget);

trace!(
total_backoff_millis = total_backoff.as_millis(),
budget_backoff_millis = seconds_to_wait_for_compute_budget * 1000,
default_backoff_millis = next_backoff.as_millis(),
backoff_hint_millis = backoff_hint.map(|d| d.as_millis()),
"(all in ms) backing off due to rate limit"
);

tokio::time::sleep(total_backoff).await;
} else {
this.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
return Err(err);
}
}
})
}
}

/// Calculates an offset in seconds by taking into account the number of currently queued requests,
/// number of requests that were ahead in the queue when the request was first issued, the average
/// cost a weighted request (heuristic), and the number of available compute units per seconds.
///
/// Returns the number of seconds (the unit the remote endpoint measures compute budget) a request
/// is supposed to wait to not get rate limited. The budget per second is
/// `compute_units_per_second`, assuming an average cost of `avg_cost` this allows (in theory)
/// `compute_units_per_second / avg_cost` requests per seconds without getting rate limited.
/// By taking into account the number of concurrent request and the position in queue when the
/// request was first issued and determine the number of seconds a request is supposed to wait, if
/// at all
fn compute_unit_offset_in_secs(
avg_cost: u64,
compute_units_per_second: u64,
current_queued_requests: u64,
ahead_in_queue: u64,
) -> u64 {
let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost);
if current_queued_requests > request_capacity_per_second {
current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second)
} else {
0
}
}
2 changes: 2 additions & 0 deletions crates/transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub use r#trait::Transport;
pub use alloy_json_rpc::{RpcError, RpcResult};
pub use futures_utils_wasm::{impl_future, BoxFuture};

pub mod layers;
yash-atreya marked this conversation as resolved.
Show resolved Hide resolved

/// Misc. utilities for building transports.
pub mod utils;

Expand Down