Skip to content

Commit

Permalink
Merge branch 'main' into emhane/network-traits
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Aug 1, 2024
2 parents 98a3064 + 321032f commit 7552a4e
Show file tree
Hide file tree
Showing 26 changed files with 435 additions and 285 deletions.
3 changes: 2 additions & 1 deletion bin/reth-bench/src/bench/new_payload_fcu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ impl Command {
// calculate the total duration and the fcu latency, record
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult { new_payload_result, fcu_latency, total_latency };
let combined_result =
CombinedResult { block_number, new_payload_result, fcu_latency, total_latency };

// current duration since the start of the benchmark
let current_duration = total_benchmark_duration.elapsed();
Expand Down
8 changes: 6 additions & 2 deletions bin/reth-bench/src/bench/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ impl Serialize for NewPayloadResult {
/// latency.
#[derive(Debug)]
pub(crate) struct CombinedResult {
/// The block number of the block being processed.
pub(crate) block_number: u64,
/// The `newPayload` result.
pub(crate) new_payload_result: NewPayloadResult,
/// The latency of the `forkchoiceUpdated` call.
Expand All @@ -83,7 +85,8 @@ impl std::fmt::Display for CombinedResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Payload processed at {:.4} Ggas/s, used {} total gas. Combined gas per second: {:.4} Ggas/s. fcu latency: {:?}, newPayload latency: {:?}",
"Payload {} processed at {:.4} Ggas/s, used {} total gas. Combined gas per second: {:.4} Ggas/s. fcu latency: {:?}, newPayload latency: {:?}",
self.block_number,
self.new_payload_result.gas_per_second() / GIGAGAS as f64,
self.new_payload_result.gas_used,
self.combined_gas_per_second() / GIGAGAS as f64,
Expand All @@ -104,9 +107,10 @@ impl Serialize for CombinedResult {
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 4)?;
let mut state = serializer.serialize_struct("CombinedResult", 5)?;

// flatten the new payload result because this is meant for CSV writing
state.serialize_field("block_number", &self.block_number)?;
state.serialize_field("gas_used", &self.new_payload_result.gas_used)?;
state.serialize_field("new_payload_latency", &new_payload_latency)?;
state.serialize_field("fcu_latency", &fcu_latency)?;
Expand Down
1 change: 1 addition & 0 deletions book/run/optimism.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ The `optimism` feature flag in `op-reth` adds several new CLI flags to the `reth
1. `--rollup.sequencer-http <uri>` - The sequencer endpoint to connect to. Transactions sent to the `op-reth` EL are also forwarded to this sequencer endpoint for inclusion, as the sequencer is the entity that builds blocks on OP Stack chains.
1. `--rollup.disable-tx-pool-gossip` - Disables gossiping of transactions in the mempool to peers. This can be omitted for personal nodes, though providers should always opt to enable this flag.
1. `--rollup.enable-genesis-walkback` - Disables setting the forkchoice status to tip on startup, making the `op-node` walk back to genesis and verify the integrity of the chain before starting to sync. This can be omitted unless a corruption of local chainstate is suspected.
1. `--rollup.discovery.v4` - Enables the discovery v4 protocol for peer discovery.

First, ensure that your L1 archival node is running and synced to tip. Also make sure that the beacon node / consensus layer client is running and has http APIs enabled. Then, start `op-reth` with the `--rollup.sequencer-http` flag set to the `Base Mainnet` sequencer endpoint:
```sh
Expand Down
194 changes: 77 additions & 117 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
use reth_chain_state::ExecutedBlock;
use reth_db::{models::CompactU256, tables, transaction::DbTxMut, Database};
use reth_errors::ProviderResult;
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256};
use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256};
use reth_provider::{
writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, DatabaseProviderRW,
HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateChangeWriter,
StateWriter, StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, TrieWriter,
providers::StaticFileProvider, writer::StorageWriter, BlockExecutionWriter, BlockNumReader,
BlockWriter, DatabaseProviderRW, HistoryWriter, OriginalValuesKnown, ProviderFactory,
StageCheckpointWriter, StateChangeWriter, StateWriter, StaticFileProviderFactory,
StaticFileWriter, TransactionsProviderExt, TrieWriter,
};
use reth_prune::{Pruner, PrunerOutput};
use reth_stages_types::{StageCheckpoint, StageId};
Expand Down Expand Up @@ -45,64 +46,6 @@ impl<DB: Database> PersistenceService<DB> {
Self { provider, incoming, pruner }
}

/// Writes the cloned tree state to database
fn write(
&self,
blocks: &[ExecutedBlock],
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<()> {
if blocks.is_empty() {
debug!(target: "tree::persistence", "Attempted to write empty block range");
return Ok(())
}

debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks to database");
let first_number = blocks.first().unwrap().block().number;

let last = blocks.last().unwrap().block();
let last_block_number = last.number;

// TODO: remove all the clones and do performant / batched writes for each type of object
// instead of a loop over all blocks,
// meaning:
// * blocks
// * state
// * hashed state
// * trie updates (cannot naively extend, need helper)
// * indices (already done basically)
// Insert the blocks
for block in blocks {
let sealed_block =
block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
provider_rw.insert_block(sealed_block)?;

// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();
// TODO: do we provide a static file producer here?
let mut storage_writer = StorageWriter::new(Some(provider_rw), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;

// insert hashes and intermediate merkle nodes
{
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
provider_rw.write_hashed_state(&hashed_state.clone().into_sorted())?;
provider_rw.write_trie_updates(&trie_updates)?;
}
}

// update history indices
provider_rw.update_history_indices(first_number..=last_block_number)?;

// Update pipeline progress
provider_rw.update_pipeline_stages(last_block_number, false)?;

debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data");

Ok(())
}

/// Removes block data above the given block number from the database.
/// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
/// `block_number`.
Expand All @@ -126,42 +69,22 @@ impl<DB: Database> PersistenceService<DB> {
self.pruner.run(block_num).expect("todo: handle errors")
}

/// Updates checkpoints related to block headers and bodies. This should be called after new
/// transactions have been successfully written to disk.
fn update_transaction_meta(
&self,
block_num: u64,
td: U256,
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<()> {
debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing");
provider_rw
.tx_ref()
.put::<tables::HeaderTerminalDifficulties>(block_num, CompactU256(td))?;
provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?;
provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?;
Ok(())
}

/// Writes the transactions to static files.
///
/// Returns the block number and new total difficulty.
///
/// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called
/// after this, to update the checkpoints for headers and block bodies.
#[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()) target = "engine")]
fn write_transactions(
&self,
block: Arc<SealedBlock>,
provider_rw: &DatabaseProviderRW<DB>,
) -> ProviderResult<(u64, U256)> {
) -> ProviderResult<()> {
debug!(target: "tree::persistence", "Writing transactions");
let provider = self.provider.static_file_provider();

let new_td = {
let td = {
let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(header_writer));
let new_td = storage_writer.append_headers_from_blocks(
let td = storage_writer.append_headers_from_blocks(
block.header().number,
std::iter::once(&(block.header(), block.hash())),
)?;
Expand All @@ -177,44 +100,89 @@ impl<DB: Database> PersistenceService<DB> {
std::iter::once(&no_hash_transactions),
)?;

new_td
td
};

Ok((block.number, new_td))
debug!(target: "tree::persistence", block_num=block.number, "Updating transaction metadata after writing");
provider_rw
.tx_ref()
.put::<tables::HeaderTerminalDifficulties>(block.number, CompactU256(td))?;
provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?;
provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?;

Ok(())
}

/// Write execution-related block data to database and/or static files.
fn write_execution_data(
/// Writes the cloned tree state to database
fn save_blocks(
&self,
blocks: &[ExecutedBlock],
provider_rw: &DatabaseProviderRW<DB>,
static_file_provider: &StaticFileProvider,
) -> ProviderResult<()> {
if blocks.is_empty() {
debug!(target: "tree::persistence", "Attempted to write empty block range");
return Ok(())
}
let provider = self.provider.static_file_provider();

// NOTE: checked non-empty above
let first_block = blocks.first().unwrap().block();
let last_block = blocks.last().unwrap().block().clone();
let first_number = first_block.number;
let last_block_number = last_block.number;

// Only write receipts to static files if there is no receipt pruning configured.
let mut storage_writer = if provider_rw.prune_modes_ref().has_receipts_pruning() {
StorageWriter::new(Some(provider_rw), None)
} else {
StorageWriter::new(
Some(provider_rw),
Some(
static_file_provider
.get_writer(first_block.number, StaticFileSegment::Receipts)?,
),
)
};

debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks and execution data to storage");

// TODO: remove all the clones and do performant / batched writes for each type of object
// instead of a loop over all blocks,
// meaning:
// * blocks
// * state
// * hashed state
// * trie updates (cannot naively extend, need helper)
// * indices (already done basically)
// Insert the blocks
for block in blocks {
let sealed_block =
block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
provider_rw.insert_block(sealed_block)?;
self.write_transactions(block.block.clone(), provider_rw)?;

// use the storage writer
let current_block = first_block.number;
debug!(target: "tree::persistence", len=blocks.len(), ?current_block, "Writing execution data to static files");

let receipts_writer =
provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;

{
let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(receipts_writer));
let receipts_iter = blocks.iter().map(|block| {
let receipts = block.execution_outcome().receipts().receipt_vec.clone();
debug_assert!(receipts.len() == 1);
receipts.first().unwrap().clone()
});
storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?;
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;

// insert hashes and intermediate merkle nodes
{
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
provider_rw.write_hashed_state(&hashed_state.clone().into_sorted())?;
provider_rw.write_trie_updates(&trie_updates)?;
}
}

// update history indices
provider_rw.update_history_indices(first_number..=last_block_number)?;

// Update pipeline progress
provider_rw.update_pipeline_stages(last_block_number, false)?;

debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data");

Ok(())
}

Expand Down Expand Up @@ -289,19 +257,11 @@ where
let last_block_hash = blocks.last().unwrap().block().hash();

let provider_rw = self.provider.provider_rw().expect("todo: handle errors");
self.write_execution_data(&blocks, &provider_rw).expect("todo: handle errors");
self.write(&blocks, &provider_rw).expect("todo: handle errors");

for block in &blocks {
// first write transactions
let (block_num, td) = self
.write_transactions(block.block.clone(), &provider_rw)
.expect("todo: handle errors");
self.update_transaction_meta(block_num, td, &provider_rw)
.expect("todo: handle errors");
}
let static_file_provider = self.provider.static_file_provider();
self.save_blocks(&blocks, &provider_rw, &static_file_provider)
.expect("todo: handle errors");

self.provider.static_file_provider().commit().expect("todo: handle errors");
static_file_provider.commit().expect("todo: handle errors");
provider_rw.commit().expect("todo: handle errors");

// we ignore the error because the caller may or may not care about the result
Expand Down
Loading

0 comments on commit 7552a4e

Please sign in to comment.