Skip to content

Commit

Permalink
feat: add metrics for in-memory state (paradigmxyz#10085)
Browse files Browse the repository at this point in the history
  • Loading branch information
onbjerg authored and martinezjorge committed Aug 7, 2024
1 parent 8f35e85 commit 98c045f
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 71 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions crates/chain-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ workspace = true
reth-chainspec.workspace = true
reth-errors.workspace = true
reth-execution-types.workspace = true
reth-metrics.workspace = true
reth-primitives.workspace = true
reth-storage-api.workspace = true
reth-trie.workspace = true

revm = { workspace = true, optional = true}
revm = { workspace = true, optional = true }

# async
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
Expand All @@ -32,6 +33,7 @@ tracing.workspace = true
# misc
auto_impl.workspace = true
derive_more.workspace = true
metrics.workspace = true
parking_lot.workspace = true
pin-project.workspace = true
rand = { workspace = true, optional = true }
Expand All @@ -41,7 +43,4 @@ rand.workspace = true
revm.workspace = true

[features]
test-utils = [
"rand",
"revm"
]
test-utils = ["rand", "revm"]
178 changes: 112 additions & 66 deletions crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_metrics::{metrics::Gauge, Metrics};
use reth_primitives::{
Address, BlockNumHash, Header, Receipt, Receipts, SealedBlock, SealedBlockWithSenders,
SealedHeader, TransactionMeta, TransactionSigned, TxHash, B256,
Expand All @@ -24,6 +25,18 @@ use tokio::sync::broadcast;
/// Size of the broadcast channel used to notify canonical state events.
const CANON_STATE_NOTIFICATION_CHANNEL_SIZE: usize = 256;

/// Metrics for the in-memory state.
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree.in_mem_state")]
pub(crate) struct InMemoryStateMetrics {
/// The block number of the earliest block in the in-memory state.
pub(crate) earliest_block: Gauge,
/// The block number of the latest block in the in-memory state.
pub(crate) latest_block: Gauge,
/// The number of blocks in the in-memory state.
pub(crate) num_blocks: Gauge,
}

/// Container type for in memory state data of the canonical chain.
///
/// This tracks blocks and their state that haven't been persisted to disk yet but are part of the
Expand All @@ -36,19 +49,40 @@ pub(crate) struct InMemoryState {
numbers: RwLock<BTreeMap<u64, B256>>,
/// The pending block that has not yet been made canonical.
pending: RwLock<Option<BlockState>>,
/// Metrics for the in-memory state.
metrics: InMemoryStateMetrics,
}

impl InMemoryState {
pub(crate) const fn new(
pub(crate) fn new(
blocks: HashMap<B256, Arc<BlockState>>,
numbers: BTreeMap<u64, B256>,
pending: Option<BlockState>,
) -> Self {
Self {
let this = Self {
blocks: RwLock::new(blocks),
numbers: RwLock::new(numbers),
pending: RwLock::new(pending),
metrics: Default::default(),
};
this.update_metrics();
this
}

/// Update the metrics for the in-memory state.
///
/// # Locking behavior
///
/// This tries to acquire a read lock. Drop any write locks before calling this.
pub(crate) fn update_metrics(&self) {
let numbers = self.numbers.read();
if let Some((earliest_block_number, _)) = numbers.first_key_value() {
self.metrics.earliest_block.set(*earliest_block_number as f64);
}
if let Some((latest_block_number, _)) = numbers.last_key_value() {
self.metrics.latest_block.set(*latest_block_number as f64);
}
self.metrics.num_blocks.set(numbers.len() as f64);
}

/// Returns the state for a given block hash.
Expand Down Expand Up @@ -98,13 +132,16 @@ pub(crate) struct CanonicalInMemoryStateInner {
impl CanonicalInMemoryStateInner {
/// Clears all entries in the in memory state.
fn clear(&self) {
let mut blocks = self.in_memory_state.blocks.write();
let mut numbers = self.in_memory_state.numbers.write();
let mut pending = self.in_memory_state.pending.write();

blocks.clear();
numbers.clear();
pending.take();
{
let mut blocks = self.in_memory_state.blocks.write();
let mut numbers = self.in_memory_state.numbers.write();
let mut pending = self.in_memory_state.pending.write();

blocks.clear();
numbers.clear();
pending.take();
}
self.in_memory_state.update_metrics();
}
}

Expand Down Expand Up @@ -183,40 +220,45 @@ impl CanonicalInMemoryState {
let parent = self.state_by_hash(pending.block().parent_hash);
let pending = BlockState::with_parent(pending, parent.map(|p| (*p).clone()));
*self.inner.in_memory_state.pending.write() = Some(pending);
self.inner.in_memory_state.update_metrics();
}

/// Append new blocks to the in memory state.
fn update_blocks<I>(&self, new_blocks: I, reorged: I)
where
I: IntoIterator<Item = ExecutedBlock>,
{
// acquire all locks
let mut numbers = self.inner.in_memory_state.numbers.write();
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut pending = self.inner.in_memory_state.pending.write();

// we first remove the blocks from the reorged chain
for block in reorged {
let hash = block.block().hash();
let number = block.block().number;
blocks.remove(&hash);
numbers.remove(&number);
}
{
// acquire all locks
let mut numbers = self.inner.in_memory_state.numbers.write();
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut pending = self.inner.in_memory_state.pending.write();

// we first remove the blocks from the reorged chain
for block in reorged {
let hash = block.block().hash();
let number = block.block().number;
blocks.remove(&hash);
numbers.remove(&number);
}

// insert the new blocks
for block in new_blocks {
let parent = blocks.get(&block.block().parent_hash).cloned();
let block_state = BlockState::with_parent(block.clone(), parent.map(|p| (*p).clone()));
let hash = block_state.hash();
let number = block_state.number();
// insert the new blocks
for block in new_blocks {
let parent = blocks.get(&block.block().parent_hash).cloned();
let block_state =
BlockState::with_parent(block.clone(), parent.map(|p| (*p).clone()));
let hash = block_state.hash();
let number = block_state.number();

// append new blocks
blocks.insert(hash, Arc::new(block_state));
numbers.insert(number, hash);
}

// append new blocks
blocks.insert(hash, Arc::new(block_state));
numbers.insert(number, hash);
// remove the pending state
pending.take();
}

// remove the pending state
pending.take();
self.inner.in_memory_state.update_metrics();
}

/// Update the in memory state with the given chain update.
Expand All @@ -236,41 +278,45 @@ impl CanonicalInMemoryState {
/// This will update the links between blocks and remove all blocks that are [..
/// `persisted_height`].
pub fn remove_persisted_blocks(&self, persisted_height: u64) {
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut numbers = self.inner.in_memory_state.numbers.write();
let mut pending = self.inner.in_memory_state.pending.write();

// clear all numbers
numbers.clear();

// drain all blocks and only keep the ones that are not persisted
let mut old_blocks = blocks
.drain()
.map(|(_, b)| b.block.clone())
.filter(|b| b.block().number > persisted_height)
.collect::<Vec<_>>();

// sort the blocks by number so we can insert them back in natural order (low -> high)
old_blocks.sort_unstable_by_key(|block| block.block().number);

for block in old_blocks {
let parent = blocks.get(&block.block().parent_hash).cloned();
let block_state = BlockState::with_parent(block.clone(), parent.map(|p| (*p).clone()));
let hash = block_state.hash();
let number = block_state.number();

// append new blocks
blocks.insert(hash, Arc::new(block_state));
numbers.insert(number, hash);
}
{
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut numbers = self.inner.in_memory_state.numbers.write();
let mut pending = self.inner.in_memory_state.pending.write();

// clear all numbers
numbers.clear();

// drain all blocks and only keep the ones that are not persisted
let mut old_blocks = blocks
.drain()
.map(|(_, b)| b.block.clone())
.filter(|b| b.block().number > persisted_height)
.collect::<Vec<_>>();

// sort the blocks by number so we can insert them back in natural order (low -> high)
old_blocks.sort_unstable_by_key(|block| block.block().number);

for block in old_blocks {
let parent = blocks.get(&block.block().parent_hash).cloned();
let block_state =
BlockState::with_parent(block.clone(), parent.map(|p| (*p).clone()));
let hash = block_state.hash();
let number = block_state.number();

// append new blocks
blocks.insert(hash, Arc::new(block_state));
numbers.insert(number, hash);
}

// also shift the pending state if it exists
if let Some(pending) = pending.as_mut() {
pending.parent = blocks
.get(&pending.block().block.parent_hash)
.cloned()
.map(|p| Box::new((*p).clone()));
// also shift the pending state if it exists
if let Some(pending) = pending.as_mut() {
pending.parent = blocks
.get(&pending.block().block.parent_hash)
.cloned()
.map(|p| Box::new((*p).clone()));
}
}
self.inner.in_memory_state.update_metrics();
}

/// Returns in memory state corresponding the given hash.
Expand Down

0 comments on commit 98c045f

Please sign in to comment.