Skip to content

Commit

Permalink
fallback to multiple eth_getTransactionReceipt calls if `eth_getBlo…
Browse files Browse the repository at this point in the history
…ckReceipts` is not present (#94)

* fallback to multiple `eth_getTransactionReceipt` calls if `eth_getBlockReceipts` is not present

* remove unnecessary comment

* edit methods used by command

* move implementation from separate trait to source impl
  • Loading branch information
cool-mestorf authored Nov 3, 2023
1 parent b66ba9b commit be8078b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 63 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Standard types across tables:
|dataset|blocks per request|results per block|method|
|-|-|-|-|
|Blocks|1|1|`eth_getBlockByNumber`|
|Transactions|1|multiple|`eth_getBlockByNumber`|
|Transactions|1|multiple|`eth_getBlockByNumber`, `eth_getBlockReceipts`, `eth_getTransactionReceipt`|
|Logs|multiple|multiple|`eth_getLogs`|
|Contracts|1|multiple|`trace_block`|
|Traces|1|multiple|`trace_block`|
Expand Down
2 changes: 1 addition & 1 deletion crates/freeze/src/datasets/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl CollectByBlock for Transactions {
.ok_or(CollectError::CollectError("block not found".to_string()))?;
let schema = query.schemas.get_schema(&Datatype::Transactions)?;
let receipt = if schema.has_column("gas_used") | schema.has_column("success") {
Some(source.fetcher.get_block_receipts(request.block_number()?).await?)
Some(source.get_tx_receipts_in_block(&block).await?)
} else {
None
};
Expand Down
108 changes: 47 additions & 61 deletions crates/freeze/src/types/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use governor::{
middleware::NoOpMiddleware,
state::{direct::NotKeyed, InMemoryState},
};
use tokio::sync::{AcquireError, Semaphore, SemaphorePermit};
use tokio::{
sync::{AcquireError, Semaphore, SemaphorePermit},
task,
};

use crate::CollectError;

Expand All @@ -30,6 +33,46 @@ pub struct Source {
pub labels: SourceLabels,
}

impl Source {
/// Returns all receipts for a block.
/// Tries to use `eth_getBlockReceipts` first, and falls back to `eth_getTransactionReceipt`
pub async fn get_tx_receipts_in_block(
&self,
block: &Block<Transaction>,
) -> Result<Vec<TransactionReceipt>> {
let block_number =
block.number.ok_or(CollectError::CollectError("no block number".to_string()))?.as_u64();
if let Ok(receipts) = self.fetcher.get_block_receipts(block_number).await {
return Ok(receipts)
}

// fallback to `eth_getTransactionReceipt`
let mut tasks = Vec::new();
for tx in &block.transactions {
let tx_hash = tx.hash;
let fetcher = self.fetcher.clone();
let task = task::spawn(async move {
match fetcher.get_transaction_receipt(tx_hash).await? {
Some(receipt) => Ok(receipt),
None => {
Err(CollectError::CollectError("could not find tx receipt".to_string()))
}
}
});
tasks.push(task);
}
let mut receipts = Vec::new();
for task in tasks {
match task.await {
Ok(receipt) => receipts.push(receipt?),
Err(e) => return Err(CollectError::TaskFailed(e)),
}
}

Ok(receipts)
}
}

/// source labels (non-functional)
#[derive(Clone)]
pub struct SourceLabels {
Expand Down Expand Up @@ -185,6 +228,9 @@ impl<P: JsonRpcClient> Fetcher<P> {
}

/// Returns all receipts for a block.
/// Note that this uses the `eth_getBlockReceipts` method which is not supported by all nodes.
/// Consider using `FetcherExt::get_tx_receipts_in_block` which takes a block, and falls back to
/// `eth_getTransactionReceipt` if `eth_getBlockReceipts` is not supported.
pub async fn get_block_receipts(&self, block_num: u64) -> Result<Vec<TransactionReceipt>> {
let _permit = self.permit_request().await;
Self::map_err(self.provider.get_block_receipts(block_num).await)
Expand Down Expand Up @@ -551,7 +597,6 @@ impl<P: JsonRpcClient> Fetcher<P> {

use crate::err;
use std::collections::BTreeMap;
use tokio::task;

fn parse_geth_diff_object(
map: ethers::utils::__serde_json::Map<String, ethers::utils::__serde_json::Value>,
Expand All @@ -564,62 +609,3 @@ fn parse_geth_diff_object(

Ok(DiffMode { pre, post })
}

impl Source {
/// get gas used by transactions in block
pub async fn get_txs_gas_used(&self, block: &Block<Transaction>) -> Result<Vec<u64>> {
match get_txs_gas_used_per_block(block, self.fetcher.clone()).await {
Ok(value) => Ok(value),
Err(_) => get_txs_gas_used_per_tx(block, self.fetcher.clone()).await,
}
}
}

async fn get_txs_gas_used_per_block<P: JsonRpcClient>(
block: &Block<Transaction>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<u64>> {
// let fetcher = Arc::new(fetcher);
let block_number = match block.number {
Some(number) => number,
None => return Err(CollectError::CollectError("no block number".to_string())),
};
let receipts = fetcher.get_block_receipts(block_number.as_u64()).await?;
let mut gas_used: Vec<u64> = Vec::new();
for receipt in receipts {
match receipt.gas_used {
Some(value) => gas_used.push(value.as_u64()),
None => return Err(CollectError::CollectError("no gas_used for tx".to_string())),
}
}
Ok(gas_used)
}

async fn get_txs_gas_used_per_tx<P: JsonRpcClient + 'static>(
block: &Block<Transaction>,
fetcher: Arc<Fetcher<P>>,
) -> Result<Vec<u64>> {
// let fetcher = Arc::new(*fetcher.clone());
let mut tasks = Vec::new();
for tx in &block.transactions {
let tx_clone = tx.hash;
let fetcher = fetcher.clone();
let task = task::spawn(async move {
match fetcher.get_transaction_receipt(tx_clone).await? {
Some(receipt) => Ok(receipt.gas_used),
None => Err(CollectError::CollectError("could not find tx receipt".to_string())),
}
});
tasks.push(task);
}

let mut gas_used: Vec<u64> = Vec::new();
for task in tasks {
match task.await {
Ok(Ok(Some(value))) => gas_used.push(value.as_u64()),
_ => return Err(CollectError::CollectError("gas_used not available from node".into())),
}
}

Ok(gas_used)
}

0 comments on commit be8078b

Please sign in to comment.