Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proceed to next block after a few retries if Hermes can't parse current block during event sourcing #3906

Merged
merged 7 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- Improve reliability of the event source in `pull` mode by proceeding to the next block even if Hermes cannot parse the current block.
Add a new configuration option to the `event_source` setting: `max_retries` defines how many times Hermes should attempt to pull a block over RPC.
```toml
event_source = { mode = 'pull', interval = '1s', max_retries = 4 }
```
([\#3894](https://github.com/informalsystems/hermes/issues/3894))
3 changes: 2 additions & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,12 @@ grpc_addr = 'http://127.0.0.1:9090'

# b) Pull: for polling for IBC events via the `/block_results` RPC endpoint.
#
# `{ mode = 'pull', interval = '1s' }`
# `{ mode = 'pull', interval = '1s', max_retries = 4 }`
#
# where
#
# - `interval` is the interval at which to poll for blocks. Default: 1s
# - `max_retries` is the maximum number of retries to collect events for each block. Default: 4
#
# This mode should only be used in situations where Hermes misses events that it should be
# receiving, such as when relaying for CosmWasm-enabled chains which emit IBC events without
Expand Down
6 changes: 5 additions & 1 deletion crates/relayer-cli/src/commands/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,14 @@ fn subscribe(
*batch_delay,
rt,
),
EventSourceMode::Pull { interval } => EventSource::rpc(
EventSourceMode::Pull {
interval,
max_retries,
} => EventSource::rpc(
chain_config.id().clone(),
HttpClient::new(config.rpc_addr.clone())?,
*interval,
*max_retries,
rt,
),
}?;
Expand Down
6 changes: 5 additions & 1 deletion crates/relayer/src/chain/cosmos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,14 @@ impl CosmosSdkChain {
*batch_delay,
self.rt.clone(),
),
Mode::Pull { interval } => EventSource::rpc(
Mode::Pull {
interval,
max_retries,
} => EventSource::rpc(
self.config.id.clone(),
self.rpc_client.clone(),
*interval,
*max_retries,
self.rt.clone(),
),
}
Expand Down
9 changes: 9 additions & 0 deletions crates/relayer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ pub mod default {
Duration::from_secs(1)
}

/// Default value for the `max_retries` field of the pull-mode event source,
/// i.e. how many times block results are collected for a block before
/// giving up and moving on to the next one.
pub fn max_retries() -> u32 {
    const DEFAULT_MAX_RETRIES: u32 = 4;
    DEFAULT_MAX_RETRIES
}

/// Default `batch_delay` duration: 500 milliseconds.
pub fn batch_delay() -> Duration {
    const DEFAULT_BATCH_DELAY_MS: u64 = 500;
    Duration::from_millis(DEFAULT_BATCH_DELAY_MS)
}
Expand Down Expand Up @@ -622,6 +626,11 @@ pub enum EventSourceMode {
/// The polling interval
#[serde(default = "default::poll_interval", with = "humantime_serde")]
interval: Duration,

/// The maximum retries to collect the block results
/// before giving up and moving to the next block
#[serde(default = "default::max_retries")]
max_retries: u32,
},
}

Expand Down
4 changes: 3 additions & 1 deletion crates/relayer/src/event/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,11 @@ impl EventSource {
chain_id: ChainId,
rpc_client: HttpClient,
poll_interval: Duration,
max_retries: u32,
rt: Arc<TokioRuntime>,
) -> Result<(Self, TxEventSourceCmd)> {
let (source, tx) = rpc::EventSource::new(chain_id, rpc_client, poll_interval, rt)?;
let (source, tx) =
rpc::EventSource::new(chain_id, rpc_client, poll_interval, max_retries, rt)?;
Ok((Self::Rpc(source), tx))
}

Expand Down
51 changes: 40 additions & 11 deletions crates/relayer/src/event/source/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ibc_relayer_types::{

use crate::{
chain::tracking::TrackingId,
event::{bus::EventBus, source::Error, IbcEventWithHeight},
event::{bus::EventBus, error::ErrorDetail, source::Error, IbcEventWithHeight},
telemetry,
util::retry::ConstantGrowth,
};
Expand All @@ -45,6 +45,9 @@ pub struct EventSource {
/// Poll interval
poll_interval: Duration,

/// Max retries to collect events
max_retries: u32,

/// Event bus for broadcasting events
event_bus: EventBus<Arc<Result<EventBatch>>>,

Expand All @@ -63,6 +66,7 @@ impl EventSource {
chain_id: ChainId,
rpc_client: HttpClient,
poll_interval: Duration,
max_retries: u32,
rt: Arc<TokioRuntime>,
) -> Result<(Self, TxEventSourceCmd)> {
let event_bus = EventBus::new();
Expand All @@ -73,6 +77,7 @@ impl EventSource {
chain_id,
rpc_client,
poll_interval,
max_retries,
event_bus,
rx_cmd,
last_fetched_height: BlockHeight::from(0_u32),
Expand Down Expand Up @@ -203,19 +208,38 @@ impl EventSource {
for height in heights {
trace!("collecting events at height {height}");

let result = collect_events(&self.rpc_client, &self.chain_id, height).await;
let mut attempts = 0;
let mut backoff = retries_backoff(self.max_retries);

// NOTE: Even if we failed to collect events after max retries,
// we still need to update to move on next block
self.last_fetched_height = height;

match result {
Ok(batch) => {
self.last_fetched_height = height;
loop {
attempts += 1;

if let Some(batch) = batch {
batches.push(batch);
match collect_events(&self.rpc_client, &self.chain_id, height).await {
Ok(batch) => {
if let Some(batch) = batch {
batches.push(batch);
}
break;
}
}
Err(e) => {
error!(%height, "failed to collect events: {e}");
break;
Err(e) => match e.detail() {
ErrorDetail::Rpc(_) if attempts < self.max_retries => {
let delay = backoff.next().expect(
"backoff has attempted to make more iterates than is expected",
);

error!(%height, "failed to collect events: {e}, retrying in {delay:?}...");
sleep(delay).await;
}

_ => {
error!(%height, "failed to collect events after {attempts} attempts: {e}");
break;
}
},
}
}
}
Expand Down Expand Up @@ -244,6 +268,11 @@ fn poll_backoff(poll_interval: Duration) -> impl Iterator<Item = Duration> {
.clamp(poll_interval * 5, usize::MAX)
}

/// Builds the sequence of delays to wait between successive attempts at
/// collecting the events of a single block.
///
/// `collect_retries` is forwarded (as `usize`) as the second argument of
/// `ConstantGrowth::clamp`.
///
/// NOTE(review): presumably the schedule starts at 1s, grows by 500ms per
/// step, is capped at 4s and yields at most `collect_retries` delays —
/// confirm against the `ConstantGrowth` implementation.
fn retries_backoff(collect_retries: u32) -> impl Iterator<Item = Duration> {
    let first = Duration::from_secs(1);
    let second = Duration::from_millis(500);
    let ceiling = Duration::from_secs(4);
    let count = collect_retries as usize;

    ConstantGrowth::new(first, second).clamp(ceiling, count)
}

fn dedupe(events: Vec<abci::Event>) -> Vec<abci::Event> {
use itertools::Itertools;
use std::hash::{Hash, Hasher};
Expand Down
Loading