Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: preload cachedreads with tip state #5804

Merged
merged 23 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8513037
feat: explore payload builder, prepare boilerplate code for the next …
allnil Dec 16, 2023
caf046c
feat: pass canon state notification stream to payload loader, add new…
allnil Dec 17, 2023
558313a
save progress
allnil Dec 18, 2023
36befd2
remove generic stream type for chain events
allnil Dec 20, 2023
f260505
Merge branch 'main' of github.com:allnil/reth into feat/preload-cache…
allnil Dec 20, 2023
59fbc48
pass reth components events stream in payload builder service, add bo…
allnil Dec 20, 2023
6b5bfd2
refactor traits, add task to grab events and new tip
allnil Dec 20, 2023
f8705c9
Merge branch 'main' of github.com:allnil/reth into feat/preload-cache…
allnil Dec 21, 2023
1b8f281
Merge branch 'main' of github.com:allnil/reth into feat/preload-cache…
allnil Dec 22, 2023
5ea77b3
chore: refactor code, get canonical_chain_stream, remove generics, tr…
allnil Dec 22, 2023
264f825
chore: get back unpin from canon state notifications in components
allnil Dec 22, 2023
706e992
chore: remove clone trait from payload generator in future
allnil Dec 22, 2023
e1a31f2
chore: successfully pass stream to the payload service, clean rubbish
allnil Dec 22, 2023
42c7059
chore: experiment with cache_reads preservation
allnil Dec 22, 2023
eb6a55e
merge
allnil Dec 30, 2023
86ad016
merge
allnil Dec 30, 2023
65bebed
changed on_new_state signature to &mut self
allnil Dec 30, 2023
f5d1ceb
Merge branch 'main' into feat/preload-cached-reads
mattsse Jan 9, 2024
2adb0c8
feat: preload cache based on committed state
mattsse Jan 9, 2024
0f3d4ae
fix: make tests compile
mattsse Jan 9, 2024
c42025b
move to helper fn
mattsse Jan 9, 2024
e86b1c3
Merge branch 'main' into feat/preload-cached-reads
mattsse Jan 9, 2024
7d6eeea
update interface
mattsse Jan 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ const-str = "0.5.6"
boyer-moore-magiclen = "0.2.16"
itertools.workspace = true
rayon.workspace = true
futures-util.workspace = true

[target.'cfg(not(windows))'.dependencies]
jemallocator = { version = "0.5.0", optional = true }
Expand Down
6 changes: 5 additions & 1 deletion bin/reth/src/cli/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::cli::{
use clap::Args;
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_provider::CanonStateSubscriptions;
use reth_tasks::TaskSpawner;
use std::{fmt, marker::PhantomData};

Expand Down Expand Up @@ -161,7 +162,10 @@ pub trait RethNodeCommandConfig: fmt::Debug {
components.chain_spec(),
payload_builder,
);
let (payload_service, payload_builder) = PayloadBuilderService::new(payload_generator);
let (payload_service, payload_builder) = PayloadBuilderService::new(
payload_generator,
components.events().canonical_state_stream(),
);

components
.task_executor()
Expand Down
95 changes: 74 additions & 21 deletions crates/payload/basic/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,6 @@
use alloy_rlp::Encodable;
use futures_core::ready;
use futures_util::FutureExt;
use revm::{
db::states::bundle_state::BundleRetention,
primitives::{BlockEnv, CfgEnv, Env},
Database, DatabaseCommit, State,
};
use std::{
future::Future,
pin::Pin,
sync::{atomic::AtomicBool, Arc},
task::{Context, Poll},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::{
sync::{oneshot, Semaphore},
time::{Interval, Sleep},
};
use tracing::{debug, trace, warn};

use reth_interfaces::RethResult;
use reth_payload_builder::{
database::CachedReads, error::PayloadBuilderError, BuiltPayload, KeepPayloadJobAlive,
Expand All @@ -43,14 +25,32 @@ use reth_primitives::{
B256, EMPTY_OMMER_ROOT_HASH, U256,
};
use reth_provider::{
BlockReaderIdExt, BlockSource, BundleStateWithReceipts, ProviderError, StateProviderFactory,
BlockReaderIdExt, BlockSource, BundleStateWithReceipts, CanonStateNotification, ProviderError,
StateProviderFactory,
};
use reth_revm::{
database::StateProviderDatabase,
state_change::{apply_beacon_root_contract_call, post_block_withdrawals_balance_increments},
};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use revm::{
db::states::bundle_state::BundleRetention,
primitives::{BlockEnv, CfgEnv, Env},
Database, DatabaseCommit, State,
};
use std::{
future::Future,
pin::Pin,
sync::{atomic::AtomicBool, Arc},
task::{Context, Poll},
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::{
sync::{oneshot, Semaphore},
time::{Interval, Sleep},
};
use tracing::{debug, trace, warn};

use crate::metrics::PayloadBuilderMetrics;

Expand All @@ -75,6 +75,8 @@ pub struct BasicPayloadJobGenerator<Client, Pool, Tasks, Builder> {
///
/// See [PayloadBuilder]
builder: Builder,
/// Stored cached_reads for new payload jobs.
pre_cached: Option<PrecachedState>,
}

// === impl BasicPayloadJobGenerator ===
Expand All @@ -97,6 +99,7 @@ impl<Client, Pool, Tasks, Builder> BasicPayloadJobGenerator<Client, Pool, Tasks,
config,
chain_spec,
builder,
pre_cached: None,
}
}

Expand All @@ -123,6 +126,22 @@ impl<Client, Pool, Tasks, Builder> BasicPayloadJobGenerator<Client, Pool, Tasks,
fn job_deadline(&self, unix_timestamp: u64) -> tokio::time::Instant {
tokio::time::Instant::now() + self.max_job_duration(unix_timestamp)
}

/// Returns a reference to the tasks type
pub fn tasks(&self) -> &Tasks {
&self.executor
}

/// Returns the pre-cached reads for the given parent block if it matches the cached state's
/// block.
fn take_pre_cached(&mut self, parent: B256) -> Option<CachedReads> {
let pre_cached = self.pre_cached.take()?;
if pre_cached.block == parent {
Some(pre_cached.cached)
} else {
None
}
}
}

// === impl BasicPayloadJobGenerator ===
Expand All @@ -138,7 +157,7 @@ where
type Job = BasicPayloadJob<Client, Pool, Tasks, Builder>;

fn new_payload_job(
&self,
&mut self,
attributes: PayloadBuilderAttributes,
) -> Result<Self::Job, PayloadBuilderError> {
let parent_block = if attributes.parent.is_zero() {
Expand Down Expand Up @@ -167,6 +186,8 @@ where
let until = self.job_deadline(config.attributes.timestamp);
let deadline = Box::pin(tokio::time::sleep_until(until));

let cached_reads = self.take_pre_cached(config.parent_block.hash());

Ok(BasicPayloadJob {
config,
client: self.client.clone(),
Expand All @@ -176,12 +197,44 @@ where
interval: tokio::time::interval(self.config.interval),
best_payload: None,
pending_block: None,
cached_reads: None,
cached_reads,
payload_task_guard: self.payload_task_guard.clone(),
metrics: Default::default(),
builder: self.builder.clone(),
})
}

fn on_new_state(&mut self, new_state: CanonStateNotification) {
if let Some(committed) = new_state.committed() {
let mut cached = CachedReads::default();

// extract the state from the notification and put it into the cache
let new_state = committed.state();
for (addr, acc) in new_state.accounts_iter() {
if let Some(acc) = acc {
let storage = if let Some(acc) = new_state.state().account(&addr) {
acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect()
} else {
Default::default()
};
cached.insert_account(addr, acc.clone(), storage);
}
}

self.pre_cached = Some(PrecachedState { block: committed.tip().hash, cached });
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rakita @rkrasiuk could you please take a closer look here.

It's not really obvious to me how I can get the plain state (account, accountstorage) from the Bundlestate

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general this is okay, but would add helper function to BundleStateWithReceipts to make it more obvious, maybe something like fn bundle_account_iter().

To point out, the bundle state contains only written accounts, not the reads, but it is still better than nothing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattsse you can create a new method accounts_with_storage_iter to make it easier. @rakita made good point that these are only writes, but that's better than nothing

}

/// Pre-filled [CachedReads] for a specific block.
///
/// This is extracted from the [CanonStateNotification] for the tip block.
#[derive(Debug, Clone)]
pub struct PrecachedState {
/// The block for which the state is pre-cached.
pub block: B256,
/// Cached state for the block.
pub cached: CachedReads,
}

/// Restricts how many generator tasks can be executed at once.
Expand Down
2 changes: 2 additions & 0 deletions crates/payload/builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ reth-rpc-types.workspace = true
reth-transaction-pool.workspace = true
reth-interfaces.workspace = true
reth-rpc-types-compat.workspace = true
reth-provider.workspace = true
reth-tasks.workspace = true

# ethereum
alloy-rlp.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions crates/payload/builder/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ impl CachedReads {
fn as_db_mut<DB>(&mut self, db: DB) -> CachedReadsDbMut<'_, DB> {
CachedReadsDbMut { cached: self, db }
}

/// Inserts an account info into the cache.
pub fn insert_account(
&mut self,
address: Address,
info: AccountInfo,
storage: HashMap<U256, U256>,
) {
self.accounts.insert(address, CachedAccount { info: Some(info), storage });
}
}

#[derive(Debug)]
Expand Down
22 changes: 16 additions & 6 deletions crates/payload/builder/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::{
error::PayloadBuilderError, metrics::PayloadBuilderServiceMetrics, traits::PayloadJobGenerator,
BuiltPayload, KeepPayloadJobAlive, PayloadBuilderAttributes, PayloadJob,
};
use futures_util::{future::FutureExt, StreamExt};
use futures_util::{future::FutureExt, Stream, StreamExt};
use reth_provider::CanonStateNotification;
use reth_rpc_types::engine::PayloadId;
use std::{
fmt,
Expand Down Expand Up @@ -160,7 +161,7 @@ impl PayloadBuilderHandle {
/// does know nothing about how to build them, it just drives their jobs to completion.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PayloadBuilderService<Gen>
pub struct PayloadBuilderService<Gen, St>
where
Gen: PayloadJobGenerator,
{
Expand All @@ -174,25 +175,29 @@ where
command_rx: UnboundedReceiverStream<PayloadServiceCommand>,
/// Metrics for the payload builder service
metrics: PayloadBuilderServiceMetrics,
/// Chain events notification stream
chain_events: St,
}

// === impl PayloadBuilderService ===

impl<Gen> PayloadBuilderService<Gen>
impl<Gen, St> PayloadBuilderService<Gen, St>
where
Gen: PayloadJobGenerator,
{
/// Creates a new payload builder service and returns the [PayloadBuilderHandle] to interact
/// with it.
pub fn new(generator: Gen) -> (Self, PayloadBuilderHandle) {
pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle) {
let (service_tx, command_rx) = mpsc::unbounded_channel();
let service = Self {
generator,
payload_jobs: Vec::new(),
service_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
metrics: Default::default(),
chain_events,
};

let handle = service.handle();
(service, handle)
}
Expand Down Expand Up @@ -271,17 +276,22 @@ where
}
}

impl<Gen> Future for PayloadBuilderService<Gen>
impl<Gen, St> Future for PayloadBuilderService<Gen, St>
where
Gen: PayloadJobGenerator + Unpin + 'static,
<Gen as PayloadJobGenerator>::Job: Unpin + 'static,
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
{
allnil marked this conversation as resolved.
Show resolved Hide resolved
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

loop {
// notify the generator of new chain events
while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
this.generator.on_new_state(new_head);
}

// we poll all jobs first, so we always have the latest payload that we can report if
// requests
// we don't care about the order of the jobs, so we can just swap_remove them
Expand Down
3 changes: 2 additions & 1 deletion crates/payload/builder/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
PayloadJobGenerator,
};
use reth_primitives::{Block, U256};
use reth_provider::CanonStateNotification;
use std::{
future::Future,
pin::Pin,
Expand Down Expand Up @@ -35,7 +36,7 @@ impl PayloadJobGenerator for TestPayloadJobGenerator {
type Job = TestPayloadJob;

fn new_payload_job(
&self,
&mut self,
attr: PayloadBuilderAttributes,
) -> Result<Self::Job, PayloadBuilderError> {
Ok(TestPayloadJob { attr })
Expand Down
12 changes: 11 additions & 1 deletion crates/payload/builder/src/traits.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Trait abstractions used by the payload crate.

use reth_provider::CanonStateNotification;

use crate::{error::PayloadBuilderError, BuiltPayload, PayloadBuilderAttributes};
use std::{future::Future, sync::Arc};

Expand Down Expand Up @@ -77,7 +79,15 @@ pub trait PayloadJobGenerator: Send + Sync {
/// This is expected to initially build a new (empty) payload without transactions, so it can be
/// returned directly.
fn new_payload_job(
&self,
&mut self,
attr: PayloadBuilderAttributes,
) -> Result<Self::Job, PayloadBuilderError>;

/// Handles new chain state events
///
/// This is intended for any logic that needs to be run when the chain state changes or used to
/// use the in memory state for the head block.
fn on_new_state(&mut self, new_state: CanonStateNotification) {
let _ = new_state;
}
}
Loading