diff --git a/crates/core/src/db/datastore/locking_tx_datastore/btree_index.rs b/crates/core/src/db/datastore/locking_tx_datastore/btree_index.rs index 6b443dc0a5..4bc1851770 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/btree_index.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/btree_index.rs @@ -62,6 +62,7 @@ impl Iterator for BTreeIndexIter<'_> { /// [BTreeIndex] pub struct BTreeIndexRangeIter<'a> { range_iter: btree_set::Range<'a, IndexKey>, + num_keys_scanned: u64, } impl<'a> Iterator for BTreeIndexRangeIter<'a> { @@ -69,7 +70,19 @@ impl<'a> Iterator for BTreeIndexRangeIter<'a> { #[tracing::instrument(skip_all)] fn next(&mut self) -> Option<Self::Item> { - self.range_iter.next().map(|key| &key.row_id) + if let Some(key) = self.range_iter.next() { + self.num_keys_scanned += 1; + Some(&key.row_id) + } else { + None + } + } +} + +impl BTreeIndexRangeIter<'_> { + /// Returns the current number of keys the iterator has scanned. + pub fn keys_scanned(&self) -> u64 { + self.num_keys_scanned } } @@ -161,6 +174,7 @@ impl BTreeIndex { let end = map(range.end_bound(), DataKey::max_datakey()); BTreeIndexRangeIter { range_iter: self.idx.range((start, end)), + num_keys_scanned: 0, } } diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs index 01e32de5b4..792206983f 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mod.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mod.rs @@ -1495,11 +1495,13 @@ impl Inner { // The current transaction has modified this table, and the table is indexed. 
let tx_state = self.tx_state.as_ref().unwrap(); Ok(IterByColRange::Index(IndexSeekIterInner { + ctx, table_id: *table_id, tx_state, inserted_rows, committed_rows: self.committed_state.index_seek(table_id, &cols, &range), committed_state: &self.committed_state, + num_committed_rows_fetched: 0, })) } else { // Either the current transaction has not modified this table, or the table is not @@ -1513,10 +1515,12 @@ impl Inner { scan_iter: self.iter(ctx, table_id)?, })), Some(tx_state) => Ok(IterByColRange::CommittedIndex(CommittedIndexIter { + ctx, table_id: *table_id, tx_state, committed_state: &self.committed_state, committed_rows, + num_committed_rows_fetched: 0, })), }, None => Ok(IterByColRange::Scan(ScanIterByColRange { @@ -1689,7 +1693,7 @@ pub struct Iter<'a> { table_id: TableId, inner: &'a Inner, stage: ScanStage<'a>, - committed_rows_fetched: u64, + num_committed_rows_fetched: u64, } impl Drop for Iter<'_> { @@ -1702,7 +1706,7 @@ impl Drop for Iter<'_> { self.ctx.reducer_name().unwrap_or_default(), &self.table_id.into(), ) - .inc_by(self.committed_rows_fetched); + .inc_by(self.num_committed_rows_fetched); } } @@ -1713,7 +1717,7 @@ impl<'a> Iter<'a> { table_id, inner, stage: ScanStage::Start, - committed_rows_fetched: 0, + num_committed_rows_fetched: 0, } } } @@ -1771,7 +1775,7 @@ impl<'a> Iterator for Iter<'a> { let _span = tracing::debug_span!("ScanStage::Committed").entered(); for (row_id, row) in iter { // Increment metric for number of committed rows scanned. - self.committed_rows_fetched += 1; + self.num_committed_rows_fetched += 1; // Check the committed row's state in the current tx. 
match self.inner.tx_state.as_ref().map(|tx_state| tx_state.get_row_op(&table_id, row_id)) { Some(RowState::Committed(_)) => unreachable!("a row cannot be committed in a tx state"), @@ -1802,11 +1806,50 @@ impl<'a> Iterator for Iter<'a> { } pub struct IndexSeekIterInner<'a> { + ctx: &'a ExecutionContext<'a>, table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState, inserted_rows: BTreeIndexRangeIter<'a>, committed_rows: Option<BTreeIndexRangeIter<'a>>, + num_committed_rows_fetched: u64, +} + +impl Drop for IndexSeekIterInner<'_> { + fn drop(&mut self) { + // Increment number of index seeks + DB_METRICS + .rdb_num_index_seeks + .with_label_values( + &self.ctx.txn_type(), + &self.ctx.database(), + self.ctx.reducer_name().unwrap_or_default(), + &self.table_id.0, + ) + .inc(); + + // Increment number of index keys scanned + DB_METRICS + .rdb_num_keys_scanned + .with_label_values( + &self.ctx.txn_type(), + &self.ctx.database(), + self.ctx.reducer_name().unwrap_or_default(), + &self.table_id.0, + ) + .inc_by(self.committed_rows.as_ref().map_or(0, |iter| iter.keys_scanned())); + + // Increment number of rows fetched + DB_METRICS + .rdb_num_rows_fetched + .with_label_values( + &self.ctx.txn_type(), + &self.ctx.database(), + self.ctx.reducer_name().unwrap_or_default(), + &self.table_id.0, + ) + .inc_by(self.num_committed_rows_fetched); + } } impl<'a> Iterator for IndexSeekIterInner<'a> { @@ -1830,6 +1873,7 @@ impl<'a> Iterator for IndexSeekIterInner<'a> { .map_or(false, |table| table.contains(row_id)) }) }) { + self.num_committed_rows_fetched += 1; return Some(get_committed_row(self.committed_state, &self.table_id, row_id)); } @@ -1838,10 +1882,48 @@ impl<'a> Iterator for IndexSeekIterInner<'a> { } pub struct CommittedIndexIter<'a> { + ctx: &'a ExecutionContext<'a>, table_id: TableId, tx_state: &'a TxState, committed_state: &'a CommittedState, committed_rows: BTreeIndexRangeIter<'a>, + num_committed_rows_fetched: u64, +} + +impl Drop for CommittedIndexIter<'_> { + fn drop(&mut 
self) { + DB_METRICS + .rdb_num_index_seeks + .with_label_values( + &self.ctx.txn_type(), + &self.ctx.database(), + self.ctx.reducer_name().unwrap_or_default(), + &self.table_id.0, + ) + .inc(); + + // Increment number of index keys scanned + DB_METRICS + .rdb_num_keys_scanned + .with_label_values( + &self.ctx.txn_type(), + &self.ctx.database(), + self.ctx.reducer_name().unwrap_or_default(), + &self.table_id.0, + ) + .inc_by(self.committed_rows.keys_scanned()); + + // Increment number of rows fetched + DB_METRICS + .rdb_num_rows_fetched + .with_label_values( + &self.ctx.txn_type(), + &self.ctx.database(), + self.ctx.reducer_name().unwrap_or_default(), + &self.table_id.0, + ) + .inc_by(self.num_committed_rows_fetched); + } } impl<'a> Iterator for CommittedIndexIter<'a> { @@ -1856,6 +1938,7 @@ impl<'a> Iterator for CommittedIndexIter<'a> { .get(&self.table_id) .map_or(false, |table| table.contains(row_id)) }) { + self.num_committed_rows_fetched += 1; return Some(get_committed_row(self.committed_state, &self.table_id, row_id)); } diff --git a/crates/core/src/db/db_metrics/mod.rs b/crates/core/src/db/db_metrics/mod.rs index 87fc91c0bd..a3fff1be09 100644 --- a/crates/core/src/db/db_metrics/mod.rs +++ b/crates/core/src/db/db_metrics/mod.rs @@ -71,6 +71,16 @@ metrics_group!( #[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)] pub rdb_num_rows_fetched: IntCounterVec, + #[name = spacetime_num_index_keys_scanned_cumulative] + #[help = "The cumulative number of keys scanned from an index"] + #[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)] + pub rdb_num_keys_scanned: IntCounterVec, + + #[name = spacetime_num_index_seeks_cumulative] + #[help = "The cumulative number of index seeks"] + #[labels(txn_type: TransactionType, db: Address, reducer: str, table_id: u32)] + pub rdb_num_index_seeks: IntCounterVec, + #[name = spacetime_num_txns_committed_cumulative] #[help = "The cumulative number of committed transactions"] 
#[labels(txn_type: TransactionType, db: Address, reducer: str)]