Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/host/v8/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl fmt::Display for JsStackTraceFrame {

// This isn't exactly the same format as chrome uses,
// but it's close enough for now.
// TODO(centril): make it more like chrome in the future.
// TODO(v8): make it more like chrome in the future.
f.write_fmt(format_args!(
"at {} ({}:{}:{})",
fn_name, script_name, &self.line, &self.column
Expand Down
114 changes: 92 additions & 22 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,23 @@ use crate::host::wasm_common::module_host_actor::{
};
use crate::host::ArgsTuple;
use crate::{host::Scheduler, module_host_context::ModuleCreationContext, replica_context::ReplicaContext};
use anyhow::anyhow;
use core::sync::atomic::{AtomicBool, Ordering};
use core::time::Duration;
use de::deserialize_js;
use error::{catch_exception, exception_already_thrown, log_traceback, ExcResult, Throwable};
use from_value::cast;
use key_cache::get_or_create_key_cache;
use ser::serialize_to_js;
use spacetimedb_client_api_messages::energy::{EnergyQuanta, ReducerBudget};
use spacetimedb_client_api_messages::energy::ReducerBudget;
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::traits::Program;
use spacetimedb_lib::RawModuleDef;
use spacetimedb_lib::{ConnectionId, Identity};
use spacetimedb_schema::auto_migrate::MigrationPolicy;
use std::sync::{Arc, LazyLock};
use v8::{Context, ContextOptions, ContextScope, Function, HandleScope, Isolate, Local, Value};
use std::thread;
use std::time::Instant;
use v8::{Context, ContextOptions, ContextScope, Function, HandleScope, Isolate, IsolateHandle, Local, Value};

mod de;
mod error;
Expand Down Expand Up @@ -81,9 +83,13 @@ impl V8RuntimeInner {
);

if true {
return Err::<JsModule, _>(anyhow!("v8_todo"));
return Err::<JsModule, _>(anyhow::anyhow!("v8_todo"));
}

// TODO(v8): determine min required ABI by module and check that it's supported?

// TODO(v8): validate function signatures like in WASM? Is that possible with V8?

let desc = todo!();
// Validate and create a common module rom the raw definition.
let common = build_common_module_from_raw(mcc, desc)?;
Expand Down Expand Up @@ -121,6 +127,17 @@ impl Module for JsModule {
}

fn create_instance(&self) -> Self::Instance {
// TODO(v8): consider some equivalent to `epoch_deadline_callback`
// where we report `Js has been running for ...`.

// TODO(v8): timeout things like `extract_description`.

// TODO(v8): do we care about preinits / setup or are they unnecessary?

// TODO(v8): create `InstanceEnv`.

// TODO(v8): extract description.

todo!()
}
}
Expand All @@ -147,35 +164,49 @@ impl ModuleInstance for JsInstance {
}

fn call_reducer(&mut self, tx: Option<MutTxId>, params: CallReducerParams) -> super::ReducerCallResult {
// TODO(centril): snapshots, module->host calls
let mut isolate = Isolate::new(<_>::default());
let scope = &mut HandleScope::new(&mut isolate);
let context = Context::new(scope, ContextOptions::default());
let scope = &mut ContextScope::new(scope, context);

self.common.call_reducer_with_tx(
&self.replica_ctx.clone(),
tx,
params,
log_traceback,
|tx, op, _budget| {
let call_result = call_call_reducer_from_op(scope, op);
// TODO(centril): energy metrering.
let energy = EnergyStats {
used: EnergyQuanta::ZERO,
wasmtime_fuel_used: 0,
remaining: ReducerBudget::ZERO,
};
// TODO(centril): timings.
|tx, op, budget| {
// TODO(v8): snapshots, module->host calls
// Setup V8 scope.
let mut isolate: v8::OwnedIsolate = Isolate::new(<_>::default());
let isolate_handle = isolate.thread_safe_handle();
let mut scope_1 = HandleScope::new(&mut isolate);
let context = Context::new(&mut scope_1, ContextOptions::default());
let mut scope_2 = ContextScope::new(&mut scope_1, context);

let timeout_thread_cancel_flag = run_reducer_timeout(isolate_handle, budget);

// Call the reducer.
let start = Instant::now();
let call_result = call_call_reducer_from_op(&mut scope_2, op);
let total_duration = start.elapsed();
// Cancel the execution timeout in `run_reducer_timeout`.
timeout_thread_cancel_flag.store(true, Ordering::Relaxed);

// Handle energy and timings.
let used = duration_to_budget(total_duration);
let remaining = budget - used;
let energy = EnergyStats { budget, remaining };
let timings = ExecutionTimings {
total_duration: Duration::ZERO,
total_duration,
// TODO(v8): call times.
wasm_instance_env_call_times: CallTimes::new(),
};

// Fetch the currently used heap size in V8.
// The used size is ostensibly fairer than the total size.
drop(scope_2);
drop(scope_1);
let memory_allocation = isolate.get_heap_statistics().used_heap_size();

let exec_result = ExecuteResult {
energy,
timings,
// TODO(centril): memory allocation.
memory_allocation: 0,
memory_allocation,
call_result,
};
(tx, exec_result)
Expand All @@ -184,6 +215,45 @@ impl ModuleInstance for JsInstance {
}
}

/// Spawns a thread that will terminate reducer execution
/// when `budget` has been used up.
fn run_reducer_timeout(isolate_handle: IsolateHandle, budget: ReducerBudget) -> Arc<AtomicBool> {
let execution_done_flag = Arc::new(AtomicBool::new(false));
let execution_done_flag2 = execution_done_flag.clone();
let timeout = budget_to_duration(budget);

// TODO(v8): Using an OS thread is a bit heavy handed...?
thread::spawn(move || {
// Sleep until the timeout.
thread::sleep(timeout);

if execution_done_flag2.load(Ordering::Relaxed) {
// The reducer completed successfully.
return;
}

// Reducer is still running.
// Terminate V8 execution.
isolate_handle.terminate_execution();
});

execution_done_flag
}

/// Converts a [`ReducerBudget`] to a [`Duration`].
fn budget_to_duration(_budget: ReducerBudget) -> Duration {
// TODO(v8): This is fake logic that allows a maximum timeout.
// Replace with sensible math.
Duration::MAX
}

/// Converts a [`Duration`] to a [`ReducerBudget`].
fn duration_to_budget(_duration: Duration) -> ReducerBudget {
// TODO(v8): This is fake logic that allows minimum energy usage.
// Replace with sensible math.
ReducerBudget::ZERO
}

/// Returns the global property `key`.
fn get_global_property<'scope>(
scope: &mut HandleScope<'scope>,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/v8/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl<'this, 'scope> ser::Serializer for Serializer<'this, 'scope> {
}

fn serialize_named_product(self, _len: usize) -> Result<Self::SerializeNamedProduct, Self::Error> {
// TODO(noa): this can be more efficient if we tell it the names ahead of time
// TODO(v8, noa): this can be more efficient if we tell it the names ahead of time
let object = Object::new(self.scope);
Ok(SerializeNamedProduct {
inner: self,
Expand Down
24 changes: 16 additions & 8 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tracing::span::EnteredSpan;
use super::instrumentation::CallTimes;
use crate::client::ClientConnectionSender;
use crate::database_logger;
use crate::energy::{EnergyMonitor, EnergyQuanta, ReducerBudget, ReducerFingerprint};
use crate::energy::{EnergyMonitor, ReducerBudget, ReducerFingerprint};
use crate::host::instance_env::InstanceEnv;
use crate::host::module_common::{build_common_module_from_raw, ModuleCommon};
use crate::host::module_host::{
Expand Down Expand Up @@ -60,11 +60,17 @@ pub trait WasmInstance: Send + Sync + 'static {
}

pub struct EnergyStats {
pub used: EnergyQuanta,
pub wasmtime_fuel_used: u64,
pub budget: ReducerBudget,
pub remaining: ReducerBudget,
}

impl EnergyStats {
/// Returns the used energy amount.
fn used(&self) -> ReducerBudget {
(self.budget.get() - self.remaining.get()).into()
}
}

pub struct ExecutionTimings {
pub total_duration: Duration,
pub wasm_instance_env_call_times: CallTimes,
Expand Down Expand Up @@ -412,22 +418,24 @@ impl InstanceCommon {
call_result,
} = result;

let energy_used = energy.used();
let energy_quanta_used = energy_used.into();
vm_metrics.report(
energy.wasmtime_fuel_used,
energy_used.get(),
timings.total_duration,
&timings.wasm_instance_env_call_times,
);

self.energy_monitor
.record_reducer(&energy_fingerprint, energy.used, timings.total_duration);
.record_reducer(&energy_fingerprint, energy_quanta_used, timings.total_duration);
if self.allocated_memory != memory_allocation {
self.metric_wasm_memory_bytes.set(memory_allocation as i64);
self.allocated_memory = memory_allocation;
}

reducer_span
.record("timings.total_duration", tracing::field::debug(timings.total_duration))
.record("energy.used", tracing::field::debug(energy.used));
.record("energy.used", tracing::field::debug(energy_used));

maybe_log_long_running_reducer(reducer_name, timings.total_duration);
reducer_span.exit();
Expand Down Expand Up @@ -486,7 +494,7 @@ impl InstanceCommon {
args,
},
status,
energy_quanta_used: energy.used,
energy_quanta_used,
host_execution_duration: timings.total_duration,
request_id,
timer,
Expand All @@ -495,7 +503,7 @@ impl InstanceCommon {

ReducerCallResult {
outcome: ReducerOutcome::from(&event.status),
energy_used: energy.used,
energy_used: energy_quanta_used,
execution_duration: timings.total_duration,
}
}
Expand Down
13 changes: 3 additions & 10 deletions crates/core/src/host/wasmtime/wasmtime_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,8 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {
#[tracing::instrument(level = "trace", skip_all)]
fn call_reducer(&mut self, op: ReducerOp<'_>, budget: ReducerBudget) -> module_host_actor::ExecuteResult {
let store = &mut self.store;
// note that ReducerBudget being a u64 is load-bearing here - although we convert budget right back into
// EnergyQuanta at the end of this function, from_energy_quanta clamps it to a u64 range.
// otherwise, we'd return something like `used: i128::MAX - u64::MAX`, which is inaccurate.
// Set the fuel budget in WASM.
set_store_fuel(store, budget.into());
let original_fuel = get_store_fuel(store);
store.set_epoch_deadline(EPOCH_TICKS_PER_SECOND);

// Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays.
Expand Down Expand Up @@ -231,14 +228,10 @@ impl module_host_actor::WasmInstance for WasmtimeInstance {

let call_result = call_result.map(|code| handle_error_sink_code(code, error));

// Compute fuel and heap usage.
let remaining_fuel = get_store_fuel(store);

let remaining: ReducerBudget = remaining_fuel.into();
let energy = module_host_actor::EnergyStats {
used: (budget - remaining).into(),
wasmtime_fuel_used: original_fuel.0 - remaining_fuel.0,
remaining,
};
let energy = module_host_actor::EnergyStats { budget, remaining };
let memory_allocation = store.data().get_mem().memory.data_size(&store);

module_host_actor::ExecuteResult {
Expand Down
Loading