Skip to content

Commit

Permalink
kv/kvclient: add support for prepared transactions
Browse files Browse the repository at this point in the history
Informs #22329.

This commit adds support for `Txn.Prepare`, `DB.CommitPrepared`, and
`DB.RollbackPrepared` APIs to KV. These methods allow clients to prepare
a transaction for commit, commit a prepared transaction, and roll back a
prepared transaction, respectively.

Release note: None
  • Loading branch information
nvanbenschoten committed Dec 17, 2024
1 parent f24721a commit b01f778
Show file tree
Hide file tree
Showing 11 changed files with 317 additions and 43 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -1884,6 +1884,7 @@
<tr><td>APPLICATION</td><td>txn.inflight_locks_over_tracking_budget</td><td>KV transactions whose in-flight writes and locking reads have exceeded the intent tracking memory budget (kv.transaction.max_intents_bytes).</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.parallelcommits</td><td>Number of KV transaction parallel commits</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.parallelcommits.auto_retries</td><td>Number of commit tries after successful failed parallel commit attempts</td><td>Retries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.prepares</td><td>Number of prepared KV transactions</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.refresh.auto_retries</td><td>Number of request retries after successful client-side refreshes</td><td>Retries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.refresh.fail</td><td>Number of failed client-side transaction refreshes</td><td>Refreshes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.refresh.fail_with_condensed_spans</td><td>Number of failed client-side refreshes for transactions whose read tracking lost fidelity because of condensing. Such a failure could be a false conflict. Failures counted here are also counted in txn.refresh.fail, and the respective transactions are also counted in txn.refresh.memory_limit_exceeded.</td><td>Refreshes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
40 changes: 40 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,46 @@ func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn)
return err
}

// CommitPrepared finalizes a previously prepared transaction by committing it.
// It delegates to endPrepared with the commit flag set and returns whatever
// error that call produces.
func (db *DB) CommitPrepared(ctx context.Context, txn *roachpb.Transaction) error {
	const commit = true
	return db.endPrepared(ctx, txn, commit)
}

// RollbackPrepared finalizes a previously prepared transaction by rolling it
// back. It delegates to endPrepared with the commit flag cleared and returns
// whatever error that call produces.
func (db *DB) RollbackPrepared(ctx context.Context, txn *roachpb.Transaction) error {
	const commit = false
	return db.endPrepared(ctx, txn, commit)
}

// endPrepared resolves a prepared transaction by sending an EndTxn request
// that commits it (commit=true) or rolls it back (commit=false).
//
// It returns an assertion failure when txn is not in the PREPARED state, and
// returns nil without sending anything when txn has no key. Otherwise it
// returns the error, if any, produced by sending the EndTxn batch.
func (db *DB) endPrepared(ctx context.Context, txn *roachpb.Transaction, commit bool) error {
	switch {
	case txn.Status != roachpb.PREPARED:
		assertErr := errors.AssertionFailedf("transaction %v is not in a prepared state", txn)
		return errors.WithContextTags(assertErr, ctx)
	case txn.Key == nil:
		// A nil key means the transaction was read-only, so preparing it never
		// wrote a transaction record. There is nothing to commit or roll back.
		return nil
	}

	// NOTE: no deadline is attached to an EndTxn aimed at a prepared
	// transaction. The commit deadline was already validated at prepare time,
	// and a prepared transaction cannot have been pushed to a later commit
	// timestamp since then.
	var noDeadline hlc.Timestamp
	endTxn := endTxnReq(commit, noDeadline)
	endTxn.req.Key = txn.Key
	// TODO(nvanbenschoten): it's unfortunate that the txn's LockSpans have to
	// be copied onto the request here. cmd_end_transaction.go should be able to
	// read them from the transaction record, but it currently doesn't. Address
	// this before merging this commit.
	endTxn.req.LockSpans = txn.LockSpans

	batch := &kvpb.BatchRequest{Requests: endTxn.unionArr[:]}
	batch.Txn = txn

	// NOTE: the CrossRangeTxnWrapperSender does not support transactional
	// requests, so go straight to the underlying non-transactional sender.
	sender := db.factory.NonTransactionalSender()
	_, pErr := db.sendUsingSender(ctx, batch, sender)
	return pErr.GoError()
}

// send runs the specified calls synchronously in a single batch and returns
// any errors. Returns (nil, nil) for an empty batch.
func (db *DB) send(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
Expand Down
61 changes: 46 additions & 15 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ const (
// batches except EndTxn(commit=false) will be rejected.
txnError

// txnPrepared means that an EndTxn(commit=true,prepare=true) has been
// executed successfully. Further batches except EndTxn(commit=*) will
// be rejected.
txnPrepared

// txnFinalized means that an EndTxn(commit=true) has been executed
// successfully, or an EndTxn(commit=false) was sent - regardless of
// whether it executed successfully or not. Further batches except
Expand Down Expand Up @@ -453,19 +458,23 @@ func (tc *TxnCoordSender) finalizeNonLockingTxnLocked(
ba.Txn = txn
return tc.updateStateLocked(ctx, ba, nil /* br */, pErr)
}
// Mark the transaction as committed so that, in case this commit is done by
// the closure passed to db.Txn()), db.Txn() doesn't attempt to commit again.
// Also so that the correct metric gets incremented.
tc.mu.txn.Status = roachpb.COMMITTED
tc.interceptorAlloc.txnMetricRecorder.setReadOnlyCommit()
} else {
tc.mu.txn.Status = roachpb.ABORTED
}
tc.finalizeAndCleanupTxnLocked(ctx)
if et.Commit {
if et.Prepare {
tc.mu.txn.Status = roachpb.PREPARED
tc.markTxnPreparedLocked(ctx)
} else {
// Mark the transaction as committed so that, in case this commit is done
// by the closure passed to db.Txn(), db.Txn() doesn't attempt to commit
// again. Also, so that the correct metric gets incremented.
tc.mu.txn.Status = roachpb.COMMITTED
tc.finalizeAndCleanupTxnLocked(ctx)
}
if err := tc.maybeCommitWait(ctx, false /* deferred */); err != nil {
return kvpb.NewError(err)
}
} else {
tc.mu.txn.Status = roachpb.ABORTED
tc.finalizeAndCleanupTxnLocked(ctx)
}
return nil
}
Expand Down Expand Up @@ -536,16 +545,22 @@ func (tc *TxnCoordSender) Send(
pErr = tc.updateStateLocked(ctx, ba, br, pErr)

// If we succeeded to commit, or we attempted to rollback, we move to
// txnFinalized.
// txnFinalized. If we succeeded to prepare, we move to txnPrepared.
if req, ok := ba.GetArg(kvpb.EndTxn); ok {
et := req.(*kvpb.EndTxnRequest)
if (et.Commit && pErr == nil) || !et.Commit {
tc.finalizeAndCleanupTxnLocked(ctx)
if et.Commit {
if et.Commit {
if pErr == nil {
if et.Prepare {
tc.markTxnPreparedLocked(ctx)
} else {
tc.finalizeAndCleanupTxnLocked(ctx)
}
if err := tc.maybeCommitWait(ctx, false /* deferred */); err != nil {
return nil, kvpb.NewError(err)
}
}
} else /* !et.Commit */ {
tc.finalizeAndCleanupTxnLocked(ctx)
}
}

Expand Down Expand Up @@ -630,8 +645,8 @@ func (tc *TxnCoordSender) Send(
// For more, see https://www.cockroachlabs.com/blog/consistency-model/ and
// docs/RFCS/20200811_non_blocking_txns.md.
func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) error {
if tc.mu.txn.Status != roachpb.COMMITTED {
log.Fatalf(ctx, "maybeCommitWait called when not committed")
if tc.mu.txn.Status != roachpb.PREPARED && tc.mu.txn.Status != roachpb.COMMITTED {
log.Fatalf(ctx, "maybeCommitWait called when not prepared/committed")
}
if tc.mu.commitWaitDeferred && !deferred {
// If this is an automatic commit-wait call and the user of this
Expand Down Expand Up @@ -727,6 +742,15 @@ func (tc *TxnCoordSender) maybeRejectClientLocked(
return kvpb.NewError(tc.mu.storedRetryableErr)
case txnError:
return tc.mu.storedErr
case txnPrepared:
endTxn := ba != nil && ba.IsSingleEndTxnRequest()
if endTxn {
return nil
}
msg := redact.Sprintf("client already prepared the transaction. "+
"Trying to execute: %s", ba.Summary())
reason := kvpb.TransactionStatusError_REASON_UNKNOWN
return kvpb.NewErrorWithTxn(kvpb.NewTransactionStatusError(reason, msg), &tc.mu.txn)
case txnFinalized:
msg := redact.Sprintf("client already committed or rolled back the transaction. "+
"Trying to execute: %s", ba.Summary())
Expand Down Expand Up @@ -784,6 +808,13 @@ func (tc *TxnCoordSender) finalizeAndCleanupTxnLocked(ctx context.Context) {
tc.cleanupTxnLocked(ctx)
}

// markTxnPreparedLocked moves the coordinator into the txnPrepared state and
// then calls cleanupTxnLocked, which closes all the interceptors. Unlike
// finalizeAndCleanupTxnLocked, the resulting state is txnPrepared rather than
// txnFinalized, so a later single EndTxn request is still accepted by
// maybeRejectClientLocked.
//
// NOTE(review): the "Locked" suffix suggests the caller must hold tc.mu —
// confirm against the rest of the file.
func (tc *TxnCoordSender) markTxnPreparedLocked(ctx context.Context) {
// Record the new state before tearing down the interceptors.
tc.mu.txnState = txnPrepared
tc.cleanupTxnLocked(ctx)
}

// cleanupTxnLocked closes all the interceptors.
func (tc *TxnCoordSender) cleanupTxnLocked(ctx context.Context) {
if tc.mu.closed {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func (tc *txnCommitter) SendLocked(
switch br.Txn.Status {
case roachpb.STAGING:
// Continue with STAGING-specific validation and cleanup.
case roachpb.PREPARED:
// The transaction is prepared.
return br, nil
case roachpb.COMMITTED:
// The transaction is explicitly committed. This is possible if all
// in-flight writes were sent to the same range as the EndTxn request,
Expand Down Expand Up @@ -348,6 +351,11 @@ func (tc *txnCommitter) canCommitInParallel(ba *kvpb.BatchRequest, et *kvpb.EndT
return false
}

// We don't support a parallel prepare.
if et.Prepare {
return false
}

// If the transaction has a commit trigger, we don't allow it to commit in
// parallel with writes. There's no fundamental reason for this restriction,
// but for now it's not worth the complication.
Expand Down
29 changes: 29 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,35 @@ func TestTxnCommitterStripsInFlightWrites(t *testing.T) {
require.Nil(t, pErr)
require.NotNil(t, br)

// Send the same batch but with an EndTxn with the Prepare flag set. In-flight
// writes should not be attached because the XA two-phase commit protocol
// disables parallel commits.
ba.Requests = nil
etArgsPrepare := etArgs
etArgsPrepare.Prepare = true
ba.Add(&putArgs, &qiArgs, &etArgsPrepare)

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 3)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[2].GetInner())

et := ba.Requests[2].GetInner().(*kvpb.EndTxnRequest)
require.True(t, et.Commit)
require.True(t, et.Prepare)
require.Len(t, et.LockSpans, 2)
require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, et.LockSpans)
require.Len(t, et.InFlightWrites, 0)

br = ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.PREPARED
return br, nil
})

br, pErr = tc.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send the same batch but with an EndTxn containing a commit trigger.
// In-flight writes should not be attached because commit triggers disable
// parallel commits.
Expand Down
17 changes: 13 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,22 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
// Returns true if heartbeating should continue, false if the transaction is no
// longer Pending and so there's no point in heartbeating further.
func (h *txnHeartbeater) heartbeatLocked(ctx context.Context) bool {
if h.mu.txn.Status != roachpb.PENDING {
if h.mu.txn.Status == roachpb.COMMITTED {
log.Fatalf(ctx, "txn committed but heartbeat loop hasn't been signaled to stop: %s", h.mu.txn)
}
switch h.mu.txn.Status {
case roachpb.PENDING:
// Continue heartbeating.
case roachpb.PREPARED:
// If the transaction is prepared, there's no point in heartbeating. The
// transaction will remain active without heartbeats until it is committed
// or rolled back.
return false
case roachpb.ABORTED:
// If the transaction is aborted, there's no point in heartbeating. The
// client needs to send a rollback.
return false
case roachpb.COMMITTED:
log.Fatalf(ctx, "txn committed but heartbeat loop hasn't been signaled to stop: %s", h.mu.txn)
default:
log.Fatalf(ctx, "unexpected txn status in heartbeat loop: %s", h.mu.txn)
}

// Clone the txn in order to put it in the heartbeat request.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,7 @@ func (m *txnMetricRecorder) closeLocked() {
// Note that successful read-only txn are also counted as committed, even
// though they never had a txn record.
m.metrics.Commits.Inc(1)
case roachpb.PREPARED:
m.metrics.Prepares.Inc(1)
}
}
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type TxnMetrics struct {
ParallelCommits *metric.Counter // Commits which entered the STAGING state
ParallelCommitAutoRetries *metric.Counter // Commits which were retried after entering the STAGING state
CommitWaits *metric.Counter // Commits that waited for linearizability
Prepares *metric.Counter

ClientRefreshSuccess *metric.Counter
ClientRefreshFail *metric.Counter
Expand Down Expand Up @@ -99,6 +100,12 @@ var (
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
// metaPreparesRates is the metadata for the "txn.prepares" counter, which
// MakeTxnMetrics registers as TxnMetrics.Prepares.
metaPreparesRates = metric.Metadata{
Name: "txn.prepares",
Help: "Number of prepared KV transactions",
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
metaClientRefreshSuccess = metric.Metadata{
Name: "txn.refresh.success",
Help: "Number of successful client-side transaction refreshes. A refresh may be " +
Expand Down Expand Up @@ -274,6 +281,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
ParallelCommits: metric.NewCounter(metaParallelCommitsRates),
ParallelCommitAutoRetries: metric.NewCounter(metaParallelCommitAutoRetries),
CommitWaits: metric.NewCounter(metaCommitWaitCount),
Prepares: metric.NewCounter(metaPreparesRates),
ClientRefreshSuccess: metric.NewCounter(metaClientRefreshSuccess),
ClientRefreshFail: metric.NewCounter(metaClientRefreshFail),
ClientRefreshFailWithCondensedSpans: metric.NewCounter(metaClientRefreshFailWithCondensedSpans),
Expand Down
Loading

0 comments on commit b01f778

Please sign in to comment.