Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage,kv: don't return errors with the WriteTooOld flag set #45102

Merged
merged 3 commits into from
Feb 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,13 +225,21 @@ func (sr *txnSpanRefresher) SendLocked(
func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
ctx context.Context, ba roachpb.BatchRequest, maxRefreshAttempts int,
) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.Txn.WriteTooOld && sr.canAutoRetry {
if ba.Txn.WriteTooOld {
// The WriteTooOld flag is not supposed to be set on requests. It's only set
// by the server and it's terminated by this interceptor on the client.
log.Fatalf(ctx, "unexpected WriteTooOld request. ba: %s (txn: %s)",
ba.String(), ba.Txn.String())
}
br, pErr := sr.sendHelper(ctx, ba)
br, pErr := sr.wrapped.SendLocked(ctx, ba)

// 19.2 servers might give us an error with the WriteTooOld flag set. This
// interceptor wants to always terminate that flag. In the case of an error,
// we can just ignore it.
if pErr != nil && pErr.GetTxn() != nil {
pErr.GetTxn().WriteTooOld = false
}

if pErr == nil && br.Txn.WriteTooOld {
// If we got a response with the WriteTooOld flag set, then we pretend that
// we got a WriteTooOldError, which will cause us to attempt to refresh and
Expand Down Expand Up @@ -367,7 +375,7 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
addRefreshes(sr.refreshSpans)

// Send through wrapped lockedSender. Unlocks while sending then re-locks.
if _, batchErr := sr.sendHelper(ctx, refreshSpanBa); batchErr != nil {
if _, batchErr := sr.wrapped.SendLocked(ctx, refreshSpanBa); batchErr != nil {
log.VEventf(ctx, 2, "failed to refresh txn spans (%s); propagating original retry error", batchErr)
return false
}
Expand All @@ -376,13 +384,6 @@ func (sr *txnSpanRefresher) tryUpdatingTxnSpans(
return true
}

func (sr *txnSpanRefresher) sendHelper(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
br, pErr := sr.wrapped.SendLocked(ctx, ba)
return br, pErr
}

// appendRefreshSpans appends refresh spans from the supplied batch request,
// qualified by the batch response where appropriate.
func (sr *txnSpanRefresher) appendRefreshSpans(
Expand Down
73 changes: 39 additions & 34 deletions pkg/roachpb/data.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 15 additions & 10 deletions pkg/roachpb/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -449,22 +449,27 @@ message Transaction {
// transaction's write timestamp because a newer value was present. Had our
// write been performed, it would have overwritten the other value even though
// that value might not have been read by a previous read in the transaction
// (i.e. lost update anomaly).
// When this flag is set, the transaction's write timestamp is also bumped.
// The flag is used, though, to return a more specific error to the client
// instead of TransactionRetryError(RETRY_SERIALIZABLE).
// (i.e. lost update anomaly). The write is still performed, but this flag is
// set and the txn's write timestamp is bumped, so the client will not be able
// to commit without performing a refresh.
//
// As explained above, this flag indicates that, if the key in question had
// been read previously by the transaction, a refresh of the transaction's
// read span will surely fail. The client does not currently take advantage of
// this flag to avoid hopeless refreshes, though.
// Since 20.1, errors do not carry this flag; only successful BatchResponses
// do. When possible, such a BatchResponse is preferred to a WriteTooOldError
// because the former leaves intents behind to act as locks.
//
// On the client, the txnSpanRefresher terminates this flag by refreshing
// eagerly when the flag is set. If the key that generated the write too old
// condition had been previously read by the transaction, a refresh of the
// transaction's read span will surely fail. The client is not currently smart
// enough to avoid hopeless refreshes, though.
//
// Historically, this field was also important for SNAPSHOT transactions which
// could commit in other situations when the write timestamp is bumped, but
// not when this flag is set (since lost updates cannot be tolerated even in
// SNAPSHOT). In SERIALIZABLE isolation, transactions generally don't commit
// with a bumped write timestamp, so this flag stopped telling us anything
// more than ReadTimestamp != WriteTimestamp.
// with a bumped write timestamp, so this flag is only telling us that a
// refresh is less likely to succeed than in other cases where
// ReadTimestamp != WriteTimestamp.
bool write_too_old = 12;
// Set of spans that the transaction has written intents into. These
// are spans which must be resolved on txn completion. Note that these
Expand Down
12 changes: 11 additions & 1 deletion pkg/storage/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,17 @@ func evaluateBatch(
ms *enginepb.MVCCStats,
ba *roachpb.BatchRequest,
readOnly bool,
) (*roachpb.BatchResponse, result.Result, *roachpb.Error) {
) (_ *roachpb.BatchResponse, _ result.Result, retErr *roachpb.Error) {

defer func() {
// Ensure that errors don't carry the WriteTooOld flag set. The client
// handles non-error responses with the WriteTooOld flag set, and errors
// with this flag set confuse it.
if retErr != nil && retErr.GetTxn() != nil {
retErr.GetTxn().WriteTooOld = false
}
}()

// NB: Don't mutate BatchRequest directly.
baReqs := ba.Requests
baHeader := ba.Header
Expand Down
41 changes: 41 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4353,6 +4353,47 @@ func TestRPCRetryProtectionInTxn(t *testing.T) {
})
}

// Test that errors from batch evaluation never have the WriteTooOld flag set.
// The WriteTooOld flag is supposed to only be set on successful responses.
//
// The test will construct a batch with a write that would normally cause the
// WriteTooOld flag to be set on the response, and another CPut which causes an
// error to be returned.
func TestErrorsDontCarryWriteTooOldFlag(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
cfg := TestStoreConfig(nil /* clock */)
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.StartWithStoreConfig(t, stopper, cfg)

keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
// Start a transaction early to get a low timestamp.
txn := roachpb.MakeTransaction(
"test", keyA, roachpb.NormalUserPriority, tc.Clock().Now(), 0 /* offset */)

// Write a value outside of the txn to cause a WriteTooOldError later.
put := putArgs(keyA, []byte("val1"))
var ba roachpb.BatchRequest
ba.Add(&put)
_, pErr := tc.Sender().Send(ctx, ba)
require.Nil(t, pErr)

// This put will cause the WriteTooOld flag to be set.
put = putArgs(keyA, []byte("val2"))
// This will cause a ConditionFailedError.
cput := cPutArgs(keyB, []byte("missing"), []byte("newVal"))
ba.Header = roachpb.Header{Txn: &txn}
ba.Add(&put)
ba.Add(&cput)
assignSeqNumsForReqs(&txn, &put, &cput)
_, pErr = tc.Sender().Send(ctx, ba)
require.IsType(t, pErr.GetDetail(), &roachpb.ConditionFailedError{})
require.False(t, pErr.GetTxn().WriteTooOld)
}

// TestReplicaLaziness verifies that Raft Groups are brought up lazily.
func TestReplicaLaziness(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down