Skip to content

Commit

Permalink
kv: refresh less and retry more
Browse files Browse the repository at this point in the history
Before this patch, when the DistSender would split a batch into multiple
sub-batches and one of the sub-batches fails, it would collect responses
for the successful ones and return them together with the error. This
used to be pretty important before we had write idempotency, because it
allowed the span refresher to only retry an EndTxn without also retring
other writes in that batch (which would have failed).

Since we've gotten idempotency in the meantime, we can retry those other
writes. In fact, it's arguably better to do it: there's a tradeoff
between refreshing and retrying. Currently the span refresher needs to
refresh the read spans of the successful sub-batches, which refresh is
at risk of failing under contention.

This patch makes the span refresher retry the whole batch without
considering partial successes. With this patch, refreshing the partial
successes is no longer needed because we'll retry those requests. In
other words, we'll refresh less and retry more.

The existing policy of refreshing more and retrying less will start to be
applied inconsistenly with #44654, where we start refreshing when the
client sees a WriteTooOld flag - but we're forced to refresh the whole
batch.

Besides the rationalizations above, this patch allows us to simplify
code by not having to deal with both responses and errors. We can thus
get rid of the enthralling comment on the client.Sender.Send() stating:
"
// The contract about whether both a response and an error can be
// returned varies between layers.
"

Release note: None
  • Loading branch information
andreimatei committed Feb 4, 2020
1 parent 60d40b8 commit 4f356d8
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 42 deletions.
5 changes: 2 additions & 3 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ const (
// Notable implementors: client.Txn, kv.TxnCoordSender, storage.Node,
// storage.Store, storage.Replica.
type Sender interface {
// Send sends a batch for evaluation.
// The contract about whether both a response and an error can be
// returned varies between layers.
// Send sends a batch for evaluation. Either a response or an error is
// returned.
//
// The caller retains ownership of all the memory referenced by the
// BatchRequest; the callee is not allowed to hold on to any parts
Expand Down
16 changes: 3 additions & 13 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,14 +640,6 @@ func splitBatchAndCheckForRefreshSpans(
//
// When the request spans ranges, it is split by range and a partial
// subset of the batch request is sent to affected ranges in parallel.
//
// Note that on error, this method will return any batch responses for
// successfully processed batch requests. This allows the caller to
// deal with potential retry situations where a batch is split so that
// EndTxn is processed alone, after earlier requests in the batch
// succeeded. Where possible, the caller may be able to update spans
// encountered in the transaction and retry just the EndTxn request to
// avoid client-side serializable txn retries.
func (ds *DistSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
Expand Down Expand Up @@ -686,7 +678,6 @@ func (ds *DistSender) Send(
panic("batch with MaxSpanRequestKeys needs splitting")
}

var pErr *roachpb.Error
errIdxOffset := 0
for len(parts) > 0 {
part := parts[0]
Expand All @@ -710,6 +701,7 @@ func (ds *DistSender) Send(
}

var rpl *roachpb.BatchResponse
var pErr *roachpb.Error
if withParallelCommit {
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, 0 /* batchIdx */)
} else {
Expand All @@ -733,9 +725,7 @@ func (ds *DistSender) Send(
if pErr.Index != nil && pErr.Index.Index != -1 {
pErr.Index.Index += int32(errIdxOffset)
}
// Break out of loop to collate batch responses received so far to
// return with error.
break
return nil, pErr
}

errIdxOffset += len(ba.Requests)
Expand All @@ -759,7 +749,7 @@ func (ds *DistSender) Send(
reply.BatchResponse_Header = lastHeader
}

return reply, pErr
return reply, nil
}

type response struct {
Expand Down
36 changes: 36 additions & 0 deletions pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package kv_test

import (
"bytes"
"context"
"fmt"
"regexp"
Expand Down Expand Up @@ -2232,6 +2233,41 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// Expect a transaction coord retry, which should succeed.
txnCoordRetry: true,
},
{
// This test checks the behavior of batches that were split by the
// DistSender. We'll check that the whole batch is retried after a
// successful refresh, and that previously-successful prefix sub-batches
// are not refreshed (but are retried instead).
name: "multi-range with scan getting updated results after refresh",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
// Write to "a". This value will not be seen by the Get the first time
// it's evaluated, but it will be see when it's retried at a bumped
// timestamp.
if err := db.Put(ctx, "a", "newval"); err != nil {
return err
}
// This will cause a WriteTooOldError on the 2nd sub-batch, which will
// cause a refresh.
return db.Put(ctx, "b", "newval")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Get("a")
b.Put("b", "put2")
err := txn.Run(ctx, b)
if err != nil {
return err
}
gr := b.RawResponse().Responses[0].GetGet()
if b, err := gr.Value.GetBytes(); err != nil {
return err
} else if bytes.Compare(b, []byte("newval")) != 0 {
return fmt.Errorf("expected \"newval\", got: %v", b)
}
return txn.Commit(ctx)
},
txnCoordRetry: true,
},
{
name: "cput within uncertainty interval",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
Expand Down
22 changes: 3 additions & 19 deletions pkg/kv/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,25 +238,9 @@ func (sr *txnSpanRefresher) maybeRetrySend(
return nil, pErr, hlc.Timestamp{}
}

// If a prefix of the batch was executed, collect refresh spans for
// that executed portion, and retry the remainder. The canonical
// case is a batch split between everything up to but not including
// the EndTxn. Requests up to the EndTxn succeed, but the EndTxn
// fails with a retryable error. We want to retry only the EndTxn.
ba.UpdateTxn(retryTxn)
retryBa := ba
if br != nil {
doneBa := ba
doneBa.Requests = ba.Requests[:len(br.Responses)]
log.VEventf(ctx, 2, "collecting refresh spans after partial batch execution of %s", doneBa)
if err := sr.appendRefreshSpans(ctx, doneBa, br); err != nil {
return nil, roachpb.NewError(err), hlc.Timestamp{}
}
retryBa.Requests = ba.Requests[len(br.Responses):]
}

log.VEventf(ctx, 2, "retrying %s at refreshed timestamp %s because of %s",
retryBa, retryTxn.ReadTimestamp, pErr)
ba, retryTxn.ReadTimestamp, pErr)

// Try updating the txn spans so we can retry.
if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok {
Expand All @@ -267,14 +251,14 @@ func (sr *txnSpanRefresher) maybeRetrySend(
// newBa.Txn.ReadTimestamp to the current timestamp. Submit the
// batch again.
retryBr, retryErr, retryLargestRefreshTS := sr.sendLockedWithRefreshAttempts(
ctx, retryBa, maxRefreshAttempts-1,
ctx, ba, maxRefreshAttempts-1,
)
if retryErr != nil {
log.VEventf(ctx, 2, "retry failed with %s", retryErr)
return nil, retryErr, hlc.Timestamp{}
}

log.VEventf(ctx, 2, "retry successful @%s", retryBa.Txn.WriteTimestamp)
log.VEventf(ctx, 2, "retry successful @%s", ba.Txn.WriteTimestamp)
sr.autoRetryCounter.Inc(1)
retryTxn.ReadTimestamp.Forward(retryLargestRefreshTS)

Expand Down
10 changes: 4 additions & 6 deletions pkg/storage/storagebase/knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ package storagebase

// BatchEvalTestingKnobs contains testing helpers that are used during batch evaluation.
type BatchEvalTestingKnobs struct {
// TestingEvalFilter is called before evaluating each command. The
// number of times this callback is run depends on the propEvalKV
// setting, and it is therefore deprecated in favor of either
// TestingProposalFilter (which runs only on the lease holder) or
// TestingApplyFilter (which runs on each replica). If your filter is
// not idempotent, consider wrapping it in a
// TestingEvalFilter is called before evaluating each command. This filter is
// deprecated in favor of either TestingProposalFilter (which runs only on the
// lease holder) or TestingApplyFilter (which runs on each replica). If your
// filter is not idempotent, consider wrapping it in a
// ReplayProtectionFilterWrapper.
// TODO(bdarnell,tschottdorf): Migrate existing tests which use this
// to one of the other filters. See #10493
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type StoreTestingKnobs struct {
TxnWaitKnobs txnwait.TestingKnobs
ConsistencyTestingKnobs ConsistencyTestingKnobs

// TestingRequestFilter is called before evaluating each command on a
// TestingRequestFilter is called before evaluating each request on a
// replica. The filter is run before the request acquires latches, so
// blocking in the filter will not block interfering requests. If it
// returns an error, the command will not be evaluated.
Expand Down

0 comments on commit 4f356d8

Please sign in to comment.