diff --git a/crates/bench/benches/subscription.rs b/crates/bench/benches/subscription.rs index 904e0c19ba8..10723502abf 100644 --- a/crates/bench/benches/subscription.rs +++ b/crates/bench/benches/subscription.rs @@ -6,6 +6,7 @@ use spacetimedb::execution_context::ExecutionContext; use spacetimedb::host::module_host::{DatabaseTableUpdate, DatabaseUpdate}; use spacetimedb::subscription::query::compile_read_only_query; use spacetimedb::subscription::subscription::ExecutionSet; +use spacetimedb::util::slow::SlowQueryConfig; use spacetimedb_bench::database::BenchDatabase as _; use spacetimedb_bench::spacetime_raw::SpacetimeRaw; use spacetimedb_lib::identity::AuthCtx; @@ -107,7 +108,7 @@ fn eval(c: &mut Criterion) { let tx = raw.db.begin_tx(); let query = compile_read_only_query(&raw.db, &tx, &auth, sql).unwrap(); let query: ExecutionSet = query.into(); - let ctx = &ExecutionContext::subscribe(raw.db.address()); + let ctx = &ExecutionContext::subscribe(raw.db.address(), SlowQueryConfig::default()); b.iter(|| drop(black_box(query.eval(ctx, Protocol::Binary, &raw.db, &tx).unwrap()))) }); }; @@ -130,7 +131,7 @@ fn eval(c: &mut Criterion) { ); bench_eval(c, "full-join", &name); - let ctx_incr = &ExecutionContext::incremental_update(raw.db.address()); + let ctx_incr = &ExecutionContext::incremental_update(raw.db.address(), SlowQueryConfig::default()); // To profile this benchmark for 30s // samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- incr-select --exact --profile-time=30 diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index fb95523ee7d..3de9c724b91 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -1,5 +1,13 @@ +use crate::util::slow::{SlowQueryConfig, Threshold}; +use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductType}; +use spacetimedb_vm::dsl::mem_table; +use spacetimedb_vm::errors::{ConfigError, ErrorVm}; +use spacetimedb_vm::relation::MemTable; use std::env::temp_dir; +use std::fmt::{Display, Formatter}; use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::time::Duration; #[cfg(not(any(target_os = "macos", target_os = "windows")))] mod paths { @@ -152,3 +160,112 @@ impl SpacetimeDbFiles for FilesGlobal { paths::config_path() } } + +/// Enumeration of options for reading configuration settings. +#[derive(Debug, PartialEq, Eq, Hash)] +pub enum ReadConfigOption { + SlowQueryThreshold, + SlowIncrementalUpdatesThreshold, + SlowSubscriptionsThreshold, +} + +impl ReadConfigOption { + pub fn type_of(&self) -> AlgebraicType { + AlgebraicType::U64 + } +} + +impl Display for ReadConfigOption { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let value = match self { + ReadConfigOption::SlowQueryThreshold => "slow_ad_hoc_query_ms", + ReadConfigOption::SlowIncrementalUpdatesThreshold => "slow_tx_update_ms", + ReadConfigOption::SlowSubscriptionsThreshold => "slow_subscription_query_ms", + }; + write!(f, "{value}") + } +} + +impl FromStr for ReadConfigOption { + type Err = ConfigError; + + fn from_str(s: &str) -> Result { + match s { + "slow_ad_hoc_query_ms" => Ok(Self::SlowQueryThreshold), + "slow_tx_update_ms" => Ok(Self::SlowIncrementalUpdatesThreshold), + "slow_subscription_query_ms" => Ok(Self::SlowSubscriptionsThreshold), + x => Err(ConfigError::NotFound(x.into())), + } + } +} + +pub enum SetConfigOption { + SlowQuery(Threshold), +} + +/// Holds a list of the runtime configurations settings of the database +#[derive(Debug, Clone, Copy)] +pub struct DatabaseConfig { + pub(crate) slow_query: SlowQueryConfig, +} + +impl DatabaseConfig { + /// Creates a new `DatabaseConfig` with the specified slow query settings. + pub fn with_slow_query(slow_query: SlowQueryConfig) -> Self { + Self { slow_query } + } + + /// Reads a configuration setting specified by parsing `key`. + pub fn read(&self, key: &str) -> Result, ConfigError> { + let key = ReadConfigOption::from_str(key)?; + + Ok(match key { + ReadConfigOption::SlowQueryThreshold => self.slow_query.queries, + ReadConfigOption::SlowIncrementalUpdatesThreshold => self.slow_query.incremental_updates, + ReadConfigOption::SlowSubscriptionsThreshold => self.slow_query.subscriptions, + }) + } + + /// Reads a configuration setting specified by parsing `key` and converts it into a `MemTable`. + /// + /// For returning as `table` for `SQL` queries. + pub fn read_key_into_table(&self, key: &str) -> Result { + let value = if let Some(value) = self.read(key)? { + value.as_millis() + } else { + 0u128 + }; + + let value: AlgebraicValue = if value == 0 { + AlgebraicValue::OptionNone() + } else { + AlgebraicValue::OptionSome(value.into()) + }; + + let head = ProductType::from([(key, value.type_of())]); + + Ok(mem_table(head, vec![product!(value)])) + } + + /// Writes the configuration setting specified by parsing `key` and `value`. + pub fn set_config(&mut self, key: &str, value: AlgebraicValue) -> Result<(), ErrorVm> { + let config = ReadConfigOption::from_str(key)?; + let millis = if let Some(value) = value.as_u64() { + if *value == 0 { + None + } else { + Some(Duration::from_millis(*value)) + } + } else { + return Err(ConfigError::TypeError(key.into(), value, AlgebraicType::U64).into()); + }; + + match config { + ReadConfigOption::SlowQueryThreshold => self.slow_query.queries = millis, + ReadConfigOption::SlowIncrementalUpdatesThreshold => self.slow_query.incremental_updates = millis, + ReadConfigOption::SlowSubscriptionsThreshold => self.slow_query.subscriptions = millis, + }; + + Ok(()) + } +} diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 6b3c231ccf8..357316ded1e 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -10,11 +10,14 @@ use super::datastore::{ }; use super::relational_operators::Relation; use crate::address::Address; +use crate::config::DatabaseConfig; use crate::db::db_metrics::DB_METRICS; use crate::error::{DBError, DatabaseError, TableError}; use crate::execution_context::ExecutionContext; use crate::hash::Hash; +use crate::util::slow::SlowQueryConfig; use fs2::FileExt; +use parking_lot::RwLock; use spacetimedb_commitlog as commitlog; use spacetimedb_durability::{self as durability, Durability}; use spacetimedb_primitives::*; @@ -22,6 +25,7 @@ use spacetimedb_sats::db::auth::{StAccess, StTableType}; use spacetimedb_sats::db::def::{ColumnDef, IndexDef, SequenceDef, TableDef, TableSchema}; use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue}; use spacetimedb_table::indexes::RowPointer; +use spacetimedb_vm::errors::ErrorVm; use std::borrow::Cow; use std::fs::{create_dir_all, File}; use std::io; @@ -59,6 +63,7 @@ pub struct RelationalDB { // Release file lock last when dropping. _lock: Arc, + config: Arc>, } impl std::fmt::Debug for RelationalDB { @@ -136,6 +141,9 @@ impl RelationalDB { disk_size_fn, _lock: Arc::new(lock), + config: Arc::new(RwLock::new(DatabaseConfig::with_slow_query( + SlowQueryConfig::with_defaults(), + ))), }) } @@ -750,6 +758,15 @@ impl RelationalDB { pub(crate) fn set_program_hash(&self, tx: &mut MutTx, fence: u128, hash: Hash) -> Result<(), DBError> { self.inner.set_program_hash(tx, fence, hash) } + + /// Set a runtime configurations setting of the database + pub fn set_config(&self, key: &str, value: AlgebraicValue) -> Result<(), ErrorVm> { + self.config.write().set_config(key, value) + } + /// Read the runtime configurations settings of the database + pub fn read_config(&self) -> DatabaseConfig { + *self.config.read() + } } #[cfg(any(test, feature = "test"))] diff --git a/crates/core/src/execution_context.rs b/crates/core/src/execution_context.rs index 5a31779c660..f7e8fcf8dcd 100644 --- a/crates/core/src/execution_context.rs +++ b/crates/core/src/execution_context.rs @@ -6,6 +6,7 @@ use spacetimedb_lib::Address; use spacetimedb_primitives::TableId; use crate::db::db_metrics::DB_METRICS; +use crate::util::slow::SlowQueryConfig; pub enum MetricType { IndexSeeks, @@ -118,6 +119,8 @@ pub struct ExecutionContext { workload: WorkloadType, /// The Metrics to be reported for this transaction. pub metrics: Arc>, + /// Configuration threshold for detecting slow queries. + pub slow_query_config: SlowQueryConfig, } /// Classifies a transaction according to its workload. @@ -141,38 +144,44 @@ impl Default for WorkloadType { impl ExecutionContext { /// Returns an [ExecutionContext] with the provided parameters and empty metrics. - fn new(database: Address, reducer: Option, workload: WorkloadType) -> Self { + fn new( + database: Address, + reducer: Option, + workload: WorkloadType, + slow_query_config: SlowQueryConfig, + ) -> Self { Self { database, reducer, workload, metrics: <_>::default(), + slow_query_config, } } /// Returns an [ExecutionContext] for a reducer transaction. pub fn reducer(database: Address, name: String) -> Self { - Self::new(database, Some(name), WorkloadType::Reducer) + Self::new(database, Some(name), WorkloadType::Reducer, Default::default()) } /// Returns an [ExecutionContext] for a one-off sql query. - pub fn sql(database: Address) -> Self { - Self::new(database, None, WorkloadType::Sql) + pub fn sql(database: Address, slow_query_config: SlowQueryConfig) -> Self { + Self::new(database, None, WorkloadType::Sql, slow_query_config) } /// Returns an [ExecutionContext] for an initial subscribe call. - pub fn subscribe(database: Address) -> Self { - Self::new(database, None, WorkloadType::Subscribe) + pub fn subscribe(database: Address, slow_query_config: SlowQueryConfig) -> Self { + Self::new(database, None, WorkloadType::Subscribe, slow_query_config) } /// Returns an [ExecutionContext] for a subscription update. - pub fn incremental_update(database: Address) -> Self { - Self::new(database, None, WorkloadType::Update) + pub fn incremental_update(database: Address, slow_query_config: SlowQueryConfig) -> Self { + Self::new(database, None, WorkloadType::Update, slow_query_config) } /// Returns an [ExecutionContext] for an internal database operation. pub fn internal(database: Address) -> Self { - Self::new(database, None, WorkloadType::Internal) + Self::new(database, None, WorkloadType::Internal, Default::default()) } /// Returns the address of the database on which we are operating. diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 7c2ba1bb494..76cb75c99fa 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -268,7 +268,8 @@ impl Module for WasmModuleHostActor { let db = &self.database_instance_context.relational_db; let auth = AuthCtx::new(self.database_instance_context.identity, caller_identity); log::debug!("One-off query: {query}"); - let ctx = &ExecutionContext::sql(db.address()); + // Don't need the `slow query` logger on compilation + let ctx = &ExecutionContext::sql(db.address(), db.read_config().slow_query); let compiled: Vec<_> = db.with_read_only(ctx, |tx| { let ast = sql::compiler::compile_sql(db, tx, &query)?; ast.into_iter() @@ -282,7 +283,7 @@ impl Module for WasmModuleHostActor { .collect::>() })?; - sql::execute::execute_sql(db, compiled, auth) + sql::execute::execute_sql(db, &query, compiled, auth) } fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> { diff --git a/crates/core/src/sql/ast.rs b/crates/core/src/sql/ast.rs index c1b7e3e3317..cb42571c549 100644 --- a/crates/core/src/sql/ast.rs +++ b/crates/core/src/sql/ast.rs @@ -1,3 +1,4 @@ +use crate::config::ReadConfigOption; use crate::db::relational_db::{MutTx, RelationalDB, Tx}; use crate::error::{DBError, PlanError}; use itertools::Itertools; @@ -18,6 +19,7 @@ use sqlparser::ast::{ }; use sqlparser::dialect::PostgreSqlDialect; use sqlparser::parser::Parser; +use std::str::FromStr; use std::sync::Arc; /// Simplify to detect features of the syntax we don't support yet @@ -269,6 +271,13 @@ pub enum SqlAst { kind: DbType, table_access: StAccess, }, + SetVar { + name: String, + value: AlgebraicValue, + }, + ReadVar { + name: String, + }, } fn extract_field(table: &From, of: &SqlExpr) -> Result, PlanError> { @@ -321,6 +330,16 @@ fn infer_str_or_enum(field: Option<&ProductTypeElement>, value: String) -> Resul } } +/// Parses `name` as a [ReadConfigOption] and then parse the numeric value. +fn infer_config(name: &str, value: &str, is_long: bool) -> Result { + let config = ReadConfigOption::from_str(name)?; + infer_number( + Some(&ProductTypeElement::new_named(config.type_of(), name)), + value, + is_long, + ) +} + /// Compiles a [SqlExpr] expression into a [ColumnOp] fn compile_expr_value(table: &From, field: Option<&ProductTypeElement>, of: SqlExpr) -> Result { Ok(ColumnOp::Field(match of { @@ -909,6 +928,51 @@ fn compile_drop(name: &ObjectName, kind: ObjectType) -> Result) -> Result { + let name = name.to_string(); + + let value = match value.as_slice() { + [first] => first.clone(), + _ => { + return Err(PlanError::Unsupported { + feature: format!("Invalid value for config: {name} => {value:?}."), + }); + } + }; + + let value = match value { + SqlExpr::Value(x) => match x { + Value::Number(value, is_long) => infer_config(&name, &value, is_long)?, + x => { + return Err(PlanError::Unsupported { + feature: format!("Unsupported value for config: {x}."), + }); + } + }, + x => { + return Err(PlanError::Unsupported { + feature: format!("Unsupported expression for config: {x}"), + }); + } + }; + + Ok(SqlAst::SetVar { name, value }) +} + +/// Compiles the equivalent of `SHOW key` +fn compile_read_config(name: Vec) -> Result { + let name = match name.as_slice() { + [first] => first.to_string(), + _ => { + return Err(PlanError::Unsupported { + feature: format!("Invalid name for config: {name:?}"), + }); + } + }; + Ok(SqlAst::ReadVar { name }) +} + /// Compiles a `SQL` clause fn compile_statement(db: &RelationalDB, tx: &T, statement: Statement) -> Result { match statement { @@ -1078,6 +1142,16 @@ fn compile_statement(db: &RelationalDB, tx: &T, statement: S }; compile_drop(name, object_type) } + Statement::SetVariable { + local, + hivevar, + variable, + value, + } => { + unsupported!("SET", local, hivevar); + compile_set_config(variable, value) + } + Statement::ShowVariable { variable } => compile_read_config(variable), x => Err(PlanError::Unsupported { feature: format!("Syntax {x}"), }), diff --git a/crates/core/src/sql/compiler.rs b/crates/core/src/sql/compiler.rs index da6d119aacc..bba5a5b1328 100644 --- a/crates/core/src/sql/compiler.rs +++ b/crates/core/src/sql/compiler.rs @@ -281,6 +281,8 @@ fn compile_statement(db: &RelationalDB, statement: SqlAst) -> Result compile_drop(name, kind, table_access)?, + SqlAst::SetVar { name, value } => CrudExpr::SetVar { name, value }, + SqlAst::ReadVar { name } => CrudExpr::ReadVar { name }, }; Ok(q.optimize(&|table_id, table_name| db.row_count(table_id, table_name))) @@ -1165,7 +1167,7 @@ mod tests { let sql = "select * from enum where a = 'Player'"; let q = compile_sql(&db, &tx, sql); assert!(q.is_ok()); - let result = execute_sql(&db, q.unwrap(), AuthCtx::for_testing())?; + let result = execute_sql(&db, sql, q.unwrap(), AuthCtx::for_testing())?; assert_eq!(result[0].data, vec![product![AlgebraicValue::enum_simple(0)]]); Ok(()) } diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 5d16b348a69..a9e9197e672 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -3,6 +3,7 @@ use crate::database_instance_context_controller::DatabaseInstanceContextControll use crate::db::relational_db::RelationalDB; use crate::error::{DBError, DatabaseError}; use crate::execution_context::ExecutionContext; +use crate::util::slow::SlowQueryLogger; use crate::vm::{DbProgram, TxMode}; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::{ProductType, ProductValue}; @@ -53,11 +54,15 @@ pub(crate) fn collect_result(result: &mut Vec, r: CodeResult) -> Resul /// Run the compiled `SQL` expression inside the `vm` created by [DbProgram] /// /// Evaluates `ast` and accordingly triggers mutable or read tx to execute -pub fn execute_sql(db: &RelationalDB, ast: Vec, auth: AuthCtx) -> Result, DBError> { +/// +/// Also, in case the execution takes more than x, log it as `slow query` +pub fn execute_sql(db: &RelationalDB, sql: &str, ast: Vec, auth: AuthCtx) -> Result, DBError> { let total = ast.len(); - let ctx = ExecutionContext::sql(db.address()); + let ctx = ExecutionContext::sql(db.address(), db.read_config().slow_query); let mut result = Vec::with_capacity(total); let sources = [].into(); + let slow_logger = SlowQueryLogger::query(ctx.slow_query_config, sql); + match CrudExpr::is_reads(&ast) { false => db.with_auto_commit(&ctx, |mut_tx| { let mut tx: TxMode = mut_tx.into(); @@ -74,15 +79,16 @@ pub fn execute_sql(db: &RelationalDB, ast: Vec, auth: AuthCtx) -> Resu collect_result(&mut result, run_ast(p, q, sources).into()) }), }?; + slow_logger.log(); Ok(result) } /// Run the `SQL` string using the `auth` credentials pub fn run(db: &RelationalDB, sql_text: &str, auth: AuthCtx) -> Result, DBError> { - let ctx = &ExecutionContext::sql(db.address()); + let ctx = &ExecutionContext::sql(db.address(), db.read_config().slow_query); let ast = db.with_read_only(ctx, |tx| compile_sql(db, tx, sql_text))?; - execute_sql(db, ast, auth) + execute_sql(db, sql_text, ast, auth) } #[cfg(test)] diff --git a/crates/core/src/subscription/execution_unit.rs b/crates/core/src/subscription/execution_unit.rs index d37dc4d2510..bd7a7744dac 100644 --- a/crates/core/src/subscription/execution_unit.rs +++ b/crates/core/src/subscription/execution_unit.rs @@ -5,6 +5,7 @@ use crate::error::DBError; use crate::execution_context::ExecutionContext; use crate::host::module_host::{DatabaseTableUpdate, DatabaseTableUpdateCow, TableOp, UpdatesCow}; use crate::json::client_api::TableUpdateJson; +use crate::util::slow::SlowQueryLogger; use crate::vm::{build_query, TxMode}; use spacetimedb_client_api_messages::client_api::{TableRowOperation, TableUpdate}; use spacetimedb_lib::ProductValue; @@ -71,6 +72,7 @@ enum EvalIncrPlan { pub struct ExecutionUnit { hash: QueryHash, + pub(crate) sql: String, /// A version of the plan optimized for `eval`, /// whose source is a [`DbTable`]. /// @@ -133,16 +135,18 @@ impl ExecutionUnit { SupportedQuery { kind: query::Supported::Select, expr, + .. } => EvalIncrPlan::Select(Self::compile_select_eval_incr(expr)), SupportedQuery { kind: query::Supported::Semijoin, expr, + .. } => EvalIncrPlan::Semijoin(IncrementalJoin::new(expr)?), }; - let eval_plan = eval_plan.expr; Ok(ExecutionUnit { hash, - eval_plan, + sql: eval_plan.sql, + eval_plan: eval_plan.expr, eval_incr_plan, }) } @@ -211,8 +215,9 @@ impl ExecutionUnit { ctx: &ExecutionContext, db: &RelationalDB, tx: &Tx, + sql: &str, ) -> Result, DBError> { - let table_row_operations = Self::eval_query_expr(ctx, db, tx, &self.eval_plan, |row| { + let table_row_operations = Self::eval_query_expr(ctx, db, tx, &self.eval_plan, sql, |row| { TableOp::insert(row.into_product_value()).into() })?; Ok((!table_row_operations.is_empty()).then(|| TableUpdateJson { @@ -229,9 +234,10 @@ impl ExecutionUnit { ctx: &ExecutionContext, db: &RelationalDB, tx: &Tx, + sql: &str, ) -> Result, DBError> { let mut buf = Vec::new(); - let table_row_operations = Self::eval_query_expr(ctx, db, tx, &self.eval_plan, |row| { + let table_row_operations = Self::eval_query_expr(ctx, db, tx, &self.eval_plan, sql, |row| { row.to_bsatn_extend(&mut buf).unwrap(); let row = buf.clone(); buf.clear(); @@ -249,11 +255,14 @@ impl ExecutionUnit { db: &RelationalDB, tx: &Tx, eval_plan: &QueryExpr, + sql: &str, convert: impl FnMut(RelValue<'_>) -> T, ) -> Result, DBError> { let tx: TxMode = tx.into(); + let slow_query = SlowQueryLogger::query(ctx.slow_query_config, sql); let query = build_query(ctx, db, &tx, eval_plan, &mut NoInMemUsed)?; let ops = query.collect_vec(convert)?; + slow_query.log(); Ok(ops) } @@ -263,12 +272,16 @@ impl ExecutionUnit { ctx: &'a ExecutionContext, db: &'a RelationalDB, tx: &'a TxMode<'a>, + sql: &'a str, tables: impl 'a + Clone + Iterator, ) -> Result>, DBError> { + let slow_query = SlowQueryLogger::query(ctx.slow_query_config, sql); let updates = match &self.eval_incr_plan { EvalIncrPlan::Select(plan) => Self::eval_incr_query_expr(ctx, db, tx, tables, plan, self.return_table())?, EvalIncrPlan::Semijoin(plan) => plan.eval(ctx, db, tx, tables)?, }; + slow_query.log(); + Ok(updates.has_updates().then(|| DatabaseTableUpdateCow { table_id: self.return_table(), table_name: self.return_name(), diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index a44bb33b38f..0506e7c06d5 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -44,7 +44,10 @@ impl ModuleSubscriptions { timer: Instant, _assert: Option, ) -> Result<(), DBError> { - let ctx = ExecutionContext::subscribe(self.relational_db.address()); + let ctx = ExecutionContext::subscribe( + self.relational_db.address(), + self.relational_db.read_config().slow_query, + ); let tx = scopeguard::guard(self.relational_db.begin_tx(), |tx| { self.relational_db.release_tx(&ctx, tx); }); diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 0a3ac1a2189..c9274293c2b 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -114,9 +114,9 @@ impl SubscriptionManager { #[tracing::instrument(skip_all)] pub fn eval_updates(&self, db: &RelationalDB, event: Arc) { let tables = &event.status.database_update().unwrap().tables; - + let slow = db.read_config().slow_query; let tx = scopeguard::guard(db.begin_tx(), |tx| { - tx.release(&ExecutionContext::incremental_update(db.address())); + tx.release(&ExecutionContext::incremental_update(db.address(), slow)); }); // Put the main work on a rayon compute thread. @@ -134,13 +134,13 @@ impl SubscriptionManager { } let span = tracing::info_span!("eval_incr").entered(); - let ctx = ExecutionContext::incremental_update(db.address()); + let ctx = ExecutionContext::incremental_update(db.address(), slow); let tx = &tx.deref().into(); let eval = units .into_par_iter() .filter_map(|(hash, tables)| self.queries.get(hash).map(|unit| (hash, tables, unit))) .filter_map(|(hash, tables, unit)| { - match unit.eval_incr(&ctx, db, tx, tables.into_iter()) { + match unit.eval_incr(&ctx, db, tx, &unit.sql, tables.into_iter()) { Ok(None) => None, Ok(Some(table)) => Some((hash, table)), Err(err) => { diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 36271c4ad3d..c6281f29e1c 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -52,6 +52,8 @@ pub fn compile_read_only_query( return Err(SubscriptionError::SideEffect(Crud::Create(DbType::Table)).into()) } CrudExpr::Drop { kind, .. } => return Err(SubscriptionError::SideEffect(Crud::Drop(kind)).into()), + CrudExpr::SetVar { .. } => return Err(SubscriptionError::SideEffect(Crud::Config).into()), + CrudExpr::ReadVar { .. } => return Err(SubscriptionError::SideEffect(Crud::Config).into()), } } @@ -106,6 +108,7 @@ mod tests { use crate::sql::execute::collect_result; use crate::sql::execute::run; use crate::subscription::subscription::ExecutionSet; + use crate::util::slow::SlowQueryConfig; use crate::vm::tests::create_table_with_rows; use crate::vm::DbProgram; use itertools::Itertools; @@ -286,7 +289,7 @@ mod tests { total_tables: usize, rows: &[ProductValue], ) -> ResultTest<()> { - let ctx = &ExecutionContext::incremental_update(db.address()); + let ctx = &ExecutionContext::incremental_update(db.address(), SlowQueryConfig::default()); let tx = &tx.into(); let result = s.eval_incr(ctx, db, tx, update)?; assert_eq!( @@ -335,8 +338,8 @@ mod tests { Ok(()) } - fn singleton_execution_set(expr: QueryExpr) -> ResultTest { - Ok(ExecutionSet::from_iter([SupportedQuery::try_from(expr)?])) + fn singleton_execution_set(expr: QueryExpr, sql: String) -> ResultTest { + Ok(ExecutionSet::from_iter([SupportedQuery::try_from((expr, sql))?])) } #[test] @@ -374,9 +377,9 @@ mod tests { panic!("unexpected query {:#?}", exp[0]); }; - let query: ExecutionSet = singleton_execution_set(query)?; + let query: ExecutionSet = singleton_execution_set(query, sql.into())?; - let ctx = &ExecutionContext::incremental_update(db.address()); + let ctx = &ExecutionContext::incremental_update(db.address(), SlowQueryConfig::default()); let tx = (&tx).into(); let result = query.eval_incr(ctx, &db, &tx, &update)?; @@ -437,7 +440,7 @@ mod tests { scalar(1u64), ); - let s = singleton_execution_set(q_id)?; + let s = singleton_execution_set(q_id, "SELECT * FROM inventory WHERE inventory_id = 1".into())?; let data = DatabaseTableUpdate { table_id: schema.table_id, @@ -557,7 +560,7 @@ mod tests { let row_2 = product!(2u64, "jhon doe"); let tx = db.begin_tx(); let s = compile_read_only_query(&db, &tx, &AuthCtx::for_testing(), SUBSCRIBE_TO_ALL_QUERY)?.into(); - let ctx = ExecutionContext::subscribe(db.address()); + let ctx = ExecutionContext::subscribe(db.address(), SlowQueryConfig::default()); check_query_eval(&ctx, &db, &tx, &s, 2, &[row_1.clone(), row_2.clone()])?; let data1 = DatabaseTableUpdate { @@ -675,7 +678,7 @@ mod tests { let Some(CrudExpr::Query(query)) = exp.pop() else { panic!("unexpected query {:#?}", exp[0]); }; - singleton_execution_set(query) + singleton_execution_set(query, sql.into()) }) } diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index a17dd878260..9dbd0075996 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -61,12 +61,13 @@ use std::sync::Arc; pub struct SupportedQuery { pub kind: query::Supported, pub expr: QueryExpr, + pub sql: String, } impl SupportedQuery { - pub fn new(expr: QueryExpr, text: String) -> Result { - let kind = query::classify(&expr).ok_or(SubscriptionError::Unsupported(text))?; - Ok(Self { kind, expr }) + pub fn new(expr: QueryExpr, sql: String) -> Result { + let kind = query::classify(&expr).ok_or_else(|| SubscriptionError::Unsupported(sql.clone()))?; + Ok(Self { kind, expr, sql }) } pub fn kind(&self) -> query::Supported { @@ -113,12 +114,12 @@ impl SupportedQuery { } #[cfg(test)] -impl TryFrom for SupportedQuery { +impl TryFrom<(QueryExpr, String)> for SupportedQuery { type Error = DBError; - fn try_from(expr: QueryExpr) -> Result { + fn try_from((expr, sql): (QueryExpr, String)) -> Result { let kind = query::classify(&expr).context("Unsupported query expression")?; - Ok(Self { kind, expr }) + Ok(Self { kind, expr, sql }) } } @@ -553,7 +554,7 @@ impl ExecutionSet { self.exec_units // if you need eval to run single-threaded for debugging, change this to .iter() .par_iter() - .filter_map(|unit| unit.eval_json(ctx, db, tx).transpose()) + .filter_map(|unit| unit.eval_json(ctx, db, tx, &unit.sql).transpose()) .collect::, _>>() } @@ -563,7 +564,7 @@ impl ExecutionSet { self.exec_units // if you need eval to run single-threaded for debugging, change this to .iter() .par_iter() - .filter_map(|unit| unit.eval_binary(ctx, db, tx).transpose()) + .filter_map(|unit| unit.eval_binary(ctx, db, tx, &unit.sql).transpose()) .collect::, _>>() } @@ -577,7 +578,7 @@ impl ExecutionSet { ) -> Result, DBError> { let mut tables = Vec::new(); for unit in &self.exec_units { - if let Some(table) = unit.eval_incr(ctx, db, tx, database_update.tables.iter())? { + if let Some(table) = unit.eval_incr(ctx, db, tx, &unit.sql, database_update.tables.iter())? { tables.push(table); } } @@ -636,6 +637,7 @@ pub(crate) fn get_all(relational_db: &RelationalDB, tx: &Tx, auth: &AuthCtx) -> .map(|src| SupportedQuery { kind: query::Supported::Select, expr: QueryExpr::new(src), + sql: format!("SELECT * FROM {}", src.table_name), }) .collect()) } diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs index fc71a56fb61..1241ae4bd22 100644 --- a/crates/core/src/util/mod.rs +++ b/crates/core/src/util/mod.rs @@ -9,6 +9,7 @@ pub mod prometheus_handle; mod future_queue; pub mod lending_pool; pub mod notify_once; +pub mod slow; pub use future_queue::{future_queue, FutureQueue}; diff --git a/crates/core/src/util/slow.rs b/crates/core/src/util/slow.rs new file mode 100644 index 00000000000..003e4f09c32 --- /dev/null +++ b/crates/core/src/util/slow.rs @@ -0,0 +1,243 @@ +use std::time::{Duration, Instant}; + +/// Default threshold for general queries in `ms`. +const THRESHOLD_QUERIES_MILLIS: u64 = 100; + +/// Configuration threshold for detecting slow queries. +#[derive(Debug, Clone, Copy)] +pub struct SlowQueryConfig { + /// The threshold duration for incremental updates. + pub(crate) incremental_updates: Option, + /// The threshold duration for subscriptions. + pub(crate) subscriptions: Option, + /// The threshold duration for general queries. + pub(crate) queries: Option, +} + +impl SlowQueryConfig { + /// Creates a new `SlowQueryConfig` with all the threshold set to [None]. + pub fn new() -> Self { + Self { + incremental_updates: None, + subscriptions: None, + queries: None, + } + } + + /// Creates a new `SlowQueryConfig` with [THRESHOLD_QUERIES_MILLIS] for `queries` and the rest set to [None]. + pub fn with_defaults() -> Self { + Self { + incremental_updates: None, + subscriptions: None, + queries: Some(Duration::from_millis(THRESHOLD_QUERIES_MILLIS)), + } + } + + /// Sets the threshold for incremental updates. + pub fn with_incremental_updates(mut self, duration: Duration) -> Self { + self.incremental_updates = Some(duration); + self + } + /// Sets the threshold for subscriptions. + pub fn with_subscriptions(mut self, duration: Duration) -> Self { + self.subscriptions = Some(duration); + self + } + /// Sets the threshold for general queries. + pub fn with_queries(mut self, duration: Duration) -> Self { + self.queries = Some(duration); + self + } +} + +impl Default for SlowQueryConfig { + fn default() -> Self { + Self::new() + } +} + +/// Represents `threshold` for [SlowQueryLogger]. +pub enum Threshold { + IncrementalUpdates(Option), + Subscriptions(Option), + Queries(Option), +} + +/// Start the recording of a `sql` with a specific [Threshold]. +pub struct SlowQueryLogger<'a> { + /// The SQL statement of the query. + sql: &'a str, + /// The start time of the query execution. + start: Instant, + /// Which [Threshold] to use. + threshold: Threshold, +} + +impl<'a> SlowQueryLogger<'a> { + pub fn new(sql: &'a str, threshold: Threshold) -> Self { + Self { + sql, + start: Instant::now(), + threshold, + } + } + + /// Creates a new [SlowQueryLogger] instance for general queries. + pub fn query(config: SlowQueryConfig, sql: &'a str) -> Self { + Self::new(sql, Threshold::Queries(config.queries)) + } + + /// Creates a new [SlowQueryLogger] instance for subscriptions. + pub fn subscription(config: SlowQueryConfig, sql: &'a str) -> Self { + Self::new(sql, Threshold::Subscriptions(config.subscriptions)) + } + + /// Creates a new [SlowQueryLogger] instance for incremental updates. + pub fn incremental_updates(config: SlowQueryConfig, sql: &'a str) -> Self { + Self::new(sql, Threshold::IncrementalUpdates(config.queries)) + } + + /// Log as `tracing::warn!` the query if it exceeds the threshold. + pub fn log(&self) -> Option { + let (kind, dur) = match self.threshold { + Threshold::IncrementalUpdates(dur) => ("IncrementalUpdates", dur), + Threshold::Subscriptions(dur) => ("Subscriptions", dur), + Threshold::Queries(dur) => ("Queries", dur), + }; + if let Some(dur) = dur { + let elapsed = self.start.elapsed(); + if elapsed > dur { + tracing::warn!(kind = kind, threshold = ?dur, elapsed = ?elapsed, sql = self.sql, "SLOW QUERY"); + return Some(elapsed); + } + }; + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::execution_context::ExecutionContext; + use crate::sql::compiler::compile_sql; + use crate::sql::execute::execute_sql; + use spacetimedb_lib::error::ResultTest; + use spacetimedb_lib::identity::AuthCtx; + + use crate::config::ReadConfigOption; + use crate::db::relational_db::tests_utils::TestDB; + use crate::db::relational_db::RelationalDB; + use spacetimedb_sats::{product, AlgebraicType}; + use spacetimedb_vm::relation::MemTable; + + fn run_query(db: &RelationalDB, sql: String) -> ResultTest { + let tx = db.begin_tx(); + let q = compile_sql(db, &tx, &sql)?; + Ok(execute_sql(db, &sql, q, AuthCtx::for_testing())?.pop().unwrap()) + } + + fn run_query_write(db: &RelationalDB, sql: String) -> ResultTest<()> { + let tx = db.begin_tx(); + let q = compile_sql(db, &tx, &sql)?; + drop(tx); + + execute_sql(db, &sql, q, AuthCtx::for_testing())?; + + Ok(()) + } + + #[test] + fn test_slow_queries() -> ResultTest<()> { + let db = TestDB::in_memory()?.db; + + let table_id = + db.create_table_for_test("test", &[("x", AlgebraicType::I32), ("y", AlgebraicType::I32)], &[])?; + + db.with_auto_commit(&ExecutionContext::default(), |tx| -> ResultTest<_> { + for i in 0..100_000 { + db.insert(tx, table_id, product![i, i * 2])?; + } + Ok(()) + })?; + let tx = db.begin_tx(); + + let sql = "select * from test where x > 0"; + let q = compile_sql(&db, &tx, sql)?; + + let slow = SlowQueryLogger::query(SlowQueryConfig::default().with_queries(Duration::from_millis(1)), sql); + + let result = execute_sql(&db, sql, q, AuthCtx::for_testing())?; + assert_eq!(result[0].data[0], product![1, 2]); + assert!(slow.log().is_some()); + + Ok(()) + } + + // Verify we can change the threshold at runtime + #[test] + fn test_runtime_config() -> ResultTest<()> { + let db = TestDB::in_memory()?.db; + + let config = db.read_config(); + + let check = |table: MemTable, x: Option| { + assert_eq!( + table.data[0] + .field_as_sum(0, None) + .unwrap() + .value + .as_u128() + .map(|x| x.0), + x.map(|x| x.as_millis()) + ); + }; + + // Check we can read the default config + let result = run_query(&db, format!("SHOW {}", ReadConfigOption::SlowQueryThreshold))?; + check(result, config.slow_query.queries); + let result = run_query(&db, format!("SHOW {}", ReadConfigOption::SlowSubscriptionsThreshold))?; + check(result, config.slow_query.subscriptions); + let result = run_query( + &db, + format!("SHOW {}", ReadConfigOption::SlowIncrementalUpdatesThreshold), + )?; + check(result, config.slow_query.incremental_updates); + // Check we can write a new config + run_query_write(&db, format!("SET {} TO 1", ReadConfigOption::SlowQueryThreshold))?; + run_query_write( + &db, + format!("SET {} TO 1", ReadConfigOption::SlowSubscriptionsThreshold), + )?; + run_query_write( + &db, + format!("SET {} TO 1", ReadConfigOption::SlowIncrementalUpdatesThreshold), + )?; + + let config = db.read_config(); + + assert_eq!(config.slow_query.queries, Some(Duration::from_millis(1))); + assert_eq!(config.slow_query.subscriptions, Some(Duration::from_millis(1))); + assert_eq!(config.slow_query.incremental_updates, Some(Duration::from_millis(1))); + + // And the new config + let result = run_query(&db, format!("SHOW {}", ReadConfigOption::SlowQueryThreshold))?; + check(result, config.slow_query.queries); + let result = run_query(&db, format!("SHOW {}", ReadConfigOption::SlowSubscriptionsThreshold))?; + check(result, config.slow_query.subscriptions); + let result = run_query( + &db, + format!("SHOW {}", ReadConfigOption::SlowIncrementalUpdatesThreshold), + )?; + check(result, config.slow_query.incremental_updates); + + // And disable the config + run_query_write(&db, format!("SET {} TO 0", ReadConfigOption::SlowQueryThreshold))?; + + let config = db.read_config(); + + let result = run_query(&db, format!("SHOW {}", ReadConfigOption::SlowQueryThreshold))?; + check(result, config.slow_query.queries); + Ok(()) + } +} diff --git a/crates/core/src/vm.rs b/crates/core/src/vm.rs index 9f325d5b088..f00e0906d59 100644 --- a/crates/core/src/vm.rs +++ b/crates/core/src/vm.rs @@ -523,6 +523,17 @@ impl<'db, 'tx> DbProgram<'db, 'tx> { TxMode::Tx(_) => unreachable!("mutable operation is invalid with read tx"), } } + + fn _set_config(&mut self, name: String, value: AlgebraicValue) -> Result { + self.db.set_config(&name, value)?; + Ok(Code::Pass) + } + + fn _read_config(&self, name: String) -> Result { + let config = self.db.read_config(); + + Ok(Code::Table(config.read_key_into_table(&name)?)) + } } impl ProgramVm for DbProgram<'_, '_> { @@ -582,6 +593,8 @@ impl ProgramVm for DbProgram<'_, '_> { CrudExpr::Delete { query } => self._delete_query(&query, sources), CrudExpr::CreateTable { table } => self._create_table(table), CrudExpr::Drop { name, kind, .. } => self._drop(&name, kind), + CrudExpr::SetVar { name, value } => self._set_config(name, value), + CrudExpr::ReadVar { name } => self._read_config(name), } } } diff --git a/crates/sqltest/src/space.rs b/crates/sqltest/src/space.rs index 7dcac52686c..bd03a879f6e 100644 --- a/crates/sqltest/src/space.rs +++ b/crates/sqltest/src/space.rs @@ -76,7 +76,7 @@ impl SpaceDb { pub(crate) fn run_sql(&self, sql: &str) -> anyhow::Result> { self.conn.with_read_only(&ExecutionContext::default(), |tx| { let ast = compile_sql(&self.conn, tx, sql)?; - let result = execute_sql(&self.conn, ast, self.auth)?; + let result = execute_sql(&self.conn, sql, ast, self.auth)?; //remove comments to see which SQL worked. Can't collect it outside from lack of a hook in the external `sqllogictest` crate... :( //append_file(&std::path::PathBuf::from(".ok.sql"), sql)?; Ok(result) diff --git a/crates/vm/src/errors.rs b/crates/vm/src/errors.rs index dd098f5af87..afc0dada2a2 100644 --- a/crates/vm/src/errors.rs +++ b/crates/vm/src/errors.rs @@ -1,10 +1,18 @@ use spacetimedb_sats::db::error::{AuthError, RelationError}; -use spacetimedb_sats::AlgebraicValue; +use spacetimedb_sats::{AlgebraicType, AlgebraicValue}; use std::fmt; use thiserror::Error; use crate::expr::SourceId; +#[derive(Error, Debug)] +pub enum ConfigError { + #[error("Config parameter `{0}` not found.")] + NotFound(String), + #[error("Value for config parameter `{0}` is invalid: `{1:?}`. Expected: `{2:?}`")] + TypeError(String, AlgebraicValue, AlgebraicType), +} + /// Typing Errors #[derive(Error, Debug)] pub enum ErrorType { @@ -29,6 +37,8 @@ pub enum ErrorVm { Unsupported(String), #[error("No source table with index {0:?}")] NoSuchSource(SourceId), + #[error("ConfigError: {0}")] + Config(#[from] ConfigError), #[error("{0}")] Other(#[from] anyhow::Error), } @@ -124,6 +134,7 @@ impl From for ErrorLang { ErrorVm::Unsupported(err) => ErrorLang::new(ErrorKind::Compiler, Some(&err)), ErrorVm::Lang(err) => err, ErrorVm::Auth(err) => ErrorLang::new(ErrorKind::Unauthorized, Some(&err.to_string())), + ErrorVm::Config(err) => ErrorLang::new(ErrorKind::Db, Some(&err.to_string())), err @ ErrorVm::NoSuchSource(_) => ErrorLang { kind: ErrorKind::Invalid, msg: Some(format!("{err:?}")), diff --git a/crates/vm/src/expr.rs b/crates/vm/src/expr.rs index f2d0d0a222a..01b07575301 100644 --- a/crates/vm/src/expr.rs +++ b/crates/vm/src/expr.rs @@ -768,6 +768,7 @@ pub enum Crud { Delete, Create(DbType), Drop(DbType), + Config, } #[derive(Debug, Eq, PartialEq)] @@ -792,6 +793,13 @@ pub enum CrudExpr { kind: DbType, table_access: StAccess, }, + SetVar { + name: String, + value: AlgebraicValue, + }, + ReadVar { + name: String, + }, } impl CrudExpr { @@ -803,7 +811,9 @@ impl CrudExpr { } pub fn is_reads<'a>(exprs: impl IntoIterator) -> bool { - exprs.into_iter().all(|expr| matches!(expr, CrudExpr::Query(_))) + exprs + .into_iter() + .all(|expr| matches!(expr, CrudExpr::Query(_) | CrudExpr::ReadVar { .. })) } } diff --git a/crates/vm/src/program.rs b/crates/vm/src/program.rs index e1962909a18..b9665088d7e 100644 --- a/crates/vm/src/program.rs +++ b/crates/vm/src/program.rs @@ -53,6 +53,12 @@ impl ProgramVm for Program { CrudExpr::Drop { .. } => { todo!() } + CrudExpr::SetVar { .. } => { + todo!() + } + CrudExpr::ReadVar { .. } => { + todo!() + } } } }