From 152f86bee9d8916ba896f26f04b9e6e911e9ed4e Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 11:39:39 -0400 Subject: [PATCH 01/11] Support `BytesSource`s other than the one for reducer args Each reducer gets its arguments through a `BytesSource`, a Unix-file-like abstraction for streams of bytes. Prior to this commit, we had an ABI designed as if it could support other bytes sources, but it actually hardcoded the ID of the reducer args source, and returned an error for every other ID. This commit extends the `BytesSource` infrastructure to support other bytes sources. This will be useful for exposing JWT payloads and HTTP responses. No other `BytesSource` uses are actually included in this commit, only the infrastructure. This commit also defines a new host call, `bytes_source_remaining_length`. This is intended to allow callers to pre-allocate a buffer correctly sized to read the entire `BytesSource` all at once. The new host function is added to a new ABI minor version, 10.1, so that old SpacetimeDB hosts can detect and reject too-new compiled modules. I have added uses of this new function to `__call_reducer__` in both Rust and C#, even though it's not strictly necessary, and I haven't removed the loop which repeatedly calls `bytes_source_read` and grows the buffer. 
--- Cargo.lock | 1 + .../Runtime/Internal/Module.cs | 15 +- crates/bindings-csharp/Runtime/bindings.c | 8 +- crates/bindings-sys/src/lib.rs | 29 +++ crates/bindings/src/rt.rs | 12 ++ crates/core/Cargo.toml | 1 + crates/core/src/host/mod.rs | 1 + crates/core/src/host/wasm_common.rs | 2 + .../src/host/wasmtime/wasm_instance_env.rs | 166 +++++++++++++++--- .../core/src/host/wasmtime/wasmtime_module.rs | 5 +- .../src/module_bindings/mod.rs | 2 +- .../test-client/src/module_bindings/mod.rs | 2 +- 12 files changed, 215 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6121eb4292..2f8771abda7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5823,6 +5823,7 @@ dependencies = [ "log", "memchr", "nix 0.30.1", + "nohash-hasher", "once_cell", "openssl", "parking_lot 0.12.3", diff --git a/crates/bindings-csharp/Runtime/Internal/Module.cs b/crates/bindings-csharp/Runtime/Internal/Module.cs index 68ddd5c273e..a54b58d2f8d 100644 --- a/crates/bindings-csharp/Runtime/Internal/Module.cs +++ b/crates/bindings-csharp/Runtime/Internal/Module.cs @@ -116,7 +116,20 @@ private static byte[] Consume(this BytesSource source) { return []; } - var buffer = new byte[0x20_000]; + + uint32_t len = 0; + var ret = FFI.bytes_source_remaining_length(source, ref len); + switch (ret) + { + case Errno.OK: + break; + case Errno.NO_SUCH_BYTES: + throw new NoSuchBytesException(); + default: + throw new UnknownException(ret); + } + + var buffer = new byte[len]; var written = 0U; while (true) { diff --git a/crates/bindings-csharp/Runtime/bindings.c b/crates/bindings-csharp/Runtime/bindings.c index 51eb8712c12..54b471783bf 100644 --- a/crates/bindings-csharp/Runtime/bindings.c +++ b/crates/bindings-csharp/Runtime/bindings.c @@ -27,7 +27,7 @@ OPAQUE_TYPEDEF(ConsoleTimerId, uint32_t); #define CSTR(s) (uint8_t*)s, sizeof(s) - 1 #define STDB_EXTERN(name) \ - __attribute__((import_module("spacetime_10.0"), import_name(#name))) extern + __attribute__((import_module(SPACETIME_MODULE_VERSION), 
import_name(#name))) extern #ifndef EXPERIMENTAL_WASM_AOT #define IMPORT(ret, name, params, args) \ @@ -37,6 +37,7 @@ OPAQUE_TYPEDEF(ConsoleTimerId, uint32_t); #define IMPORT(ret, name, params, args) STDB_EXTERN(name) ret name params; #endif +#define SPACETIME_MODULE_VERSION "spacetime_10.0" IMPORT(Status, table_id_from_name, (const uint8_t* name, uint32_t name_len, TableId* id), (name, name_len, id)); @@ -97,6 +98,11 @@ IMPORT(void, volatile_nonatomic_schedule_immediate, (const uint8_t* name, size_t name_len, const uint8_t* args, size_t args_len), (name, name_len, args, args_len)); IMPORT(void, identity, (void* id_ptr), (id_ptr)); +#undef SPACETIME_MODULE_VERSION + +#define SPACETIME_MODULE_VERSION "spacetime_10.1" +IMPORT(int16_t, bytes_source_remaining_length, (BytesSource source, uint32_t* out), (source, out)); +#undef SPACETIME_MODULE_VERSION #ifndef EXPERIMENTAL_WASM_AOT static MonoClass* ffi_class; diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index b901de7e5d7..6b71b955fc8 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -590,6 +590,35 @@ pub mod raw { pub fn identity(out_ptr: *mut u8); } + // See comment on previous `extern "C"` block re: ABI version. + #[link(wasm_import_module = "spacetime_10.1")] + extern "C" { + /// Read the remaining length of a [`BytesSource`] and write it to `out`. + /// + /// Note that the host automatically frees byte sources which are exhausted. + /// Such sources are invalid, and this method will return an error when passed one. + /// Callers of [`bytes_source_read`] should check for a return of -1 + /// before invoking this function on the same `source`. + /// + /// Also note that the special [`BytesSource::INVALID`] (zero) is always invalid. + /// Callers should check for that value before invoking this function. + /// + /// # Traps + /// + /// Traps if: + /// + /// - `out` is NULL or `ou` is not in bounds of WASM memory. 
+ /// + /// # Errors + /// + /// Returns an error: + /// + /// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source. + /// + /// If this function returns an error, `out` is not written. + pub fn bytes_source_remaining_length(source: BytesSource, out: *mut u32) -> i16; + } + /// What strategy does the database index use? /// /// See also: diff --git a/crates/bindings/src/rt.rs b/crates/bindings/src/rt.rs index e169e916fd1..6b7c2d9b995 100644 --- a/crates/bindings/src/rt.rs +++ b/crates/bindings/src/rt.rs @@ -535,6 +535,18 @@ const NO_SUCH_BYTES: u16 = errno::NO_SUCH_BYTES.get(); fn read_bytes_source_into(source: BytesSource, buf: &mut Vec) { const INVALID: i16 = NO_SUCH_BYTES as i16; + let len = { + let mut len = 0; + let ret = unsafe { sys::raw::bytes_source_remaining_length(source, &raw mut len) }; + match ret { + 0 => len, + INVALID => panic!("invalid source passed"), + _ => unreachable!(), + } + }; + + buf.reserve(buf.len().saturating_sub(len as usize)); + loop { // Write into the spare capacity of the buffer. 
let buf_ptr = buf.spare_capacity_mut(); diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 13136a18ef1..142a19e7484 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -69,6 +69,7 @@ jsonwebtoken.workspace = true lazy_static.workspace = true log.workspace = true memchr.workspace = true +nohash-hasher.workspace = true once_cell.workspace = true openssl.workspace = true parking_lot.workspace = true diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 6e4525032f7..05538542bbe 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -139,6 +139,7 @@ pub enum AbiCall { DatastoreDeleteByIndexScanRangeBsatn, DatastoreDeleteAllByEqBsatn, BytesSourceRead, + BytesSourceRemainingLength, BytesSinkWrite, ConsoleLog, ConsoleTimerStart, diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index 666cc808ea3..f2b29310ff9 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -395,6 +395,8 @@ macro_rules! 
abi_funcs { // unstable: "spacetime_10.0"::volatile_nonatomic_schedule_immediate, + + "spacetime_10.1"::bytes_source_remaining_length, } }; } diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index c2cd6ed067e..5abfbc0ba4c 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1,5 +1,6 @@ #![allow(clippy::too_many_arguments)] +use std::num::NonZeroU32; use std::time::Instant; use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record}; @@ -12,6 +13,7 @@ use crate::host::wasm_common::{ }; use crate::host::AbiCall; use anyhow::Context as _; +use spacetimedb_data_structures::map::IntMap; use spacetimedb_lib::Timestamp; use spacetimedb_primitives::{errno, ColId}; use wasmtime::{AsContext, Caller, StoreContextMut}; @@ -23,6 +25,47 @@ use instrumentation::noop as span; #[cfg(feature = "spacetimedb-wasm-instance-env-times")] use instrumentation::op as span; +/// A stream of bytes which the WASM module can read from +/// using [`WasmInstanceEnv::bytes_source_read`]. +/// +/// These are managed in the `bytes_sources` of [`WasmInstanceEnv`], +/// where each one is paired with an integer ID. +/// This is basically a massively-simplified version of Unix read files and file descriptors. +/// +/// Unlike Unix read files, we implicitly close `BytesSource`s once they are read to the end. +/// This is sensible because we don't provide a seek operation, +/// so the `BytesSource` becomes useless once read to the end. +struct BytesSource { + /// The actual bytes which will be returned by calls to `byte_source_read`. + bytes: bytes::Bytes, + /// The index in `self.bytes` of the next byte to be read by `byte_source_read`. + /// + /// When this is one past the end of `self.bytes`, this `ByteSource` is expended and should be discarded. 
+ cursor: usize, +} + +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +/// Identifier for a [`BytesSource`] stored in the `bytes_sources` of a [`WasmInstanceEnv`]. +/// +/// The special sentinel [`Self::INVALID`] (zero) is used for a never-readable [`BytesSource`]. +/// We pass this to guests for a [`BytesSource`] with a length of zero. +pub(super) struct BytesSourceId(pub(super) u32); + +// `nohash_hasher` recommends impling `Hash` explicitly rather than using the derive macro, +// as the derive macro is not technically guaranteed to only call `hasher.write_{int}` for an integer newtype, +// even though any other behavior would be deranged. +impl std::hash::Hash for BytesSourceId { + fn hash(&self, hasher: &mut H) { + hasher.write_u32(self.0) + } +} + +impl nohash_hasher::IsEnabled for BytesSourceId {} + +impl BytesSourceId { + const INVALID: Self = Self(0); +} + /// A `WasmInstanceEnv` provides the connection between a module /// and the database. /// @@ -47,9 +90,19 @@ pub(super) struct WasmInstanceEnv { /// always be `Some`. mem: Option, - /// The arguments being passed to a reducer - /// that it can read via [`Self::bytes_source_read`]. - call_reducer_args: Option<(bytes::Bytes, usize)>, + /// `File`-like [`BytesSource`]s which guest code can read via [`Self::bytes_source_read`]. + /// + /// These are essentially simplified versions of Unix read files, + /// with [`BytesSourceId`] being file descriptors. + /// + /// Unlike Unix files, we implicitly close a [`BytesSource`] when it is read to the end. + /// This is because we don't provide a seek operation and a [`BytesSource`] never grows after initialization. + bytes_sources: IntMap, + + /// Counter as a source of [`BytesSourceId`] values. + /// + /// Recall that zero is [`BytesSourceId::INVALID`], so we have to start at 1. + next_bytes_source_id: NonZeroU32, /// The standard sink used for [`Self::bytes_sink_write`]. 
standard_bytes_sink: Option>, @@ -76,7 +129,6 @@ pub(super) struct WasmInstanceEnv { chunk_pool: ChunkPool, } -const CALL_REDUCER_ARGS_SOURCE: u32 = 1; const STANDARD_BYTES_SINK: u32 = 1; type WasmResult = Result; @@ -91,7 +143,8 @@ impl WasmInstanceEnv { Self { instance_env, mem: None, - call_reducer_args: None, + bytes_sources: IntMap::default(), + next_bytes_source_id: NonZeroU32::new(1).unwrap(), standard_bytes_sink: None, iters: Default::default(), timing_spans: Default::default(), @@ -102,6 +155,38 @@ impl WasmInstanceEnv { } } + fn alloc_bytes_source_id(&mut self) -> RtResult { + let id = self.next_bytes_source_id; + self.next_bytes_source_id = id + .checked_add(1) + .context("Allocating next `BytesSourceId` overflowed u32")?; + Ok(BytesSourceId(id.into())) + } + + fn create_bytes_source(&mut self, bytes: bytes::Bytes) -> RtResult { + // Pass an invalid source when the bytes were empty. + // This allows the module to avoid allocating and make a system call in those cases. + if bytes.is_empty() { + Ok(BytesSourceId::INVALID) + } else if bytes.len() > u32::MAX as usize { + Err(anyhow::anyhow!( + "`create_bytes_source`: `Bytes` has length {}, which is greater than u32::MAX {}", + bytes.len(), + u32::MAX, + )) + } else { + let id = self.alloc_bytes_source_id()?; + self.bytes_sources.insert(id, BytesSource { bytes, cursor: 0 }); + Ok(id) + } + } + + fn free_bytes_source(&mut self, id: BytesSourceId) { + if self.bytes_sources.remove(&id).is_none() { + log::warn!("`free_bytes_source` on non-existent source {id:?}"); + } + } + /// Finish the instantiation of this instance with the provided `Mem`. pub fn instantiate(&mut self, mem: Mem) { assert!(self.mem.is_none()); @@ -140,17 +225,10 @@ impl WasmInstanceEnv { /// /// Returns the handle used by reducers to read from `args` /// as well as the handle used to write the error message, if any. 
- pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (u32, u32) { + pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (BytesSourceId, u32) { let errors = self.setup_standard_bytes_sink(); - // Pass an invalid source when the reducer args were empty. - // This allows the module to avoid allocating and make a system call in those cases. - self.call_reducer_args = (!args.is_empty()).then_some((args, 0)); - let args = if self.call_reducer_args.is_some() { - CALL_REDUCER_ARGS_SOURCE - } else { - 0 - }; + let args = self.create_bytes_source(args).unwrap(); self.reducer_start = Instant::now(); name.clone_into(&mut self.reducer_name); @@ -194,7 +272,7 @@ impl WasmInstanceEnv { wasm_instance_env_call_times, }; - self.call_reducer_args = None; + self.bytes_sources = IntMap::default(); (timings, self.take_standard_bytes_sink()) } @@ -1029,12 +1107,10 @@ impl WasmInstanceEnv { Self::cvt_custom(caller, AbiCall::BytesSourceRead, |caller| { let (mem, env) = Self::mem_env(caller); + let source = BytesSourceId(source); + // Retrieve the reducer args if available and requested, or error. - let Some((reducer_args, cursor)) = env - .call_reducer_args - .as_mut() - .filter(|_| source == CALL_REDUCER_ARGS_SOURCE) - else { + let Some(bytes_source) = env.bytes_sources.get_mut(&source) else { return Ok(errno::NO_SUCH_BYTES.get().into()); }; @@ -1046,7 +1122,7 @@ impl WasmInstanceEnv { // Derive the portion that we can read and what remains, // based on what is left to read and the capacity. - let left_to_read = &reducer_args[*cursor..]; + let left_to_read = &bytes_source.bytes[bytes_source.cursor..]; let can_read_len = buffer_len.min(left_to_read.len()); let (can_read, remainder) = left_to_read.split_at(can_read_len); // Copy to the `buffer` and write written bytes count to `buffer_len`. @@ -1055,15 +1131,59 @@ impl WasmInstanceEnv { // Destroy the source if exhausted, or advance `cursor`. 
if remainder.is_empty() { - env.call_reducer_args = None; + env.free_bytes_source(source); Ok(-1i32) } else { - *cursor += can_read_len; + bytes_source.cursor += can_read_len; Ok(0) } }) } + /// Read the remaining length of a [`BytesSource`] and write it to `out`. + /// + /// Note that the host automatically frees byte sources which are exhausted. + /// Such sources are invalid, and this method will return an error when passed one. + /// Callers of [`Self::bytes_source_read`] should check for a return of -1 + /// before invoking this function on the same `source`. + /// + /// Also note that the special [`BytesSourceId::INVALID`] (zero) is always invalid. + /// Callers should check for that value before invoking this function. + /// + /// # Traps + /// + /// Traps if: + /// + /// - `out` is NULL or `ou` is not in bounds of WASM memory. + /// + /// # Errors + /// + /// Returns an error: + /// + /// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source. + /// + /// If this function returns an error, `out` is not written. + pub fn bytes_source_remaining_length(caller: Caller<'_, Self>, source: u32, out: WasmPtr) -> RtResult { + Self::cvt_custom(caller, AbiCall::BytesSourceRemainingLength, |caller| { + let (mem, env) = Self::mem_env(caller); + + let Some(bytes_source) = env.bytes_sources.get(&BytesSourceId(source)) else { + return Ok(errno::NO_SUCH_BYTES.get().into()); + }; + + let total_len = bytes_source.bytes.len(); + let remaining = total_len.saturating_sub(bytes_source.cursor); + let remaining: u32 = remaining + .try_into() + .context("Bytes object in `BytesSource` had length greater than range of u32")?; + + u32::write_to(remaining, mem, out) + .context("Failed to write output from `bytes_source_remaining_length`")?; + + Ok(0) + }) + } + /// Writes up to `buffer_len` bytes from `buffer = buffer_ptr[..buffer_len]`, /// to the `sink`, registered in the host environment. 
/// diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 141f7455518..4374b708eb3 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -34,7 +34,7 @@ impl WasmtimeModule { WasmtimeModule { module } } - pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 0); + pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 1); pub(super) fn link_imports(linker: &mut Linker) -> anyhow::Result<()> { const { assert!(WasmtimeModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION) }; @@ -205,6 +205,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { // Prepare arguments to the reducer + the error sink & start timings. let args_bytes = op.args.get_bsatn().clone(); + let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, args_bytes, op.timestamp); let call_result = self.call_reducer.call( @@ -218,7 +219,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { conn_id_0, conn_id_1, op.timestamp.to_micros_since_unix_epoch() as u64, - args_source, + args_source.0, errors_sink, ), ); diff --git a/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs b/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs index c4575536b3c..37e04dc5294 100644 --- a/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs +++ b/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs @@ -1,7 +1,7 @@ // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. -// This was generated using spacetimedb cli version 1.4.0 (commit 9f4eccde6ef1eb71827c0ae48125dff49de72612). +// This was generated using spacetimedb cli version 1.4.0 (commit f26e20f1d99a66e7422ff86a037bd4aa80b44963). 
#![allow(unused, clippy::all)] use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; diff --git a/sdks/rust/tests/test-client/src/module_bindings/mod.rs b/sdks/rust/tests/test-client/src/module_bindings/mod.rs index 96ba580035b..9c8008c7e3c 100644 --- a/sdks/rust/tests/test-client/src/module_bindings/mod.rs +++ b/sdks/rust/tests/test-client/src/module_bindings/mod.rs @@ -1,7 +1,7 @@ // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. -// This was generated using spacetimedb cli version 1.4.0 (commit 9f4eccde6ef1eb71827c0ae48125dff49de72612). +// This was generated using spacetimedb cli version 1.4.0 (commit f26e20f1d99a66e7422ff86a037bd4aa80b44963). #![allow(unused, clippy::all)] use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; From 1d81267097e26cb4caa97d02cecc63219bbad493 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 11:55:49 -0400 Subject: [PATCH 02/11] Hopefully fix C# errors --- .../bindings-csharp/Runtime/Internal/FFI.cs | 52 ++++++++++++------- .../Runtime/Internal/Module.cs | 4 +- 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/crates/bindings-csharp/Runtime/Internal/FFI.cs b/crates/bindings-csharp/Runtime/Internal/FFI.cs index 76cb057b8de..84fd40423ad 100644 --- a/crates/bindings-csharp/Runtime/Internal/FFI.cs +++ b/crates/bindings-csharp/Runtime/Internal/FFI.cs @@ -44,7 +44,7 @@ internal static partial class FFI // For now this must match the name of the `.c` file (`bindings.c`). // In the future C# will allow to specify Wasm import namespace in // `LibraryImport` directly. 
- const string StdbNamespace = + const string StdbNamespace10_0 = #if EXPERIMENTAL_WASM_AOT "spacetime_10.0" #else @@ -52,6 +52,14 @@ internal static partial class FFI #endif ; + const string StdbNamespace10_1 = +#if EXPERIMENTAL_WASM_AOT + "spacetime_10.1" +#else + "bindings" +#endif + ; + [NativeMarshalling(typeof(Marshaller))] public struct CheckedStatus { @@ -126,30 +134,30 @@ public readonly record struct RowIter(uint Handle) public static readonly RowIter INVALID = new(0); } - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus table_id_from_name( [In] byte[] name, uint name_len, out TableId out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus index_id_from_name( [In] byte[] name, uint name_len, out IndexId out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_table_row_count(TableId table_id, out ulong out_); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_table_scan_bsatn( TableId table_id, out RowIter out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_index_scan_range_bsatn( IndexId index_id, ReadOnlySpan prefix, @@ -162,24 +170,24 @@ public static partial CheckedStatus datastore_index_scan_range_bsatn( out RowIter out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial Errno row_iter_bsatn_advance( RowIter iter_handle, [MarshalUsing(CountElementName = nameof(buffer_len))] [Out] byte[] buffer, ref uint buffer_len ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus row_iter_bsatn_close(RowIter iter_handle); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_insert_bsatn( 
TableId table_id, Span row, ref uint row_len ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_update_bsatn( TableId table_id, IndexId index_id, @@ -187,7 +195,7 @@ public static partial CheckedStatus datastore_update_bsatn( ref uint row_len ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_delete_by_index_scan_range_bsatn( IndexId index_id, ReadOnlySpan prefix, @@ -200,7 +208,7 @@ public static partial CheckedStatus datastore_delete_by_index_scan_range_bsatn( out uint out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_delete_all_by_eq_bsatn( TableId table_id, [In] byte[] relation, @@ -208,14 +216,14 @@ public static partial CheckedStatus datastore_delete_all_by_eq_bsatn( out uint out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial Errno bytes_source_read( BytesSource source, Span buffer, ref uint buffer_len ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus bytes_sink_write( BytesSink sink, ReadOnlySpan buffer, @@ -232,7 +240,7 @@ public enum LogLevel : byte Panic = 5, } - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial void console_log( LogLevel level, [In] byte[] target, @@ -269,13 +277,13 @@ internal static class ConsoleTimerIdMarshaller } } - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial ConsoleTimerId console_timer_start([In] byte[] name, uint name_len); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus console_timer_end(ConsoleTimerId stopwatch_id); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial void volatile_nonatomic_schedule_immediate( [In] byte[] name, uint 
name_len, @@ -291,7 +299,13 @@ uint args_len // which prevents source-generated PInvokes from working with types from other assemblies, and // `Identity` lives in another assembly (`BSATN.Runtime`). Luckily, `DllImport` is enough here. #pragma warning disable SYSLIB1054 // Suppress "Use 'LibraryImportAttribute' instead of 'DllImportAttribute'" warning. - [DllImport(StdbNamespace)] + [DllImport(StdbNamespace10_0)] public static extern void identity(out Identity dest); #pragma warning restore SYSLIB1054 + + [DllImport(StdbNamespace10_1)] + public static partial Errno bytes_source_remaining_length( + BytesSource(source), + ref uint len + ); } diff --git a/crates/bindings-csharp/Runtime/Internal/Module.cs b/crates/bindings-csharp/Runtime/Internal/Module.cs index a54b58d2f8d..248ed76519a 100644 --- a/crates/bindings-csharp/Runtime/Internal/Module.cs +++ b/crates/bindings-csharp/Runtime/Internal/Module.cs @@ -117,7 +117,7 @@ private static byte[] Consume(this BytesSource source) return []; } - uint32_t len = 0; + var len = (uint)0; var ret = FFI.bytes_source_remaining_length(source, ref len); switch (ret) { @@ -136,7 +136,7 @@ private static byte[] Consume(this BytesSource source) // Write into the spare capacity of the buffer. 
var spare = buffer.AsSpan((int)written); var buf_len = (uint)spare.Length; - var ret = FFI.bytes_source_read(source, spare, ref buf_len); + ret = FFI.bytes_source_read(source, spare, ref buf_len); written += buf_len; switch (ret) { From eff691423b94d1842f0baf32d4ef63b35324e13d Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 12:36:14 -0400 Subject: [PATCH 03/11] Csharp format for lints - delete trailing whitespace --- crates/bindings-csharp/Runtime/Internal/Module.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bindings-csharp/Runtime/Internal/Module.cs b/crates/bindings-csharp/Runtime/Internal/Module.cs index 248ed76519a..ae2b46b5188 100644 --- a/crates/bindings-csharp/Runtime/Internal/Module.cs +++ b/crates/bindings-csharp/Runtime/Internal/Module.cs @@ -128,7 +128,7 @@ private static byte[] Consume(this BytesSource source) default: throw new UnknownException(ret); } - + var buffer = new byte[len]; var written = 0U; while (true) From 602b9ccf28d49c0d0eb1aa6bb880e553e05808ae Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 12:37:28 -0400 Subject: [PATCH 04/11] Fix typo in C# code --- crates/bindings-csharp/Runtime/Internal/FFI.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bindings-csharp/Runtime/Internal/FFI.cs b/crates/bindings-csharp/Runtime/Internal/FFI.cs index 84fd40423ad..a3b414c0401 100644 --- a/crates/bindings-csharp/Runtime/Internal/FFI.cs +++ b/crates/bindings-csharp/Runtime/Internal/FFI.cs @@ -305,7 +305,7 @@ uint args_len [DllImport(StdbNamespace10_1)] public static partial Errno bytes_source_remaining_length( - BytesSource(source), + BytesSource source, ref uint len ); } From a549aafce4ce584206dc9810d55a13b907a41bdb Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 13:37:50 -0400 Subject: [PATCH 05/11] Also reset ID counter to 1 after end of call --- crates/core/src/host/wasmtime/wasm_instance_env.rs | 4 ++++ 1 file changed, 4 
insertions(+) diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 5abfbc0ba4c..e2ba653bfce 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -272,7 +272,11 @@ impl WasmInstanceEnv { wasm_instance_env_call_times, }; + // Drop any outstanding bytes sources and reset the ID counter, + // so that we don't leak either the IDs or the buffers themselves. self.bytes_sources = IntMap::default(); + self.next_bytes_source_id = NonZeroU32::from(1).unwrap(); + (timings, self.take_standard_bytes_sink()) } From 4078fbf837bc36c72b964b059592a90b255dcd15 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 13:51:34 -0400 Subject: [PATCH 06/11] It's not `from`, it's `new` --- crates/core/src/host/wasmtime/wasm_instance_env.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index e2ba653bfce..87deba9c5a4 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -275,7 +275,7 @@ impl WasmInstanceEnv { // Drop any outstanding bytes sources and reset the ID counter, // so that we don't leak either the IDs or the buffers themselves. self.bytes_sources = IntMap::default(); - self.next_bytes_source_id = NonZeroU32::from(1).unwrap(); + self.next_bytes_source_id = NonZeroU32::new(1).unwrap(); (timings, self.take_standard_bytes_sink()) } From 927d03dc95bb782c8beecc09d2a059baceb38abd Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 13:56:23 -0400 Subject: [PATCH 07/11] Remove unnecessary cursor from `BytesSource` As @jsdt points out, `bytes::Bytes` is already a view over an underlying buffer, so keeping an additional cursor as a slice into it is silly. 
--- .../src/host/wasmtime/wasm_instance_env.rs | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 87deba9c5a4..56c119e80ca 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -37,11 +37,9 @@ use instrumentation::op as span; /// so the `BytesSource` becomes useless once read to the end. struct BytesSource { /// The actual bytes which will be returned by calls to `byte_source_read`. - bytes: bytes::Bytes, - /// The index in `self.bytes` of the next byte to be read by `byte_source_read`. /// - /// When this is one past the end of `self.bytes`, this `ByteSource` is expended and should be discarded. - cursor: usize, + /// When this becomes empty, this `ByteSource` is expended and should be discarded. + bytes: bytes::Bytes, } #[derive(Copy, Clone, PartialEq, Eq, Debug)] @@ -176,7 +174,7 @@ impl WasmInstanceEnv { )) } else { let id = self.alloc_bytes_source_id()?; - self.bytes_sources.insert(id, BytesSource { bytes, cursor: 0 }); + self.bytes_sources.insert(id, BytesSource { bytes }); Ok(id) } } @@ -1126,19 +1124,17 @@ impl WasmInstanceEnv { // Derive the portion that we can read and what remains, // based on what is left to read and the capacity. - let left_to_read = &bytes_source.bytes[bytes_source.cursor..]; - let can_read_len = buffer_len.min(left_to_read.len()); - let (can_read, remainder) = left_to_read.split_at(can_read_len); + let can_read_len = buffer_len.min(bytes_source.bytes.len()); + let can_read = bytes_source.bytes.split_to(can_read_len); // Copy to the `buffer` and write written bytes count to `buffer_len`. - buffer[..can_read_len].copy_from_slice(can_read); + buffer[..can_read_len].copy_from_slice(&can_read); (can_read_len as u32).write_to(mem, buffer_len_ptr)?; // Destroy the source if exhausted, or advance `cursor`. 
- if remainder.is_empty() { + if bytes_source.bytes.is_empty() { env.free_bytes_source(source); Ok(-1i32) } else { - bytes_source.cursor += can_read_len; Ok(0) } }) @@ -1175,9 +1171,9 @@ impl WasmInstanceEnv { return Ok(errno::NO_SUCH_BYTES.get().into()); }; - let total_len = bytes_source.bytes.len(); - let remaining = total_len.saturating_sub(bytes_source.cursor); - let remaining: u32 = remaining + let remaining: u32 = bytes_source + .bytes + .len() .try_into() .context("Bytes object in `BytesSource` had length greater than range of u32")?; From b88599dd9d7a4c95c361736cb9de1b8d17906d11 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 16:22:43 -0400 Subject: [PATCH 08/11] `extern`, not `partial` Co-authored-by: rekhoff Signed-off-by: Phoebe Goldman --- crates/bindings-csharp/Runtime/Internal/FFI.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bindings-csharp/Runtime/Internal/FFI.cs b/crates/bindings-csharp/Runtime/Internal/FFI.cs index a3b414c0401..e79d0a30261 100644 --- a/crates/bindings-csharp/Runtime/Internal/FFI.cs +++ b/crates/bindings-csharp/Runtime/Internal/FFI.cs @@ -304,7 +304,7 @@ uint args_len #pragma warning restore SYSLIB1054 [DllImport(StdbNamespace10_1)] - public static partial Errno bytes_source_remaining_length( + public static extern Errno bytes_source_remaining_length( BytesSource source, ref uint len ); From ca8e00da6fe28c7f79432af71bca23a1a95005a9 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Thu, 25 Sep 2025 17:31:06 -0400 Subject: [PATCH 09/11] Run formatter --- crates/bindings-csharp/Runtime/Internal/FFI.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/bindings-csharp/Runtime/Internal/FFI.cs b/crates/bindings-csharp/Runtime/Internal/FFI.cs index e79d0a30261..2c0bf8b925f 100644 --- a/crates/bindings-csharp/Runtime/Internal/FFI.cs +++ b/crates/bindings-csharp/Runtime/Internal/FFI.cs @@ -304,8 +304,5 @@ uint args_len #pragma warning restore SYSLIB1054 
[DllImport(StdbNamespace10_1)] - public static extern Errno bytes_source_remaining_length( - BytesSource source, - ref uint len - ); + public static extern Errno bytes_source_remaining_length(BytesSource source, ref uint len); } From f74e0d1fa7eb06e3c6e5f964b96100b82652dc40 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Fri, 26 Sep 2025 10:37:50 -0400 Subject: [PATCH 10/11] Apply style nits Co-authored-by: Mazdak Farrokhzad Signed-off-by: Phoebe Goldman --- crates/bindings-sys/src/lib.rs | 2 +- crates/core/src/host/wasmtime/wasm_instance_env.rs | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index 6b71b955fc8..861f6feadc3 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -607,7 +607,7 @@ pub mod raw { /// /// Traps if: /// - /// - `out` is NULL or `ou` is not in bounds of WASM memory. + /// - `out` is NULL or `out` is not in bounds of WASM memory. /// /// # Errors /// diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 56c119e80ca..a3a2aaa9809 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -42,11 +42,12 @@ struct BytesSource { bytes: bytes::Bytes, } -#[derive(Copy, Clone, PartialEq, Eq, Debug)] /// Identifier for a [`BytesSource`] stored in the `bytes_sources` of a [`WasmInstanceEnv`]. /// /// The special sentinel [`Self::INVALID`] (zero) is used for a never-readable [`BytesSource`]. -/// We pass this to guests for a [`BytesSource`] with a length of zero. +/// We pass this to guests for a [`BytesSource`] with a length of zero +/// so that they can avoid host calls. 
+#[derive(Copy, Clone, PartialEq, Eq, Debug)] pub(super) struct BytesSourceId(pub(super) u32); // `nohash_hasher` recommends impling `Hash` explicitly rather than using the derive macro, @@ -157,10 +158,13 @@ impl WasmInstanceEnv { let id = self.next_bytes_source_id; self.next_bytes_source_id = id .checked_add(1) - .context("Allocating next `BytesSourceId` overflowed u32")?; + .context("Allocating next `BytesSourceId` overflowed `u32`")?; Ok(BytesSourceId(id.into())) } + /// Binds `bytes` to the environment and assigns it an ID. + /// + /// If `bytes` is empty, `BytesSourceId::INVALID` is returned. fn create_bytes_source(&mut self, bytes: bytes::Bytes) -> RtResult { // Pass an invalid source when the bytes were empty. // This allows the module to avoid allocating and make a system call in those cases. @@ -168,7 +172,7 @@ impl WasmInstanceEnv { Ok(BytesSourceId::INVALID) } else if bytes.len() > u32::MAX as usize { Err(anyhow::anyhow!( - "`create_bytes_source`: `Bytes` has length {}, which is greater than u32::MAX {}", + "`create_bytes_source`: `Bytes` has length {}, which is greater than `u32::MAX` {}", bytes.len(), u32::MAX, )) @@ -1154,7 +1158,7 @@ impl WasmInstanceEnv { /// /// Traps if: /// - /// - `out` is NULL or `ou` is not in bounds of WASM memory. + /// - `out` is NULL or `out` is not in bounds of WASM memory. 
/// /// # Errors /// From e9084e40528a3fc43d45b0f5ac86d87ef8187d92 Mon Sep 17 00:00:00 2001 From: Phoebe Goldman Date: Fri, 26 Sep 2025 11:00:09 -0400 Subject: [PATCH 11/11] Comments resulting from @centril 's review --- crates/bindings-csharp/Runtime/Internal/Module.cs | 6 ++++++ crates/bindings/src/rt.rs | 12 +++++++++++- crates/core/src/host/wasmtime/wasm_instance_env.rs | 10 ++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/crates/bindings-csharp/Runtime/Internal/Module.cs b/crates/bindings-csharp/Runtime/Internal/Module.cs index ae2b46b5188..ecd079e1ad8 100644 --- a/crates/bindings-csharp/Runtime/Internal/Module.cs +++ b/crates/bindings-csharp/Runtime/Internal/Module.cs @@ -131,6 +131,12 @@ private static byte[] Consume(this BytesSource source) var buffer = new byte[len]; var written = 0U; + // Because we've reserved space in our buffer already, this loop should be unnecessary. + // We expect the first call to `bytes_source_read` to always return `-1`. + // I (pgoldman 2025-09-26) am leaving the loop here because there's no downside to it, + // and in the future we may want to support `BytesSource`s which don't have a known length ahead of time + // (i.e. put arbitrary streams in `BytesSource` on the host side rather than just `Bytes` buffers), + // at which point the loop will become useful again. while (true) { // Write into the spare capacity of the buffer. diff --git a/crates/bindings/src/rt.rs b/crates/bindings/src/rt.rs index 6b7c2d9b995..5049f1f6432 100644 --- a/crates/bindings/src/rt.rs +++ b/crates/bindings/src/rt.rs @@ -535,6 +535,11 @@ const NO_SUCH_BYTES: u16 = errno::NO_SUCH_BYTES.get(); fn read_bytes_source_into(source: BytesSource, buf: &mut Vec) { const INVALID: i16 = NO_SUCH_BYTES as i16; + // For reducer arguments, the `buf` will almost certainly already be large enough, + // as it comes from `IterBuf`, which starts at 64KiB. 
+ // But reading the remaining length and calling `buf.reserve` is a negligible cost, + // and in the future we may want to use this method to read other `BytesSource`s into other buffers. + // I (pgoldman 2025-09-26) also value having it as an example of correct usage of `bytes_source_remaining_length`. let len = { let mut len = 0; let ret = unsafe { sys::raw::bytes_source_remaining_length(source, &raw mut len) }; @@ -544,9 +549,14 @@ fn read_bytes_source_into(source: BytesSource, buf: &mut Vec) { _ => unreachable!(), } }; - buf.reserve(buf.len().saturating_sub(len as usize)); + // Because we've reserved space in our buffer already, this loop should be unnecessary. + // We expect the first call to `bytes_source_read` to always return `-1`. + // I (pgoldman 2025-09-26) am leaving the loop here because there's no downside to it, + // and in the future we may want to support `BytesSource`s which don't have a known length ahead of time + // (i.e. put arbitrary streams in `BytesSource` on the host side rather than just `Bytes` buffers), + // at which point the loop will become useful again. loop { // Write into the spare capacity of the buffer. let buf_ptr = buf.spare_capacity_mut(); diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index a3a2aaa9809..d2aabaf656e 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -171,6 +171,11 @@ impl WasmInstanceEnv { if bytes.is_empty() { Ok(BytesSourceId::INVALID) } else if bytes.len() > u32::MAX as usize { + // There's no inherent reason we need to error here, + // other than that it makes it impossible to report the length in `bytes_source_remaining_length` + // and that all of our usage of `BytesSource`s as of writing (pgoldman 2025-09-26) + // are to immediately slurp the whole thing into a buffer in guest memory, + // which can't hold buffers this big because it's WASM32. 
Err(anyhow::anyhow!( "`create_bytes_source`: `Bytes` has length {}, which is greater than `u32::MAX` {}", bytes.len(), @@ -1179,6 +1184,11 @@ impl WasmInstanceEnv { .bytes .len() .try_into() + // TODO: Change this into an `errno::BYTES_SOURCE_LENGTH_UNKNOWN` rather than a trap, + // so that we can support very large `BytesSource`s, streams, and other file-like things that aren't just `Bytes`. + // This is not currently (pgoldman 2025-09-26) a useful thing to do, + // as all of our uses of `BytesSource` are to slurp the whole source into a single buffer in guest memory, + // `File::read_to_end`-style, and we don't have any use for large or streaming `BytesSource`s. .context("Bytes object in `BytesSource` had length greater than range of u32")?; u32::write_to(remaining, mem, out)