From b64bdc99f2236c1d26f3043d376689cd6af69be2 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 9 Nov 2018 15:27:21 +0100 Subject: [PATCH 1/2] storage: never truncate log for inflight snapshots When a (preemptive or Raft) snapshot is inflight, a range should not truncate its log index to one ahead of the pending snapshot as doing so would require yet another snapshot. The code prior to this commit attempted to achieve that by treating the pending snapshot as an additional follower in the computation of the quorum index. This works as intended as long as the Raft log is below its configured maximum size but usually leads to abandoning the snapshot index when truncating based on size (to the quorum commit index). This situation occurs frequently during the split+scatter phase of data imports, where (mostly empty) ranges are rapidly upreplicated and split. This isn't limited to small replicas, however. Assume a range is ~32mb in size and a (say, preemptive) snapshot is underway. Preemptive snapshots are (at the time of writing) throttled to 2mb/s, so it will take approximately 16s to go through. At the same time, the Raft log may easily grow in size by the size truncation threshold (4mb at time of writing), allowing a log truncation that abandons the snapshot. A similar phenomenon applies to Raft snapshots, though now the quota pool will place restrictions on forward Raft progress, however not if a single command exceeds the size restriction (as is common during RESTORE operations where individual Raft commands are large). I haven't conclusively observed this in practice (though there has been enough badness to suspect that it happened), but in principle this could lead to a potentially infinite number of snapshots being sent out, a very expensive way of keeping a follower up to date. After this change, the pending snapshot index is managed more carefully (and is managed for Raft snapshots as well, not only for preemptive ones) and is never truncated away. As a result, in regular operation snapshots should now be able to be followed by regular Raft log catch-up in all cases. More precisely, we keep a map of pending snapshot index to deadline. The deadline is zero while the snapshot is ongoing and is set when the snapshot is completed. This avoids races between the snapshot completing and the replication change completing (which would occur even if the snapshot is only registering as completed after the replication change completes). There's an opportunity for a bigger refactor here by using learner replicas instead of preemptive snapshots. The idea is to add the replica as a learner first (ignoring it for the quota pool until it has received the first snapshot) first, and to use the regular Raft snapshot mechanism to catch it up. Once achieved, another configuration change would convert the learner into a regular follower. This change in itself will likely make our code more principled, but it is a more invasive change that is left for the future. Similarly, there is knowledge in the quota pool that it seems we should be using for log truncation decisions. The quota pool essentially knows about the size of each log entry whereas the Raft truncations only know the accumulated approximate size of the full log. For instance, instead of blindly truncating to the quorum commit index when the Raft log is too large, we could truncate it to an index that reduces the log size to about 50% of the maximum, in effect reducing the number of snapshots that are necessary due to quorum truncation. It's unclear whether this improvement will matter in practice. The following script reproduces redundant Raft snapshots (needs a somewhat beefy machine such as a gceworker). Note that this script will also incur Raft snapshots due to the splits, which are fixed in a follow-up commit. ``` set -euxo pipefail killall -9 cockroach || true killall -9 workload || true sleep 1 rm -rf cockroach-data || true mkdir -p cockroach-data ./cockroach start --insecure --host=localhost --port=26257 --http-port=26258 --store=cockroach-data/1 --cache=256MiB --background ./cockroach start --insecure --host=localhost --port=26259 --http-port=26260 --store=cockroach-data/2 --cache=256MiB --join=localhost:26257 --background ./cockroach start --insecure --host=localhost --port=26261 --http-port=26262 --store=cockroach-data/3 --cache=256MiB --join=localhost:26257 --background ./cockroach start --insecure --host=localhost --port=26263 --http-port=26264 --store=cockroach-data/4 --cache=256MiB --join=localhost:26257 --background sleep 5 ./cockroach sql --insecure -e 'set cluster setting kv.range_merge.queue_enabled = false;' ./cockroach sql --insecure < 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex { - decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex - decision.ChosenVia = truncatableIndexChosenViaPendingSnap - } + } + + // The pending snapshot index acts as a placeholder for a replica that is + // about to be added to the range (or is in Raft recovery). We don't want to + // truncate the log in a way that will require that new replica to be caught + // up via yet another Raft snapshot. + if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex { + decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex + decision.ChosenVia = truncatableIndexChosenViaPendingSnap } // Advance to the first index, but never truncate past the quorum commit @@ -300,7 +307,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision { } // getQuorumIndex returns the index which a quorum of the nodes have -// committed. The pendingSnapshotIndex indicates the index of a pending +// committed. The snapshotLogTruncationConstraints indicates the index of a pending // snapshot which is considered part of the Raft group even though it hasn't // been added yet. Note that getQuorumIndex may return 0 if the progress map // doesn't contain information for a sufficient number of followers (e.g. the @@ -310,14 +317,11 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision { // quorum was determined at the time the index was written. If you're thinking // of using getQuorumIndex for some purpose, consider that raftStatus.Commit // might be more appropriate (e.g. determining if a replica is up to date). -func getQuorumIndex(raftStatus *raft.Status, pendingSnapshotIndex uint64) uint64 { - match := make([]uint64, 0, len(raftStatus.Progress)+1) +func getQuorumIndex(raftStatus *raft.Status) uint64 { + match := make([]uint64, 0, len(raftStatus.Progress)) for _, progress := range raftStatus.Progress { match = append(match, progress.Match) } - if pendingSnapshotIndex != 0 { - match = append(match, pendingSnapshotIndex) - } sort.Sort(uint64Slice(match)) quorum := computeQuorum(len(match)) return match[len(match)-quorum] diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go index 5bfcb54539b8..b909617e8e05 100644 --- a/pkg/storage/raft_log_queue_test.go +++ b/pkg/storage/raft_log_queue_test.go @@ -20,9 +20,7 @@ import ( "math" "strings" "testing" - - "github.com/pkg/errors" - "go.etcd.io/etcd/raft" + "time" "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/keys" @@ -32,6 +30,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/raft" ) func TestShouldTruncate(t *testing.T) { @@ -66,23 +69,22 @@ func TestGetQuorumIndex(t *testing.T) { defer leaktest.AfterTest(t)() testCases := []struct { - progress []uint64 - pendingSnapshotIndex uint64 - expected uint64 + progress []uint64 + expected uint64 }{ // Basic cases. - {[]uint64{1}, 0, 1}, - {[]uint64{2}, 1, 1}, - {[]uint64{1, 2}, 0, 1}, - {[]uint64{2, 3}, 1, 2}, - {[]uint64{1, 2, 3}, 0, 2}, - {[]uint64{2, 3, 4}, 1, 2}, - {[]uint64{1, 2, 3, 4}, 0, 2}, - {[]uint64{2, 3, 4, 5}, 1, 3}, - {[]uint64{1, 2, 3, 4, 5}, 0, 3}, - {[]uint64{2, 3, 4, 5, 6}, 1, 3}, + {[]uint64{1}, 1}, + {[]uint64{2}, 2}, + {[]uint64{1, 2}, 1}, + {[]uint64{2, 3}, 2}, + {[]uint64{1, 2, 3}, 2}, + {[]uint64{2, 3, 4}, 3}, + {[]uint64{1, 2, 3, 4}, 2}, + {[]uint64{2, 3, 4, 5}, 3}, + {[]uint64{1, 2, 3, 4, 5}, 3}, + {[]uint64{2, 3, 4, 5, 6}, 4}, // Sorting. - {[]uint64{5, 4, 3, 2, 1}, 0, 3}, + {[]uint64{5, 4, 3, 2, 1}, 3}, } for i, c := range testCases { status := &raft.Status{ @@ -91,7 +93,7 @@ func TestGetQuorumIndex(t *testing.T) { for j, v := range c.progress { status.Progress[uint64(j)] = raft.Progress{Match: v} } - quorumMatchedIndex := getQuorumIndex(status, c.pendingSnapshotIndex) + quorumMatchedIndex := getQuorumIndex(status) if c.expected != quorumMatchedIndex { t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex) } @@ -161,9 +163,10 @@ func TestComputeTruncateDecision(t *testing.T) { []uint64{1, 3, 3, 4}, 2000, 1, 3, 0, "truncate 2 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot", }, + // Don't truncate away pending snapshot, even when log too large. { []uint64{100, 100}, 2000, 1, 100, 50, - "truncate 99 entries to first index 100 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot", + "truncate 49 entries to first index 50 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)", }, { []uint64{1, 3, 3, 4}, 2000, 2, 3, 0, @@ -176,7 +179,7 @@ func TestComputeTruncateDecision(t *testing.T) { // The pending snapshot index affects the quorum commit index. { []uint64{4}, 2000, 1, 7, 1, - "truncate 0 entries to first index 1 (chosen via: quorum); log too large (2.0 KiB > 1000 B)", + "truncate 0 entries to first index 1 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)", }, // Never truncate past the quorum commit index. { @@ -407,3 +410,64 @@ func TestProactiveRaftLogTruncate(t *testing.T) { }) } } + +func TestSnapshotLogTruncationConstraints(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + r := &Replica{} + id1, id2 := uuid.MakeV4(), uuid.MakeV4() + const ( + index1 = 50 + index2 = 60 + ) + + // Add first constraint. + r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index1) + exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}} + + // Make sure it registered. + assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1) + + // Add another constraint with the same id. Extremely unlikely in practice + // but we want to make sure it doesn't blow anything up. Collisions are + // handled by ignoring the colliding update. + r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index2) + assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1) + + // Helper that grabs the min constraint index (which can trigger GC as a + // byproduct) and asserts. + assertMin := func(exp uint64, now time.Time) { + t.Helper() + if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now); maxIndex != exp { + t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp) + } + } + + // Queue should be told index1 is the highest pending one. Note that the + // colliding update at index2 is not represented. + assertMin(index1, time.Time{}) + + // Add another, higher, index. We're not going to notice it's around + // until the lower one disappears. + r.addSnapshotLogTruncationConstraintLocked(ctx, id2, index2) + + now := timeutil.Now() + // The colliding snapshot comes back. Or the original, we can't tell. + r.completeSnapshotLogTruncationConstraint(ctx, id1, now) + // The index should show up when its deadline isn't hit. + assertMin(index1, now) + assertMin(index1, now.Add(raftLogQueuePendingSnapshotGracePeriod)) + assertMin(index1, now.Add(raftLogQueuePendingSnapshotGracePeriod)) + // Once we're over deadline, the index returned so far disappears. + assertMin(index2, now.Add(raftLogQueuePendingSnapshotGracePeriod+1)) + assertMin(index2, time.Time{}) + assertMin(index2, now.Add(10*raftLogQueuePendingSnapshotGracePeriod)) + + r.completeSnapshotLogTruncationConstraint(ctx, id2, now) + assertMin(index2, now) + assertMin(index2, now.Add(raftLogQueuePendingSnapshotGracePeriod)) + assertMin(0, now.Add(2*raftLogQueuePendingSnapshotGracePeriod)) + + assert.Equal(t, r.mu.snapshotLogTruncationConstraints, map[uuid.UUID]snapTruncationInfo(nil)) +} diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 01cc313872ed..23526b0ab88e 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -346,10 +346,19 @@ type Replica struct { // from the Raft log entry. Use the invalidLastTerm constant for this // case. lastIndex, lastTerm uint64 - // The raft log index of a pending preemptive snapshot. Used to prohibit - // raft log truncation while a preemptive snapshot is in flight. A value of - // 0 indicates that there is no pending snapshot. - pendingSnapshotIndex uint64 + // A map of raft log index of pending preemptive snapshots to deadlines. + // Used to prohibit raft log truncations that would leave a gap between + // the snapshot and the new first index. The map entry has a zero + // deadline while the snapshot is being sent and turns nonzero when the + // snapshot has completed, preventing truncation for a grace period + // (since there is a race between the snapshot completing and its being + // reflected in the raft status used to make truncation decisions). + // + // NB: If we kept only one value, we could end up in situations in which + // we're either giving some snapshots no grace period, or keep an + // already finished snapshot "pending" for extended periods of time + // (preventing log truncation). + snapshotLogTruncationConstraints map[uuid.UUID]snapTruncationInfo // raftLogSize is the approximate size in bytes of the persisted raft log. // On server restart, this value is assumed to be zero to avoid costly scans // of the raft log. This will be correct when all log entries predating this @@ -6952,29 +6961,67 @@ func (r *Replica) exceedsMultipleOfSplitSizeRLocked(mult float64) bool { return maxBytes > 0 && float64(size) > float64(maxBytes)*mult } -func (r *Replica) setPendingSnapshotIndex(index uint64) error { +type snapTruncationInfo struct { + index uint64 + deadline time.Time +} + +func (r *Replica) addSnapshotLogTruncationConstraintLocked( + ctx context.Context, snapUUID uuid.UUID, index uint64, +) { + if r.mu.snapshotLogTruncationConstraints == nil { + r.mu.snapshotLogTruncationConstraints = make(map[uuid.UUID]snapTruncationInfo) + } + item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] + if ok { + // Uh-oh, there's either a programming error (resulting in the same snapshot + // fed into this method twice) or a UUID collision. We discard the update + // (which is benign) but log it loudly. If the index is the same, it's + // likely the former, otherwise the latter. + log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, index) + return + } + + r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{index: index} +} + +func (r *Replica) completeSnapshotLogTruncationConstraint( + ctx context.Context, snapUUID uuid.UUID, now time.Time, +) { + deadline := now.Add(raftLogQueuePendingSnapshotGracePeriod) + r.mu.Lock() defer r.mu.Unlock() - // We allow the pendingSnapshotIndex to change from 0 to 1 and then from 1 to - // a value greater than 1. Any other change indicates 2 current preemptive - // snapshots on the same replica which is disallowed. - if (index == 1 && r.mu.pendingSnapshotIndex != 0) || - (index > 1 && r.mu.pendingSnapshotIndex != 1) { - // NB: this path can be hit if the replicate queue and scatter work - // concurrently. It's still good to return an error to avoid duplicating - // work, but we make it a benign one (so that it isn't logged). - return &benignError{errors.Errorf( - "%s: can't set pending snapshot index to %d; pending snapshot already present: %d", - r, index, r.mu.pendingSnapshotIndex)} - } - r.mu.pendingSnapshotIndex = index - return nil + item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] + if !ok { + // UUID collision while adding the snapshot in originally. Nothing + // else to do. + return + } + + item.deadline = deadline + r.mu.snapshotLogTruncationConstraints[snapUUID] = item } -func (r *Replica) clearPendingSnapshotIndex() { - r.mu.Lock() - r.mu.pendingSnapshotIndex = 0 - r.mu.Unlock() +func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked( + now time.Time, +) (minSnapIndex uint64) { + for snapUUID, item := range r.mu.snapshotLogTruncationConstraints { + if item.deadline != (time.Time{}) && item.deadline.Before(now) { + // The snapshot has finished and its grace period has passed. + // Ignore it when making truncation decisions. + delete(r.mu.snapshotLogTruncationConstraints, snapUUID) + continue + } + if minSnapIndex == 0 || minSnapIndex > item.index { + minSnapIndex = item.index + } + } + if len(r.mu.snapshotLogTruncationConstraints) == 0 { + // Save a little bit of memory. + r.mu.snapshotLogTruncationConstraints = nil + } + return minSnapIndex } func (r *Replica) startKey() roachpb.RKey { @@ -7061,8 +7108,8 @@ func HasRaftLeader(raftStatus *raft.Status) bool { } func calcReplicaMetrics( - ctx context.Context, - now hlc.Timestamp, + _ context.Context, + _ hlc.Timestamp, raftCfg *base.RaftConfig, zone *config.ZoneConfig, livenessMap IsLiveMap, diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 256e7772a740..07eb3cade222 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -799,15 +799,6 @@ func (r *Replica) changeReplicas( return errors.Errorf("%s: unable to add replica %v; node already has a replica", r, repDesc) } - // Prohibit premature raft log truncation. We set the pending index to 1 - // here until we determine what it is below. This removes a small window of - // opportunity for the raft log to get truncated after the snapshot is - // generated. - if err := r.setPendingSnapshotIndex(1); err != nil { - return err - } - defer r.clearPendingSnapshotIndex() - // Send a pre-emptive snapshot. Note that the replica to which this // snapshot is addressed has not yet had its replica ID initialized; this // is intentional, and serves to avoid the following race with the replica @@ -958,12 +949,6 @@ func (r *Replica) sendSnapshot( return errors.Wrapf(err, "%s: change replicas failed", r) } - if snapType == snapTypePreemptive { - if err := r.setPendingSnapshotIndex(snap.RaftSnap.Metadata.Index); err != nil { - return err - } - } - status := r.RaftStatus() if status == nil { // This code path is sometimes hit during scatter for replicas that diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 890199c9a03b..37035c4675b5 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -392,15 +392,25 @@ func (r *Replica) raftSnapshotLocked() (raftpb.Snapshot, error) { func (r *Replica) GetSnapshot( ctx context.Context, snapType string, ) (_ *OutgoingSnapshot, err error) { + snapUUID := uuid.MakeV4() // Get a snapshot while holding raftMu to make sure we're not seeing "half // an AddSSTable" (i.e. a state in which an SSTable has been linked in, but // the corresponding Raft command not applied yet). r.raftMu.Lock() snap := r.store.engine.NewSnapshot() + r.mu.Lock() + appliedIndex := r.mu.state.RaftAppliedIndex + r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex) // cleared when OutgoingSnapshot closes + r.mu.Unlock() r.raftMu.Unlock() + release := func() { + r.completeSnapshotLogTruncationConstraint(ctx, snapUUID, timeutil.Now()) + } + defer func() { if err != nil { + release() snap.Close() } }() @@ -427,13 +437,14 @@ func (r *Replica) GetSnapshot( // to use Replica.mu.stateLoader. This call is not performance sensitive, so // create a new state loader. snapData, err := snapshot( - ctx, stateloader.Make(r.store.cfg.Settings, rangeID), snapType, + ctx, snapUUID, stateloader.Make(r.store.cfg.Settings, rangeID), snapType, snap, rangeID, r.store.raftEntryCache, withSideloaded, startKey, ) if err != nil { log.Errorf(ctx, "error generating snapshot: %s", err) return nil, err } + snapData.onClose = release return &snapData, nil } @@ -457,6 +468,7 @@ type OutgoingSnapshot struct { WithSideloaded func(func(sideloadStorage) error) error RaftEntryCache *raftEntryCache snapType string + onClose func() } func (s *OutgoingSnapshot) String() string { @@ -467,6 +479,9 @@ func (s *OutgoingSnapshot) String() string { func (s *OutgoingSnapshot) Close() { s.Iter.Close() s.EngineSnap.Close() + if s.onClose != nil { + s.onClose() + } } // IncomingSnapshot contains the data for an incoming streaming snapshot message. @@ -485,6 +500,7 @@ type IncomingSnapshot struct { // given range. Note that snapshot() is called without Replica.raftMu held. func snapshot( ctx context.Context, + snapUUID uuid.UUID, rsl stateloader.StateLoader, snapType string, snap engine.Reader, @@ -536,7 +552,6 @@ func snapshot( // Intentionally let this iterator and the snapshot escape so that the // streamer can send chunks from it bit by bit. iter := rditer.NewReplicaDataIterator(&desc, snap, true /* replicatedOnly */) - snapUUID := uuid.MakeV4() return OutgoingSnapshot{ RaftEntryCache: eCache, From 3fd4bd554d8d9973b83eb99aaa345df6d5447e4f Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Wed, 14 Nov 2018 20:40:42 +0100 Subject: [PATCH 2/2] storage: avoid errant Raft snapshots after splits A known race occurs during splits when some nodes apply the split trigger faster than others. The "slow" node(s) may learn about the newly created right hand side replica through Raft messages arriving from the "fast" nodes. In such cases, the leader will immediately try to catch up the follower (which it sees at log position zero) via a snapshot, but this isn't possible since there's an overlapping replica (the pre-split replica waiting to apply the trigger). This both leads to unnecessary transfer of data and can clog the Raft snapshot queue which tends to get stuck due to the throttling mechanisms both at the sender and receivers. To prevent this race (or make it exceedingly unlikely), we selectively drop certain messages from uninitialized followers, namely those that refuse an append to the log, for a number of ticks (corresponding to at most a few seconds of real time). Not dropping such a message leads to a Raft snapshot as the leader will learn that the follower has last index zero, which is never an index that can be caught up to from the log (our log "starts" at index 10). The script below reproduces the race (prior to this commit) by running 1000 splits back to back in a three node local cluster, usually showing north of a hundred Raft snapshots, i.e. a >10% chance to hit the race for each split. There's also a unit test that exposes this problem and can be stressed more conveniently (it also exposes the problems in the preceding commit related to overly aggressive log truncation). The false positives here are a) the LHS of the split needs a snapshot which catches it up across the split trigger and b) the LHS is rebalanced away (and GC'ed) before applying the split trigger. In both cases the timeout-based mechanism would allow the snapshot after a few seconds, once the Raft leader contacts the follower for the next time. Note that the interaction with Raft group quiescence is benign. We're only dropping MsgAppResp which is only sent by followers, implying that the Raft group is already unquiesced. ``` set -euxo pipefail killall -9 cockroach || true killall -9 workload || true sleep 1 rm -rf cockroach-data || true mkdir -p cockroach-data ./cockroach start --insecure --host=localhost --port=26257 --http-port=26258 --store=cockroach-data/1 --cache=256MiB --background ./cockroach start --insecure --host=localhost --port=26259 --http-port=26260 --store=cockroach-data/2 --cache=256MiB --join=localhost:26257 --background ./cockroach start --insecure --host=localhost --port=26261 --http-port=26262 --store=cockroach-data/3 --cache=256MiB --join=localhost:26257 --background sleep 5 ./cockroach sql --insecure -e 'set cluster setting kv.range_merge.queue_enabled = false;' ./bin/workload run kv --splits 1000 --init --drop --max-ops 1 sleep 5 for port in 26257 26259 26261; do ./cockroach sql --insecure -e "select name, value from crdb_internal.node_metrics where name like '%raftsn%' order by name desc" --port "${port}" done ``` Release note (bug fix): Avoid occasional unnecessary Raft snapshots after Range splits. --- pkg/base/config.go | 9 +++++ pkg/storage/client_split_test.go | 69 ++++++++++++++++++++++++++++++-- pkg/storage/replica.go | 61 ++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 4 deletions(-) diff --git a/pkg/base/config.go b/pkg/base/config.go index 0c79f28b028f..0a68b7723daf 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -111,6 +111,9 @@ var ( // will send to a follower without hearing a response. defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt( "COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64) + + defaultRaftPostSplitSuppressSnapshotTicks = envutil.EnvOrDefaultInt( + "COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS", 20) ) type lazyHTTPClient struct { @@ -476,6 +479,8 @@ type RaftConfig struct { // translates to ~1024 commands that might be executed in the handling of a // single raft.Ready operation. RaftMaxInflightMsgs int + + RaftPostSplitSuppressSnapshotTicks int } // SetDefaults initializes unset fields. @@ -510,6 +515,10 @@ func (cfg *RaftConfig) SetDefaults() { if cfg.RaftMaxInflightMsgs == 0 { cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs } + + if cfg.RaftPostSplitSuppressSnapshotTicks == 0 { + cfg.RaftPostSplitSuppressSnapshotTicks = defaultRaftPostSplitSuppressSnapshotTicks + } } // RaftElectionTimeout returns the raft election timeout, as computed from the diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 24cb8a8ba9be..cd7192551ccc 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -27,10 +27,6 @@ import ( "testing" "time" - "github.com/gogo/protobuf/proto" - "github.com/pkg/errors" - "go.etcd.io/etcd/raft/raftpb" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" @@ -51,6 +47,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/ts" "github.com/cockroachdb/cockroach/pkg/ts/tspb" + "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -60,6 +58,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + "go.etcd.io/etcd/raft/raftpb" ) // adminSplitArgs creates an AdminSplitRequest for the provided split key. @@ -504,6 +505,66 @@ func TestStoreRangeSplitConcurrent(t *testing.T) { } } +// TestSplitTriggerRaftSnapshotRace verifies that when an uninitialized Replica +// resulting from a split hasn't been initialized via the split trigger yet, a +// grace period prevents the replica from requesting an errant Raft snapshot. +// This is verified by running a number of splits and asserting that no Raft +// snapshots are observed. As a nice side effect, this also verifies that log +// truncations don't cause any Raft snapshots in this test. +func TestSplitTriggerRaftSnapshotRace(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + const numNodes = 3 + var args base.TestClusterArgs + args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{DisableMergeQueue: true} + tc := testcluster.StartTestCluster(t, numNodes, args) + defer tc.Stopper().Stop(ctx) + + numSplits := 100 + if util.RaceEnabled { + // Running 100 splits is overkill in race builds. + numSplits = 10 + } + perm := rand.Perm(numSplits) + idx := int32(-1) // accessed atomically + + checkNoSnaps := func(when string) { + for i := 0; i < numNodes; i++ { + var n int // num rows (sanity check against test rotting) + var c int // num Raft snapshots + if err := tc.ServerConn(i).QueryRow(` +SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE + name LIKE 'queue.raftsnapshot.process.%' +OR + name LIKE 'queue.raftsnapshot.pending' +`).Scan(&n, &c); err != nil { + t.Fatal(err) + } + if expRows := 3; n != expRows { + t.Fatalf("%s: expected %d rows, got %d", when, expRows, n) + } + if c > 0 { + t.Fatalf("observed %d Raft snapshots %s splits", c, when) + } + } + } + + checkNoSnaps("before") + + doSplit := func(ctx context.Context) error { + _, _, err := tc.SplitRange( + []byte(fmt.Sprintf("key-%d", perm[atomic.AddInt32(&idx, 1)]))) + return err + } + + if err := ctxgroup.GroupWorkers(ctx, numSplits, doSplit); err != nil { + t.Fatal(err) + } + + checkNoSnaps("after") +} + // TestStoreRangeSplitIdempotency executes a split of a range and // verifies that the resulting ranges respond to the right key ranges // and that their stats have been properly accounted for and requests diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 23526b0ab88e..4404ef7067ea 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -5131,6 +5131,10 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag lastAppResp = message drop = true } + + if r.maybeDropMsgAppResp(ctx, message) { + drop = true + } } if !drop { r.sendRaftMessage(ctx, message) @@ -5141,6 +5145,63 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag } } +// maybeDropMsgAppResp returns true if the outgoing Raft message should be +// dropped. It does so if sending the message would likely result in an errant +// Raft snapshot after a split. +func (r *Replica) maybeDropMsgAppResp(ctx context.Context, msg raftpb.Message) bool { + if !msg.Reject { + return false + } + + r.mu.RLock() + ticks := r.mu.ticks + initialized := r.isInitializedRLocked() + r.mu.RUnlock() + + if initialized { + return false + } + + if ticks > r.store.cfg.RaftPostSplitSuppressSnapshotTicks { + log.Infof(ctx, "allowing MsgAppResp for uninitialized replica") + return false + } + + if msg.RejectHint != 0 { + log.Fatalf(ctx, "received reject hint %d from supposedly uninitialized replica", msg.RejectHint) + } + + // This replica has a blank state, i.e. its last index is zero (because we + // start our Raft log at index 10). In particular, it's not a preemptive + // snapshot. This happens in two cases: + // + // 1. a rebalance operation is adding a new replica of the range to this + // node. We always send a preemptive snapshot before attempting to do so, so + // we wouldn't enter this branch as the replica would be initialized. We + // would however enter this branch if the preemptive snapshot got GC'ed + // before the actual replica change came through. + // + // 2. a split executed that created this replica as its right hand side, but + // this node's pre-split replica hasn't executed the split trigger (yet). + // The expectation is that it will do so momentarily, however if we don't + // drop this rejection, the Raft leader will try to catch us up via a + // snapshot. In 99.9% of cases this is a wasted effort since the pre-split + // replica already contains the data this replica will hold. The remaining + // 0.01% constitute the case in which our local replica of the pre-split + // range requires a snapshot which catches it up "past" the split trigger, + // in which case the trigger will never be executed (the snapshot instead + // wipes out the data the split trigger would've tried to put into this + // range). A similar scenario occurs if there's a rebalance operation that + // rapidly removes the pre-split replica, so that it never catches up (nor + // via log nor via snapshot); in that case too, the Raft snapshot is + // required to materialize the split's right hand side replica (i.e. this + // one). We're delaying the snapshot for a short amount of time only, so + // this seems tolerable. + log.VEventf(ctx, 2, "dropping rejection from index %d to index %d", msg.Index, msg.RejectHint) + + return true +} + // sendRaftMessage sends a Raft message. func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { r.mu.Lock()