Skip to content

Commit

Permalink
replace closures with relative recordings
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Nov 2, 2023
1 parent 9714958 commit ccc4a84
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 211 deletions.
92 changes: 37 additions & 55 deletions crates/blockchain-tree/src/blockchain_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -913,17 +913,14 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
pub fn make_canonical(&mut self, block_hash: &BlockHash) -> RethResult<CanonicalOutcome> {
let mut durations_recorder = MakeCanonicalDurationsRecorder::default();

let (old_block_indices, old_buffered_blocks) = durations_recorder
.record(MakeCanonicalAction::CloneOldBlocks, || {
(self.block_indices().clone(), self.state.buffered_blocks.parent_to_child.clone())
});
let old_block_indices = self.block_indices().clone();
let old_buffered_blocks = self.state.buffered_blocks.parent_to_child.clone();
durations_recorder.record_relative(MakeCanonicalAction::CloneOldBlocks);

// If block is already canonical don't return error.
if let Some(header) = durations_recorder
.record(MakeCanonicalAction::FindCanonicalHeader, || {
self.find_canonical_header(block_hash)
})?
{
let canonical_header = self.find_canonical_header(block_hash)?;
durations_recorder.record_relative(MakeCanonicalAction::FindCanonicalHeader);
if let Some(header) = canonical_header {
info!(target: "blockchain_tree", ?block_hash, "Block is already canonical, ignoring.");
// TODO: this could be fetched from the chainspec first
let td = self.externals.database().provider()?.header_td(block_hash)?.ok_or(
Expand Down Expand Up @@ -952,51 +949,42 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
trace!(target: "blockchain_tree", ?chain, "Found chain to make canonical");

// we are splitting chain at the block hash that we want to make canonical
let canonical = durations_recorder.record(MakeCanonicalAction::SplitChain, || {
self.split_chain(chain_id, chain, SplitAt::Hash(*block_hash))
});
let canonical = self.split_chain(chain_id, chain, SplitAt::Hash(*block_hash));
durations_recorder.record_relative(MakeCanonicalAction::SplitChain);

let mut block_fork = canonical.fork_block();
let mut block_fork_number = canonical.fork_block_number();
let mut chains_to_promote = vec![canonical];

// loop while fork blocks are found in Tree.
durations_recorder.record(MakeCanonicalAction::SplitChainForks, || {
while let Some(chain_id) = self.block_indices().get_blocks_chain_id(&block_fork.hash) {
let chain = self.state.chains.remove(&chain_id).expect("To fork to be present");
block_fork = chain.fork_block();
// canonical chain is lower part of the chain.
let canonical =
self.split_chain(chain_id, chain, SplitAt::Number(block_fork_number));
block_fork_number = canonical.fork_block_number();
chains_to_promote.push(canonical);
}
});
while let Some(chain_id) = self.block_indices().get_blocks_chain_id(&block_fork.hash) {
let chain = self.state.chains.remove(&chain_id).expect("To fork to be present");
block_fork = chain.fork_block();
// canonical chain is lower part of the chain.
let canonical = self.split_chain(chain_id, chain, SplitAt::Number(block_fork_number));
block_fork_number = canonical.fork_block_number();
chains_to_promote.push(canonical);
}
durations_recorder.record_relative(MakeCanonicalAction::SplitChainForks);

let old_tip = self.block_indices().canonical_tip();
// Merge all chains into one chain.
let (new_canon_chain, chain_appended) =
durations_recorder.record(MakeCanonicalAction::MergeAllChains, || {
let mut new_canon_chain =
chains_to_promote.pop().expect("There is at least one block");
trace!(target: "blockchain_tree", ?new_canon_chain, "Merging chains");
let mut chain_appended = false;
for chain in chains_to_promote.into_iter().rev() {
chain_appended = true;
trace!(target: "blockchain_tree", ?chain, "Appending chain");
new_canon_chain.append_chain(chain).expect("We have just build the chain.");
}

(new_canon_chain, chain_appended)
});
let mut new_canon_chain = chains_to_promote.pop().expect("There is at least one block");
trace!(target: "blockchain_tree", ?new_canon_chain, "Merging chains");
let mut chain_appended = false;
for chain in chains_to_promote.into_iter().rev() {
chain_appended = true;
trace!(target: "blockchain_tree", ?chain, "Appending chain");
new_canon_chain.append_chain(chain).expect("We have just build the chain.");
}
durations_recorder.record_relative(MakeCanonicalAction::MergeAllChains);

if chain_appended {
trace!(target: "blockchain_tree", ?new_canon_chain, "Canonical chain appended");
}
// update canonical index
durations_recorder.record(MakeCanonicalAction::UpdateCanonicalIndex, || {
self.block_indices_mut().canonicalize_blocks(new_canon_chain.blocks())
});
self.block_indices_mut().canonicalize_blocks(new_canon_chain.blocks());
durations_recorder.record_relative(MakeCanonicalAction::UpdateCanonicalIndex);

// event about new canonical chain.
let chain_notification;
Expand All @@ -1010,10 +998,8 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
chain_notification =
CanonStateNotification::Commit { new: Arc::new(new_canon_chain.clone()) };
// append to database
durations_recorder
.record(MakeCanonicalAction::CommitCanonicalChainToDatabase, || {
self.commit_canonical_to_database(new_canon_chain)
})?;
self.commit_canonical_to_database(new_canon_chain)?;
durations_recorder.record_relative(MakeCanonicalAction::CommitCanonicalChainToDatabase);
} else {
// it forks to canonical block that is not the tip.

Expand All @@ -1029,10 +1015,9 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
unreachable!("all chains should point to canonical chain.");
}

let old_canon_chain = durations_recorder
.record(MakeCanonicalAction::RevertCanonicalChainFromDatabase, || {
self.revert_canonical_from_database(canon_fork.number)
});
let old_canon_chain = self.revert_canonical_from_database(canon_fork.number);
durations_recorder
.record_relative(MakeCanonicalAction::RevertCanonicalChainFromDatabase);

let old_canon_chain = match old_canon_chain {
val @ Err(_) => {
Expand All @@ -1049,10 +1034,8 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
Ok(val) => val,
};
// commit new canonical chain.
durations_recorder
.record(MakeCanonicalAction::CommitCanonicalChainToDatabase, || {
self.commit_canonical_to_database(new_canon_chain.clone())
})?;
self.commit_canonical_to_database(new_canon_chain.clone())?;
durations_recorder.record_relative(MakeCanonicalAction::CommitCanonicalChainToDatabase);

if let Some(old_canon_chain) = old_canon_chain {
// state action
Expand All @@ -1063,9 +1046,8 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
let reorg_depth = old_canon_chain.len();

// insert old canon chain
durations_recorder.record(MakeCanonicalAction::InsertOldCanonicalChain, || {
self.insert_chain(AppendableChain::new(old_canon_chain))
});
self.insert_chain(AppendableChain::new(old_canon_chain));
durations_recorder.record_relative(MakeCanonicalAction::InsertOldCanonicalChain);

self.update_reorg_metrics(reorg_depth as f64);
} else {
Expand Down
30 changes: 19 additions & 11 deletions crates/blockchain-tree/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,33 @@ pub struct BlockBufferMetrics {
pub blocks: Gauge,
}

#[derive(Debug, Default)]
#[derive(Debug)]
pub(crate) struct MakeCanonicalDurationsRecorder {
pub(crate) actions: Vec<(MakeCanonicalAction, Duration)>,
latest: Instant,
}

impl MakeCanonicalDurationsRecorder {
/// Records the duration of `f` execution, saves for future logging and instantly reports as a
/// metric with `action` label.
pub(crate) fn record<T>(&mut self, action: MakeCanonicalAction, f: impl FnOnce() -> T) -> T {
let start = Instant::now();
let result = f();
let elapsed = start.elapsed();
impl Default for MakeCanonicalDurationsRecorder {
fn default() -> Self {
Self { actions: Vec::new(), latest: Instant::now() }
}
}

self.actions.push((action, elapsed));
impl MakeCanonicalDurationsRecorder {
/// Saves the provided duration for future logging and instantly reports as a metric with
/// `action` label.
pub(crate) fn record_duration(&mut self, action: MakeCanonicalAction, duration: Duration) {
self.actions.push((action, duration));
MakeCanonicalMetrics::new_with_labels(&[("action", format!("{action:?}"))])
.duration
.record(elapsed);
.record(duration);
self.latest = Instant::now();
}

result
/// Records the duration since last record, saves it for future logging and instantly reports as
/// a metric with `action` label.
pub(crate) fn record_relative(&mut self, action: MakeCanonicalAction) {
self.record_duration(action, self.latest.elapsed());
}
}

Expand Down
23 changes: 14 additions & 9 deletions crates/storage/provider/src/providers/database/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,31 @@ use metrics::Histogram;
use reth_metrics::Metrics;
use std::time::{Duration, Instant};

#[derive(Debug, Default)]
#[derive(Debug)]
pub(crate) struct DurationsRecorder {
pub(crate) actions: Vec<(Action, Duration)>,
latest: Instant,
}

impl DurationsRecorder {
/// Records the duration of `f` closure execution, saves for future logging and instantly
/// reports as a metric with `action` label.
pub(crate) fn record_closure<T>(&mut self, action: Action, f: impl FnOnce() -> T) -> T {
let start = Instant::now();
let result = f();
self.record_duration(action, start.elapsed());
result
impl Default for DurationsRecorder {
fn default() -> Self {
Self { actions: Vec::new(), latest: Instant::now() }
}
}

impl DurationsRecorder {
/// Saves the provided duration for future logging and instantly reports as a metric with
/// `action` label.
pub(crate) fn record_duration(&mut self, action: Action, duration: Duration) {
self.actions.push((action, duration));
Metrics::new_with_labels(&[("action", format!("{action:?}"))]).duration.record(duration);
self.latest = Instant::now();
}

/// Records the duration since last record, saves it for future logging and instantly reports as
/// a metric with `action` label.
pub(crate) fn record_relative(&mut self, action: Action) {
self.record_duration(action, self.latest.elapsed());
}
}

Expand Down
Loading

0 comments on commit ccc4a84

Please sign in to comment.