Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f26e20f
Rewrite `jobs.rs` to use single-threaded Tokio runtimes
gefjon Sep 12, 2025
54b7d5a
Thread `jobs.rs` changes through all of `core` crate.
gefjon Sep 22, 2025
ba27106
Merge remote-tracking branch 'origin/master' into phoebe/wasmtime-async
gefjon Sep 22, 2025
4c088f2
Fix standalone main tyck
gefjon Sep 22, 2025
0fedef3
Clean up testing `JobCores` initialization
gefjon Sep 22, 2025
b26d447
Wrap some tests in a `mod` so that I can `--skip` them
gefjon Sep 22, 2025
e54e728
Use Wasmtime `async` functions where necessary
gefjon Sep 22, 2025
21830cd
Fix clippy lints (except unused method I can't explain)
gefjon Sep 22, 2025
56c89e6
Add futures-util and use their `now_or_never`
gefjon Sep 23, 2025
9bb6df0
Revert "Wrap some tests in a `mod` so that I can `--skip` them"
gefjon Sep 24, 2025
c1da1b8
Fix typo'd comments revealed in PR review
gefjon Sep 25, 2025
db90e6d
Expand comments as requested in code review
gefjon Sep 25, 2025
b27c69d
Merge branch 'phoebe/wasmtime-async' of github.com:clockworklabs/Spac…
gefjon Sep 25, 2025
a4f5d50
Merge branch 'master' into phoebe/wasmtime-async
gefjon Sep 25, 2025
f1c5938
Add metric for time spent in `create_instance`
gefjon Sep 25, 2025
492bd93
Use existing `HostType` enum
gefjon Sep 25, 2025
81c6366
Remove unused `initial_instances` method
gefjon Sep 26, 2025
9dbb4e7
Merge remote-tracking branch 'origin/master' into phoebe/wasmtime-async
gefjon Oct 2, 2025
a55c0e0
Fix panic on shutdown
gefjon Oct 2, 2025
438cb98
Merge remote-tracking branch 'origin/master' into phoebe/wasmtime-async
gefjon Oct 3, 2025
e22d3cd
Merge remote-tracking branch 'origin/master' into phoebe/wasmtime-async
gefjon Oct 7, 2025
af591f8
Clippy
gefjon Oct 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ fs_extra = "1.3.0"
fs2 = "0.4.3"
futures = "0.3"
futures-channel = "0.3"
futures-util = "0.3"
getrandom02 = { package = "getrandom", version = "0.2" }
glob = "0.3.1"
hashbrown = { version = "0.15", default-features = false, features = ["equivalent", "inline-more"] }
Expand Down Expand Up @@ -312,6 +313,7 @@ version = "25"
default-features = false
features = [
"addr2line",
"async",
"cache",
"cranelift",
"demangle",
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ enum-map.workspace = true
flate2.workspace = true
fs2.workspace = true
futures.workspace = true
futures-util.workspace = true
hashbrown = { workspace = true, features = ["rayon", "serde"] }
hex.workspace = true
hostname.workspace = true
Expand Down
24 changes: 13 additions & 11 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::replica_context::ReplicaContext;
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager};
use crate::util::asyncify;
use crate::util::jobs::{JobCore, JobCores};
use crate::util::jobs::{JobCores, SingleCoreExecutor};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
Expand Down Expand Up @@ -583,14 +583,16 @@ async fn make_module_host(
program: Program,
energy_monitor: Arc<dyn EnergyMonitor>,
unregister: impl Fn() + Send + Sync + 'static,
core: JobCore,
executor: SingleCoreExecutor,
) -> anyhow::Result<(Program, ModuleHost)> {
// `make_actor` is blocking, as it needs to compile the wasm to native code,
// which may be computationally expensive - sometimes up to 1s for a large module.
// TODO: change back to using `spawn_rayon` here - asyncify runs on tokio blocking
// threads, but those aren't for computation. Also, wasmtime uses rayon
// to run compilation in parallel, so it'll need to run stuff in rayon anyway.
asyncify(move || {
let database_identity = replica_ctx.database_identity;

let mcc = ModuleCreationContext {
replica_ctx,
scheduler,
Expand All @@ -603,12 +605,12 @@ async fn make_module_host(
HostType::Wasm => {
let actor = runtimes.wasmtime.make_actor(mcc)?;
trace!("wasmtime::make_actor blocked for {:?}", start.elapsed());
ModuleHost::new(actor, unregister, core)
ModuleHost::new(actor, unregister, executor, database_identity)
}
HostType::Js => {
let actor = runtimes.v8.make_actor(mcc)?;
trace!("v8::make_actor blocked for {:?}", start.elapsed());
ModuleHost::new(actor, unregister, core)
ModuleHost::new(actor, unregister, executor, database_identity)
}
};
Ok((program, module_host))
Expand Down Expand Up @@ -641,7 +643,7 @@ async fn launch_module(
energy_monitor: Arc<dyn EnergyMonitor>,
replica_dir: ReplicaDir,
runtimes: Arc<HostRuntimes>,
core: JobCore,
executor: SingleCoreExecutor,
) -> anyhow::Result<(Program, LaunchedModule)> {
let db_identity = database.database_identity;
let host_type = database.host_type;
Expand All @@ -658,7 +660,7 @@ async fn launch_module(
program,
energy_monitor.clone(),
on_panic,
core,
executor,
)
.await?;

Expand Down Expand Up @@ -874,7 +876,7 @@ impl Host {
page_pool: PagePool,
database: Database,
program: Program,
core: JobCore,
executor: SingleCoreExecutor,
) -> anyhow::Result<Arc<ModuleInfo>> {
// Even in-memory databases acquire a lockfile.
// Grab a tempdir to put that lockfile in.
Expand Down Expand Up @@ -906,7 +908,7 @@ impl Host {
Arc::new(NullEnergyMonitor),
phony_replica_dir,
runtimes.clone(),
core,
executor,
)
.await?;

Expand Down Expand Up @@ -939,7 +941,7 @@ impl Host {
policy: MigrationPolicy,
energy_monitor: Arc<dyn EnergyMonitor>,
on_panic: impl Fn() + Send + Sync + 'static,
core: JobCore,
executor: SingleCoreExecutor,
) -> anyhow::Result<UpdateDatabaseResult> {
let replica_ctx = &self.replica_ctx;
let (scheduler, scheduler_starter) = Scheduler::open(self.replica_ctx.relational_db.clone());
Expand All @@ -952,7 +954,7 @@ impl Host {
program,
energy_monitor,
on_panic,
core,
executor,
)
.await?;

Expand Down Expand Up @@ -1094,7 +1096,7 @@ pub async fn extract_schema(program_bytes: Box<[u8]>, host_type: HostType) -> an

let runtimes = HostRuntimes::new(None);
let page_pool = PagePool::new(None);
let core = JobCore::default();
let core = SingleCoreExecutor::in_current_tokio_runtime();
let module_info = Host::try_init_in_memory_to_check(&runtimes, page_pool, database, program, core).await?;
// this should always succeed, but sometimes it doesn't
let module_def = match Arc::try_unwrap(module_info) {
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/host/module_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
energy::EnergyMonitor,
host::{
module_host::{DynModule, ModuleInfo},
module_host::ModuleInfo,
wasm_common::{module_host_actor::DescribeError, DESCRIBE_MODULE_DUNDER},
Scheduler,
},
Expand Down Expand Up @@ -80,12 +80,12 @@ impl ModuleCommon {
}
}

impl DynModule for ModuleCommon {
fn replica_ctx(&self) -> &Arc<ReplicaContext> {
impl ModuleCommon {
pub fn replica_ctx(&self) -> &Arc<ReplicaContext> {
&self.replica_context
}

fn scheduler(&self) -> &Scheduler {
pub fn scheduler(&self) -> &Scheduler {
&self.scheduler
}
}
Expand Down
Loading
Loading