Skip to content

Commit

Permalink
Merge pull request #88368 from cockroachdb/blathers/backport-release-…
Browse files Browse the repository at this point in the history
…22.1-83667

release-22.1: kvserver: the timeout of queued items should consider the rates of al…
  • Loading branch information
shralex authored Sep 21, 2022
2 parents 5e13acb + 29d3cbb commit 6669632
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
// hard to determine ahead of time. An alternative would be to calculate
// the timeout with a function that additionally considers the replication
// factor.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: false,
Expand Down
20 changes: 15 additions & 5 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,32 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
// or calculate a range checksum) while processing should have a timeout which
// is a function of the size of the range and the maximum allowed rate of data
// transfer that adheres to a minimum timeout specified in a cluster setting.
// When the queue contains different types of work items, with different rates,
// the timeout of all items is set according to the minimum rate of the
// different types, to prevent slower items from causing faster items appearing
// after them in the queue to time-out.
//
// The parameter controls which rate to use.
func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
// The parameter controls which rate(s) to use.
func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this will type assertion will always succeed.
// Some tests set up a fake implementation of replicaInQueue in which
// case we fall back to the configured minimum timeout.
repl, ok := r.(interface{ GetMVCCStats() enginepb.MVCCStats })
if !ok {
if !ok || len(rateSettings) == 0 {
return minimumTimeout
}
snapshotRate := rateSetting.Get(&cs.SV)
minSnapshotRate := rateSettings[0].Get(&cs.SV)
for i := 1; i < len(rateSettings); i++ {
snapshotRate := rateSettings[i].Get(&cs.SV)
if snapshotRate < minSnapshotRate {
minSnapshotRate = snapshotRate
}
}
stats := repl.GetMVCCStats()
totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes
estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second
estimatedDuration := time.Duration(totalBytes/minSnapshotRate) * time.Second
timeout := estimatedDuration * permittedRangeScanSlowdown
if timeout < minimumTimeout {
timeout = minimumTimeout
Expand Down
52 changes: 43 additions & 9 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,16 +979,18 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
ctx := context.Background()
type testCase struct {
guaranteedProcessingTime time.Duration
rateLimit int64 // bytes/s
recoverySnapshotRate int64 // bytes/s
rebalanceSnapshotRate int64 // bytes/s
replicaSize int64 // bytes
expectedTimeout time.Duration
}
makeTest := func(tc testCase) (string, func(t *testing.T)) {
return fmt.Sprintf("%+v", tc), func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
queueGuaranteedProcessingTimeBudget.Override(ctx, &st.SV, tc.guaranteedProcessingTime)
recoverySnapshotRate.Override(ctx, &st.SV, tc.rateLimit)
tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate)
recoverySnapshotRate.Override(ctx, &st.SV, tc.recoverySnapshotRate)
rebalanceSnapshotRate.Override(ctx, &st.SV, tc.rebalanceSnapshotRate)
tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate)
repl := mvccStatsReplicaInQueue{
size: tc.replicaSize,
}
Expand All @@ -998,25 +1000,57 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
for _, tc := range []testCase{
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 30,
recoverySnapshotRate: 1 << 30,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 1 << 20,
expectedTimeout: time.Minute,
expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 20,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 30,
replicaSize: 1 << 20,
expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 2 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 2 << 20,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Hour,
rateLimit: 1 << 20,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 30,
replicaSize: 100 << 20,
expectedTimeout: time.Hour,
expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Hour,
recoverySnapshotRate: 1 << 30,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 1 << 10, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 10,
recoverySnapshotRate: 1 << 20,
rebalanceSnapshotRate: 1 << 10, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue {
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func newReplicateQueue(store *Store, allocator Allocator) *replicateQueue {
// so we use the raftSnapshotQueueTimeoutFunc. This function sets a
// timeout based on the range size and the sending rate in addition
// to consulting the setting which controls the minimum timeout.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
Expand Down

0 comments on commit 6669632

Please sign in to comment.