storage: avoid errant Raft snapshots after splits
A known race occurs during splits when some nodes apply the split
trigger faster than others. The "slow" node(s) may learn about the
newly created right hand side replica through Raft messages arriving
from the "fast" nodes. In such cases, the leader will immediately try to
catch up the follower (which it sees at log position zero) via a
snapshot, but this isn't possible since there's an overlapping replica
(the pre-split replica waiting to apply the trigger). This leads to
unnecessary transfer of data and can also clog the Raft snapshot queue,
which tends to get stuck due to the throttling mechanisms at both the
sender and the receivers.

To prevent this race (or make it exceedingly unlikely), we selectively
drop certain messages from uninitialized followers, namely those that
refuse an append to the log, for a number of ticks (corresponding to
at most a few seconds of real time). Not dropping such a message leads
to a Raft snapshot as the leader will learn that the follower has last
index zero, which is never an index that can be caught up to from the
log (our log "starts" at index 10).
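
As a rough illustration of the rule described above, the decision can be
sketched as a pure function. This is a simplified model and not the code
in this commit: appResp and shouldDrop are hypothetical names, and the
replica state (initialized, ticks) is passed in explicitly instead of
being read under the replica mutex.

```go
package main

import "fmt"

// appResp is a hypothetical stand-in for the one field of an outgoing
// MsgAppResp that matters for this decision.
type appResp struct {
	Reject bool // true if the follower refused the append
}

// shouldDrop reports whether the rejection should be suppressed. Only
// rejections from uninitialized replicas that are still within the grace
// period (measured in ticks) are dropped; everything else is sent.
func shouldDrop(msg appResp, initialized bool, ticks, suppressTicks int) bool {
	if !msg.Reject {
		return false // successful appends are never dropped
	}
	if initialized {
		return false // an initialized replica may legitimately need a snapshot
	}
	if ticks > suppressTicks {
		return false // grace period is over, let the leader send a snapshot
	}
	return true
}

func main() {
	const suppressTicks = 20 // default introduced by this commit
	reject := appResp{Reject: true}
	fmt.Println(shouldDrop(reject, false, 5, suppressTicks))    // within grace period
	fmt.Println(shouldDrop(reject, false, 21, suppressTicks))   // grace period expired
	fmt.Println(shouldDrop(reject, true, 5, suppressTicks))     // replica initialized
	fmt.Println(shouldDrop(appResp{}, false, 5, suppressTicks)) // not a rejection
}
```

Only the first call reports a drop; the other three let the message
through.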

The script below reproduces the race (prior to this commit) by running
1000 splits back to back in a three node local cluster, usually showing
north of a hundred Raft snapshots, i.e. a >10% chance to hit the race
for each split. There's also a unit test that exposes this problem and
can be stressed more conveniently (it also exposes the problems in the
preceding commit related to overly aggressive log truncation).

The false positives here, i.e. the cases in which the suppressed
snapshot is actually required, are a) the LHS of the split needs a
snapshot which catches it up across the split trigger and b) the LHS is
rebalanced away (and GC'ed) before applying the split trigger. In both
cases the timeout-based mechanism would allow the snapshot after a few
seconds, the next time the Raft leader contacts the follower.

Note that the interaction with Raft group quiescence is benign. We're
only dropping MsgAppResp, which is only sent by followers, implying that
the Raft group is already unquiesced.

```
set -euxo pipefail

killall -9 cockroach || true
killall -9 workload || true
sleep 1
rm -rf cockroach-data || true
mkdir -p cockroach-data

./cockroach start --insecure --host=localhost --port=26257 --http-port=26258 --store=cockroach-data/1 --cache=256MiB --background
./cockroach start --insecure --host=localhost --port=26259 --http-port=26260 --store=cockroach-data/2 --cache=256MiB --join=localhost:26257 --background
./cockroach start --insecure --host=localhost --port=26261 --http-port=26262 --store=cockroach-data/3 --cache=256MiB --join=localhost:26257 --background

sleep 5

./cockroach sql --insecure -e 'set cluster setting kv.range_merge.queue_enabled = false;'
./bin/workload run kv --splits 1000 --init --drop --max-ops 1

sleep 5

for port in 26257 26259 26261; do
  ./cockroach sql --insecure -e "select name, value from crdb_internal.node_metrics where name like '%raftsn%' order by name desc" --port "${port}"
done
```

Release note (bug fix): Avoid occasional unnecessary Raft snapshots after Range splits.
tbg committed Nov 14, 2018
1 parent b64bdc9 commit 3fd4bd5
Showing 3 changed files with 135 additions and 4 deletions.
9 changes: 9 additions & 0 deletions pkg/base/config.go
@@ -111,6 +111,9 @@ var (
	// will send to a follower without hearing a response.
	defaultRaftMaxInflightMsgs = envutil.EnvOrDefaultInt(
		"COCKROACH_RAFT_MAX_INFLIGHT_MSGS", 64)

	defaultRaftPostSplitSuppressSnapshotTicks = envutil.EnvOrDefaultInt(
		"COCKROACH_RAFT_POST_SPLIT_SUPPRESS_SNAPSHOT_TICKS", 20)
)

type lazyHTTPClient struct {
@@ -476,6 +479,8 @@ type RaftConfig struct {
	// translates to ~1024 commands that might be executed in the handling of a
	// single raft.Ready operation.
	RaftMaxInflightMsgs int

	RaftPostSplitSuppressSnapshotTicks int
}

// SetDefaults initializes unset fields.
@@ -510,6 +515,10 @@ func (cfg *RaftConfig) SetDefaults() {
	if cfg.RaftMaxInflightMsgs == 0 {
		cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
	}

	if cfg.RaftPostSplitSuppressSnapshotTicks == 0 {
		cfg.RaftPostSplitSuppressSnapshotTicks = defaultRaftPostSplitSuppressSnapshotTicks
	}
}

// RaftElectionTimeout returns the raft election timeout, as computed from the
69 changes: 65 additions & 4 deletions pkg/storage/client_split_test.go
@@ -27,10 +27,6 @@ import (
	"testing"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/pkg/errors"
	"go.etcd.io/etcd/raft/raftpb"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/config"
	"github.com/cockroachdb/cockroach/pkg/gossip"
@@ -51,6 +47,8 @@ import (
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/cockroach/pkg/ts"
	"github.com/cockroachdb/cockroach/pkg/ts/tspb"
	"github.com/cockroachdb/cockroach/pkg/util"
	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
	"github.com/cockroachdb/cockroach/pkg/util/encoding"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -60,6 +58,9 @@ import (
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/gogo/protobuf/proto"
	"github.com/pkg/errors"
	"go.etcd.io/etcd/raft/raftpb"
)

// adminSplitArgs creates an AdminSplitRequest for the provided split key.
@@ -504,6 +505,66 @@ func TestStoreRangeSplitConcurrent(t *testing.T) {
	}
}

// TestSplitTriggerRaftSnapshotRace verifies that when an uninitialized Replica
// resulting from a split hasn't been initialized via the split trigger yet, a
// grace period prevents the replica from requesting an errant Raft snapshot.
// This is verified by running a number of splits and asserting that no Raft
// snapshots are observed. As a nice side effect, this also verifies that log
// truncations don't cause any Raft snapshots in this test.
func TestSplitTriggerRaftSnapshotRace(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	const numNodes = 3
	var args base.TestClusterArgs
	args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{DisableMergeQueue: true}
	tc := testcluster.StartTestCluster(t, numNodes, args)
	defer tc.Stopper().Stop(ctx)

	numSplits := 100
	if util.RaceEnabled {
		// Running 100 splits is overkill in race builds.
		numSplits = 10
	}
	perm := rand.Perm(numSplits)
	idx := int32(-1) // accessed atomically

	checkNoSnaps := func(when string) {
		for i := 0; i < numNodes; i++ {
			var n int // num rows (sanity check against test rotting)
			var c int // num Raft snapshots
			if err := tc.ServerConn(i).QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
name LIKE 'queue.raftsnapshot.process.%'
OR
name LIKE 'queue.raftsnapshot.pending'
`).Scan(&n, &c); err != nil {
				t.Fatal(err)
			}
			if expRows := 3; n != expRows {
				t.Fatalf("%s: expected %d rows, got %d", when, expRows, n)
			}
			if c > 0 {
				t.Fatalf("observed %d Raft snapshots %s splits", c, when)
			}
		}
	}

	checkNoSnaps("before")

	doSplit := func(ctx context.Context) error {
		_, _, err := tc.SplitRange(
			[]byte(fmt.Sprintf("key-%d", perm[atomic.AddInt32(&idx, 1)])))
		return err
	}

	if err := ctxgroup.GroupWorkers(ctx, numSplits, doSplit); err != nil {
		t.Fatal(err)
	}

	checkNoSnaps("after")
}

// TestStoreRangeSplitIdempotency executes a split of a range and
// verifies that the resulting ranges respond to the right key ranges
// and that their stats have been properly accounted for and requests
61 changes: 61 additions & 0 deletions pkg/storage/replica.go
@@ -5131,6 +5131,10 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag
				lastAppResp = message
				drop = true
			}

			if r.maybeDropMsgAppResp(ctx, message) {
				drop = true
			}
		}
		if !drop {
			r.sendRaftMessage(ctx, message)
@@ -5141,6 +5145,63 @@ func (r *Replica) sendRaftMessages(ctx context.Context, messages []raftpb.Messag
	}
}

// maybeDropMsgAppResp returns true if the outgoing Raft message should be
// dropped. It does so if sending the message would likely result in an errant
// Raft snapshot after a split.
func (r *Replica) maybeDropMsgAppResp(ctx context.Context, msg raftpb.Message) bool {
	if !msg.Reject {
		return false
	}

	r.mu.RLock()
	ticks := r.mu.ticks
	initialized := r.isInitializedRLocked()
	r.mu.RUnlock()

	if initialized {
		return false
	}

	if ticks > r.store.cfg.RaftPostSplitSuppressSnapshotTicks {
		log.Infof(ctx, "allowing MsgAppResp for uninitialized replica")
		return false
	}

	if msg.RejectHint != 0 {
		log.Fatalf(ctx, "received reject hint %d from supposedly uninitialized replica", msg.RejectHint)
	}

	// This replica has a blank state, i.e. its last index is zero (an
	// initialized replica would have a last index of at least 10, which is
	// where we start our Raft log). In particular, it's not a preemptive
	// snapshot. This happens in two cases:
	//
	// 1. a rebalance operation is adding a new replica of the range to this
	// node. We always send a preemptive snapshot before attempting to do so, so
	// we wouldn't enter this branch as the replica would be initialized. We
	// would however enter this branch if the preemptive snapshot got GC'ed
	// before the actual replica change came through.
	//
	// 2. a split executed that created this replica as its right hand side, but
	// this node's pre-split replica hasn't executed the split trigger (yet).
	// The expectation is that it will do so momentarily, however if we don't
	// drop this rejection, the Raft leader will try to catch us up via a
	// snapshot. In 99.9% of cases this is a wasted effort since the pre-split
	// replica already contains the data this replica will hold. The remaining
	// 0.1% constitute the case in which our local replica of the pre-split
	// range requires a snapshot which catches it up "past" the split trigger,
	// in which case the trigger will never be executed (the snapshot instead
	// wipes out the data the split trigger would've tried to put into this
	// range). A similar scenario occurs if there's a rebalance operation that
	// rapidly removes the pre-split replica, so that it never catches up
	// (neither via log nor via snapshot); in that case too, the Raft snapshot
	// is required to materialize the split's right hand side replica (i.e.
	// this one). We're delaying the snapshot for a short amount of time only,
	// so this seems tolerable.
	log.VEventf(ctx, 2, "dropping rejection from index %d to index %d", msg.Index, msg.RejectHint)

	return true
}

// sendRaftMessage sends a Raft message.
func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) {
	r.mu.Lock()