kv: refresh less and retry more #44661

Merged 1 commit on Feb 10, 2020
5 changes: 2 additions & 3 deletions pkg/internal/client/sender.go
@@ -52,9 +52,8 @@ const (
// Notable implementors: client.Txn, kv.TxnCoordSender, storage.Node,
// storage.Store, storage.Replica.
type Sender interface {
// Send sends a batch for evaluation.
// The contract about whether both a response and an error can be
// returned varies between layers.
// Send sends a batch for evaluation. Either a response or an error is
// returned.
//
// The caller retains ownership of all the memory referenced by the
// BatchRequest; the callee is not allowed to hold on to any parts
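The comment change above tightens the Sender contract to "either a response or an error, never both". Below is a minimal, self-contained sketch of what that means for callers; the types and the send function are hypothetical stand-ins, not the real client API:

package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for BatchRequest/BatchResponse/Error, used only to
// illustrate the tightened Sender contract.
type batchRequest struct{}
type batchResponse struct{ rows int }
type sendError struct{ msg string }

// send models a Sender.Send that honors the contract: exactly one of the two
// return values is non-nil.
func send(ctx context.Context, ba batchRequest) (*batchResponse, *sendError) {
	return &batchResponse{rows: 1}, nil
}

func main() {
	br, pErr := send(context.Background(), batchRequest{})
	if pErr != nil {
		// br is guaranteed to be nil here, so there is no partial response
		// for the caller to inspect on the error path.
		fmt.Println("error:", pErr.msg)
		return
	}
	fmt.Println("complete response, rows:", br.rows)
}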
16 changes: 3 additions & 13 deletions pkg/kv/dist_sender.go
@@ -640,14 +640,6 @@ func splitBatchAndCheckForRefreshSpans(
//
// When the request spans ranges, it is split by range and a partial
// subset of the batch request is sent to affected ranges in parallel.
//
// Note that on error, this method will return any batch responses for
// successfully processed batch requests. This allows the caller to
// deal with potential retry situations where a batch is split so that
// EndTxn is processed alone, after earlier requests in the batch
// succeeded. Where possible, the caller may be able to update spans
// encountered in the transaction and retry just the EndTxn request to
// avoid client-side serializable txn retries.
func (ds *DistSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
@@ -686,7 +678,6 @@ func (ds *DistSender) Send(
panic("batch with MaxSpanRequestKeys needs splitting")
}

var pErr *roachpb.Error
errIdxOffset := 0
for len(parts) > 0 {
part := parts[0]
@@ -710,6 +701,7 @@
}

var rpl *roachpb.BatchResponse
var pErr *roachpb.Error
if withParallelCommit {
rpl, pErr = ds.divideAndSendParallelCommit(ctx, ba, rs, 0 /* batchIdx */)
} else {
@@ -733,9 +725,7 @@
if pErr.Index != nil && pErr.Index.Index != -1 {
pErr.Index.Index += int32(errIdxOffset)
}
// Break out of loop to collate batch responses received so far to
// return with error.
break
return nil, pErr
}

errIdxOffset += len(ba.Requests)
@@ -759,7 +749,7 @@
reply.BatchResponse_Header = lastHeader
}

return reply, pErr
return reply, nil
}

type response struct {
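To make the control-flow change in DistSender.Send concrete, here is a simplified, self-contained sketch of the new shape of the loop. The types and the sendPart helper are hypothetical stand-ins, and the real method does far more (splitting, parallel commits, response combining); this only illustrates that a sub-batch error now returns (nil, error) immediately instead of collating the partial responses from earlier sub-batches:

package main

import "fmt"

// Hypothetical, simplified stand-ins for roachpb.BatchRequest, BatchResponse,
// and Error; they exist only to illustrate the error path of DistSender.Send.
type batchRequest struct{ requests []string }
type batchResponse struct{ responses []string }
type batchError struct {
	msg      string
	errIndex int // index of the failing request within its sub-batch, or -1
}

// sendPart is a hypothetical helper standing in for sending one sub-batch to
// the affected ranges.
func sendPart(part batchRequest) (*batchResponse, *batchError) {
	return &batchResponse{responses: part.requests}, nil
}

// send mirrors the new loop: any sub-batch error aborts the whole send, and no
// partial reply is returned alongside it.
func send(parts []batchRequest) (*batchResponse, *batchError) {
	reply := &batchResponse{}
	errIdxOffset := 0
	for _, part := range parts {
		rpl, pErr := sendPart(part)
		if pErr != nil {
			if pErr.errIndex != -1 {
				// Keep the error index relative to the original, unsplit batch.
				pErr.errIndex += errIdxOffset
			}
			// Previously: break, then return the partial reply together with pErr.
			return nil, pErr
		}
		errIdxOffset += len(part.requests)
		reply.responses = append(reply.responses, rpl.responses...)
	}
	return reply, nil
}

func main() {
	br, pErr := send([]batchRequest{
		{requests: []string{"Get(a)"}},
		{requests: []string{"Put(b)"}},
	})
	fmt.Println(br, pErr)
}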
39 changes: 39 additions & 0 deletions pkg/kv/dist_sender_server_test.go
@@ -11,6 +11,7 @@
package kv_test

import (
"bytes"
"context"
"fmt"
"regexp"
@@ -2236,6 +2237,44 @@ func TestTxnCoordSenderRetries(t *testing.T) {
// We expect the request to succeed after a server-side retry.
txnCoordRetry: false,
},
{
// This test checks the behavior of batches that were split by the
// DistSender. We'll check that the whole batch is retried after a
// successful refresh, and that previously-successful prefix sub-batches
// are not refreshed (but are retried instead).
name: "multi-range with scan getting updated results after refresh",
afterTxnStart: func(ctx context.Context, db *client.DB) error {
// Write to "a". This value will not be seen by the Get the first time
// it's evaluated, but it will be seen when it's retried at a bumped
// timestamp. In particular, this verifies that the get is not
// refreshed, for this would fail (and lead to a client-side retry
// instead of one at the txn coord sender).
if err := db.Put(ctx, "a", "newval"); err != nil {
return err
}
// "b" is on a different range, so this put will cause a
// WriteTooOldError on the 2nd sub-batch. The error will cause a
// refresh.
return db.Put(ctx, "b", "newval2")
},
retryable: func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
b.Get("a")
b.Put("b", "put2")
err := txn.Run(ctx, b)
if err != nil {
return err
}
gr := b.RawResponse().Responses[0].GetGet()
if b, err := gr.Value.GetBytes(); err != nil {
return err
} else if !bytes.Equal(b, []byte("newval")) {
return fmt.Errorf("expected \"newval\", got: %v", b)
}
return txn.Commit(ctx)
},
txnCoordRetry: true,
},
{
name: "cput within uncertainty interval",
beforeTxnStart: func(ctx context.Context, db *client.DB) error {
41 changes: 7 additions & 34 deletions pkg/kv/txn_interceptor_span_refresher.go
@@ -229,47 +229,27 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
) (_ *roachpb.BatchResponse, _ *roachpb.Error, largestRefreshTS hlc.Timestamp) {
br, pErr := sr.sendHelper(ctx, ba)
if pErr != nil && maxRefreshAttempts > 0 {
br, pErr, largestRefreshTS = sr.maybeRetrySend(ctx, ba, br, pErr, maxRefreshAttempts)
br, pErr, largestRefreshTS = sr.maybeRetrySend(ctx, ba, pErr, maxRefreshAttempts)
}
return br, pErr, largestRefreshTS
}

// maybeRetrySend attempts to catch serializable errors and avoid them by
// refreshing the txn at a larger timestamp. If it succeeds at refreshing the
// txn timestamp, it recurses into sendLockedWithRefreshAttempts and retries the
// suffix of the original batch that has not yet completed successfully.
// batch. If the refresh fails, the input pErr is returned.
func (sr *txnSpanRefresher) maybeRetrySend(
ctx context.Context,
ba roachpb.BatchRequest,
br *roachpb.BatchResponse,
pErr *roachpb.Error,
maxRefreshAttempts int,
ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, maxRefreshAttempts int,
) (*roachpb.BatchResponse, *roachpb.Error, hlc.Timestamp) {
// Check for an error which can be retried after updating spans.
canRetryTxn, retryTxn := roachpb.CanTransactionRetryAtRefreshedTimestamp(ctx, pErr)
if !canRetryTxn || !sr.canAutoRetry {
return nil, pErr, hlc.Timestamp{}
}

// If a prefix of the batch was executed, collect refresh spans for
// that executed portion, and retry the remainder. The canonical
// case is a batch split between everything up to but not including
// the EndTxn. Requests up to the EndTxn succeed, but the EndTxn
// fails with a retryable error. We want to retry only the EndTxn.
ba.UpdateTxn(retryTxn)
retryBa := ba
if br != nil {
doneBa := ba
doneBa.Requests = ba.Requests[:len(br.Responses)]
log.VEventf(ctx, 2, "collecting refresh spans after partial batch execution of %s", doneBa)
if err := sr.appendRefreshSpans(ctx, doneBa, br); err != nil {
return nil, roachpb.NewError(err), hlc.Timestamp{}
}
retryBa.Requests = ba.Requests[len(br.Responses):]
}

log.VEventf(ctx, 2, "retrying %s at refreshed timestamp %s because of %s",
retryBa, retryTxn.ReadTimestamp, pErr)
ba, retryTxn.ReadTimestamp, pErr)

// Try updating the txn spans so we can retry.
if ok := sr.tryUpdatingTxnSpans(ctx, retryTxn); !ok {
@@ -280,26 +260,19 @@ func (sr *txnSpanRefresher) maybeRetrySend(
// newBa.Txn.ReadTimestamp to the current timestamp. Submit the
// batch again.
retryBr, retryErr, retryLargestRefreshTS := sr.sendLockedWithRefreshAttempts(
ctx, retryBa, maxRefreshAttempts-1,
ctx, ba, maxRefreshAttempts-1,
)
if retryErr != nil {
log.VEventf(ctx, 2, "retry failed with %s", retryErr)
return nil, retryErr, hlc.Timestamp{}
}

log.VEventf(ctx, 2, "retry successful @%s", retryBa.Txn.WriteTimestamp)
log.VEventf(ctx, 2, "retry successful @%s", ba.Txn.WriteTimestamp)
sr.autoRetryCounter.Inc(1)
retryTxn.ReadTimestamp.Forward(retryLargestRefreshTS)

// On success, combine responses if applicable and set error to nil.
if br != nil {
br.Responses = append(br.Responses, retryBr.Responses...)
retryBr.CollectedSpans = append(br.CollectedSpans, retryBr.CollectedSpans...)
br.BatchResponse_Header = retryBr.BatchResponse_Header
} else {
br = retryBr
}
return br, nil, retryTxn.ReadTimestamp
return retryBr, nil, retryTxn.ReadTimestamp
}

// tryUpdatingTxnSpans sends Refresh and RefreshRange commands to all spans read
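Since the DistSender no longer hands back partial results on error, the refresher no longer splices responses or collects refresh spans for a completed prefix: it refreshes the txn's read spans and re-sends the entire batch. A self-contained, heavily simplified sketch of that flow follows; every name here is a hypothetical stand-in for the real CockroachDB types and helpers:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins; a batch carries only the read timestamp we care about.
type batch struct{ readTimestamp int64 }
type response struct{ ok bool }

var errRetryable = errors.New("serializable retry: timestamp pushed")

// refreshSpans stands in for tryUpdatingTxnSpans: it checks that nothing the
// txn previously read has changed, so the read timestamp can be bumped.
func refreshSpans(newTS int64) bool { return true }

// sendOnce stands in for the lower-level send; here it succeeds once the
// batch carries a sufficiently high timestamp.
func sendOnce(ba batch) (response, error) {
	if ba.readTimestamp < 10 {
		return response{}, errRetryable
	}
	return response{ok: true}, nil
}

// sendWithRefreshAttempts mirrors the new structure: on a retryable error it
// refreshes the read spans and re-sends the whole batch. There is no
// prefix/suffix bookkeeping; either the entire batch is retried or the error
// is returned to the caller.
func sendWithRefreshAttempts(ba batch, attempts int) (response, error) {
	br, err := sendOnce(ba)
	if err == nil || attempts == 0 {
		return br, err
	}
	if !errors.Is(err, errRetryable) || !refreshSpans(10) {
		return response{}, err
	}
	ba.readTimestamp = 10 // retry the whole batch at the refreshed timestamp
	return sendWithRefreshAttempts(ba, attempts-1)
}

func main() {
	br, err := sendWithRefreshAttempts(batch{readTimestamp: 5}, 5)
	fmt.Println(br, err)
}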
10 changes: 4 additions & 6 deletions pkg/storage/storagebase/knobs.go
@@ -16,12 +16,10 @@ package storagebase

// BatchEvalTestingKnobs contains testing helpers that are used during batch evaluation.
type BatchEvalTestingKnobs struct {
// TestingEvalFilter is called before evaluating each command. The
// number of times this callback is run depends on the propEvalKV
// setting, and it is therefore deprecated in favor of either
// TestingProposalFilter (which runs only on the lease holder) or
// TestingApplyFilter (which runs on each replica). If your filter is
// not idempotent, consider wrapping it in a
// TestingEvalFilter is called before evaluating each command. This filter is
// deprecated in favor of either TestingProposalFilter (which runs only on the
// lease holder) or TestingApplyFilter (which runs on each replica). If your
// filter is not idempotent, consider wrapping it in a
// ReplayProtectionFilterWrapper.
// TODO(bdarnell,tschottdorf): Migrate existing tests which use this
// to one of the other filters. See #10493
2 changes: 1 addition & 1 deletion pkg/storage/testing_knobs.go
@@ -31,7 +31,7 @@ type StoreTestingKnobs struct {
TxnWaitKnobs txnwait.TestingKnobs
ConsistencyTestingKnobs ConsistencyTestingKnobs

// TestingRequestFilter is called before evaluating each command on a
// TestingRequestFilter is called before evaluating each request on a
// replica. The filter is run before the request acquires latches, so
// blocking in the filter will not block interfering requests. If it
// returns an error, the command will not be evaluated.