Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prioritize UI/tctl when system is throttled #4623

Merged
merged 11 commits into from
Jul 18, 2023
5 changes: 5 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,8 @@ const (
// DefaultQueueReaderID is the default readerID when loading history tasks
DefaultQueueReaderID int64 = 0
)

const (
// DefaultOperatorRPSRatio is the default percentage of rate limit that should be used for operator priority requests
DefaultOperatorRPSRatio float64 = 0.2
)
3 changes: 3 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ const (
// ShardPerNsRPSWarnPercent is the per-shard per-namespace RPS limit for warning as a percentage of ShardRPSWarnLimit
// these warnings are not emitted if the value is set to 0 or less
ShardPerNsRPSWarnPercent = "system.shardPerNsRPSWarnPercent"
// OperatorRPSRatio is the percentage of the rate limit provided to priority rate limiters that should be used for
// operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20% if not specified)
OperatorRPSRatio = "system.operatorRPSRatio"

// Whether the deadlock detector should dump goroutines
DeadlockDumpGoroutines = "system.deadlock.DumpGoroutines"
Expand Down
7 changes: 4 additions & 3 deletions common/headers/caller_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
)

const (
CallerTypeOperator = "operator"
CallerTypeAPI = "api"
CallerTypeBackground = "background"
CallerTypePreemptable = "preemptable"
Expand Down Expand Up @@ -113,7 +114,7 @@ func SetCallerInfo(
) context.Context {
return setIncomingMD(ctx, map[string]string{
callerNameHeaderName: info.CallerName,
callerTypeHeaderName: info.CallerType,
CallerTypeHeaderName: info.CallerType,
callOriginHeaderName: info.CallOrigin,
})
}
Expand All @@ -133,7 +134,7 @@ func SetCallerType(
ctx context.Context,
callerType string,
) context.Context {
return setIncomingMD(ctx, map[string]string{callerTypeHeaderName: callerType})
return setIncomingMD(ctx, map[string]string{CallerTypeHeaderName: callerType})
}

// SetOrigin set call origin in the context.
Expand Down Expand Up @@ -168,7 +169,7 @@ func setIncomingMD(
func GetCallerInfo(
ctx context.Context,
) CallerInfo {
values := GetValues(ctx, callerNameHeaderName, callerTypeHeaderName, callOriginHeaderName)
values := GetValues(ctx, callerNameHeaderName, CallerTypeHeaderName, callOriginHeaderName)
return CallerInfo{
CallerName: values[0],
CallerType: values[1],
Expand Down
8 changes: 4 additions & 4 deletions common/headers/caller_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_PreserveOtherValues() {
s.True(ok)
s.Equal(existingValue, md.Get(existingKey)[0])
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Equal(callerType, md.Get(CallerTypeHeaderName)[0])
s.Equal(callOrigin, md.Get(callOriginHeaderName)[0])
s.Len(md, 4)
}
Expand All @@ -146,7 +146,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_NoExistingCallerInfo() {
md, ok := metadata.FromIncomingContext(ctx)
s.True(ok)
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Equal(callerType, md.Get(CallerTypeHeaderName)[0])
s.Equal(callOrigin, md.Get(callOriginHeaderName)[0])
s.Len(md, 3)
}
Expand All @@ -169,7 +169,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_WithExistingCallerInfo() {
md, ok := metadata.FromIncomingContext(ctx)
s.True(ok)
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Equal(callerType, md.Get(CallerTypeHeaderName)[0])
s.Equal(callOrigin, md.Get(callOriginHeaderName)[0])
s.Len(md, 3)
}
Expand All @@ -187,7 +187,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_WithPartialCallerInfo() {
md, ok := metadata.FromIncomingContext(ctx)
s.True(ok)
s.Equal(callerName, md.Get(callerNameHeaderName)[0])
s.Equal(callerType, md.Get(callerTypeHeaderName)[0])
s.Equal(callerType, md.Get(CallerTypeHeaderName)[0])
s.Empty(md.Get(callOriginHeaderName))
s.Len(md, 2)
}
4 changes: 2 additions & 2 deletions common/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
SupportedFeaturesHeaderDelim = ","

callerNameHeaderName = "caller-name"
callerTypeHeaderName = "caller-type"
CallerTypeHeaderName = "caller-type"
callOriginHeaderName = "call-initiation"
)

Expand All @@ -50,7 +50,7 @@ var (
SupportedServerVersionsHeaderName,
SupportedFeaturesHeaderName,
callerNameHeaderName,
callerTypeHeaderName,
CallerTypeHeaderName,
callOriginHeaderName,
}
)
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/client/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type (
PersistenceNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter
PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter
EnablePriorityRateLimiting dynamicconfig.BoolPropertyFn
OperatorRPSRatio dynamicconfig.FloatPropertyFn

DynamicRateLimitingParams dynamicconfig.MapPropertyFn

Expand All @@ -59,6 +60,7 @@ type (
PersistenceNamespaceMaxQPS PersistenceNamespaceMaxQps
PersistencePerShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS
EnablePriorityRateLimiting EnablePriorityRateLimiting
OperatorRPSRatio OperatorRPSRatio
ClusterName ClusterName
ServiceName primitives.ServiceName
MetricsHandler metrics.Handler
Expand Down Expand Up @@ -92,6 +94,7 @@ func FactoryProvider(
params.PersistenceMaxQPS,
params.PersistencePerShardNamespaceMaxQPS,
RequestPriorityFn,
params.OperatorRPSRatio,
params.HealthSignals,
params.DynamicRateLimitingParams,
params.Logger,
Expand Down
74 changes: 49 additions & 25 deletions common/persistence/client/quotas.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,30 @@ type (

var (
CallerTypeDefaultPriority = map[string]int{
headers.CallerTypeAPI: 1,
headers.CallerTypeBackground: 3,
headers.CallerTypePreemptable: 4,
headers.CallerTypeOperator: 0,
headers.CallerTypeAPI: 2,
headers.CallerTypeBackground: 4,
headers.CallerTypePreemptable: 5,
}

APITypeCallOriginPriorityOverride = map[string]int{
"StartWorkflowExecution": 0,
"SignalWithStartWorkflowExecution": 0,
"SignalWorkflowExecution": 0,
"RequestCancelWorkflowExecution": 0,
"TerminateWorkflowExecution": 0,
"GetWorkflowExecutionHistory": 0,
"UpdateWorkflowExecution": 0,
"StartWorkflowExecution": 1,
"SignalWithStartWorkflowExecution": 1,
"SignalWorkflowExecution": 1,
"RequestCancelWorkflowExecution": 1,
"TerminateWorkflowExecution": 1,
"GetWorkflowExecutionHistory": 1,
"UpdateWorkflowExecution": 1,
}

BackgroundTypeAPIPriorityOverride = map[string]int{
"GetOrCreateShard": 0,
"UpdateShard": 0,
"GetOrCreateShard": 1,
"UpdateShard": 1,

// This is a prerequisite for checkpointing queue process progress
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTransfer): 0,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTimer): 0,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryVisibility): 0,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTransfer): 1,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTimer): 1,
p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryVisibility): 1,

// Task resource isolation assumes task can always be loaded.
// When one namespace has high load, all task processing goroutines
Expand All @@ -73,19 +74,20 @@ var (
// NOTE: we also don't want task loading to consume all persistence request tokens,
// and blocks all other operations. This is done by setting the queue host rps limit
// dynamic config.
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTransfer): 2,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTimer): 2,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 2,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTransfer): 3,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTimer): 3,
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 3,
}

RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4}
RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4, 5}
)

func NewPriorityRateLimiter(
namespaceMaxQPS PersistenceNamespaceMaxQps,
hostMaxQPS PersistenceMaxQps,
perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
healthSignals p.HealthSignalAggregator,
dynamicParams DynamicRateLimitingParams,
logger log.Logger,
Expand All @@ -94,20 +96,21 @@ func NewPriorityRateLimiter(

return quotas.NewMultiRequestRateLimiter(
// per shardID+namespaceID rate limiters
newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxQPS, hostMaxQPS, requestPriorityFn),
newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxQPS, hostMaxQPS, requestPriorityFn, operatorRPSRatio),
// per namespaceID rate limiters
newPriorityNamespaceRateLimiter(namespaceMaxQPS, hostMaxQPS, requestPriorityFn),
newPriorityNamespaceRateLimiter(namespaceMaxQPS, hostMaxQPS, requestPriorityFn, operatorRPSRatio),
// host-level dynamic rate limiter
newPriorityDynamicRateLimiter(hostRateFn, requestPriorityFn, healthSignals, dynamicParams, logger),
newPriorityDynamicRateLimiter(hostRateFn, requestPriorityFn, operatorRPSRatio, healthSignals, dynamicParams, logger),
// basic host-level rate limiter
newPriorityRateLimiter(hostRateFn, requestPriorityFn),
newPriorityRateLimiter(hostRateFn, requestPriorityFn, operatorRPSRatio),
)
}

func newPerShardPerNamespacePriorityRateLimiter(
perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS,
hostMaxQPS PersistenceMaxQps,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
) quotas.RequestRateLimiter {
return quotas.NewMapRequestRateLimiter(func(req quotas.Request) quotas.RequestRateLimiter {
if hasCaller(req) && hasCallerSegment(req) {
Expand All @@ -118,6 +121,7 @@ func newPerShardPerNamespacePriorityRateLimiter(
return float64(perShardNamespaceMaxQPS(req.Caller))
},
requestPriorityFn,
operatorRPSRatio,
)
}
return quotas.NoopRequestRateLimiter
Expand All @@ -137,6 +141,7 @@ func newPriorityNamespaceRateLimiter(
namespaceMaxQPS PersistenceNamespaceMaxQps,
hostMaxQPS PersistenceMaxQps,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
) quotas.RequestRateLimiter {
return quotas.NewNamespaceRequestRateLimiter(func(req quotas.Request) quotas.RequestRateLimiter {
if hasCaller(req) {
Expand All @@ -154,6 +159,7 @@ func newPriorityNamespaceRateLimiter(
return namespaceQPS
},
requestPriorityFn,
operatorRPSRatio,
)
}
return quotas.NoopRequestRateLimiter
Expand All @@ -163,10 +169,15 @@ func newPriorityNamespaceRateLimiter(
func newPriorityRateLimiter(
rateFn quotas.RateFn,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
) quotas.RequestRateLimiter {
rateLimiters := make(map[int]quotas.RequestRateLimiter)
for priority := range RequestPrioritiesOrdered {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn))
if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(operatorRateFn(rateFn, operatorRPSRatio)))
} else {
rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn))
}
}

return quotas.NewPriorityRateLimiter(
Expand All @@ -178,14 +189,19 @@ func newPriorityRateLimiter(
func newPriorityDynamicRateLimiter(
rateFn quotas.RateFn,
requestPriorityFn quotas.RequestPriorityFn,
operatorRPSRatio OperatorRPSRatio,
healthSignals p.HealthSignalAggregator,
dynamicParams DynamicRateLimitingParams,
logger log.Logger,
) quotas.RequestRateLimiter {
rateLimiters := make(map[int]quotas.RequestRateLimiter)
for priority := range RequestPrioritiesOrdered {
// TODO: refactor this so dynamic rate adjustment is global for all priorities
rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, rateFn, dynamicParams, logger)
if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] {
rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, operatorRateFn(rateFn, operatorRPSRatio), dynamicParams, logger)
} else {
rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, rateFn, dynamicParams, logger)
}
}

return quotas.NewPriorityRateLimiter(
Expand All @@ -211,6 +227,8 @@ func NewNoopPriorityRateLimiter(

func RequestPriorityFn(req quotas.Request) int {
switch req.CallerType {
case headers.CallerTypeOperator:
return CallerTypeDefaultPriority[req.CallerType]
case headers.CallerTypeAPI:
if priority, ok := APITypeCallOriginPriorityOverride[req.Initiation]; ok {
return priority
Expand All @@ -229,6 +247,12 @@ func RequestPriorityFn(req quotas.Request) int {
}
}

// operatorRateFn derives the rate available to operator-priority requests
// by scaling the base rate limit with the configured operator RPS ratio.
// Both inputs are evaluated on every call so dynamic-config changes to
// either value take effect immediately.
func operatorRateFn(rateFn quotas.RateFn, operatorRPSRatio OperatorRPSRatio) quotas.RateFn {
	return func() float64 {
		ratio := operatorRPSRatio()
		base := rateFn()
		return ratio * base
	}
}

// hasCaller reports whether the request carries a real caller name,
// i.e. one that is non-empty and not the internal system caller.
func hasCaller(req quotas.Request) bool {
	if req.Caller == "" {
		return false
	}
	return req.Caller != headers.CallerNameSystem
}
Expand Down
52 changes: 44 additions & 8 deletions common/persistence/client/quotas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/quotas"
"golang.org/x/exp/slices"

Expand Down Expand Up @@ -104,12 +105,13 @@ func (s *quotasSuite) TestCallOriginDefined() {
}

func (s *quotasSuite) TestPriorityNamespaceRateLimiter_DoesLimit() {
var namespaceMaxRPS = func(namespace string) int { return 1 }
var hostMaxRPS = func() int { return 1 }
namespaceMaxRPS := func(namespace string) int { return 1 }
hostMaxRPS := func() int { return 1 }
operatorRPSRatioFn := func() float64 { return 0.2 }

var limiter = newPriorityNamespaceRateLimiter(namespaceMaxRPS, hostMaxRPS, RequestPriorityFn)
limiter := newPriorityNamespaceRateLimiter(namespaceMaxRPS, hostMaxRPS, RequestPriorityFn, operatorRPSRatioFn)

var request = quotas.NewRequest(
request := quotas.NewRequest(
"test-api",
1,
"test-namespace",
Expand All @@ -131,12 +133,13 @@ func (s *quotasSuite) TestPriorityNamespaceRateLimiter_DoesLimit() {
}

func (s *quotasSuite) TestPerShardNamespaceRateLimiter_DoesLimit() {
var perShardNamespaceMaxRPS = func(namespace string) int { return 1 }
var hostMaxRPS = func() int { return 1 }
perShardNamespaceMaxRPS := func(namespace string) int { return 1 }
hostMaxRPS := func() int { return 1 }
operatorRPSRatioFn := func() float64 { return 0.2 }

var limiter = newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxRPS, hostMaxRPS, RequestPriorityFn)
limiter := newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxRPS, hostMaxRPS, RequestPriorityFn, operatorRPSRatioFn)

var request = quotas.NewRequest(
request := quotas.NewRequest(
"test-api",
1,
"test-namespace",
Expand All @@ -156,3 +159,36 @@ func (s *quotasSuite) TestPerShardNamespaceRateLimiter_DoesLimit() {

s.True(wasLimited)
}

// TestOperatorPrioritized verifies that when regular API traffic is being
// throttled by the priority rate limiter, operator-priority requests are
// still admitted through their reserved share of the rate limit.
func (s *quotasSuite) TestOperatorPrioritized() {
	operatorRPSRatioFn := func() float64 { return 0.2 }
	rateFn := func() float64 { return 5 }
	limiter := newPriorityRateLimiter(rateFn, RequestPriorityFn, operatorRPSRatioFn)

	apiRequest := quotas.NewRequest(
		"DescribeWorkflowExecution",
		1,
		"test-namespace",
		headers.CallerTypeAPI,
		-1,
		"DescribeWorkflowExecution")

	operatorRequest := quotas.NewRequest(
		"DescribeWorkflowExecution",
		1,
		"test-namespace",
		headers.CallerTypeOperator,
		-1,
		"DescribeWorkflowExecution")

	requestTime := time.Now()
	sawThrottle := false

	// Issue more API requests than the limit allows; once one is rejected,
	// an operator request at the same instant must still be admitted.
	for attempt := 0; attempt < 6; attempt++ {
		if limiter.Allow(requestTime, apiRequest) {
			continue
		}
		sawThrottle = true
		s.True(limiter.Allow(requestTime, operatorRequest))
	}
	s.True(sawThrottle)
}
Loading