Skip to content

Commit

Permalink
kv: rationalize load-based range merging
Browse files Browse the repository at this point in the history
Closes cockroachdb#62700.
Re-addresses cockroachdb#41317.

This commit reworks how queries-per-second measurements are used when
determining whether to merge two ranges together. At a high-level, the change
moves from a scheme where the QPS over the last second on the LHS and RHS ranges
are combined and compared against a threshold (half the load-based split
threshold) to a scheme where the maximum QPS measured over the past 5 minutes
(configurable) on the LHS and RHS ranges are combined and compared against said
threshold.

The commit makes this change to avoid thrashing and to avoid overreacting to
temporary fluctuations in load. These overreactions lead to general instability
in clusters, as we saw in cockroachdb#41317. Worse, the overreactions compound and can lead
to cluster-wide meltdowns where a transient slowdown can trigger a wave of range
merges, which can slow the cluster down further, which can lead to more merges,
etc. This is what we saw in cockroachdb#62700. This behavior is bad on small clusters and
it is even worse on large ones, where range merges don't just interrupt traffic,
but also result in a centralization of load in a previously well-distributed
dataset, undoing all of the hard work of load-based splitting and rebalancing
and creating serious hotspots.

The commit improves this situation by introducing a form of memory into the
load-based split `Decider`. This is the object which was previously only
responsible for measuring queries-per-second on a range and triggering the
process of finding a load-based split point. The object is now given an
additional role of taking the second-long QPS samples that it measures and
aggregating them together to track the maximum historical QPS over a
configurable retention period. This maximum QPS measurement can be used to
prevent load-based splits from being merged away until the resulting ranges have
consistently remained below a certain QPS threshold for a sufficiently long
period of time.

The `mergeQueue` is taught how to use this new source of information. It is also
taught that it should be conservative about imprecision in this QPS tracking,
opting to skip a merge rather than perform one when the maximum QPS measurement
has not been tracked for long enough. This means that range merges will
typically no longer fire within 5 minutes of a lease transfer. This seems fine,
as there are almost never situations where a range merge is desperately needed
and we should risk making a bad decision in order to perform one.

I've measured this change on the `clearrange` roachtest that we made heavy use
of in cockroachdb#62700. As expected, it has the same effect as bumping up the
`kv.range_split.by_load_merge_delay` high enough such that ranges never merge on
the active table. Here's a screenshot of a recent run. We still see a period of
increased tail latency and reduced throughput, which has a strong correlation
with Pebble compactions. However, we no longer see the subsequent cluster outage
that used to follow, where ranges on the active table would begin to merge and
throughput would fall to 0 and struggle to recover, bottoming out repeatedly.

<todo insert images>

Release note (performance improvement): Range merges are no longer considered if
a range has seen significant load over the previous 5 minutes, instead of being
considered as long as a range has low load over the last second. This improves
stability, as load-based splits will no longer rapidly disappear during transient
throughput dips.
  • Loading branch information
nvanbenschoten committed Jul 16, 2021
1 parent 62118b1 commit b461d2d
Show file tree
Hide file tree
Showing 18 changed files with 1,366 additions and 758 deletions.
9 changes: 7 additions & 2 deletions pkg/cmd/roachtest/clearrange.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func runClearRange(ctx context.Context, t *test, c *cluster, aggressiveChecks bo

if t.buildVersion.AtLeast(version.MustParse("v19.2.0")) {
conn := c.Conn(ctx, 1)
if _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.bulk_io_write.concurrent_addsstable_requests = $1`, c.spec.NodeCount); err != nil {
if _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.bulk_io_write.concurrent_addsstable_requests = 8`); err != nil {
t.Fatal(err)
}
conn.Close()
Expand Down Expand Up @@ -114,6 +114,11 @@ func runClearRange(ctx context.Context, t *test, c *cluster, aggressiveChecks bo
}()

m := newMonitor(ctx, c)
m.Go(func(ctx context.Context) error {
c.Run(ctx, c.Node(1), `./cockroach workload init kv`)
c.Run(ctx, c.All(), `./cockroach workload run kv --concurrency=32 --duration=1h`)
return nil
})
m.Go(func(ctx context.Context) error {
conn := c.Conn(ctx, 1)
defer conn.Close()
Expand All @@ -132,7 +137,7 @@ func runClearRange(ctx context.Context, t *test, c *cluster, aggressiveChecks bo

// Set a low TTL so that the ClearRange-based cleanup mechanism can kick in earlier.
// This could also be done after dropping the table.
if _, err := conn.ExecContext(ctx, `ALTER TABLE bigbank.bank CONFIGURE ZONE USING gc.ttlseconds = 30`); err != nil {
if _, err := conn.ExecContext(ctx, `ALTER TABLE bigbank.bank CONFIGURE ZONE USING gc.ttlseconds = 1200`); err != nil {
return err
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ go_test(
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/spanset",
"//pkg/kv/kvserver/split",
"//pkg/kv/kvserver/stateloader",
"//pkg/kv/kvserver/tenantrate",
"//pkg/kv/kvserver/tscache",
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_range_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,14 @@ func RangeStats(
) (result.Result, error) {
reply := resp.(*roachpb.RangeStatsResponse)
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.QueriesPerSecond = cArgs.EvalCtx.GetSplitQPS()
reply.DeprecatedLastQueriesPerSecond = cArgs.EvalCtx.GetLastSplitQPS()
if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(); ok {
reply.MaxQueriesPerSecond = qps
} else {
// See comment on MaxQueriesPerSecond. -1 means !ok.
reply.MaxQueriesPerSecond = -1
}
reply.MaxQueriesPerSecondSet = true
reply.RangeInfo = cArgs.EvalCtx.GetRangeInfo(ctx)
return result.Result{}, nil
}
22 changes: 17 additions & 5 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,20 @@ type EvalContext interface {
// results due to concurrent writes.
GetMVCCStats() enginepb.MVCCStats

// GetSplitQPS returns the queries/s request rate for this range.
// GetMaxSplitQPS returns the Replicas maximum queries/s request rate over a
// configured retention period.
//
// NOTE: This should not be used when the load based splitting cluster
// setting is disabled.
GetSplitQPS() float64
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
GetMaxSplitQPS() (float64, bool)

// GetLastSplitQPS returns the Replica's most recent queries/s request rate.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
//
// TODO(nvanbenschoten): remove this method in v22.1.
GetLastSplitQPS() float64

GetGCThreshold() hlc.Timestamp
GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error)
Expand Down Expand Up @@ -215,7 +224,10 @@ func (m *mockEvalCtxImpl) ContainsKey(key roachpb.Key) bool {
func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats {
return m.Stats
}
func (m *mockEvalCtxImpl) GetSplitQPS() float64 {
func (m *mockEvalCtxImpl) GetMaxSplitQPS() (float64, bool) {
return m.QPS, true
}
func (m *mockEvalCtxImpl) GetLastSplitQPS() float64 {
return m.QPS
}
func (m *mockEvalCtxImpl) CanCreateTxnRecord(
Expand Down
110 changes: 107 additions & 3 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4171,6 +4171,10 @@ func TestMergeQueue(t *testing.T) {
}
}
setZones(t, zoneConfig)
// Disable load-based splitting, so that the absence of sufficient QPS
// measurements do not prevent ranges from merging. Certain subtests
// re-enable the functionality.
kvserver.SplitByLoadEnabled.Override(sv, false)
store.MustForceMergeScanAndProcess() // drain any merges that might already be queued
split(t, rhsStartKey.AsRawKey(), hlc.Timestamp{} /* expirationTime */)
}
Expand Down Expand Up @@ -4201,7 +4205,7 @@ func TestMergeQueue(t *testing.T) {
verifyMerged(t, store, lhsStartKey, rhsStartKey)
})

t.Run("combined-threshold", func(t *testing.T) {
t.Run("combined-size-threshold", func(t *testing.T) {
reset(t)

// The ranges are individually beneath the minimum size threshold, but
Expand Down Expand Up @@ -4238,8 +4242,88 @@ func TestMergeQueue(t *testing.T) {
verifyMerged(t, store, lhsStartKey, rhsStartKey)
})

// TODO(jeffreyxiao): Add subtest to consider load when making merging
// decisions.
t.Run("load-based-merging", func(t *testing.T) {
const splitByLoadQPS = 10
const mergeByLoadQPS = splitByLoadQPS / 2 // see conservativeLoadBasedSplitThreshold
const splitByLoadMergeDelay = 500 * time.Millisecond

resetForLoadBasedSubtest := func(t *testing.T) {
reset(t)

// Enable load-based splitting for these subtests, which also instructs
// the mergeQueue to consider load when making range merge decisions. When
// load is a consideration, the mergeQueue is fairly conservative. In an
// effort to avoid thrashing and to avoid overreacting to temporary
// fluctuations in load, the mergeQueue will only consider a merge when
// the combined load across the RHS and LHS ranges is below half the
// threshold required to split a range due to load. Furthermore, to ensure
// that transient drops in load do not trigger range merges, the
// mergeQueue will only consider a merge when it deems the maximum qps
// measurement from both sides to be sufficiently stable and reliable,
// meaning that it was a maximum measurement over some extended period of
// time.
kvserver.SplitByLoadEnabled.Override(sv, true)
kvserver.SplitByLoadQPSThreshold.Override(sv, splitByLoadQPS)

// Drop the load-based splitting merge delay setting, which also dictates
// the duration that a leaseholder must measure QPS before considering its
// measurements to be reliable enough to base range merging decisions on.
kvserver.SplitByLoadMergeDelay.Override(sv, splitByLoadMergeDelay)

// Reset both range's load-based splitters, so that QPS measurements do
// not leak over between subtests. Then, bump the manual clock so that
// both range's load-based splitters consider their measurements to be
// reliable.
lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime())
rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())
manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
}

t.Run("unreliable-lhs-qps", func(t *testing.T) {
resetForLoadBasedSubtest(t)

lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime())

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
})

t.Run("unreliable-rhs-qps", func(t *testing.T) {
resetForLoadBasedSubtest(t)

rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
})

t.Run("combined-qps-above-threshold", func(t *testing.T) {
resetForLoadBasedSubtest(t)

moreThanHalfQPS := mergeByLoadQPS/2 + 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(moreThanHalfQPS))
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(moreThanHalfQPS))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
})

t.Run("combined-qps-below-threshold", func(t *testing.T) {
resetForLoadBasedSubtest(t)

manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
lessThanHalfQPS := mergeByLoadQPS/2 - 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(lessThanHalfQPS))
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(lessThanHalfQPS))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, lhsStartKey, rhsStartKey)
})
})

t.Run("sticky-bit", func(t *testing.T) {
reset(t)
Expand Down Expand Up @@ -4413,6 +4497,15 @@ func TestMergeQueueSeesNonVoters(t *testing.T) {
var clusterArgs = base.TestClusterArgs{
// We dont want the replicate queue mucking with our test, so disable it.
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Disable load-based splitting, so that the absence of sufficient QPS
// measurements do not prevent ranges from merging.
DisableLoadBasedSplitting: true,
},
},
},
}
ctx := context.Background()

Expand Down Expand Up @@ -4500,6 +4593,15 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) {
var clusterArgs = base.TestClusterArgs{
// We dont want the replicate queue mucking with our test, so disable it.
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Disable load-based splitting, so that the absence of sufficient QPS
// measurements do not prevent ranges from merging.
DisableLoadBasedSplitting: true,
},
},
},
ServerArgsPerNode: map[int]base.TestServerArgs{
1: {
Knobs: base.TestingKnobs{
Expand All @@ -4512,6 +4614,8 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) {
}
return nil
},
// See above.
DisableLoadBasedSplitting: true,
},
},
},
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -416,6 +417,12 @@ func (r *Replica) LargestPreviousMaxRangeSizeBytes() int64 {
return r.mu.largestPreviousMaxRangeSizeBytes
}

// LoadBasedSplitter returns the replica's split.Decider, which is used to
// assist load-based split (and merge) decisions.
func (r *Replica) LoadBasedSplitter() *split.Decider {
return &r.loadBasedSplitter
}

func MakeSSTable(key, value string, ts hlc.Timestamp) ([]byte, storage.MVCCKeyValue) {
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
Expand Down
47 changes: 41 additions & 6 deletions pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ var _ purgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
) (*roachpb.RangeDescriptor, enginepb.MVCCStats, float64, error) {
) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, qps float64, qpsOK bool, err error) {

var ba roachpb.BatchRequest
ba.Add(&roachpb.RangeStatsRequest{
Expand All @@ -182,10 +182,20 @@ func (mq *mergeQueue) requestRangeStats(

br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba)
if pErr != nil {
return nil, enginepb.MVCCStats{}, 0, pErr.GoError()
return nil, enginepb.MVCCStats{}, 0, false, pErr.GoError()
}
res := br.Responses[0].GetInner().(*roachpb.RangeStatsResponse)
return &res.RangeInfo.Desc, res.MVCCStats, res.QueriesPerSecond, nil

desc = &res.RangeInfo.Desc
stats = res.MVCCStats
if res.MaxQueriesPerSecondSet {
qps = res.MaxQueriesPerSecond
qpsOK = qps >= 0
} else {
qps = res.DeprecatedLastQueriesPerSecond
qpsOK = true
}
return desc, stats, qps, qpsOK, nil
}

func (mq *mergeQueue) process(
Expand All @@ -196,17 +206,17 @@ func (mq *mergeQueue) process(
return false, nil
}

lhsDesc := lhsRepl.Desc()
lhsStats := lhsRepl.GetMVCCStats()
lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS()
minBytes := lhsRepl.GetMinBytes()
if lhsStats.Total() >= minBytes {
log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes",
minBytes, lhsStats.Total())
return false, nil
}

lhsDesc := lhsRepl.Desc()
lhsQPS := lhsRepl.GetSplitQPS()
rhsDesc, rhsStats, rhsQPS, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
rhsDesc, rhsStats, rhsQPS, rhsQPSOK, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
if err != nil {
return false, err
}
Expand Down Expand Up @@ -234,6 +244,24 @@ func (mq *mergeQueue) process(

var mergedQPS float64
if lhsRepl.SplitByLoadEnabled() {
// When load is a consideration for splits and, by extension, merges, the
// mergeQueue is fairly conservative. In an effort to avoid thrashing and to
// avoid overreacting to temporary fluctuations in load, the mergeQueue will
// only consider a merge when the combined load across the RHS and LHS
// ranges is below half the threshold required to split a range due to load.
// Furthermore, to ensure that transient drops in load do not trigger range
// merges, the mergeQueue will only consider a merge when it deems the
// maximum qps measurement from both sides to be sufficiently stable and
// reliable, meaning that it was a maximum measurement over some extended
// period of time.
if !lhsQPSOK {
log.VEventf(ctx, 2, "skipping merge: LHS QPS measurement not yet reliable")
return false, nil
}
if !rhsQPSOK {
log.VEventf(ctx, 2, "skipping merge: RHS QPS measurement not yet reliable")
return false, nil
}
mergedQPS = lhsQPS + rhsQPS
}

Expand Down Expand Up @@ -363,6 +391,13 @@ func (mq *mergeQueue) process(
log.Warningf(ctx, "%v", err)
}
}

// Adjust the splitter to account for the additional load from the RHS. We
// could just Reset the splitter, but then we'd need to wait out a full
// measurement period (default of 5m) before merging this range again.
if mergedQPS != 0 {
lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedQPS)
}
return true, nil
}

Expand Down
19 changes: 15 additions & 4 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -1035,14 +1034,26 @@ func (r *Replica) GetMVCCStats() enginepb.MVCCStats {
return *r.mu.state.Stats
}

// GetSplitQPS returns the Replica's queries/s request rate.
// GetMaxSplitQPS returns the Replica's maximum queries/s request rate over a
// configured measurement period. If the Replica has not been recording QPS for
// at least an entire measurement period, the method will return false.
//
// NOTE: This should only be used for load based splitting, only
// works when the load based splitting cluster setting is enabled.
//
// Use QueriesPerSecond() for current QPS stats for all other purposes.
func (r *Replica) GetSplitQPS() float64 {
return r.loadBasedSplitter.LastQPS(timeutil.Now())
func (r *Replica) GetMaxSplitQPS() (float64, bool) {
return r.loadBasedSplitter.MaxQPS(r.Clock().PhysicalTime())
}

// GetLastSplitQPS returns the Replica's most recent queries/s request rate.
//
// NOTE: This should only be used for load based splitting, only
// works when the load based splitting cluster setting is enabled.
//
// Use QueriesPerSecond() for current QPS stats for all other purposes.
func (r *Replica) GetLastSplitQPS() float64 {
return r.loadBasedSplitter.LastQPS(r.Clock().PhysicalTime())
}

// ContainsKey returns whether this range contains the specified key.
Expand Down
Loading

0 comments on commit b461d2d

Please sign in to comment.