Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ABI] Remove the special first element of iterator #420

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions crates/bindings-csharp/Runtime/Runtime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,18 @@ public void Reset()

public class RawTableIter : IEnumerable<byte[]>
{
public readonly byte[] Schema;

private readonly IEnumerator<byte[]> iter;
private readonly uint tableId;
private readonly byte[]? filterBytes;

public RawTableIter(uint tableId, byte[]? filterBytes = null)
{
iter = new BufferIter(tableId, filterBytes);
iter.MoveNext();
Schema = iter.Current;
this.tableId = tableId;
this.filterBytes = filterBytes;
}

public IEnumerator<byte[]> GetEnumerator()
{
return iter;
return new BufferIter(tableId, filterBytes);
}

IEnumerator IEnumerable.GetEnumerator()
Expand Down
2 changes: 1 addition & 1 deletion crates/bindings-csharp/Runtime/bindings.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ static MonoArray* stdb_buffer_consume(Buffer buf);
// return out;
// }

#define STDB_IMPORT_MODULE_MINOR(minor) "spacetime_6." #minor
#define STDB_IMPORT_MODULE_MINOR(minor) "spacetime_7." #minor
#define STDB_IMPORT_MODULE STDB_IMPORT_MODULE_MINOR(0)

__attribute__((import_module(STDB_IMPORT_MODULE),
Expand Down
2 changes: 1 addition & 1 deletion crates/bindings-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod raw {
// on. Any non-breaking additions to the abi surface should be put in a new `extern {}` block
// with a module identifier with a minor version 1 above the previous highest minor version.
// For breaking changes, all functions should be moved into one new `spacetime_X.0` block.
#[link(wasm_import_module = "spacetime_6.0")]
#[link(wasm_import_module = "spacetime_7.0")]
extern "C" {
/*
/// Create a table with `name`, a UTF-8 slice in WASM memory lasting `name_len` bytes,
Expand Down
42 changes: 11 additions & 31 deletions crates/bindings/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,34 +245,6 @@ pub fn delete_range(table_id: u32, col_id: u8, range: Range<AlgebraicValue>) ->
//
// }

// Get the buffer iterator for this table,
// with an optional filter,
// and return it and its decoded `ProductType` schema.
fn buffer_table_iter(
table_id: u32,
filter: Option<spacetimedb_lib::filter::Expr>,
) -> Result<(BufferIter, ProductType)> {
// Decode the filter, if any.
let filter = filter
.as_ref()
.map(bsatn::to_vec)
.transpose()
.expect("Couldn't decode the filter query");

// Create the iterator.
let mut iter = sys::iter(table_id, filter.as_deref())?;

// First item is an encoded schema.
let schema_raw = iter
.next()
.expect("Missing schema")
.expect("Failed to get schema")
.read();
let schema = decode_schema(&mut &schema_raw[..]).expect("Could not decode schema");

Ok((iter, schema))
}

/// A table iterator which yields `ProductValue`s.
// type ProductValueTableIter = RawTableIter<ProductValue, ProductValueBufferDeserialize>;

Expand All @@ -285,10 +257,18 @@ fn buffer_table_iter(
/// A table iterator which yields values of the `TableType` corresponding to the table.
type TableTypeTableIter<T> = RawTableIter<TableTypeBufferDeserialize<T>>;

// Get the iterator for this table with an optional filter,
fn table_iter<T: TableType>(table_id: u32, filter: Option<spacetimedb_lib::filter::Expr>) -> Result<TableIter<T>> {
// The TableType deserializer doesn't need the schema, as we have type-directed
// dispatch to deserialize any given `TableType`.
let (iter, _schema) = buffer_table_iter(table_id, filter)?;
// Decode the filter, if any.
let filter = filter
.as_ref()
.map(bsatn::to_vec)
.transpose()
.expect("Couldn't decode the filter query");

// Create the iterator.
let iter = sys::iter(table_id, filter.as_deref())?;

let deserializer = TableTypeBufferDeserialize::new();
Ok(RawTableIter::new(iter, deserializer).into())
}
Expand Down
37 changes: 11 additions & 26 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,13 @@ impl BufWriter for ChunkedWriter {
}

impl ChunkedWriter {
/// Flushes the currently populated part of the scratch space as a new chunk.
pub fn force_flush(&mut self) {
if !self.scratch_space.is_empty() {
/// Flushes the data collected in the scratch space if it's larger than our
/// chunking threshold.
pub fn flush(&mut self) {
// For now, just send buffers over a certain fixed size.
const ITER_CHUNK_SIZE: usize = 64 * 1024;

if self.scratch_space.len() > ITER_CHUNK_SIZE {
// We intentionally clone here so that our scratch space is not
// recreated with zero capacity (via `Vec::new`), but instead can
// be `.clear()`ed in-place and reused.
Expand All @@ -63,22 +67,11 @@ impl ChunkedWriter {
}
}

/// Similar to [`Self::force_flush`], but only flushes if the data in the
/// scratch space is larger than our chunking threshold.
pub fn flush(&mut self) {
// For now, just send buffers over a certain fixed size.
const ITER_CHUNK_SIZE: usize = 64 * 1024;

if self.scratch_space.len() > ITER_CHUNK_SIZE {
self.force_flush();
}
}

/// Finalises the writer and returns all the chunks.
pub fn into_chunks(mut self) -> Vec<Box<[u8]>> {
if !self.scratch_space.is_empty() {
// This is equivalent to calling `force_flush`, but we avoid extra
// clone by just shrinking and pushing the scratch space in-place.
// Avoid extra clone by just shrinking and pushing the scratch space
// in-place.
self.chunks.push(self.scratch_space.into());
}
self.chunks
Expand Down Expand Up @@ -380,10 +373,6 @@ impl InstanceEnv {
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.tx.get()?;

stdb.row_schema_for_table(tx, table_id)?.encode(&mut chunked_writer);
// initial chunk is expected to be schema itself, so force-flush it as a separate chunk
chunked_writer.force_flush();

for row in stdb.iter(tx, table_id)? {
row.view().encode(&mut chunked_writer);
// Flush at row boundaries.
Expand Down Expand Up @@ -424,18 +413,12 @@ impl InstanceEnv {
}
}

let mut chunked_writer = ChunkedWriter::default();

let stdb = &self.dbic.relational_db;
let tx = &mut *self.tx.get()?;

let schema = stdb.schema_for_table(tx, table_id)?;
let row_type = ProductType::from(&*schema);

// write and force flush schema as it's expected to be the first individual chunk
row_type.encode(&mut chunked_writer);
chunked_writer.force_flush();

let filter = filter::Expr::from_bytes(
// TODO: looks like module typespace is currently not hooked up to instances;
// use empty typespace for now which should be enough for primitives
Expand All @@ -453,6 +436,8 @@ impl InstanceEnv {
_ => unreachable!("query should always return a table"),
};

let mut chunked_writer = ChunkedWriter::default();

// write all rows and flush at row boundaries
for row in results.data {
row.data.encode(&mut chunked_writer);
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/wasmer/wasmer_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ impl WasmerModule {
WasmerModule { module, engine }
}

pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(6, 0);
pub const IMPLEMENTED_ABI: abi::VersionTuple = abi::VersionTuple::new(7, 0);

fn imports(&self, store: &mut Store, env: &FunctionEnv<WasmInstanceEnv>) -> Imports {
#[allow(clippy::assertions_on_constants)]
const _: () = assert!(WasmerModule::IMPLEMENTED_ABI.major == spacetimedb_lib::MODULE_ABI_MAJOR_VERSION);
imports! {
"spacetime_6.0" => {
"spacetime_7.0" => {
"_schedule_reducer" => Function::new_typed_with_env(store, env, WasmInstanceEnv::schedule_reducer),
"_cancel_reducer" => Function::new_typed_with_env(store, env, WasmInstanceEnv::cancel_reducer),
"_delete_by_col_eq" => Function::new_typed_with_env(
Expand Down
2 changes: 1 addition & 1 deletion crates/lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use type_value::{AlgebraicValue, ProductValue};

pub use spacetimedb_sats as sats;

pub const MODULE_ABI_MAJOR_VERSION: u16 = 6;
pub const MODULE_ABI_MAJOR_VERSION: u16 = 7;

// if it ends up we need more fields in the future, we can split one of them in two
#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone, Debug)]
Expand Down