[ABI] Remove the special first element of iterator
I'm not sure why we're sending the encoded schema as a special first element of the iteration.

There is some commented-out code that suggests it might have been used in the past, but nowadays modules (both Rust and C#) are strongly typed: they already know the schema of the tables they asked to iterate over, and they ignore the first element returned from the iteration.

As a result, we've been unnecessarily reading the schema, encoding it into binary, and taking care to allocate it as an individual small blob, stored in a list of buffers and sent between Wasm and Rust, just for it to be ignored.

Removing it simplifies the code, and if we ever want to retrieve a table's schema, we can do so via a dedicated method rather than sending it automatically at the beginning of each iteration.

This is an ABI-breaking change, but it's likely worth it.
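To make the consumer-side effect concrete, here is a minimal, self-contained sketch (not the actual bindings code): the host iterator is modeled as a plain `Iterator<Item = Vec<u8>>` of buffers, and both function names are made up for illustration.

// Hypothetical sketch of what module bindings had to do before vs. after this
// change. The real bindings use `sys::iter` and BSATN decoding; here the host
// iterator is modeled as a plain iterator over row-chunk buffers.

fn collect_chunks_old_abi<I: Iterator<Item = Vec<u8>>>(mut iter: I) -> Vec<Vec<u8>> {
    // spacetime_6.0: the first buffer was the encoded schema; strongly typed
    // modules skipped it unconditionally.
    let _ignored_schema = iter.next().expect("missing schema buffer");
    iter.collect()
}

fn collect_chunks_new_abi<I: Iterator<Item = Vec<u8>>>(iter: I) -> Vec<Vec<u8>> {
    // spacetime_7.0: every buffer is already a chunk of encoded rows.
    iter.collect()
}

fn main() {
    let chunks = vec![vec![1u8, 2, 3], vec![4, 5, 6]];
    assert_eq!(collect_chunks_new_abi(chunks.into_iter()).len(), 2);
}

Before, every iteration started by pulling off and discarding a schema buffer; after this change, the very first buffer is already row data.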
RReverser committed Oct 16, 2023
1 parent 6715b4c commit a7f3604
Showing 7 changed files with 28 additions and 61 deletions.
12 changes: 5 additions & 7 deletions crates/bindings-csharp/Runtime/Runtime.cs
@@ -140,20 +140,18 @@ public void Reset()

  public class RawTableIter : IEnumerable<byte[]>
  {
-     public readonly byte[] Schema;
-
-     private readonly IEnumerator<byte[]> iter;
+     private readonly uint tableId;
+     private readonly byte[]? filterBytes;

      public RawTableIter(uint tableId, byte[]? filterBytes = null)
      {
-         iter = new BufferIter(tableId, filterBytes);
-         iter.MoveNext();
-         Schema = iter.Current;
+         this.tableId = tableId;
+         this.filterBytes = filterBytes;
      }

      public IEnumerator<byte[]> GetEnumerator()
      {
-         return iter;
+         return new BufferIter(tableId, filterBytes);
      }

      IEnumerator IEnumerable.GetEnumerator()
2 changes: 1 addition & 1 deletion crates/bindings-csharp/Runtime/bindings.c
@@ -100,7 +100,7 @@ static MonoArray* stdb_buffer_consume(Buffer buf);
  // return out;
  // }

- #define STDB_IMPORT_MODULE_MINOR(minor) "spacetime_6." #minor
+ #define STDB_IMPORT_MODULE_MINOR(minor) "spacetime_7." #minor
  #define STDB_IMPORT_MODULE STDB_IMPORT_MODULE_MINOR(0)

  __attribute__((import_module(STDB_IMPORT_MODULE),
2 changes: 1 addition & 1 deletion crates/bindings-sys/src/lib.rs
@@ -21,7 +21,7 @@ pub mod raw {
  // on. Any non-breaking additions to the abi surface should be put in a new `extern {}` block
  // with a module identifier with a minor version 1 above the previous highest minor version.
  // For breaking changes, all functions should be moved into one new `spacetime_X.0` block.
- #[link(wasm_import_module = "spacetime_6.0")]
+ #[link(wasm_import_module = "spacetime_7.0")]
  extern "C" {
      /*
      /// Create a table with `name`, a UTF-8 slice in WASM memory lasting `name_len` bytes,
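The comment in the hunk above describes the ABI versioning convention this commit follows: a breaking change moves every import into a new `spacetime_X.0` module, while a later non-breaking addition gets its own higher-minor block. An illustrative sketch only, with placeholder function names rather than the real host calls:

// Illustrative only: placeholder host calls showing the versioning convention,
// not the actual ABI surface.
#[link(wasm_import_module = "spacetime_7.0")]
extern "C" {
    // After a breaking change, all existing imports live in the new X.0 block.
    fn _example_existing_call(table_id: u32) -> u16;
}

#[link(wasm_import_module = "spacetime_7.1")]
extern "C" {
    // A later, backwards-compatible addition would go into a new minor-version
    // block instead of modifying the spacetime_7.0 block.
    fn _example_new_call(arg: u32) -> u16;
}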
42 changes: 11 additions & 31 deletions crates/bindings/src/lib.rs
@@ -245,34 +245,6 @@ pub fn delete_range(table_id: u32, col_id: u8, range: Range<AlgebraicValue>) ->
  //
  // }

- // Get the buffer iterator for this table,
- // with an optional filter,
- // and return it and its decoded `ProductType` schema.
- fn buffer_table_iter(
-     table_id: u32,
-     filter: Option<spacetimedb_lib::filter::Expr>,
- ) -> Result<(BufferIter, ProductType)> {
-     // Decode the filter, if any.
-     let filter = filter
-         .as_ref()
-         .map(bsatn::to_vec)
-         .transpose()
-         .expect("Couldn't decode the filter query");
-
-     // Create the iterator.
-     let mut iter = sys::iter(table_id, filter.as_deref())?;
-
-     // First item is an encoded schema.
-     let schema_raw = iter
-         .next()
-         .expect("Missing schema")
-         .expect("Failed to get schema")
-         .read();
-     let schema = decode_schema(&mut &schema_raw[..]).expect("Could not decode schema");
-
-     Ok((iter, schema))
- }
-
  /// A table iterator which yields `ProductValue`s.
  // type ProductValueTableIter = RawTableIter<ProductValue, ProductValueBufferDeserialize>;

@@ -285,10 +257,18 @@ fn buffer_table_iter(
  /// A table iterator which yields values of the `TableType` corresponding to the table.
  type TableTypeTableIter<T> = RawTableIter<TableTypeBufferDeserialize<T>>;

+ // Get the iterator for this table with an optional filter,
  fn table_iter<T: TableType>(table_id: u32, filter: Option<spacetimedb_lib::filter::Expr>) -> Result<TableIter<T>> {
-     // The TableType deserializer doesn't need the schema, as we have type-directed
-     // dispatch to deserialize any given `TableType`.
-     let (iter, _schema) = buffer_table_iter(table_id, filter)?;
+     // Decode the filter, if any.
+     let filter = filter
+         .as_ref()
+         .map(bsatn::to_vec)
+         .transpose()
+         .expect("Couldn't decode the filter query");
+
+     // Create the iterator.
+     let iter = sys::iter(table_id, filter.as_deref())?;
+
      let deserializer = TableTypeBufferDeserialize::new();
      Ok(RawTableIter::new(iter, deserializer).into())
  }
25 changes: 7 additions & 18 deletions crates/core/src/host/instance_env.rs
@@ -47,9 +47,13 @@ impl BufWriter for ChunkedWriter {
  }

  impl ChunkedWriter {
-     /// Flushes the currently populated part of the scratch space as a new chunk.
-     pub fn force_flush(&mut self) {
-         if !self.scratch_space.is_empty() {
+     /// Flushes the data collected in the scratch space if it's larger than our
+     /// chunking threshold.
+     pub fn flush(&mut self) {
+         // For now, just send buffers over a certain fixed size.
+         const ITER_CHUNK_SIZE: usize = 64 * 1024;
+
+         if self.scratch_space.len() > ITER_CHUNK_SIZE {
              // We intentionally clone here so that our scratch space is not
              // recreated with zero capacity (via `Vec::new`), but instead can
              // be `.clear()`ed in-place and reused.
@@ -63,17 +67,6 @@
          }
      }

-     /// Similar to [`Self::force_flush`], but only flushes if the data in the
-     /// scratch space is larger than our chunking threshold.
-     pub fn flush(&mut self) {
-         // For now, just send buffers over a certain fixed size.
-         const ITER_CHUNK_SIZE: usize = 64 * 1024;
-
-         if self.scratch_space.len() > ITER_CHUNK_SIZE {
-             self.force_flush();
-         }
-     }
-
      /// Finalises the writer and returns all the chunks.
      pub fn into_chunks(mut self) -> Vec<Box<[u8]>> {
          if !self.scratch_space.is_empty() {
@@ -380,10 +373,6 @@ impl InstanceEnv {
          let stdb = &*self.dbic.relational_db;
          let tx = &mut *self.tx.get()?;

-         stdb.row_schema_for_table(tx, table_id)?.encode(&mut chunked_writer);
-         // initial chunk is expected to be schema itself, so force-flush it as a separate chunk
-         chunked_writer.force_flush();
-
          for row in stdb.iter(tx, table_id)? {
              row.view().encode(&mut chunked_writer);
              // Flush at row boundaries.
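With the schema prefix gone, the only chunking logic left on the host side is the thresholded flush shown above. A standalone sketch of that strategy (assumed names, not the real `ChunkedWriter`): rows are appended to a scratch buffer, a chunk is cut only at a row boundary once the buffer exceeds 64 KiB, and any remainder is emitted at the end.

// Standalone sketch of the row-chunking strategy, not the actual ChunkedWriter.
const ITER_CHUNK_SIZE: usize = 64 * 1024;

struct Chunker {
    scratch: Vec<u8>,
    chunks: Vec<Box<[u8]>>,
}

impl Chunker {
    fn new() -> Self {
        Chunker { scratch: Vec::new(), chunks: Vec::new() }
    }

    /// Append one encoded row; cut a chunk only at a row boundary and only
    /// once enough data has accumulated.
    fn push_row(&mut self, encoded_row: &[u8]) {
        self.scratch.extend_from_slice(encoded_row);
        if self.scratch.len() > ITER_CHUNK_SIZE {
            // Clone so the scratch buffer keeps its capacity for reuse.
            self.chunks.push(self.scratch.clone().into_boxed_slice());
            self.scratch.clear();
        }
    }

    /// Flush any remainder and return all chunks.
    fn into_chunks(mut self) -> Vec<Box<[u8]>> {
        if !self.scratch.is_empty() {
            self.chunks.push(self.scratch.clone().into_boxed_slice());
        }
        self.chunks
    }
}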
4 changes: 2 additions & 2 deletions crates/core/src/host/wasmer/wasmer_module.rs
@@ -46,13 +46,13 @@ impl WasmerModule {
          WasmerModule { module, engine }
      }

-     pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(6, 0);
+     pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(7, 0);

      fn imports(&self, store: &mut Store, env: &FunctionEnv<WasmInstanceEnv>) -> Imports {
          #[allow(clippy::assertions_on_constants)]
          const _: () = assert!(WasmerModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION);
          imports! {
-             "spacetime_6.0" => {
+             "spacetime_7.0" => {
                  "_schedule_reducer" => Function::new_typed_with_env(store, env, WasmInstanceEnv::schedule_reducer),
                  "_cancel_reducer" => Function::new_typed_with_env(store, env, WasmInstanceEnv::cancel_reducer),
                  "_delete_by_col_eq" => Function::new_typed_with_env(
2 changes: 1 addition & 1 deletion crates/lib/src/lib.rs
@@ -40,7 +40,7 @@ pub use type_value::{AlgebraicValue, ProductValue};

  pub use spacetimedb_sats as sats;

- pub const MODULE_ABI_MAJOR_VERSION: u16 = 6;
+ pub const MODULE_ABI_MAJOR_VERSION: u16 = 7;

  // if it ends up we need more fields in the future, we can split one of them in two
  #[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone, Debug)]
