diff --git a/Cargo.lock b/Cargo.lock index 62f9b944849..01ec071dcf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5997,6 +5997,7 @@ dependencies = [ "fs2", "fs_extra", "futures", + "futures-util", "hashbrown 0.15.3", "hex", "hostname", @@ -8157,6 +8158,7 @@ checksum = "f38dbf42dc56a6fe41ccd77211ea8ec90855de05e52cd00df5a0a3bca87d6147" dependencies = [ "addr2line 0.22.0", "anyhow", + "async-trait", "bitflags 2.9.0", "bumpalo", "cc", @@ -8187,6 +8189,7 @@ dependencies = [ "wasmtime-component-macro", "wasmtime-cranelift", "wasmtime-environ", + "wasmtime-fiber", "wasmtime-jit-icache-coherence", "wasmtime-slab", "wasmtime-versioned-export-macros", @@ -8293,6 +8296,21 @@ dependencies = [ "wasmtime-types", ] +[[package]] +name = "wasmtime-fiber" +version = "25.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "665ccc1bb0f28496e6fa02e94c575ee9ad6e3202c7df8591e5dda78106d5aa4a" +dependencies = [ + "anyhow", + "cc", + "cfg-if", + "rustix 0.38.44", + "wasmtime-asm-macros", + "wasmtime-versioned-export-macros", + "windows-sys 0.52.0", +] + [[package]] name = "wasmtime-jit-icache-coherence" version = "25.0.3" diff --git a/Cargo.toml b/Cargo.toml index 446a3e35c9e..30ea3f38c1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -182,6 +182,7 @@ fs_extra = "1.3.0" fs2 = "0.4.3" futures = "0.3" futures-channel = "0.3" +futures-util = "0.3" getrandom02 = { package = "getrandom", version = "0.2" } glob = "0.3.1" hashbrown = { version = "0.15", default-features = false, features = ["equivalent", "inline-more"] } @@ -312,6 +313,7 @@ version = "25" default-features = false features = [ "addr2line", + "async", "cache", "cranelift", "demangle", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 142a19e7484..c3c3053f907 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -58,6 +58,7 @@ enum-map.workspace = true flate2.workspace = true fs2.workspace = true futures.workspace = true +futures-util.workspace = true hashbrown = { workspace = true, features = ["rayon", "serde"] } hex.workspace = true hostname.workspace = true diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 75b0a7b0cc7..f508bbbc5be 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -16,7 +16,7 @@ use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager}; use crate::util::asyncify; -use crate::util::jobs::{JobCore, JobCores}; +use crate::util::jobs::{JobCores, SingleCoreExecutor}; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; use async_trait::async_trait; @@ -583,7 +583,7 @@ async fn make_module_host( program: Program, energy_monitor: Arc, unregister: impl Fn() + Send + Sync + 'static, - core: JobCore, + executor: SingleCoreExecutor, ) -> anyhow::Result<(Program, ModuleHost)> { // `make_actor` is blocking, as it needs to compile the wasm to native code, // which may be computationally expensive - sometimes up to 1s for a large module. @@ -591,6 +591,8 @@ async fn make_module_host( // threads, but those aren't for computation. Also, wasmtime uses rayon // to run compilation in parallel, so it'll need to run stuff in rayon anyway. 
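The comment above explains why module creation is wrapped in `asyncify`: compiling wasm to native code can block for up to a second, so it must stay off the async worker threads. As a rough, hedged sketch only (the real `crate::util::asyncify` may differ in detail), such a helper can be built on `tokio::task::spawn_blocking`, re-raising any panic on the calling task:

```rust
// Hypothetical sketch of an `asyncify`-style helper, assuming it wraps
// `tokio::task::spawn_blocking`: run a CPU-heavy, blocking closure (such as
// wasm compilation) off the async workers and bubble panics back up.
pub async fn asyncify_sketch<F, R>(f: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    match tokio::task::spawn_blocking(f).await {
        Ok(value) => value,
        // `into_panic` hands back the original panic payload, so the caller
        // observes the same panic it would have seen running `f` inline.
        Err(join_err) => std::panic::resume_unwind(join_err.into_panic()),
    }
}
```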
asyncify(move || { + let database_identity = replica_ctx.database_identity; + let mcc = ModuleCreationContext { replica_ctx, scheduler, @@ -603,12 +605,12 @@ async fn make_module_host( HostType::Wasm => { let actor = runtimes.wasmtime.make_actor(mcc)?; trace!("wasmtime::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, unregister, core) + ModuleHost::new(actor, unregister, executor, database_identity) } HostType::Js => { let actor = runtimes.v8.make_actor(mcc)?; trace!("v8::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, unregister, core) + ModuleHost::new(actor, unregister, executor, database_identity) } }; Ok((program, module_host)) @@ -641,7 +643,7 @@ async fn launch_module( energy_monitor: Arc, replica_dir: ReplicaDir, runtimes: Arc, - core: JobCore, + executor: SingleCoreExecutor, ) -> anyhow::Result<(Program, LaunchedModule)> { let db_identity = database.database_identity; let host_type = database.host_type; @@ -658,7 +660,7 @@ async fn launch_module( program, energy_monitor.clone(), on_panic, - core, + executor, ) .await?; @@ -874,7 +876,7 @@ impl Host { page_pool: PagePool, database: Database, program: Program, - core: JobCore, + executor: SingleCoreExecutor, ) -> anyhow::Result> { // Even in-memory databases acquire a lockfile. // Grab a tempdir to put that lockfile in. @@ -906,7 +908,7 @@ impl Host { Arc::new(NullEnergyMonitor), phony_replica_dir, runtimes.clone(), - core, + executor, ) .await?; @@ -939,7 +941,7 @@ impl Host { policy: MigrationPolicy, energy_monitor: Arc, on_panic: impl Fn() + Send + Sync + 'static, - core: JobCore, + executor: SingleCoreExecutor, ) -> anyhow::Result { let replica_ctx = &self.replica_ctx; let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone()); @@ -952,7 +954,7 @@ impl Host { program, energy_monitor, on_panic, - core, + executor, ) .await?; @@ -1094,7 +1096,7 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an let runtimes = HostRuntimes::new(None); let page_pool = PagePool::new(None); - let core = JobCore::default(); + let core = SingleCoreExecutor::in_current_tokio_runtime(); let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?; // this should always succeed, but sometimes it doesn't let module_def = match Arc::try_unwrap(module_info) { diff --git a/crates/core/src/host/module_common.rs b/crates/core/src/host/module_common.rs index d5826ff54c5..55bd47b888e 100644 --- a/crates/core/src/host/module_common.rs +++ b/crates/core/src/host/module_common.rs @@ -4,7 +4,7 @@ use crate::{ energy::EnergyMonitor, host::{ - module_host::{DynModule, ModuleInfo}, + module_host::ModuleInfo, wasm_common::{module_host_actor::DescribeError, DESCRIBE_MODULE_DUNDER}, Scheduler, }, @@ -80,12 +80,12 @@ impl ModuleCommon { } } -impl DynModule for ModuleCommon { - fn replica_ctx(&self) -> &Arc { +impl ModuleCommon { + pub fn replica_ctx(&self) -> &Arc { &self.replica_context } - fn scheduler(&self) -> &Scheduler { + pub fn scheduler(&self) -> &Scheduler { &self.scheduler } } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 5e93a350d53..8d03f012f05 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -8,7 +8,7 @@ use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::hash::Hash; use crate::identity::Identity; -use crate::messages::control_db::Database; +use 
crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::sql::ast::SchemaViewer; @@ -17,12 +17,13 @@ use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::BuildableWebsocketFormat; -use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread}; +use crate::util::jobs::{SingleCoreExecutor, WeakSingleCoreExecutor}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; use bytes::Bytes; use derive_more::From; +use futures::lock::Mutex; use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, IntGauge}; @@ -50,7 +51,9 @@ use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed; use spacetimedb_schema::def::{ModuleDef, ReducerDef, TableDef}; use spacetimedb_schema::schema::{Schema, TableSchema}; use spacetimedb_vm::relation::RelValue; +use std::collections::VecDeque; use std::fmt; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tokio::sync::oneshot; @@ -315,24 +318,63 @@ impl ReducersMap { /// A runtime that can create modules. pub trait ModuleRuntime { /// Creates a module based on the context `mcc`. - fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result; + fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result; } -pub trait DynModule: Send + Sync + 'static { - fn replica_ctx(&self) -> &Arc; - fn scheduler(&self) -> &Scheduler; +pub enum Module { + Wasm(super::wasmtime::Module), + Js(super::v8::JsModule), } -pub trait Module: DynModule { - type Instance: ModuleInstance; - type InitialInstances<'a>: IntoIterator + 'a; - fn initial_instances(&mut self) -> Self::InitialInstances<'_>; - fn info(&self) -> Arc; - fn create_instance(&self) -> Self::Instance; +pub enum Instance { + // Box these instances because they're very different sizes, + // which makes Clippy sad and angry. + Wasm(Box), + Js(Box), } -pub trait ModuleInstance: Send + 'static { - fn trapped(&self) -> bool; +impl Module { + fn replica_ctx(&self) -> &Arc { + match self { + Module::Wasm(module) => module.replica_ctx(), + Module::Js(module) => module.replica_ctx(), + } + } + + fn scheduler(&self) -> &Scheduler { + match self { + Module::Wasm(module) => module.scheduler(), + Module::Js(module) => module.scheduler(), + } + } + + fn info(&self) -> Arc { + match self { + Module::Wasm(module) => module.info(), + Module::Js(module) => module.info(), + } + } + fn create_instance(&self) -> Instance { + match self { + Module::Wasm(module) => Instance::Wasm(Box::new(module.create_instance())), + Module::Js(module) => Instance::Js(Box::new(module.create_instance())), + } + } + fn host_type(&self) -> HostType { + match self { + Module::Wasm(_) => HostType::Wasm, + Module::Js(_) => HostType::Js, + } + } +} + +impl Instance { + fn trapped(&self) -> bool { + match self { + Instance::Wasm(inst) => inst.trapped(), + Instance::Js(inst) => inst.trapped(), + } + } /// Update the module instance's database to match the schema of the module instance. 
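The trait-based `Module`/`ModuleInstance` abstraction is replaced above by closed enums with explicit `match` dispatch. The following sketch is purely illustrative (invented names, not the PR's types); it shows the same pattern and why the variants are boxed:

```rust
// Illustrative only: a closed enum over two concrete engines, dispatched by
// `match` like `Module`/`Instance` above. Boxing keeps the enum itself small
// even when the variants differ wildly in size, which is what Clippy's
// `large_enum_variant` lint warns about.
struct WasmEngine {
    linear_memory: Vec<u8>,
}
struct JsEngine {
    heap_limit: usize,
}

enum Engine {
    Wasm(Box<WasmEngine>),
    Js(Box<JsEngine>),
}

impl Engine {
    fn memory_in_use(&self) -> usize {
        match self {
            Engine::Wasm(e) => e.linear_memory.len(),
            Engine::Js(e) => e.heap_limit,
        }
    }
}

fn main() {
    let engine = Engine::Wasm(Box::new(WasmEngine { linear_memory: vec![0; 64] }));
    assert_eq!(engine.memory_in_use(), 64);
}
```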
fn update_database( @@ -340,9 +382,19 @@ pub trait ModuleInstance: Send + 'static { program: Program, old_module_info: Arc, policy: MigrationPolicy, - ) -> anyhow::Result; + ) -> anyhow::Result { + match self { + Instance::Wasm(inst) => inst.update_database(program, old_module_info, policy), + Instance::Js(inst) => inst.update_database(program, old_module_info, policy), + } + } - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult; + fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { + match self { + Instance::Wasm(inst) => inst.call_reducer(tx, params), + Instance::Js(inst) => inst.call_reducer(tx, params), + } + } } /// Creates the table for `table_def` in `stdb`. @@ -362,7 +414,7 @@ pub fn create_table_from_def( fn init_database( replica_ctx: &ReplicaContext, module_def: &ModuleDef, - inst: &mut dyn ModuleInstance, + inst: &mut Instance, program: Program, ) -> anyhow::Result> { log::debug!("init database"); @@ -441,49 +493,96 @@ pub struct CallReducerParams { pub args: ArgsTuple, } -// TODO: figure out how we want to handle traps. maybe it should just not return to the LendingPool and -// let the get_instance logic handle it? -struct AutoReplacingModuleInstance { - inst: T::Instance, - module: Arc, +/// Holds a [`Module`] and a set of [`Instance`]s from it, +/// and allocates the [`Instance`]s to be used for function calls. +/// +/// Capable of managing and allocating multiple instances of the same module, +/// but this functionality is currently unused, as only one reducer runs at a time. +/// When we introduce procedures, it will be necessary to have multiple instances, +/// as each procedure invocation will have its own sandboxed instance, +/// and multiple procedures can run concurrently with up to one reducer. +struct ModuleInstanceManager { + instances: VecDeque, + module: Arc, + create_instance_time_metric: CreateInstanceTimeMetric, } -impl AutoReplacingModuleInstance { - fn check_trap(&mut self) { - if self.inst.trapped() { - self.inst = self.module.create_instance() - } +/// Handle on the `spacetime_module_create_instance_time_seconds` label for a particular database +/// which calls `remove_label_values` to clean up on drop. 
+struct CreateInstanceTimeMetric { + metric: Histogram, + host_type: HostType, + database_identity: Identity, +} + +impl Drop for CreateInstanceTimeMetric { + fn drop(&mut self) { + let _ = WORKER_METRICS + .module_create_instance_time_seconds + .remove_label_values(&self.database_identity, &self.host_type); } } -impl ModuleInstance for AutoReplacingModuleInstance { - fn trapped(&self) -> bool { - self.inst.trapped() +impl CreateInstanceTimeMetric { + fn observe(&self, duration: std::time::Duration) { + self.metric.observe(duration.as_secs_f64()); } - fn update_database( - &mut self, - program: Program, - old_module_info: Arc, - policy: MigrationPolicy, - ) -> anyhow::Result { - let ret = self.inst.update_database(program, old_module_info, policy); - self.check_trap(); - ret +} + +impl ModuleInstanceManager { + fn new(module: Arc, database_identity: Identity) -> Self { + let host_type = module.host_type(); + let create_instance_time_metric = CreateInstanceTimeMetric { + metric: WORKER_METRICS + .module_create_instance_time_seconds + .with_label_values(&database_identity, &host_type), + host_type, + database_identity, + }; + Self { + instances: Default::default(), + module, + create_instance_time_metric, + } } - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - let ret = self.inst.call_reducer(tx, params); - self.check_trap(); - ret + fn get_instance(&mut self) -> Instance { + if let Some(inst) = self.instances.pop_back() { + inst + } else { + let start_time = std::time::Instant::now(); + // TODO: should we be calling `create_instance` on the `SingleCoreExecutor` rather than the calling thread? + let res = self.module.create_instance(); + let elapsed_time = start_time.elapsed(); + self.create_instance_time_metric.observe(elapsed_time); + res + } + } + + fn return_instance(&mut self, inst: Instance) { + if inst.trapped() { + // Don't return trapped instances; + // they may have left internal data structures in the guest `Instance` + // (WASM linear memory, V8 global scope) in a bad state. + return; + } + + self.instances.push_front(inst); } } #[derive(Clone)] pub struct ModuleHost { pub info: Arc, - module: Arc, + module: Arc, /// Called whenever a reducer call on this host panics. on_panic: Arc, - job_tx: JobThread, + instance_manager: Arc>, + executor: SingleCoreExecutor, + + /// Marks whether this module has been closed by [`Self::exit`]. + /// + /// When this is true, most operations will fail with [`NoSuchModule`]. 
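The checkout/return flow of `ModuleInstanceManager` above boils down to a small pool that never recycles a possibly-corrupted entry. A minimal, hypothetical sketch of that shape (not the PR's code):

```rust
use std::collections::VecDeque;

// Hypothetical sketch of the pooling shape used by `ModuleInstanceManager`:
// hand out an idle entry if one exists, lazily create one otherwise, and
// refuse to recycle entries flagged as poisoned (the analogue of a trapped
// wasm instance or V8 isolate).
struct Pool<T> {
    idle: VecDeque<T>,
    make: fn() -> T,
}

impl<T> Pool<T> {
    fn get(&mut self) -> T {
        self.idle.pop_back().unwrap_or_else(|| (self.make)())
    }

    fn put_back(&mut self, entry: T, poisoned: bool) {
        if poisoned {
            // Drop it; the next `get` builds a fresh entry.
            return;
        }
        self.idle.push_front(entry);
    }
}

fn main() {
    let mut pool: Pool<String> = Pool {
        idle: VecDeque::new(),
        make: || String::from("fresh"),
    };
    let entry = pool.get();
    pool.put_back(entry, false);
    assert_eq!(pool.idle.len(), 1);
}
```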
+ closed: Arc, } impl fmt::Debug for ModuleHost { @@ -497,9 +596,11 @@ impl fmt::Debug for ModuleHost { pub struct WeakModuleHost { info: Arc, - inner: Weak, + inner: Weak, on_panic: Weak, - tx: WeakJobThread, + instance_manager: Weak>, + executor: WeakSingleCoreExecutor, + closed: Weak, } #[derive(Debug)] @@ -563,24 +664,27 @@ pub enum ClientConnectedError { } impl ModuleHost { - pub(super) fn new(module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore) -> Self { + pub(super) fn new( + module: Module, + on_panic: impl Fn() + Send + Sync + 'static, + executor: SingleCoreExecutor, + database_identity: Identity, + ) -> Self { let info = module.info(); let module = Arc::new(module); let on_panic = Arc::new(on_panic); let module_clone = module.clone(); - let job_tx = core.start( - move || AutoReplacingModuleInstance { - inst: module_clone.create_instance(), - module: module_clone, - }, - |x| x as &mut dyn ModuleInstance, - ); + + let instance_manager = Arc::new(Mutex::new(ModuleInstanceManager::new(module_clone, database_identity))); + ModuleHost { info, module, on_panic, - job_tx, + instance_manager, + executor, + closed: Arc::new(AtomicBool::new(false)), } } @@ -594,6 +698,20 @@ impl ModuleHost { &self.info.subscriptions } + fn is_marked_closed(&self) -> bool { + // `self.closed` isn't used for any synchronization, it's just a shared flag, + // so `Ordering::Relaxed` is sufficient. + self.closed.load(std::sync::atomic::Ordering::Relaxed) + } + + fn guard_closed(&self) -> Result<(), NoSuchModule> { + if self.is_marked_closed() { + Err(NoSuchModule) + } else { + Ok(()) + } + } + /// Run a function on the JobThread for this module. /// This would deadlock if it is called within another call to `on_module_thread`. /// Since this is async, and `f` is sync, deadlocking shouldn't be a problem. @@ -602,20 +720,22 @@ impl ModuleHost { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - // Run the provided function on the module instance. - // This is a convenience method that ensures the module instance is available - // and handles any errors that may occur. - self.call(label, |_| f()) - .await - .map_err(|_| anyhow::Error::from(NoSuchModule)) + self.guard_closed()?; + + let timer_guard = self.start_call_timer(label); + + let res = self + .executor + .run_sync_job(move || { + drop(timer_guard); + f() + }) + .await; + + Ok(res) } - /// Run a function on the JobThread for this module which has access to the module instance. - async fn call(&self, label: &str, f: F) -> Result - where - F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static, - R: Send + 'static, - { + fn start_call_timer(&self, label: &str) -> ScopeGuard<(), impl FnOnce(())> { // Record the time until our function starts running. let queue_timer = WORKER_METRICS .reducer_wait_time @@ -633,19 +753,28 @@ impl ModuleHost { .observe(queue_length as f64); } // Ensure that we always decrement the gauge. - let timer_guard = scopeguard::guard((), move |_| { + scopeguard::guard((), move |_| { // Decrement the queue length gauge when we're done. // This is done in a defer so that it happens even if the reducer call panics. queue_length_gauge.dec(); queue_timer.stop_and_record(); - }); + }) + } + + /// Run a function on the JobThread for this module which has access to the module instance. 
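`start_call_timer` above leans on `scopeguard` so the queue-length gauge and wait-time timer are always settled, even when the job panics. A small self-contained sketch of that pattern, with a plain atomic counter standing in for the Prometheus metrics:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Sketch of the `scopeguard::guard` pattern used by `start_call_timer`:
// the closure runs when the guard is dropped -- including during panic
// unwinding -- so the in-flight counter can never be leaked by a panicking job.
fn with_in_flight<R>(in_flight: &AtomicUsize, job: impl FnOnce() -> R) -> R {
    in_flight.fetch_add(1, Ordering::Relaxed);
    let _guard = scopeguard::guard((), |_| {
        in_flight.fetch_sub(1, Ordering::Relaxed);
    });
    job()
}

fn main() {
    static IN_FLIGHT: AtomicUsize = AtomicUsize::new(0);
    let result = with_in_flight(&IN_FLIGHT, || 21 * 2);
    assert_eq!(result, 42);
    assert_eq!(IN_FLIGHT.load(Ordering::Relaxed), 0);
}
```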
+ async fn call(&self, label: &str, f: F) -> Result + where + F: FnOnce(&mut Instance) -> R + Send + 'static, + R: Send + 'static, + { + self.guard_closed()?; + let timer_guard = self.start_call_timer(label); // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentialyl take a long time + // partially because the computation can potentially take a long time // and partially because interacting with the database requires taking - // a blocking lock. So, we run `f` inside of `asyncify()`, which runs - // the provided closure in a tokio blocking task, and bubbles up any - // panic that may occur. + // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. + // This will bubble up any panic that may occur. // If a reducer call panics, we **must** ensure to call `self.on_panic` // so that the module is discarded by the host controller. @@ -653,13 +782,21 @@ impl ModuleHost { log::warn!("reducer {label} panicked"); (self.on_panic)(); }); - self.job_tx - .run(move |inst| { + + let mut instance = self.instance_manager.lock().await.get_instance(); + + let (res, instance) = self + .executor + .run_sync_job(move || { drop(timer_guard); - f(inst) + let res = f(&mut instance); + (res, instance) }) - .await - .map_err(|_: JobThreadClosed| NoSuchModule) + .await; + + self.instance_manager.lock().await.return_instance(instance); + + Ok(res) } pub async fn disconnect_client(&self, client_id: ClientActorId) { @@ -727,7 +864,7 @@ impl ModuleHost { if let Some((reducer_id, reducer_def)) = reducer_lookup { // The module defined a lifecycle reducer to handle new connections. // Call this reducer. - // If the call fails (as in, something unexpectedly goes wrong with WASM execution), + // If the call fails (as in, something unexpectedly goes wrong with guest execution), // abort the connection: we can't really recover. let reducer_outcome = me.call_reducer_inner_with_inst( Some(ScopeGuard::into_inner(mut_tx)), @@ -779,7 +916,7 @@ impl ModuleHost { } }) .await - .map_err(Into::::into)? + .map_err(ReducerCallError::from)? } /// Invokes the `client_disconnected` reducer, if present, @@ -790,7 +927,7 @@ impl ModuleHost { &self, caller_identity: Identity, caller_connection_id: ConnectionId, - inst: &mut dyn ModuleInstance, + inst: &mut Instance, ) -> Result<(), ReducerCallError> { let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); let reducer_name = reducer_lookup @@ -923,8 +1060,7 @@ impl ModuleHost { self.call("call_identity_disconnected", move |inst| { me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst) }) - .await - .map_err(Into::::into)? + .await? } /// Empty the system tables tracking clients without running any lifecycle reducers. @@ -958,23 +1094,23 @@ impl ModuleHost { let args = args.into_tuple(reducer_seed)?; let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); - self.call(&reducer_def.name, move |inst| { - inst.call_reducer( - None, - CallReducerParams { - timestamp: Timestamp::now(), - caller_identity, - caller_connection_id, - client, - request_id, - timer, - reducer_id, - args, - }, - ) - }) - .await - .map_err(Into::into) + Ok(self + .call(&reducer_def.name, move |inst| { + inst.call_reducer( + None, + CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + args, + }, + ) + }) + .await?) 
} fn call_reducer_inner_with_inst( &self, @@ -987,7 +1123,7 @@ impl ModuleHost { reducer_id: ReducerId, reducer_def: &ReducerDef, args: ReducerArgs, - module_instance: &mut dyn ModuleInstance, + module_instance: &mut Instance, ) -> Result { let reducer_seed = ReducerArgsDeserializeSeed(self.info.module_def.typespace().with_type(reducer_def)); let args = args.into_tuple(reducer_seed)?; @@ -1069,7 +1205,7 @@ impl ModuleHost { // scheduled reducer name not fetched yet, anyway this is only for logging purpose const REDUCER: &str = "scheduled_reducer"; let module = self.info.clone(); - self.call(REDUCER, move |inst: &mut dyn ModuleInstance| { + self.call(REDUCER, move |inst: &mut Instance| { let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); match call_reducer_params(&mut tx) { @@ -1101,8 +1237,7 @@ impl ModuleHost { })), } }) - .await - .unwrap_or_else(|e| Err(e.into())) + .await? } pub fn subscribe_to_logs(&self) -> anyhow::Result> { @@ -1132,13 +1267,14 @@ impl ModuleHost { } pub async fn exit(&self) { + // As in `Self::marked_closed`, `Relaxed` is sufficient because we're not synchronizing any external state. + self.closed.store(true, std::sync::atomic::Ordering::Relaxed); self.module.scheduler().close(); - self.job_tx.close(); self.exited().await; } pub async fn exited(&self) { - tokio::join!(self.module.scheduler().closed(), self.job_tx.closed()); + self.module.scheduler().closed().await; } pub fn inject_logs(&self, log_level: LogLevel, reducer_name: &str, message: &str) { @@ -1285,7 +1421,9 @@ impl ModuleHost { info: self.info.clone(), inner: Arc::downgrade(&self.module), on_panic: Arc::downgrade(&self.on_panic), - tx: self.job_tx.downgrade(), + instance_manager: Arc::downgrade(&self.instance_manager), + executor: self.executor.downgrade(), + closed: Arc::downgrade(&self.closed), } } @@ -1306,12 +1444,16 @@ impl WeakModuleHost { pub fn upgrade(&self) -> Option { let inner = self.inner.upgrade()?; let on_panic = self.on_panic.upgrade()?; - let tx = self.tx.upgrade()?; + let instance_manager = self.instance_manager.upgrade()?; + let executor = self.executor.upgrade()?; + let closed = self.closed.upgrade()?; Some(ModuleHost { info: self.info.clone(), module: inner, on_panic, - job_tx: tx, + instance_manager, + executor, + closed, }) } } diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index e5f7656e4a2..809e3da097f 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] use super::module_common::{build_common_module_from_raw, run_describer, ModuleCommon}; -use super::module_host::{CallReducerParams, DynModule, Module, ModuleInfo, ModuleInstance, ModuleRuntime}; +use super::module_host::{CallReducerParams, Module, ModuleInfo, ModuleRuntime}; use super::UpdateDatabaseResult; use crate::host::instance_env::{ChunkPool, InstanceEnv}; use crate::host::wasm_common::instrumentation::CallTimes; @@ -15,7 +15,7 @@ use crate::{module_host_context::ModuleCreationContext, replica_context::Replica use core::ffi::c_void; use core::sync::atomic::{AtomicBool, Ordering}; use core::time::Duration; -use core::{iter, ptr, str}; +use core::{ptr, str}; use de::deserialize_js; use error::{ catch_exception, exception_already_thrown, log_traceback, BufferTooSmall, CodeError, FnRet, JsStackTrace, @@ -51,7 +51,7 @@ pub struct V8Runtime { } impl ModuleRuntime for V8Runtime { - fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result { + fn make_actor(&self, mcc: ModuleCreationContext<'_>) 
-> anyhow::Result { V8_RUNTIME_GLOBAL.make_actor(mcc) } } @@ -88,7 +88,7 @@ impl V8RuntimeInner { } impl ModuleRuntime for V8RuntimeInner { - fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result { + fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result { #![allow(unreachable_code, unused_variables)] log::trace!( @@ -110,40 +110,30 @@ impl ModuleRuntime for V8RuntimeInner { // Validate and create a common module rom the raw definition. let common = build_common_module_from_raw(mcc, desc)?; - Ok(JsModule { common, program }) + Ok(Module::Js(JsModule { common, program })) } } #[derive(Clone)] -struct JsModule { +pub struct JsModule { common: ModuleCommon, program: Arc, } -impl DynModule for JsModule { - fn replica_ctx(&self) -> &Arc { +impl JsModule { + pub fn replica_ctx(&self) -> &Arc { self.common.replica_ctx() } - fn scheduler(&self) -> &Scheduler { + pub fn scheduler(&self) -> &Scheduler { self.common.scheduler() } -} - -impl Module for JsModule { - type Instance = JsInstance; - - type InitialInstances<'a> = iter::Empty; - - fn initial_instances(&mut self) -> Self::InitialInstances<'_> { - iter::empty() - } - fn info(&self) -> Arc { + pub fn info(&self) -> Arc { self.common.info().clone() } - fn create_instance(&self) -> Self::Instance { + pub fn create_instance(&self) -> JsInstance { // TODO(v8): do we care about preinits / setup or are they unnecessary? let common = &self.common; @@ -292,7 +282,7 @@ impl JsInstanceEnv { } } -struct JsInstance { +pub struct JsInstance { /// Information common to instances of all runtimes. /// /// (The type is shared, the data is not.) @@ -308,12 +298,12 @@ struct JsInstance { program: Arc, } -impl ModuleInstance for JsInstance { - fn trapped(&self) -> bool { +impl JsInstance { + pub fn trapped(&self) -> bool { self.common.trapped } - fn update_database( + pub fn update_database( &mut self, program: Program, old_module_info: Arc, @@ -324,7 +314,7 @@ impl ModuleInstance for JsInstance { .update_database(replica_ctx, program, old_module_info, policy) } - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> super::ReducerCallResult { + pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> super::ReducerCallResult { let replica_ctx = &self.instance.get_mut().replica_ctx().clone(); self.common diff --git a/crates/core/src/host/wasm_common/instrumentation.rs b/crates/core/src/host/wasm_common/instrumentation.rs index cf5c00fdc7c..0acd59b80e7 100644 --- a/crates/core/src/host/wasm_common/instrumentation.rs +++ b/crates/core/src/host/wasm_common/instrumentation.rs @@ -91,6 +91,12 @@ pub struct CallTimes { times: EnumMap, } +impl Default for CallTimes { + fn default() -> Self { + Self::new() + } +} + impl CallTimes { /// Create a new timing structure, with times for all calls set to zero. pub fn new() -> Self { @@ -115,7 +121,7 @@ impl CallTimes { /// will `take`` the CallTimes after running a reducer and report the taken times, /// leaving a fresh zeroed CallTimes for the next reducer invocation. 
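The new `Default` impl above is what lets the `take` method just below switch from `mem::replace(self, Self::new())` to the terser `std::mem::take`; the two are equivalent whenever `Default` produces the same value as `new`. A tiny equivalence check with an invented type:

```rust
// `std::mem::take` is shorthand for `mem::replace` with the type's `Default`
// value, which is why adding `impl Default for CallTimes` unlocks the
// simplification in `CallTimes::take` below.
#[derive(Default, Debug, PartialEq)]
struct Tally(u32);

fn drain(t: &mut Tally) -> Tally {
    std::mem::take(t) // same as: std::mem::replace(t, Tally::default())
}

fn main() {
    let mut t = Tally(3);
    assert_eq!(drain(&mut t), Tally(3));
    assert_eq!(t, Tally(0)); // left zeroed for the next invocation
}
```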
pub fn take(&mut self) -> CallTimes { - std::mem::replace(self, Self::new()) + std::mem::take(self) } } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index e327b0b95b0..16d2d585819 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -12,8 +12,7 @@ use crate::energy::{EnergyMonitor, ReducerBudget, ReducerFingerprint}; use crate::host::instance_env::InstanceEnv; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; use crate::host::module_host::{ - CallReducerParams, DatabaseUpdate, DynModule, EventStatus, Module, ModuleEvent, ModuleFunctionCall, ModuleInfo, - ModuleInstance, + CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, }; use crate::host::{ArgsTuple, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, UpdateDatabaseResult}; use crate::identity::Identity; @@ -83,7 +82,7 @@ pub struct ExecuteResult { pub call_result: Result>, anyhow::Error>, } -pub(crate) struct WasmModuleHostActor { +pub struct WasmModuleHostActor { module: T::InstancePre, initial_instance: Option>>, common: ModuleCommon, @@ -171,30 +170,20 @@ impl WasmModuleHostActor { } } -impl DynModule for WasmModuleHostActor { - fn replica_ctx(&self) -> &Arc { +impl WasmModuleHostActor { + pub fn replica_ctx(&self) -> &Arc { self.common.replica_ctx() } - fn scheduler(&self) -> &Scheduler { + pub fn scheduler(&self) -> &Scheduler { self.common.scheduler() } -} - -impl Module for WasmModuleHostActor { - type Instance = WasmModuleInstance; - - type InitialInstances<'a> = Option; - fn initial_instances(&mut self) -> Self::InitialInstances<'_> { - self.initial_instance.take().map(|x| *x) - } - - fn info(&self) -> Arc { + pub fn info(&self) -> Arc { self.common.info() } - fn create_instance(&self) -> Self::Instance { + pub fn create_instance(&self) -> WasmModuleInstance { let common = &self.common; let env = InstanceEnv::new(common.replica_ctx().clone(), common.scheduler().clone()); // this shouldn't fail, since we already called module.create_instance() @@ -221,12 +210,12 @@ impl std::fmt::Debug for WasmModuleInstance { } } -impl ModuleInstance for WasmModuleInstance { - fn trapped(&self) -> bool { +impl WasmModuleInstance { + pub fn trapped(&self) -> bool { self.common.trapped } - fn update_database( + pub fn update_database( &mut self, program: Program, old_module_info: Arc, @@ -237,7 +226,7 @@ impl ModuleInstance for WasmModuleInstance { .update_database(replica_ctx, program, old_module_info, policy) } - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { + pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params)) } } diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index 06d80f85da1..d348d1e9a33 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -3,7 +3,7 @@ use std::time::Duration; use anyhow::Context; use spacetimedb_paths::server::{ServerDataDir, WasmtimeCacheDir}; -use wasmtime::{Engine, Linker, Module, StoreContext, StoreContextMut}; +use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut}; use crate::energy::{EnergyQuanta, ReducerBudget}; use crate::error::NodesError; @@ -13,11 +13,11 @@ use crate::module_host_context::ModuleCreationContext; mod 
wasm_instance_env; mod wasmtime_module; -use wasmtime_module::WasmtimeModule; +use wasmtime_module::{WasmtimeInstance, WasmtimeModule}; use self::wasm_instance_env::WasmInstanceEnv; -use super::wasm_common::module_host_actor::InitializationError; +use super::wasm_common::module_host_actor::{InitializationError, WasmModuleInstance}; use super::wasm_common::{abi, module_host_actor::WasmModuleHostActor, ModuleCreationError}; pub struct WasmtimeRuntime { @@ -52,7 +52,17 @@ impl WasmtimeRuntime { .cranelift_opt_level(wasmtime::OptLevel::Speed) .consume_fuel(true) .epoch_interruption(true) - .wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); + .wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable) + // We need async support to enable suspending execution of procedures + // when waiting for e.g. HTTP responses or the transaction lock. + // We don't enable either fuel-based or epoch-based yielding + // (see https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.epoch_deadline_async_yield_and_update + // and https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.fuel_async_yield_interval) + // so reducers will always execute to completion during the first `Future::poll` call, + // and procedures will only yield when performing an asynchronous operation. + // These futures are executed on a separate single-threaded executor not related to the "global" Tokio runtime, + // which is responsible only for executing WASM. See `crate::util::jobs` for this infrastructure. + .async_support(true); // Offer a compile-time flag for enabling perfmap generation, // so `perf` can display JITted symbol names. @@ -96,9 +106,13 @@ impl WasmtimeRuntime { } } +pub type Module = WasmModuleHostActor; +pub type ModuleInstance = WasmModuleInstance; + impl ModuleRuntime for WasmtimeRuntime { - fn make_actor(&self, mcc: ModuleCreationContext) -> anyhow::Result { - let module = Module::new(&self.engine, &mcc.program.bytes).map_err(ModuleCreationError::WasmCompileError)?; + fn make_actor(&self, mcc: ModuleCreationContext) -> anyhow::Result { + let module = + wasmtime::Module::new(&self.engine, &mcc.program.bytes).map_err(ModuleCreationError::WasmCompileError)?; let func_imports = module .imports() @@ -114,7 +128,9 @@ impl ModuleRuntime for WasmtimeRuntime { let module = WasmtimeModule::new(module); - WasmModuleHostActor::new(mcc, module).map_err(Into::into) + WasmModuleHostActor::new(mcc, module) + .map_err(Into::into) + .map(super::module_host::Module::Wasm) } } diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 7496b6ca854..c2906f65ee3 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -8,8 +8,12 @@ use crate::host::module_common::run_describer; use crate::host::wasm_common::module_host_actor::{DescribeError, InitializationError}; use crate::host::wasm_common::*; use crate::util::string_from_utf8_lossy_owned; +use futures_util::FutureExt; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; -use wasmtime::{AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace}; +use wasmtime::{ + AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace, WasmParams, + WasmResults, +}; fn log_traceback(func_type: &str, func: &str, e: &wasmtime::Error) { log::info!("{func_type} \"{func}\" runtime error: {e}"); @@ -86,15 +90,34 @@ fn handle_error_sink_code(code: i32, error: Vec) -> 
Result<(), Box> { const CALL_FAILURE: i32 = HOST_CALL_FAILURE.get() as i32; +/// Invoke `typed_func` and assert that it doesn't yield. +/// +/// Our Wasmtime is configured for `async` execution, and will panic if we use the non-async [`TypedFunc::call`]. +/// The `async` config is necessary to allow procedures to suspend, e.g. when making HTTP calls or acquiring transactions. +/// However, most of the WASM we execute, incl. reducers and startup functions, should never block/yield. +/// Rather than crossing our fingers and trusting, we run [`TypedFunc::call_async`] in [`FutureExt::now_or_never`], +/// an "async executor" which invokes [`std::task::Future::poll`] exactly once. +fn call_sync_typed_func( + typed_func: &TypedFunc, + store: &mut Store, + args: Args, +) -> anyhow::Result { + let fut = typed_func.call_async(store, args); + fut.now_or_never() + .expect("`call_async` of supposedly synchronous WASM function returned `Poll::Pending`") +} + impl module_host_actor::WasmInstancePre for WasmtimeModule { type Instance = WasmtimeInstance; fn instantiate(&self, env: InstanceEnv, func_names: &FuncNames) -> Result { let env = WasmInstanceEnv::new(env); let mut store = Store::new(self.module.module().engine(), env); - let instance = self - .module - .instantiate(&mut store) + let instance_fut = self.module.instantiate_async(&mut store); + + let instance = instance_fut + .now_or_never() + .expect("`instantiate_async` did not immediately return `Ready`") .map_err(InitializationError::Instantiation)?; let mem = Mem::extract(&instance, &mut store).unwrap(); @@ -115,16 +138,15 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { for preinit in &func_names.preinits { let func = instance.get_typed_func::<(), ()>(&mut store, preinit).unwrap(); - func.call(&mut store, ()) - .map_err(|err| InitializationError::RuntimeError { - err, - func: preinit.clone(), - })?; + call_sync_typed_func(&func, &mut store, ()).map_err(|err| InitializationError::RuntimeError { + err, + func: preinit.clone(), + })?; } if let Ok(init) = instance.get_typed_func::(&mut store, SETUP_DUNDER) { let setup_error = store.data_mut().setup_standard_bytes_sink(); - let res = init.call(&mut store, setup_error); + let res = call_sync_typed_func(&init, &mut store, setup_error); let error = store.data_mut().take_standard_bytes_sink(); match res { // TODO: catch this and return the error message to the http client @@ -167,7 +189,9 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let sink = self.store.data_mut().setup_standard_bytes_sink(); - run_describer(log_traceback, || describer.call(&mut self.store, sink))?; + run_describer(log_traceback, || { + call_sync_typed_func(&describer, &mut self.store, sink) + })?; // Fetch the bsatn returned by the describer call. 
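`call_sync_typed_func` above relies on `FutureExt::now_or_never`, which polls a future exactly once and returns `Some` only if it was already ready. A stand-alone sketch of that contract, using `futures-util` and `futures-channel` (both already workspace dependencies):

```rust
use futures_util::FutureExt;

// `now_or_never` polls exactly once: an already-ready future yields
// `Some(output)`, while anything that would genuinely suspend yields `None`.
// `call_sync_typed_func` turns the `None` case into a loud panic, since
// reducers and startup functions are expected never to suspend.
fn main() {
    // Ready immediately: completes within the single poll.
    assert_eq!(async { 40 + 2 }.now_or_never(), Some(42));

    // Pending: nothing has been sent on the channel, so one poll can't finish it.
    let (_tx, rx) = futures_channel::oneshot::channel::<u32>();
    assert!(rx.now_or_never().is_none());
}
```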
let bytes = self.store.data_mut().take_standard_bytes_sink(); @@ -195,7 +219,8 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, args_bytes, op.timestamp); - let call_result = self.call_reducer.call( + let call_result = call_sync_typed_func( + &self.call_reducer, &mut *store, ( op.id.0, diff --git a/crates/core/src/messages/control_db.rs b/crates/core/src/messages/control_db.rs index 8f92f350324..d3d209e728c 100644 --- a/crates/core/src/messages/control_db.rs +++ b/crates/core/src/messages/control_db.rs @@ -76,7 +76,20 @@ pub struct NodeStatus { pub state: String, } #[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Default, Serialize, Deserialize, serde::Deserialize, + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + Default, + Serialize, + Deserialize, + serde::Deserialize, + strum::AsRefStr, + strum::Display, )] #[repr(i32)] pub enum HostType { diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 140e733799e..ebd8c079fc8 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -261,7 +261,7 @@ impl CoreReservations { #[derive(Default)] pub struct Cores { /// The cores to run database instances on. - pub databases: JobCores, + pub databases: DatabaseCores, /// The cores to run tokio worker threads on. pub tokio: TokioCores, /// The cores to run rayon threads on. @@ -286,8 +286,8 @@ impl Cores { let [_irq, reserved, databases, tokio_workers, rayon] = reservations.apply(&mut cores); + let databases = DatabaseCores(databases); let reserved = (!reserved.is_empty()).then(|| reserved.into()); - let databases = databases.into_iter().collect::(); let rayon = RayonCores((!rayon.is_empty()).then_some(rayon)); // see comment on `TokioCores.blocking` @@ -424,3 +424,25 @@ fn thread_spawn_handler( Ok(()) } } + +#[derive(Default)] +pub struct DatabaseCores(Vec); + +impl DatabaseCores { + /// Construct a [`JobCores`] manager suitable for running database WASM code on. + /// + /// The `global_runtime` should be a [`tokio::runtime::Handle`] to the [`tokio::runtime::Runtime`] + /// constructed from the [`TokioCores`] of this [`Cores`]. + /// + /// ```rust + /// # use spacetimedb::startup::pin_threads; + /// let cores = pin_threads(); + /// let mut builder = tokio::runtime::Builder::new_multi_thread(); + /// cores.tokio.configure(&mut builder); + /// let mut rt = builder.build().unwrap(); + /// let database_cores = cores.databases.make_database_runners(rt.handle()); + /// ``` + pub fn make_database_runners(self, global_runtime: &tokio::runtime::Handle) -> JobCores { + JobCores::from_pinned_cores(self.0, global_runtime.clone()) + } +} diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index b31be07727e..6634d1e57d2 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -1,108 +1,190 @@ +use std::future::Future; use std::sync::{Arc, Mutex, Weak}; use core_affinity::CoreId; use indexmap::IndexMap; use smallvec::SmallVec; use spacetimedb_data_structures::map::HashMap; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::runtime; +use tokio::sync::watch; -use super::notify_once::NotifyOnce; - -/// A handle to a pool of CPU cores for running job threads on. +/// A handle to a pool of Tokio executors for running database WASM code on. +/// +/// Each database has a [`SingleCoreExecutor`], +/// a handle to a single-threaded Tokio runtime which is pinned to a specific CPU core. 
+/// In multi-tenant environments, multiple databases' [`SingleCoreExecutor`]s may be handles on the same runtime/core, +/// and a [`SingleCoreExecutor`] may occasionally be migrated to a different runtime/core to balance load. /// -/// Each thread is represented by a [`JobThread`], which is pinned to a single -/// core and sequentially runs the jobs that are passed to [`JobThread::run`]. -/// This pool attempts to keep the number of `JobThread`s pinned to each core -/// as equitable as possible; new threads allocated by [`Self::take()`] are -/// assigned to cores in a round-robin fashion, and when a thread exits, it -/// takes a thread pinned to a busier core and repins it to the core it was -/// just running on. +/// Construct a `JobCores` via [`Self::from_pinned_cores`] or [`Self::without_pinned_cores`]. +/// A `JobCores` constructed without core pinning, including `from_pinned_cores` on an empty set, +/// will use the "global" Tokio executor to run database jobs, +/// rather than creating multiple un-pinned single-threaded runtimes. +/// This means that long-running reducers or queries may block Tokio worker threads. /// -/// Construction is done via the `FromIterator` impl. If created from an empty -/// iterator or via `JobCores::default()`, the job threads will work but not be -/// pinned to any threads. +/// This handle is cheaply cloneable, but at least one handle must be kept alive. +/// If all instances of it are dropped, the per-thread [`runtime::Runtime`]s will be dropped, +/// and so will stop executing jobs for databases. /// -/// This handle is cheaply cloneable. If all instances of it are dropped, -/// threads will continue running, but will no longer repin each other -/// when one exits. -#[derive(Default, Clone)] +/// Dropping the last handle on a `JobCores` from an `async` context will panic, +/// as Tokio doesn't like to shut down nested runtimes. +/// To avoid this, keep a handle on the `JobCores` alive outside of the `async` runner. +#[derive(Clone)] pub struct JobCores { - inner: Option>>, + inner: JobCoresInner, } -struct JobCoresInner { - /// A map to the repin_tx for each job thread - job_threads: HashMap>, +#[derive(Clone)] +enum JobCoresInner { + PinnedCores(Arc>), + NoPinning(runtime::Handle), +} + +struct PinnedCoresExecutorManager { + /// Channels to request that a [`SingleCoreExecutor`] move to a different Tokio runtime. + /// + /// Alongside each channel is the [`CoreId`] of the runtime to which that [`SingleCoreExecutor`] is currently pinned. + /// This is used as an index into `self.cores` to make load-balancing decisions when freeing a database executor + /// in [`Self::deallocate`]. + database_executor_move: HashMap)>, cores: IndexMap, /// An index into `cores` of the next core to put a new job onto. /// /// This acts as a partition point in `cores`; all cores in `..index` have /// one fewer job on them than the cores in `index..`. next_core: usize, - next_id: JobThreadId, + next_id: SingleCoreExecutorId, } -#[derive(Default)] +/// Stores the [`tokio::Runtime`] pinned to a particular core, +/// and remembers the [`SingleCoreExecutorId`]s for all databases sharing that executor. 
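Before the details of `CoreInfo` and the manager below, here is an illustrative usage sketch of the API this file now exposes. The types and methods are the ones added in this diff; the `cores` argument and the assertion are invented for the example:

```rust
// Illustrative usage of the new `JobCores` / `SingleCoreExecutor` API.
async fn example(cores: Vec<core_affinity::CoreId>) {
    // With an empty `cores` list this falls back to running jobs on the
    // provided ("global") runtime instead of per-core runtimes.
    let pool = JobCores::from_pinned_cores(cores, tokio::runtime::Handle::current());

    // One handle per database; dropping every clone of it frees the slot so
    // another database can be rebalanced onto the underlying core.
    let executor: SingleCoreExecutor = pool.take();

    let answer = executor.run_sync_job(|| 2 + 2).await;
    assert_eq!(answer, 4);
}
```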
struct CoreInfo { - jobs: SmallVec<[JobThreadId; 4]>, + jobs: SmallVec<[SingleCoreExecutorId; 4]>, + tokio_runtime: runtime::Runtime, +} + +impl CoreInfo { + fn spawn_executor(id: CoreId) -> CoreInfo { + let runtime = runtime::Builder::new_multi_thread() + .worker_threads(1) + // [`SingleCoreExecutor`]s should only be executing Wasmtime WASM futures, + // and so should never be doing [`Tokio::spawn_blocking`] or performing blocking I/O. + // However, `max_blocking_threads` will panic if passed 0, so we set a limit of 1 + // and use `on_thread_start` to log an error when spawning a blocking task. + .max_blocking_threads(1) + .on_thread_start({ + use std::sync::atomic::{AtomicBool, Ordering}; + let already_spawned_worker = AtomicBool::new(false); + move || { + // `Ordering::Relaxed`: No synchronization is happening here; + // we're not writing to any other memory or coordinating with any other atomic places. + // We rely on Tokio's infrastructure to impose a happens-before relationship + // between spawning worker threads and spawning blocking threads itself. + if already_spawned_worker.swap(true, Ordering::Relaxed) { + // We're spawning a blocking thread, naughty! + log::error!( + "`JobCores` Tokio runtime for `SingleCoreExecutor` use on core {id:?} spawned a blocking thread!" + ); + } else { + // We're spawning our 1 worker, so pin it to the appropriate thread. + core_affinity::set_for_current(id); + } + } + }) + .build() + .expect("Failed to start Tokio executor for `SingleCoreExecutor`"); + CoreInfo { + jobs: SmallVec::new(), + tokio_runtime: runtime, + } + } } #[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -struct JobThreadId(usize); +struct SingleCoreExecutorId(usize); impl JobCores { - /// Reserve a core from the pool to later start a job thread on. - pub fn take(&self) -> JobCore { - let inner = if let Some(inner) = &self.inner { - let cores = Arc::downgrade(inner); - let (id, repin_rx) = inner.lock().unwrap().allocate(); - Some(JobCoreInner { - repin_rx, - _guard: JobCoreGuard { cores, id }, - }) - } else { - None + /// Get a handle on a [`SingleCoreExecutor`] to later run a database's jobs on. + pub fn take(&self) -> SingleCoreExecutor { + let database_executor_inner = match &self.inner { + JobCoresInner::NoPinning(handle) => SingleCoreExecutorInner::without_load_balancing(handle.clone()), + JobCoresInner::PinnedCores(manager) => SingleCoreExecutorInner::with_load_balancing(manager), }; - - JobCore { inner } + SingleCoreExecutor::from_inner(database_executor_inner) } -} -impl FromIterator for JobCores { - fn from_iter>(iter: T) -> Self { - let cores: IndexMap<_, _> = iter.into_iter().map(|id| (id, CoreInfo::default())).collect(); - let inner = (!cores.is_empty()).then(|| { - Arc::new(Mutex::new(JobCoresInner { - job_threads: HashMap::default(), + /// Construct a [`JobCores`] which runs one Tokio runtime on each of the `cores`, + /// and pins each database to a particular runtime/core. + /// + /// If `cores` is empty, this falls back to [`Self::without_pinned_cores`] + /// and runs all databases in the `global_runtime`. 
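`CoreInfo::spawn_executor` above builds a one-worker Tokio runtime whose worker thread pins itself to a fixed core on startup. A simplified stand-alone sketch of that technique; unlike the real code, it does not distinguish the worker thread from blocking threads:

```rust
// Simplified sketch of the pinning technique used by `CoreInfo::spawn_executor`:
// a one-worker runtime whose thread pins itself to `core` as soon as it starts.
// The real code also watches for unexpected blocking threads; this sketch does not.
fn pinned_runtime(core: core_affinity::CoreId) -> std::io::Result<tokio::runtime::Runtime> {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1)
        .on_thread_start(move || {
            // Best-effort: `set_for_current` returns `false` if the OS rejects
            // the affinity request.
            let _ = core_affinity::set_for_current(core);
        })
        .build()
}
```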
+ pub fn from_pinned_cores(cores: impl IntoIterator, global_runtime: runtime::Handle) -> Self { + let cores: IndexMap<_, _> = cores.into_iter().map(|id| (id, CoreInfo::spawn_executor(id))).collect(); + let inner = if cores.is_empty() { + JobCoresInner::NoPinning(global_runtime) + } else { + JobCoresInner::PinnedCores(Arc::new(Mutex::new(PinnedCoresExecutorManager { + database_executor_move: HashMap::default(), cores, next_core: 0, - next_id: JobThreadId(0), - })) - }); + next_id: SingleCoreExecutorId(0), + }))) + }; + Self { inner } } + + /// Construct a [`JobCores`] which does not perform any core pinning, + /// and just runs all database jobs in `global_runtime`. + /// + /// This will be used in deployments where there aren't enough available CPU cores + /// to reserve specific cores for database WASM execution. + pub fn without_pinned_cores(global_runtime: runtime::Handle) -> Self { + Self { + inner: JobCoresInner::NoPinning(global_runtime), + } + } } -impl JobCoresInner { - fn allocate(&mut self) -> (JobThreadId, watch::Receiver) { - let id = self.next_id; +impl PinnedCoresExecutorManager { + /// Get a [`runtime::Handle`] for running database operations on, + /// and store state in `self` necessary to move that database to a new runtime + /// for load-balancing purposes. + /// + /// The returned [`SingleCoreExecutorId`] is an index into internal data structures in `self` (namely, `self.cores`) + /// which should be passed to [`Self::deallocate`] when the database is no longer using this executor. + /// This is done automatically by [`LoadBalanceOnDropGuard`]. + /// + /// The returned `watch::Receiver` stores the Tokio [`runtime::Handle`] + /// on which the database should run its compute-intensive jobs. + /// This may occasionally be replaced with a new [`runtime::Handle`] to balance databases among available cores, + /// so databases should read from the [`watch::Receiver`] when spawning each job, + /// and should not spawn long-lived background tasks such as ones which loop over a channel. + fn allocate(&mut self) -> (SingleCoreExecutorId, watch::Receiver) { + let database_executor_id = self.next_id; self.next_id.0 += 1; - let (&core_id, core) = self.cores.get_index_mut(self.next_core).unwrap(); - core.jobs.push(id); + let (&core_id, runtime_handle) = { + let (core_id, core_info) = self.cores.get_index_mut(self.next_core).unwrap(); + core_info.jobs.push(database_executor_id); + (core_id, core_info.tokio_runtime.handle().clone()) + }; self.next_core = (self.next_core + 1) % self.cores.len(); - let (repin_tx, repin_rx) = watch::channel(core_id); - self.job_threads.insert(id, repin_tx); + let (move_runtime_tx, move_runtime_rx) = watch::channel(runtime_handle); + self.database_executor_move + .insert(database_executor_id, (core_id, move_runtime_tx)); - (id, repin_rx) + (database_executor_id, move_runtime_rx) } - /// Run when a `JobThread` exits. - fn deallocate(&mut self, id: JobThreadId) { - let core_id = *self.job_threads.remove(&id).unwrap().borrow(); + /// Mark the executor at `id` as no longer in use, free internal state which tracks it, + /// and move other executors to different cores as necessary to maintain a balanced distribution. + /// + /// Called by [`LoadBalanceOnDropGuard`] when a [`SingleCoreExecutor`] is no longer in use. 
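The `watch` receiver returned by `allocate` above is the whole migration mechanism: the manager later calls `send_replace` with a different runtime handle, and the database picks it up the next time it spawns a job. A small sketch of that read-per-job pattern (a hypothetical helper, not the PR's code):

```rust
use tokio::{runtime, sync::watch};

// Hypothetical sketch of the read-per-job pattern described above: re-read the
// `watch` cell for every job, so a later `send_replace` by the manager moves
// subsequent jobs onto a different runtime/core without any coordination here.
async fn run_on_current_assignment<R>(
    assignment: &watch::Receiver<runtime::Handle>,
    job: impl FnOnce() -> R + Send + 'static,
) -> R
where
    R: Send + 'static,
{
    // Clone the handle out of the cell; the `watch::Ref` itself must not be
    // held across an `.await`, since it is not `Send` and keeps a read lock.
    let handle = assignment.borrow().clone();
    handle
        .spawn(async move { job() })
        .await
        .expect("job panicked or was cancelled")
}
```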
+ fn deallocate(&mut self, id: SingleCoreExecutorId) { + let (freed_core_id, _) = self.database_executor_move.remove(&id).unwrap(); - let core_index = self.cores.get_index_of(&core_id).unwrap(); + let core_index = self.cores.get_index_of(&freed_core_id).unwrap(); // This core is now less busy than it should be - bump `next_core` back // by 1 and steal a thread from the core there. @@ -113,13 +195,14 @@ impl JobCoresInner { let steal_from_index = self.next_core.checked_sub(1).unwrap_or(self.cores.len() - 1); // if this core was already at `next_core - 1`, we don't need to steal from anywhere - let (core, steal_from) = match self.cores.get_disjoint_indices_mut([core_index, steal_from_index]) { + let (core_info, steal_from) = match self.cores.get_disjoint_indices_mut([core_index, steal_from_index]) { Ok([(_, core), (_, steal_from)]) => (core, Some(steal_from)), Err(_) => (&mut self.cores[core_index], None), }; - let pos = core.jobs.iter().position(|x| *x == id).unwrap(); - core.jobs.remove(pos); + let pos = core_info.jobs.iter().position(|x| *x == id).unwrap(); + // Swap remove because we don't care about ordering within `core_info.jobs` + core_info.jobs.swap_remove(pos); if let Some(steal_from) = steal_from { // This unwrap will never fail, since cores below `next_core` always have @@ -131,185 +214,138 @@ impl JobCoresInner { let stolen = steal_from.jobs.pop().unwrap(); // the way we pop and push here means that older job threads will be less // likely to be repinned, while younger ones are liable to bounce around. - core.jobs.push(stolen); - self.job_threads[&stolen].send_replace(core_id); + // Our use of `swap_remove` above makes this not entirely predictable, however. + core_info.jobs.push(stolen); + let (ref mut stolen_core_id, migrate_tx) = self.database_executor_move.get_mut(&stolen).unwrap(); + *stolen_core_id = freed_core_id; + migrate_tx.send_replace(core_info.tokio_runtime.handle().clone()); } self.next_core = steal_from_index; } } -/// A core taken from [`JobCores`], not yet running a job loop. -#[derive(Default)] -pub struct JobCore { - inner: Option, -} - -struct JobCoreInner { - repin_rx: watch::Receiver, - _guard: JobCoreGuard, +/// A handle to a Tokio executor which can be used to run WASM compute for a particular database. +/// +/// Use [`Self::run_job`] to run futures, and [`Self::run_sync_job`] to run functions. +/// +/// This handle is cheaply cloneable. +/// When all handles on this database executor have been dropped, +/// its use of the core to which it is pinned will be released, +/// and other databases may be migrated to that core to balance load. +#[derive(Clone)] +pub struct SingleCoreExecutor { + inner: Arc, } -impl JobCore { - /// Start running a job thread on this core. +struct SingleCoreExecutorInner { + /// Handle on the [`runtime::Runtime`] where this executor should run jobs. /// - /// `init` constructs the data provided to each job, and `unsize` unsizes - /// it to `&mut T`, if necessary. 
- pub fn start(self, init: F, unsize: F2) -> JobThread - where - F: FnOnce() -> U + Send + 'static, - F2: FnOnce(&mut U) -> &mut T + Send + 'static, - U: 'static, - T: ?Sized + 'static, - { - let (tx, rx) = mpsc::channel::>>(Self::JOB_CHANNEL_LENGTH); - let close = Arc::new(NotifyOnce::new()); - - let closed = close.clone(); - let handle = tokio::runtime::Handle::current(); - std::thread::spawn(move || { - let mut data = init(); - let data = unsize(&mut data); - handle.block_on(self.job_loop(rx, closed, data)) - }); - - JobThread { tx, close } - } - - // this shouldn't matter too much, since callers will need to wait for - // the job to finish anyway. - const JOB_CHANNEL_LENGTH: usize = 64; - - async fn job_loop(mut self, mut rx: mpsc::Receiver>>, closed: Arc, data: &mut T) { - // this function is async because we need to recv on the repin channel - // and the jobs channel, but the jobs being run are blocking + /// This will be occasionally updated by [`PinnedCoresExecutorManager::deallocate`] + /// to evenly distribute databases across the available runtimes/cores. + runtime: watch::Receiver, - let repin_rx = self.inner.as_mut().map(|inner| &mut inner.repin_rx); - let repin_loop = async { - if let Some(rx) = repin_rx { - rx.mark_changed(); - while rx.changed().await.is_ok() { - core_affinity::set_for_current(*rx.borrow_and_update()); - } - } - }; - - let job_loop = async { - while let Some(job) = rx.recv().await { - // blocking in place means that other futures on the same task - // won't get polled - in this case, that's just the repin loop, - // which is fine because it can just run before the next job. - tokio::task::block_in_place(|| job(data)) - } - }; + /// [`Drop`] guard which calls [`PinnedCoresExecutorManager::deallocate`] when this database dies, + /// allowing another database from a more-contended runtime/core to migrate here. + _guard: Option, +} - tokio::select! { - () = super::also_poll(job_loop, repin_loop) => {} - // when we receive a close notification, we immediately drop all - // remaining jobs in the queue. - () = closed.notified() => {} +impl SingleCoreExecutorInner { + fn without_load_balancing(handle: runtime::Handle) -> Self { + SingleCoreExecutorInner { + runtime: watch::channel(handle).1, + _guard: None, } } -} -/// On drop, tells the `JobCores` that this core has been freed up. -struct JobCoreGuard { - cores: Weak>, - id: JobThreadId, -} - -impl Drop for JobCoreGuard { - fn drop(&mut self) { - if let Some(cores) = self.cores.upgrade() { - cores.lock().unwrap().deallocate(self.id); + fn with_load_balancing(manager: &Arc>) -> Self { + let manager_weak = Arc::downgrade(manager); + let (database_executor_id, move_runtime_rx) = manager.lock().unwrap().allocate(); + SingleCoreExecutorInner { + runtime: move_runtime_rx, + _guard: Some(LoadBalanceOnDropGuard { + manager: manager_weak, + database_executor_id, + }), } } } -/// A handle to a thread running a job loop; see [`JobCores`] for more details. -/// -/// The thread stores data of type `T`; jobs run on the thread will be given -/// mutable access to it. -/// -/// This handle is cheaply cloneable. If all strong handles have been dropped, -/// the thread will shut down. 
-pub struct JobThread<T: ?Sized> {
-    tx: mpsc::Sender<Box<Job<T>>>,
-    close: Arc<NotifyOnce>,
-}
-
-impl<T: ?Sized> Clone for JobThread<T> {
-    fn clone(&self) -> Self {
-        Self {
-            tx: self.tx.clone(),
-            close: self.close.clone(),
-        }
+impl SingleCoreExecutor {
+    fn from_inner(inner: SingleCoreExecutorInner) -> Self {
+        Self { inner: Arc::new(inner) }
    }
-}
 
-type Job<T> = dyn FnOnce(&mut T) + Send;
+    /// Create a `SingleCoreExecutor` which runs jobs in [`tokio::runtime::Handle::current`].
+    ///
+    /// Callers should most likely instead construct a `SingleCoreExecutor` via [`JobCores::take`],
+    /// which will intelligently pin each database to a particular core.
+    /// This method should only be used for short-lived instances which do not perform intense computation,
+    /// e.g. to extract the schema by calling `describe_module`.
+    pub fn in_current_tokio_runtime() -> Self {
+        Self::from_inner(SingleCoreExecutorInner::without_load_balancing(
+            runtime::Handle::current(),
+        ))
+    }
 
-impl<T: ?Sized> JobThread<T> {
-    /// Run a blocking job on this `JobThread`.
+    /// Run a job for this database executor.
     ///
-    /// The job (`f`) will be placed in a queue, and will run strictly after
-    /// jobs ahead of it in the queue. If `f` panics, it will be bubbled up to
-    /// the calling task.
-    pub async fn run<F, R>(&self, f: F) -> Result<R, JobThreadClosed>
+    /// `f` must not perform blocking operations of the kind usually offloaded via `tokio::task::spawn_blocking`.
+    pub async fn run_job<F, R>(&self, f: F) -> R
     where
-        F: FnOnce(&mut T) -> R + Send + 'static,
+        F: Future<Output = R> + Send + 'static,
         R: Send + 'static,
     {
-        let (ret_tx, ret_rx) = oneshot::channel();
-
-        let span = tracing::Span::current();
-        self.tx
-            .send(Box::new(move |data| {
-                let _entered = span.entered();
-                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(data)));
-                if let Err(Err(_panic)) = ret_tx.send(result) {
-                    tracing::warn!("uncaught panic on threadpool")
-                }
-            }))
-            .await
-            .map_err(|_| JobThreadClosed)?;
-
-        match ret_rx.await {
-            Ok(Ok(ret)) => Ok(ret),
-            Ok(Err(panic)) => std::panic::resume_unwind(panic),
-            Err(_closed) => Err(JobThreadClosed),
+        // Clone the handle rather than holding the `watch::Ref` alive
+        // because `watch::Ref` is not `Send`.
+        let handle = runtime::Handle::clone(&*self.inner.runtime.borrow());
+
+        match handle.spawn(f).await {
+            Ok(r) => r,
+            Err(e) => std::panic::resume_unwind(e.into_panic()),
         }
     }
 
-    /// Shutdown the job thread.
-    pub fn close(&self) {
-        self.close.notify();
+    /// Run `f` on this database executor and return its result.
+    pub async fn run_sync_job<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Send + 'static,
+    {
+        self.run_job(async { f() }).await
     }
 
-    /// Returns a future that resolves once the job thread has been closed.
-    pub async fn closed(&self) {
-        self.tx.closed().await
+    pub fn downgrade(&self) -> WeakSingleCoreExecutor {
+        WeakSingleCoreExecutor {
+            inner: Arc::downgrade(&self.inner),
+        }
     }
+}
 
-    /// Obtain a weak version of this handle.
-    pub fn downgrade(&self) -> WeakJobThread<T> {
-        let tx = self.tx.downgrade();
-        let close = Arc::downgrade(&self.close);
-        WeakJobThread { tx, close }
-    }
+/// On drop, tells the [`JobCores`] that this database is no longer occupying its Tokio runtime,
+/// allowing databases from more-contended runtimes/cores to migrate there.
+struct LoadBalanceOnDropGuard {
+    manager: Weak<Mutex<PinnedCoresExecutorManager>>,
+    database_executor_id: SingleCoreExecutorId,
 }
 
-pub struct JobThreadClosed;
+impl Drop for LoadBalanceOnDropGuard {
+    fn drop(&mut self) {
+        if let Some(cores) = self.manager.upgrade() {
+            cores.lock().unwrap().deallocate(self.database_executor_id);
+        }
+    }
+}
 
-/// A weak version of `JobThread` that does not hold the thread open.
+/// A weak version of [`SingleCoreExecutor`] that does not keep the executor (or its pinned core) alive.
 // used in crate::core::module_host::WeakModuleHost
-pub struct WeakJobThread<T: ?Sized> {
-    tx: mpsc::WeakSender<Box<Job<T>>>,
-    close: Weak<NotifyOnce>,
+#[derive(Clone)]
+pub struct WeakSingleCoreExecutor {
+    inner: Weak<SingleCoreExecutorInner>,
 }
 
-impl<T: ?Sized> WeakJobThread<T> {
-    pub fn upgrade(&self) -> Option<JobThread<T>> {
-        Option::zip(self.tx.upgrade(), self.close.upgrade()).map(|(tx, close)| JobThread { tx, close })
+impl WeakSingleCoreExecutor {
+    pub fn upgrade(&self) -> Option<SingleCoreExecutor> {
+        self.inner.upgrade().map(|inner| SingleCoreExecutor { inner })
     }
 }
diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs
index d8161404e0d..cf1ec661a23 100644
--- a/crates/core/src/worker_metrics/mod.rs
+++ b/crates/core/src/worker_metrics/mod.rs
@@ -1,4 +1,5 @@
 use crate::hash::Hash;
+use crate::messages::control_db::HostType;
 use once_cell::sync::Lazy;
 use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec};
 use spacetimedb_datastore::execution_context::WorkloadType;
@@ -313,6 +314,17 @@ metrics_group!(
     #[labels(db: Identity)]
     pub replay_commitlog_num_commits: IntGaugeVec,
 
+    #[name = spacetime_module_create_instance_time_seconds]
+    #[help = "Time taken to construct a WASM instance or V8 isolate to run module code"]
+    #[labels(db: Identity, module_type: HostType)]
+    // As of writing (pgoldman 2025-09-25), calls to `create_instance` are rare,
+    // as they happen only when an instance traps (panics).
+    // However, this is not once-per-process, unlike the above replay metrics.
+    // I (pgoldman 2025-09-25) am not sure what range or distribution of values to expect,
+    // so I'm making up some buckets based on what I imagine are the upper and lower bounds of plausibility.
+    #[buckets(0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10, 50, 100)]
+    pub module_create_instance_time_seconds: HistogramVec,
+
     #[name = spacetime_snapshot_creation_time_total_sec]
     #[help = "The time (in seconds) it took to take and store a database snapshot, including scheduling overhead"]
     #[labels(db: Identity)]
diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs
index e3fc8e55c53..4c33206a5ed 100644
--- a/crates/standalone/src/lib.rs
+++ b/crates/standalone/src/lib.rs
@@ -497,7 +497,7 @@ pub async fn start_server(data_dir: &ServerDataDir, cert_dir: Option<&std::path:
         args.extend(["--jwt-key-dir".as_ref(), cert_dir.as_os_str()])
     }
     let args = start::cli().try_get_matches_from(args)?;
-    start::exec(&args, JobCores::default()).await
+    start::exec(&args, JobCores::without_pinned_cores(tokio::runtime::Handle::current())).await
 }
 
 #[cfg(test)]
@@ -537,11 +537,22 @@ mod tests {
             websocket: WebSocketOptions::default(),
         };
 
-        let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default()).await?;
+        let _env = StandaloneEnv::init(
+            config,
+            &ca,
+            data_dir.clone(),
+            JobCores::without_pinned_cores(tokio::runtime::Handle::current()),
+        )
+        .await?;
         // Ensure that we have a lock.
-        assert!(StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default())
-            .await
-            .is_err());
+        assert!(StandaloneEnv::init(
+            config,
+            &ca,
+            data_dir.clone(),
+            JobCores::without_pinned_cores(tokio::runtime::Handle::current())
+        )
+        .await
+        .is_err());
 
         Ok(())
     }
diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs
index 81aee00bfa3..ca266f27d76 100644
--- a/crates/standalone/src/main.rs
+++ b/crates/standalone/src/main.rs
@@ -76,5 +76,13 @@ fn main() -> anyhow::Result<()> {
     cores.tokio.configure(&mut builder);
     let rt = builder.build().unwrap();
     cores.rayon.configure(rt.handle());
-    rt.block_on(async_main(cores.databases))
+    let database_cores = cores.databases.make_database_runners(rt.handle());
+
+    // Keep a handle on the `database_cores` alive outside of `async_main`
+    // and explicitly drop it to avoid dropping it from an `async` context -
+    // Tokio gets angry when you drop a runtime within another runtime.
+    let res = rt.block_on(async_main(database_cores.clone()));
+    drop(database_cores);
+
+    res
 }
diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs
index 1e7fb48f637..150171871e9 100644
--- a/crates/testing/src/modules.rs
+++ b/crates/testing/src/modules.rs
@@ -187,7 +187,7 @@ impl CompiledModule {
             },
             &certs,
             paths.data_dir.into(),
-            JobCores::default(),
+            JobCores::without_pinned_cores(tokio::runtime::Handle::current()),
         )
         .await
         .unwrap();
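Below is a minimal usage sketch, not part of the diff, of the `SingleCoreExecutor` API added in `crates/core/src/util/jobs.rs`. It assumes the type is reachable from outside the crate at `spacetimedb_core::util::jobs` (in practice it may be crate-private) and uses a plain `#[tokio::main]` runtime; the `main` wrapper and the arithmetic payloads are illustrative only.

// Usage sketch only; the module path and visibility are assumptions, not confirmed by this diff.
use spacetimedb_core::util::jobs::SingleCoreExecutor;

#[tokio::main]
async fn main() {
    // For short-lived, non-compute-heavy work (e.g. `extract_schema`), fall back
    // to the current runtime instead of taking a pinned core from `JobCores`.
    let executor = SingleCoreExecutor::in_current_tokio_runtime();

    // Async jobs are spawned onto the executor's (possibly core-pinned) runtime.
    let sum = executor.run_job(async { 1 + 2 }).await;

    // Synchronous closures are wrapped in a trivial future and run the same way.
    let product = executor.run_sync_job(|| 3 * 4).await;

    assert_eq!((sum, product), (3, 12));
}

A database-pinned executor obtained via `JobCores::take` would be driven the same way; only its construction and load-balancing behavior differ.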
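The load balancing in `deallocate` hinges on the `Weak`-referenced drop guard introduced above. The following self-contained sketch uses hypothetical `Manager`/`Guard` names and a `u64` id rather than the crate's actual types; it shows the same pattern: the guard notifies the manager on drop, the `Weak` upgrade makes deallocation a no-op once the manager is gone, and membership is removed with `swap_remove` because ordering within the list does not matter.

use std::sync::{Arc, Mutex, Weak};

#[derive(Default)]
struct Manager {
    live_ids: Vec<u64>,
}

impl Manager {
    fn allocate(manager: &Arc<Mutex<Manager>>, id: u64) -> Guard {
        // Record the live id and hand back a guard holding only a `Weak` reference,
        // so the guard never keeps the manager itself alive.
        manager.lock().unwrap().live_ids.push(id);
        Guard {
            manager: Arc::downgrade(manager),
            id,
        }
    }

    fn deallocate(&mut self, id: u64) {
        // Swap-remove mirrors `core_info.jobs.swap_remove(pos)` above:
        // only membership matters, not order.
        if let Some(pos) = self.live_ids.iter().position(|x| *x == id) {
            self.live_ids.swap_remove(pos);
        }
    }
}

struct Guard {
    manager: Weak<Mutex<Manager>>,
    id: u64,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Upgrade fails harmlessly if the manager has already been torn down.
        if let Some(manager) = self.manager.upgrade() {
            manager.lock().unwrap().deallocate(self.id);
        }
    }
}

fn main() {
    let manager = Arc::new(Mutex::new(Manager::default()));
    let guard = Manager::allocate(&manager, 7);
    assert_eq!(manager.lock().unwrap().live_ids, vec![7]);
    drop(guard);
    assert!(manager.lock().unwrap().live_ids.is_empty());
}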