diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 92f54e6de9d4..a8164ce650b5 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -218,6 +218,7 @@ go_test(
         "client_replica_backpressure_test.go",
         "client_replica_gc_test.go",
         "client_replica_test.go",
+        "client_split_burst_test.go",
         "client_split_test.go",
         "client_status_test.go",
         "client_tenant_test.go",
diff --git a/pkg/kv/kvserver/client_split_burst_test.go b/pkg/kv/kvserver/client_split_burst_test.go
new file mode 100644
index 000000000000..74cab23db5d3
--- /dev/null
+++ b/pkg/kv/kvserver/client_split_burst_test.go
@@ -0,0 +1,202 @@
+// Copyright 2021 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvserver_test
+
+import (
+	"context"
+	"math"
+	"math/rand"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/stretchr/testify/require"
+)
+
+type splitBurstTest struct {
+	*testcluster.TestCluster
+	baseKey                     roachpb.Key
+	magicStickyBit              hlc.Timestamp
+	numSplitsSeenOnSlowFollower *int32 // atomic
+	initialRaftSnaps            int
+}
+
+func (sbt *splitBurstTest) SplitWithDelay(t *testing.T, location byte) {
+	t.Helper()
+	require.NoError(t, sbt.SplitWithDelayE(location))
+}
+
+func (sbt *splitBurstTest) SplitWithDelayE(location byte) error {
+	k := append([]byte(nil), sbt.baseKey...)
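+	// NB: the append above copies baseKey, so appending the split location
+	// below never writes into the shared scratch key's backing array (the
+	// "random" subtest issues these splits concurrently).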
+	splitKey := append(k, location)
+	_, _, err := sbt.SplitRangeWithExpiration(splitKey, sbt.magicStickyBit)
+	return err
+}
+
+func (sbt *splitBurstTest) NumRaftSnaps(t *testing.T) int {
+	var totalSnaps int
+	for i := range sbt.Servers {
+		var n int // num rows (sanity check against test rotting)
+		var c int // num Raft snapshots
+		if err := sbt.ServerConn(i).QueryRow(`
+SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
+	name = 'range.snapshots.applied-voter'
+`).Scan(&n, &c); err != nil {
+			t.Fatal(err)
+		}
+		require.EqualValues(t, 1, n)
+		totalSnaps += c
+	}
+	return totalSnaps - sbt.initialRaftSnaps
+}
+
+func setupSplitBurstTest(t *testing.T, delay time.Duration) *splitBurstTest {
+	var magicStickyBit = hlc.Timestamp{WallTime: math.MaxInt64 - 123, Logical: 987654321}
+
+	numSplitsSeenOnSlowFollower := new(int32) // atomic
+	var quiesceCh <-chan struct{}
+	knobs := base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
+		TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
+			if args.Split == nil || delay == 0 {
+				return 0, nil
+			}
+			if args.Split.RightDesc.GetStickyBit() != magicStickyBit {
+				return 0, nil
+			}
+			select {
+			case <-time.After(delay):
+			case <-quiesceCh:
+			}
+			atomic.AddInt32(numSplitsSeenOnSlowFollower, 1)
+			return 0, nil
+		},
+	}}
+
+	ctx := context.Background()
+
+	// n1 and n3 are fast, n2 is slow (to apply the splits). We need three
+	// nodes here: with only two, delaying the apply loop on n2 would also
+	// delay how quickly commands reach quorum, which would backpressure
+	// the splits by accident.
+	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+		ServerArgsPerNode: map[int]base.TestServerArgs{
+			1: {Knobs: knobs},
+		},
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer t.Cleanup(func() {
+		tc.Stopper().Stop(ctx)
+	})
+	quiesceCh = tc.Stopper().ShouldQuiesce()
+
+	k := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, k, tc.Target(1), tc.Target(2))
+
+	sbc := &splitBurstTest{
+		TestCluster:                 tc,
+		baseKey:                     k,
+		magicStickyBit:              magicStickyBit,
+		numSplitsSeenOnSlowFollower: numSplitsSeenOnSlowFollower,
+	}
+	sbc.initialRaftSnaps = sbc.NumRaftSnaps(t)
+	return sbc
+}
+
+func TestSplitBurstWithSlowFollower(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	t.Run("forward", func(t *testing.T) {
+		// When splitting at an increasing sequence of keys, in each step we split
+		// the most recently split range, and we expect the splits to wait for that
+		// range to have caught up its follower across the preceding split, which
+		// was delayed as well. So when we're done splitting we should have seen at
+		// least (numSplits-1) splits get applied on the slow follower. This
+		// exercises `splitDelayHelper` end-to-end.
+		//
+		// This test is fairly slow because every split will incur a 1s penalty
+		// (dictated by the raft leader's probing interval). We could fix this
+		// delay here and in production if we had a way to send a signal from
+		// the slow follower to the leader when the split trigger initializes
+		// the right-hand side. This is actually an interesting point, because
+		// the split trigger *replaces* a snapshot - but doesn't fully act like
+		// one: when a raft group applies a snapshot, it generates an MsgAppResp
+		// to the leader which will let the leader probe proactively. We could
+		// signal the split trigger to the raft group as a snapshot being applied
+		// (as opposed to recreating the in-memory instance as we do now), and
+		// then this MsgAppResp should be created automatically.
+
+		sbt := setupSplitBurstTest(t, 50*time.Millisecond)
+		defer sbt.Stopper().Stop(ctx)
+
+		const numSplits = byte(5)
+
+		for i := byte(0); i < numSplits; i++ {
+			sbt.SplitWithDelay(t, i)
+		}
+		// We should have applied all but perhaps the last split on the slow node.
+		// If we didn't, that indicates a failure to delay the splits accordingly.
+		require.GreaterOrEqual(t, atomic.LoadInt32(sbt.numSplitsSeenOnSlowFollower), int32(numSplits-1))
+		require.Zero(t, sbt.NumRaftSnaps(t))
+	})
+	t.Run("backward", func(t *testing.T) {
+		// When splitting at a decreasing sequence of keys, we're repeatedly splitting
+		// the first range. All of its followers are initialized to begin with, and
+		// even though there is a slow follower, `splitDelayHelper` isn't interested in
+		// delaying this here (which would imply that it's trying to check that every-
+		// one is "caught up").
+		// We set a 100s timeout so that below we can assert that `splitDelayHelper`
+		// isn't somehow forcing us to wait here.
+		infiniteDelay := 100 * time.Second
+		sbt := setupSplitBurstTest(t, infiniteDelay)
+		defer sbt.Stopper().Stop(ctx)
+
+		const numSplits = byte(50)
+
+		for i := byte(0); i < numSplits; i++ {
+			tBegin := timeutil.Now()
+			sbt.SplitWithDelay(t, numSplits-i)
+			if dur := timeutil.Since(tBegin); dur > infiniteDelay {
+				t.Fatalf("waited %s for split #%d", dur, i+1)
+			}
+		}
+		require.Zero(t, atomic.LoadInt32(sbt.numSplitsSeenOnSlowFollower))
+		require.Zero(t, sbt.NumRaftSnaps(t))
+	})
+	t.Run("random", func(t *testing.T) {
+		// When splitting randomly, we'll see a mixture of forward and backward
+		// splits, so we can't assert on how many split triggers we observe.
+		// However, there still shouldn't be any snapshots.
+		sbt := setupSplitBurstTest(t, 10*time.Millisecond)
+		defer sbt.Stopper().Stop(ctx)
+
+		const numSplits = 20
+		perm := rand.Perm(numSplits)
+
+		doSplit := func(ctx context.Context, i int) error {
+			return sbt.SplitWithDelayE(byte(perm[i]))
+		}
+		require.NoError(t, ctxgroup.GroupWorkers(ctx, numSplits, doSplit))
+
+		require.Zero(t, sbt.NumRaftSnaps(t))
+	})
+}
diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go
index 865eb4eb0b6e..32a25ae6e70d 100644
--- a/pkg/kv/kvserver/client_split_test.go
+++ b/pkg/kv/kvserver/client_split_test.go
@@ -49,7 +49,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/ts"
 	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
-	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -518,69 +517,6 @@ func TestStoreRangeSplitAtRangeBounds(t *testing.T) {
 	}
 }
 
-// TestSplitTriggerRaftSnapshotRace verifies that when an uninitialized Replica
-// resulting from a split hasn't been initialized via the split trigger yet, a
-// grace period prevents the replica from requesting an errant Raft snapshot.
-// This is verified by running a number of splits and asserting that no Raft
-// snapshots are observed. As a nice side effect, this also verifies that log
-// truncations don't cause any Raft snapshots in this test.
-func TestSplitTriggerRaftSnapshotRace(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	ctx := context.Background()
-	const numNodes = 3
-	var args base.TestClusterArgs
-	// NB: the merge queue is enabled for additional "chaos". Note that the test
-	// uses three nodes and so there is no replica movement, which would other-
-	// wise tickle Raft snapshots for unrelated reasons.
-	tc := testcluster.StartTestCluster(t, numNodes, args)
-	defer tc.Stopper().Stop(ctx)
-
-	numSplits := 100
-	if util.RaceEnabled {
-		// Running 100 splits is overkill in race builds.
-		numSplits = 10
-	}
-	perm := rand.Perm(numSplits)
-	idx := int32(-1) // accessed atomically
-
-	numRaftSnaps := func(when string) int {
-		var totalSnaps int
-		for i := 0; i < numNodes; i++ {
-			var n int // num rows (sanity check against test rotting)
-			var c int // num Raft snapshots
-			if err := tc.ServerConn(i).QueryRow(`
-SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
-	name = 'range.snapshots.applied-voter'
-`).Scan(&n, &c); err != nil {
-				t.Fatal(err)
-			}
-			if expRows := 1; n != expRows {
-				t.Fatalf("%s: expected %d rows, got %d", when, expRows, n)
-			}
-			totalSnaps += c
-		}
-		return totalSnaps
-	}
-
-	// There are usually no raft snaps before, but there is a race condition where
-	// they can occasionally happen during upreplication.
-	numSnapsBefore := numRaftSnaps("before")
-
-	doSplit := func(ctx context.Context, _ int) error {
-		_, _, err := tc.SplitRange([]byte(fmt.Sprintf("key-%d", perm[atomic.AddInt32(&idx, 1)])))
-		return err
-	}
-
-	if err := ctxgroup.GroupWorkers(ctx, numSplits, doSplit); err != nil {
-		t.Fatal(err)
-	}
-
-	// Check that no snaps happened during the splits.
-	require.Equal(t, numSnapsBefore, numRaftSnaps("after"))
-}
-
 // TestStoreRangeSplitIdempotency executes a split of a range and
 // verifies that the resulting ranges respond to the right key ranges
 // and that their stats have been properly accounted for and requests
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 4f8ee3e79066..7f0732e42801 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -1536,6 +1536,7 @@ func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) {
 		if err := r.mu.internalRaftGroup.Campaign(); err != nil {
 			log.VEventf(ctx, 1, "failed to campaign: %s", err)
 		}
+		r.store.enqueueRaftUpdateCheck(r.RangeID)
 	}
 }
 
diff --git a/pkg/kv/kvserver/split_delay_helper.go b/pkg/kv/kvserver/split_delay_helper.go
index ba4c9cbb9c25..c69752d49109 100644
--- a/pkg/kv/kvserver/split_delay_helper.go
+++ b/pkg/kv/kvserver/split_delay_helper.go
@@ -13,6 +13,8 @@ package kvserver
 import (
 	"context"
 	"fmt"
+	"math"
+	"strings"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -24,8 +26,9 @@ import (
 type splitDelayHelperI interface {
 	RaftStatus(context.Context) (roachpb.RangeID, *raft.Status)
 	ProposeEmptyCommand(ctx context.Context)
-	NumAttempts() int
-	Sleep(context.Context) time.Duration
+	MaxTicks() int
+	TickDuration() time.Duration
+	Sleep(context.Context, time.Duration)
 }
 
 type splitDelayHelper Replica
@@ -47,6 +50,13 @@ func (sdh *splitDelayHelper) RaftStatus(ctx context.Context) (roachpb.RangeID, *
 	return r.RangeID, raftStatus
 }
 
+func (sdh *splitDelayHelper) Sleep(ctx context.Context, dur time.Duration) {
+	select {
+	case <-ctx.Done():
+	case <-time.After(dur):
+	}
+}
+
 func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
 	r := (*Replica)(sdh)
 	r.raftMu.Lock()
@@ -61,7 +71,7 @@ func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
 	r.raftMu.Unlock()
 }
 
-func (sdh *splitDelayHelper) NumAttempts() int {
+func (sdh *splitDelayHelper) MaxTicks() int {
 	// There is a related mechanism regarding snapshots and splits that is worth
 	// pointing out here: Incoming MsgApp (see the _ assignment below) are
 	// dropped if they are addressed to uninitialized replicas likely to become
@@ -69,7 +79,7 @@ func (sdh *splitDelayHelper) NumAttempts() int {
 	// per heartbeat interval, but sometimes there's an additional delay thanks
 	// to having to wait for a GC run. In effect, it shouldn't take more than a
 	// small number of heartbeats until the follower leaves probing status, so
-	// NumAttempts should at least match that.
+	// MaxTicks should at least match that.
 	_ = maybeDropMsgApp // guru assignment
 	// Snapshots can come up for other reasons and at the end of the day, the
 	// delay introduced here needs to make sure that the snapshot queue
@@ -78,81 +88,145 @@ func (sdh *splitDelayHelper) NumAttempts() int {
 	return (*Replica)(sdh).store.cfg.RaftDelaySplitToSuppressSnapshotTicks
 }
 
-func (sdh *splitDelayHelper) Sleep(ctx context.Context) time.Duration {
-	tBegin := timeutil.Now()
-
+func (sdh *splitDelayHelper) TickDuration() time.Duration {
 	r := (*Replica)(sdh)
-	select {
-	case <-time.After(r.store.cfg.RaftTickInterval):
-	case <-ctx.Done():
-	}
-
-	return timeutil.Since(tBegin)
+	return r.store.cfg.RaftTickInterval
 }
 
 func maybeDelaySplitToAvoidSnapshot(ctx context.Context, sdh splitDelayHelperI) string {
-	maxDelaySplitToAvoidSnapshotTicks := sdh.NumAttempts()
+	maxDelaySplitToAvoidSnapshotTicks := sdh.MaxTicks()
+	tickDur := sdh.TickDuration()
+	budget := tickDur * time.Duration(maxDelaySplitToAvoidSnapshotTicks)
+
 	var slept time.Duration
-	var extra string
-	var succeeded bool
-	for ticks := 0; ticks < maxDelaySplitToAvoidSnapshotTicks; ticks++ {
-		succeeded = false
-		extra = ""
+	var problems []string
+	var lastProblems []string
+	var i int
+	for slept < budget {
+		i++
+		problems = problems[:0]
 		rangeID, raftStatus := sdh.RaftStatus(ctx)
-		if raftStatus == nil {
-			// Don't delay on followers (we don't know when to stop). This case
-			// is hit rarely enough to not matter.
-			extra += "; not Raft leader"
-			succeeded = true
+		if raftStatus == nil || raftStatus.RaftState == raft.StateFollower {
+			// Don't delay on followers (we don't have information about the
+			// peers in that state and thus can't determine when it is safe
+			// to continue). This case is hit rarely enough to not matter,
+			// as the raft leadership follows the lease and splits get routed
+			// to leaseholders only.
+			problems = append(problems, "replica is raft follower")
 			break
 		}
-		done := true
+		// If we're not a follower nor a leader, there are elections going on.
+		// Wait until these have concluded (at which point we'll either be
+		// follower, terminated above, or leader, and will have a populated
+		// progress). This is an important step for preventing cascading
+		// issues when a range is split rapidly in ascending order (i.e.
+		// each split splits the right hand side resulting from the prior
+		// split). Without this code, the split delay helper may end up
+		// returning early for each split in the following sequence:
+		//
+		// - r1=[a,z) has members [s1, s2]
+		// - r1 splits into r1=[a,b) and r2=[b,z)
+		// - s1 applies the split, r2@s1 reaches out to r2@s2 (MsgApp)
+		// - r2@s2 (not initialized yet) drops the MsgApp thanks to `maybeDropMsgApp`
+		// - r2@s1 marks r2@s2 as probing, will only contact again in 1s
+		// - r2 splits again into r2=[b,c), r3=[c,z)
+		// - r2@s1 applies the split, r3@s1 reaches out to r3@s2 (which is not initialized)
+		// - r3@s2 drops MsgApp due to `maybeDropMsgApp`
+		// - r3@s1 marks r3@s2 as probing, will only contact again in 1s
+		// - ...
+		// - r24@s1 splits into r24=[x,y) and r25=[y,z)
+		// - r25@s1 reaches out to r25@s2 (not inited and drops MsgApp)
+		//
+		// Note that every step here except the fourth one is almost guaranteed.
+		// Once an MsgApp has been dropped, the next split is also going to have
+		// the same behavior, since the dropped MsgApp prevents the next split
+		// from applying on that follower in a timely manner. The issue thus
+		// becomes self-sustaining.
+		//
+		// At some point during this cascade, s2 will likely apply the first split
+		// trigger on its replica of r1=[a,z), which will initialize r2@s2. However,
+		// since r2@s1 has already marked r2@s2 as probing, it won't contact it, on
+		// average, for another 500ms. When it does, it will append the next split to
+		// the log which can then be applied, but then there is another ~500ms wait
+		// until r3@s2 will be caught up by r3@s1 to learn about the next split. This
+		// means that on average, split N is delayed by ~N*500ms. `maybeDropMsgApp` on
+		// deeply nested ranges on s2 will eventually give up and this will lead to,
+		// roughly, snapshots being requested across most of the ranges, but none
+		// of these snapshots can apply because the keyspace is always occupied by
+		// one of the descendants of the initial range (however far the splits have
+		// made it on s2). On top of log spam and wasted work, this prevents the
+		// Raft snapshot queue from doing useful work that may also be necessary.
+		//
+		// The main contribution of the helper to avoiding this cascade is to wait
+		// for the replicas of the right-hand side to be initialized. This breaks
+		// the above history because a split will only commence once all prior
+		// splits in the chain have applied on all members.
+		//
+		// See TestSplitBurstWithSlowFollower for end-to-end verification of this
+		// mechanism.
+		if raftStatus.RaftState != raft.StateLeader {
+			problems = append(problems, fmt.Sprintf("not leader (%s)", raftStatus.RaftState))
+		}
+
 		for replicaID, pr := range raftStatus.Progress {
 			if pr.State != tracker.StateReplicate {
 				if !pr.RecentActive {
-					if ticks == 0 {
-						// Having set done = false, we make sure we're not exiting early.
-						// This is important because we sometimes need that Raft proposal
-						// below to make the followers active as there's no chatter on an
-						// idle range. (Note that there's a theoretical race in which the
-						// follower becomes inactive again during the sleep, but the
-						// inactivity interval is much larger than a tick).
-						//
-						// Don't do this more than once though: if a follower is down,
-						// we don't want to delay splits for it.
-						done = false
+					if slept < tickDur {
+						// We don't want to delay splits for a follower who hasn't responded within a tick.
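+						// (We still count it as a problem for the first tick's worth
+						// of waiting and nudge it via an empty proposal on the first
+						// pass; after that we assume the follower is down and no
+						// longer let it hold up the split.)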
+						problems = append(problems, fmt.Sprintf("r%d/%d inactive", rangeID, replicaID))
+						if i == 1 {
+							// Propose an empty command which works around a Raft bug that can
+							// leave a follower in ProgressStateProbe even though it has caught
+							// up.
+							//
+							// We have long picked up a fix[1] for the bug, but there might be similar
+							// issues we're not aware of and this doesn't hurt, so leave it in for now.
+							//
+							// [1]: https://github.com/etcd-io/etcd/commit/bfaae1ba462c91aaf149a285b8d2369807044f71
+							sdh.ProposeEmptyCommand(ctx)
+						}
 					}
-					extra += fmt.Sprintf("; r%d/%d inactive", rangeID, replicaID)
 					continue
 				}
-				done = false
-				extra += fmt.Sprintf("; replica r%d/%d not caught up: %+v", rangeID, replicaID, &pr)
+				problems = append(problems, fmt.Sprintf("replica r%d/%d not caught up: %+v", rangeID, replicaID, &pr))
 			}
 		}
-		if done {
-			succeeded = true
+		if len(problems) == 0 {
 			break
 		}
-		// Propose an empty command which works around a Raft bug that can
-		// leave a follower in ProgressStateProbe even though it has caught
-		// up.
-		sdh.ProposeEmptyCommand(ctx)
-		slept += sdh.Sleep(ctx)
-
-		if ctx.Err() != nil {
-			return ""
+
+		lastProblems = problems
+
+		// The second factor starts out small and reaches ~0.7 approximately at i=maxDelaySplitToAvoidSnapshotTicks.
+		// In effect we loop approximately 2*maxDelaySplitToAvoidSnapshotTicks to exhaust the entire budget we have.
+		// By having shorter sleeps at the beginning, we optimize for the common case in which things get fixed up
+		// quickly early on. In particular, splitting in a tight loop will usually always wait on the election of the
+		// previous split's right-hand side, which finishes within a few network latencies (which is typically much
+		// less than a full tick).
+		sleepDur := time.Duration(float64(tickDur) * (1.0 - math.Exp(-float64(i-1)/float64(maxDelaySplitToAvoidSnapshotTicks+1))))
+		sdh.Sleep(ctx, sleepDur)
+		slept += sleepDur
+
+		if err := ctx.Err(); err != nil {
+			problems = append(problems, err.Error())
+			break
 		}
 	}
-	if slept != 0 {
-		extra += fmt.Sprintf("; delayed split for %.1fs to avoid Raft snapshot", slept.Seconds())
-		if !succeeded {
-			extra += " (without success)"
+	var msg string
+	// If we exited the loop with problems, use them as lastProblems
+	// and indicate that we did not manage to "delay the problems away."
+	if len(problems) != 0 {
+		lastProblems = problems
+	}
+	if len(lastProblems) != 0 {
+		msg = fmt.Sprintf("; delayed by %.1fs to resolve: %s", slept.Seconds(), strings.Join(lastProblems, "; "))
+		if len(problems) != 0 {
+			msg += " (without success)"
 		}
 	}
-	return extra
+	return msg
 }
diff --git a/pkg/kv/kvserver/split_delay_helper_test.go b/pkg/kv/kvserver/split_delay_helper_test.go
index 8499b2404a49..728dc5b63a6a 100644
--- a/pkg/kv/kvserver/split_delay_helper_test.go
+++ b/pkg/kv/kvserver/split_delay_helper_test.go
@@ -30,7 +30,8 @@ type testSplitDelayHelper struct {
 	raftStatus *raft.Status
 	sleep      func()
 
-	slept, emptyProposed int
+	slept         time.Duration
+	emptyProposed int
 }
 
 func (h *testSplitDelayHelper) RaftStatus(context.Context) (roachpb.RangeID, *raft.Status) {
@@ -39,15 +40,19 @@ func (h *testSplitDelayHelper) RaftStatus(context.Context) (roachpb.RangeID, *ra
 func (h *testSplitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
 	h.emptyProposed++
 }
-func (h *testSplitDelayHelper) NumAttempts() int {
+func (h *testSplitDelayHelper) MaxTicks() int {
 	return h.numAttempts
 }
-func (h *testSplitDelayHelper) Sleep(context.Context) time.Duration {
+
+func (h *testSplitDelayHelper) TickDuration() time.Duration {
+	return time.Second
+}
+
+func (h *testSplitDelayHelper) Sleep(_ context.Context, dur time.Duration) {
+	h.slept += dur
 	if h.sleep != nil {
 		h.sleep()
 	}
-	h.slept++
-	return time.Second
 }
 
 var _ splitDelayHelperI = (*testSplitDelayHelper)(nil)
@@ -67,101 +72,133 @@ func TestSplitDelayToAvoidSnapshot(t *testing.T) {
 		}
 		s := maybeDelaySplitToAvoidSnapshot(ctx, h)
 		assert.Equal(t, "", s)
-		assert.Equal(t, 0, h.slept)
+		assert.EqualValues(t, 0, h.slept)
 	})
 
-	t.Run("follower", func(t *testing.T) {
-		// Should immediately bail out if run on non-leader.
+	statusWithState := func(status raft.StateType) *raft.Status {
+		return &raft.Status{
+			BasicStatus: raft.BasicStatus{
+				SoftState: raft.SoftState{
+					RaftState: status,
+				},
+			},
+		}
+	}
+
+	t.Run("nil", func(t *testing.T) {
+		// Should immediately bail out if raftGroup is nil.
 		h := &testSplitDelayHelper{
 			numAttempts: 5,
 			rangeID:     1,
 			raftStatus:  nil,
 		}
 		s := maybeDelaySplitToAvoidSnapshot(ctx, h)
-		assert.Equal(t, "; not Raft leader", s)
-		assert.Equal(t, 0, h.slept)
+		assert.Equal(t, "; delayed by 0.0s to resolve: replica is raft follower (without success)", s)
+		assert.EqualValues(t, 0, h.slept)
+	})
+
+	t.Run("follower", func(t *testing.T) {
+		// Should immediately bail out if run on follower.
+		h := &testSplitDelayHelper{
+			numAttempts: 5,
+			rangeID:     1,
+			raftStatus:  statusWithState(raft.StateFollower),
+		}
+		s := maybeDelaySplitToAvoidSnapshot(ctx, h)
+		assert.Equal(t, "; delayed by 0.0s to resolve: replica is raft follower (without success)", s)
+		assert.EqualValues(t, 0, h.slept)
 	})
+
+	for _, state := range []raft.StateType{raft.StatePreCandidate, raft.StateCandidate} {
+		t.Run(state.String(), func(t *testing.T) {
+			h := &testSplitDelayHelper{
+				numAttempts: 5,
+				rangeID:     1,
+				raftStatus:  statusWithState(state),
+			}
+			s := maybeDelaySplitToAvoidSnapshot(ctx, h)
+			assert.Equal(t, "; delayed by 5.5s to resolve: not leader ("+state.String()+") (without success)", s)
+		})
+	}
+
 	t.Run("inactive", func(t *testing.T) {
+		st := statusWithState(raft.StateLeader)
+		st.Progress = map[uint64]tracker.Progress{
+			2: {State: tracker.StateProbe},
+		}
 		h := &testSplitDelayHelper{
 			numAttempts: 5,
 			rangeID:     1,
-			raftStatus: &raft.Status{
-				Progress: map[uint64]tracker.Progress{
-					2: {State: tracker.StateProbe},
-				},
-			},
+			raftStatus:  st,
 		}
 		s := maybeDelaySplitToAvoidSnapshot(ctx, h)
 		// We try to wake up the follower once, but then give up on it.
-		assert.Equal(t, "; r1/2 inactive; delayed split for 1.0s to avoid Raft snapshot", s)
-		assert.Equal(t, 1, h.slept)
+		assert.Equal(t, "; delayed by 1.3s to resolve: r1/2 inactive", s)
+		assert.Less(t, int64(h.slept), int64(2*h.TickDuration()))
 		assert.Equal(t, 1, h.emptyProposed)
 	})
 
 	for _, state := range []tracker.StateType{tracker.StateProbe, tracker.StateSnapshot} {
 		t.Run(state.String(), func(t *testing.T) {
+			st := statusWithState(raft.StateLeader)
+			st.Progress = map[uint64]tracker.Progress{
+				2: {
+					State:        state,
+					RecentActive: true,
+					ProbeSent:    true, // Unifies string output below.
+					Inflights:    &tracker.Inflights{},
+				},
+				// Healthy follower just for kicks.
+				3: {State: tracker.StateReplicate},
+			}
 			h := &testSplitDelayHelper{
 				numAttempts: 5,
 				rangeID:     1,
-				raftStatus: &raft.Status{
-					Progress: map[uint64]tracker.Progress{
-						2: {
-							State:        state,
-							RecentActive: true,
-							ProbeSent:    true, // Unifies string output below.
-							Inflights:    &tracker.Inflights{},
-						},
-						// Healthy follower just for kicks.
-						3: {State: tracker.StateReplicate},
-					},
-				},
+				raftStatus:  st,
 			}
 			s := maybeDelaySplitToAvoidSnapshot(ctx, h)
-			assert.Equal(t, "; replica r1/2 not caught up: "+state.String()+
-				" match=0 next=0 paused; delayed split for 5.0s to avoid Raft snapshot (without success)", s)
-			assert.Equal(t, 5, h.slept)
-			assert.Equal(t, 5, h.emptyProposed)
+			assert.Equal(t, "; delayed by 5.5s to resolve: replica r1/2 not caught up: "+
+				state.String()+" match=0 next=0 paused (without success)", s)
+			assert.Equal(t, 0, h.emptyProposed)
 		})
 	}
 
 	t.Run("immediately-replicating", func(t *testing.T) {
+		st := statusWithState(raft.StateLeader)
+		st.Progress = map[uint64]tracker.Progress{
+			2: {State: tracker.StateReplicate}, // intentionally not recently active
+		}
 		h := &testSplitDelayHelper{
 			numAttempts: 5,
 			rangeID:     1,
-			raftStatus: &raft.Status{
-				Progress: map[uint64]tracker.Progress{
-					2: {State: tracker.StateReplicate}, // intentionally not recently active
-				},
-			},
+			raftStatus:  st,
 		}
 		s := maybeDelaySplitToAvoidSnapshot(ctx, h)
 		assert.Equal(t, "", s)
-		assert.Equal(t, 0, h.slept)
+		assert.EqualValues(t, 0, h.slept)
 		assert.Equal(t, 0, h.emptyProposed)
 	})
 
 	t.Run("becomes-replicating", func(t *testing.T) {
+		st := statusWithState(raft.StateLeader)
+		st.Progress = map[uint64]tracker.Progress{
+			2: {State: tracker.StateProbe, RecentActive: true, Inflights: &tracker.Inflights{}},
+		}
 		h := &testSplitDelayHelper{
 			numAttempts: 5,
 			rangeID:     1,
-			raftStatus: &raft.Status{
-				Progress: map[uint64]tracker.Progress{
-					2: {State: tracker.StateProbe, RecentActive: true, Inflights: &tracker.Inflights{}},
-				},
-			},
+			raftStatus:  st,
 		}
-		// The fourth attempt will see the follower catch up.
+		// Once >= 2s have passed, the follower becomes replicating.
 		h.sleep = func() {
-			if h.slept == 2 {
+			if h.slept >= 2*time.Second {
 				pr := h.raftStatus.Progress[2]
 				pr.State = tracker.StateReplicate
 				h.raftStatus.Progress[2] = pr
 			}
 		}
 		s := maybeDelaySplitToAvoidSnapshot(ctx, h)
-		assert.Equal(t, "; delayed split for 3.0s to avoid Raft snapshot", s)
-		assert.Equal(t, 3, h.slept)
-		assert.Equal(t, 3, h.emptyProposed)
+		assert.Equal(t, "; delayed by 2.5s to resolve: replica r1/2 not caught up: StateProbe match=0 next=0", s)
+		assert.EqualValues(t, 0, h.emptyProposed)
 	})
 }
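Illustrative sketch (not part of the patch): the standalone Go program below reproduces the sleep schedule that the new maybeDelaySplitToAvoidSnapshot loop uses, under the unit test's assumed parameters (TickDuration of 1s, MaxTicks of 5). With these assumptions the cumulative sleep crosses one tick at roughly 1.3s and exhausts the budget at roughly 5.5s, matching the durations asserted in split_delay_helper_test.go.

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Assumed parameters, mirroring testSplitDelayHelper: 1s ticks, 5 ticks of budget.
	const maxTicks = 5
	tickDur := time.Second
	budget := tickDur * time.Duration(maxTicks)

	var slept time.Duration
	for i := 1; slept < budget; i++ {
		// Same formula as in split_delay_helper.go: the factor ramps up from 0
		// toward 1, so early iterations sleep briefly and later ones approach a
		// full tick.
		sleepDur := time.Duration(float64(tickDur) * (1.0 - math.Exp(-float64(i-1)/float64(maxTicks+1))))
		slept += sleepDur
		fmt.Printf("i=%2d sleep=%.2fs total=%.1fs\n", i, sleepDur.Seconds(), slept.Seconds())
	}
}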