Skip to content

Commit

Permalink
Merge pull request #33015 from tbg/backport2.1-32594
Browse files Browse the repository at this point in the history
backport-2.1: storage: delay manual splits that would result in more snapshots
  • Loading branch information
tbg authored Dec 11, 2018
2 parents f77e1fc + a830397 commit 138369e
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 5 deletions.
15 changes: 15 additions & 0 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,14 @@ type RaftConfig struct {
// translates to ~1024 commands that might be executed in the handling of a
// single raft.Ready operation.
RaftMaxInflightMsgs int
// Splitting a range which has a replica needing a snapshot results in two
// ranges in that state. The delay configured here slows down splits when in
// that situation (limiting to those splits not run through the split
// queue). The most important target here are the splits performed by
// backup/restore.
//
// -1 to disable.
RaftDelaySplitToSuppressSnapshotTicks int
}

// SetDefaults initializes unset fields.
Expand Down Expand Up @@ -510,6 +518,13 @@ func (cfg *RaftConfig) SetDefaults() {
if cfg.RaftMaxInflightMsgs == 0 {
cfg.RaftMaxInflightMsgs = defaultRaftMaxInflightMsgs
}

if cfg.RaftDelaySplitToSuppressSnapshotTicks == 0 {
// A total of 100 ticks is >18s which experimentally has been shown to
// allow the small pile (<100) of Raft snapshots observed at the
// beginning of an import/restore to be resolved.
cfg.RaftDelaySplitToSuppressSnapshotTicks = 100
}
}

// RaftElectionTimeout returns the raft election timeout, as computed from the
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,9 @@ func runSetupSplitSnapshotRace(
sc.TestingKnobs.DisableAsyncIntentResolution = true
// Avoid fighting with the merge queue while trying to reproduce this race.
sc.TestingKnobs.DisableMergeQueue = true
// Disable the split delay mechanism, or it'll spend 10s going in circles.
// (We can't set it to zero as otherwise the default overrides us).
sc.RaftDelaySplitToSuppressSnapshotTicks = -1
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 6)
Expand Down
13 changes: 10 additions & 3 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (r *Replica) AdminSplit(
return roachpb.AdminSplitResponse{}, pErr
}

reply, lastErr = r.adminSplitWithDescriptor(ctx, args, r.Desc())
reply, lastErr = r.adminSplitWithDescriptor(ctx, args, r.Desc(), true /* delayable */)
// On seeing a ConditionFailedError or an AmbiguousResultError, retry
// the command with the updated descriptor.
if retry := causer.Visit(lastErr, func(err error) bool {
Expand Down Expand Up @@ -257,7 +257,10 @@ func splitSnapshotWarningStr(rangeID roachpb.RangeID, status *raft.Status) strin
//
// See the comment on splitTrigger for details on the complexities.
func (r *Replica) adminSplitWithDescriptor(
ctx context.Context, args roachpb.AdminSplitRequest, desc *roachpb.RangeDescriptor,
ctx context.Context,
args roachpb.AdminSplitRequest,
desc *roachpb.RangeDescriptor,
delayable bool,
) (roachpb.AdminSplitResponse, error) {
var reply roachpb.AdminSplitResponse

Expand Down Expand Up @@ -336,7 +339,11 @@ func (r *Replica) adminSplitWithDescriptor(
}
leftDesc.EndKey = splitKey

extra := splitSnapshotWarningStr(r.RangeID, r.RaftStatus())
var extra string
if delayable {
extra += maybeDelaySplitToAvoidSnapshot(ctx, (*splitDelayHelper)(r))
}
extra += splitSnapshotWarningStr(r.RangeID, r.RaftStatus())

log.Infof(ctx, "initiating a split of this range at key %s [r%d]%s",
splitKey, rightDesc.RangeID, extra)
Expand Down
157 changes: 157 additions & 0 deletions pkg/storage/split_delay_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package storage

import (
"context"
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"go.etcd.io/etcd/raft"
)

type splitDelayHelperI interface {
RaftStatus(context.Context) (roachpb.RangeID, *raft.Status)
ProposeEmptyCommand(ctx context.Context)
NumAttempts() int
Sleep(context.Context) time.Duration
}

type splitDelayHelper Replica

func (sdh *splitDelayHelper) RaftStatus(ctx context.Context) (roachpb.RangeID, *raft.Status) {
r := (*Replica)(sdh)
r.mu.RLock()
raftStatus := r.raftStatusRLocked()
if raftStatus != nil {
updateRaftProgressFromActivity(
ctx, raftStatus.Progress, r.descRLocked().Replicas, r.mu.lastUpdateTimes, timeutil.Now(),
)
}
r.mu.RUnlock()
return r.RangeID, raftStatus
}

func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
r := (*Replica)(sdh)
r.raftMu.Lock()
_ = r.withRaftGroup(true /* campaignOnWake */, func(rawNode *raft.RawNode) (bool, error) {
// NB: intentionally ignore the error (which can be ErrProposalDropped
// when there's an SST inflight).
_ = rawNode.Propose(encodeRaftCommandV1(makeIDKey(), nil))
// NB: we need to unquiesce as the group might be quiesced.
return true /* unquiesceAndWakeLeader */, nil
})
r.raftMu.Unlock()
}

func (sdh *splitDelayHelper) NumAttempts() int {
return (*Replica)(sdh).store.cfg.RaftDelaySplitToSuppressSnapshotTicks
}

func (sdh *splitDelayHelper) Sleep(ctx context.Context) time.Duration {
tBegin := timeutil.Now()

r := (*Replica)(sdh)
select {
case <-time.After(r.store.cfg.RaftTickInterval):
case <-ctx.Done():
}

return timeutil.Since(tBegin)
}

func maybeDelaySplitToAvoidSnapshot(ctx context.Context, sdh splitDelayHelperI) string {
// We have an "optimization" to avoid Raft snapshots by dropping some
// outgoing MsgAppResp (see the _ assignment below) which takes effect for
// RaftPostSplitSuppressSnapshotTicks ticks after an uninitialized replica
// is created. This check can err, in which case the snapshot will be
// delayed for that many ticks, and so we want to delay by at least as much
// plus a bit of padding to give a snapshot a chance to catch the follower
// up. If we run out of time, we'll resume the split no matter what.
// _ = (*Replica)(nil).maybeDropMsgAppResp // guru assignment (not backported)
maxDelaySplitToAvoidSnapshotTicks := sdh.NumAttempts()

var slept time.Duration
var extra string
var succeeded bool
for ticks := 0; ticks < maxDelaySplitToAvoidSnapshotTicks; ticks++ {
succeeded = false
extra = ""
rangeID, raftStatus := sdh.RaftStatus(ctx)

if raftStatus == nil {
// Don't delay on followers (we don't know when to stop). This case
// is hit rarely enough to not matter.
extra += "; not Raft leader"
succeeded = true
break
}

done := true
for replicaID, pr := range raftStatus.Progress {
if replicaID == raftStatus.Lead {
// TODO(tschottdorf): remove this once we have picked up
// https://github.com/etcd-io/etcd/pull/10279
continue
}

if pr.State != raft.ProgressStateReplicate {
if !pr.RecentActive {
if ticks == 0 {
// Having set done = false, we make sure we're not exiting early.
// This is important because we sometimes need that Raft proposal
// below to make the followers active as there's no chatter on an
// idle range. (Note that there's a theoretical race in which the
// follower becomes inactive again during the sleep, but the
// inactivity interval is much larger than a tick).
//
// Don't do this more than once though: if a follower is down,
// we don't want to delay splits for it.
done = false
}
extra += fmt.Sprintf("; r%d/%d inactive", rangeID, replicaID)
continue
}
done = false
extra += fmt.Sprintf("; replica r%d/%d not caught up: %+v", rangeID, replicaID, &pr)
}
}
if done {
succeeded = true
break
}
// Propose an empty command which works around a Raft bug that can
// leave a follower in ProgressStateProbe even though it has caught
// up.
sdh.ProposeEmptyCommand(ctx)
slept += sdh.Sleep(ctx)

if ctx.Err() != nil {
return ""
}
}

if slept != 0 {
extra += fmt.Sprintf("; delayed split for %.1fs to avoid Raft snapshot", slept.Seconds())
if !succeeded {
extra += " (without success)"
}
}

return extra
}
166 changes: 166 additions & 0 deletions pkg/storage/split_delay_helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package storage

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"go.etcd.io/etcd/raft"

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

type testSplitDelayHelper struct {
numAttempts int

rangeID roachpb.RangeID
raftStatus *raft.Status
sleep func()

slept, emptyProposed int
}

func (h *testSplitDelayHelper) RaftStatus(context.Context) (roachpb.RangeID, *raft.Status) {
return h.rangeID, h.raftStatus
}
func (h *testSplitDelayHelper) ProposeEmptyCommand(ctx context.Context) {
h.emptyProposed++
}
func (h *testSplitDelayHelper) NumAttempts() int {
return h.numAttempts
}
func (h *testSplitDelayHelper) Sleep(context.Context) time.Duration {
if h.sleep != nil {
h.sleep()
}
h.slept++
return time.Second
}

var _ splitDelayHelperI = (*testSplitDelayHelper)(nil)

func TestSplitDelayToAvoidSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()

t.Run("disabled", func(t *testing.T) {
// Should immediately bail out if told to run zero attempts.
h := &testSplitDelayHelper{
numAttempts: 0,
rangeID: 1,
raftStatus: nil,
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "", s)
assert.Equal(t, 0, h.slept)
})

t.Run("follower", func(t *testing.T) {
// Should immediately bail out if run on non-leader.
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: nil,
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "; not Raft leader", s)
assert.Equal(t, 0, h.slept)
})

t.Run("inactive", func(t *testing.T) {
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: &raft.Status{
Progress: map[uint64]raft.Progress{
2: {State: raft.ProgressStateProbe},
},
},
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
// We try to wake up the follower once, but then give up on it.
assert.Equal(t, "; r1/2 inactive; delayed split for 1.0s to avoid Raft snapshot", s)
assert.Equal(t, 1, h.slept)
assert.Equal(t, 1, h.emptyProposed)
})

for _, state := range []raft.ProgressStateType{raft.ProgressStateProbe, raft.ProgressStateSnapshot} {
t.Run(state.String(), func(t *testing.T) {
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: &raft.Status{
Progress: map[uint64]raft.Progress{
2: {State: state, RecentActive: true, Paused: true /* unifies string output below */},
// Healthy follower just for kicks.
3: {State: raft.ProgressStateReplicate},
},
},
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "; replica r1/2 not caught up: next = 0, match = 0, state = "+
state.String()+
", waiting = true, pendingSnapshot = 0; delayed split for 5.0s to avoid Raft snapshot (without success)", s)
assert.Equal(t, 5, h.slept)
assert.Equal(t, 5, h.emptyProposed)
})
}

t.Run("immediately-replicating", func(t *testing.T) {
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: &raft.Status{
Progress: map[uint64]raft.Progress{
2: {State: raft.ProgressStateReplicate}, // intentionally not recently active
},
},
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "", s)
assert.Equal(t, 0, h.slept)
assert.Equal(t, 0, h.emptyProposed)
})

t.Run("becomes-replicating", func(t *testing.T) {
h := &testSplitDelayHelper{
numAttempts: 5,
rangeID: 1,
raftStatus: &raft.Status{
Progress: map[uint64]raft.Progress{
2: {State: raft.ProgressStateProbe, RecentActive: true},
},
},
}
// The fourth attempt will see the follower catch up.
h.sleep = func() {
if h.slept == 2 {
pr := h.raftStatus.Progress[2]
pr.State = raft.ProgressStateReplicate
h.raftStatus.Progress[2] = pr
}
}
s := maybeDelaySplitToAvoidSnapshot(ctx, h)
assert.Equal(t, "; delayed split for 3.0s to avoid Raft snapshot", s)
assert.Equal(t, 3, h.slept)
assert.Equal(t, 3, h.emptyProposed)
})
}
Loading

0 comments on commit 138369e

Please sign in to comment.