diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go
index 90c51047e3ce..b877f50f4bf8 100644
--- a/pkg/storage/raft_log_queue.go
+++ b/pkg/storage/raft_log_queue.go
@@ -51,6 +51,11 @@ const (
 	// Allow a limited number of Raft log truncations to be processed
 	// concurrently.
 	raftLogQueueConcurrency = 4
+	// While a snapshot is in flight, we won't truncate past the snapshot's log
+	// index. This behavior is extended for a grace period after the snapshot is
+	// marked as completed, since it is applied at the receiver only a little
+	// later, leaving a window for a truncation that would require another snapshot.
+	raftLogQueuePendingSnapshotGracePeriod = 3 * time.Second
 )

 // raftLogQueue manages a queue of replicas slated to have their raft logs
@@ -90,6 +95,7 @@ func newRaftLogQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *raftLo
 // returned.
 func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, error) {
 	rangeID := r.RangeID
+	now := timeutil.Now()

 	r.mu.Lock()
 	raftLogSize := r.mu.raftLogSize
@@ -110,7 +116,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, er
 	raftStatus := r.raftStatusRLocked()

 	firstIndex, err := r.raftFirstIndexLocked()
-	pendingSnapshotIndex := r.mu.pendingSnapshotIndex
+	pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now)
 	lastIndex := r.mu.lastIndex
 	r.mu.Unlock()

@@ -248,7 +254,7 @@ func (td *truncateDecision) ShouldTruncate() bool {
 // snapshots. See #8629.
 func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
 	decision := truncateDecision{Input: input}
-	decision.QuorumIndex = getQuorumIndex(input.RaftStatus, input.PendingPreemptiveSnapshotIndex)
+	decision.QuorumIndex = getQuorumIndex(input.RaftStatus)

 	decision.NewFirstIndex = decision.QuorumIndex
 	decision.ChosenVia = truncatableIndexChosenViaQuorumIndex
@@ -264,14 +270,15 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
 				decision.ChosenVia = truncatableIndexChosenViaFollowers
 			}
 		}
-		// The pending snapshot index acts as a placeholder for a replica that is
-		// about to be added to the range. We don't want to truncate the log in a
-		// way that will require that new replica to be caught up via a Raft
-		// snapshot.
-		if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex {
-			decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex
-			decision.ChosenVia = truncatableIndexChosenViaPendingSnap
-		}
+	}
+
+	// The pending snapshot index acts as a placeholder for a replica that is
+	// about to be added to the range (or is in Raft recovery). We don't want to
+	// truncate the log in a way that will require that new replica to be caught
+	// up via yet another Raft snapshot.
+	if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex {
+		decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex
+		decision.ChosenVia = truncatableIndexChosenViaPendingSnap
 	}

 	// Advance to the first index, but never truncate past the quorum commit
@@ -300,7 +307,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
 }

 // getQuorumIndex returns the index which a quorum of the nodes have
-// committed. The pendingSnapshotIndex indicates the index of a pending
+// committed. The snapshotLogTruncationConstraints indicates the index of a pending
 // snapshot which is considered part of the Raft group even though it hasn't
 // been added yet. Note that getQuorumIndex may return 0 if the progress map
 // doesn't contain information for a sufficient number of followers (e.g. the
@@ -310,14 +317,11 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
 // quorum was determined at the time the index was written. If you're thinking
 // of using getQuorumIndex for some purpose, consider that raftStatus.Commit
 // might be more appropriate (e.g. determining if a replica is up to date).
-func getQuorumIndex(raftStatus *raft.Status, pendingSnapshotIndex uint64) uint64 {
-	match := make([]uint64, 0, len(raftStatus.Progress)+1)
+func getQuorumIndex(raftStatus *raft.Status) uint64 {
+	match := make([]uint64, 0, len(raftStatus.Progress))
 	for _, progress := range raftStatus.Progress {
 		match = append(match, progress.Match)
 	}
-	if pendingSnapshotIndex != 0 {
-		match = append(match, pendingSnapshotIndex)
-	}
 	sort.Sort(uint64Slice(match))
 	quorum := computeQuorum(len(match))
 	return match[len(match)-quorum]
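Note: after this change, getQuorumIndex looks only at the followers' Match values, and the pending-snapshot index is applied afterwards as an independent clamp on the proposed truncation point. Below is a minimal illustrative sketch of that two-step shape (simplified, hypothetical names; not the functions above):

```go
// Illustrative sketch only: quorum index from Match values, then a separate
// clamp for an in-flight snapshot.
package main

import (
	"fmt"
	"sort"
)

// quorumIndex returns the highest log index that a majority of replicas
// have durably appended (their Match values).
func quorumIndex(match []uint64) uint64 {
	m := append([]uint64(nil), match...)
	sort.Slice(m, func(i, j int) bool { return m[i] < m[j] })
	quorum := len(m)/2 + 1
	return m[len(m)-quorum]
}

// clampToPendingSnapshot holds the truncation point back to an in-flight
// snapshot's index so the recipient can still catch up from the log.
func clampToPendingSnapshot(newFirstIndex, pendingSnapIndex uint64) uint64 {
	if pendingSnapIndex > 0 && newFirstIndex > pendingSnapIndex {
		return pendingSnapIndex
	}
	return newFirstIndex
}

func main() {
	// Mirrors the updated test case: matches {100, 100} give quorum index 100,
	// but a pending snapshot at index 50 keeps the new first index at 50.
	fmt.Println(clampToPendingSnapshot(quorumIndex([]uint64{100, 100}), 50)) // 50
}
```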
diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go
index 5bfcb54539b8..b909617e8e05 100644
--- a/pkg/storage/raft_log_queue_test.go
+++ b/pkg/storage/raft_log_queue_test.go
@@ -20,9 +20,7 @@ import (
 	"math"
 	"strings"
 	"testing"
-
-	"github.com/pkg/errors"
-	"go.etcd.io/etcd/raft"
+	"time"

 	"github.com/cockroachdb/cockroach/pkg/internal/client"
 	"github.com/cockroachdb/cockroach/pkg/keys"
@@ -32,6 +30,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+	"go.etcd.io/etcd/raft"
 )

 func TestShouldTruncate(t *testing.T) {
@@ -66,23 +69,22 @@ func TestGetQuorumIndex(t *testing.T) {
 	defer leaktest.AfterTest(t)()

 	testCases := []struct {
-		progress             []uint64
-		pendingSnapshotIndex uint64
-		expected             uint64
+		progress []uint64
+		expected uint64
 	}{
 		// Basic cases.
-		{[]uint64{1}, 0, 1},
-		{[]uint64{2}, 1, 1},
-		{[]uint64{1, 2}, 0, 1},
-		{[]uint64{2, 3}, 1, 2},
-		{[]uint64{1, 2, 3}, 0, 2},
-		{[]uint64{2, 3, 4}, 1, 2},
-		{[]uint64{1, 2, 3, 4}, 0, 2},
-		{[]uint64{2, 3, 4, 5}, 1, 3},
-		{[]uint64{1, 2, 3, 4, 5}, 0, 3},
-		{[]uint64{2, 3, 4, 5, 6}, 1, 3},
+		{[]uint64{1}, 1},
+		{[]uint64{2}, 2},
+		{[]uint64{1, 2}, 1},
+		{[]uint64{2, 3}, 2},
+		{[]uint64{1, 2, 3}, 2},
+		{[]uint64{2, 3, 4}, 3},
+		{[]uint64{1, 2, 3, 4}, 2},
+		{[]uint64{2, 3, 4, 5}, 3},
+		{[]uint64{1, 2, 3, 4, 5}, 3},
+		{[]uint64{2, 3, 4, 5, 6}, 4},
 		// Sorting.
-		{[]uint64{5, 4, 3, 2, 1}, 0, 3},
+		{[]uint64{5, 4, 3, 2, 1}, 3},
 	}
 	for i, c := range testCases {
 		status := &raft.Status{
@@ -91,7 +93,7 @@ func TestGetQuorumIndex(t *testing.T) {
 		for j, v := range c.progress {
 			status.Progress[uint64(j)] = raft.Progress{Match: v}
 		}
-		quorumMatchedIndex := getQuorumIndex(status, c.pendingSnapshotIndex)
+		quorumMatchedIndex := getQuorumIndex(status)
 		if c.expected != quorumMatchedIndex {
 			t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex)
 		}
@@ -161,9 +163,10 @@ func TestComputeTruncateDecision(t *testing.T) {
 			[]uint64{1, 3, 3, 4}, 2000, 1, 3, 0,
 			"truncate 2 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot",
 		},
+		// Don't truncate away pending snapshot, even when log too large.
 		{
 			[]uint64{100, 100}, 2000, 1, 100, 50,
-			"truncate 99 entries to first index 100 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot",
+			"truncate 49 entries to first index 50 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)",
 		},
 		{
 			[]uint64{1, 3, 3, 4}, 2000, 2, 3, 0,
@@ -176,7 +179,7 @@ func TestComputeTruncateDecision(t *testing.T) {
 		// The pending snapshot index affects the quorum commit index.
 		{
 			[]uint64{4}, 2000, 1, 7, 1,
-			"truncate 0 entries to first index 1 (chosen via: quorum); log too large (2.0 KiB > 1000 B)",
+			"truncate 0 entries to first index 1 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)",
 		},
 		// Never truncate past the quorum commit index.
 		{
@@ -407,3 +410,64 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
 		})
 	}
 }
+
+func TestSnapshotLogTruncationConstraints(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	r := &Replica{}
+	id1, id2 := uuid.MakeV4(), uuid.MakeV4()
+	const (
+		index1 = 50
+		index2 = 60
+	)
+
+	// Add first constraint.
+	r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index1)
+	exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}}
+
+	// Make sure it registered.
+	assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)
+
+	// Add another constraint with the same id. Extremely unlikely in practice
+	// but we want to make sure it doesn't blow anything up. Collisions are
+	// handled by ignoring the colliding update.
+	r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index2)
+	assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)
+
+	// Helper that grabs the min constraint index (which can trigger GC as a
+	// byproduct) and asserts.
+	assertMin := func(exp uint64, now time.Time) {
+		t.Helper()
+		if minIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now); minIndex != exp {
+			t.Fatalf("unexpected min index %d, wanted %d", minIndex, exp)
+		}
+	}
+
+	// Queue should be told index1 is the lowest pending one. Note that the
+	// colliding update at index2 is not represented.
+	assertMin(index1, time.Time{})
+
+	// Add another, higher, index. We're not going to notice it's around
+	// until the lower one disappears.
+	r.addSnapshotLogTruncationConstraintLocked(ctx, id2, index2)
+
+	now := timeutil.Now()
+	// The colliding snapshot comes back. Or the original, we can't tell.
+	r.completeSnapshotLogTruncationConstraint(ctx, id1, now)
+	// The index should show up when its deadline isn't hit.
+	assertMin(index1, now)
+	assertMin(index1, now.Add(raftLogQueuePendingSnapshotGracePeriod))
+	assertMin(index1, now.Add(raftLogQueuePendingSnapshotGracePeriod))
+	// Once we're over deadline, the index returned so far disappears.
+	assertMin(index2, now.Add(raftLogQueuePendingSnapshotGracePeriod+1))
+	assertMin(index2, time.Time{})
+	assertMin(index2, now.Add(10*raftLogQueuePendingSnapshotGracePeriod))
+
+	r.completeSnapshotLogTruncationConstraint(ctx, id2, now)
+	assertMin(index2, now)
+	assertMin(index2, now.Add(raftLogQueuePendingSnapshotGracePeriod))
+	assertMin(0, now.Add(2*raftLogQueuePendingSnapshotGracePeriod))
+
+	assert.Equal(t, r.mu.snapshotLogTruncationConstraints, map[uuid.UUID]snapTruncationInfo(nil))
+}
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 01cc313872ed..23526b0ab88e 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -346,10 +346,19 @@ type Replica struct {
 		// from the Raft log entry. Use the invalidLastTerm constant for this
 		// case.
 		lastIndex, lastTerm uint64
-		// The raft log index of a pending preemptive snapshot. Used to prohibit
-		// raft log truncation while a preemptive snapshot is in flight. A value of
-		// 0 indicates that there is no pending snapshot.
-		pendingSnapshotIndex uint64
+		// A map from pending preemptive snapshot UUID to that snapshot's raft log
+		// index and deadline. Used to prohibit raft log truncations that would
+		// leave a gap between the snapshot and the new first index. The map entry
+		// has a zero deadline while the snapshot is being sent and turns nonzero
+		// when the snapshot has completed, preventing truncation for a grace
+		// period (since there is a race between the snapshot completing and its
+		// being reflected in the raft status used to make truncation decisions).
+		//
+		// NB: If we kept only one value, we could end up in situations in which
+		// we're either giving some snapshots no grace period, or keep an
+		// already finished snapshot "pending" for extended periods of time
+		// (preventing log truncation).
+		snapshotLogTruncationConstraints map[uuid.UUID]snapTruncationInfo
 		// raftLogSize is the approximate size in bytes of the persisted raft log.
 		// On server restart, this value is assumed to be zero to avoid costly scans
 		// of the raft log. This will be correct when all log entries predating this
@@ -6952,29 +6961,67 @@ func (r *Replica) exceedsMultipleOfSplitSizeRLocked(mult float64) bool {
 	return maxBytes > 0 && float64(size) > float64(maxBytes)*mult
 }

-func (r *Replica) setPendingSnapshotIndex(index uint64) error {
+type snapTruncationInfo struct {
+	index    uint64
+	deadline time.Time
+}
+
+func (r *Replica) addSnapshotLogTruncationConstraintLocked(
+	ctx context.Context, snapUUID uuid.UUID, index uint64,
+) {
+	if r.mu.snapshotLogTruncationConstraints == nil {
+		r.mu.snapshotLogTruncationConstraints = make(map[uuid.UUID]snapTruncationInfo)
+	}
+	item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID]
+	if ok {
+		// Uh-oh, there's either a programming error (resulting in the same snapshot
+		// fed into this method twice) or a UUID collision. We discard the update
+		// (which is benign) but log it loudly. If the index is the same, it's
+		// likely the former, otherwise the latter.
+		log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, index)
+		return
+	}
+
+	r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{index: index}
+}
+
+func (r *Replica) completeSnapshotLogTruncationConstraint(
+	ctx context.Context, snapUUID uuid.UUID, now time.Time,
+) {
+	deadline := now.Add(raftLogQueuePendingSnapshotGracePeriod)
+
 	r.mu.Lock()
 	defer r.mu.Unlock()
-	// We allow the pendingSnapshotIndex to change from 0 to 1 and then from 1 to
-	// a value greater than 1. Any other change indicates 2 current preemptive
-	// snapshots on the same replica which is disallowed.
-	if (index == 1 && r.mu.pendingSnapshotIndex != 0) ||
-		(index > 1 && r.mu.pendingSnapshotIndex != 1) {
-		// NB: this path can be hit if the replicate queue and scatter work
-		// concurrently. It's still good to return an error to avoid duplicating
-		// work, but we make it a benign one (so that it isn't logged).
-		return &benignError{errors.Errorf(
-			"%s: can't set pending snapshot index to %d; pending snapshot already present: %d",
-			r, index, r.mu.pendingSnapshotIndex)}
-	}
-	r.mu.pendingSnapshotIndex = index
-	return nil
+	item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID]
+	if !ok {
+		// UUID collision while adding the snapshot in originally. Nothing
+		// else to do.
+		return
+	}
+
+	item.deadline = deadline
+	r.mu.snapshotLogTruncationConstraints[snapUUID] = item
 }

-func (r *Replica) clearPendingSnapshotIndex() {
-	r.mu.Lock()
-	r.mu.pendingSnapshotIndex = 0
-	r.mu.Unlock()
+func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
+	now time.Time,
+) (minSnapIndex uint64) {
+	for snapUUID, item := range r.mu.snapshotLogTruncationConstraints {
+		if item.deadline != (time.Time{}) && item.deadline.Before(now) {
+			// The snapshot has finished and its grace period has passed.
+			// Ignore it when making truncation decisions.
+			delete(r.mu.snapshotLogTruncationConstraints, snapUUID)
+			continue
+		}
+		if minSnapIndex == 0 || minSnapIndex > item.index {
+			minSnapIndex = item.index
+		}
+	}
+	if len(r.mu.snapshotLogTruncationConstraints) == 0 {
+		// Save a little bit of memory.
+		r.mu.snapshotLogTruncationConstraints = nil
+	}
+	return minSnapIndex
 }

 func (r *Replica) startKey() roachpb.RKey {
@@ -7061,8 +7108,8 @@ func HasRaftLeader(raftStatus *raft.Status) bool {
 }

 func calcReplicaMetrics(
-	ctx context.Context,
-	now hlc.Timestamp,
+	_ context.Context,
+	_ hlc.Timestamp,
 	raftCfg *base.RaftConfig,
 	zone *config.ZoneConfig,
 	livenessMap IsLiveMap,
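Note: the three helpers above implement a small lifecycle: a constraint is added with a zero deadline while the snapshot is outstanding, completing it only starts a grace period, and the getter garbage-collects expired entries while reporting the minimum index that truncation must not cross. A standalone sketch of that lifecycle (hypothetical names and types, not the Replica methods themselves):

```go
// Standalone sketch of the constraint lifecycle, under simplified assumptions.
package main

import (
	"fmt"
	"time"
)

const gracePeriod = 3 * time.Second // mirrors raftLogQueuePendingSnapshotGracePeriod

type constraint struct {
	index    uint64
	deadline time.Time // zero while the snapshot is still in flight
}

type tracker struct {
	constraints map[string]constraint // keyed by snapshot ID
}

func (t *tracker) add(id string, index uint64) {
	if t.constraints == nil {
		t.constraints = map[string]constraint{}
	}
	if _, ok := t.constraints[id]; ok {
		return // collision: ignore the duplicate update, as the code above does
	}
	t.constraints[id] = constraint{index: index}
}

func (t *tracker) complete(id string, now time.Time) {
	if c, ok := t.constraints[id]; ok {
		c.deadline = now.Add(gracePeriod)
		t.constraints[id] = c
	}
}

// minIndex drops constraints whose grace period has expired and returns the
// smallest remaining index (0 if none), i.e. the truncation bound.
func (t *tracker) minIndex(now time.Time) uint64 {
	var minIdx uint64
	for id, c := range t.constraints {
		if !c.deadline.IsZero() && c.deadline.Before(now) {
			delete(t.constraints, id)
			continue
		}
		if minIdx == 0 || c.index < minIdx {
			minIdx = c.index
		}
	}
	return minIdx
}

func main() {
	var t tracker
	now := time.Now()
	t.add("snap-1", 50)
	t.add("snap-2", 60)
	fmt.Println(t.minIndex(now)) // 50: both snapshots still block truncation
	t.complete("snap-1", now)
	fmt.Println(t.minIndex(now.Add(gracePeriod + time.Nanosecond))) // 60: snap-1's grace period expired
}
```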
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index 256e7772a740..07eb3cade222 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -799,15 +799,6 @@ func (r *Replica) changeReplicas(
 		return errors.Errorf("%s: unable to add replica %v; node already has a replica", r, repDesc)
 	}

-	// Prohibit premature raft log truncation. We set the pending index to 1
-	// here until we determine what it is below. This removes a small window of
-	// opportunity for the raft log to get truncated after the snapshot is
-	// generated.
-	if err := r.setPendingSnapshotIndex(1); err != nil {
-		return err
-	}
-	defer r.clearPendingSnapshotIndex()
-
 	// Send a pre-emptive snapshot. Note that the replica to which this
 	// snapshot is addressed has not yet had its replica ID initialized; this
 	// is intentional, and serves to avoid the following race with the replica
@@ -958,12 +949,6 @@ func (r *Replica) sendSnapshot(
 		return errors.Wrapf(err, "%s: change replicas failed", r)
 	}

-	if snapType == snapTypePreemptive {
-		if err := r.setPendingSnapshotIndex(snap.RaftSnap.Metadata.Index); err != nil {
-			return err
-		}
-	}
-
 	status := r.RaftStatus()
 	if status == nil {
 		// This code path is sometimes hit during scatter for replicas that
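Note: changeReplicas and sendSnapshot no longer manage a pending snapshot index themselves; the constraint is now registered inside GetSnapshot and released when the OutgoingSnapshot is closed, as the next file shows. A minimal sketch of that release-on-Close pattern (hypothetical types, not the storage package API):

```go
// Minimal sketch: the constraint is registered when the snapshot is created
// and released exactly once when the snapshot handle is closed, so every code
// path that obtains a snapshot also ends its truncation constraint.
package main

import "fmt"

type outgoingSnapshot struct {
	onClose func() // set by the constructor, invoked from Close
}

func (s *outgoingSnapshot) Close() {
	if s.onClose != nil {
		s.onClose()
	}
}

func getSnapshot(register func() (release func())) *outgoingSnapshot {
	release := register() // e.g. add the log-truncation constraint
	return &outgoingSnapshot{onClose: release}
}

func main() {
	snap := getSnapshot(func() func() {
		fmt.Println("constraint added")
		return func() { fmt.Println("constraint released") }
	})
	defer snap.Close() // releases the constraint when the sender is done
	fmt.Println("streaming snapshot...")
}
```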
diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go
index 890199c9a03b..37035c4675b5 100644
--- a/pkg/storage/replica_raftstorage.go
+++ b/pkg/storage/replica_raftstorage.go
@@ -392,15 +392,25 @@ func (r *Replica) raftSnapshotLocked() (raftpb.Snapshot, error) {
 func (r *Replica) GetSnapshot(
 	ctx context.Context, snapType string,
 ) (_ *OutgoingSnapshot, err error) {
+	snapUUID := uuid.MakeV4()
 	// Get a snapshot while holding raftMu to make sure we're not seeing "half
 	// an AddSSTable" (i.e. a state in which an SSTable has been linked in, but
 	// the corresponding Raft command not applied yet).
 	r.raftMu.Lock()
 	snap := r.store.engine.NewSnapshot()
+	r.mu.Lock()
+	appliedIndex := r.mu.state.RaftAppliedIndex
+	r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, appliedIndex) // cleared when OutgoingSnapshot closes
+	r.mu.Unlock()
 	r.raftMu.Unlock()

+	release := func() {
+		r.completeSnapshotLogTruncationConstraint(ctx, snapUUID, timeutil.Now())
+	}
+
 	defer func() {
 		if err != nil {
+			release()
 			snap.Close()
 		}
 	}()
@@ -427,13 +437,14 @@ func (r *Replica) GetSnapshot(
 	// to use Replica.mu.stateLoader. This call is not performance sensitive, so
 	// create a new state loader.
 	snapData, err := snapshot(
-		ctx, stateloader.Make(r.store.cfg.Settings, rangeID), snapType,
+		ctx, snapUUID, stateloader.Make(r.store.cfg.Settings, rangeID), snapType,
 		snap, rangeID, r.store.raftEntryCache, withSideloaded, startKey,
 	)
 	if err != nil {
 		log.Errorf(ctx, "error generating snapshot: %s", err)
 		return nil, err
 	}
+	snapData.onClose = release
 	return &snapData, nil
 }

@@ -457,6 +468,7 @@ type OutgoingSnapshot struct {
 	WithSideloaded func(func(sideloadStorage) error) error
 	RaftEntryCache *raftEntryCache
 	snapType       string
+	onClose        func()
 }

 func (s *OutgoingSnapshot) String() string {
@@ -467,6 +479,9 @@ func (s *OutgoingSnapshot) Close() {
 	s.Iter.Close()
 	s.EngineSnap.Close()
+	if s.onClose != nil {
+		s.onClose()
+	}
 }

 // IncomingSnapshot contains the data for an incoming streaming snapshot message.
@@ -485,6 +500,7 @@ type IncomingSnapshot struct {
 // given range. Note that snapshot() is called without Replica.raftMu held.
 func snapshot(
 	ctx context.Context,
+	snapUUID uuid.UUID,
 	rsl stateloader.StateLoader,
 	snapType string,
 	snap engine.Reader,
@@ -536,7 +552,6 @@ func snapshot(
 	// Intentionally let this iterator and the snapshot escape so that the
 	// streamer can send chunks from it bit by bit.
 	iter := rditer.NewReplicaDataIterator(&desc, snap, true /* replicatedOnly */)
-	snapUUID := uuid.MakeV4()

 	return OutgoingSnapshot{
+		RaftEntryCache: eCache,