Skip to content

Commit

Permalink
Speed up state_view::Iter by special casing committed-with-no-delet…
Browse files Browse the repository at this point in the history
…es (#2003)
  • Loading branch information
Centril authored Nov 28, 2024
1 parent 7f22d88 commit 1992b8e
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ impl StateView for CommittedState {
}

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
if let Some(table_name) = self.table_name(table_id) {
return Ok(Iter::new(table_id, table_name, None, self));
if self.table_name(table_id).is_some() {
return Ok(Iter::new(table_id, None, self));
}
Err(TableError::IdNotFound(SystemTable::st_table, table_id.0).into())
}
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1412,10 +1412,9 @@ impl StateView for MutTxId {
}

fn iter(&self, table_id: TableId) -> Result<Iter<'_>> {
if let Some(table_name) = self.table_name(table_id) {
if self.table_name(table_id).is_some() {
return Ok(Iter::new(
table_id,
table_name,
Some(&self.tx_state),
&self.committed_state_write_lock,
));
Expand Down
176 changes: 91 additions & 85 deletions crates/core/src/db/datastore/locking_tx_datastore/state_view.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::{
committed_state::CommittedIndexIter, committed_state::CommittedState, datastore::Result, tx_state::TxState,
committed_state::{CommittedIndexIter, CommittedState},
datastore::Result,
tx_state::{DeleteTable, TxState},
};
use crate::{
db::datastore::system_tables::{
Expand All @@ -13,7 +15,10 @@ use core::ops::RangeBounds;
use spacetimedb_primitives::{ColList, TableId};
use spacetimedb_sats::AlgebraicValue;
use spacetimedb_schema::schema::{ColumnSchema, TableSchema};
use spacetimedb_table::table::{IndexScanIter, RowRef, TableScanIter};
use spacetimedb_table::{
blob_store::HashMapBlobStore,
table::{IndexScanIter, RowRef, Table, TableScanIter},
};
use std::sync::Arc;

// StateView trait, is designed to define the behavior of viewing internal datastore states.
Expand Down Expand Up @@ -146,36 +151,42 @@ pub trait StateView {

pub struct Iter<'a> {
table_id: TableId,
tx_state: Option<&'a TxState>,
tx_state_del: Option<&'a DeleteTable>,
tx_state_ins: Option<(&'a Table, &'a HashMapBlobStore)>,
committed_state: &'a CommittedState,
#[allow(dead_code)]
table_name: &'a str,
stage: ScanStage<'a>,
num_committed_rows_fetched: u64,
}

impl<'a> Iter<'a> {
pub(super) fn new(
table_id: TableId,
table_name: &'a str,
tx_state: Option<&'a TxState>,
committed_state: &'a CommittedState,
) -> Self {
pub(super) fn new(table_id: TableId, tx_state: Option<&'a TxState>, committed_state: &'a CommittedState) -> Self {
let tx_state_ins = tx_state.and_then(|tx| {
let ins = tx.insert_tables.get(&table_id)?;
let bs = &tx.blob_store;
Some((ins, bs))
});
let tx_state_del = tx_state.and_then(|tx| tx.delete_tables.get(&table_id));
Self {
table_id,
tx_state,
tx_state_ins,
tx_state_del,
committed_state,
table_name,
stage: ScanStage::Start,
num_committed_rows_fetched: 0,
}
}
}

enum ScanStage<'a> {
/// We haven't decided yet where to yield from.
Start,
/// Yielding rows from the current tx.
CurrentTx { iter: TableScanIter<'a> },
Committed { iter: TableScanIter<'a> },
/// Yielding rows from the committed state
/// without considering tx state deletes as there are none.
CommittedNoTxDeletes { iter: TableScanIter<'a> },
/// Yielding rows from the committed state
/// but there are deleted rows in the tx state,
/// so we must check against those.
CommittedWithTxDeletes { iter: TableScanIter<'a> },
}

impl<'a> Iterator for Iter<'a> {
Expand All @@ -184,91 +195,86 @@ impl<'a> Iterator for Iter<'a> {
fn next(&mut self) -> Option<Self::Item> {
let table_id = self.table_id;

// Moves the current scan stage to the current tx if rows were inserted in it.
// Returns `None` otherwise.
// NOTE(pgoldman 2024-01-05): above comment appears to not describe the behavior of this function.
let maybe_stage_current_tx_inserts = |this: &mut Self| {
let table = &this.tx_state?;
let insert_table = table.insert_tables.get(&table_id)?;
this.stage = ScanStage::CurrentTx {
iter: insert_table.scan_rows(&table.blob_store),
};
Some(())
};

// The finite state machine goes:
// Start --> CurrentTx ---\
// | ^ |
// v | v
// Committed ---/------> Stop
//
// Start
// |
// |--> CurrentTx -------------------------------\
// | ^ |
// | \--------------------\ |
// | ^ |
// |--> CommittedNoTxDeletes ----|---------------\
// | ^ v
// \--> CommittedWithTxDeletes --|------/----> Stop

loop {
match &mut self.stage {
ScanStage::Start => {
if let Some(table) = self.committed_state.tables.get(&table_id) {
// The committed state has changes for this table.
// Go through them in (1).
self.stage = ScanStage::Committed {
iter: table.scan_rows(&self.committed_state.blob_store),
let iter = table.scan_rows(&self.committed_state.blob_store);
self.stage = if self.tx_state_del.is_some() {
// There are no deletes in the tx state
// so we don't need to care about those (1a).
ScanStage::CommittedWithTxDeletes { iter }
} else {
// There are deletes in the tx state
// so we must exclude those (1b).
ScanStage::CommittedNoTxDeletes { iter }
};
} else {
// No committed changes, so look for inserts in the current tx in (2).
maybe_stage_current_tx_inserts(self);
continue;
}
}
ScanStage::Committed { iter } => {
// (1) Go through the committed state for this table.
for row_ref in iter {
// Increment metric for number of committed rows scanned.
self.num_committed_rows_fetched += 1;
// Check the committed row's state in the current tx.
// If it's been deleted, skip it.
// If it's still present, yield it.
// Note that the committed state and the insert tables are disjoint sets,
// so at this point we know the row will not be yielded in (2).
//
// NOTE for future MVCC implementors:
// In MVCC, it is no longer valid to elide inserts in this way.
// When a transaction inserts a row, that row *must* appear in its insert tables,
// even if the row is already present in the committed state.
//
// Imagine a chain of committed but un-squashed transactions:
// `Committed 0: Insert Row A` - `Committed 1: Delete Row A`
// where `Committed 1` happens after `Committed 0`.
// Imagine a transaction `Running 2: Insert Row A`,
// which began before `Committed 1` was committed.
// Because `Committed 1` has since been committed,
// `Running 2` *must* happen after `Committed 1`.
// Therefore, the correct sequence of events is:
// - Insert Row A
// - Delete Row A
// - Insert Row A
// This is impossible to recover if `Running 2` elides its insert.
//
// As a result, in MVCC, this branch will need to check if the `row_ref`
// also exists in the `tx_state.insert_tables` and ensure it is yielded only once.
if self
.tx_state
.filter(|tx_state| tx_state.is_deleted(table_id, row_ref.pointer()))
.is_none()
{
// There either are no state changes for the current tx (`None`),
// or there are, but `row_id` specifically has not been changed.
// Either way, the row is in the committed state
// and hasn't been removed in the current tx,
// so it exists and can be returned.
return Some(row_ref);
}
ScanStage::CommittedNoTxDeletes { iter } => {
// (1a) Go through the committed state for this table
// but do not consider deleted rows.
if let next @ Some(_) = iter.next() {
return next;
}
}
ScanStage::CommittedWithTxDeletes { iter } => {
// (1b) Check the committed row's state in the current tx.
// If it's been deleted, skip it.
// If it's still present, yield it.
// Note that the committed state and the insert tables are disjoint sets,
// so at this point we know the row will not be yielded in (3).
//
// NOTE for future MVCC implementors:
// In MVCC, it is no longer valid to elide inserts in this way.
// When a transaction inserts a row, that row *must* appear in its insert tables,
// even if the row is already present in the committed state.
//
// Imagine a chain of committed but un-squashed transactions:
// `Committed 0: Insert Row A` - `Committed 1: Delete Row A`
// where `Committed 1` happens after `Committed 0`.
// Imagine a transaction `Running 2: Insert Row A`,
// which began before `Committed 1` was committed.
// Because `Committed 1` has since been committed,
// `Running 2` *must* happen after `Committed 1`.
// Therefore, the correct sequence of events is:
// - Insert Row A
// - Delete Row A
// - Insert Row A
// This is impossible to recover if `Running 2` elides its insert.
//
// As a result, in MVCC, this branch will need to check if the `row_ref`
// also exists in the `tx_state.insert_tables` and ensure it is yielded only once.
let del_tables = unsafe { self.tx_state_del.unwrap_unchecked() };
if let next @ Some(_) = iter.find(|row_ref| !del_tables.contains(&row_ref.pointer())) {
return next;
}
// (3) We got here, so we must've exhausted the committed changes.
// Start looking in the current tx for inserts, if any, in (2).
maybe_stage_current_tx_inserts(self)?;
}
ScanStage::CurrentTx { iter } => {
// (2) look for inserts in the current tx.
// (3) look for inserts in the current tx.
return iter.next();
}
}

// (2) We got here, so we must've exhausted the committed changes.
// Start looking in the current tx for inserts, if any, in (3).
let (insert_table, blob_store) = self.tx_state_ins?;
let iter = insert_table.scan_rows(blob_store);
self.stage = ScanStage::CurrentTx { iter };
}
}
}
Expand Down

2 comments on commit 1992b8e

@github-actions
Copy link

@github-actions github-actions bot commented on 1992b8e Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

@github-actions
Copy link

@github-actions github-actions bot commented on 1992b8e Nov 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callgrind benchmark results

Callgrind Benchmark Report

These benchmarks were run using callgrind,
an instruction-level profiler. They allow comparisons between sqlite (sqlite), SpacetimeDB running through a module (stdb_module), and the underlying SpacetimeDB data storage engine (stdb_raw). Callgrind emulates a CPU to collect the below estimates.

Measurement changes larger than five percent are in bold.

In-memory benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 6432 6426 0.09% 6540 6544 -0.06%
sqlite 5609 5585 0.43% 6073 6113 -0.65%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 74706 76579 -2.45% 75138 77011 -2.43%
stdb_raw u32_u64_str no_index 64 128 2 string 116932 118813 -1.58% 117590 119511 -1.61%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 25118 25110 0.03% 25632 25656 -0.09%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 24084 24078 0.02% 24544 24518 0.11%
sqlite u32_u64_str no_index 64 128 2 string 144433 144695 -0.18% 145885 146165 -0.19%
sqlite u32_u64_str no_index 64 128 1 u64 123763 124044 -0.23% 125025 125280 -0.20%
sqlite u32_u64_str btree_each_column 64 128 1 u64 131080 131361 -0.21% 132526 132817 -0.22%
sqlite u32_u64_str btree_each_column 64 128 2 string 134222 134494 -0.20% 135822 136140 -0.23%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 879154 878597 0.06% 902792 898103 0.52%
stdb_raw u32_u64_str btree_each_column 64 128 1029887 1028168 0.17% 1054279 1054368 -0.01%
sqlite u32_u64_str unique_0 64 128 399360 398326 0.26% 418726 417714 0.24%
sqlite u32_u64_str btree_each_column 64 128 984611 983637 0.10% 1018541 1020547 -0.20%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 138485 153810 -9.96% 138559 153926 -9.98%
stdb_raw u32_u64_str unique_0 64 15910 16835 -5.49% 15976 16935 -5.66%
sqlite u32_u64_str unique_0 1024 1042718 1068281 -2.39% 1046054 1071651 -2.39%
sqlite u32_u64_str unique_0 64 74704 76267 -2.05% 75782 77411 -2.10%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50180 50214 -0.07%
64 bsatn 25509 25509 0.00% 27719 27753 -0.12%
16 bsatn 8200 8200 0.00% 9560 9560 0.00%
16 json 12188 12188 0.00% 14092 14126 -0.24%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 20570562 20124016 2.22% 21252578 20570048 3.32%
stdb_raw u32_u64_str unique_0 64 128 1288954 1288354 0.05% 1366568 1354998 0.85%
sqlite u32_u64_str unique_0 1024 1024 1802137 1802182 -0.00% 1811317 1811352 -0.00%
sqlite u32_u64_str unique_0 64 128 128540 128528 0.01% 131492 131244 0.19%
On-disk benchmarks

callgrind: empty transaction

db total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw 6437 6431 0.09% 6541 6557 -0.24%
sqlite 5651 5627 0.43% 6141 6211 -1.13%

callgrind: filter

db schema indices count preload _column data_type total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str no_index 64 128 1 u64 74711 76584 -2.45% 75107 76988 -2.44%
stdb_raw u32_u64_str no_index 64 128 2 string 116953 118826 -1.58% 117639 119488 -1.55%
stdb_raw u32_u64_str btree_each_column 64 128 2 string 25121 25115 0.02% 25607 25601 0.02%
stdb_raw u32_u64_str btree_each_column 64 128 1 u64 24089 24083 0.02% 24545 24503 0.17%
sqlite u32_u64_str no_index 64 128 1 u64 125684 125965 -0.22% 127278 127565 -0.22%
sqlite u32_u64_str no_index 64 128 2 string 146336 146616 -0.19% 148136 148486 -0.24%
sqlite u32_u64_str btree_each_column 64 128 2 string 136418 136616 -0.14% 138398 138800 -0.29%
sqlite u32_u64_str btree_each_column 64 128 1 u64 133176 133457 -0.21% 134958 135391 -0.32%

callgrind: insert bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 64 128 828670 827030 0.20% 851564 845706 0.69%
stdb_raw u32_u64_str btree_each_column 64 128 977686 977613 0.01% 1032010 1034455 -0.24%
sqlite u32_u64_str unique_0 64 128 416914 415857 0.25% 435848 434579 0.29%
sqlite u32_u64_str btree_each_column 64 128 1023158 1021908 0.12% 1056170 1057620 -0.14%

callgrind: iterate

db schema indices count total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 138474 153815 -9.97% 138536 153915 -9.99%
stdb_raw u32_u64_str unique_0 64 15915 16840 -5.49% 15977 16940 -5.68%
sqlite u32_u64_str unique_0 1024 1045796 1071349 -2.39% 1049672 1075093 -2.36%
sqlite u32_u64_str unique_0 64 76486 78045 -2.00% 77848 79401 -1.96%

callgrind: serialize_product_value

count format total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
64 json 47528 47528 0.00% 50180 50214 -0.07%
64 bsatn 25509 25509 0.00% 27719 27753 -0.12%
16 bsatn 8200 8200 0.00% 9560 9560 0.00%
16 json 12188 12188 0.00% 14092 14126 -0.24%

callgrind: update bulk

db schema indices count preload total reads + writes old total reads + writes Δrw estimated cycles old estimated cycles Δcycles
stdb_raw u32_u64_str unique_0 1024 1024 19056703 19051047 0.03% 19812463 19575873 1.21%
stdb_raw u32_u64_str unique_0 64 128 1241863 1240287 0.13% 1319119 1303919 1.17%
sqlite u32_u64_str unique_0 1024 1024 1809785 1809743 0.00% 1818461 1818413 0.00%
sqlite u32_u64_str unique_0 64 128 132693 132654 0.03% 135789 135598 0.14%

Please sign in to comment.