Skip to content

Commit

Permalink
perf(468): Add metric counters for committed transactions and rollbac…
Browse files Browse the repository at this point in the history
…ks (#478)

Closes #468.
  • Loading branch information
joshua-spacetime authored Oct 28, 2023
1 parent 88952b3 commit 3c98b8d
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 67 deletions.
76 changes: 48 additions & 28 deletions crates/core/src/db/datastore/locking_tx_datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1676,8 +1676,8 @@ impl traits::Tx for Locking {
self.begin_mut_tx()
}

fn release_tx(&self, tx: Self::TxId) {
self.rollback_mut_tx(tx)
fn release_tx(&self, ctx: &ExecutionContext, tx: Self::TxId) {
self.rollback_mut_tx(ctx, tx)
}
}

Expand Down Expand Up @@ -1980,11 +1980,31 @@ impl traits::MutTx for Locking {
MutTxId { lock: inner }
}

fn rollback_mut_tx(&self, mut tx: Self::MutTxId) {
fn rollback_mut_tx(&self, ctx: &ExecutionContext, mut tx: Self::MutTxId) {
DB_METRICS
.rdb_num_txns_rolledback
.with_label_values(&ctx.txn_type(), &ctx.database(), ctx.reducer_name().unwrap_or(""))
.inc();
tx.lock.rollback();
}

fn commit_mut_tx(&self, ctx: &ExecutionContext, mut tx: Self::MutTxId) -> super::Result<Option<TxData>> {
// Note, we record empty transactions in our metrics.
// That is, transactions that don't write any rows to the commit log.
DB_METRICS
.rdb_num_txns_committed
.with_label_values(&ctx.txn_type(), &ctx.database(), ctx.reducer_name().unwrap_or(""))
.inc();
tx.lock.commit()
}

#[cfg(test)]
fn rollback_mut_tx_for_test(&self, mut tx: Self::MutTxId) {
tx.lock.rollback();
}

fn commit_mut_tx(&self, mut tx: Self::MutTxId) -> super::Result<Option<TxData>> {
#[cfg(test)]
fn commit_mut_tx_for_test(&self, mut tx: Self::MutTxId) -> super::Result<Option<TxData>> {
tx.lock.commit()
}
}
Expand Down Expand Up @@ -2483,7 +2503,7 @@ mod tests {
StConstraintRow{ constraint_id: 5.into(), constraint_name: "ct_columns_table_id".to_string(), kind: ColumnIndexAttribute::INDEXED, table_id: 1.into(), columns: vec![0.into()] },
]
);
datastore.rollback_mut_tx(tx);
datastore.rollback_mut_tx_for_test(tx);
Ok(())
}

Expand Down Expand Up @@ -2536,7 +2556,7 @@ mod tests {
#[test]
fn test_create_table_post_commit() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let tx = datastore.begin_mut_tx();
let table_rows = datastore
.iter_by_col_eq_mut_tx(
Expand Down Expand Up @@ -2584,7 +2604,7 @@ mod tests {
#[test]
fn test_create_table_post_rollback() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
datastore.rollback_mut_tx(tx);
datastore.rollback_mut_tx_for_test(tx);
let tx = datastore.begin_mut_tx();
let table_rows = datastore
.iter_by_col_eq_mut_tx(
Expand Down Expand Up @@ -2642,7 +2662,7 @@ mod tests {
#[test]
fn test_schema_for_table_post_commit() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let tx = datastore.begin_mut_tx();
let schema = &*datastore.schema_for_table_mut_tx(&tx, table_id)?;
#[rustfmt::skip]
Expand Down Expand Up @@ -2670,7 +2690,7 @@ mod tests {
#[test]
fn test_schema_for_table_alter_indexes() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;

let mut tx = datastore.begin_mut_tx();
let schema = datastore.schema_for_table_mut_tx(&tx, table_id)?.into_owned();
Expand All @@ -2682,7 +2702,7 @@ mod tests {
datastore.schema_for_table_mut_tx(&tx, table_id)?.indexes.is_empty(),
"no indexes should be left in the schema pre-commit"
);
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;

let mut tx = datastore.begin_mut_tx();
assert!(
Expand All @@ -2699,7 +2719,7 @@ mod tests {
"created index should be present in schema pre-commit"
);

datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;

let tx = datastore.begin_mut_tx();
assert_eq!(
Expand All @@ -2708,15 +2728,15 @@ mod tests {
"created index should be present in schema post-commit"
);

datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;

Ok(())
}

#[test]
fn test_schema_for_table_rollback() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
datastore.rollback_mut_tx(tx);
datastore.rollback_mut_tx_for_test(tx);
let tx = datastore.begin_mut_tx();
let schema = datastore.schema_for_table_mut_tx(&tx, table_id);
assert!(schema.is_err());
Expand Down Expand Up @@ -2751,7 +2771,7 @@ mod tests {
let (datastore, mut tx, table_id) = setup_table()?;
// 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, u32_str_u32(0, "Foo", 18))?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let tx = datastore.begin_mut_tx();
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![u32_str_u32(1, "Foo", 18)]);
Expand All @@ -2762,10 +2782,10 @@ mod tests {
fn test_insert_post_rollback() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
let row = u32_str_u32(15, "Foo", 18); // 15 is ignored.
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let mut tx = datastore.begin_mut_tx();
datastore.insert_mut_tx(&mut tx, table_id, row)?;
datastore.rollback_mut_tx(tx);
datastore.rollback_mut_tx_for_test(tx);
let tx = datastore.begin_mut_tx();
#[rustfmt::skip]
assert_eq!(all_rows(&datastore, &tx, table_id), vec![]);
Expand All @@ -2777,7 +2797,7 @@ mod tests {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let mut tx = datastore.begin_mut_tx();
let created_row = u32_str_u32(1, "Foo", 18);
let num_deleted = datastore.delete_by_rel_mut_tx(&mut tx, table_id, [created_row]);
Expand Down Expand Up @@ -2832,7 +2852,7 @@ mod tests {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let mut tx = datastore.begin_mut_tx();
let result = datastore.insert_mut_tx(&mut tx, table_id, row);
match result {
Expand All @@ -2852,11 +2872,11 @@ mod tests {
#[test]
fn test_unique_constraint_post_rollback() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let mut tx = datastore.begin_mut_tx();
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row.clone())?;
datastore.rollback_mut_tx(tx);
datastore.rollback_mut_tx_for_test(tx);
let mut tx = datastore.begin_mut_tx();
datastore.insert_mut_tx(&mut tx, table_id, row)?;
#[rustfmt::skip]
Expand All @@ -2867,11 +2887,11 @@ mod tests {
#[test]
fn test_create_index_pre_commit() -> ResultTest<()> {
let (datastore, tx, table_id) = setup_table()?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let mut tx = datastore.begin_mut_tx();
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let mut tx = datastore.begin_mut_tx();
let index_def = IndexDef::new("age_idx".to_string(), table_id, 2.into(), true);
datastore.create_index_mut_tx(&mut tx, index_def)?;
Expand Down Expand Up @@ -2914,11 +2934,11 @@ mod tests {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let mut tx = datastore.begin_mut_tx();
let index_def = IndexDef::new("age_idx".to_string(), table_id, 2.into(), true);
datastore.create_index_mut_tx(&mut tx, index_def)?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let mut tx = datastore.begin_mut_tx();
let index_rows = datastore
.iter_mut_tx(&ExecutionContext::default(), &tx, ST_INDEXES_ID)?
Expand Down Expand Up @@ -2960,11 +2980,11 @@ mod tests {
let (datastore, mut tx, table_id) = setup_table()?;
let row = u32_str_u32(0, "Foo", 18); // 0 will be ignored.
datastore.insert_mut_tx(&mut tx, table_id, row)?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;
let mut tx = datastore.begin_mut_tx();
let index_def = IndexDef::new("age_idx".to_string(), table_id, 2.into(), true);
datastore.create_index_mut_tx(&mut tx, index_def)?;
datastore.rollback_mut_tx(tx);
datastore.rollback_mut_tx_for_test(tx);
let mut tx = datastore.begin_mut_tx();
let index_rows = datastore
.iter_mut_tx(&ExecutionContext::default(), &tx, ST_INDEXES_ID)?
Expand Down Expand Up @@ -3002,7 +3022,7 @@ mod tests {
// Because of autoinc columns, we will get a slightly different
// value than the one we inserted.
let row = datastore.insert_mut_tx(&mut tx, table_id, row)?;
datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;

let all_rows_col_0_eq_1 = |tx: &MutTxId| {
datastore
Expand Down Expand Up @@ -3039,7 +3059,7 @@ mod tests {
// second transaction, and see exactly one row.
assert_eq!(all_rows_col_0_eq_1(&tx).len(), 1);

datastore.commit_mut_tx(tx)?;
datastore.commit_mut_tx_for_test(tx)?;

Ok(())
}
Expand Down
12 changes: 9 additions & 3 deletions crates/core/src/db/datastore/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,21 @@ pub trait Tx {
type TxId;

fn begin_tx(&self) -> Self::TxId;
fn release_tx(&self, tx: Self::TxId);
fn release_tx(&self, ctx: &ExecutionContext, tx: Self::TxId);
}

pub trait MutTx {
type MutTxId;

fn begin_mut_tx(&self) -> Self::MutTxId;
fn rollback_mut_tx(&self, tx: Self::MutTxId);
fn commit_mut_tx(&self, tx: Self::MutTxId) -> Result<Option<TxData>>;
fn commit_mut_tx(&self, ctx: &ExecutionContext, tx: Self::MutTxId) -> Result<Option<TxData>>;
fn rollback_mut_tx(&self, ctx: &ExecutionContext, tx: Self::MutTxId);

#[cfg(test)]
fn commit_mut_tx_for_test(&self, tx: Self::MutTxId) -> Result<Option<TxData>>;

#[cfg(test)]
fn rollback_mut_tx_for_test(&self, tx: Self::MutTxId);
}

pub trait TxDatastore: DataRow + Tx {
Expand Down
10 changes: 10 additions & 0 deletions crates/core/src/db/db_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ metrics_group!(
#[help = "The cumulative number of rows fetched from a table"]
#[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)]
pub rdb_num_rows_fetched: IntCounterVec,

#[name = spacetime_num_txns_committed_cumulative]
#[help = "The cumulative number of committed transactions"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
pub rdb_num_txns_committed: IntCounterVec,

#[name = spacetime_num_txns_rolledback_cumulative]
#[help = "The cumulative number of rolled back transactions"]
#[labels(txn_type: TransactionType, db: Address, reducer: str)]
pub rdb_num_txns_rolledback: IntCounterVec,
}
);

Expand Down
34 changes: 20 additions & 14 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,15 @@ impl RelationalDB {
}

#[tracing::instrument(skip_all)]
pub fn rollback_tx(&self, tx: MutTxId) {
pub fn rollback_tx(&self, ctx: &ExecutionContext, tx: MutTxId) {
log::trace!("ROLLBACK TX");
self.inner.rollback_mut_tx(tx)
self.inner.rollback_mut_tx(ctx, tx)
}

#[tracing::instrument(skip_all)]
pub fn commit_tx(&self, ctx: &ExecutionContext, tx: MutTxId) -> Result<Option<(TxData, Option<usize>)>, DBError> {
log::trace!("COMMIT TX");
if let Some(tx_data) = self.inner.commit_mut_tx(tx)? {
if let Some(tx_data) = self.inner.commit_mut_tx(ctx, tx)? {
let bytes_written = self.commit_log.append_tx(ctx, &tx_data, &self.inner)?;
return Ok(Some((tx_data, bytes_written)));
}
Expand Down Expand Up @@ -322,12 +322,12 @@ impl RelationalDB {
/// Similar in purpose to [`Self::with_auto_commit`], but returns the
/// [`MutTxId`] alongside the `Ok` result of the function `F` without
/// committing the transaction.
pub fn with_auto_rollback<F, A, E>(&self, mut tx: MutTxId, f: F) -> Result<(MutTxId, A), E>
pub fn with_auto_rollback<F, A, E>(&self, ctx: &ExecutionContext, mut tx: MutTxId, f: F) -> Result<(MutTxId, A), E>
where
F: FnOnce(&mut MutTxId) -> Result<A, E>,
{
let res = f(&mut tx);
self.rollback_on_err(tx, res)
self.rollback_on_err(ctx, tx, res)
}

/// Run a fallible function in a transaction.
Expand All @@ -339,14 +339,14 @@ impl RelationalDB {
/// TODO(jgilles): when we support actual read-only transactions, use those here instead.
/// TODO(jgilles, kim): get this merged with the above function (two people had similar ideas
/// at the same time)
pub fn with_read_only<F, A, E>(&self, f: F) -> Result<A, E>
pub fn with_read_only<F, A, E>(&self, ctx: &ExecutionContext, f: F) -> Result<A, E>
where
F: FnOnce(&mut MutTxId) -> Result<A, E>,
E: From<DBError>,
{
let mut tx = self.begin_tx();
let res = f(&mut tx);
self.rollback_tx(tx);
self.rollback_tx(ctx, tx);
res
}

Expand All @@ -357,7 +357,7 @@ impl RelationalDB {
E: From<DBError>,
{
if res.is_err() {
self.rollback_tx(tx);
self.rollback_tx(ctx, tx);
} else {
match self.commit_tx(ctx, tx).map_err(E::from)? {
Some(_) => (),
Expand All @@ -369,10 +369,15 @@ impl RelationalDB {

/// Roll back transaction `tx` if `res` is `Err`, otherwise return it
/// alongside the `Ok` value.
pub fn rollback_on_err<A, E>(&self, tx: MutTxId, res: Result<A, E>) -> Result<(MutTxId, A), E> {
pub fn rollback_on_err<A, E>(
&self,
ctx: &ExecutionContext,
tx: MutTxId,
res: Result<A, E>,
) -> Result<(MutTxId, A), E> {
match res {
Err(e) => {
self.rollback_tx(tx);
self.rollback_tx(ctx, tx);
Err(e)
}
Ok(a) => Ok((tx, a)),
Expand Down Expand Up @@ -924,7 +929,7 @@ mod tests {
let mut schema = TableDef::from(ProductType::from([("my_col", AlgebraicType::I32)]));
schema.table_name = "MyTable".to_string();
let table_id = stdb.create_table(&mut tx, schema)?;
stdb.rollback_tx(tx);
stdb.rollback_tx(&ExecutionContext::default(), tx);

let mut tx = stdb.begin_tx();
let result = stdb.drop_table(&mut tx, table_id);
Expand All @@ -937,21 +942,22 @@ mod tests {
let (stdb, _tmp_dir) = make_test_db()?;

let mut tx = stdb.begin_tx();
let ctx = ExecutionContext::default();

let mut schema = TableDef::from(ProductType::from([("my_col", AlgebraicType::I32)]));
schema.table_name = "MyTable".to_string();
let table_id = stdb.create_table(&mut tx, schema)?;
stdb.commit_tx(&ExecutionContext::default(), tx)?;
stdb.commit_tx(&ctx, tx)?;

let mut tx = stdb.begin_tx();
stdb.insert(&mut tx, table_id, product![AlgebraicValue::I32(-1)])?;
stdb.insert(&mut tx, table_id, product![AlgebraicValue::I32(0)])?;
stdb.insert(&mut tx, table_id, product![AlgebraicValue::I32(1)])?;
stdb.rollback_tx(tx);
stdb.rollback_tx(&ctx, tx);

let tx = stdb.begin_tx();
let mut rows = stdb
.iter(&ExecutionContext::default(), &tx, table_id)?
.iter(&ctx, &tx, table_id)?
.map(|r| *r.view().elements[0].as_i32().unwrap())
.collect::<Vec<i32>>();
rows.sort();
Expand Down
Loading

1 comment on commit 3c98b8d

@github-actions
Copy link

@github-actions github-actions bot commented on 3c98b8d Oct 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

Please sign in to comment.