Skip to content

Commit

Permalink
kvclient/kvcoord: inhibit parallel commit when retrying EndTxn request
Browse files Browse the repository at this point in the history
The scenario that this patch addresses is the following (from cockroachdb#46431):
1. txn1 sends Put(a) + Put(b) + EndTxn
2. DistSender splits the Put(a) from the rest.
3. Put(a) succeeds, but the rest catches some retriable error.
4. TxnCoordSender gets the retriable error. The fact that a sub-batch
  succeeded is lost. We used to care about that fact, but we've
  successively gotten rid of that tracking across cockroachdb#35140 and cockroachdb#44661.
5. we refresh everything that came before this batch. The refresh
  succeeds.
6. we re-send the batch. It gets split again. The part with the EndTxn
  executes first. The transaction is now STAGING. More than that, the txn
  is in fact implicitly committed - the intent on a is already there since
  the previous attempt and, because it's at a lower timestamp than the txn
  record, it counts as golden for the purposes of verifying the implicit
  commit condition.
7. some other transaction wonders in, sees that txn1 is in its way, and
  transitions it to explicitly committed.
8. the Put(a) now tries to evaluate. It gets really confused. I guess
  that different things can happen; none of them good. One thing that I
  believe we've observed in cockroachdb#46299 is that, if there's another txn's
  intent there already, the Put will try to push it, enter the
  txnWaitQueue, eventually observe that its own txn is committed and
  return an error. The client thus gets an error (and a non-ambiguous one
  to boot) although the txn is committed. Even worse perhaps, I think it's
  possible for a request to return wrong results instead of an error.

This patch fixes it by inhibiting the parallel commit when the EndTxn
batch is retried. This way, there's never a STAGING record.

Release note (bug fix): A rare bug causing errors to be returned for
successfully committed transactions was fixed. The most common error
message was "TransactionStatusError: already committed".

Release justification: serious bug fix

Fixes cockroachdb#46341
  • Loading branch information
andreimatei committed Mar 31, 2020
1 parent 00c3216 commit 1c8fc6f
Show file tree
Hide file tree
Showing 4 changed files with 365 additions and 9 deletions.
275 changes: 274 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -2630,6 +2631,278 @@ func TestTxnCoordSenderRetries(t *testing.T) {
}
}

type pushExpectation int

const (
// expectPusheeTxnRecovery means we're expecting transaction recovery to be
// performed (after finding a STAGING txn record).
expectPusheeTxnRecovery pushExpectation = iota
// expectPusheeTxnRecordNotFound means we're expecting the push to not find the
// pushee txn record.
expectPusheeTxnRecordNotFound
// dontExpectAnything means we're not going to check the state in which the
// pusher found the pushee's txn record.
dontExpectAnything
)

type expectedTxnResolution int

const (
expectedAborted expectedTxnResolution = iota
expectedCommitted
)

// checkPushResult pushes the specified txn and checks that the pushee's
// resolution is the expected one.
func checkPushResult(
ctx context.Context,
db *kv.DB,
txn roachpb.Transaction,
expResolution expectedTxnResolution,
pushExpectation pushExpectation,
) error {
pushReq := roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{
Key: txn.Key,
},
PusheeTxn: txn.TxnMeta,
PushTo: hlc.Timestamp{},
PushType: roachpb.PUSH_ABORT,
// We're going to Force the push in order to not wait for the pushee to
// expire.
Force: true,
}
ba := roachpb.BatchRequest{}
ba.Add(&pushReq)

recCtx, collectRec, cancel := tracing.ContextWithRecordingSpan(ctx, "test trace")
defer cancel()

resp, pErr := db.NonTransactionalSender().Send(recCtx, ba)
if pErr != nil {
return pErr.GoError()
}

var statusErr error
pusheeStatus := resp.Responses[0].GetPushTxn().PusheeTxn.Status
switch pusheeStatus {
case roachpb.ABORTED:
if expResolution != expectedAborted {
statusErr = errors.Errorf("transaction unexpectedly aborted")
}
case roachpb.COMMITTED:
if expResolution != expectedCommitted {
statusErr = errors.Errorf("transaction unexpectedly committed")
}
default:
return errors.Errorf("unexpected txn status: %s", pusheeStatus)
}

// Verify that we're not fooling ourselves and that checking for the implicit
// commit actually caused the txn recovery procedure to run.
recording := collectRec()
var resolutionErr error
switch pushExpectation {
case expectPusheeTxnRecovery:
expMsg := fmt.Sprintf("recovered txn %s", txn.ID.Short())
if _, ok := recording.FindLogMessage(expMsg); !ok {
resolutionErr = errors.Errorf(
"recovery didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
}
case expectPusheeTxnRecordNotFound:
expMsg := "pushee txn record not found"
if _, ok := recording.FindLogMessage(expMsg); !ok {
resolutionErr = errors.Errorf(
"push didn't run as expected (missing \"%s\"). recording: %s",
expMsg, recording)
}
case dontExpectAnything:
}

return errors.CombineErrors(statusErr, resolutionErr)
}

// Test that, even though at the kvserver level requests are not idempotent
// across an EndTxn, a TxnCoordSender retry of the final batch after a refresh
// still works fine. We check that a transaction is not considered implicitly
// committed through a combination of writes from a previous attempt of the
// EndTxn batch and a STAGING txn record written by a newer attempt of that
// batch.
// Namely, the scenario is as follows:
// 1. client sends CPut(a) + CPut(b) + EndTxn. The CPut(a) is split by the
// DistSender from the rest. Note that the parallel commit mechanism is in
// effect here.
// 2. One of the two sides gets a WriteTooOldError, the other succeeds.
// The client needs to refresh.
// 3. The refresh succeeds.
// 4. The client resends the whole batch (note that we don't keep track of the
// previous partial success).
// 5. The batch is split again, and one of the two sides fails.
//
// This tests checks that, for the different combinations of failures across the
// two attempts of the request, the transaction is not erroneously considered to
// be committed. We don't want an intent laid down by the first attempt to
// satisfy a STAGING record from the 2nd attempt, or the other way around (an
// intent written in the 2nd attempt satisfying a STAGING record written on the
// first attempt). See subtests for more details.
func TestTxnCoordSenderRetriesAcrossEndTxn(t *testing.T) {
defer leaktest.AfterTest(t)()

var filterFn atomic.Value
var storeKnobs kvserver.StoreTestingKnobs
storeKnobs.EvalKnobs.TestingEvalFilter =
func(fArgs storagebase.FilterArgs) *roachpb.Error {
fnVal := filterFn.Load()
if fn, ok := fnVal.(func(storagebase.FilterArgs) *roachpb.Error); ok && fn != nil {
return fn(fArgs)
}
return nil
}

type side int
const (
left side = iota
right
)

testCases := []struct {
// sidePushedOnFirstAttempt controls which sub-batch will return a
// WriteTooOldError on the first attemp. The left side is CPut(a), the right
// side is CPut(b)+EndTxn(STAGING).
sidePushedOnFirstAttempt side
sideRejectedOnSecondAttempt side
txnRecExpectation pushExpectation
}{
{
// On the first attempt, the left side succeeds in laying down an intent,
// while the right side fails. On the 2nd attempt, the right side succeeds
// while the left side fails.
//
// The point of this test is to check that the txn is not considered to be
// implicitly committed at this point. Handling this scenario requires
// special care. If we didn't do anything, then we'd end up with a STAGING
// txn record (from the second attempt of the request) and an intent on
// "a" from the first attempt. That intent would have a lower timestamp
// than the txn record and so the txn would be considered explicitly
// committed. If the txn were to be considered implicitly committed, and
// the intent on "a" was resolved, then write on a (when it eventually
// evaluates) might return wrong results, or be pushed, or generally get
// very confused about how its own transaction got committed already.
//
// We handle this scenario by disabling the parallel commit on the
// request's 2nd attempt. Thus, the EndTxn will be split from all the
// other requests, and the txn record is never written if anything fails.
sidePushedOnFirstAttempt: right,
sideRejectedOnSecondAttempt: left,
// The first attempt of right side contains a parallel commit (i.e. an
// EndTxn), but fails. The 2nd attempt of the right side will no longer
// contain an EndTxn, as explained above. So we expect the txn record to
// not exist.
txnRecExpectation: expectPusheeTxnRecordNotFound,
},
{
// On the first attempt, the right side succeed in writing a STAGING txn
// record, but the left side fails. On the second attempt, the right side
// is rejected.
//
// The point of this test is to check that the txn is not considered
// implicitly committed at this point. All the intents are in place for
// the txn to be considered committed, but we rely on the fact that the
// intent on "a" has a timestamp that's too high (it gets the timestamp
// from the 2nd attempt, after a refresh, but the STAGING txn record has
// an older timestamp). If the txn were to be considered implicitly
// committed, it'd be bad as we are returning an error to the client
// telling it that the EndTxn failed.
sidePushedOnFirstAttempt: left,
sideRejectedOnSecondAttempt: right,
// The first attempt of the right side writes a STAGING txn record, so we
// expect to perform txn recovery.
txnRecExpectation: expectPusheeTxnRecovery,
},
}

for _, tc := range testCases {
t.Run("", func(t *testing.T) {
s, _, db := serverutils.StartServer(t,
base.TestServerArgs{Knobs: base.TestingKnobs{Store: &storeKnobs}})
ctx := context.Background()
defer s.Stopper().Stop(ctx)

keyA, keyA1, keyB, keyB1 := roachpb.Key("a"), roachpb.Key("a1"), roachpb.Key("b"), roachpb.Key("b1")
require.NoError(t, setupMultipleRanges(ctx, db, string(keyB)))

origValA := roachpb.MakeValueFromString("initA")
require.NoError(t, db.Put(ctx, keyA, &origValA))
origValB := roachpb.MakeValueFromString("initA")
require.NoError(t, db.Put(ctx, keyB, &origValB))

txn := db.NewTxn(ctx, "test txn")

// Do a write to anchor the txn on b's range.
require.NoError(t, txn.Put(ctx, keyB1, "b1"))

// Take a snapshot of the txn early. We'll use it when verifying if the txn is
// implicitly committed. If we didn't use this early snapshot and, instead,
// used the transaction with a bumped timestamp, then the push code would
// infer that the txn is not implicitly committed without actually running the
// recovery procedure. Using this snapshot mimics a pusher that ran into an
// old intent.
origTxn := txn.TestingCloneTxn()

// Do a read to prevent the txn for performing server-side refreshes.
_, err := txn.Get(ctx, keyA1)
require.NoError(t, err)

// After the txn started, do a conflicting read. This will cause one of
// the txn's upcoming CPuts to return a WriteTooOldError on the first
// attempt, causing in turn to refresh and a retry. Note that, being
// CPuts, the pushed writes don't defer the error by returning the
// WriteTooOld flag instead of a WriteTooOldError.
var readKey roachpb.Key
if tc.sidePushedOnFirstAttempt == left {
readKey = keyA
} else {
readKey = keyB
}
_, err = db.Get(ctx, readKey)

b := txn.NewBatch()
b.CPut(keyA, "a", &origValA)
b.CPut(keyB, "b", &origValB)

var secondAttemptRejectKey roachpb.Key
if tc.sideRejectedOnSecondAttempt == left {
secondAttemptRejectKey = keyA
} else {
secondAttemptRejectKey = keyB
}

// Install a filter which will reject requests touching
// secondAttemptRejectKey on the retry.
var count int32
filterFn.Store(func(args storagebase.FilterArgs) *roachpb.Error {
put, ok := args.Req.(*roachpb.ConditionalPutRequest)
if !ok {
return nil
}
if !put.Key.Equal(secondAttemptRejectKey) {
return nil
}
count++
// Reject the right request on the 2nd attempt.
if count == 2 {
return roachpb.NewErrorf("injected error; test rejecting request")
}
return nil
})

require.Error(t, txn.CommitInBatch(ctx, b), "injected")
require.NoError(t, checkPushResult(ctx, db, *origTxn, expectedAborted, tc.txnRecExpectation))
})
}
}

// Test that we're being smart about the timestamp ranges that need to be
// refreshed: when span are refreshed, they only need to be checked for writes
// above the previous time when they've been refreshed, not from the
Expand Down
46 changes: 44 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,42 @@ func (tc *txnCommitter) SendLocked(
// set. This is the only place where EndTxnRequest.Key is assigned, but we
// could be dealing with a re-issued batch after a refresh. Remember, the
// committer is below the span refresh on the interceptor stack.
var etAttempt endTxnAttempt
if et.Key == nil {
et.Key = ba.Txn.Key
etAttempt = endTxnFirstAttempt
} else {
// If this is a retry, we'll disable parallel commit. Since the previous
// attempt might have partially succeeded (i.e. the batch might have been
// split into sub-batches and some of them might have evaluated
// successfully), there might be intents laying around. If we'd perform a
// parallel commit, and the batch gets split again, and the STAGING txn
// record were written before we evaluate some of the other sub-batche. We
// could technically enter the "implicitly committed" state before all the
// sub-batches are evaluated and this is problematic: there's a race between
// evaluating those requests and other pushers coming along and
// transitioning the txn to explicitly committed (and cleaning up all the
// intents), and the evaluations of the outstanding sub-batches. If the
// randos win, then the re-evaluations will fail because we don't have
// idempotency of evaluations across a txn commit (for example, the
// re-evaluations might notice that their transaction is already committed
// and get confused).
etAttempt = endTxnRetry
if len(et.InFlightWrites) > 0 {
// Make a copy of the EndTxn, since we're going to change it below to
// disable the parallel commit.
etCpy := *et
ba.Requests[len(ba.Requests)-1].SetInner(&etCpy)
et = &etCpy
}
}

// Determine whether the commit request can be run in parallel with the rest
// of the requests in the batch. If not, move the in-flight writes currently
// attached to the EndTxn request to the LockSpans and clear the in-flight
// write set; no writes will be in-flight concurrently with the EndTxn
// request.
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et) {
if len(et.InFlightWrites) > 0 && !tc.canCommitInParallel(ctx, ba, et, etAttempt) {
// NB: when parallel commits is disabled, this is the best place to
// detect whether the batch has only distinct spans. We can set this
// flag based on whether any of previously declared in-flight writes
Expand Down Expand Up @@ -275,17 +301,33 @@ func (tc *txnCommitter) sendLockedWithElidedEndTxn(
return br, nil
}

// endTxnAttempt specifies whether it's the first time that we're attempting to
// evaluate an EndTxn request or whether it's a retry (i.e. after a successful
// refresh). There are some precautions we need to take when sending out
// retries.
type endTxnAttempt int

const (
endTxnFirstAttempt endTxnAttempt = iota
endTxnRetry
)

// canCommitInParallel determines whether the batch can issue its committing
// EndTxn in parallel with the rest of its requests and with any in-flight
// writes, which all should have corresponding QueryIntent requests in the
// batch.
func (tc *txnCommitter) canCommitInParallel(
ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest,
ctx context.Context, ba roachpb.BatchRequest, et *roachpb.EndTxnRequest, etAttempt endTxnAttempt,
) bool {
if !parallelCommitsEnabled.Get(&tc.st.SV) {
return false
}

if etAttempt == endTxnRetry {
log.VEventf(ctx, 2, "retrying batch not eligible for parallel commit")
return false
}

// We're trying to parallel commit, not parallel abort.
if !et.Commit {
return false
Expand Down
Loading

0 comments on commit 1c8fc6f

Please sign in to comment.