Skip to content

Commit

Permalink
Merge #44661
Browse files Browse the repository at this point in the history
44661: kv: refresh less and retry more r=andreimatei a=andreimatei

Before this patch, when the DistSender would split a batch into multiple
sub-batches and one of the sub-batches fails, it would collect responses
for the successful ones and return them together with the error. This
used to be pretty important before we had write idempotency, because it
allowed the span refresher to only retry an EndTxn without also retring
other writes in that batch (which would have failed).

Since we've gotten idempotency in the meantime, we can retry those other
writes. In fact, it's arguably better to do it: there's a tradeoff
between refreshing and retrying. Currently the span refresher needs to
refresh the read spans of the successful sub-batches, which refresh is
at risk of failing under contention.

This patch makes the span refresher retry the whole batch without
considering partial successes. With this patch, refreshing the partial
successes is no longer needed because we'll retry those requests. In
other words, we'll refresh less and retry more.

The existing policy of refreshing more and retrying less will start to be
applied inconsistenly with #44654, where we start refreshing when the
client sees a WriteTooOld flag - but we're forced to refresh the whole
batch.

Besides the rationalizations above, this patch allows us to simplify
code by not having to deal with both responses and errors. We can thus
get rid of the enthralling comment on the client.Sender.Send() stating:
"
// The contract about whether both a response and an error can be
// returned varies between layers.
"

Release note: None

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
  • Loading branch information
craig[bot] and andreimatei committed Feb 10, 2020
2 parents 917005b + dc3e0b3 commit 2bd6bf4
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 57 deletions.
5 changes: 2 additions & 3 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ const (
// Notable implementors: client.Txn, kv.TxnCoordSender, storage.Node,
// storage.Store, storage.Replica.
type Sender interface {
// Send sends a batch for evaluation.
// The contract about whether both a response and an error can be
// returned varies between layers.
// Send sends a batch for evaluation. Either a response or an error is
// returned.
//
// The caller retains ownership of all the memory referenced by the
// BatchRequest; the callee is not allowed to hold on to any parts
Expand Down
16 changes: 3 additions & 13 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,14 +640,6 @@ func splitBatchAndCheckForRefreshSpans(
//
// When the request spans ranges, it is split by range and a partial
// subset of the batch request is sent to affected ranges in parallel.
//
// Note that on error, this method will return any batch responses for
// successfully processed batch requests. This allows the caller to
// deal with potential retry situations where a batch is split so that
// EndTxn is processed alone, after earlier requests in the batch
// succeeded. Where possible, the caller may be able to update spans
// encountered in the transaction and retry just the EndTxn request to
// avoid client-side serializable txn retries.
func (ds *DistSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
Expand Down Expand Up @@ -686,7 +678,6 @@ func (ds *DistSender) Send(
panic("batch with MaxSpanRequestKeys needs splitting")
}

var pErr *roachpb.Error
errIdxOffset := 0
for len(parts) > 0 {
part := parts[0]
Expand All @@ -710,6 +701,7 @@ func (ds *DistSender) Send(
}

var rpl *roachpb.BatchResponse
var pErr *roachpb.Error
if withParallelCommit {
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, 0 /* batchIdx */)
} else {
Expand All @@ -733,9 +725,7 @@ func (ds *DistSender) Send(
if pErr.Index != nil && pErr.Index.Index != -1 {
pErr.Index.Index += int32(errIdxOffset)
}
// Break out of loop to collate batch responses received so far to
// return with error.
break
return nil, pErr
}

errIdxOffset += len(ba.Requests)
Expand All @@ -759,7 +749,7 @@ func (ds *DistSender) Send(
reply.BatchResponse_Header = lastHeader
}

return reply, pErr
return reply, nil
}

type response struct {
Expand Down
39 changes: 39 additions & 0 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package kv_test

import (
"bytes"
"context"
"fmt"
"regexp"
Expand Down Expand Up @@ -2236,6 +2237,44 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
// This test checks the behavior of batches that were split by the
// DistSender. We'll check that the whole batch is retried after a
// successful refresh, and that previously-successful prefix sub-batches
// are not refreshed (but are retried instead).
name: "multi-range with scan getting updated results after refresh",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
// Write to "a". This value will not be seen by the Get the first time
// it's evaluated, but it will be see when it's retried at a bumped
// timestamp. In particular, this verifies that the get is not
// refreshed, for this would fail (and lead to a client-side retry
// instead of one at the txn coord sender).
if err := db.Put(ctx, "a", "newval"); err != nil {
return err
}
// "b" is on a different range, so this put will cause a
// WriteTooOldError on the 2nd sub-batch. The error will cause a
// refresh.
return db.Put(ctx, "b", "newval2")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Get("a")
b.Put("b", "put2")
err := txn.Run(ctx, b)
if err != nil {
return err
}
gr := b.RawResponse().Responses[0].GetGet()
if b, err := gr.Value.GetBytes(); err != nil {
return err
} else if !bytes.Equal(b, []byte("newval")) {
return fmt.Errorf("expected \"newval\", got: %v", b)
}
return txn.Commit(ctx)
},
txnCoordRetry: true,
},
{
name: "cput within uncertainty interval",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down
41 changes: 7 additions & 34 deletions pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,47 +229,27 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
) (_ *roachpb.BatchResponse, _ *roachpb.Error, largestRefreshTS hlc.Timestamp) {
br, pErr := sr.sendHelper(ctx, ba)
if pErr != nil && maxRefreshAttempts > 0 {
br, pErr, largestRefreshTS = sr.maybeRetrySend(ctx, ba, br, pErr, maxRefreshAttempts)
br, pErr, largestRefreshTS = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts)
}
return br, pErr, largestRefreshTS
}

// maybeRetrySend attempts to catch serializable errors and avoid them by
// refreshing the txn at a larger timestamp. If it succeeds at refreshing the
// txn timestamp, it recurses into sendLockedWithRefreshAttempts and retries the
// suffix of the original batch that has not yet completed successfully.
// batch. If the refresh fails, the input pErr is returned.
func (sr *txnSpanRefresher) maybeRetrySend(
ctx context.Context,
ba roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
maxRefreshAttempts int,
ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, maxRefreshAttempts int,
) (*roachpb.BatchResponse, *roachpb.Error, hlc.Timestamp) {
// Check for an error which can be retried after updating spans.
canRetryTxn, retryTxn := roachpb.CanTransactionRetryAtRefreshedTimestamp(ctx, pErr)
if !canRetryTxn || !sr.canAutoRetry {
return nil, pErr, hlc.Timestamp{}
}

// If a prefix of the batch was executed, collect refresh spans for
// that executed portion, and retry the remainder. The canonical
// case is a batch split between everything up to but not including
// the EndTxn. Requests up to the EndTxn succeed, but the EndTxn
// fails with a retryable error. We want to retry only the EndTxn.
ba.UpdateTxn(retryTxn)
retryBa := ba
if br != nil {
doneBa := ba
doneBa.Requests = ba.Requests[:len(br.Responses)]
log.VEventf(ctx, 2, "collecting refresh spans after partial batch execution of %s", doneBa)
if err := sr.appendRefreshSpans(ctx, doneBa, br); err != nil {
return nil, roachpb.NewError(err), hlc.Timestamp{}
}
retryBa.Requests = ba.Requests[len(br.Responses):]
}

log.VEventf(ctx, 2, "retrying %s at refreshed timestamp %s because of %s",
retryBa, retryTxn.ReadTimestamp, pErr)
ba, retryTxn.ReadTimestamp, pErr)

// Try updating the txn spans so we can retry.
if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok {
Expand All @@ -280,26 +260,19 @@ func (sr *txnSpanRefresher) maybeRetrySend(
// newBa.Txn.ReadTimestamp to the current timestamp. Submit the
// batch again.
retryBr, retryErr, retryLargestRefreshTS := sr.sendLockedWithRefreshAttempts(
ctx, retryBa, maxRefreshAttempts-1,
ctx, ba, maxRefreshAttempts-1,
)
if retryErr != nil {
log.VEventf(ctx, 2, "retry failed with %s", retryErr)
return nil, retryErr, hlc.Timestamp{}
}

log.VEventf(ctx, 2, "retry successful @%s", retryBa.Txn.WriteTimestamp)
log.VEventf(ctx, 2, "retry successful @%s", ba.Txn.WriteTimestamp)
sr.autoRetryCounter.Inc(1)
retryTxn.ReadTimestamp.Forward(retryLargestRefreshTS)

// On success, combine responses if applicable and set error to nil.
if br != nil {
br.Responses = append(br.Responses, retryBr.Responses...)
retryBr.CollectedSpans = append(br.CollectedSpans, retryBr.CollectedSpans...)
br.BatchResponse_Header = retryBr.BatchResponse_Header
} else {
br = retryBr
}
return br, nil, retryTxn.ReadTimestamp
return retryBr, nil, retryTxn.ReadTimestamp
}

// tryUpdatingTxnSpans sends Refresh and RefreshRange commands to all spans read
Expand Down
10 changes: 4 additions & 6 deletions pkg/storage/storagebase/knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ package storagebase

// BatchEvalTestingKnobs contains testing helpers that are used during batch evaluation.
type BatchEvalTestingKnobs struct {
// TestingEvalFilter is called before evaluating each command. The
// number of times this callback is run depends on the propEvalKV
// setting, and it is therefore deprecated in favor of either
// TestingProposalFilter (which runs only on the lease holder) or
// TestingApplyFilter (which runs on each replica). If your filter is
// not idempotent, consider wrapping it in a
// TestingEvalFilter is called before evaluating each command. This filter is
// deprecated in favor of either TestingProposalFilter (which runs only on the
// lease holder) or TestingApplyFilter (which runs on each replica). If your
// filter is not idempotent, consider wrapping it in a
// ReplayProtectionFilterWrapper.
// TODO(bdarnell,tschottdorf): Migrate existing tests which use this
// to one of the other filters. See #10493
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type StoreTestingKnobs struct {
TxnWaitKnobs txnwait.TestingKnobs
ConsistencyTestingKnobs ConsistencyTestingKnobs

// TestingRequestFilter is called before evaluating each command on a
// TestingRequestFilter is called before evaluating each request on a
// replica. The filter is run before the request acquires latches, so
// blocking in the filter will not block interfering requests. If it
// returns an error, the command will not be evaluated.
Expand Down

0 comments on commit 2bd6bf4

Please sign in to comment.