Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(591): Distinguish query metrics as sql, subscription or incremental update #605

Merged
merged 1 commit into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/core/src/db/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl CommitLogMut {
let mut unwritten_commit = self.unwritten_commit.lock().unwrap();
let mut writes = Vec::with_capacity(tx_data.records.len());

let txn_type = &ctx.txn_type();
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = &ctx.reducer_name().unwrap_or_default();

Expand All @@ -340,7 +340,7 @@ impl CommitLogMut {
// Increment rows inserted metric
DB_METRICS
.rdb_num_rows_inserted
.with_label_values(txn_type, db, reducer, &table_id)
.with_label_values(workload, db, reducer, &table_id)
.inc();
// Increment table rows gauge
DB_METRICS.rdb_num_table_rows.with_label_values(db, &table_id).inc();
Expand All @@ -350,7 +350,7 @@ impl CommitLogMut {
// Increment rows deleted metric
DB_METRICS
.rdb_num_rows_deleted
.with_label_values(txn_type, db, reducer, &table_id)
.with_label_values(workload, db, reducer, &table_id)
.inc();
// Decrement table rows gauge
DB_METRICS.rdb_num_table_rows.with_label_values(db, &table_id).dec();
Expand Down
38 changes: 19 additions & 19 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::db::datastore::system_tables::{
TABLE_ID_SEQUENCE_ID, WASM_MODULE,
};
use crate::db::db_metrics::{DB_METRICS, MAX_TX_CPU_TIME};
use crate::{db::datastore::system_tables, execution_context::TransactionType};
use crate::{db::datastore::system_tables, execution_context::WorkloadType};
use crate::{
db::datastore::traits::{TxOp, TxRecord},
db::{
Expand Down Expand Up @@ -1721,7 +1721,7 @@ impl Drop for Iter<'_> {
DB_METRICS
.rdb_num_rows_fetched
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.into(),
Expand Down Expand Up @@ -1841,7 +1841,7 @@ impl Drop for IndexSeekIterInner<'_> {
DB_METRICS
.rdb_num_index_seeks
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand All @@ -1852,7 +1852,7 @@ impl Drop for IndexSeekIterInner<'_> {
DB_METRICS
.rdb_num_keys_scanned
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand All @@ -1863,7 +1863,7 @@ impl Drop for IndexSeekIterInner<'_> {
DB_METRICS
.rdb_num_rows_fetched
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand Down Expand Up @@ -1915,7 +1915,7 @@ impl Drop for CommittedIndexIter<'_> {
DB_METRICS
.rdb_num_index_seeks
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand All @@ -1926,7 +1926,7 @@ impl Drop for CommittedIndexIter<'_> {
DB_METRICS
.rdb_num_keys_scanned
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand All @@ -1937,7 +1937,7 @@ impl Drop for CommittedIndexIter<'_> {
DB_METRICS
.rdb_num_rows_fetched
.with_label_values(
&self.ctx.txn_type(),
&self.ctx.workload(),
&self.ctx.database(),
self.ctx.reducer_name().unwrap_or_default(),
&self.table_id.0,
Expand Down Expand Up @@ -2093,28 +2093,28 @@ impl traits::MutTx for Locking {
}

fn rollback_mut_tx(&self, ctx: &ExecutionContext, mut tx: Self::MutTxId) {
let txn_type = &ctx.txn_type();
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name().unwrap_or_default();
let elapsed_time = tx.timer.elapsed();
let cpu_time = elapsed_time - tx.lock_wait_time;
DB_METRICS
.rdb_num_txns
.with_label_values(txn_type, db, reducer, &false)
.with_label_values(workload, db, reducer, &false)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.observe(cpu_time.as_secs_f64());
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.observe(elapsed_time.as_secs_f64());
tx.lock.rollback();
}

fn commit_mut_tx(&self, ctx: &ExecutionContext, mut tx: Self::MutTxId) -> super::Result<Option<TxData>> {
let txn_type = &ctx.txn_type();
let workload = &ctx.workload();
let db = &ctx.database();
let reducer = ctx.reducer_name().unwrap_or_default();
let elapsed_time = tx.timer.elapsed();
Expand All @@ -2126,18 +2126,18 @@ impl traits::MutTx for Locking {
// That is, transactions that don't write any rows to the commit log.
DB_METRICS
.rdb_num_txns
.with_label_values(txn_type, db, reducer, &true)
.with_label_values(workload, db, reducer, &true)
.inc();
DB_METRICS
.rdb_txn_cpu_time_sec
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.observe(cpu_time);
DB_METRICS
.rdb_txn_elapsed_time_sec
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.observe(elapsed_time);

fn hash(a: &TransactionType, b: &Address, c: &str) -> u64 {
fn hash(a: &WorkloadType, b: &Address, c: &str) -> u64 {
use std::hash::Hash;
let mut hasher = DefaultHasher::new();
a.hash(&mut hasher);
Expand All @@ -2148,7 +2148,7 @@ impl traits::MutTx for Locking {

let mut guard = MAX_TX_CPU_TIME.lock().unwrap();
let max_cpu_time = *guard
.entry(hash(txn_type, db, reducer))
.entry(hash(workload, db, reducer))
.and_modify(|max| {
if cpu_time > *max {
*max = cpu_time;
Expand All @@ -2159,7 +2159,7 @@ impl traits::MutTx for Locking {
drop(guard);
DB_METRICS
.rdb_txn_cpu_time_sec_max
.with_label_values(txn_type, db, reducer)
.with_label_values(workload, db, reducer)
.set(max_cpu_time);

tx.lock.commit()
Expand Down
22 changes: 11 additions & 11 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Mutex};

use crate::{execution_context::TransactionType, host::AbiCall, util::typed_prometheus::metrics_group};
use crate::{execution_context::WorkloadType, host::AbiCall, util::typed_prometheus::metrics_group};
use once_cell::sync::Lazy;
use prometheus::{GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGaugeVec};
use spacetimedb_lib::Address;
Expand Down Expand Up @@ -60,52 +60,52 @@ metrics_group!(

#[name = spacetime_num_rows_inserted_cumulative]
#[help = "The cumulative number of rows inserted into a table"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_rows_inserted: IntCounterVec,

#[name = spacetime_num_rows_deleted_cumulative]
#[help = "The cumulative number of rows deleted from a table"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_rows_deleted: IntCounterVec,

#[name = spacetime_num_rows_fetched_cumulative]
#[help = "The cumulative number of rows fetched from a table"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_rows_fetched: IntCounterVec,

#[name = spacetime_num_index_keys_scanned_cumulative]
#[help = "The cumulative number of keys scanned from an index"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_keys_scanned: IntCounterVec,

#[name = spacetime_num_index_seeks_cumulative]
#[help = "The cumulative number of index seeks"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_index_seeks: IntCounterVec,

#[name = spacetime_num_txns_cumulative]
#[help = "The cumulative number of transactions, including both commits and rollbacks"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, committed: bool)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, committed: bool)]
pub rdb_num_txns: IntCounterVec,

#[name = spacetime_txn_elapsed_time_sec]
#[help = "The total elapsed (wall) time of a transaction (in seconds)"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str)]
pub rdb_txn_elapsed_time_sec: HistogramVec,

#[name = spacetime_txn_cpu_time_sec]
#[help = "The time spent executing a transaction (in seconds), excluding time spent waiting to acquire database locks"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str)]
pub rdb_txn_cpu_time_sec: HistogramVec,

#[name = spacetime_txn_cpu_time_sec_max]
#[help = "The cpu time of the longest running transaction (in seconds)"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str)]
pub rdb_txn_cpu_time_sec_max: GaugeVec,

#[name = spacetime_wasm_abi_call_duration_sec]
#[help = "The total duration of a spacetime wasm abi call (in seconds); includes row serialization and copying into wasm memory"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, call: AbiCall)]
#[labels(txn_type: WorkloadType, db: Address, reducer: str, call: AbiCall)]
pub wasm_abi_call_duration_sec: HistogramVec,

#[name = spacetime_message_log_size_bytes]
Expand Down
48 changes: 35 additions & 13 deletions crates/core/src/execution_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,28 @@ pub struct ExecutionContext<'a> {
reducer: Option<&'a str>,
/// The SQL query being executed, if any.
/// Note: this will never be set at the same time as `reducer`.
/// It is also NOT guaranteed to be set, even if txn_type == Sql.
/// It is also NOT guaranteed to be set, even if workload == Sql.
/// This is because some transactions tagged "SQL" don't exactly correspond
/// to any particular query.
query_debug_info: Option<&'a QueryDebugInfo>,
// The type of transaction that is being executed.
txn_type: TransactionType,
// The type of workload that is being executed.
workload: WorkloadType,
}

/// Classifies a transaction according to where it originates.
/// Classifies a transaction according to its workload.
/// A transaction can be executing a reducer.
/// It can be satisfying a one-off sql query, an initial subscription, or an incremental subscription update.
/// It can also be an internal operation that is not associated with a reducer or sql request.
#[derive(Clone, Copy, Display, Hash, PartialEq, Eq)]
pub enum TransactionType {
pub enum WorkloadType {
Reducer,
Sql,
Subscribe,
Update,
Internal,
}

impl Default for TransactionType {
impl Default for WorkloadType {
fn default() -> Self {
Self::Internal
}
Expand All @@ -47,17 +49,37 @@ impl<'a> ExecutionContext<'a> {
database,
reducer: Some(name),
query_debug_info: None,
txn_type: TransactionType::Reducer,
workload: WorkloadType::Reducer,
}
}

/// Returns an [ExecutionContext] for a sql or subscription transaction.
/// Returns an [ExecutionContext] for a one-off sql query.
pub fn sql(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
Self {
database,
reducer: None,
query_debug_info,
txn_type: TransactionType::Sql,
workload: WorkloadType::Sql,
}
}

/// Returns an [ExecutionContext] for an initial subscribe call.
pub fn subscribe(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
Self {
database,
reducer: None,
query_debug_info,
workload: WorkloadType::Subscribe,
}
}

/// Returns an [ExecutionContext] for a subscription update.
pub fn incremental_update(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
Self {
database,
reducer: None,
query_debug_info,
workload: WorkloadType::Update,
}
}

Expand All @@ -67,7 +89,7 @@ impl<'a> ExecutionContext<'a> {
database,
reducer: None,
query_debug_info: None,
txn_type: TransactionType::Internal,
workload: WorkloadType::Internal,
}
}

Expand All @@ -91,9 +113,9 @@ impl<'a> ExecutionContext<'a> {
self.query_debug_info
}

/// Returns the type of transaction that is being executed.
/// Returns the type of workload that is being executed.
#[inline]
pub fn txn_type(&self) -> TransactionType {
self.txn_type
pub fn workload(&self) -> WorkloadType {
self.workload
}
}
4 changes: 2 additions & 2 deletions crates/core/src/host/wasmtime/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ impl WasmInstanceEnv {
/// Gather the appropriate metadata and log a wasm_abi_call_duration_ns with the given AbiCall & duration
fn start_abi_call_timer(&self, call: AbiCall) -> prometheus::HistogramTimer {
let ctx = self.reducer_context();
let txn_type = ctx.txn_type();
let workload = ctx.workload();
let db = ctx.database();

DB_METRICS
.wasm_abi_call_duration_sec
.with_label_values(&txn_type, &db, &self.reducer_name, &call)
.with_label_values(&workload, &db, &self.reducer_name, &call)
.start_timer()
}

Expand Down
7 changes: 3 additions & 4 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,15 @@ fn collect_result(result: &mut Vec<MemTable>, r: CodeResult) -> Result<(), DBErr
Ok(())
}

#[tracing::instrument(skip(db, tx, auth))]
#[tracing::instrument(skip_all)]
pub fn execute_single_sql(
cx: &ExecutionContext,
db: &RelationalDB,
tx: &mut MutTxId,
ast: CrudExpr,
query_debug_info: Option<&QueryDebugInfo>,
auth: AuthCtx,
) -> Result<Vec<MemTable>, DBError> {
let ctx = ExecutionContext::sql(db.address(), query_debug_info);
let p = &mut DbProgram::new(&ctx, db, tx, auth);
let p = &mut DbProgram::new(cx, db, tx, auth);
let q = Expr::Crud(Box::new(ast));

let mut result = Vec::with_capacity(1);
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl ModuleSubscriptionActor {

// Note: the QueryDebugInfo is omitted because this context is only used to finish the transaction;
// all of the relevant queries already executed, with debug info, in _add_subscription
let ctx = ExecutionContext::sql(self.relational_db.address(), None);
let ctx = ExecutionContext::subscribe(self.relational_db.address(), None);
self.relational_db.finish_tx(&ctx, tx, result)
}

Expand Down Expand Up @@ -218,7 +218,7 @@ impl ModuleSubscriptionActor {
// Split the logic so that `Error` and `Tx` are each handled properly
let mut tx = self.relational_db.begin_tx();
let result = self._broadcast_commit_event(event, &mut tx).await;
let ctx = ExecutionContext::sql(self.relational_db.address(), None);
let ctx = ExecutionContext::incremental_update(self.relational_db.address(), None);
self.relational_db.finish_tx(&ctx, tx, result)
}
}
Loading