Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: txpool block building fallback #3755

Merged
merged 2 commits into from
Jul 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion crates/rpc/rpc/src/eth/api/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ use crate::{
use reth_primitives::{BlockId, BlockNumberOrTag, TransactionMeta};
use reth_provider::{BlockReaderIdExt, EvmEnvProvider, StateProviderFactory};
use reth_rpc_types::{Block, Index, RichBlock, TransactionReceipt};
use reth_transaction_pool::TransactionPool;

impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
where
Provider: BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + 'static,
Pool: TransactionPool + Clone + 'static,
Network: Send + Sync + 'static,
{
/// Returns the uncle headers of the given block
///
Expand Down Expand Up @@ -121,7 +124,12 @@ where

if block_id.is_pending() {
// Pending block can be fetched directly without need for caching
return Ok(self.provider().pending_block()?)
let maybe_pending = self.provider().pending_block()?;
return if maybe_pending.is_some() {
return Ok(maybe_pending)
} else {
self.local_pending_block().await
}
Comment on lines +130 to +132
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice

}

let block_hash = match self.provider().block_hash_for_id(block_id)? {
Expand Down
82 changes: 79 additions & 3 deletions crates/rpc/rpc/src/eth/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,27 @@ use crate::eth::{
use async_trait::async_trait;
use reth_interfaces::Result;
use reth_network_api::NetworkInfo;
use reth_primitives::{Address, BlockId, BlockNumberOrTag, ChainInfo, H256, U256, U64};
use reth_primitives::{
Address, BlockId, BlockNumberOrTag, ChainInfo, SealedBlock, H256, U256, U64,
};
use reth_provider::{BlockReaderIdExt, EvmEnvProvider, StateProviderBox, StateProviderFactory};
use reth_rpc_types::{SyncInfo, SyncStatus};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::TransactionPool;
use std::{future::Future, sync::Arc};
use tokio::sync::oneshot;
use revm_primitives::{BlockEnv, CfgEnv};
use std::{future::Future, sync::Arc, time::Instant};
use tokio::sync::{oneshot, Mutex};

mod block;
mod call;
mod fees;
mod pending_block;
mod server;
mod sign;
mod state;
mod transactions;

use crate::eth::api::pending_block::{PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin};
pub use transactions::{EthTransactions, TransactionSource};

/// `Eth` API trait.
Expand Down Expand Up @@ -115,6 +120,7 @@ where
gas_oracle,
starting_block: U256::from(latest_block),
task_spawner,
pending_block: Default::default(),
};
Self { inner: Arc::new(inner) }
}
Expand Down Expand Up @@ -201,6 +207,74 @@ where
}
}

impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
where
Provider: BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + 'static,
Pool: TransactionPool + Clone + 'static,
Network: Send + Sync + 'static,
{
/// Configures the [CfgEnv] and [BlockEnv] for the pending block
///
/// If no pending block is available, this will derive it from the `latest` block
pub(crate) fn pending_block_env_and_cfg(&self) -> EthResult<PendingBlockEnv> {
let origin = if let Some(pending) = self.provider().pending_block()? {
PendingBlockEnvOrigin::ActualPending(pending)
} else {
// no pending block from the CL yet, so we use the latest block and modify the env
// values that we can
let mut latest =
self.provider().latest_header()?.ok_or_else(|| EthApiError::UnknownBlockNumber)?;

// child block
latest.number += 1;
// assumed child block is in the next slot
latest.timestamp += 12;
// base fee of the child block
latest.base_fee_per_gas = latest.next_block_base_fee();

PendingBlockEnvOrigin::DerivedFromLatest(latest)
};

let mut cfg = CfgEnv::default();
let mut block_env = BlockEnv::default();
self.provider().fill_block_env_with_header(&mut block_env, origin.header())?;
self.provider().fill_cfg_env_with_header(&mut cfg, origin.header())?;

Ok(PendingBlockEnv { cfg, block_env, origin })
}

/// Returns the locally built pending block
pub(crate) async fn local_pending_block(&self) -> EthResult<Option<SealedBlock>> {
let pending = self.pending_block_env_and_cfg()?;
if pending.origin.is_actual_pending() {
return Ok(pending.origin.into_actual_pending())
}

// no pending block from the CL yet, so we need to build it ourselves via txpool
self.on_blocking_task(|this| async move {
Comment on lines +253 to +254
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes me think we should probably add docs on which ops are blocking and when we recommend spawning blocking?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, will do!

let PendingBlockEnv { cfg: _, block_env, origin } = pending;
let lock = this.inner.pending_block.lock().await;
let now = Instant::now();
// this is guaranteed to be the `latest` header
let parent_header = origin.into_header();

// check if the block is still good
if let Some(pending) = lock.as_ref() {
if block_env.number.to::<u64>() == pending.block.number &&
pending.block.parent_hash == parent_header.parent_hash &&
now <= pending.expires_at
{
return Ok(Some(pending.block.clone()))
}
}

// TODO(mattsse): actually build the pending block
Ok(None)
Comment on lines +271 to +272
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

})
.await
}
}

impl<Provider, Pool, Events> std::fmt::Debug for EthApi<Provider, Pool, Events> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthApi").finish_non_exhaustive()
Expand Down Expand Up @@ -284,4 +358,6 @@ struct EthApiInner<Provider, Pool, Network> {
starting_block: U256,
/// The type that can spawn tasks which would otherwise block.
task_spawner: Box<dyn TaskSpawner>,
/// Cached pending block if any
pending_block: Mutex<Option<PendingBlock>>,
}
67 changes: 67 additions & 0 deletions crates/rpc/rpc/src/eth/api/pending_block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
//! Support for building a pending block via local txpool.

use reth_primitives::{SealedBlock, SealedHeader};
use revm_primitives::{BlockEnv, CfgEnv};
use std::time::Instant;

/// Configured [BlockEnv] and [CfgEnv] for a pending block
#[derive(Debug, Clone)]
pub(crate) struct PendingBlockEnv {
/// Configured [CfgEnv] for the pending block.
pub(crate) cfg: CfgEnv,
/// Configured [BlockEnv] for the pending block.
pub(crate) block_env: BlockEnv,
/// Origin block for the config
pub(crate) origin: PendingBlockEnvOrigin,
}

/// The origin for a configured [PendingBlockEnv]
#[derive(Clone, Debug)]
pub(crate) enum PendingBlockEnvOrigin {
/// The pending block as received from the CL.
ActualPending(SealedBlock),
/// The header of the latest block
DerivedFromLatest(SealedHeader),
}

impl PendingBlockEnvOrigin {
/// Returns true if the origin is the actual pending block as received from the CL.
pub(crate) fn is_actual_pending(&self) -> bool {
matches!(self, PendingBlockEnvOrigin::ActualPending(_))
}

/// Consumes the type and returns the actual pending block.
pub(crate) fn into_actual_pending(self) -> Option<SealedBlock> {
match self {
PendingBlockEnvOrigin::ActualPending(block) => Some(block),
_ => None,
}
}

/// Returns the header this pending block is based on.
pub(crate) fn header(&self) -> &SealedHeader {
match self {
PendingBlockEnvOrigin::ActualPending(block) => &block.header,
PendingBlockEnvOrigin::DerivedFromLatest(header) => header,
}
}

/// Consumes the type and returns the header this pending block is based on.
pub(crate) fn into_header(self) -> SealedHeader {
match self {
PendingBlockEnvOrigin::ActualPending(block) => block.header,
PendingBlockEnvOrigin::DerivedFromLatest(header) => header,
}
}
}

/// In memory pending block for `pending` tag
#[derive(Debug)]
pub(crate) struct PendingBlock {
/// The cached pending block
pub(crate) block: SealedBlock,
/// Timestamp when the pending block is considered outdated
pub(crate) expires_at: Instant,
}

impl PendingBlock {}
31 changes: 5 additions & 26 deletions crates/rpc/rpc/src/eth/api/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Contains RPC handler implementations specific to transactions
use crate::{
eth::{
api::pending_block::PendingBlockEnv,
error::{EthApiError, EthResult, SignError},
revm_utils::{
inspect, inspect_and_return_db, prepare_call_env, replay_transactions_until, transact,
Expand Down Expand Up @@ -239,31 +240,8 @@ where

async fn evm_env_at(&self, at: BlockId) -> EthResult<(CfgEnv, BlockEnv, BlockId)> {
if at.is_pending() {
let header = if let Some(pending) = self.provider().pending_header()? {
pending
} else {
// no pending block from the CL yet, so we use the latest block and modify the env
// values that we can
let mut latest = self
.provider()
.latest_header()?
.ok_or_else(|| EthApiError::UnknownBlockNumber)?;

// child block
latest.number += 1;
// assumed child block is in the next slot
latest.timestamp += 12;
// base fee of the child block
latest.base_fee_per_gas = latest.next_block_base_fee();

latest
};

let mut cfg = CfgEnv::default();
let mut block_env = BlockEnv::default();
self.provider().fill_block_env_with_header(&mut block_env, &header)?;
self.provider().fill_cfg_env_with_header(&mut cfg, &header)?;
return Ok((cfg, block_env, header.hash.into()))
let PendingBlockEnv { cfg, block_env, origin } = self.pending_block_env_and_cfg()?;
Ok((cfg, block_env, origin.header().hash.into()))
} else {
// Use cached values if there is no pending block
let block_hash = self
Expand Down Expand Up @@ -652,6 +630,7 @@ where
impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
where
Provider: BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + 'static,
Network: 'static,
{
/// Helper function for `eth_getTransactionReceipt`
///
Expand All @@ -675,7 +654,7 @@ impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
where
Pool: TransactionPool + 'static,
Provider: BlockReaderIdExt + StateProviderFactory + EvmEnvProvider + 'static,
Network: 'static,
Network: Send + Sync + 'static,
{
pub(crate) fn sign_request(
&self,
Expand Down