kvclient/kvcoord: inhibit parallel commit when retrying EndTxn request
The scenario that this patch addresses is the following (from cockroachdb#46431):
1. txn1 sends Put(a) + Put(b) + EndTxn
2. DistSender splits the Put(a) from the rest.
3. Put(a) succeeds, but the rest catches some retriable error.
4. TxnCoordSender gets the retriable error. The fact that a sub-batch
  succeeded is lost. We used to care about that fact, but we've
  successively gotten rid of that tracking across cockroachdb#35140 and cockroachdb#44661.
5. we refresh everything that came before this batch. The refresh
  succeeds.
6. we re-send the batch. It gets split again. The part with the EndTxn
  executes first. The transaction is now STAGING. More than that, the txn
  is in fact implicitly committed - the intent on a is already there since
  the previous attempt and, because it's at a lower timestamp than the txn
  record, it counts as golden for the purposes of verifying the implicit
  commit condition.
7. some other transaction wanders in, sees that txn1 is in its way, and
  transitions it to explicitly committed.
8. the Put(a) now tries to evaluate. It gets really confused. I guess
  that different things can happen; none of them good. One thing that I
  believe we've observed in cockroachdb#46299 is that, if there's another txn's
  intent there already, the Put will try to push it, enter the
  txnWaitQueue, eventually observe that its own txn is committed and
  return an error. The client thus gets an error (and a non-ambiguous one
  to boot) although the txn is committed. Even worse perhaps, I think it's
  possible for a request to return wrong results instead of an error.

This patch fixes it by inhibiting the parallel commit when the EndTxn
batch is retried. This way, there's never a STAGING record.

Release note (bug fix): A rare bug causing errors to be returned for
successfully committed transactions was fixed. The most common error
message was "TransactionStatusError: already committed".

Release justification: serious bug fix

Fixes cockroachdb#46341
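
The gist of the fix, as a minimal self-contained Go sketch. The types and the prepareEndTxn helper below are simplified stand-ins invented for illustration (they are not the real roachpb API); the actual change lives in txnCommitter.SendLocked and canCommitInParallel in the diff further down.

package main

import "fmt"

// SequencedWrite and EndTxnRequest are simplified stand-ins for this sketch,
// not the real roachpb types.
type SequencedWrite struct {
	Key      string
	Sequence int32
}

type EndTxnRequest struct {
	Key            string           // txn record key; only assigned on the first attempt
	InFlightWrites []SequencedWrite // non-empty means a parallel commit is being attempted
	LockSpans      []string
}

// prepareEndTxn sketches the core of the fix: on the first attempt the txn
// record key is assigned and parallel commit stays possible; on a retry
// (detected by the key already being set) the in-flight writes are moved to
// the lock spans, which inhibits the parallel commit so that no STAGING txn
// record can be written by the retried batch.
func prepareEndTxn(et *EndTxnRequest, txnKey string) {
	if et.Key == "" {
		et.Key = txnKey
		return
	}
	// Retry: a previous attempt may have left intents behind, so a STAGING
	// record could make the txn look implicitly committed before the retried
	// sub-batches evaluate. Downgrade to a regular (non-parallel) commit.
	for _, w := range et.InFlightWrites {
		et.LockSpans = append(et.LockSpans, w.Key)
	}
	et.InFlightWrites = nil
}

func main() {
	et := &EndTxnRequest{InFlightWrites: []SequencedWrite{{Key: "a", Sequence: 1}}}

	prepareEndTxn(et, "b1") // first attempt: parallel commit left enabled
	fmt.Println("attempt 1 in-flight writes:", len(et.InFlightWrites))

	prepareEndTxn(et, "b1") // retry: parallel commit inhibited
	fmt.Println("attempt 2 in-flight writes:", len(et.InFlightWrites))
}
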
andreimatei committed Mar 25, 2020
1 parent 1d7380c commit af92767
Showing 3 changed files with 211 additions and 7 deletions.
133 changes: 128 additions & 5 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -2570,8 +2570,22 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
}

type pushExpectation int

const (
// expectPusheeTxnRecovery means we're expecting transaction recovery to be
// performed (after finding a STAGING txn record).
expectPusheeTxnRecovery pushExpectation = iota
// expectPusheeTxnRecordNotFound means we're expecting the push to not find the
// pushee txn record.
expectPusheeTxnRecordNotFound
dontExpectAnything
)

// checkImplicitCommit checks whether the txn is implicitly committed.
func checkImplicitCommit(ctx context.Context, db *kv.DB, txn roachpb.Transaction) (bool, error) {
func checkImplicitCommit(
ctx context.Context, db *kv.DB, txn roachpb.Transaction, pushExpectation pushExpectation,
) (bool, error) {
pushReq := roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txn.Key,
@@ -2597,9 +2611,22 @@ func checkImplicitCommit(ctx context.Context, db *kv.DB, txn roachpb.Transaction
// Verify that we're not fooling ourselves and that checking for the implicit
// commit actually caused the txn recovery procedure to run.
recording := collectRec()
if "" == recording.FindLogMessage(
fmt.Sprintf("recovered txn %s", txn.ID.Short())) {
return false, errors.Errorf("recovery didn't run as expected. recording: %s", recording)
switch pushExpectation {
case expectPusheeTxnRecovery:
expMsg := fmt.Sprintf("recovered txn %s", txn.ID.Short())
if recording.FindLogMessage(expMsg) == "" {
return false, errors.Errorf(
"recovery didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
}
case expectPusheeTxnRecordNotFound:
expMsg := "pushee txn record not found"
if recording.FindLogMessage(expMsg) == "" {
return false, errors.Errorf(
"push didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
}
case dontExpectAnything:
}

pusheeStatus := resp.Responses[0].GetPushTxn().PusheeTxn.Status
@@ -2613,6 +2640,102 @@ func checkImplicitCommit(ctx context.Context, db *kv.DB, txn roachpb.Transaction
}
}

// Test that, even though at the kvserver level requests are not idempotent
// across an EndTxn, a TxnCoordSender retry after a refresh still works fine. We
// check that a transaction is not considered implicitly committed through a
// combination of writes from a previous attempt of the EndTxn batch and a
// STAGING txn record written by a newer attempt of that batch.
// Namely, the scenario is as follows:
//
// 1. client sends Put(a) + Put(b) + EndTxn. The Put(a) is split by the
// DistSender from the rest. Note that the parallel commit mechanism is in
// effect here.
// 2. Put(a) succeeds, but Put(b) is pushed. The client needs to refresh.
// 3. The refresh succeeds.
// 4. The client resends the whole batch (note that we don't keep track of the
// previous partial success).
// 5. The batch is split again. b's sub-batch executes first. Say a's sub-batch
// is delayed.
//
// The point of this test is to check that the txn is not considered to be
// implicitly committed at this point. Handling this scenario requires special
// care. If we didn't do anything, we'd end up with a STAGING txn record (from
// the second attempt of the request) and an intent on "a" from the first
// attempt. That intent would have a lower timestamp than the txn record and so
// the txn would be considered implicitly committed. If the txn were considered
// implicitly committed, and the intent on "a" was resolved, then the write on
// "a" (when it eventually evaluates) might return wrong results, or be pushed,
// or generally get very confused about how its own transaction got committed
// already.
//
// We handle this scenario by disabling the parallel commit on the request's 2nd
// attempt. Thus, the EndTxn will be split from all the other requests, and the
// txn record is never written if anything fails.
func TestTxnCoordSenderRetriesAcrossEndTxn_EndTxnSucceedsLate(t *testing.T) {
defer leaktest.AfterTest(t)()

var filterFn atomic.Value
var storeKnobs kvserver.StoreTestingKnobs
storeKnobs.EvalKnobs.TestingEvalFilter =
func(fArgs storagebase.FilterArgs) *roachpb.Error {
fnVal := filterFn.Load()
if fn, ok := fnVal.(func(storagebase.FilterArgs) *roachpb.Error); ok && fn != nil {
return fn(fArgs)
}
return nil
}

s, _, db := serverutils.StartServer(t,
base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}})
ctx := context.Background()
defer s.Stopper().Stop(ctx)

keyA, keyA1, keyB, keyB1 := roachpb.Key("a"), roachpb.Key("a1"), roachpb.Key("b"), roachpb.Key("b1")
require.NoError(t, setupMultipleRanges(ctx, db, string(keyB)))

txn := db.NewTxn(ctx, "test txn")
// Do a write to anchor the txn on b's range.
require.NoError(t, txn.Put(ctx, keyB1, "b1"))

// Do a read to prevent the txn from performing server-side refreshes.
_, err := txn.Get(ctx, keyA1)
require.NoError(t, err)

// After the txn started, perform another read on keyB. This will cause the
// txn's upcoming write to be pushed. Because the write on keyB is sent
// together with the EndTxn, this means that the write too old condition
// can't be deferred and the respective sub-batch will return a
// WriteTooOldError.
_, err = db.Get(ctx, keyB)
require.NoError(t, err)

b := txn.NewBatch()
b.Put(keyA, "a")
b.Put(keyB, "b")

var count int32
filterFn.Store(func(args storagebase.FilterArgs) *roachpb.Error {
put, ok := args.Req.(*roachpb.PutRequest)
if !ok {
return nil
}
if !put.Key.Equal(keyA) {
return nil
}
count++
// Reject the left sub-batch's Put (on keyA) on the 2nd attempt.
if count == 2 {
return roachpb.NewErrorf("injected")
}
return nil
})

require.Error(t, txn.CommitInBatch(ctx, b), "injected")
committed, err := checkImplicitCommit(ctx, db, *txn.TestingCloneTxn(), expectPusheeTxnRecordNotFound)
require.NoError(t, err)
require.False(t, committed)
}

// Test that, even though at the kvserver level requests are not idempotent
// across an EndTxn, a TxnCoordSender retry after a refresh still works fine. We
// check that a transaction is not considered implicitly committed through a
@@ -2708,7 +2831,7 @@ func TestTxnCoordSenderRetriesAcrossEndTxn_EndTxnSucceedsEarly(t *testing.T) {
})

require.Error(t, txn.CommitInBatch(ctx, b), "injected")
committed, err := checkImplicitCommit(ctx, db, *origTxn)
committed, err := checkImplicitCommit(ctx, db, *origTxn, expectPusheeTxnRecovery)

require.NoError(t, err)
require.False(t, committed)
40 changes: 38 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
@@ -138,16 +138,40 @@ func (tc *txnCommitter) SendLocked(
// set. This is the only place where EndTxnRequest.Key is assigned, but we
// could be dealing with a re-issued batch after a refresh. Remember, the
// committer is below the span refresh on the interceptor stack.
var etAttempt endTxnAttempt
if et.Key == nil {
et.Key = ba.Txn.Key
etAttempt = endTxnFirstAttempt
} else {
// If this is a retry, we'll disable parallel commit. Since the previous
// attempt might have partially succeeded (i.e. the batch might have been
// split into sub-batches and some of them might have evaluated
// successfully), there might be intents lying around. If we performed a
// parallel commit, the batch got split again, and the STAGING txn record
// were written before we evaluate some of the other sub-batches, we could
// technically enter the "implicitly committed" state before all the
// sub-batches are evaluated. This is problematic: there's a race between
// the evaluation of the outstanding sub-batches and randos coming along,
// seeing the STAGING record, transitioning the txn to explicitly committed,
// and cleaning up all the intents. If the randos win, the re-evaluations
// will fail because we don't have idempotency of evaluations across a txn
// commit.
etAttempt = endTxnRetry
if len(et.InFlightWrites) > 0 {
// Make a copy of the EndTxn, since we're going to change it below to
// disable the parallel commit.
etCpy := *et
ba.Requests[len(ba.Requests)-1].SetInner(&etCpy)
et = &etCpy
}
}

// Determine whether the commit request can be run in parallel with the rest
// of the requests in the batch. If not, move the in-flight writes currently
// attached to the EndTxn request to the LockSpans and clear the in-flight
// write set; no writes will be in-flight concurrently with the EndTxn
// request.
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et) {
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et, etAttempt) {
// NB: when parallel commits is disabled, this is the best place to
// detect whether the batch has only distinct spans. We can set this
// flag based on whether any of previously declared in-flight writes
@@ -274,13 +298,25 @@ func (tc *txnCommitter) sendLockedWithElidedEndTxn(
return br, nil
}

type endTxnAttempt int

const (
endTxnFirstAttempt endTxnAttempt = iota
endTxnRetry
)

// canCommitInParallel determines whether the batch can issue its committing
// EndTxn in parallel with the rest of its requests and with any in-flight
// writes, which all should have corresponding QueryIntent requests in the
// batch.
func (tc *txnCommitter) canCommitInParallel(
ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest,
ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest, etAttempt endTxnAttempt,
) bool {
if etAttempt == endTxnRetry {
log.VEventf(ctx, 2, "retrying batch not eligible for parallel commit")
return false
}

if !parallelCommitsEnabled.Get(&tc.st.SV) {
return false
}
45 changes: 45 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer_test.go
@@ -459,3 +459,48 @@ func TestTxnCommitterRetryAfterStaging(t *testing.T) {
require.Equal(t, expReason, pErr.GetDetail().(*roachpb.TransactionRetryError).Reason)
})
}

// Test that parallel commits are inhibited on retries (i.e. after a successful
// refresh caused by a parallel-commit batch). See comments in the interceptor
// about why this is necessary.
func TestTxnCommitterNoParallelCommitsOnRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
tc, mockSender := makeMockTxnCommitter()
defer tc.stopper.Stop(ctx)

txn := makeTxnProto()
keyA := roachpb.Key("a")

var ba roachpb.BatchRequest
ba.Header = roachpb.Header{Txn: &txn}
putArgs := roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}}
etArgs := roachpb.EndTxnRequest{Commit: true}
putArgs.Sequence = 1
etArgs.Sequence = 2
etArgs.InFlightWrites = []roachpb.SequencedWrite{{Key: keyA, Sequence: 1}}

// Pretend that this is a retry of the request (after a successful refresh). Having the key
// assigned is how the interceptor distinguishes retries.
etArgs.Key = keyA

ba.Add(&putArgs, &etArgs)

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 2)
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[1].GetInner())

et := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest)
require.True(t, et.Commit)
require.Len(t, et.InFlightWrites, 0, "expected parallel commit to be inhibited")

br := ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.COMMITTED
return br, nil
})

_, pErr := tc.SendLocked(ctx, ba)
require.Nil(t, pErr)
}
