From 03d4611de11aa4cd8b11981645bf4667b47eb17d Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 19 Apr 2025 22:24:49 -0700 Subject: [PATCH 1/7] chain: Simplify EthereumAdapter.load_blocks --- chain/ethereum/src/adapter.rs | 3 +- chain/ethereum/src/chain.rs | 5 --- chain/ethereum/src/ethereum_adapter.rs | 53 ++++++++++++-------------- 3 files changed, 26 insertions(+), 35 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 469e8932b5e..8421bb0422e 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -26,7 +26,6 @@ use graph::prelude::*; use graph::{ blockchain as bc, components::metrics::{CounterVec, GaugeVec, HistogramVec}, - futures01::Stream, petgraph::{self, graphmap::GraphMap}, }; @@ -1107,7 +1106,7 @@ pub trait EthereumAdapter: Send + Sync + 'static { logger: Logger, chain_store: Arc, block_hashes: HashSet, - ) -> Box, Error = Error> + Send>; + ) -> Result>, Error>; /// Find a block by its hash. fn block_by_hash( diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 911d4d3ebfe..0408771f23e 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -10,7 +10,6 @@ use graph::components::network_provider::ChainName; use graph::components::store::{DeploymentCursorTracker, SourceableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::firehose::{FirehoseEndpoint, ForkStep}; -use graph::futures03::compat::Future01CompatExt; use graph::futures03::TryStreamExt; use graph::prelude::{ retry, BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock, @@ -1060,7 +1059,6 @@ impl TriggersAdapterTrait for TriggersAdapter { } async fn parent_ptr(&self, block: &BlockPtr) -> Result, Error> { - use graph::futures01::stream::Stream; use graph::prelude::LightEthereumBlockExt; let block = match self.chain_client.as_ref() { @@ -1111,9 +1109,6 @@ impl TriggersAdapterTrait for TriggersAdapter { self.chain_store.cheap_clone(), HashSet::from_iter(Some(block.hash_as_h256())), ) - .await - .collect() - .compat() .await?; assert_eq!(blocks.len(), 1); diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index e0714c24f02..713177abfef 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1734,7 +1734,7 @@ impl EthereumAdapterTrait for EthereumAdapter { logger: Logger, chain_store: Arc, block_hashes: HashSet, - ) -> Box, Error = Error> + Send> { + ) -> Result>, Error> { let block_hashes: Vec<_> = block_hashes.iter().cloned().collect(); // Search for the block in the store first then use json-rpc as a backup. let mut blocks: Vec> = chain_store @@ -1756,27 +1756,25 @@ impl EthereumAdapterTrait for EthereumAdapter { // Return a stream that lazily loads batches of blocks. debug!(logger, "Requesting {} block(s)", missing_blocks.len()); - Box::new( - self.load_blocks_rpc(logger.clone(), missing_blocks) - .collect() - .map(move |new_blocks| { - let upsert_blocks: Vec<_> = new_blocks - .iter() - .map(|block| BlockFinality::Final(block.clone())) - .collect(); - let block_refs: Vec<_> = upsert_blocks - .iter() - .map(|block| block as &dyn graph::blockchain::Block) - .collect(); - if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) { - error!(logger, "Error writing to block cache {}", e); - } - blocks.extend(new_blocks); - blocks.sort_by_key(|block| block.number); - stream::iter_ok(blocks) - }) - .flatten_stream(), - ) + let new_blocks = self + .load_blocks_rpc(logger.clone(), missing_blocks) + .collect() + .compat() + .await?; + let upsert_blocks: Vec<_> = new_blocks + .iter() + .map(|block| BlockFinality::Final(block.clone())) + .collect(); + let block_refs: Vec<_> = upsert_blocks + .iter() + .map(|block| block as &dyn graph::blockchain::Block) + .collect(); + if let Err(e) = chain_store.upsert_light_blocks(block_refs.as_slice()) { + error!(logger, "Error writing to block cache {}", e); + } + blocks.extend(new_blocks); + blocks.sort_by_key(|block| block.number); + Ok(blocks) } } @@ -1911,10 +1909,11 @@ pub(crate) async fn blocks_with_triggers( let logger2 = logger.cheap_clone(); - let blocks = eth + let blocks: Vec<_> = eth .load_blocks(logger.cheap_clone(), chain_store.clone(), block_hashes) - .await - .and_then( + .await? + .into_iter() + .map( move |block| match triggers_by_block.remove(&(block.number() as BlockNumber)) { Some(triggers) => Ok(BlockWithTriggers::new( BlockFinality::Final(block), @@ -1927,9 +1926,7 @@ pub(crate) async fn blocks_with_triggers( )), }, ) - .collect() - .compat() - .await?; + .collect::>()?; // Filter out call triggers that come from unsuccessful transactions let futures = blocks.into_iter().map(|block| { From 7dd404d685edde8c77b28e0f1d9e1461c8dcae93 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 19 Apr 2025 22:27:28 -0700 Subject: [PATCH 2/7] chain: Modernize EthereumAdapter.latest_block_header --- chain/ethereum/src/adapter.rs | 4 +- chain/ethereum/src/ethereum_adapter.rs | 51 ++++++++++++-------------- chain/ethereum/src/ingestor.rs | 1 - 3 files changed, 25 insertions(+), 31 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 8421bb0422e..4e0ccfe32fb 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1088,10 +1088,10 @@ pub trait EthereumAdapter: Send + Sync + 'static { ) -> Box + Send + Unpin>; /// Get the latest block, with only the header and transaction hashes. - fn latest_block_header( + async fn latest_block_header( &self, logger: &Logger, - ) -> Box, Error = bc::IngestorError> + Send>; + ) -> Result, bc::IngestorError>; fn load_block( &self, diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 713177abfef..3ac0f4bd77b 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1258,39 +1258,34 @@ impl EthereumAdapterTrait for EthereumAdapter { Ok(ident) } - fn latest_block_header( + async fn latest_block_header( &self, logger: &Logger, - ) -> Box, Error = IngestorError> + Send> { + ) -> Result, IngestorError> { let web3 = self.web3.clone(); - Box::new( - retry("eth_getBlockByNumber(latest) no txs RPC call", logger) - .redact_log_urls(true) - .no_limit() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) - .run(move || { - let web3 = web3.cheap_clone(); - async move { - let block_opt = web3 - .eth() - .block(Web3BlockNumber::Latest.into()) - .await - .map_err(|e| { - anyhow!("could not get latest block from Ethereum: {}", e) - })?; + retry("eth_getBlockByNumber(latest) no txs RPC call", logger) + .redact_log_urls(true) + .no_limit() + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.cheap_clone(); + async move { + let block_opt = web3 + .eth() + .block(Web3BlockNumber::Latest.into()) + .await + .map_err(|e| anyhow!("could not get latest block from Ethereum: {}", e))?; - block_opt - .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into()) - } - }) - .map_err(move |e| { - e.into_inner().unwrap_or_else(move || { - anyhow!("Ethereum node took too long to return latest block").into() - }) + block_opt + .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into()) + } + }) + .map_err(move |e| { + e.into_inner().unwrap_or_else(move || { + anyhow!("Ethereum node took too long to return latest block").into() }) - .boxed() - .compat(), - ) + }) + .await } fn latest_block( diff --git a/chain/ethereum/src/ingestor.rs b/chain/ethereum/src/ingestor.rs index e0fc8c5becd..fdbbac0d3a7 100644 --- a/chain/ethereum/src/ingestor.rs +++ b/chain/ethereum/src/ingestor.rs @@ -210,7 +210,6 @@ impl PollingBlockIngestor { ) -> Result { eth_adapter .latest_block_header(&logger) - .compat() .await .map(|block| block.into()) } From 20139ab43cac22f029a3078ced3bfba82c970f0b Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 19 Apr 2025 22:29:49 -0700 Subject: [PATCH 3/7] chain: Modernize EthereumAdapter.latest_block --- chain/ethereum/src/adapter.rs | 6 +-- chain/ethereum/src/ethereum_adapter.rs | 52 +++++++++++--------------- 2 files changed, 23 insertions(+), 35 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 4e0ccfe32fb..55d1e41d3f2 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -17,7 +17,6 @@ use prost_types::Any; use std::cmp; use std::collections::{HashMap, HashSet}; use std::fmt; -use std::marker::Unpin; use thiserror::Error; use tiny_keccak::keccak256; use web3::types::{Address, Log, H256}; @@ -1082,10 +1081,7 @@ pub trait EthereumAdapter: Send + Sync + 'static { async fn net_identifiers(&self) -> Result; /// Get the latest block, including full transactions. - fn latest_block( - &self, - logger: &Logger, - ) -> Box + Send + Unpin>; + async fn latest_block(&self, logger: &Logger) -> Result; /// Get the latest block, with only the header and transaction hashes. async fn latest_block_header( diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 3ac0f4bd77b..29ced61b7fb 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1288,38 +1288,30 @@ impl EthereumAdapterTrait for EthereumAdapter { .await } - fn latest_block( - &self, - logger: &Logger, - ) -> Box + Send + Unpin> { + async fn latest_block(&self, logger: &Logger) -> Result { let web3 = self.web3.clone(); - Box::new( - retry("eth_getBlockByNumber(latest) with txs RPC call", logger) - .redact_log_urls(true) - .no_limit() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) - .run(move || { - let web3 = web3.cheap_clone(); - async move { - let block_opt = web3 - .eth() - .block_with_txs(Web3BlockNumber::Latest.into()) - .await - .map_err(|e| { - anyhow!("could not get latest block from Ethereum: {}", e) - })?; - block_opt - .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into()) - } - }) - .map_err(move |e| { - e.into_inner().unwrap_or_else(move || { - anyhow!("Ethereum node took too long to return latest block").into() - }) + retry("eth_getBlockByNumber(latest) with txs RPC call", logger) + .redact_log_urls(true) + .no_limit() + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.cheap_clone(); + async move { + let block_opt = web3 + .eth() + .block_with_txs(Web3BlockNumber::Latest.into()) + .await + .map_err(|e| anyhow!("could not get latest block from Ethereum: {}", e))?; + block_opt + .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into()) + } + }) + .map_err(move |e| { + e.into_inner().unwrap_or_else(move || { + anyhow!("Ethereum node took too long to return latest block").into() }) - .boxed() - .compat(), - ) + }) + .await } fn load_block( From 6d6b754b7a4d2e50c2993a7f935c3276d6595bb8 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 19 Apr 2025 22:33:42 -0700 Subject: [PATCH 4/7] chain: Modernize EthereumAdapter.load_block, block_by_hash and by_number --- chain/ethereum/src/adapter.rs | 12 +-- chain/ethereum/src/ethereum_adapter.rs | 113 +++++++++++----------- chain/ethereum/src/ingestor.rs | 2 - node/src/manager/commands/chain.rs | 2 - node/src/manager/commands/check_blocks.rs | 2 - 5 files changed, 60 insertions(+), 71 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 55d1e41d3f2..bc489334ba6 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1089,11 +1089,11 @@ pub trait EthereumAdapter: Send + Sync + 'static { logger: &Logger, ) -> Result, bc::IngestorError>; - fn load_block( + async fn load_block( &self, logger: &Logger, block_hash: H256, - ) -> Box + Send>; + ) -> Result; /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. /// May use the `chain_store` as a cache. @@ -1105,17 +1105,17 @@ pub trait EthereumAdapter: Send + Sync + 'static { ) -> Result>, Error>; /// Find a block by its hash. - fn block_by_hash( + async fn block_by_hash( &self, logger: &Logger, block_hash: H256, - ) -> Box, Error = Error> + Send>; + ) -> Result, Error>; - fn block_by_number( + async fn block_by_number( &self, logger: &Logger, block_number: BlockNumber, - ) -> Box, Error = Error> + Send>; + ) -> Result, Error>; /// Load full information for the specified `block` (in particular, transaction receipts). fn load_full_block( diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 29ced61b7fb..2c2ce691c51 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1314,92 +1314,87 @@ impl EthereumAdapterTrait for EthereumAdapter { .await } - fn load_block( + async fn load_block( &self, logger: &Logger, block_hash: H256, - ) -> Box + Send> { - Box::new( - self.block_by_hash(logger, block_hash) - .and_then(move |block_opt| { - block_opt.ok_or_else(move || { - anyhow!( - "Ethereum node could not find block with hash {}", - block_hash - ) - }) - }), - ) + ) -> Result { + self.block_by_hash(logger, block_hash) + .await? + .ok_or_else(move || { + anyhow!( + "Ethereum node could not find block with hash {}", + block_hash + ) + }) } - fn block_by_hash( + async fn block_by_hash( &self, logger: &Logger, block_hash: H256, - ) -> Box, Error = Error> + Send> { + ) -> Result, Error> { let web3 = self.web3.clone(); let logger = logger.clone(); let retry_log_message = format!( "eth_getBlockByHash RPC call for block hash {:?}", block_hash ); - Box::new( - retry(retry_log_message, &logger) - .redact_log_urls(true) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) - .run(move || { - Box::pin(web3.eth().block_with_txs(BlockId::Hash(block_hash))) - .compat() - .from_err() - .compat() - }) - .map_err(move |e| { - e.into_inner().unwrap_or_else(move || { - anyhow!("Ethereum node took too long to return block {}", block_hash) - }) + + retry(retry_log_message, &logger) + .redact_log_urls(true) + .limit(ENV_VARS.request_retries) + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.cheap_clone(); + async move { + web3.eth() + .block_with_txs(BlockId::Hash(block_hash)) + .await + .map_err(Error::from) + } + }) + .map_err(move |e| { + e.into_inner().unwrap_or_else(move || { + anyhow!("Ethereum node took too long to return block {}", block_hash) }) - .boxed() - .compat(), - ) + }) + .await } - fn block_by_number( + async fn block_by_number( &self, logger: &Logger, block_number: BlockNumber, - ) -> Box, Error = Error> + Send> { + ) -> Result, Error> { let web3 = self.web3.clone(); let logger = logger.clone(); let retry_log_message = format!( "eth_getBlockByNumber RPC call for block number {}", block_number ); - Box::new( - retry(retry_log_message, &logger) - .redact_log_urls(true) - .no_limit() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) - .run(move || { - let web3 = web3.cheap_clone(); - async move { - web3.eth() - .block_with_txs(BlockId::Number(block_number.into())) - .await - .map_err(Error::from) - } - }) - .map_err(move |e| { - e.into_inner().unwrap_or_else(move || { - anyhow!( - "Ethereum node took too long to return block {}", - block_number - ) - }) + retry(retry_log_message, &logger) + .redact_log_urls(true) + .no_limit() + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.cheap_clone(); + async move { + web3.eth() + .block_with_txs(BlockId::Number(block_number.into())) + .await + .map_err(Error::from) + } + }) + .map_err(move |e| { + e.into_inner().unwrap_or_else(move || { + anyhow!( + "Ethereum node took too long to return block {}", + block_number + ) }) - .boxed() - .compat(), - ) + }) + .await } fn load_full_block( diff --git a/chain/ethereum/src/ingestor.rs b/chain/ethereum/src/ingestor.rs index fdbbac0d3a7..935cb525936 100644 --- a/chain/ethereum/src/ingestor.rs +++ b/chain/ethereum/src/ingestor.rs @@ -3,7 +3,6 @@ use crate::{EthereumAdapter, EthereumAdapterTrait as _}; use graph::blockchain::client::ChainClient; use graph::blockchain::BlockchainKind; use graph::components::network_provider::ChainName; -use graph::futures03::compat::Future01CompatExt as _; use graph::slog::o; use graph::util::backoff::ExponentialBackoff; use graph::{ @@ -175,7 +174,6 @@ impl PollingBlockIngestor { // Get the fully populated block let block = eth_adapter .block_by_hash(logger, block_hash) - .compat() .await? .ok_or(IngestorError::BlockUnavailable(block_hash))?; let ethereum_block = eth_adapter.load_full_block(&logger, block).await?; diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 2c07c3d37b8..e1f460a7581 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -10,7 +10,6 @@ use graph::cheap_clone::CheapClone; use graph::components::network_provider::ChainIdentifierStore; use graph::components::network_provider::ChainName; use graph::components::store::StoreError; -use graph::futures03::compat::Future01CompatExt as _; use graph::prelude::BlockNumber; use graph::prelude::ChainStore as _; use graph::prelude::LightEthereumBlockExt; @@ -273,7 +272,6 @@ pub async fn ingest( ) -> Result<(), Error> { let Some(block) = ethereum_adapter .block_by_number(logger, number) - .compat() .await .map_err(|e| anyhow!("error getting block number {number}: {}", e))? else { diff --git a/node/src/manager/commands/check_blocks.rs b/node/src/manager/commands/check_blocks.rs index 6a82c67c3e6..0afa54bd7d3 100644 --- a/node/src/manager/commands/check_blocks.rs +++ b/node/src/manager/commands/check_blocks.rs @@ -153,7 +153,6 @@ async fn handle_multiple_block_hashes( mod steps { use super::*; - use graph::futures03::compat::Future01CompatExt; use graph::{ anyhow::bail, prelude::serde_json::{self, Value}, @@ -204,7 +203,6 @@ mod steps { ) -> anyhow::Result { let provider_block = ethereum_adapter .block_by_hash(logger, *block_hash) - .compat() .await .with_context(|| format!("failed to fetch block {block_hash}"))? .ok_or_else(|| anyhow!("JRPC provider found no block with hash {block_hash:?}"))?; From 470244400613179bfadd6be1d7622f9dc13c5304 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 19 Apr 2025 22:39:43 -0700 Subject: [PATCH 5/7] chain: Modernize EthereumAdapter.load_full_block --- chain/ethereum/src/adapter.rs | 6 ++-- chain/ethereum/src/ethereum_adapter.rs | 44 +++++++++++--------------- 2 files changed, 20 insertions(+), 30 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index bc489334ba6..cd174926066 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1118,13 +1118,11 @@ pub trait EthereumAdapter: Send + Sync + 'static { ) -> Result, Error>; /// Load full information for the specified `block` (in particular, transaction receipts). - fn load_full_block( + async fn load_full_block( &self, logger: &Logger, block: LightEthereumBlock, - ) -> Pin< - Box> + Send + '_>, - >; + ) -> Result; /// Find a block by its number, according to the Ethereum node. /// diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 2c2ce691c51..716879c83c9 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1397,12 +1397,11 @@ impl EthereumAdapterTrait for EthereumAdapter { .await } - fn load_full_block( + async fn load_full_block( &self, logger: &Logger, block: LightEthereumBlock, - ) -> Pin> + Send + '_>> - { + ) -> Result { let web3 = Arc::clone(&self.web3); let logger = logger.clone(); let block_hash = block.hash.expect("block is missing block hash"); @@ -1411,36 +1410,29 @@ impl EthereumAdapterTrait for EthereumAdapter { // request an empty batch which is not valid in JSON-RPC. if block.transactions.is_empty() { trace!(logger, "Block {} contains no transactions", block_hash); - return Box::pin(std::future::ready(Ok(EthereumBlock { + return Ok(EthereumBlock { block: Arc::new(block), transaction_receipts: Vec::new(), - }))); + }); } let hashes: Vec<_> = block.transactions.iter().map(|txn| txn.hash).collect(); - let supports_block_receipts_future = self.check_block_receipt_support_and_update_cache( - web3.clone(), - block_hash, - self.supports_eip_1898, - self.call_only, - logger.clone(), - ); + let supports_block_receipts = self + .check_block_receipt_support_and_update_cache( + web3.clone(), + block_hash, + self.supports_eip_1898, + self.call_only, + logger.clone(), + ) + .await; - let receipts_future = supports_block_receipts_future - .then(move |supports_block_receipts| { - fetch_receipts_with_retry(web3, hashes, block_hash, logger, supports_block_receipts) + fetch_receipts_with_retry(web3, hashes, block_hash, logger, supports_block_receipts) + .await + .map(|transaction_receipts| EthereumBlock { + block: Arc::new(block), + transaction_receipts, }) - .boxed(); - - let block_future = - futures03::TryFutureExt::map_ok(receipts_future, move |transaction_receipts| { - EthereumBlock { - block: Arc::new(block), - transaction_receipts, - } - }); - - Box::pin(block_future) } fn block_hash_by_block_number( From 15601633c8792b244cbc944cb23684dc0073f873 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 19 Apr 2025 22:41:09 -0700 Subject: [PATCH 6/7] chain: Modernize EthereumAdapter.block_hash_by_block_number --- chain/ethereum/src/adapter.rs | 4 +- chain/ethereum/src/ethereum_adapter.rs | 51 ++++++++++++-------------- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index cd174926066..cab6682ce40 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1133,11 +1133,11 @@ pub trait EthereumAdapter: Send + Sync + 'static { /// those confirmations. /// If the Ethereum node is far behind in processing blocks, even old blocks can be subject to /// reorgs. - fn block_hash_by_block_number( + async fn block_hash_by_block_number( &self, logger: &Logger, block_number: BlockNumber, - ) -> Box, Error = Error> + Send>; + ) -> Result, Error>; /// Finds the hash and number of the lowest non-null block with height greater than or equal to /// the given number. diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 716879c83c9..02b7efe7f11 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1435,42 +1435,39 @@ impl EthereumAdapterTrait for EthereumAdapter { }) } - fn block_hash_by_block_number( + async fn block_hash_by_block_number( &self, logger: &Logger, block_number: BlockNumber, - ) -> Box, Error = Error> + Send> { + ) -> Result, Error> { let web3 = self.web3.clone(); let retry_log_message = format!( "eth_getBlockByNumber RPC call for block number {}", block_number ); - Box::new( - retry(retry_log_message, logger) - .redact_log_urls(true) - .no_limit() - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) - .run(move || { - let web3 = web3.cheap_clone(); - async move { - web3.eth() - .block(BlockId::Number(block_number.into())) - .await - .map(|block_opt| block_opt.and_then(|block| block.hash)) - .map_err(Error::from) - } + retry(retry_log_message, logger) + .redact_log_urls(true) + .no_limit() + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.cheap_clone(); + async move { + web3.eth() + .block(BlockId::Number(block_number.into())) + .await + .map(|block_opt| block_opt.and_then(|block| block.hash)) + .map_err(Error::from) + } + }) + .await + .map_err(move |e| { + e.into_inner().unwrap_or_else(move || { + anyhow!( + "Ethereum node took too long to return data for block #{}", + block_number + ) }) - .boxed() - .compat() - .map_err(move |e| { - e.into_inner().unwrap_or_else(move || { - anyhow!( - "Ethereum node took too long to return data for block #{}", - block_number - ) - }) - }), - ) + }) } fn get_balance( From 45dda04d4ae18231439a995252ca3e8b926c750d Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Sat, 19 Apr 2025 22:44:53 -0700 Subject: [PATCH 7/7] chain: Modernize EthereumAdapter.get_balance and get_code --- chain/ethereum/src/adapter.rs | 9 +++---- chain/ethereum/src/ethereum_adapter.rs | 26 +++++++++---------- chain/ethereum/src/runtime/runtime_adapter.rs | 15 +++-------- 3 files changed, 19 insertions(+), 31 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index cab6682ce40..93a7fc60781 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -7,7 +7,6 @@ use graph::data_source::common::ContractCall; use graph::firehose::CallToFilter; use graph::firehose::CombinedFilter; use graph::firehose::LogFilter; -use graph::futures01::Future; use graph::prelude::web3::types::Bytes; use graph::prelude::web3::types::H160; use graph::prelude::web3::types::U256; @@ -1170,20 +1169,20 @@ pub trait EthereumAdapter: Send + Sync + 'static { cache: Arc, ) -> Result>, call::Source)>, ContractCallError>; - fn get_balance( + async fn get_balance( &self, logger: &Logger, address: H160, block_ptr: BlockPtr, - ) -> Box + Send>; + ) -> Result; // Returns the compiled bytecode of a smart contract - fn get_code( + async fn get_code( &self, logger: &Logger, address: H160, block_ptr: BlockPtr, - ) -> Box + Send>; + ) -> Result; } #[cfg(test)] diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 02b7efe7f11..1c1d214f6a5 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -500,12 +500,12 @@ impl EthereumAdapter { } } - fn code( + async fn code( &self, logger: &Logger, address: Address, block_ptr: BlockPtr, - ) -> impl Future + Send { + ) -> Result { let web3 = self.web3.clone(); let logger = Logger::new(&logger, o!("provider" => self.provider.clone())); @@ -531,17 +531,16 @@ impl EthereumAdapter { } } }) + .await .map_err(|e| e.into_inner().unwrap_or(EthereumRpcError::Timeout)) - .boxed() - .compat() } - fn balance( + async fn balance( &self, logger: &Logger, address: Address, block_ptr: BlockPtr, - ) -> impl Future + Send { + ) -> Result { let web3 = self.web3.clone(); let logger = Logger::new(&logger, o!("provider" => self.provider.clone())); @@ -567,9 +566,8 @@ impl EthereumAdapter { } } }) + .await .map_err(|e| e.into_inner().unwrap_or(EthereumRpcError::Timeout)) - .boxed() - .compat() } async fn call( @@ -1470,32 +1468,32 @@ impl EthereumAdapterTrait for EthereumAdapter { }) } - fn get_balance( + async fn get_balance( &self, logger: &Logger, address: H160, block_ptr: BlockPtr, - ) -> Box + Send> { + ) -> Result { debug!( logger, "eth_getBalance"; "address" => format!("{}", address), "block" => format!("{}", block_ptr) ); - Box::new(self.balance(logger, address, block_ptr)) + self.balance(logger, address, block_ptr).await } - fn get_code( + async fn get_code( &self, logger: &Logger, address: H160, block_ptr: BlockPtr, - ) -> Box + Send> { + ) -> Result { debug!( logger, "eth_getCode"; "address" => format!("{}", address), "block" => format!("{}", block_ptr) ); - Box::new(self.code(logger, address, block_ptr)) + self.code(logger, address, block_ptr).await } async fn next_existing_ptr_to_number( diff --git a/chain/ethereum/src/runtime/runtime_adapter.rs b/chain/ethereum/src/runtime/runtime_adapter.rs index 01f148bdd4c..951958d786b 100644 --- a/chain/ethereum/src/runtime/runtime_adapter.rs +++ b/chain/ethereum/src/runtime/runtime_adapter.rs @@ -14,7 +14,6 @@ use graph::data::store::scalar::BigInt; use graph::data::subgraph::API_VERSION_0_0_9; use graph::data_source; use graph::data_source::common::{ContractCall, MappingABI}; -use graph::futures03::compat::Future01CompatExt; use graph::prelude::web3::types::H160; use graph::runtime::gas::Gas; use graph::runtime::{AscIndexId, IndexForAscTypeId}; @@ -227,11 +226,7 @@ fn eth_get_balance( let address: H160 = asc_get(ctx.heap, wasm_ptr.into(), &ctx.gas, 0)?; - let result = graph::block_on( - eth_adapter - .get_balance(logger, address, block_ptr.clone()) - .compat(), - ); + let result = graph::block_on(eth_adapter.get_balance(logger, address, block_ptr.clone())); match result { Ok(v) => { @@ -265,12 +260,8 @@ fn eth_has_code( let address: H160 = asc_get(ctx.heap, wasm_ptr.into(), &ctx.gas, 0)?; - let result = graph::block_on( - eth_adapter - .get_code(logger, address, block_ptr.clone()) - .compat(), - ) - .map(|v| !v.0.is_empty()); + let result = graph::block_on(eth_adapter.get_code(logger, address, block_ptr.clone())) + .map(|v| !v.0.is_empty()); match result { Ok(v) => Ok(asc_new(ctx.heap, &AscWrapped { inner: v }, &ctx.gas)?),