Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
11ee1e4
feat: seperate robust provider into own crate
LeoPatOZ Nov 10, 2025
0e5781d
feat: internal retry fallback function
LeoPatOZ Nov 10, 2025
fdd54be
ref: rename retry to try with failover
LeoPatOZ Nov 10, 2025
c22c0c3
feat: crate modifier
LeoPatOZ Nov 10, 2025
8934fec
feat: robust subscription
LeoPatOZ Nov 10, 2025
193baa5
feat: add robust subscription to block range scanner
LeoPatOZ Nov 10, 2025
6de479a
Merge branch 'add-robust-subsscription' into refactor-robust-provider
LeoPatOZ Nov 10, 2025
97ca46c
feat: merge latest sub
LeoPatOZ Nov 10, 2025
70fc300
feat: impl stream on RobustSuscscription
LeoPatOZ Nov 10, 2025
dbf7f45
Merge branch 'add-robust-subscription' into refactor-robust-provider
LeoPatOZ Nov 10, 2025
77ce10c
feat: impl stream on robust subsription
LeoPatOZ Nov 11, 2025
3443501
feat: add robust stream logic to block range scanner
LeoPatOZ Nov 11, 2025
19aeb9c
ref: add todo about unbounded channel
LeoPatOZ Nov 11, 2025
e65f5d1
feat: add sub timeout (seperate to rpc timeout)
LeoPatOZ Nov 11, 2025
d8c122e
Merge branch 'add-robust-subscription' into refactor-robust-provider
LeoPatOZ Nov 11, 2025
4c62598
ref: better comment
LeoPatOZ Nov 11, 2025
c64d700
test: test subscription failover
LeoPatOZ Nov 11, 2025
2d53712
fix: increase test speed
LeoPatOZ Nov 11, 2025
e84db1c
Merge branch 'add-robust-subscription' into refactor-robust-provider
LeoPatOZ Nov 11, 2025
fc3c9bf
Merge branch 'main' into add-robust-subscription
LeoPatOZ Nov 11, 2025
efd857d
fix: from_min --> from_sec
LeoPatOZ Nov 11, 2025
06df99e
Merge branch 'add-robust-subscription' into refactor-robust-provider
LeoPatOZ Nov 11, 2025
4cb95b1
fix: imports
LeoPatOZ Nov 11, 2025
71a3b3b
fix: from_min --> from_sec
LeoPatOZ Nov 11, 2025
01c1729
Merge branch 'add-robust-subscription' into refactor-robust-provider
LeoPatOZ Nov 11, 2025
6c59321
fix: doc
LeoPatOZ Nov 11, 2025
212ef73
fix: format
LeoPatOZ Nov 11, 2025
8a47d92
ref: rename type and robust_subscription
LeoPatOZ Nov 12, 2025
a0cdda9
fix: all subsequent imports due to rename
LeoPatOZ Nov 12, 2025
c569048
fix: doc test
LeoPatOZ Nov 12, 2025
782b9cf
fix: unwaited try_stream
LeoPatOZ Nov 12, 2025
d471b16
ref: add comment link
LeoPatOZ Nov 12, 2025
9793875
ref: rewrite as match
LeoPatOZ Nov 12, 2025
9d6bd11
ref: use match
LeoPatOZ Nov 12, 2025
18a6dd7
ref: try fallback providers fn
LeoPatOZ Nov 12, 2025
26ed893
ref: - to *
LeoPatOZ Nov 12, 2025
36ff752
ref: update to use bounded channel
LeoPatOZ Nov 12, 2025
9732ef0
Merge branch 'add-robust-subscription' into refactor-robust-provider
LeoPatOZ Nov 12, 2025
c3eae82
fix: add timeout for flaky test
LeoPatOZ Nov 12, 2025
3fc2173
test: add sub timeout to test
LeoPatOZ Nov 12, 2025
a128aef
Merge branch 'add-robust-subscription' into refactor-robust-provider
LeoPatOZ Nov 12, 2025
8304024
ref: revert changes
LeoPatOZ Nov 12, 2025
4ba97ee
ref: rebase onto main
LeoPatOZ Nov 12, 2025
9c3c648
Merge branch 'main' into refactor-robust-provider
LeoPatOZ Nov 12, 2025
41a891d
fix: rebase past changes
LeoPatOZ Nov 12, 2025
e76b5f7
ref: move all imports to nested in robust_provider
LeoPatOZ Nov 13, 2025
ff516bd
Merge branch 'refactor-robust-provider' into add-robust-subscription
LeoPatOZ Nov 13, 2025
c5980ca
ref: use new import
LeoPatOZ Nov 13, 2025
a4d0c7c
Merge branch 'main' into refactor-robust-provider
LeoPatOZ Nov 13, 2025
ab9eb25
Merge branch 'refactor-robust-provider' into add-robust-subscription
LeoPatOZ Nov 13, 2025
3f9d49d
Merge branch 'main' into refactor-robust-provider
LeoPatOZ Nov 13, 2025
f308f30
Merge branch 'refactor-robust-provider' into add-robust-subscription
LeoPatOZ Nov 13, 2025
ee5857c
ref: rename max timeout to call timeout
LeoPatOZ Nov 13, 2025
6cc293c
ref: add sleep
LeoPatOZ Nov 13, 2025
ce4c640
ref: simpler test
LeoPatOZ Nov 13, 2025
a2b0f54
ref: imports
LeoPatOZ Nov 13, 2025
dd0f4fc
Merge branch 'refactor-robust-provider' into add-robust-subscription
LeoPatOZ Nov 13, 2025
f6c2425
ref: export all builder
LeoPatOZ Nov 13, 2025
cde26a4
Merge branch 'refactor-robust-provider' into add-robust-subscription
LeoPatOZ Nov 13, 2025
12e3a91
ref: remove primitive import
LeoPatOZ Nov 13, 2025
7e2a099
Merge branch 'refactor-robust-provider' into add-robust-subscription
LeoPatOZ Nov 13, 2025
b47e062
Merge branch 'main' into add-robust-subscription
LeoPatOZ Nov 13, 2025
8570f69
test: reconnect to primary
LeoPatOZ Nov 14, 2025
1307368
test: more robust subscription scenarios
LeoPatOZ Nov 14, 2025
cfa95cf
feat: add primary and fallback fields seperately
LeoPatOZ Nov 14, 2025
59d9c49
feat: add try fallback from logic
LeoPatOZ Nov 14, 2025
f1f6369
feat: add fallback provider tracking
LeoPatOZ Nov 14, 2025
d50aa75
ref: refactor try connect primary to be simpler
LeoPatOZ Nov 14, 2025
4378c9e
ref: simplify test and add better timeout logic
LeoPatOZ Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/historical_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() -> anyhow::Result<()> {
let _ = counter_contract.increase().send().await?.get_receipt().await?;

let robust_provider = RobustProviderBuilder::new(provider)
.max_timeout(std::time::Duration::from_secs(30))
.call_timeout(std::time::Duration::from_secs(30))
.max_retries(5)
.min_delay(std::time::Duration::from_millis(500))
.build()
Expand Down
2 changes: 1 addition & 1 deletion examples/latest_events_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> {
.event(Counter::CountIncreased::SIGNATURE);

let robust_provider = RobustProviderBuilder::new(provider)
.max_timeout(std::time::Duration::from_secs(30))
.call_timeout(std::time::Duration::from_secs(30))
.max_retries(5)
.min_delay(std::time::Duration::from_millis(500))
.build()
Expand Down
2 changes: 1 addition & 1 deletion examples/live_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
.event(Counter::CountIncreased::SIGNATURE);

let robust_provider = RobustProviderBuilder::new(provider)
.max_timeout(std::time::Duration::from_secs(30))
.call_timeout(std::time::Duration::from_secs(30))
.max_retries(5)
.min_delay(std::time::Duration::from_millis(500))
.build()
Expand Down
2 changes: 1 addition & 1 deletion examples/sync_from_block_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
}

let robust_provider = RobustProviderBuilder::new(provider)
.max_timeout(Duration::from_secs(30))
.call_timeout(Duration::from_secs(30))
.max_retries(5)
.min_delay(Duration::from_millis(500))
.build()
Expand Down
2 changes: 1 addition & 1 deletion examples/sync_from_latest_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
.event(Counter::CountIncreased::SIGNATURE);

let robust_provider = RobustProviderBuilder::new(provider)
.max_timeout(std::time::Duration::from_secs(30))
.call_timeout(std::time::Duration::from_secs(30))
.max_retries(5)
.min_delay(std::time::Duration::from_millis(500))
.build()
Expand Down
21 changes: 17 additions & 4 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
//! }
//! ```

use crate::robust_provider::subscription::RobustSubscription;
use std::{cmp::Ordering, ops::RangeInclusive};
use tokio::{
sync::{mpsc, oneshot},
Expand All @@ -77,7 +78,6 @@ use alloy::{
eips::BlockNumberOrTag,
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::{B256, BlockNumber},
pubsub::Subscription,
transports::{RpcError, TransportErrorKind},
};
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -609,16 +609,29 @@ impl<N: Network> Service<N> {

async fn stream_live_blocks(
mut range_start: BlockNumber,
subscription: Subscription<N::HeaderResponse>,
subscription: RobustSubscription<N>,
sender: mpsc::Sender<Message>,
block_confirmations: u64,
max_block_range: u64,
) {
// ensure we start streaming only after the expected_next_block cutoff
let cutoff = range_start;
let mut stream = subscription.into_stream().skip_while(|header| header.number() < cutoff);
let mut stream = subscription.into_stream().skip_while(|result| match result {
Ok(header) => header.number() < cutoff,
Err(_) => false,
});

while let Some(result) = stream.next().await {
let incoming_block = match result {
Ok(block) => block,
Err(e) => {
error!(error = %e, "Error receiving block from stream");
// Error from subscription, exit the stream
_ = sender.try_stream(e).await;
return;
}
};

while let Some(incoming_block) = stream.next().await {
let incoming_block_num = incoming_block.number();
info!(block_number = incoming_block_num, "Received block header");

Expand Down
69 changes: 53 additions & 16 deletions src/robust_provider/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,51 @@ use std::{marker::PhantomData, time::Duration};

use alloy::{network::Network, providers::Provider};

use crate::robust_provider::{Error, IntoProvider, RobustProvider};
use crate::robust_provider::{
Error, IntoProvider, RobustProvider, subscription::DEFAULT_RECONNECT_INTERVAL,
};

// RPC retry and timeout settings
/// Default timeout used by `RobustProvider`
pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(60);
pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(60);
/// Default timeout for subscriptions
pub const DEFAULT_SUBSCRIPTION_TIMEOUT: Duration = Duration::from_secs(120);
/// Default maximum number of retry attempts.
pub const DEFAULT_MAX_RETRIES: usize = 3;
/// Default base delay between retries.
pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1);

#[derive(Clone)]
pub struct RobustProviderBuilder<N: Network, P: IntoProvider<N>> {
providers: Vec<P>,
max_timeout: Duration,
primary_provider: P,
fallback_providers: Vec<P>,
call_timeout: Duration,
subscription_timeout: Duration,
max_retries: usize,
min_delay: Duration,
reconnect_interval: Duration,
_network: PhantomData<N>,
}

impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
/// Create a new `RobustProvider` with default settings.
/// Create a new [`RobustProvider`] with default settings.
///
/// The provided provider is treated as the primary provider.
#[must_use]
pub fn new(provider: P) -> Self {
Self {
providers: vec![provider],
max_timeout: DEFAULT_MAX_TIMEOUT,
primary_provider: provider,
fallback_providers: vec![],
call_timeout: DEFAULT_CALL_TIMEOUT,
subscription_timeout: DEFAULT_SUBSCRIPTION_TIMEOUT,
max_retries: DEFAULT_MAX_RETRIES,
min_delay: DEFAULT_MIN_DELAY,
reconnect_interval: DEFAULT_RECONNECT_INTERVAL,
_network: PhantomData,
}
}

/// Create a new `RobustProvider` with no retry attempts and only timeout set.
/// Create a new [`RobustProvider`] with no retry attempts and only timeout set.
///
/// The provided provider is treated as the primary provider.
#[must_use]
Expand All @@ -49,14 +59,24 @@ impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
/// Fallback providers are used when the primary provider times out or fails.
#[must_use]
pub fn fallback(mut self, provider: P) -> Self {
self.providers.push(provider);
self.fallback_providers.push(provider);
self
}

/// Set the maximum timeout for RPC operations.
#[must_use]
pub fn max_timeout(mut self, timeout: Duration) -> Self {
self.max_timeout = timeout;
pub fn call_timeout(mut self, timeout: Duration) -> Self {
self.call_timeout = timeout;
self
}

/// Set the timeout for subscription operations.
///
/// This should be set higher than [`call_timeout`](Self::call_timeout) to accommodate chains
/// with slow block times. Default is [`DEFAULT_SUBSCRIPTION_TIMEOUT`].
#[must_use]
pub fn subscription_timeout(mut self, timeout: Duration) -> Self {
self.subscription_timeout = timeout;
self
}

Expand All @@ -74,6 +94,17 @@ impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
self
}

/// Set the interval for attempting to reconnect to the primary provider.
///
/// After a failover to a fallback provider, the subscription will periodically
/// attempt to reconnect to the primary provider at this interval.
/// Default is [`DEFAULT_RECONNECT_INTERVAL`].
#[must_use]
pub fn reconnect_interval(mut self, reconnect_interval: Duration) -> Self {
self.reconnect_interval = reconnect_interval;
self
}

/// Build the `RobustProvider`.
///
/// Final builder method: consumes the builder and returns the built [`RobustProvider`].
Expand All @@ -82,15 +113,21 @@ impl<N: Network, P: IntoProvider<N>> RobustProviderBuilder<N, P> {
///
/// Returns an error if any of the providers fail to connect.
pub async fn build(self) -> Result<RobustProvider<N>, Error> {
let mut providers = vec![];
for p in self.providers {
providers.push(p.into_provider().await?.root().to_owned());
let primary_provider = self.primary_provider.into_provider().await?.root().to_owned();

let mut fallback_providers = vec![];
for p in self.fallback_providers {
fallback_providers.push(p.into_provider().await?.root().to_owned());
}

Ok(RobustProvider {
providers,
max_timeout: self.max_timeout,
primary_provider,
fallback_providers,
call_timeout: self.call_timeout,
subscription_timeout: self.subscription_timeout,
max_retries: self.max_retries,
min_delay: self.min_delay,
reconnect_interval: self.reconnect_interval,
})
}
}
2 changes: 2 additions & 0 deletions src/robust_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ pub mod builder;
pub mod error;
pub mod provider;
pub mod provider_conversion;
pub mod subscription;

pub use builder::*;
pub use error::Error;
pub use provider::RobustProvider;
pub use provider_conversion::{IntoProvider, IntoRobustProvider};
pub use subscription::RobustSubscription;
Loading
Loading