diff --git a/crates/core/src/db/commit_log.rs b/crates/core/src/db/commit_log.rs
index f6d4dff728..c8632311de 100644
--- a/crates/core/src/db/commit_log.rs
+++ b/crates/core/src/db/commit_log.rs
@@ -328,7 +328,7 @@ impl CommitLogMut {
         let mut unwritten_commit = self.unwritten_commit.lock().unwrap();
         let mut writes = Vec::with_capacity(tx_data.records.len());
 
-        let txn_type = &ctx.txn_type();
+        let workload = &ctx.workload();
         let db = &ctx.database();
         let reducer = &ctx.reducer_name().unwrap_or_default();
 
@@ -340,7 +340,7 @@ impl CommitLogMut {
                     // Increment rows inserted metric
                     DB_METRICS
                         .rdb_num_rows_inserted
-                        .with_label_values(txn_type, db, reducer, &table_id)
+                        .with_label_values(workload, db, reducer, &table_id)
                         .inc();
                     // Increment table rows gauge
                     DB_METRICS.rdb_num_table_rows.with_label_values(db, &table_id).inc();
@@ -350,7 +350,7 @@ impl CommitLogMut {
                     // Increment rows deleted metric
                     DB_METRICS
                         .rdb_num_rows_deleted
-                        .with_label_values(txn_type, db, reducer, &table_id)
+                        .with_label_values(workload, db, reducer, &table_id)
                         .inc();
                     // Decrement table rows gauge
                     DB_METRICS.rdb_num_table_rows.with_label_values(db, &table_id).dec();
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs
index f19520aaf5..2cb962470c 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs
@@ -21,7 +21,7 @@ use crate::db::datastore::system_tables::{
     TABLE_ID_SEQUENCE_ID, WASM_MODULE,
 };
 use crate::db::db_metrics::{DB_METRICS, MAX_TX_CPU_TIME};
-use crate::{db::datastore::system_tables, execution_context::TransactionType};
+use crate::{db::datastore::system_tables, execution_context::WorkloadType};
 use crate::{
     db::datastore::traits::{TxOp, TxRecord},
     db::{
@@ -1721,7 +1721,7 @@ impl Drop for Iter<'_> {
         DB_METRICS
             .rdb_num_rows_fetched
             .with_label_values(
-                &self.ctx.txn_type(),
+                &self.ctx.workload(),
                 &self.ctx.database(),
                 self.ctx.reducer_name().unwrap_or_default(),
                 &self.table_id.into(),
@@ -1841,7 +1841,7 @@ impl Drop for IndexSeekIterInner<'_> {
         DB_METRICS
             .rdb_num_index_seeks
             .with_label_values(
-                &self.ctx.txn_type(),
+                &self.ctx.workload(),
                 &self.ctx.database(),
                 self.ctx.reducer_name().unwrap_or_default(),
                 &self.table_id.0,
@@ -1852,7 +1852,7 @@ impl Drop for IndexSeekIterInner<'_> {
         DB_METRICS
             .rdb_num_keys_scanned
             .with_label_values(
-                &self.ctx.txn_type(),
+                &self.ctx.workload(),
                 &self.ctx.database(),
                 self.ctx.reducer_name().unwrap_or_default(),
                 &self.table_id.0,
@@ -1863,7 +1863,7 @@ impl Drop for IndexSeekIterInner<'_> {
         DB_METRICS
             .rdb_num_rows_fetched
             .with_label_values(
-                &self.ctx.txn_type(),
+                &self.ctx.workload(),
                 &self.ctx.database(),
                 self.ctx.reducer_name().unwrap_or_default(),
                 &self.table_id.0,
@@ -1915,7 +1915,7 @@ impl Drop for CommittedIndexIter<'_> {
         DB_METRICS
             .rdb_num_index_seeks
             .with_label_values(
-                &self.ctx.txn_type(),
+                &self.ctx.workload(),
                 &self.ctx.database(),
                 self.ctx.reducer_name().unwrap_or_default(),
                 &self.table_id.0,
@@ -1926,7 +1926,7 @@ impl Drop for CommittedIndexIter<'_> {
         DB_METRICS
             .rdb_num_keys_scanned
             .with_label_values(
-                &self.ctx.txn_type(),
+                &self.ctx.workload(),
                 &self.ctx.database(),
                 self.ctx.reducer_name().unwrap_or_default(),
                 &self.table_id.0,
@@ -1937,7 +1937,7 @@ impl Drop for CommittedIndexIter<'_> {
         DB_METRICS
             .rdb_num_rows_fetched
             .with_label_values(
-                &self.ctx.txn_type(),
+                &self.ctx.workload(),
                 &self.ctx.database(),
                 self.ctx.reducer_name().unwrap_or_default(),
                 &self.table_id.0,
@@ -2093,28 +2093,28 @@ impl traits::MutTx for Locking {
     }
 
     fn rollback_mut_tx(&self, ctx: &ExecutionContext, mut tx: Self::MutTxId) {
-        let txn_type = &ctx.txn_type();
+        let workload = &ctx.workload();
         let db = &ctx.database();
         let reducer = ctx.reducer_name().unwrap_or_default();
         let elapsed_time = tx.timer.elapsed();
         let cpu_time = elapsed_time - tx.lock_wait_time;
         DB_METRICS
             .rdb_num_txns
-            .with_label_values(txn_type, db, reducer, &false)
+            .with_label_values(workload, db, reducer, &false)
             .inc();
         DB_METRICS
             .rdb_txn_cpu_time_sec
-            .with_label_values(txn_type, db, reducer)
+            .with_label_values(workload, db, reducer)
             .observe(cpu_time.as_secs_f64());
         DB_METRICS
             .rdb_txn_elapsed_time_sec
-            .with_label_values(txn_type, db, reducer)
+            .with_label_values(workload, db, reducer)
             .observe(elapsed_time.as_secs_f64());
         tx.lock.rollback();
     }
 
     fn commit_mut_tx(&self, ctx: &ExecutionContext, mut tx: Self::MutTxId) -> super::Result<Option<TxData>> {
-        let txn_type = &ctx.txn_type();
+        let workload = &ctx.workload();
         let db = &ctx.database();
         let reducer = ctx.reducer_name().unwrap_or_default();
         let elapsed_time = tx.timer.elapsed();
@@ -2126,18 +2126,18 @@ impl traits::MutTx for Locking {
         // That is, transactions that don't write any rows to the commit log.
         DB_METRICS
             .rdb_num_txns
-            .with_label_values(txn_type, db, reducer, &true)
+            .with_label_values(workload, db, reducer, &true)
             .inc();
         DB_METRICS
             .rdb_txn_cpu_time_sec
-            .with_label_values(txn_type, db, reducer)
+            .with_label_values(workload, db, reducer)
             .observe(cpu_time);
         DB_METRICS
             .rdb_txn_elapsed_time_sec
-            .with_label_values(txn_type, db, reducer)
+            .with_label_values(workload, db, reducer)
             .observe(elapsed_time);
 
-        fn hash(a: &TransactionType, b: &Address, c: &str) -> u64 {
+        fn hash(a: &WorkloadType, b: &Address, c: &str) -> u64 {
             use std::hash::Hash;
             let mut hasher = DefaultHasher::new();
             a.hash(&mut hasher);
@@ -2148,7 +2148,7 @@ impl traits::MutTx for Locking {
         let mut guard = MAX_TX_CPU_TIME.lock().unwrap();
         let max_cpu_time = *guard
-            .entry(hash(txn_type, db, reducer))
+            .entry(hash(workload, db, reducer))
             .and_modify(|max| {
                 if cpu_time > *max {
                     *max = cpu_time;
@@ -2159,7 +2159,7 @@ impl traits::MutTx for Locking {
         drop(guard);
         DB_METRICS
             .rdb_txn_cpu_time_sec_max
-            .with_label_values(txn_type, db, reducer)
+            .with_label_values(workload, db, reducer)
             .set(max_cpu_time);
 
         tx.lock.commit()
diff --git a/crates/core/src/db/db_metrics/mod.rs b/crates/core/src/db/db_metrics/mod.rs
index 65b6eef331..d10ce50de0 100644
--- a/crates/core/src/db/db_metrics/mod.rs
+++ b/crates/core/src/db/db_metrics/mod.rs
@@ -1,6 +1,6 @@
 use std::{collections::HashMap, sync::Mutex};
 
-use crate::{execution_context::TransactionType, host::AbiCall, util::typed_prometheus::metrics_group};
+use crate::{execution_context::WorkloadType, host::AbiCall, util::typed_prometheus::metrics_group};
 use once_cell::sync::Lazy;
 use prometheus::{GaugeVec, Histogram, HistogramVec, IntCounterVec, IntGaugeVec};
 use spacetimedb_lib::Address;
@@ -60,52 +60,52 @@ metrics_group!(
     #[name = spacetime_num_rows_inserted_cumulative]
     #[help = "The cumulative number of rows inserted into a table"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
     pub rdb_num_rows_inserted: IntCounterVec,
 
     #[name = spacetime_num_rows_deleted_cumulative]
     #[help = "The cumulative number of rows deleted from a table"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
     pub rdb_num_rows_deleted: IntCounterVec,
 
     #[name = spacetime_num_rows_fetched_cumulative]
     #[help = "The cumulative number of rows fetched from a table"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
     pub rdb_num_rows_fetched: IntCounterVec,
 
     #[name = spacetime_num_index_keys_scanned_cumulative]
     #[help = "The cumulative number of keys scanned from an index"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
     pub rdb_num_keys_scanned: IntCounterVec,
 
     #[name = spacetime_num_index_seeks_cumulative]
     #[help = "The cumulative number of index seeks"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str, table_id: u32)]
     pub rdb_num_index_seeks: IntCounterVec,
 
     #[name = spacetime_num_txns_cumulative]
     #[help = "The cumulative number of transactions, including both commits and rollbacks"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str, committed: bool)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str, committed: bool)]
     pub rdb_num_txns: IntCounterVec,
 
     #[name = spacetime_txn_elapsed_time_sec]
     #[help = "The total elapsed (wall) time of a transaction (in seconds)"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str)]
     pub rdb_txn_elapsed_time_sec: HistogramVec,
 
     #[name = spacetime_txn_cpu_time_sec]
     #[help = "The time spent executing a transaction (in seconds), excluding time spent waiting to acquire database locks"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str)]
     pub rdb_txn_cpu_time_sec: HistogramVec,
 
     #[name = spacetime_txn_cpu_time_sec_max]
     #[help = "The cpu time of the longest running transaction (in seconds)"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str)]
     pub rdb_txn_cpu_time_sec_max: GaugeVec,
 
     #[name = spacetime_wasm_abi_call_duration_sec]
     #[help = "The total duration of a spacetime wasm abi call (in seconds); includes row serialization and copying into wasm memory"]
-    #[labels(txn_type: TransactionType, db: Address, reducer: str, call: AbiCall)]
+    #[labels(txn_type: WorkloadType, db: Address, reducer: str, call: AbiCall)]
     pub wasm_abi_call_duration_sec: HistogramVec,
 
     #[name = spacetime_message_log_size_bytes]
diff --git a/crates/core/src/execution_context.rs b/crates/core/src/execution_context.rs
index 2141fcd693..a4ab05c5e8 100644
--- a/crates/core/src/execution_context.rs
+++ b/crates/core/src/execution_context.rs
@@ -15,26 +15,28 @@ pub struct ExecutionContext<'a> {
     reducer: Option<&'a str>,
     /// The SQL query being executed, if any.
     /// Note: this will never be set at the same time as `reducer`.
-    /// It is also NOT guaranteed to be set, even if txn_type == Sql.
+    /// It is also NOT guaranteed to be set, even if workload == Sql.
     /// This is because some transactions tagged "SQL" don't exactly correspond
     /// to any particular query.
     query_debug_info: Option<&'a QueryDebugInfo>,
-    // The type of transaction that is being executed.
-    txn_type: TransactionType,
+    // The type of workload that is being executed.
+    workload: WorkloadType,
 }
 
-/// Classifies a transaction according to where it originates.
+/// Classifies a transaction according to its workload.
 /// A transaction can be executing a reducer.
 /// It can be used to satisfy a one-off sql query or subscription.
 /// It can also be an internal operation that is not associated with a reducer or sql request.
 #[derive(Clone, Copy, Display, Hash, PartialEq, Eq)]
-pub enum TransactionType {
+pub enum WorkloadType {
     Reducer,
     Sql,
+    Subscribe,
+    Update,
     Internal,
 }
 
-impl Default for TransactionType {
+impl Default for WorkloadType {
     fn default() -> Self {
         Self::Internal
     }
@@ -47,17 +49,37 @@ impl<'a> ExecutionContext<'a> {
             database,
             reducer: Some(name),
             query_debug_info: None,
-            txn_type: TransactionType::Reducer,
+            workload: WorkloadType::Reducer,
         }
     }
 
-    /// Returns an [ExecutionContext] for a sql or subscription transaction.
+    /// Returns an [ExecutionContext] for a one-off sql query.
     pub fn sql(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
         Self {
             database,
             reducer: None,
             query_debug_info,
-            txn_type: TransactionType::Sql,
+            workload: WorkloadType::Sql,
+        }
+    }
+
+    /// Returns an [ExecutionContext] for an initial subscribe call.
+    pub fn subscribe(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
+        Self {
+            database,
+            reducer: None,
+            query_debug_info,
+            workload: WorkloadType::Subscribe,
+        }
+    }
+
+    /// Returns an [ExecutionContext] for a subscription update.
+    pub fn incremental_update(database: Address, query_debug_info: Option<&'a QueryDebugInfo>) -> Self {
+        Self {
+            database,
+            reducer: None,
+            query_debug_info,
+            workload: WorkloadType::Update,
         }
     }
 
@@ -67,7 +89,7 @@ impl<'a> ExecutionContext<'a> {
             database,
             reducer: None,
             query_debug_info: None,
-            txn_type: TransactionType::Internal,
+            workload: WorkloadType::Internal,
         }
     }
 
@@ -91,9 +113,9 @@ impl<'a> ExecutionContext<'a> {
         self.query_debug_info
     }
 
-    /// Returns the type of transaction that is being executed.
+    /// Returns the type of workload that is being executed.
     #[inline]
-    pub fn txn_type(&self) -> TransactionType {
-        self.txn_type
+    pub fn workload(&self) -> WorkloadType {
+        self.workload
     }
 }
diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs
index 131c7e2ffa..58772c9ff3 100644
--- a/crates/core/src/host/wasmtime/wasm_instance_env.rs
+++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs
@@ -161,12 +161,12 @@ impl WasmInstanceEnv {
     /// Gather the appropriate metadata and log a wasm_abi_call_duration_ns with the given AbiCall & duration
     fn start_abi_call_timer(&self, call: AbiCall) -> prometheus::HistogramTimer {
         let ctx = self.reducer_context();
-        let txn_type = ctx.txn_type();
+        let workload = ctx.workload();
         let db = ctx.database();
 
         DB_METRICS
             .wasm_abi_call_duration_sec
-            .with_label_values(&txn_type, &db, &self.reducer_name, &call)
+            .with_label_values(&workload, &db, &self.reducer_name, &call)
             .start_timer()
     }
diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs
index 5a2216ecc0..d1c1b8ab8e 100644
--- a/crates/core/src/sql/execute.rs
+++ b/crates/core/src/sql/execute.rs
@@ -60,16 +60,15 @@ fn collect_result(result: &mut Vec<MemTable>, r: CodeResult) -> Result<(), DBErr
     Ok(())
 }
 
-#[tracing::instrument(skip(db, tx, auth))]
+#[tracing::instrument(skip_all)]
 pub fn execute_single_sql(
+    cx: &ExecutionContext,
     db: &RelationalDB,
     tx: &mut MutTxId,
     ast: CrudExpr,
-    query_debug_info: Option<&QueryDebugInfo>,
     auth: AuthCtx,
 ) -> Result<Vec<MemTable>, DBError> {
-    let ctx = ExecutionContext::sql(db.address(), query_debug_info);
-    let p = &mut DbProgram::new(&ctx, db, tx, auth);
+    let p = &mut DbProgram::new(cx, db, tx, auth);
     let q = Expr::Crud(Box::new(ast));
 
     let mut result = Vec::with_capacity(1);
diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs
index 156a9ad3c0..1083fdc071 100644
--- a/crates/core/src/subscription/module_subscription_actor.rs
+++ b/crates/core/src/subscription/module_subscription_actor.rs
@@ -170,7 +170,7 @@ impl ModuleSubscriptionActor {
 
         // Note: the missing QueryDebugInfo here is only used for finishing the transaction;
         // all of the relevant queries already executed, with debug info, in _add_subscription
-        let ctx = ExecutionContext::sql(self.relational_db.address(), None);
+        let ctx = ExecutionContext::subscribe(self.relational_db.address(), None);
         self.relational_db.finish_tx(&ctx, tx, result)
     }
 
@@ -218,7 +218,7 @@ impl ModuleSubscriptionActor {
         //Split logic to properly handle `Error` + `Tx`
         let mut tx = self.relational_db.begin_tx();
         let result = self._broadcast_commit_event(event, &mut tx).await;
-        let ctx = ExecutionContext::sql(self.relational_db.address(), None);
+        let ctx = ExecutionContext::incremental_update(self.relational_db.address(), None);
         self.relational_db.finish_tx(&ctx, tx, result)
     }
 }
diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs
index 6cb5486d6b..43bbae9407 100644
--- a/crates/core/src/subscription/query.rs
+++ b/crates/core/src/subscription/query.rs
@@ -1,6 +1,7 @@
 use crate::db::datastore::locking_tx_datastore::MutTxId;
 use crate::db::relational_db::RelationalDB;
 use crate::error::{DBError, SubscriptionError};
+use crate::execution_context::ExecutionContext;
 use crate::host::module_host::DatabaseTableUpdate;
 use crate::sql::compiler::compile_sql;
 use crate::sql::execute::execute_single_sql;
@@ -67,13 +68,13 @@ pub fn to_mem_table(of: QueryExpr, data: &DatabaseTableUpdate) -> QueryExpr {
 /// Runs a query that evaluates if the changes made should be reported to the [ModuleSubscriptionManager]
 #[tracing::instrument(skip_all)]
 pub(crate) fn run_query(
+    cx: &ExecutionContext,
     db: &RelationalDB,
     tx: &mut MutTxId,
     query: &QueryExpr,
-    query_debug_info: Option<&QueryDebugInfo>,
     auth: AuthCtx,
 ) -> Result<Vec<MemTable>, DBError> {
-    execute_single_sql(db, tx, CrudExpr::Query(query.clone()), query_debug_info, auth)
+    execute_single_sql(cx, db, tx, CrudExpr::Query(query.clone()), auth)
 }
 
 // TODO: It's semantically wrong to `SUBSCRIBE_TO_ALL_QUERY`
@@ -130,7 +131,7 @@ pub fn compile_read_only_query(
     if !queries.is_empty() {
         Ok(queries
             .into_iter()
-            .map(|query| SupportedQuery::new(query, Some(info.clone())))
+            .map(|query| SupportedQuery::new(query, info.clone()))
             .collect::<Result<Vec<_>, _>>()?)
     } else {
         Err(SubscriptionError::Empty.into())
@@ -338,7 +339,7 @@ mod tests {
         data: &DatabaseTableUpdate,
     ) -> ResultTest<()> {
         let q = to_mem_table(q.clone(), data);
-        let result = run_query(db, tx, &q, None, AuthCtx::for_testing())?;
+        let result = run_query(&ExecutionContext::default(), db, tx, &q, AuthCtx::for_testing())?;
 
         assert_eq!(
             Some(table.as_without_table_name()),
@@ -807,10 +808,10 @@ mod tests {
         let q = to_mem_table(q, &data);
         //Try access the private table
         match run_query(
+            &ExecutionContext::default(),
             &db,
             &mut tx,
             &q,
-            None,
             AuthCtx::new(Identity::__dummy(), Identity::from_byte_array([1u8; 32])),
         ) {
             Ok(_) => {
@@ -933,7 +934,13 @@ mod tests {
         let qset = compile_read_only_query(&db, &tx, &AuthCtx::for_testing(), sql_query)?;
 
         for q in qset {
-            let result = run_query(&db, &mut tx, q.as_expr(), None, AuthCtx::for_testing())?;
+            let result = run_query(
+                &ExecutionContext::default(),
+                &db,
+                &mut tx,
+                q.as_expr(),
+                AuthCtx::for_testing(),
+            )?;
             assert_eq!(result.len(), 1, "Join query did not return any rows");
         }
diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs
index 468c235e3c..a196068a4b 100644
--- a/crates/core/src/subscription/subscription.rs
+++ b/crates/core/src/subscription/subscription.rs
@@ -30,6 +30,7 @@ use std::ops::Deref;
 
 use crate::db::datastore::locking_tx_datastore::MutTxId;
 use crate::error::DBError;
+use crate::execution_context::ExecutionContext;
 use crate::sql::query_debug_info::QueryDebugInfo;
 use crate::subscription::query::{run_query, OP_TYPE_FIELD_NAME};
 use crate::{
@@ -84,18 +85,13 @@ impl Subscription {
 pub struct SupportedQuery {
     kind: query::Supported,
     expr: QueryExpr,
-    /// The text of the query, available on a best-effort basis.
-    query_debug_info: Option<QueryDebugInfo>,
+    info: QueryDebugInfo,
 }
 
 impl SupportedQuery {
-    pub fn new(expr: QueryExpr, query_debug_info: Option<QueryDebugInfo>) -> Result<Self, DBError> {
+    pub fn new(expr: QueryExpr, info: QueryDebugInfo) -> Result<Self, DBError> {
         let kind = query::classify(&expr).context("Unsupported query expression")?;
-        Ok(Self {
-            kind,
-            expr,
-            query_debug_info,
-        })
+        Ok(Self { kind, expr, info })
     }
 
     pub fn kind(&self) -> query::Supported {
@@ -107,6 +103,7 @@ impl SupportedQuery {
     }
 }
 
+#[cfg(test)]
 impl TryFrom<QueryExpr> for SupportedQuery {
     type Error = DBError;
 
@@ -115,7 +112,7 @@ impl TryFrom<QueryExpr> for SupportedQuery {
         Ok(Self {
             kind,
             expr,
-            query_debug_info: None,
+            info: QueryDebugInfo::from_source(""),
         })
     }
 }
@@ -163,6 +160,7 @@ impl Extend<SupportedQuery> for QuerySet {
     }
 }
 
+#[cfg(test)]
 impl TryFrom<QueryExpr> for QuerySet {
     type Error = DBError;
 
@@ -198,7 +196,7 @@ impl QuerySet {
             .map(|src| SupportedQuery {
                 kind: query::Supported::Scan,
                 expr: QueryExpr::new(src),
-                query_debug_info: Some(QueryDebugInfo::from_source(format!("SELECT * FROM {}", src.table_name))),
+                info: QueryDebugInfo::from_source(format!("SELECT * FROM {}", src.table_name)),
             })
             .collect();
@@ -213,7 +211,7 @@ impl QuerySet {
     #[tracing::instrument(skip_all)]
     pub fn eval_incr(
         &self,
-        relational_db: &RelationalDB,
+        db: &RelationalDB,
         tx: &mut MutTxId,
         database_update: &DatabaseUpdate,
         auth: AuthCtx,
@@ -222,12 +220,7 @@ impl QuerySet {
         let mut table_ops = HashMap::new();
         let mut seen = HashSet::new();
 
-        for SupportedQuery {
-            kind,
-            expr,
-            query_debug_info,
-        } in self
-        {
+        for SupportedQuery { kind, expr, info } in self {
             use query::Supported::*;
             match kind {
                 Scan => {
@@ -245,7 +238,7 @@ impl QuerySet {
                     let plan = query::to_mem_table(expr.clone(), table);
                     // Evaluate the new plan and capture the new row operations
-                    for op in eval_incremental(relational_db, tx, &auth, &plan, query_debug_info.as_ref())?
+                    for op in eval_incremental(db, tx, &auth, &plan, info)?
                         .filter_map(|op| seen.insert((table.table_id, op.row_pk)).then(|| op.into()))
                     {
                         table_row_operations.push(op);
@@ -254,9 +247,7 @@ impl QuerySet {
                 }
                 Semijoin => {
-                    if let Some(plan) =
-                        IncrementalJoin::new(expr, query_debug_info.as_ref(), database_update.tables.iter())?
-                    {
+                    if let Some(plan) = IncrementalJoin::new(expr, info, database_update.tables.iter())? {
                         let table_id = plan.lhs.table.table_id;
                         let header = &plan.lhs.table.head;
@@ -267,7 +258,7 @@ impl QuerySet {
                         // Evaluate the plan and capture the new row operations
                         for op in plan
-                            .eval(relational_db, tx, &auth)?
+                            .eval(db, tx, &auth)?
                             .filter_map(|op| seen.insert((table_id, op.row_pk)).then(|| op.into()))
                         {
                             table_row_operations.push(op);
@@ -294,26 +285,24 @@ impl QuerySet {
     ///
     /// This is a *major* difference with normal query execution, where is expected to return the full result set for each query.
     #[tracing::instrument(skip_all)]
-    pub fn eval(
-        &self,
-        relational_db: &RelationalDB,
-        tx: &mut MutTxId,
-        auth: AuthCtx,
-    ) -> Result<DatabaseUpdate, DBError> {
+    pub fn eval(&self, db: &RelationalDB, tx: &mut MutTxId, auth: AuthCtx) -> Result<DatabaseUpdate, DBError> {
         let mut database_update: DatabaseUpdate = DatabaseUpdate { tables: vec![] };
         let mut table_ops = HashMap::new();
         let mut seen = HashSet::new();
 
-        for SupportedQuery {
-            expr, query_debug_info, ..
-        } in self
-        {
+        for SupportedQuery { expr, info, .. } in self {
             if let Some(t) = expr.source.get_db_table() {
                 // Get the TableOps for this table
                 let (_, table_row_operations) = table_ops
                     .entry(t.table_id)
                     .or_insert_with(|| (t.head.table_name.clone(), vec![]));
-                for table in run_query(relational_db, tx, expr, query_debug_info.as_ref(), auth)? {
+                for table in run_query(
+                    &ExecutionContext::subscribe(db.address(), Some(info)),
+                    db,
+                    tx,
+                    expr,
+                    auth,
+                )? {
                     for row in table.data {
                         let row_pk = pk_for_row(&row);
@@ -377,9 +366,10 @@ fn eval_incremental(
     tx: &mut MutTxId,
     auth: &AuthCtx,
     expr: &QueryExpr,
-    query_debug_info: Option<&QueryDebugInfo>,
+    info: &QueryDebugInfo,
 ) -> Result<impl Iterator<Item = Op>, DBError> {
-    let results = run_query(db, tx, expr, query_debug_info, *auth)?;
+    let ctx = &ExecutionContext::incremental_update(db.address(), Some(info));
+    let results = run_query(ctx, db, tx, expr, *auth)?;
     let ops = results
         .into_iter()
         .filter(|result| !result.data.is_empty())
@@ -419,7 +409,7 @@ fn eval_incremental(
 /// Helper for evaluating a [`query::Supported::Semijoin`].
 struct IncrementalJoin<'a> {
     expr: &'a QueryExpr,
-    query_debug_info: Option<&'a QueryDebugInfo>,
+    info: &'a QueryDebugInfo,
     lhs: JoinSide<'a>,
     rhs: JoinSide<'a>,
 }
@@ -470,7 +460,7 @@ impl<'a> IncrementalJoin<'a> {
     /// An error is returned if the expression is not well-formed.
     pub fn new(
         expr: &'a QueryExpr,
-        query_debug_info: Option<&'a QueryDebugInfo>,
+        info: &'a QueryDebugInfo,
         updates: impl Iterator<Item = &'a DatabaseTableUpdate>,
     ) -> anyhow::Result<Option<Self>> {
         let mut lhs = expr
@@ -514,12 +504,7 @@ impl<'a> IncrementalJoin<'a> {
         if lhs.updates.ops.is_empty() && rhs.updates.ops.is_empty() {
             Ok(None)
         } else {
-            Ok(Some(Self {
-                expr,
-                query_debug_info,
-                lhs,
-                rhs,
-            }))
+            Ok(Some(Self { expr, info, lhs, rhs }))
         }
     }
@@ -561,6 +546,7 @@ impl<'a> IncrementalJoin<'a> {
         tx: &mut MutTxId,
         auth: &AuthCtx,
     ) -> Result<impl Iterator<Item = Op>, DBError> {
+        let ctx = &ExecutionContext::incremental_update(db.address(), Some(self.info));
         let mut inserts = {
             // Replan query after replacing left table with virtual table,
             // since join order may need to be reversed.
@@ -568,9 +554,9 @@ impl<'a> IncrementalJoin<'a> {
             let rhs_virt = self.to_mem_table_rhs(self.rhs.inserts());
 
             // {A+ join B}
-            let a = eval_incremental(db, tx, auth, &lhs_virt, self.query_debug_info)?;
+            let a = eval_incremental(db, tx, auth, &lhs_virt, self.info)?;
             // {A join B+}
-            let b = run_query(db, tx, &rhs_virt, self.query_debug_info, *auth)?
+            let b = run_query(ctx, db, tx, &rhs_virt, *auth)?
                 .into_iter()
                 .filter(|result| !result.data.is_empty())
                 .flat_map(|result| {
@@ -594,9 +580,9 @@ impl<'a> IncrementalJoin<'a> {
             let rhs_virt = self.to_mem_table_rhs(self.rhs.deletes());
 
             // {A- join B}
-            let a = eval_incremental(db, tx, auth, &lhs_virt, self.query_debug_info)?;
+            let a = eval_incremental(db, tx, auth, &lhs_virt, self.info)?;
             // {A join B-}
-            let b = run_query(db, tx, &rhs_virt, self.query_debug_info, *auth)?
+            let b = run_query(ctx, db, tx, &rhs_virt, *auth)?
                 .into_iter()
                 .filter(|result| !result.data.is_empty())
                 .flat_map(|result| {
@@ -614,7 +600,7 @@ impl<'a> IncrementalJoin<'a> {
                 tx,
                 auth,
                 &query::to_mem_table(rhs_virt, &self.lhs.deletes()),
-                self.query_debug_info,
+                self.info,
             )?;
             // {A- join B} U {A join B-} U {A- join B-}
             let mut set = a.map(|op| (op.row_pk, op)).collect::<HashMap<_, _>>();
diff --git a/crates/core/src/util/typed_prometheus.rs b/crates/core/src/util/typed_prometheus.rs
index e24cdf7af9..3834e648ac 100644
--- a/crates/core/src/util/typed_prometheus.rs
+++ b/crates/core/src/util/typed_prometheus.rs
@@ -128,7 +128,7 @@ macro_rules! metrics_vec {
 }
 pub use metrics_vec;
 
-use crate::{execution_context::TransactionType, host::AbiCall};
+use crate::{execution_context::WorkloadType, host::AbiCall};
 
 pub trait AsPrometheusLabel {
     type Str<'a>: AsRef<str> + 'a
@@ -156,7 +156,7 @@ impl_prometheusvalue_string!(
     Hash,
     Identity,
     Address,
-    TransactionType,
+    WorkloadType,
     AbiCall,
     bool,
    u8,
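
Reviewer note, not part of the patch: below is a minimal, self-contained sketch of the renamed API, to make the shape of the change easier to scan. The `WorkloadType` variants, constructor names, and the `workload()` accessor match the diff; everything else is simplified and assumed for illustration (`database: u64` stands in for the real `Address`, the `"send_message"` reducer name is hypothetical, the real enum also derives `Display`, and the Prometheus plumbing is reduced to a `println!`).

```rust
/// Mirrors the enum from the diff, with the two new variants.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
enum WorkloadType {
    Reducer,
    Sql,
    Subscribe,
    Update,
    Internal,
}

impl Default for WorkloadType {
    fn default() -> Self {
        Self::Internal
    }
}

/// Simplified stand-in for `ExecutionContext<'a>`; only the fields the
/// metrics code reads are modeled here.
struct ExecutionContext<'a> {
    database: u64, // stand-in for `Address`
    reducer: Option<&'a str>,
    workload: WorkloadType,
}

impl<'a> ExecutionContext<'a> {
    fn reducer(database: u64, name: &'a str) -> Self {
        Self { database, reducer: Some(name), workload: WorkloadType::Reducer }
    }

    fn sql(database: u64) -> Self {
        Self { database, reducer: None, workload: WorkloadType::Sql }
    }

    fn subscribe(database: u64) -> Self {
        Self { database, reducer: None, workload: WorkloadType::Subscribe }
    }

    fn incremental_update(database: u64) -> Self {
        Self { database, reducer: None, workload: WorkloadType::Update }
    }

    /// The accessor the metrics code calls (renamed from `txn_type()`).
    fn workload(&self) -> WorkloadType {
        self.workload
    }
}

fn main() {
    // Each call site in the patch picks the constructor matching its
    // workload, so subscription traffic no longer lands under `Sql`.
    let contexts = [
        ExecutionContext::sql(42),
        ExecutionContext::subscribe(42),
        ExecutionContext::incremental_update(42),
        ExecutionContext::reducer(42, "send_message"),
    ];
    for ctx in &contexts {
        // Stand-in for `DB_METRICS.<metric>.with_label_values(&ctx.workload(), ...)`.
        println!(
            "db={} workload={:?} reducer={:?}",
            ctx.database,
            ctx.workload(),
            ctx.reducer
        );
    }
}
```

The practical effect, judging from the call-site changes: initial subscription evaluation (`ExecutionContext::subscribe`) and incremental updates (`ExecutionContext::incremental_update`) now report under their own `Subscribe` and `Update` label values instead of being folded into `Sql`, so per-workload metrics such as `spacetime_txn_cpu_time_sec` can be broken out by subscription phase.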