Skip to content

Commit

Permalink
test(553): Generate right index join for incremental updates
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-spacetime committed Nov 14, 2023
1 parent ec791ca commit 12b5690
Showing 1 changed file with 97 additions and 1 deletion.
98 changes: 97 additions & 1 deletion crates/core/src/sql/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,15 @@ mod tests {
use std::ops::Bound;

use crate::db::relational_db::tests_utils::make_test_db;
use crate::host::module_host::{DatabaseTableUpdate, TableOp};
use crate::subscription::query;
use spacetimedb_lib::error::ResultTest;
use spacetimedb_lib::operator::OpQuery;
use spacetimedb_primitives::TableId;
use spacetimedb_sats::data_key::ToDataKey;
use spacetimedb_sats::db::def::{ColumnDef, IndexDef, TableDef};
use spacetimedb_sats::AlgebraicType;
use spacetimedb_sats::relation::MemTable;
use spacetimedb_sats::{product, AlgebraicType};
use spacetimedb_vm::expr::{IndexJoin, IndexScan, JoinExpr, Query};

fn create_table(
Expand Down Expand Up @@ -1025,4 +1029,96 @@ mod tests {
};
Ok(())
}

#[test]
fn compile_incremental_index_join() -> ResultTest<()> {
let (db, _) = make_test_db()?;
let mut tx = db.begin_tx();

// Create table [lhs] with index on [b]
let schema = &[("a", AlgebraicType::U64), ("b", AlgebraicType::U64)];
let indexes = &[(1.into(), "b")];
let lhs_id = create_table(&db, &mut tx, "lhs", schema, indexes)?;

// Create table [rhs] with index on [b, c]
let schema = &[
("b", AlgebraicType::U64),
("c", AlgebraicType::U64),
("d", AlgebraicType::U64),
];
let indexes = &[(0.into(), "b"), (1.into(), "c")];
let rhs_id = create_table(&db, &mut tx, "rhs", schema, indexes)?;

// Should generate an index join since there is an index on `lhs.b`.
// Should push the sargable range condition into the index join's probe side.
let sql = "select lhs.* from lhs join rhs on lhs.b = rhs.b where rhs.c > 2 and rhs.c < 4 and rhs.d = 3";
let exp = compile_sql(&db, &tx, sql)?.remove(0);

let CrudExpr::Query(expr) = exp else {
panic!("unexpected result from compilation: {:?}", exp);
};

// Create an insert for an incremental update.
let row = product!(0u64, 0u64);
let insert = TableOp {
op_type: 1,
row_pk: row.to_data_key().to_bytes(),
row,
};
let insert = DatabaseTableUpdate {
table_id: lhs_id,
table_name: String::from("lhs"),
ops: vec![insert],
};

// Optimize the query plan for the incremental update.
let expr = query::to_mem_table(expr, &insert);
let expr = expr.optimize();

let QueryExpr {
source:
SourceExpr::MemTable(MemTable {
head: Header { table_name, .. },
..
}),
query,
..
} = expr
else {
panic!("unexpected result after optimization: {:?}", expr);
};

assert_eq!(table_name, "lhs");
assert_eq!(query.len(), 1);

let Query::IndexJoin(IndexJoin {
probe_side:
QueryExpr {
source: SourceExpr::MemTable(_),
query: ref lhs,
},
probe_field:
FieldName::Name {
table: ref probe_table,
field: ref probe_field,
},
index_header: _,
index_select: Some(_),
index_table,
index_col,
return_index_rows: false,
}) = query[0]
else {
panic!("unexpected operator {:#?}", query[0]);
};

assert!(lhs.is_empty());

// Assert that original index and probe tables have been swapped.
assert_eq!(index_table, rhs_id);
assert_eq!(index_col, 0.into());
assert_eq!(probe_field, "b");
assert_eq!(probe_table, "lhs");
Ok(())
}
}

0 comments on commit 12b5690

Please sign in to comment.