kv: detect lease transfer and back off in DistSender #32877

Merged
62 changes: 46 additions & 16 deletions pkg/kv/dist_sender.go
@@ -94,6 +94,12 @@ var (
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
metaDistSenderInLeaseTransferBackoffsCount = metric.Metadata{
Name: "distsender.errors.inleasetransferbackoffs",
Help: "Number of times backed off due to NotLeaseHolderErrors during lease transfer.",
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
)

var rangeDescriptorCacheSize = settings.RegisterIntSetting(
@@ -104,26 +110,28 @@ var rangeDescriptorCacheSize = settings.RegisterIntSetting(

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
}

func makeDistSenderMetrics() DistSenderMetrics {
return DistSenderMetrics{
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
}
}

@@ -1324,6 +1332,15 @@ func (ds *DistSender) sendToReplicas(
}
br, err := transport.SendNext(ctx, ba)

// maxSeenLeaseSequence tracks the maximum LeaseSequence seen in a
// NotLeaseHolderError. If we encounter a sequence number less than or equal
// to maxSeenLeaseSequence in a subsequent NotLeaseHolderError, then the
// range must be experiencing a lease transfer and the client should back
// off using inTransferRetry.
maxSeenLeaseSequence := roachpb.LeaseSequence(-1)
inTransferRetry := retry.StartWithCtx(ctx, ds.rpcRetryOptions)
inTransferRetry.Next() // The first call to Next does not block.

// This loop will retry operations that fail with errors that reflect
// per-replica state and may succeed on other replicas.
for {
@@ -1409,6 +1426,19 @@ func (ds *DistSender) sendToReplicas(
transport.MoveToFront(*lh)
}
}
if l := tErr.Lease; !propagateError && l != nil {
// Check whether we've seen this lease or a prior one before; if so,
// back off, and if not, update maxSeenLeaseSequence.
if l.Sequence > maxSeenLeaseSequence {
maxSeenLeaseSequence = l.Sequence
inTransferRetry.Reset() // The following Next call will not block.
} else {
ds.metrics.InLeaseTransferBackoffs.Inc(1)
log.VErrEventf(ctx, 2, "backing off due to NotLeaseHolderErr at "+
"LeaseSequence %d <= %d", l.Sequence, maxSeenLeaseSequence)
}
inTransferRetry.Next()
}
default:
propagateError = true
}
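
The mechanism added above reduces to a small pattern: remember the highest LeaseSequence observed across NotLeaseHolderErrors, treat a repeated or lower sequence as evidence of an in-flight lease transfer, and back off instead of retrying immediately. Below is a minimal standalone Go sketch of that pattern; the nlhe type and the hard-coded error stream are illustrative stand-ins rather than CockroachDB's API, and the doubling delay only approximates what retry.Options does.

package main

import (
	"fmt"
	"time"
)

// nlhe stands in for roachpb.NotLeaseHolderError; it carries the sequence
// number of the lease the replying replica knew about.
type nlhe struct{ seq int64 }

func main() {
	// Errors as a client might see them mid-transfer: the sequence number
	// repeats (or regresses) until the new lease is applied. This mirrors
	// the first case in the test below, which expects 2 backoffs.
	errs := []nlhe{{1}, {0}, {1}, {2}}

	maxSeen := int64(-1)
	backoff := time.Microsecond
	backoffs := 0
	for _, e := range errs {
		if e.seq > maxSeen {
			// Progress: a newer lease was observed, so record it and
			// reset the delay, as inTransferRetry.Reset() does.
			maxSeen = e.seq
			backoff = time.Microsecond
			continue
		}
		// No progress: the range is likely mid lease transfer, so back
		// off before retrying, growing the delay exponentially.
		backoffs++
		time.Sleep(backoff)
		backoff *= 2
	}
	fmt.Println("backoffs:", backoffs) // prints: backoffs: 2
}
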
85 changes: 85 additions & 0 deletions pkg/kv/dist_sender_test.go
@@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
@@ -581,6 +582,90 @@ func TestRetryOnNotLeaseHolderError(t *testing.T) {
}
}

// TestBackoffOnNotLeaseHolderErrorDuringTransfer verifies that the DistSender
// backs off upon receiving multiple NotLeaseHolderErrors without observing an
// increase in LeaseSequence.
func TestBackoffOnNotLeaseHolderErrorDuringTransfer(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())

g, clock := makeGossip(t, stopper)
leaseHolders := testUserRangeDescriptor3Replicas.Replicas
for _, n := range leaseHolders {
if err := g.AddInfoProto(
gossip.MakeNodeIDKey(n.NodeID),
&roachpb.NodeDescriptor{
NodeID: n.NodeID,
Address: util.MakeUnresolvedAddr("tcp", fmt.Sprintf("neverused:%d", n.NodeID)),
},
gossip.NodeDescriptorTTL,
); err != nil {
t.Fatal(err)
}
}
var sequences []roachpb.LeaseSequence
var testFn simpleSendFn = func(
_ context.Context,
_ SendOptions,
_ ReplicaSlice,
args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
reply := &roachpb.BatchResponse{}
if len(sequences) > 0 {
seq := sequences[0]
sequences = sequences[1:]
lease := roachpb.Lease{
Sequence: seq,
Replica: leaseHolders[int(seq)%2],
}
reply.Error = roachpb.NewError(
&roachpb.NotLeaseHolderError{
Replica: leaseHolders[int(seq)%2],
LeaseHolder: &leaseHolders[(int(seq)+1)%2],
Lease: &lease,
})
return reply, nil
}
// Return an error to bail out of retries.
reply.Error = roachpb.NewErrorf("boom")
return reply, nil
}

cfg := DistSenderConfig{
AmbientCtx: log.AmbientContext{Tracer: tracing.NewTracer()},
Clock: clock,
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(testFn),
},
RangeDescriptorDB: threeReplicaMockRangeDescriptorDB,
NodeDialer: nodedialer.New(nil, gossip.AddressResolver(g)),
RPCRetryOptions: &retry.Options{
InitialBackoff: time.Microsecond,
MaxBackoff: time.Microsecond,
},
}
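// Each case lists the LeaseSequences returned in successive
// NotLeaseHolderErrors and the number of backoffs that should result:
// a sequence above the maximum seen so far resets the backoff, while a
// repeated or lower sequence counts as one backoff. For example,
// {1, 0, 1, 2} resets on 1, backs off on 0 and on the repeated 1, then
// resets on 2, for a total of 2.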
for i, c := range []struct {
leaseSequences []roachpb.LeaseSequence
expected int64
}{
{[]roachpb.LeaseSequence{1, 0, 1, 2}, 2},
{[]roachpb.LeaseSequence{0}, 0},
{[]roachpb.LeaseSequence{1, 0, 1, 2, 1}, 3},
} {
sequences = c.leaseSequences
ds := NewDistSender(cfg, g)
v := roachpb.MakeValueFromString("value")
put := roachpb.NewPut(roachpb.Key("a"), v)
if _, pErr := client.SendWrapped(context.Background(), ds, put); !testutils.IsPError(pErr, "boom") {
t.Fatalf("%d: unexpected error: %v", i, pErr)
}
if got := ds.Metrics().InLeaseTransferBackoffs.Count(); got != c.expected {
t.Fatalf("%d: expected %d backoffs, got %d", i, c.expected, got)
}
}
}

// This test verifies that when we have a cached leaseholder that is down,
// it is ejected from the cache.
func TestDistSenderDownNodeEvictLeaseholder(t *testing.T) {