diff --git a/crates/core/src/db/db_metrics/mod.rs b/crates/core/src/db/db_metrics/mod.rs index 57a9f1ee85..a2e645c9e2 100644 --- a/crates/core/src/db/db_metrics/mod.rs +++ b/crates/core/src/db/db_metrics/mod.rs @@ -103,6 +103,26 @@ metrics_group!( #[labels(txn_type: WorkloadType, db: Address, reducer: str)] pub rdb_txn_cpu_time_sec_max: GaugeVec, + #[name = spacetime_query_cpu_time_sec] + #[help = "The time spent executing a query (in seconds)"] + #[labels(txn_type: WorkloadType, db: Address, query: str)] + pub rdb_query_cpu_time_sec: HistogramVec, + + #[name = spacetime_query_cpu_time_sec_max] + #[help = "The cpu time of the longest running query (in seconds)"] + #[labels(txn_type: WorkloadType, db: Address, query: str)] + pub rdb_query_cpu_time_sec_max: GaugeVec, + + #[name = spacetime_query_compile_time_sec] + #[help = "The time spent compiling a query (in seconds)"] + #[labels(txn_type: WorkloadType, db: Address, query: str)] + pub rdb_query_compile_time_sec: HistogramVec, + + #[name = spacetime_query_compile_time_sec_max] + #[help = "The maximum query compilation time (in seconds)"] + #[labels(txn_type: WorkloadType, db: Address, query: str)] + pub rdb_query_compile_time_sec_max: GaugeVec, + #[name = spacetime_wasm_abi_call_duration_sec] #[help = "The total duration of a spacetime wasm abi call (in seconds); includes row serialization and copying into wasm memory"] #[labels(db: Address, reducer: str, call: AbiCall)] @@ -126,8 +146,12 @@ metrics_group!( ); pub static MAX_TX_CPU_TIME: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); +pub static MAX_QUERY_CPU_TIME: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); +pub static MAX_QUERY_COMPILE_TIME: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); pub static DB_METRICS: Lazy = Lazy::new(DbMetrics::new); pub fn reset_counters() { MAX_TX_CPU_TIME.lock().unwrap().clear(); + MAX_QUERY_CPU_TIME.lock().unwrap().clear(); + MAX_QUERY_COMPILE_TIME.lock().unwrap().clear(); } diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 43bbae9407..361587be13 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -1,13 +1,19 @@ +use std::collections::hash_map::DefaultHasher; +use std::hash::Hasher; +use std::time::Instant; + use crate::db::datastore::locking_tx_datastore::MutTxId; +use crate::db::db_metrics::{DB_METRICS, MAX_QUERY_COMPILE_TIME}; use crate::db::relational_db::RelationalDB; use crate::error::{DBError, SubscriptionError}; -use crate::execution_context::ExecutionContext; +use crate::execution_context::{ExecutionContext, WorkloadType}; use crate::host::module_host::DatabaseTableUpdate; use crate::sql::compiler::compile_sql; use crate::sql::execute::execute_single_sql; use crate::sql::query_debug_info::QueryDebugInfo; use crate::subscription::subscription::{QuerySet, SupportedQuery}; use spacetimedb_lib::identity::AuthCtx; +use spacetimedb_lib::Address; use spacetimedb_sats::relation::{Column, FieldName, MemTable, RelValue}; use spacetimedb_sats::AlgebraicType; use spacetimedb_sats::DataKey; @@ -109,7 +115,11 @@ pub fn compile_read_only_query( return QuerySet::get_all(relational_db, tx, auth); } - let compiled = compile_sql(relational_db, tx, input)?; + let start = Instant::now(); + let compiled = compile_sql(relational_db, tx, input).map(|expr| { + record_query_compilation_metrics(WorkloadType::Subscribe, &relational_db.address(), input, start); + expr + })?; let mut queries = Vec::with_capacity(compiled.len()); for q in compiled { match q { @@ -138,6 +148,40 @@ pub fn compile_read_only_query( } } +fn record_query_compilation_metrics(workload: WorkloadType, db: &Address, query: &str, start: Instant) { + let compile_duration = start.elapsed().as_secs_f64(); + + DB_METRICS + .rdb_query_compile_time_sec + .with_label_values(&workload, db, query) + .observe(compile_duration); + + fn hash(a: WorkloadType, b: &Address, c: &str) -> u64 { + use std::hash::Hash; + let mut hasher = DefaultHasher::new(); + a.hash(&mut hasher); + b.hash(&mut hasher); + c.hash(&mut hasher); + hasher.finish() + } + + let max_compile_duration = *MAX_QUERY_COMPILE_TIME + .lock() + .unwrap() + .entry(hash(workload, db, query)) + .and_modify(|max| { + if compile_duration > *max { + *max = compile_duration; + } + }) + .or_insert_with(|| compile_duration); + + DB_METRICS + .rdb_query_compile_time_sec_max + .with_label_values(&workload, db, query) + .set(max_compile_duration); +} + /// The kind of [`QueryExpr`] currently supported for incremental evaluation. #[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] pub enum Supported { diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index a196068a4b..cc1bd92214 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -25,12 +25,16 @@ use anyhow::Context; use derive_more::{Deref, DerefMut, From, IntoIterator}; +use std::collections::hash_map::DefaultHasher; use std::collections::{btree_set, BTreeSet, HashMap, HashSet}; +use std::hash::Hasher; use std::ops::Deref; +use std::time::Instant; use crate::db::datastore::locking_tx_datastore::MutTxId; +use crate::db::db_metrics::{DB_METRICS, MAX_QUERY_CPU_TIME}; use crate::error::DBError; -use crate::execution_context::ExecutionContext; +use crate::execution_context::{ExecutionContext, WorkloadType}; use crate::sql::query_debug_info::QueryDebugInfo; use crate::subscription::query::{run_query, OP_TYPE_FIELD_NAME}; use crate::{ @@ -39,7 +43,7 @@ use crate::{ host::module_host::{DatabaseTableUpdate, DatabaseUpdate, TableOp}, }; use spacetimedb_lib::identity::AuthCtx; -use spacetimedb_lib::PrimaryKey; +use spacetimedb_lib::{Address, PrimaryKey}; use spacetimedb_sats::db::auth::{StAccess, StTableType}; use spacetimedb_sats::relation::{DbTable, MemTable, RelValue}; use spacetimedb_sats::{AlgebraicValue, DataKey, ProductValue}; @@ -222,6 +226,7 @@ impl QuerySet { for SupportedQuery { kind, expr, info } in self { use query::Supported::*; + let start = Instant::now(); match kind { Scan => { let source = expr @@ -245,7 +250,6 @@ impl QuerySet { } } } - Semijoin => { if let Some(plan) = IncrementalJoin::new(expr, info, database_update.tables.iter())? { let table_id = plan.lhs.table.table_id; @@ -266,6 +270,7 @@ impl QuerySet { } } } + record_query_duration_metrics(WorkloadType::Update, &db.address(), info.source(), start); } for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) { output.tables.push(DatabaseTableUpdate { @@ -292,6 +297,7 @@ impl QuerySet { for SupportedQuery { expr, info, .. } in self { if let Some(t) = expr.source.get_db_table() { + let start = Instant::now(); // Get the TableOps for this table let (_, table_row_operations) = table_ops .entry(t.table_id) @@ -321,6 +327,7 @@ impl QuerySet { }); } } + record_query_duration_metrics(WorkloadType::Subscribe, &db.address(), info.source(), start); } } for (table_id, (table_name, ops)) in table_ops.into_iter().filter(|(_, (_, ops))| !ops.is_empty()) { @@ -334,6 +341,40 @@ impl QuerySet { } } +fn record_query_duration_metrics(workload: WorkloadType, db: &Address, query: &str, start: Instant) { + let query_duration = start.elapsed().as_secs_f64(); + + DB_METRICS + .rdb_query_cpu_time_sec + .with_label_values(&workload, db, query) + .observe(query_duration); + + fn hash(a: WorkloadType, b: &Address, c: &str) -> u64 { + use std::hash::Hash; + let mut hasher = DefaultHasher::new(); + a.hash(&mut hasher); + b.hash(&mut hasher); + c.hash(&mut hasher); + hasher.finish() + } + + let max_query_duration = *MAX_QUERY_CPU_TIME + .lock() + .unwrap() + .entry(hash(workload, db, query)) + .and_modify(|max| { + if query_duration > *max { + *max = query_duration; + } + }) + .or_insert_with(|| query_duration); + + DB_METRICS + .rdb_query_cpu_time_sec_max + .with_label_values(&workload, db, query) + .set(max_query_duration); +} + /// Helper to retain [`PrimaryKey`] before converting to [`TableOp`]. /// /// [`PrimaryKey`] is [`Copy`], while [`TableOp`] stores it as a [`Vec`].