Skip to content

Commit

Permalink
Improve task scheduler rate limiter (#3860)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Feb 4, 2023
1 parent 3ade263 commit 008f056
Show file tree
Hide file tree
Showing 15 changed files with 201 additions and 117 deletions.
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,11 @@ const (

// TaskSchedulerEnableRateLimiter indicates if rate limiter should be enabled in task scheduler
TaskSchedulerEnableRateLimiter = "history.taskSchedulerEnableRateLimiter"
// TaskSchedulerEnableRateLimiterShadowMode indicates if task scheduler rate limiter should run in shadow mode
// i.e. through rate limiter and emit metrics but do not actually block/throttle task scheduling
TaskSchedulerEnableRateLimiterShadowMode = "history.taskSchedulerEnableRateLimiterShadowMode"
// TaskSchedulerThrottleDuration is the throttle duration when task scheduled exceeds max qps
TaskSchedulerThrottleDuration = "history.taskSchedulerThrottleDuration"
// TaskSchedulerMaxQPS is the max qps task schedulers on a host can schedule tasks
// If value less or equal to 0, will fall back to HistoryPersistenceMaxQPS
TaskSchedulerMaxQPS = "history.taskSchedulerMaxQPS"
Expand Down
1 change: 1 addition & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,7 @@ var (
TaskBatchCompleteCounter = NewCounterDef("task_batch_complete_counter")
TaskReschedulerPendingTasks = NewDimensionlessHistogramDef("task_rescheduler_pending_tasks")
PendingTasksCounter = NewDimensionlessHistogramDef("pending_tasks")
TaskSchedulerThrottled = NewCounterDef("task_scheduler_throttled")
QueueScheduleLatency = NewTimerDef("queue_latency_schedule") // latency for scheduling 100 tasks in one task channel
QueueReaderCountHistogram = NewDimensionlessHistogramDef("queue_reader_count")
QueueSliceCountHistogram = NewDimensionlessHistogramDef("queue_slice_count")
Expand Down
23 changes: 15 additions & 8 deletions common/tasks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/quotas"
)

Expand Down Expand Up @@ -62,15 +63,18 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Sequential(b *testing.B) {

scheduler := NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions[*noopTask, int]{
TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) },
ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] },
ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") },
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) },
ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] },
ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") },
TaskChannelMetricTagsFn: func(key int) []metrics.Tag { return nil },
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
EnableRateLimiterShadowMode: dynamicconfig.GetBoolPropertyFn(false),
},
Scheduler[*noopTask](&noopScheduler{}),
quotas.NoopRequestRateLimiter,
clock.NewRealTimeSource(),
logger,
metrics.NoopMetricsHandler,
)
scheduler.Start()
defer scheduler.Stop()
Expand All @@ -92,15 +96,18 @@ func BenchmarkInterleavedWeightedRoundRobinScheduler_Parallel(b *testing.B) {

scheduler := NewInterleavedWeightedRoundRobinScheduler(
InterleavedWeightedRoundRobinSchedulerOptions[*noopTask, int]{
TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) },
ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] },
ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") },
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
TaskChannelKeyFn: func(nt *noopTask) int { return rand.Intn(4) },
ChannelWeightFn: func(key int) int { return channelKeyToWeight[key] },
ChannelQuotaRequestFn: func(key int) quotas.Request { return quotas.NewRequest("", 1, "", "", "") },
TaskChannelMetricTagsFn: func(key int) []metrics.Tag { return nil },
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
EnableRateLimiterShadowMode: dynamicconfig.GetBoolPropertyFn(false),
},
Scheduler[*noopTask](&noopScheduler{}),
quotas.NoopRequestRateLimiter,
clock.NewRealTimeSource(),
logger,
metrics.NoopMetricsHandler,
)
scheduler.Start()
defer scheduler.Stop()
Expand Down
Loading

0 comments on commit 008f056

Please sign in to comment.