Skip to content

Commit

Permalink
feat(storage): database/transaction/cursor metrics (paradigmxyz#5149)
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored and root committed Oct 31, 2023
1 parent 9834ead commit 979d77a
Show file tree
Hide file tree
Showing 12 changed files with 950 additions and 177 deletions.
45 changes: 30 additions & 15 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use clap::{value_parser, Parser};
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{future::Either, pin_mut, stream, stream_select, StreamExt};
use metrics_exporter_prometheus::PrometheusHandle;
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode};
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook},
Expand Down Expand Up @@ -72,7 +73,6 @@ use reth_stages::{
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TotalDifficultyStage, TransactionLookupStage,
},
MetricEventsSender, MetricsListener,
};
use reth_tasks::TaskExecutor;
use reth_transaction_pool::{
Expand Down Expand Up @@ -247,12 +247,14 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
// always store reth.toml in the data dir, not the chain specific data dir
info!(target: "reth::cli", path = ?config_path, "Configuration loaded");

let prometheus_handle = self.install_prometheus_recorder()?;

let db_path = data_dir.db_path();
info!(target: "reth::cli", path = ?db_path, "Opening database");
let db = Arc::new(init_db(&db_path, self.db.log_level)?);
let db = Arc::new(init_db(&db_path, self.db.log_level)?.with_metrics());
info!(target: "reth::cli", "Database opened");

self.start_metrics_endpoint(Arc::clone(&db)).await?;
self.start_metrics_endpoint(prometheus_handle, Arc::clone(&db)).await?;

debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis");

Expand All @@ -269,10 +271,10 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {

self.init_trusted_nodes(&mut config);

debug!(target: "reth::cli", "Spawning metrics listener task");
let (metrics_tx, metrics_rx) = unbounded_channel();
let metrics_listener = MetricsListener::new(metrics_rx);
ctx.task_executor.spawn_critical("metrics listener task", metrics_listener);
debug!(target: "reth::cli", "Spawning stages metrics listener task");
let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel();
let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx);
ctx.task_executor.spawn_critical("stages metrics listener task", sync_metrics_listener);

let prune_config =
self.pruning.prune_config(Arc::clone(&self.chain))?.or(config.prune.clone());
Expand All @@ -289,7 +291,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
BlockchainTreeConfig::default(),
prune_config.clone().map(|config| config.segments),
)?
.with_sync_metrics_tx(metrics_tx.clone());
.with_sync_metrics_tx(sync_metrics_tx.clone());
let canon_state_notification_sender = tree.canon_state_notification_sender();
let blockchain_tree = ShareableBlockchainTree::new(tree);
debug!(target: "reth::cli", "configured blockchain tree");
Expand Down Expand Up @@ -409,7 +411,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Arc::clone(&consensus),
db.clone(),
&ctx.task_executor,
metrics_tx,
sync_metrics_tx,
prune_config.clone(),
max_block,
)
Expand All @@ -429,7 +431,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Arc::clone(&consensus),
db.clone(),
&ctx.task_executor,
metrics_tx,
sync_metrics_tx,
prune_config.clone(),
max_block,
)
Expand Down Expand Up @@ -565,7 +567,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
consensus: Arc<dyn Consensus>,
db: DB,
task_executor: &TaskExecutor,
metrics_tx: MetricEventsSender,
metrics_tx: reth_stages::MetricEventsSender,
prune_config: Option<PruneConfig>,
max_block: Option<BlockNumber>,
) -> eyre::Result<Pipeline<DB>>
Expand Down Expand Up @@ -628,11 +630,24 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
}
}

async fn start_metrics_endpoint(&self, db: Arc<DatabaseEnv>) -> eyre::Result<()> {
fn install_prometheus_recorder(&self) -> eyre::Result<PrometheusHandle> {
prometheus_exporter::install_recorder()
}

async fn start_metrics_endpoint(
&self,
prometheus_handle: PrometheusHandle,
db: Arc<DatabaseEnv>,
) -> eyre::Result<()> {
if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint");
prometheus_exporter::initialize(listen_addr, db, metrics_process::Collector::default())
.await?;
prometheus_exporter::serve(
listen_addr,
prometheus_handle,
db,
metrics_process::Collector::default(),
)
.await?;
}

Ok(())
Expand Down Expand Up @@ -790,7 +805,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
consensus: Arc<dyn Consensus>,
max_block: Option<u64>,
continuous: bool,
metrics_tx: MetricEventsSender,
metrics_tx: reth_stages::MetricEventsSender,
prune_config: Option<PruneConfig>,
) -> eyre::Result<Pipeline<DB>>
where
Expand Down
36 changes: 21 additions & 15 deletions bin/reth/src/prometheus_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,36 @@ use tracing::error;
pub(crate) trait Hook: Fn() + Send + Sync {}
impl<T: Fn() + Send + Sync> Hook for T {}

/// Installs Prometheus as the metrics recorder and serves it over HTTP with hooks.
/// Installs Prometheus as the metrics recorder.
pub(crate) fn install_recorder() -> eyre::Result<PrometheusHandle> {
let recorder = PrometheusBuilder::new().build_recorder();
let handle = recorder.handle();

// Build metrics stack
Stack::new(recorder)
.push(PrefixLayer::new("reth"))
.install()
.wrap_err("Couldn't set metrics recorder.")?;

Ok(handle)
}

/// Serves Prometheus metrics over HTTP with hooks.
///
/// The hooks are called every time the metrics are requested at the given endpoint, and can be used
/// to record values for pull-style metrics, i.e. metrics that are not automatically updated.
pub(crate) async fn initialize_with_hooks<F: Hook + 'static>(
pub(crate) async fn serve_with_hooks<F: Hook + 'static>(
listen_addr: SocketAddr,
handle: PrometheusHandle,
hooks: impl IntoIterator<Item = F>,
) -> eyre::Result<()> {
let recorder = PrometheusBuilder::new().build_recorder();
let handle = recorder.handle();

let hooks: Vec<_> = hooks.into_iter().collect();

// Start endpoint
start_endpoint(listen_addr, handle, Arc::new(move || hooks.iter().for_each(|hook| hook())))
.await
.wrap_err("Could not start Prometheus endpoint")?;

// Build metrics stack
Stack::new(recorder)
.push(PrefixLayer::new("reth"))
.install()
.wrap_err("Couldn't set metrics recorder.")?;

Ok(())
}

Expand Down Expand Up @@ -67,10 +73,10 @@ async fn start_endpoint<F: Hook + 'static>(
Ok(())
}

/// Installs Prometheus as the metrics recorder and serves it over HTTP with database and process
/// metrics.
pub(crate) async fn initialize(
/// Serves Prometheus metrics over HTTP with database and process metrics.
pub(crate) async fn serve(
listen_addr: SocketAddr,
handle: PrometheusHandle,
db: Arc<DatabaseEnv>,
process: metrics_process::Collector,
) -> eyre::Result<()> {
Expand Down Expand Up @@ -119,7 +125,7 @@ pub(crate) async fn initialize(
Box::new(move || cloned_process.collect()),
Box::new(collect_memory_stats),
];
initialize_with_hooks(listen_addr, hooks).await?;
serve_with_hooks(listen_addr, handle, hooks).await?;

// We describe the metrics after the recorder is installed, otherwise this information is not
// registered
Expand Down
3 changes: 2 additions & 1 deletion bin/reth/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ impl Command {

if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
prometheus_exporter::initialize(
prometheus_exporter::serve(
listen_addr,
prometheus_exporter::install_recorder()?,
Arc::clone(&db),
metrics_process::Collector::default(),
)
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/db/src/abstraction/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl DbTx for TxMock {
Ok(true)
}

fn drop(self) {}
fn abort(self) {}

fn cursor_read<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, DatabaseError> {
Ok(CursorMock { _cursor: 0 })
Expand Down
4 changes: 2 additions & 2 deletions crates/storage/db/src/abstraction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ pub trait DbTx: for<'a> DbTxGAT<'a> {
/// Commit for read only transaction will consume and free transaction and allows
/// freeing of memory pages
fn commit(self) -> Result<bool, DatabaseError>;
/// Drops transaction
fn drop(self);
/// Aborts transaction
fn abort(self);
/// Iterate over read only values in table.
fn cursor_read<T: Table>(&self) -> Result<<Self as DbTxGAT<'_>>::Cursor<T>, DatabaseError>;
/// Iterate over read only values in dup sorted table.
Expand Down
Loading

0 comments on commit 979d77a

Please sign in to comment.