Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 30 additions & 19 deletions crates/bindings-csharp/Runtime/Internal/FFI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,22 @@ internal static partial class FFI
// For now this must match the name of the `.c` file (`bindings.c`).
// In the future C# will allow to specify Wasm import namespace in
// `LibraryImport` directly.
const string StdbNamespace =
const string StdbNamespace10_0 =
#if EXPERIMENTAL_WASM_AOT
"spacetime_10.0"
#else
"bindings"
#endif
;

const string StdbNamespace10_1 =
#if EXPERIMENTAL_WASM_AOT
"spacetime_10.1"
#else
"bindings"
#endif
;

[NativeMarshalling(typeof(Marshaller))]
public struct CheckedStatus
{
Expand Down Expand Up @@ -126,30 +134,30 @@ public readonly record struct RowIter(uint Handle)
public static readonly RowIter INVALID = new(0);
}

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus table_id_from_name(
[In] byte[] name,
uint name_len,
out TableId out_
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus index_id_from_name(
[In] byte[] name,
uint name_len,
out IndexId out_
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus datastore_table_row_count(TableId table_id, out ulong out_);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus datastore_table_scan_bsatn(
TableId table_id,
out RowIter out_
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus datastore_index_scan_range_bsatn(
IndexId index_id,
ReadOnlySpan<byte> prefix,
Expand All @@ -162,32 +170,32 @@ public static partial CheckedStatus datastore_index_scan_range_bsatn(
out RowIter out_
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial Errno row_iter_bsatn_advance(
RowIter iter_handle,
[MarshalUsing(CountElementName = nameof(buffer_len))] [Out] byte[] buffer,
ref uint buffer_len
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus row_iter_bsatn_close(RowIter iter_handle);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus datastore_insert_bsatn(
TableId table_id,
Span<byte> row,
ref uint row_len
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus datastore_update_bsatn(
TableId table_id,
IndexId index_id,
Span<byte> row,
ref uint row_len
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus datastore_delete_by_index_scan_range_bsatn(
IndexId index_id,
ReadOnlySpan<byte> prefix,
Expand All @@ -200,22 +208,22 @@ public static partial CheckedStatus datastore_delete_by_index_scan_range_bsatn(
out uint out_
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus datastore_delete_all_by_eq_bsatn(
TableId table_id,
[In] byte[] relation,
uint relation_len,
out uint out_
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial Errno bytes_source_read(
BytesSource source,
Span<byte> buffer,
ref uint buffer_len
);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus bytes_sink_write(
BytesSink sink,
ReadOnlySpan<byte> buffer,
Expand All @@ -232,7 +240,7 @@ public enum LogLevel : byte
Panic = 5,
}

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial void console_log(
LogLevel level,
[In] byte[] target,
Expand Down Expand Up @@ -269,13 +277,13 @@ internal static class ConsoleTimerIdMarshaller
}
}

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial ConsoleTimerId console_timer_start([In] byte[] name, uint name_len);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial CheckedStatus console_timer_end(ConsoleTimerId stopwatch_id);

[LibraryImport(StdbNamespace)]
[LibraryImport(StdbNamespace10_0)]
public static partial void volatile_nonatomic_schedule_immediate(
[In] byte[] name,
uint name_len,
Expand All @@ -291,7 +299,10 @@ uint args_len
// which prevents source-generated PInvokes from working with types from other assemblies, and
// `Identity` lives in another assembly (`BSATN.Runtime`). Luckily, `DllImport` is enough here.
#pragma warning disable SYSLIB1054 // Suppress "Use 'LibraryImportAttribute' instead of 'DllImportAttribute'" warning.
[DllImport(StdbNamespace)]
[DllImport(StdbNamespace10_0)]
public static extern void identity(out Identity dest);
#pragma warning restore SYSLIB1054

[DllImport(StdbNamespace10_1)]
public static extern Errno bytes_source_remaining_length(BytesSource source, ref uint len);
}
23 changes: 21 additions & 2 deletions crates/bindings-csharp/Runtime/Internal/Module.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,33 @@ private static byte[] Consume(this BytesSource source)
{
return [];
}
var buffer = new byte[0x20_000];

var len = (uint)0;
var ret = FFI.bytes_source_remaining_length(source, ref len);
switch (ret)
{
case Errno.OK:
break;
case Errno.NO_SUCH_BYTES:
throw new NoSuchBytesException();
default:
throw new UnknownException(ret);
}

var buffer = new byte[len];
var written = 0U;
// Because we've reserved space in our buffer already, this loop should be unnecessary.
// We expect the first call to `bytes_source_read` to always return `-1`.
// I (pgoldman 2025-09-26) am leaving the loop here because there's no downside to it,
// and in the future we may want to support `BytesSource`s which don't have a known length ahead of time
// (i.e. put arbitrary streams in `BytesSource` on the host side rather than just `Bytes` buffers),
// at which point the loop will become useful again.
while (true)
{
// Write into the spare capacity of the buffer.
var spare = buffer.AsSpan((int)written);
var buf_len = (uint)spare.Length;
var ret = FFI.bytes_source_read(source, spare, ref buf_len);
ret = FFI.bytes_source_read(source, spare, ref buf_len);
written += buf_len;
switch (ret)
{
Expand Down
8 changes: 7 additions & 1 deletion crates/bindings-csharp/Runtime/bindings.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ OPAQUE_TYPEDEF(ConsoleTimerId, uint32_t);
#define CSTR(s) (uint8_t*)s, sizeof(s) - 1

#define STDB_EXTERN(name) \
__attribute__((import_module("spacetime_10.0"), import_name(#name))) extern
__attribute__((import_module(SPACETIME_MODULE_VERSION), import_name(#name))) extern

#ifndef EXPERIMENTAL_WASM_AOT
#define IMPORT(ret, name, params, args) \
Expand All @@ -37,6 +37,7 @@ OPAQUE_TYPEDEF(ConsoleTimerId, uint32_t);
#define IMPORT(ret, name, params, args) STDB_EXTERN(name) ret name params;
#endif

#define SPACETIME_MODULE_VERSION "spacetime_10.0"
IMPORT(Status, table_id_from_name,
(const uint8_t* name, uint32_t name_len, TableId* id),
(name, name_len, id));
Expand Down Expand Up @@ -97,6 +98,11 @@ IMPORT(void, volatile_nonatomic_schedule_immediate,
(const uint8_t* name, size_t name_len, const uint8_t* args, size_t args_len),
(name, name_len, args, args_len));
IMPORT(void, identity, (void* id_ptr), (id_ptr));
#undef SPACETIME_MODULE_VERSION

#define SPACETIME_MODULE_VERSION "spacetime_10.1"
IMPORT(int16_t, bytes_source_remaining_length, (BytesSource source, uint32_t* out), (source, out));
#undef SPACETIME_MODULE_VERSION

#ifndef EXPERIMENTAL_WASM_AOT
static MonoClass* ffi_class;
Expand Down
29 changes: 29 additions & 0 deletions crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,35 @@ pub mod raw {
pub fn identity(out_ptr: *mut u8);
}

// See comment on previous `extern "C"` block re: ABI version.
#[link(wasm_import_module = "spacetime_10.1")]
extern "C" {
/// Read the remaining length of a [`BytesSource`] and write it to `out`.
///
/// Note that the host automatically frees byte sources which are exhausted.
/// Such sources are invalid, and this method will return an error when passed one.
/// Callers of [`bytes_source_read`] should check for a return of -1
/// before invoking this function on the same `source`.
///
/// Also note that the special [`BytesSource::INVALID`] (zero) is always invalid.
/// Callers should check for that value before invoking this function.
///
/// # Traps
///
/// Traps if:
///
/// - `out` is NULL or `out` is not in bounds of WASM memory.
///
/// # Errors
///
/// Returns an error:
///
/// - `NO_SUCH_BYTES`, when `source` is not a valid bytes source.
///
/// If this function returns an error, `out` is not written.
pub fn bytes_source_remaining_length(source: BytesSource, out: *mut u32) -> i16;
}

/// What strategy does the database index use?
///
/// See also: <https://www.postgresql.org/docs/current/sql-createindex.html>
Expand Down
22 changes: 22 additions & 0 deletions crates/bindings/src/rt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,28 @@ const NO_SUCH_BYTES: u16 = errno::NO_SUCH_BYTES.get();
fn read_bytes_source_into(source: BytesSource, buf: &mut Vec<u8>) {
const INVALID: i16 = NO_SUCH_BYTES as i16;

// For reducer arguments, the `buf` will almost certainly already be large enough,
// as it comes from `IterBuf`, which start at 64KiB.
// But reading the remaining length and calling `buf.reserve` is a negligible cost,
// and in the future we may want to use this method to read other `BytesSource`s into other buffers.
// I (pgoldman 2025-09-26) also value having it as an example of correct usage of `bytes_source_remaining_length`.
let len = {
let mut len = 0;
let ret = unsafe { sys::raw::bytes_source_remaining_length(source, &raw mut len) };
match ret {
0 => len,
INVALID => panic!("invalid source passed"),
_ => unreachable!(),
}
};
buf.reserve(buf.len().saturating_sub(len as usize));

// Because we've reserved space in our buffer already, this loop should be unnecessary.
// We expect the first call to `bytes_source_read` to always return `-1`.
// I (pgoldman 2025-09-26) am leaving the loop here because there's no downside to it,
// and in the future we may want to support `BytesSource`s which don't have a known length ahead of time
// (i.e. put arbitrary streams in `BytesSource` on the host side rather than just `Bytes` buffers),
// at which point the loop will become useful again.
loop {
// Write into the spare capacity of the buffer.
let buf_ptr = buf.spare_capacity_mut();
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jsonwebtoken.workspace = true
lazy_static.workspace = true
log.workspace = true
memchr.workspace = true
nohash-hasher.workspace = true
once_cell.workspace = true
openssl.workspace = true
parking_lot.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/host/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub enum AbiCall {
DatastoreDeleteByIndexScanRangeBsatn,
DatastoreDeleteAllByEqBsatn,
BytesSourceRead,
BytesSourceRemainingLength,
BytesSinkWrite,
ConsoleLog,
ConsoleTimerStart,
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/host/wasm_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,8 @@ macro_rules! abi_funcs {

// unstable:
"spacetime_10.0"::volatile_nonatomic_schedule_immediate,

"spacetime_10.1"::bytes_source_remaining_length,
}
};
}
Expand Down
Loading
Loading