storage: avoid errant Raft snapshots during splits #31875

Merged · 2 commits · Nov 14, 2018
9 changes: 9 additions & 0 deletions pkg/base/config.go
@@ -111,6 +111,9 @@ var (
// will send to a follower without hearing a response.
defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)

defaultRaftPostSplitSuppressSnapshotTicks = envutil.EnvOrDefaultInt(
"COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS", 20)
)

type lazyHTTPClient struct {
@@ -476,6 +479,8 @@ type RaftConfig struct {
// translates to ~1024 commands that might be executed in the handling of a
// single raft.Ready operation.
RaftMaxInflightMsgs int

RaftPostSplitSuppressSnapshotTicks int
}

// SetDefaults initializes unset fields.
@@ -510,6 +515,10 @@ func (cfg *RaftConfig) SetDefaults() {
if cfg.RaftMaxInflightMsgs == 0 {
cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
}

if cfg.RaftPostSplitSuppressSnapshotTicks == 0 {
cfg.RaftPostSplitSuppressSnapshotTicks = defaultRaftPostSplitSuppressSnapshotTicks
}
}

// RaftElectionTimeout returns the raft election timeout, as computed from the
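The new knob follows the usual pattern for env-var-backed defaults: the value comes from COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS (20 if unset), and SetDefaults only fills it in while the field is still zero. Judging from the PR title and the test below, the knob presumably bounds, in Raft ticks, how long a not-yet-initialized replica created by a split holds off on requesting a snapshot; the replica-side logic that consumes it is not in the hunks shown on this page. A minimal sketch of how a caller picks up the default, assuming only the RaftConfig changes above:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/base"
)

func main() {
	// Leaving the field zero lets SetDefaults apply the env-var-backed
	// default (20 ticks unless COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS
	// overrides it).
	var cfg base.RaftConfig
	cfg.SetDefaults()
	fmt.Println(cfg.RaftPostSplitSuppressSnapshotTicks) // 20 by default

	// An explicit non-zero value is left untouched by SetDefaults.
	cfg = base.RaftConfig{RaftPostSplitSuppressSnapshotTicks: 5}
	cfg.SetDefaults()
	fmt.Println(cfg.RaftPostSplitSuppressSnapshotTicks) // 5
}
```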
69 changes: 65 additions & 4 deletions pkg/storage/client_split_test.go
@@ -27,10 +27,6 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/raftpb"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
@@ -51,6 +47,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -60,6 +58,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"go.etcd.io/etcd/raft/raftpb"
)

// adminSplitArgs creates an AdminSplitRequest for the provided split key.
@@ -504,6 +505,66 @@ func TestStoreRangeSplitConcurrent(t *testing.T) {
}
}

// TestSplitTriggerRaftSnapshotRace verifies that when a Replica created by a
// split has not yet been initialized via the split trigger, a grace period
// prevents it from requesting an errant Raft snapshot.
// This is verified by running a number of splits and asserting that no Raft
// snapshots are observed. As a nice side effect, this also verifies that log
// truncations don't cause any Raft snapshots in this test.
func TestSplitTriggerRaftSnapshotRace(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
const numNodes = 3
var args base.TestClusterArgs
args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{DisableMergeQueue: true}
tc := testcluster.StartTestCluster(t, numNodes, args)
defer tc.Stopper().Stop(ctx)

numSplits := 100
if util.RaceEnabled {
// Running 100 splits is overkill in race builds.
numSplits = 10
}
perm := rand.Perm(numSplits)
idx := int32(-1) // accessed atomically

checkNoSnaps := func(when string) {
for i := 0; i < numNodes; i++ {
var n int // num rows (sanity check against test rotting)
var c int // num Raft snapshots
if err := tc.ServerConn(i).QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
name LIKE 'queue.raftsnapshot.process.%'
OR
name LIKE 'queue.raftsnapshot.pending'
`).Scan(&n, &c); err != nil {
t.Fatal(err)
}
if expRows := 3; n != expRows {
t.Fatalf("%s: expected %d rows, got %d", when, expRows, n)
}
if c > 0 {
t.Fatalf("observed %d Raft snapshots %s splits", c, when)
}
}
}

checkNoSnaps("before")

doSplit := func(ctx context.Context) error {
_, _, err := tc.SplitRange(
[]byte(fmt.Sprintf("key-%d", perm[atomic.AddInt32(&idx, 1)])))
return err
}

if err := ctxgroup.GroupWorkers(ctx, numSplits, doSplit); err != nil {
t.Fatal(err)
}

checkNoSnaps("after")
}

// TestStoreRangeSplitIdempotency executes a split of a range and
// verifies that the resulting ranges respond to the right key ranges
// and that their stats have been properly accounted for and requests
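The test's assertion leans on the fact that Raft snapshot activity is visible through the raftsnapshot queue metrics in crdb_internal.node_metrics. A standalone helper equivalent to the check inside checkNoSnaps might look like the sketch below; countRaftSnapshots and its package are illustrative additions, not part of the PR, while the query is the one the test uses:

```go
package snapcheck

import (
	gosql "database/sql"
)

// countRaftSnapshots sums the Raft snapshot queue metrics for one node using
// the same crdb_internal.node_metrics query as checkNoSnaps above: numRows
// guards against the metric names rotting, numSnaps is the number of Raft
// snapshots the node has processed or queued.
func countRaftSnapshots(db *gosql.DB) (numRows, numSnaps int, err error) {
	err = db.QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
  name LIKE 'queue.raftsnapshot.process.%'
OR
  name LIKE 'queue.raftsnapshot.pending'
`).Scan(&numRows, &numSnaps)
	return numRows, numSnaps, err
}
```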
36 changes: 20 additions & 16 deletions pkg/storage/raft_log_queue.go
@@ -51,6 +51,11 @@ const (
// Allow a limited number of Raft log truncations to be processed
// concurrently.
raftLogQueueConcurrency = 4
// While a snapshot is in flight, we won't truncate past the snapshot's log
// index. This behavior is extended for a grace period after the snapshot is
// marked as completed, since it is applied at the receiver only a little
// later, leaving a window for a truncation that would require yet another
// snapshot.
raftLogQueuePendingSnapshotGracePeriod = 3 * time.Second
)

// raftLogQueue manages a queue of replicas slated to have their raft logs
@@ -90,6 +95,7 @@ func newRaftLogQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *raftLo
// returned.
func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, error) {
rangeID := r.RangeID
now := timeutil.Now()

r.mu.Lock()
raftLogSize := r.mu.raftLogSize
@@ -110,7 +116,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (*truncateDecision, er
raftStatus := r.raftStatusRLocked()

firstIndex, err := r.raftFirstIndexLocked()
pendingSnapshotIndex := r.mu.pendingSnapshotIndex
pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now)
lastIndex := r.mu.lastIndex
r.mu.Unlock()

@@ -248,7 +254,7 @@ func (td *truncateDecision) ShouldTruncate() bool {
// snapshots. See #8629.
func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision := truncateDecision{Input: input}
decision.QuorumIndex = getQuorumIndex(input.RaftStatus, input.PendingPreemptiveSnapshotIndex)
decision.QuorumIndex = getQuorumIndex(input.RaftStatus)

decision.NewFirstIndex = decision.QuorumIndex
decision.ChosenVia = truncatableIndexChosenViaQuorumIndex
@@ -264,14 +270,15 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
decision.ChosenVia = truncatableIndexChosenViaFollowers
}
}
// The pending snapshot index acts as a placeholder for a replica that is
// about to be added to the range. We don't want to truncate the log in a
// way that will require that new replica to be caught up via a Raft
// snapshot.
if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex {
decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex
decision.ChosenVia = truncatableIndexChosenViaPendingSnap
}
}

// The pending snapshot index acts as a placeholder for a replica that is
// about to be added to the range (or is in Raft recovery). We don't want to
// truncate the log in a way that will require that new replica to be caught
// up via yet another Raft snapshot.
if input.PendingPreemptiveSnapshotIndex > 0 && decision.NewFirstIndex > input.PendingPreemptiveSnapshotIndex {
decision.NewFirstIndex = input.PendingPreemptiveSnapshotIndex
decision.ChosenVia = truncatableIndexChosenViaPendingSnap
}

// Advance to the first index, but never truncate past the quorum commit
@@ -300,7 +307,7 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
}

// getQuorumIndex returns the index which a quorum of the nodes have
// committed. The pendingSnapshotIndex indicates the index of a pending
// committed. The snapshotLogTruncationConstraints indicates the index of a pending
// snapshot which is considered part of the Raft group even though it hasn't
// been added yet. Note that getQuorumIndex may return 0 if the progress map
// doesn't contain information for a sufficient number of followers (e.g. the
@@ -310,14 +317,11 @@ func computeTruncateDecision(input truncateDecisionInput) truncateDecision {
// quorum was determined at the time the index was written. If you're thinking
// of using getQuorumIndex for some purpose, consider that raftStatus.Commit
// might be more appropriate (e.g. determining if a replica is up to date).
func getQuorumIndex(raftStatus *raft.Status, pendingSnapshotIndex uint64) uint64 {
match := make([]uint64, 0, len(raftStatus.Progress)+1)
func getQuorumIndex(raftStatus *raft.Status) uint64 {
match := make([]uint64, 0, len(raftStatus.Progress))
for _, progress := range raftStatus.Progress {
match = append(match, progress.Match)
}
if pendingSnapshotIndex != 0 {
match = append(match, pendingSnapshotIndex)
}
sort.Sort(uint64Slice(match))
quorum := computeQuorum(len(match))
return match[len(match)-quorum]
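Two pieces of arithmetic interact in the hunks above: getQuorumIndex now takes just the median of the replicas' Match values (the pending-snapshot term it used to fold in is gone), and computeTruncateDecision applies the pending snapshot index as a floor on the new first index regardless of how that index was chosen. The following is a self-contained sketch of only those two steps, with the rest of the decision (first/last index, behind followers, log size) omitted; the names are mine, not the production code:

```go
package truncsketch

import "sort"

// quorumIndex mirrors getQuorumIndex: the largest index that a quorum of
// replicas has matched, i.e. the median of the sorted Match values.
func quorumIndex(match []uint64) uint64 {
	m := append([]uint64(nil), match...)
	sort.Slice(m, func(i, j int) bool { return m[i] < m[j] })
	quorum := len(m)/2 + 1
	return m[len(m)-quorum]
}

// newFirstIndex applies the pending-snapshot floor from computeTruncateDecision:
// never truncate past an in-flight (or recently completed) snapshot's index,
// or the recipient would immediately need yet another snapshot.
func newFirstIndex(quorumIdx, pendingSnapshotIndex uint64) uint64 {
	newFirst := quorumIdx
	if pendingSnapshotIndex > 0 && newFirst > pendingSnapshotIndex {
		newFirst = pendingSnapshotIndex
	}
	return newFirst
}
```

With match = {100, 100} and a snapshot pending at index 50, quorumIndex returns 100 but newFirstIndex returns 50, which is exactly the updated expectation in TestComputeTruncateDecision below ("truncate 49 entries to first index 50 (chosen via: pending snapshot)").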
104 changes: 84 additions & 20 deletions pkg/storage/raft_log_queue_test.go
@@ -20,9 +20,7 @@ import (
"math"
"strings"
"testing"

"github.com/pkg/errors"
"go.etcd.io/etcd/raft"
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -32,6 +30,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/raft"
)

func TestShouldTruncate(t *testing.T) {
@@ -66,23 +69,22 @@ func TestGetQuorumIndex(t *testing.T) {
defer leaktest.AfterTest(t)()

testCases := []struct {
progress []uint64
pendingSnapshotIndex uint64
expected uint64
progress []uint64
expected uint64
}{
// Basic cases.
{[]uint64{1}, 0, 1},
{[]uint64{2}, 1, 1},
{[]uint64{1, 2}, 0, 1},
{[]uint64{2, 3}, 1, 2},
{[]uint64{1, 2, 3}, 0, 2},
{[]uint64{2, 3, 4}, 1, 2},
{[]uint64{1, 2, 3, 4}, 0, 2},
{[]uint64{2, 3, 4, 5}, 1, 3},
{[]uint64{1, 2, 3, 4, 5}, 0, 3},
{[]uint64{2, 3, 4, 5, 6}, 1, 3},
{[]uint64{1}, 1},
{[]uint64{2}, 2},
{[]uint64{1, 2}, 1},
{[]uint64{2, 3}, 2},
{[]uint64{1, 2, 3}, 2},
{[]uint64{2, 3, 4}, 3},
{[]uint64{1, 2, 3, 4}, 2},
{[]uint64{2, 3, 4, 5}, 3},
{[]uint64{1, 2, 3, 4, 5}, 3},
{[]uint64{2, 3, 4, 5, 6}, 4},
// Sorting.
{[]uint64{5, 4, 3, 2, 1}, 0, 3},
{[]uint64{5, 4, 3, 2, 1}, 3},
}
for i, c := range testCases {
status := &raft.Status{
@@ -91,7 +93,7 @@
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{Match: v}
}
quorumMatchedIndex := getQuorumIndex(status, c.pendingSnapshotIndex)
quorumMatchedIndex := getQuorumIndex(status)
if c.expected != quorumMatchedIndex {
t.Fatalf("%d: expected %d, but got %d", i, c.expected, quorumMatchedIndex)
}
@@ -161,9 +163,10 @@ func TestComputeTruncateDecision(t *testing.T) {
[]uint64{1, 3, 3, 4}, 2000, 1, 3, 0,
"truncate 2 entries to first index 3 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot",
},
// Don't truncate away pending snapshot, even when log too large.
{
[]uint64{100, 100}, 2000, 1, 100, 50,
"truncate 99 entries to first index 100 (chosen via: quorum); log too large (2.0 KiB > 1000 B); implies 1 Raft snapshot",
"truncate 49 entries to first index 50 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)",
},
{
[]uint64{1, 3, 3, 4}, 2000, 2, 3, 0,
@@ -176,7 +179,7 @@ func TestComputeTruncateDecision(t *testing.T) {
// The pending snapshot index affects the quorum commit index.
{
[]uint64{4}, 2000, 1, 7, 1,
"truncate 0 entries to first index 1 (chosen via: quorum); log too large (2.0 KiB > 1000 B)",
"truncate 0 entries to first index 1 (chosen via: pending snapshot); log too large (2.0 KiB > 1000 B)",
},
// Never truncate past the quorum commit index.
{
@@ -407,3 +410,64 @@ func TestProactiveRaftLogTruncate(t *testing.T) {
})
}
}

func TestSnapshotLogTruncationConstraints(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
r := &Replica{}
id1, id2 := uuid.MakeV4(), uuid.MakeV4()
const (
index1 = 50
index2 = 60
)

// Add first constraint.
r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index1)
exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}}

// Make sure it registered.
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)

// Add another constraint with the same id. Extremely unlikely in practice
// but we want to make sure it doesn't blow anything up. Collisions are
// handled by ignoring the colliding update.
r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index2)
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)

// Helper that grabs the min constraint index (which can trigger GC as a
// byproduct) and asserts.
assertMin := func(exp uint64, now time.Time) {
t.Helper()
if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now); maxIndex != exp {
t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
}
}

// Queue should be told index1 is the lowest pending index. Note that the
// colliding update at index2 is not represented.
assertMin(index1, time.Time{})

// Add another, higher, index. We're not going to notice it's around
// until the lower one disappears.
r.addSnapshotLogTruncationConstraintLocked(ctx, id2, index2)

now := timeutil.Now()
// The colliding snapshot comes back. Or the original, we can't tell.
r.completeSnapshotLogTruncationConstraint(ctx, id1, now)
// The index should show up when its deadline isn't hit.
assertMin(index1, now)
assertMin(index1, now.Add(raftLogQueuePendingSnapshotGracePeriod))
assertMin(index1, now.Add(raftLogQueuePendingSnapshotGracePeriod))
// Once we're over deadline, the index returned so far disappears.
assertMin(index2, now.Add(raftLogQueuePendingSnapshotGracePeriod+1))
assertMin(index2, time.Time{})
assertMin(index2, now.Add(10*raftLogQueuePendingSnapshotGracePeriod))

r.completeSnapshotLogTruncationConstraint(ctx, id2, now)
assertMin(index2, now)
assertMin(index2, now.Add(raftLogQueuePendingSnapshotGracePeriod))
assertMin(0, now.Add(2*raftLogQueuePendingSnapshotGracePeriod))

assert.Equal(t, r.mu.snapshotLogTruncationConstraints, map[uuid.UUID]snapTruncationInfo(nil))
}
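The Replica methods this test drives (addSnapshotLogTruncationConstraintLocked, completeSnapshotLogTruncationConstraint, getAndGCSnapshotLogTruncationConstraintsLocked) are not part of the hunks shown on this page, so the sketch below is a reconstruction of the behavior the assertions pin down rather than the actual implementation: each in-flight snapshot registers the log index it was generated at, completion starts the grace-period clock, and the getter GCs expired entries while returning the lowest index that truncation must still respect. All names in the sketch are hypothetical.

```go
package constraintsketch

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// gracePeriod mirrors raftLogQueuePendingSnapshotGracePeriod above.
const gracePeriod = 3 * time.Second

type snapConstraint struct {
	index    uint64
	deadline time.Time // zero while the snapshot is still in flight
}

type constraintTracker struct {
	constraints map[uuid.UUID]snapConstraint
}

// add registers an in-flight snapshot; a colliding ID is ignored, matching
// the test's collision case.
func (t *constraintTracker) add(id uuid.UUID, index uint64) {
	if t.constraints == nil {
		t.constraints = map[uuid.UUID]snapConstraint{}
	}
	if _, ok := t.constraints[id]; ok {
		return
	}
	t.constraints[id] = snapConstraint{index: index}
}

// complete marks the snapshot as sent; its constraint survives for the grace
// period so the receiver has time to apply it.
func (t *constraintTracker) complete(id uuid.UUID, now time.Time) {
	if c, ok := t.constraints[id]; ok && c.deadline.IsZero() {
		c.deadline = now.Add(gracePeriod)
		t.constraints[id] = c
	}
}

// getAndGC drops constraints whose deadline has passed and returns the lowest
// index that truncation must still respect (0 if there is none).
func (t *constraintTracker) getAndGC(now time.Time) uint64 {
	var minIndex uint64
	for id, c := range t.constraints {
		if !c.deadline.IsZero() && now.After(c.deadline) {
			delete(t.constraints, id)
			continue
		}
		if minIndex == 0 || c.index < minIndex {
			minIndex = c.index
		}
	}
	if len(t.constraints) == 0 {
		t.constraints = nil // matches the test's final nil-map assertion
	}
	return minIndex
}
```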