Skip to content

Commit

Permalink
kv/kvserver: misc additional support for prepared transactions
Browse files Browse the repository at this point in the history
Informs #22329.

Adds support for prepared transactions to MVCC GC, rangefeeds, and a few
other places.

Release note: None
  • Loading branch information
nvanbenschoten committed Dec 17, 2024
1 parent 780cc19 commit f24721a
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 4 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangcaborted</td><td>Number of GC&#39;able entries corresponding to aborted txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangccommitted</td><td>Number of GC&#39;able entries corresponding to committed txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangcpending</td><td>Number of GC&#39;able entries corresponding to pending txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangcprepared</td><td>Number of GC&#39;able entries corresponding to prepared txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangcstaging</td><td>Number of GC&#39;able entries corresponding to staging txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspanscanned</td><td>Number of entries in transaction spans scanned from the engine</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.pending</td><td>Number of pending replicas in the MVCC GC queue</td><td>Replicas</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ type Info struct {
// potentially necessary intent resolutions did not fail).
TransactionSpanGCAborted, TransactionSpanGCCommitted int
TransactionSpanGCStaging, TransactionSpanGCPending int
TransactionSpanGCPrepared int
// AbortSpanTotal is the total number of transactions present in the AbortSpan.
AbortSpanTotal int
// AbortSpanConsidered is the number of AbortSpan entries old enough to be
Expand Down Expand Up @@ -1217,6 +1218,8 @@ func processLocalKeyRange(
switch txn.Status {
case roachpb.PENDING:
info.TransactionSpanGCPending++
case roachpb.PREPARED:
info.TransactionSpanGCPrepared++
case roachpb.STAGING:
info.TransactionSpanGCStaging++
case roachpb.ABORTED:
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2122,6 +2122,12 @@ The messages are dropped to help these replicas to recover from I/O overload.`,
Measurement: "Txn Entries",
Unit: metric.Unit_COUNT,
}
metaGCTransactionSpanGCPrepared = metric.Metadata{
Name: "queue.gc.info.transactionspangcprepared",
Help: "Number of GC'able entries corresponding to prepared txns",
Measurement: "Txn Entries",
Unit: metric.Unit_COUNT,
}
metaGCAbortSpanScanned = metric.Metadata{
Name: "queue.gc.info.abortspanscanned",
Help: "Number of transactions present in the AbortSpan scanned from the engine",
Expand Down Expand Up @@ -2904,6 +2910,7 @@ type StoreMetrics struct {
GCTransactionSpanGCCommitted *metric.Counter
GCTransactionSpanGCStaging *metric.Counter
GCTransactionSpanGCPending *metric.Counter
GCTransactionSpanGCPrepared *metric.Counter
GCAbortSpanScanned *metric.Counter
GCAbortSpanConsidered *metric.Counter
GCAbortSpanGCNum *metric.Counter
Expand Down Expand Up @@ -3672,6 +3679,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
GCTransactionSpanGCCommitted: metric.NewCounter(metaGCTransactionSpanGCCommitted),
GCTransactionSpanGCStaging: metric.NewCounter(metaGCTransactionSpanGCStaging),
GCTransactionSpanGCPending: metric.NewCounter(metaGCTransactionSpanGCPending),
GCTransactionSpanGCPrepared: metric.NewCounter(metaGCTransactionSpanGCPrepared),
GCAbortSpanScanned: metric.NewCounter(metaGCAbortSpanScanned),
GCAbortSpanConsidered: metric.NewCounter(metaGCAbortSpanConsidered),
GCAbortSpanGCNum: metric.NewCounter(metaGCAbortSpanGCNum),
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ func updateStoreMetricsWithGCInfo(metrics *StoreMetrics, info gc.Info) {
metrics.GCTransactionSpanGCCommitted.Inc(int64(info.TransactionSpanGCCommitted))
metrics.GCTransactionSpanGCStaging.Inc(int64(info.TransactionSpanGCStaging))
metrics.GCTransactionSpanGCPending.Inc(int64(info.TransactionSpanGCPending))
metrics.GCTransactionSpanGCPrepared.Inc(int64(info.TransactionSpanGCPrepared))
metrics.GCAbortSpanScanned.Inc(int64(info.AbortSpanTotal))
metrics.GCAbortSpanConsidered.Inc(int64(info.AbortSpanConsidered))
metrics.GCAbortSpanGCNum.Inc(int64(info.AbortSpanGCNum))
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {
var intentsToCleanup []roachpb.LockUpdate
for i, txn := range pushedTxns {
switch txn.Status {
case roachpb.PENDING, roachpb.STAGING:
case roachpb.PENDING, roachpb.PREPARED, roachpb.STAGING:
// The transaction is still in progress but its timestamp was moved
// forward to the current time. Inform the Processor that it can
// forward the txn's timestamp in its unresolvedIntentQueue.
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2334,9 +2334,6 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {

var mergeCommitted bool
switch pushTxnRes.PusheeTxn.Status {
case roachpb.PENDING, roachpb.STAGING:
log.Fatalf(ctx, "PushTxn returned while merge transaction %s was still %s",
intentRes.Intent.Txn.ID.Short(), pushTxnRes.PusheeTxn.Status)
case roachpb.COMMITTED:
// If PushTxn claims that the transaction committed, then the transaction
// definitely committed.
Expand Down Expand Up @@ -2391,6 +2388,9 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
mergeCommitted = true
}
}
default:
log.Fatalf(ctx, "PushTxn returned while merge transaction %s was still %s",
intentRes.Intent.Txn.ID.Short(), pushTxnRes.PusheeTxn.Status)
}
r.raftMu.Lock()
r.readOnlyCmdMu.Lock()
Expand Down

0 comments on commit f24721a

Please sign in to comment.