Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add MutTxId::insert_2 as impl for user tables #2009

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ impl CommittedState {
tx_data
}

fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap<TableId, DeleteTable>) {
fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: IntMap<TableId, DeleteTable>) {
for (table_id, row_ptrs) in delete_tables {
if let Some((table, blob_store)) = self.get_table_and_blob_store(table_id) {
let mut deletes = Vec::with_capacity(row_ptrs.len());
Expand All @@ -500,7 +500,7 @@ impl CommittedState {
// holds only committed rows which should be deleted,
// i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`,
// so no need to check before applying the deletes.
for row_ptr in row_ptrs.iter().copied() {
for row_ptr in row_ptrs {
debug_assert!(row_ptr.squashed_offset().is_committed_state());

// TODO: re-write `TxData` to remove `ProductValue`s
Expand All @@ -524,7 +524,7 @@ impl CommittedState {
fn merge_apply_inserts(
&mut self,
tx_data: &mut TxData,
insert_tables: BTreeMap<TableId, Table>,
insert_tables: IntMap<TableId, Table>,
tx_blob_store: impl BlobStore,
) {
// TODO(perf): Consider moving whole pages from the `insert_tables` into the committed state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::{
tx::TxId,
tx_state::TxState,
};
use crate::execution_context::Workload;
use crate::{db::datastore::system_tables::ST_RESERVED_SEQUENCE_RANGE, execution_context::Workload};
use crate::{
db::{
datastore::{
Expand Down Expand Up @@ -564,7 +564,11 @@ impl MutTxDatastore for Locking {
table_id: TableId,
mut row: ProductValue,
) -> Result<(AlgebraicValue, RowRef<'a>)> {
let (gens, row_ref) = tx.insert(table_id, &mut row)?;
let (gens, row_ref) = if table_id.0 <= ST_RESERVED_SEQUENCE_RANGE {
tx.insert(table_id, &mut row)?
} else {
tx.insert_2(table_id, &mut row)?
};
Ok((gens, row_ref.collapse()))
}

Expand Down
142 changes: 140 additions & 2 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::{
tx_state::{DeleteTable, IndexIdMap, TxState},
SharedMutexGuard, SharedWriteGuard,
};
use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID};
use crate::db::datastore::system_tables::{StRowLevelSecurityFields, StRowLevelSecurityRow, ST_RESERVED_SEQUENCE_RANGE, ST_ROW_LEVEL_SECURITY_ID};
use crate::db::datastore::{
system_tables::{
StColumnFields, StColumnRow, StConstraintFields, StConstraintRow, StFields as _, StIndexFields, StIndexRow,
Expand Down Expand Up @@ -1147,6 +1147,24 @@ impl<'a> Iterator for IndexScanFilterDeleted<'a> {
}

impl MutTxId {
/// Insert a row into a table.
///
/// Requires:
/// - `TableId` must refer to a valid table for the database at `database_address`.
/// - `row` must be a valid row for the table at `table_id`.
///
/// Returns:
/// - a product value with a projection of the row containing only the generated column values.
/// - a ref to the inserted row.
pub(super) fn insert_2<'a>(
&'a mut self,
table_id: TableId,
row: &mut ProductValue,
) -> Result<(AlgebraicValue, RowRefInsertion<'a>)> {
let row_ref = self.insert_row_internal_2(table_id, row)?;
Ok((AlgebraicValue::product([]), row_ref))
}

/// Insert a row into a table.
///
/// Requires:
Expand All @@ -1162,6 +1180,7 @@ impl MutTxId {
row: &mut ProductValue,
) -> Result<(AlgebraicValue, RowRefInsertion<'a>)> {
let generated = self.write_sequence_values(table_id, row)?;

let row_ref = self.insert_row_internal(table_id, row)?;
Ok((generated, row_ref))
}
Expand Down Expand Up @@ -1193,6 +1212,125 @@ impl MutTxId {
Ok(row.project(&cols_to_update)?)
}

pub(super) fn insert_row_internal_2(&mut self, table_id: TableId, row: &ProductValue) -> Result<RowRefInsertion<'_>> {
let commit_table = self.committed_state_write_lock.get_table(table_id);

// NOTE(basics): update path, with a single `#[primary_key]` constraint.
// There are no other indices or constraint on the table.
//
// We proceed to:
//
// 1. Find the row using a `IndexId`, equating the index to a value.
// This first tries the committed state, as that is most likely.
// If not found in the committed state, find in tx state.
//
// 2. Update the row to the provided PV by deleting in the committed state
// and setting the new value in the tx state.
// Alternatively, if the row came from tx state, update the row in-place in tx state.
//
// Notably, this doesn't need to check constraints and set-semantic violations,
// as those are implied by `#[primary_key]`.
// We don't even need to maintain a pointer map.
// We don't implement this here and instead attempt an approximation by deleting checks.

// SKIP(basics): update path, no columns with a unique constraints were changed,
// so don't check this.
/*
// Check for constraint violations as early as possible,
// to ensure that `UniqueConstraintViolation` errors have precedence over other errors.
// `tx_table.insert` will later perform the same check on the tx table,
// so this method needs only to check the committed state.
if let Some(commit_table) = commit_table {
commit_table
.check_unique_constraints(row, |maybe_conflict| self.tx_state.is_deleted(table_id, maybe_conflict))
.map_err(IndexError::from)?;
}
*/

// Get the insert table, so we can write the row into it.
let (tx_table, tx_blob_store, _, delete_table) = self
.tx_state
.get_table_and_blob_store_or_maybe_create_from(table_id, commit_table)
.ok_or(TableError::IdNotFoundState(table_id))?;

match tx_table.insert_2(tx_blob_store, row) {
Ok(row_ref) => {
// `row` not previously present in insert tables,
// but may still be a set-semantic conflict with a row
// in the committed state.
let ptr = row_ref.pointer();

// SKIP(basics): we don't need to check for set semantic collisions.
/*
if let Some(commit_table) = commit_table {
// Safety:
// - `commit_table` and `tx_table` use the same schema
// because `tx_table` is derived from `commit_table`.
// - `ptr` and `hash` are correct because we just got them from `tx_table.insert`.
if let Some(committed_ptr) = unsafe { Table::find_same_row(commit_table, tx_table, ptr, hash) } {
// If `row` was already present in the committed state,
// either this is a set-semantic duplicate,
// or the row is marked as deleted, so we will undelete it
// and leave it in the committed state.
// Either way, it should not appear in the insert tables,
// so roll back the insertion.
//
// NOTE for future MVCC implementors:
// In MVCC, it is no longer valid to elide inserts in this way.
// When a transaction inserts a row, that row *must* appear in its insert tables,
// even if the row is already present in the committed state.
//
// Imagine a chain of committed but un-squashed transactions:
// `Committed 0: Insert Row A` - `Committed 1: Delete Row A`
// where `Committed 1` happens after `Committed 0`.
// Imagine a transaction `Running 2: Insert Row A`,
// which began before `Committed 1` was committed.
// Because `Committed 1` has since been committed,
// `Running 2` *must* happen after `Committed 1`.
// Therefore, the correct sequence of events is:
// - Insert Row A
// - Delete Row A
// - Insert Row A
// This is impossible to recover if `Running 2` elides its insert.
tx_table
.delete(tx_blob_store, ptr, |_| ())
.expect("Failed to delete a row we just inserted");

// It's possible that `row` appears in the committed state,
// but is marked as deleted.
// In this case, undelete it, so it remains in the committed state.
delete_table.remove(&committed_ptr);

// No new row was inserted, but return `committed_ptr`.
let blob_store = &self.committed_state_write_lock.blob_store;
return Ok(RowRefInsertion::Existed(
// SAFETY: `find_same_row` told us that `ptr` refers to a valid row in `commit_table`.
unsafe { commit_table.get_row_ref_unchecked(blob_store, committed_ptr) },
));
}
}
*/

Ok(RowRefInsertion::Inserted(unsafe {
// SAFETY: `ptr` came from `tx_table.insert` just now without any interleaving calls.
tx_table.get_row_ref_unchecked(tx_blob_store, ptr)
}))
}
// `row` previously present in insert tables; do nothing but return `ptr`.
Err(InsertError::Duplicate(ptr)) => Ok(RowRefInsertion::Existed(
// SAFETY: `tx_table` told us that `ptr` refers to a valid row in it.
unsafe { tx_table.get_row_ref_unchecked(tx_blob_store, ptr) },
)),

// Index error: unbox and return `TableError::IndexError`
// rather than `TableError::Insert(InsertError::IndexError)`.
Err(InsertError::IndexError(e)) => Err(IndexError::from(e).into()),

// Misc. insertion error; fail.
Err(e) => Err(TableError::Insert(e).into()),
}
}

pub(super) fn insert_row_internal(&mut self, table_id: TableId, row: &ProductValue) -> Result<RowRefInsertion<'_>> {
let commit_table = self.committed_state_write_lock.get_table(table_id);

Expand All @@ -1217,8 +1355,8 @@ impl MutTxId {
// `row` not previously present in insert tables,
// but may still be a set-semantic conflict with a row
// in the committed state.

let ptr = row_ref.pointer();

if let Some(commit_table) = commit_table {
// Safety:
// - `commit_table` and `tx_table` use the same schema
Expand Down
85 changes: 45 additions & 40 deletions crates/core/src/db/datastore/locking_tx_datastore/state_view.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::{
committed_state::CommittedIndexIter, committed_state::CommittedState, datastore::Result, tx_state::TxState,
committed_state::{CommittedIndexIter, CommittedState},
datastore::Result,
tx_state::{DeleteTable, TxState},
};
use crate::{
db::datastore::system_tables::{
Expand All @@ -13,7 +15,10 @@ use core::ops::RangeBounds;
use spacetimedb_primitives::{ColList, TableId};
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
use spacetimedb_table::table::{IndexScanIter, RowRef, TableScanIter};
use spacetimedb_table::{
blob_store::HashMapBlobStore,
table::{IndexScanIter, RowRef, Table, TableScanIter},
};
use std::sync::Arc;

// StateView trait, is designed to define the behavior of viewing internal datastore states.
Expand Down Expand Up @@ -146,36 +151,40 @@ pub trait StateView {

pub struct Iter<'a> {
table_id: TableId,
tx_state: Option<&'a TxState>,
tx_state_del: Option<&'a DeleteTable>,
tx_state_ins: Option<(&'a Table, &'a HashMapBlobStore)>,
committed_state: &'a CommittedState,
#[allow(dead_code)]
table_name: &'a str,
stage: ScanStage<'a>,
num_committed_rows_fetched: u64,
}

impl<'a> Iter<'a> {
pub(super) fn new(
table_id: TableId,
table_name: &'a str,
_table_name: &'a str,
tx_state: Option<&'a TxState>,
committed_state: &'a CommittedState,
) -> Self {
let tx_state_ins = tx_state.and_then(|tx| {
let ins = tx.insert_tables.get(&table_id)?;
let bs = &tx.blob_store;
Some((ins, bs))
});
let tx_state_del = tx_state.and_then(|tx| tx.delete_tables.get(&table_id));
Self {
table_id,
tx_state,
tx_state_ins,
tx_state_del,
committed_state,
table_name,
stage: ScanStage::Start,
num_committed_rows_fetched: 0,
}
}
}

enum ScanStage<'a> {
Start,
CurrentTx { iter: TableScanIter<'a> },
Committed { iter: TableScanIter<'a> },
CommittedNoTxDeletes { iter: TableScanIter<'a> },
CommittedWithTxDeletes { iter: TableScanIter<'a> },
}

impl<'a> Iterator for Iter<'a> {
Expand All @@ -184,18 +193,6 @@ impl<'a> Iterator for Iter<'a> {
fn next(&mut self) -> Option<Self::Item> {
let table_id = self.table_id;

// Moves the current scan stage to the current tx if rows were inserted in it.
// Returns `None` otherwise.
// NOTE(pgoldman 2024-01-05): above comment appears to not describe the behavior of this function.
let maybe_stage_current_tx_inserts = |this: &mut Self| {
let table = &this.tx_state?;
let insert_table = table.insert_tables.get(&table_id)?;
this.stage = ScanStage::CurrentTx {
iter: insert_table.scan_rows(&table.blob_store),
};
Some(())
};

// The finite state machine goes:
// Start --> CurrentTx ---\
// | ^ |
Expand All @@ -208,19 +205,27 @@ impl<'a> Iterator for Iter<'a> {
if let Some(table) = self.committed_state.tables.get(&table_id) {
// The committed state has changes for this table.
// Go through them in (1).
self.stage = ScanStage::Committed {
iter: table.scan_rows(&self.committed_state.blob_store),
let iter = table.scan_rows(&self.committed_state.blob_store);
self.stage = if self.tx_state_del.is_some() {
ScanStage::CommittedWithTxDeletes { iter }
} else {
ScanStage::CommittedNoTxDeletes { iter }
};
} else {
// No committed changes, so look for inserts in the current tx in (2).
maybe_stage_current_tx_inserts(self);
continue;
}
}
ScanStage::Committed { iter } => {
ScanStage::CommittedNoTxDeletes { iter } => {
if let next @ Some(_) = iter.next() {
return next;
}
}
ScanStage::CommittedWithTxDeletes { iter } => {
// (1) Go through the committed state for this table.
for row_ref in iter {
// Increment metric for number of committed rows scanned.
self.num_committed_rows_fetched += 1;
let del_tables = unsafe { self.tx_state_del.unwrap_unchecked() };
if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(&row_ref.pointer())) {
return next;
}
/*
// Check the committed row's state in the current tx.
// If it's been deleted, skip it.
// If it's still present, yield it.
Expand All @@ -247,11 +252,7 @@ impl<'a> Iterator for Iter<'a> {
//
// As a result, in MVCC, this branch will need to check if the `row_ref`
// also exists in the `tx_state.insert_tables` and ensure it is yielded only once.
if self
.tx_state
.filter(|tx_state| tx_state.is_deleted(table_id, row_ref.pointer()))
.is_none()
{
if !del_tables.contains(&row_ref.pointer()) {
// There either are no state changes for the current tx (`None`),
// or there are, but `row_id` specifically has not been changed.
// Either way, the row is in the committed state
Expand All @@ -260,15 +261,19 @@ impl<'a> Iterator for Iter<'a> {
return Some(row_ref);
}
}
// (3) We got here, so we must've exhausted the committed changes.
// Start looking in the current tx for inserts, if any, in (2).
maybe_stage_current_tx_inserts(self)?;
*/
}
ScanStage::CurrentTx { iter } => {
// (2) look for inserts in the current tx.
return iter.next();
}
}

// (3) We got here, so we must've exhausted the committed changes.
// Start looking in the current tx for inserts, if any, in (2).
let (insert_table, blob_store) = self.tx_state_ins?;
let iter = insert_table.scan_rows(blob_store);
self.stage = ScanStage::CurrentTx { iter };
}
}
}
Expand Down
Loading
Loading