diff --git a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
index 8aa66b2387b..9a1d3b584ea 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/committed_state.rs
@@ -491,7 +491,7 @@ impl CommittedState {
         tx_data
     }
 
-    fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap<TableId, DeleteTable>) {
+    fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: IntMap<TableId, DeleteTable>) {
         for (table_id, row_ptrs) in delete_tables {
             if let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) {
                 let mut deletes = Vec::with_capacity(row_ptrs.len());
@@ -500,7 +500,7 @@ impl CommittedState {
                 // holds only committed rows which should be deleted,
                 // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`,
                 // so no need to check before applying the deletes.
-                for row_ptr in row_ptrs.iter().copied() {
+                for row_ptr in row_ptrs {
                     debug_assert!(row_ptr.squashed_offset().is_committed_state());
 
                     // TODO: re-write `TxData` to remove `ProductValue`s
@@ -524,7 +524,7 @@ impl CommittedState {
     fn merge_apply_inserts(
         &mut self,
         tx_data: &mut TxData,
-        insert_tables: BTreeMap<TableId, Table>,
+        insert_tables: IntMap<TableId, Table>,
         tx_blob_store: impl BlobStore,
     ) {
         // TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state,
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
index 6893b3793d0..3c964eb2820 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/datastore.rs
@@ -6,7 +6,7 @@ use super::{
     tx::TxId,
     tx_state::TxState,
 };
-use crate::execution_context::Workload;
+use crate::{db::datastore::system_tables::ST_RESERVED_SEQUENCE_RANGE, execution_context::Workload};
 use crate::{
     db::{
         datastore::{
@@ -564,7 +564,11 @@ impl MutTxDatastore for Locking {
         table_id: TableId,
         mut row: ProductValue,
     ) -> Result<(AlgebraicValue, RowRef<'a>)> {
-        let (gens, row_ref) = tx.insert(table_id, &mut row)?;
+        let (gens, row_ref) = if table_id.0 <= ST_RESERVED_SEQUENCE_RANGE {
+            tx.insert(table_id, &mut row)?
+        } else {
+            tx.insert_2(table_id, &mut row)?
+        };
         Ok((gens, row_ref.collapse()))
     }
 
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
index b13b650db79..8f0749c9319 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
@@ -7,7 +7,7 @@ use super::{
     tx_state::{DeleteTable, IndexIdMap, TxState},
     SharedMutexGuard, SharedWriteGuard,
 };
-use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
+use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID};
 use crate::db::datastore::{
     system_tables::{
         StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields, StIndexRow,
@@ -1147,6 +1147,24 @@ impl<'a> Iterator for IndexScanFilterDeleted<'a> {
 }
 
 impl MutTxId {
+    /// Insert a row into a table.
+    ///
+    /// Requires:
+    /// - `TableId` must refer to a valid table for the database at `database_address`.
+    /// - `row` must be a valid row for the table at `table_id`.
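+    /// - `row` must not rely on sequence value generation;
+    ///   unlike `insert`, this method does not call `write_sequence_values`.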
+    ///
+    /// Returns:
+    /// - a product value with a projection of the row containing only the generated column values.
+    /// - a ref to the inserted row.
+    pub(super) fn insert_2<'a>(
+        &'a mut self,
+        table_id: TableId,
+        row: &mut ProductValue,
+    ) -> Result<(AlgebraicValue, RowRefInsertion<'a>)> {
+        let row_ref = self.insert_row_internal_2(table_id, row)?;
+        Ok((AlgebraicValue::product([]), row_ref))
+    }
+
     /// Insert a row into a table.
     ///
     /// Requires:
@@ -1162,6 +1180,7 @@ impl MutTxId {
         row: &mut ProductValue,
     ) -> Result<(AlgebraicValue, RowRefInsertion<'a>)> {
         let generated = self.write_sequence_values(table_id, row)?;
+
         let row_ref = self.insert_row_internal(table_id, row)?;
         Ok((generated, row_ref))
     }
@@ -1193,6 +1212,125 @@ impl MutTxId {
         Ok(row.project(&cols_to_update)?)
     }
 
+    pub(super) fn insert_row_internal_2(&mut self, table_id: TableId, row: &ProductValue) -> Result<RowRefInsertion<'_>> {
+        let commit_table = self.committed_state_write_lock.get_table(table_id);
+
+        // NOTE(basics): update path, with a single `#[primary_key]` constraint.
+        // There are no other indices or constraints on the table.
+        //
+        // We proceed to:
+        //
+        // 1. Find the row using an `IndexId`, equating the index to a value.
+        //    This first tries the committed state, as that is most likely.
+        //    If not found in the committed state, find it in the tx state.
+        //
+        // 2. Update the row to the provided PV by deleting it in the committed state
+        //    and setting the new value in the tx state.
+        //    Alternatively, if the row came from the tx state, update the row in-place in the tx state.
+        //
+        // Notably, this doesn't need to check constraints or set-semantic violations,
+        // as those are implied by `#[primary_key]`.
+        // We don't even need to maintain a pointer map.
+        // We don't implement this here; instead we approximate it by deleting checks.
+
+        // SKIP(basics): update path, no columns with a unique constraint were changed,
+        // so don't check this.
+        /*
+        // Check for constraint violations as early as possible,
+        // to ensure that `UniqueConstraintViolation` errors have precedence over other errors.
+        // `tx_table.insert` will later perform the same check on the tx table,
+        // so this method needs only to check the committed state.
+        if let Some(commit_table) = commit_table {
+            commit_table
+                .check_unique_constraints(row, |maybe_conflict| self.tx_state.is_deleted(table_id, maybe_conflict))
+                .map_err(IndexError::from)?;
+        }
+        */
+
+        // Get the insert table, so we can write the row into it.
+        let (tx_table, tx_blob_store, _, delete_table) = self
+            .tx_state
+            .get_table_and_blob_store_or_maybe_create_from(table_id, commit_table)
+            .ok_or(TableError::IdNotFoundState(table_id))?;
+
+        match tx_table.insert_2(tx_blob_store, row) {
+            Ok(row_ref) => {
+                // `row` not previously present in insert tables,
+                // but may still be a set-semantic conflict with a row
+                // in the committed state.
+                let ptr = row_ref.pointer();
+
+                // SKIP(basics): we don't need to check for set-semantic collisions.
+                /*
+                if let Some(commit_table) = commit_table {
+                    // Safety:
+                    // - `commit_table` and `tx_table` use the same schema
+                    //   because `tx_table` is derived from `commit_table`.
+                    // - `ptr` and `hash` are correct because we just got them from `tx_table.insert`.
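+                    // - NB: `insert_2` returns no row `hash`, so this disabled block
+                    //   cannot be re-enabled without restoring that computation.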
+                    if let Some(committed_ptr) = unsafe { Table::find_same_row(commit_table, tx_table, ptr, hash) } {
+                        // If `row` was already present in the committed state,
+                        // either this is a set-semantic duplicate,
+                        // or the row is marked as deleted, so we will undelete it
+                        // and leave it in the committed state.
+                        // Either way, it should not appear in the insert tables,
+                        // so roll back the insertion.
+                        //
+                        // NOTE for future MVCC implementors:
+                        // In MVCC, it is no longer valid to elide inserts in this way.
+                        // When a transaction inserts a row, that row *must* appear in its insert tables,
+                        // even if the row is already present in the committed state.
+                        //
+                        // Imagine a chain of committed but un-squashed transactions:
+                        // `Committed 0: Insert Row A` - `Committed 1: Delete Row A`
+                        // where `Committed 1` happens after `Committed 0`.
+                        // Imagine a transaction `Running 2: Insert Row A`,
+                        // which began before `Committed 1` was committed.
+                        // Because `Committed 1` has since been committed,
+                        // `Running 2` *must* happen after `Committed 1`.
+                        // Therefore, the correct sequence of events is:
+                        // - Insert Row A
+                        // - Delete Row A
+                        // - Insert Row A
+                        // This is impossible to recover if `Running 2` elides its insert.
+                        tx_table
+                            .delete(tx_blob_store, ptr, |_| ())
+                            .expect("Failed to delete a row we just inserted");
+
+                        // It's possible that `row` appears in the committed state,
+                        // but is marked as deleted.
+                        // In this case, undelete it, so it remains in the committed state.
+                        delete_table.remove(&committed_ptr);
+
+                        // No new row was inserted, but return `committed_ptr`.
+                        let blob_store = &self.committed_state_write_lock.blob_store;
+                        return Ok(RowRefInsertion::Existed(
+                            // SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`.
+                            unsafe { commit_table.get_row_ref_unchecked(blob_store, committed_ptr) },
+                        ));
+                    }
+                }
+                */
+
+                Ok(RowRefInsertion::Inserted(unsafe {
+                    // SAFETY: `ptr` came from `tx_table.insert` just now without any interleaving calls.
+                    tx_table.get_row_ref_unchecked(tx_blob_store, ptr)
+                }))
+            }
+            // `row` previously present in insert tables; do nothing but return `ptr`.
+            Err(InsertError::Duplicate(ptr)) => Ok(RowRefInsertion::Existed(
+                // SAFETY: `tx_table` told us that `ptr` refers to a valid row in it.
+                unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) },
+            )),
+
+            // Index error: unbox and return `TableError::IndexError`
+            // rather than `TableError::Insert(InsertError::IndexError)`.
+            Err(InsertError::IndexError(e)) => Err(IndexError::from(e).into()),
+
+            // Misc. insertion error; fail.
+            Err(e) => Err(TableError::Insert(e).into()),
+        }
+    }
+
     pub(super) fn insert_row_internal(&mut self, table_id: TableId, row: &ProductValue) -> Result<RowRefInsertion<'_>> {
         let commit_table = self.committed_state_write_lock.get_table(table_id);
 
@@ -1217,8 +1355,8 @@ impl MutTxId {
                 // `row` not previously present in insert tables,
                 // but may still be a set-semantic conflict with a row
                 // in the committed state.
-                let ptr = row_ref.pointer();
+
 
                 if let Some(commit_table) = commit_table {
                     // Safety:
                     // - `commit_table` and `tx_table` use the same schema
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs
index 74d2c932346..493d8851b9c 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/state_view.rs
@@ -1,5 +1,7 @@
 use super::{
-    committed_state::CommittedIndexIter, committed_state::CommittedState, datastore::Result, tx_state::TxState,
+    committed_state::{CommittedIndexIter, CommittedState},
+    datastore::Result,
+    tx_state::{DeleteTable, TxState},
 };
 use crate::{
     db::datastore::system_tables::{
@@ -13,7 +15,10 @@ use core::ops::RangeBounds;
 use spacetimedb_primitives::{ColList, TableId};
 use spacetimedb_sats::AlgebraicValue;
 use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
-use spacetimedb_table::table::{IndexScanIter, RowRef, TableScanIter};
+use spacetimedb_table::{
+    blob_store::HashMapBlobStore,
+    table::{IndexScanIter, RowRef, Table, TableScanIter},
+};
 use std::sync::Arc;
 
 // StateView trait, is designed to define the behavior of viewing internal datastore states.
@@ -146,28 +151,31 @@ pub trait StateView {
 
 pub struct Iter<'a> {
     table_id: TableId,
-    tx_state: Option<&'a TxState>,
+    tx_state_del: Option<&'a DeleteTable>,
+    tx_state_ins: Option<(&'a Table, &'a HashMapBlobStore)>,
     committed_state: &'a CommittedState,
-    #[allow(dead_code)]
-    table_name: &'a str,
     stage: ScanStage<'a>,
-    num_committed_rows_fetched: u64,
 }
 
 impl<'a> Iter<'a> {
     pub(super) fn new(
         table_id: TableId,
-        table_name: &'a str,
+        _table_name: &'a str,
         tx_state: Option<&'a TxState>,
         committed_state: &'a CommittedState,
     ) -> Self {
+        let tx_state_ins = tx_state.and_then(|tx| {
+            let ins = tx.insert_tables.get(&table_id)?;
+            let bs = &tx.blob_store;
+            Some((ins, bs))
+        });
+        let tx_state_del = tx_state.and_then(|tx| tx.delete_tables.get(&table_id));
         Self {
             table_id,
-            tx_state,
+            tx_state_ins,
+            tx_state_del,
             committed_state,
-            table_name,
             stage: ScanStage::Start,
-            num_committed_rows_fetched: 0,
         }
     }
 }
 
@@ -175,7 +183,8 @@ impl<'a> Iter<'a> {
 enum ScanStage<'a> {
     Start,
     CurrentTx { iter: TableScanIter<'a> },
-    Committed { iter: TableScanIter<'a> },
+    CommittedNoTxDeletes { iter: TableScanIter<'a> },
+    CommittedWithTxDeletes { iter: TableScanIter<'a> },
 }
 
 impl<'a> Iterator for Iter<'a> {
@@ -184,18 +193,6 @@ impl<'a> Iterator for Iter<'a> {
     fn next(&mut self) -> Option<Self::Item> {
         let table_id = self.table_id;
 
-        // Moves the current scan stage to the current tx if rows were inserted in it.
-        // Returns `None` otherwise.
-        // NOTE(pgoldman 2024-01-05): above comment appears to not describe the behavior of this function.
-        let maybe_stage_current_tx_inserts = |this: &mut Self| {
-            let table = &this.tx_state?;
-            let insert_table = table.insert_tables.get(&table_id)?;
-            this.stage = ScanStage::CurrentTx {
-                iter: insert_table.scan_rows(&table.blob_store),
-            };
-            Some(())
-        };
-
         // The finite state machine goes:
         //     Start --> CurrentTx ---\
         //        |          ^        |
@@ -208,19 +205,27 @@
                     if let Some(table) = self.committed_state.tables.get(&table_id) {
                         // The committed state has changes for this table.
                        // Go through them in (1).
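+                        // Pick the committed-state stage once, up front: the per-row
+                        // delete check below is only paid when this tx actually deleted rows.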
-                        self.stage = ScanStage::Committed {
-                            iter: table.scan_rows(&self.committed_state.blob_store),
+                        let iter = table.scan_rows(&self.committed_state.blob_store);
+                        self.stage = if self.tx_state_del.is_some() {
+                            ScanStage::CommittedWithTxDeletes { iter }
+                        } else {
+                            ScanStage::CommittedNoTxDeletes { iter }
                         };
-                    } else {
-                        // No committed changes, so look for inserts in the current tx in (2).
-                        maybe_stage_current_tx_inserts(self);
+                        continue;
                     }
                 }
-                ScanStage::Committed { iter } => {
+                ScanStage::CommittedNoTxDeletes { iter } => {
+                    if let next @ Some(_) = iter.next() {
+                        return next;
+                    }
+                }
+                ScanStage::CommittedWithTxDeletes { iter } => {
                     // (1) Go through the committed state for this table.
-                    for row_ref in iter {
-                        // Increment metric for number of committed rows scanned.
-                        self.num_committed_rows_fetched += 1;
+                    let del_tables = unsafe { self.tx_state_del.unwrap_unchecked() };
+                    if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(&row_ref.pointer())) {
+                        return next;
+                    }
+                    /*
                         // Check the committed row's state in the current tx.
                         // If it's been deleted, skip it.
                         // If it's still present, yield it.
@@ -247,11 +252,7 @@ impl<'a> Iterator for Iter<'a> {
                         //
                        // As a result, in MVCC, this branch will need to check if the `row_ref`
                        // also exists in the `tx_state.insert_tables` and ensure it is yielded only once.
-                        if self
-                            .tx_state
-                            .filter(|tx_state| tx_state.is_deleted(table_id, row_ref.pointer()))
-                            .is_none()
-                        {
+                        if !del_tables.contains(&row_ref.pointer()) {
                             // There either are no state changes for the current tx (`None`),
                             // or there are, but `row_id` specifically has not been changed.
                             // Either way, the row is in the committed state
@@ -260,15 +261,19 @@ impl<'a> Iterator for Iter<'a> {
                             // and hasn't been removed in the current tx,
                             // so it exists and can be returned.
                             return Some(row_ref);
                         }
                     }
-                    // (3) We got here, so we must've exhausted the committed changes.
-                    // Start looking in the current tx for inserts, if any, in (2).
-                    maybe_stage_current_tx_inserts(self)?;
+                    */
                 }
                 ScanStage::CurrentTx { iter } => {
                     // (2) look for inserts in the current tx.
                     return iter.next();
                 }
             }
+
+            // (3) We got here, so we must've exhausted the committed changes.
+            // Start looking in the current tx for inserts, if any, in (2).
+            let (insert_table, blob_store) = self.tx_state_ins?;
+            let iter = insert_table.scan_rows(blob_store);
+            self.stage = ScanStage::CurrentTx { iter };
         }
     }
 }
diff --git a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs
index 5b04d76eed0..0b0609547c2 100644
--- a/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs
+++ b/crates/core/src/db/datastore/locking_tx_datastore/tx_state.rs
@@ -1,5 +1,5 @@
 use core::ops::RangeBounds;
-use spacetimedb_data_structures::map::{IntMap, IntSet};
+use spacetimedb_data_structures::map::{Entry, IntMap, IntSet};
 use spacetimedb_primitives::{ColList, IndexId, TableId};
 use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
 use spacetimedb_table::{
@@ -50,13 +50,13 @@ pub(super) struct TxState {
     /// a separate `Table` containing only the new insertions.
     ///
     /// `RowPointer`s into the `insert_tables` use `SquashedOffset::TX_STATE`.
-    pub(super) insert_tables: BTreeMap<TableId, Table>,
+    pub(super) insert_tables: IntMap<TableId, Table>,
 
     /// For any `TableId` that has had a previously-committed row deleted from it,
     /// a set of the deleted previously-committed rows.
     ///
     /// Any `RowPointer` in this set will have `SquashedOffset::COMMITTED_STATE`.
-    pub(super) delete_tables: BTreeMap<TableId, DeleteTable>,
+    pub(super) delete_tables: IntMap<TableId, DeleteTable>,
 
     /// A blob store for those blobs referred to by the `insert_tables`.
     ///
@@ -77,7 +77,7 @@ pub(super) struct TxState {
     pub(super) index_id_map_removals: Option<IntSet<IndexId>>,
 }
 
-static_assert_size!(TxState, 120);
+static_assert_size!(TxState, 136);
 
 impl TxState {
     /// Returns the row count in insert tables
@@ -170,11 +170,11 @@ impl TxState {
         let blob_store = &mut self.blob_store;
         let idx_map = &mut self.index_id_map;
         let tbl = match insert_tables.entry(table_id) {
-            btree_map::Entry::Vacant(e) => {
+            Entry::Vacant(e) => {
                 let new_table = template?.clone_structure(SquashedOffset::TX_STATE);
                 e.insert(new_table)
             }
-            btree_map::Entry::Occupied(e) => e.into_mut(),
+            Entry::Occupied(e) => e.into_mut(),
         };
         Some((tbl, blob_store, idx_map, delete_tables.entry(table_id).or_default()))
     }
diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs
index 44943f82db1..159ebc95528 100644
--- a/crates/table/src/table.rs
+++ b/crates/table/src/table.rs
@@ -226,6 +226,97 @@ impl Table {
         Ok(())
     }
 
+    /// Insert a `row` into this table, storing its large var-len members in the `blob_store`.
+    ///
+    /// On success, returns a `RowRef` referring to the newly-inserted row.
+    ///
+    /// When a row equal to `row` already exists in `self`,
+    /// returns `InsertError::Duplicate(existing_row_pointer)`,
+    /// where `existing_row_pointer` is a `RowPointer` which identifies the existing row.
+    /// In this case, the duplicate is not inserted,
+    /// but internal data structures may be altered in ways that affect performance and fragmentation.
+    ///
+    /// TODO(error-handling): describe errors from `write_row_to_pages` and return meaningful errors.
+    pub fn insert_2<'a>(
+        &'a mut self,
+        blob_store: &'a mut dyn BlobStore,
+        row: &ProductValue,
+    ) -> Result<RowRef<'a>, InsertError> {
+        // SKIP(perf): update path, no columns with a unique constraint were changed,
+        // so don't check this.
+        /*
+        // Check unique constraints.
+        // This error should take precedence over any other potential failures.
+        self.check_unique_constraints(
+            row,
+            // No need to worry about the committed vs tx state dichotomy here;
+            // just treat all rows in the table as live.
+            |_| false,
+        )?;
+        */
+
+        // Insert the row into the page manager.
+        let ptr = self.insert_internal_2(blob_store, row)?;
+
+        // SAFETY: We just inserted `ptr`, so it must be present.
+        let row_ref = unsafe { self.inner.get_row_ref_unchecked(blob_store, ptr) };
+
+        /*
+        // Insert row into indices.
+        for (cols, index) in self.indexes.iter_mut() {
+            index.insert(cols, row_ref).unwrap();
+        }
+        */
+
+        Ok(row_ref)
+    }
+
+    /// Insert a `row` into this table.
+    /// NOTE: This method skips index updating. Use `insert` to insert a row with index updating.
+    pub fn insert_internal_2(
+        &mut self,
+        blob_store: &mut dyn BlobStore,
+        row: &ProductValue,
+    ) -> Result<RowPointer, InsertError> {
+        // Optimistically insert the `row` before checking for set-semantic collisions,
+        // under the assumption that set-semantic collisions are rare.
+        let (row_ref, blob_bytes) = self.insert_internal_allow_duplicate(blob_store, row)?;
+
+        // Ensure row isn't already there.
+        // SAFETY: We just inserted `ptr`, so we know it's valid.
+        //let hash = row_ref.row_hash();
+        // Safety:
+        // We just inserted `ptr` and computed `hash`, so they're valid.
+        // `self` trivially has the same `row_layout` as `self`.
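+        // NB: with the `row_hash` computation above disabled, the `find_same_row`
+        // check below is disabled as well; it is kept only to show the full path.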
+        let ptr = row_ref.pointer();
+        /*
+        let existing_row = unsafe { Self::find_same_row(self, self, ptr, hash) };
+
+        if let Some(existing_row) = existing_row {
+            // If an equal row was already present,
+            // roll back our optimistic insert to avoid violating set semantics.
+
+            // SAFETY: we just inserted `ptr`, so it must be valid.
+            unsafe {
+                self.inner
+                    .pages
+                    .delete_row(&self.inner.visitor_prog, self.row_size(), ptr, blob_store)
+            };
+            return Err(InsertError::Duplicate(existing_row));
+        }
+        */
+        self.row_count += 1;
+        self.blob_store_bytes += blob_bytes;
+
+        // If the optimistic insertion was correct,
+        // i.e. this is not a set-semantic duplicate,
+        // add it to the `pointer_map`.
+        //self.pointer_map.insert(hash, ptr);
+
+        Ok(ptr)
+    }
+
     /// Insert a `row` into this table, storing its large var-len members in the `blob_store`.
     ///
     /// On success, returns the hash of the newly-inserted row,
     /// and a `RowRef` referring to the row.
@@ -284,6 +375,7 @@ impl Table {
         // We just inserted `ptr` and computed `hash`, so they're valid.
         // `self` trivially has the same `row_layout` as `self`.
         let ptr = row_ref.pointer();
+        /*
         let existing_row = unsafe { Self::find_same_row(self, self, ptr, hash) };
 
         if let Some(existing_row) = existing_row {
@@ -298,13 +390,14 @@ impl Table {
             };
             return Err(InsertError::Duplicate(existing_row));
         }
+        */
         self.row_count += 1;
         self.blob_store_bytes += blob_bytes;
 
         // If the optimistic insertion was correct,
         // i.e. this is not a set-semantic duplicate,
         // add it to the `pointer_map`.
-        self.pointer_map.insert(hash, ptr);
+        //self.pointer_map.insert(hash, ptr);
 
         Ok((hash, ptr))
     }
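
// --- Illustration (not part of the patch) -----------------------------------
// A minimal, self-contained sketch of the scan logic that the `state_view.rs`
// hunks above introduce: the iterator commits to one of two committed-state
// stages up front (filtering against the tx's delete set only when one
// exists), then falls back to the tx's insert table. All types here (`RowId`,
// slices, `HashSet`) are stand-ins for SpacetimeDB's `Table`, `RowRef`,
// `DeleteTable`, and blob store; none of the names below are the crate's API.
use std::collections::HashSet;

type RowId = u64;

enum ScanStage<'a> {
    Start,
    /// Committed rows, no deletes in this tx: yield everything.
    CommittedNoTxDeletes { iter: std::slice::Iter<'a, RowId> },
    /// Committed rows, some deletes in this tx: skip deleted pointers.
    CommittedWithTxDeletes { iter: std::slice::Iter<'a, RowId> },
    /// Rows inserted by the current tx.
    CurrentTx { iter: std::slice::Iter<'a, RowId> },
}

struct Iter<'a> {
    committed: &'a [RowId],
    tx_inserts: Option<&'a [RowId]>,
    tx_deletes: Option<&'a HashSet<RowId>>,
    stage: ScanStage<'a>,
}

impl<'a> Iterator for Iter<'a> {
    type Item = RowId;

    fn next(&mut self) -> Option<RowId> {
        loop {
            match &mut self.stage {
                ScanStage::Start => {
                    // Decide once, up front, whether the per-row delete check is needed.
                    let iter = self.committed.iter();
                    self.stage = if self.tx_deletes.is_some() {
                        ScanStage::CommittedWithTxDeletes { iter }
                    } else {
                        ScanStage::CommittedNoTxDeletes { iter }
                    };
                    continue;
                }
                ScanStage::CommittedNoTxDeletes { iter } => {
                    if let Some(&row) = iter.next() {
                        return Some(row);
                    }
                }
                ScanStage::CommittedWithTxDeletes { iter } => {
                    let dels = self.tx_deletes.expect("stage implies tx deletes");
                    if let Some(&row) = iter.find(|r| !dels.contains(*r)) {
                        return Some(row);
                    }
                }
                ScanStage::CurrentTx { iter } => return iter.next().copied(),
            }
            // Committed rows exhausted: move on to this tx's inserts, if any.
            let inserts = self.tx_inserts?;
            self.stage = ScanStage::CurrentTx { iter: inserts.iter() };
        }
    }
}

fn main() {
    let committed = [1, 2, 3, 4];
    let inserts = [5, 6];
    let deletes: HashSet<RowId> = [2, 4].into_iter().collect();
    let iter = Iter {
        committed: &committed[..],
        tx_inserts: Some(&inserts[..]),
        tx_deletes: Some(&deletes),
        stage: ScanStage::Start,
    };
    // Yields committed rows not deleted in this tx, then the tx's inserts.
    assert_eq!(iter.collect::<Vec<_>>(), vec![1, 3, 5, 6]);
}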