diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index 8cf9aeaac2d..285374bffbf 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -144,6 +144,7 @@ impl Blockchain for Chain { }); Ok(Box::new(FirehoseBlockStream::new( + self.chain_store(), deployment.hash, self.chain_client(), store.block_ptr(), diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs index bbd7a1ac410..e72c4d77a39 100644 --- a/chain/cosmos/src/chain.rs +++ b/chain/cosmos/src/chain.rs @@ -137,6 +137,7 @@ impl Blockchain for Chain { }); Ok(Box::new(FirehoseBlockStream::new( + self.chain_store(), deployment.hash, self.chain_client(), store.block_ptr(), diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index c55a3ebaf77..4e4f26fd9b1 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -100,6 +100,7 @@ impl BlockStreamBuilder for EthereumStreamBuilder { let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter }); Ok(Box::new(FirehoseBlockStream::new( + chain.chain_store(), deployment.hash, chain.chain_client(), subgraph_current_block, diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 905d057f93d..906aed29d09 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1719,10 +1719,16 @@ impl EthereumAdapterTrait for EthereumAdapter { let mut blocks: Vec> = blocks_map .into_iter() - .filter_map(|(_number, values)| { + .filter_map(|(number, values)| { if values.len() == 1 { Arc::new(values[0].clone()).into() } else { + warn!( + &logger, + "Expected one block for block number {:?}, found {}", + number, + values.len() + ); None } }) @@ -1739,6 +1745,8 @@ impl EthereumAdapterTrait for EthereumAdapter { "Loading {} block(s) not in the block cache", missing_blocks.len() ); + + debug!(logger, "Missing blocks {:?}", missing_blocks); } Box::new( diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 238551df841..db134fef9e5 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -136,6 +136,7 @@ impl BlockStreamBuilder for NearStreamBuilder { let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter }); Ok(Box::new(FirehoseBlockStream::new( + chain.chain_store(), deployment.hash, chain.chain_client(), subgraph_current_block, diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index 826e1fe2c91..0721a3a9c6f 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -227,6 +227,7 @@ impl BlockStreamBuilder for StarknetStreamBuilder { let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter }); Ok(Box::new(FirehoseBlockStream::new( + chain.chain_store(), deployment.hash, chain.chain_client(), subgraph_current_block, diff --git a/graph/src/blockchain/firehose_block_stream.rs b/graph/src/blockchain/firehose_block_stream.rs index 254ccd42f82..5f4fc02fcfc 100644 --- a/graph/src/blockchain/firehose_block_stream.rs +++ b/graph/src/blockchain/firehose_block_stream.rs @@ -4,7 +4,7 @@ use super::block_stream::{ use super::client::ChainClient; use super::Blockchain; use crate::blockchain::block_stream::FirehoseCursor; -use crate::blockchain::TriggerFilter; +use crate::blockchain::{Block, TriggerFilter}; use crate::prelude::*; use crate::util::backoff::ExponentialBackoff; use crate::{firehose, firehose::FirehoseEndpoint}; @@ -108,6 +108,7 @@ where C: Blockchain, { pub fn new( + chain_store: Arc, deployment: DeploymentHash, client: Arc>, subgraph_current_block: Option, @@ -134,6 +135,7 @@ where let metrics = FirehoseBlockStreamMetrics::new(registry, deployment.clone()); FirehoseBlockStream { stream: Box::pin(stream_blocks( + chain_store, client, cursor, deployment, @@ -148,6 +150,7 @@ where } fn stream_blocks>( + chain_store: Arc, client: Arc>, mut latest_cursor: FirehoseCursor, deployment: DeploymentHash, @@ -257,6 +260,7 @@ fn stream_blocks>( for await response in stream { match process_firehose_response( + chain_store.clone(), &endpoint, response, &mut check_subgraph_continuity, @@ -344,6 +348,7 @@ enum BlockResponse { } async fn process_firehose_response>( + chain_store: Arc, endpoint: &Arc, result: Result, check_subgraph_continuity: &mut bool, @@ -359,11 +364,46 @@ async fn process_firehose_response>( .await .context("Mapping block to BlockStreamEvent failed")?; + if let BlockStreamEvent::ProcessBlock(block, _) = &event { + info!(logger, "Inserting block to cache"; "block_number" => block.block.number(), "block_hash" => format!("{:?}", block.block.hash())); + + let start_time = Instant::now(); + + let result = chain_store + .insert_block(Arc::new(block.block.clone())) + .await; + + let elapsed = start_time.elapsed(); + + match result { + Ok(_) => { + trace!( + logger, + "Block inserted to cache successfully"; + "block_number" => block.block.number(), + "block_hash" => format!("{:?}", block.block.hash()), + "time_taken" => format!("{:?}", elapsed) + ); + } + Err(e) => { + error!( + logger, + "Failed to insert block into store"; + "block_number" => block.block.number(), + "block_hash" => format!("{:?}", block.block.hash()), + "error" => format!("{:?}", e), + "time_taken" => format!("{:?}", elapsed) + ); + } + } + } + if *check_subgraph_continuity { info!(logger, "Firehose started from a subgraph pointer without an existing cursor, ensuring chain continuity"); if let BlockStreamEvent::ProcessBlock(ref block, _) = event { let previous_block_ptr = block.parent_ptr(); + if previous_block_ptr.is_some() && previous_block_ptr.as_ref() != subgraph_current_block { warn!(&logger, diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index af43bd1b605..e54a6cf556d 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -445,6 +445,8 @@ pub trait ChainStore: Send + Sync + 'static { /// Insert a block into the store (or update if they are already present). async fn upsert_block(&self, block: Arc) -> Result<(), Error>; + async fn insert_block(&self, block: Arc) -> Result<(), Error>; + fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error>; /// Try to update the head block pointer to the block with the highest block number. diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 487107ed77a..9dd73b20b0c 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1956,6 +1956,27 @@ impl ChainStore { Ok(block_map) } + + async fn save_block(&self, block: Arc, allow_update: bool) -> Result<(), Error> { + // We should always have the parent block available to us at this point. + if let Some(parent_hash) = block.parent_hash() { + let block = JsonBlock::new(block.ptr(), parent_hash, block.data().ok()); + self.recent_blocks_cache.insert_block(block); + } + + let pool = self.pool.clone(); + let network = self.chain.clone(); + let storage = self.storage.clone(); + pool.with_conn(move |conn, _| { + conn.transaction(|conn| { + storage + .upsert_block(conn, &network, block.as_ref(), allow_update) + .map_err(CancelableError::from) + }) + }) + .await + .map_err(Error::from) + } } fn json_block_to_block_ptr_ext(json_block: &JsonBlock) -> Result { @@ -1984,24 +2005,11 @@ impl ChainStoreTrait for ChainStore { } async fn upsert_block(&self, block: Arc) -> Result<(), Error> { - // We should always have the parent block available to us at this point. - if let Some(parent_hash) = block.parent_hash() { - let block = JsonBlock::new(block.ptr(), parent_hash, block.data().ok()); - self.recent_blocks_cache.insert_block(block); - } + self.save_block(block, true).await + } - let pool = self.pool.clone(); - let network = self.chain.clone(); - let storage = self.storage.clone(); - pool.with_conn(move |conn, _| { - conn.transaction(|conn| { - storage - .upsert_block(conn, &network, block.as_ref(), true) - .map_err(CancelableError::from) - }) - }) - .await - .map_err(Error::from) + async fn insert_block(&self, block: Arc) -> Result<(), Error> { + self.save_block(block, false).await } fn upsert_light_blocks(&self, blocks: &[&dyn Block]) -> Result<(), Error> {