Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(748): Add compile time feature flag for table metrics #749

Merged
merged 1 commit into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ jobs:
sudo chmod 777 /stdb

- name: Run cargo test
run: cargo test --all --features odb_rocksdb,odb_sled
run: cargo test --all --features odb_rocksdb,odb_sled,metrics

lints:
name: Lints
Expand All @@ -61,7 +61,7 @@ jobs:
run: cargo fmt --all -- --check

- name: Run cargo clippy
run: cargo clippy --all --tests --features odb_rocksdb,odb_sled -- -D warnings
run: cargo clippy --all --tests --features odb_rocksdb,odb_sled,metrics -- -D warnings

- name: Check benchmarks
run: cd crates/bench && cargo check --benches
Expand Down
4 changes: 3 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ wasmtime.workspace = true
rocksdb = {workspace = true, optional = true}

[features]
# Enable metrics in spacetimedb.
metrics = []
# Optional storage engines.
odb_rocksdb = ["dep:rocksdb"]
odb_sled = []
default = ["odb_sled"]
default = ["odb_sled", "metrics"]

[dev-dependencies]
spacetimedb-lib = { path = "../lib", features = ["proptest"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ impl CommitLogMut {
let operation = match record.op {
TxOp::Insert(_) => {
// Increment rows inserted metric
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_num_rows_inserted
.with_label_values(workload, db, reducer, &table_id, table_name)
Expand All @@ -390,6 +391,7 @@ impl CommitLogMut {
}
TxOp::Delete => {
// Increment rows deleted metric
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_num_rows_deleted
.with_label_values(workload, db, reducer, &table_id, table_name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ pub struct CommittedIndexIter<'a> {
pub(crate) num_committed_rows_fetched: u64,
}

#[cfg(feature = "metrics")]
impl Drop for CommittedIndexIter<'_> {
fn drop(&mut self) {
let table_name = self
Expand Down
82 changes: 44 additions & 38 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,14 +346,17 @@ impl traits::MutTx for Locking {
let reducer = ctx.reducer_name();
let elapsed_time = tx.timer.elapsed();
let cpu_time = elapsed_time - tx.lock_wait_time;
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &false)
.inc();
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time.as_secs_f64());
#[cfg(feature = "metrics")]
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
Expand All @@ -362,44 +365,47 @@ impl traits::MutTx for Locking {
}

/// Records per-transaction metrics for `tx` and then commits it, returning
/// the result of `tx.commit()`.
///
/// The metrics recorded are: a transaction count (`rdb_num_txns`, labelled
/// with `&true`), the CPU time (`rdb_txn_cpu_time_sec`), the elapsed time
/// (`rdb_txn_elapsed_time_sec`), and the running maximum CPU time
/// (`rdb_txn_cpu_time_sec_max`), all labelled by workload, database and reducer.
///
/// NOTE(review): this listing is a rendered diff whose +/- markers were lost.
/// The first, unguarded run of statements appears to be the pre-change body
/// (the hunk header reports 38 deletions) and the `#[cfg(feature = "metrics")]`
/// block appears to be its replacement -- confirm against the merged file
/// before reading this as a single function body.
fn commit_mut_tx(&self, ctx: &ExecutionContext, tx: Self::MutTx) -> super::Result<Option<TxData>> {
// --- presumably the removed (pre-change) body: metrics recorded unconditionally ---
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name();
let elapsed_time = tx.timer.elapsed();
// CPU time is the elapsed time minus the time spent waiting on the lock.
let cpu_time = elapsed_time - tx.lock_wait_time;

// Shadow both durations as seconds in `f64`, the form passed to `observe`/`set` below.
let elapsed_time = elapsed_time.as_secs_f64();
let cpu_time = cpu_time.as_secs_f64();
// Note, we record empty transactions in our metrics.
// That is, transactions that don't write any rows to the commit log.
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &true)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time);
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
.observe(elapsed_time);

// Track the largest CPU time seen so far for this (db, workload, reducer) key.
// `unwrap` panics if `lock()` returns an error.
let mut guard = MAX_TX_CPU_TIME.lock().unwrap();
let max_cpu_time = *guard
.entry((*db, *workload, reducer.to_owned()))
.and_modify(|max| {
if cpu_time > *max {
*max = cpu_time;
}
})
.or_insert_with(|| cpu_time);

// Release the lock before publishing the maximum to the metric.
drop(guard);
DB_METRICS
.rdb_txn_cpu_time_sec_max
.with_label_values(workload, db, reducer)
.set(max_cpu_time);
// --- presumably the added (post-change) body: same logic, compiled only with the
// `metrics` feature enabled ---
// NOTE(review): with `metrics` disabled this block is compiled out and `ctx` is
// no longer read, which may trigger an unused-variable lint -- confirm.
#[cfg(feature = "metrics")]
{
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name();
let elapsed_time = tx.timer.elapsed();
// CPU time is the elapsed time minus the time spent waiting on the lock.
let cpu_time = elapsed_time - tx.lock_wait_time;

// Shadow both durations as seconds in `f64`, the form passed to `observe`/`set` below.
let elapsed_time = elapsed_time.as_secs_f64();
let cpu_time = cpu_time.as_secs_f64();
// Note, we record empty transactions in our metrics.
// That is, transactions that don't write any rows to the commit log.
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &true)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time);
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
.observe(elapsed_time);

// Track the largest CPU time seen so far for this (db, workload, reducer) key.
// `unwrap` panics if `lock()` returns an error.
let mut guard = MAX_TX_CPU_TIME.lock().unwrap();
let max_cpu_time = *guard
.entry((*db, *workload, reducer.to_owned()))
.and_modify(|max| {
if cpu_time > *max {
*max = cpu_time;
}
})
.or_insert_with(|| cpu_time);

// Release the lock before publishing the maximum to the metric.
drop(guard);
DB_METRICS
.rdb_txn_cpu_time_sec_max
.with_label_values(workload, db, reducer)
.set(max_cpu_time);
}

// The commit itself is not feature-gated; its result is returned to the caller.
tx.commit()
}
Expand Down
37 changes: 20 additions & 17 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,26 @@ impl StateView for TxId {
// NOTE(review): this listing is a rendered diff whose +/- markers were lost.
// Inside `release`, the first unguarded run of statements appears to be the
// pre-change body and the `#[cfg(feature = "metrics")]` block its replacement
// -- confirm against the merged file.
#[allow(dead_code)]
impl TxId {
/// Consumes the `TxId` and records its transaction metrics.
///
/// Increments `rdb_num_txns` (labelled with `&false`) and observes the CPU
/// time and elapsed time, all labelled by workload, database and reducer
/// taken from `ctx`. No other work is visible in this method.
pub(crate) fn release(self, ctx: &ExecutionContext) {
// --- presumably the removed (pre-change) body: metrics recorded unconditionally ---
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name();
let elapsed_time = self.timer.elapsed();
// CPU time is the elapsed time minus the time spent waiting on the lock.
let cpu_time = elapsed_time - self.lock_wait_time;
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &false)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time.as_secs_f64());
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
.observe(elapsed_time.as_secs_f64());
// --- presumably the added (post-change) body: same logic, compiled only with the
// `metrics` feature enabled ---
// NOTE(review): with `metrics` disabled this block is compiled out and `ctx` is
// no longer read, which may trigger an unused-variable lint -- confirm.
#[cfg(feature = "metrics")]
{
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name();
let elapsed_time = self.timer.elapsed();
// CPU time is the elapsed time minus the time spent waiting on the lock.
let cpu_time = elapsed_time - self.lock_wait_time;
DB_METRICS
.rdb_num_txns
.with_label_values(workload, db, reducer, &false)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(workload, db, reducer)
.observe(cpu_time.as_secs_f64());
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(workload, db, reducer)
.observe(elapsed_time.as_secs_f64());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ pub struct Iter<'a> {
num_committed_rows_fetched: u64,
}

#[cfg(feature = "metrics")]
impl Drop for Iter<'_> {
fn drop(&mut self) {
DB_METRICS
Expand Down Expand Up @@ -333,6 +334,7 @@ pub struct IndexSeekIterMutTxId<'a> {
pub(crate) num_committed_rows_fetched: u64,
}

#[cfg(feature = "metrics")]
impl Drop for IndexSeekIterMutTxId<'_> {
fn drop(&mut self) {
let table_name = self
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ impl RelationalDB {
}

pub fn drop_table(&self, ctx: &ExecutionContext, tx: &mut MutTx, table_id: TableId) -> Result<(), DBError> {
#[cfg(feature = "metrics")]
let _guard = DB_METRICS
.rdb_drop_table_time
.with_label_values(&table_id.0)
Expand Down Expand Up @@ -619,6 +620,7 @@ impl RelationalDB {

#[tracing::instrument(skip(self, tx, row))]
pub fn insert(&self, tx: &mut MutTx, table_id: TableId, row: ProductValue) -> Result<ProductValue, DBError> {
#[cfg(feature = "metrics")]
let _guard = DB_METRICS
.rdb_insert_row_time
.with_label_values(&table_id.0)
Expand All @@ -644,6 +646,7 @@ impl RelationalDB {

#[tracing::instrument(skip_all)]
pub fn delete_by_rel<R: Relation>(&self, tx: &mut MutTx, table_id: TableId, relation: R) -> u32 {
#[cfg(feature = "metrics")]
let _guard = DB_METRICS
.rdb_delete_by_rel_time
.with_label_values(&table_id.0)
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/host/wasmtime/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ impl WasmInstanceEnv {

// TODO: make this part of cvt(), maybe?
/// Gather the appropriate metadata and start a `wasm_abi_call_duration_ns` timer for the given `AbiCall`; the duration is observed when the returned timer is dropped.
#[cfg(feature = "metrics")]
fn start_abi_call_timer(&self, call: AbiCall) -> prometheus::HistogramTimer {
let ctx = self.reducer_context();
let db = ctx.database();
Expand Down Expand Up @@ -381,6 +382,7 @@ impl WasmInstanceEnv {
pub fn insert(caller: Caller<'_, Self>, table_id: u32, row: WasmPtr<u8>, row_len: u32) -> RtResult<u32> {
// TODO: Instead of writing this metric on every insert call,
// we should aggregate and write at the end of the transaction.
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::Insert);

Self::cvt(caller, AbiCall::Insert, |caller| {
Expand Down Expand Up @@ -425,6 +427,7 @@ impl WasmInstanceEnv {
value_len: u32,
out: WasmPtr<u32>,
) -> RtResult<u32> {
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::DeleteByColEq);

Self::cvt_ret(caller, AbiCall::DeleteByColEq, out, |caller| {
Expand Down Expand Up @@ -564,6 +567,7 @@ impl WasmInstanceEnv {
val_len: u32,
out: WasmPtr<BufferIdx>,
) -> RtResult<u32> {
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::IterByColEq);

Self::cvt_ret(caller, AbiCall::IterByColEq, out, |caller| {
Expand Down Expand Up @@ -593,6 +597,7 @@ impl WasmInstanceEnv {
/// - a table with the provided `table_id` doesn't exist
// #[tracing::instrument(skip_all)]
pub fn iter_start(caller: Caller<'_, Self>, table_id: u32, out: WasmPtr<BufferIterIdx>) -> RtResult<u32> {
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::IterStart);

Self::cvt_ret(caller, AbiCall::IterStart, out, |caller| {
Expand Down Expand Up @@ -629,6 +634,7 @@ impl WasmInstanceEnv {
filter_len: u32,
out: WasmPtr<BufferIterIdx>,
) -> RtResult<u32> {
#[cfg(feature = "metrics")]
let _guard = caller.data().start_abi_call_timer(AbiCall::IterStartFiltered);

Self::cvt_ret(caller, AbiCall::IterStartFiltered, out, |caller| {
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/subscription/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ impl QuerySet {
});
}
}
#[cfg(feature = "metrics")]
record_query_duration_metrics(WorkloadType::Subscribe, &db.address(), start);
}
}
Expand All @@ -394,6 +395,7 @@ impl QuerySet {
}
}

#[cfg(feature = "metrics")]
fn record_query_duration_metrics(workload: WorkloadType, db: &Address, start: Instant) {
let query_duration = start.elapsed().as_secs_f64();

Expand Down
Loading