storage: never truncate log for inflight snapshots
When a (preemptive or Raft) snapshot is inflight, a range should not
truncate its log to an index ahead of the pending snapshot's index, as
doing so would require yet another snapshot.

The code prior to this commit attempted to achieve that by treating the
pending snapshot as an additional follower in the computation of the
quorum index. This works as intended as long as the Raft log is below
its configured maximum size but usually leads to abandoning the snapshot
index when truncating based on size (to the quorum commit index). This
situation occurs frequently during the split+scatter phase of data
imports, where (mostly empty) ranges are rapidly upreplicated and split.
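
To illustrate why counting the pending snapshot as one more follower is
not enough, here is a minimal, self-contained sketch. It is not the code
from this commit: `quorumIndex`, `asExtraFollower` and `asUpperBound` are
hypothetical helpers that operate on plain slices rather than on a
`raft.Status`, and the inputs are borrowed from the `{100, 100}` /
pending index 50 case in `TestComputeTruncateDecision`.

```go
package main

import (
	"fmt"
	"sort"
)

// quorumIndex returns the highest index that a majority of the given
// match indexes have reached (0 if there are none).
func quorumIndex(match []uint64) uint64 {
	if len(match) == 0 {
		return 0
	}
	sorted := append([]uint64(nil), match...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	quorum := len(sorted)/2 + 1
	return sorted[len(sorted)-quorum]
}

// asExtraFollower models the old idea for an oversized log: the pending
// snapshot index is merely one more vote in the quorum computation.
func asExtraFollower(match []uint64, pendingSnap uint64) uint64 {
	votes := append([]uint64(nil), match...)
	if pendingSnap != 0 {
		votes = append(votes, pendingSnap)
	}
	return quorumIndex(votes)
}

// asUpperBound models the new idea: the pending snapshot index caps the
// truncation index no matter what the quorum says.
func asUpperBound(match []uint64, pendingSnap uint64) uint64 {
	idx := quorumIndex(match)
	if pendingSnap > 0 && idx > pendingSnap {
		idx = pendingSnap
	}
	return idx
}

func main() {
	match := []uint64{100, 100}
	fmt.Println(asExtraFollower(match, 50)) // 100: the snapshot at 50 is outvoted
	fmt.Println(asUpperBound(match, 50))    // 50: the snapshot index is preserved
}
```

With two followers at index 100, the pending snapshot at 50 is simply
outvoted in the first variant, which is the "abandoning the snapshot
index" behavior described above.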

This isn't limited to small replicas, however. Assume a range is ~32mb
in size and a (say, preemptive) snapshot is underway. Preemptive
snapshots are (at the time of writing) throttled to 2mb/s, so it will
take approximately 16s to go through. At the same time, the Raft log may
easily grow in size by the size truncation threshold (4mb at time of
writing), allowing a log truncation that abandons the snapshot. A
similar phenomenon applies to Raft snapshots. There, the quota pool
restricts forward Raft progress, but not when a single command exceeds
the size restriction (as is common during RESTORE operations, where
individual Raft commands are large). I haven't
conclusively observed this in practice (though there has been enough
badness to suspect that it happened), but in principle this could lead
to a potentially infinite number of snapshots being sent out, a very
expensive way of keeping a follower up to date.
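
The back-of-the-envelope numbers above can be checked with a small
sketch. The helper names are hypothetical, and the constants are only the
values quoted in this message (32mb range, 2mb/s throttle, 4mb
truncation threshold), not values read from the code.

```go
package main

import "fmt"

// snapshotSeconds returns how long a throttled snapshot of the given
// size takes, in seconds.
func snapshotSeconds(rangeMB, rateMBPerSec float64) float64 {
	return rangeMB / rateMBPerSec
}

// minWriteRate returns the Raft log growth rate (MB/s) at which the log
// reaches the truncation threshold before the snapshot finishes.
func minWriteRate(thresholdMB, seconds float64) float64 {
	return thresholdMB / seconds
}

func main() {
	secs := snapshotSeconds(32, 2)
	fmt.Printf("snapshot takes ~%.0fs\n", secs) // snapshot takes ~16s
	fmt.Printf("log growth of %.2f MB/s suffices to trigger a size-based truncation\n",
		minWriteRate(4, secs)) // 0.25 MB/s
}
```

In other words, under these assumed numbers a fairly modest write rate
of about 0.25mb/s is enough to outrun the snapshot.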

After this change, the pending snapshot index is managed more carefully
(and is managed for Raft snapshots as well, not only for preemptive
ones) and is never truncated away. As a result, in regular operation
snapshots should now be able to be followed by regular Raft log catch-up
in all cases. More precisely, we keep a map from snapshot ID to the
pending snapshot's index and a deadline. The deadline is zero while the
snapshot is ongoing and is set (to the completion time plus a short
grace period) when the snapshot is completed. This avoids races between
the snapshot completing and the replication change completing (which
would occur even if the snapshot only registered as completed after the
replication change completes).
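
The bookkeeping can be sketched in isolation as follows. This is a
simplified stand-in for the code in the diff, under stated assumptions:
string IDs instead of UUIDs, no locking, no logging, and the names
`tracker`, `add`, `complete`, `lowest` and `demo` are made up for the
example. Only the 3s grace period is taken from the diff.

```go
package main

import (
	"fmt"
	"time"
)

const gracePeriod = 3 * time.Second

// constraint is one pending snapshot. A zero deadline means the
// snapshot is still in flight.
type constraint struct {
	index    uint64
	deadline time.Time
}

type tracker map[string]constraint

// add registers an in-flight snapshot; a duplicate ID is ignored.
func (t tracker) add(id string, index uint64) {
	if _, ok := t[id]; ok {
		return
	}
	t[id] = constraint{index: index}
}

// complete starts the grace period for a finished snapshot.
func (t tracker) complete(id string, now time.Time) {
	c, ok := t[id]
	if !ok {
		return
	}
	c.deadline = now.Add(gracePeriod)
	t[id] = c
}

// lowest drops constraints whose grace period has passed and returns
// the smallest remaining index (0 if none remain).
func (t tracker) lowest(now time.Time) (idx uint64) {
	for id, c := range t {
		if !c.deadline.IsZero() && c.deadline.Before(now) {
			delete(t, id)
			continue
		}
		if idx == 0 || c.index < idx {
			idx = c.index
		}
	}
	return idx
}

// demo replays a small timeline with two snapshots.
func demo() [3]uint64 {
	t := tracker{}
	start := time.Unix(0, 0)
	t.add("snap-a", 50)
	t.add("snap-b", 60)
	inFlight := t.lowest(start)
	t.complete("snap-a", start)
	atDeadline := t.lowest(start.Add(gracePeriod))
	pastDeadline := t.lowest(start.Add(gracePeriod + 1))
	return [3]uint64{inFlight, atDeadline, pastDeadline}
}

func main() {
	fmt.Println(demo()) // [50 50 60]
}
```

The lower index keeps constraining truncation until its grace period has
fully elapsed; only then does the higher, still in-flight snapshot become
the binding constraint.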

There's an opportunity for a bigger refactor here by using learner
replicas instead of preemptive snapshots. The idea is to add the replica
as a learner first (ignoring it for the quota pool until it has received
the first snapshot), and to use the regular Raft snapshot mechanism to
catch it up. Once that is achieved, another configuration change
would convert the learner into a regular follower. This change in itself
will likely make our code more principled, but it is a more invasive
change that is left for the future.

Similarly, there is knowledge in the quota pool that it seems we should
be using for log truncation decisions. The quota pool essentially knows
about the size of each log entry whereas the Raft truncations only know
the accumulated approximate size of the full log. For instance, instead
of blindly truncating to the quorum commit index when the Raft log is
too large, we could truncate it to an index that reduces the log size to
about 50% of the maximum, in effect reducing the number of snapshots
that are necessary due to quorum truncation. It's unclear whether this
improvement will matter in practice.
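
As a sketch of what such a size-aware truncation could look like (this
commit does not implement it; `truncateToTarget` and its parameters are
hypothetical, and per-entry sizes are assumed to be available as a plain
slice):

```go
package main

import "fmt"

// truncateToTarget returns the new first index of the log. sizes[i] is
// the size in bytes of the entry at index firstIndex+i. Entries are
// dropped from the front until the remaining log fits into targetBytes,
// but never past quorumIndex.
func truncateToTarget(
	firstIndex uint64, sizes []int64, targetBytes int64, quorumIndex uint64,
) uint64 {
	var remaining int64
	for _, s := range sizes {
		remaining += s
	}
	idx := firstIndex
	for _, s := range sizes {
		if remaining <= targetBytes || idx >= quorumIndex {
			break
		}
		remaining -= s
		idx++
	}
	return idx
}

func main() {
	// Entries 10..15 total 1500 bytes; assume a 1000 byte maximum and
	// therefore a 500 byte (50%) target.
	sizes := []int64{400, 400, 400, 100, 100, 100}
	fmt.Println(truncateToTarget(10, sizes, 500, 15)) // 13, not 15
}
```

Here truncating to index 13 already brings the log down to 300 bytes, so
a follower or snapshot sitting at index 13 or 14 would not be cut off,
whereas truncating to the quorum index 15 would do so.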

The following script reproduces redundant Raft snapshots (needs a
somewhat beefy machine such as a gceworker). Note that this script will
also incur Raft snapshots due to the splits, which are fixed in a
follow-up commit.

```
set -euxo pipefail

killall -9 cockroach || true
killall -9 workload || true
sleep 1
rm -rf cockroach-data || true
mkdir -p cockroach-data

./cockroach start --insecure --host=localhost --port=26257 --http-port=26258 --store=cockroach-data/1 --cache=256MiB --background
./cockroach start --insecure --host=localhost --port=26259 --http-port=26260 --store=cockroach-data/2 --cache=256MiB --join=localhost:26257 --background
./cockroach start --insecure --host=localhost --port=26261 --http-port=26262 --store=cockroach-data/3 --cache=256MiB --join=localhost:26257 --background
./cockroach start --insecure --host=localhost --port=26263 --http-port=26264 --store=cockroach-data/4 --cache=256MiB --join=localhost:26257 --background

sleep 5

./cockroach sql --insecure -e 'set cluster setting kv.range_merge.queue_enabled = false;'

./cockroach sql --insecure <<EOF
CREATE DATABASE csv;
IMPORT TABLE csv.lineitem CREATE USING 'gs://cockroach-fixtures/tpch-csv/schema/lineitem.sql'
CSV DATA (
 'gs://cockroach-fixtures/tpch-csv/sf-1/lineitem.tbl.1',
 'gs://cockroach-fixtures/tpch-csv/sf-1/lineitem.tbl.2',
 'gs://cockroach-fixtures/tpch-csv/sf-1/lineitem.tbl.3',
 'gs://cockroach-fixtures/tpch-csv/sf-1/lineitem.tbl.4',
 'gs://cockroach-fixtures/tpch-csv/sf-1/lineitem.tbl.5',
 'gs://cockroach-fixtures/tpch-csv/sf-1/lineitem.tbl.6',
 'gs://cockroach-fixtures/tpch-csv/sf-1/lineitem.tbl.7',
 'gs://cockroach-fixtures/tpch-csv/sf-1/lineitem.tbl.8'
) WITH delimiter='|' ;
EOF

sleep 5

for port in 26257 26259 26261 26263; do
  ./cockroach sql --insecure -e "select name, value from crdb_internal.node_metrics where name like '%raftsn%' order by name desc" --port "${port}"
done
```

Release note: None
tbg committed Nov 14, 2018
1 parent df690c7 commit b64bdc9
Showing 5 changed files with 193 additions and 78 deletions.
36 changes: 20 additions & 16 deletions pkg/storage/raft_log_queue.go
@@ -51,6 +51,11 @@ const (
// Allow a limited number of Raft log truncations to be processed
// concurrently.
raftLogQueueConcurrency = 4
// While a snapshot is in flight, we won't truncate past the snapshot's log
// index. This behavior is extended to a grace period after the snapshot is
// marked as completed as it is applied at the receiver only a little later,
// leaving a window for a truncation that requires another snapshot.
raftLogQueuePendingSnapshotGracePeriod = 3 * time.Second
)

// raftLogQueue manages a queue of replicas slated to have their raft logs
@@ -90,6 +95,7 @@ func newRaftLogQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *raftLo
// returned.
func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, error) {
rangeID := r.RangeID
now := timeutil.Now()

r.mu.Lock()
raftLogSize := r.mu.raftLogSize
@@ -110,7 +116,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, er
raftStatus := r.raftStatusRLocked()

firstIndex, err := r.raftFirstIndexLocked()
pendingSnapshotIndex := r.mu.pendingSnapshotIndex
pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now)
lastIndex := r.mu.lastIndex
r.mu.Unlock()

@@ -248,7 +254,7 @@ func (td *truncateDecision) ShouldTruncate() bool {
// snapshots. See #8629.
func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision := truncateDecision{Input: input}
decision.QuorumIndex = getQuorumIndex(input.RaftStatus, input.PendingPreemptiveSnapshotIndex)
decision.QuorumIndex = getQuorumIndex(input.RaftStatus)

decision.NewFirstIndex = decision.QuorumIndex
decision.ChosenVia = truncatableIndexChosenViaQuorumIndex
@@ -264,14 +270,15 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision.ChosenVia = truncatableIndexChosenViaFollowers
}
}
// The pending snapshot index acts as a placeholder for a replica that is
// about to be added to the range. We don't want to truncate the log in a
// way that will require that new replica to be caught up via a Raft
// snapshot.
if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex {
decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex
decision.ChosenVia = truncatableIndexChosenViaPendingSnap
}
}

// The pending snapshot index acts as a placeholder for a replica that is
// about to be added to the range (or is in Raft recovery). We don't want to
// truncate the log in a way that will require that new replica to be caught
// up via yet another Raft snapshot.
if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex {
decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex
decision.ChosenVia = truncatableIndexChosenViaPendingSnap
}

// Advance to the first index, but never truncate past the quorum commit
@@ -300,7 +307,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
}

// getQuorumIndex returns the index which a quorum of the nodes have
// committed. The pendingSnapshotIndex indicates the index of a pending
// committed. The snapshotLogTruncationConstraints indicates the index of a pending
// snapshot which is considered part of the Raft group even though it hasn't
// been added yet. Note that getQuorumIndex may return 0 if the progress map
// doesn't contain information for a sufficient number of followers (e.g. the
@@ -310,14 +317,11 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
// quorum was determined at the time the index was written. If you're thinking
// of using getQuorumIndex for some purpose, consider that raftStatus.Commit
// might be more appropriate (e.g. determining if a replica is up to date).
func getQuorumIndex(raftStatus *raft.Status, pendingSnapshotIndex uint64) uint64 {
match := make([]uint64, 0, len(raftStatus.Progress)+1)
func getQuorumIndex(raftStatus *raft.Status) uint64 {
match := make([]uint64, 0, len(raftStatus.Progress))
for _, progress := range raftStatus.Progress {
match = append(match, progress.Match)
}
if pendingSnapshotIndex != 0 {
match = append(match, pendingSnapshotIndex)
}
sort.Sort(uint64Slice(match))
quorum := computeQuorum(len(match))
return match[len(match)-quorum]
104 changes: 84 additions & 20 deletions pkg/storage/raft_log_queue_test.go
@@ -20,9 +20,7 @@ import (
"math"
"strings"
"testing"

"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -32,6 +30,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/raft"
)

func TestShouldTruncate(t *testing.T) {
@@ -66,23 +69,22 @@ func TestGetQuorumIndex(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
progress []uint64
pendingSnapshotIndex uint64
expected uint64
progress []uint64
expected uint64
}{
// Basic cases.
{[]uint64{1}, 0, 1},
{[]uint64{2}, 1, 1},
{[]uint64{1, 2}, 0, 1},
{[]uint64{2, 3}, 1, 2},
{[]uint64{1, 2, 3}, 0, 2},
{[]uint64{2, 3, 4}, 1, 2},
{[]uint64{1, 2, 3, 4}, 0, 2},
{[]uint64{2, 3, 4, 5}, 1, 3},
{[]uint64{1, 2, 3, 4, 5}, 0, 3},
{[]uint64{2, 3, 4, 5, 6}, 1, 3},
{[]uint64{1}, 1},
{[]uint64{2}, 2},
{[]uint64{1, 2}, 1},
{[]uint64{2, 3}, 2},
{[]uint64{1, 2, 3}, 2},
{[]uint64{2, 3, 4}, 3},
{[]uint64{1, 2, 3, 4}, 2},
{[]uint64{2, 3, 4, 5}, 3},
{[]uint64{1, 2, 3, 4, 5}, 3},
{[]uint64{2, 3, 4, 5, 6}, 4},
// Sorting.
{[]uint64{5, 4, 3, 2, 1}, 0, 3},
{[]uint64{5, 4, 3, 2, 1}, 3},
}
for i, c := range testCases {
status := &raft.Status{
@@ -91,7 +93,7 @@ func TestGetQuorumIndex(t *testing.T) {
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
quorumMatchedIndex := getQuorumIndex(status, c.pendingSnapshotIndex)
quorumMatchedIndex := getQuorumIndex(status)
if c.expected != quorumMatchedIndex {
t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex)
}
@@ -161,9 +163,10 @@ func TestComputeTruncateDecision(t *testing.T) {
[]uint64{1, 3, 3, 4}, 2000, 1, 3, 0,
"truncate 2 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot",
},
// Don't truncate away pending snapshot, even when log too large.
{
[]uint64{100, 100}, 2000, 1, 100, 50,
"truncate 99 entries to first index 100 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot",
"truncate 49 entries to first index 50 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)",
},
{
[]uint64{1, 3, 3, 4}, 2000, 2, 3, 0,
@@ -176,7 +179,7 @@ func TestComputeTruncateDecision(t *testing.T) {
// The pending snapshot index affects the quorum commit index.
{
[]uint64{4}, 2000, 1, 7, 1,
"truncate 0 entries to first index 1 (chosen via: quorum); log too large (2.0 KiB > 1000 B)",
"truncate 0 entries to first index 1 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)",
},
// Never truncate past the quorum commit index.
{
@@ -407,3 +410,64 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
})
}
}

func TestSnapshotLogTruncationConstraints(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
r := &Replica{}
id1, id2 := uuid.MakeV4(), uuid.MakeV4()
const (
index1 = 50
index2 = 60
)

// Add first constraint.
r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index1)
exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}}

// Make sure it registered.
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)

// Add another constraint with the same id. Extremely unlikely in practice
// but we want to make sure it doesn't blow anything up. Collisions are
// handled by ignoring the colliding update.
r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index2)
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)

// Helper that grabs the min constraint index (which can trigger GC as a
// byproduct) and asserts.
assertMin := func(exp uint64, now time.Time) {
t.Helper()
if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now); maxIndex != exp {
t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
}
}

// Queue should be told index1 is the highest pending one. Note that the
// colliding update at index2 is not represented.
assertMin(index1, time.Time{})

// Add another, higher, index. We're not going to notice it's around
// until the lower one disappears.
r.addSnapshotLogTruncationConstraintLocked(ctx, id2, index2)

now := timeutil.Now()
// The colliding snapshot comes back. Or the original, we can't tell.
r.completeSnapshotLogTruncationConstraint(ctx, id1, now)
// The index should show up when its deadline isn't hit.
assertMin(index1, now)
assertMin(index1, now.Add(raftLogQueuePendingSnapshotGracePeriod))
assertMin(index1, now.Add(raftLogQueuePendingSnapshotGracePeriod))
// Once we're over deadline, the index returned so far disappears.
assertMin(index2, now.Add(raftLogQueuePendingSnapshotGracePeriod+1))
assertMin(index2, time.Time{})
assertMin(index2, now.Add(10*raftLogQueuePendingSnapshotGracePeriod))

r.completeSnapshotLogTruncationConstraint(ctx, id2, now)
assertMin(index2, now)
assertMin(index2, now.Add(raftLogQueuePendingSnapshotGracePeriod))
assertMin(0, now.Add(2*raftLogQueuePendingSnapshotGracePeriod))

assert.Equal(t, r.mu.snapshotLogTruncationConstraints, map[uuid.UUID]snapTruncationInfo(nil))
}
97 changes: 72 additions & 25 deletions pkg/storage/replica.go
@@ -346,10 +346,19 @@ type Replica struct {
// from the Raft log entry. Use the invalidLastTerm constant for this
// case.
lastIndex, lastTerm uint64
// The raft log index of a pending preemptive snapshot. Used to prohibit
// raft log truncation while a preemptive snapshot is in flight. A value of
// 0 indicates that there is no pending snapshot.
pendingSnapshotIndex uint64
// A map of raft log index of pending preemptive snapshots to deadlines.
// Used to prohibit raft log truncations that would leave a gap between
// the snapshot and the new first index. The map entry has a zero
// deadline while the snapshot is being sent and turns nonzero when the
// snapshot has completed, preventing truncation for a grace period
// (since there is a race between the snapshot completing and its being
// reflected in the raft status used to make truncation decisions).
//
// NB: If we kept only one value, we could end up in situations in which
// we're either giving some snapshots no grace period, or keep an
// already finished snapshot "pending" for extended periods of time
// (preventing log truncation).
snapshotLogTruncationConstraints map[uuid.UUID]snapTruncationInfo
// raftLogSize is the approximate size in bytes of the persisted raft log.
// On server restart, this value is assumed to be zero to avoid costly scans
// of the raft log. This will be correct when all log entries predating this
@@ -6952,29 +6961,67 @@ func (r *Replica) exceedsMultipleOfSplitSizeRLocked(mult float64) bool {
return maxBytes > 0 && float64(size) > float64(maxBytes)*mult
}

func (r *Replica) setPendingSnapshotIndex(index uint64) error {
type snapTruncationInfo struct {
index uint64
deadline time.Time
}

func (r *Replica) addSnapshotLogTruncationConstraintLocked(
ctx context.Context, snapUUID uuid.UUID, index uint64,
) {
if r.mu.snapshotLogTruncationConstraints == nil {
r.mu.snapshotLogTruncationConstraints = make(map[uuid.UUID]snapTruncationInfo)
}
item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID]
if ok {
// Uh-oh, there's either a programming error (resulting in the same snapshot
// fed into this method twice) or a UUID collision. We discard the update
// (which is benign) but log it loudly. If the index is the same, it's
// likely the former, otherwise the latter.
log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, index)
return
}

r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{index: index}
}

func (r *Replica) completeSnapshotLogTruncationConstraint(
ctx context.Context, snapUUID uuid.UUID, now time.Time,
) {
deadline := now.Add(raftLogQueuePendingSnapshotGracePeriod)

r.mu.Lock()
defer r.mu.Unlock()
// We allow the pendingSnapshotIndex to change from 0 to 1 and then from 1 to
// a value greater than 1. Any other change indicates 2 current preemptive
// snapshots on the same replica which is disallowed.
if (index == 1 && r.mu.pendingSnapshotIndex != 0) ||
(index > 1 && r.mu.pendingSnapshotIndex != 1) {
// NB: this path can be hit if the replicate queue and scatter work
// concurrently. It's still good to return an error to avoid duplicating
// work, but we make it a benign one (so that it isn't logged).
return &benignError{errors.Errorf(
"%s: can't set pending snapshot index to %d; pending snapshot already present: %d",
r, index, r.mu.pendingSnapshotIndex)}
}
r.mu.pendingSnapshotIndex = index
return nil
item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID]
if !ok {
// UUID collision while adding the snapshot in originally. Nothing
// else to do.
return
}

item.deadline = deadline
r.mu.snapshotLogTruncationConstraints[snapUUID] = item
}

func (r *Replica) clearPendingSnapshotIndex() {
r.mu.Lock()
r.mu.pendingSnapshotIndex = 0
r.mu.Unlock()
func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
now time.Time,
) (minSnapIndex uint64) {
for snapUUID, item := range r.mu.snapshotLogTruncationConstraints {
if item.deadline != (time.Time{}) && item.deadline.Before(now) {
// The snapshot has finished and its grace period has passed.
// Ignore it when making truncation decisions.
delete(r.mu.snapshotLogTruncationConstraints, snapUUID)
continue
}
if minSnapIndex == 0 || minSnapIndex > item.index {
minSnapIndex = item.index
}
}
if len(r.mu.snapshotLogTruncationConstraints) == 0 {
// Save a little bit of memory.
r.mu.snapshotLogTruncationConstraints = nil
}
return minSnapIndex
}

func (r *Replica) startKey() roachpb.RKey {
@@ -7061,8 +7108,8 @@ func HasRaftLeader(raftStatus *raft.Status) bool {
}

func calcReplicaMetrics(
ctx context.Context,
now hlc.Timestamp,
_ context.Context,
_ hlc.Timestamp,
raftCfg *base.RaftConfig,
zone *config.ZoneConfig,
livenessMap IsLiveMap,
15 changes: 0 additions & 15 deletions pkg/storage/replica_command.go
@@ -799,15 +799,6 @@ func (r *Replica) changeReplicas(
return errors.Errorf("%s: unable to add replica %v; node already has a replica", r, repDesc)
}

// Prohibit premature raft log truncation. We set the pending index to 1
// here until we determine what it is below. This removes a small window of
// opportunity for the raft log to get truncated after the snapshot is
// generated.
if err := r.setPendingSnapshotIndex(1); err != nil {
return err
}
defer r.clearPendingSnapshotIndex()

// Send a pre-emptive snapshot. Note that the replica to which this
// snapshot is addressed has not yet had its replica ID initialized; this
// is intentional, and serves to avoid the following race with the replica
@@ -958,12 +949,6 @@ func (r *Replica) sendSnapshot(
return errors.Wrapf(err, "%s: change replicas failed", r)
}

if snapType == snapTypePreemptive {
if err := r.setPendingSnapshotIndex(snap.RaftSnap.Metadata.Index); err != nil {
return err
}
}

status := r.RaftStatus()
if status == nil {
// This code path is sometimes hit during scatter for replicas that