From 8513037323a50b00938e142de3935793ccf5d55a Mon Sep 17 00:00:00 2001 From: allnil Date: Sat, 16 Dec 2023 15:17:51 +0000 Subject: [PATCH 01/16] feat: explore payload builder, prepare boilerplate code for the next CachedReads preload --- Cargo.lock | 1 + bin/reth/src/cli/ext.rs | 4 +++- crates/payload/builder/Cargo.toml | 1 + crates/payload/builder/src/service.rs | 15 +++++++++++---- 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 881de09d6d0d..678d68f994c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6178,6 +6178,7 @@ dependencies = [ "reth-interfaces", "reth-metrics", "reth-primitives", + "reth-provider", "reth-rpc-types", "reth-rpc-types-compat", "reth-transaction-pool", diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index 4bfbeb64d105..ebbef54c7c6d 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -161,7 +161,9 @@ pub trait RethNodeCommandConfig: fmt::Debug { components.chain_spec(), payload_builder, ); - let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator); + let events_clone = Box::pin(components.events().clone()); + let (payload_service, payload_builder) = + PayloadBuilderService::new(payload_generator, events_clone); components .task_executor() diff --git a/crates/payload/builder/Cargo.toml b/crates/payload/builder/Cargo.toml index 056629ce69dd..31b3a68ca3b1 100644 --- a/crates/payload/builder/Cargo.toml +++ b/crates/payload/builder/Cargo.toml @@ -15,6 +15,7 @@ reth-rpc-types.workspace = true reth-transaction-pool.workspace = true reth-interfaces.workspace = true reth-rpc-types-compat.workspace = true +reth-provider.workspace = true # ethereum alloy-rlp.workspace = true diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 300e514037fe..40677360da6c 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -8,6 +8,7 @@ use crate::{ BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob, }; use futures_util::{future::FutureExt, StreamExt}; +use reth_provider::CanonStateSubscriptions; use reth_rpc_types::engine::PayloadId; use std::{ fmt, @@ -160,9 +161,10 @@ impl PayloadBuilderHandle { /// does know nothing about how to build them, it just drives their jobs to completion. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PayloadBuilderService +pub struct PayloadBuilderService where Gen: PayloadJobGenerator, + St: CanonStateSubscriptions + Clone + 'static, { /// The type that knows how to create new payloads. generator: Gen, @@ -174,17 +176,20 @@ where command_rx: UnboundedReceiverStream, /// Metrics for the payload builder service metrics: PayloadBuilderServiceMetrics, + /// CanoncialState notification channel + events: St, } // === impl PayloadBuilderService === -impl PayloadBuilderService +impl PayloadBuilderService where Gen: PayloadJobGenerator, + St: CanonStateSubscriptions + Clone + 'static, { /// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact /// with it. - pub fn new(generator: Gen) -> (Self, PayloadBuilderHandle) { + pub fn new(generator: Gen, mut events: St) -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); let service = Self { generator, @@ -192,6 +197,7 @@ where service_tx, command_rx: UnboundedReceiverStream::new(command_rx), metrics: Default::default(), + events, }; let handle = service.handle(); (service, handle) @@ -271,10 +277,11 @@ where } } -impl Future for PayloadBuilderService +impl Future for PayloadBuilderService where Gen: PayloadJobGenerator + Unpin + 'static, ::Job: Unpin + 'static, + St: CanonStateSubscriptions + Unpin + Clone + 'static, { type Output = (); From caf046c630d87cc91a399f974aa7833b6d2bae18 Mon Sep 17 00:00:00 2001 From: allnil Date: Sun, 17 Dec 2023 17:11:11 +0000 Subject: [PATCH 02/16] feat: pass canon state notification stream to payload loader, add new noop function on_new_state --- Cargo.lock | 1 + bin/reth/Cargo.toml | 1 + bin/reth/src/cli/ext.rs | 16 +++++++++++----- bin/reth/src/node/mod.rs | 4 +++- crates/payload/basic/src/lib.rs | 4 ++++ crates/payload/builder/src/service.rs | 18 +++++++++--------- crates/payload/builder/src/traits.rs | 3 +++ 7 files changed, 32 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 678d68f994c2..42368dc67d83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5556,6 +5556,7 @@ dependencies = [ "eyre", "fdlimit", "futures", + "futures-util", "human_bytes", "humantime", "hyper", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 1619c0cf8a3e..ce7dfe4ff3a0 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -104,6 +104,7 @@ const-str = "0.5.6" boyer-moore-magiclen = "0.2.16" itertools.workspace = true rayon.workspace = true +futures-util.workspace = true [target.'cfg(not(windows))'.dependencies] jemallocator = { version = "0.5.0", optional = true } diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index ebbef54c7c6d..30bee347b0fc 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -5,8 +5,11 @@ use crate::cli::{ config::{PayloadBuilderConfig, RethNetworkConfig, RethRpcConfig}, }; use clap::Args; + +use futures_util::Stream; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; +use reth_provider::CanonStateNotification; use reth_tasks::TaskSpawner; use std::{fmt, marker::PhantomData}; @@ -124,14 +127,16 @@ pub trait RethNodeCommandConfig: fmt::Debug { /// /// By default this spawns a [BasicPayloadJobGenerator] with the default configuration /// [BasicPayloadJobGeneratorConfig]. - fn spawn_payload_builder_service( + fn spawn_payload_builder_service( &mut self, conf: &Conf, components: &Reth, + chain_events: St, ) -> eyre::Result where Conf: PayloadBuilderConfig, Reth: RethNodeComponents, + St: Stream + Send + Unpin + 'static, { let payload_job_config = BasicPayloadJobGeneratorConfig::default() .interval(conf.interval()) @@ -161,9 +166,8 @@ pub trait RethNodeCommandConfig: fmt::Debug { components.chain_spec(), payload_builder, ); - let events_clone = Box::pin(components.events().clone()); let (payload_service, payload_builder) = - PayloadBuilderService::new(payload_generator, events_clone); + PayloadBuilderService::new(payload_generator, chain_events); components .task_executor() @@ -312,18 +316,20 @@ impl RethNodeCommandConfig for NoArgs { } } - fn spawn_payload_builder_service( + fn spawn_payload_builder_service( &mut self, conf: &Conf, components: &Reth, + chain_events: St, ) -> eyre::Result where Conf: PayloadBuilderConfig, Reth: RethNodeComponents, + St: Stream + Send + Unpin + 'static, { self.inner_mut() .ok_or_else(|| eyre::eyre!("config value must be set"))? - .spawn_payload_builder_service(conf, components) + .spawn_payload_builder_service(conf, components, chain_events) } } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index b7b4c520c128..3fa2ca13d278 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -383,7 +383,9 @@ impl NodeCommand { self.ext.on_components_initialized(&components)?; debug!(target: "reth::cli", "Spawning payload builder service"); - let payload_builder = self.ext.spawn_payload_builder_service(&self.builder, &components)?; + let chain_events = blockchain_db.canonical_state_stream(); + let payload_builder = + self.ext.spawn_payload_builder_service(&self.builder, &components, chain_events)?; let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); let max_block = if let Some(block) = self.debug.max_block { diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index a275ad5d78a1..f61bd38616bd 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -217,6 +217,10 @@ where builder: self.builder.clone(), }) } + + fn on_new_state(&self) { + () + } } /// Restricts how many generator tasks can be executed at once. diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 40677360da6c..f18c73a2f7a3 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -7,8 +7,8 @@ use crate::{ error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob, }; -use futures_util::{future::FutureExt, StreamExt}; -use reth_provider::CanonStateSubscriptions; +use futures_util::{future::FutureExt, Stream, StreamExt}; +use reth_provider::CanonStateNotification; use reth_rpc_types::engine::PayloadId; use std::{ fmt, @@ -164,7 +164,7 @@ impl PayloadBuilderHandle { pub struct PayloadBuilderService where Gen: PayloadJobGenerator, - St: CanonStateSubscriptions + Clone + 'static, + St: Stream + Send + Unpin + 'static, { /// The type that knows how to create new payloads. generator: Gen, @@ -176,8 +176,8 @@ where command_rx: UnboundedReceiverStream, /// Metrics for the payload builder service metrics: PayloadBuilderServiceMetrics, - /// CanoncialState notification channel - events: St, + /// Chain events notification stream + chain_events: St, } // === impl PayloadBuilderService === @@ -185,11 +185,11 @@ where impl PayloadBuilderService where Gen: PayloadJobGenerator, - St: CanonStateSubscriptions + Clone + 'static, + St: Stream + Send + Unpin + 'static, { /// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact /// with it. - pub fn new(generator: Gen, mut events: St) -> (Self, PayloadBuilderHandle) { + pub fn new(generator: Gen, mut chain_events: St) -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); let service = Self { generator, @@ -197,7 +197,7 @@ where service_tx, command_rx: UnboundedReceiverStream::new(command_rx), metrics: Default::default(), - events, + chain_events, }; let handle = service.handle(); (service, handle) @@ -281,7 +281,7 @@ impl Future for PayloadBuilderService where Gen: PayloadJobGenerator + Unpin + 'static, ::Job: Unpin + 'static, - St: CanonStateSubscriptions + Unpin + Clone + 'static, + St: Stream + Send + Unpin + 'static, { type Output = (); diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index ab118709fa41..12c78336e682 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -80,4 +80,7 @@ pub trait PayloadJobGenerator: Send + Sync { &self, attr: PayloadBuilderAttributes, ) -> Result; + + /// New func on_new_state + fn on_new_state(&self); } From 558313a426302ee0e40f0c1765423b3d249ea53b Mon Sep 17 00:00:00 2001 From: allnil Date: Mon, 18 Dec 2023 19:37:34 +0000 Subject: [PATCH 03/16] save progress --- bin/reth/src/cli/ext.rs | 8 ++++++++ crates/payload/basic/src/lib.rs | 4 +--- crates/payload/builder/src/service.rs | 2 ++ crates/payload/builder/src/traits.rs | 2 +- crates/transaction-pool/src/maintain.rs | 2 +- 5 files changed, 13 insertions(+), 5 deletions(-) diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index 30bee347b0fc..92fe49f95f3d 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -169,6 +169,14 @@ pub trait RethNodeCommandConfig: fmt::Debug { let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator, chain_events); + components.task_executor().spawn_critical( + "cache canonical blocks for payload builder service", + Box::pin(async move { + payload_builder.clone().chain_updates().await; + }), + ); + + components .task_executor() .spawn_critical("payload builder service", Box::pin(payload_service)); diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index f61bd38616bd..10fd8595de55 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -218,9 +218,7 @@ where }) } - fn on_new_state(&self) { - () - } + fn on_new_state(&self) {} } /// Restricts how many generator tasks can be executed at once. diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index f18c73a2f7a3..18fc25c50770 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -199,6 +199,7 @@ where metrics: Default::default(), chain_events, }; + let handle = service.handle(); (service, handle) } @@ -288,6 +289,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); + loop { // we poll all jobs first, so we always have the latest payload that we can report if // requests diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index 12c78336e682..5039b4cbfb07 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -81,6 +81,6 @@ pub trait PayloadJobGenerator: Send + Sync { attr: PayloadBuilderAttributes, ) -> Result; - /// New func on_new_state + /// Handles new chain state events fn on_new_state(&self); } diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index db74f50f2f8b..79272a9ed296 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -160,7 +160,7 @@ pub async fn maintain_transaction_pool( let _ = tx.send(res); } .boxed() - }; + ; reload_accounts_fut = rx.fuse(); task_spawner.spawn_blocking(fut); } From 36befd28f6b300806955daddd688c753e430e60a Mon Sep 17 00:00:00 2001 From: allnil Date: Wed, 20 Dec 2023 16:11:12 +0000 Subject: [PATCH 04/16] remove generic stream type for chain events --- bin/reth/src/cli/ext.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index 92fe49f95f3d..4e8d62e17669 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -127,16 +127,14 @@ pub trait RethNodeCommandConfig: fmt::Debug { /// /// By default this spawns a [BasicPayloadJobGenerator] with the default configuration /// [BasicPayloadJobGeneratorConfig]. - fn spawn_payload_builder_service( + fn spawn_payload_builder_service( &mut self, conf: &Conf, components: &Reth, - chain_events: St, ) -> eyre::Result where Conf: PayloadBuilderConfig, Reth: RethNodeComponents, - St: Stream + Send + Unpin + 'static, { let payload_job_config = BasicPayloadJobGeneratorConfig::default() .interval(conf.interval()) @@ -324,16 +322,14 @@ impl RethNodeCommandConfig for NoArgs { } } - fn spawn_payload_builder_service( + fn spawn_payload_builder_service( &mut self, conf: &Conf, components: &Reth, - chain_events: St, ) -> eyre::Result where Conf: PayloadBuilderConfig, Reth: RethNodeComponents, - St: Stream + Send + Unpin + 'static, { self.inner_mut() .ok_or_else(|| eyre::eyre!("config value must be set"))? From 59fbc4812e7f439eecc780524524bb2948ad81a5 Mon Sep 17 00:00:00 2001 From: allnil Date: Wed, 20 Dec 2023 16:39:56 +0000 Subject: [PATCH 05/16] pass reth components events stream in payload builder service, add bound Events with Unpin --- bin/reth/src/cli/components.rs | 4 ++-- bin/reth/src/cli/ext.rs | 13 ++----------- bin/reth/src/node/mod.rs | 4 +--- crates/payload/builder/src/service.rs | 11 +++++------ crates/transaction-pool/src/maintain.rs | 4 ++-- 5 files changed, 12 insertions(+), 24 deletions(-) diff --git a/bin/reth/src/cli/components.rs b/bin/reth/src/cli/components.rs index 18ef804f4a11..ea782ddc108b 100644 --- a/bin/reth/src/cli/components.rs +++ b/bin/reth/src/cli/components.rs @@ -50,7 +50,7 @@ pub trait RethNodeComponents: Clone + Send + Sync + 'static { /// The network type used to communicate with p2p. type Network: NetworkInfo + Peers + NetworkProtocols + NetworkEvents + Clone + 'static; /// The events type used to create subscriptions. - type Events: CanonStateSubscriptions + Clone + 'static; + type Events: CanonStateSubscriptions + Clone + Unpin + 'static; /// The type that is used to spawn tasks. type Tasks: TaskSpawner + Clone + Unpin + 'static; @@ -118,7 +118,7 @@ where Tasks: TaskSpawner + Clone + Unpin + 'static, Pool: TransactionPool + Clone + Unpin + 'static, Network: NetworkInfo + Peers + NetworkProtocols + NetworkEvents + Clone + 'static, - Events: CanonStateSubscriptions + Clone + 'static, + Events: CanonStateSubscriptions + Clone + Unpin + 'static, { type Provider = Provider; type Pool = Pool; diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index 4e8d62e17669..3644124e436f 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -9,7 +9,6 @@ use clap::Args; use futures_util::Stream; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; -use reth_provider::CanonStateNotification; use reth_tasks::TaskSpawner; use std::{fmt, marker::PhantomData}; @@ -165,15 +164,7 @@ pub trait RethNodeCommandConfig: fmt::Debug { payload_builder, ); let (payload_service, payload_builder) = - PayloadBuilderService::new(payload_generator, chain_events); - - components.task_executor().spawn_critical( - "cache canonical blocks for payload builder service", - Box::pin(async move { - payload_builder.clone().chain_updates().await; - }), - ); - + PayloadBuilderService::new(payload_generator, components.events()); components .task_executor() @@ -333,7 +324,7 @@ impl RethNodeCommandConfig for NoArgs { { self.inner_mut() .ok_or_else(|| eyre::eyre!("config value must be set"))? - .spawn_payload_builder_service(conf, components, chain_events) + .spawn_payload_builder_service(conf, components) } } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index eef5c065f6a8..97df7d295ce2 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -384,9 +384,7 @@ impl NodeCommand { self.ext.on_components_initialized(&components)?; debug!(target: "reth::cli", "Spawning payload builder service"); - let chain_events = blockchain_db.canonical_state_stream(); - let payload_builder = - self.ext.spawn_payload_builder_service(&self.builder, &components, chain_events)?; + let payload_builder = self.ext.spawn_payload_builder_service(&self.builder, &components)?; let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); let max_block = if let Some(block) = self.debug.max_block { diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 18fc25c50770..68b982d0bf01 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -8,7 +8,7 @@ use crate::{ BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob, }; use futures_util::{future::FutureExt, Stream, StreamExt}; -use reth_provider::CanonStateNotification; +use reth_provider::CanonStateSubscriptions; use reth_rpc_types::engine::PayloadId; use std::{ fmt, @@ -164,7 +164,7 @@ impl PayloadBuilderHandle { pub struct PayloadBuilderService where Gen: PayloadJobGenerator, - St: Stream + Send + Unpin + 'static, + St: CanonStateSubscriptions + Send + Unpin + 'static, { /// The type that knows how to create new payloads. generator: Gen, @@ -185,11 +185,11 @@ where impl PayloadBuilderService where Gen: PayloadJobGenerator, - St: Stream + Send + Unpin + 'static, + St: CanonStateSubscriptions + Send + Unpin + 'static, { /// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact /// with it. - pub fn new(generator: Gen, mut chain_events: St) -> (Self, PayloadBuilderHandle) { + pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); let service = Self { generator, @@ -282,14 +282,13 @@ impl Future for PayloadBuilderService where Gen: PayloadJobGenerator + Unpin + 'static, ::Job: Unpin + 'static, - St: Stream + Send + Unpin + 'static, + St: CanonStateSubscriptions + Send + Unpin + 'static, { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - loop { // we poll all jobs first, so we always have the latest payload that we can report if // requests diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 79272a9ed296..49da8f1d1bf4 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -160,10 +160,10 @@ pub async fn maintain_transaction_pool( let _ = tx.send(res); } .boxed() - ; + }; reload_accounts_fut = rx.fuse(); task_spawner.spawn_blocking(fut); - } + }; // check if we have a new finalized block if let Some(finalized) = From 6b5bfd22019b9d88deb235b58193211ab79f8f19 Mon Sep 17 00:00:00 2001 From: allnil Date: Wed, 20 Dec 2023 20:37:29 +0000 Subject: [PATCH 06/16] refactor traits, add task to grab events and new tip --- Cargo.lock | 1 + crates/payload/basic/src/lib.rs | 18 ++++++++--- crates/payload/builder/Cargo.toml | 1 + crates/payload/builder/src/service.rs | 45 +++++++++++++++++++++------ crates/payload/builder/src/traits.rs | 9 +++++- 5 files changed, 59 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4f79210a2ff..8b1d966d0cb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6207,6 +6207,7 @@ dependencies = [ "reth-provider", "reth-rpc-types", "reth-rpc-types-compat", + "reth-tasks", "reth-transaction-pool", "revm", "revm-primitives", diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 10fd8595de55..2e91da634bed 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -110,6 +110,12 @@ impl BasicPayloadJobGenerator { EthereumPayloadBuilder, ) } + + + /// Returns a reference to the tasks type + pub fn tasks(&self) -> &Tasks { + &self.executor + } } impl BasicPayloadJobGenerator { @@ -160,16 +166,16 @@ impl BasicPayloadJobGenerator PayloadJobGenerator +impl PayloadJobGenerator for BasicPayloadJobGenerator where Client: StateProviderFactory + BlockReaderIdExt + Clone + Unpin + 'static, - Pool: TransactionPool + Unpin + 'static, + Pool: TransactionPool + Clone + Unpin + 'static, Tasks: TaskSpawner + Clone + Unpin + 'static, - Builder: PayloadBuilder + Unpin + 'static, + Builder: PayloadBuilder + Unpin + Clone + 'static, { type Job = BasicPayloadJob; - + fn new_payload_job( &self, attributes: PayloadBuilderAttributes, @@ -219,6 +225,10 @@ where } fn on_new_state(&self) {} + + fn tasks(&self) -> &Tasks { + &self.executor + } } /// Restricts how many generator tasks can be executed at once. diff --git a/crates/payload/builder/Cargo.toml b/crates/payload/builder/Cargo.toml index 31b3a68ca3b1..295577b0b5ea 100644 --- a/crates/payload/builder/Cargo.toml +++ b/crates/payload/builder/Cargo.toml @@ -16,6 +16,7 @@ reth-transaction-pool.workspace = true reth-interfaces.workspace = true reth-rpc-types-compat.workspace = true reth-provider.workspace = true +reth-tasks.workspace = true # ethereum alloy-rlp.workspace = true diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 68b982d0bf01..1b0349cfdc89 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -7,7 +7,9 @@ use crate::{ error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob, }; -use futures_util::{future::FutureExt, Stream, StreamExt}; +use reth_tasks::TaskSpawner; + +use futures_util::{future::FutureExt, StreamExt}; use reth_provider::CanonStateSubscriptions; use reth_rpc_types::engine::PayloadId; use std::{ @@ -16,8 +18,14 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, + time::Duration, +}; + +use tokio::{ + sync::{mpsc, oneshot}, + time::timeout, }; -use tokio::sync::{mpsc, oneshot}; + use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, info, trace, warn}; @@ -161,10 +169,11 @@ impl PayloadBuilderHandle { /// does know nothing about how to build them, it just drives their jobs to completion. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PayloadBuilderService +pub struct PayloadBuilderService where - Gen: PayloadJobGenerator, + Gen: PayloadJobGenerator, St: CanonStateSubscriptions + Send + Unpin + 'static, + Tasks: TaskSpawner + Unpin + 'static, { /// The type that knows how to create new payloads. generator: Gen, @@ -182,10 +191,11 @@ where // === impl PayloadBuilderService === -impl PayloadBuilderService +impl PayloadBuilderService where - Gen: PayloadJobGenerator, + Gen: PayloadJobGenerator, St: CanonStateSubscriptions + Send + Unpin + 'static, + Tasks: TaskSpawner + Unpin + 'static, { /// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact /// with it. @@ -278,18 +288,33 @@ where } } -impl Future for PayloadBuilderService +impl Future for PayloadBuilderService where - Gen: PayloadJobGenerator + Unpin + 'static, - ::Job: Unpin + 'static, + Tasks: TaskSpawner + Unpin + 'static, + Gen: PayloadJobGenerator + Unpin + Clone + 'static, + >::Job: Unpin + 'static, St: CanonStateSubscriptions + Send + Unpin + 'static, { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - loop { + let mut canon_events = this.chain_events.subscribe_to_canonical_state(); + this.generator.tasks().spawn_critical_blocking( + "rpc request", + Box::pin(async move { + // more than enough time for the next block + let duration = Duration::from_secs(15); + + // wait for canon event or timeout + let update = timeout(duration, canon_events.recv()) + .await + .expect("canon state should change before timeout") + .expect("canon events stream is still open"); + let new_tip = update.tip(); + }), + ); // we poll all jobs first, so we always have the latest payload that we can report if // requests // we don't care about the order of the jobs, so we can just swap_remove them diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index 5039b4cbfb07..ddd423b75705 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -1,6 +1,7 @@ //! Trait abstractions used by the payload crate. use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes}; +use reth_tasks::TaskSpawner; use std::{future::Future, sync::Arc}; /// A type that can build a payload. @@ -62,7 +63,10 @@ pub enum KeepPayloadJobAlive { } /// A type that knows how to create new jobs for creating payloads. -pub trait PayloadJobGenerator: Send + Sync { +pub trait PayloadJobGenerator: Send + Sync +where + Tasks: TaskSpawner + Unpin + 'static, +{ /// The type that manages the lifecycle of a payload. /// /// This type is a future that yields better payloads. @@ -83,4 +87,7 @@ pub trait PayloadJobGenerator: Send + Sync { /// Handles new chain state events fn on_new_state(&self); + + /// Get executor + fn tasks(&self) -> &Tasks; } From 5ea77b3d250f5ef292b83e831bbb5003c099250a Mon Sep 17 00:00:00 2001 From: allnil Date: Fri, 22 Dec 2023 16:18:44 +0000 Subject: [PATCH 07/16] chore: refactor code, get canonical_chain_stream, remove generics, try to resolve compiler errors --- bin/reth/src/cli/ext.rs | 7 ++- bin/reth/src/node/mod.rs | 6 ++- crates/payload/basic/src/lib.rs | 13 ++---- crates/payload/builder/src/service.rs | 57 +++++++++---------------- crates/payload/builder/src/traits.rs | 9 +--- crates/transaction-pool/src/maintain.rs | 2 +- 6 files changed, 36 insertions(+), 58 deletions(-) diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index 3644124e436f..f315f22ffec2 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -9,6 +9,7 @@ use clap::Args; use futures_util::Stream; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; +use reth_provider::CanonStateNotificationStream; use reth_tasks::TaskSpawner; use std::{fmt, marker::PhantomData}; @@ -130,6 +131,7 @@ pub trait RethNodeCommandConfig: fmt::Debug { &mut self, conf: &Conf, components: &Reth, + chain_events: CanonStateNotificationStream, ) -> eyre::Result where Conf: PayloadBuilderConfig, @@ -164,7 +166,7 @@ pub trait RethNodeCommandConfig: fmt::Debug { payload_builder, ); let (payload_service, payload_builder) = - PayloadBuilderService::new(payload_generator, components.events()); + PayloadBuilderService::new(payload_generator, chain_events); components .task_executor() @@ -317,6 +319,7 @@ impl RethNodeCommandConfig for NoArgs { &mut self, conf: &Conf, components: &Reth, + chain_events: CanonStateNotificationStream, ) -> eyre::Result where Conf: PayloadBuilderConfig, @@ -324,7 +327,7 @@ impl RethNodeCommandConfig for NoArgs { { self.inner_mut() .ok_or_else(|| eyre::eyre!("config value must be set"))? - .spawn_payload_builder_service(conf, components) + .spawn_payload_builder_service(conf, components, chain_events) } } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 7cecca9f2690..45a1b77baf5a 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -383,7 +383,11 @@ impl NodeCommand { self.ext.on_components_initialized(&components)?; debug!(target: "reth::cli", "Spawning payload builder service"); - let payload_builder = self.ext.spawn_payload_builder_service(&self.builder, &components)?; + let payload_builder = self.ext.spawn_payload_builder_service( + &self.builder, + &components, + blockchain_db.canonical_state_stream(), + )?; let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel(); let max_block = if let Some(block) = self.debug.max_block { diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 2e91da634bed..b0884ffbb205 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -111,7 +111,6 @@ impl BasicPayloadJobGenerator { ) } - /// Returns a reference to the tasks type pub fn tasks(&self) -> &Tasks { &self.executor @@ -166,16 +165,16 @@ impl BasicPayloadJobGenerator PayloadJobGenerator +impl PayloadJobGenerator for BasicPayloadJobGenerator where Client: StateProviderFactory + BlockReaderIdExt + Clone + Unpin + 'static, - Pool: TransactionPool + Clone + Unpin + 'static, + Pool: TransactionPool + Unpin + 'static, Tasks: TaskSpawner + Clone + Unpin + 'static, - Builder: PayloadBuilder + Unpin + Clone + 'static, + Builder: PayloadBuilder + Unpin + 'static, { type Job = BasicPayloadJob; - + fn new_payload_job( &self, attributes: PayloadBuilderAttributes, @@ -225,10 +224,6 @@ where } fn on_new_state(&self) {} - - fn tasks(&self) -> &Tasks { - &self.executor - } } /// Restricts how many generator tasks can be executed at once. diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 1b0349cfdc89..489ddf009627 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -7,10 +7,9 @@ use crate::{ error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob, }; -use reth_tasks::TaskSpawner; use futures_util::{future::FutureExt, StreamExt}; -use reth_provider::CanonStateSubscriptions; +use reth_provider::CanonStateNotificationStream; use reth_rpc_types::engine::PayloadId; use std::{ fmt, @@ -18,13 +17,9 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::Duration, }; -use tokio::{ - sync::{mpsc, oneshot}, - time::timeout, -}; +use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, info, trace, warn}; @@ -169,11 +164,9 @@ impl PayloadBuilderHandle { /// does know nothing about how to build them, it just drives their jobs to completion. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PayloadBuilderService +pub struct PayloadBuilderService where - Gen: PayloadJobGenerator, - St: CanonStateSubscriptions + Send + Unpin + 'static, - Tasks: TaskSpawner + Unpin + 'static, + Gen: PayloadJobGenerator, { /// The type that knows how to create new payloads. generator: Gen, @@ -186,20 +179,21 @@ where /// Metrics for the payload builder service metrics: PayloadBuilderServiceMetrics, /// Chain events notification stream - chain_events: St, + chain_events: CanonStateNotificationStream, } // === impl PayloadBuilderService === -impl PayloadBuilderService +impl PayloadBuilderService where - Gen: PayloadJobGenerator, - St: CanonStateSubscriptions + Send + Unpin + 'static, - Tasks: TaskSpawner + Unpin + 'static, + Gen: PayloadJobGenerator, { /// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact /// with it. - pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) { + pub fn new( + generator: Gen, + chain_events: CanonStateNotificationStream, + ) -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); let service = Self { generator, @@ -288,33 +282,22 @@ where } } -impl Future for PayloadBuilderService +impl Future for PayloadBuilderService where - Tasks: TaskSpawner + Unpin + 'static, - Gen: PayloadJobGenerator + Unpin + Clone + 'static, - >::Job: Unpin + 'static, - St: CanonStateSubscriptions + Send + Unpin + 'static, + Gen: PayloadJobGenerator + Unpin + Clone + 'static, + ::Job: Unpin + 'static, { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); loop { - let mut canon_events = this.chain_events.subscribe_to_canonical_state(); - this.generator.tasks().spawn_critical_blocking( - "rpc request", - Box::pin(async move { - // more than enough time for the next block - let duration = Duration::from_secs(15); - - // wait for canon event or timeout - let update = timeout(duration, canon_events.recv()) - .await - .expect("canon state should change before timeout") - .expect("canon events stream is still open"); - let new_tip = update.tip(); - }), - ); + // let e = self.chain_events.flat_map(|new_chain| { + // new_chain.committed().map(|chain| chain.state()) + // }); + + let e = this.chain_events.poll_next_unpin(cx); + // we poll all jobs first, so we always have the latest payload that we can report if // requests // we don't care about the order of the jobs, so we can just swap_remove them diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index ddd423b75705..5039b4cbfb07 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -1,7 +1,6 @@ //! Trait abstractions used by the payload crate. use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes}; -use reth_tasks::TaskSpawner; use std::{future::Future, sync::Arc}; /// A type that can build a payload. @@ -63,10 +62,7 @@ pub enum KeepPayloadJobAlive { } /// A type that knows how to create new jobs for creating payloads. -pub trait PayloadJobGenerator: Send + Sync -where - Tasks: TaskSpawner + Unpin + 'static, -{ +pub trait PayloadJobGenerator: Send + Sync { /// The type that manages the lifecycle of a payload. /// /// This type is a future that yields better payloads. @@ -87,7 +83,4 @@ where /// Handles new chain state events fn on_new_state(&self); - - /// Get executor - fn tasks(&self) -> &Tasks; } diff --git a/crates/transaction-pool/src/maintain.rs b/crates/transaction-pool/src/maintain.rs index 49da8f1d1bf4..db74f50f2f8b 100644 --- a/crates/transaction-pool/src/maintain.rs +++ b/crates/transaction-pool/src/maintain.rs @@ -163,7 +163,7 @@ pub async fn maintain_transaction_pool( }; reload_accounts_fut = rx.fuse(); task_spawner.spawn_blocking(fut); - }; + } // check if we have a new finalized block if let Some(finalized) = From 264f825043f3f58edd3d350906bc140dea489c97 Mon Sep 17 00:00:00 2001 From: allnil Date: Fri, 22 Dec 2023 16:20:37 +0000 Subject: [PATCH 08/16] chore: get back unpin from canon state notifications in components --- bin/reth/src/cli/components.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/reth/src/cli/components.rs b/bin/reth/src/cli/components.rs index ea782ddc108b..18ef804f4a11 100644 --- a/bin/reth/src/cli/components.rs +++ b/bin/reth/src/cli/components.rs @@ -50,7 +50,7 @@ pub trait RethNodeComponents: Clone + Send + Sync + 'static { /// The network type used to communicate with p2p. type Network: NetworkInfo + Peers + NetworkProtocols + NetworkEvents + Clone + 'static; /// The events type used to create subscriptions. - type Events: CanonStateSubscriptions + Clone + Unpin + 'static; + type Events: CanonStateSubscriptions + Clone + 'static; /// The type that is used to spawn tasks. type Tasks: TaskSpawner + Clone + Unpin + 'static; @@ -118,7 +118,7 @@ where Tasks: TaskSpawner + Clone + Unpin + 'static, Pool: TransactionPool + Clone + Unpin + 'static, Network: NetworkInfo + Peers + NetworkProtocols + NetworkEvents + Clone + 'static, - Events: CanonStateSubscriptions + Clone + Unpin + 'static, + Events: CanonStateSubscriptions + Clone + 'static, { type Provider = Provider; type Pool = Pool; From 706e9928d1d1c6fe813bb6c784affb6cb5aee6cf Mon Sep 17 00:00:00 2001 From: allnil Date: Fri, 22 Dec 2023 16:22:17 +0000 Subject: [PATCH 09/16] chore: remove clone trait from payload generator in future --- crates/payload/builder/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 489ddf009627..7d86d04f972c 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -284,7 +284,7 @@ where impl Future for PayloadBuilderService where - Gen: PayloadJobGenerator + Unpin + Clone + 'static, + Gen: PayloadJobGenerator + Unpin + 'static, ::Job: Unpin + 'static, { type Output = (); From e1a31f27216f36a3148bd93601aa6e3337e603dc Mon Sep 17 00:00:00 2001 From: allnil Date: Fri, 22 Dec 2023 16:34:16 +0000 Subject: [PATCH 10/16] chore: successfully pass stream to the payload service, clean rubbish --- bin/reth/src/cli/ext.rs | 1 - crates/payload/builder/src/service.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index f315f22ffec2..b4b7bbd4215f 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -6,7 +6,6 @@ use crate::cli::{ }; use clap::Args; -use futures_util::Stream; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_provider::CanonStateNotificationStream; diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 7d86d04f972c..a27f9fa6f230 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -296,7 +296,7 @@ where // new_chain.committed().map(|chain| chain.state()) // }); - let e = this.chain_events.poll_next_unpin(cx); + let _e = this.chain_events.poll_next_unpin(cx); // we poll all jobs first, so we always have the latest payload that we can report if // requests From 42c7059080ddffd7dfe0f18c2c8bfec166ae2dfd Mon Sep 17 00:00:00 2001 From: allnil Date: Fri, 22 Dec 2023 18:07:51 +0000 Subject: [PATCH 11/16] chore: experiment with cache_reads preservation --- crates/payload/basic/src/lib.rs | 26 ++++++++++++++++++++++---- crates/payload/builder/src/service.rs | 9 ++++----- crates/payload/builder/src/traits.rs | 4 +++- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index b0884ffbb205..3e52ce2a3e67 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -31,7 +31,8 @@ use reth_primitives::{ SealedBlock, Withdrawal, B256, EMPTY_OMMER_ROOT_HASH, U256, }; use reth_provider::{ - BlockReaderIdExt, BlockSource, BundleStateWithReceipts, ProviderError, StateProviderFactory, + BlockReaderIdExt, BlockSource, BundleStateWithReceipts, CanonStateNotification, ProviderError, + StateProviderFactory, }; use reth_revm::{ database::StateProviderDatabase, @@ -45,9 +46,10 @@ use revm::{ Database, DatabaseCommit, State, }; use std::{ + cell::RefCell, future::Future, pin::Pin, - sync::{atomic::AtomicBool, Arc}, + sync::{atomic::AtomicBool, Arc, RwLock}, task::{Context, Poll}, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -88,6 +90,8 @@ pub struct BasicPayloadJobGenerator>, } // === impl BasicPayloadJobGenerator === @@ -135,6 +139,7 @@ impl BasicPayloadJobGenerator, + // contracts: HashMap, + // block_hashes: HashMap, + let mut cached_reads = CachedReads::default(); + cached_reads.as_db(new_state.committed().unwrap().state().state().state.clone()); + let mut write_guard = self.cached_reads.write().unwrap(); + *write_guard = cached_reads; + () + } } /// Restricts how many generator tasks can be executed at once. diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index a27f9fa6f230..f4147bb0894a 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -292,11 +292,10 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); loop { - // let e = self.chain_events.flat_map(|new_chain| { - // new_chain.committed().map(|chain| chain.state()) - // }); - - let _e = this.chain_events.poll_next_unpin(cx); + let _ = this + .chain_events + .poll_next_unpin(cx) + .map(|new_state| this.generator.on_new_state(new_state.unwrap())); // we poll all jobs first, so we always have the latest payload that we can report if // requests diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index 5039b4cbfb07..b1a986a086d9 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -1,5 +1,7 @@ //! Trait abstractions used by the payload crate. +use reth_provider::CanonStateNotification; + use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes}; use std::{future::Future, sync::Arc}; @@ -82,5 +84,5 @@ pub trait PayloadJobGenerator: Send + Sync { ) -> Result; /// Handles new chain state events - fn on_new_state(&self); + fn on_new_state(&self, new_state: CanonStateNotification); } From 65bebed0357a14244db669346fda9438db972ed8 Mon Sep 17 00:00:00 2001 From: allnil Date: Sat, 30 Dec 2023 20:21:08 +0000 Subject: [PATCH 12/16] changed on_new_state signature to &mut self --- crates/payload/basic/src/lib.rs | 13 ++++--------- crates/payload/builder/src/traits.rs | 2 +- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index 292ba285ec94..1994f3f30d76 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -112,7 +112,7 @@ pub struct BasicPayloadJobGenerator { /// See [PayloadBuilder] builder: Builder, /// Stored cached_reads for new payload jobs - cached_reads: Arc>, + cached_reads: Arc, } // === impl BasicPayloadJobGenerator === @@ -135,7 +135,7 @@ impl BasicPayloadJobGenerator, - // contracts: HashMap, - // block_hashes: HashMap, + fn on_new_state(&mut self, new_state: CanonStateNotification) { let mut cached_reads = CachedReads::default(); cached_reads.as_db(new_state.committed().unwrap().state().state().state.clone()); - let mut write_guard = self.cached_reads.write().unwrap(); - *write_guard = cached_reads; + self.cached_reads = cached_reads; () } } diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index b1a986a086d9..55dc12c3d41c 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -84,5 +84,5 @@ pub trait PayloadJobGenerator: Send + Sync { ) -> Result; /// Handles new chain state events - fn on_new_state(&self, new_state: CanonStateNotification); + fn on_new_state(&mut self, new_state: CanonStateNotification); } From 2adb0c827e9203f194b6745226970e33ba419e34 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 9 Jan 2024 13:50:14 +0100 Subject: [PATCH 13/16] feat: preload cache based on committed state --- bin/reth/src/cli/ext.rs | 13 ++-- crates/payload/basic/src/lib.rs | 93 +++++++++++++----------- crates/payload/builder/src/database.rs | 10 +++ crates/payload/builder/src/service.rs | 29 +++----- crates/payload/builder/src/test_utils.rs | 3 +- crates/payload/builder/src/traits.rs | 9 ++- 6 files changed, 87 insertions(+), 70 deletions(-) diff --git a/bin/reth/src/cli/ext.rs b/bin/reth/src/cli/ext.rs index 94a95f9ef0ac..266acaa5f19d 100644 --- a/bin/reth/src/cli/ext.rs +++ b/bin/reth/src/cli/ext.rs @@ -5,10 +5,9 @@ use crate::cli::{ config::{PayloadBuilderConfig, RethNetworkConfig, RethRpcConfig}, }; use clap::Args; - use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; -use reth_provider::CanonStateNotificationStream; +use reth_provider::CanonStateSubscriptions; use reth_tasks::TaskSpawner; use std::{fmt, marker::PhantomData}; @@ -130,7 +129,6 @@ pub trait RethNodeCommandConfig: fmt::Debug { &mut self, conf: &Conf, components: &Reth, - chain_events: CanonStateNotificationStream, ) -> eyre::Result where Conf: PayloadBuilderConfig, @@ -164,8 +162,10 @@ pub trait RethNodeCommandConfig: fmt::Debug { components.chain_spec(), payload_builder, ); - let (payload_service, payload_builder) = - PayloadBuilderService::new(payload_generator, chain_events); + let (payload_service, payload_builder) = PayloadBuilderService::new( + payload_generator, + components.events().canonical_state_stream(), + ); components .task_executor() @@ -319,7 +319,6 @@ impl RethNodeCommandConfig for NoArgs { &mut self, conf: &Conf, components: &Reth, - chain_events: CanonStateNotificationStream, ) -> eyre::Result where Conf: PayloadBuilderConfig, @@ -327,7 +326,7 @@ impl RethNodeCommandConfig for NoArgs { { self.inner_mut() .ok_or_else(|| eyre::eyre!("config value must be set"))? - .spawn_payload_builder_service(conf, components, chain_events) + .spawn_payload_builder_service(conf, components) } } diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index f021e0992a09..b2ce75162158 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -18,14 +18,11 @@ use reth_payload_builder::{ use reth_primitives::{ bytes::BytesMut, constants::{ - eip4844::MAX_DATA_GAS_PER_BLOCK, BEACON_NONCE, EMPTY_RECEIPTS, EMPTY_TRANSACTIONS, - EMPTY_WITHDRAWALS, ETHEREUM_BLOCK_GAS_LIMIT, RETH_CLIENT_VERSION, SLOT_DURATION, + BEACON_NONCE, EMPTY_RECEIPTS, EMPTY_TRANSACTIONS, EMPTY_WITHDRAWALS, + ETHEREUM_BLOCK_GAS_LIMIT, RETH_CLIENT_VERSION, SLOT_DURATION, }, - eip4844::calculate_excess_blob_gas, - proofs, - revm::{compat::into_reth_log, env::tx_env_with_recovered}, - Block, BlockNumberOrTag, Bytes, ChainSpec, Header, IntoRecoveredTransaction, Receipt, Receipts, - SealedBlock, Withdrawal, B256, EMPTY_OMMER_ROOT_HASH, U256, + proofs, Block, BlockNumberOrTag, Bytes, ChainSpec, Header, Receipts, SealedBlock, Withdrawal, + B256, EMPTY_OMMER_ROOT_HASH, U256, }; use reth_provider::{ BlockReaderIdExt, BlockSource, BundleStateWithReceipts, CanonStateNotification, ProviderError, @@ -43,10 +40,9 @@ use revm::{ Database, DatabaseCommit, State, }; use std::{ - cell::RefCell, future::Future, pin::Pin, - sync::{atomic::AtomicBool, Arc, RwLock}, + sync::{atomic::AtomicBool, Arc}, task::{Context, Poll}, time::{Duration, SystemTime, UNIX_EPOCH}, }; @@ -56,30 +52,6 @@ use tokio::{ }; use tracing::{debug, trace, warn}; -use reth_interfaces::RethResult; -use reth_payload_builder::{ - database::CachedReads, error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive, - PayloadBuilderAttributes, PayloadId, PayloadJob, PayloadJobGenerator, -}; -use reth_primitives::{ - bytes::BytesMut, - constants::{ - BEACON_NONCE, EMPTY_RECEIPTS, EMPTY_TRANSACTIONS, EMPTY_WITHDRAWALS, - ETHEREUM_BLOCK_GAS_LIMIT, RETH_CLIENT_VERSION, SLOT_DURATION, - }, - proofs, Block, BlockNumberOrTag, Bytes, ChainSpec, Header, Receipts, SealedBlock, Withdrawal, - B256, EMPTY_OMMER_ROOT_HASH, U256, -}; -use reth_provider::{ - BlockReaderIdExt, BlockSource, BundleStateWithReceipts, ProviderError, StateProviderFactory, -}; -use reth_revm::{ - database::StateProviderDatabase, - state_change::{apply_beacon_root_contract_call, post_block_withdrawals_balance_increments}, -}; -use reth_tasks::TaskSpawner; -use reth_transaction_pool::TransactionPool; - use crate::metrics::PayloadBuilderMetrics; mod metrics; @@ -103,8 +75,8 @@ pub struct BasicPayloadJobGenerator { /// /// See [PayloadBuilder] builder: Builder, - /// Stored cached_reads for new payload jobs - cached_reads: Arc, + /// Stored cached_reads for new payload jobs. + pre_cached: Option, } // === impl BasicPayloadJobGenerator === @@ -127,7 +99,7 @@ impl BasicPayloadJobGenerator BasicPayloadJobGenerator &Tasks { &self.executor } + + /// Returns the pre-cached reads for the given parent block if it matches the cached state's + /// block. + fn take_pre_cached(&mut self, parent: B256) -> Option { + let pre_cached = self.pre_cached.take()?; + if pre_cached.block == parent { + Some(pre_cached.cached) + } else { + None + } + } } // === impl BasicPayloadJobGenerator === @@ -174,7 +157,7 @@ where type Job = BasicPayloadJob; fn new_payload_job( - &self, + &mut self, attributes: PayloadBuilderAttributes, ) -> Result { let parent_block = if attributes.parent.is_zero() { @@ -202,9 +185,8 @@ where let until = self.job_deadline(config.attributes.timestamp); let deadline = Box::pin(tokio::time::sleep_until(until)); - let read_cached_reads = self.cached_reads.read().unwrap(); - let cached_reads = Some(read_cached_reads.clone()); + let cached_reads = self.take_pre_cached(config.parent_block.hash()); Ok(BasicPayloadJob { config, @@ -223,13 +205,38 @@ where } fn on_new_state(&mut self, new_state: CanonStateNotification) { - let mut cached_reads = CachedReads::default(); - cached_reads.as_db(new_state.committed().unwrap().state().state().state.clone()); - self.cached_reads = cached_reads; - () + if let Some(committed) = new_state.committed() { + let mut cached = CachedReads::default(); + + // extract the state from the notification and put it into the cache + let new_state = committed.state(); + for (addr, acc) in new_state.accounts_iter() { + if let Some(acc) = acc { + let storage = if let Some(acc) = new_state.state().account(&addr) { + acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect() + } else { + Default::default() + }; + cached.insert_account(addr, acc.clone(), storage); + } + } + + self.pre_cached = Some(PrecachedState { block: committed.tip().hash, cached }); + } } } +/// Pre-filled [CachedReads] for a specific block. +/// +/// This is extracted from the [CanonStateNotification] for the tip block. +#[derive(Debug, Clone)] +pub struct PrecachedState { + /// The block for which the state is pre-cached. + pub block: B256, + /// Cached state for the block. + pub cached: CachedReads, +} + /// Restricts how many generator tasks can be executed at once. #[derive(Debug, Clone)] struct PayloadTaskGuard(Arc); diff --git a/crates/payload/builder/src/database.rs b/crates/payload/builder/src/database.rs index 04998c45b7e6..0205bad335d3 100644 --- a/crates/payload/builder/src/database.rs +++ b/crates/payload/builder/src/database.rs @@ -50,6 +50,16 @@ impl CachedReads { fn as_db_mut(&mut self, db: DB) -> CachedReadsDbMut<'_, DB> { CachedReadsDbMut { cached: self, db } } + + /// Inserts an account info into the cache. + pub fn insert_account( + &mut self, + address: Address, + info: AccountInfo, + storage: HashMap, + ) { + self.accounts.insert(address, CachedAccount { info: Some(info), storage }); + } } #[derive(Debug)] diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index f4147bb0894a..9fcdcc1c3abb 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -7,9 +7,8 @@ use crate::{ error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator, BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob, }; - -use futures_util::{future::FutureExt, StreamExt}; -use reth_provider::CanonStateNotificationStream; +use futures_util::{future::FutureExt, Stream, StreamExt}; +use reth_provider::CanonStateNotification; use reth_rpc_types::engine::PayloadId; use std::{ fmt, @@ -18,9 +17,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; - use tokio::sync::{mpsc, oneshot}; - use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, info, trace, warn}; @@ -164,7 +161,7 @@ impl PayloadBuilderHandle { /// does know nothing about how to build them, it just drives their jobs to completion. #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PayloadBuilderService +pub struct PayloadBuilderService where Gen: PayloadJobGenerator, { @@ -179,21 +176,18 @@ where /// Metrics for the payload builder service metrics: PayloadBuilderServiceMetrics, /// Chain events notification stream - chain_events: CanonStateNotificationStream, + chain_events: St, } // === impl PayloadBuilderService === -impl PayloadBuilderService +impl PayloadBuilderService where Gen: PayloadJobGenerator, { /// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact /// with it. - pub fn new( - generator: Gen, - chain_events: CanonStateNotificationStream, - ) -> (Self, PayloadBuilderHandle) { + pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); let service = Self { generator, @@ -282,20 +276,21 @@ where } } -impl Future for PayloadBuilderService +impl Future for PayloadBuilderService where Gen: PayloadJobGenerator + Unpin + 'static, ::Job: Unpin + 'static, + St: Stream + Send + Unpin + 'static, { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); loop { - let _ = this - .chain_events - .poll_next_unpin(cx) - .map(|new_state| this.generator.on_new_state(new_state.unwrap())); + // notify the generator of new chain events + while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) { + this.generator.on_new_state(new_head); + } // we poll all jobs first, so we always have the latest payload that we can report if // requests diff --git a/crates/payload/builder/src/test_utils.rs b/crates/payload/builder/src/test_utils.rs index 0257c4c0bd01..a150833bf142 100644 --- a/crates/payload/builder/src/test_utils.rs +++ b/crates/payload/builder/src/test_utils.rs @@ -6,6 +6,7 @@ use crate::{ PayloadJobGenerator, }; use reth_primitives::{Block, U256}; +use reth_provider::CanonStateNotification; use std::{ future::Future, pin::Pin, @@ -35,7 +36,7 @@ impl PayloadJobGenerator for TestPayloadJobGenerator { type Job = TestPayloadJob; fn new_payload_job( - &self, + &mut self, attr: PayloadBuilderAttributes, ) -> Result { Ok(TestPayloadJob { attr }) diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index 55dc12c3d41c..0af6656ed2a8 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -79,10 +79,15 @@ pub trait PayloadJobGenerator: Send + Sync { /// This is expected to initially build a new (empty) payload without transactions, so it can be /// returned directly. fn new_payload_job( - &self, + &mut self, attr: PayloadBuilderAttributes, ) -> Result; /// Handles new chain state events - fn on_new_state(&mut self, new_state: CanonStateNotification); + /// + /// This is intended for any logic that needs to be run when the chain state changes or used to + /// use the in memory state for the head block. + fn on_new_state(&mut self, new_state: CanonStateNotification) { + let _ = new_state; + } } From 0f3d4aebc3247b0aee0f0c048016b8000451ea3f Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 9 Jan 2024 14:09:39 +0100 Subject: [PATCH 14/16] fix: make tests compile --- crates/payload/basic/src/lib.rs | 10 +++++----- crates/payload/builder/src/service.rs | 3 +++ crates/payload/builder/src/test_utils.rs | 13 +++++++++---- crates/payload/builder/src/traits.rs | 2 +- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index b2ce75162158..cbead1e77c66 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -134,10 +134,10 @@ impl BasicPayloadJobGenerator Option { - let pre_cached = self.pre_cached.take()?; + fn maybe_pre_cached(&self, parent: B256) -> Option { + let pre_cached = self.pre_cached.as_ref()?; if pre_cached.block == parent { - Some(pre_cached.cached) + Some(pre_cached.cached.clone()) } else { None } @@ -157,7 +157,7 @@ where type Job = BasicPayloadJob; fn new_payload_job( - &mut self, + &self, attributes: PayloadBuilderAttributes, ) -> Result { let parent_block = if attributes.parent.is_zero() { @@ -186,7 +186,7 @@ where let until = self.job_deadline(config.attributes.timestamp); let deadline = Box::pin(tokio::time::sleep_until(until)); - let cached_reads = self.take_pre_cached(config.parent_block.hash()); + let cached_reads = self.maybe_pre_cached(config.parent_block.hash()); Ok(BasicPayloadJob { config, diff --git a/crates/payload/builder/src/service.rs b/crates/payload/builder/src/service.rs index 9fcdcc1c3abb..f760bce110d4 100644 --- a/crates/payload/builder/src/service.rs +++ b/crates/payload/builder/src/service.rs @@ -187,6 +187,9 @@ where { /// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact /// with it. + /// + /// This also takes a stream of chain events that will be forwarded to the generator to apply + /// additional logic when new state is committed. See also [PayloadJobGenerator::on_new_state]. pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) { let (service_tx, command_rx) = mpsc::unbounded_channel(); let service = Self { diff --git a/crates/payload/builder/src/test_utils.rs b/crates/payload/builder/src/test_utils.rs index a150833bf142..871c4828b1d0 100644 --- a/crates/payload/builder/src/test_utils.rs +++ b/crates/payload/builder/src/test_utils.rs @@ -15,9 +15,14 @@ use std::{ }; /// Creates a new [PayloadBuilderService] for testing purposes. -pub fn test_payload_service( -) -> (PayloadBuilderService, PayloadBuilderHandle) { - PayloadBuilderService::new(Default::default()) +pub fn test_payload_service() -> ( + PayloadBuilderService< + TestPayloadJobGenerator, + futures_util::stream::Empty, + >, + PayloadBuilderHandle, +) { + PayloadBuilderService::new(Default::default(), futures_util::stream::empty()) } /// Creates a new [PayloadBuilderService] for testing purposes and spawns it in the background. @@ -36,7 +41,7 @@ impl PayloadJobGenerator for TestPayloadJobGenerator { type Job = TestPayloadJob; fn new_payload_job( - &mut self, + &self, attr: PayloadBuilderAttributes, ) -> Result { Ok(TestPayloadJob { attr }) diff --git a/crates/payload/builder/src/traits.rs b/crates/payload/builder/src/traits.rs index 0af6656ed2a8..60f6328176bd 100644 --- a/crates/payload/builder/src/traits.rs +++ b/crates/payload/builder/src/traits.rs @@ -79,7 +79,7 @@ pub trait PayloadJobGenerator: Send + Sync { /// This is expected to initially build a new (empty) payload without transactions, so it can be /// returned directly. fn new_payload_job( - &mut self, + &self, attr: PayloadBuilderAttributes, ) -> Result; From c42025b0d6c3ae2e612d931968dc46bcab57c2d5 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 9 Jan 2024 16:53:42 +0100 Subject: [PATCH 15/16] move to helper fn --- crates/payload/basic/src/lib.rs | 15 +++++++-------- .../bundle_state/bundle_state_with_receipts.rs | 10 +++++++++- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/crates/payload/basic/src/lib.rs b/crates/payload/basic/src/lib.rs index cbead1e77c66..c540b1065a4f 100644 --- a/crates/payload/basic/src/lib.rs +++ b/crates/payload/basic/src/lib.rs @@ -210,14 +210,13 @@ where // extract the state from the notification and put it into the cache let new_state = committed.state(); - for (addr, acc) in new_state.accounts_iter() { - if let Some(acc) = acc { - let storage = if let Some(acc) = new_state.state().account(&addr) { - acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect() - } else { - Default::default() - }; - cached.insert_account(addr, acc.clone(), storage); + for (addr, acc) in new_state.bundle_accounts_iter() { + if let Some(info) = acc.info.clone() { + // we want pre cache existing accounts and their storage + // this only includes changed accounts and storage but is better than nothing + let storage = + acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect(); + cached.insert_account(addr, info, storage); } } diff --git a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs index 958ed5cbd09b..196e8cd5cebe 100644 --- a/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs +++ b/crates/storage/provider/src/bundle_state/bundle_state_with_receipts.rs @@ -16,7 +16,10 @@ use reth_trie::{ updates::TrieUpdates, StateRoot, StateRootError, }; -use revm::{db::states::BundleState, primitives::AccountInfo}; +use revm::{ + db::{states::BundleState, BundleAccount}, + primitives::AccountInfo, +}; use std::collections::HashMap; pub use revm::db::states::OriginalValuesKnown; @@ -110,6 +113,11 @@ impl BundleStateWithReceipts { self.bundle.state().iter().map(|(a, acc)| (*a, acc.info.as_ref())) } + /// Return iterator over all [BundleAccount]s in the bundle + pub fn bundle_accounts_iter(&self) -> impl Iterator { + self.bundle.state().iter().map(|(a, acc)| (*a, acc)) + } + /// Get account if account is known. pub fn account(&self, address: &Address) -> Option> { self.bundle.account(address).map(|a| a.info.clone().map(into_reth_acc)) From 7d6eeea9ffc8a73e50632edc91e9ab2a18224c43 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 9 Jan 2024 17:13:51 +0100 Subject: [PATCH 16/16] update interface --- bin/reth/src/commands/debug_cmd/replay_engine.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index a1b37da99296..adb0f8c3a58c 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -26,7 +26,7 @@ use reth_primitives::{ fs::{self}, ChainSpec, }; -use reth_provider::{providers::BlockchainProvider, ProviderFactory}; +use reth_provider::{providers::BlockchainProvider, CanonStateSubscriptions, ProviderFactory}; use reth_revm::EvmProcessorFactory; use reth_rpc_types::{ engine::{CancunPayloadFields, ForkchoiceState, PayloadAttributes}, @@ -175,7 +175,8 @@ impl Command { self.chain.clone(), payload_builder, ); - let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator); + let (payload_service, payload_builder) = + PayloadBuilderService::new(payload_generator, blockchain_db.canonical_state_stream()); ctx.task_executor.spawn_critical("payload builder service", Box::pin(payload_service)); // Configure the consensus engine