release-21.1: kv: rationalize load-based range merging #65362

Merged
9 changes: 7 additions & 2 deletions pkg/cmd/roachtest/clearrange.go
@@ -73,7 +73,7 @@ func runClearRange(ctx context.Context, t *test, c *cluster, aggressiveChecks bo

if t.buildVersion.AtLeast(version.MustParse("v19.2.0")) {
conn := c.Conn(ctx, 1)
if _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.bulk_io_write.concurrent_addsstable_requests = $1`, c.spec.NodeCount); err != nil {
if _, err := conn.ExecContext(ctx, `SET CLUSTER SETTING kv.bulk_io_write.concurrent_addsstable_requests = 8`); err != nil {
t.Fatal(err)
}
conn.Close()
@@ -114,6 +114,11 @@ func runClearRange(ctx context.Context, t *test, c *cluster, aggressiveChecks bo
}()

m := newMonitor(ctx, c)
m.Go(func(ctx context.Context) error {
c.Run(ctx, c.Node(1), `./cockroach workload init kv`)
c.Run(ctx, c.All(), `./cockroach workload run kv --concurrency=32 --duration=1h`)
return nil
})
m.Go(func(ctx context.Context) error {
conn := c.Conn(ctx, 1)
defer conn.Close()
@@ -132,7 +137,7 @@ func runClearRange(ctx context.Context, t *test, c *cluster, aggressiveChecks bo

// Set a low TTL so that the ClearRange-based cleanup mechanism can kick in earlier.
// This could also be done after dropping the table.
if _, err := conn.ExecContext(ctx, `ALTER TABLE bigbank.bank CONFIGURE ZONE USING gc.ttlseconds = 30`); err != nil {
if _, err := conn.ExecContext(ctx, `ALTER TABLE bigbank.bank CONFIGURE ZONE USING gc.ttlseconds = 1200`); err != nil {
return err
}

1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -325,6 +325,7 @@ go_test(
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/spanset",
"//pkg/kv/kvserver/split",
"//pkg/kv/kvserver/stateloader",
"//pkg/kv/kvserver/tenantrate",
"//pkg/kv/kvserver/tscache",
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_range_stats.go
@@ -42,7 +42,14 @@ func RangeStats(
) (result.Result, error) {
reply := resp.(*roachpb.RangeStatsResponse)
reply.MVCCStats = cArgs.EvalCtx.GetMVCCStats()
reply.QueriesPerSecond = cArgs.EvalCtx.GetSplitQPS()
reply.DeprecatedLastQueriesPerSecond = cArgs.EvalCtx.GetLastSplitQPS()
if qps, ok := cArgs.EvalCtx.GetMaxSplitQPS(); ok {
reply.MaxQueriesPerSecond = qps
} else {
// See comment on MaxQueriesPerSecond. -1 means !ok.
reply.MaxQueriesPerSecond = -1
}
reply.MaxQueriesPerSecondSet = true
reply.RangeInfo = cArgs.EvalCtx.GetRangeInfo(ctx)
return result.Result{}, nil
}
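The -1 sentinel and the separate MaxQueriesPerSecondSet flag let callers distinguish "no reliable measurement yet" from a real value, and let responses from nodes running the previous release (which only populate the deprecated field) remain usable. Below is a minimal sketch of how a caller might decode the response; only the field names come from the diff above, and the helper itself is hypothetical (the real consumer is requestRangeStats in merge_queue.go further down).

// decodeRangeStatsQPS interprets the QPS fields of a RangeStatsResponse.
// Sketch for illustration only; this helper does not exist in the codebase.
func decodeRangeStatsQPS(res *roachpb.RangeStatsResponse) (qps float64, ok bool) {
    if res.MaxQueriesPerSecondSet {
        // New-style response: -1 encodes "no reliable max-QPS measurement yet".
        return res.MaxQueriesPerSecond, res.MaxQueriesPerSecond >= 0
    }
    // Response from an older node: only the last measured rate is available,
    // and it is always treated as usable.
    return res.DeprecatedLastQueriesPerSecond, true
}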
22 changes: 17 additions & 5 deletions pkg/kv/kvserver/batcheval/eval_context.go
@@ -86,11 +86,20 @@ type EvalContext interface {
// results due to concurrent writes.
GetMVCCStats() enginepb.MVCCStats

// GetSplitQPS returns the queries/s request rate for this range.
// GetMaxSplitQPS returns the Replica's maximum queries/s request rate over a
// configured retention period.
//
// NOTE: This should not be used when the load based splitting cluster
// setting is disabled.
GetSplitQPS() float64
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
GetMaxSplitQPS() (float64, bool)

// GetLastSplitQPS returns the Replica's most recent queries/s request rate.
//
// NOTE: This should not be used when the load based splitting cluster setting
// is disabled.
//
// TODO(nvanbenschoten): remove this method in v22.1.
GetLastSplitQPS() float64

GetGCThreshold() hlc.Timestamp
GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error)
@@ -215,7 +224,10 @@ func (m *mockEvalCtxImpl) ContainsKey(key roachpb.Key) bool {
func (m *mockEvalCtxImpl) GetMVCCStats() enginepb.MVCCStats {
return m.Stats
}
func (m *mockEvalCtxImpl) GetSplitQPS() float64 {
func (m *mockEvalCtxImpl) GetMaxSplitQPS() (float64, bool) {
return m.QPS, true
}
func (m *mockEvalCtxImpl) GetLastSplitQPS() float64 {
return m.QPS
}
func (m *mockEvalCtxImpl) CanCreateTxnRecord(
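Outside of the mock, a Replica would presumably satisfy these two methods by delegating to its load-based splitter. A hedged sketch follows; only the signatures come from the interface above, while the loadBasedSplitter field is taken from later hunks in this diff and the Clock().PhysicalTime() plumbing and the MaxQPS/LastQPS method names are assumptions.

// Sketch of a production implementation of the new EvalContext methods.
func (r *Replica) GetMaxSplitQPS() (float64, bool) {
    // MaxQPS reports ok=false until the splitter has observed load for a full
    // measurement period, which is what lets callers tell "no load" apart
    // from "not measured for long enough".
    return r.loadBasedSplitter.MaxQPS(r.Clock().PhysicalTime())
}

func (r *Replica) GetLastSplitQPS() float64 {
    return r.loadBasedSplitter.LastQPS(r.Clock().PhysicalTime())
}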
131 changes: 122 additions & 9 deletions pkg/kv/kvserver/client_merge_test.go
@@ -4148,9 +4148,18 @@ func TestMergeQueue(t *testing.T) {

// setZones simulates a zone config update that updates the ranges'
// minimum and maximum sizes.
setZones := func(zone zonepb.ZoneConfig) {
lhs().SetZoneConfig(&zone)
rhs().SetZoneConfig(&zone)
setZones := func(t *testing.T, zone zonepb.ZoneConfig) {
t.Helper()
if l := lhs(); l == nil {
t.Fatal("left-hand side range not found")
} else {
l.SetZoneConfig(&zone)
}
if r := rhs(); r == nil {
t.Fatal("right-hand side range not found")
} else {
r.SetZoneConfig(&zone)
}
}

reset := func(t *testing.T) {
@@ -4161,7 +4170,11 @@ func TestMergeQueue(t *testing.T) {
t.Fatal(err)
}
}
setZones(zoneConfig)
setZones(t, zoneConfig)
// Disable load-based splitting, so that the absence of sufficient QPS
// measurements does not prevent ranges from merging. Certain subtests
// re-enable the functionality.
kvserver.SplitByLoadEnabled.Override(sv, false)
store.MustForceMergeScanAndProcess() // drain any merges that might already be queued
split(t, rhsStartKey.AsRawKey(), hlc.Timestamp{} /* expirationTime */)
}
@@ -4192,21 +4205,21 @@ func TestMergeQueue(t *testing.T) {
verifyMerged(t, store, lhsStartKey, rhsStartKey)
})

t.Run("combined-threshold", func(t *testing.T) {
t.Run("combined-size-threshold", func(t *testing.T) {
reset(t)

// The ranges are individually beneath the minimum size threshold, but
// together they'll exceed the maximum size threshold.
zone := protoutil.Clone(&zoneConfig).(*zonepb.ZoneConfig)
zone.RangeMinBytes = proto.Int64(rhs().GetMVCCStats().Total() + 1)
zone.RangeMaxBytes = proto.Int64(lhs().GetMVCCStats().Total() + rhs().GetMVCCStats().Total() - 1)
setZones(*zone)
setZones(t, *zone)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)

// Once the maximum size threshold is increased, the merge can occur.
zone.RangeMaxBytes = proto.Int64(*zone.RangeMaxBytes + 1)
setZones(*zone)
setZones(t, *zone)
l := lhs().RangeID
r := rhs().RangeID
log.Infof(ctx, "Left=%s, Right=%s", l, r)
@@ -4229,8 +4242,88 @@ func TestMergeQueue(t *testing.T) {
verifyMerged(t, store, lhsStartKey, rhsStartKey)
})

// TODO(jeffreyxiao): Add subtest to consider load when making merging
// decisions.
t.Run("load-based-merging", func(t *testing.T) {
const splitByLoadQPS = 10
const mergeByLoadQPS = splitByLoadQPS / 2 // see conservativeLoadBasedSplitThreshold
const splitByLoadMergeDelay = 500 * time.Millisecond

resetForLoadBasedSubtest := func(t *testing.T) {
reset(t)

// Enable load-based splitting for these subtests, which also instructs
// the mergeQueue to consider load when making range merge decisions. When
// load is a consideration, the mergeQueue is fairly conservative. In an
// effort to avoid thrashing and to avoid overreacting to temporary
// fluctuations in load, the mergeQueue will only consider a merge when
// the combined load across the RHS and LHS ranges is below half the
// threshold required to split a range due to load. Furthermore, to ensure
// that transient drops in load do not trigger range merges, the
// mergeQueue will only consider a merge when it deems the maximum qps
// measurement from both sides to be sufficiently stable and reliable,
// meaning that it was a maximum measurement over some extended period of
// time.
kvserver.SplitByLoadEnabled.Override(sv, true)
kvserver.SplitByLoadQPSThreshold.Override(sv, splitByLoadQPS)

// Drop the load-based splitting merge delay setting, which also dictates
// the duration that a leaseholder must measure QPS before considering its
// measurements to be reliable enough to base range merging decisions on.
kvserver.SplitByLoadMergeDelay.Override(sv, splitByLoadMergeDelay)

// Reset both ranges' load-based splitters, so that QPS measurements do
// not leak over between subtests. Then, bump the manual clock so that
// both ranges' load-based splitters consider their measurements to be
// reliable.
lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime())
rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())
manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
}

t.Run("unreliable-lhs-qps", func(t *testing.T) {
resetForLoadBasedSubtest(t)

lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime())

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
})

t.Run("unreliable-rhs-qps", func(t *testing.T) {
resetForLoadBasedSubtest(t)

rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
})

t.Run("combined-qps-above-threshold", func(t *testing.T) {
resetForLoadBasedSubtest(t)

moreThanHalfQPS := mergeByLoadQPS/2 + 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(moreThanHalfQPS))
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(moreThanHalfQPS))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
})

t.Run("combined-qps-below-threshold", func(t *testing.T) {
resetForLoadBasedSubtest(t)

manualClock.Increment(splitByLoadMergeDelay.Nanoseconds())
lessThanHalfQPS := mergeByLoadQPS/2 - 1
rhs().LoadBasedSplitter().RecordMax(tc.Servers[0].Clock().PhysicalTime(), float64(lessThanHalfQPS))
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(lessThanHalfQPS))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, lhsStartKey, rhsStartKey)
})
})

t.Run("sticky-bit", func(t *testing.T) {
reset(t)
@@ -4404,6 +4497,15 @@ func TestMergeQueueSeesNonVoters(t *testing.T) {
var clusterArgs = base.TestClusterArgs{
// We don't want the replicate queue mucking with our test, so disable it.
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Disable load-based splitting, so that the absence of sufficient QPS
// measurements does not prevent ranges from merging.
DisableLoadBasedSplitting: true,
},
},
},
}
ctx := context.Background()

@@ -4491,6 +4593,15 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) {
var clusterArgs = base.TestClusterArgs{
// We don't want the replicate queue mucking with our test, so disable it.
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Disable load-based splitting, so that the absence of sufficient QPS
// measurements does not prevent ranges from merging.
DisableLoadBasedSplitting: true,
},
},
},
ServerArgsPerNode: map[int]base.TestServerArgs{
1: {
Knobs: base.TestingKnobs{
@@ -4503,6 +4614,8 @@ }
}
return nil
},
// See above.
DisableLoadBasedSplitting: true,
},
},
},
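The constants in the load-based subtests are chosen so that the integer arithmetic lands just on either side of the merge threshold (half the split threshold, per the comment in the test). Spelled out below as a standalone, runnable illustration; this snippet is not part of the test.

package main

import "fmt"

func main() {
    const splitByLoadQPS = 10
    const mergeByLoadQPS = splitByLoadQPS / 2 // 5: merges require combined QPS below this

    moreThanHalfQPS := mergeByLoadQPS/2 + 1 // 3 per side -> combined 6 >= 5, merge skipped
    lessThanHalfQPS := mergeByLoadQPS/2 - 1 // 1 per side -> combined 2 < 5, merge allowed

    fmt.Println(2*moreThanHalfQPS >= mergeByLoadQPS) // true: load blocks the merge
    fmt.Println(2*lessThanHalfQPS < mergeByLoadQPS)  // true: load permits the merge
}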
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -416,6 +417,12 @@ func (r *Replica) LargestPreviousMaxRangeSizeBytes() int64 {
return r.mu.largestPreviousMaxRangeSizeBytes
}

// LoadBasedSplitter returns the replica's split.Decider, which is used to
// assist load-based split (and merge) decisions.
func (r *Replica) LoadBasedSplitter() *split.Decider {
return &r.loadBasedSplitter
}

func MakeSSTable(key, value string, ts hlc.Timestamp) ([]byte, storage.MVCCKeyValue) {
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
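The split.Decider returned by this accessor is the same object the production code consults. The interface-style sketch below reconstructs the subset of its API exercised in this PR purely from the call sites in this diff; treat the exact signatures as assumptions.

package kvserversketch

import "time"

// loadBasedSplitter sketches the split.Decider calls used in this PR.
type loadBasedSplitter interface {
    // Reset drops accumulated QPS state; until a full measurement period
    // elapses afterwards, MaxQPS reports ok=false.
    Reset(now time.Time)
    // RecordMax seeds the max-QPS tracker directly, as the merge queue does
    // after a merge to account for the subsumed RHS's load.
    RecordMax(now time.Time, qps float64)
    // MaxQPS returns the maximum queries/s over the retention period, with
    // ok=false while the measurement is not yet considered reliable.
    MaxQPS(now time.Time) (qps float64, ok bool)
}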
47 changes: 41 additions & 6 deletions pkg/kv/kvserver/merge_queue.go
@@ -173,7 +173,7 @@ var _ purgatoryError = rangeMergePurgatoryError{}

func (mq *mergeQueue) requestRangeStats(
ctx context.Context, key roachpb.Key,
) (*roachpb.RangeDescriptor, enginepb.MVCCStats, float64, error) {
) (desc *roachpb.RangeDescriptor, stats enginepb.MVCCStats, qps float64, qpsOK bool, err error) {

var ba roachpb.BatchRequest
ba.Add(&roachpb.RangeStatsRequest{
@@ -182,10 +182,20 @@ func (mq *mergeQueue) requestRangeStats(

br, pErr := mq.db.NonTransactionalSender().Send(ctx, ba)
if pErr != nil {
return nil, enginepb.MVCCStats{}, 0, pErr.GoError()
return nil, enginepb.MVCCStats{}, 0, false, pErr.GoError()
}
res := br.Responses[0].GetInner().(*roachpb.RangeStatsResponse)
return &res.RangeInfo.Desc, res.MVCCStats, res.QueriesPerSecond, nil

desc = &res.RangeInfo.Desc
stats = res.MVCCStats
if res.MaxQueriesPerSecondSet {
qps = res.MaxQueriesPerSecond
qpsOK = qps >= 0
} else {
qps = res.DeprecatedLastQueriesPerSecond
qpsOK = true
}
return desc, stats, qps, qpsOK, nil
}

func (mq *mergeQueue) process(
@@ -196,17 +206,17 @@ func (mq *mergeQueue) process(
return false, nil
}

lhsDesc := lhsRepl.Desc()
lhsStats := lhsRepl.GetMVCCStats()
lhsQPS, lhsQPSOK := lhsRepl.GetMaxSplitQPS()
minBytes := lhsRepl.GetMinBytes()
if lhsStats.Total() >= minBytes {
log.VEventf(ctx, 2, "skipping merge: LHS meets minimum size threshold %d with %d bytes",
minBytes, lhsStats.Total())
return false, nil
}

lhsDesc := lhsRepl.Desc()
lhsQPS := lhsRepl.GetSplitQPS()
rhsDesc, rhsStats, rhsQPS, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
rhsDesc, rhsStats, rhsQPS, rhsQPSOK, err := mq.requestRangeStats(ctx, lhsDesc.EndKey.AsRawKey())
if err != nil {
return false, err
}
@@ -234,6 +244,24 @@

var mergedQPS float64
if lhsRepl.SplitByLoadEnabled() {
// When load is a consideration for splits and, by extension, merges, the
// mergeQueue is fairly conservative. In an effort to avoid thrashing and to
// avoid overreacting to temporary fluctuations in load, the mergeQueue will
// only consider a merge when the combined load across the RHS and LHS
// ranges is below half the threshold required to split a range due to load.
// Furthermore, to ensure that transient drops in load do not trigger range
// merges, the mergeQueue will only consider a merge when it deems the
// maximum qps measurement from both sides to be sufficiently stable and
// reliable, meaning that it was a maximum measurement over some extended
// period of time.
if !lhsQPSOK {
log.VEventf(ctx, 2, "skipping merge: LHS QPS measurement not yet reliable")
return false, nil
}
if !rhsQPSOK {
log.VEventf(ctx, 2, "skipping merge: RHS QPS measurement not yet reliable")
return false, nil
}
mergedQPS = lhsQPS + rhsQPS
}

@@ -363,6 +391,13 @@ func (mq *mergeQueue) process(
log.Warningf(ctx, "%v", err)
}
}

// Adjust the splitter to account for the additional load from the RHS. We
// could just Reset the splitter, but then we'd need to wait out a full
// measurement period (default of 5m) before merging this range again.
if mergedQPS != 0 {
lhsRepl.loadBasedSplitter.RecordMax(mq.store.Clock().PhysicalTime(), mergedQPS)
}
return true, nil
}

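The comment above the mergedQPS computation says the queue only merges when the combined load sits below half the load-based split threshold, but the hunks shown here do not include that comparison. The standalone sketch below is a hedged reconstruction of it; the helper and its parameters are hypothetical, and only the "half the split threshold" rule and the conservativeLoadBasedSplitThreshold name (referenced in the test above) come from this diff.

// mergePermittedByLoad sketches the load check that process() presumably
// applies to mergedQPS further down. The exact expression is an assumption.
func mergePermittedByLoad(mergedQPS, splitByLoadQPSThreshold float64, loadBasedMergesEnabled bool) bool {
    if !loadBasedMergesEnabled {
        // Load was never measured, so it cannot block the merge.
        return true
    }
    conservativeLoadBasedSplitThreshold := 0.5 * splitByLoadQPSThreshold
    return mergedQPS < conservativeLoadBasedSplitThreshold
}

With splitByLoadQPSThreshold = 10, this matches the test constants above: a combined 6 QPS is rejected, while a combined 2 QPS is accepted.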