kvserver: fix delaying of splits with uninitialized followers #64060

Merged 2 commits on Apr 24, 2021
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -218,6 +218,7 @@ go_test(
"client_replica_backpressure_test.go",
"client_replica_gc_test.go",
"client_replica_test.go",
"client_split_burst_test.go",
"client_split_test.go",
"client_status_test.go",
"client_tenant_test.go",
202 changes: 202 additions & 0 deletions pkg/kv/kvserver/client_split_burst_test.go
@@ -0,0 +1,202 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"math"
"math/rand"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

type splitBurstTest struct {
*testcluster.TestCluster
baseKey roachpb.Key
magicStickyBit hlc.Timestamp
numSplitsSeenOnSlowFollower *int32 // atomic
initialRaftSnaps int
}

func (sbt *splitBurstTest) SplitWithDelay(t *testing.T, location byte) {
t.Helper()
require.NoError(t, sbt.SplitWithDelayE(location))
}

func (sbt *splitBurstTest) SplitWithDelayE(location byte) error {
k := append([]byte(nil), sbt.baseKey...)
splitKey := append(k, location)
_, _, err := sbt.SplitRangeWithExpiration(splitKey, sbt.magicStickyBit)
return err
}

func (sbt *splitBurstTest) NumRaftSnaps(t *testing.T) int {
var totalSnaps int
for i := range sbt.Servers {
var n int // num rows (sanity check against test rotting)
var c int // num Raft snapshots
if err := sbt.ServerConn(i).QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
name = 'range.snapshots.applied-voter'
`).Scan(&n, &c); err != nil {
t.Fatal(err)
}
require.EqualValues(t, 1, n)
totalSnaps += c
}
return totalSnaps - sbt.initialRaftSnaps
}

func setupSplitBurstTest(t *testing.T, delay time.Duration) *splitBurstTest {
var magicStickyBit = hlc.Timestamp{WallTime: math.MaxInt64 - 123, Logical: 987654321}

numSplitsSeenOnSlowFollower := new(int32) // atomic
var quiesceCh <-chan struct{}
knobs := base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
if args.Split == nil || delay == 0 {
return 0, nil
}
if args.Split.RightDesc.GetStickyBit() != magicStickyBit {
return 0, nil
}
select {
case <-time.After(delay):
case <-quiesceCh:
}
atomic.AddInt32(numSplitsSeenOnSlowFollower, 1)
return 0, nil
},
}}

ctx := context.Background()

// n1 and n3 are fast, n2 is slow (to apply the splits). We need
// three nodes here; delaying the apply loop on n2 also delays
// how quickly commands can reach quorum and would backpressure
// the splits by accident.
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgsPerNode: map[int]base.TestServerArgs{
1: {Knobs: knobs},
},
ReplicationMode: base.ReplicationManual,
})
defer t.Cleanup(func() {
tc.Stopper().Stop(ctx)
})
quiesceCh = tc.Stopper().ShouldQuiesce()

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Target(1), tc.Target(2))

sbc := &splitBurstTest{
TestCluster: tc,
baseKey: k,
magicStickyBit: magicStickyBit,
numSplitsSeenOnSlowFollower: numSplitsSeenOnSlowFollower,
}
sbc.initialRaftSnaps = sbc.NumRaftSnaps(t)
return sbc
}

func TestSplitBurstWithSlowFollower(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
t.Run("forward", func(t *testing.T) {
// When splitting at an increasing sequence of keys, in each step we split
// the most recently split range, and we expect the splits to wait for that
// range to have caught up its follower across the preceding split, which
// was delayed as well. So when we're done splitting we should have seen at
// least (numSplits-1) splits get applied on the slow follower.
// This end-to-end exercises `splitDelayHelper`.
//
// This test is fairly slow because every split will incur a 1s penalty
// (dictated by the raft leader's probing interval). We could fix this
// delay here and in production if we had a way to send a signal from
// the slow follower to the leader when the split trigger initializes
// the right-hand side. This is actually an interesting point, because
// the split trigger *replaces* a snapshot - but doesn't fully act like
// one: when a raft group applies a snapshot, it generates an MsgAppResp
// to the leader which will let the leader probe proactively. We could
// signal the split trigger to the raft group as a snapshot being applied
// (as opposed to recreating the in-memory instance as we do now), and
// then this MsgAppResp should be created automatically.

sbt := setupSplitBurstTest(t, 50*time.Millisecond)
defer sbt.Stopper().Stop(ctx)

const numSplits = byte(5)

for i := byte(0); i < numSplits; i++ {
sbt.SplitWithDelay(t, i)
}
// We should have applied all but perhaps the last split on the slow node.
// If we didn't, that indicates a failure to delay the splits accordingly.
require.GreaterOrEqual(t, atomic.LoadInt32(sbt.numSplitsSeenOnSlowFollower), int32(numSplits-1))
require.Zero(t, sbt.NumRaftSnaps(t))
})
t.Run("backward", func(t *testing.T) {
// When splitting at a decreasing sequence of keys, we're repeatedly splitting
// the first range. All of its followers are initialized to begin with, and
// even though there is a slow follower, `splitDelayHelper` isn't interested in
// delaying this here (which would imply that it's trying to check that every-
// one is "caught up").
// We set a 100s timeout so that below we can assert that `splitDelayHelper`
// isn't somehow forcing us to wait here.
infiniteDelay := 100 * time.Second
sbt := setupSplitBurstTest(t, infiniteDelay)
defer sbt.Stopper().Stop(ctx)

const numSplits = byte(50)

for i := byte(0); i < numSplits; i++ {
tBegin := timeutil.Now()
sbt.SplitWithDelay(t, numSplits-i)
if dur := timeutil.Since(tBegin); dur > infiniteDelay {
t.Fatalf("waited %s for split #%d", dur, i+1)
}
}
require.Zero(t, atomic.LoadInt32(sbt.numSplitsSeenOnSlowFollower))
require.Zero(t, sbt.NumRaftSnaps(t))
})
t.Run("random", func(t *testing.T) {
// When splitting randomly, we'll see a mixture of forward and backward
// splits, so we can't assert on how many split triggers we observe.
// However, there still shouldn't be any snapshots.
sbt := setupSplitBurstTest(t, 10*time.Millisecond)
defer sbt.Stopper().Stop(ctx)

const numSplits = 20
perm := rand.Perm(numSplits)

doSplit := func(ctx context.Context, i int) error {
return sbt.SplitWithDelayE(byte(perm[i]))
}
require.NoError(t, ctxgroup.GroupWorkers(ctx, numSplits, doSplit))

require.Zero(t, sbt.NumRaftSnaps(t))
})
}
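For orientation, the mechanism these tests exercise can be summarized with a small standalone sketch. This is not the actual `splitDelayHelper` code; the names below (`follower`, `waitForFollowers`) are made up for illustration. The idea: before carrying out another split of the same range, wait (up to a bounded delay) for the followers to have applied the previous split, so that the right-hand-side replicas are initialized by the split trigger rather than by Raft snapshots.

```go
// Standalone illustration only; hypothetical types, not kvserver code.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// follower models a replica that applies splits at its own pace.
type follower struct {
	appliedSplits int32 // accessed atomically
}

// waitForFollowers polls until every follower has applied at least wantApplied
// splits, or gives up after maxWait. The bounded wait mirrors the constraint
// that delaying splits must never block indefinitely behind an unresponsive
// follower.
func waitForFollowers(followers []*follower, wantApplied int32, maxWait time.Duration) bool {
	deadline := time.Now().Add(maxWait)
	for time.Now().Before(deadline) {
		caughtUp := true
		for _, f := range followers {
			if atomic.LoadInt32(&f.appliedSplits) < wantApplied {
				caughtUp = false
				break
			}
		}
		if caughtUp {
			return true
		}
		time.Sleep(time.Millisecond)
	}
	return false
}

func main() {
	slow := &follower{}
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(10 * time.Millisecond) // simulated slow apply loop
			atomic.AddInt32(&slow.appliedSplits, 1)
		}
	}()
	for i := int32(1); i <= 5; i++ {
		// Before split #i, wait for the slow follower to catch up across split #i-1.
		ok := waitForFollowers([]*follower{slow}, i-1, time.Second)
		fmt.Printf("split #%d proceeding (follower caught up: %t)\n", i, ok)
	}
}
```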
64 changes: 0 additions & 64 deletions pkg/kv/kvserver/client_split_test.go
@@ -49,7 +49,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -518,69 +517,6 @@ func TestStoreRangeSplitAtRangeBounds(t *testing.T) {
}
}

// TestSplitTriggerRaftSnapshotRace verifies that when an uninitialized Replica
// resulting from a split hasn't been initialized via the split trigger yet, a
// grace period prevents the replica from requesting an errant Raft snapshot.
// This is verified by running a number of splits and asserting that no Raft
// snapshots are observed. As a nice side effect, this also verifies that log
// truncations don't cause any Raft snapshots in this test.
func TestSplitTriggerRaftSnapshotRace(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
var args base.TestClusterArgs
// NB: the merge queue is enabled for additional "chaos". Note that the test
// uses three nodes and so there is no replica movement, which would other-
// wise tickle Raft snapshots for unrelated reasons.
tc := testcluster.StartTestCluster(t, numNodes, args)
defer tc.Stopper().Stop(ctx)

numSplits := 100
if util.RaceEnabled {
// Running 100 splits is overkill in race builds.
numSplits = 10
}
perm := rand.Perm(numSplits)
idx := int32(-1) // accessed atomically

numRaftSnaps := func(when string) int {
var totalSnaps int
for i := 0; i < numNodes; i++ {
var n int // num rows (sanity check against test rotting)
var c int // num Raft snapshots
if err := tc.ServerConn(i).QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
name = 'range.snapshots.applied-voter'
`).Scan(&n, &c); err != nil {
t.Fatal(err)
}
if expRows := 1; n != expRows {
t.Fatalf("%s: expected %d rows, got %d", when, expRows, n)
}
totalSnaps += c
}
return totalSnaps
}

// There are usually no raft snaps before, but there is a race condition where
// they can occasionally happen during upreplication.
numSnapsBefore := numRaftSnaps("before")

doSplit := func(ctx context.Context, _ int) error {
_, _, err := tc.SplitRange([]byte(fmt.Sprintf("key-%d", perm[atomic.AddInt32(&idx, 1)])))
return err
}

if err := ctxgroup.GroupWorkers(ctx, numSplits, doSplit); err != nil {
t.Fatal(err)
}

// Check that no snaps happened during the splits.
require.Equal(t, numSnapsBefore, numRaftSnaps("after"))
}

// TestStoreRangeSplitIdempotency executes a split of a range and
// verifies that the resulting ranges respond to the right key ranges
// and that their stats have been properly accounted for and requests
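The doc comment of the removed test describes a grace period that keeps an uninitialized right-hand-side replica from requesting an errant Raft snapshot before the split trigger has initialized it. A toy sketch of that decision, using hypothetical names (`replica`, `shouldRequestSnapshot`) rather than the real kvserver logic:

```go
// Toy illustration of the grace-period idea; not kvserver code.
package main

import (
	"fmt"
	"time"
)

type replica struct {
	initialized bool
	createdAt   time.Time
}

// shouldRequestSnapshot returns true only if the replica is still uninitialized
// after the grace period has elapsed; before that, the split trigger is expected
// to initialize it without a snapshot.
func shouldRequestSnapshot(r *replica, now time.Time, grace time.Duration) bool {
	if r.initialized {
		return false
	}
	return now.Sub(r.createdAt) > grace
}

func main() {
	r := &replica{createdAt: time.Now()}
	fmt.Println(shouldRequestSnapshot(r, time.Now(), time.Second))                    // false: within grace period
	fmt.Println(shouldRequestSnapshot(r, time.Now().Add(2*time.Second), time.Second)) // true: grace period elapsed
}
```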
1 change: 1 addition & 0 deletions pkg/kv/kvserver/replica_raft.go
@@ -1536,6 +1536,7 @@ func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) {
if err := r.mu.internalRaftGroup.Campaign(); err != nil {
log.VEventf(ctx, 1, "failed to campaign: %s", err)
}
r.store.enqueueRaftUpdateCheck(r.RangeID)
}
}
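The production side of the fix is the single added line above: after campaigning on wake, the replica is enqueued for a Raft update check, presumably so that the campaign's state change and any messages it produces are processed promptly instead of waiting for the next unrelated Raft event. A minimal standalone sketch of that pattern, with a hypothetical `scheduler` type rather than CockroachDB's actual API:

```go
// Hypothetical sketch of the enqueue-after-state-change pattern.
package main

import "fmt"

// scheduler is a toy stand-in for a per-store Raft scheduler: ranges whose
// Raft groups need a processing pass are enqueued and handled by a worker.
type scheduler struct {
	pending chan int64 // range IDs needing a Raft processing pass
}

func newScheduler() *scheduler {
	return &scheduler{pending: make(chan int64, 16)}
}

// enqueueRaftUpdateCheck records that the range's Raft group should be
// processed soon. A real scheduler coalesces duplicate enqueues per range.
func (s *scheduler) enqueueRaftUpdateCheck(rangeID int64) {
	select {
	case s.pending <- rangeID:
	default:
	}
}

// processOne simulates one pass, which is where campaign messages would
// actually be handed to the transport.
func (s *scheduler) processOne() {
	rangeID := <-s.pending
	fmt.Printf("processing Raft ready for r%d\n", rangeID)
}

func main() {
	s := newScheduler()
	// After mutating in-memory Raft state (e.g. calling Campaign), schedule a
	// processing pass so the resulting messages are sent promptly.
	s.enqueueRaftUpdateCheck(42)
	s.processOne()
}
```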
