From ee5339a158b68ac44b2bd067b62273e9188d1ccb Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Tue, 23 Jan 2024 16:25:43 -0800 Subject: [PATCH] feat(748): Add compile time feature flag for db metrics Closes #748. This patch adds a temporary feature flag for enabling db metrics. Metrics are recorded synchronously at the moment. This can have a noticable impact on latency. Compiling with this flag will enable metrics collection. This new flag will be turned on by default. Hence metrics will be collected by default. Note this flag is temporary. It will be removed once metrics are recorded async. --- .github/workflows/ci.yml | 4 +- crates/core/Cargo.toml | 4 +- crates/core/src/db/commit_log.rs | 2 + .../locking_tx_datastore/committed_state.rs | 1 + .../db/datastore/locking_tx_datastore/mod.rs | 82 ++++++++++--------- .../datastore/locking_tx_datastore/mut_tx.rs | 37 +++++---- .../locking_tx_datastore/state_view.rs | 2 + crates/core/src/db/relational_db.rs | 3 + .../src/host/wasmtime/wasm_instance_env.rs | 6 ++ crates/core/src/subscription/subscription.rs | 2 + 10 files changed, 85 insertions(+), 58 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 99bb1eb96e0..f924d8af00e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,7 +45,7 @@ jobs: sudo chmod 777 /stdb - name: Run cargo test - run: cargo test --all --features odb_rocksdb,odb_sled + run: cargo test --all --features odb_rocksdb,odb_sled,metrics lints: name: Lints @@ -61,7 +61,7 @@ jobs: run: cargo fmt --all -- --check - name: Run cargo clippy - run: cargo clippy --all --tests --features odb_rocksdb,odb_sled -- -D warnings + run: cargo clippy --all --tests --features odb_rocksdb,odb_sled,metrics -- -D warnings - name: Check benchmarks run: cd crates/bench && cargo check --benches diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ba264d74c8e..137331c7484 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -94,10 +94,12 @@ wasmtime.workspace = true rocksdb = {workspace = true, optional = true} [features] +# Enable metrics in spacetimedb. +metrics = [] # Optional storage engines. odb_rocksdb = ["dep:rocksdb"] odb_sled = [] -default = ["odb_sled"] +default = ["odb_sled", "metrics"] [dev-dependencies] spacetimedb-lib = { path = "../lib", features = ["proptest"] } diff --git a/crates/core/src/db/commit_log.rs b/crates/core/src/db/commit_log.rs index 9848313e627..7d82fcd40d6 100644 --- a/crates/core/src/db/commit_log.rs +++ b/crates/core/src/db/commit_log.rs @@ -377,6 +377,7 @@ impl CommitLogMut { let operation = match record.op { TxOp::Insert(_) => { // Increment rows inserted metric + #[cfg(feature = "metrics")] DB_METRICS .rdb_num_rows_inserted .with_label_values(workload, db, reducer, &table_id, table_name) @@ -390,6 +391,7 @@ impl CommitLogMut { } TxOp::Delete => { // Increment rows deleted metric + #[cfg(feature = "metrics")] DB_METRICS .rdb_num_rows_deleted .with_label_values(workload, db, reducer, &table_id, table_name) diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs index c1b553d7fb3..d374bce436c 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs @@ -385,6 +385,7 @@ pub struct CommittedIndexIter<'a> { pub(crate) num_committed_rows_fetched: u64, } +#[cfg(feature = "metrics")] impl Drop for CommittedIndexIter<'_> { fn drop(&mut self) { let table_name = self diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index 0d227b2f3e4..01cc9a3acff 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -346,14 +346,17 @@ impl traits::MutTx for Locking { let reducer = ctx.reducer_name(); let elapsed_time = tx.timer.elapsed(); let cpu_time = elapsed_time - tx.lock_wait_time; + #[cfg(feature = "metrics")] DB_METRICS .rdb_num_txns .with_label_values(workload, db, reducer, &false) .inc(); + #[cfg(feature = "metrics")] DB_METRICS .rdb_txn_cpu_time_sec .with_label_values(workload, db, reducer) .observe(cpu_time.as_secs_f64()); + #[cfg(feature = "metrics")] DB_METRICS .rdb_txn_elapsed_time_sec .with_label_values(workload, db, reducer) @@ -362,44 +365,47 @@ impl traits::MutTx for Locking { } fn commit_mut_tx(&self, ctx: &ExecutionContext, tx: Self::MutTx) -> super::Result> { - let workload = &ctx.workload(); - let db = &ctx.database(); - let reducer = ctx.reducer_name(); - let elapsed_time = tx.timer.elapsed(); - let cpu_time = elapsed_time - tx.lock_wait_time; - - let elapsed_time = elapsed_time.as_secs_f64(); - let cpu_time = cpu_time.as_secs_f64(); - // Note, we record empty transactions in our metrics. - // That is, transactions that don't write any rows to the commit log. - DB_METRICS - .rdb_num_txns - .with_label_values(workload, db, reducer, &true) - .inc(); - DB_METRICS - .rdb_txn_cpu_time_sec - .with_label_values(workload, db, reducer) - .observe(cpu_time); - DB_METRICS - .rdb_txn_elapsed_time_sec - .with_label_values(workload, db, reducer) - .observe(elapsed_time); - - let mut guard = MAX_TX_CPU_TIME.lock().unwrap(); - let max_cpu_time = *guard - .entry((*db, *workload, reducer.to_owned())) - .and_modify(|max| { - if cpu_time > *max { - *max = cpu_time; - } - }) - .or_insert_with(|| cpu_time); - - drop(guard); - DB_METRICS - .rdb_txn_cpu_time_sec_max - .with_label_values(workload, db, reducer) - .set(max_cpu_time); + #[cfg(feature = "metrics")] + { + let workload = &ctx.workload(); + let db = &ctx.database(); + let reducer = ctx.reducer_name(); + let elapsed_time = tx.timer.elapsed(); + let cpu_time = elapsed_time - tx.lock_wait_time; + + let elapsed_time = elapsed_time.as_secs_f64(); + let cpu_time = cpu_time.as_secs_f64(); + // Note, we record empty transactions in our metrics. + // That is, transactions that don't write any rows to the commit log. + DB_METRICS + .rdb_num_txns + .with_label_values(workload, db, reducer, &true) + .inc(); + DB_METRICS + .rdb_txn_cpu_time_sec + .with_label_values(workload, db, reducer) + .observe(cpu_time); + DB_METRICS + .rdb_txn_elapsed_time_sec + .with_label_values(workload, db, reducer) + .observe(elapsed_time); + + let mut guard = MAX_TX_CPU_TIME.lock().unwrap(); + let max_cpu_time = *guard + .entry((*db, *workload, reducer.to_owned())) + .and_modify(|max| { + if cpu_time > *max { + *max = cpu_time; + } + }) + .or_insert_with(|| cpu_time); + + drop(guard); + DB_METRICS + .rdb_txn_cpu_time_sec_max + .with_label_values(workload, db, reducer) + .set(max_cpu_time); + } tx.commit() } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 94fd7e416fe..0624da4b1ef 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -89,23 +89,26 @@ impl StateView for TxId { #[allow(dead_code)] impl TxId { pub(crate) fn release(self, ctx: &ExecutionContext) { - let workload = &ctx.workload(); - let db = &ctx.database(); - let reducer = ctx.reducer_name(); - let elapsed_time = self.timer.elapsed(); - let cpu_time = elapsed_time - self.lock_wait_time; - DB_METRICS - .rdb_num_txns - .with_label_values(workload, db, reducer, &false) - .inc(); - DB_METRICS - .rdb_txn_cpu_time_sec - .with_label_values(workload, db, reducer) - .observe(cpu_time.as_secs_f64()); - DB_METRICS - .rdb_txn_elapsed_time_sec - .with_label_values(workload, db, reducer) - .observe(elapsed_time.as_secs_f64()); + #[cfg(feature = "metrics")] + { + let workload = &ctx.workload(); + let db = &ctx.database(); + let reducer = ctx.reducer_name(); + let elapsed_time = self.timer.elapsed(); + let cpu_time = elapsed_time - self.lock_wait_time; + DB_METRICS + .rdb_num_txns + .with_label_values(workload, db, reducer, &false) + .inc(); + DB_METRICS + .rdb_txn_cpu_time_sec + .with_label_values(workload, db, reducer) + .observe(cpu_time.as_secs_f64()); + DB_METRICS + .rdb_txn_elapsed_time_sec + .with_label_values(workload, db, reducer) + .observe(elapsed_time.as_secs_f64()); + } } } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs index db04d7dd050..a5bcaaf7f1c 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs @@ -200,6 +200,7 @@ pub struct Iter<'a> { num_committed_rows_fetched: u64, } +#[cfg(feature = "metrics")] impl Drop for Iter<'_> { fn drop(&mut self) { DB_METRICS @@ -333,6 +334,7 @@ pub struct IndexSeekIterMutTxId<'a> { pub(crate) num_committed_rows_fetched: u64, } +#[cfg(feature = "metrics")] impl Drop for IndexSeekIterMutTxId<'_> { fn drop(&mut self) { let table_name = self diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index f646d2eba5b..45693b6c7c1 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -429,6 +429,7 @@ impl RelationalDB { } pub fn drop_table(&self, ctx: &ExecutionContext, tx: &mut MutTx, table_id: TableId) -> Result<(), DBError> { + #[cfg(feature = "metrics")] let _guard = DB_METRICS .rdb_drop_table_time .with_label_values(&table_id.0) @@ -619,6 +620,7 @@ impl RelationalDB { #[tracing::instrument(skip(self, tx, row))] pub fn insert(&self, tx: &mut MutTx, table_id: TableId, row: ProductValue) -> Result { + #[cfg(feature = "metrics")] let _guard = DB_METRICS .rdb_insert_row_time .with_label_values(&table_id.0) @@ -644,6 +646,7 @@ impl RelationalDB { #[tracing::instrument(skip_all)] pub fn delete_by_rel(&self, tx: &mut MutTx, table_id: TableId, relation: R) -> u32 { + #[cfg(feature = "metrics")] let _guard = DB_METRICS .rdb_delete_by_rel_time .with_label_values(&table_id.0) diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 13d1ea040bf..4736018856b 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -159,6 +159,7 @@ impl WasmInstanceEnv { // TODO: make this part of cvt(), maybe? /// Gather the appropriate metadata and log a wasm_abi_call_duration_ns with the given AbiCall & duration + #[cfg(feature = "metrics")] fn start_abi_call_timer(&self, call: AbiCall) -> prometheus::HistogramTimer { let ctx = self.reducer_context(); let db = ctx.database(); @@ -381,6 +382,7 @@ impl WasmInstanceEnv { pub fn insert(caller: Caller<'_, Self>, table_id: u32, row: WasmPtr, row_len: u32) -> RtResult { // TODO: Instead of writing this metric on every insert call, // we should aggregate and write at the end of the transaction. + #[cfg(feature = "metrics")] let _guard = caller.data().start_abi_call_timer(AbiCall::Insert); Self::cvt(caller, AbiCall::Insert, |caller| { @@ -425,6 +427,7 @@ impl WasmInstanceEnv { value_len: u32, out: WasmPtr, ) -> RtResult { + #[cfg(feature = "metrics")] let _guard = caller.data().start_abi_call_timer(AbiCall::DeleteByColEq); Self::cvt_ret(caller, AbiCall::DeleteByColEq, out, |caller| { @@ -564,6 +567,7 @@ impl WasmInstanceEnv { val_len: u32, out: WasmPtr, ) -> RtResult { + #[cfg(feature = "metrics")] let _guard = caller.data().start_abi_call_timer(AbiCall::IterByColEq); Self::cvt_ret(caller, AbiCall::IterByColEq, out, |caller| { @@ -593,6 +597,7 @@ impl WasmInstanceEnv { /// - a table with the provided `table_id` doesn't exist // #[tracing::instrument(skip_all)] pub fn iter_start(caller: Caller<'_, Self>, table_id: u32, out: WasmPtr) -> RtResult { + #[cfg(feature = "metrics")] let _guard = caller.data().start_abi_call_timer(AbiCall::IterStart); Self::cvt_ret(caller, AbiCall::IterStart, out, |caller| { @@ -629,6 +634,7 @@ impl WasmInstanceEnv { filter_len: u32, out: WasmPtr, ) -> RtResult { + #[cfg(feature = "metrics")] let _guard = caller.data().start_abi_call_timer(AbiCall::IterStartFiltered); Self::cvt_ret(caller, AbiCall::IterStartFiltered, out, |caller| { diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 97098ef6b7a..b88d4d21c02 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -380,6 +380,7 @@ impl QuerySet { }); } } + #[cfg(feature = "metrics")] record_query_duration_metrics(WorkloadType::Subscribe, &db.address(), start); } } @@ -394,6 +395,7 @@ impl QuerySet { } } +#[cfg(feature = "metrics")] fn record_query_duration_metrics(workload: WorkloadType, db: &Address, start: Instant) { let query_duration = start.elapsed().as_secs_f64();