Skip to content

Commit

Permalink
perf(591): Distinguish query metrics as sql, subscription or incremen…
Browse files Browse the repository at this point in the history
…tal update

Closes #591.
  • Loading branch information
joshua-spacetime committed Nov 28, 2023
1 parent 6e26222 commit cb51649
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 110 deletions.
6 changes: 3 additions & 3 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl CommitLogMut {
let mut unwritten_commit = self.unwritten_commit.lock().unwrap();
let mut writes = Vec::with_capacity(tx_data.records.len());

let txn_type = &ctx.txn_type();
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = &ctx.reducer_name().unwrap_or_default();

Expand All @@ -340,7 +340,7 @@ impl CommitLogMut {
// Increment rows inserted metric
DB_METRICS
.rdb_num_rows_inserted
.with_label_values(txn_type, db, reducer, &table_id)
.with_label_values(workload, db, reducer, &table_id)
.inc();
// Increment table rows gauge
DB_METRICS.rdb_num_table_rows.with_label_values(db, &table_id).inc();
Expand All @@ -350,7 +350,7 @@ impl CommitLogMut {
// Increment rows deleted metric
DB_METRICS
.rdb_num_rows_deleted
.with_label_values(txn_type, db, reducer, &table_id)
.with_label_values(workload, db, reducer, &table_id)
.inc();
// Decrement table rows gauge
DB_METRICS.rdb_num_table_rows.with_label_values(db, &table_id).dec();
Expand Down
38 changes: 19 additions & 19 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::db::datastore::system_tables::{
TABLE_ID_SEQUENCE_ID, WASM_MODULE,
};
use crate::db::db_metrics::{DB_METRICS, MAX_TX_CPU_TIME};
use crate::{db::datastore::system_tables, execution_context::TransactionType};
use crate::{db::datastore::system_tables, execution_context::WorkloadType};
use crate::{
db::datastore::traits::{TxOp, TxRecord},
db::{
Expand Down Expand Up @@ -1721,7 +1721,7 @@ impl Drop for Iter<'_> {
DB_METRICS
.rdb_num_rows_fetched
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.into(),
Expand Down Expand Up @@ -1841,7 +1841,7 @@ impl Drop for IndexSeekIterInner<'_> {
DB_METRICS
.rdb_num_index_seeks
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand All @@ -1852,7 +1852,7 @@ impl Drop for IndexSeekIterInner<'_> {
DB_METRICS
.rdb_num_keys_scanned
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand All @@ -1863,7 +1863,7 @@ impl Drop for IndexSeekIterInner<'_> {
DB_METRICS
.rdb_num_rows_fetched
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand Down Expand Up @@ -1915,7 +1915,7 @@ impl Drop for CommittedIndexIter<'_> {
DB_METRICS
.rdb_num_index_seeks
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand All @@ -1926,7 +1926,7 @@ impl Drop for CommittedIndexIter<'_> {
DB_METRICS
.rdb_num_keys_scanned
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand All @@ -1937,7 +1937,7 @@ impl Drop for CommittedIndexIter<'_> {
DB_METRICS
.rdb_num_rows_fetched
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand Down Expand Up @@ -2093,28 +2093,28 @@ impl traits::MutTx for Locking {
}

fn rollback_mut_tx(&self, ctx: &ExecutionContext, mut tx: Self::MutTxId) {
let txn_type = &ctx.txn_type();
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name().unwrap_or_default();
let elapsed_time = tx.timer.elapsed();
let cpu_time = elapsed_time - tx.lock_wait_time;
DB_METRICS
.rdb_num_txns
.with_label_values(txn_type, db, reducer, &false)
.with_label_values(workload, db, reducer, &false)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.observe(cpu_time.as_secs_f64());
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.observe(elapsed_time.as_secs_f64());
tx.lock.rollback();
}

fn commit_mut_tx(&self, ctx: &ExecutionContext, mut tx: Self::MutTxId) -> super::Result<Option<TxData>> {
let txn_type = &ctx.txn_type();
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name().unwrap_or_default();
let elapsed_time = tx.timer.elapsed();
Expand All @@ -2126,18 +2126,18 @@ impl traits::MutTx for Locking {
// That is, transactions that don't write any rows to the commit log.
DB_METRICS
.rdb_num_txns
.with_label_values(txn_type, db, reducer, &true)
.with_label_values(workload, db, reducer, &true)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.observe(cpu_time);
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.observe(elapsed_time);

fn hash(a: &TransactionType, b: &Address, c: &str) -> u64 {
fn hash(a: &WorkloadType, b: &Address, c: &str) -> u64 {
use std::hash::Hash;
let mut hasher = DefaultHasher::new();
a.hash(&mut hasher);
Expand All @@ -2148,7 +2148,7 @@ impl traits::MutTx for Locking {

let mut guard = MAX_TX_CPU_TIME.lock().unwrap();
let max_cpu_time = *guard
.entry(hash(txn_type, db, reducer))
.entry(hash(workload, db, reducer))
.and_modify(|max| {
if cpu_time > *max {
*max = cpu_time;
Expand All @@ -2159,7 +2159,7 @@ impl traits::MutTx for Locking {
drop(guard);
DB_METRICS
.rdb_txn_cpu_time_sec_max
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.set(max_cpu_time);

tx.lock.commit()
Expand Down
22 changes: 11 additions & 11 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Mutex};

use crate::{execution_context::TransactionType, host::AbiCall, util::typed_prometheus::metrics_group};
use crate::{execution_context::WorkloadType, host::AbiCall, util::typed_prometheus::metrics_group};
use once_cell::sync::Lazy;
use prometheus::{GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGaugeVec};
use spacetimedb_lib::Address;
Expand Down Expand Up @@ -60,52 +60,52 @@ metrics_group!(

#[name = spacetime_num_rows_inserted_cumulative]
#[help = "The cumulative number of rows inserted into a table"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_rows_inserted: IntCounterVec,

#[name = spacetime_num_rows_deleted_cumulative]
#[help = "The cumulative number of rows deleted from a table"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_rows_deleted: IntCounterVec,

#[name = spacetime_num_rows_fetched_cumulative]
#[help = "The cumulative number of rows fetched from a table"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_rows_fetched: IntCounterVec,

#[name = spacetime_num_index_keys_scanned_cumulative]
#[help = "The cumulative number of keys scanned from an index"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_keys_scanned: IntCounterVec,

#[name = spacetime_num_index_seeks_cumulative]
#[help = "The cumulative number of index seeks"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_index_seeks: IntCounterVec,

#[name = spacetime_num_txns_cumulative]
#[help = "The cumulative number of transactions, including both commits and rollbacks"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, committed: bool)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, committed: bool)]
pub rdb_num_txns: IntCounterVec,

#[name = spacetime_txn_elapsed_time_sec]
#[help = "The total elapsed (wall) time of a transaction (in seconds)"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str)]
pub rdb_txn_elapsed_time_sec: HistogramVec,

#[name = spacetime_txn_cpu_time_sec]
#[help = "The time spent executing a transaction (in seconds), excluding time spent waiting to acquire database locks"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str)]
pub rdb_txn_cpu_time_sec: HistogramVec,

#[name = spacetime_txn_cpu_time_sec_max]
#[help = "The cpu time of the longest running transaction (in seconds)"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str)]
pub rdb_txn_cpu_time_sec_max: GaugeVec,

#[name = spacetime_wasm_abi_call_duration_sec]
#[help = "The total duration of a spacetime wasm abi call (in seconds); includes row serialization and copying into wasm memory"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, call: AbiCall)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, call: AbiCall)]
pub wasm_abi_call_duration_sec: HistogramVec,

#[name = spacetime_message_log_size_bytes]
Expand Down
48 changes: 35 additions & 13 deletions crates/core/src/execution_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@ pub struct ExecutionContext<'a> {
reducer: Option<&'a str>,
/// The SQL query being executed, if any.
/// Note: this will never be set at the same time as `reducer`.
/// It is also NOT guaranteed to be set, even if txn_type == Sql.
/// It is also NOT guaranteed to be set, even if workload == Sql.
/// This is because some transactions tagged "SQL" don't exactly correspond
/// to any particular query.
query_debug_info: Option<&'a QueryDebugInfo>,
// The type of transaction that is being executed.
txn_type: TransactionType,
// The type of workload that is being executed.
workload: WorkloadType,
}

/// Classifies a transaction according to where it originates.
/// Classifies a transaction according to its workload.
/// A transaction can be executing a reducer.
/// It can be used to satisfy a one-off sql query or subscription.
/// It can also be an internal operation that is not associated with a reducer or sql request.
#[derive(Clone, Copy, Display, Hash, PartialEq, Eq)]
pub enum TransactionType {
pub enum WorkloadType {
Reducer,
Sql,
Subscribe,
Incremental,
Internal,
}

impl Default for TransactionType {
impl Default for WorkloadType {
fn default() -> Self {
Self::Internal
}
Expand All @@ -47,17 +49,37 @@ impl<'a> ExecutionContext<'a> {
database,
reducer: Some(name),
query_debug_info: None,
txn_type: TransactionType::Reducer,
workload: WorkloadType::Reducer,
}
}

/// Returns an [ExecutionContext] for a sql or subscription transaction.
/// Returns an [ExecutionContext] for a one-off sql query.
pub fn sql(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
Self {
database,
reducer: None,
query_debug_info,
txn_type: TransactionType::Sql,
workload: WorkloadType::Sql,
}
}

/// Returns an [ExecutionContext] for an initial subscribe call.
pub fn subscribe(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
Self {
database,
reducer: None,
query_debug_info,
workload: WorkloadType::Subscribe,
}
}

/// Returns an [ExecutionContext] for a subscription update.
pub fn incremental(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
Self {
database,
reducer: None,
query_debug_info,
workload: WorkloadType::Sql,
}
}

Expand All @@ -67,7 +89,7 @@ impl<'a> ExecutionContext<'a> {
database,
reducer: None,
query_debug_info: None,
txn_type: TransactionType::Internal,
workload: WorkloadType::Internal,
}
}

Expand All @@ -91,9 +113,9 @@ impl<'a> ExecutionContext<'a> {
self.query_debug_info
}

/// Returns the type of transaction that is being executed.
/// Returns the type of workload that is being executed.
#[inline]
pub fn txn_type(&self) -> TransactionType {
self.txn_type
pub fn workload(&self) -> WorkloadType {
self.workload
}
}
4 changes: 2 additions & 2 deletions crates/core/src/host/wasmtime/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ impl WasmInstanceEnv {
/// Gather the appropriate metadata and log a wasm_abi_call_duration_ns with the given AbiCall & duration
fn start_abi_call_timer(&self, call: AbiCall) -> prometheus::HistogramTimer {
let ctx = self.reducer_context();
let txn_type = ctx.txn_type();
let workload = ctx.workload();
let db = ctx.database();

DB_METRICS
.wasm_abi_call_duration_sec
.with_label_values(&txn_type, &db, &self.reducer_name, &call)
.with_label_values(&workload, &db, &self.reducer_name, &call)
.start_timer()
}

Expand Down
7 changes: 3 additions & 4 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,15 @@ fn collect_result(result: &mut Vec<MemTable>, r: CodeResult) -> Result<(), DBErr
Ok(())
}

#[tracing::instrument(skip(db, tx, auth))]
#[tracing::instrument(skip_all)]
pub fn execute_single_sql(
cx: &ExecutionContext,
db: &RelationalDB,
tx: &mut MutTxId,
ast: CrudExpr,
query_debug_info: Option<&QueryDebugInfo>,
auth: AuthCtx,
) -> Result<Vec<MemTable>, DBError> {
let ctx = ExecutionContext::sql(db.address(), query_debug_info);
let p = &mut DbProgram::new(&ctx, db, tx, auth);
let p = &mut DbProgram::new(cx, db, tx, auth);
let q = Expr::Crud(Box::new(ast));

let mut result = Vec::with_capacity(1);
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl ModuleSubscriptionActor {

// Note: the missing QueryDebugInfo here is only used for finishing the transaction;
// all of the relevant queries already executed, with debug info, in _add_subscription
let ctx = ExecutionContext::sql(self.relational_db.address(), None);
let ctx = ExecutionContext::subscribe(self.relational_db.address(), None);
self.relational_db.finish_tx(&ctx, tx, result)
}

Expand Down Expand Up @@ -218,7 +218,7 @@ impl ModuleSubscriptionActor {
//Split logic to properly handle `Error` + `Tx`
let mut tx = self.relational_db.begin_tx();
let result = self._broadcast_commit_event(event, &mut tx).await;
let ctx = ExecutionContext::sql(self.relational_db.address(), None);
let ctx = ExecutionContext::incremental(self.relational_db.address(), None);
self.relational_db.finish_tx(&ctx, tx, result)
}
}
Loading

0 comments on commit cb51649

Please sign in to comment.