diff --git a/src/config.rs b/src/config.rs index 1978ace..6974943 100644 --- a/src/config.rs +++ b/src/config.rs @@ -63,6 +63,13 @@ pub struct TxSitterConfig { )] pub hard_reorg_interval: Duration, + /// Max amount of time to wait for a new block from the RPC block stream + #[serde( + with = "humantime_serde", + default = "default::block_stream_timeout" + )] + pub block_stream_timeout: Duration, + #[serde(default, skip_serializing_if = "Option::is_none")] pub predefined: Option, @@ -219,6 +226,10 @@ mod default { Duration::from_secs(60 * 60) } + pub fn block_stream_timeout() -> Duration { + Duration::from_secs(60) + } + pub mod metrics { pub fn host() -> String { "127.0.0.1".to_string() @@ -249,6 +260,7 @@ mod tests { escalation_interval = "1h" soft_reorg_interval = "1m" hard_reorg_interval = "1h" + block_stream_timeout = "1m" [server] host = "127.0.0.1:3000" @@ -266,6 +278,7 @@ mod tests { escalation_interval = "1h" soft_reorg_interval = "1m" hard_reorg_interval = "1h" + block_stream_timeout = "1m" [server] host = "127.0.0.1:3000" @@ -289,6 +302,7 @@ mod tests { escalation_interval: Duration::from_secs(60 * 60), soft_reorg_interval: default::soft_reorg_interval(), hard_reorg_interval: default::hard_reorg_interval(), + block_stream_timeout: default::block_stream_timeout(), predefined: None, telemetry: None, }, @@ -317,6 +331,7 @@ mod tests { escalation_interval: Duration::from_secs(60 * 60), soft_reorg_interval: default::soft_reorg_interval(), hard_reorg_interval: default::hard_reorg_interval(), + block_stream_timeout: default::block_stream_timeout(), predefined: None, telemetry: None, }, diff --git a/src/tasks/index.rs b/src/tasks/index.rs index c60d3b5..f601a92 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -7,6 +7,7 @@ use ethers::types::{Block, BlockNumber, H256}; use eyre::{Context, ContextCompat}; use futures::stream::FuturesUnordered; use futures::StreamExt; +use tokio::time::timeout; use crate::app::App; use crate::broadcast_utils::gas_estimation::{ @@ -34,18 +35,35 @@ async fn index_inner(app: Arc, chain_id: u64) -> eyre::Result<()> { let rpc = app.http_provider(chain_id).await?; tracing::info!("Subscribing to new blocks"); - // Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream let mut blocks_stream = ws_rpc.subscribe_blocks().await?; - // Get the first block from the stream, backfilling any missing blocks from the latest block in the db to the chain head tracing::info!("Backfilling blocks"); if let Some(latest_block) = blocks_stream.next().await { backfill_to_block(app.clone(), chain_id, &rpc, latest_block).await?; } - // Index incoming blocks from the stream - while let Some(block) = blocks_stream.next().await { - index_block(app.clone(), chain_id, &rpc, block).await?; + loop { + let next_block = timeout( + app.config.service.block_stream_timeout, + blocks_stream.next(), + ) + .await; + + match next_block { + Ok(Some(block)) => { + index_block(app.clone(), chain_id, &rpc, block).await?; + } + Ok(None) => { + // Stream ended, break out of the loop + tracing::info!("Block stream ended"); + break; + } + Err(_) => { + // Timeout occurred + tracing::warn!("Timed out waiting for a block"); + break; + } + } } Ok(()) diff --git a/tests/common/service_builder.rs b/tests/common/service_builder.rs index 05c428e..48074f8 100644 --- a/tests/common/service_builder.rs +++ b/tests/common/service_builder.rs @@ -58,6 +58,7 @@ impl ServiceBuilder { escalation_interval: self.escalation_interval, soft_reorg_interval: self.soft_reorg_interval, hard_reorg_interval: self.hard_reorg_interval, + block_stream_timeout: Duration::from_secs(60), telemetry: None, predefined: Some(Predefined { network: PredefinedNetwork {