Add exponential backoff to queries using RetryClient (#60)
* wip trying to get error type

* Add retry option to args

* format

* adapt to new fetcher pattern

* remove debug

* fix test

* Try using retry client

* fix

* update defaults
ipatka authored Sep 16, 2023
1 parent 7ca0073 commit a2941ce
Showing 6 changed files with 25 additions and 3 deletions.
8 changes: 8 additions & 0 deletions crates/cli/src/args.rs
@@ -78,6 +78,14 @@ pub struct Args {
     #[arg(short('l'), long, value_name = "limit", help_heading = "Acquisition Options")]
     pub requests_per_second: Option<u32>,
 
+    /// Specify max retries on provider errors
+    #[arg(long, default_value_t = 5, value_name = "R", help_heading = "Acquisition Options")]
+    pub max_retries: u32,
+
+    /// Specify initial backoff for retry strategy (ms)
+    #[arg(long, default_value_t = 500, value_name = "B", help_heading = "Acquisition Options")]
+    pub initial_backoff: u64,
+
     /// Global number of concurrent requests
     #[arg(long, value_name = "M", help_heading = "Acquisition Options")]
     pub max_concurrent_requests: Option<u64>,
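
These two flags define the retry schedule: how many attempts to make and how long the first delay is. As a rough illustration, here is a sketch of a classic doubling schedule (backoff_schedule is an illustrative helper; the exact delays ethers-rs computes internally may differ):

    use std::time::Duration;

    /// Sketch: with max_retries = 5 and initial_backoff = 500 (the CLI
    /// defaults above), a plain doubling schedule waits roughly
    /// 500ms, 1s, 2s, 4s, 8s before giving up.
    fn backoff_schedule(max_retries: u32, initial_backoff_ms: u64) -> Vec<Duration> {
        (0..max_retries)
            .map(|attempt| Duration::from_millis(initial_backoff_ms << attempt))
            .collect()
    }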
5 changes: 3 additions & 2 deletions crates/cli/src/parse/source.rs
@@ -12,8 +12,9 @@ use crate::args::Args;
 pub(crate) async fn parse_source(args: &Args) -> Result<Source, ParseError> {
     // parse network info
     let rpc_url = parse_rpc_url(args);
-    let provider = Provider::<Http>::try_from(rpc_url)
-        .map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?;
+    let provider =
+        Provider::<RetryClient<Http>>::new_client(&rpc_url, args.max_retries, args.initial_backoff)
+            .map_err(|_e| ParseError::ParseError("could not connect to provider".to_string()))?;
     let chain_id = provider.get_chainid().await.map_err(ParseError::ProviderError)?.as_u64();
 
     let rate_limiter = match args.requests_per_second {
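
Here new_client is a convenience constructor that wraps the Http transport in a RetryClient. A minimal sketch of a roughly equivalent explicit construction, assuming ethers-rs's RetryClientBuilder and HttpRateLimitRetryPolicy APIs (retry_provider is an illustrative helper, not cryo code):

    use std::{error::Error, str::FromStr, time::Duration};

    use ethers::providers::{
        Http, HttpRateLimitRetryPolicy, Provider, RetryClient, RetryClientBuilder,
    };

    // Sketch: build the same retrying provider by hand.
    fn retry_provider(
        url: &str,
        max_retries: u32,
        initial_backoff_ms: u64,
    ) -> Result<Provider<RetryClient<Http>>, Box<dyn Error>> {
        let http = Http::from_str(url)?;
        let client = RetryClientBuilder::default()
            .rate_limit_retries(max_retries)
            .initial_backoff(Duration::from_millis(initial_backoff_ms))
            .build(http, Box::new(HttpRateLimitRetryPolicy::default()));
        Ok(Provider::new(client))
    }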
1 change: 1 addition & 0 deletions crates/freeze/src/datasets/traces.rs
@@ -112,6 +112,7 @@ pub(crate) fn fetch_block_traces(
     let fetcher = source.fetcher.clone();
     task::spawn(async move {
         let result = fetcher.trace_block(BlockNumber::Number(number.into())).await;
+
         match tx.send(result).await {
             Ok(_) => {}
             Err(tokio::sync::mpsc::error::SendError(_e)) => {
2 changes: 1 addition & 1 deletion crates/freeze/src/types/sources.rs
@@ -17,7 +17,7 @@ pub type RateLimiter = governor::RateLimiter<NotKeyed, InMemoryState, DefaultClo
 #[derive(Clone)]
 pub struct Source {
     /// Shared provider for rpc data
-    pub fetcher: Arc<Fetcher<Http>>,
+    pub fetcher: Arc<Fetcher<RetryClient<Http>>>,
     /// chain_id of network
     pub chain_id: u64,
     /// number of blocks per log request
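
This one-line change works because Fetcher is generic over its transport: pinning the parameter to RetryClient<Http> gives every RPC call made through the fetcher retry-with-backoff behavior without touching call sites. A sketch of the pattern (field names illustrative, not cryo's exact definition):

    use ethers::providers::{Http, JsonRpcClient, Provider, RetryClient};

    // Sketch: a fetcher generic over its JSON-RPC transport.
    struct Fetcher<P: JsonRpcClient> {
        provider: Provider<P>,
    }

    // Pinning the transport to RetryClient<Http> makes retries transparent
    // to all downstream dataset code.
    type RetryFetcher = Fetcher<RetryClient<Http>>;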
6 changes: 6 additions & 0 deletions crates/python/src/collect_adapter.rs
@@ -24,6 +24,8 @@ use cryo_freeze::collect;
     requests_per_second = None,
     max_concurrent_requests = None,
     max_concurrent_chunks = None,
+    max_retries = 10,
+    initial_backoff = 500,
     dry = false,
     chunk_size = 1000,
     n_chunks = None,
@@ -67,6 +69,8 @@ pub fn _collect(
     requests_per_second: Option<u32>,
     max_concurrent_requests: Option<u64>,
     max_concurrent_chunks: Option<u64>,
+    max_retries: u32,
+    initial_backoff: u64,
     dry: bool,
     chunk_size: u64,
     n_chunks: Option<u64>,
@@ -107,6 +111,8 @@ pub fn _collect(
         requests_per_second,
         max_concurrent_requests,
         max_concurrent_chunks,
+        max_retries,
+        initial_backoff,
         dry,
         chunk_size,
         n_chunks,
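
The Python bindings simply thread the two new parameters through their pyo3 entry points, with defaults supplied in the signature list. A minimal, hypothetical sketch of that pattern (collect_sketch is illustrative; assuming pyo3's signature attribute syntax):

    use pyo3::prelude::*;

    // Sketch: keyword arguments with defaults map onto plain Rust parameters,
    // so Python callers can write collect(..., max_retries=10, initial_backoff=500).
    #[pyfunction]
    #[pyo3(signature = (max_retries = 10, initial_backoff = 500))]
    fn collect_sketch(max_retries: u32, initial_backoff: u64) -> PyResult<()> {
        // forward to the core collect() with these retry settings
        let _ = (max_retries, initial_backoff);
        Ok(())
    }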
6 changes: 6 additions & 0 deletions crates/python/src/freeze_adapter.rs
@@ -25,6 +25,8 @@ use cryo_cli::{run, Args};
     requests_per_second = None,
     max_concurrent_requests = None,
     max_concurrent_chunks = None,
+    max_retries = 10,
+    initial_backoff = 500,
     dry = false,
     chunk_size = 1000,
     n_chunks = None,
@@ -68,6 +70,8 @@ pub fn _freeze(
     requests_per_second: Option<u32>,
     max_concurrent_requests: Option<u64>,
     max_concurrent_chunks: Option<u64>,
+    max_retries: u32,
+    initial_backoff: u64,
     dry: bool,
     chunk_size: u64,
     n_chunks: Option<u64>,
@@ -108,6 +112,8 @@ pub fn _freeze(
         requests_per_second,
         max_concurrent_requests,
         max_concurrent_chunks,
+        max_retries,
+        initial_backoff,
         dry,
         chunk_size,
         n_chunks,
