Skip to content

Commit

Permalink
Collect iterator into chunks early
Browse files Browse the repository at this point in the history
Initially we used iterator because we wanted to lazily pull values from the database when Wasm requests the next item, so that e.g. early stop works as expected and doesn't read too much data.

Unfortunately, we discovered that this leads to crashes when someone tries to iterate over the database and update/delete items inside the iteration, since the iteration itself held a lock. We added a workaround by collecting the iterator early into a Vec and then returning a newly created iterator to the user. This fixed the issue, but at that point the lazy-iterator mechanism had turned from an optimisation into pure overhead — if we are going to collect into a Vec anyway, we might as well do so early in the pipeline and avoid an extra dependency.

This slightly improves iteration over empty tables, but otherwise the difference is drowned out by the cost of the iteration itself, so this is mainly a code simplification.

I still hope we can return to this and implement proper end-to-end lazy iteration in the future. This should be possible, since we already have branching so any updates inside iteration shouldn't affect iteration itself and both could happen without holding a global lock, but in the past this has proven difficult.
  • Loading branch information
RReverser committed Oct 16, 2023
1 parent a66faa4 commit 65c1468
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 153 deletions.
90 changes: 3 additions & 87 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ fs2 = "0.4.3"
fs-err = "2.9.0"
futures = "0.3"
futures-channel = "0.3"
genawaiter = "0.99.1"
getrandom = { version = "0.2.7", features = ["custom"] }
glob = "0.3.1"
hex = "0.4.3"
Expand Down
1 change: 0 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ email_address.workspace = true
flate2.workspace = true
fs2.workspace = true
futures.workspace = true
genawaiter.workspace = true
hex.workspace = true
hostname.workspace = true
hyper.workspace = true
Expand Down
124 changes: 77 additions & 47 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use spacetimedb_lib::filter::CmpArgs;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_lib::operator::OpQuery;
use spacetimedb_lib::relation::{FieldExpr, FieldName};
use spacetimedb_sats::buffer::BufWriter;
use spacetimedb_sats::{ProductType, Typespace};
use spacetimedb_vm::expr::{Code, ColumnOp};

Expand All @@ -33,6 +34,53 @@ pub struct TxSlot {
inner: Arc<Mutex<Option<MutTxId>>>,
}

/// Accumulates encoded bytes and splits them into chunks at row boundaries.
///
/// Bytes written via `BufWriter::put_slice` land in `scratch_space`; callers
/// invoke `flush`/`force_flush` to cut the accumulated bytes off into `chunks`.
#[derive(Default)]
struct ChunkedWriter {
    /// Finished chunks, each allocation fitted exactly to the data it holds.
    chunks: Vec<Box<[u8]>>,
    /// In-progress buffer; cleared and reused (capacity kept) between chunks.
    scratch_space: Vec<u8>,
}

// `BufWriter` sink: every write is appended to the scratch buffer; where
// chunk boundaries fall is decided separately by the caller via `flush`.
impl BufWriter for ChunkedWriter {
    /// Appends `slice` to the in-progress scratch buffer.
    fn put_slice(&mut self, slice: &[u8]) {
        self.scratch_space.extend_from_slice(slice);
    }
}

impl ChunkedWriter {
    /// Unconditionally closes the current chunk: copies the scratch contents
    /// into `chunks` and clears the scratch buffer for reuse.
    ///
    /// A no-op when the scratch buffer is empty.
    pub fn force_flush(&mut self) {
        if self.scratch_space.is_empty() {
            return;
        }
        // Deliberately copy the bytes out rather than moving the Vec: the new
        // chunk gets an allocation fitted exactly to its contents, while the
        // scratch buffer keeps its ever-growing capacity (it is `.clear()`ed
        // in place, not recreated via `Vec::new`), so each next row has a
        // higher chance of fitting without reallocation.
        let chunk: Box<[u8]> = Box::from(&self.scratch_space[..]);
        self.chunks.push(chunk);
        self.scratch_space.clear();
    }

    /// Closes the current chunk only once it has grown past a fixed size.
    /// Intended to be called at row boundaries.
    pub fn flush(&mut self) {
        // For now, just send buffers over a certain fixed size.
        const ITER_CHUNK_SIZE: usize = 64 * 1024;

        if self.scratch_space.len() > ITER_CHUNK_SIZE {
            self.force_flush();
        }
    }

    /// Consumes the writer, returning all finished chunks (including any
    /// still-pending scratch bytes as a final chunk).
    pub fn into_chunks(mut self) -> Vec<Box<[u8]>> {
        if !self.scratch_space.is_empty() {
            // Equivalent to a final `force_flush`, except that since `self` is
            // consumed we can move the scratch buffer directly (shrunk to fit
            // by the `Vec` -> `Box<[u8]>` conversion) instead of copying it.
            self.chunks.push(self.scratch_space.into());
        }
        self.chunks
    }
}

// Generic 'instance environment' delegated to from various host types.
impl InstanceEnv {
pub fn new(dbic: Arc<DatabaseInstanceContext>, scheduler: Scheduler) -> Self {
Expand Down Expand Up @@ -322,57 +370,27 @@ impl InstanceEnv {
}

#[tracing::instrument(skip_all)]
pub fn iter(&self, table_id: u32) -> impl Iterator<Item = Result<Vec<u8>, NodesError>> {
use genawaiter::{sync::gen, yield_, GeneratorState};

// Cheap Arc clones to untie the returned iterator from our own lifetime.
let relational_db = self.dbic.relational_db.clone();
let tx = self.tx.clone();
pub fn iter_chunks(&self, table_id: u32) -> Result<Vec<Box<[u8]>>, NodesError> {
let mut chunked_writer = ChunkedWriter::default();

// For now, just send buffers over a certain fixed size.
fn should_yield_buf(buf: &Vec<u8>) -> bool {
const SIZE: usize = 64 * 1024;
buf.len() >= SIZE
}

let mut generator = Some(gen!({
let stdb = &*relational_db;
let tx = &mut *tx.get()?;
let stdb = &*self.dbic.relational_db;
let tx = &mut *self.tx.get()?;

let mut buf = Vec::new();
let schema = stdb.row_schema_for_table(tx, table_id)?;
schema.encode(&mut buf);
yield_!(buf);
stdb.row_schema_for_table(tx, table_id)?.encode(&mut chunked_writer);
// initial chunk is expected to be schema itself, so force-flush it as a separate chunk
chunked_writer.force_flush();

let mut buf = Vec::new();
for row in stdb.iter(tx, table_id)? {
if should_yield_buf(&buf) {
yield_!(buf);
buf = Vec::new();
}
row.view().encode(&mut buf);
}
if !buf.is_empty() {
yield_!(buf)
}
for row in stdb.iter(tx, table_id)? {
row.view().encode(&mut chunked_writer);
// Flush at row boundaries.
chunked_writer.flush();
}

Ok(())
}));

std::iter::from_fn(move || match generator.as_mut()?.resume() {
GeneratorState::Yielded(bytes) => Some(Ok(bytes)),
GeneratorState::Complete(res) => {
generator = None;
match res {
Ok(()) => None,
Err(err) => Some(Err(err)),
}
}
})
Ok(chunked_writer.into_chunks())
}

#[tracing::instrument(skip_all)]
pub fn iter_filtered(&self, table_id: u32, filter: &[u8]) -> Result<impl Iterator<Item = Vec<u8>>, NodesError> {
pub fn iter_filtered_chunks(&self, table_id: u32, filter: &[u8]) -> Result<Vec<Box<[u8]>>, NodesError> {
use spacetimedb_lib::filter;

fn filter_to_column_op(table_name: &str, filter: filter::Expr) -> ColumnOp {
Expand Down Expand Up @@ -402,11 +420,18 @@ impl InstanceEnv {
}
}

let mut chunked_writer = ChunkedWriter::default();

let stdb = &self.dbic.relational_db;
let tx = &mut *self.tx.get()?;

let schema = stdb.schema_for_table(tx, table_id)?;
let row_type = ProductType::from(&*schema);

// write and force flush schema as it's expected to be the first individual chunk
row_type.encode(&mut chunked_writer);
chunked_writer.force_flush();

let filter = filter::Expr::from_bytes(
// TODO: looks like module typespace is currently not hooked up to instances;
// use empty typespace for now which should be enough for primitives
Expand All @@ -423,9 +448,14 @@ impl InstanceEnv {
Code::Table(table) => table,
_ => unreachable!("query should always return a table"),
};
Ok(std::iter::once(bsatn::to_vec(&row_type))
.chain(results.data.into_iter().map(|row| bsatn::to_vec(&row.data)))
.map(|bytes| bytes.expect("encoding algebraic values should never fail")))

// write all rows and flush at row boundaries
for row in results.data {
row.data.encode(&mut chunked_writer);
chunked_writer.flush();
}

Ok(chunked_writer.into_chunks())
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/wasm_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl BufferIdx {
}
}

decl_index!(BufferIterIdx => Box<dyn Iterator<Item = Result<bytes::Bytes, NodesError>> + Send + Sync>);
decl_index!(BufferIterIdx => std::vec::IntoIter<Box<[u8]>>);
pub(super) type BufferIters = ResourceSlab<BufferIterIdx>;

pub(super) struct TimingSpan {
Expand Down
Loading

0 comments on commit 65c1468

Please sign in to comment.