diff --git a/pkg/base/config.go b/pkg/base/config.go
index 0c79f28b028f..0a68b7723daf 100644
--- a/pkg/base/config.go
+++ b/pkg/base/config.go
@@ -111,6 +111,9 @@ var (
 	// will send to a follower without hearing a response.
 	defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
 		"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)
+
+	defaultRaftPostSplitSuppressSnapshotTicks = envutil.EnvOrDefaultInt(
+		"COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS", 20)
 )
 
 type lazyHTTPClient struct {
@@ -476,6 +479,12 @@ type RaftConfig struct {
 	// translates to ~1024 commands that might be executed in the handling of a
 	// single raft.Ready operation.
 	RaftMaxInflightMsgs int
+
+	// RaftPostSplitSuppressSnapshotTicks is the number of ticks for which an
+	// uninitialized replica created by a split will drop Raft MsgAppResp
+	// rejections, so that the leader does not send it a snapshot while the
+	// split trigger is expected to initialize it momentarily.
+	RaftPostSplitSuppressSnapshotTicks int
 }
 
 // SetDefaults initializes unset fields.
@@ -510,6 +519,10 @@ func (cfg *RaftConfig) SetDefaults() {
 	if cfg.RaftMaxInflightMsgs == 0 {
 		cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
 	}
+
+	if cfg.RaftPostSplitSuppressSnapshotTicks == 0 {
+		cfg.RaftPostSplitSuppressSnapshotTicks = defaultRaftPostSplitSuppressSnapshotTicks
+	}
 }
 
 // RaftElectionTimeout returns the raft election timeout, as computed from the
diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go
index 24cb8a8ba9be..cd7192551ccc 100644
--- a/pkg/storage/client_split_test.go
+++ b/pkg/storage/client_split_test.go
@@ -27,10 +27,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/gogo/protobuf/proto"
-	"github.com/pkg/errors"
-	"go.etcd.io/etcd/raft/raftpb"
-
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
@@ -51,6 +47,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/ts"
 	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
+	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -60,6 +58,9 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/gogo/protobuf/proto"
+	"github.com/pkg/errors"
+	"go.etcd.io/etcd/raft/raftpb"
 )
 
 // adminSplitArgs creates an AdminSplitRequest for the provided split key.
@@ -504,6 +505,66 @@ func TestStoreRangeSplitConcurrent(t *testing.T) {
 	}
 }
 
+// TestSplitTriggerRaftSnapshotRace verifies that when a Replica created by a
+// split has not yet been initialized via the split trigger, a grace period
+// prevents it from requesting an errant Raft snapshot. This is verified by
+// running a number of splits and asserting that no Raft snapshots are
+// observed. As a nice side effect, this also verifies that log truncations
+// don't cause any Raft snapshots in this test.
+func TestSplitTriggerRaftSnapshotRace(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	const numNodes = 3
+	var args base.TestClusterArgs
+	args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{DisableMergeQueue: true}
+	tc := testcluster.StartTestCluster(t, numNodes, args)
+	defer tc.Stopper().Stop(ctx)
+
+	numSplits := 100
+	if util.RaceEnabled {
+		// Running 100 splits is overkill in race builds.
+		numSplits = 10
+	}
+	perm := rand.Perm(numSplits)
+	idx := int32(-1) // accessed atomically
+
+	checkNoSnaps := func(when string) {
+		for i := 0; i < numNodes; i++ {
+			var n int // num rows (sanity check against test rotting)
+			var c int // num Raft snapshots
+			if err := tc.ServerConn(i).QueryRow(`
+SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
+	name LIKE 'queue.raftsnapshot.process.%'
+OR
+	name LIKE 'queue.raftsnapshot.pending'
+`).Scan(&n, &c); err != nil {
+				t.Fatal(err)
+			}
+			if expRows := 3; n != expRows {
+				t.Fatalf("%s: expected %d rows, got %d", when, expRows, n)
+			}
+			if c > 0 {
+				t.Fatalf("observed %d Raft snapshots %s splits", c, when)
+			}
+		}
+	}
+
+	checkNoSnaps("before")
+
+	doSplit := func(ctx context.Context) error {
+		_, _, err := tc.SplitRange(
+			[]byte(fmt.Sprintf("key-%d", perm[atomic.AddInt32(&idx, 1)])))
+		return err
+	}
+
+	if err := ctxgroup.GroupWorkers(ctx, numSplits, doSplit); err != nil {
+		t.Fatal(err)
+	}
+
+	checkNoSnaps("after")
+}
+
 // TestStoreRangeSplitIdempotency executes a split of a range and
 // verifies that the resulting ranges respond to the right key ranges
 // and that their stats have been properly accounted for and requests
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 23526b0ab88e..4404ef7067ea 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -5131,6 +5131,10 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag
 				lastAppResp = message
 				drop = true
 			}
+
+			if r.maybeDropMsgAppResp(ctx, message) {
+				drop = true
+			}
 		}
 		if !drop {
 			r.sendRaftMessage(ctx, message)
@@ -5141,6 +5145,63 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag
 	}
 }
 
+// maybeDropMsgAppResp returns true if the outgoing Raft message should be
+// dropped. It does so if sending the message would likely result in an errant
+// Raft snapshot after a split.
+func (r *Replica) maybeDropMsgAppResp(ctx context.Context, msg raftpb.Message) bool {
+	if !msg.Reject {
+		return false
+	}
+
+	r.mu.RLock()
+	ticks := r.mu.ticks
+	initialized := r.isInitializedRLocked()
+	r.mu.RUnlock()
+
+	if initialized {
+		return false
+	}
+
+	if ticks > r.store.cfg.RaftPostSplitSuppressSnapshotTicks {
+		log.Infof(ctx, "allowing MsgAppResp for uninitialized replica")
+		return false
+	}
+
+	if msg.RejectHint != 0 {
+		log.Fatalf(ctx, "received reject hint %d from supposedly uninitialized replica", msg.RejectHint)
+	}
+
+	// This replica has a blank state, i.e. its last index is zero. In
+	// particular, it was not initialized via a preemptive snapshot (we start
+	// our Raft log at index 10). This happens in two cases:
+	//
+	// 1. a rebalance operation is adding a new replica of the range to this
+	// node. We always send a preemptive snapshot before attempting to do so, so
+	// we wouldn't enter this branch as the replica would be initialized. We
+	// would, however, enter this branch if the preemptive snapshot got GC'ed
+	// before the actual replica change came through.
+	//
+	// 2. a split executed that created this replica as its right hand side, but
+	// this node's pre-split replica hasn't executed the split trigger (yet).
+	// The expectation is that it will do so momentarily; however, if we don't
+	// drop this rejection, the Raft leader will try to catch us up via a
+	// snapshot. In 99.9% of cases this is a wasted effort since the pre-split
+	// replica already contains the data this replica will hold.
+	// The remaining 0.1% constitute the case in which our local replica of
+	// the pre-split range requires a snapshot which catches it up "past" the
+	// split trigger, in which case the trigger will never be executed (the
+	// snapshot instead wipes out the data the split trigger would've tried to
+	// put into this range). A similar scenario occurs if there's a rebalance
+	// operation that rapidly removes the pre-split replica, so that it never
+	// catches up (neither via log nor via snapshot); in that case too, the
+	// Raft snapshot is required to materialize the split's right hand side
+	// replica (i.e. this one). We're delaying the snapshot for a short amount
+	// of time only, so this seems tolerable.
+	log.VEventf(ctx, 2, "dropping rejection from index %d to index %d", msg.Index, msg.RejectHint)
+
+	return true
+}
+
 // sendRaftMessage sends a Raft message.
 func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
 	r.mu.Lock()
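
For reference, the new setting is plumbed through base.RaftConfig the same way as the existing Raft knobs. The standalone program below is a minimal sketch of how its value surfaces; it assumes only what the diff adds (the RaftPostSplitSuppressSnapshotTicks field, its default of 20, and the COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS environment override) and the main package itself is purely illustrative, not part of the change.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/base"
)

func main() {
	// SetDefaults fills in any zero field, including the new post-split
	// snapshot suppression window added by this patch.
	var cfg base.RaftConfig
	cfg.SetDefaults()

	// Prints 20 unless COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS was
	// set in the environment when the base package initialized its defaults.
	fmt.Println("suppress post-split snapshots for ticks:", cfg.RaftPostSplitSuppressSnapshotTicks)

	// A value set explicitly before calling SetDefaults is left untouched.
	cfg = base.RaftConfig{RaftPostSplitSuppressSnapshotTicks: 5}
	cfg.SetDefaults()
	fmt.Println("explicit override:", cfg.RaftPostSplitSuppressSnapshotTicks) // 5
}

The environment variable is read once, at package initialization, via envutil.EnvOrDefaultInt, matching how the other Raft defaults in pkg/base/config.go are handled.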