Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slow query log #1052

Merged
merged 2 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use spacetimedb::execution_context::ExecutionContext;
use spacetimedb::host::module_host::{DatabaseTableUpdate, DatabaseUpdate};
use spacetimedb::subscription::query::compile_read_only_query;
use spacetimedb::subscription::subscription::ExecutionSet;
use spacetimedb::util::slow::SlowQueryConfig;
use spacetimedb_bench::database::BenchDatabase as _;
use spacetimedb_bench::spacetime_raw::SpacetimeRaw;
use spacetimedb_lib::identity::AuthCtx;
Expand Down Expand Up @@ -107,7 +108,7 @@ fn eval(c: &mut Criterion) {
let tx = raw.db.begin_tx();
let query = compile_read_only_query(&raw.db, &tx, &auth, sql).unwrap();
let query: ExecutionSet = query.into();
let ctx = &ExecutionContext::subscribe(raw.db.address());
let ctx = &ExecutionContext::subscribe(raw.db.address(), SlowQueryConfig::default());
b.iter(|| drop(black_box(query.eval(ctx, Protocol::Binary, &raw.db, &tx).unwrap())))
});
};
Expand All @@ -130,7 +131,7 @@ fn eval(c: &mut Criterion) {
);
bench_eval(c, "full-join", &name);

let ctx_incr = &ExecutionContext::incremental_update(raw.db.address());
let ctx_incr = &ExecutionContext::incremental_update(raw.db.address(), SlowQueryConfig::default());

// To profile this benchmark for 30s
// samply record -r 10000000 cargo bench --bench=subscription --profile=profiling -- incr-select --exact --profile-time=30
Expand Down
117 changes: 117 additions & 0 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
use crate::util::slow::{SlowQueryConfig, Threshold};
use spacetimedb_sats::{product, AlgebraicType, AlgebraicValue, ProductType};
use spacetimedb_vm::dsl::mem_table;
use spacetimedb_vm::errors::{ConfigError, ErrorVm};
use spacetimedb_vm::relation::MemTable;
use std::env::temp_dir;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;

#[cfg(not(any(target_os = "macos", target_os = "windows")))]
mod paths {
Expand Down Expand Up @@ -152,3 +160,112 @@ impl SpacetimeDbFiles for FilesGlobal {
paths::config_path()
}
}

/// Enumeration of options for reading configuration settings.
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum ReadConfigOption {
SlowQueryThreshold,
SlowIncrementalUpdatesThreshold,
SlowSubscriptionsThreshold,
}

impl ReadConfigOption {
pub fn type_of(&self) -> AlgebraicType {
AlgebraicType::U64
}
}

impl Display for ReadConfigOption {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let value = match self {
ReadConfigOption::SlowQueryThreshold => "slow_ad_hoc_query_ms",
ReadConfigOption::SlowIncrementalUpdatesThreshold => "slow_tx_update_ms",
ReadConfigOption::SlowSubscriptionsThreshold => "slow_subscription_query_ms",
};
write!(f, "{value}")
}
}

impl FromStr for ReadConfigOption {
type Err = ConfigError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"slow_ad_hoc_query_ms" => Ok(Self::SlowQueryThreshold),
"slow_tx_update_ms" => Ok(Self::SlowIncrementalUpdatesThreshold),
"slow_subscription_query_ms" => Ok(Self::SlowSubscriptionsThreshold),
x => Err(ConfigError::NotFound(x.into())),
}
}
}

pub enum SetConfigOption {
SlowQuery(Threshold),
}

/// Holds a list of the runtime configurations settings of the database
#[derive(Debug, Clone, Copy)]
pub struct DatabaseConfig {
pub(crate) slow_query: SlowQueryConfig,
}

impl DatabaseConfig {
/// Creates a new `DatabaseConfig` with the specified slow query settings.
pub fn with_slow_query(slow_query: SlowQueryConfig) -> Self {
Self { slow_query }
}

/// Reads a configuration setting specified by parsing `key`.
pub fn read(&self, key: &str) -> Result<Option<Duration>, ConfigError> {
let key = ReadConfigOption::from_str(key)?;

Ok(match key {
ReadConfigOption::SlowQueryThreshold => self.slow_query.queries,
ReadConfigOption::SlowIncrementalUpdatesThreshold => self.slow_query.incremental_updates,
ReadConfigOption::SlowSubscriptionsThreshold => self.slow_query.subscriptions,
})
}

/// Reads a configuration setting specified by parsing `key` and converts it into a `MemTable`.
///
/// For returning as `table` for `SQL` queries.
pub fn read_key_into_table(&self, key: &str) -> Result<MemTable, ConfigError> {
let value = if let Some(value) = self.read(key)? {
value.as_millis()
} else {
0u128
};

let value: AlgebraicValue = if value == 0 {
AlgebraicValue::OptionNone()
} else {
AlgebraicValue::OptionSome(value.into())
};

let head = ProductType::from([(key, value.type_of())]);

Ok(mem_table(head, vec![product!(value)]))
}

/// Writes the configuration setting specified by parsing `key` and `value`.
pub fn set_config(&mut self, key: &str, value: AlgebraicValue) -> Result<(), ErrorVm> {
let config = ReadConfigOption::from_str(key)?;
let millis = if let Some(value) = value.as_u64() {
if *value == 0 {
None
} else {
Some(Duration::from_millis(*value))
}
} else {
return Err(ConfigError::TypeError(key.into(), value, AlgebraicType::U64).into());
};

match config {
ReadConfigOption::SlowQueryThreshold => self.slow_query.queries = millis,
ReadConfigOption::SlowIncrementalUpdatesThreshold => self.slow_query.incremental_updates = millis,
ReadConfigOption::SlowSubscriptionsThreshold => self.slow_query.subscriptions = millis,
};

Ok(())
}
}
17 changes: 17 additions & 0 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,22 @@ use super::datastore::{
};
use super::relational_operators::Relation;
use crate::address::Address;
use crate::config::DatabaseConfig;
use crate::db::db_metrics::DB_METRICS;
use crate::error::{DBError, DatabaseError, TableError};
use crate::execution_context::ExecutionContext;
use crate::hash::Hash;
use crate::util::slow::SlowQueryConfig;
use fs2::FileExt;
use parking_lot::RwLock;
use spacetimedb_commitlog as commitlog;
use spacetimedb_durability::{self as durability, Durability};
use spacetimedb_primitives::*;
use spacetimedb_sats::db::auth::{StAccess, StTableType};
use spacetimedb_sats::db::def::{ColumnDef, IndexDef, SequenceDef, TableDef, TableSchema};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_vm::errors::ErrorVm;
use std::borrow::Cow;
use std::fs::{create_dir_all, File};
use std::io;
Expand Down Expand Up @@ -59,6 +63,7 @@ pub struct RelationalDB {

// Release file lock last when dropping.
_lock: Arc<File>,
config: Arc<RwLock<DatabaseConfig>>,
}

impl std::fmt::Debug for RelationalDB {
Expand Down Expand Up @@ -136,6 +141,9 @@ impl RelationalDB {
disk_size_fn,

_lock: Arc::new(lock),
config: Arc::new(RwLock::new(DatabaseConfig::with_slow_query(
SlowQueryConfig::with_defaults(),
))),
})
}

Expand Down Expand Up @@ -750,6 +758,15 @@ impl RelationalDB {
pub(crate) fn set_program_hash(&self, tx: &mut MutTx, fence: u128, hash: Hash) -> Result<(), DBError> {
self.inner.set_program_hash(tx, fence, hash)
}

/// Set a runtime configurations setting of the database
pub fn set_config(&self, key: &str, value: AlgebraicValue) -> Result<(), ErrorVm> {
self.config.write().set_config(key, value)
}
/// Read the runtime configurations settings of the database
pub fn read_config(&self) -> DatabaseConfig {
*self.config.read()
}
}

#[cfg(any(test, feature = "test"))]
Expand Down
27 changes: 18 additions & 9 deletions crates/core/src/execution_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use spacetimedb_lib::Address;
use spacetimedb_primitives::TableId;

use crate::db::db_metrics::DB_METRICS;
use crate::util::slow::SlowQueryConfig;

pub enum MetricType {
IndexSeeks,
Expand Down Expand Up @@ -118,6 +119,8 @@ pub struct ExecutionContext {
workload: WorkloadType,
/// The Metrics to be reported for this transaction.
pub metrics: Arc<RwLock<Metrics>>,
/// Configuration threshold for detecting slow queries.
pub slow_query_config: SlowQueryConfig,
}

/// Classifies a transaction according to its workload.
Expand All @@ -141,38 +144,44 @@ impl Default for WorkloadType {

impl ExecutionContext {
/// Returns an [ExecutionContext] with the provided parameters and empty metrics.
fn new(database: Address, reducer: Option<String>, workload: WorkloadType) -> Self {
fn new(
database: Address,
reducer: Option<String>,
workload: WorkloadType,
slow_query_config: SlowQueryConfig,
) -> Self {
Self {
database,
reducer,
workload,
metrics: <_>::default(),
slow_query_config,
}
}

/// Returns an [ExecutionContext] for a reducer transaction.
pub fn reducer(database: Address, name: String) -> Self {
Self::new(database, Some(name), WorkloadType::Reducer)
Self::new(database, Some(name), WorkloadType::Reducer, Default::default())
}

/// Returns an [ExecutionContext] for a one-off sql query.
pub fn sql(database: Address) -> Self {
Self::new(database, None, WorkloadType::Sql)
pub fn sql(database: Address, slow_query_config: SlowQueryConfig) -> Self {
Self::new(database, None, WorkloadType::Sql, slow_query_config)
}

/// Returns an [ExecutionContext] for an initial subscribe call.
pub fn subscribe(database: Address) -> Self {
Self::new(database, None, WorkloadType::Subscribe)
pub fn subscribe(database: Address, slow_query_config: SlowQueryConfig) -> Self {
Self::new(database, None, WorkloadType::Subscribe, slow_query_config)
}

/// Returns an [ExecutionContext] for a subscription update.
pub fn incremental_update(database: Address) -> Self {
Self::new(database, None, WorkloadType::Update)
pub fn incremental_update(database: Address, slow_query_config: SlowQueryConfig) -> Self {
Self::new(database, None, WorkloadType::Update, slow_query_config)
}

/// Returns an [ExecutionContext] for an internal database operation.
pub fn internal(database: Address) -> Self {
Self::new(database, None, WorkloadType::Internal)
Self::new(database, None, WorkloadType::Internal, Default::default())
}

/// Returns the address of the database on which we are operating.
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ impl<T: WasmModule> Module for WasmModuleHostActor<T> {
let db = &self.database_instance_context.relational_db;
let auth = AuthCtx::new(self.database_instance_context.identity, caller_identity);
log::debug!("One-off query: {query}");
let ctx = &ExecutionContext::sql(db.address());
// Don't need the `slow query` logger on compilation
let ctx = &ExecutionContext::sql(db.address(), db.read_config().slow_query);
let compiled: Vec<_> = db.with_read_only(ctx, |tx| {
let ast = sql::compiler::compile_sql(db, tx, &query)?;
ast.into_iter()
Expand All @@ -282,7 +283,7 @@ impl<T: WasmModule> Module for WasmModuleHostActor<T> {
.collect::<Result<_, _>>()
})?;

sql::execute::execute_sql(db, compiled, auth)
sql::execute::execute_sql(db, &query, compiled, auth)
}

fn clear_table(&self, table_name: &str) -> Result<(), anyhow::Error> {
Expand Down
Loading
Loading