Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: the timeout of queued items should consider the rates of al… #83667

Merged
merged 1 commit into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newMergeQueue(store *Store, db *kv.DB) *mergeQueue {
// hard to determine ahead of time. An alternative would be to calculate
// the timeout with a function that additionally considers the replication
// factor.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: false,
Expand Down
20 changes: 15 additions & 5 deletions pkg/kv/kvserver/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,32 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
// or calculate a range checksum) while processing should have a timeout which
// is a function of the size of the range and the maximum allowed rate of data
// transfer that adheres to a minimum timeout specified in a cluster setting.
// When the queue contains different types of work items, with different rates,
// the timeout of all items is set according to the minimum rate of the
// different types, so that slower items do not cause faster items queued
// behind them to time out.
//
// The parameter controls which rate to use.
func makeRateLimitedTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
// The parameter controls which rate(s) to use.
func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this type assertion will always succeed.
// Some tests set up a fake implementation of replicaInQueue in which
// case we fall back to the configured minimum timeout.
repl, ok := r.(interface{ GetMVCCStats() enginepb.MVCCStats })
if !ok {
if !ok || len(rateSettings) == 0 {
return minimumTimeout
}
snapshotRate := rateSetting.Get(&cs.SV)
minSnapshotRate := rateSettings[0].Get(&cs.SV)
for i := 1; i < len(rateSettings); i++ {
snapshotRate := rateSettings[i].Get(&cs.SV)
if snapshotRate < minSnapshotRate {
minSnapshotRate = snapshotRate
}
}
stats := repl.GetMVCCStats()
totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes
estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second
estimatedDuration := time.Duration(totalBytes/minSnapshotRate) * time.Second
timeout := estimatedDuration * permittedRangeScanSlowdown
if timeout < minimumTimeout {
timeout = minimumTimeout
Expand Down
52 changes: 43 additions & 9 deletions pkg/kv/kvserver/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,16 +984,18 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
ctx := context.Background()
type testCase struct {
guaranteedProcessingTime time.Duration
rateLimit int64 // bytes/s
recoverySnapshotRate int64 // bytes/s
rebalanceSnapshotRate int64 // bytes/s
replicaSize int64 // bytes
expectedTimeout time.Duration
}
makeTest := func(tc testCase) (string, func(t *testing.T)) {
return fmt.Sprintf("%+v", tc), func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
queueGuaranteedProcessingTimeBudget.Override(ctx, &st.SV, tc.guaranteedProcessingTime)
recoverySnapshotRate.Override(ctx, &st.SV, tc.rateLimit)
tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate)
recoverySnapshotRate.Override(ctx, &st.SV, tc.recoverySnapshotRate)
rebalanceSnapshotRate.Override(ctx, &st.SV, tc.rebalanceSnapshotRate)
tf := makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate)
repl := mvccStatsReplicaInQueue{
size: tc.replicaSize,
}
Expand All @@ -1003,25 +1005,57 @@ func TestQueueRateLimitedTimeoutFunc(t *testing.T) {
for _, tc := range []testCase{
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 30,
recoverySnapshotRate: 1 << 30,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 1 << 20,
expectedTimeout: time.Minute,
expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 20,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 30,
replicaSize: 1 << 20,
expectedTimeout: time.Minute, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 2 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 2 << 20,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Hour,
rateLimit: 1 << 20,
recoverySnapshotRate: 1 << 20, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 30,
replicaSize: 100 << 20,
expectedTimeout: time.Hour,
expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Hour,
recoverySnapshotRate: 1 << 30,
rebalanceSnapshotRate: 1 << 20, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: time.Hour, // the minimum timeout (guaranteedProcessingTime).
},
{
guaranteedProcessingTime: time.Minute,
recoverySnapshotRate: 1 << 10, // minimum rate for timeout calculation.
rebalanceSnapshotRate: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
{
guaranteedProcessingTime: time.Minute,
rateLimit: 1 << 10,
recoverySnapshotRate: 1 << 20,
rebalanceSnapshotRate: 1 << 10, // minimum rate for timeout calculation.
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedRangeScanSlowdown,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/raft_snapshot_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newRaftSnapshotQueue(store *Store) *raftSnapshotQueue {
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(recoverySnapshotRate, rebalanceSnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replica
// so we use makeRateLimitedTimeoutFunc. This function sets a
// timeout based on the range size and the sending rate in addition
// to consulting the setting which controls the minimum timeout.
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate),
processTimeoutFunc: makeRateLimitedTimeoutFunc(rebalanceSnapshotRate, recoverySnapshotRate),
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
Expand Down