Skip to content

Commit

Permalink
feat(260): Add index scans to vm
Browse files Browse the repository at this point in the history
Fixes #260
  • Loading branch information
joshua-spacetime committed Sep 14, 2023
1 parent c47e571 commit 79af61a
Show file tree
Hide file tree
Showing 11 changed files with 259 additions and 73 deletions.
15 changes: 14 additions & 1 deletion crates/core/src/db/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::error::DBError;
use spacetimedb_lib::relation::{DbTable, RowCount};
use spacetimedb_sats::ProductValue;

use super::datastore::locking_tx_datastore::Iter;
use super::datastore::locking_tx_datastore::{Iter, IterByColEq};

#[derive(Debug, Clone, Copy)]
pub enum CatalogKind {
Expand All @@ -24,6 +24,19 @@ impl<'a> TableCursor<'a> {
}
}

/// A relational iterator wrapping a storage-level index iterator.
/// A relational iterator returns [RelValue]s whereas storage iterators return [DataRef]s.
///
/// NOTE(review): the wrapped [IterByColEq] is constructed elsewhere in this change with
/// both `Scan` and `Index` variants, so the underlying lookup is presumably not always
/// index-backed — confirm.
pub struct IndexCursor<'a> {
/// The table this cursor is associated with (presumably the table being iterated — confirm against callers).
pub table: DbTable,
/// The wrapped storage iterator, parameterized by the lifetime `'a`.
pub iter: IterByColEq<'a>,
}

impl<'a> IndexCursor<'a> {
pub fn new(table: DbTable, iter: IterByColEq<'a>) -> Result<Self, DBError> {
Ok(Self { table, iter })
}
}

/// Common wrapper for relational iterators of [Catalog].
pub struct CatalogCursor<I> {
pub(crate) table: DbTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl BTreeIndex {
///
/// For a unique index this will always yield at most one `RowId`.
#[tracing::instrument(skip_all)]
pub(crate) fn seek<'a>(&'a self, value: &'a AlgebraicValue) -> BTreeIndexRangeIter<'a> {
pub(crate) fn seek<'a>(&'a self, value: &AlgebraicValue) -> BTreeIndexRangeIter<'a> {
let k_start = IndexKey::from_row(value, DataKey::min_datakey());
let k_end = IndexKey::from_row(value, DataKey::max_datakey());
BTreeIndexRangeIter {
Expand Down
81 changes: 38 additions & 43 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ impl CommittedState {
&'a self,
table_id: &TableId,
col_id: &ColId,
value: &'a AlgebraicValue,
value: &AlgebraicValue,
) -> Option<BTreeIndexRangeIter<'a>> {
if let Some(table) = self.tables.get(table_id) {
table.index_seek(*col_id, value)
Expand Down Expand Up @@ -321,7 +321,7 @@ impl TxState {
&'a self,
table_id: &TableId,
col_id: &ColId,
value: &'a AlgebraicValue,
value: &AlgebraicValue,
) -> Option<BTreeIndexRangeIter<'a>> {
self.insert_tables.get(table_id)?.index_seek(*col_id, value)
}
Expand Down Expand Up @@ -580,7 +580,7 @@ impl Inner {
fn drop_table_from_st_tables(&mut self, table_id: TableId) -> super::Result<()> {
const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
let value = AlgebraicValue::U32(table_id.0);
let rows = self.iter_by_col_eq(&ST_TABLES_ID, &ST_TABLES_TABLE_ID_COL, &value)?;
let rows = self.iter_by_col_eq(&ST_TABLES_ID, &ST_TABLES_TABLE_ID_COL, value)?;
let rows = rows.map(|row| row.view().to_owned()).collect::<Vec<_>>();
if rows.is_empty() {
return Err(TableError::IdNotFound(table_id.0).into());
Expand All @@ -592,7 +592,7 @@ impl Inner {
fn drop_table_from_st_columns(&mut self, table_id: TableId) -> super::Result<()> {
const ST_COLUMNS_TABLE_ID_COL: ColId = ColId(0);
let value = AlgebraicValue::U32(table_id.0);
let rows = self.iter_by_col_eq(&ST_COLUMNS_ID, &ST_COLUMNS_TABLE_ID_COL, &value)?;
let rows = self.iter_by_col_eq(&ST_COLUMNS_ID, &ST_COLUMNS_TABLE_ID_COL, value)?;
let rows = rows.map(|row| row.view().to_owned()).collect::<Vec<_>>();
if rows.is_empty() {
return Err(TableError::IdNotFound(table_id.0).into());
Expand Down Expand Up @@ -621,7 +621,7 @@ impl Inner {
.iter_by_col_eq(
&ST_SEQUENCES_ID,
&ST_SEQUENCES_SEQUENCE_ID_COL,
&AlgebraicValue::U32(seq_id.0),
AlgebraicValue::U32(seq_id.0),
)?
.last()
.unwrap()
Expand Down Expand Up @@ -690,7 +690,7 @@ impl Inner {
.iter_by_col_eq(
&ST_SEQUENCES_ID,
&ST_SEQUENCES_SEQUENCE_ID_COL,
&AlgebraicValue::U32(seq_id.0),
AlgebraicValue::U32(seq_id.0),
)?
.last()
.unwrap()
Expand All @@ -706,7 +706,7 @@ impl Inner {
self.iter_by_col_eq(
&ST_SEQUENCES_ID,
&seq_name_col,
&AlgebraicValue::String(seq_name.to_owned()),
AlgebraicValue::String(seq_name.to_owned()),
)
.map(|mut iter| {
iter.next()
Expand Down Expand Up @@ -837,7 +837,7 @@ impl Inner {
// so. We can't just call iter_by_col_eq here as that would attempt to use the
// index which we haven't created yet. So instead we just manually Scan here.
let rows = IterByColEq::Scan(ScanIterByColEq {
value: &AlgebraicValue::U32(table_id.0),
value: AlgebraicValue::U32(table_id.0),
col_id: table_id_col,
scan_iter: self.iter(&ST_TABLES_ID)?,
})
Expand All @@ -852,7 +852,7 @@ impl Inner {
// Look up the columns for the table in question.
let mut columns = Vec::new();
const TABLE_ID_COL: ColId = ColId(0);
for data_ref in self.iter_by_col_eq(&ST_COLUMNS_ID, &TABLE_ID_COL, &AlgebraicValue::U32(table_id))? {
for data_ref in self.iter_by_col_eq(&ST_COLUMNS_ID, &TABLE_ID_COL, AlgebraicValue::U32(table_id))? {
let row = data_ref.view();

let el = StColumnRow::try_from(row)?;
Expand All @@ -871,7 +871,7 @@ impl Inner {
// Look up the indexes for the table in question.
let mut indexes = Vec::new();
let table_id_col: ColId = ColId(1);
for data_ref in self.iter_by_col_eq(&ST_INDEXES_ID, &table_id_col, &AlgebraicValue::U32(table_id))? {
for data_ref in self.iter_by_col_eq(&ST_INDEXES_ID, &table_id_col, AlgebraicValue::U32(table_id))? {
let row = data_ref.view();

let el = StIndexRow::try_from(row)?;
Expand Down Expand Up @@ -903,7 +903,7 @@ impl Inner {
.iter_by_col_eq(
&ST_INDEXES_ID,
&ST_INDEXES_TABLE_ID_COL,
&AlgebraicValue::U32(table_id.0),
AlgebraicValue::U32(table_id.0),
)?
.collect::<Vec<_>>();
for data_ref in rows {
Expand All @@ -918,7 +918,7 @@ impl Inner {
.iter_by_col_eq(
&ST_SEQUENCES_ID,
&ST_SEQUENCES_TABLE_ID_COL,
&AlgebraicValue::U32(table_id.0),
AlgebraicValue::U32(table_id.0),
)?
.collect::<Vec<_>>();
for data_ref in rows {
Expand All @@ -944,7 +944,7 @@ impl Inner {
// Update the table's name in st_tables.
const ST_TABLES_TABLE_ID_COL: ColId = ColId(0);
let rows = self
.iter_by_col_eq(&ST_TABLES_ID, &ST_TABLES_TABLE_ID_COL, &AlgebraicValue::U32(table_id.0))?
.iter_by_col_eq(&ST_TABLES_ID, &ST_TABLES_TABLE_ID_COL, AlgebraicValue::U32(table_id.0))?
.collect::<Vec<_>>();
assert!(rows.len() <= 1, "Expected at most one row in st_tables for table_id");
let row = rows.first().ok_or_else(|| TableError::IdNotFound(table_id.0))?;
Expand All @@ -961,7 +961,7 @@ impl Inner {
self.iter_by_col_eq(
&ST_TABLES_ID,
&table_name_col,
&AlgebraicValue::String(table_name.to_owned()),
AlgebraicValue::String(table_name.to_owned()),
)
.map(|mut iter| {
iter.next()
Expand All @@ -971,7 +971,7 @@ impl Inner {

fn table_name_from_id(&self, table_id: TableId) -> super::Result<Option<String>> {
let table_id_col: ColId = ColId(0);
self.iter_by_col_eq(&ST_TABLES_ID, &table_id_col, &AlgebraicValue::U32(table_id.0))
self.iter_by_col_eq(&ST_TABLES_ID, &table_id_col, AlgebraicValue::U32(table_id.0))
.map(|mut iter| {
iter.next()
.map(|row| row.view().elements[1].as_string().unwrap().to_owned())
Expand Down Expand Up @@ -1075,7 +1075,7 @@ impl Inner {
.iter_by_col_eq(
&ST_INDEXES_ID,
&ST_INDEXES_INDEX_ID_COL,
&AlgebraicValue::U32(index_id.0),
AlgebraicValue::U32(index_id.0),
)?
.last()
.unwrap()
Expand Down Expand Up @@ -1126,7 +1126,7 @@ impl Inner {
self.iter_by_col_eq(
&ST_INDEXES_ID,
&index_name_col,
&AlgebraicValue::String(index_name.to_owned()),
AlgebraicValue::String(index_name.to_owned()),
)
.map(|mut iter| {
iter.next()
Expand Down Expand Up @@ -1228,7 +1228,7 @@ impl Inner {
for seq_row in self.iter_by_col_eq(
&ST_SEQUENCES_ID,
&st_sequences_table_id_col,
&AlgebraicValue::U32(table_id.0),
AlgebraicValue::U32(table_id.0),
)? {
let seq_row = seq_row.view();
let seq_row = StSequenceRow::try_from(seq_row)?;
Expand Down Expand Up @@ -1504,12 +1504,7 @@ impl Inner {
/// Returns an iterator,
/// yielding every row in the table identified by `table_id`,
/// where the column data identified by `col_id` equates to `value`.
fn iter_by_col_eq<'a>(
&'a self,
table_id: &TableId,
col_id: &ColId,
value: &'a AlgebraicValue,
) -> super::Result<IterByColEq> {
fn iter_by_col_eq(&self, table_id: &TableId, col_id: &ColId, value: AlgebraicValue) -> super::Result<IterByColEq> {
// We have to index_seek in both the committed state and the current tx state.
// First, we will check modifications in the current tx. It may be that the table
// has not been modified yet in the current tx, in which case we will only search
Expand All @@ -1524,25 +1519,25 @@ impl Inner {
if let Some(inserted_rows) = self
.tx_state
.as_ref()
.and_then(|tx_state| tx_state.index_seek(table_id, col_id, value))
.and_then(|tx_state| tx_state.index_seek(table_id, col_id, &value))
{
// The current transaction has modified this table, and the table is indexed.
let tx_state = self.tx_state.as_ref().unwrap();
Ok(IterByColEq::Index(IndexIterByColEq {
value,
value: value.clone(),
col_id: *col_id,
iter: IndexSeekIterInner {
table_id: *table_id,
tx_state,
inserted_rows,
committed_rows: self.committed_state.index_seek(table_id, col_id, value),
committed_rows: self.committed_state.index_seek(table_id, col_id, &value),
committed_state: &self.committed_state,
},
}))
} else {
// Either the current transaction has not modified this table, or the table is not
// indexed.
match self.committed_state.index_seek(table_id, col_id, value) {
match self.committed_state.index_seek(table_id, col_id, &value) {
// If we don't have `self.tx_state` yet, it is likely we are running the bootstrap process
Some(committed_rows) => match self.tx_state.as_ref() {
None => Ok(IterByColEq::Scan(ScanIterByColEq {
Expand Down Expand Up @@ -1821,7 +1816,7 @@ impl Iterator for IterByColEq<'_> {
pub struct ScanIterByColEq<'a> {
scan_iter: Iter<'a>,
col_id: ColId,
value: &'a AlgebraicValue,
value: AlgebraicValue,
}

impl Iterator for ScanIterByColEq<'_> {
Expand All @@ -1832,7 +1827,7 @@ impl Iterator for ScanIterByColEq<'_> {
for data_ref in &mut self.scan_iter {
let row = data_ref.view();
let value = &row.elements[self.col_id.0 as usize];
if self.value == value {
if &self.value == value {
return Some(data_ref);
}
}
Expand All @@ -1843,7 +1838,7 @@ impl Iterator for ScanIterByColEq<'_> {
pub struct IndexIterByColEq<'a> {
iter: IndexSeekIterInner<'a>,
col_id: ColId,
value: &'a AlgebraicValue,
value: AlgebraicValue,
}

impl Iterator for IndexIterByColEq<'_> {
Expand All @@ -1854,7 +1849,7 @@ impl Iterator for IndexIterByColEq<'_> {
self.iter.find(|data_ref| {
let row = data_ref.view();
let value = &row.elements[self.col_id.0 as usize];
self.value == value
&self.value == value
})
}
}
Expand Down Expand Up @@ -1988,7 +1983,7 @@ impl TxDatastore for Locking {
tx: &'a Self::TxId,
table_id: TableId,
col_id: ColId,
value: &'a spacetimedb_sats::AlgebraicValue,
value: spacetimedb_sats::AlgebraicValue,
) -> super::Result<Self::IterByColEq<'a>> {
self.iter_by_col_eq_mut_tx(tx, table_id, col_id, value)
}
Expand Down Expand Up @@ -2127,7 +2122,7 @@ impl MutTxDatastore for Locking {
tx: &'a Self::MutTxId,
table_id: TableId,
col_id: ColId,
value: &'a spacetimedb_sats::AlgebraicValue,
value: spacetimedb_sats::AlgebraicValue,
) -> super::Result<Self::IterByColEq<'a>> {
tx.lock.iter_by_col_eq(&table_id, &col_id, value)
}
Expand Down Expand Up @@ -2355,7 +2350,7 @@ mod tests {
let schema = basic_table_schema();
let table_id = datastore.create_table_mut_tx(&mut tx, schema)?;
let table_rows = datastore
.iter_by_col_eq_mut_tx(&tx, ST_TABLES_ID, ColId(0), &AlgebraicValue::U32(table_id.0))?
.iter_by_col_eq_mut_tx(&tx, ST_TABLES_ID, ColId(0), AlgebraicValue::U32(table_id.0))?
.map(|x| StTableRow::try_from(x.view()).unwrap().to_owned())
.sorted_by_key(|x| x.table_id)
.collect::<Vec<_>>();
Expand All @@ -2367,7 +2362,7 @@ mod tests {
]
);
let column_rows = datastore
.iter_by_col_eq_mut_tx(&tx, ST_COLUMNS_ID, ColId(0), &AlgebraicValue::U32(table_id.0))?
.iter_by_col_eq_mut_tx(&tx, ST_COLUMNS_ID, ColId(0), AlgebraicValue::U32(table_id.0))?
.map(|x| StColumnRow::try_from(x.view()).unwrap().to_owned())
.sorted_by_key(|x| (x.table_id, x.col_id))
.collect::<Vec<_>>();
Expand All @@ -2392,7 +2387,7 @@ mod tests {
datastore.commit_mut_tx(tx)?;
let tx = datastore.begin_mut_tx();
let table_rows = datastore
.iter_by_col_eq_mut_tx(&tx, ST_TABLES_ID, ColId(0), &AlgebraicValue::U32(table_id.0))?
.iter_by_col_eq_mut_tx(&tx, ST_TABLES_ID, ColId(0), AlgebraicValue::U32(table_id.0))?
.map(|x| StTableRow::try_from(x.view()).unwrap().to_owned())
.sorted_by_key(|x| x.table_id)
.collect::<Vec<_>>();
Expand All @@ -2404,7 +2399,7 @@ mod tests {
]
);
let column_rows = datastore
.iter_by_col_eq_mut_tx(&tx, ST_COLUMNS_ID, ColId(0), &AlgebraicValue::U32(table_id.0))?
.iter_by_col_eq_mut_tx(&tx, ST_COLUMNS_ID, ColId(0), AlgebraicValue::U32(table_id.0))?
.map(|x| StColumnRow::try_from(x.view()).unwrap().to_owned())
.sorted_by_key(|x| (x.table_id, x.col_id))
.collect::<Vec<_>>();
Expand All @@ -2429,13 +2424,13 @@ mod tests {
datastore.rollback_mut_tx(tx);
let tx = datastore.begin_mut_tx();
let table_rows = datastore
.iter_by_col_eq_mut_tx(&tx, ST_TABLES_ID, ColId(0), &AlgebraicValue::U32(table_id.0))?
.iter_by_col_eq_mut_tx(&tx, ST_TABLES_ID, ColId(0), AlgebraicValue::U32(table_id.0))?
.map(|x| StTableRow::try_from(x.view()).unwrap().to_owned())
.sorted_by_key(|x| x.table_id)
.collect::<Vec<_>>();
assert_eq!(table_rows, vec![]);
let column_rows = datastore
.iter_by_col_eq_mut_tx(&tx, ST_COLUMNS_ID, ColId(0), &AlgebraicValue::U32(table_id.0))?
.iter_by_col_eq_mut_tx(&tx, ST_COLUMNS_ID, ColId(0), AlgebraicValue::U32(table_id.0))?
.map(|x| StColumnRow::try_from(x.view()).unwrap().to_owned())
.sorted_by_key(|x| x.table_id)
.collect::<Vec<_>>();
Expand Down Expand Up @@ -3100,7 +3095,7 @@ mod tests {
let mut tx = datastore.begin_mut_tx();
// Iterate over all rows with the value 1 (from the autoinc) in column 0.
let rows = datastore
.iter_by_col_eq_mut_tx(&tx, table_id, ColId(0), &AlgebraicValue::U32(1))?
.iter_by_col_eq_mut_tx(&tx, table_id, ColId(0), AlgebraicValue::U32(1))?
.collect::<Vec<_>>();
assert_eq!(rows.len(), 1);
let rows: Vec<ProductValue> = rows
Expand All @@ -3114,7 +3109,7 @@ mod tests {

// We shouldn't see the row when iterating now that it's deleted.
let rows = datastore
.iter_by_col_eq_mut_tx(&tx, table_id, ColId(0), &AlgebraicValue::U32(1))?
.iter_by_col_eq_mut_tx(&tx, table_id, ColId(0), AlgebraicValue::U32(1))?
.collect::<Vec<_>>();
assert_eq!(rows.len(), 0);

Expand All @@ -3125,7 +3120,7 @@ mod tests {
// The actual test: we should be able to iterate again, while still in the
// second transaction, and see exactly one row.
let rows = datastore
.iter_by_col_eq_mut_tx(&tx, table_id, ColId(0), &AlgebraicValue::U32(1))?
.iter_by_col_eq_mut_tx(&tx, table_id, ColId(0), AlgebraicValue::U32(1))?
.collect::<Vec<_>>();
assert_eq!(rows.len(), 1);

Expand Down
6 changes: 1 addition & 5 deletions crates/core/src/db/datastore/locking_tx_datastore/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,7 @@ impl Table {
/// Matching is defined by `Ord for AlgebraicValue`.
///
/// For a unique index this will always yield at most one `RowId`.
pub(crate) fn index_seek<'a>(
&'a self,
col_id: ColId,
value: &'a AlgebraicValue,
) -> Option<BTreeIndexRangeIter<'a>> {
pub(crate) fn index_seek<'a>(&'a self, col_id: ColId, value: &AlgebraicValue) -> Option<BTreeIndexRangeIter<'a>> {
self.indexes.get(&col_id).map(|index| index.seek(value))
}

Expand Down
Loading

0 comments on commit 79af61a

Please sign in to comment.