Skip to content

Commit

Permalink
Merge #45102
Browse files Browse the repository at this point in the history
45102: storage,kv: don't return errors with the WriteTooOld flag set r=andreimatei a=andreimatei

The span refresher interceptor is supposed to terminate the
txn.WriteTooOld flag. For that, it asserts that the request doesn't not
have that flag set on it; only responses should have it. There was,
however, a case where terminating the flag was not happening as
intended: when a non-retriable error was coming back with the
WriteTooOld flag set, the flag made its way into the client's txn,
slipping past the interceptor. This was causing further requests to fail
the aforementioned assertion. (Since errors are at play here, the only
further request should be a Rollback).

This patch fixes this by having servers never return errors with this
flag set. Setting this flag on an error does not make sense anyway.

Touches #40786

Release note: None

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
  • Loading branch information
craig[bot] and andreimatei committed Feb 14, 2020
2 parents 55089f5 + 189151d commit 12d6459
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 55 deletions.
21 changes: 11 additions & 10 deletions pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,21 @@ func (sr *txnSpanRefresher) SendLocked(
func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
ctx context.Context, ba roachpb.BatchRequest, maxRefreshAttempts int,
) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.Txn.WriteTooOld && sr.canAutoRetry {
if ba.Txn.WriteTooOld {
// The WriteTooOld flag is not supposed to be set on requests. It's only set
// by the server and it's terminated by this interceptor on the client.
log.Fatalf(ctx, "unexpected WriteTooOld request. ba: %s (txn: %s)",
ba.String(), ba.Txn.String())
}
br, pErr := sr.sendHelper(ctx, ba)
br, pErr := sr.wrapped.SendLocked(ctx, ba)

// 19.2 servers might give us an error with the WriteTooOld flag set. This
// interceptor wants to always terminate that flag. In the case of an error,
// we can just ignore it.
if pErr != nil && pErr.GetTxn() != nil {
pErr.GetTxn().WriteTooOld = false
}

if pErr == nil && br.Txn.WriteTooOld {
// If we got a response with the WriteTooOld flag set, then we pretend that
// we got a WriteTooOldError, which will cause us to attempt to refresh and
Expand Down Expand Up @@ -367,7 +375,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
addRefreshes(sr.refreshSpans)

// Send through wrapped lockedSender. Unlocks while sending then re-locks.
if _, batchErr := sr.sendHelper(ctx, refreshSpanBa); batchErr != nil {
if _, batchErr := sr.wrapped.SendLocked(ctx, refreshSpanBa); batchErr != nil {
log.VEventf(ctx, 2, "failed to refresh txn spans (%s); propagating original retry error", batchErr)
return false
}
Expand All @@ -376,13 +384,6 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
return true
}

func (sr *txnSpanRefresher) sendHelper(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
br, pErr := sr.wrapped.SendLocked(ctx, ba)
return br, pErr
}

// appendRefreshSpans appends refresh spans from the supplied batch request,
// qualified by the batch response where appropriate.
func (sr *txnSpanRefresher) appendRefreshSpans(
Expand Down
73 changes: 39 additions & 34 deletions pkg/roachpb/data.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 15 additions & 10 deletions pkg/roachpb/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -449,22 +449,27 @@ message Transaction {
// transaction's write timestamp because a newer value was present. Had our
// write been performed, it would have overwritten the other value even though
// that value might not have been read by a previous read in the transaction
// (i.e. lost update anomaly).
// When this flag is set, the transaction's write timestamp is also bumped.
// The flag is used, though, to return a more specific error to the client
// instead of TransactionRetryError(RETRY_SERIALIZABLE).
// (i.e. lost update anomaly). The write is still performed, but this flag is
// set and the txn's write timestamp is bumped, so the client will not be able
// to commit without performing a refresh.
//
// As explained above, this flag indicates that, if the key in question had
// been read previously by the transaction, a refresh of the transaction's
// read span will surely fail. The client does not currently take advantage of
// this flag to avoid hopeless refreshes, though.
// Since 20.1, errors do not carry this flag; only successful BatchResponses
// do. When possible, such a BatchResponse is preferred to a WriteTooOldError
// because the former leaves intents behind to act as locks.
//
// On the client, the txnSpanRefresher terminates this flag by refreshing
// eagerly when the flag is set. If the key that generated the write too old
// condition had been previously read by the transaction, a refresh of the
// transaction's read span will surely fail. The client is not currently smart
// enough to avoid hopeless refreshes, though.
//
// Historically, this field was also important for SNAPSHOT transactions which
// could commit in other situations when the write timestamp is bumped, but
// not when this flag is set (since lost updates cannot be tolerated even in
// SNAPSHOT). In SERIALIZABLE isolation, transactions generally don't commit
// with a bumped write timestamp, so this flag stopped telling us anything
// more than ReadTimestamp != WriteTimestamp.
// with a bumped write timestamp, so this flag is only telling us that a
// refresh is less likely to succeed than in other cases where
// ReadTimestamp != WriteTimestamp.
bool write_too_old = 12;
// Set of spans that the transaction has written intents into. These
// are spans which must be resolved on txn completion. Note that these
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,17 @@ func evaluateBatch(
ms *enginepb.MVCCStats,
ba *roachpb.BatchRequest,
readOnly bool,
) (*roachpb.BatchResponse, result.Result, *roachpb.Error) {
) (_ *roachpb.BatchResponse, _ result.Result, retErr *roachpb.Error) {

defer func() {
// Ensure that errors don't carry the WriteTooOld flag set. The client
// handles non-error responses with the WriteTooOld flag set, and errors
// with this flag set confuse it.
if retErr != nil && retErr.GetTxn() != nil {
retErr.GetTxn().WriteTooOld = false
}
}()

// NB: Don't mutate BatchRequest directly.
baReqs := ba.Requests
baHeader := ba.Header
Expand Down
41 changes: 41 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4353,6 +4353,47 @@ func TestRPCRetryProtectionInTxn(t *testing.T) {
})
}

// Test that errors from batch evaluation never have the WriteTooOld flag set.
// The WriteTooOld flag is supposed to only be set on successful responses.
//
// The test will construct a batch with a write that would normally cause the
// WriteTooOld flag to be set on the response, and another CPut which causes an
// error to be returned.
func TestErrorsDontCarryWriteTooOldFlag(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
cfg := TestStoreConfig(nil /* clock */)
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.StartWithStoreConfig(t, stopper, cfg)

keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
// Start a transaction early to get a low timestamp.
txn := roachpb.MakeTransaction(
"test", keyA, roachpb.NormalUserPriority, tc.Clock().Now(), 0 /* offset */)

// Write a value outside of the txn to cause a WriteTooOldError later.
put := putArgs(keyA, []byte("val1"))
var ba roachpb.BatchRequest
ba.Add(&put)
_, pErr := tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

// This put will cause the WriteTooOld flag to be set.
put = putArgs(keyA, []byte("val2"))
// This will cause a ConditionFailedError.
cput := cPutArgs(keyB, []byte("missing"), []byte("newVal"))
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&put)
ba.Add(&cput)
assignSeqNumsForReqs(&txn, &put, &cput)
_, pErr = tc.Sender().Send(ctx, ba)
require.IsType(t, pErr.GetDetail(), &roachpb.ConditionFailedError{})
require.False(t, pErr.GetTxn().WriteTooOld)
}

// TestReplicaLaziness verifies that Raft Groups are brought up lazily.
func TestReplicaLaziness(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down

0 comments on commit 12d6459

Please sign in to comment.