diff --git a/Cargo.lock b/Cargo.lock index 385843c655d..b63d9596a86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5823,6 +5823,7 @@ dependencies = [ "log", "memchr", "nix 0.30.1", + "nohash-hasher", "once_cell", "openssl", "parking_lot 0.12.3", diff --git a/crates/bindings-csharp/Runtime/Internal/FFI.cs b/crates/bindings-csharp/Runtime/Internal/FFI.cs index 76cb057b8de..2c0bf8b925f 100644 --- a/crates/bindings-csharp/Runtime/Internal/FFI.cs +++ b/crates/bindings-csharp/Runtime/Internal/FFI.cs @@ -44,7 +44,7 @@ internal static partial class FFI // For now this must match the name of the `.c` file (`bindings.c`). // In the future C# will allow to specify Wasm import namespace in // `LibraryImport` directly. - const string StdbNamespace = + const string StdbNamespace10_0 = #if EXPERIMENTAL_WASM_AOT "spacetime_10.0" #else @@ -52,6 +52,14 @@ internal static partial class FFI #endif ; + const string StdbNamespace10_1 = +#if EXPERIMENTAL_WASM_AOT + "spacetime_10.1" +#else + "bindings" +#endif + ; + [NativeMarshalling(typeof(Marshaller))] public struct CheckedStatus { @@ -126,30 +134,30 @@ public readonly record struct RowIter(uint Handle) public static readonly RowIter INVALID = new(0); } - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus table_id_from_name( [In] byte[] name, uint name_len, out TableId out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus index_id_from_name( [In] byte[] name, uint name_len, out IndexId out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_table_row_count(TableId table_id, out ulong out_); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_table_scan_bsatn( TableId table_id, out RowIter out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_index_scan_range_bsatn( IndexId index_id, ReadOnlySpan prefix, @@ -162,24 +170,24 @@ public static partial CheckedStatus datastore_index_scan_range_bsatn( out RowIter out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial Errno row_iter_bsatn_advance( RowIter iter_handle, [MarshalUsing(CountElementName = nameof(buffer_len))] [Out] byte[] buffer, ref uint buffer_len ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus row_iter_bsatn_close(RowIter iter_handle); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_insert_bsatn( TableId table_id, Span row, ref uint row_len ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_update_bsatn( TableId table_id, IndexId index_id, @@ -187,7 +195,7 @@ public static partial CheckedStatus datastore_update_bsatn( ref uint row_len ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_delete_by_index_scan_range_bsatn( IndexId index_id, ReadOnlySpan prefix, @@ -200,7 +208,7 @@ public static partial CheckedStatus datastore_delete_by_index_scan_range_bsatn( out uint out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus datastore_delete_all_by_eq_bsatn( TableId table_id, [In] byte[] relation, @@ -208,14 +216,14 @@ public static partial CheckedStatus datastore_delete_all_by_eq_bsatn( out uint out_ ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial Errno bytes_source_read( BytesSource source, Span buffer, ref uint buffer_len ); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus bytes_sink_write( BytesSink sink, ReadOnlySpan buffer, @@ -232,7 +240,7 @@ public enum LogLevel : byte Panic = 5, } - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial void console_log( LogLevel level, [In] byte[] target, @@ -269,13 +277,13 @@ internal static class ConsoleTimerIdMarshaller } } - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial ConsoleTimerId console_timer_start([In] byte[] name, uint name_len); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial CheckedStatus console_timer_end(ConsoleTimerId stopwatch_id); - [LibraryImport(StdbNamespace)] + [LibraryImport(StdbNamespace10_0)] public static partial void volatile_nonatomic_schedule_immediate( [In] byte[] name, uint name_len, @@ -291,7 +299,10 @@ uint args_len // which prevents source-generated PInvokes from working with types from other assemblies, and // `Identity` lives in another assembly (`BSATN.Runtime`). Luckily, `DllImport` is enough here. #pragma warning disable SYSLIB1054 // Suppress "Use 'LibraryImportAttribute' instead of 'DllImportAttribute'" warning. - [DllImport(StdbNamespace)] + [DllImport(StdbNamespace10_0)] public static extern void identity(out Identity dest); #pragma warning restore SYSLIB1054 + + [DllImport(StdbNamespace10_1)] + public static extern Errno bytes_source_remaining_length(BytesSource source, ref uint len); } diff --git a/crates/bindings-csharp/Runtime/Internal/Module.cs b/crates/bindings-csharp/Runtime/Internal/Module.cs index 68ddd5c273e..ecd079e1ad8 100644 --- a/crates/bindings-csharp/Runtime/Internal/Module.cs +++ b/crates/bindings-csharp/Runtime/Internal/Module.cs @@ -116,14 +116,33 @@ private static byte[] Consume(this BytesSource source) { return []; } - var buffer = new byte[0x20_000]; + + var len = (uint)0; + var ret = FFI.bytes_source_remaining_length(source, ref len); + switch (ret) + { + case Errno.OK: + break; + case Errno.NO_SUCH_BYTES: + throw new NoSuchBytesException(); + default: + throw new UnknownException(ret); + } + + var buffer = new byte[len]; var written = 0U; + // Because we've reserved space in our buffer already, this loop should be unnecessary. + // We expect the first call to `bytes_source_read` to always return `-1`. + // I (pgoldman 2025-09-26) am leaving the loop here because there's no downside to it, + // and in the future we may want to support `BytesSource`s which don't have a known length ahead of time + // (i.e. put arbitrary streams in `BytesSource` on the host side rather than just `Bytes` buffers), + // at which point the loop will become useful again. while (true) { // Write into the spare capacity of the buffer. var spare = buffer.AsSpan((int)written); var buf_len = (uint)spare.Length; - var ret = FFI.bytes_source_read(source, spare, ref buf_len); + ret = FFI.bytes_source_read(source, spare, ref buf_len); written += buf_len; switch (ret) { diff --git a/crates/bindings-csharp/Runtime/bindings.c b/crates/bindings-csharp/Runtime/bindings.c index 51eb8712c12..54b471783bf 100644 --- a/crates/bindings-csharp/Runtime/bindings.c +++ b/crates/bindings-csharp/Runtime/bindings.c @@ -27,7 +27,7 @@ OPAQUE_TYPEDEF(ConsoleTimerId, uint32_t); #define CSTR(s) (uint8_t*)s, sizeof(s) - 1 #define STDB_EXTERN(name) \ - __attribute__((import_module("spacetime_10.0"), import_name(#name))) extern + __attribute__((import_module(SPACETIME_MODULE_VERSION), import_name(#name))) extern #ifndef EXPERIMENTAL_WASM_AOT #define IMPORT(ret, name, params, args) \ @@ -37,6 +37,7 @@ OPAQUE_TYPEDEF(ConsoleTimerId, uint32_t); #define IMPORT(ret, name, params, args) STDB_EXTERN(name) ret name params; #endif +#define SPACETIME_MODULE_VERSION "spacetime_10.0" IMPORT(Status, table_id_from_name, (const uint8_t* name, uint32_t name_len, TableId* id), (name, name_len, id)); @@ -97,6 +98,11 @@ IMPORT(void, volatile_nonatomic_schedule_immediate, (const uint8_t* name, size_t name_len, const uint8_t* args, size_t args_len), (name, name_len, args, args_len)); IMPORT(void, identity, (void* id_ptr), (id_ptr)); +#undef SPACETIME_MODULE_VERSION + +#define SPACETIME_MODULE_VERSION "spacetime_10.1" +IMPORT(int16_t, bytes_source_remaining_length, (BytesSource source, uint32_t* out), (source, out)); +#undef SPACETIME_MODULE_VERSION #ifndef EXPERIMENTAL_WASM_AOT static MonoClass* ffi_class; diff --git a/crates/bindings-sys/src/lib.rs b/crates/bindings-sys/src/lib.rs index b901de7e5d7..861f6feadc3 100644 --- a/crates/bindings-sys/src/lib.rs +++ b/crates/bindings-sys/src/lib.rs @@ -590,6 +590,35 @@ pub mod raw { pub fn identity(out_ptr: *mut u8); } + // See comment on previous `extern "C"` block re: ABI version. + #[link(wasm_import_module = "spacetime_10.1")] + extern "C" { + /// Read the remaining length of a [`BytesSource`] and write it to `out`. + /// + /// Note that the host automatically frees byte sources which are exhausted. + /// Such sources are invalid, and this method will return an error when passed one. + /// Callers of [`bytes_source_read`] should check for a return of -1 + /// before invoking this function on the same `source`. + /// + /// Also note that the special [`BytesSource::INVALID`] (zero) is always invalid. + /// Callers should check for that value before invoking this function. + /// + /// # Traps + /// + /// Traps if: + /// + /// - `out` is NULL or `out` is not in bounds of WASM memory. + /// + /// # Errors + /// + /// Returns an error: + /// + /// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source. + /// + /// If this function returns an error, `out` is not written. + pub fn bytes_source_remaining_length(source: BytesSource, out: *mut u32) -> i16; + } + /// What strategy does the database index use? /// /// See also: diff --git a/crates/bindings/src/rt.rs b/crates/bindings/src/rt.rs index 337475d3fa3..6cdddde830d 100644 --- a/crates/bindings/src/rt.rs +++ b/crates/bindings/src/rt.rs @@ -535,6 +535,28 @@ const NO_SUCH_BYTES: u16 = errno::NO_SUCH_BYTES.get(); fn read_bytes_source_into(source: BytesSource, buf: &mut Vec) { const INVALID: i16 = NO_SUCH_BYTES as i16; + // For reducer arguments, the `buf` will almost certainly already be large enough, + // as it comes from `IterBuf`, which start at 64KiB. + // But reading the remaining length and calling `buf.reserve` is a negligible cost, + // and in the future we may want to use this method to read other `BytesSource`s into other buffers. + // I (pgoldman 2025-09-26) also value having it as an example of correct usage of `bytes_source_remaining_length`. + let len = { + let mut len = 0; + let ret = unsafe { sys::raw::bytes_source_remaining_length(source, &raw mut len) }; + match ret { + 0 => len, + INVALID => panic!("invalid source passed"), + _ => unreachable!(), + } + }; + buf.reserve(buf.len().saturating_sub(len as usize)); + + // Because we've reserved space in our buffer already, this loop should be unnecessary. + // We expect the first call to `bytes_source_read` to always return `-1`. + // I (pgoldman 2025-09-26) am leaving the loop here because there's no downside to it, + // and in the future we may want to support `BytesSource`s which don't have a known length ahead of time + // (i.e. put arbitrary streams in `BytesSource` on the host side rather than just `Bytes` buffers), + // at which point the loop will become useful again. loop { // Write into the spare capacity of the buffer. let buf_ptr = buf.spare_capacity_mut(); diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 13136a18ef1..142a19e7484 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -69,6 +69,7 @@ jsonwebtoken.workspace = true lazy_static.workspace = true log.workspace = true memchr.workspace = true +nohash-hasher.workspace = true once_cell.workspace = true openssl.workspace = true parking_lot.workspace = true diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index beb72fc2f99..5e1e2470745 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -139,6 +139,7 @@ pub enum AbiCall { DatastoreDeleteByIndexScanRangeBsatn, DatastoreDeleteAllByEqBsatn, BytesSourceRead, + BytesSourceRemainingLength, BytesSinkWrite, ConsoleLog, ConsoleTimerStart, diff --git a/crates/core/src/host/wasm_common.rs b/crates/core/src/host/wasm_common.rs index 666cc808ea3..f2b29310ff9 100644 --- a/crates/core/src/host/wasm_common.rs +++ b/crates/core/src/host/wasm_common.rs @@ -395,6 +395,8 @@ macro_rules! abi_funcs { // unstable: "spacetime_10.0"::volatile_nonatomic_schedule_immediate, + + "spacetime_10.1"::bytes_source_remaining_length, } }; } diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index c2cd6ed067e..d2aabaf656e 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1,5 +1,6 @@ #![allow(clippy::too_many_arguments)] +use std::num::NonZeroU32; use std::time::Instant; use crate::database_logger::{BacktraceFrame, BacktraceProvider, ModuleBacktrace, Record}; @@ -12,6 +13,7 @@ use crate::host::wasm_common::{ }; use crate::host::AbiCall; use anyhow::Context as _; +use spacetimedb_data_structures::map::IntMap; use spacetimedb_lib::Timestamp; use spacetimedb_primitives::{errno, ColId}; use wasmtime::{AsContext, Caller, StoreContextMut}; @@ -23,6 +25,46 @@ use instrumentation::noop as span; #[cfg(feature = "spacetimedb-wasm-instance-env-times")] use instrumentation::op as span; +/// A stream of bytes which the WASM module can read from +/// using [`WasmInstanceEnv::bytes_source_read`]. +/// +/// These are managed in the `bytes_sources` of [`WasmInstanceEnv`], +/// where each one is paired with an integer ID. +/// This is basically a massively-simplified version of Unix read files and file descriptors. +/// +/// Unlike Unix read files, we implicitly close `BytesSource`s once they are read to the end. +/// This is sensible because we don't provide a seek operation, +/// so the `BytesSource` becomes useless once read to the end. +struct BytesSource { + /// The actual bytes which will be returned by calls to `byte_source_read`. + /// + /// When this becomes empty, this `ByteSource` is expended and should be discarded. + bytes: bytes::Bytes, +} + +/// Identifier for a [`BytesSource`] stored in the `bytes_sources` of a [`WasmInstanceEnv`]. +/// +/// The special sentinel [`Self::INVALID`] (zero) is used for a never-readable [`BytesSource`]. +/// We pass this to guests for a [`BytesSource`] with a length of zero +/// so that they can avoid host calls. +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +pub(super) struct BytesSourceId(pub(super) u32); + +// `nohash_hasher` recommends impling `Hash` explicitly rather than using the derive macro, +// as the derive macro is not technically guaranteed to only call `hasher.write_{int}` for an integer newtype, +// even though any other behavior would be deranged. +impl std::hash::Hash for BytesSourceId { + fn hash(&self, hasher: &mut H) { + hasher.write_u32(self.0) + } +} + +impl nohash_hasher::IsEnabled for BytesSourceId {} + +impl BytesSourceId { + const INVALID: Self = Self(0); +} + /// A `WasmInstanceEnv` provides the connection between a module /// and the database. /// @@ -47,9 +89,19 @@ pub(super) struct WasmInstanceEnv { /// always be `Some`. mem: Option, - /// The arguments being passed to a reducer - /// that it can read via [`Self::bytes_source_read`]. - call_reducer_args: Option<(bytes::Bytes, usize)>, + /// `File`-like [`BytesSource`]s which guest code can read via [`Self::bytes_source_read`]. + /// + /// These are essentially simplified versions of Unix read files, + /// with [`BytesSourceId`] being file descriptors. + /// + /// Unlike Unix files, we implicitly close a [`BytesSource`] when it is read to the end. + /// This is because we don't provide a seek operation and a [`BytesSource`] never grows after initialization. + bytes_sources: IntMap, + + /// Counter as a source of [`BytesSourceId`] values. + /// + /// Recall that zero is [`BytesSourceId::INVALID`], so we have to start at 1. + next_bytes_source_id: NonZeroU32, /// The standard sink used for [`Self::bytes_sink_write`]. standard_bytes_sink: Option>, @@ -76,7 +128,6 @@ pub(super) struct WasmInstanceEnv { chunk_pool: ChunkPool, } -const CALL_REDUCER_ARGS_SOURCE: u32 = 1; const STANDARD_BYTES_SINK: u32 = 1; type WasmResult = Result; @@ -91,7 +142,8 @@ impl WasmInstanceEnv { Self { instance_env, mem: None, - call_reducer_args: None, + bytes_sources: IntMap::default(), + next_bytes_source_id: NonZeroU32::new(1).unwrap(), standard_bytes_sink: None, iters: Default::default(), timing_spans: Default::default(), @@ -102,6 +154,46 @@ impl WasmInstanceEnv { } } + fn alloc_bytes_source_id(&mut self) -> RtResult { + let id = self.next_bytes_source_id; + self.next_bytes_source_id = id + .checked_add(1) + .context("Allocating next `BytesSourceId` overflowed `u32`")?; + Ok(BytesSourceId(id.into())) + } + + /// Binds `bytes` to the environment and assigns it an ID. + /// + /// If `bytes` is empty, `BytesSourceId::INVALID` is returned. + fn create_bytes_source(&mut self, bytes: bytes::Bytes) -> RtResult { + // Pass an invalid source when the bytes were empty. + // This allows the module to avoid allocating and make a system call in those cases. + if bytes.is_empty() { + Ok(BytesSourceId::INVALID) + } else if bytes.len() > u32::MAX as usize { + // There's no inherent reason we need to error here, + // other than that it makes it impossible to report the length in `bytes_source_remaining_length` + // and that all of our usage of `BytesSource`s as of writing (pgoldman 2025-09-26) + // are to immediately slurp the whole thing into a buffer in guest memory, + // which can't hold buffers this big because it's WASM32. + Err(anyhow::anyhow!( + "`create_bytes_source`: `Bytes` has length {}, which is greater than `u32::MAX` {}", + bytes.len(), + u32::MAX, + )) + } else { + let id = self.alloc_bytes_source_id()?; + self.bytes_sources.insert(id, BytesSource { bytes }); + Ok(id) + } + } + + fn free_bytes_source(&mut self, id: BytesSourceId) { + if self.bytes_sources.remove(&id).is_none() { + log::warn!("`free_bytes_source` on non-existent source {id:?}"); + } + } + /// Finish the instantiation of this instance with the provided `Mem`. pub fn instantiate(&mut self, mem: Mem) { assert!(self.mem.is_none()); @@ -140,17 +232,10 @@ impl WasmInstanceEnv { /// /// Returns the handle used by reducers to read from `args` /// as well as the handle used to write the error message, if any. - pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (u32, u32) { + pub fn start_reducer(&mut self, name: &str, args: bytes::Bytes, ts: Timestamp) -> (BytesSourceId, u32) { let errors = self.setup_standard_bytes_sink(); - // Pass an invalid source when the reducer args were empty. - // This allows the module to avoid allocating and make a system call in those cases. - self.call_reducer_args = (!args.is_empty()).then_some((args, 0)); - let args = if self.call_reducer_args.is_some() { - CALL_REDUCER_ARGS_SOURCE - } else { - 0 - }; + let args = self.create_bytes_source(args).unwrap(); self.reducer_start = Instant::now(); name.clone_into(&mut self.reducer_name); @@ -194,7 +279,11 @@ impl WasmInstanceEnv { wasm_instance_env_call_times, }; - self.call_reducer_args = None; + // Drop any outstanding bytes sources and reset the ID counter, + // so that we don't leak either the IDs or the buffers themselves. + self.bytes_sources = IntMap::default(); + self.next_bytes_source_id = NonZeroU32::new(1).unwrap(); + (timings, self.take_standard_bytes_sink()) } @@ -1029,12 +1118,10 @@ impl WasmInstanceEnv { Self::cvt_custom(caller, AbiCall::BytesSourceRead, |caller| { let (mem, env) = Self::mem_env(caller); + let source = BytesSourceId(source); + // Retrieve the reducer args if available and requested, or error. - let Some((reducer_args, cursor)) = env - .call_reducer_args - .as_mut() - .filter(|_| source == CALL_REDUCER_ARGS_SOURCE) - else { + let Some(bytes_source) = env.bytes_sources.get_mut(&source) else { return Ok(errno::NO_SUCH_BYTES.get().into()); }; @@ -1046,24 +1133,71 @@ impl WasmInstanceEnv { // Derive the portion that we can read and what remains, // based on what is left to read and the capacity. - let left_to_read = &reducer_args[*cursor..]; - let can_read_len = buffer_len.min(left_to_read.len()); - let (can_read, remainder) = left_to_read.split_at(can_read_len); + let can_read_len = buffer_len.min(bytes_source.bytes.len()); + let can_read = bytes_source.bytes.split_to(can_read_len); // Copy to the `buffer` and write written bytes count to `buffer_len`. - buffer[..can_read_len].copy_from_slice(can_read); + buffer[..can_read_len].copy_from_slice(&can_read); (can_read_len as u32).write_to(mem, buffer_len_ptr)?; // Destroy the source if exhausted, or advance `cursor`. - if remainder.is_empty() { - env.call_reducer_args = None; + if bytes_source.bytes.is_empty() { + env.free_bytes_source(source); Ok(-1i32) } else { - *cursor += can_read_len; Ok(0) } }) } + /// Read the remaining length of a [`BytesSource`] and write it to `out`. + /// + /// Note that the host automatically frees byte sources which are exhausted. + /// Such sources are invalid, and this method will return an error when passed one. + /// Callers of [`Self::bytes_source_read`] should check for a return of -1 + /// before invoking this function on the same `source`. + /// + /// Also note that the special [`BytesSourceId::INVALID`] (zero) is always invalid. + /// Callers should check for that value before invoking this function. + /// + /// # Traps + /// + /// Traps if: + /// + /// - `out` is NULL or `out` is not in bounds of WASM memory. + /// + /// # Errors + /// + /// Returns an error: + /// + /// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source. + /// + /// If this function returns an error, `out` is not written. + pub fn bytes_source_remaining_length(caller: Caller<'_, Self>, source: u32, out: WasmPtr) -> RtResult { + Self::cvt_custom(caller, AbiCall::BytesSourceRemainingLength, |caller| { + let (mem, env) = Self::mem_env(caller); + + let Some(bytes_source) = env.bytes_sources.get(&BytesSourceId(source)) else { + return Ok(errno::NO_SUCH_BYTES.get().into()); + }; + + let remaining: u32 = bytes_source + .bytes + .len() + .try_into() + // TODO: Change this into an `errno::BYTES_SOURCE_LENGTH_UNKNOWN` rather than a trap, + // so that we can support very large `BytesSource`s, streams, and other file-like things that aren't just `Bytes`. + // This is not currently (pgoldman 2025-09-26) a useful thing to do, + // as all of our uses of `BytesSource` are to slurp the whole source into a single buffer in guest memory, + // `File::read_to_end`-style, and we don't have any use for large or streaming `BytesSource`s. + .context("Bytes object in `BytesSource` had length greater than range of u32")?; + + u32::write_to(remaining, mem, out) + .context("Failed to write output from `bytes_source_remaining_length`")?; + + Ok(0) + }) + } + /// Writes up to `buffer_len` bytes from `buffer = buffer_ptr[..buffer_len]`, /// to the `sink`, registered in the host environment. /// diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index 141f7455518..4374b708eb3 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -34,7 +34,7 @@ impl WasmtimeModule { WasmtimeModule { module } } - pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 0); + pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(10, 1); pub(super) fn link_imports(linker: &mut Linker) -> anyhow::Result<()> { const { assert!(WasmtimeModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION) }; @@ -205,6 +205,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { // Prepare arguments to the reducer + the error sink & start timings. let args_bytes = op.args.get_bsatn().clone(); + let (args_source, errors_sink) = store.data_mut().start_reducer(op.name, args_bytes, op.timestamp); let call_result = self.call_reducer.call( @@ -218,7 +219,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { conn_id_0, conn_id_1, op.timestamp.to_micros_since_unix_epoch() as u64, - args_source, + args_source.0, errors_sink, ), ); diff --git a/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs b/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs index c4575536b3c..37e04dc5294 100644 --- a/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs +++ b/sdks/rust/tests/connect_disconnect_client/src/module_bindings/mod.rs @@ -1,7 +1,7 @@ // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. -// This was generated using spacetimedb cli version 1.4.0 (commit 9f4eccde6ef1eb71827c0ae48125dff49de72612). +// This was generated using spacetimedb cli version 1.4.0 (commit f26e20f1d99a66e7422ff86a037bd4aa80b44963). #![allow(unused, clippy::all)] use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws}; diff --git a/sdks/rust/tests/test-client/src/module_bindings/mod.rs b/sdks/rust/tests/test-client/src/module_bindings/mod.rs index 96ba580035b..9c8008c7e3c 100644 --- a/sdks/rust/tests/test-client/src/module_bindings/mod.rs +++ b/sdks/rust/tests/test-client/src/module_bindings/mod.rs @@ -1,7 +1,7 @@ // THIS FILE IS AUTOMATICALLY GENERATED BY SPACETIMEDB. EDITS TO THIS FILE // WILL NOT BE SAVED. MODIFY TABLES IN YOUR MODULE SOURCE CODE INSTEAD. -// This was generated using spacetimedb cli version 1.4.0 (commit 9f4eccde6ef1eb71827c0ae48125dff49de72612). +// This was generated using spacetimedb cli version 1.4.0 (commit f26e20f1d99a66e7422ff86a037bd4aa80b44963). #![allow(unused, clippy::all)] use spacetimedb_sdk::__codegen::{self as __sdk, __lib, __sats, __ws};