Skip to content

Commit

Permalink
feat: make NodeState generic over DB with DatabaseMetadata (#5691)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected authored Dec 18, 2023
1 parent 61c9587 commit 900fe7e
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 48 deletions.
35 changes: 19 additions & 16 deletions bin/reth/src/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use crate::node::cl_events::ConsensusLayerHealthEvent;
use futures::Stream;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_db::DatabaseEnv;
use reth_db::{database::Database, database_metrics::DatabaseMetadata};
use reth_interfaces::consensus::ForkchoiceState;
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
Expand All @@ -17,7 +17,6 @@ use std::{
fmt::{Display, Formatter},
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
Expand All @@ -28,11 +27,11 @@ use tracing::{info, warn};
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);

/// The current high-level state of the node.
struct NodeState {
struct NodeState<DB> {
/// Database environment.
/// Used for freelist calculation reported in the "Status" log message.
/// See [EventHandler::poll].
db: Arc<DatabaseEnv>,
db: DB,
/// Connection to the network.
network: Option<NetworkHandle>,
/// The stage currently being executed.
Expand All @@ -41,12 +40,8 @@ struct NodeState {
latest_block: Option<BlockNumber>,
}

impl NodeState {
fn new(
db: Arc<DatabaseEnv>,
network: Option<NetworkHandle>,
latest_block: Option<BlockNumber>,
) -> Self {
impl<DB> NodeState<DB> {
fn new(db: DB, network: Option<NetworkHandle>, latest_block: Option<BlockNumber>) -> Self {
Self { db, network, current_stage: None, latest_block }
}

Expand Down Expand Up @@ -200,6 +195,12 @@ impl NodeState {
}
}

impl<DB: DatabaseMetadata> NodeState<DB> {
fn freelist(&self) -> Option<usize> {
self.db.metadata().freelist_size()
}
}

/// Helper type for formatting of optional fields:
/// - If [Some(x)], then `x` is written
/// - If [None], then `None` is written
Expand Down Expand Up @@ -270,13 +271,14 @@ impl From<PrunerEvent> for NodeEvent {

/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events<E>(
pub async fn handle_events<E, DB>(
network: Option<NetworkHandle>,
latest_block_number: Option<BlockNumber>,
events: E,
db: Arc<DatabaseEnv>,
db: DB,
) where
E: Stream<Item = NodeEvent> + Unpin,
DB: DatabaseMetadata + Database + 'static,
{
let state = NodeState::new(db, network, latest_block_number);

Expand All @@ -290,25 +292,26 @@ pub async fn handle_events<E>(

/// Handles events emitted by the node and logs them accordingly.
#[pin_project::pin_project]
struct EventHandler<E> {
state: NodeState,
struct EventHandler<E, DB> {
state: NodeState<DB>,
#[pin]
events: E,
#[pin]
info_interval: Interval,
}

impl<E> Future for EventHandler<E>
impl<E, DB> Future for EventHandler<E, DB>
where
E: Stream<Item = NodeEvent> + Unpin,
DB: DatabaseMetadata + Database + 'static,
{
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

while this.info_interval.poll_tick(cx).is_ready() {
let freelist = OptionalField(this.state.db.freelist().ok());
let freelist = OptionalField(this.state.freelist());

if let Some(CurrentStage { stage_id, eta, checkpoint, target }) =
&this.state.current_stage
Expand Down
62 changes: 61 additions & 1 deletion crates/storage/db/src/abstraction/database_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,74 @@
use metrics::{counter, gauge, histogram, Label};
use std::sync::Arc;

/// Represents a type that can report metrics, used mainly with the database. The `report_metrics`
/// method can be used as a prometheus hook.
pub trait DatabaseMetrics {
/// Reports metrics for the database.
fn report_metrics(&self);
fn report_metrics(&self) {
for (name, value, labels) in self.gauge_metrics() {
gauge!(name, value, labels);
}

for (name, value, labels) in self.counter_metrics() {
counter!(name, value, labels);
}

for (name, value, labels) in self.histogram_metrics() {
histogram!(name, value, labels);
}
}

/// Returns a list of [Gauge](metrics::Gauge) metrics for the database.
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
vec![]
}

/// Returns a list of [Counter](metrics::Counter) metrics for the database.
fn counter_metrics(&self) -> Vec<(&'static str, u64, Vec<Label>)> {
vec![]
}

/// Returns a list of [Histogram](metrics::Histogram) metrics for the database.
fn histogram_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
vec![]
}
}

impl<DB: DatabaseMetrics> DatabaseMetrics for Arc<DB> {
fn report_metrics(&self) {
<DB as DatabaseMetrics>::report_metrics(self)
}
}

/// The type used to store metadata about the database.
#[derive(Debug, Default)]
pub struct DatabaseMetadataValue {
/// The freelist size
freelist_size: Option<usize>,
}

impl DatabaseMetadataValue {
/// Creates a new [DatabaseMetadataValue] with the given freelist size.
pub fn new(freelist_size: Option<usize>) -> Self {
Self { freelist_size }
}

/// Returns the freelist size, if available.
pub fn freelist_size(&self) -> Option<usize> {
self.freelist_size
}
}

/// Includes a method to return a [DatabaseMetadataValue] type, which can be used to dynamically
/// retrieve information about the database.
pub trait DatabaseMetadata {
/// Returns a metadata type, [DatabaseMetadataValue] for the database.
fn metadata(&self) -> DatabaseMetadataValue;
}

impl<DB: DatabaseMetadata> DatabaseMetadata for Arc<DB> {
fn metadata(&self) -> DatabaseMetadataValue {
<DB as DatabaseMetadata>::metadata(self)
}
}
97 changes: 67 additions & 30 deletions crates/storage/db/src/implementation/mdbx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

use crate::{
database::Database,
database_metrics::DatabaseMetrics,
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
tables::{TableType, Tables},
utils::default_page_size,
DatabaseError,
};
use eyre::Context;
use metrics::gauge;
use metrics::{gauge, Label};
use reth_interfaces::db::LogLevel;
use reth_libmdbx::{
DatabaseFlags, Environment, EnvironmentFlags, Geometry, Mode, PageSize, SyncMode, RO, RW,
Expand Down Expand Up @@ -65,39 +65,76 @@ impl Database for DatabaseEnv {

impl DatabaseMetrics for DatabaseEnv {
fn report_metrics(&self) {
let _ = self.view(|tx| {
for table in Tables::ALL.iter().map(|table| table.name()) {
let table_db =
tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;

let stats = tx
.inner
.db_stat(&table_db)
.wrap_err(format!("Could not find table: {table}"))?;

let page_size = stats.page_size() as usize;
let leaf_pages = stats.leaf_pages();
let branch_pages = stats.branch_pages();
let overflow_pages = stats.overflow_pages();
let num_pages = leaf_pages + branch_pages + overflow_pages;
let table_size = page_size * num_pages;
let entries = stats.entries();

gauge!("db.table_size", table_size as f64, "table" => table);
gauge!("db.table_pages", leaf_pages as f64, "table" => table, "type" => "leaf");
gauge!("db.table_pages", branch_pages as f64, "table" => table, "type" => "branch");
gauge!("db.table_pages", overflow_pages as f64, "table" => table, "type" => "overflow");
gauge!("db.table_entries", entries as f64, "table" => table);
}
for (name, value, labels) in self.gauge_metrics() {
gauge!(name, value, labels);
}
}

Ok::<(), eyre::Report>(())
}).map_err(|error| error!(?error, "Failed to read db table stats"));
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
let mut metrics = Vec::new();

let _ = self
.view(|tx| {
for table in Tables::ALL.iter().map(|table| table.name()) {
let table_db = tx.inner.open_db(Some(table)).wrap_err("Could not open db.")?;

let stats = tx
.inner
.db_stat(&table_db)
.wrap_err(format!("Could not find table: {table}"))?;

let page_size = stats.page_size() as usize;
let leaf_pages = stats.leaf_pages();
let branch_pages = stats.branch_pages();
let overflow_pages = stats.overflow_pages();
let num_pages = leaf_pages + branch_pages + overflow_pages;
let table_size = page_size * num_pages;
let entries = stats.entries();

metrics.push((
"db.table_size",
table_size as f64,
vec![Label::new("table", table)],
));
metrics.push((
"db.table_pages",
leaf_pages as f64,
vec![Label::new("table", table), Label::new("type", "leaf")],
));
metrics.push((
"db.table_pages",
branch_pages as f64,
vec![Label::new("table", table), Label::new("type", "branch")],
));
metrics.push((
"db.table_pages",
overflow_pages as f64,
vec![Label::new("table", table), Label::new("type", "overflow")],
));
metrics.push((
"db.table_entries",
entries as f64,
vec![Label::new("table", table)],
));
}

Ok::<(), eyre::Report>(())
})
.map_err(|error| error!(?error, "Failed to read db table stats"));

if let Ok(freelist) =
self.freelist().map_err(|error| error!(?error, "Failed to read db.freelist"))
{
gauge!("db.freelist", freelist as f64);
metrics.push(("db.freelist", freelist as f64, vec![]));
}

metrics
}
}

impl DatabaseMetadata for DatabaseEnv {
fn metadata(&self) -> DatabaseMetadataValue {
DatabaseMetadataValue::new(self.freelist().ok())
}
}

Expand Down Expand Up @@ -190,7 +227,7 @@ impl DatabaseEnv {
LogLevel::Extra => 7,
});
} else {
return Err(DatabaseError::LogLevelUnavailable(log_level))
return Err(DatabaseError::LogLevelUnavailable(log_level));
}
}

Expand Down
11 changes: 10 additions & 1 deletion crates/storage/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ pub fn open_db(path: &Path, log_level: Option<LogLevel>) -> eyre::Result<Databas
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils {
use super::*;
use crate::{database::Database, database_metrics::DatabaseMetrics};
use crate::{
database::Database,
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
};
use std::{path::PathBuf, sync::Arc};

/// Error during database open
Expand Down Expand Up @@ -216,6 +219,12 @@ pub mod test_utils {
}
}

impl<DB: DatabaseMetadata> DatabaseMetadata for TempDatabase<DB> {
fn metadata(&self) -> DatabaseMetadataValue {
self.db().metadata()
}
}

/// Create read/write database for testing
pub fn create_test_rw_db() -> Arc<TempDatabase<DatabaseEnv>> {
let path = tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path();
Expand Down

0 comments on commit 900fe7e

Please sign in to comment.