Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
137374: vecstore: disable deadlock linting for inMemoryLock r=drewkimball a=andy-kimball

Do not use syncutil.RWMutex in the inMemoryLock class, because deadlock detection reports spurious failures. Different partitions in the vector index can be locked in different orders by merge, split, format and other operations. In all these cases, we first acquire the in-memory store's structure lock to prevent deadlocks. But the deadlock detection package is not smart enough to realize this and reports false positives.

Epic: CRDB-42943
Fixes: #136958.
Fixes: #136960.

Release note: None

137409: kv: support two-phase commit transactions r=arulajmani a=nvanbenschoten

Informs #22329.

This commit adds support for Txn.Prepare, DB.CommitPrepared, and DB.RollbackPrepared APIs to KV. These methods allow clients to prepare a transaction for commit, commit a prepared transaction, and rollback a prepared transaction, respectively.

When a transaction is prepared, we send an EndTxnRequest as usual, but a new flag indicates the transaction should be moved to status PREPARED instead of COMMITTED. In state PREPARED, a transaction record is updated to include all of its intents, and awaits a subsequent EndTxnRequest which will move the transaction either to COMMITTED or ABORTED. Transaction in state PREPARED cannot be pushed, regardless of isolation level, ensuring that a subsequent commit or rollback is guaranteed to succeed.

Release note: None

137627: pgwire: parseClientProvidedSessionParameters can leak entire ReadBuffer r=fqazi a=fqazi

Previously, when parsing the client session parameters, we would fetch option value via ReaderBuffer.GetString. This function would return a reference to the original read buffer, so any session parameters would cause the entire message buffer to stay allocated. To address this, this patch copies the values from the read buffer, so that no references cause the entire message buffer to be kept alive.

Fixes: #137623

Release note (bug fix): Address potential memory leak parsing client session parameters for new connections.

137634: roachtest: add io_listener_logs to pertubation/* tests r=kvoli a=andrewbaptist

Epic: none

Release note: None

Co-authored-by: Andrew Kimball <andyk@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Faizan Qazi <faizan@cockroachlabs.com>
Co-authored-by: Andrew Baptist <baptist@cockroachlabs.com>
  • Loading branch information
5 people committed Dec 17, 2024
5 parents 205b941 + 86ffeec + b01f778 + 9134182 + 2ca7c77 commit 1203a72
Show file tree
Hide file tree
Showing 35 changed files with 1,402 additions and 87 deletions.
2 changes: 2 additions & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangcaborted</td><td>Number of GC&#39;able entries corresponding to aborted txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangccommitted</td><td>Number of GC&#39;able entries corresponding to committed txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangcpending</td><td>Number of GC&#39;able entries corresponding to pending txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangcprepared</td><td>Number of GC&#39;able entries corresponding to prepared txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspangcstaging</td><td>Number of GC&#39;able entries corresponding to staging txns</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.info.transactionspanscanned</td><td>Number of entries in transaction spans scanned from the engine</td><td>Txn Entries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>queue.gc.pending</td><td>Number of pending replicas in the MVCC GC queue</td><td>Replicas</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
Expand Down Expand Up @@ -1886,6 +1887,7 @@
<tr><td>APPLICATION</td><td>txn.inflight_locks_over_tracking_budget</td><td>KV transactions whose in-flight writes and locking reads have exceeded the intent tracking memory budget (kv.transaction.max_intents_bytes).</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.parallelcommits</td><td>Number of KV transaction parallel commits</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.parallelcommits.auto_retries</td><td>Number of commit tries after successful failed parallel commit attempts</td><td>Retries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.prepares</td><td>Number of prepared KV transactions</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.refresh.auto_retries</td><td>Number of request retries after successful client-side refreshes</td><td>Retries</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.refresh.fail</td><td>Number of failed client-side transaction refreshes</td><td>Refreshes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.refresh.fail_with_condensed_spans</td><td>Number of failed client-side refreshes for transactions whose read tracking lost fidelity because of condensing. Such a failure could be a false conflict. Failures counted here are also counted in txn.refresh.fail, and the respective transactions are also counted in txn.refresh.memory_limit_exceeded.</td><td>Refreshes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
2 changes: 2 additions & 0 deletions pkg/cmd/roachtest/tests/perturbation/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ func setup(p perturbation, acceptableChange float64) variations {
}
v.acceptableChange = acceptableChange
v.clusterSettings = make(map[string]string)
// Having the io_load_listener logs makes it easier to debug failures.
v.clusterSettings["server.debug.default_vmodule"] = "io_load_listener=1"
return v
}

Expand Down
40 changes: 40 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,46 @@ func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn)
return err
}

// CommitPrepared commits the prepared transaction.
func (db *DB) CommitPrepared(ctx context.Context, txn *roachpb.Transaction) error {
return db.endPrepared(ctx, txn, true /* commit */)
}

// RollbackPrepared rolls back the prepared transaction.
func (db *DB) RollbackPrepared(ctx context.Context, txn *roachpb.Transaction) error {
return db.endPrepared(ctx, txn, false /* commit */)
}

func (db *DB) endPrepared(ctx context.Context, txn *roachpb.Transaction, commit bool) error {
if txn.Status != roachpb.PREPARED {
return errors.WithContextTags(errors.AssertionFailedf("transaction %v is not in a prepared state", txn), ctx)
}
if txn.Key == nil {
// If the transaction key is nil, the transaction was read-only and never
// wrote a transaction record when preparing. Committing or rolling back
// such a transaction is a no-op.
return nil
}

// NOTE: an EndTxn sent to a prepared transaction does not need a deadline,
// because the commit deadline was already checked when the transaction was
// prepared and the transaction can not have been pushed to a later commit
// timestamp when prepared.
et := endTxnReq(commit, hlc.Timestamp{} /* deadline */)
et.req.Key = txn.Key
// TODO(nvanbenschoten): it's unfortunate that we have to set the txn's
// LockSpans here. cmd_end_transaction.go should be able to read them from the
// transaction record. Unfortunately, it currently doesn't. Address this
// before merging this commit.
et.req.LockSpans = txn.LockSpans
ba := &kvpb.BatchRequest{Requests: et.unionArr[:]}
ba.Txn = txn
// NOTE: bypass the CrossRangeTxnWrapperSender, which does not support
// transactional requests. Use the underlying sender directly.
_, pErr := db.sendUsingSender(ctx, ba, db.factory.NonTransactionalSender())
return pErr.GoError()
}

// send runs the specified calls synchronously in a single batch and returns
// any errors. Returns (nil, nil) for an empty batch.
func (db *DB) send(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
Expand Down
61 changes: 46 additions & 15 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ const (
// batches except EndTxn(commit=false) will be rejected.
txnError

// txnPrepared means that an EndTxn(commit=true,prepare=true) has been
// executed successfully. Further batches except EndTxn(commit=*) will
// be rejected.
txnPrepared

// txnFinalized means that an EndTxn(commit=true) has been executed
// successfully, or an EndTxn(commit=false) was sent - regardless of
// whether it executed successfully or not. Further batches except
Expand Down Expand Up @@ -453,19 +458,23 @@ func (tc *TxnCoordSender) finalizeNonLockingTxnLocked(
ba.Txn = txn
return tc.updateStateLocked(ctx, ba, nil /* br */, pErr)
}
// Mark the transaction as committed so that, in case this commit is done by
// the closure passed to db.Txn()), db.Txn() doesn't attempt to commit again.
// Also so that the correct metric gets incremented.
tc.mu.txn.Status = roachpb.COMMITTED
tc.interceptorAlloc.txnMetricRecorder.setReadOnlyCommit()
} else {
tc.mu.txn.Status = roachpb.ABORTED
}
tc.finalizeAndCleanupTxnLocked(ctx)
if et.Commit {
if et.Prepare {
tc.mu.txn.Status = roachpb.PREPARED
tc.markTxnPreparedLocked(ctx)
} else {
// Mark the transaction as committed so that, in case this commit is done
// by the closure passed to db.Txn()), db.Txn() doesn't attempt to commit
// again. Also, so that the correct metric gets incremented.
tc.mu.txn.Status = roachpb.COMMITTED
tc.finalizeAndCleanupTxnLocked(ctx)
}
if err := tc.maybeCommitWait(ctx, false /* deferred */); err != nil {
return kvpb.NewError(err)
}
} else {
tc.mu.txn.Status = roachpb.ABORTED
tc.finalizeAndCleanupTxnLocked(ctx)
}
return nil
}
Expand Down Expand Up @@ -536,16 +545,22 @@ func (tc *TxnCoordSender) Send(
pErr = tc.updateStateLocked(ctx, ba, br, pErr)

// If we succeeded to commit, or we attempted to rollback, we move to
// txnFinalized.
// txnFinalized. If we succeeded to prepare, we move to txnPrepared.
if req, ok := ba.GetArg(kvpb.EndTxn); ok {
et := req.(*kvpb.EndTxnRequest)
if (et.Commit && pErr == nil) || !et.Commit {
tc.finalizeAndCleanupTxnLocked(ctx)
if et.Commit {
if et.Commit {
if pErr == nil {
if et.Prepare {
tc.markTxnPreparedLocked(ctx)
} else {
tc.finalizeAndCleanupTxnLocked(ctx)
}
if err := tc.maybeCommitWait(ctx, false /* deferred */); err != nil {
return nil, kvpb.NewError(err)
}
}
} else /* !et.Commit */ {
tc.finalizeAndCleanupTxnLocked(ctx)
}
}

Expand Down Expand Up @@ -630,8 +645,8 @@ func (tc *TxnCoordSender) Send(
// For more, see https://www.cockroachlabs.com/blog/consistency-model/ and
// docs/RFCS/20200811_non_blocking_txns.md.
func (tc *TxnCoordSender) maybeCommitWait(ctx context.Context, deferred bool) error {
if tc.mu.txn.Status != roachpb.COMMITTED {
log.Fatalf(ctx, "maybeCommitWait called when not committed")
if tc.mu.txn.Status != roachpb.PREPARED && tc.mu.txn.Status != roachpb.COMMITTED {
log.Fatalf(ctx, "maybeCommitWait called when not prepared/committed")
}
if tc.mu.commitWaitDeferred && !deferred {
// If this is an automatic commit-wait call and the user of this
Expand Down Expand Up @@ -727,6 +742,15 @@ func (tc *TxnCoordSender) maybeRejectClientLocked(
return kvpb.NewError(tc.mu.storedRetryableErr)
case txnError:
return tc.mu.storedErr
case txnPrepared:
endTxn := ba != nil && ba.IsSingleEndTxnRequest()
if endTxn {
return nil
}
msg := redact.Sprintf("client already prepared the transaction. "+
"Trying to execute: %s", ba.Summary())
reason := kvpb.TransactionStatusError_REASON_UNKNOWN
return kvpb.NewErrorWithTxn(kvpb.NewTransactionStatusError(reason, msg), &tc.mu.txn)
case txnFinalized:
msg := redact.Sprintf("client already committed or rolled back the transaction. "+
"Trying to execute: %s", ba.Summary())
Expand Down Expand Up @@ -784,6 +808,13 @@ func (tc *TxnCoordSender) finalizeAndCleanupTxnLocked(ctx context.Context) {
tc.cleanupTxnLocked(ctx)
}

// markTxnPreparedLocked marks the transaction state as prepared and closes all
// interceptors.
func (tc *TxnCoordSender) markTxnPreparedLocked(ctx context.Context) {
tc.mu.txnState = txnPrepared
tc.cleanupTxnLocked(ctx)
}

// cleanupTxnLocked closes all the interceptors.
func (tc *TxnCoordSender) cleanupTxnLocked(ctx context.Context) {
if tc.mu.closed {
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func (tc *txnCommitter) SendLocked(
switch br.Txn.Status {
case roachpb.STAGING:
// Continue with STAGING-specific validation and cleanup.
case roachpb.PREPARED:
// The transaction is prepared.
return br, nil
case roachpb.COMMITTED:
// The transaction is explicitly committed. This is possible if all
// in-flight writes were sent to the same range as the EndTxn request,
Expand Down Expand Up @@ -348,6 +351,11 @@ func (tc *txnCommitter) canCommitInParallel(ba *kvpb.BatchRequest, et *kvpb.EndT
return false
}

// We don't support a parallel prepare.
if et.Prepare {
return false
}

// If the transaction has a commit trigger, we don't allow it to commit in
// parallel with writes. There's no fundamental reason for this restriction,
// but for now it's not worth the complication.
Expand Down
29 changes: 29 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,35 @@ func TestTxnCommitterStripsInFlightWrites(t *testing.T) {
require.Nil(t, pErr)
require.NotNil(t, br)

// Send the same batch but with an EndTxn with the Prepare flag set. In-flight
// writes should not be attached because the XA two-phase commit protocol
// disables parallel commits.
ba.Requests = nil
etArgsPrepare := etArgs
etArgsPrepare.Prepare = true
ba.Add(&putArgs, &qiArgs, &etArgsPrepare)

mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 3)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[2].GetInner())

et := ba.Requests[2].GetInner().(*kvpb.EndTxnRequest)
require.True(t, et.Commit)
require.True(t, et.Prepare)
require.Len(t, et.LockSpans, 2)
require.Equal(t, []roachpb.Span{{Key: keyA}, {Key: keyB}}, et.LockSpans)
require.Len(t, et.InFlightWrites, 0)

br = ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.PREPARED
return br, nil
})

br, pErr = tc.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Send the same batch but with an EndTxn containing a commit trigger.
// In-flight writes should not be attached because commit triggers disable
// parallel commits.
Expand Down
17 changes: 13 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,22 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
// Returns true if heartbeating should continue, false if the transaction is no
// longer Pending and so there's no point in heartbeating further.
func (h *txnHeartbeater) heartbeatLocked(ctx context.Context) bool {
if h.mu.txn.Status != roachpb.PENDING {
if h.mu.txn.Status == roachpb.COMMITTED {
log.Fatalf(ctx, "txn committed but heartbeat loop hasn't been signaled to stop: %s", h.mu.txn)
}
switch h.mu.txn.Status {
case roachpb.PENDING:
// Continue heartbeating.
case roachpb.PREPARED:
// If the transaction is prepared, there's no point in heartbeating. The
// transaction will remain active without heartbeats until it is committed
// or rolled back.
return false
case roachpb.ABORTED:
// If the transaction is aborted, there's no point in heartbeating. The
// client needs to send a rollback.
return false
case roachpb.COMMITTED:
log.Fatalf(ctx, "txn committed but heartbeat loop hasn't been signaled to stop: %s", h.mu.txn)
default:
log.Fatalf(ctx, "unexpected txn status in heartbeat loop: %s", h.mu.txn)
}

// Clone the txn in order to put it in the heartbeat request.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,5 +129,7 @@ func (m *txnMetricRecorder) closeLocked() {
// Note that successful read-only txn are also counted as committed, even
// though they never had a txn record.
m.metrics.Commits.Inc(1)
case roachpb.PREPARED:
m.metrics.Prepares.Inc(1)
}
}
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type TxnMetrics struct {
ParallelCommits *metric.Counter // Commits which entered the STAGING state
ParallelCommitAutoRetries *metric.Counter // Commits which were retried after entering the STAGING state
CommitWaits *metric.Counter // Commits that waited for linearizability
Prepares *metric.Counter

ClientRefreshSuccess *metric.Counter
ClientRefreshFail *metric.Counter
Expand Down Expand Up @@ -99,6 +100,12 @@ var (
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
metaPreparesRates = metric.Metadata{
Name: "txn.prepares",
Help: "Number of prepared KV transactions",
Measurement: "KV Transactions",
Unit: metric.Unit_COUNT,
}
metaClientRefreshSuccess = metric.Metadata{
Name: "txn.refresh.success",
Help: "Number of successful client-side transaction refreshes. A refresh may be " +
Expand Down Expand Up @@ -274,6 +281,7 @@ func MakeTxnMetrics(histogramWindow time.Duration) TxnMetrics {
ParallelCommits: metric.NewCounter(metaParallelCommitsRates),
ParallelCommitAutoRetries: metric.NewCounter(metaParallelCommitAutoRetries),
CommitWaits: metric.NewCounter(metaCommitWaitCount),
Prepares: metric.NewCounter(metaPreparesRates),
ClientRefreshSuccess: metric.NewCounter(metaClientRefreshSuccess),
ClientRefreshFail: metric.NewCounter(metaClientRefreshFail),
ClientRefreshFailWithCondensedSpans: metric.NewCounter(metaClientRefreshFailWithCondensedSpans),
Expand Down
Loading

0 comments on commit 1203a72

Please sign in to comment.