From d5ac164f68bc2c97d8a4d233091a95bf275a7957 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 23 Oct 2023 21:36:51 +0100 Subject: [PATCH 01/20] feat(storage): database/transaction/cursor metrics --- bin/reth/src/node/mod.rs | 26 ++-- crates/storage/db/Cargo.toml | 1 + .../db/src/implementation/mdbx/cursor.rs | 122 +++++++++++------ .../storage/db/src/implementation/mdbx/mod.rs | 39 +++++- .../storage/db/src/implementation/mdbx/tx.rs | 129 ++++++++++++------ crates/storage/db/src/lib.rs | 2 + crates/storage/db/src/metrics/listener.rs | 68 +++++++++ crates/storage/db/src/metrics/metrics.rs | 67 +++++++++ crates/storage/db/src/metrics/mod.rs | 79 +++++++++++ 9 files changed, 435 insertions(+), 98 deletions(-) create mode 100644 crates/storage/db/src/metrics/listener.rs create mode 100644 crates/storage/db/src/metrics/metrics.rs create mode 100644 crates/storage/db/src/metrics/mod.rs diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index e7f0cfae7fd4..7a564e568802 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -72,7 +72,6 @@ use reth_stages::{ IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, TotalDifficultyStage, TransactionLookupStage, }, - MetricEventsSender, MetricsListener, }; use reth_tasks::TaskExecutor; use reth_transaction_pool::{ @@ -247,9 +246,14 @@ impl NodeCommand { // always store reth.toml in the data dir, not the chain specific data dir info!(target: "reth::cli", path = ?config_path, "Configuration loaded"); + debug!(target: "reth::cli", "Spawning database metrics listener task"); + let (db_metrics_tx, db_metrics_rx) = unbounded_channel(); + let db_metrics_listener = reth_db::MetricsListener::new(db_metrics_rx); + ctx.task_executor.spawn_critical("database metrics listener task", db_metrics_listener); + let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let db = Arc::new(init_db(&db_path, self.db.log_level)?); + let db = Arc::new(init_db(&db_path, self.db.log_level)?.with_metrics_tx(db_metrics_tx)); info!(target: "reth::cli", "Database opened"); self.start_metrics_endpoint(Arc::clone(&db)).await?; @@ -269,10 +273,10 @@ impl NodeCommand { self.init_trusted_nodes(&mut config); - debug!(target: "reth::cli", "Spawning metrics listener task"); - let (metrics_tx, metrics_rx) = unbounded_channel(); - let metrics_listener = MetricsListener::new(metrics_rx); - ctx.task_executor.spawn_critical("metrics listener task", metrics_listener); + debug!(target: "reth::cli", "Spawning stages metrics listener task"); + let (sync_metrics_tx, sync_metrics_rx) = unbounded_channel(); + let sync_metrics_listener = reth_stages::MetricsListener::new(sync_metrics_rx); + ctx.task_executor.spawn_critical("stages metrics listener task", sync_metrics_listener); let prune_config = self.pruning.prune_config(Arc::clone(&self.chain))?.or(config.prune.clone()); @@ -289,7 +293,7 @@ impl NodeCommand { BlockchainTreeConfig::default(), prune_config.clone().map(|config| config.segments), )? 
- .with_sync_metrics_tx(metrics_tx.clone()); + .with_sync_metrics_tx(sync_metrics_tx.clone()); let canon_state_notification_sender = tree.canon_state_notification_sender(); let blockchain_tree = ShareableBlockchainTree::new(tree); @@ -408,7 +412,7 @@ impl NodeCommand { Arc::clone(&consensus), db.clone(), &ctx.task_executor, - metrics_tx, + sync_metrics_tx, prune_config.clone(), max_block, ) @@ -428,7 +432,7 @@ impl NodeCommand { Arc::clone(&consensus), db.clone(), &ctx.task_executor, - metrics_tx, + sync_metrics_tx, prune_config.clone(), max_block, ) @@ -564,7 +568,7 @@ impl NodeCommand { consensus: Arc, db: DB, task_executor: &TaskExecutor, - metrics_tx: MetricEventsSender, + metrics_tx: reth_stages::MetricEventsSender, prune_config: Option, max_block: Option, ) -> eyre::Result> @@ -789,7 +793,7 @@ impl NodeCommand { consensus: Arc, max_block: Option, continuous: bool, - metrics_tx: MetricEventsSender, + metrics_tx: reth_stages::MetricEventsSender, prune_config: Option, ) -> eyre::Result> where diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml index 937bb232b9b2..36832db9ef55 100644 --- a/crates/storage/db/Cargo.toml +++ b/crates/storage/db/Cargo.toml @@ -45,6 +45,7 @@ parking_lot.workspace = true derive_more = "0.99" eyre.workspace = true paste = "1.0" +tokio = { workspace = true, features = ["sync"] } # arbitrary utils arbitrary = { workspace = true, features = ["derive"], optional = true } diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index 936069ca86db..c35fc1d31ff9 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -1,7 +1,7 @@ //! Cursor wrapper for libmdbx-sys. use reth_interfaces::db::DatabaseWriteOperation; -use std::{borrow::Cow, collections::Bound, ops::RangeBounds}; +use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, time::Instant}; use crate::{ common::{PairResult, ValueOnlyResult}, @@ -9,6 +9,7 @@ use crate::{ DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker, ReverseWalker, Walker, }, + metrics::{MetricEvent, MetricEventsSender, Operation}, table::{Compress, DupSort, Encode, Table}, tables::utils::*, DatabaseError, @@ -24,13 +25,39 @@ pub type CursorRW<'tx, T> = Cursor<'tx, RW, T>; #[derive(Debug)] pub struct Cursor<'tx, K: TransactionKind, T: Table> { /// Inner `libmdbx` cursor. - pub inner: reth_libmdbx::Cursor<'tx, K>, - /// Table name as is inside the database. - pub table: &'static str, - /// Phantom data to enforce encoding/decoding. - pub _dbi: std::marker::PhantomData, + pub(crate) inner: reth_libmdbx::Cursor<'tx, K>, /// Cache buffer that receives compressed values. - pub buf: Vec, + buf: Vec, + metrics_tx: Option, + /// Phantom data to enforce encoding/decoding. 
+ _dbi: PhantomData, +} + +impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> { + pub(crate) fn new(inner: reth_libmdbx::Cursor<'tx, K>) -> Self { + Self { inner, buf: Vec::new(), metrics_tx: None, _dbi: PhantomData::default() } + } + + pub(crate) fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { + self.metrics_tx = Some(metrics_tx); + self + } + + fn execute_with_operation_metric( + &mut self, + operation: Operation, + f: impl FnOnce(&mut Self) -> Result<(), DatabaseError>, + ) -> Result<(), DatabaseError> { + let start = Instant::now(); + let result = f(self); + + if let Some(metrics_tx) = &self.metrics_tx { + let _ = + metrics_tx.send(MetricEvent::Operation { operation, duration: start.elapsed() }); + } + + result + } } /// Takes `(key, value)` from the database and decodes it appropriately. @@ -229,62 +256,73 @@ impl DbCursorRW for Cursor<'_, RW, T> { /// found, before calling `upsert`. fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); - // Default `WriteFlags` is UPSERT - self.inner.put(key.as_ref(), compress_or_ref!(self, value), WriteFlags::UPSERT).map_err( - |e| DatabaseError::Write { - code: e.into(), - operation: DatabaseWriteOperation::CursorUpsert, - table_name: T::NAME, - key: Box::from(key.as_ref()), - }, - ) + self.execute_with_operation_metric(Operation::CursorUpsert, |this| { + this.inner.put(key.as_ref(), compress_or_ref!(this, value), WriteFlags::UPSERT).map_err( + |e| DatabaseError::Write { + code: e.into(), + operation: DatabaseWriteOperation::CursorUpsert, + table_name: T::NAME, + key: Box::from(key.as_ref()), + }, + ) + }) } fn insert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); - self.inner - .put(key.as_ref(), compress_or_ref!(self, value), WriteFlags::NO_OVERWRITE) - .map_err(|e| DatabaseError::Write { - code: e.into(), - operation: DatabaseWriteOperation::CursorInsert, - table_name: T::NAME, - key: Box::from(key.as_ref()), - }) + self.execute_with_operation_metric(Operation::CursorInsert, |this| { + this.inner + .put(key.as_ref(), compress_or_ref!(this, value), WriteFlags::NO_OVERWRITE) + .map_err(|e| DatabaseError::Write { + code: e.into(), + operation: DatabaseWriteOperation::CursorInsert, + table_name: T::NAME, + key: Box::from(key.as_ref()), + }) + }) } /// Appends the data to the end of the table. 
Consequently, the append operation /// will fail if the inserted key is less than the last table key fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); - self.inner.put(key.as_ref(), compress_or_ref!(self, value), WriteFlags::APPEND).map_err( - |e| DatabaseError::Write { - code: e.into(), - operation: DatabaseWriteOperation::CursorAppend, - table_name: T::NAME, - key: Box::from(key.as_ref()), - }, - ) + self.execute_with_operation_metric(Operation::CursorAppend, |this| { + this.inner.put(key.as_ref(), compress_or_ref!(this, value), WriteFlags::APPEND).map_err( + |e| DatabaseError::Write { + code: e.into(), + operation: DatabaseWriteOperation::CursorAppend, + table_name: T::NAME, + key: Box::from(key.as_ref()), + }, + ) + }) } fn delete_current(&mut self) -> Result<(), DatabaseError> { - self.inner.del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into())) + self.execute_with_operation_metric(Operation::CursorDeleteCurrent, |this| { + this.inner.del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into())) + }) } } impl DbDupCursorRW for Cursor<'_, RW, T> { fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> { - self.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| DatabaseError::Delete(e.into())) + self.execute_with_operation_metric(Operation::CursorDeleteCurrentDuplicates, |this| { + this.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| DatabaseError::Delete(e.into())) + }) } fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); - self.inner.put(key.as_ref(), compress_or_ref!(self, value), WriteFlags::APPEND_DUP).map_err( - |e| DatabaseError::Write { - code: e.into(), - operation: DatabaseWriteOperation::CursorAppendDup, - table_name: T::NAME, - key: Box::from(key.as_ref()), - }, - ) + self.execute_with_operation_metric(Operation::CursorAppendDup, |this| { + this.inner + .put(key.as_ref(), compress_or_ref!(this, value), WriteFlags::APPEND_DUP) + .map_err(|e| DatabaseError::Write { + code: e.into(), + operation: DatabaseWriteOperation::CursorAppendDup, + table_name: T::NAME, + key: Box::from(key.as_ref()), + }) + }) } } diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index ea82ed4285d6..9ed48b13db5f 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -2,6 +2,7 @@ use crate::{ database::{Database, DatabaseGAT}, + metrics::{MetricEvent, MetricEventsSender, TransactionMode}, tables::{TableType, Tables}, utils::default_page_size, DatabaseError, @@ -37,6 +38,7 @@ pub enum EnvKind { pub struct Env { /// Libmdbx-sys environment. 
pub inner: Environment, + metrics_tx: Option, } impl<'a, E: EnvironmentKind> DatabaseGAT<'a> for Env { @@ -46,15 +48,31 @@ impl<'a, E: EnvironmentKind> DatabaseGAT<'a> for Env { impl Database for Env { fn tx(&self) -> Result<>::TX, DatabaseError> { - Ok(Tx::new( + let mut tx = Tx::new( self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTransaction(e.into()))?, - )) + ); + if let Some(metrics_tx) = &self.metrics_tx { + tx = tx.with_metrics_tx(metrics_tx.clone()); + let _ = metrics_tx.send(MetricEvent::OpenTransaction { + txn_id: tx.id(), + mode: TransactionMode::ReadOnly, + }); + } + Ok(tx) } fn tx_mut(&self) -> Result<>::TXMut, DatabaseError> { - Ok(Tx::new( + let mut tx = Tx::new( self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTransaction(e.into()))?, - )) + ); + if let Some(metrics_tx) = &self.metrics_tx { + tx = tx.with_metrics_tx(metrics_tx.clone()); + let _ = metrics_tx.send(MetricEvent::OpenTransaction { + txn_id: tx.id(), + mode: TransactionMode::ReadWrite, + }); + } + Ok(tx) } } @@ -120,12 +138,19 @@ impl Env { } } - let env = - Env { inner: inner_env.open(path).map_err(|e| DatabaseError::FailedToOpen(e.into()))? }; + let env = Env { + inner: inner_env.open(path).map_err(|e| DatabaseError::FailedToOpen(e.into()))?, + metrics_tx: None, + }; Ok(env) } + pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { + self.metrics_tx = Some(metrics_tx); + self + } + /// Creates all the defined tables, if necessary. pub fn create_tables(&self) -> Result<(), DatabaseError> { let tx = self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTransaction(e.into()))?; @@ -804,7 +829,7 @@ mod tests { assert!(result.expect(ERROR_RETURN_VALUE) == 200); } - let env = Env::::open(&path, EnvKind::RO, None).expect(ERROR_DB_CREATION); + let env = Env::::open(&path, EnvKind::RO, None, None).expect(ERROR_DB_CREATION); // GET let result = diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 2bf8450ca91f..8ab8958c6416 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -2,6 +2,7 @@ use super::cursor::Cursor; use crate::{ + metrics::{MetricEvent, MetricEventsSender, Operation, TransactionOutcome}, table::{Compress, DupSort, Encode, Table, TableImporter}, tables::{utils::decode_one, Tables, NUM_TABLES}, transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT}, @@ -10,16 +11,18 @@ use crate::{ use parking_lot::RwLock; use reth_interfaces::db::DatabaseWriteOperation; use reth_libmdbx::{ffi::DBI, EnvironmentKind, Transaction, TransactionKind, WriteFlags, RW}; -use reth_metrics::metrics::histogram; -use std::{marker::PhantomData, str::FromStr, sync::Arc, time::Instant}; +use std::{str::FromStr, sync::Arc, time::Instant}; /// Wrapper for the libmdbx transaction. #[derive(Debug)] pub struct Tx<'a, K: TransactionKind, E: EnvironmentKind> { /// Libmdbx-sys transaction. pub inner: Transaction<'a, K, E>, - /// Database table handle cache - pub db_handles: Arc; NUM_TABLES]>>, + /// Cached internal transaction ID provided by libmdbx. + id: Option, + /// Database table handle cache. 
+ pub(crate) db_handles: Arc; NUM_TABLES]>>, + metrics_tx: Option, } impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { @@ -28,12 +31,17 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { where 'a: 'env, { - Self { inner, db_handles: Default::default() } + Self { inner, id: None, db_handles: Default::default(), metrics_tx: None } + } + + pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { + self.metrics_tx = Some(metrics_tx); + self } /// Gets this transaction ID. - pub fn id(&self) -> u64 { - self.inner.id() + pub fn id(&mut self) -> u64 { + *self.id.get_or_insert_with(|| self.inner.id()) } /// Gets a table database handle if it exists, otherwise creates it. @@ -57,15 +65,25 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { /// Create db Cursor pub fn new_cursor(&self) -> Result, DatabaseError> { - Ok(Cursor { - inner: self - .inner + Ok(Cursor::new( + self.inner .cursor_with_dbi(self.get_dbi::()?) .map_err(|e| DatabaseError::InitCursor(e.into()))?, - table: T::NAME, - _dbi: PhantomData, - buf: vec![], - }) + )) + } + + fn execute_with_operation_metric( + &self, + operation: Operation, + f: impl FnOnce(&Transaction<'_, K, E>) -> Result, + ) -> Result { + let start = Instant::now(); + let result = f(&self.inner); + if let Some(metrics_tx) = &self.metrics_tx { + let _ = + metrics_tx.send(MetricEvent::Operation { operation, duration: start.elapsed() }); + } + result } } @@ -83,34 +101,59 @@ impl TableImporter for Tx<'_, RW, E> {} impl DbTx for Tx<'_, K, E> { fn get(&self, key: T::Key) -> Result::Value>, DatabaseError> { - self.inner - .get(self.get_dbi::()?, key.encode().as_ref()) - .map_err(|e| DatabaseError::Read(e.into()))? - .map(decode_one::) - .transpose() + self.execute_with_operation_metric(Operation::Get, |tx| { + tx.get(self.get_dbi::()?, key.encode().as_ref()) + .map_err(|e| DatabaseError::Read(e.into()))? + .map(decode_one::) + .transpose() + }) } - fn commit(self) -> Result { + fn commit(mut self) -> Result { let start = Instant::now(); + let metrics = self.metrics_tx.take().map(|metrics_tx| (self.id(), metrics_tx)); let result = self.inner.commit().map_err(|e| DatabaseError::Commit(e.into())); - histogram!("tx.commit", start.elapsed()); + if let Some((txn_id, metrics_tx)) = metrics { + let _ = metrics_tx.send(MetricEvent::CloseTransaction { + txn_id, + outcome: TransactionOutcome::Commit, + commit_duration: start.elapsed(), + }); + } result } - fn drop(self) { - drop(self.inner) + fn drop(mut self) { + let start = Instant::now(); + let metrics = self.metrics_tx.take().map(|metrics_tx| (self.id(), metrics_tx)); + drop(self.inner); + if let Some((txn_id, metrics_tx)) = metrics { + let _ = metrics_tx.send(MetricEvent::CloseTransaction { + txn_id, + outcome: TransactionOutcome::Abort, + commit_duration: start.elapsed(), + }); + } } // Iterate over read only values in database. fn cursor_read(&self) -> Result<>::Cursor, DatabaseError> { - self.new_cursor() + let mut cursor = self.new_cursor()?; + if let Some(metrics_tx) = &self.metrics_tx { + cursor = cursor.with_metrics_tx(metrics_tx.clone()); + } + Ok(cursor) } /// Iterate over read only values in database. fn cursor_dup_read( &self, ) -> Result<>::DupCursor, DatabaseError> { - self.new_cursor() + let mut cursor = self.new_cursor()?; + if let Some(metrics_tx) = &self.metrics_tx { + cursor = cursor.with_metrics_tx(metrics_tx.clone()); + } + Ok(cursor) } /// Returns number of entries in the table using cheap DB stats invocation. 
@@ -126,14 +169,15 @@ impl DbTx for Tx<'_, K, E> { impl DbTxMut for Tx<'_, RW, E> { fn put(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); - self.inner - .put(self.get_dbi::()?, key.as_ref(), &value.compress(), WriteFlags::UPSERT) - .map_err(|e| DatabaseError::Write { - code: e.into(), - operation: DatabaseWriteOperation::Put, - table_name: T::NAME, - key: Box::from(key.as_ref()), - }) + self.execute_with_operation_metric(Operation::Put, |tx| { + tx.put(self.get_dbi::()?, key.as_ref(), &value.compress(), WriteFlags::UPSERT) + .map_err(|e| DatabaseError::Write { + code: e.into(), + operation: DatabaseWriteOperation::Put, + table_name: T::NAME, + key: Box::from(key.as_ref()), + }) + }) } fn delete( @@ -148,9 +192,10 @@ impl DbTxMut for Tx<'_, RW, E> { data = Some(value.as_ref()); }; - self.inner - .del(self.get_dbi::()?, key.encode(), data) - .map_err(|e| DatabaseError::Delete(e.into())) + self.execute_with_operation_metric(Operation::Delete, |tx| { + tx.del(self.get_dbi::()?, key.encode(), data) + .map_err(|e| DatabaseError::Delete(e.into())) + }) } fn clear(&self) -> Result<(), DatabaseError> { @@ -162,12 +207,20 @@ impl DbTxMut for Tx<'_, RW, E> { fn cursor_write( &self, ) -> Result<>::CursorMut, DatabaseError> { - self.new_cursor() + let mut cursor = self.new_cursor()?; + if let Some(metrics_tx) = &self.metrics_tx { + cursor = cursor.with_metrics_tx(metrics_tx.clone()); + } + Ok(cursor) } fn cursor_dup_write( &self, ) -> Result<>::DupCursorMut, DatabaseError> { - self.new_cursor() + let mut cursor = self.new_cursor()?; + if let Some(metrics_tx) = &self.metrics_tx { + cursor = cursor.with_metrics_tx(metrics_tx.clone()); + } + Ok(cursor) } } diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index 8cb3fe80c19e..9684aa17df12 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -68,6 +68,8 @@ pub mod abstraction; mod implementation; +mod metrics; +pub use crate::metrics::*; pub mod snapshot; pub mod tables; mod utils; diff --git a/crates/storage/db/src/metrics/listener.rs b/crates/storage/db/src/metrics/listener.rs new file mode 100644 index 000000000000..b19e096e26c0 --- /dev/null +++ b/crates/storage/db/src/metrics/listener.rs @@ -0,0 +1,68 @@ +use crate::metrics::{metrics::Metrics, Operation, TransactionMode, TransactionOutcome}; +use reth_tracing::tracing::trace; +use std::{ + future::Future, + pin::Pin, + task::{ready, Context, Poll}, + time::Duration, +}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +/// Alias type for metric producers to use. +pub type MetricEventsSender = UnboundedSender; + +/// Collection of metric events. +#[derive(Clone, Copy, Debug)] +pub enum MetricEvent { + OpenTransaction { txn_id: u64, mode: TransactionMode }, + CloseTransaction { txn_id: u64, outcome: TransactionOutcome, commit_duration: Duration }, + Operation { operation: Operation, duration: Duration }, +} + +/// Metrics routine that listens to new metric events on the `events_rx` receiver. +/// Upon receiving new event, related metrics are updated. +#[derive(Debug)] +pub struct MetricsListener { + events_rx: UnboundedReceiver, + metrics: Metrics, +} + +impl MetricsListener { + /// Creates a new [MetricsListener] with the provided receiver of [MetricEvent]. 
+ pub fn new(events_rx: UnboundedReceiver) -> Self { + Self { events_rx, metrics: Metrics::default() } + } + + fn handle_event(&mut self, event: MetricEvent) { + trace!(target: "storage::metrics", ?event, "Metric event received"); + match event { + MetricEvent::OpenTransaction { txn_id, mode } => { + self.metrics.record_open_transaction(txn_id, mode) + } + MetricEvent::CloseTransaction { txn_id, outcome, commit_duration } => { + self.metrics.record_close_transaction(txn_id, outcome, commit_duration) + } + MetricEvent::Operation { operation, duration } => { + self.metrics.record_operation(operation, duration) + } + } + } +} + +impl Future for MetricsListener { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // Loop until we drain the `events_rx` channel + loop { + let Some(event) = ready!(this.events_rx.poll_recv(cx)) else { + // Channel has closed + return Poll::Ready(()) + }; + + this.handle_event(event); + } + } +} diff --git a/crates/storage/db/src/metrics/metrics.rs b/crates/storage/db/src/metrics/metrics.rs new file mode 100644 index 000000000000..0caf9545a455 --- /dev/null +++ b/crates/storage/db/src/metrics/metrics.rs @@ -0,0 +1,67 @@ +use crate::metrics::{Operation, Transaction, TransactionMode, TransactionOutcome}; +use metrics::Histogram; +use reth_metrics::{metrics::Counter, Metrics}; +use std::{ + collections::HashMap, + time::{Duration, Instant}, +}; + +#[derive(Debug, Default)] +pub(crate) struct Metrics { + transactions: HashMap, + transaction_metrics: HashMap<(TransactionMode, TransactionOutcome), TransactionMetrics>, + + operation_metrics: HashMap, +} + +impl Metrics { + pub(crate) fn record_open_transaction(&mut self, txn_id: u64, mode: TransactionMode) { + self.transactions.insert(txn_id, Transaction { begin: Instant::now(), mode }); + } + + pub(crate) fn record_close_transaction( + &mut self, + txn_id: u64, + outcome: TransactionOutcome, + commit_duration: Duration, + ) { + if let Some(transaction) = self.transactions.remove(&txn_id) { + let metrics = + self.transaction_metrics.entry((transaction.mode, outcome)).or_insert_with(|| { + TransactionMetrics::new_with_labels(&[ + ("mode", transaction.mode.to_string()), + ("outcome", outcome.to_string()), + ]) + }); + metrics.open_duration.record(transaction.begin.elapsed()); + metrics.commit_duration.record(commit_duration); + } + } + + pub(crate) fn record_operation(&mut self, operation: Operation, duration: Duration) { + let metrics = self.operation_metrics.entry(operation).or_insert_with(|| { + OperationMetrics::new_with_labels(&[("operation", operation.to_string())]) + }); + + metrics.duration.record(duration); + metrics.called.increment(1); + } +} + +#[derive(Metrics)] +#[metrics(scope = "database.transaction")] +struct TransactionMetrics { + /// open duration + pub(crate) open_duration: Histogram, + /// commit duration + pub(crate) commit_duration: Histogram, +} + +#[derive(Metrics)] +#[metrics(scope = "database.operation")] +struct OperationMetrics { + /// called + pub(crate) called: Counter, + /// duration + pub(crate) duration: Histogram, +} diff --git a/crates/storage/db/src/metrics/mod.rs b/crates/storage/db/src/metrics/mod.rs new file mode 100644 index 000000000000..99d6bbaeb53c --- /dev/null +++ b/crates/storage/db/src/metrics/mod.rs @@ -0,0 +1,79 @@ +use std::{ + fmt::{Display, Formatter}, + time::Instant, +}; + +mod listener; +mod metrics; + +pub use listener::{MetricEvent, MetricEventsSender, MetricsListener}; + +#[derive(Debug, Clone, 
Copy)] +struct Transaction { + mode: TransactionMode, + begin: Instant, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[allow(missing_docs)] +pub enum TransactionMode { + ReadOnly, + ReadWrite, +} + +impl Display for TransactionMode { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + TransactionMode::ReadOnly => write!(f, "read-only"), + TransactionMode::ReadWrite => write!(f, "read-write"), + } + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[allow(missing_docs)] +pub enum TransactionOutcome { + Commit, + Abort, +} + +impl Display for TransactionOutcome { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + TransactionOutcome::Commit => write!(f, "commit"), + TransactionOutcome::Abort => write!(f, "abort"), + } + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[allow(missing_docs)] +pub enum Operation { + Get, + Put, + Delete, + CursorUpsert, + CursorInsert, + CursorAppend, + CursorAppendDup, + CursorDeleteCurrent, + CursorDeleteCurrentDuplicates, +} + +impl Display for Operation { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Operation::Get => write!(f, "get"), + Operation::Put => write!(f, "put"), + Operation::Delete => write!(f, "delete"), + Operation::CursorUpsert => write!(f, "cursor-upsert"), + Operation::CursorInsert => write!(f, "cursor-insert"), + Operation::CursorAppend => write!(f, "cursor-append"), + Operation::CursorAppendDup => write!(f, "cursor-append-dup"), + Operation::CursorDeleteCurrent => write!(f, "cursor-delete-current"), + Operation::CursorDeleteCurrentDuplicates => { + write!(f, "cursor-delete-current-duplicates") + } + } + } +} From a11116f473f3ddf97acae51ffcbc8f948a43db8f Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 23 Oct 2023 21:43:13 +0100 Subject: [PATCH 02/20] improve metric docs and names --- crates/storage/db/src/metrics/metrics.rs | 39 +++++++++++++++--------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/crates/storage/db/src/metrics/metrics.rs b/crates/storage/db/src/metrics/metrics.rs index 0caf9545a455..25844cc59f2e 100644 --- a/crates/storage/db/src/metrics/metrics.rs +++ b/crates/storage/db/src/metrics/metrics.rs @@ -1,22 +1,29 @@ use crate::metrics::{Operation, Transaction, TransactionMode, TransactionOutcome}; -use metrics::Histogram; +use metrics::{Gauge, Histogram}; use reth_metrics::{metrics::Counter, Metrics}; use std::{ collections::HashMap, time::{Duration, Instant}, }; -#[derive(Debug, Default)] +#[derive(Metrics)] +#[metrics(scope = "database")] pub(crate) struct Metrics { + /// Total number of currently open database transactions + open_transactions_total: Gauge, + + #[metric(skip)] transactions: HashMap, + #[metric(skip)] transaction_metrics: HashMap<(TransactionMode, TransactionOutcome), TransactionMetrics>, - + #[metric(skip)] operation_metrics: HashMap, } impl Metrics { pub(crate) fn record_open_transaction(&mut self, txn_id: u64, mode: TransactionMode) { self.transactions.insert(txn_id, Transaction { begin: Instant::now(), mode }); + self.open_transactions_total.set(self.transactions.len() as f64) } pub(crate) fn record_close_transaction( @@ -33,8 +40,10 @@ impl Metrics { ("outcome", outcome.to_string()), ]) }); - metrics.open_duration.record(transaction.begin.elapsed()); - metrics.commit_duration.record(commit_duration); + metrics.open_duration_seconds.record(transaction.begin.elapsed()); + metrics.commit_duration_seconds.record(commit_duration); + + 
self.open_transactions_total.set(self.transactions.len() as f64) } } @@ -43,25 +52,25 @@ impl Metrics { OperationMetrics::new_with_labels(&[("operation", operation.to_string())]) }); - metrics.duration.record(duration); - metrics.called.increment(1); + metrics.calls_total.increment(1); + metrics.duration_seconds.record(duration); } } #[derive(Metrics)] #[metrics(scope = "database.transaction")] struct TransactionMetrics { - /// open duration - pub(crate) open_duration: Histogram, - /// commit duration - pub(crate) commit_duration: Histogram, + /// The time a database transaction has been open + pub(crate) open_duration_seconds: Histogram, + /// Database transaction commit duration + pub(crate) commit_duration_seconds: Histogram, } #[derive(Metrics)] #[metrics(scope = "database.operation")] struct OperationMetrics { - /// called - pub(crate) called: Counter, - /// duration - pub(crate) duration: Histogram, + /// Total number of database operations made + pub(crate) calls_total: Counter, + /// Database operation duration + pub(crate) duration_seconds: Histogram, } From bddfd052e472d5c52343f3af48d6e8a027b400ee Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 23 Oct 2023 21:47:37 +0100 Subject: [PATCH 03/20] fix clippy --- crates/storage/db/src/implementation/mdbx/mod.rs | 2 ++ crates/storage/db/src/implementation/mdbx/tx.rs | 1 + crates/storage/db/src/metrics/listener.rs | 1 + 3 files changed, 4 insertions(+) diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 9ed48b13db5f..009518e7f1d6 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -146,6 +146,8 @@ impl Env { Ok(env) } + /// Sets the [MetricEventsSender] to report metrics about the database, transactions and + /// cursors. pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { self.metrics_tx = Some(metrics_tx); self diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 8ab8958c6416..2cf6f18f3c1e 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -34,6 +34,7 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { Self { inner, id: None, db_handles: Default::default(), metrics_tx: None } } + /// Sets the [MetricEventsSender] to report metrics about the transaction and cursors. pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { self.metrics_tx = Some(metrics_tx); self diff --git a/crates/storage/db/src/metrics/listener.rs b/crates/storage/db/src/metrics/listener.rs index b19e096e26c0..fe1dab54120e 100644 --- a/crates/storage/db/src/metrics/listener.rs +++ b/crates/storage/db/src/metrics/listener.rs @@ -13,6 +13,7 @@ pub type MetricEventsSender = UnboundedSender; /// Collection of metric events. 
#[derive(Clone, Copy, Debug)] +#[allow(missing_docs)] pub enum MetricEvent { OpenTransaction { txn_id: u64, mode: TransactionMode }, CloseTransaction { txn_id: u64, outcome: TransactionOutcome, commit_duration: Duration }, From dcf0e2b3259edccaae4ed3918bb88424c5ab3192 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 23 Oct 2023 21:55:04 +0100 Subject: [PATCH 04/20] DRY close transaction metric --- .../db/src/implementation/mdbx/cursor.rs | 6 +- .../storage/db/src/implementation/mdbx/tx.rs | 56 ++++++++++--------- 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index c35fc1d31ff9..579a80c6bcac 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -43,11 +43,11 @@ impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> { self } - fn execute_with_operation_metric( + fn execute_with_operation_metric( &mut self, operation: Operation, - f: impl FnOnce(&mut Self) -> Result<(), DatabaseError>, - ) -> Result<(), DatabaseError> { + f: impl FnOnce(&mut Self) -> R, + ) -> R { let start = Instant::now(); let result = f(self); diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 2cf6f18f3c1e..93dad1928982 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -73,11 +73,30 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { )) } - fn execute_with_operation_metric( + fn execute_with_close_transaction_metric( + mut self, + outcome: TransactionOutcome, + f: impl FnOnce(Self) -> R, + ) -> R { + let start = Instant::now(); + let (txn_id, metrics_tx) = + self.metrics_tx.take().map(|metrics_tx| (self.id(), metrics_tx)).unzip(); + let result = f(self); + if let (Some(txn_id), Some(metrics_tx)) = (txn_id, metrics_tx) { + let _ = metrics_tx.send(MetricEvent::CloseTransaction { + txn_id, + outcome, + commit_duration: start.elapsed(), + }); + } + result + } + + fn execute_with_operation_metric( &self, operation: Operation, - f: impl FnOnce(&Transaction<'_, K, E>) -> Result, - ) -> Result { + f: impl FnOnce(&Transaction<'_, K, E>) -> R, + ) -> R { let start = Instant::now(); let result = f(&self.inner); if let Some(metrics_tx) = &self.metrics_tx { @@ -110,31 +129,16 @@ impl DbTx for Tx<'_, K, E> { }) } - fn commit(mut self) -> Result { - let start = Instant::now(); - let metrics = self.metrics_tx.take().map(|metrics_tx| (self.id(), metrics_tx)); - let result = self.inner.commit().map_err(|e| DatabaseError::Commit(e.into())); - if let Some((txn_id, metrics_tx)) = metrics { - let _ = metrics_tx.send(MetricEvent::CloseTransaction { - txn_id, - outcome: TransactionOutcome::Commit, - commit_duration: start.elapsed(), - }); - } - result + fn commit(self) -> Result { + self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| { + this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) + }) } - fn drop(mut self) { - let start = Instant::now(); - let metrics = self.metrics_tx.take().map(|metrics_tx| (self.id(), metrics_tx)); - drop(self.inner); - if let Some((txn_id, metrics_tx)) = metrics { - let _ = metrics_tx.send(MetricEvent::CloseTransaction { - txn_id, - outcome: TransactionOutcome::Abort, - commit_duration: start.elapsed(), - }); - } + fn drop(self) { + self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| { + 
drop(this.inner) + }) } // Iterate over read only values in database. From 48d8c5a83f4c44a269f70f92177bcb3e9817699a Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 23 Oct 2023 23:15:34 +0100 Subject: [PATCH 05/20] phantomdata --- crates/storage/db/src/implementation/mdbx/cursor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index 579a80c6bcac..4684088b2fef 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -35,7 +35,7 @@ pub struct Cursor<'tx, K: TransactionKind, T: Table> { impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> { pub(crate) fn new(inner: reth_libmdbx::Cursor<'tx, K>) -> Self { - Self { inner, buf: Vec::new(), metrics_tx: None, _dbi: PhantomData::default() } + Self { inner, buf: Vec::new(), metrics_tx: None, _dbi: PhantomData } } pub(crate) fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { From 43631edcf49bfc14b787f9caf1dca9d353692056 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 23 Oct 2023 23:21:31 +0100 Subject: [PATCH 06/20] reorg metrics module --- .../storage/db/src/implementation/mdbx/mod.rs | 2 +- crates/storage/db/src/metrics/listener.rs | 2 +- crates/storage/db/src/metrics/metrics.rs | 76 ------------------- crates/storage/db/src/metrics/mod.rs | 76 ++++++++++++++++++- 4 files changed, 75 insertions(+), 81 deletions(-) delete mode 100644 crates/storage/db/src/metrics/metrics.rs diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 009518e7f1d6..54f10bcb69b3 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -831,7 +831,7 @@ mod tests { assert!(result.expect(ERROR_RETURN_VALUE) == 200); } - let env = Env::::open(&path, EnvKind::RO, None, None).expect(ERROR_DB_CREATION); + let env = Env::::open(&path, EnvKind::RO, None).expect(ERROR_DB_CREATION); // GET let result = diff --git a/crates/storage/db/src/metrics/listener.rs b/crates/storage/db/src/metrics/listener.rs index fe1dab54120e..6ca5ff9587f2 100644 --- a/crates/storage/db/src/metrics/listener.rs +++ b/crates/storage/db/src/metrics/listener.rs @@ -1,4 +1,4 @@ -use crate::metrics::{metrics::Metrics, Operation, TransactionMode, TransactionOutcome}; +use crate::metrics::{Metrics, Operation, TransactionMode, TransactionOutcome}; use reth_tracing::tracing::trace; use std::{ future::Future, diff --git a/crates/storage/db/src/metrics/metrics.rs b/crates/storage/db/src/metrics/metrics.rs deleted file mode 100644 index 25844cc59f2e..000000000000 --- a/crates/storage/db/src/metrics/metrics.rs +++ /dev/null @@ -1,76 +0,0 @@ -use crate::metrics::{Operation, Transaction, TransactionMode, TransactionOutcome}; -use metrics::{Gauge, Histogram}; -use reth_metrics::{metrics::Counter, Metrics}; -use std::{ - collections::HashMap, - time::{Duration, Instant}, -}; - -#[derive(Metrics)] -#[metrics(scope = "database")] -pub(crate) struct Metrics { - /// Total number of currently open database transactions - open_transactions_total: Gauge, - - #[metric(skip)] - transactions: HashMap, - #[metric(skip)] - transaction_metrics: HashMap<(TransactionMode, TransactionOutcome), TransactionMetrics>, - #[metric(skip)] - operation_metrics: HashMap, -} - -impl Metrics { - pub(crate) fn record_open_transaction(&mut self, txn_id: u64, mode: TransactionMode) { - 
self.transactions.insert(txn_id, Transaction { begin: Instant::now(), mode }); - self.open_transactions_total.set(self.transactions.len() as f64) - } - - pub(crate) fn record_close_transaction( - &mut self, - txn_id: u64, - outcome: TransactionOutcome, - commit_duration: Duration, - ) { - if let Some(transaction) = self.transactions.remove(&txn_id) { - let metrics = - self.transaction_metrics.entry((transaction.mode, outcome)).or_insert_with(|| { - TransactionMetrics::new_with_labels(&[ - ("mode", transaction.mode.to_string()), - ("outcome", outcome.to_string()), - ]) - }); - metrics.open_duration_seconds.record(transaction.begin.elapsed()); - metrics.commit_duration_seconds.record(commit_duration); - - self.open_transactions_total.set(self.transactions.len() as f64) - } - } - - pub(crate) fn record_operation(&mut self, operation: Operation, duration: Duration) { - let metrics = self.operation_metrics.entry(operation).or_insert_with(|| { - OperationMetrics::new_with_labels(&[("operation", operation.to_string())]) - }); - - metrics.calls_total.increment(1); - metrics.duration_seconds.record(duration); - } -} - -#[derive(Metrics)] -#[metrics(scope = "database.transaction")] -struct TransactionMetrics { - /// The time a database transaction has been open - pub(crate) open_duration_seconds: Histogram, - /// Database transaction commit duration - pub(crate) commit_duration_seconds: Histogram, -} - -#[derive(Metrics)] -#[metrics(scope = "database.operation")] -struct OperationMetrics { - /// Total number of database operations made - pub(crate) calls_total: Counter, - /// Database operation duration - pub(crate) duration_seconds: Histogram, -} diff --git a/crates/storage/db/src/metrics/mod.rs b/crates/storage/db/src/metrics/mod.rs index 99d6bbaeb53c..d455b0d2c8d8 100644 --- a/crates/storage/db/src/metrics/mod.rs +++ b/crates/storage/db/src/metrics/mod.rs @@ -1,11 +1,12 @@ +use metrics::{Gauge, Histogram}; +use reth_metrics::{metrics::Counter, Metrics}; use std::{ + collections::HashMap, fmt::{Display, Formatter}, - time::Instant, + time::{Duration, Instant}, }; mod listener; -mod metrics; - pub use listener::{MetricEvent, MetricEventsSender, MetricsListener}; #[derive(Debug, Clone, Copy)] @@ -77,3 +78,72 @@ impl Display for Operation { } } } + +#[derive(Metrics)] +#[metrics(scope = "database")] +struct Metrics { + /// Total number of currently open database transactions + open_transactions_total: Gauge, + + #[metric(skip)] + transactions: HashMap, + #[metric(skip)] + transaction_metrics: HashMap<(TransactionMode, TransactionOutcome), TransactionMetrics>, + #[metric(skip)] + operation_metrics: HashMap, +} + +impl Metrics { + pub(crate) fn record_open_transaction(&mut self, txn_id: u64, mode: TransactionMode) { + self.transactions.insert(txn_id, Transaction { begin: Instant::now(), mode }); + self.open_transactions_total.set(self.transactions.len() as f64) + } + + pub(crate) fn record_close_transaction( + &mut self, + txn_id: u64, + outcome: TransactionOutcome, + commit_duration: Duration, + ) { + if let Some(transaction) = self.transactions.remove(&txn_id) { + let metrics = + self.transaction_metrics.entry((transaction.mode, outcome)).or_insert_with(|| { + TransactionMetrics::new_with_labels(&[ + ("mode", transaction.mode.to_string()), + ("outcome", outcome.to_string()), + ]) + }); + metrics.open_duration_seconds.record(transaction.begin.elapsed()); + metrics.commit_duration_seconds.record(commit_duration); + + self.open_transactions_total.set(self.transactions.len() as f64) + } + } + + 
pub(crate) fn record_operation(&mut self, operation: Operation, duration: Duration) { + let metrics = self.operation_metrics.entry(operation).or_insert_with(|| { + OperationMetrics::new_with_labels(&[("operation", operation.to_string())]) + }); + + metrics.calls_total.increment(1); + metrics.duration_seconds.record(duration); + } +} + +#[derive(Metrics)] +#[metrics(scope = "database.transaction")] +struct TransactionMetrics { + /// The time a database transaction has been open + pub(crate) open_duration_seconds: Histogram, + /// Database transaction commit duration + pub(crate) commit_duration_seconds: Histogram, +} + +#[derive(Metrics)] +#[metrics(scope = "database.operation")] +struct OperationMetrics { + /// Total number of database operations made + pub(crate) calls_total: Counter, + /// Database operation duration + pub(crate) duration_seconds: Histogram, +} From ea52657449e0a63d115bfd620b561869adcac3ab Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 24 Oct 2023 10:38:03 +0100 Subject: [PATCH 07/20] unused import --- crates/net/network/tests/it/clique/mod.rs | 2 +- crates/storage/codecs/src/lib.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/net/network/tests/it/clique/mod.rs b/crates/net/network/tests/it/clique/mod.rs index a8b2b8894db9..d8fb1070c663 100644 --- a/crates/net/network/tests/it/clique/mod.rs +++ b/crates/net/network/tests/it/clique/mod.rs @@ -1,5 +1,5 @@ pub mod clique_middleware; mod geth; -pub use clique_middleware::{CliqueError, CliqueMiddleware, CliqueMiddlewareError}; +pub use clique_middleware::CliqueMiddleware; pub use geth::CliqueGethInstance; diff --git a/crates/storage/codecs/src/lib.rs b/crates/storage/codecs/src/lib.rs index 2866e79fb382..fee674a23abf 100644 --- a/crates/storage/codecs/src/lib.rs +++ b/crates/storage/codecs/src/lib.rs @@ -14,8 +14,6 @@ use revm_primitives::{ Address, Bytes, B256, U256, }; -pub use codecs_derive::*; - /// Trait that implements the `Compact` codec. 
/// /// When deriving the trait for custom structs, be aware of certain limitations/recommendations: From e98aad951272ed0dc4bd32b3b71de04611cc182b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 24 Oct 2023 13:44:06 +0100 Subject: [PATCH 08/20] install prometheus recorder before initializing metrics --- bin/reth/src/node/mod.rs | 24 ++++++++++++++++++++---- bin/reth/src/prometheus_exporter.rs | 26 ++++++++++++++++---------- bin/reth/src/stage/run.rs | 1 + 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 7a564e568802..a5ae73f22c07 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -25,6 +25,7 @@ use clap::{value_parser, Parser}; use eyre::Context; use fdlimit::raise_fd_limit; use futures::{future::Either, pin_mut, stream, stream_select, StreamExt}; +use metrics_exporter_prometheus::PrometheusHandle; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode}; use reth_beacon_consensus::{ hooks::{EngineHooks, PruneHook}, @@ -246,6 +247,8 @@ impl NodeCommand { // always store reth.toml in the data dir, not the chain specific data dir info!(target: "reth::cli", path = ?config_path, "Configuration loaded"); + let prometheus_handle = self.install_prometheus_recorder()?; + debug!(target: "reth::cli", "Spawning database metrics listener task"); let (db_metrics_tx, db_metrics_rx) = unbounded_channel(); let db_metrics_listener = reth_db::MetricsListener::new(db_metrics_rx); @@ -256,7 +259,7 @@ impl NodeCommand { let db = Arc::new(init_db(&db_path, self.db.log_level)?.with_metrics_tx(db_metrics_tx)); info!(target: "reth::cli", "Database opened"); - self.start_metrics_endpoint(Arc::clone(&db)).await?; + self.start_metrics_endpoint(prometheus_handle, Arc::clone(&db)).await?; debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); @@ -631,11 +634,24 @@ impl NodeCommand { } } - async fn start_metrics_endpoint(&self, db: Arc) -> eyre::Result<()> { + fn install_prometheus_recorder(&self) -> eyre::Result { + prometheus_exporter::install_recorder() + } + + async fn start_metrics_endpoint( + &self, + prometheus_handle: PrometheusHandle, + db: Arc, + ) -> eyre::Result<()> { if let Some(listen_addr) = self.metrics { info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); - prometheus_exporter::initialize(listen_addr, db, metrics_process::Collector::default()) - .await?; + prometheus_exporter::initialize( + listen_addr, + prometheus_handle, + db, + metrics_process::Collector::default(), + ) + .await?; } Ok(()) diff --git a/bin/reth/src/prometheus_exporter.rs b/bin/reth/src/prometheus_exporter.rs index bb612d53825f..2654b052c7c1 100644 --- a/bin/reth/src/prometheus_exporter.rs +++ b/bin/reth/src/prometheus_exporter.rs @@ -15,17 +15,28 @@ use tracing::error; pub(crate) trait Hook: Fn() + Send + Sync {} impl Hook for T {} +pub(crate) fn install_recorder() -> eyre::Result { + let recorder = PrometheusBuilder::new().build_recorder(); + let handle = recorder.handle(); + + // Build metrics stack + Stack::new(recorder) + .push(PrefixLayer::new("reth")) + .install() + .wrap_err("Couldn't set metrics recorder.")?; + + Ok(handle) +} + /// Installs Prometheus as the metrics recorder and serves it over HTTP with hooks. /// /// The hooks are called every time the metrics are requested at the given endpoint, and can be used /// to record values for pull-style metrics, i.e. metrics that are not automatically updated. 
pub(crate) async fn initialize_with_hooks( listen_addr: SocketAddr, + handle: PrometheusHandle, hooks: impl IntoIterator, ) -> eyre::Result<()> { - let recorder = PrometheusBuilder::new().build_recorder(); - let handle = recorder.handle(); - let hooks: Vec<_> = hooks.into_iter().collect(); // Start endpoint @@ -33,12 +44,6 @@ pub(crate) async fn initialize_with_hooks( .await .wrap_err("Could not start Prometheus endpoint")?; - // Build metrics stack - Stack::new(recorder) - .push(PrefixLayer::new("reth")) - .install() - .wrap_err("Couldn't set metrics recorder.")?; - Ok(()) } @@ -71,6 +76,7 @@ async fn start_endpoint( /// metrics. pub(crate) async fn initialize( listen_addr: SocketAddr, + handle: PrometheusHandle, db: Arc, process: metrics_process::Collector, ) -> eyre::Result<()> { @@ -119,7 +125,7 @@ pub(crate) async fn initialize( Box::new(move || cloned_process.collect()), Box::new(collect_memory_stats), ]; - initialize_with_hooks(listen_addr, hooks).await?; + initialize_with_hooks(listen_addr, handle, hooks).await?; // We describe the metrics after the recorder is installed, otherwise this information is not // registered diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index d08c8835e3f2..821bc8ec5ccd 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -133,6 +133,7 @@ impl Command { info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr); prometheus_exporter::initialize( listen_addr, + prometheus_exporter::install_recorder()?, Arc::clone(&db), metrics_process::Collector::default(), ) From f0887f1aa496ff0e4d03cd2e2675ff6ad4bca5e3 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 24 Oct 2023 13:46:26 +0100 Subject: [PATCH 09/20] improve exporter comments --- bin/reth/src/node/mod.rs | 2 +- bin/reth/src/prometheus_exporter.rs | 12 ++++++------ bin/reth/src/stage/run.rs | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index a5ae73f22c07..224ad13434be 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -645,7 +645,7 @@ impl NodeCommand { ) -> eyre::Result<()> { if let Some(listen_addr) = self.metrics { info!(target: "reth::cli", addr = %listen_addr, "Starting metrics endpoint"); - prometheus_exporter::initialize( + prometheus_exporter::serve( listen_addr, prometheus_handle, db, diff --git a/bin/reth/src/prometheus_exporter.rs b/bin/reth/src/prometheus_exporter.rs index 2654b052c7c1..4fa622bffda4 100644 --- a/bin/reth/src/prometheus_exporter.rs +++ b/bin/reth/src/prometheus_exporter.rs @@ -15,6 +15,7 @@ use tracing::error; pub(crate) trait Hook: Fn() + Send + Sync {} impl Hook for T {} +/// Installs Prometheus as the metrics recorder. pub(crate) fn install_recorder() -> eyre::Result { let recorder = PrometheusBuilder::new().build_recorder(); let handle = recorder.handle(); @@ -28,11 +29,11 @@ pub(crate) fn install_recorder() -> eyre::Result { Ok(handle) } -/// Installs Prometheus as the metrics recorder and serves it over HTTP with hooks. +/// Serves Prometheus metrics over HTTP with hooks. /// /// The hooks are called every time the metrics are requested at the given endpoint, and can be used /// to record values for pull-style metrics, i.e. metrics that are not automatically updated. 
-pub(crate) async fn initialize_with_hooks( +pub(crate) async fn serve_with_hooks( listen_addr: SocketAddr, handle: PrometheusHandle, hooks: impl IntoIterator, @@ -72,9 +73,8 @@ async fn start_endpoint( Ok(()) } -/// Installs Prometheus as the metrics recorder and serves it over HTTP with database and process -/// metrics. -pub(crate) async fn initialize( +/// Serves Prometheus metrics over HTTP with database and process metrics. +pub(crate) async fn serve( listen_addr: SocketAddr, handle: PrometheusHandle, db: Arc, @@ -125,7 +125,7 @@ pub(crate) async fn initialize( Box::new(move || cloned_process.collect()), Box::new(collect_memory_stats), ]; - initialize_with_hooks(listen_addr, handle, hooks).await?; + serve_with_hooks(listen_addr, handle, hooks).await?; // We describe the metrics after the recorder is installed, otherwise this information is not // registered diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 821bc8ec5ccd..53ae2eec89fd 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -131,7 +131,7 @@ impl Command { if let Some(listen_addr) = self.metrics { info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr); - prometheus_exporter::initialize( + prometheus_exporter::serve( listen_addr, prometheus_exporter::install_recorder()?, Arc::clone(&db), From 63449f6463588c50b2279d8a8456cff762b160a8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 24 Oct 2023 13:59:06 +0100 Subject: [PATCH 10/20] track open transactions by mode --- crates/storage/db/src/metrics/mod.rs | 56 +++++++++++++++++----------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/crates/storage/db/src/metrics/mod.rs b/crates/storage/db/src/metrics/mod.rs index d455b0d2c8d8..bf6cdd058f24 100644 --- a/crates/storage/db/src/metrics/mod.rs +++ b/crates/storage/db/src/metrics/mod.rs @@ -79,24 +79,25 @@ impl Display for Operation { } } -#[derive(Metrics)] -#[metrics(scope = "database")] +#[derive(Debug, Default)] struct Metrics { - /// Total number of currently open database transactions - open_transactions_total: Gauge, - - #[metric(skip)] + open_transactions: HashMap, transactions: HashMap, - #[metric(skip)] transaction_metrics: HashMap<(TransactionMode, TransactionOutcome), TransactionMetrics>, - #[metric(skip)] operation_metrics: HashMap, } impl Metrics { pub(crate) fn record_open_transaction(&mut self, txn_id: u64, mode: TransactionMode) { self.transactions.insert(txn_id, Transaction { begin: Instant::now(), mode }); - self.open_transactions_total.set(self.transactions.len() as f64) + + self.open_transactions + .entry(mode) + .or_insert_with(|| { + OpenTransactionMetrics::new_with_labels(&[("mode", mode.to_string())]) + }) + .total + .increment(1.0) } pub(crate) fn record_close_transaction( @@ -106,17 +107,23 @@ impl Metrics { commit_duration: Duration, ) { if let Some(transaction) = self.transactions.remove(&txn_id) { - let metrics = - self.transaction_metrics.entry((transaction.mode, outcome)).or_insert_with(|| { - TransactionMetrics::new_with_labels(&[ - ("mode", transaction.mode.to_string()), - ("outcome", outcome.to_string()), - ]) - }); + let mode = transaction.mode; + let metrics = self.transaction_metrics.entry((mode, outcome)).or_insert_with(|| { + TransactionMetrics::new_with_labels(&[ + ("mode", mode.to_string()), + ("outcome", outcome.to_string()), + ]) + }); metrics.open_duration_seconds.record(transaction.begin.elapsed()); metrics.commit_duration_seconds.record(commit_duration); - 
self.open_transactions_total.set(self.transactions.len() as f64) + self.open_transactions + .entry(mode) + .or_insert_with(|| { + OpenTransactionMetrics::new_with_labels(&[("mode", mode.to_string())]) + }) + .total + .decrement(1.0) } } @@ -130,20 +137,27 @@ impl Metrics { } } +#[derive(Metrics)] +#[metrics(scope = "database.open_transactions")] +struct OpenTransactionMetrics { + /// Total number of currently open database transactions + total: Gauge, +} + #[derive(Metrics)] #[metrics(scope = "database.transaction")] struct TransactionMetrics { /// The time a database transaction has been open - pub(crate) open_duration_seconds: Histogram, + open_duration_seconds: Histogram, /// Database transaction commit duration - pub(crate) commit_duration_seconds: Histogram, + commit_duration_seconds: Histogram, } #[derive(Metrics)] #[metrics(scope = "database.operation")] struct OperationMetrics { /// Total number of database operations made - pub(crate) calls_total: Counter, + calls_total: Counter, /// Database operation duration - pub(crate) duration_seconds: Histogram, + duration_seconds: Histogram, } From c34e3142984410005a2ba85c5a7ee3350741203f Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 24 Oct 2023 14:27:55 +0100 Subject: [PATCH 11/20] metrics handler --- crates/storage/db/src/abstraction/mock.rs | 2 +- .../storage/db/src/abstraction/transaction.rs | 4 +- .../storage/db/src/implementation/mdbx/tx.rs | 71 ++++++++++++------- crates/storage/db/src/metrics/listener.rs | 6 +- crates/storage/db/src/metrics/mod.rs | 10 +-- 5 files changed, 57 insertions(+), 36 deletions(-) diff --git a/crates/storage/db/src/abstraction/mock.rs b/crates/storage/db/src/abstraction/mock.rs index b476aab4d194..c451b6f45deb 100644 --- a/crates/storage/db/src/abstraction/mock.rs +++ b/crates/storage/db/src/abstraction/mock.rs @@ -63,7 +63,7 @@ impl DbTx for TxMock { todo!() } - fn drop(self) { + fn abort(self) { todo!() } diff --git a/crates/storage/db/src/abstraction/transaction.rs b/crates/storage/db/src/abstraction/transaction.rs index 798b1d276ace..bbbd775d7a16 100644 --- a/crates/storage/db/src/abstraction/transaction.rs +++ b/crates/storage/db/src/abstraction/transaction.rs @@ -39,8 +39,8 @@ pub trait DbTx: for<'a> DbTxGAT<'a> { /// Commit for read only transaction will consume and free transaction and allows /// freeing of memory pages fn commit(self) -> Result; - /// Drops transaction - fn drop(self); + /// Aborts transaction + fn abort(self); /// Iterate over read only values in table. fn cursor_read(&self) -> Result<>::Cursor, DatabaseError>; /// Iterate over read only values in dup sorted table. diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 93dad1928982..2b4c93bc5fe7 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -11,18 +11,20 @@ use crate::{ use parking_lot::RwLock; use reth_interfaces::db::DatabaseWriteOperation; use reth_libmdbx::{ffi::DBI, EnvironmentKind, Transaction, TransactionKind, WriteFlags, RW}; -use std::{str::FromStr, sync::Arc, time::Instant}; +use std::{ + str::FromStr, + sync::Arc, + time::{Duration, Instant}, +}; /// Wrapper for the libmdbx transaction. #[derive(Debug)] pub struct Tx<'a, K: TransactionKind, E: EnvironmentKind> { /// Libmdbx-sys transaction. pub inner: Transaction<'a, K, E>, - /// Cached internal transaction ID provided by libmdbx. - id: Option, /// Database table handle cache. 
pub(crate) db_handles: Arc; NUM_TABLES]>>, - metrics_tx: Option, + metrics_handler: Option, } impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { @@ -31,18 +33,18 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { where 'a: 'env, { - Self { inner, id: None, db_handles: Default::default(), metrics_tx: None } + Self { inner, db_handles: Default::default(), metrics_handler: None } } /// Sets the [MetricEventsSender] to report metrics about the transaction and cursors. pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { - self.metrics_tx = Some(metrics_tx); + self.metrics_handler = Some(MetricsHandler { txn_id: self.id(), metrics_tx }); self } /// Gets this transaction ID. - pub fn id(&mut self) -> u64 { - *self.id.get_or_insert_with(|| self.inner.id()) + pub fn id(&self) -> u64 { + self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| handler.txn_id) } /// Gets a table database handle if it exists, otherwise creates it. @@ -79,14 +81,13 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { f: impl FnOnce(Self) -> R, ) -> R { let start = Instant::now(); - let (txn_id, metrics_tx) = - self.metrics_tx.take().map(|metrics_tx| (self.id(), metrics_tx)).unzip(); + let metrics_handler = self.metrics_handler.take(); let result = f(self); - if let (Some(txn_id), Some(metrics_tx)) = (txn_id, metrics_tx) { - let _ = metrics_tx.send(MetricEvent::CloseTransaction { - txn_id, + if let Some(handler) = metrics_handler { + let _ = handler.metrics_tx.send(MetricEvent::CloseTransaction { + txn_id: handler.txn_id, outcome, - commit_duration: start.elapsed(), + close_duration: start.elapsed(), }); } result @@ -99,14 +100,32 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { ) -> R { let start = Instant::now(); let result = f(&self.inner); - if let Some(metrics_tx) = &self.metrics_tx { - let _ = - metrics_tx.send(MetricEvent::Operation { operation, duration: start.elapsed() }); + if let Some(handler) = &self.metrics_handler { + let _ = handler + .metrics_tx + .send(MetricEvent::Operation { operation, duration: start.elapsed() }); } result } } +#[derive(Debug)] +struct MetricsHandler { + /// Cached internal transaction ID provided by libmdbx. + txn_id: u64, + metrics_tx: MetricEventsSender, +} + +impl Drop for MetricsHandler { + fn drop(&mut self) { + let _ = self.metrics_tx.send(MetricEvent::CloseTransaction { + txn_id: self.txn_id, + outcome: TransactionOutcome::Drop, + close_duration: Duration::default(), + }); + } +} + impl<'a, K: TransactionKind, E: EnvironmentKind> DbTxGAT<'a> for Tx<'_, K, E> { type Cursor = Cursor<'a, K, T>; type DupCursor = Cursor<'a, K, T>; @@ -135,7 +154,7 @@ impl DbTx for Tx<'_, K, E> { }) } - fn drop(self) { + fn abort(self) { self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| { drop(this.inner) }) @@ -144,8 +163,8 @@ impl DbTx for Tx<'_, K, E> { // Iterate over read only values in database. 
fn cursor_read(&self) -> Result<>::Cursor, DatabaseError> { let mut cursor = self.new_cursor()?; - if let Some(metrics_tx) = &self.metrics_tx { - cursor = cursor.with_metrics_tx(metrics_tx.clone()); + if let Some(handler) = &self.metrics_handler { + cursor = cursor.with_metrics_tx(handler.metrics_tx.clone()); } Ok(cursor) } @@ -155,8 +174,8 @@ impl DbTx for Tx<'_, K, E> { &self, ) -> Result<>::DupCursor, DatabaseError> { let mut cursor = self.new_cursor()?; - if let Some(metrics_tx) = &self.metrics_tx { - cursor = cursor.with_metrics_tx(metrics_tx.clone()); + if let Some(handler) = &self.metrics_handler { + cursor = cursor.with_metrics_tx(handler.metrics_tx.clone()); } Ok(cursor) } @@ -213,8 +232,8 @@ impl DbTxMut for Tx<'_, RW, E> { &self, ) -> Result<>::CursorMut, DatabaseError> { let mut cursor = self.new_cursor()?; - if let Some(metrics_tx) = &self.metrics_tx { - cursor = cursor.with_metrics_tx(metrics_tx.clone()); + if let Some(handler) = &self.metrics_handler { + cursor = cursor.with_metrics_tx(handler.metrics_tx.clone()); } Ok(cursor) } @@ -223,8 +242,8 @@ impl DbTxMut for Tx<'_, RW, E> { &self, ) -> Result<>::DupCursorMut, DatabaseError> { let mut cursor = self.new_cursor()?; - if let Some(metrics_tx) = &self.metrics_tx { - cursor = cursor.with_metrics_tx(metrics_tx.clone()); + if let Some(handler) = &self.metrics_handler { + cursor = cursor.with_metrics_tx(handler.metrics_tx.clone()); } Ok(cursor) } diff --git a/crates/storage/db/src/metrics/listener.rs b/crates/storage/db/src/metrics/listener.rs index 6ca5ff9587f2..35277b5febcf 100644 --- a/crates/storage/db/src/metrics/listener.rs +++ b/crates/storage/db/src/metrics/listener.rs @@ -16,7 +16,7 @@ pub type MetricEventsSender = UnboundedSender; #[allow(missing_docs)] pub enum MetricEvent { OpenTransaction { txn_id: u64, mode: TransactionMode }, - CloseTransaction { txn_id: u64, outcome: TransactionOutcome, commit_duration: Duration }, + CloseTransaction { txn_id: u64, outcome: TransactionOutcome, close_duration: Duration }, Operation { operation: Operation, duration: Duration }, } @@ -40,8 +40,8 @@ impl MetricsListener { MetricEvent::OpenTransaction { txn_id, mode } => { self.metrics.record_open_transaction(txn_id, mode) } - MetricEvent::CloseTransaction { txn_id, outcome, commit_duration } => { - self.metrics.record_close_transaction(txn_id, outcome, commit_duration) + MetricEvent::CloseTransaction { txn_id, outcome, close_duration } => { + self.metrics.record_close_transaction(txn_id, outcome, close_duration) } MetricEvent::Operation { operation, duration } => { self.metrics.record_operation(operation, duration) diff --git a/crates/storage/db/src/metrics/mod.rs b/crates/storage/db/src/metrics/mod.rs index bf6cdd058f24..13a10934357b 100644 --- a/crates/storage/db/src/metrics/mod.rs +++ b/crates/storage/db/src/metrics/mod.rs @@ -36,6 +36,7 @@ impl Display for TransactionMode { pub enum TransactionOutcome { Commit, Abort, + Drop, } impl Display for TransactionOutcome { @@ -43,6 +44,7 @@ impl Display for TransactionOutcome { match self { TransactionOutcome::Commit => write!(f, "commit"), TransactionOutcome::Abort => write!(f, "abort"), + TransactionOutcome::Drop => write!(f, "drop"), } } } @@ -104,7 +106,7 @@ impl Metrics { &mut self, txn_id: u64, outcome: TransactionOutcome, - commit_duration: Duration, + close_duration: Duration, ) { if let Some(transaction) = self.transactions.remove(&txn_id) { let mode = transaction.mode; @@ -115,7 +117,7 @@ impl Metrics { ]) }); metrics.open_duration_seconds.record(transaction.begin.elapsed()); 
- metrics.commit_duration_seconds.record(commit_duration); + metrics.close_duration_seconds.record(close_duration); self.open_transactions .entry(mode) @@ -149,8 +151,8 @@ struct OpenTransactionMetrics { struct TransactionMetrics { /// The time a database transaction has been open open_duration_seconds: Histogram, - /// Database transaction commit duration - commit_duration_seconds: Histogram, + /// The time it took to close a database transaction + close_duration_seconds: Histogram, } #[derive(Metrics)] From e0aefc7b61ff8ab7f6a3636bd3475d209e0181ba Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 24 Oct 2023 14:45:59 +0100 Subject: [PATCH 12/20] inc open transactions only on new inserts --- crates/storage/db/src/metrics/mod.rs | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/crates/storage/db/src/metrics/mod.rs b/crates/storage/db/src/metrics/mod.rs index 13a10934357b..4ac7b38c490b 100644 --- a/crates/storage/db/src/metrics/mod.rs +++ b/crates/storage/db/src/metrics/mod.rs @@ -91,15 +91,18 @@ struct Metrics { impl Metrics { pub(crate) fn record_open_transaction(&mut self, txn_id: u64, mode: TransactionMode) { - self.transactions.insert(txn_id, Transaction { begin: Instant::now(), mode }); - - self.open_transactions - .entry(mode) - .or_insert_with(|| { - OpenTransactionMetrics::new_with_labels(&[("mode", mode.to_string())]) - }) - .total - .increment(1.0) + let is_new = + self.transactions.insert(txn_id, Transaction { begin: Instant::now(), mode }).is_none(); + + if is_new { + self.open_transactions + .entry(mode) + .or_insert_with(|| { + OpenTransactionMetrics::new_with_labels(&[("mode", mode.to_string())]) + }) + .total + .increment(1.0) + } } pub(crate) fn record_close_transaction( From 3b07cc4170c23622c06bebdc64c9fea9a39f2c88 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 24 Oct 2023 15:31:59 +0100 Subject: [PATCH 13/20] add table to operation metrics --- .../storage/db/src/implementation/mdbx/cursor.rs | 7 +++++-- crates/storage/db/src/implementation/mdbx/tx.rs | 16 +++++++++------- crates/storage/db/src/metrics/listener.rs | 6 +++--- crates/storage/db/src/metrics/mod.rs | 12 ++++++++++-- 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index 4684088b2fef..826b21157af2 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -52,8 +52,11 @@ impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> { let result = f(self); if let Some(metrics_tx) = &self.metrics_tx { - let _ = - metrics_tx.send(MetricEvent::Operation { operation, duration: start.elapsed() }); + let _ = metrics_tx.send(MetricEvent::Operation { + table: T::NAME, + operation, + duration: start.elapsed(), + }); } result diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 2b4c93bc5fe7..4a2079b42971 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -93,7 +93,7 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { result } - fn execute_with_operation_metric( + fn execute_with_operation_metric( &self, operation: Operation, f: impl FnOnce(&Transaction<'_, K, E>) -> R, @@ -101,9 +101,11 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { let start = Instant::now(); let result = f(&self.inner); if let Some(handler) = 
&self.metrics_handler { - let _ = handler - .metrics_tx - .send(MetricEvent::Operation { operation, duration: start.elapsed() }); + let _ = handler.metrics_tx.send(MetricEvent::Operation { + table: T::NAME, + operation, + duration: start.elapsed(), + }); } result } @@ -140,7 +142,7 @@ impl TableImporter for Tx<'_, RW, E> {} impl DbTx for Tx<'_, K, E> { fn get(&self, key: T::Key) -> Result::Value>, DatabaseError> { - self.execute_with_operation_metric(Operation::Get, |tx| { + self.execute_with_operation_metric::(Operation::Get, |tx| { tx.get(self.get_dbi::()?, key.encode().as_ref()) .map_err(|e| DatabaseError::Read(e.into()))? .map(decode_one::) @@ -193,7 +195,7 @@ impl DbTx for Tx<'_, K, E> { impl DbTxMut for Tx<'_, RW, E> { fn put(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> { let key = key.encode(); - self.execute_with_operation_metric(Operation::Put, |tx| { + self.execute_with_operation_metric::(Operation::Put, |tx| { tx.put(self.get_dbi::()?, key.as_ref(), &value.compress(), WriteFlags::UPSERT) .map_err(|e| DatabaseError::Write { code: e.into(), @@ -216,7 +218,7 @@ impl DbTxMut for Tx<'_, RW, E> { data = Some(value.as_ref()); }; - self.execute_with_operation_metric(Operation::Delete, |tx| { + self.execute_with_operation_metric::(Operation::Delete, |tx| { tx.del(self.get_dbi::()?, key.encode(), data) .map_err(|e| DatabaseError::Delete(e.into())) }) diff --git a/crates/storage/db/src/metrics/listener.rs b/crates/storage/db/src/metrics/listener.rs index 35277b5febcf..b9c0a53cfa84 100644 --- a/crates/storage/db/src/metrics/listener.rs +++ b/crates/storage/db/src/metrics/listener.rs @@ -17,7 +17,7 @@ pub type MetricEventsSender = UnboundedSender; pub enum MetricEvent { OpenTransaction { txn_id: u64, mode: TransactionMode }, CloseTransaction { txn_id: u64, outcome: TransactionOutcome, close_duration: Duration }, - Operation { operation: Operation, duration: Duration }, + Operation { table: &'static str, operation: Operation, duration: Duration }, } /// Metrics routine that listens to new metric events on the `events_rx` receiver. 
@@ -43,8 +43,8 @@ impl MetricsListener { MetricEvent::CloseTransaction { txn_id, outcome, close_duration } => { self.metrics.record_close_transaction(txn_id, outcome, close_duration) } - MetricEvent::Operation { operation, duration } => { - self.metrics.record_operation(operation, duration) + MetricEvent::Operation { table, operation, duration } => { + self.metrics.record_operation(table, operation, duration) } } } diff --git a/crates/storage/db/src/metrics/mod.rs b/crates/storage/db/src/metrics/mod.rs index 4ac7b38c490b..64e976f396e3 100644 --- a/crates/storage/db/src/metrics/mod.rs +++ b/crates/storage/db/src/metrics/mod.rs @@ -132,9 +132,17 @@ impl Metrics { } } - pub(crate) fn record_operation(&mut self, operation: Operation, duration: Duration) { + pub(crate) fn record_operation( + &mut self, + table: &'static str, + operation: Operation, + duration: Duration, + ) { let metrics = self.operation_metrics.entry(operation).or_insert_with(|| { - OperationMetrics::new_with_labels(&[("operation", operation.to_string())]) + OperationMetrics::new_with_labels(&[ + ("table", table.to_string()), + ("operation", operation.to_string()), + ]) }); metrics.calls_total.increment(1); From 4bb82f6a88acb240dcecd5de03c0c24427a3aabf Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 24 Oct 2023 15:35:38 +0100 Subject: [PATCH 14/20] update dashboard --- etc/grafana/dashboards/overview.json | 528 +++++++++++++++++++++++---- 1 file changed, 461 insertions(+), 67 deletions(-) diff --git a/etc/grafana/dashboards/overview.json b/etc/grafana/dashboards/overview.json index d83cd3aa3270..7b263aa07c56 100644 --- a/etc/grafana/dashboards/overview.json +++ b/etc/grafana/dashboards/overview.json @@ -523,7 +523,7 @@ "calcs": [], "displayMode": "list", "placement": "bottom", - "showLegend": true + "showLegend": false }, "tooltip": { "mode": "single", @@ -539,7 +539,7 @@ }, "editorMode": "code", "exemplar": false, - "expr": "rate(reth_tx_commit_sum{instance=~\"$instance\"}[$__rate_interval]) / rate(reth_tx_commit_count{instance=~\"$instance\"}[$__rate_interval])", + "expr": "sum(rate(reth_database_transaction_close_duration_seconds_sum{instance=~\"$instance\", outcome = \"commit\"}[$__rate_interval]) / rate(reth_database_transaction_close_duration_seconds_count{instance=~\"$instance\", outcome=\"commit\"}[$__rate_interval]))", "format": "time_series", "instant": false, "legendFormat": "Commit time", @@ -628,7 +628,7 @@ }, "editorMode": "code", "exemplar": false, - "expr": "sum(increase(reth_tx_commit{instance=~\"$instance\"}[$__interval])) by (quantile)", + "expr": "sum(increase(reth_database_transaction_close_duration_seconds{instance=~\"$instance\", outcome=\"commit\"}[$__interval])) by (quantile)", "format": "time_series", "instant": false, "legendFormat": "{{quantile}}", @@ -639,6 +639,403 @@ "title": "Commit time heatmap", "type": "heatmap" }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "The average time a database transaction was open.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic", + "seriesBy": "last" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "points", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + 
"showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 26 + }, + "id": 117, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "9.3.6", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(rate(reth_database_transaction_open_duration_seconds_sum{instance=~\"$instance\"}[$__rate_interval]) / rate(reth_database_transaction_open_duration_seconds_count{instance=~\"$instance\"}[$__rate_interval])) by (outcome, mode)", + "format": "time_series", + "instant": false, + "legendFormat": "{{mode}}, {{outcome}}", + "range": true, + "refId": "A" + } + ], + "title": "Average transaction open time", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "The maximum time the database transaction was open.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "points", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 26 + }, + "id": 116, + "maxDataPoints": 25, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(increase(reth_database_transaction_open_duration_seconds{instance=~\"$instance\", quantile=\"1\"}[$__interval])) by (outcome, mode)", + "format": "time_series", + "instant": false, + "legendFormat": "{{mode}}, {{outcome}}", + "range": true, + "refId": "A" + } + ], + "title": "Max transaction open time", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, 
+ "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 34 + }, + "id": 119, + "maxDataPoints": 25, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(reth_database_open_transactions_total{instance=~\"$instance\"}) by (mode)", + "format": "time_series", + "instant": false, + "legendFormat": "{{mode}}", + "range": true, + "refId": "A" + } + ], + "title": "Number of open transactions", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "The maximum time the database transaction operation took.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "points", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 34 + }, + "id": 118, + "maxDataPoints": 25, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.0", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": false, + "expr": "sum(increase(reth_database_operation_duration_seconds{instance=~\"$instance\", quantile=\"1\"}[$__interval])) by (operation)", + "format": "time_series", + "instant": false, + "legendFormat": "{{operation}}", + "range": true, + "refId": "A" + } + ], + "title": "Max operation time", + "type": "timeseries" + }, { "datasource": { "type": "prometheus", @@ -666,7 +1063,7 @@ "h": 8, "w": 12, "x": 0, - "y": 26 + "y": 42 }, "id": 48, "options": { @@ -776,7 +1173,7 @@ "h": 8, "w": 12, "x": 12, - "y": 26 + "y": 42 }, "id": 52, "options": { @@ -834,7 +1231,7 @@ "h": 8, "w": 12, "x": 0, - "y": 34 + "y": 50 }, "id": 50, "options": { @@ -1002,7 +1399,7 @@ "h": 8, "w": 12, "x": 12, - "y": 34 + "y": 50 }, "id": 58, "options": { @@ -1101,7 +1498,7 @@ "h": 8, "w": 12, "x": 0, - "y": 42 + "y": 58 }, "id": 113, "options": { @@ -1138,7 +1535,7 @@ "h": 1, "w": 24, "x": 0, - "y": 50 + "y": 66 }, "id": 46, "panels": [], @@ -1207,7 +1604,7 @@ "h": 8, "w": 
24, "x": 0, - "y": 51 + "y": 67 }, "id": 56, "options": { @@ -1280,7 +1677,7 @@ "h": 1, "w": 24, "x": 0, - "y": 59 + "y": 75 }, "id": 6, "panels": [], @@ -1352,7 +1749,7 @@ "h": 8, "w": 8, "x": 0, - "y": 60 + "y": 76 }, "id": 18, "options": { @@ -1446,7 +1843,7 @@ "h": 8, "w": 8, "x": 8, - "y": 60 + "y": 76 }, "id": 16, "options": { @@ -1566,7 +1963,7 @@ "h": 8, "w": 8, "x": 16, - "y": 60 + "y": 76 }, "id": 8, "options": { @@ -1649,7 +2046,7 @@ "h": 8, "w": 8, "x": 0, - "y": 68 + "y": 84 }, "id": 54, "options": { @@ -1870,7 +2267,7 @@ "h": 8, "w": 14, "x": 8, - "y": 68 + "y": 84 }, "id": 103, "options": { @@ -1907,7 +2304,7 @@ "h": 1, "w": 24, "x": 0, - "y": 76 + "y": 92 }, "id": 24, "panels": [], @@ -1962,8 +2359,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2003,7 +2399,7 @@ "h": 8, "w": 12, "x": 0, - "y": 77 + "y": 93 }, "id": 26, "options": { @@ -2118,8 +2514,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2135,7 +2530,7 @@ "h": 8, "w": 12, "x": 12, - "y": 77 + "y": 93 }, "id": 33, "options": { @@ -2237,8 +2632,7 @@ "mode": "absolute", "steps": [ { - "color": "green", - "value": null + "color": "green" }, { "color": "red", @@ -2253,7 +2647,7 @@ "h": 8, "w": 12, "x": 0, - "y": 85 + "y": 101 }, "id": 36, "options": { @@ -2302,7 +2696,7 @@ "h": 1, "w": 24, "x": 0, - "y": 93 + "y": 109 }, "id": 32, "panels": [], @@ -2407,7 +2801,7 @@ "h": 8, "w": 12, "x": 0, - "y": 94 + "y": 110 }, "id": 30, "options": { @@ -2570,7 +2964,7 @@ "h": 8, "w": 12, "x": 12, - "y": 94 + "y": 110 }, "id": 28, "options": { @@ -2687,7 +3081,7 @@ "h": 8, "w": 12, "x": 0, - "y": 102 + "y": 118 }, "id": 35, "options": { @@ -2810,7 +3204,7 @@ "h": 8, "w": 12, "x": 12, - "y": 102 + "y": 118 }, "id": 73, "options": { @@ -2860,7 +3254,7 @@ "h": 1, "w": 24, "x": 0, - "y": 110 + "y": 126 }, "id": 89, "panels": [], @@ -2932,7 +3326,7 @@ "h": 8, "w": 12, "x": 0, - "y": 111 + "y": 127 }, "id": 91, "options": { @@ -3049,7 +3443,7 @@ "h": 8, "w": 12, "x": 12, - "y": 111 + "y": 127 }, "id": 92, "options": { @@ -3184,7 +3578,7 @@ "h": 8, "w": 12, "x": 0, - "y": 119 + "y": 135 }, "id": 102, "options": { @@ -3303,7 +3697,7 @@ "h": 8, "w": 12, "x": 12, - "y": 119 + "y": 135 }, "id": 94, "options": { @@ -3397,7 +3791,7 @@ "h": 8, "w": 12, "x": 0, - "y": 127 + "y": 143 }, "id": 104, "options": { @@ -3521,7 +3915,7 @@ "h": 8, "w": 12, "x": 12, - "y": 127 + "y": 143 }, "id": 93, "options": { @@ -3664,7 +4058,7 @@ "h": 8, "w": 12, "x": 0, - "y": 135 + "y": 151 }, "id": 95, "options": { @@ -3783,7 +4177,7 @@ "h": 8, "w": 12, "x": 12, - "y": 135 + "y": 151 }, "id": 115, "options": { @@ -3876,7 +4270,7 @@ "h": 1, "w": 24, "x": 0, - "y": 143 + "y": 159 }, "id": 79, "panels": [], @@ -3947,7 +4341,7 @@ "h": 8, "w": 12, "x": 0, - "y": 144 + "y": 160 }, "id": 74, "options": { @@ -4041,7 +4435,7 @@ "h": 8, "w": 12, "x": 12, - "y": 144 + "y": 160 }, "id": 80, "options": { @@ -4135,7 +4529,7 @@ "h": 8, "w": 12, "x": 0, - "y": 152 + "y": 168 }, "id": 81, "options": { @@ -4229,7 +4623,7 @@ "h": 8, "w": 12, "x": 12, - "y": 152 + "y": 168 }, "id": 114, "options": { @@ -4267,7 +4661,7 @@ "h": 1, "w": 24, "x": 0, - "y": 160 + "y": 176 }, "id": 87, "panels": [], @@ -4338,7 +4732,7 @@ "h": 8, "w": 12, "x": 0, - "y": 161 + "y": 177 }, "id": 83, "options": { @@ -4431,7 +4825,7 @@ "h": 8, "w": 12, "x": 12, - "y": 161 + "y": 177 }, "id": 84, "options": { @@ -4536,7 +4930,7 @@ "h": 8, "w": 12, "x": 0, - "y": 169 + "y": 185 
}, "id": 85, "options": { @@ -4573,7 +4967,7 @@ "h": 1, "w": 24, "x": 0, - "y": 177 + "y": 193 }, "id": 68, "panels": [], @@ -4644,7 +5038,7 @@ "h": 8, "w": 12, "x": 0, - "y": 178 + "y": 194 }, "id": 60, "options": { @@ -4737,7 +5131,7 @@ "h": 8, "w": 12, "x": 12, - "y": 178 + "y": 194 }, "id": 62, "options": { @@ -4830,7 +5224,7 @@ "h": 8, "w": 12, "x": 0, - "y": 186 + "y": 202 }, "id": 64, "options": { @@ -4867,7 +5261,7 @@ "h": 1, "w": 24, "x": 0, - "y": 194 + "y": 210 }, "id": 97, "panels": [], @@ -4936,7 +5330,7 @@ "h": 8, "w": 12, "x": 0, - "y": 195 + "y": 211 }, "id": 98, "options": { @@ -5096,7 +5490,7 @@ "h": 8, "w": 12, "x": 12, - "y": 195 + "y": 211 }, "id": 101, "options": { @@ -5191,7 +5585,7 @@ "h": 8, "w": 12, "x": 0, - "y": 203 + "y": 219 }, "id": 99, "options": { @@ -5286,7 +5680,7 @@ "h": 8, "w": 12, "x": 12, - "y": 203 + "y": 219 }, "id": 100, "options": { @@ -5324,7 +5718,7 @@ "h": 1, "w": 24, "x": 0, - "y": 211 + "y": 227 }, "id": 105, "panels": [], @@ -5394,7 +5788,7 @@ "h": 8, "w": 12, "x": 0, - "y": 212 + "y": 228 }, "id": 106, "options": { @@ -5489,7 +5883,7 @@ "h": 8, "w": 12, "x": 12, - "y": 212 + "y": 228 }, "id": 107, "options": { @@ -5527,7 +5921,7 @@ "h": 1, "w": 24, "x": 0, - "y": 220 + "y": 236 }, "id": 108, "panels": [], @@ -5550,7 +5944,7 @@ "h": 8, "w": 12, "x": 0, - "y": 221 + "y": 237 }, "hiddenSeries": false, "id": 109, @@ -5638,7 +6032,7 @@ "h": 8, "w": 12, "x": 12, - "y": 221 + "y": 237 }, "hiddenSeries": false, "id": 110, @@ -5735,7 +6129,7 @@ "h": 8, "w": 12, "x": 0, - "y": 229 + "y": 245 }, "id": 111, "maxDataPoints": 25, @@ -5824,7 +6218,7 @@ "h": 8, "w": 12, "x": 12, - "y": 229 + "y": 245 }, "id": 112, "maxDataPoints": 25, @@ -5928,6 +6322,6 @@ "timezone": "", "title": "reth", "uid": "2k8BXz24x", - "version": 10, + "version": 11, "weekStart": "" } \ No newline at end of file From 67e57740153420c8c8d90eba0738d162205951c7 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 25 Oct 2023 18:24:59 +0100 Subject: [PATCH 15/20] no listener --- bin/reth/src/node/mod.rs | 7 +- .../db/src/implementation/mdbx/cursor.rs | 37 ++-- .../storage/db/src/implementation/mdbx/mod.rs | 38 ++-- .../storage/db/src/implementation/mdbx/tx.rs | 146 +++++++++------ crates/storage/db/src/lib.rs | 1 - crates/storage/db/src/metrics.rs | 143 ++++++++++++++ crates/storage/db/src/metrics/listener.rs | 69 ------- crates/storage/db/src/metrics/mod.rs | 176 ------------------ crates/storage/libmdbx-rs/src/transaction.rs | 7 +- 9 files changed, 262 insertions(+), 362 deletions(-) create mode 100644 crates/storage/db/src/metrics.rs delete mode 100644 crates/storage/db/src/metrics/listener.rs delete mode 100644 crates/storage/db/src/metrics/mod.rs diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 224ad13434be..18d79a2ab952 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -249,14 +249,9 @@ impl NodeCommand { let prometheus_handle = self.install_prometheus_recorder()?; - debug!(target: "reth::cli", "Spawning database metrics listener task"); - let (db_metrics_tx, db_metrics_rx) = unbounded_channel(); - let db_metrics_listener = reth_db::MetricsListener::new(db_metrics_rx); - ctx.task_executor.spawn_critical("database metrics listener task", db_metrics_listener); - let db_path = data_dir.db_path(); info!(target: "reth::cli", path = ?db_path, "Opening database"); - let db = Arc::new(init_db(&db_path, self.db.log_level)?.with_metrics_tx(db_metrics_tx)); + let db = Arc::new(init_db(&db_path, self.db.log_level)?.with_metrics()); 
info!(target: "reth::cli", "Database opened"); self.start_metrics_endpoint(prometheus_handle, Arc::clone(&db)).await?; diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index 826b21157af2..959ce7f5d23e 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -9,7 +9,7 @@ use crate::{ DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker, ReverseWalker, Walker, }, - metrics::{MetricEvent, MetricEventsSender, Operation}, + metrics::{Operation, OperationMetrics}, table::{Compress, DupSort, Encode, Table}, tables::utils::*, DatabaseError, @@ -28,19 +28,17 @@ pub struct Cursor<'tx, K: TransactionKind, T: Table> { pub(crate) inner: reth_libmdbx::Cursor<'tx, K>, /// Cache buffer that receives compressed values. buf: Vec, - metrics_tx: Option, + with_metrics: bool, /// Phantom data to enforce encoding/decoding. _dbi: PhantomData, } impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> { - pub(crate) fn new(inner: reth_libmdbx::Cursor<'tx, K>) -> Self { - Self { inner, buf: Vec::new(), metrics_tx: None, _dbi: PhantomData } - } - - pub(crate) fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { - self.metrics_tx = Some(metrics_tx); - self + pub(crate) fn new_with_metrics( + inner: reth_libmdbx::Cursor<'tx, K>, + with_metrics: bool, + ) -> Self { + Self { inner, buf: Vec::new(), with_metrics, _dbi: PhantomData } } fn execute_with_operation_metric( @@ -48,18 +46,17 @@ impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> { operation: Operation, f: impl FnOnce(&mut Self) -> R, ) -> R { - let start = Instant::now(); - let result = f(self); - - if let Some(metrics_tx) = &self.metrics_tx { - let _ = metrics_tx.send(MetricEvent::Operation { - table: T::NAME, - operation, - duration: start.elapsed(), - }); - } + if self.with_metrics { + let start = Instant::now(); + let result = f(self); + let duration = start.elapsed(); - result + OperationMetrics::record(T::NAME, operation, duration); + + result + } else { + f(self) + } } } diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 54f10bcb69b3..05e66155b971 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -2,7 +2,6 @@ use crate::{ database::{Database, DatabaseGAT}, - metrics::{MetricEvent, MetricEventsSender, TransactionMode}, tables::{TableType, Tables}, utils::default_page_size, DatabaseError, @@ -38,7 +37,7 @@ pub enum EnvKind { pub struct Env { /// Libmdbx-sys environment. 
pub inner: Environment, - metrics_tx: Option, + with_metrics: bool, } impl<'a, E: EnvironmentKind> DatabaseGAT<'a> for Env { @@ -48,31 +47,17 @@ impl<'a, E: EnvironmentKind> DatabaseGAT<'a> for Env { impl Database for Env { fn tx(&self) -> Result<>::TX, DatabaseError> { - let mut tx = Tx::new( + Ok(Tx::new_with_metrics( self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTransaction(e.into()))?, - ); - if let Some(metrics_tx) = &self.metrics_tx { - tx = tx.with_metrics_tx(metrics_tx.clone()); - let _ = metrics_tx.send(MetricEvent::OpenTransaction { - txn_id: tx.id(), - mode: TransactionMode::ReadOnly, - }); - } - Ok(tx) + self.with_metrics, + )) } fn tx_mut(&self) -> Result<>::TXMut, DatabaseError> { - let mut tx = Tx::new( + Ok(Tx::new_with_metrics( self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTransaction(e.into()))?, - ); - if let Some(metrics_tx) = &self.metrics_tx { - tx = tx.with_metrics_tx(metrics_tx.clone()); - let _ = metrics_tx.send(MetricEvent::OpenTransaction { - txn_id: tx.id(), - mode: TransactionMode::ReadWrite, - }); - } - Ok(tx) + self.with_metrics, + )) } } @@ -140,16 +125,15 @@ impl Env { let env = Env { inner: inner_env.open(path).map_err(|e| DatabaseError::FailedToOpen(e.into()))?, - metrics_tx: None, + with_metrics: false, }; Ok(env) } - /// Sets the [MetricEventsSender] to report metrics about the database, transactions and - /// cursors. - pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { - self.metrics_tx = Some(metrics_tx); + /// Enables metrics on the database. + pub fn with_metrics(mut self) -> Self { + self.with_metrics = true; self } diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 4a2079b42971..dbae0c19659c 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -2,7 +2,9 @@ use super::cursor::Cursor; use crate::{ - metrics::{MetricEvent, MetricEventsSender, Operation, TransactionOutcome}, + metrics::{ + Operation, OperationMetrics, TransactionMetrics, TransactionMode, TransactionOutcome, + }, table::{Compress, DupSort, Encode, Table, TableImporter}, tables::{utils::decode_one, Tables, NUM_TABLES}, transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT}, @@ -11,11 +13,7 @@ use crate::{ use parking_lot::RwLock; use reth_interfaces::db::DatabaseWriteOperation; use reth_libmdbx::{ffi::DBI, EnvironmentKind, Transaction, TransactionKind, WriteFlags, RW}; -use std::{ - str::FromStr, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{marker::PhantomData, str::FromStr, sync::Arc, time::Instant}; /// Wrapper for the libmdbx transaction. #[derive(Debug)] @@ -24,7 +22,7 @@ pub struct Tx<'a, K: TransactionKind, E: EnvironmentKind> { pub inner: Transaction<'a, K, E>, /// Database table handle cache. pub(crate) db_handles: Arc; NUM_TABLES]>>, - metrics_handler: Option, + metrics_handler: Option>, } impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { @@ -36,10 +34,22 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { Self { inner, db_handles: Default::default(), metrics_handler: None } } - /// Sets the [MetricEventsSender] to report metrics about the transaction and cursors. - pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { - self.metrics_handler = Some(MetricsHandler { txn_id: self.id(), metrics_tx }); - self + /// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics. 
+ pub fn new_with_metrics<'a>(inner: Transaction<'a, K, E>, with_metrics: bool) -> Self + where + 'a: 'env, + { + let metrics_handler = with_metrics.then(|| { + let handler = MetricsHandler:: { + txn_id: inner.id(), + start: Instant::now(), + close_recorded: false, + _marker: PhantomData, + }; + TransactionMetrics::record_open(handler.transaction_mode()); + handler + }); + Self { inner, db_handles: Default::default(), metrics_handler } } /// Gets this transaction ID. @@ -68,11 +78,12 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { /// Create db Cursor pub fn new_cursor(&self) -> Result, DatabaseError> { - Ok(Cursor::new( - self.inner - .cursor_with_dbi(self.get_dbi::()?) - .map_err(|e| DatabaseError::InitCursor(e.into()))?, - )) + let inner = self + .inner + .cursor_with_dbi(self.get_dbi::()?) + .map_err(|e| DatabaseError::InitCursor(e.into()))?; + + Ok(Cursor::new_with_metrics(inner, self.metrics_handler.is_some())) } fn execute_with_close_transaction_metric( @@ -80,17 +91,25 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { outcome: TransactionOutcome, f: impl FnOnce(Self) -> R, ) -> R { - let start = Instant::now(); - let metrics_handler = self.metrics_handler.take(); - let result = f(self); - if let Some(handler) = metrics_handler { - let _ = handler.metrics_tx.send(MetricEvent::CloseTransaction { - txn_id: handler.txn_id, + if let Some(mut metrics_handler) = self.metrics_handler.take() { + metrics_handler.close_recorded = true; + + let start = Instant::now(); + let result = f(self); + let close_duration = start.elapsed(); + let open_duration = metrics_handler.start.elapsed(); + + TransactionMetrics::record_close( + metrics_handler.transaction_mode(), outcome, - close_duration: start.elapsed(), - }); + open_duration, + Some(close_duration), + ); + + result + } else { + f(self) } - result } fn execute_with_operation_metric( @@ -98,33 +117,52 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { operation: Operation, f: impl FnOnce(&Transaction<'_, K, E>) -> R, ) -> R { - let start = Instant::now(); - let result = f(&self.inner); - if let Some(handler) = &self.metrics_handler { - let _ = handler.metrics_tx.send(MetricEvent::Operation { - table: T::NAME, - operation, - duration: start.elapsed(), - }); + if self.metrics_handler.is_some() { + let start = Instant::now(); + let result = f(&self.inner); + let duration = start.elapsed(); + + OperationMetrics::record(T::NAME, operation, duration); + + result + } else { + f(&self.inner) } - result } } #[derive(Debug)] -struct MetricsHandler { +struct MetricsHandler { /// Cached internal transaction ID provided by libmdbx. txn_id: u64, - metrics_tx: MetricEventsSender, + /// The time when transaction has started. + start: Instant, + /// If true, the metric about transaction closing has already been recorded and we don't need + /// to do anything on [Drop::drop]. 
+ close_recorded: bool, + _marker: PhantomData, +} + +impl MetricsHandler { + const fn transaction_mode(&self) -> TransactionMode { + if K::IS_READ_ONLY { + TransactionMode::ReadOnly + } else { + TransactionMode::ReadWrite + } + } } -impl Drop for MetricsHandler { +impl Drop for MetricsHandler { fn drop(&mut self) { - let _ = self.metrics_tx.send(MetricEvent::CloseTransaction { - txn_id: self.txn_id, - outcome: TransactionOutcome::Drop, - close_duration: Duration::default(), - }); + if !self.close_recorded { + TransactionMetrics::record_close( + self.transaction_mode(), + TransactionOutcome::Drop, + self.start.elapsed(), + None, + ); + } } } @@ -164,22 +202,14 @@ impl DbTx for Tx<'_, K, E> { // Iterate over read only values in database. fn cursor_read(&self) -> Result<>::Cursor, DatabaseError> { - let mut cursor = self.new_cursor()?; - if let Some(handler) = &self.metrics_handler { - cursor = cursor.with_metrics_tx(handler.metrics_tx.clone()); - } - Ok(cursor) + self.new_cursor() } /// Iterate over read only values in database. fn cursor_dup_read( &self, ) -> Result<>::DupCursor, DatabaseError> { - let mut cursor = self.new_cursor()?; - if let Some(handler) = &self.metrics_handler { - cursor = cursor.with_metrics_tx(handler.metrics_tx.clone()); - } - Ok(cursor) + self.new_cursor() } /// Returns number of entries in the table using cheap DB stats invocation. @@ -233,20 +263,12 @@ impl DbTxMut for Tx<'_, RW, E> { fn cursor_write( &self, ) -> Result<>::CursorMut, DatabaseError> { - let mut cursor = self.new_cursor()?; - if let Some(handler) = &self.metrics_handler { - cursor = cursor.with_metrics_tx(handler.metrics_tx.clone()); - } - Ok(cursor) + self.new_cursor() } fn cursor_dup_write( &self, ) -> Result<>::DupCursorMut, DatabaseError> { - let mut cursor = self.new_cursor()?; - if let Some(handler) = &self.metrics_handler { - cursor = cursor.with_metrics_tx(handler.metrics_tx.clone()); - } - Ok(cursor) + self.new_cursor() } } diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index 9684aa17df12..c0b77b08c108 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -69,7 +69,6 @@ pub mod abstraction; mod implementation; mod metrics; -pub use crate::metrics::*; pub mod snapshot; pub mod tables; mod utils; diff --git a/crates/storage/db/src/metrics.rs b/crates/storage/db/src/metrics.rs new file mode 100644 index 000000000000..2c28bbac2969 --- /dev/null +++ b/crates/storage/db/src/metrics.rs @@ -0,0 +1,143 @@ +use metrics::{Gauge, Histogram}; +use reth_metrics::{metrics::Counter, Metrics}; +use std::time::Duration; + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[allow(missing_docs)] +pub(crate) enum TransactionMode { + ReadOnly, + ReadWrite, +} + +impl TransactionMode { + pub(crate) const fn as_str(&self) -> &'static str { + match self { + TransactionMode::ReadOnly => "read-only", + TransactionMode::ReadWrite => "read-write", + } + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[allow(missing_docs)] +pub(crate) enum TransactionOutcome { + Commit, + Abort, + Drop, +} + +impl TransactionOutcome { + pub(crate) const fn as_str(&self) -> &'static str { + match self { + TransactionOutcome::Commit => "commit", + TransactionOutcome::Abort => "abort", + TransactionOutcome::Drop => "drop", + } + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[allow(missing_docs)] +pub(crate) enum Operation { + Get, + Put, + Delete, + CursorUpsert, + CursorInsert, + CursorAppend, + CursorAppendDup, + CursorDeleteCurrent, + 
CursorDeleteCurrentDuplicates, +} + +impl Operation { + pub(crate) const fn as_str(&self) -> &'static str { + match self { + Operation::Get => "get", + Operation::Put => "put", + Operation::Delete => "delete", + Operation::CursorUpsert => "cursor-upsert", + Operation::CursorInsert => "cursor-insert", + Operation::CursorAppend => "cursor-append", + Operation::CursorAppendDup => "cursor-append-dup", + Operation::CursorDeleteCurrent => "cursor-delete-current", + Operation::CursorDeleteCurrentDuplicates => "cursor-delete-current-duplicates", + } + } +} + +enum Labels { + Table, + TransactionMode, + TransactionOutcome, + Operation, +} + +impl Labels { + pub(crate) fn as_str(&self) -> &'static str { + match self { + Labels::Table => "table", + Labels::TransactionMode => "mode", + Labels::TransactionOutcome => "outcome", + Labels::Operation => "operation", + } + } +} + +#[derive(Metrics, Clone)] +#[metrics(scope = "database.transaction")] +pub(crate) struct TransactionMetrics { + /// Total number of currently open database transactions + open_total: Gauge, + /// The time a database transaction has been open + open_duration_seconds: Histogram, + /// The time it took to close a database transaction + close_duration_seconds: Histogram, +} + +impl TransactionMetrics { + pub(crate) fn record_open(mode: TransactionMode) { + let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]); + metrics.open_total.increment(1.0); + } + + pub(crate) fn record_close( + mode: TransactionMode, + outcome: TransactionOutcome, + open_duration: Duration, + close_duration: Option, + ) { + let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]); + metrics.open_total.decrement(1.0); + + let metrics = Self::new_with_labels(&[ + (Labels::TransactionMode.as_str(), mode.as_str()), + (Labels::TransactionOutcome.as_str(), outcome.as_str()), + ]); + metrics.open_duration_seconds.record(open_duration); + + if let Some(close_duration) = close_duration { + metrics.close_duration_seconds.record(close_duration) + } + } +} + +#[derive(Metrics, Clone)] +#[metrics(scope = "database.operation")] +pub(crate) struct OperationMetrics { + /// Total number of database operations made + calls_total: Counter, + /// Database operation duration + duration_seconds: Histogram, +} + +impl OperationMetrics { + pub(crate) fn record(table: &'static str, operation: Operation, duration: Duration) { + let metrics = Self::new_with_labels(&[ + (Labels::Table.as_str(), table), + (Labels::Operation.as_str(), operation.as_str()), + ]); + metrics.duration_seconds.record(duration); + metrics.calls_total.increment(1); + } +} diff --git a/crates/storage/db/src/metrics/listener.rs b/crates/storage/db/src/metrics/listener.rs deleted file mode 100644 index b9c0a53cfa84..000000000000 --- a/crates/storage/db/src/metrics/listener.rs +++ /dev/null @@ -1,69 +0,0 @@ -use crate::metrics::{Metrics, Operation, TransactionMode, TransactionOutcome}; -use reth_tracing::tracing::trace; -use std::{ - future::Future, - pin::Pin, - task::{ready, Context, Poll}, - time::Duration, -}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; - -/// Alias type for metric producers to use. -pub type MetricEventsSender = UnboundedSender; - -/// Collection of metric events. 
-#[derive(Clone, Copy, Debug)] -#[allow(missing_docs)] -pub enum MetricEvent { - OpenTransaction { txn_id: u64, mode: TransactionMode }, - CloseTransaction { txn_id: u64, outcome: TransactionOutcome, close_duration: Duration }, - Operation { table: &'static str, operation: Operation, duration: Duration }, -} - -/// Metrics routine that listens to new metric events on the `events_rx` receiver. -/// Upon receiving new event, related metrics are updated. -#[derive(Debug)] -pub struct MetricsListener { - events_rx: UnboundedReceiver, - metrics: Metrics, -} - -impl MetricsListener { - /// Creates a new [MetricsListener] with the provided receiver of [MetricEvent]. - pub fn new(events_rx: UnboundedReceiver) -> Self { - Self { events_rx, metrics: Metrics::default() } - } - - fn handle_event(&mut self, event: MetricEvent) { - trace!(target: "storage::metrics", ?event, "Metric event received"); - match event { - MetricEvent::OpenTransaction { txn_id, mode } => { - self.metrics.record_open_transaction(txn_id, mode) - } - MetricEvent::CloseTransaction { txn_id, outcome, close_duration } => { - self.metrics.record_close_transaction(txn_id, outcome, close_duration) - } - MetricEvent::Operation { table, operation, duration } => { - self.metrics.record_operation(table, operation, duration) - } - } - } -} - -impl Future for MetricsListener { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - // Loop until we drain the `events_rx` channel - loop { - let Some(event) = ready!(this.events_rx.poll_recv(cx)) else { - // Channel has closed - return Poll::Ready(()) - }; - - this.handle_event(event); - } - } -} diff --git a/crates/storage/db/src/metrics/mod.rs b/crates/storage/db/src/metrics/mod.rs deleted file mode 100644 index 64e976f396e3..000000000000 --- a/crates/storage/db/src/metrics/mod.rs +++ /dev/null @@ -1,176 +0,0 @@ -use metrics::{Gauge, Histogram}; -use reth_metrics::{metrics::Counter, Metrics}; -use std::{ - collections::HashMap, - fmt::{Display, Formatter}, - time::{Duration, Instant}, -}; - -mod listener; -pub use listener::{MetricEvent, MetricEventsSender, MetricsListener}; - -#[derive(Debug, Clone, Copy)] -struct Transaction { - mode: TransactionMode, - begin: Instant, -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -#[allow(missing_docs)] -pub enum TransactionMode { - ReadOnly, - ReadWrite, -} - -impl Display for TransactionMode { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - TransactionMode::ReadOnly => write!(f, "read-only"), - TransactionMode::ReadWrite => write!(f, "read-write"), - } - } -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -#[allow(missing_docs)] -pub enum TransactionOutcome { - Commit, - Abort, - Drop, -} - -impl Display for TransactionOutcome { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - TransactionOutcome::Commit => write!(f, "commit"), - TransactionOutcome::Abort => write!(f, "abort"), - TransactionOutcome::Drop => write!(f, "drop"), - } - } -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] -#[allow(missing_docs)] -pub enum Operation { - Get, - Put, - Delete, - CursorUpsert, - CursorInsert, - CursorAppend, - CursorAppendDup, - CursorDeleteCurrent, - CursorDeleteCurrentDuplicates, -} - -impl Display for Operation { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - Operation::Get => write!(f, "get"), - Operation::Put => write!(f, "put"), - Operation::Delete => write!(f, "delete"), - 
Operation::CursorUpsert => write!(f, "cursor-upsert"), - Operation::CursorInsert => write!(f, "cursor-insert"), - Operation::CursorAppend => write!(f, "cursor-append"), - Operation::CursorAppendDup => write!(f, "cursor-append-dup"), - Operation::CursorDeleteCurrent => write!(f, "cursor-delete-current"), - Operation::CursorDeleteCurrentDuplicates => { - write!(f, "cursor-delete-current-duplicates") - } - } - } -} - -#[derive(Debug, Default)] -struct Metrics { - open_transactions: HashMap, - transactions: HashMap, - transaction_metrics: HashMap<(TransactionMode, TransactionOutcome), TransactionMetrics>, - operation_metrics: HashMap, -} - -impl Metrics { - pub(crate) fn record_open_transaction(&mut self, txn_id: u64, mode: TransactionMode) { - let is_new = - self.transactions.insert(txn_id, Transaction { begin: Instant::now(), mode }).is_none(); - - if is_new { - self.open_transactions - .entry(mode) - .or_insert_with(|| { - OpenTransactionMetrics::new_with_labels(&[("mode", mode.to_string())]) - }) - .total - .increment(1.0) - } - } - - pub(crate) fn record_close_transaction( - &mut self, - txn_id: u64, - outcome: TransactionOutcome, - close_duration: Duration, - ) { - if let Some(transaction) = self.transactions.remove(&txn_id) { - let mode = transaction.mode; - let metrics = self.transaction_metrics.entry((mode, outcome)).or_insert_with(|| { - TransactionMetrics::new_with_labels(&[ - ("mode", mode.to_string()), - ("outcome", outcome.to_string()), - ]) - }); - metrics.open_duration_seconds.record(transaction.begin.elapsed()); - metrics.close_duration_seconds.record(close_duration); - - self.open_transactions - .entry(mode) - .or_insert_with(|| { - OpenTransactionMetrics::new_with_labels(&[("mode", mode.to_string())]) - }) - .total - .decrement(1.0) - } - } - - pub(crate) fn record_operation( - &mut self, - table: &'static str, - operation: Operation, - duration: Duration, - ) { - let metrics = self.operation_metrics.entry(operation).or_insert_with(|| { - OperationMetrics::new_with_labels(&[ - ("table", table.to_string()), - ("operation", operation.to_string()), - ]) - }); - - metrics.calls_total.increment(1); - metrics.duration_seconds.record(duration); - } -} - -#[derive(Metrics)] -#[metrics(scope = "database.open_transactions")] -struct OpenTransactionMetrics { - /// Total number of currently open database transactions - total: Gauge, -} - -#[derive(Metrics)] -#[metrics(scope = "database.transaction")] -struct TransactionMetrics { - /// The time a database transaction has been open - open_duration_seconds: Histogram, - /// The time it took to close a database transaction - close_duration_seconds: Histogram, -} - -#[derive(Metrics)] -#[metrics(scope = "database.operation")] -struct OperationMetrics { - /// Total number of database operations made - calls_total: Counter, - /// Database operation duration - duration_seconds: Histogram, -} diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index bb6b42486069..61cf48c877b1 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -23,12 +23,15 @@ mod private { impl Sealed for RW {} } -pub trait TransactionKind: private::Sealed + Debug + 'static { +pub trait TransactionKind: private::Sealed + Send + Sync + Debug + 'static { #[doc(hidden)] const ONLY_CLEAN: bool; #[doc(hidden)] const OPEN_FLAGS: MDBX_txn_flags_t; + + #[doc(hidden)] + const IS_READ_ONLY: bool; } #[derive(Debug)] @@ -42,10 +45,12 @@ pub struct RW; impl TransactionKind for RO 
{ const ONLY_CLEAN: bool = true; const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_RDONLY; + const IS_READ_ONLY: bool = true; } impl TransactionKind for RW { const ONLY_CLEAN: bool = false; const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_READWRITE; + const IS_READ_ONLY: bool = false; } /// An MDBX transaction. From d51ca94280fefe0e56b3163d7f643cbb48d26b15 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 26 Oct 2023 12:16:49 +0100 Subject: [PATCH 16/20] doc comments --- crates/storage/db/src/implementation/mdbx/cursor.rs | 5 +++++ crates/storage/db/src/implementation/mdbx/mod.rs | 1 + crates/storage/db/src/implementation/mdbx/tx.rs | 8 ++++++++ 3 files changed, 14 insertions(+) diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs index 959ce7f5d23e..45ff898c941f 100644 --- a/crates/storage/db/src/implementation/mdbx/cursor.rs +++ b/crates/storage/db/src/implementation/mdbx/cursor.rs @@ -28,6 +28,7 @@ pub struct Cursor<'tx, K: TransactionKind, T: Table> { pub(crate) inner: reth_libmdbx::Cursor<'tx, K>, /// Cache buffer that receives compressed values. buf: Vec, + /// Whether to record metrics or not. with_metrics: bool, /// Phantom data to enforce encoding/decoding. _dbi: PhantomData, @@ -41,6 +42,10 @@ impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> { Self { inner, buf: Vec::new(), with_metrics, _dbi: PhantomData } } + /// If `self.with_metrics == true`, measure the time it takes to execute the closure and record + /// a metric with the provided operation. + /// + /// Otherwise, just execute the closure. fn execute_with_operation_metric( &mut self, operation: Operation, diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 05e66155b971..6471e4562f66 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -37,6 +37,7 @@ pub enum EnvKind { pub struct Env { /// Libmdbx-sys environment. pub inner: Environment, + /// Whether to record metrics or not. with_metrics: bool, } diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index dbae0c19659c..5f38d43c547e 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -86,6 +86,10 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { Ok(Cursor::new_with_metrics(inner, self.metrics_handler.is_some())) } + /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and + /// record a metric with the provided transaction outcome. + /// + /// Otherwise, just execute the closure. fn execute_with_close_transaction_metric( mut self, outcome: TransactionOutcome, @@ -112,6 +116,10 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { } } + /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and + /// record a metric with the provided operation. + /// + /// Otherwise, just execute the closure. 
fn execute_with_operation_metric( &self, operation: Operation, From fdf3799a51d2adbaaf8defe5fa4e598dc4ebd53d Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 26 Oct 2023 12:29:08 +0100 Subject: [PATCH 17/20] remove tokio dep --- crates/storage/db/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml index 36832db9ef55..937bb232b9b2 100644 --- a/crates/storage/db/Cargo.toml +++ b/crates/storage/db/Cargo.toml @@ -45,7 +45,6 @@ parking_lot.workspace = true derive_more = "0.99" eyre.workspace = true paste = "1.0" -tokio = { workspace = true, features = ["sync"] } # arbitrary utils arbitrary = { workspace = true, features = ["derive"], optional = true } From 747f9d293647672b9112f35aaf7aad92f367ec16 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 26 Oct 2023 12:56:49 +0100 Subject: [PATCH 18/20] fix dashboard, add more doc comments --- .../storage/db/src/implementation/mdbx/tx.rs | 4 ++++ crates/storage/db/src/metrics.rs | 6 +++++- etc/grafana/dashboards/overview.json | 20 ++++++++----------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 5f38d43c547e..88faaf3852ee 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -22,6 +22,10 @@ pub struct Tx<'a, K: TransactionKind, E: EnvironmentKind> { pub inner: Transaction<'a, K, E>, /// Database table handle cache. pub(crate) db_handles: Arc; NUM_TABLES]>>, + /// Handler for metrics with its own [Drop] implementation for cases when the transaction isn't + /// closed by [Tx::commit] or [Tx::abort], but we still need to report it in the metrics. + /// + /// If [Some], then metrics are reported. metrics_handler: Option>, } diff --git a/crates/storage/db/src/metrics.rs b/crates/storage/db/src/metrics.rs index 2c28bbac2969..9d449319f928 100644 --- a/crates/storage/db/src/metrics.rs +++ b/crates/storage/db/src/metrics.rs @@ -96,11 +96,14 @@ pub(crate) struct TransactionMetrics { } impl TransactionMetrics { + /// Record transaction opening. pub(crate) fn record_open(mode: TransactionMode) { let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]); metrics.open_total.increment(1.0); } + /// Record transaction closing with the duration it was open and the duration it took to close + /// it. pub(crate) fn record_close( mode: TransactionMode, outcome: TransactionOutcome, @@ -127,11 +130,12 @@ impl TransactionMetrics { pub(crate) struct OperationMetrics { /// Total number of database operations made calls_total: Counter, - /// Database operation duration + /// The time it took to execute a database operation duration_seconds: Histogram, } impl OperationMetrics { + /// Record operation metric with the duration it took to execute. 
diff --git a/etc/grafana/dashboards/overview.json b/etc/grafana/dashboards/overview.json
index 7b263aa07c56..5ac04955a921 100644
--- a/etc/grafana/dashboards/overview.json
+++ b/etc/grafana/dashboards/overview.json
@@ -539,7 +539,7 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "sum(rate(reth_database_transaction_close_duration_seconds_sum{instance=~\"$instance\", outcome = \"commit\"}[$__rate_interval]) / rate(reth_database_transaction_close_duration_seconds_count{instance=~\"$instance\", outcome=\"commit\"}[$__rate_interval]))",
+        "expr": "sum(rate(reth_database_transaction_close_duration_seconds_sum{instance=~\"$instance\", outcome=\"commit\"}[$__rate_interval]) / rate(reth_database_transaction_close_duration_seconds_count{instance=~\"$instance\", outcome=\"commit\"}[$__rate_interval]))",
         "format": "time_series",
         "instant": false,
         "legendFormat": "Commit time",
@@ -725,7 +725,7 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "sum(rate(reth_database_transaction_open_duration_seconds_sum{instance=~\"$instance\"}[$__rate_interval]) / rate(reth_database_transaction_open_duration_seconds_count{instance=~\"$instance\"}[$__rate_interval])) by (outcome, mode)",
+        "expr": "sum(rate(reth_database_transaction_open_duration_seconds_sum{instance=~\"$instance\", outcome!=\"\"}[$__rate_interval]) / rate(reth_database_transaction_open_duration_seconds_count{instance=~\"$instance\", outcome!=\"\"}[$__rate_interval])) by (outcome, mode)",
         "format": "time_series",
         "instant": false,
         "legendFormat": "{{mode}}, {{outcome}}",
@@ -825,7 +825,7 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "sum(increase(reth_database_transaction_open_duration_seconds{instance=~\"$instance\", quantile=\"1\"}[$__interval])) by (outcome, mode)",
+        "expr": "sum(increase(reth_database_transaction_open_duration_seconds{instance=~\"$instance\", outcome!=\"\", quantile=\"1\"}[$__interval])) by (outcome, mode)",
         "format": "time_series",
         "instant": false,
         "legendFormat": "{{mode}}, {{outcome}}",
@@ -925,7 +925,7 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "sum(reth_database_open_transactions_total{instance=~\"$instance\"}) by (mode)",
+        "expr": "sum(reth_database_transaction_open_total{instance=~\"$instance\"}) by (mode)",
         "format": "time_series",
         "instant": false,
         "legendFormat": "{{mode}}",
@@ -1733,8 +1733,7 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green",
-              "value": null
+              "color": "green"
             },
             {
               "color": "red",
@@ -1827,8 +1826,7 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green",
-              "value": null
+              "color": "green"
             },
             {
               "color": "red",
@@ -1946,8 +1944,7 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green",
-              "value": null
+              "color": "green"
             },
             {
               "color": "red",
@@ -2251,8 +2248,7 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green",
-              "value": null
+              "color": "green"
             },
             {
               "color": "red",

From 56452e53ac33d907e4fae1bee301196968e0323d Mon Sep 17 00:00:00 2001
From: Alexey Shekhirin
Date: Fri, 27 Oct 2023 17:43:29 +0100
Subject: [PATCH 19/20] record durations only for operations with large values

---
 .../db/src/implementation/mdbx/cursor.rs     | 124 ++++++++++--------
 .../storage/db/src/implementation/mdbx/tx.rs |  35 ++---
 crates/storage/db/src/metrics.rs             |  33 ++++-
 etc/grafana/dashboards/overview.json         |  33 +++--
 4 files changed, 135 insertions(+), 90 deletions(-)
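The approach named in this commit's subject is to gate the timing itself on the size of the written value, so small writes skip the clock syscalls entirely. A minimal sketch, assuming only the 4096-byte threshold introduced below as LARGE_VALUE_THRESHOLD_BYTES (the function name is illustrative):

    use std::time::Instant;

    const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;

    // Only pay for the clock reads when the written value is large enough to matter;
    // small writes still increment the call counter (not shown) but skip the histogram.
    fn record_timed_if_large<R>(value_size: Option<usize>, f: impl FnOnce() -> R) -> R {
        if value_size.map_or(false, |size| size > LARGE_VALUE_THRESHOLD_BYTES) {
            let start = Instant::now();
            let result = f();
            let _elapsed = start.elapsed(); // e.g. a 10 KiB put: duration recorded here
            result
        } else {
            f() // e.g. a 100-byte put, or a delete passing None: no timing overhead
        }
    }

Under this scheme a large put lands in the duration histogram, while small writes and deletes only show up in the call counter.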
diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs
index 45ff898c941f..e8a6f1e3399e 100644
--- a/crates/storage/db/src/implementation/mdbx/cursor.rs
+++ b/crates/storage/db/src/implementation/mdbx/cursor.rs
@@ -1,7 +1,7 @@
 //! Cursor wrapper for libmdbx-sys.
 
 use reth_interfaces::db::DatabaseWriteOperation;
-use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, time::Instant};
+use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds};
 
 use crate::{
     common::{PairResult, ValueOnlyResult},
@@ -42,23 +42,17 @@ impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> {
         Self { inner, buf: Vec::new(), with_metrics, _dbi: PhantomData }
     }
 
-    /// If `self.with_metrics == true`, measure the time it takes to execute the closure and record
-    /// a metric with the provided operation.
+    /// If `self.with_metrics == true`, record a metric with the provided operation and value size.
     ///
     /// Otherwise, just execute the closure.
     fn execute_with_operation_metric<R>(
         &mut self,
         operation: Operation,
+        value_size: Option<usize>,
         f: impl FnOnce(&mut Self) -> R,
     ) -> R {
         if self.with_metrics {
-            let start = Instant::now();
-            let result = f(self);
-            let duration = start.elapsed();
-
-            OperationMetrics::record(T::NAME, operation, duration);
-
-            result
+            OperationMetrics::record(T::NAME, operation, value_size, || f(self))
         } else {
             f(self)
         }
@@ -75,14 +69,14 @@ macro_rules! decode {
 
 /// Some types don't support compression (eg. B256), and we don't want to be copying them to the
 /// allocated buffer when we can just use their reference.
-macro_rules! compress_or_ref {
+macro_rules! compress_to_buf_or_ref {
     ($self:expr, $value:expr) => {
         if let Some(value) = $value.uncompressable_ref() {
-            value
+            Some(value)
         } else {
             $self.buf.truncate(0);
             $value.compress_to_buf(&mut $self.buf);
-            $self.buf.as_ref()
+            None
        }
     };
 }
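The renamed macro now distinguishes the two write paths by its return value: `Some(bytes)` when the value can be written by reference, `None` when it has been compressed into the cursor's reusable buffer. A sketch of the same contract as a plain function, under the assumption that the caller then reads the buffer back, as `value.unwrap_or(&this.buf)` does in the hunks below (names are illustrative):

    /// Illustrative version of `compress_to_buf_or_ref!`: pass the value through by
    /// reference when possible, otherwise compress it into the caller's reusable buffer.
    fn compress_to_buf_or_ref<'a>(
        uncompressable: Option<&'a [u8]>,
        compress_into: impl FnOnce(&mut Vec<u8>),
        buf: &mut Vec<u8>,
    ) -> Option<&'a [u8]> {
        if let Some(value) = uncompressable {
            Some(value)
        } else {
            buf.clear();
            compress_into(buf);
            None
        }
    }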
@@ -261,50 +255,65 @@ impl DbCursorRW for Cursor<'_, RW, T> {
     /// found, before calling `upsert`.
     fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
         let key = key.encode();
-        self.execute_with_operation_metric(Operation::CursorUpsert, |this| {
-            this.inner.put(key.as_ref(), compress_or_ref!(this, value), WriteFlags::UPSERT).map_err(
-                |e| DatabaseError::Write {
-                    code: e.into(),
-                    operation: DatabaseWriteOperation::CursorUpsert,
-                    table_name: T::NAME,
-                    key: Box::from(key.as_ref()),
-                },
-            )
-        })
+        let value = compress_to_buf_or_ref!(self, value);
+        self.execute_with_operation_metric(
+            Operation::CursorUpsert,
+            Some(value.unwrap_or(&self.buf).len()),
+            |this| {
+                this.inner
+                    .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::UPSERT)
+                    .map_err(|e| DatabaseError::Write {
+                        code: e.into(),
+                        operation: DatabaseWriteOperation::CursorUpsert,
+                        table_name: T::NAME,
+                        key: Box::from(key.as_ref()),
+                    })
+            },
+        )
     }
 
     fn insert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
         let key = key.encode();
-        self.execute_with_operation_metric(Operation::CursorInsert, |this| {
-            this.inner
-                .put(key.as_ref(), compress_or_ref!(this, value), WriteFlags::NO_OVERWRITE)
-                .map_err(|e| DatabaseError::Write {
-                    code: e.into(),
-                    operation: DatabaseWriteOperation::CursorInsert,
-                    table_name: T::NAME,
-                    key: Box::from(key.as_ref()),
-                })
-        })
+        let value = compress_to_buf_or_ref!(self, value);
+        self.execute_with_operation_metric(
+            Operation::CursorInsert,
+            Some(value.unwrap_or(&self.buf).len()),
+            |this| {
+                this.inner
+                    .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::NO_OVERWRITE)
+                    .map_err(|e| DatabaseError::Write {
+                        code: e.into(),
+                        operation: DatabaseWriteOperation::CursorInsert,
+                        table_name: T::NAME,
+                        key: Box::from(key.as_ref()),
+                    })
+            },
+        )
     }
 
     /// Appends the data to the end of the table. Consequently, the append operation
    /// will fail if the inserted key is less than the last table key
     fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
         let key = key.encode();
-        self.execute_with_operation_metric(Operation::CursorAppend, |this| {
-            this.inner.put(key.as_ref(), compress_or_ref!(this, value), WriteFlags::APPEND).map_err(
-                |e| DatabaseError::Write {
-                    code: e.into(),
-                    operation: DatabaseWriteOperation::CursorAppend,
-                    table_name: T::NAME,
-                    key: Box::from(key.as_ref()),
-                },
-            )
-        })
+        let value = compress_to_buf_or_ref!(self, value);
+        self.execute_with_operation_metric(
+            Operation::CursorAppend,
+            Some(value.unwrap_or(&self.buf).len()),
+            |this| {
+                this.inner
+                    .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND)
+                    .map_err(|e| DatabaseError::Write {
+                        code: e.into(),
+                        operation: DatabaseWriteOperation::CursorAppend,
+                        table_name: T::NAME,
+                        key: Box::from(key.as_ref()),
+                    })
+            },
+        )
     }
 
     fn delete_current(&mut self) -> Result<(), DatabaseError> {
-        self.execute_with_operation_metric(Operation::CursorDeleteCurrent, |this| {
+        self.execute_with_operation_metric(Operation::CursorDeleteCurrent, None, |this| {
             this.inner.del(WriteFlags::CURRENT).map_err(|e| DatabaseError::Delete(e.into()))
         })
     }
@@ -312,22 +321,27 @@ impl DbCursorRW for Cursor<'_, RW, T> {
 
 impl DbDupCursorRW for Cursor<'_, RW, T> {
     fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> {
-        self.execute_with_operation_metric(Operation::CursorDeleteCurrentDuplicates, |this| {
+        self.execute_with_operation_metric(Operation::CursorDeleteCurrentDuplicates, None, |this| {
             this.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| DatabaseError::Delete(e.into()))
         })
     }
 
     fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
         let key = key.encode();
-        self.execute_with_operation_metric(Operation::CursorAppendDup, |this| {
-            this.inner
-                .put(key.as_ref(), compress_or_ref!(this, value), WriteFlags::APPEND_DUP)
-                .map_err(|e| DatabaseError::Write {
-                    code: e.into(),
-                    operation: DatabaseWriteOperation::CursorAppendDup,
-                    table_name: T::NAME,
-                    key: Box::from(key.as_ref()),
-                })
-        })
+        let value = compress_to_buf_or_ref!(self, value);
+        self.execute_with_operation_metric(
+            Operation::CursorAppendDup,
+            Some(value.unwrap_or(&self.buf).len()),
+            |this| {
+                this.inner
+                    .put(key.as_ref(), value.unwrap_or(&this.buf), WriteFlags::APPEND_DUP)
+                    .map_err(|e| DatabaseError::Write {
+                        code: e.into(),
+                        operation: DatabaseWriteOperation::CursorAppendDup,
+                        table_name: T::NAME,
+                        key: Box::from(key.as_ref()),
+                    })
+            },
+        )
     }
 }
diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs
index 88faaf3852ee..276cd594d140 100644
--- a/crates/storage/db/src/implementation/mdbx/tx.rs
+++ b/crates/storage/db/src/implementation/mdbx/tx.rs
@@ -127,16 +127,11 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> {
     fn execute_with_operation_metric<T: Table, R>(
         &self,
         operation: Operation,
+        value_size: Option<usize>,
         f: impl FnOnce(&Transaction<'_, K, E>) -> R,
     ) -> R {
         if self.metrics_handler.is_some() {
-            let start = Instant::now();
-            let result = f(&self.inner);
-            let duration = start.elapsed();
-
-            OperationMetrics::record(T::NAME, operation, duration);
-
-            result
+            OperationMetrics::record(T::NAME, operation, value_size, || f(&self.inner))
         } else {
             f(&self.inner)
         }
@@ -192,7 +187,7 @@ impl TableImporter for Tx<'_, RW, E> {}
 
 impl DbTx for Tx<'_, K, E> {
     fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, DatabaseError> {
-        self.execute_with_operation_metric::<T, _>(Operation::Get, |tx| {
+        self.execute_with_operation_metric::<T, _>(Operation::Get, None, |tx| {
             tx.get(self.get_dbi::<T>()?, key.encode().as_ref())
                 .map_err(|e| DatabaseError::Read(e.into()))?
                 .map(decode_one::<T>)
@@ -237,15 +232,21 @@ impl DbTx for Tx<'_, K, E> {
 
 impl DbTxMut for Tx<'_, RW, E> {
     fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError> {
         let key = key.encode();
-        self.execute_with_operation_metric::<T, _>(Operation::Put, |tx| {
-            tx.put(self.get_dbi::<T>()?, key.as_ref(), &value.compress(), WriteFlags::UPSERT)
-                .map_err(|e| DatabaseError::Write {
-                    code: e.into(),
-                    operation: DatabaseWriteOperation::Put,
-                    table_name: T::NAME,
-                    key: Box::from(key.as_ref()),
+        let value = value.compress();
+        self.execute_with_operation_metric::<T, _>(
+            Operation::Put,
+            Some(value.as_ref().len()),
+            |tx| {
+                tx.put(self.get_dbi::<T>()?, key.as_ref(), value, WriteFlags::UPSERT).map_err(|e| {
+                    DatabaseError::Write {
+                        code: e.into(),
+                        operation: DatabaseWriteOperation::Put,
+                        table_name: T::NAME,
+                        key: Box::from(key.as_ref()),
+                    }
                 })
-        })
+            },
+        )
     }
 
     fn delete<T: Table>(
@@ -260,7 +261,7 @@ impl DbTxMut for Tx<'_, RW, E> {
             data = Some(value.as_ref());
         };
 
-        self.execute_with_operation_metric::<T, _>(Operation::Delete, |tx| {
+        self.execute_with_operation_metric::<T, _>(Operation::Delete, None, |tx| {
             tx.del(self.get_dbi::<T>()?, key.encode(), data)
                 .map_err(|e| DatabaseError::Delete(e.into()))
         })
diff --git a/crates/storage/db/src/metrics.rs b/crates/storage/db/src/metrics.rs
index 9d449319f928..fff6cecbd6ac 100644
--- a/crates/storage/db/src/metrics.rs
+++ b/crates/storage/db/src/metrics.rs
@@ -1,6 +1,8 @@
 use metrics::{Gauge, Histogram};
 use reth_metrics::{metrics::Counter, Metrics};
-use std::time::Duration;
+use std::time::{Duration, Instant};
+
+const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
 
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 #[allow(missing_docs)]
@@ -130,18 +132,37 @@
 pub(crate) struct OperationMetrics {
     /// Total number of database operations made
     calls_total: Counter,
-    /// The time it took to execute a database operation
-    duration_seconds: Histogram,
+    /// The time it took to execute a database operation (put/upsert/insert/append/append_dup) with
+    /// value larger than [LARGE_VALUE_THRESHOLD_BYTES] bytes.
+    large_value_duration_seconds: Histogram,
 }
 
 impl OperationMetrics {
-    /// Record operation metric with the duration it took to execute.
-    pub(crate) fn record(table: &'static str, operation: Operation, duration: Duration) {
+    /// Record operation metric.
+    ///
+    /// The duration it took to execute the closure is recorded only if the provided `value_size` is
+    /// larger than [LARGE_VALUE_THRESHOLD_BYTES].
+    pub(crate) fn record<T>(
+        table: &'static str,
+        operation: Operation,
+        value_size: Option<usize>,
+        f: impl FnOnce() -> T,
+    ) -> T {
         let metrics = Self::new_with_labels(&[
             (Labels::Table.as_str(), table),
             (Labels::Operation.as_str(), operation.as_str()),
         ]);
-        metrics.duration_seconds.record(duration);
         metrics.calls_total.increment(1);
+
+        // Record duration only for large values to prevent the performance hit of the clock
+        // syscall on small operations.
+        if value_size.map_or(false, |size| size > LARGE_VALUE_THRESHOLD_BYTES) {
+            let start = Instant::now();
+            let result = f();
+            metrics.large_value_duration_seconds.record(start.elapsed());
+            result
+        } else {
+            f()
+        }
     }
 }
diff --git a/etc/grafana/dashboards/overview.json b/etc/grafana/dashboards/overview.json
index 5ac04955a921..5094928807a8 100644
--- a/etc/grafana/dashboards/overview.json
+++ b/etc/grafana/dashboards/overview.json
@@ -628,7 +628,7 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "sum(increase(reth_database_transaction_close_duration_seconds{instance=~\"$instance\", outcome=\"commit\"}[$__interval])) by (quantile)",
+        "expr": "avg(last_over_time(reth_database_transaction_close_duration_seconds{instance=~\"$instance\", outcome=\"commit\"}[$__interval])) by (quantile)",
         "format": "time_series",
         "instant": false,
         "legendFormat": "{{quantile}}",
@@ -825,7 +825,7 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "sum(increase(reth_database_transaction_open_duration_seconds{instance=~\"$instance\", outcome!=\"\", quantile=\"1\"}[$__interval])) by (outcome, mode)",
+        "expr": "max(max_over_time(reth_database_transaction_open_duration_seconds{instance=~\"$instance\", outcome!=\"\", quantile=\"1\"}[$__interval])) by (outcome, mode)",
         "format": "time_series",
         "instant": false,
         "legendFormat": "{{mode}}, {{outcome}}",
@@ -1025,7 +1025,7 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "sum(increase(reth_database_operation_duration_seconds{instance=~\"$instance\", quantile=\"1\"}[$__interval])) by (operation)",
+        "expr": "max(max_over_time(reth_database_operation_large_value_duration_seconds{instance=~\"$instance\", quantile=\"1\"}[$__interval])) by (operation)",
         "format": "time_series",
         "instant": false,
         "legendFormat": "{{operation}}",
@@ -1733,7 +1733,8 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green"
+              "color": "green",
+              "value": null
             },
             {
               "color": "red",
@@ -1826,7 +1827,8 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green"
+              "color": "green",
+              "value": null
             },
             {
               "color": "red",
@@ -1944,7 +1946,8 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green"
+              "color": "green",
+              "value": null
             },
             {
               "color": "red",
@@ -2248,7 +2251,8 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green"
+              "color": "green",
+              "value": null
             },
             {
               "color": "red",
@@ -2355,7 +2359,8 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green"
+              "color": "green",
+              "value": null
             },
             {
               "color": "red",
@@ -2510,7 +2515,8 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green"
+              "color": "green",
+              "value": null
             },
             {
               "color": "red",
@@ -2628,7 +2634,8 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green"
+              "color": "green",
+              "value": null
             },
             {
               "color": "red",
@@ -2748,7 +2755,8 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green"
+              "color": "green",
+              "value": null
             },
             {
               "color": "red",
@@ -2948,7 +2956,8 @@
           "mode": "absolute",
           "steps": [
             {
-              "color": "green"
+              "color": "green",
+              "value": null
             }
           ]
         },
From 093710b42ed6aae6a3ac8627315bde5fe9cab059 Mon Sep 17 00:00:00 2001
From: Alexey Shekhirin
Date: Fri, 27 Oct 2023 18:38:20 +0100
Subject: [PATCH 20/20] update dashboard

---
 etc/grafana/dashboards/overview.json | 22 ++++++----------------
 1 file changed, 6 insertions(+), 16 deletions(-)

diff --git a/etc/grafana/dashboards/overview.json b/etc/grafana/dashboards/overview.json
index 5094928807a8..b31b03cfa209 100644
--- a/etc/grafana/dashboards/overview.json
+++ b/etc/grafana/dashboards/overview.json
@@ -497,7 +497,6 @@
         }
       },
       "mappings": [],
-      "min": 0,
       "thresholds": {
         "mode": "absolute",
         "steps": [
@@ -539,7 +538,7 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "sum(rate(reth_database_transaction_close_duration_seconds_sum{instance=~\"$instance\", outcome=\"commit\"}[$__rate_interval]) / rate(reth_database_transaction_close_duration_seconds_count{instance=~\"$instance\", outcome=\"commit\"}[$__rate_interval]))",
+        "expr": "avg(rate(reth_database_transaction_close_duration_seconds_sum{instance=~\"$instance\", outcome=\"commit\"}[$__rate_interval]) / rate(reth_database_transaction_close_duration_seconds_count{instance=~\"$instance\", outcome=\"commit\"}[$__rate_interval]) >= 0)",
         "format": "time_series",
         "instant": false,
         "legendFormat": "Commit time",
@@ -628,7 +627,7 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "avg(last_over_time(reth_database_transaction_close_duration_seconds{instance=~\"$instance\", outcome=\"commit\"}[$__interval])) by (quantile)",
+        "expr": "avg(avg_over_time(reth_database_transaction_close_duration_seconds{instance=~\"$instance\", outcome=\"commit\"}[$__interval])) by (quantile)",
         "format": "time_series",
         "instant": false,
         "legendFormat": "{{quantile}}",
@@ -683,7 +682,6 @@
         }
       },
       "mappings": [],
-      "min": 0,
       "thresholds": {
         "mode": "absolute",
         "steps": [
@@ -783,10 +781,6 @@
           {
             "color": "green",
             "value": null
-          },
-          {
-            "color": "red",
-            "value": 80
           }
         ]
       },
@@ -797,7 +791,6 @@
         "y": 26
       },
       "id": 116,
-      "maxDataPoints": 25,
      "options": {
         "legend": {
           "calcs": [],
@@ -896,7 +889,6 @@
         "y": 34
       },
       "id": 119,
-      "maxDataPoints": 25,
       "options": {
         "legend": {
           "calcs": [],
@@ -933,7 +925,7 @@
         "type": "prometheus",
         "uid": "${DS_PROMETHEUS}"
       },
-      "description": "The maximum time the database transaction operation took.",
+      "description": "The maximum time the database transaction operation which inserts a large value took.",
       "fieldConfig": {
         "defaults": {
           "color": {
@@ -942,7 +934,6 @@
           "custom": {
             "axisCenteredZero": false,
             "axisColorMode": "text",
-            "axisLabel": "",
             "axisPlacement": "auto",
             "barAlignment": 0,
             "drawStyle": "points",
@@ -994,7 +985,6 @@
         "y": 34
       },
       "id": 118,
-      "maxDataPoints": 25,
       "options": {
         "legend": {
           "calcs": [],
@@ -1025,15 +1015,15 @@
         },
         "editorMode": "code",
         "exemplar": false,
-        "expr": "max(max_over_time(reth_database_operation_large_value_duration_seconds{instance=~\"$instance\", quantile=\"1\"}[$__interval])) by (operation)",
+        "expr": "max(max_over_time(reth_database_operation_large_value_duration_seconds{instance=~\"$instance\", quantile=\"1\"}[$__interval]) > 0) by (table)",
         "format": "time_series",
         "instant": false,
-        "legendFormat": "{{operation}}",
+        "legendFormat": "{{table}}",
         "range": true,
         "refId": "A"
       }
     ],
-    "title": "Max operation time",
+    "title": "Max insertion operation time",
     "type": "timeseries"
   },
   {