Skip to content

Commit

Permalink
feat: txpool block building fallback (#3755)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Jul 15, 2023
1 parent be656c2 commit e12883e
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 30 deletions.
10 changes: 9 additions & 1 deletion crates/rpc/rpc/src/eth/api/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ use crate::{
use reth_primitives::{BlockId, BlockNumberOrTag, TransactionMeta};
use reth_provider::{BlockReaderIdExt, EvmEnvProvider, StateProviderFactory};
use reth_rpc_types::{Block, Index, RichBlock, TransactionReceipt};
use reth_transaction_pool::TransactionPool;

impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
where
Provider: BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + 'static,
Pool: TransactionPool + Clone + 'static,
Network: Send + Sync + 'static,
{
/// Returns the uncle headers of the given block
///
Expand Down Expand Up @@ -121,7 +124,12 @@ where

if block_id.is_pending() {
// Pending block can be fetched directly without need for caching
return Ok(self.provider().pending_block()?)
let maybe_pending = self.provider().pending_block()?;
return if maybe_pending.is_some() {
return Ok(maybe_pending)
} else {
self.local_pending_block().await
}
}

let block_hash = match self.provider().block_hash_for_id(block_id)? {
Expand Down
82 changes: 79 additions & 3 deletions crates/rpc/rpc/src/eth/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,27 @@ use crate::eth::{
use async_trait::async_trait;
use reth_interfaces::Result;
use reth_network_api::NetworkInfo;
use reth_primitives::{Address, BlockId, BlockNumberOrTag, ChainInfo, H256, U256, U64};
use reth_primitives::{
Address, BlockId, BlockNumberOrTag, ChainInfo, SealedBlock, H256, U256, U64,
};
use reth_provider::{BlockReaderIdExt, EvmEnvProvider, StateProviderBox, StateProviderFactory};
use reth_rpc_types::{SyncInfo, SyncStatus};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::TransactionPool;
use std::{future::Future, sync::Arc};
use tokio::sync::oneshot;
use revm_primitives::{BlockEnv, CfgEnv};
use std::{future::Future, sync::Arc, time::Instant};
use tokio::sync::{oneshot, Mutex};

mod block;
mod call;
mod fees;
mod pending_block;
mod server;
mod sign;
mod state;
mod transactions;

use crate::eth::api::pending_block::{PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin};
pub use transactions::{EthTransactions, TransactionSource};

/// `Eth` API trait.
Expand Down Expand Up @@ -115,6 +120,7 @@ where
gas_oracle,
starting_block: U256::from(latest_block),
task_spawner,
pending_block: Default::default(),
};
Self { inner: Arc::new(inner) }
}
Expand Down Expand Up @@ -201,6 +207,74 @@ where
}
}

impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
where
Provider: BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + 'static,
Pool: TransactionPool + Clone + 'static,
Network: Send + Sync + 'static,
{
/// Configures the [CfgEnv] and [BlockEnv] for the pending block
///
/// If no pending block is available, this will derive it from the `latest` block
pub(crate) fn pending_block_env_and_cfg(&self) -> EthResult<PendingBlockEnv> {
let origin = if let Some(pending) = self.provider().pending_block()? {
PendingBlockEnvOrigin::ActualPending(pending)
} else {
// no pending block from the CL yet, so we use the latest block and modify the env
// values that we can
let mut latest =
self.provider().latest_header()?.ok_or_else(|| EthApiError::UnknownBlockNumber)?;

// child block
latest.number += 1;
// assumed child block is in the next slot
latest.timestamp += 12;
// base fee of the child block
latest.base_fee_per_gas = latest.next_block_base_fee();

PendingBlockEnvOrigin::DerivedFromLatest(latest)
};

let mut cfg = CfgEnv::default();
let mut block_env = BlockEnv::default();
self.provider().fill_block_env_with_header(&mut block_env, origin.header())?;
self.provider().fill_cfg_env_with_header(&mut cfg, origin.header())?;

Ok(PendingBlockEnv { cfg, block_env, origin })
}

/// Returns the locally built pending block
pub(crate) async fn local_pending_block(&self) -> EthResult<Option<SealedBlock>> {
let pending = self.pending_block_env_and_cfg()?;
if pending.origin.is_actual_pending() {
return Ok(pending.origin.into_actual_pending())
}

// no pending block from the CL yet, so we need to build it ourselves via txpool
self.on_blocking_task(|this| async move {
let PendingBlockEnv { cfg: _, block_env, origin } = pending;
let lock = this.inner.pending_block.lock().await;
let now = Instant::now();
// this is guaranteed to be the `latest` header
let parent_header = origin.into_header();

// check if the block is still good
if let Some(pending) = lock.as_ref() {
if block_env.number.to::<u64>() == pending.block.number &&
pending.block.parent_hash == parent_header.parent_hash &&
now <= pending.expires_at
{
return Ok(Some(pending.block.clone()))
}
}

// TODO(mattsse): actually build the pending block
Ok(None)
})
.await
}
}

impl<Provider, Pool, Events> std::fmt::Debug for EthApi<Provider, Pool, Events> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthApi").finish_non_exhaustive()
Expand Down Expand Up @@ -284,4 +358,6 @@ struct EthApiInner<Provider, Pool, Network> {
starting_block: U256,
/// The type that can spawn tasks which would otherwise block.
task_spawner: Box<dyn TaskSpawner>,
/// Cached pending block if any
pending_block: Mutex<Option<PendingBlock>>,
}
67 changes: 67 additions & 0 deletions crates/rpc/rpc/src/eth/api/pending_block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Support for building a pending block via local txpool.

use reth_primitives::{SealedBlock, SealedHeader};
use revm_primitives::{BlockEnv, CfgEnv};
use std::time::Instant;

/// Configured [BlockEnv] and [CfgEnv] for a pending block
#[derive(Debug, Clone)]
pub(crate) struct PendingBlockEnv {
/// Configured [CfgEnv] for the pending block.
pub(crate) cfg: CfgEnv,
/// Configured [BlockEnv] for the pending block.
pub(crate) block_env: BlockEnv,
/// Origin block for the config
pub(crate) origin: PendingBlockEnvOrigin,
}

/// The origin for a configured [PendingBlockEnv]
#[derive(Clone, Debug)]
pub(crate) enum PendingBlockEnvOrigin {
/// The pending block as received from the CL.
ActualPending(SealedBlock),
/// The header of the latest block
DerivedFromLatest(SealedHeader),
}

impl PendingBlockEnvOrigin {
/// Returns true if the origin is the actual pending block as received from the CL.
pub(crate) fn is_actual_pending(&self) -> bool {
matches!(self, PendingBlockEnvOrigin::ActualPending(_))
}

/// Consumes the type and returns the actual pending block.
pub(crate) fn into_actual_pending(self) -> Option<SealedBlock> {
match self {
PendingBlockEnvOrigin::ActualPending(block) => Some(block),
_ => None,
}
}

/// Returns the header this pending block is based on.
pub(crate) fn header(&self) -> &SealedHeader {
match self {
PendingBlockEnvOrigin::ActualPending(block) => &block.header,
PendingBlockEnvOrigin::DerivedFromLatest(header) => header,
}
}

/// Consumes the type and returns the header this pending block is based on.
pub(crate) fn into_header(self) -> SealedHeader {
match self {
PendingBlockEnvOrigin::ActualPending(block) => block.header,
PendingBlockEnvOrigin::DerivedFromLatest(header) => header,
}
}
}

/// In memory pending block for `pending` tag
#[derive(Debug)]
pub(crate) struct PendingBlock {
/// The cached pending block
pub(crate) block: SealedBlock,
/// Timestamp when the pending block is considered outdated
pub(crate) expires_at: Instant,
}

impl PendingBlock {}
31 changes: 5 additions & 26 deletions crates/rpc/rpc/src/eth/api/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Contains RPC handler implementations specific to transactions
use crate::{
eth::{
api::pending_block::PendingBlockEnv,
error::{EthApiError, EthResult, SignError},
revm_utils::{
inspect, inspect_and_return_db, prepare_call_env, replay_transactions_until, transact,
Expand Down Expand Up @@ -239,31 +240,8 @@ where

async fn evm_env_at(&self, at: BlockId) -> EthResult<(CfgEnv, BlockEnv, BlockId)> {
if at.is_pending() {
let header = if let Some(pending) = self.provider().pending_header()? {
pending
} else {
// no pending block from the CL yet, so we use the latest block and modify the env
// values that we can
let mut latest = self
.provider()
.latest_header()?
.ok_or_else(|| EthApiError::UnknownBlockNumber)?;

// child block
latest.number += 1;
// assumed child block is in the next slot
latest.timestamp += 12;
// base fee of the child block
latest.base_fee_per_gas = latest.next_block_base_fee();

latest
};

let mut cfg = CfgEnv::default();
let mut block_env = BlockEnv::default();
self.provider().fill_block_env_with_header(&mut block_env, &header)?;
self.provider().fill_cfg_env_with_header(&mut cfg, &header)?;
return Ok((cfg, block_env, header.hash.into()))
let PendingBlockEnv { cfg, block_env, origin } = self.pending_block_env_and_cfg()?;
Ok((cfg, block_env, origin.header().hash.into()))
} else {
// Use cached values if there is no pending block
let block_hash = self
Expand Down Expand Up @@ -652,6 +630,7 @@ where
impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
where
Provider: BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + 'static,
Network: 'static,
{
/// Helper function for `eth_getTransactionReceipt`
///
Expand All @@ -675,7 +654,7 @@ impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
where
Pool: TransactionPool + 'static,
Provider: BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + 'static,
Network: 'static,
Network: Send + Sync + 'static,
{
pub(crate) fn sign_request(
&self,
Expand Down

0 comments on commit e12883e

Please sign in to comment.