Skip to content

Commit

Permalink
Dzejkop/timeout-when-waiting-for-blocks (#58)
Browse files Browse the repository at this point in the history
* Add timeout

* Mae it configurable
  • Loading branch information
Dzejkop authored Oct 10, 2024
1 parent 6ec8bac commit 5fe9c32
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 5 deletions.
15 changes: 15 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ pub struct TxSitterConfig {
)]
pub hard_reorg_interval: Duration,

/// Max amount of time to wait for a new block from the RPC block stream
#[serde(
with = "humantime_serde",
default = "default::block_stream_timeout"
)]
pub block_stream_timeout: Duration,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub predefined: Option<Predefined>,

Expand Down Expand Up @@ -219,6 +226,10 @@ mod default {
Duration::from_secs(60 * 60)
}

pub fn block_stream_timeout() -> Duration {
Duration::from_secs(60)
}

pub mod metrics {
pub fn host() -> String {
"127.0.0.1".to_string()
Expand Down Expand Up @@ -249,6 +260,7 @@ mod tests {
escalation_interval = "1h"
soft_reorg_interval = "1m"
hard_reorg_interval = "1h"
block_stream_timeout = "1m"
[server]
host = "127.0.0.1:3000"
Expand All @@ -266,6 +278,7 @@ mod tests {
escalation_interval = "1h"
soft_reorg_interval = "1m"
hard_reorg_interval = "1h"
block_stream_timeout = "1m"
[server]
host = "127.0.0.1:3000"
Expand All @@ -289,6 +302,7 @@ mod tests {
escalation_interval: Duration::from_secs(60 * 60),
soft_reorg_interval: default::soft_reorg_interval(),
hard_reorg_interval: default::hard_reorg_interval(),
block_stream_timeout: default::block_stream_timeout(),
predefined: None,
telemetry: None,
},
Expand Down Expand Up @@ -317,6 +331,7 @@ mod tests {
escalation_interval: Duration::from_secs(60 * 60),
soft_reorg_interval: default::soft_reorg_interval(),
hard_reorg_interval: default::hard_reorg_interval(),
block_stream_timeout: default::block_stream_timeout(),
predefined: None,
telemetry: None,
},
Expand Down
28 changes: 23 additions & 5 deletions src/tasks/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ethers::types::{Block, BlockNumber, H256};
use eyre::{Context, ContextCompat};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::time::timeout;

use crate::app::App;
use crate::broadcast_utils::gas_estimation::{
Expand Down Expand Up @@ -34,18 +35,35 @@ async fn index_inner(app: Arc<App>, chain_id: u64) -> eyre::Result<()> {
let rpc = app.http_provider(chain_id).await?;

tracing::info!("Subscribing to new blocks");
// Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream
let mut blocks_stream = ws_rpc.subscribe_blocks().await?;

// Get the first block from the stream, backfilling any missing blocks from the latest block in the db to the chain head
tracing::info!("Backfilling blocks");
if let Some(latest_block) = blocks_stream.next().await {
backfill_to_block(app.clone(), chain_id, &rpc, latest_block).await?;
}

// Index incoming blocks from the stream
while let Some(block) = blocks_stream.next().await {
index_block(app.clone(), chain_id, &rpc, block).await?;
loop {
let next_block = timeout(
app.config.service.block_stream_timeout,
blocks_stream.next(),
)
.await;

match next_block {
Ok(Some(block)) => {
index_block(app.clone(), chain_id, &rpc, block).await?;
}
Ok(None) => {
// Stream ended, break out of the loop
tracing::info!("Block stream ended");
break;
}
Err(_) => {
// Timeout occurred
tracing::warn!("Timed out waiting for a block");
break;
}
}
}

Ok(())
Expand Down
1 change: 1 addition & 0 deletions tests/common/service_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl ServiceBuilder {
escalation_interval: self.escalation_interval,
soft_reorg_interval: self.soft_reorg_interval,
hard_reorg_interval: self.hard_reorg_interval,
block_stream_timeout: Duration::from_secs(60),
telemetry: None,
predefined: Some(Predefined {
network: PredefinedNetwork {
Expand Down

0 comments on commit 5fe9c32

Please sign in to comment.