From ceef3fdb3689ea1b670fd8b059fe71bfacc0c25f Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Mon, 18 Mar 2024 12:12:47 +0100 Subject: [PATCH 1/3] incr-join, find_updates: avoid unnecessary clones & use partition --- crates/core/src/subscription/subscription.rs | 27 ++++++++------------ 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index b6fd769504e..405c925caae 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -365,9 +365,9 @@ impl IncrementalJoin { for update in updates { if update.table_id == self.lhs.table_id { - lhs_ops.extend(update.ops.iter().cloned()); + lhs_ops.extend(update.ops.iter()); } else if update.table_id == self.rhs.table_id { - rhs_ops.extend(update.ops.iter().cloned()); + rhs_ops.extend(update.ops.iter()); } } @@ -375,21 +375,16 @@ impl IncrementalJoin { return None; } - let lhs = JoinSide { - table_id: self.lhs.table_id, - table_name: self.lhs.head.table_name.clone(), - inserts: lhs_ops.iter().filter(|op| op.op_type == 1).cloned().collect(), - deletes: lhs_ops.iter().filter(|op| op.op_type == 0).cloned().collect(), - }; - - let rhs = JoinSide { - table_id: self.rhs.table_id, - table_name: self.rhs.head.table_name.clone(), - inserts: rhs_ops.iter().filter(|op| op.op_type == 1).cloned().collect(), - deletes: rhs_ops.iter().filter(|op| op.op_type == 0).cloned().collect(), + let join_side = |table: &DbTable, ops: Vec<&TableOp>| { + let (deletes, inserts) = ops.into_iter().cloned().partition(|op| op.op_type == 0); + JoinSide { + table_id: table.table_id, + table_name: table.head.table_name.clone(), + deletes, + inserts, + } }; - - Some((lhs, rhs)) + Some((join_side(&self.lhs, lhs_ops), join_side(&self.rhs, rhs_ops))) } /// Evaluate join plan for lhs updates.
From 4244323b906de7bd232d94b84ca26787cc9ff961 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Mon, 18 Mar 2024 13:25:39 +0100 Subject: [PATCH 2/3] JoinSide: store 'Vec's instead --- crates/core/src/subscription/subscription.rs | 146 +++++++------------ 1 file changed, 51 insertions(+), 95 deletions(-) diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index 405c925caae..f2d3e9f5b9b 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -40,7 +40,7 @@ use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::ProductValue; use spacetimedb_primitives::TableId; use spacetimedb_sats::db::auth::{StAccess, StTableType}; -use spacetimedb_sats::relation::{DbTable, Header}; +use spacetimedb_sats::relation::DbTable; use spacetimedb_vm::expr::{self, IndexJoin, Query, QueryExpr, SourceSet}; use spacetimedb_vm::rel_ops::RelOps; use spacetimedb_vm::relation::MemTable; @@ -210,31 +210,21 @@ pub struct IncrementalJoin { /// One side of an [`IncrementalJoin`]. /// /// Holds the "physical" [`DbTable`] this side of the join operates on, as well -/// as the [`DatabaseTableUpdate`]s pertaining that table. +/// as the updates pertaining to that table. struct JoinSide { - table_id: TableId, - table_name: String, - inserts: Vec, - deletes: Vec, + inserts: Vec, + deletes: Vec, } impl JoinSide { - /// Return a [`DatabaseTableUpdate`] consisting of only insert operations. - pub fn inserts(&self) -> DatabaseTableUpdate { - DatabaseTableUpdate { - table_id: self.table_id, - table_name: self.table_name.clone(), - ops: self.inserts.to_vec(), - } + /// Return a list of updates consisting of only insert operations. + pub fn inserts(&self) -> Vec { + self.inserts.clone() } - /// Return a [`DatabaseTableUpdate`] with only delete operations. 
- pub fn deletes(&self) -> DatabaseTableUpdate { - DatabaseTableUpdate { - table_id: self.table_id, - table_name: self.table_name.clone(), - ops: self.deletes.to_vec(), - } + /// Return a list of updates with only delete operations. + pub fn deletes(&self) -> Vec { + self.deletes.clone() } /// Does this table update include inserts? @@ -249,18 +239,6 @@ impl JoinSide { } impl IncrementalJoin { - /// Construct an empty [`DatabaseTableUpdate`] with the schema of `table` - /// to use as a source when pre-compiling `eval_incr` queries. - fn dummy_table_update(table: &DbTable) -> DatabaseTableUpdate { - let table_id = table.table_id; - let table_name = table.head.table_name.clone(); - DatabaseTableUpdate { - table_id, - table_name, - ops: vec![], - } - } - fn optimize_query(join: IndexJoin) -> QueryExpr { let expr = QueryExpr::from(join); // Because (at least) one of the two tables will be a `MemTable`, @@ -313,21 +291,15 @@ impl IncrementalJoin { .context("expected a physical database table")? 
.clone(); - let (virtual_index_plan, _sources) = - with_delta_table(join.clone(), Some(Self::dummy_table_update(&index_table)), None); + let (virtual_index_plan, _sources) = with_delta_table(join.clone(), Some(Vec::new()), None); debug_assert_eq!(_sources.len(), 1); let virtual_index_plan = Self::optimize_query(virtual_index_plan); - let (virtual_probe_plan, _sources) = - with_delta_table(join.clone(), None, Some(Self::dummy_table_update(&probe_table))); + let (virtual_probe_plan, _sources) = with_delta_table(join.clone(), None, Some(Vec::new())); debug_assert_eq!(_sources.len(), 1); let virtual_probe_plan = Self::optimize_query(virtual_probe_plan); - let (virtual_plan, _sources) = with_delta_table( - join.clone(), - Some(Self::dummy_table_update(&index_table)), - Some(Self::dummy_table_update(&probe_table)), - ); + let (virtual_plan, _sources) = with_delta_table(join.clone(), Some(Vec::new()), Some(Vec::new())); debug_assert_eq!(_sources.len(), 2); let virtual_plan = virtual_plan.to_inner_join(); @@ -360,31 +332,38 @@ impl IncrementalJoin { &self, updates: impl IntoIterator, ) -> Option<(JoinSide, JoinSide)> { - let mut lhs_ops = Vec::new(); - let mut rhs_ops = Vec::new(); + let mut lhs_inserts = Vec::new(); + let mut lhs_deletes = Vec::new(); + let mut rhs_inserts = Vec::new(); + let mut rhs_deletes = Vec::new(); + + // Partitions deletes of `update` into `ds` and inserts into `is`. + let partition_into = |ds: &mut Vec<_>, is: &mut Vec<_>, updates: &DatabaseTableUpdate| { + for update in &updates.ops { + if update.op_type == 0 { &mut *ds } else { &mut *is }.push(update.row.clone()); + } + }; + // Partitions all updates into the `l/rhs_insert/delete_ops` above. 
for update in updates { if update.table_id == self.lhs.table_id { - lhs_ops.extend(update.ops.iter()); + partition_into(&mut lhs_deletes, &mut lhs_inserts, update); } else if update.table_id == self.rhs.table_id { - rhs_ops.extend(update.ops.iter()); + partition_into(&mut rhs_deletes, &mut rhs_inserts, update); } } - if lhs_ops.is_empty() && rhs_ops.is_empty() { + // No updates at all? Return `None`. + if [&lhs_inserts, &lhs_deletes, &rhs_inserts, &rhs_deletes] + .iter() + .all(|ops| ops.is_empty()) + { return None; } - let join_side = |table: &DbTable, ops: Vec<&TableOp>| { - let (deletes, inserts) = ops.into_iter().cloned().partition(|op| op.op_type == 0); - JoinSide { - table_id: table.table_id, - table_name: table.head.table_name.clone(), - deletes, - inserts, - } - }; - Some((join_side(&self.lhs, lhs_ops), join_side(&self.rhs, rhs_ops))) + // Stitch together the `JoinSide`s. + let join_side = |deletes, inserts| JoinSide { deletes, inserts }; + Some((join_side(lhs_deletes, lhs_inserts), join_side(rhs_deletes, rhs_inserts))) } /// Evaluate join plan for lhs updates.
@@ -392,9 +371,9 @@ impl IncrementalJoin { &self, db: &RelationalDB, tx: &Tx, - lhs: DatabaseTableUpdate, + lhs: Vec, ) -> Result, DBError> { - let lhs = to_mem_table(self.lhs.head.clone(), self.lhs.table_access, lhs); + let lhs = MemTable::new(self.lhs.head.clone(), self.lhs.table_access, lhs); let mut sources = SourceSet::default(); sources.add_mem_table(lhs); eval_updates(db, tx, self.plan_for_delta_lhs(), sources) @@ -405,9 +384,9 @@ impl IncrementalJoin { &self, db: &RelationalDB, tx: &Tx, - rhs: DatabaseTableUpdate, + rhs: Vec, ) -> Result, DBError> { - let rhs = to_mem_table(self.rhs.head.clone(), self.rhs.table_access, rhs); + let rhs = MemTable::new(self.rhs.head.clone(), self.rhs.table_access, rhs); let mut sources = SourceSet::default(); sources.add_mem_table(rhs); eval_updates(db, tx, self.plan_for_delta_rhs(), sources) @@ -418,11 +397,11 @@ impl IncrementalJoin { &self, db: &RelationalDB, tx: &Tx, - lhs: DatabaseTableUpdate, - rhs: DatabaseTableUpdate, + lhs: Vec, + rhs: Vec, ) -> Result, DBError> { - let lhs = to_mem_table(self.lhs.head.clone(), self.lhs.table_access, lhs); - let rhs = to_mem_table(self.rhs.head.clone(), self.rhs.table_access, rhs); + let lhs = MemTable::new(self.lhs.head.clone(), self.lhs.table_access, lhs); + let rhs = MemTable::new(self.rhs.head.clone(), self.rhs.table_access, rhs); let mut sources = SourceSet::default(); let (index_side, probe_side) = if self.return_index_rows { (lhs, rhs) } else { (rhs, lhs) }; sources.add_mem_table(index_side); @@ -566,39 +545,25 @@ impl IncrementalJoin { } } -/// Construct a [`MemTable`] containing the updates from `delta`, -/// which must be derived from a table with `head` and `table_access`. -fn to_mem_table(head: Arc
, table_access: StAccess, delta: DatabaseTableUpdate) -> MemTable { - MemTable::new( - head, - table_access, - delta.ops.into_iter().map(|op| op.row).collect::>(), - ) -} - /// Replace an [IndexJoin]'s scan or fetch operation with a delta table. /// A delta table consists purely of updates or changes to the base table. fn with_delta_table( mut join: IndexJoin, - index_side: Option, - probe_side: Option, + index_side: Option>, + probe_side: Option>, ) -> (IndexJoin, SourceSet) { let mut sources = SourceSet::default(); if let Some(index_side) = index_side { let head = join.index_side.head().clone(); let table_access = join.index_side.table_access(); - let mem_table = to_mem_table(head, table_access, index_side); - let source_expr = sources.add_mem_table(mem_table); - join.index_side = source_expr; + join.index_side = sources.add_mem_table(MemTable::new(head, table_access, index_side)); } if let Some(probe_side) = probe_side { let head = join.probe_side.source.head().clone(); let table_access = join.probe_side.source.table_access(); - let mem_table = to_mem_table(head, table_access, probe_side); - let source_expr = sources.add_mem_table(mem_table); - join.probe_side.source = source_expr; + join.probe_side.source = sources.add_mem_table(MemTable::new(head, table_access, probe_side)); } (join, sources) @@ -715,7 +680,6 @@ pub(crate) fn get_all(relational_db: &RelationalDB, tx: &Tx, auth: &AuthCtx) -> mod tests { use super::*; use crate::db::relational_db::tests_utils::make_test_db; - use crate::host::module_host::TableOp; use crate::sql::compiler::compile_sql; use spacetimedb_lib::error::ResultTest; use spacetimedb_sats::relation::{DbTable, FieldName}; @@ -731,7 +695,7 @@ mod tests { // Create table [lhs] with index on [b] let schema = &[("a", AlgebraicType::U64), ("b", AlgebraicType::U64)]; let indexes = &[(1.into(), "b")]; - let lhs_id = db.create_table_for_test("lhs", schema, indexes)?; + let _ = db.create_table_for_test("lhs", schema, indexes)?; // Create table 
[rhs] with index on [b, c] let schema = &[ @@ -761,11 +725,7 @@ mod tests { }; // Create an insert for an incremental update. - let delta = DatabaseTableUpdate { - table_id: lhs_id, - table_name: String::from("lhs"), - ops: vec![TableOp::insert(product![0u64, 0u64])], - }; + let delta = vec![product![0u64, 0u64]]; // Optimize the query plan for the incremental update. let (expr, _sources) = with_delta_table(join, Some(delta), None); @@ -829,7 +789,7 @@ mod tests { ("d", AlgebraicType::U64), ]; let indexes = &[(0.into(), "b"), (1.into(), "c")]; - let rhs_id = db.create_table_for_test("rhs", schema, indexes)?; + let _ = db.create_table_for_test("rhs", schema, indexes)?; let tx = db.begin_tx(); // Should generate an index join since there is an index on `lhs.b`. @@ -850,11 +810,7 @@ mod tests { }; // Create an insert for an incremental update. - let delta = DatabaseTableUpdate { - table_id: rhs_id, - table_name: String::from("rhs"), - ops: vec![TableOp::insert(product![0u64, 0u64, 0u64])], - }; + let delta = vec![product![0u64, 0u64, 0u64]]; // Optimize the query plan for the incremental update. let (expr, _sources) = with_delta_table(join, None, Some(delta)); From 68c4f1eda1c7a5418ef001ad964df21c14f138f3 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 19 Mar 2024 18:00:26 +0100 Subject: [PATCH 3/3] address joshua & phoebe's reviews --- crates/core/src/subscription/subscription.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/core/src/subscription/subscription.rs b/crates/core/src/subscription/subscription.rs index f2d3e9f5b9b..17bf99689ad 100644 --- a/crates/core/src/subscription/subscription.rs +++ b/crates/core/src/subscription/subscription.rs @@ -209,8 +209,7 @@ pub struct IncrementalJoin { /// One side of an [`IncrementalJoin`]. /// -/// Holds the "physical" [`DbTable`] this side of the join operates on, as well -/// as the updates pertaining to that table. 
+/// Holds the updates pertaining to a table on one side of the join. struct JoinSide { inserts: Vec, deletes: Vec, @@ -337,14 +336,19 @@ impl IncrementalJoin { let mut rhs_inserts = Vec::new(); let mut rhs_deletes = Vec::new(); - // Partitions deletes of `update` into `ds` and inserts into `is`. - let partition_into = |ds: &mut Vec<_>, is: &mut Vec<_>, updates: &DatabaseTableUpdate| { + // Partitions `updates` into `deletes` and `inserts`. + let partition_into = |deletes: &mut Vec<_>, inserts: &mut Vec<_>, updates: &DatabaseTableUpdate| { for update in &updates.ops { - if update.op_type == 0 { &mut *ds } else { &mut *is }.push(update.row.clone()); + if update.op_type == 0 { + &mut *deletes + } else { + &mut *inserts + } + .push(update.row.clone()); } }; - // Partitions all updates into the `l/rhs_insert/delete_ops` above. + // Partitions all updates into the `(l|r)hs_(inserts|deletes)` buffers above. for update in updates { if update.table_id == self.lhs.table_id { partition_into(&mut lhs_deletes, &mut lhs_inserts, update);