From f26e20f1d99a66e7422ff86a037bd4aa80b44963 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Fri, 12 Sep 2025 08:42:17 -0400 Subject: [PATCH 01/16] Rewrite `jobs.rs` to use single-threaded Tokio runtimes At this stage, changes are not integrated, so build is broken. --- Cargo.lock | 17 ++ Cargo.toml | 1 + crates/core/src/host/wasmtime/mod.rs | 12 +- crates/core/src/util/jobs.rs | 409 +++++++++++++-------------- 4 files changed, 224 insertions(+), 215 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 87c02c91a7b..89b27634440 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7721,6 +7721,7 @@ checksum = "f38dbf42dc56a6fe41ccd77211ea8ec90855de05e52cd00df5a0a3bca87d6147" dependencies = [ "addr2line 0.22.0", "anyhow", + "async-trait", "bitflags 2.9.0", "bumpalo", "cc", @@ -7751,6 +7752,7 @@ dependencies = [ "wasmtime-component-macro", "wasmtime-cranelift", "wasmtime-environ", + "wasmtime-fiber", "wasmtime-jit-icache-coherence", "wasmtime-slab", "wasmtime-versioned-export-macros", @@ -7857,6 +7859,21 @@ dependencies = [ "wasmtime-types", ] +[[package]] +name = "wasmtime-fiber" +version = "25.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "665ccc1bb0f28496e6fa02e94c575ee9ad6e3202c7df8591e5dda78106d5aa4a" +dependencies = [ + "anyhow", + "cc", + "cfg-if", + "rustix 0.38.44", + "wasmtime-asm-macros", + "wasmtime-versioned-export-macros", + "windows-sys 0.52.0", +] + [[package]] name = "wasmtime-jit-icache-coherence" version = "25.0.3" diff --git a/Cargo.toml b/Cargo.toml index 76529fe1ef9..998bc040c2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -308,6 +308,7 @@ version = "25" default-features = false features = [ "addr2line", + "async", "cache", "cranelift", "demangle", diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index 5a61d5e23ab..ea114d48c1a 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -36,7 +36,17 @@ impl WasmtimeRuntime { .cranelift_opt_level(wasmtime::OptLevel::Speed) .consume_fuel(true) .epoch_interruption(true) - .wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); + .wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable) + // We need async support to enable suspending execution of procedures + // when waiting for e.g. HTTP responses or the transaction lock. + // We don't enable either fuel-based or epoch-based yielding + // (see https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.epoch_deadline_async_yield_and_update + // and https://docs.wasmtime.dev/api/wasmtime/struct.Store.html#method.fuel_async_yield_interval) + // so reducers will always execute to completion during the first `Future::poll` call, + // and procedures will only yield when performing an asynchronous operation. + // These futures are executed on a separate single-threaded executor not related to the "global" Tokio runtime, + // which is responsible only for executing WASM. See `crate::util::jobs` for this infrastructure. + .async_support(true); // Offer a compile-time flag for enabling perfmap generation, // so `perf` can display JITted symbol names. 
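To illustrate the execution model described in the comment above, here is a minimal sketch using plain Rust futures rather than wasmtime (`futures::FutureExt::now_or_never` is used purely for demonstration): a future with no pending awaits resolves during its first poll, as reducers do, while a future that awaits unready I/O suspends, as procedures may.

```rust
use futures::FutureExt; // provides `now_or_never`

fn main() {
    // No pending awaits: resolves synchronously on the first poll,
    // like a reducer that never yields.
    let reducer_like = async { 40 + 2 };
    assert_eq!(reducer_like.now_or_never(), Some(42));

    // Awaits something unready (a stand-in for an HTTP response or the
    // transaction lock): suspends rather than completing, like a procedure.
    let procedure_like = async { futures::future::pending::<()>().await };
    assert!(procedure_like.now_or_never().is_none());
}
```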
diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index b31be07727e..1cba69dcf9b 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -1,108 +1,184 @@ +use std::future::Future; use std::sync::{Arc, Mutex, Weak}; use core_affinity::CoreId; use indexmap::IndexMap; use smallvec::SmallVec; use spacetimedb_data_structures::map::HashMap; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::runtime; +use tokio::sync::watch; -use super::notify_once::NotifyOnce; - -/// A handle to a pool of CPU cores for running job threads on. +/// A handle to a pool of Tokio executors for running database WASM code on. /// -/// Each thread is represented by a [`JobThread`], which is pinned to a single -/// core and sequentially runs the jobs that are passed to [`JobThread::run`]. -/// This pool attempts to keep the number of `JobThread`s pinned to each core -/// as equitable as possible; new threads allocated by [`Self::take()`] are -/// assigned to cores in a round-robin fashion, and when a thread exits, it -/// takes a thread pinned to a busier core and repins it to the core it was -/// just running on. +/// Each database has a [`DatabaseExecutor`], +/// a handle to a single-threaded Tokio runtime which is pinned to a specific CPU core. +/// In multi-tenant environments, multiple databases' [`DatabaseExecutor`]s may be handles on the same runtime/core, +/// and a [`DatabaseExecutor`] may occasionally be migrated to a different runtime/core to balance load. /// -/// Construction is done via the `FromIterator` impl. If created from an empty -/// iterator or via `JobCores::default()`, the job threads will work but not be -/// pinned to any threads. +/// Construct a `JobCores` via [`Self::from_pinned_cores`] or [`Self::without_pinned_cores`]. +/// a `JobCores` constructed without core pinning, including `from_pinned_cores` on an empty set, +/// will use the "global" Tokio executor to run database jobs, +/// rather than creating multiple un-pinned single-threaded runtimes. /// -/// This handle is cheaply cloneable. If all instances of it are dropped, -/// threads will continue running, but will no longer repin each other -/// when one exits. -#[derive(Default, Clone)] +/// This handle is cheaply cloneable, but at least one handle must be kept alive. +/// If all instances of it are dropped, the per-thread [`runtime::Runtime`]s will be dropped, +/// and so will stop executing jobs for databases. +#[derive(Clone)] pub struct JobCores { - inner: Option>>, + inner: JobCoresInner, +} + +#[derive(Clone)] +enum JobCoresInner { + PinnedCores(Arc>), + NoPinning(runtime::Handle), } -struct JobCoresInner { - /// A map to the repin_tx for each job thread - job_threads: HashMap>, +struct PinnedCoresExecutorManager { + /// Channels to request that a [`DatabaseExecutor`] move to a different Tokio runtime. + /// + /// Alongside each channel is the [`CoreId`] of the runtime to which that [`DatabaseExecutor`] is currently pinned. + /// This is used as an index into `self.cores` to make load-balancing decisions when freeing a database executor + /// in [`Self::deallocate`]. + database_executor_move: HashMap)>, cores: IndexMap, /// An index into `cores` of the next core to put a new job onto. /// /// This acts as a partition point in `cores`; all cores in `..index` have /// one fewer job on them than the cores in `index..`. 
next_core: usize, - next_id: JobThreadId, + next_id: DatabaseExecutorId, } -#[derive(Default)] +/// Stores the [`tokio::Runtime`] pinned to a particular core, +/// and remembers the [`DatabaseExecutorId`]s for all databases sharing that executor. struct CoreInfo { - jobs: SmallVec<[JobThreadId; 4]>, + jobs: SmallVec<[DatabaseExecutorId; 4]>, + tokio_runtime: runtime::Runtime, +} + +impl CoreInfo { + fn spawn_executor(id: CoreId) -> CoreInfo { + let runtime = runtime::Builder::new_multi_thread() + .worker_threads(1) + // [`DatabaseExecutor`]s should only be executing Wasmtime WASM futures, + // and so should never be doing [`Tokio::spawn_blocking`] or performing blocking I/O. + // However, `max_blocking_threads` will panic if passed 0, so we set a limit of 1 + // and use `on_thread_start` to log an error when spawning a blocking task. + .max_blocking_threads(1) + .on_thread_start({ + use std::sync::atomic::{AtomicBool, Ordering}; + let already_spawned_worker = AtomicBool::new(false); + move || { + // `Ordering::Relaxed`: No synchronization is happening here; + // we're not writing to any other memory or coordinating with any other atomic places. + // We rely on Tokio's infrastructure to impose a happens-before relationship + // between spawning worker threads and spawning blocking threads itself. + if already_spawned_worker.swap(true, Ordering::Relaxed) { + // We're spawning a blocking thread, naughty! + log::error!( + "`JobCores` Tokio runtime for `DatabaseExecutor` use on core {id:?} spawned a blocking thread!" + ); + } else { + // We're spawning our 1 worker, so pin it to the appropriate thread. + core_affinity::set_for_current(id); + } + } + }) + .build() + .expect("Failed to start Tokio executor for `DatabaseExecutor`"); + CoreInfo { + jobs: SmallVec::new(), + tokio_runtime: runtime, + } + } } #[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -struct JobThreadId(usize); +struct DatabaseExecutorId(usize); impl JobCores { - /// Reserve a core from the pool to later start a job thread on. - pub fn take(&self) -> JobCore { - let inner = if let Some(inner) = &self.inner { - let cores = Arc::downgrade(inner); - let (id, repin_rx) = inner.lock().unwrap().allocate(); - Some(JobCoreInner { - repin_rx, - _guard: JobCoreGuard { cores, id }, - }) - } else { - None + /// Get a handle on a [`DatabaseExecutor`] to later run a database's jobs on. + pub fn take(&self) -> DatabaseExecutor { + let database_executor_inner = match &self.inner { + JobCoresInner::NoPinning(handle) => DatabaseExecutorInner { + runtime: watch::channel(handle.clone()).1, + _guard: None, + }, + JobCoresInner::PinnedCores(manager) => { + let manager_weak = Arc::downgrade(manager); + let (database_executor_id, move_runtime_rx) = manager.lock().unwrap().allocate(); + DatabaseExecutorInner { + runtime: move_runtime_rx, + _guard: Some(LoadBalanceOnDropGuard { + manager: manager_weak, + database_executor_id, + }), + } + } }; - - JobCore { inner } + DatabaseExecutor { + inner: Arc::new(database_executor_inner), + } } -} -impl FromIterator for JobCores { - fn from_iter>(iter: T) -> Self { - let cores: IndexMap<_, _> = iter.into_iter().map(|id| (id, CoreInfo::default())).collect(); - let inner = (!cores.is_empty()).then(|| { - Arc::new(Mutex::new(JobCoresInner { - job_threads: HashMap::default(), + /// Construct a [`JobCores`] which runs one Tokio runtime on each of the `cores`, + /// and pins each database to a particular runtime/core. 
+ /// + /// If `cores` is empty, this falls back to [`Self::without_pinned_cores`] + /// and runs all databases in the `global_runtime`. + pub fn from_pinned_cores(cores: impl IntoIterator, global_runtime: runtime::Handle) -> Self { + let cores: IndexMap<_, _> = cores.into_iter().map(|id| (id, CoreInfo::spawn_executor(id))).collect(); + let inner = if cores.is_empty() { + JobCoresInner::NoPinning(global_runtime) + } else { + JobCoresInner::PinnedCores(Arc::new(Mutex::new(PinnedCoresExecutorManager { + database_executor_move: HashMap::default(), cores, next_core: 0, - next_id: JobThreadId(0), - })) - }); + next_id: DatabaseExecutorId(0), + }))) + }; + Self { inner } } + + /// Construct a [`JobCores`] which does not perform any core pinning, + /// and just runs all database jobs in `global_runtime`. + /// + /// This will be used in deployments where there aren't enough available CPU cores + /// to reserve specific cores for database WASM execution. + pub fn without_pinned_cores(global_runtime: runtime::Handle) -> Self { + Self { + inner: JobCoresInner::NoPinning(global_runtime), + } + } } -impl JobCoresInner { - fn allocate(&mut self) -> (JobThreadId, watch::Receiver) { - let id = self.next_id; +impl PinnedCoresExecutorManager { + fn allocate(&mut self) -> (DatabaseExecutorId, watch::Receiver) { + let database_executor_id = self.next_id; self.next_id.0 += 1; - let (&core_id, core) = self.cores.get_index_mut(self.next_core).unwrap(); - core.jobs.push(id); + let (&core_id, runtime_handle) = { + let (core_id, core_info) = self.cores.get_index_mut(self.next_core).unwrap(); + core_info.jobs.push(database_executor_id); + (core_id, core_info.tokio_runtime.handle().clone()) + }; self.next_core = (self.next_core + 1) % self.cores.len(); - let (repin_tx, repin_rx) = watch::channel(core_id); - self.job_threads.insert(id, repin_tx); + let (move_runtime_tx, move_runtime_rx) = watch::channel(runtime_handle); + self.database_executor_move + .insert(database_executor_id, (core_id, move_runtime_tx)); - (id, repin_rx) + (database_executor_id, move_runtime_rx) } /// Run when a `JobThread` exits. - fn deallocate(&mut self, id: JobThreadId) { - let core_id = *self.job_threads.remove(&id).unwrap().borrow(); + fn deallocate(&mut self, id: DatabaseExecutorId) { + let (freed_core_id, _) = self.database_executor_move.remove(&id).unwrap(); - let core_index = self.cores.get_index_of(&core_id).unwrap(); + let core_index = self.cores.get_index_of(&freed_core_id).unwrap(); // This core is now less busy than it should be - bump `next_core` back // by 1 and steal a thread from the core there. 
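The `next_core` invariant and the steal-on-free behavior can be seen in a toy model (a simplification, not the real `PinnedCoresExecutorManager`): cores before the partition point hold one job fewer than those at or after it, allocation advances the point, and freeing a slot steals a job from the core just before it.

```rust
fn allocate(jobs: &mut [usize], next_core: &mut usize) {
    jobs[*next_core] += 1;
    *next_core = (*next_core + 1) % jobs.len();
}

fn deallocate(jobs: &mut [usize], next_core: &mut usize, core: usize) {
    jobs[core] -= 1;
    let steal_from = next_core.checked_sub(1).unwrap_or(jobs.len() - 1);
    if steal_from != core {
        // Rebalance: the busier core just below `next_core` donates a job.
        jobs[steal_from] -= 1;
        jobs[core] += 1;
    }
    *next_core = steal_from;
}

fn main() {
    let (mut jobs, mut next) = (vec![0usize; 3], 0);
    for _ in 0..4 {
        allocate(&mut jobs, &mut next);
    }
    assert_eq!((jobs.clone(), next), (vec![2, 1, 1], 1));
    deallocate(&mut jobs, &mut next, 2); // a database on core 2 shuts down
    assert_eq!((jobs, next), (vec![1, 1, 1], 0)); // core 0 donated a job
}
```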
@@ -113,13 +189,14 @@ impl JobCoresInner { let steal_from_index = self.next_core.checked_sub(1).unwrap_or(self.cores.len() - 1); // if this core was already at `next_core - 1`, we don't need to steal from anywhere - let (core, steal_from) = match self.cores.get_disjoint_indices_mut([core_index, steal_from_index]) { + let (core_info, steal_from) = match self.cores.get_disjoint_indices_mut([core_index, steal_from_index]) { Ok([(_, core), (_, steal_from)]) => (core, Some(steal_from)), Err(_) => (&mut self.cores[core_index], None), }; - let pos = core.jobs.iter().position(|x| *x == id).unwrap(); - core.jobs.remove(pos); + let pos = core_info.jobs.iter().position(|x| *x == id).unwrap(); + // Swap remove because we don't care about ordering within `core_info.jobs` + core_info.jobs.swap_remove(pos); if let Some(steal_from) = steal_from { // This unwrap will never fail, since cores below `next_core` always have @@ -131,185 +208,89 @@ impl JobCoresInner { let stolen = steal_from.jobs.pop().unwrap(); // the way we pop and push here means that older job threads will be less // likely to be repinned, while younger ones are liable to bounce around. - core.jobs.push(stolen); - self.job_threads[&stolen].send_replace(core_id); + // Our use of `swap_remove` above makes this not entirely predictable, however. + core_info.jobs.push(stolen); + let (ref mut stolen_core_id, migrate_tx) = self.database_executor_move.get_mut(&stolen).unwrap(); + *stolen_core_id = freed_core_id; + migrate_tx.send_replace(core_info.tokio_runtime.handle().clone()); } self.next_core = steal_from_index; } } -/// A core taken from [`JobCores`], not yet running a job loop. -#[derive(Default)] -pub struct JobCore { - inner: Option, -} - -struct JobCoreInner { - repin_rx: watch::Receiver, - _guard: JobCoreGuard, -} - -impl JobCore { - /// Start running a job thread on this core. - /// - /// `init` constructs the data provided to each job, and `unsize` unsizes - /// it to `&mut T`, if necessary. - pub fn start(self, init: F, unsize: F2) -> JobThread - where - F: FnOnce() -> U + Send + 'static, - F2: FnOnce(&mut U) -> &mut T + Send + 'static, - U: 'static, - T: ?Sized + 'static, - { - let (tx, rx) = mpsc::channel::>>(Self::JOB_CHANNEL_LENGTH); - let close = Arc::new(NotifyOnce::new()); - - let closed = close.clone(); - let handle = tokio::runtime::Handle::current(); - std::thread::spawn(move || { - let mut data = init(); - let data = unsize(&mut data); - handle.block_on(self.job_loop(rx, closed, data)) - }); - - JobThread { tx, close } - } - - // this shouldn't matter too much, since callers will need to wait for - // the job to finish anyway. - const JOB_CHANNEL_LENGTH: usize = 64; - - async fn job_loop(mut self, mut rx: mpsc::Receiver>>, closed: Arc, data: &mut T) { - // this function is async because we need to recv on the repin channel - // and the jobs channel, but the jobs being run are blocking - - let repin_rx = self.inner.as_mut().map(|inner| &mut inner.repin_rx); - let repin_loop = async { - if let Some(rx) = repin_rx { - rx.mark_changed(); - while rx.changed().await.is_ok() { - core_affinity::set_for_current(*rx.borrow_and_update()); - } - } - }; - - let job_loop = async { - while let Some(job) = rx.recv().await { - // blocking in place means that other futures on the same task - // won't get polled - in this case, that's just the repin loop, - // which is fine because it can just run before the next job. - tokio::task::block_in_place(|| job(data)) - } - }; - - tokio::select! 
{ - () = super::also_poll(job_loop, repin_loop) => {} - // when we receive a close notification, we immediately drop all - // remaining jobs in the queue. - () = closed.notified() => {} - } - } -} - -/// On drop, tells the `JobCores` that this core has been freed up. -struct JobCoreGuard { - cores: Weak>, - id: JobThreadId, -} - -impl Drop for JobCoreGuard { - fn drop(&mut self) { - if let Some(cores) = self.cores.upgrade() { - cores.lock().unwrap().deallocate(self.id); - } - } -} - -/// A handle to a thread running a job loop; see [`JobCores`] for more details. +/// A handle to a Tokio executor which can be used to run WASM compute for a particular database. /// -/// The thread stores data of type `T`; jobs run on the thread will be given -/// mutable access to it. +/// Use [`Self::run_job`] to run futures, and [`Self::run_sync_job`] to run functions. /// -/// This handle is cheaply cloneable. If all strong handles have been dropped, -/// the thread will shut down. -pub struct JobThread { - tx: mpsc::Sender>>, - close: Arc, +/// This handle is cheaply cloneable. +/// When all handles on this database executor have been dropped, +/// its use of the core to which it is pinned will be released, +/// and other databases may be migrated to that core to balance load. +pub struct DatabaseExecutor { + inner: Arc, } -impl Clone for JobThread { - fn clone(&self) -> Self { - Self { - tx: self.tx.clone(), - close: self.close.clone(), - } - } -} +struct DatabaseExecutorInner { + /// Handle on the [`runtime::Runtime`] where this executor should run jobs. + /// + /// This will be occasionally updated by [`PinnedCoresExecutorManager::deallocate`] + /// to evenly distribute databases across the available runtimes/cores. + runtime: watch::Receiver, -type Job = dyn FnOnce(&mut T) + Send; + /// [`Drop`] guard which calls [`PinnedCoresExecutorManager::deallocate`] when this database dies, + /// allowing another database from a more-contended runtime/core to migrate here. + _guard: Option, +} -impl JobThread { - /// Run a blocking job on this `JobThread`. +impl DatabaseExecutor { + /// Run a job for this database executor. /// - /// The job (`f`) will be placed in a queue, and will run strictly after - /// jobs ahead of it in the queue. If `f` panics, it will be bubbled up to - /// the calling task. - pub async fn run(&self, f: F) -> Result + /// `f` must not perform any `Tokio::spawn_blocking` blocking operations. + pub async fn run_job(&self, f: F) -> R where - F: FnOnce(&mut T) -> R + Send + 'static, + F: Future + Send + 'static, R: Send + 'static, { - let (ret_tx, ret_rx) = oneshot::channel(); - - let span = tracing::Span::current(); - self.tx - .send(Box::new(move |data| { - let _entered = span.entered(); - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| f(data))); - if let Err(Err(_panic)) = ret_tx.send(result) { - tracing::warn!("uncaught panic on threadpool") - } - })) - .await - .map_err(|_| JobThreadClosed)?; - - match ret_rx.await { - Ok(Ok(ret)) => Ok(ret), - Ok(Err(panic)) => std::panic::resume_unwind(panic), - Err(_closed) => Err(JobThreadClosed), + match self.inner.runtime.borrow().spawn(f).await { + Ok(r) => r, + Err(e) => std::panic::resume_unwind(e.into_panic()), } } - /// Shutdown the job thread. - pub fn close(&self) { - self.close.notify(); + /// Run `f` on this database executor and return its result. 
+ pub async fn run_sync_job(&self, f: F) -> R + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.run_job(async { f() }).await } +} - /// Returns a future that resolves once the job thread has been closed. - pub async fn closed(&self) { - self.tx.closed().await - } +/// On drop, tells the [`JobCores`] that this database is no longer occupying its Tokio runtime, +/// allowing databases from more-contended runtimes/cores to migrate there. +struct LoadBalanceOnDropGuard { + manager: Weak>, + database_executor_id: DatabaseExecutorId, +} - /// Obtain a weak version of this handle. - pub fn downgrade(&self) -> WeakJobThread { - let tx = self.tx.downgrade(); - let close = Arc::downgrade(&self.close); - WeakJobThread { tx, close } +impl Drop for LoadBalanceOnDropGuard { + fn drop(&mut self) { + if let Some(cores) = self.manager.upgrade() { + cores.lock().unwrap().deallocate(self.database_executor_id); + } } } -pub struct JobThreadClosed; - /// A weak version of `JobThread` that does not hold the thread open. // used in crate::core::module_host::WeakModuleHost -pub struct WeakJobThread { - tx: mpsc::WeakSender>>, - close: Weak, +pub struct WeakDatabaseExecutor { + inner: Weak, } -impl WeakJobThread { - pub fn upgrade(&self) -> Option> { - Option::zip(self.tx.upgrade(), self.close.upgrade()).map(|(tx, close)| JobThread { tx, close }) +impl WeakDatabaseExecutor { + pub fn upgrade(&self) -> Option { + self.inner.upgrade().map(|inner| DatabaseExecutor { inner }) } } From 54b7d5add98955d832e1f141e60fbf1ab7233a35 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Mon, 22 Sep 2025 13:19:53 -0400 Subject: [PATCH 02/16] Thread `jobs.rs` changes through all of `core` crate. --- crates/core/src/host/host_controller.rs | 22 +- crates/core/src/host/module_common.rs | 11 +- crates/core/src/host/module_host.rs | 313 ++++++++++++------ crates/core/src/host/v8/mod.rs | 38 +-- .../src/host/wasm_common/module_host_actor.rs | 32 +- crates/core/src/host/wasmtime/mod.rs | 18 +- crates/core/src/startup.rs | 26 +- crates/core/src/util/jobs.rs | 128 ++++--- crates/standalone/src/lib.rs | 2 +- 9 files changed, 365 insertions(+), 225 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index cf56f7c6ede..785c3ec3a76 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -14,7 +14,7 @@ use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager}; use crate::util::asyncify; -use crate::util::jobs::{JobCore, JobCores}; +use crate::util::jobs::{JobCores, SingleCoreExecutor}; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, ensure, Context}; use async_trait::async_trait; @@ -698,7 +698,7 @@ async fn make_module_host( program: Program, energy_monitor: Arc, unregister: impl Fn() + Send + Sync + 'static, - core: JobCore, + executor: SingleCoreExecutor, ) -> anyhow::Result<(Program, ModuleHost)> { // `make_actor` is blocking, as it needs to compile the wasm to native code, // which may be computationally expensive - sometimes up to 1s for a large module. 
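Since `make_actor` can block for up to a second, module compilation is kept off the async workers. A hedged sketch of that pattern (`CompiledModule` and `compile_wasm` here are hypothetical stand-ins, not SpacetimeDB or wasmtime APIs):

```rust
use anyhow::Result;

// Hypothetical stand-ins for the compiled artifact and the compile step.
struct CompiledModule;
fn compile_wasm(_bytes: &[u8]) -> Result<CompiledModule> {
    Ok(CompiledModule)
}

async fn compile_off_thread(bytes: Vec<u8>) -> Result<CompiledModule> {
    // CPU-bound work runs on Tokio's blocking pool, so async worker
    // threads stay free to make progress on other tasks.
    tokio::task::spawn_blocking(move || compile_wasm(&bytes)).await?
}
```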
@@ -718,12 +718,12 @@ async fn make_module_host( HostType::Wasm => { let actor = runtimes.wasmtime.make_actor(mcc)?; trace!("wasmtime::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, unregister, core) + ModuleHost::new(actor, unregister, executor) } HostType::Js => { let actor = runtimes.v8.make_actor(mcc)?; trace!("v8::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, unregister, core) + ModuleHost::new(actor, unregister, executor) } }; Ok((program, module_host)) @@ -756,7 +756,7 @@ async fn launch_module( energy_monitor: Arc, replica_dir: ReplicaDir, runtimes: Arc, - core: JobCore, + executor: SingleCoreExecutor, ) -> anyhow::Result<(Program, LaunchedModule)> { let db_identity = database.database_identity; let host_type = database.host_type; @@ -773,7 +773,7 @@ async fn launch_module( program, energy_monitor.clone(), on_panic, - core, + executor, ) .await?; @@ -991,7 +991,7 @@ impl Host { page_pool: PagePool, database: Database, program: Program, - core: JobCore, + executor: SingleCoreExecutor, ) -> anyhow::Result> { // Even in-memory databases acquire a lockfile. // Grab a tempdir to put that lockfile in. @@ -1024,7 +1024,7 @@ impl Host { Arc::new(NullEnergyMonitor), phony_replica_dir, runtimes.clone(), - core, + executor, ) .await?; @@ -1057,7 +1057,7 @@ impl Host { policy: MigrationPolicy, energy_monitor: Arc, on_panic: impl Fn() + Send + Sync + 'static, - core: JobCore, + executor: SingleCoreExecutor, ) -> anyhow::Result { let replica_ctx = &self.replica_ctx; let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone()); @@ -1070,7 +1070,7 @@ impl Host { program, energy_monitor, on_panic, - core, + executor, ) .await?; @@ -1187,7 +1187,7 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an let runtimes = HostRuntimes::new(None); let page_pool = PagePool::new(None); - let core = JobCore::default(); + let core = SingleCoreExecutor::in_current_tokio_runtime(); let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?; // this should always succeed, but sometimes it doesn't let module_def = match Arc::try_unwrap(module_info) { diff --git a/crates/core/src/host/module_common.rs b/crates/core/src/host/module_common.rs index e93b68c1172..f92d0c6c5bf 100644 --- a/crates/core/src/host/module_common.rs +++ b/crates/core/src/host/module_common.rs @@ -3,10 +3,7 @@ use crate::{ energy::EnergyMonitor, - host::{ - module_host::{DynModule, ModuleInfo}, - Scheduler, - }, + host::{module_host::ModuleInfo, Scheduler}, module_host_context::ModuleCreationContext, replica_context::ReplicaContext, }; @@ -79,12 +76,12 @@ impl ModuleCommon { } } -impl DynModule for ModuleCommon { - fn replica_ctx(&self) -> &Arc { +impl ModuleCommon { + pub fn replica_ctx(&self) -> &Arc { &self.replica_context } - fn scheduler(&self) -> &Scheduler { + pub fn scheduler(&self) -> &Scheduler { &self.scheduler } } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 44645bd3a39..532e6ce0b66 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -17,15 +17,17 @@ use crate::subscription::execute_plan; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::BuildableWebsocketFormat; -use crate::util::jobs::{JobCore, JobThread, JobThreadClosed, WeakJobThread}; +use 
crate::util::jobs::{SingleCoreExecutor, WeakSingleCoreExecutor}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; use bytes::Bytes; use derive_more::From; +use futures::lock::Mutex; use indexmap::IndexSet; use itertools::Itertools; use prometheus::{Histogram, IntGauge}; +use scopeguard::ScopeGuard; use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate}; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; @@ -46,7 +48,9 @@ use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed; use spacetimedb_schema::def::{ModuleDef, ReducerDef, TableDef}; use spacetimedb_schema::schema::{Schema, TableSchema}; use spacetimedb_vm::relation::RelValue; +use std::collections::VecDeque; use std::fmt; +use std::sync::atomic::AtomicBool; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tokio::sync::oneshot; @@ -311,24 +315,61 @@ impl ReducersMap { /// A runtime that can create modules. pub trait ModuleRuntime { /// Creates a module based on the context `mcc`. - fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result; + fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result; } -pub trait DynModule: Send + Sync + 'static { - fn replica_ctx(&self) -> &Arc; - fn scheduler(&self) -> &Scheduler; +pub enum Module { + Wasm(super::wasmtime::Module), + Js(super::v8::JsModule), } -pub trait Module: DynModule { - type Instance: ModuleInstance; - type InitialInstances<'a>: IntoIterator + 'a; - fn initial_instances(&mut self) -> Self::InitialInstances<'_>; - fn info(&self) -> Arc; - fn create_instance(&self) -> Self::Instance; +pub enum Instance { + Wasm(super::wasmtime::ModuleInstance), + Js(super::v8::JsInstance), } -pub trait ModuleInstance: Send + 'static { - fn trapped(&self) -> bool; +impl Module { + fn replica_ctx(&self) -> &Arc { + match self { + Module::Wasm(module) => module.replica_ctx(), + Module::Js(module) => module.replica_ctx(), + } + } + + fn scheduler(&self) -> &Scheduler { + match self { + Module::Wasm(module) => module.scheduler(), + Module::Js(module) => module.scheduler(), + } + } + + fn initial_instances(&mut self) -> impl Iterator { + match self { + Module::Wasm(module) => itertools::Either::Left(module.initial_instances().into_iter().map(Instance::Wasm)), + Module::Js(module) => itertools::Either::Right(module.initial_instances().map(Instance::Js)), + } + } + fn info(&self) -> Arc { + match self { + Module::Wasm(module) => module.info(), + Module::Js(module) => module.info(), + } + } + fn create_instance(&self) -> Instance { + match self { + Module::Wasm(module) => Instance::Wasm(module.create_instance()), + Module::Js(module) => Instance::Js(module.create_instance()), + } + } +} + +impl Instance { + fn trapped(&self) -> bool { + match self { + Instance::Wasm(inst) => inst.trapped(), + Instance::Js(inst) => inst.trapped(), + } + } /// Update the module instance's database to match the schema of the module instance. 
fn update_database( @@ -336,11 +377,23 @@ pub trait ModuleInstance: Send + 'static { program: Program, old_module_info: Arc, policy: MigrationPolicy, - ) -> anyhow::Result; + ) -> anyhow::Result { + match self { + Instance::Wasm(inst) => inst.update_database(program, old_module_info, policy), + Instance::Js(inst) => inst.update_database(program, old_module_info, policy), + } + } - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult; + fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { + match self { + Instance::Wasm(inst) => inst.call_reducer(tx, params), + Instance::Js(inst) => inst.call_reducer(tx, params), + } + } } +pub trait ModuleInstance: Send + 'static {} + /// Creates the table for `table_def` in `stdb`. pub fn create_table_from_def( stdb: &RelationalDB, @@ -358,7 +411,7 @@ pub fn create_table_from_def( fn init_database( replica_ctx: &ReplicaContext, module_def: &ModuleDef, - inst: &mut dyn ModuleInstance, + inst: &mut Instance, program: Program, ) -> anyhow::Result> { log::debug!("init database"); @@ -437,49 +490,45 @@ pub struct CallReducerParams { pub args: ArgsTuple, } -// TODO: figure out how we want to handle traps. maybe it should just not return to the LendingPool and -// let the get_instance logic handle it? -struct AutoReplacingModuleInstance { - inst: T::Instance, - module: Arc, +struct ModuleInstanceManager { + instances: VecDeque, + module: Arc, } -impl AutoReplacingModuleInstance { - fn check_trap(&mut self) { - if self.inst.trapped() { - self.inst = self.module.create_instance() +impl ModuleInstanceManager { + fn get_instance(&mut self) -> Instance { + if let Some(inst) = self.instances.pop_back() { + inst + } else { + // TODO: should we be calling `create_instance` on the `SingleCoreExecutor` rather than the calling thread? + self.module.create_instance() } } -} -impl ModuleInstance for AutoReplacingModuleInstance { - fn trapped(&self) -> bool { - self.inst.trapped() - } - fn update_database( - &mut self, - program: Program, - old_module_info: Arc, - policy: MigrationPolicy, - ) -> anyhow::Result { - let ret = self.inst.update_database(program, old_module_info, policy); - self.check_trap(); - ret - } - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - let ret = self.inst.call_reducer(tx, params); - self.check_trap(); - ret + fn return_instance(&mut self, inst: Instance) { + if inst.trapped() { + // Don't return trapped instances; + // they may have left internal data structures in WASM linear memory in a bad state. + return; + } + + self.instances.push_front(inst); } } #[derive(Clone)] pub struct ModuleHost { pub info: Arc, - module: Arc, + module: Arc, /// Called whenever a reducer call on this host panics. on_panic: Arc, - job_tx: JobThread, + instance_manager: Arc>, + executor: SingleCoreExecutor, + + /// Marks whether this module has been closed by [`Self::exit`]. + /// + /// When this is true, most operations will fail with [`NoSuchModule`]. 
+ closed: Arc, } impl fmt::Debug for ModuleHost { @@ -493,9 +542,11 @@ impl fmt::Debug for ModuleHost { pub struct WeakModuleHost { info: Arc, - inner: Weak, + inner: Weak, on_panic: Weak, - tx: WeakJobThread, + instance_manager: Weak>, + executor: WeakSingleCoreExecutor, + closed: Weak, } #[derive(Debug)] @@ -556,24 +607,29 @@ pub enum ClientConnectedError { } impl ModuleHost { - pub(super) fn new(module: impl Module, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore) -> Self { + pub(super) fn new( + module: Module, + on_panic: impl Fn() + Send + Sync + 'static, + executor: SingleCoreExecutor, + ) -> Self { let info = module.info(); let module = Arc::new(module); let on_panic = Arc::new(on_panic); let module_clone = module.clone(); - let job_tx = core.start( - move || AutoReplacingModuleInstance { - inst: module_clone.create_instance(), - module: module_clone, - }, - |x| x as &mut dyn ModuleInstance, - ); + + let instance_manager = Arc::new(Mutex::new(ModuleInstanceManager { + instances: VecDeque::new(), + module: module_clone, + })); + ModuleHost { info, module, on_panic, - job_tx, + instance_manager, + executor, + closed: Arc::new(AtomicBool::new(false)), } } @@ -587,6 +643,20 @@ impl ModuleHost { &self.info.subscriptions } + fn is_marked_closed(&self) -> bool { + // `self.closed` isn't used for any synchronization, it's just a shared flag, + // so `Ordering::Relaxed` is sufficient. + self.closed.load(std::sync::atomic::Ordering::Relaxed) + } + + fn guard_closed(&self) -> Result<(), NoSuchModule> { + if self.is_marked_closed() { + Err(NoSuchModule) + } else { + Ok(()) + } + } + /// Run a function on the JobThread for this module. /// This would deadlock if it is called within another call to `on_module_thread`. /// Since this is async, and `f` is sync, deadlocking shouldn't be a problem. @@ -595,20 +665,22 @@ impl ModuleHost { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - // Run the provided function on the module instance. - // This is a convenience method that ensures the module instance is available - // and handles any errors that may occur. - self.call(label, |_| f()) - .await - .map_err(|_| anyhow::Error::from(NoSuchModule)) + self.guard_closed()?; + + let timer_guard = self.start_call_timer(label); + + let res = self + .executor + .run_sync_job(move || { + drop(timer_guard); + f() + }) + .await; + + Ok(res) } - /// Run a function on the JobThread for this module which has access to the module instance. - async fn call(&self, label: &str, f: F) -> Result - where - F: FnOnce(&mut dyn ModuleInstance) -> R + Send + 'static, - R: Send + 'static, - { + fn start_call_timer(&self, label: &str) -> ScopeGuard<(), impl FnOnce(())> { // Record the time until our function starts running. let queue_timer = WORKER_METRICS .reducer_wait_time @@ -626,19 +698,28 @@ impl ModuleHost { .observe(queue_length as f64); } // Ensure that we always decrement the gauge. - let timer_guard = scopeguard::guard((), move |_| { + scopeguard::guard((), move |_| { // Decrement the queue length gauge when we're done. // This is done in a defer so that it happens even if the reducer call panics. queue_length_gauge.dec(); queue_timer.stop_and_record(); - }); + }) + } + + /// Run a function on the JobThread for this module which has access to the module instance. 
+ async fn call(&self, label: &str, f: F) -> Result + where + F: FnOnce(&mut Instance) -> R + Send + 'static, + R: Send + 'static, + { + self.guard_closed()?; + let timer_guard = self.start_call_timer(label); // Operations on module instances (e.g. calling reducers) is blocking, // partially because the computation can potentialyl take a long time // and partially because interacting with the database requires taking - // a blocking lock. So, we run `f` inside of `asyncify()`, which runs - // the provided closure in a tokio blocking task, and bubbles up any - // panic that may occur. + // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. + // This will bubble up any panic that may occur. // If a reducer call panics, we **must** ensure to call `self.on_panic` // so that the module is discarded by the host controller. @@ -646,13 +727,21 @@ impl ModuleHost { log::warn!("reducer {label} panicked"); (self.on_panic)(); }); - self.job_tx - .run(move |inst| { + + let mut instance = self.instance_manager.lock().await.get_instance(); + + let (res, instance) = self + .executor + .run_sync_job(move || { drop(timer_guard); - f(inst) + let res = f(&mut instance); + (res, instance) }) - .await - .map_err(|_: JobThreadClosed| NoSuchModule) + .await; + + self.instance_manager.lock().await.return_instance(instance); + + Ok(res) } pub async fn disconnect_client(&self, client_id: ClientActorId) { @@ -762,14 +851,14 @@ impl ModuleHost { } }) .await - .map_err(Into::::into)? + .map_err(ReducerCallError::from)? } pub fn call_identity_disconnected_inner( &self, caller_identity: Identity, caller_connection_id: ConnectionId, - inst: &mut dyn ModuleInstance, + inst: &mut Instance, ) -> Result<(), ReducerCallError> { let me = self.clone(); let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); @@ -877,8 +966,8 @@ impl ModuleHost { self.call("call_identity_disconnected", move |inst| { me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst) }) - .await - .map_err(Into::::into)? + .await? + .map_err(Into::into) } async fn call_reducer_inner( @@ -896,23 +985,23 @@ impl ModuleHost { let args = args.into_tuple(reducer_seed)?; let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); - self.call(&reducer_def.name, move |inst| { - inst.call_reducer( - None, - CallReducerParams { - timestamp: Timestamp::now(), - caller_identity, - caller_connection_id, - client, - request_id, - timer, - reducer_id, - args, - }, - ) - }) - .await - .map_err(Into::into) + Ok(self + .call(&reducer_def.name, move |inst| { + inst.call_reducer( + None, + CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + args, + }, + ) + }) + .await?) 
} fn call_reducer_inner_with_inst( &self, @@ -924,7 +1013,7 @@ impl ModuleHost { reducer_id: ReducerId, reducer_def: &ReducerDef, args: ReducerArgs, - module_instance: &mut dyn ModuleInstance, + module_instance: &mut Instance, ) -> Result { let reducer_seed = ReducerArgsDeserializeSeed(self.info.module_def.typespace().with_type(reducer_def)); let args = args.into_tuple(reducer_seed)?; @@ -1006,7 +1095,7 @@ impl ModuleHost { // scheduled reducer name not fetched yet, anyway this is only for logging purpose const REDUCER: &str = "scheduled_reducer"; let module = self.info.clone(); - self.call(REDUCER, move |inst: &mut dyn ModuleInstance| { + self.call(REDUCER, move |inst: &mut Instance| { let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); match call_reducer_params(&mut tx) { @@ -1038,8 +1127,7 @@ impl ModuleHost { })), } }) - .await - .unwrap_or_else(|e| Err(e.into())) + .await? } pub fn subscribe_to_logs(&self) -> anyhow::Result> { @@ -1069,13 +1157,14 @@ impl ModuleHost { } pub async fn exit(&self) { + // As in `Self::marked_closed`, `Relaxed` is sufficient because we're not synchronizing any external state. + self.closed.store(true, std::sync::atomic::Ordering::Relaxed); self.module.scheduler().close(); - self.job_tx.close(); self.exited().await; } pub async fn exited(&self) { - tokio::join!(self.module.scheduler().closed(), self.job_tx.closed()); + self.module.scheduler().closed().await; } pub fn inject_logs(&self, log_level: LogLevel, message: &str) { @@ -1225,7 +1314,9 @@ impl ModuleHost { info: self.info.clone(), inner: Arc::downgrade(&self.module), on_panic: Arc::downgrade(&self.on_panic), - tx: self.job_tx.downgrade(), + instance_manager: Arc::downgrade(&self.instance_manager), + executor: self.executor.downgrade(), + closed: Arc::downgrade(&self.closed), } } @@ -1242,12 +1333,16 @@ impl WeakModuleHost { pub fn upgrade(&self) -> Option { let inner = self.inner.upgrade()?; let on_panic = self.on_panic.upgrade()?; - let tx = self.tx.upgrade()?; + let instance_manager = self.instance_manager.upgrade()?; + let executor = self.executor.upgrade()?; + let closed = self.closed.upgrade()?; Some(ModuleHost { info: self.info.clone(), module: inner, on_panic, - job_tx: tx, + instance_manager, + executor, + closed, }) } } diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 92e1fb0c896..84df9b589a2 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] use super::module_common::{build_common_module_from_raw, ModuleCommon}; -use super::module_host::{CallReducerParams, DynModule, Module, ModuleInfo, ModuleInstance, ModuleRuntime}; +use super::module_host::{CallReducerParams, Module, ModuleInfo, ModuleRuntime}; use super::UpdateDatabaseResult; use crate::host::wasm_common::instrumentation::CallTimes; use crate::host::wasm_common::module_host_actor::{ @@ -39,8 +39,8 @@ pub struct V8Runtime { } impl ModuleRuntime for V8Runtime { - fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result { - V8_RUNTIME_GLOBAL.make_actor(mcc) + fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result { + V8_RUNTIME_GLOBAL.make_actor(mcc).map(Module::Js) } } @@ -71,7 +71,7 @@ impl V8RuntimeInner { Self { _priv: () } } - fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result { + fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result { #![allow(unreachable_code, unused_variables)] log::trace!( @@ -93,49 +93,43 @@ impl V8RuntimeInner { } 
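The `ModuleInstanceManager` introduced in `module_host.rs` above replaces `AutoReplacingModuleInstance` with a small pool. Its policy, sketched as a toy (a simplification of the real type): hand out pooled instances when available, build fresh ones otherwise, and silently drop any instance that trapped, since its WASM linear memory may be in a bad state.

```rust
use std::collections::VecDeque;

struct Pool<T> {
    free: VecDeque<T>,
    make: fn() -> T,
}

impl<T> Pool<T> {
    fn get(&mut self) -> T {
        // Reuse a pooled instance, or build a fresh one on demand.
        self.free.pop_back().unwrap_or_else(|| (self.make)())
    }

    fn put(&mut self, item: T, trapped: bool) {
        // Trapped instances are dropped rather than returned,
        // mirroring `ModuleInstanceManager::return_instance`.
        if !trapped {
            self.free.push_front(item);
        }
    }
}

fn main() {
    let mut pool = Pool { free: VecDeque::new(), make: || 0u32 };
    let inst = pool.get(); // built fresh: the pool starts empty
    pool.put(inst, /* trapped = */ true); // discarded, not pooled
    assert!(pool.free.is_empty());
}
```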
#[derive(Clone)] -struct JsModule { +pub struct JsModule { common: ModuleCommon, } -impl DynModule for JsModule { - fn replica_ctx(&self) -> &Arc { +impl JsModule { + pub fn replica_ctx(&self) -> &Arc { self.common.replica_ctx() } - fn scheduler(&self) -> &Scheduler { + pub fn scheduler(&self) -> &Scheduler { self.common.scheduler() } -} - -impl Module for JsModule { - type Instance = JsInstance; - - type InitialInstances<'a> = std::iter::Empty; - fn initial_instances(&mut self) -> Self::InitialInstances<'_> { + pub fn initial_instances(&mut self) -> impl Iterator { std::iter::empty() } - fn info(&self) -> Arc { + pub fn info(&self) -> Arc { self.common.info().clone() } - fn create_instance(&self) -> Self::Instance { + pub fn create_instance(&self) -> JsInstance { todo!() } } -struct JsInstance { +pub struct JsInstance { common: InstanceCommon, replica_ctx: Arc, } -impl ModuleInstance for JsInstance { - fn trapped(&self) -> bool { +impl JsInstance { + pub fn trapped(&self) -> bool { self.common.trapped } - fn update_database( + pub fn update_database( &mut self, program: Program, old_module_info: Arc, @@ -146,7 +140,7 @@ impl ModuleInstance for JsInstance { .update_database(replica_ctx, program, old_module_info, policy) } - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> super::ReducerCallResult { + pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> super::ReducerCallResult { // TODO(centril): snapshots, module->host calls let mut isolate = Isolate::new(<_>::default()); let scope = &mut HandleScope::new(&mut isolate); diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index f46f69ed2be..29f2581a566 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -12,8 +12,7 @@ use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerpri use crate::host::instance_env::InstanceEnv; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; use crate::host::module_host::{ - CallReducerParams, DatabaseUpdate, DynModule, EventStatus, Module, ModuleEvent, ModuleFunctionCall, ModuleInfo, - ModuleInstance, + CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, }; use crate::host::{ArgsTuple, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, UpdateDatabaseResult}; use crate::identity::Identity; @@ -77,7 +76,7 @@ pub struct ExecuteResult { pub call_result: Result>, anyhow::Error>, } -pub(crate) struct WasmModuleHostActor { +pub struct WasmModuleHostActor { module: T::InstancePre, initial_instance: Option>>, common: ModuleCommon, @@ -174,30 +173,23 @@ impl WasmModuleHostActor { } } -impl DynModule for WasmModuleHostActor { - fn replica_ctx(&self) -> &Arc { +impl WasmModuleHostActor { + pub fn replica_ctx(&self) -> &Arc { self.common.replica_ctx() } - fn scheduler(&self) -> &Scheduler { + pub fn scheduler(&self) -> &Scheduler { self.common.scheduler() } -} - -impl Module for WasmModuleHostActor { - type Instance = WasmModuleInstance; - - type InitialInstances<'a> = Option; - - fn initial_instances(&mut self) -> Self::InitialInstances<'_> { + pub fn initial_instances(&mut self) -> Option> { self.initial_instance.take().map(|x| *x) } - fn info(&self) -> Arc { + pub fn info(&self) -> Arc { self.common.info() } - fn create_instance(&self) -> Self::Instance { + pub fn create_instance(&self) -> WasmModuleInstance { let common = &self.common; let 
env = InstanceEnv::new(common.replica_ctx().clone(), common.scheduler().clone()); // this shouldn't fail, since we already called module.create_instance() @@ -224,12 +216,12 @@ impl std::fmt::Debug for WasmModuleInstance { } } -impl ModuleInstance for WasmModuleInstance { - fn trapped(&self) -> bool { +impl WasmModuleInstance { + pub fn trapped(&self) -> bool { self.common.trapped } - fn update_database( + pub fn update_database( &mut self, program: Program, old_module_info: Arc, @@ -240,7 +232,7 @@ impl ModuleInstance for WasmModuleInstance { .update_database(replica_ctx, program, old_module_info, policy) } - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { + pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params)) } } diff --git a/crates/core/src/host/wasmtime/mod.rs b/crates/core/src/host/wasmtime/mod.rs index ea114d48c1a..e2bbe322713 100644 --- a/crates/core/src/host/wasmtime/mod.rs +++ b/crates/core/src/host/wasmtime/mod.rs @@ -3,7 +3,7 @@ use std::time::Duration; use anyhow::Context; use spacetimedb_paths::server::{ServerDataDir, WasmtimeCacheDir}; -use wasmtime::{Engine, Linker, Module, StoreContext, StoreContextMut}; +use wasmtime::{self, Engine, Linker, StoreContext, StoreContextMut}; use crate::energy::{EnergyQuanta, ReducerBudget}; use crate::error::NodesError; @@ -13,11 +13,11 @@ use crate::module_host_context::ModuleCreationContext; mod wasm_instance_env; mod wasmtime_module; -use wasmtime_module::WasmtimeModule; +use wasmtime_module::{WasmtimeInstance, WasmtimeModule}; use self::wasm_instance_env::WasmInstanceEnv; -use super::wasm_common::module_host_actor::InitializationError; +use super::wasm_common::module_host_actor::{InitializationError, WasmModuleInstance}; use super::wasm_common::{abi, module_host_actor::WasmModuleHostActor, ModuleCreationError}; pub struct WasmtimeRuntime { @@ -93,9 +93,13 @@ impl WasmtimeRuntime { } } +pub type Module = WasmModuleHostActor; +pub type ModuleInstance = WasmModuleInstance; + impl ModuleRuntime for WasmtimeRuntime { - fn make_actor(&self, mcc: ModuleCreationContext) -> anyhow::Result { - let module = Module::new(&self.engine, &mcc.program.bytes).map_err(ModuleCreationError::WasmCompileError)?; + fn make_actor(&self, mcc: ModuleCreationContext) -> anyhow::Result { + let module = + wasmtime::Module::new(&self.engine, &mcc.program.bytes).map_err(ModuleCreationError::WasmCompileError)?; let func_imports = module .imports() @@ -111,7 +115,9 @@ impl ModuleRuntime for WasmtimeRuntime { let module = WasmtimeModule::new(module); - WasmModuleHostActor::new(mcc, module).map_err(Into::into) + WasmModuleHostActor::new(mcc, module) + .map_err(Into::into) + .map(super::module_host::Module::Wasm) } } diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index 140e733799e..fd73bf15037 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -261,7 +261,7 @@ impl CoreReservations { #[derive(Default)] pub struct Cores { /// The cores to run database instances on. - pub databases: JobCores, + pub databases: DatabaseCores, /// The cores to run tokio worker threads on. pub tokio: TokioCores, /// The cores to run rayon threads on. 
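The core groups in `Cores` are disjoint partitions of the machine's CPUs. A hedged sketch of that kind of split (the halving here is hypothetical; the real partitioning lives in `CoreReservations::apply`):

```rust
fn main() {
    // Enumerate the machine's cores; empty if enumeration is unsupported.
    let mut cores = core_affinity::get_core_ids().unwrap_or_default();

    // Hypothetical split: half for database executors, the rest for the
    // global Tokio runtime's worker threads.
    let tokio_cores = cores.split_off(cores.len() / 2);
    let database_cores = cores;
    println!("databases: {database_cores:?}, tokio: {tokio_cores:?}");
}
```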
@@ -286,8 +286,8 @@ impl Cores { let [_irq, reserved, databases, tokio_workers, rayon] = reservations.apply(&mut cores); + let databases = DatabaseCores(databases); let reserved = (!reserved.is_empty()).then(|| reserved.into()); - let databases = databases.into_iter().collect::(); let rayon = RayonCores((!rayon.is_empty()).then_some(rayon)); // see comment on `TokioCores.blocking` @@ -424,3 +424,25 @@ fn thread_spawn_handler( Ok(()) } } + +#[derive(Default)] +pub struct DatabaseCores(Vec); + +impl DatabaseCores { + /// Construct a [`JobCores`] manager suitable for running database WASM code on. + /// + /// The `global_runtime` should be a [`tokio::runtime::Handle`] to the [`tokio::runtime::Runtime`] + /// constructed from the [`TokioCores`] of this [`Cores`]. + /// + /// ```rust + /// # use spacetimedb::startup::pin_threads; + /// let cores = pin_threads(); + /// let mut builder = tokio::runtime::Builder::new_multi_thread(); + /// cores.tokio.configure(&mut builder); + /// let mut rt = builder.build().unwrap(); + /// let database_cores = cores.databases.make_database_runners(rt.handle()); + /// ``` + pub fn make_database_runners(self, global_runtime: &tokio::runtime::Handle) -> JobCores { + JobCores::from_pinned_cores(self.0.into_iter(), global_runtime.clone()) + } +} diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 1cba69dcf9b..7024204f19c 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -10,10 +10,10 @@ use tokio::sync::watch; /// A handle to a pool of Tokio executors for running database WASM code on. /// -/// Each database has a [`DatabaseExecutor`], +/// Each database has a [`SingleCoreExecutor`], /// a handle to a single-threaded Tokio runtime which is pinned to a specific CPU core. -/// In multi-tenant environments, multiple databases' [`DatabaseExecutor`]s may be handles on the same runtime/core, -/// and a [`DatabaseExecutor`] may occasionally be migrated to a different runtime/core to balance load. +/// In multi-tenant environments, multiple databases' [`SingleCoreExecutor`]s may be handles on the same runtime/core, +/// and a [`SingleCoreExecutor`] may occasionally be migrated to a different runtime/core to balance load. /// /// Construct a `JobCores` via [`Self::from_pinned_cores`] or [`Self::without_pinned_cores`]. /// a `JobCores` constructed without core pinning, including `from_pinned_cores` on an empty set, @@ -35,25 +35,25 @@ enum JobCoresInner { } struct PinnedCoresExecutorManager { - /// Channels to request that a [`DatabaseExecutor`] move to a different Tokio runtime. + /// Channels to request that a [`SingleCoreExecutor`] move to a different Tokio runtime. /// - /// Alongside each channel is the [`CoreId`] of the runtime to which that [`DatabaseExecutor`] is currently pinned. + /// Alongside each channel is the [`CoreId`] of the runtime to which that [`SingleCoreExecutor`] is currently pinned. /// This is used as an index into `self.cores` to make load-balancing decisions when freeing a database executor /// in [`Self::deallocate`]. - database_executor_move: HashMap)>, + database_executor_move: HashMap)>, cores: IndexMap, /// An index into `cores` of the next core to put a new job onto. /// /// This acts as a partition point in `cores`; all cores in `..index` have /// one fewer job on them than the cores in `index..`. 
next_core: usize, - next_id: DatabaseExecutorId, + next_id: SingleCoreExecutorId, } /// Stores the [`tokio::Runtime`] pinned to a particular core, -/// and remembers the [`DatabaseExecutorId`]s for all databases sharing that executor. +/// and remembers the [`SingleCoreExecutorId`]s for all databases sharing that executor. struct CoreInfo { - jobs: SmallVec<[DatabaseExecutorId; 4]>, + jobs: SmallVec<[SingleCoreExecutorId; 4]>, tokio_runtime: runtime::Runtime, } @@ -61,7 +61,7 @@ impl CoreInfo { fn spawn_executor(id: CoreId) -> CoreInfo { let runtime = runtime::Builder::new_multi_thread() .worker_threads(1) - // [`DatabaseExecutor`]s should only be executing Wasmtime WASM futures, + // [`SingleCoreExecutor`]s should only be executing Wasmtime WASM futures, // and so should never be doing [`Tokio::spawn_blocking`] or performing blocking I/O. // However, `max_blocking_threads` will panic if passed 0, so we set a limit of 1 // and use `on_thread_start` to log an error when spawning a blocking task. @@ -77,7 +77,7 @@ impl CoreInfo { if already_spawned_worker.swap(true, Ordering::Relaxed) { // We're spawning a blocking thread, naughty! log::error!( - "`JobCores` Tokio runtime for `DatabaseExecutor` use on core {id:?} spawned a blocking thread!" + "`JobCores` Tokio runtime for `SingleCoreExecutor` use on core {id:?} spawned a blocking thread!" ); } else { // We're spawning our 1 worker, so pin it to the appropriate thread. @@ -86,7 +86,7 @@ impl CoreInfo { } }) .build() - .expect("Failed to start Tokio executor for `DatabaseExecutor`"); + .expect("Failed to start Tokio executor for `SingleCoreExecutor`"); CoreInfo { jobs: SmallVec::new(), tokio_runtime: runtime, @@ -95,31 +95,16 @@ impl CoreInfo { } #[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -struct DatabaseExecutorId(usize); +struct SingleCoreExecutorId(usize); impl JobCores { - /// Get a handle on a [`DatabaseExecutor`] to later run a database's jobs on. - pub fn take(&self) -> DatabaseExecutor { + /// Get a handle on a [`SingleCoreExecutor`] to later run a database's jobs on. 
+ pub fn take(&self) -> SingleCoreExecutor { let database_executor_inner = match &self.inner { - JobCoresInner::NoPinning(handle) => DatabaseExecutorInner { - runtime: watch::channel(handle.clone()).1, - _guard: None, - }, - JobCoresInner::PinnedCores(manager) => { - let manager_weak = Arc::downgrade(manager); - let (database_executor_id, move_runtime_rx) = manager.lock().unwrap().allocate(); - DatabaseExecutorInner { - runtime: move_runtime_rx, - _guard: Some(LoadBalanceOnDropGuard { - manager: manager_weak, - database_executor_id, - }), - } - } + JobCoresInner::NoPinning(handle) => SingleCoreExecutorInner::without_load_balancing(handle.clone()), + JobCoresInner::PinnedCores(manager) => SingleCoreExecutorInner::with_load_balancing(manager), }; - DatabaseExecutor { - inner: Arc::new(database_executor_inner), - } + SingleCoreExecutor::from_inner(database_executor_inner) } /// Construct a [`JobCores`] which runs one Tokio runtime on each of the `cores`, @@ -136,7 +121,7 @@ impl JobCores { database_executor_move: HashMap::default(), cores, next_core: 0, - next_id: DatabaseExecutorId(0), + next_id: SingleCoreExecutorId(0), }))) }; @@ -156,7 +141,7 @@ impl JobCores { } impl PinnedCoresExecutorManager { - fn allocate(&mut self) -> (DatabaseExecutorId, watch::Receiver) { + fn allocate(&mut self) -> (SingleCoreExecutorId, watch::Receiver) { let database_executor_id = self.next_id; self.next_id.0 += 1; @@ -175,7 +160,7 @@ impl PinnedCoresExecutorManager { } /// Run when a `JobThread` exits. - fn deallocate(&mut self, id: DatabaseExecutorId) { + fn deallocate(&mut self, id: SingleCoreExecutorId) { let (freed_core_id, _) = self.database_executor_move.remove(&id).unwrap(); let core_index = self.cores.get_index_of(&freed_core_id).unwrap(); @@ -227,11 +212,12 @@ impl PinnedCoresExecutorManager { /// When all handles on this database executor have been dropped, /// its use of the core to which it is pinned will be released, /// and other databases may be migrated to that core to balance load. -pub struct DatabaseExecutor { - inner: Arc, +#[derive(Clone)] +pub struct SingleCoreExecutor { + inner: Arc, } -struct DatabaseExecutorInner { +struct SingleCoreExecutorInner { /// Handle on the [`runtime::Runtime`] where this executor should run jobs. /// /// This will be occasionally updated by [`PinnedCoresExecutorManager::deallocate`] @@ -243,7 +229,44 @@ struct DatabaseExecutorInner { _guard: Option, } -impl DatabaseExecutor { +impl SingleCoreExecutorInner { + fn without_load_balancing(handle: runtime::Handle) -> Self { + SingleCoreExecutorInner { + runtime: watch::channel(handle).1, + _guard: None, + } + } + + fn with_load_balancing(manager: &Arc>) -> Self { + let manager_weak = Arc::downgrade(manager); + let (database_executor_id, move_runtime_rx) = manager.lock().unwrap().allocate(); + SingleCoreExecutorInner { + runtime: move_runtime_rx, + _guard: Some(LoadBalanceOnDropGuard { + manager: manager_weak, + database_executor_id, + }), + } + } +} + +impl SingleCoreExecutor { + fn from_inner(inner: SingleCoreExecutorInner) -> Self { + Self { inner: Arc::new(inner) } + } + + /// Create a `SingleCoreExecutor` which runs jobs in [`tokio::runtime::Handle::current`]. + /// + /// Callers should most likely instead construct a `SingleCoreExecutor` via [`JobCores::take`], + /// which will intelligently pin each database to a particular core. + /// This method should only be used for short-lived instances which do not perform intense computation, + /// e.g. to extract the schema by calling `describe_module`. 
+ pub fn in_current_tokio_runtime() -> Self { + Self::from_inner(SingleCoreExecutorInner::without_load_balancing( + runtime::Handle::current(), + )) + } + /// Run a job for this database executor. /// /// `f` must not perform any `Tokio::spawn_blocking` blocking operations. @@ -252,7 +275,11 @@ impl DatabaseExecutor { F: Future + Send + 'static, R: Send + 'static, { - match self.inner.runtime.borrow().spawn(f).await { + // Clone the handle rather than holding the `watch::Ref` alive + // because `watch::Ref` is not `Send`. + let handle = runtime::Handle::clone(&*self.inner.runtime.borrow()); + + match handle.spawn(f).await { Ok(r) => r, Err(e) => std::panic::resume_unwind(e.into_panic()), } @@ -266,13 +293,19 @@ impl DatabaseExecutor { { self.run_job(async { f() }).await } + + pub fn downgrade(&self) -> WeakSingleCoreExecutor { + WeakSingleCoreExecutor { + inner: Arc::downgrade(&self.inner), + } + } } /// On drop, tells the [`JobCores`] that this database is no longer occupying its Tokio runtime, /// allowing databases from more-contended runtimes/cores to migrate there. struct LoadBalanceOnDropGuard { manager: Weak>, - database_executor_id: DatabaseExecutorId, + database_executor_id: SingleCoreExecutorId, } impl Drop for LoadBalanceOnDropGuard { @@ -285,12 +318,13 @@ impl Drop for LoadBalanceOnDropGuard { /// A weak version of `JobThread` that does not hold the thread open. // used in crate::core::module_host::WeakModuleHost -pub struct WeakDatabaseExecutor { - inner: Weak, +#[derive(Clone)] +pub struct WeakSingleCoreExecutor { + inner: Weak, } -impl WeakDatabaseExecutor { - pub fn upgrade(&self) -> Option { - self.inner.upgrade().map(|inner| DatabaseExecutor { inner }) +impl WeakSingleCoreExecutor { + pub fn upgrade(&self) -> Option { + self.inner.upgrade().map(|inner| SingleCoreExecutor { inner }) } } diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 9ebb9c23047..0062d18afff 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -522,7 +522,7 @@ pub async fn start_server(data_dir: &ServerDataDir, cert_dir: Option<&std::path: args.extend(["--jwt-key-dir".as_ref(), cert_dir.as_os_str()]) } let args = start::cli().try_get_matches_from(args)?; - start::exec(&args, JobCores::default()).await + start::exec(&args, JobCores::without_pinned_cores(tokio::runtime::Handle::current())).await } #[cfg(test)] From 4c088f2f94939e6682eb4624b50e99b8cda825d9 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Mon, 22 Sep 2025 13:30:23 -0400 Subject: [PATCH 03/16] Fix standalone main tyck --- crates/standalone/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs index 81aee00bfa3..fe666ed40f1 100644 --- a/crates/standalone/src/main.rs +++ b/crates/standalone/src/main.rs @@ -76,5 +76,6 @@ fn main() -> anyhow::Result<()> { cores.tokio.configure(&mut builder); let rt = builder.build().unwrap(); cores.rayon.configure(rt.handle()); - rt.block_on(async_main(cores.databases)) + let database_cores = cores.databases.make_database_runners(rt.handle()); + rt.block_on(async_main(database_cores)) } From 0fedef3e3c4b261ed3ddbfc51e0c204d60bfd103 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Mon, 22 Sep 2025 13:39:44 -0400 Subject: [PATCH 04/16] Clean up testing `JobCores` initialization --- crates/testing/src/modules.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 1e7fb48f637..150171871e9 
100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -187,7 +187,7 @@ impl CompiledModule { }, &certs, paths.data_dir.into(), - JobCores::default(), + JobCores::without_pinned_cores(tokio::runtime::Handle::current()), ) .await .unwrap(); From b26d447cc4e1c5210e03659919b3d938cadc4586 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Mon, 22 Sep 2025 14:43:30 -0400 Subject: [PATCH 05/16] Wrap some tests in a `mod` so that I can `--skip` them --- crates/schema/tests/ensure_same_schema.rs | 108 +++++++++++----------- 1 file changed, 56 insertions(+), 52 deletions(-) diff --git a/crates/schema/tests/ensure_same_schema.rs b/crates/schema/tests/ensure_same_schema.rs index fcdfae6dfb0..3625330181d 100644 --- a/crates/schema/tests/ensure_same_schema.rs +++ b/crates/schema/tests/ensure_same_schema.rs @@ -1,61 +1,65 @@ -use serial_test::serial; -use spacetimedb_schema::auto_migrate::{ponder_auto_migrate, AutoMigrateStep}; -use spacetimedb_schema::def::ModuleDef; -use spacetimedb_testing::modules::{CompilationMode, CompiledModule}; +// Wrap these tests in a `mod` whose name contains `csharp` +// so that we can run tests with `--skip csharp` in environments without dotnet installed. +mod ensure_same_schema_rust_csharp { + use serial_test::serial; + use spacetimedb_schema::auto_migrate::{ponder_auto_migrate, AutoMigrateStep}; + use spacetimedb_schema::def::ModuleDef; + use spacetimedb_testing::modules::{CompilationMode, CompiledModule}; -fn get_normalized_schema(module_name: &str) -> ModuleDef { - let module = CompiledModule::compile(module_name, CompilationMode::Debug); - module.extract_schema_blocking() -} + fn get_normalized_schema(module_name: &str) -> ModuleDef { + let module = CompiledModule::compile(module_name, CompilationMode::Debug); + module.extract_schema_blocking() + } -fn assert_identical_modules(module_name_prefix: &str) { - let rs = get_normalized_schema(module_name_prefix); - let cs = get_normalized_schema(&format!("{module_name_prefix}-cs")); - let mut diff = ponder_auto_migrate(&cs, &rs) - .expect("could not compute a diff between Rust and C#") - .steps; + fn assert_identical_modules(module_name_prefix: &str) { + let rs = get_normalized_schema(module_name_prefix); + let cs = get_normalized_schema(&format!("{module_name_prefix}-cs")); + let mut diff = ponder_auto_migrate(&cs, &rs) + .expect("could not compute a diff between Rust and C#") + .steps; - // In any migration plan, all `RowLevelSecurityDef`s are ALWAYS removed and - // re-added to ensure the core engine reinintializes the policies. - // This is slightly silly (and arguably should be hidden inside `core`), - // but for now, we just ignore these steps and manually compare the `RowLevelSecurityDef`s. - diff.retain(|step| { - !matches!( - step, - AutoMigrateStep::AddRowLevelSecurity(_) | AutoMigrateStep::RemoveRowLevelSecurity(_) - ) - }); + // In any migration plan, all `RowLevelSecurityDef`s are ALWAYS removed and + // re-added to ensure the core engine reinintializes the policies. + // This is slightly silly (and arguably should be hidden inside `core`), + // but for now, we just ignore these steps and manually compare the `RowLevelSecurityDef`s. + diff.retain(|step| { + !matches!( + step, + AutoMigrateStep::AddRowLevelSecurity(_) | AutoMigrateStep::RemoveRowLevelSecurity(_) + ) + }); - assert!( - diff.is_empty(), - "Rust and C# modules are not identical. Here are the steps to migrate from C# to Rust: {diff:#?}" - ); + assert!( + diff.is_empty(), + "Rust and C# modules are not identical. 
Here are the steps to migrate from C# to Rust: {diff:#?}" + ); - let mut rls_rs = rs.row_level_security().collect::>(); - rls_rs.sort(); - let mut rls_cs = cs.row_level_security().collect::>(); - rls_cs.sort(); - assert_eq!( - rls_rs, rls_cs, - "Rust and C# modules are not identical: different row level security policies" - ) -} + let mut rls_rs = rs.row_level_security().collect::>(); + rls_rs.sort(); + let mut rls_cs = cs.row_level_security().collect::>(); + rls_cs.sort(); + assert_eq!( + rls_rs, rls_cs, + "Rust and C# modules are not identical: different row level security policies" + ) + } -macro_rules! declare_tests { - ($($name:ident => $path:literal,)*) => { - $( - #[test] - #[serial] - fn $name() { - assert_identical_modules($path); - } - )* + macro_rules! declare_tests { + ($($name:ident => $path:literal,)*) => { + $( + #[test] + #[serial] + fn $name() { + assert_identical_modules($path); + } + )* + } } -} -declare_tests! { - module_test => "module-test", - sdk_test_connect_disconnect => "sdk-test-connect-disconnect", - sdk_test => "sdk-test", - benchmarks => "benchmarks", + declare_tests! { + module_test => "module-test", + sdk_test_connect_disconnect => "sdk-test-connect-disconnect", + sdk_test => "sdk-test", + benchmarks => "benchmarks", + } } From e54e7286b3b1f0ae73b70adf1eeafc70422be351 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Mon, 22 Sep 2025 14:49:41 -0400 Subject: [PATCH 06/16] Use Wasmtime `async` functions where necessary But these are never supposed to block, so use a custom "executor" which only calls `poll` once. --- .../core/src/host/wasmtime/wasmtime_module.rs | 45 ++++++++++++++----- crates/core/src/util/mod.rs | 1 + crates/core/src/util/poll_once_executor.rs | 29 ++++++++++++ crates/standalone/src/lib.rs | 19 ++++++-- .../src/module_bindings/mod.rs | 2 +- .../test-client/src/module_bindings/mod.rs | 2 +- 6 files changed, 80 insertions(+), 18 deletions(-) create mode 100644 crates/core/src/util/poll_once_executor.rs diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 141f7455518..dc8f71c6de4 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -6,9 +6,13 @@ use crate::energy::ReducerBudget; use crate::host::instance_env::InstanceEnv; use crate::host::wasm_common::module_host_actor::{DescribeError, InitializationError}; use crate::host::wasm_common::*; +use crate::util::poll_once_executor::poll_once; use crate::util::string_from_utf8_lossy_owned; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; -use wasmtime::{AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace}; +use wasmtime::{ + AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace, WasmParams, + WasmResults, +}; fn log_traceback(func_type: &str, func: &str, e: &wasmtime::Error) { log::info!("{func_type} \"{func}\" runtime error: {e}"); @@ -85,15 +89,32 @@ fn handle_error_sink_code(code: i32, error: Vec) -> Result<(), Box> { const CALL_FAILURE: i32 = HOST_CALL_FAILURE.get() as i32; +/// Invoke `typed_func` and assert that it doesn't yield. +/// +/// Our Wasmtime is configured for `async` execution, and will panic if we use the non-async [`TypedFunc::call`]. +/// The `async` config is necessary to allow procedures to suspend, e.g. when making HTTP calls or acquiring transactions. +/// However, most of the WASM we execute, incl. 
reducers and startup functions, should never block/yield.
+/// Rather than crossing our fingers and trusting, we run [`TypedFunc::call_async`] in [`crate::util::poll_once_executor`],
+/// a custom "async executor" which invokes [`std::future::Future::poll`] exactly once.
+fn call_sync_typed_func<Args: WasmParams, Ret: WasmResults>(
+    typed_func: &TypedFunc<Args, Ret>,
+    store: &mut Store<WasmInstanceEnv>,
+    args: Args,
+) -> anyhow::Result<Ret> {
+    let fut = typed_func.call_async(store, args);
+    poll_once(fut).expect("`call_async` of supposedly synchronous WASM function returned `Poll::Pending`")
+}
+
 impl module_host_actor::WasmInstancePre for WasmtimeModule {
     type Instance = WasmtimeInstance;
 
     fn instantiate(&self, env: InstanceEnv, func_names: &FuncNames) -> Result<Self::Instance, InitializationError> {
         let env = WasmInstanceeEnv::new(env);
         let mut store = Store::new(self.module.module().engine(), env);
-        let instance = self
-            .module
-            .instantiate(&mut store)
+        let instance_fut = self.module.instantiate_async(&mut store);
+
+        let instance = poll_once(instance_fut)
+            .expect("`instantiate_async` did not immediately return `Ready`")
             .map_err(InitializationError::Instantiation)?;
 
         let mem = Mem::extract(&instance, &mut store).unwrap();
@@ -114,16 +135,15 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule {
 
         for preinit in &func_names.preinits {
             let func = instance.get_typed_func::<(), ()>(&mut store, preinit).unwrap();
-            func.call(&mut store, ())
-                .map_err(|err| InitializationError::RuntimeError {
-                    err,
-                    func: preinit.clone(),
-                })?;
+            call_sync_typed_func(&func, &mut store, ()).map_err(|err| InitializationError::RuntimeError {
+                err,
+                func: preinit.clone(),
+            })?;
         }
 
         if let Ok(init) = instance.get_typed_func::<u32, i32>(&mut store, SETUP_DUNDER) {
             let setup_error = store.data_mut().setup_standard_bytes_sink();
-            let res = init.call(&mut store, setup_error);
+            let res = call_sync_typed_func(&init, &mut store, setup_error);
             let error = store.data_mut().take_standard_bytes_sink();
             match res {
                 // TODO: catch this and return the error message to the http client
@@ -170,7 +190,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
 
         let start = std::time::Instant::now();
         log::trace!("Start describer \"{describer_func_name}\"...");
-        let result = describer.call(&mut *store, sink);
+        let result = call_sync_typed_func(&describer, &mut *store, sink);
         let duration = start.elapsed();
 
         log::trace!("Describer \"{}\" ran: {} us", describer_func_name, duration.as_micros());
@@ -207,7 +227,8 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
         let args_bytes = op.args.get_bsatn().clone();
         let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, args_bytes, op.timestamp);
 
-        let call_result = self.call_reducer.call(
+        let call_result = call_sync_typed_func(
+            &self.call_reducer,
             &mut *store,
             (
                 op.id.0,
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 5a5c1fb8420..66203d070ac 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -9,6 +9,7 @@ pub mod prometheus_handle;
 pub mod jobs;
 pub mod notify_once;
 pub mod slow;
+pub mod poll_once_executor;
 
 // TODO: use String::from_utf8_lossy_owned once stabilized
 pub(crate) fn string_from_utf8_lossy_owned(v: Vec<u8>) -> String {
diff --git a/crates/core/src/util/poll_once_executor.rs b/crates/core/src/util/poll_once_executor.rs
new file mode 100644
index 00000000000..74502c24c6b
--- /dev/null
+++ b/crates/core/src/util/poll_once_executor.rs
@@ -0,0 +1,29 @@
+//! An `async` executor which polls a future only once, and cancels if it does not immediately return `Ready`.
+//!
+//! This is useful for running library code which is typed as `async`,
+//! but which we know based on our specific invocation should never yield.
+//! In our case, we configure Wasmtime in `async` mode in order to execute procedures,
+//! but we maintain the invariant that instantiation and reducers will never yield.
+//!
+
+use std::{
+    future::Future,
+    pin::pin,
+    sync::Arc,
+    task::{Context, Poll, Wake, Waker},
+};
+
+struct TerribleWaker;
+
+impl Wake for TerribleWaker {
+    fn wake(self: Arc<Self>) {}
+}
+
+pub fn poll_once<R, Fut: Future<Output = R>>(mut fut: Fut) -> Option<R> {
+    let waker = Waker::from(Arc::new(TerribleWaker));
+    let mut context = Context::from_waker(&waker);
+    match <Fut as Future>::poll(pin!(&mut fut), &mut context) {
+        Poll::Pending => None,
+        Poll::Ready(res) => Some(res),
+    }
+}
diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs
index ea00fd64c37..f6e8bbced40 100644
--- a/crates/standalone/src/lib.rs
+++ b/crates/standalone/src/lib.rs
@@ -565,11 +565,22 @@ mod tests {
             websocket: WebSocketOptions::default(),
         };
 
-        let _env = StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default()).await?;
+        let _env = StandaloneEnv::init(
+            config,
+            &ca,
+            data_dir.clone(),
+            JobCores::without_pinned_cores(tokio::runtime::Handle::current()),
+        )
+        .await?;
         // Ensure that we have a lock.
-        assert!(StandaloneEnv::init(config, &ca, data_dir.clone(), Default::default())
-            .await
-            .is_err());
+        assert!(StandaloneEnv::init(
+            config,
+            &ca,
+            data_dir.clone(),
+            JobCores::without_pinned_cores(tokio::runtime::Handle::current())
+        )
+        .await
+        .is_err());
 
         Ok(())
     }
diff --git a/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs b/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs
index c4575536b3c..37e04dc5294 100644
--- a/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs
+++ b/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs
@@ -1,7 +1,7 @@
 // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
 // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
 
-// This was generated using spacetimedb cli version 1.4.0 (commit 9f4eccde6ef1eb71827c0ae48125dff49de72612).
+// This was generated using spacetimedb cli version 1.4.0 (commit f26e20f1d99a66e7422ff86a037bd4aa80b44963).
 
 #![allow(unused, clippy::all)]
 use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};
diff --git a/sdks/rust/tests/test-client/src/module_bindings/mod.rs b/sdks/rust/tests/test-client/src/module_bindings/mod.rs
index 96ba580035b..9c8008c7e3c 100644
--- a/sdks/rust/tests/test-client/src/module_bindings/mod.rs
+++ b/sdks/rust/tests/test-client/src/module_bindings/mod.rs
@@ -1,7 +1,7 @@
 // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE
 // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD.
 
-// This was generated using spacetimedb cli version 1.4.0 (commit 9f4eccde6ef1eb71827c0ae48125dff49de72612).
+// This was generated using spacetimedb cli version 1.4.0 (commit f26e20f1d99a66e7422ff86a037bd4aa80b44963).
 
#![allow(unused, clippy::all)] use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; From 21830cdc973c2f0985865e03442d826ae6f49604 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Mon, 22 Sep 2025 17:04:21 -0400 Subject: [PATCH 07/16] Fix clippy lints (except unused method I can't explain) --- crates/core/src/host/module_host.rs | 3 --- crates/core/src/host/wasm_common/instrumentation.rs | 8 +++++++- crates/core/src/startup.rs | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 6350addceca..c26fe291e75 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -395,8 +395,6 @@ impl Instance { } } -pub trait ModuleInstance: Send + 'static {} - /// Creates the table for `table_def` in `stdb`. pub fn create_table_from_def( stdb: &RelationalDB, @@ -981,7 +979,6 @@ impl ModuleHost { me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst) }) .await? - .map_err(Into::into) } /// Empty the system tables tracking clients without running any lifecycle reducers. diff --git a/crates/core/src/host/wasm_common/instrumentation.rs b/crates/core/src/host/wasm_common/instrumentation.rs index e2d3550e626..c707bccebfd 100644 --- a/crates/core/src/host/wasm_common/instrumentation.rs +++ b/crates/core/src/host/wasm_common/instrumentation.rs @@ -91,6 +91,12 @@ pub struct CallTimes { times: EnumMap, } +impl Default for CallTimes { + fn default() -> Self { + Self::new() + } +} + impl CallTimes { /// Create a new timing structure, with times for all calls set to zero. pub fn new() -> Self { @@ -115,6 +121,6 @@ impl CallTimes { /// will `take`` the CallTimes after running a reducer and report the taken times, /// leaving a fresh zeroed CallTimes for the next reducer invocation. 
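+    ///
+    /// Note: `std::mem::take(self)` is equivalent to
+    /// `std::mem::replace(self, Self::default())`, and the `Default` impl
+    /// added above delegates to `Self::new()`, so behavior is unchanged.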
pub fn take(&mut self) -> CallTimes { - std::mem::replace(self, Self::new()) + std::mem::take(self) } } diff --git a/crates/core/src/startup.rs b/crates/core/src/startup.rs index fd73bf15037..ebd8c079fc8 100644 --- a/crates/core/src/startup.rs +++ b/crates/core/src/startup.rs @@ -443,6 +443,6 @@ impl DatabaseCores { /// let database_cores = cores.databases.make_database_runners(rt.handle()); /// ``` pub fn make_database_runners(self, global_runtime: &tokio::runtime::Handle) -> JobCores { - JobCores::from_pinned_cores(self.0.into_iter(), global_runtime.clone()) + JobCores::from_pinned_cores(self.0, global_runtime.clone()) } } From 56c89e63d01b604a4354a1d5739d13f1799fea28 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Tue, 23 Sep 2025 09:47:45 -0400 Subject: [PATCH 08/16] Add futures-util and use their `now_or_never` --- Cargo.lock | 1 + Cargo.toml | 1 + crates/core/Cargo.toml | 1 + .../core/src/host/wasmtime/wasmtime_module.rs | 12 ++++---- crates/core/src/util/mod.rs | 1 - crates/core/src/util/poll_once_executor.rs | 29 ------------------- 6 files changed, 10 insertions(+), 35 deletions(-) delete mode 100644 crates/core/src/util/poll_once_executor.rs diff --git a/Cargo.lock b/Cargo.lock index 77ae2b49ab9..c6037d0f2af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5812,6 +5812,7 @@ dependencies = [ "fs2", "fs_extra", "futures", + "futures-util", "hashbrown 0.15.3", "hex", "hostname", diff --git a/Cargo.toml b/Cargo.toml index 817c145ed8d..1b5ea771bce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -182,6 +182,7 @@ fs_extra = "1.3.0" fs2 = "0.4.3" futures = "0.3" futures-channel = "0.3" +futures-util = "0.3" getrandom02 = { package = "getrandom", version = "0.2" } glob = "0.3.1" hashbrown = { version = "0.15", default-features = false, features = ["equivalent", "inline-more"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 13136a18ef1..a8a324d21cb 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -58,6 +58,7 @@ enum-map.workspace = true flate2.workspace = true fs2.workspace = true futures.workspace = true +futures-util.workspace = true hashbrown = { workspace = true, features = ["rayon", "serde"] } hex.workspace = true hostname.workspace = true diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index dc8f71c6de4..9a9058387c0 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -6,8 +6,8 @@ use crate::energy::ReducerBudget; use crate::host::instance_env::InstanceEnv; use crate::host::wasm_common::module_host_actor::{DescribeError, InitializationError}; use crate::host::wasm_common::*; -use crate::util::poll_once_executor::poll_once; use crate::util::string_from_utf8_lossy_owned; +use futures_util::FutureExt; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use wasmtime::{ AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace, WasmParams, @@ -94,15 +94,16 @@ const CALL_FAILURE: i32 = HOST_CALL_FAILURE.get() as i32; /// Our Wasmtime is configured for `async` execution, and will panic if we use the non-async [`TypedFunc::call`]. /// The `async` config is necessary to allow procedures to suspend, e.g. when making HTTP calls or acquiring transactions. /// However, most of the WASM we execute, incl. reducers and startup functions, should never block/yield. 
-/// Rather than crossing our fingers and trusting, we run [`TypedFunc::call_async`] in [`crate::util::poll_once_executor`],
-/// a custom "async executor" which invokes [`std::future::Future::poll`] exactly once.
+/// Rather than crossing our fingers and trusting, we run [`TypedFunc::call_async`] in [`FutureExt::now_or_never`],
+/// an "async executor" which invokes [`std::future::Future::poll`] exactly once.
 fn call_sync_typed_func<Args: WasmParams, Ret: WasmResults>(
     typed_func: &TypedFunc<Args, Ret>,
     store: &mut Store<WasmInstanceEnv>,
     args: Args,
 ) -> anyhow::Result<Ret> {
     let fut = typed_func.call_async(store, args);
-    poll_once(fut).expect("`call_async` of supposedly synchronous WASM function returned `Poll::Pending`")
+    fut.now_or_never()
+        .expect("`call_async` of supposedly synchronous WASM function returned `Poll::Pending`")
 }
 
 impl module_host_actor::WasmInstancePre for WasmtimeModule {
@@ -113,7 +114,8 @@
         let mut store = Store::new(self.module.module().engine(), env);
         let instance_fut = self.module.instantiate_async(&mut store);
 
-        let instance = poll_once(instance_fut)
+        let instance = instance_fut
+            .now_or_never()
             .expect("`instantiate_async` did not immediately return `Ready`")
             .map_err(InitializationError::Instantiation)?;
 
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 66203d070ac..5a5c1fb8420 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -9,7 +9,6 @@ pub mod prometheus_handle;
 pub mod jobs;
 pub mod notify_once;
 pub mod slow;
-pub mod poll_once_executor;
 
 // TODO: use String::from_utf8_lossy_owned once stabilized
 pub(crate) fn string_from_utf8_lossy_owned(v: Vec<u8>) -> String {
diff --git a/crates/core/src/util/poll_once_executor.rs b/crates/core/src/util/poll_once_executor.rs
deleted file mode 100644
index 74502c24c6b..00000000000
--- a/crates/core/src/util/poll_once_executor.rs
+++ /dev/null
@@ -1,29 +0,0 @@
-//! An `async` executor which polls a future only once, and cancels if it does not immediately return `Ready`.
-//!
-//! This is useful for running library code which is typed as `async`,
-//! but which we know based on our specific invocation should never yield.
-//! In our case, we configure Wasmtime in `async` mode in order to execute procedures,
-//! but we maintain the invariant that instantiation and reducers will never yield.
-//!
-
-use std::{
-    future::Future,
-    pin::pin,
-    sync::Arc,
-    task::{Context, Poll, Wake, Waker},
-};
-
-struct TerribleWaker;
-
-impl Wake for TerribleWaker {
-    fn wake(self: Arc<Self>) {}
-}
-
-pub fn poll_once<R, Fut: Future<Output = R>>(mut fut: Fut) -> Option<R> {
-    let waker = Waker::from(Arc::new(TerribleWaker));
-    let mut context = Context::from_waker(&waker);
-    match <Fut as Future>::poll(pin!(&mut fut), &mut context) {
-        Poll::Pending => None,
-        Poll::Ready(res) => Some(res),
-    }
-}
From 9bb6df02ba1b5bcec9e7de59f2fd2200ece8b794 Mon Sep 17 00:00:00 2001
From: Phoebe Goldman
Date: Wed, 24 Sep 2025 10:28:15 -0400
Subject: [PATCH 09/16] Revert "Wrap some tests in a `mod` so that I can `--skip` them"

This reverts commit b26d447cc4e1c5210e03659919b3d938cadc4586.

These changes are now in a separate branch, `phoebe/skippable-tests`.
--- crates/schema/tests/ensure_same_schema.rs | 108 +++++++++++----------- 1 file changed, 52 insertions(+), 56 deletions(-) diff --git a/crates/schema/tests/ensure_same_schema.rs b/crates/schema/tests/ensure_same_schema.rs index 3625330181d..fcdfae6dfb0 100644 --- a/crates/schema/tests/ensure_same_schema.rs +++ b/crates/schema/tests/ensure_same_schema.rs @@ -1,65 +1,61 @@ -// Wrap these tests in a `mod` whose name contains `csharp` -// so that we can run tests with `--skip csharp` in environments without dotnet installed. -mod ensure_same_schema_rust_csharp { - use serial_test::serial; - use spacetimedb_schema::auto_migrate::{ponder_auto_migrate, AutoMigrateStep}; - use spacetimedb_schema::def::ModuleDef; - use spacetimedb_testing::modules::{CompilationMode, CompiledModule}; +use serial_test::serial; +use spacetimedb_schema::auto_migrate::{ponder_auto_migrate, AutoMigrateStep}; +use spacetimedb_schema::def::ModuleDef; +use spacetimedb_testing::modules::{CompilationMode, CompiledModule}; - fn get_normalized_schema(module_name: &str) -> ModuleDef { - let module = CompiledModule::compile(module_name, CompilationMode::Debug); - module.extract_schema_blocking() - } +fn get_normalized_schema(module_name: &str) -> ModuleDef { + let module = CompiledModule::compile(module_name, CompilationMode::Debug); + module.extract_schema_blocking() +} - fn assert_identical_modules(module_name_prefix: &str) { - let rs = get_normalized_schema(module_name_prefix); - let cs = get_normalized_schema(&format!("{module_name_prefix}-cs")); - let mut diff = ponder_auto_migrate(&cs, &rs) - .expect("could not compute a diff between Rust and C#") - .steps; +fn assert_identical_modules(module_name_prefix: &str) { + let rs = get_normalized_schema(module_name_prefix); + let cs = get_normalized_schema(&format!("{module_name_prefix}-cs")); + let mut diff = ponder_auto_migrate(&cs, &rs) + .expect("could not compute a diff between Rust and C#") + .steps; - // In any migration plan, all `RowLevelSecurityDef`s are ALWAYS removed and - // re-added to ensure the core engine reinintializes the policies. - // This is slightly silly (and arguably should be hidden inside `core`), - // but for now, we just ignore these steps and manually compare the `RowLevelSecurityDef`s. - diff.retain(|step| { - !matches!( - step, - AutoMigrateStep::AddRowLevelSecurity(_) | AutoMigrateStep::RemoveRowLevelSecurity(_) - ) - }); + // In any migration plan, all `RowLevelSecurityDef`s are ALWAYS removed and + // re-added to ensure the core engine reinintializes the policies. + // This is slightly silly (and arguably should be hidden inside `core`), + // but for now, we just ignore these steps and manually compare the `RowLevelSecurityDef`s. + diff.retain(|step| { + !matches!( + step, + AutoMigrateStep::AddRowLevelSecurity(_) | AutoMigrateStep::RemoveRowLevelSecurity(_) + ) + }); - assert!( - diff.is_empty(), - "Rust and C# modules are not identical. Here are the steps to migrate from C# to Rust: {diff:#?}" - ); + assert!( + diff.is_empty(), + "Rust and C# modules are not identical. 
Here are the steps to migrate from C# to Rust: {diff:#?}" + ); - let mut rls_rs = rs.row_level_security().collect::>(); - rls_rs.sort(); - let mut rls_cs = cs.row_level_security().collect::>(); - rls_cs.sort(); - assert_eq!( - rls_rs, rls_cs, - "Rust and C# modules are not identical: different row level security policies" - ) - } + let mut rls_rs = rs.row_level_security().collect::>(); + rls_rs.sort(); + let mut rls_cs = cs.row_level_security().collect::>(); + rls_cs.sort(); + assert_eq!( + rls_rs, rls_cs, + "Rust and C# modules are not identical: different row level security policies" + ) +} - macro_rules! declare_tests { - ($($name:ident => $path:literal,)*) => { - $( - #[test] - #[serial] - fn $name() { - assert_identical_modules($path); - } - )* - } +macro_rules! declare_tests { + ($($name:ident => $path:literal,)*) => { + $( + #[test] + #[serial] + fn $name() { + assert_identical_modules($path); + } + )* } +} - declare_tests! { - module_test => "module-test", - sdk_test_connect_disconnect => "sdk-test-connect-disconnect", - sdk_test => "sdk-test", - benchmarks => "benchmarks", - } +declare_tests! { + module_test => "module-test", + sdk_test_connect_disconnect => "sdk-test-connect-disconnect", + sdk_test => "sdk-test", + benchmarks => "benchmarks", } From c1da1b896edc094cd02dc0c42527a7ef961a91e6 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 12:43:01 -0400 Subject: [PATCH 10/16] Fix typo'd comments revealed in PR review Co-authored-by: Mazdak Farrokhzad Co-authored-by: joshua-spacetime Signed-off-by: Phoebe Goldman --- crates/core/src/host/module_host.rs | 2 +- crates/core/src/util/jobs.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index c26fe291e75..97b8163b937 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -717,7 +717,7 @@ impl ModuleHost { let timer_guard = self.start_call_timer(label); // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentialyl take a long time + // partially because the computation can potentially take a long time // and partially because interacting with the database requires taking // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. // This will bubble up any panic that may occur. diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 7024204f19c..62e0e2a7a43 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -16,7 +16,7 @@ use tokio::sync::watch; /// and a [`SingleCoreExecutor`] may occasionally be migrated to a different runtime/core to balance load. /// /// Construct a `JobCores` via [`Self::from_pinned_cores`] or [`Self::without_pinned_cores`]. -/// a `JobCores` constructed without core pinning, including `from_pinned_cores` on an empty set, +/// A `JobCores` constructed without core pinning, including `from_pinned_cores` on an empty set, /// will use the "global" Tokio executor to run database jobs, /// rather than creating multiple un-pinned single-threaded runtimes. 
/// From db90e6d6c39c8aef331b9da6c0ecd18ae1d8eb3d Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 12:54:33 -0400 Subject: [PATCH 11/16] Expand comments as requested in code review --- crates/core/src/host/module_host.rs | 13 +++++++++++-- crates/core/src/util/jobs.rs | 19 ++++++++++++++++++- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index c26fe291e75..1f6d027b3ed 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -491,6 +491,14 @@ pub struct CallReducerParams { pub args: ArgsTuple, } +/// Holds a [`Module`] and a set of [`Instance`]s from it, +/// and allocates the [`Instance`]s to be used for function calls. +/// +/// Capable of managing and allocating multiple instances of the same module, +/// but this functionality is currently unused, as only one reducer runs at a time. +/// When we introduce procedures, it will be necessary to have multiple instances, +/// as each procedure invocation will have its own sandboxed instance, +/// and multiple procedures can run concurrently with up to one reducer. struct ModuleInstanceManager { instances: VecDeque, module: Arc, @@ -509,7 +517,8 @@ impl ModuleInstanceManager { fn return_instance(&mut self, inst: Instance) { if inst.trapped() { // Don't return trapped instances; - // they may have left internal data structures in WASM linear memory in a bad state. + // they may have left internal data structures in the guest `Instance` + // (WASM linear memory, V8 global scope) in a bad state. return; } @@ -810,7 +819,7 @@ impl ModuleHost { if let Some((reducer_id, reducer_def)) = reducer_lookup { // The module defined a lifecycle reducer to handle new connections. // Call this reducer. - // If the call fails (as in, something unexpectedly goes wrong with WASM execution), + // If the call fails (as in, something unexpectedly goes wrong with guest execution), // abort the connection: we can't really recover. let reducer_outcome = me.call_reducer_inner_with_inst( Some(ScopeGuard::into_inner(mut_tx)), diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 7024204f19c..7bf277e5df6 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -19,6 +19,7 @@ use tokio::sync::watch; /// a `JobCores` constructed without core pinning, including `from_pinned_cores` on an empty set, /// will use the "global" Tokio executor to run database jobs, /// rather than creating multiple un-pinned single-threaded runtimes. +/// This means that long-running reducers or queries may block Tokio worker threads. /// /// This handle is cheaply cloneable, but at least one handle must be kept alive. /// If all instances of it are dropped, the per-thread [`runtime::Runtime`]s will be dropped, @@ -141,6 +142,19 @@ impl JobCores { } impl PinnedCoresExecutorManager { + /// Get a [`runtime::Handle`] for running database operations on, + /// and store state in `self` necessary to move that database to a new runtime + /// for load-balancing purposes. + /// + /// The returned [`SingleCoreExecutorId`] is an index into internal data structures in `self` (namely, `self.cores`) + /// which should be passed to [`Self::deallocate`] when the database is no longer using this executor. + /// This is done automatically by [`LoadBalanceOnDropGuard`]. + /// + /// The returned `watch::Receiver` stores the Tokio [`runtime::Handle`] + /// on which the database should run its compute-intensive jobs. 
+ /// This may occasionally be replaced with a new [`runtime::Handle`] to balance databases among available cores, + /// so databases should read from the [`watch::Receiver`] when spawning each job, + /// and should not spawn long-lived background tasks such as ones which loop over a channel. fn allocate(&mut self) -> (SingleCoreExecutorId, watch::Receiver) { let database_executor_id = self.next_id; self.next_id.0 += 1; @@ -159,7 +173,10 @@ impl PinnedCoresExecutorManager { (database_executor_id, move_runtime_rx) } - /// Run when a `JobThread` exits. + /// Mark the executor at `id` as no longer in use, free internal state which tracks it, + /// and move other executors to different cores as necessary to maintain a balanced distribution. + /// + /// Called by [`LoadBalanceOnDropGuard`] when a [`SingleCoreExecutor`] is no longer in use. fn deallocate(&mut self, id: SingleCoreExecutorId) { let (freed_core_id, _) = self.database_executor_move.remove(&id).unwrap(); From f1c5938e0799bf168aa5bee669e714eed7895038 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 13:18:07 -0400 Subject: [PATCH 12/16] Add metric for time spent in `create_instance` So that we can determine whether we should be running this on the database core or not --- crates/core/src/host/host_controller.rs | 6 ++- crates/core/src/host/module_host.rs | 64 +++++++++++++++++++++++-- crates/core/src/worker_metrics/mod.rs | 12 +++++ 3 files changed, 75 insertions(+), 7 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index d698b86be7e..814fcc457a4 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -595,6 +595,8 @@ async fn make_module_host( // threads, but those aren't for computation. Also, wasmtime uses rayon // to run compilation in parallel, so it'll need to run stuff in rayon anyway. asyncify(move || { + let database_identity = replica_ctx.database_identity; + let mcc = ModuleCreationContext { replica_ctx, scheduler, @@ -607,12 +609,12 @@ async fn make_module_host( HostType::Wasm => { let actor = runtimes.wasmtime.make_actor(mcc)?; trace!("wasmtime::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, unregister, executor) + ModuleHost::new(actor, unregister, executor, database_identity) } HostType::Js => { let actor = runtimes.v8.make_actor(mcc)?; trace!("v8::make_actor blocked for {:?}", start.elapsed()); - ModuleHost::new(actor, unregister, executor) + ModuleHost::new(actor, unregister, executor, database_identity) } }; Ok((program, module_host)) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index fd15bf4cc5a..42e76558260 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -56,6 +56,7 @@ use std::fmt; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; +use strum::Display; use tokio::sync::oneshot; #[derive(Debug, Default, Clone, From)] @@ -321,6 +322,13 @@ pub trait ModuleRuntime { fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result; } +/// Used as a metrics label on `spacetime_module_create_instance_time`. 
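+///
+/// The `strum::Display` derive renders the bare variant name (`"Wasm"` / `"Js"`),
+/// which is what should show up as the `module_type` label value on
+/// `spacetime_module_create_instance_time_seconds`.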
+#[derive(Clone, Copy, Display, Hash, PartialEq, Eq, strum::AsRefStr)] +pub enum ModuleType { + Wasm, + Js, +} + pub enum Module { Wasm(super::wasmtime::Module), Js(super::v8::JsModule), @@ -364,6 +372,12 @@ impl Module { Module::Js(module) => Instance::Js(module.create_instance()), } } + fn module_type(&self) -> ModuleType { + match self { + Module::Wasm(_) => ModuleType::Wasm, + Module::Js(_) => ModuleType::Wasm, + } + } } impl Instance { @@ -502,15 +516,57 @@ pub struct CallReducerParams { struct ModuleInstanceManager { instances: VecDeque, module: Arc, + create_instance_time_metric: CreateInstanceTimeMetric, +} + +/// Handle on the `spacetime_module_create_instance_time_seconds` label for a particular database +/// which calls `remove_label_values` to clean up on drop. +struct CreateInstanceTimeMetric { + metric: Histogram, + module_type: ModuleType, + database_identity: Identity, +} + +impl Drop for CreateInstanceTimeMetric { + fn drop(&mut self) { + let _ = WORKER_METRICS + .module_create_instance_time_seconds + .remove_label_values(&self.database_identity, &self.module_type); + } +} + +impl CreateInstanceTimeMetric { + fn observe(&self, duration: std::time::Duration) { + self.metric.observe(duration.as_secs_f64()); + } } impl ModuleInstanceManager { + fn new(module: Arc, database_identity: Identity) -> Self { + let module_type = module.module_type(); + let create_instance_time_metric = CreateInstanceTimeMetric { + metric: WORKER_METRICS + .module_create_instance_time_seconds + .with_label_values(&database_identity, &module_type), + module_type, + database_identity, + }; + Self { + instances: Default::default(), + module, + create_instance_time_metric, + } + } fn get_instance(&mut self) -> Instance { if let Some(inst) = self.instances.pop_back() { inst } else { + let start_time = std::time::Instant::now(); // TODO: should we be calling `create_instance` on the `SingleCoreExecutor` rather than the calling thread? 
- self.module.create_instance() + let res = self.module.create_instance(); + let elapsed_time = start_time.elapsed(); + self.create_instance_time_metric.observe(elapsed_time); + res } } @@ -621,6 +677,7 @@ impl ModuleHost { module: Module, on_panic: impl Fn() + Send + Sync + 'static, executor: SingleCoreExecutor, + database_identity: Identity, ) -> Self { let info = module.info(); let module = Arc::new(module); @@ -628,10 +685,7 @@ impl ModuleHost { let module_clone = module.clone(); - let instance_manager = Arc::new(Mutex::new(ModuleInstanceManager { - instances: VecDeque::new(), - module: module_clone, - })); + let instance_manager = Arc::new(Mutex::new(ModuleInstanceManager::new(module_clone, database_identity))); ModuleHost { info, diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index 301b5d8f8cb..ab2ba3ac3f4 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -1,4 +1,5 @@ use crate::hash::Hash; +use crate::host::module_host::ModuleType; use once_cell::sync::Lazy; use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec}; use spacetimedb_datastore::execution_context::WorkloadType; @@ -312,6 +313,17 @@ metrics_group!( #[help = "Number of commits replayed after restoring from a snapshot upon restart"] #[labels(db: Identity)] pub replay_commitlog_num_commits: IntGaugeVec, + + #[name = spacetime_module_create_instance_time_seconds] + #[help = "Time taken to construct a WASM instance or V8 isolate to run module code"] + #[labels(db: Identity, module_type: ModuleType)] + // As of writing (pgoldman 2025-09-25), calls to `create_instance` are rare, + // as they happen only when an instance traps (panics). + // However, this is not once-per-process, unlike the above replay metrics. + // I (pgoldman 2025-09-25) am not sure what range or distribution of values to expect, + // so I'm making up some buckets based on what I imagine are the upper and lower bounds of plausibility. + #[buckets(0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10, 50, 100)] + pub module_create_instance_time_seconds: HistogramVec, } ); From 492bd9336fb8da42f89ba2d27ef0060e120f5934 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 14:00:53 -0400 Subject: [PATCH 13/16] Use existing `HostType` enum And fix typo that mis-categorized `Module::Js` as `ModuleType::Wasm`. 
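
To illustrate (a standalone sketch, not code from this patch): `strum`'s
`Display` and `AsRefStr` derives render the bare variant name, which is
what the metrics-label plumbing needs from `HostType`:

    use strum::{AsRefStr, Display};

    #[derive(Clone, Copy, Display, AsRefStr)]
    enum HostType {
        Wasm,
        Js,
    }

    fn main() {
        assert_eq!(HostType::Js.to_string(), "Js"); // via `Display`
        let label: &str = HostType::Wasm.as_ref(); // via `AsRefStr`
        assert_eq!(label, "Wasm");
    }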
--- crates/core/src/host/module_host.rs | 26 +++++++++----------------- crates/core/src/messages/control_db.rs | 4 +++- crates/core/src/worker_metrics/mod.rs | 4 ++-- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 42e76558260..b6e337e961c 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -8,7 +8,7 @@ use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::hash::Hash; use crate::identity::Identity; -use crate::messages::control_db::Database; +use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; use crate::sql::ast::SchemaViewer; @@ -56,7 +56,6 @@ use std::fmt; use std::sync::atomic::AtomicBool; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; -use strum::Display; use tokio::sync::oneshot; #[derive(Debug, Default, Clone, From)] @@ -322,13 +321,6 @@ pub trait ModuleRuntime { fn make_actor(&self, mcc: ModuleCreationContext<'_>) -> anyhow::Result; } -/// Used as a metrics label on `spacetime_module_create_instance_time`. -#[derive(Clone, Copy, Display, Hash, PartialEq, Eq, strum::AsRefStr)] -pub enum ModuleType { - Wasm, - Js, -} - pub enum Module { Wasm(super::wasmtime::Module), Js(super::v8::JsModule), @@ -372,10 +364,10 @@ impl Module { Module::Js(module) => Instance::Js(module.create_instance()), } } - fn module_type(&self) -> ModuleType { + fn host_type(&self) -> HostType { match self { - Module::Wasm(_) => ModuleType::Wasm, - Module::Js(_) => ModuleType::Wasm, + Module::Wasm(_) => HostType::Wasm, + Module::Js(_) => HostType::Js, } } } @@ -523,7 +515,7 @@ struct ModuleInstanceManager { /// which calls `remove_label_values` to clean up on drop. 
struct CreateInstanceTimeMetric { metric: Histogram, - module_type: ModuleType, + host_type: HostType, database_identity: Identity, } @@ -531,7 +523,7 @@ impl Drop for CreateInstanceTimeMetric { fn drop(&mut self) { let _ = WORKER_METRICS .module_create_instance_time_seconds - .remove_label_values(&self.database_identity, &self.module_type); + .remove_label_values(&self.database_identity, &self.host_type); } } @@ -543,12 +535,12 @@ impl CreateInstanceTimeMetric { impl ModuleInstanceManager { fn new(module: Arc, database_identity: Identity) -> Self { - let module_type = module.module_type(); + let host_type = module.host_type(); let create_instance_time_metric = CreateInstanceTimeMetric { metric: WORKER_METRICS .module_create_instance_time_seconds - .with_label_values(&database_identity, &module_type), - module_type, + .with_label_values(&database_identity, &host_type), + host_type, database_identity, }; Self { diff --git a/crates/core/src/messages/control_db.rs b/crates/core/src/messages/control_db.rs index 8299875e339..480a8755ccf 100644 --- a/crates/core/src/messages/control_db.rs +++ b/crates/core/src/messages/control_db.rs @@ -75,7 +75,9 @@ pub struct NodeStatus { /// SEE: pub state: String, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize, strum::AsRefStr, strum::Display, +)] #[repr(i32)] pub enum HostType { Wasm = 0, diff --git a/crates/core/src/worker_metrics/mod.rs b/crates/core/src/worker_metrics/mod.rs index ab2ba3ac3f4..c0f2199350e 100644 --- a/crates/core/src/worker_metrics/mod.rs +++ b/crates/core/src/worker_metrics/mod.rs @@ -1,5 +1,5 @@ use crate::hash::Hash; -use crate::host::module_host::ModuleType; +use crate::messages::control_db::HostType; use once_cell::sync::Lazy; use prometheus::{GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec}; use spacetimedb_datastore::execution_context::WorkloadType; @@ -316,7 +316,7 @@ metrics_group!( #[name = spacetime_module_create_instance_time_seconds] #[help = "Time taken to construct a WASM instance or V8 isolate to run module code"] - #[labels(db: Identity, module_type: ModuleType)] + #[labels(db: Identity, module_type: HostType)] // As of writing (pgoldman 2025-09-25), calls to `create_instance` are rare, // as they happen only when an instance traps (panics). // However, this is not once-per-process, unlike the above replay metrics. From 81c63668d539b22db971c0f5c8148eb705db7047 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Fri, 26 Sep 2025 11:04:09 -0400 Subject: [PATCH 14/16] Remove unused `initial_instances` method It's not used in private either, and no one knew what it was for, so I'm cutting it. 
--- crates/core/src/host/module_host.rs | 6 ------ crates/core/src/host/v8/mod.rs | 4 ---- crates/core/src/host/wasm_common/module_host_actor.rs | 3 --- 3 files changed, 13 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index b6e337e961c..ab4713cb6e2 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -346,12 +346,6 @@ impl Module { } } - fn initial_instances(&mut self) -> impl Iterator { - match self { - Module::Wasm(module) => itertools::Either::Left(module.initial_instances().into_iter().map(Instance::Wasm)), - Module::Js(module) => itertools::Either::Right(module.initial_instances().map(Instance::Js)), - } - } fn info(&self) -> Arc { match self { Module::Wasm(module) => module.info(), diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 84df9b589a2..807c1eea687 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -106,10 +106,6 @@ impl JsModule { self.common.scheduler() } - pub fn initial_instances(&mut self) -> impl Iterator { - std::iter::empty() - } - pub fn info(&self) -> Arc { self.common.info().clone() } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index c3edc6b8496..7716da7bedd 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -181,9 +181,6 @@ impl WasmModuleHostActor { pub fn scheduler(&self) -> &Scheduler { self.common.scheduler() } - pub fn initial_instances(&mut self) -> Option> { - self.initial_instance.take().map(|x| *x) - } pub fn info(&self) -> Arc { self.common.info() From a55c0e0200718edff9441e7aec9930d06d6a606c Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 2 Oct 2025 12:59:49 -0400 Subject: [PATCH 15/16] Fix panic on shutdown Prior to this commit, when shutting down SpacetimeDB-standalone via C-c / SIGINT, the process would print a panic message and backtrace (if enabled) due to dropping nested Tokio runtimes. This commit avoids doing so in SpacetimeDB-standalone, and adds a note to `JobCores` instructing how to avoid it in other uses. --- crates/core/src/util/jobs.rs | 4 ++++ crates/standalone/src/main.rs | 9 ++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 7e1c60047fb..6634d1e57d2 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -24,6 +24,10 @@ use tokio::sync::watch; /// This handle is cheaply cloneable, but at least one handle must be kept alive. /// If all instances of it are dropped, the per-thread [`runtime::Runtime`]s will be dropped, /// and so will stop executing jobs for databases. +/// +/// Dropping the last handle on a `JobCores` from an `async` context will panic, +/// as Tokio doesn't like to shut down nested runtimes. +/// To avoid this, keep a handle on the `JobCores` alive outside of the `async` runner. 
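+///
+/// A sketch of the safe pattern (it mirrors the `spacetimedb-standalone` `main`
+/// in the diff just below; `serve` is a hypothetical stand-in for the real
+/// entry point):
+///
+/// ```ignore
+/// let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;
+/// let cores = JobCores::without_pinned_cores(rt.handle().clone());
+/// let res = rt.block_on(serve(cores.clone()));
+/// drop(cores); // last handle dropped outside any `async` context
+/// res
+/// ```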
 #[derive(Clone)]
 pub struct JobCores {
     inner: JobCoresInner,
 }
diff --git a/crates/standalone/src/main.rs b/crates/standalone/src/main.rs
index fe666ed40f1..ca266f27d76 100644
--- a/crates/standalone/src/main.rs
+++ b/crates/standalone/src/main.rs
@@ -77,5 +77,12 @@ fn main() -> anyhow::Result<()> {
     let rt = builder.build().unwrap();
     cores.rayon.configure(rt.handle());
     let database_cores = cores.databases.make_database_runners(rt.handle());
-    rt.block_on(async_main(database_cores))
+
+    // Keep a handle on the `database_cores` alive outside of `async_main`
+    // and explicitly drop it to avoid dropping it from an `async` context -
+    // Tokio gets angry when you drop a runtime within another runtime.
+    let res = rt.block_on(async_main(database_cores.clone()));
+    drop(database_cores);
+
+    res
 }

From af591f876ce12b26a29f5102eefc1e534718f079 Mon Sep 17 00:00:00 2001
From: Phoebe Goldman
Date: Tue, 7 Oct 2025 11:03:44 -0400
Subject: [PATCH 16/16] Clippy

---
 crates/core/src/host/module_host.rs | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs
index d5210fcce5d..8d03f012f05 100644
--- a/crates/core/src/host/module_host.rs
+++ b/crates/core/src/host/module_host.rs
@@ -327,8 +327,10 @@ pub enum Module {
 }
 
 pub enum Instance {
-    Wasm(super::wasmtime::ModuleInstance),
-    Js(super::v8::JsInstance),
+    // Box these instances because they're very different sizes,
+    // which makes Clippy sad and angry.
+    Wasm(Box<super::wasmtime::ModuleInstance>),
+    Js(Box<super::v8::JsInstance>),
 }
 
 impl Module {
@@ -354,8 +356,8 @@
     }
     fn create_instance(&self) -> Instance {
         match self {
-            Module::Wasm(module) => Instance::Wasm(module.create_instance()),
-            Module::Js(module) => Instance::Js(module.create_instance()),
+            Module::Wasm(module) => Instance::Wasm(Box::new(module.create_instance())),
+            Module::Js(module) => Instance::Js(Box::new(module.create_instance())),
         }
     }
     fn host_type(&self) -> HostType {