From 169fcd999690d30e96f688517a0214128167ad76 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 28 Nov 2024 09:55:27 +0100 Subject: [PATCH 1/3] feat: integrate engine validator --- Cargo.lock | 4 +- crates/e2e-test-utils/src/lib.rs | 10 +- crates/engine/local/Cargo.toml | 2 +- crates/engine/local/src/service.rs | 10 +- crates/engine/service/Cargo.toml | 1 - crates/engine/service/src/service.rs | 19 +- crates/engine/tree/Cargo.toml | 3 +- crates/engine/tree/src/tree/mod.rs | 36 +- crates/node/builder/src/launch/engine.rs | 676 ++++++++++++----------- crates/node/builder/src/rpc.rs | 3 +- 10 files changed, 389 insertions(+), 375 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33d60eac3f0d..e40e71d0ca0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7176,10 +7176,10 @@ dependencies = [ "reth-engine-tree", "reth-ethereum-engine-primitives", "reth-evm", + "reth-node-types", "reth-payload-builder", "reth-payload-builder-primitives", "reth-payload-primitives", - "reth-payload-validator", "reth-provider", "reth-prune", "reth-rpc-types-compat", @@ -7228,7 +7228,6 @@ dependencies = [ "reth-network-p2p", "reth-node-types", "reth-payload-builder", - "reth-payload-validator", "reth-primitives", "reth-provider", "reth-prune", @@ -7273,7 +7272,6 @@ dependencies = [ "reth-payload-builder", "reth-payload-builder-primitives", "reth-payload-primitives", - "reth-payload-validator", "reth-primitives", "reth-provider", "reth-prune", diff --git a/crates/e2e-test-utils/src/lib.rs b/crates/e2e-test-utils/src/lib.rs index f4939f2c011e..e7958a2c12ef 100644 --- a/crates/e2e-test-utils/src/lib.rs +++ b/crates/e2e-test-utils/src/lib.rs @@ -1,7 +1,6 @@ //! Utilities for end-to-end tests. use std::sync::Arc; - use node::NodeTestContext; use reth::{ args::{DiscoveryArgs, NetworkArgs, RpcServerArgs}, @@ -15,6 +14,7 @@ use reth::{ use reth_chainspec::EthChainSpec; use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_engine_local::LocalPayloadAttributesBuilder; +use reth_node_api::EngineValidator; use reth_node_builder::{ components::NodeComponentsBuilder, rpc::{EngineValidatorAddOn, RethRpcAddOns}, @@ -133,7 +133,13 @@ where >, >, N::AddOns: RethRpcAddOns>>> - + EngineValidatorAddOn>>>, + + EngineValidatorAddOn< + Adapter>>, + Validator: EngineValidator< + N::Engine, + Block = reth_primitives::Block, + >, + >, LocalPayloadAttributesBuilder: PayloadAttributesBuilder< <::Engine as PayloadTypes>::PayloadAttributes, >, diff --git a/crates/engine/local/Cargo.toml b/crates/engine/local/Cargo.toml index a1b74d13fee7..d8a66e65e04c 100644 --- a/crates/engine/local/Cargo.toml +++ b/crates/engine/local/Cargo.toml @@ -16,12 +16,12 @@ reth-consensus.workspace = true reth-engine-primitives.workspace = true reth-engine-service.workspace = true reth-engine-tree.workspace = true +reth-node-types.workspace = true reth-evm.workspace = true reth-ethereum-engine-primitives.workspace = true reth-payload-builder.workspace = true reth-payload-builder-primitives.workspace = true reth-payload-primitives.workspace = true -reth-payload-validator.workspace = true reth-provider.workspace = true reth-prune.workspace = true reth-rpc-types-compat.workspace = true diff --git a/crates/engine/local/src/service.rs b/crates/engine/local/src/service.rs index e2b5e056d028..5838cb89116b 100644 --- a/crates/engine/local/src/service.rs +++ b/crates/engine/local/src/service.rs @@ -19,7 +19,7 @@ use futures_util::{Stream, StreamExt}; use reth_beacon_consensus::{BeaconConsensusEngineEvent, EngineNodeTypes}; use reth_chainspec::EthChainSpec; use reth_consensus::Consensus; -use reth_engine_primitives::BeaconEngineMessage; +use reth_engine_primitives::{BeaconEngineMessage, EngineValidator}; use reth_engine_service::service::EngineMessageStream; use reth_engine_tree::{ chain::{ChainEvent, HandlerEvent}, @@ -31,9 +31,9 @@ use reth_engine_tree::{ tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig}, }; use reth_evm::execute::BlockExecutorProvider; +use reth_node_types::BlockTy; use reth_payload_builder::PayloadBuilderHandle; use reth_payload_primitives::{PayloadAttributesBuilder, PayloadTypes}; -use reth_payload_validator::ExecutionPayloadValidator; use reth_provider::{providers::BlockchainProvider2, ChainSpecProvider, ProviderFactory}; use reth_prune::PrunerWithFactory; use reth_stages_api::MetricEventsSender; @@ -63,13 +63,14 @@ where { /// Constructor for [`LocalEngineService`]. #[allow(clippy::too_many_arguments)] - pub fn new( + pub fn new( consensus: Arc, executor_factory: impl BlockExecutorProvider, provider: ProviderFactory, blockchain_db: BlockchainProvider2, pruner: PrunerWithFactory>, payload_builder: PayloadBuilderHandle, + payload_validator: V, tree_config: TreeConfig, invalid_block_hook: Box, sync_metrics_tx: MetricEventsSender, @@ -80,6 +81,7 @@ where ) -> Self where B: PayloadAttributesBuilder<::PayloadAttributes>, + V: EngineValidator>, { let chain_spec = provider.chain_spec(); let engine_kind = @@ -87,8 +89,6 @@ where let persistence_handle = PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx); - let payload_validator = ExecutionPayloadValidator::new(chain_spec); - let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); let (to_tree_tx, from_tree) = EngineApiTreeHandler::::spawn_new( diff --git a/crates/engine/service/Cargo.toml b/crates/engine/service/Cargo.toml index 8359c453dccb..8854fd18879d 100644 --- a/crates/engine/service/Cargo.toml +++ b/crates/engine/service/Cargo.toml @@ -18,7 +18,6 @@ reth-engine-tree.workspace = true reth-evm.workspace = true reth-network-p2p.workspace = true reth-payload-builder.workspace = true -reth-payload-validator.workspace = true reth-provider.workspace = true reth-prune.workspace = true reth-stages-api.workspace = true diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 44d145c9c0b6..a54a2ef9e1a1 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -3,7 +3,7 @@ use pin_project::pin_project; use reth_beacon_consensus::{BeaconConsensusEngineEvent, EngineNodeTypes}; use reth_chainspec::EthChainSpec; use reth_consensus::Consensus; -use reth_engine_primitives::BeaconEngineMessage; +use reth_engine_primitives::{BeaconEngineMessage, EngineValidator}; use reth_engine_tree::{ backfill::PipelineSync, download::BasicBlockDownloader, @@ -17,9 +17,8 @@ pub use reth_engine_tree::{ }; use reth_evm::execute::BlockExecutorProvider; use reth_network_p2p::EthBlockClient; -use reth_node_types::NodeTypesWithEngine; +use reth_node_types::{BlockTy, NodeTypesWithEngine}; use reth_payload_builder::PayloadBuilderHandle; -use reth_payload_validator::ExecutionPayloadValidator; use reth_provider::{providers::BlockchainProvider2, ProviderFactory}; use reth_prune::PrunerWithFactory; use reth_stages_api::{MetricEventsSender, Pipeline}; @@ -65,7 +64,7 @@ where { /// Constructor for `EngineService`. #[allow(clippy::too_many_arguments)] - pub fn new( + pub fn new( consensus: Arc, executor_factory: E, chain_spec: Arc, @@ -77,10 +76,14 @@ where blockchain_db: BlockchainProvider2, pruner: PrunerWithFactory>, payload_builder: PayloadBuilderHandle, + payload_validator: V, tree_config: TreeConfig, invalid_block_hook: Box, sync_metrics_tx: MetricEventsSender, - ) -> Self { + ) -> Self + where + V: EngineValidator>, + { let engine_kind = if chain_spec.is_optimism() { EngineApiKind::OpStack } else { EngineApiKind::Ethereum }; @@ -88,7 +91,6 @@ where let persistence_handle = PersistenceHandle::spawn_service(provider, pruner, sync_metrics_tx); - let payload_validator = ExecutionPayloadValidator::new(chain_spec); let canonical_in_memory_state = blockchain_db.canonical_in_memory_state(); @@ -148,7 +150,7 @@ mod tests { use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_engine_primitives::BeaconEngineMessage; use reth_engine_tree::{test_utils::TestPipelineBuilder, tree::NoopInvalidBlockHook}; - use reth_ethereum_engine_primitives::EthEngineTypes; + use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator}; use reth_evm_ethereum::execute::EthExecutorProvider; use reth_exex_types::FinishedExExHeight; use reth_network_p2p::test_utils::TestFullBlockClient; @@ -186,7 +188,7 @@ mod tests { let blockchain_db = BlockchainProvider2::with_latest(provider_factory.clone(), SealedHeader::default()) .unwrap(); - + let engine_payload_validator = EthereumEngineValidator::new(chain_spec.clone()); let (_tx, rx) = watch::channel(FinishedExExHeight::NoExExs); let pruner = Pruner::new_with_factory(provider_factory.clone(), vec![], 0, 0, None, rx); @@ -204,6 +206,7 @@ mod tests { blockchain_db, pruner, PayloadBuilderHandle::new(tx), + engine_payload_validator, TreeConfig::default(), Box::new(NoopInvalidBlockHook::default()), sync_metrics_tx, diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 01d7e7e20241..a50c75a5a16b 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -16,7 +16,7 @@ reth-beacon-consensus.workspace = true reth-blockchain-tree-api.workspace = true reth-blockchain-tree.workspace = true reth-chain-state.workspace = true -reth-chainspec.workspace = true +reth-chainspec = { workspace = true, optional = true } reth-consensus.workspace = true reth-engine-primitives.workspace = true reth-errors.workspace = true @@ -26,7 +26,6 @@ reth-network-p2p.workspace = true reth-payload-builder-primitives.workspace = true reth-payload-builder.workspace = true reth-payload-primitives.workspace = true -reth-payload-validator.workspace = true reth-primitives.workspace = true reth-provider.workspace = true reth-prune.workspace = true diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index e9e86d3b09bb..2d09c16c6afb 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1,8 +1,9 @@ use crate::{ backfill::{BackfillAction, BackfillSyncState}, chain::FromOrchestrator, - engine::{DownloadRequest, EngineApiEvent, FromEngine}, + engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine}, persistence::PersistenceHandle, + tree::metrics::EngineApiMetrics, }; use alloy_consensus::{BlockHeader, Header}; use alloy_eips::BlockNumHash; @@ -24,18 +25,16 @@ use reth_blockchain_tree::{ use reth_chain_state::{ CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain, }; -use reth_chainspec::EthereumHardforks; use reth_consensus::{Consensus, PostExecutionInput}; use reth_engine_primitives::{ BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes, - ForkchoiceStateTracker, OnForkChoiceUpdated, + EngineValidator, ForkchoiceStateTracker, OnForkChoiceUpdated, }; use reth_errors::{ConsensusError, ProviderResult}; use reth_evm::execute::BlockExecutorProvider; use reth_payload_builder::PayloadBuilderHandle; use reth_payload_builder_primitives::PayloadBuilder; use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes}; -use reth_payload_validator::ExecutionPayloadValidator; use reth_primitives::{ Block, GotExpected, NodePrimitives, SealedBlock, SealedBlockWithSenders, SealedHeader, }; @@ -71,10 +70,6 @@ pub mod config; mod invalid_block_hook; mod metrics; mod persistence_state; -use crate::{ - engine::{EngineApiKind, EngineApiRequest}, - tree::metrics::EngineApiMetrics, -}; pub use config::TreeConfig; pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook}; pub use persistence_state::PersistenceState; @@ -472,11 +467,14 @@ pub enum TreeAction { /// /// This type is responsible for processing engine API requests, maintaining the canonical state and /// emitting events. -pub struct EngineApiTreeHandler { +pub struct EngineApiTreeHandler +where + T: EngineTypes, +{ provider: P, executor_provider: E, consensus: Arc, - payload_validator: ExecutionPayloadValidator, + payload_validator: V, /// Keeps track of internals such as executed and buffered blocks. state: EngineApiTreeState, /// The half for sending messages to the engine. @@ -516,8 +514,8 @@ pub struct EngineApiTreeHandler { _primtives: PhantomData, } -impl std::fmt::Debug - for EngineApiTreeHandler +impl std::fmt::Debug + for EngineApiTreeHandler { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("EngineApiTreeHandler") @@ -540,7 +538,7 @@ impl std::fmt::Debug } } -impl EngineApiTreeHandler +impl EngineApiTreeHandler where N: NodePrimitives, P: DatabaseProviderFactory @@ -552,7 +550,7 @@ where

::Provider: BlockReader, E: BlockExecutorProvider, T: EngineTypes, - Spec: Send + Sync + EthereumHardforks + 'static, + V: EngineValidator, { /// Creates a new [`EngineApiTreeHandler`]. #[allow(clippy::too_many_arguments)] @@ -560,7 +558,7 @@ where provider: P, executor_provider: E, consensus: Arc, - payload_validator: ExecutionPayloadValidator, + payload_validator: V, outgoing: UnboundedSender, state: EngineApiTreeState, canonical_in_memory_state: CanonicalInMemoryState, @@ -609,7 +607,7 @@ where provider: P, executor_provider: E, consensus: Arc, - payload_validator: ExecutionPayloadValidator, + payload_validator: V, persistence: PersistenceHandle, payload_builder: PayloadBuilderHandle, canonical_in_memory_state: CanonicalInMemoryState, @@ -2629,7 +2627,7 @@ mod tests { use reth_chain_state::{test_utils::TestBlockBuilder, BlockState}; use reth_chainspec::{ChainSpec, HOLESKY, MAINNET}; use reth_engine_primitives::ForkchoiceStatus; - use reth_ethereum_engine_primitives::EthEngineTypes; + use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator}; use reth_evm::test_utils::MockExecutorProvider; use reth_primitives::{BlockExt, EthPrimitives}; use reth_provider::test_utils::MockEthProvider; @@ -2701,7 +2699,7 @@ mod tests { MockEthProvider, MockExecutorProvider, EthEngineTypes, - ChainSpec, + EthereumEngineValidator, >, to_tree_tx: Sender>>, from_tree_rx: UnboundedReceiver, @@ -2736,7 +2734,7 @@ mod tests { let provider = MockEthProvider::default(); let executor_provider = MockExecutorProvider::default(); - let payload_validator = ExecutionPayloadValidator::new(chain_spec.clone()); + let payload_validator = EthereumEngineValidator::new(chain_spec.clone()); let (from_tree_tx, from_tree_rx) = unbounded_channel(); diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 430ca31a5b16..44018f0417ad 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -19,8 +19,8 @@ use reth_exex::ExExManagerHandle; use reth_network::{NetworkSyncUpdater, SyncState}; use reth_network_api::BlockDownloaderProvider; use reth_node_api::{ - BuiltPayload, FullNodeTypes, NodeTypesWithEngine, PayloadAttributesBuilder, PayloadBuilder, - PayloadTypes, + BlockTy, BuiltPayload, EngineValidator, FullNodeTypes, NodeTypesWithEngine, + PayloadAttributesBuilder, PayloadBuilder, PayloadTypes, }; use reth_node_core::{ dirs::{ChainPath, DataDirPath}, @@ -75,7 +75,15 @@ where T: FullNodeTypes>, CB: NodeComponentsBuilder, AO: RethRpcAddOns> - + EngineValidatorAddOn>, + + EngineValidatorAddOn + < + NodeAdapter, + Validator: EngineValidator< + ::Engine, + Block = BlockTy, + >, + >, + LocalPayloadAttributesBuilder: PayloadAttributesBuilder< <::Engine as PayloadTypes>::PayloadAttributes, >, @@ -86,335 +94,337 @@ where self, target: NodeBuilderWithComponents, ) -> eyre::Result { - let Self { ctx, engine_tree_config } = self; - let NodeBuilderWithComponents { - adapter: NodeTypesAdapter { database }, - components_builder, - add_ons: AddOns { hooks, exexs: installed_exex, add_ons }, - config, - } = target; - let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; - - // setup the launch context - let ctx = ctx - .with_configured_globals() - // load the toml config - .with_loaded_toml_config(config)? - // add resolved peers - .with_resolved_peers().await? - // attach the database - .attach(database.clone()) - // ensure certain settings take effect - .with_adjusted_configs() - // Create the provider factory - .with_provider_factory().await? - .inspect(|_| { - info!(target: "reth::cli", "Database opened"); - }) - .with_prometheus_server().await? - .inspect(|this| { - debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis"); - }) - .with_genesis()? - .inspect(|this: &LaunchContextWith, _>>| { - info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks()); - }) - .with_metrics_task() - // passing FullNodeTypes as type parameter here so that we can build - // later the components. - .with_blockchain_db::(move |provider_factory| { - Ok(BlockchainProvider2::new(provider_factory)?) - })? - .with_components(components_builder, on_component_initialized).await?; - - // spawn exexs - let exex_manager_handle = ExExLauncher::new( - ctx.head(), - ctx.node_adapter().clone(), - installed_exex, - ctx.configs().clone(), - ) - .launch() - .await?; - - // create pipeline - let network_client = ctx.components().network().fetch_client().await?; - let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); - - let node_config = ctx.node_config(); - let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx) - .maybe_skip_fcu(node_config.debug.skip_fcu) - .maybe_skip_new_payload(node_config.debug.skip_new_payload) - .maybe_reorg( - ctx.blockchain_db().clone(), - ctx.components().evm_config().clone(), - reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()), - node_config.debug.reorg_frequency, - node_config.debug.reorg_depth, - ) - // Store messages _after_ skipping so that `replay-engine` command - // would replay only the messages that were observed by the engine - // during this run. - .maybe_store_messages(node_config.debug.engine_api_store.clone()); - - let max_block = ctx.max_block(network_client.clone()).await?; - let mut hooks = EngineHooks::new(); - - let static_file_producer = ctx.static_file_producer(); - let static_file_producer_events = static_file_producer.lock().events(); - hooks.add(StaticFileHook::new( - static_file_producer.clone(), - Box::new(ctx.task_executor().clone()), - )); - info!(target: "reth::cli", "StaticFileProducer initialized"); - - // Configure the pipeline - let pipeline_exex_handle = - exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty); - let pipeline = build_networked_pipeline( - &ctx.toml_config().stages, - network_client.clone(), - ctx.consensus(), - ctx.provider_factory().clone(), - ctx.task_executor(), - ctx.sync_metrics_tx(), - ctx.prune_config(), - max_block, - static_file_producer, - ctx.components().block_executor().clone(), - pipeline_exex_handle, - )?; - - // The new engine writes directly to static files. This ensures that they're up to the tip. - pipeline.move_to_static_files()?; - - let pipeline_events = pipeline.events(); - - let mut pruner_builder = ctx.pruner_builder(); - if let Some(exex_manager_handle) = &exex_manager_handle { - pruner_builder = - pruner_builder.finished_exex_height(exex_manager_handle.finished_height()); - } - let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone()); - - let pruner_events = pruner.events(); - info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); - - let mut engine_service = if ctx.is_dev() { - let eth_service = LocalEngineService::new( - ctx.consensus(), - ctx.components().block_executor().clone(), - ctx.provider_factory().clone(), - ctx.blockchain_db().clone(), - pruner, - ctx.components().payload_builder().clone(), - engine_tree_config, - ctx.invalid_block_hook()?, - ctx.sync_metrics_tx(), - consensus_engine_tx.clone(), - Box::pin(consensus_engine_stream), - ctx.dev_mining_mode(ctx.components().pool()), - LocalPayloadAttributesBuilder::new(ctx.chain_spec()), - ); - - Either::Left(eth_service) - } else { - let eth_service = EngineService::new( - ctx.consensus(), - ctx.components().block_executor().clone(), - ctx.chain_spec(), - network_client.clone(), - Box::pin(consensus_engine_stream), - pipeline, - Box::new(ctx.task_executor().clone()), - ctx.provider_factory().clone(), - ctx.blockchain_db().clone(), - pruner, - ctx.components().payload_builder().clone(), - engine_tree_config, - ctx.invalid_block_hook()?, - ctx.sync_metrics_tx(), - ); - - Either::Right(eth_service) - }; - - let event_sender = EventSender::default(); - - let beacon_engine_handle = - BeaconConsensusEngineHandle::new(consensus_engine_tx, event_sender.clone()); - - info!(target: "reth::cli", "Consensus engine initialized"); - - let events = stream_select!( - beacon_engine_handle.event_listener().map(Into::into), - pipeline_events.map(Into::into), - if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { - Either::Left( - ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) - .map(Into::into), - ) - } else { - Either::Right(stream::empty()) - }, - pruner_events.map(Into::into), - static_file_producer_events.map(Into::into), - ); - ctx.task_executor().spawn_critical( - "events task", - node::handle_events( - Some(Box::new(ctx.components().network().clone())), - Some(ctx.head().number), - events, - ), - ); - - // extract the jwt secret from the args if possible - let jwt_secret = ctx.auth_jwt_secret()?; - - let add_ons_ctx = AddOnsContext { - node: ctx.node_adapter().clone(), - config: ctx.node_config(), - beacon_engine_handle, - jwt_secret, - }; - - let RpcHandle { rpc_server_handles, rpc_registry } = - add_ons.launch_add_ons(add_ons_ctx).await?; - - // TODO: migrate to devmode with https://github.com/paradigmxyz/reth/issues/10104 - if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() { - info!(target: "reth::cli", "Using etherscan as consensus client"); - - let chain = ctx.node_config().chain.chain(); - let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| { - // If URL isn't provided, use default Etherscan URL for the chain if it is known - chain - .etherscan_urls() - .map(|urls| urls.0.to_string()) - .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}")) - })?; - - let block_provider = EtherscanBlockProvider::new( - etherscan_url, - chain.etherscan_api_key().ok_or_else(|| { - eyre::eyre!( - "etherscan api key not found for rpc consensus client for chain: {chain}" - ) - })?, - ); - let rpc_consensus_client = DebugConsensusClient::new( - rpc_server_handles.auth.clone(), - Arc::new(block_provider), - ); - ctx.task_executor().spawn_critical("etherscan consensus client", async move { - rpc_consensus_client.run::<::Engine>().await - }); - } - - // Run consensus engine to completion - let initial_target = ctx.initial_backfill_target()?; - let network_handle = ctx.components().network().clone(); - let mut built_payloads = ctx - .components() - .payload_builder() - .subscribe() - .await - .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))? - .into_built_payload_stream() - .fuse(); - let chainspec = ctx.chain_spec(); - let (exit, rx) = oneshot::channel(); - let terminate_after_backfill = ctx.terminate_after_initial_backfill(); - - info!(target: "reth::cli", "Starting consensus engine"); - ctx.task_executor().spawn_critical("consensus engine", async move { - if let Some(initial_target) = initial_target { - debug!(target: "reth::cli", %initial_target, "start backfill sync"); - if let Either::Right(eth_service) = &mut engine_service { - eth_service.orchestrator_mut().start_backfill_sync(initial_target); - } - } - - let mut res = Ok(()); - - // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL - loop { - tokio::select! { - payload = built_payloads.select_next_some() => { - if let Some(executed_block) = payload.executed_block() { - debug!(target: "reth::cli", block=?executed_block.block().num_hash(), "inserting built payload"); - if let Either::Right(eth_service) = &mut engine_service { - eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into()); - } - } - } - event = engine_service.next() => { - let Some(event) = event else { break }; - debug!(target: "reth::cli", "Event: {event}"); - match event { - ChainEvent::BackfillSyncFinished => { - if terminate_after_backfill { - debug!(target: "reth::cli", "Terminating after initial backfill"); - break - } - - network_handle.update_sync_state(SyncState::Idle); - } - ChainEvent::BackfillSyncStarted => { - network_handle.update_sync_state(SyncState::Syncing); - } - ChainEvent::FatalError => { - error!(target: "reth::cli", "Fatal error in consensus engine"); - res = Err(eyre::eyre!("Fatal error in consensus engine")); - break - } - ChainEvent::Handler(ev) => { - if let Some(head) = ev.canonical_header() { - let head_block = Head { - number: head.number, - hash: head.hash(), - difficulty: head.difficulty, - timestamp: head.timestamp, - total_difficulty: chainspec - .final_paris_total_difficulty(head.number) - .unwrap_or_default(), - }; - network_handle.update_status(head_block); - } - event_sender.notify(ev); - } - } - } - } - } - - let _ = exit.send(res); - }); - - let full_node = FullNode { - evm_config: ctx.components().evm_config().clone(), - block_executor: ctx.components().block_executor().clone(), - pool: ctx.components().pool().clone(), - network: ctx.components().network().clone(), - provider: ctx.node_adapter().provider.clone(), - payload_builder: ctx.components().payload_builder().clone(), - task_executor: ctx.task_executor().clone(), - config: ctx.node_config().clone(), - data_dir: ctx.data_dir().clone(), - add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry }, - }; - // Notify on node started - on_node_started.on_event(FullNode::clone(&full_node))?; - - let handle = NodeHandle { - node_exit_future: NodeExitFuture::new( - async { rx.await? }, - full_node.config.debug.terminate, - ), - node: full_node, - }; - - Ok(handle) + todo!() + // let Self { ctx, engine_tree_config } = self; + // let NodeBuilderWithComponents { + // adapter: NodeTypesAdapter { database }, + // components_builder, + // add_ons: AddOns { hooks, exexs: installed_exex, add_ons }, + // config, + // } = target; + // let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; + // + // // setup the launch context + // let ctx = ctx + // .with_configured_globals() + // // load the toml config + // .with_loaded_toml_config(config)? + // // add resolved peers + // .with_resolved_peers().await? + // // attach the database + // .attach(database.clone()) + // // ensure certain settings take effect + // .with_adjusted_configs() + // // Create the provider factory + // .with_provider_factory().await? + // .inspect(|_| { + // info!(target: "reth::cli", "Database opened"); + // }) + // .with_prometheus_server().await? + // .inspect(|this| { + // debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis"); + // }) + // .with_genesis()? + // .inspect(|this: &LaunchContextWith, _>>| { + // info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks()); + // }) + // .with_metrics_task() + // // passing FullNodeTypes as type parameter here so that we can build + // // later the components. + // .with_blockchain_db::(move |provider_factory| { + // Ok(BlockchainProvider2::new(provider_factory)?) + // })? + // .with_components(components_builder, on_component_initialized).await?; + // + // // spawn exexs + // let exex_manager_handle = ExExLauncher::new( + // ctx.head(), + // ctx.node_adapter().clone(), + // installed_exex, + // ctx.configs().clone(), + // ) + // .launch() + // .await?; + // + // // create pipeline + // let network_client = ctx.components().network().fetch_client().await?; + // let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + // + // let node_config = ctx.node_config(); + // let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx) + // .maybe_skip_fcu(node_config.debug.skip_fcu) + // .maybe_skip_new_payload(node_config.debug.skip_new_payload) + // .maybe_reorg( + // ctx.blockchain_db().clone(), + // ctx.components().evm_config().clone(), + // reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()), + // node_config.debug.reorg_frequency, + // node_config.debug.reorg_depth, + // ) + // // Store messages _after_ skipping so that `replay-engine` command + // // would replay only the messages that were observed by the engine + // // during this run. + // .maybe_store_messages(node_config.debug.engine_api_store.clone()); + // + // let max_block = ctx.max_block(network_client.clone()).await?; + // let mut hooks = EngineHooks::new(); + // + // let static_file_producer = ctx.static_file_producer(); + // let static_file_producer_events = static_file_producer.lock().events(); + // hooks.add(StaticFileHook::new( + // static_file_producer.clone(), + // Box::new(ctx.task_executor().clone()), + // )); + // info!(target: "reth::cli", "StaticFileProducer initialized"); + // + // // Configure the pipeline + // let pipeline_exex_handle = + // exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty); + // let pipeline = build_networked_pipeline( + // &ctx.toml_config().stages, + // network_client.clone(), + // ctx.consensus(), + // ctx.provider_factory().clone(), + // ctx.task_executor(), + // ctx.sync_metrics_tx(), + // ctx.prune_config(), + // max_block, + // static_file_producer, + // ctx.components().block_executor().clone(), + // pipeline_exex_handle, + // )?; + // + // // The new engine writes directly to static files. This ensures that they're up to the tip. + // pipeline.move_to_static_files()?; + // + // let pipeline_events = pipeline.events(); + // + // let mut pruner_builder = ctx.pruner_builder(); + // if let Some(exex_manager_handle) = &exex_manager_handle { + // pruner_builder = + // pruner_builder.finished_exex_height(exex_manager_handle.finished_height()); + // } + // let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone()); + // let pruner_events = pruner.events(); + // info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); + // + // let event_sender = EventSender::default(); + // let beacon_engine_handle = + // BeaconConsensusEngineHandle::new(consensus_engine_tx.clone(), event_sender.clone()); + // + // // extract the jwt secret from the args if possible + // let jwt_secret = ctx.auth_jwt_secret()?; + // + // let add_ons_ctx = AddOnsContext { + // node: ctx.node_adapter().clone(), + // config: ctx.node_config(), + // beacon_engine_handle: beacon_engine_handle.clone(), + // jwt_secret, + // }; + // let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?; + // + // let mut engine_service = if ctx.is_dev() { + // let eth_service = LocalEngineService::new( + // ctx.consensus(), + // ctx.components().block_executor().clone(), + // ctx.provider_factory().clone(), + // ctx.blockchain_db().clone(), + // pruner, + // ctx.components().payload_builder().clone(), + // engine_payload_validator, + // engine_tree_config, + // ctx.invalid_block_hook()?, + // ctx.sync_metrics_tx(), + // consensus_engine_tx.clone(), + // Box::pin(consensus_engine_stream), + // ctx.dev_mining_mode(ctx.components().pool()), + // LocalPayloadAttributesBuilder::new(ctx.chain_spec()), + // ); + // + // Either::Left(eth_service) + // } else { + // let eth_service = EngineService::new( + // ctx.consensus(), + // ctx.components().block_executor().clone(), + // ctx.chain_spec(), + // network_client.clone(), + // Box::pin(consensus_engine_stream), + // pipeline, + // Box::new(ctx.task_executor().clone()), + // ctx.provider_factory().clone(), + // ctx.blockchain_db().clone(), + // pruner, + // ctx.components().payload_builder().clone(), + // engine_payload_validator, + // engine_tree_config, + // ctx.invalid_block_hook()?, + // ctx.sync_metrics_tx(), + // ); + // + // Either::Right(eth_service) + // }; + // + // info!(target: "reth::cli", "Consensus engine initialized"); + // + // let events = stream_select!( + // beacon_engine_handle.event_listener().map(Into::into), + // pipeline_events.map(Into::into), + // if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { + // Either::Left( + // ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) + // .map(Into::into), + // ) + // } else { + // Either::Right(stream::empty()) + // }, + // pruner_events.map(Into::into), + // static_file_producer_events.map(Into::into), + // ); + // ctx.task_executor().spawn_critical( + // "events task", + // node::handle_events( + // Some(Box::new(ctx.components().network().clone())), + // Some(ctx.head().number), + // events, + // ), + // ); + // + // let RpcHandle { rpc_server_handles, rpc_registry } = + // add_ons.launch_add_ons(add_ons_ctx).await?; + // + // // TODO: migrate to devmode with https://github.com/paradigmxyz/reth/issues/10104 + // if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() { + // info!(target: "reth::cli", "Using etherscan as consensus client"); + // + // let chain = ctx.node_config().chain.chain(); + // let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| { + // // If URL isn't provided, use default Etherscan URL for the chain if it is known + // chain + // .etherscan_urls() + // .map(|urls| urls.0.to_string()) + // .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}")) + // })?; + // + // let block_provider = EtherscanBlockProvider::new( + // etherscan_url, + // chain.etherscan_api_key().ok_or_else(|| { + // eyre::eyre!( + // "etherscan api key not found for rpc consensus client for chain: {chain}" + // ) + // })?, + // ); + // let rpc_consensus_client = DebugConsensusClient::new( + // rpc_server_handles.auth.clone(), + // Arc::new(block_provider), + // ); + // ctx.task_executor().spawn_critical("etherscan consensus client", async move { + // rpc_consensus_client.run::<::Engine>().await + // }); + // } + // + // // Run consensus engine to completion + // let initial_target = ctx.initial_backfill_target()?; + // let network_handle = ctx.components().network().clone(); + // let mut built_payloads = ctx + // .components() + // .payload_builder() + // .subscribe() + // .await + // .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))? + // .into_built_payload_stream() + // .fuse(); + // let chainspec = ctx.chain_spec(); + // let (exit, rx) = oneshot::channel(); + // let terminate_after_backfill = ctx.terminate_after_initial_backfill(); + // + // info!(target: "reth::cli", "Starting consensus engine"); + // ctx.task_executor().spawn_critical("consensus engine", async move { + // if let Some(initial_target) = initial_target { + // debug!(target: "reth::cli", %initial_target, "start backfill sync"); + // if let Either::Right(eth_service) = &mut engine_service { + // eth_service.orchestrator_mut().start_backfill_sync(initial_target); + // } + // } + // + // let mut res = Ok(()); + // + // // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL + // loop { + // tokio::select! { + // payload = built_payloads.select_next_some() => { + // if let Some(executed_block) = payload.executed_block() { + // debug!(target: "reth::cli", block=?executed_block.block().num_hash(), "inserting built payload"); + // if let Either::Right(eth_service) = &mut engine_service { + // eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into()); + // } + // } + // } + // event = engine_service.next() => { + // let Some(event) = event else { break }; + // debug!(target: "reth::cli", "Event: {event}"); + // match event { + // ChainEvent::BackfillSyncFinished => { + // if terminate_after_backfill { + // debug!(target: "reth::cli", "Terminating after initial backfill"); + // break + // } + // + // network_handle.update_sync_state(SyncState::Idle); + // } + // ChainEvent::BackfillSyncStarted => { + // network_handle.update_sync_state(SyncState::Syncing); + // } + // ChainEvent::FatalError => { + // error!(target: "reth::cli", "Fatal error in consensus engine"); + // res = Err(eyre::eyre!("Fatal error in consensus engine")); + // break + // } + // ChainEvent::Handler(ev) => { + // if let Some(head) = ev.canonical_header() { + // let head_block = Head { + // number: head.number, + // hash: head.hash(), + // difficulty: head.difficulty, + // timestamp: head.timestamp, + // total_difficulty: chainspec + // .final_paris_total_difficulty(head.number) + // .unwrap_or_default(), + // }; + // network_handle.update_status(head_block); + // } + // event_sender.notify(ev); + // } + // } + // } + // } + // } + // + // let _ = exit.send(res); + // }); + // + // let full_node = FullNode { + // evm_config: ctx.components().evm_config().clone(), + // block_executor: ctx.components().block_executor().clone(), + // pool: ctx.components().pool().clone(), + // network: ctx.components().network().clone(), + // provider: ctx.node_adapter().provider.clone(), + // payload_builder: ctx.components().payload_builder().clone(), + // task_executor: ctx.task_executor().clone(), + // config: ctx.node_config().clone(), + // data_dir: ctx.data_dir().clone(), + // add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry }, + // }; + // // Notify on node started + // on_node_started.on_event(FullNode::clone(&full_node))?; + // + // let handle = NodeHandle { + // node_exit_future: NodeExitFuture::new( + // async { rx.await? }, + // full_node.config.debug.terminate, + // ), + // node: full_node, + // }; + // + // Ok(handle) } } diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index 2eae77f8d835..e2f147f46946 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -584,11 +584,12 @@ pub trait EngineValidatorAddOn: Send { fn engine_validator( &self, ctx: &AddOnsContext<'_, Node>, - ) -> impl Future>; + ) -> impl Future> + Send; } impl EngineValidatorAddOn for RpcAddOns where + Self: Sync, N: FullNodeComponents, EthApi: EthApiTypes, EV: EngineValidatorBuilder, From 2d00f02837d32b621fca69f1749301948c45fa4c Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 28 Nov 2024 11:44:15 +0100 Subject: [PATCH 2/3] fix bounds --- crates/e2e-test-utils/src/lib.rs | 7 +- crates/node/builder/src/launch/engine.rs | 666 +++++++++++------------ crates/node/builder/src/launch/mod.rs | 4 +- crates/node/builder/src/rpc.rs | 7 +- 4 files changed, 339 insertions(+), 345 deletions(-) diff --git a/crates/e2e-test-utils/src/lib.rs b/crates/e2e-test-utils/src/lib.rs index e7958a2c12ef..ad47ccba3613 100644 --- a/crates/e2e-test-utils/src/lib.rs +++ b/crates/e2e-test-utils/src/lib.rs @@ -1,6 +1,5 @@ //! Utilities for end-to-end tests. -use std::sync::Arc; use node::NodeTestContext; use reth::{ args::{DiscoveryArgs, NetworkArgs, RpcServerArgs}, @@ -22,6 +21,7 @@ use reth_node_builder::{ NodeTypesWithDBAdapter, NodeTypesWithEngine, PayloadAttributesBuilder, PayloadTypes, }; use reth_provider::providers::{BlockchainProvider, BlockchainProvider2, NodeTypesForProvider}; +use std::sync::Arc; use tracing::{span, Level}; use wallet::Wallet; @@ -135,10 +135,7 @@ where N::AddOns: RethRpcAddOns>>> + EngineValidatorAddOn< Adapter>>, - Validator: EngineValidator< - N::Engine, - Block = reth_primitives::Block, - >, + Validator: EngineValidator, >, LocalPayloadAttributesBuilder: PayloadAttributesBuilder< <::Engine as PayloadTypes>::PayloadAttributes, diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 44018f0417ad..b1141314d106 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -75,8 +75,7 @@ where T: FullNodeTypes>, CB: NodeComponentsBuilder, AO: RethRpcAddOns> - + EngineValidatorAddOn - < + + EngineValidatorAddOn< NodeAdapter, Validator: EngineValidator< ::Engine, @@ -94,337 +93,336 @@ where self, target: NodeBuilderWithComponents, ) -> eyre::Result { - todo!() - // let Self { ctx, engine_tree_config } = self; - // let NodeBuilderWithComponents { - // adapter: NodeTypesAdapter { database }, - // components_builder, - // add_ons: AddOns { hooks, exexs: installed_exex, add_ons }, - // config, - // } = target; - // let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; - // - // // setup the launch context - // let ctx = ctx - // .with_configured_globals() - // // load the toml config - // .with_loaded_toml_config(config)? - // // add resolved peers - // .with_resolved_peers().await? - // // attach the database - // .attach(database.clone()) - // // ensure certain settings take effect - // .with_adjusted_configs() - // // Create the provider factory - // .with_provider_factory().await? - // .inspect(|_| { - // info!(target: "reth::cli", "Database opened"); - // }) - // .with_prometheus_server().await? - // .inspect(|this| { - // debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis"); - // }) - // .with_genesis()? - // .inspect(|this: &LaunchContextWith, _>>| { - // info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks()); - // }) - // .with_metrics_task() - // // passing FullNodeTypes as type parameter here so that we can build - // // later the components. - // .with_blockchain_db::(move |provider_factory| { - // Ok(BlockchainProvider2::new(provider_factory)?) - // })? - // .with_components(components_builder, on_component_initialized).await?; - // - // // spawn exexs - // let exex_manager_handle = ExExLauncher::new( - // ctx.head(), - // ctx.node_adapter().clone(), - // installed_exex, - // ctx.configs().clone(), - // ) - // .launch() - // .await?; - // - // // create pipeline - // let network_client = ctx.components().network().fetch_client().await?; - // let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); - // - // let node_config = ctx.node_config(); - // let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx) - // .maybe_skip_fcu(node_config.debug.skip_fcu) - // .maybe_skip_new_payload(node_config.debug.skip_new_payload) - // .maybe_reorg( - // ctx.blockchain_db().clone(), - // ctx.components().evm_config().clone(), - // reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()), - // node_config.debug.reorg_frequency, - // node_config.debug.reorg_depth, - // ) - // // Store messages _after_ skipping so that `replay-engine` command - // // would replay only the messages that were observed by the engine - // // during this run. - // .maybe_store_messages(node_config.debug.engine_api_store.clone()); - // - // let max_block = ctx.max_block(network_client.clone()).await?; - // let mut hooks = EngineHooks::new(); - // - // let static_file_producer = ctx.static_file_producer(); - // let static_file_producer_events = static_file_producer.lock().events(); - // hooks.add(StaticFileHook::new( - // static_file_producer.clone(), - // Box::new(ctx.task_executor().clone()), - // )); - // info!(target: "reth::cli", "StaticFileProducer initialized"); - // - // // Configure the pipeline - // let pipeline_exex_handle = - // exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty); - // let pipeline = build_networked_pipeline( - // &ctx.toml_config().stages, - // network_client.clone(), - // ctx.consensus(), - // ctx.provider_factory().clone(), - // ctx.task_executor(), - // ctx.sync_metrics_tx(), - // ctx.prune_config(), - // max_block, - // static_file_producer, - // ctx.components().block_executor().clone(), - // pipeline_exex_handle, - // )?; - // - // // The new engine writes directly to static files. This ensures that they're up to the tip. - // pipeline.move_to_static_files()?; - // - // let pipeline_events = pipeline.events(); - // - // let mut pruner_builder = ctx.pruner_builder(); - // if let Some(exex_manager_handle) = &exex_manager_handle { - // pruner_builder = - // pruner_builder.finished_exex_height(exex_manager_handle.finished_height()); - // } - // let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone()); - // let pruner_events = pruner.events(); - // info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); - // - // let event_sender = EventSender::default(); - // let beacon_engine_handle = - // BeaconConsensusEngineHandle::new(consensus_engine_tx.clone(), event_sender.clone()); - // - // // extract the jwt secret from the args if possible - // let jwt_secret = ctx.auth_jwt_secret()?; - // - // let add_ons_ctx = AddOnsContext { - // node: ctx.node_adapter().clone(), - // config: ctx.node_config(), - // beacon_engine_handle: beacon_engine_handle.clone(), - // jwt_secret, - // }; - // let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?; - // - // let mut engine_service = if ctx.is_dev() { - // let eth_service = LocalEngineService::new( - // ctx.consensus(), - // ctx.components().block_executor().clone(), - // ctx.provider_factory().clone(), - // ctx.blockchain_db().clone(), - // pruner, - // ctx.components().payload_builder().clone(), - // engine_payload_validator, - // engine_tree_config, - // ctx.invalid_block_hook()?, - // ctx.sync_metrics_tx(), - // consensus_engine_tx.clone(), - // Box::pin(consensus_engine_stream), - // ctx.dev_mining_mode(ctx.components().pool()), - // LocalPayloadAttributesBuilder::new(ctx.chain_spec()), - // ); - // - // Either::Left(eth_service) - // } else { - // let eth_service = EngineService::new( - // ctx.consensus(), - // ctx.components().block_executor().clone(), - // ctx.chain_spec(), - // network_client.clone(), - // Box::pin(consensus_engine_stream), - // pipeline, - // Box::new(ctx.task_executor().clone()), - // ctx.provider_factory().clone(), - // ctx.blockchain_db().clone(), - // pruner, - // ctx.components().payload_builder().clone(), - // engine_payload_validator, - // engine_tree_config, - // ctx.invalid_block_hook()?, - // ctx.sync_metrics_tx(), - // ); - // - // Either::Right(eth_service) - // }; - // - // info!(target: "reth::cli", "Consensus engine initialized"); - // - // let events = stream_select!( - // beacon_engine_handle.event_listener().map(Into::into), - // pipeline_events.map(Into::into), - // if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { - // Either::Left( - // ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) - // .map(Into::into), - // ) - // } else { - // Either::Right(stream::empty()) - // }, - // pruner_events.map(Into::into), - // static_file_producer_events.map(Into::into), - // ); - // ctx.task_executor().spawn_critical( - // "events task", - // node::handle_events( - // Some(Box::new(ctx.components().network().clone())), - // Some(ctx.head().number), - // events, - // ), - // ); - // - // let RpcHandle { rpc_server_handles, rpc_registry } = - // add_ons.launch_add_ons(add_ons_ctx).await?; - // - // // TODO: migrate to devmode with https://github.com/paradigmxyz/reth/issues/10104 - // if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() { - // info!(target: "reth::cli", "Using etherscan as consensus client"); - // - // let chain = ctx.node_config().chain.chain(); - // let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| { - // // If URL isn't provided, use default Etherscan URL for the chain if it is known - // chain - // .etherscan_urls() - // .map(|urls| urls.0.to_string()) - // .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}")) - // })?; - // - // let block_provider = EtherscanBlockProvider::new( - // etherscan_url, - // chain.etherscan_api_key().ok_or_else(|| { - // eyre::eyre!( - // "etherscan api key not found for rpc consensus client for chain: {chain}" - // ) - // })?, - // ); - // let rpc_consensus_client = DebugConsensusClient::new( - // rpc_server_handles.auth.clone(), - // Arc::new(block_provider), - // ); - // ctx.task_executor().spawn_critical("etherscan consensus client", async move { - // rpc_consensus_client.run::<::Engine>().await - // }); - // } - // - // // Run consensus engine to completion - // let initial_target = ctx.initial_backfill_target()?; - // let network_handle = ctx.components().network().clone(); - // let mut built_payloads = ctx - // .components() - // .payload_builder() - // .subscribe() - // .await - // .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))? - // .into_built_payload_stream() - // .fuse(); - // let chainspec = ctx.chain_spec(); - // let (exit, rx) = oneshot::channel(); - // let terminate_after_backfill = ctx.terminate_after_initial_backfill(); - // - // info!(target: "reth::cli", "Starting consensus engine"); - // ctx.task_executor().spawn_critical("consensus engine", async move { - // if let Some(initial_target) = initial_target { - // debug!(target: "reth::cli", %initial_target, "start backfill sync"); - // if let Either::Right(eth_service) = &mut engine_service { - // eth_service.orchestrator_mut().start_backfill_sync(initial_target); - // } - // } - // - // let mut res = Ok(()); - // - // // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL - // loop { - // tokio::select! { - // payload = built_payloads.select_next_some() => { - // if let Some(executed_block) = payload.executed_block() { - // debug!(target: "reth::cli", block=?executed_block.block().num_hash(), "inserting built payload"); - // if let Either::Right(eth_service) = &mut engine_service { - // eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into()); - // } - // } - // } - // event = engine_service.next() => { - // let Some(event) = event else { break }; - // debug!(target: "reth::cli", "Event: {event}"); - // match event { - // ChainEvent::BackfillSyncFinished => { - // if terminate_after_backfill { - // debug!(target: "reth::cli", "Terminating after initial backfill"); - // break - // } - // - // network_handle.update_sync_state(SyncState::Idle); - // } - // ChainEvent::BackfillSyncStarted => { - // network_handle.update_sync_state(SyncState::Syncing); - // } - // ChainEvent::FatalError => { - // error!(target: "reth::cli", "Fatal error in consensus engine"); - // res = Err(eyre::eyre!("Fatal error in consensus engine")); - // break - // } - // ChainEvent::Handler(ev) => { - // if let Some(head) = ev.canonical_header() { - // let head_block = Head { - // number: head.number, - // hash: head.hash(), - // difficulty: head.difficulty, - // timestamp: head.timestamp, - // total_difficulty: chainspec - // .final_paris_total_difficulty(head.number) - // .unwrap_or_default(), - // }; - // network_handle.update_status(head_block); - // } - // event_sender.notify(ev); - // } - // } - // } - // } - // } - // - // let _ = exit.send(res); - // }); - // - // let full_node = FullNode { - // evm_config: ctx.components().evm_config().clone(), - // block_executor: ctx.components().block_executor().clone(), - // pool: ctx.components().pool().clone(), - // network: ctx.components().network().clone(), - // provider: ctx.node_adapter().provider.clone(), - // payload_builder: ctx.components().payload_builder().clone(), - // task_executor: ctx.task_executor().clone(), - // config: ctx.node_config().clone(), - // data_dir: ctx.data_dir().clone(), - // add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry }, - // }; - // // Notify on node started - // on_node_started.on_event(FullNode::clone(&full_node))?; - // - // let handle = NodeHandle { - // node_exit_future: NodeExitFuture::new( - // async { rx.await? }, - // full_node.config.debug.terminate, - // ), - // node: full_node, - // }; - // - // Ok(handle) + let Self { ctx, engine_tree_config } = self; + let NodeBuilderWithComponents { + adapter: NodeTypesAdapter { database }, + components_builder, + add_ons: AddOns { hooks, exexs: installed_exex, add_ons }, + config, + } = target; + let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; + + // setup the launch context + let ctx = ctx + .with_configured_globals() + // load the toml config + .with_loaded_toml_config(config)? + // add resolved peers + .with_resolved_peers().await? + // attach the database + .attach(database.clone()) + // ensure certain settings take effect + .with_adjusted_configs() + // Create the provider factory + .with_provider_factory().await? + .inspect(|_| { + info!(target: "reth::cli", "Database opened"); + }) + .with_prometheus_server().await? + .inspect(|this| { + debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis"); + }) + .with_genesis()? + .inspect(|this: &LaunchContextWith, _>>| { + info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks()); + }) + .with_metrics_task() + // passing FullNodeTypes as type parameter here so that we can build + // later the components. + .with_blockchain_db::(move |provider_factory| { + Ok(BlockchainProvider2::new(provider_factory)?) + })? + .with_components(components_builder, on_component_initialized).await?; + + // spawn exexs + let exex_manager_handle = ExExLauncher::new( + ctx.head(), + ctx.node_adapter().clone(), + installed_exex, + ctx.configs().clone(), + ) + .launch() + .await?; + + // create pipeline + let network_client = ctx.components().network().fetch_client().await?; + let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); + + let node_config = ctx.node_config(); + let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx) + .maybe_skip_fcu(node_config.debug.skip_fcu) + .maybe_skip_new_payload(node_config.debug.skip_new_payload) + .maybe_reorg( + ctx.blockchain_db().clone(), + ctx.components().evm_config().clone(), + reth_payload_validator::ExecutionPayloadValidator::new(ctx.chain_spec()), + node_config.debug.reorg_frequency, + node_config.debug.reorg_depth, + ) + // Store messages _after_ skipping so that `replay-engine` command + // would replay only the messages that were observed by the engine + // during this run. + .maybe_store_messages(node_config.debug.engine_api_store.clone()); + + let max_block = ctx.max_block(network_client.clone()).await?; + let mut hooks = EngineHooks::new(); + + let static_file_producer = ctx.static_file_producer(); + let static_file_producer_events = static_file_producer.lock().events(); + hooks.add(StaticFileHook::new( + static_file_producer.clone(), + Box::new(ctx.task_executor().clone()), + )); + info!(target: "reth::cli", "StaticFileProducer initialized"); + + // Configure the pipeline + let pipeline_exex_handle = + exex_manager_handle.clone().unwrap_or_else(ExExManagerHandle::empty); + let pipeline = build_networked_pipeline( + &ctx.toml_config().stages, + network_client.clone(), + ctx.consensus(), + ctx.provider_factory().clone(), + ctx.task_executor(), + ctx.sync_metrics_tx(), + ctx.prune_config(), + max_block, + static_file_producer, + ctx.components().block_executor().clone(), + pipeline_exex_handle, + )?; + + // The new engine writes directly to static files. This ensures that they're up to the tip. + pipeline.move_to_static_files()?; + + let pipeline_events = pipeline.events(); + + let mut pruner_builder = ctx.pruner_builder(); + if let Some(exex_manager_handle) = &exex_manager_handle { + pruner_builder = + pruner_builder.finished_exex_height(exex_manager_handle.finished_height()); + } + let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone()); + let pruner_events = pruner.events(); + info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); + + let event_sender = EventSender::default(); + let beacon_engine_handle = + BeaconConsensusEngineHandle::new(consensus_engine_tx.clone(), event_sender.clone()); + + // extract the jwt secret from the args if possible + let jwt_secret = ctx.auth_jwt_secret()?; + + let add_ons_ctx = AddOnsContext { + node: ctx.node_adapter().clone(), + config: ctx.node_config(), + beacon_engine_handle: beacon_engine_handle.clone(), + jwt_secret, + }; + let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?; + + let mut engine_service = if ctx.is_dev() { + let eth_service = LocalEngineService::new( + ctx.consensus(), + ctx.components().block_executor().clone(), + ctx.provider_factory().clone(), + ctx.blockchain_db().clone(), + pruner, + ctx.components().payload_builder().clone(), + engine_payload_validator, + engine_tree_config, + ctx.invalid_block_hook()?, + ctx.sync_metrics_tx(), + consensus_engine_tx.clone(), + Box::pin(consensus_engine_stream), + ctx.dev_mining_mode(ctx.components().pool()), + LocalPayloadAttributesBuilder::new(ctx.chain_spec()), + ); + + Either::Left(eth_service) + } else { + let eth_service = EngineService::new( + ctx.consensus(), + ctx.components().block_executor().clone(), + ctx.chain_spec(), + network_client.clone(), + Box::pin(consensus_engine_stream), + pipeline, + Box::new(ctx.task_executor().clone()), + ctx.provider_factory().clone(), + ctx.blockchain_db().clone(), + pruner, + ctx.components().payload_builder().clone(), + engine_payload_validator, + engine_tree_config, + ctx.invalid_block_hook()?, + ctx.sync_metrics_tx(), + ); + + Either::Right(eth_service) + }; + + info!(target: "reth::cli", "Consensus engine initialized"); + + let events = stream_select!( + beacon_engine_handle.event_listener().map(Into::into), + pipeline_events.map(Into::into), + if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { + Either::Left( + ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) + .map(Into::into), + ) + } else { + Either::Right(stream::empty()) + }, + pruner_events.map(Into::into), + static_file_producer_events.map(Into::into), + ); + ctx.task_executor().spawn_critical( + "events task", + node::handle_events( + Some(Box::new(ctx.components().network().clone())), + Some(ctx.head().number), + events, + ), + ); + + let RpcHandle { rpc_server_handles, rpc_registry } = + add_ons.launch_add_ons(add_ons_ctx).await?; + + // TODO: migrate to devmode with https://github.com/paradigmxyz/reth/issues/10104 + if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() { + info!(target: "reth::cli", "Using etherscan as consensus client"); + + let chain = ctx.node_config().chain.chain(); + let etherscan_url = maybe_custom_etherscan_url.map(Ok).unwrap_or_else(|| { + // If URL isn't provided, use default Etherscan URL for the chain if it is known + chain + .etherscan_urls() + .map(|urls| urls.0.to_string()) + .ok_or_else(|| eyre::eyre!("failed to get etherscan url for chain: {chain}")) + })?; + + let block_provider = EtherscanBlockProvider::new( + etherscan_url, + chain.etherscan_api_key().ok_or_else(|| { + eyre::eyre!( + "etherscan api key not found for rpc consensus client for chain: {chain}" + ) + })?, + ); + let rpc_consensus_client = DebugConsensusClient::new( + rpc_server_handles.auth.clone(), + Arc::new(block_provider), + ); + ctx.task_executor().spawn_critical("etherscan consensus client", async move { + rpc_consensus_client.run::<::Engine>().await + }); + } + + // Run consensus engine to completion + let initial_target = ctx.initial_backfill_target()?; + let network_handle = ctx.components().network().clone(); + let mut built_payloads = ctx + .components() + .payload_builder() + .subscribe() + .await + .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))? + .into_built_payload_stream() + .fuse(); + let chainspec = ctx.chain_spec(); + let (exit, rx) = oneshot::channel(); + let terminate_after_backfill = ctx.terminate_after_initial_backfill(); + + info!(target: "reth::cli", "Starting consensus engine"); + ctx.task_executor().spawn_critical("consensus engine", async move { + if let Some(initial_target) = initial_target { + debug!(target: "reth::cli", %initial_target, "start backfill sync"); + if let Either::Right(eth_service) = &mut engine_service { + eth_service.orchestrator_mut().start_backfill_sync(initial_target); + } + } + + let mut res = Ok(()); + + // advance the chain and await payloads built locally to add into the engine api tree handler to prevent re-execution if that block is received as payload from the CL + loop { + tokio::select! { + payload = built_payloads.select_next_some() => { + if let Some(executed_block) = payload.executed_block() { + debug!(target: "reth::cli", block=?executed_block.block().num_hash(), "inserting built payload"); + if let Either::Right(eth_service) = &mut engine_service { + eth_service.orchestrator_mut().handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block).into()); + } + } + } + event = engine_service.next() => { + let Some(event) = event else { break }; + debug!(target: "reth::cli", "Event: {event}"); + match event { + ChainEvent::BackfillSyncFinished => { + if terminate_after_backfill { + debug!(target: "reth::cli", "Terminating after initial backfill"); + break + } + + network_handle.update_sync_state(SyncState::Idle); + } + ChainEvent::BackfillSyncStarted => { + network_handle.update_sync_state(SyncState::Syncing); + } + ChainEvent::FatalError => { + error!(target: "reth::cli", "Fatal error in consensus engine"); + res = Err(eyre::eyre!("Fatal error in consensus engine")); + break + } + ChainEvent::Handler(ev) => { + if let Some(head) = ev.canonical_header() { + let head_block = Head { + number: head.number, + hash: head.hash(), + difficulty: head.difficulty, + timestamp: head.timestamp, + total_difficulty: chainspec + .final_paris_total_difficulty(head.number) + .unwrap_or_default(), + }; + network_handle.update_status(head_block); + } + event_sender.notify(ev); + } + } + } + } + } + + let _ = exit.send(res); + }); + + let full_node = FullNode { + evm_config: ctx.components().evm_config().clone(), + block_executor: ctx.components().block_executor().clone(), + pool: ctx.components().pool().clone(), + network: ctx.components().network().clone(), + provider: ctx.node_adapter().provider.clone(), + payload_builder: ctx.components().payload_builder().clone(), + task_executor: ctx.task_executor().clone(), + config: ctx.node_config().clone(), + data_dir: ctx.data_dir().clone(), + add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry }, + }; + // Notify on node started + on_node_started.on_event(FullNode::clone(&full_node))?; + + let handle = NodeHandle { + node_exit_future: NodeExitFuture::new( + async { rx.await? }, + full_node.config.debug.terminate, + ), + node: full_node, + }; + + Ok(handle) } } diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 627145d2df7a..9f2c027f76b5 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -70,7 +70,7 @@ pub trait LaunchNode { type Node; /// Create and return a new node asynchronously. - fn launch_node(self, target: Target) -> impl Future> + Send; + fn launch_node(self, target: Target) -> impl Future>; } impl LaunchNode for F @@ -80,7 +80,7 @@ where { type Node = Node; - fn launch_node(self, target: Target) -> impl Future> + Send { + fn launch_node(self, target: Target) -> impl Future> { self(target) } } diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index e2f147f46946..55313f3e9898 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -584,12 +584,11 @@ pub trait EngineValidatorAddOn: Send { fn engine_validator( &self, ctx: &AddOnsContext<'_, Node>, - ) -> impl Future> + Send; + ) -> impl Future>; } impl EngineValidatorAddOn for RpcAddOns where - Self: Sync, N: FullNodeComponents, EthApi: EthApiTypes, EV: EngineValidatorBuilder, @@ -602,7 +601,7 @@ where } /// A type that knows how to build the engine validator. -pub trait EngineValidatorBuilder: Send + Clone { +pub trait EngineValidatorBuilder: Send + Sync + Clone { /// The consensus implementation to build. type Validator: EngineValidator<::Engine>; @@ -618,7 +617,7 @@ where Node: FullNodeComponents, Validator: EngineValidator<::Engine> + Clone + Unpin + 'static, - F: FnOnce(&AddOnsContext<'_, Node>) -> Fut + Send + Clone, + F: FnOnce(&AddOnsContext<'_, Node>) -> Fut + Send + Sync + Clone, Fut: Future> + Send, { type Validator = Validator; From d57db73b01722251873c57a54f7c422e7cf097c4 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 28 Nov 2024 15:12:29 +0100 Subject: [PATCH 3/3] fmt --- crates/e2e-test-utils/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/e2e-test-utils/src/lib.rs b/crates/e2e-test-utils/src/lib.rs index 1ac3121bf32b..15065377fabf 100644 --- a/crates/e2e-test-utils/src/lib.rs +++ b/crates/e2e-test-utils/src/lib.rs @@ -5,6 +5,7 @@ use reth_chainspec::EthChainSpec; use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_engine_local::LocalPayloadAttributesBuilder; use reth_network_api::test_utils::PeersHandleProvider; +use reth_node_api::EngineValidator; use reth_node_builder::{ components::NodeComponentsBuilder, rpc::{EngineValidatorAddOn, RethRpcAddOns}, @@ -18,7 +19,6 @@ use reth_provider::providers::{ BlockchainProvider, BlockchainProvider2, NodeTypesForProvider, NodeTypesForTree, }; use reth_rpc_server_types::RpcModuleSelection; -use reth_node_api::EngineValidator; use reth_tasks::TaskManager; use std::sync::Arc; use tracing::{span, Level};