From ce86e35927d2a42a1d717b53f11071b64627774f Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Wed, 12 Jul 2023 14:08:35 -0700 Subject: [PATCH 1/6] WIP --- common/headers/caller_info.go | 1 + common/persistence/client/quotas.go | 46 +++++++++++++++++++++------ common/rpc/interceptor/caller_info.go | 2 +- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/common/headers/caller_info.go b/common/headers/caller_info.go index 214fa5a5e47..3c8595cef60 100644 --- a/common/headers/caller_info.go +++ b/common/headers/caller_info.go @@ -31,6 +31,7 @@ import ( ) const ( + CallerTypeOperator = "operator" CallerTypeAPI = "api" CallerTypeBackground = "background" CallerTypePreemptable = "preemptable" diff --git a/common/persistence/client/quotas.go b/common/persistence/client/quotas.go index b2466be540d..0f9e514d8b8 100644 --- a/common/persistence/client/quotas.go +++ b/common/persistence/client/quotas.go @@ -39,11 +39,19 @@ type ( } ) +const ( + // OperatorQPSRatio is the percentage of the rate provided to priority rate limiters that + // should be used for operator API calls. Operator API calls have a lower rate limit to + // prevent users from abusing this to get high priority for all requests. + OperatorQPSRatio = 0.2 +) + var ( CallerTypeDefaultPriority = map[string]int{ - headers.CallerTypeAPI: 1, - headers.CallerTypeBackground: 3, - headers.CallerTypePreemptable: 4, + headers.CallerTypeOperator: 1, + headers.CallerTypeAPI: 2, + headers.CallerTypeBackground: 4, + headers.CallerTypePreemptable: 5, } APITypeCallOriginPriorityOverride = map[string]int{ @@ -73,12 +81,12 @@ var ( // NOTE: we also don't want task loading to consume all persistence request tokens, // and blocks all other operations. This is done by setting the queue host rps limit // dynamic config. - p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTransfer): 2, - p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTimer): 2, - p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 2, + p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTransfer): 3, + p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryTimer): 3, + p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 3, } - RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4} + RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4, 5} ) func NewPriorityRateLimiter( @@ -166,7 +174,11 @@ func newPriorityRateLimiter( ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range RequestPrioritiesOrdered { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn)) + if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(OperatorRateFn(rateFn))) + } else { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn)) + } } return quotas.NewPriorityRateLimiter( @@ -185,7 +197,11 @@ func newPriorityDynamicRateLimiter( rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range RequestPrioritiesOrdered { // TODO: refactor this so dynamic rate adjustment is global for all priorities - rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, rateFn, dynamicParams, logger) + if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] { + rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, OperatorRateFn(rateFn), dynamicParams, logger) + } else { + rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, rateFn, dynamicParams, logger) + } } return quotas.NewPriorityRateLimiter( @@ -211,6 +227,11 @@ func NewNoopPriorityRateLimiter( func RequestPriorityFn(req quotas.Request) int { switch req.CallerType { + case headers.CallerTypeOperator: + if priority, ok := APITypeCallOriginPriorityOverride[req.Initiation]; ok { + return priority + } + return CallerTypeDefaultPriority[req.CallerType] case headers.CallerTypeAPI: if priority, ok := APITypeCallOriginPriorityOverride[req.Initiation]; ok { return priority @@ -229,6 +250,13 @@ func RequestPriorityFn(req quotas.Request) int { } } +// TODO: maybe unexport this +func OperatorRateFn(rateFn quotas.RateFn) quotas.RateFn { + return func() float64 { + return rateFn() * OperatorQPSRatio + } +} + func hasCaller(req quotas.Request) bool { return req.Caller != "" && req.Caller != headers.CallerNameSystem } diff --git a/common/rpc/interceptor/caller_info.go b/common/rpc/interceptor/caller_info.go index ff06f0bfb4c..d62967c4fc0 100644 --- a/common/rpc/interceptor/caller_info.go +++ b/common/rpc/interceptor/caller_info.go @@ -66,7 +66,7 @@ func (i *CallerInfoInterceptor) Intercept( callerInfo.CallerType = headers.CallerTypeAPI updateInfo = true } - if callerInfo.CallerType == headers.CallerTypeAPI && + if (callerInfo.CallerType == headers.CallerTypeAPI || callerInfo.CallerType == headers.CallerTypeOperator) && callerInfo.CallOrigin == "" { _, method := SplitMethodName(info.FullMethod) callerInfo.CallOrigin = method From 1ac22f1275d51a79a016f128a7a16981b242dfb4 Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 13 Jul 2023 09:12:44 -0700 Subject: [PATCH 2/6] add operator rate limiter priority --- common/headers/caller_info.go | 6 +- common/headers/caller_info_test.go | 8 +- common/headers/headers.go | 4 +- common/persistence/client/quotas.go | 7 +- .../rpc/interceptor/namespace_rate_limit.go | 3 +- common/rpc/interceptor/rate_limit.go | 3 +- service/frontend/configs/quotas.go | 199 ++++++++++++------ service/history/configs/quotas.go | 125 ++++++----- service/matching/configs/quotas.go | 63 ++++-- 9 files changed, 265 insertions(+), 153 deletions(-) diff --git a/common/headers/caller_info.go b/common/headers/caller_info.go index 3c8595cef60..41e1237780b 100644 --- a/common/headers/caller_info.go +++ b/common/headers/caller_info.go @@ -114,7 +114,7 @@ func SetCallerInfo( ) context.Context { return setIncomingMD(ctx, map[string]string{ callerNameHeaderName: info.CallerName, - callerTypeHeaderName: info.CallerType, + CallerTypeHeaderName: info.CallerType, callOriginHeaderName: info.CallOrigin, }) } @@ -134,7 +134,7 @@ func SetCallerType( ctx context.Context, callerType string, ) context.Context { - return setIncomingMD(ctx, map[string]string{callerTypeHeaderName: callerType}) + return setIncomingMD(ctx, map[string]string{CallerTypeHeaderName: callerType}) } // SetOrigin set call origin in the context. @@ -169,7 +169,7 @@ func setIncomingMD( func GetCallerInfo( ctx context.Context, ) CallerInfo { - values := GetValues(ctx, callerNameHeaderName, callerTypeHeaderName, callOriginHeaderName) + values := GetValues(ctx, callerNameHeaderName, CallerTypeHeaderName, callOriginHeaderName) return CallerInfo{ CallerName: values[0], CallerType: values[1], diff --git a/common/headers/caller_info_test.go b/common/headers/caller_info_test.go index 0e075489bc9..b815faa0046 100644 --- a/common/headers/caller_info_test.go +++ b/common/headers/caller_info_test.go @@ -127,7 +127,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_PreserveOtherValues() { s.True(ok) s.Equal(existingValue, md.Get(existingKey)[0]) s.Equal(callerName, md.Get(callerNameHeaderName)[0]) - s.Equal(callerType, md.Get(callerTypeHeaderName)[0]) + s.Equal(callerType, md.Get(CallerTypeHeaderName)[0]) s.Equal(callOrigin, md.Get(callOriginHeaderName)[0]) s.Len(md, 4) } @@ -146,7 +146,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_NoExistingCallerInfo() { md, ok := metadata.FromIncomingContext(ctx) s.True(ok) s.Equal(callerName, md.Get(callerNameHeaderName)[0]) - s.Equal(callerType, md.Get(callerTypeHeaderName)[0]) + s.Equal(callerType, md.Get(CallerTypeHeaderName)[0]) s.Equal(callOrigin, md.Get(callOriginHeaderName)[0]) s.Len(md, 3) } @@ -169,7 +169,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_WithExistingCallerInfo() { md, ok := metadata.FromIncomingContext(ctx) s.True(ok) s.Equal(callerName, md.Get(callerNameHeaderName)[0]) - s.Equal(callerType, md.Get(callerTypeHeaderName)[0]) + s.Equal(callerType, md.Get(CallerTypeHeaderName)[0]) s.Equal(callOrigin, md.Get(callOriginHeaderName)[0]) s.Len(md, 3) } @@ -187,7 +187,7 @@ func (s *callerInfoSuite) TestSetCallerInfo_WithPartialCallerInfo() { md, ok := metadata.FromIncomingContext(ctx) s.True(ok) s.Equal(callerName, md.Get(callerNameHeaderName)[0]) - s.Equal(callerType, md.Get(callerTypeHeaderName)[0]) + s.Equal(callerType, md.Get(CallerTypeHeaderName)[0]) s.Empty(md.Get(callOriginHeaderName)) s.Len(md, 2) } diff --git a/common/headers/headers.go b/common/headers/headers.go index 6d42b76465b..db12b3dc423 100644 --- a/common/headers/headers.go +++ b/common/headers/headers.go @@ -38,7 +38,7 @@ const ( SupportedFeaturesHeaderDelim = "," callerNameHeaderName = "caller-name" - callerTypeHeaderName = "caller-type" + CallerTypeHeaderName = "caller-type" callOriginHeaderName = "call-initiation" ) @@ -50,7 +50,7 @@ var ( SupportedServerVersionsHeaderName, SupportedFeaturesHeaderName, callerNameHeaderName, - callerTypeHeaderName, + CallerTypeHeaderName, callOriginHeaderName, } ) diff --git a/common/persistence/client/quotas.go b/common/persistence/client/quotas.go index 0f9e514d8b8..31164710b75 100644 --- a/common/persistence/client/quotas.go +++ b/common/persistence/client/quotas.go @@ -175,7 +175,7 @@ func newPriorityRateLimiter( rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range RequestPrioritiesOrdered { if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(OperatorRateFn(rateFn))) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(operatorRateFn(rateFn))) } else { rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn)) } @@ -198,7 +198,7 @@ func newPriorityDynamicRateLimiter( for priority := range RequestPrioritiesOrdered { // TODO: refactor this so dynamic rate adjustment is global for all priorities if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] { - rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, OperatorRateFn(rateFn), dynamicParams, logger) + rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, operatorRateFn(rateFn), dynamicParams, logger) } else { rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, rateFn, dynamicParams, logger) } @@ -250,8 +250,7 @@ func RequestPriorityFn(req quotas.Request) int { } } -// TODO: maybe unexport this -func OperatorRateFn(rateFn quotas.RateFn) quotas.RateFn { +func operatorRateFn(rateFn quotas.RateFn) quotas.RateFn { return func() float64 { return rateFn() * OperatorQPSRatio } diff --git a/common/rpc/interceptor/namespace_rate_limit.go b/common/rpc/interceptor/namespace_rate_limit.go index d442b4643d2..88524c12c6d 100644 --- a/common/rpc/interceptor/namespace_rate_limit.go +++ b/common/rpc/interceptor/namespace_rate_limit.go @@ -30,6 +30,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/headers" "google.golang.org/grpc" "go.temporal.io/server/common/namespace" @@ -83,7 +84,7 @@ func (ni *NamespaceRateLimitInterceptor) Intercept( methodName, token, namespace.String(), - "", // this interceptor layer does not throttle based on caller type + headers.GetValues(ctx, headers.CallerTypeHeaderName)[0], 0, // this interceptor layer does not throttle based on caller segment "", // this interceptor layer does not throttle based on call initiation )) { diff --git a/common/rpc/interceptor/rate_limit.go b/common/rpc/interceptor/rate_limit.go index 1762d8789ef..f02fca3b8e9 100644 --- a/common/rpc/interceptor/rate_limit.go +++ b/common/rpc/interceptor/rate_limit.go @@ -30,6 +30,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/headers" "google.golang.org/grpc" "go.temporal.io/server/common/quotas" @@ -78,7 +79,7 @@ func (i *RateLimitInterceptor) Intercept( methodName, token, "", // this interceptor layer does not throttle based on caller name - "", // this interceptor layer does not throttle based on caller type + headers.GetValues(ctx, headers.CallerTypeHeaderName)[0], 0, // this interceptor layer does not throttle based on caller segment "", // this interceptor layer does not throttle based on call initiation )) { diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index 9c48b7634d9..9f2a725d265 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -28,9 +28,19 @@ import ( "time" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/quotas" ) +const ( + // OperatorPriority is used to give precedence to calls coming from web UI or tctl + OperatorPriority = 0 + // OperatorQPSRatio is the percentage of the rate provided to priority rate limiters that + // should be used for operator API calls. Operator API calls have a lower rate limit to + // prevent users from abusing this to get high priority for all requests. + OperatorQPSRatio = 0.2 +) + var ( ExecutionAPICountLimitOverride = map[string]int{ "PollActivityTaskQueue": 1, @@ -41,95 +51,95 @@ var ( } ExecutionAPIToPriority = map[string]int{ - // priority 0 - "StartWorkflowExecution": 0, - "SignalWithStartWorkflowExecution": 0, - "SignalWorkflowExecution": 0, - "RequestCancelWorkflowExecution": 0, - "TerminateWorkflowExecution": 0, - "GetWorkflowExecutionHistory": 0, - "UpdateWorkflowExecution": 0, - "PollWorkflowExecutionUpdate": 0, - // priority 1 - "RecordActivityTaskHeartbeat": 1, - "RecordActivityTaskHeartbeatById": 1, - "RespondActivityTaskCanceled": 1, - "RespondActivityTaskCanceledById": 1, - "RespondActivityTaskFailed": 1, - "RespondActivityTaskFailedById": 1, - "RespondActivityTaskCompleted": 1, - "RespondActivityTaskCompletedById": 1, - "RespondWorkflowTaskCompleted": 1, + "StartWorkflowExecution": 1, + "SignalWithStartWorkflowExecution": 1, + "SignalWorkflowExecution": 1, + "RequestCancelWorkflowExecution": 1, + "TerminateWorkflowExecution": 1, + "GetWorkflowExecutionHistory": 1, + "UpdateWorkflowExecution": 1, + "PollWorkflowExecutionUpdate": 1, // priority 2 - "ResetWorkflowExecution": 2, - "DescribeWorkflowExecution": 2, - "RespondWorkflowTaskFailed": 2, - "QueryWorkflow": 2, - "RespondQueryTaskCompleted": 2, - "PollWorkflowTaskQueue": 2, - "PollActivityTaskQueue": 2, - "GetWorkflowExecutionHistoryReverse": 2, - "GetWorkerBuildIdCompatibility": 2, - "GetWorkerTaskReachability": 2, - "DeleteWorkflowExecution": 2, + "RecordActivityTaskHeartbeat": 2, + "RecordActivityTaskHeartbeatById": 2, + "RespondActivityTaskCanceled": 2, + "RespondActivityTaskCanceledById": 2, + "RespondActivityTaskFailed": 2, + "RespondActivityTaskFailedById": 2, + "RespondActivityTaskCompleted": 2, + "RespondActivityTaskCompletedById": 2, + "RespondWorkflowTaskCompleted": 2, // priority 3 - "ResetStickyTaskQueue": 3, - "DescribeTaskQueue": 3, - "ListTaskQueuePartitions": 3, + "ResetWorkflowExecution": 3, + "DescribeWorkflowExecution": 3, + "RespondWorkflowTaskFailed": 3, + "QueryWorkflow": 3, + "RespondQueryTaskCompleted": 3, + "PollWorkflowTaskQueue": 3, + "PollActivityTaskQueue": 3, + "GetWorkflowExecutionHistoryReverse": 3, + "GetWorkerBuildIdCompatibility": 3, + "GetWorkerTaskReachability": 3, + "DeleteWorkflowExecution": 3, + + // priority 4 + "ResetStickyTaskQueue": 4, + "DescribeTaskQueue": 4, + "ListTaskQueuePartitions": 4, } - ExecutionAPIPrioritiesOrdered = []int{0, 1, 2, 3} + ExecutionAPIPrioritiesOrdered = []int{0, 1, 2, 3, 4} VisibilityAPIToPriority = map[string]int{ - "CountWorkflowExecutions": 0, - "ScanWorkflowExecutions": 0, - "ListOpenWorkflowExecutions": 0, - "ListClosedWorkflowExecutions": 0, - "ListWorkflowExecutions": 0, - "ListArchivedWorkflowExecutions": 0, + "CountWorkflowExecutions": 1, + "ScanWorkflowExecutions": 1, + "ListOpenWorkflowExecutions": 1, + "ListClosedWorkflowExecutions": 1, + "ListWorkflowExecutions": 1, + "ListArchivedWorkflowExecutions": 1, } - VisibilityAPIPrioritiesOrdered = []int{0} + VisibilityAPIPrioritiesOrdered = []int{0, 1} // Special rate limiting for APIs that may insert replication tasks into a namespace replication queue. // The replication queue is used to propagate critical failover messages and this mapping prevents flooding the // queue and delaying failover. NamespaceReplicationInducingAPIToPriority = map[string]int{ - "RegisterNamespace": 0, - "UpdateNamespace": 0, - "UpdateWorkerBuildIdCompatibility": 1, + "RegisterNamespace": 1, + "UpdateNamespace": 1, + "UpdateWorkerBuildIdCompatibility": 2, } - NamespaceReplicationInducingAPIPrioritiesOrdered = []int{0, 1} + NamespaceReplicationInducingAPIPrioritiesOrdered = []int{0, 1, 2} OtherAPIToPriority = map[string]int{ - "GetClusterInfo": 0, - "GetSystemInfo": 0, - "GetSearchAttributes": 0, - - "DescribeNamespace": 0, - "ListNamespaces": 0, - "DeprecateNamespace": 0, - - "CreateSchedule": 0, - "DescribeSchedule": 0, - "UpdateSchedule": 0, - "PatchSchedule": 0, - "ListScheduleMatchingTimes": 0, - "DeleteSchedule": 0, - "ListSchedules": 0, + "GetClusterInfo": 1, + "GetSystemInfo": 1, + "GetSearchAttributes": 1, + + "DescribeNamespace": 1, + "ListNamespaces": 1, + "DeprecateNamespace": 1, + + "CreateSchedule": 1, + "DescribeSchedule": 1, + "UpdateSchedule": 1, + "PatchSchedule": 1, + "ListScheduleMatchingTimes": 1, + "DeleteSchedule": 1, + "ListSchedules": 1, // TODO(yx): added temporarily here; need to check if it's the right place and priority - "DescribeBatchOperation": 0, - "ListBatchOperations": 0, - "StartBatchOperation": 0, - "StopBatchOperation": 0, + "DescribeBatchOperation": 1, + "ListBatchOperations": 1, + "StartBatchOperation": 1, + "StopBatchOperation": 1, } - OtherAPIPrioritiesOrdered = []int{0} + OtherAPIPrioritiesOrdered = []int{0, 1} ) type ( @@ -138,9 +148,15 @@ type ( rateFn dynamicconfig.FloatPropertyFnWithNamespaceFilter burstFn dynamicconfig.IntPropertyFnWithNamespaceFilter } + + operatorRateBurstImpl struct { + operatorRateRatio float64 + baseRateBurstFn quotas.RateBurst + } ) var _ quotas.RateBurst = (*NamespaceRateBurstImpl)(nil) +var _ quotas.RateBurst = (*operatorRateBurstImpl)(nil) func NewNamespaceRateBurst( namespaceName string, @@ -162,6 +178,23 @@ func (c *NamespaceRateBurstImpl) Burst() int { return c.burstFn(c.namespaceName) } +func newOperatorRateBurst( + baseRateBurstFn quotas.RateBurst, +) *operatorRateBurstImpl { + return &operatorRateBurstImpl{ + operatorRateRatio: OperatorQPSRatio, + baseRateBurstFn: baseRateBurstFn, + } +} + +func (c *operatorRateBurstImpl) Rate() float64 { + return c.operatorRateRatio * c.baseRateBurstFn.Rate() +} + +func (c *operatorRateBurstImpl) Burst() int { + return c.baseRateBurstFn.Burst() +} + func NewRequestToRateLimiter( executionRateBurstFn quotas.RateBurst, visibilityRateBurstFn quotas.RateBurst, @@ -196,9 +229,16 @@ func NewExecutionPriorityRateLimiter( ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range ExecutionAPIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) + if priority == OperatorPriority { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn), time.Minute)) + } else { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) + } } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return OperatorPriority + } if priority, ok := ExecutionAPIToPriority[req.API]; ok { return priority } @@ -211,9 +251,16 @@ func NewVisibilityPriorityRateLimiter( ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range VisibilityAPIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) + if priority == OperatorPriority { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn), time.Minute)) + } else { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) + } } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return OperatorPriority + } if priority, ok := VisibilityAPIToPriority[req.API]; ok { return priority } @@ -226,9 +273,16 @@ func NewNamespaceReplicationInducingAPIPriorityRateLimiter( ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range NamespaceReplicationInducingAPIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) + if priority == OperatorPriority { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn), time.Minute)) + } else { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) + } } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return OperatorPriority + } if priority, ok := NamespaceReplicationInducingAPIToPriority[req.API]; ok { return priority } @@ -241,9 +295,16 @@ func NewOtherAPIPriorityRateLimiter( ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range OtherAPIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) + if priority == OperatorPriority { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn), time.Minute)) + } else { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) + } } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return OperatorPriority + } if priority, ok := OtherAPIToPriority[req.API]; ok { return priority } diff --git a/service/history/configs/quotas.go b/service/history/configs/quotas.go index e7736bdd9ab..c72541bf23a 100644 --- a/service/history/configs/quotas.go +++ b/service/history/configs/quotas.go @@ -25,62 +25,72 @@ package configs import ( + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/quotas" ) +const ( + // OperatorPriority is used to give precedence to calls coming from web UI or tctl + OperatorPriority = 0 + // OperatorQPSRatio is the percentage of the rate provided to priority rate limiters that + // should be used for operator API calls. Operator API calls have a lower rate limit to + // prevent users from abusing this to get high priority for all requests. + OperatorQPSRatio = 0.2 +) + var ( APIToPriority = map[string]int{ - "CloseShard": 0, - "GetShard": 0, - "DeleteWorkflowExecution": 0, - "DescribeHistoryHost": 0, - "DescribeMutableState": 0, - "DescribeWorkflowExecution": 0, - "GetDLQMessages": 0, - "GetDLQReplicationMessages": 0, - "GetMutableState": 0, - "GetReplicationMessages": 0, - "MergeDLQMessages": 0, - "PollMutableState": 0, - "PurgeDLQMessages": 0, - "QueryWorkflow": 0, - "ReapplyEvents": 0, - "RebuildMutableState": 0, - "RecordActivityTaskHeartbeat": 0, - "RecordActivityTaskStarted": 0, - "RecordChildExecutionCompleted": 0, - "VerifyChildExecutionCompletionRecorded": 0, - "RecordWorkflowTaskStarted": 0, - "RefreshWorkflowTasks": 0, - "RemoveSignalMutableState": 0, - "RemoveTask": 0, - "ReplicateEventsV2": 0, - "ReplicateWorkflowState": 0, - "RequestCancelWorkflowExecution": 0, - "ResetStickyTaskQueue": 0, - "ResetWorkflowExecution": 0, - "RespondActivityTaskCanceled": 0, - "RespondActivityTaskCompleted": 0, - "RespondActivityTaskFailed": 0, - "RespondWorkflowTaskCompleted": 0, - "RespondWorkflowTaskFailed": 0, - "ScheduleWorkflowTask": 0, - "VerifyFirstWorkflowTaskScheduled": 0, - "SignalWithStartWorkflowExecution": 0, - "SignalWorkflowExecution": 0, - "StartWorkflowExecution": 0, - "SyncActivity": 0, - "SyncShardStatus": 0, - "TerminateWorkflowExecution": 0, - "GenerateLastHistoryReplicationTasks": 0, - "GetReplicationStatus": 0, - "DeleteWorkflowVisibilityRecord": 0, - "UpdateWorkflowExecution": 0, - "PollWorkflowExecutionUpdate": 0, - "StreamWorkflowReplicationMessages": 0, + "CloseShard": 1, + "GetShard": 1, + "DeleteWorkflowExecution": 1, + "DescribeHistoryHost": 1, + "DescribeMutableState": 1, + "DescribeWorkflowExecution": 1, + "GetDLQMessages": 1, + "GetDLQReplicationMessages": 1, + "GetMutableState": 1, + "GetReplicationMessages": 1, + "MergeDLQMessages": 1, + "PollMutableState": 1, + "PurgeDLQMessages": 1, + "QueryWorkflow": 1, + "ReapplyEvents": 1, + "RebuildMutableState": 1, + "RecordActivityTaskHeartbeat": 1, + "RecordActivityTaskStarted": 1, + "RecordChildExecutionCompleted": 1, + "VerifyChildExecutionCompletionRecorded": 1, + "RecordWorkflowTaskStarted": 1, + "RefreshWorkflowTasks": 1, + "RemoveSignalMutableState": 1, + "RemoveTask": 1, + "ReplicateEventsV2": 1, + "ReplicateWorkflowState": 1, + "RequestCancelWorkflowExecution": 1, + "ResetStickyTaskQueue": 1, + "ResetWorkflowExecution": 1, + "RespondActivityTaskCanceled": 1, + "RespondActivityTaskCompleted": 1, + "RespondActivityTaskFailed": 1, + "RespondWorkflowTaskCompleted": 1, + "RespondWorkflowTaskFailed": 1, + "ScheduleWorkflowTask": 1, + "VerifyFirstWorkflowTaskScheduled": 1, + "SignalWithStartWorkflowExecution": 1, + "SignalWorkflowExecution": 1, + "StartWorkflowExecution": 1, + "SyncActivity": 1, + "SyncShardStatus": 1, + "TerminateWorkflowExecution": 1, + "GenerateLastHistoryReplicationTasks": 1, + "GetReplicationStatus": 1, + "DeleteWorkflowVisibilityRecord": 1, + "UpdateWorkflowExecution": 1, + "PollWorkflowExecutionUpdate": 1, + "StreamWorkflowReplicationMessages": 1, } - APIPrioritiesOrdered = []int{0} + APIPrioritiesOrdered = []int{OperatorPriority, 1} ) func NewPriorityRateLimiter( @@ -88,12 +98,27 @@ func NewPriorityRateLimiter( ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range APIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) + if priority == OperatorPriority { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(operatorRateFn(rateFn))) + } else { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) + } } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return OperatorPriority + } if priority, ok := APIToPriority[req.API]; ok { return priority } return APIPrioritiesOrdered[len(APIPrioritiesOrdered)-1] }, rateLimiters) } + +func operatorRateFn( + rateFn quotas.RateFn, +) quotas.RateFn { + return func() float64 { + return rateFn() * OperatorQPSRatio + } +} diff --git a/service/matching/configs/quotas.go b/service/matching/configs/quotas.go index 9294778c8af..7413c9bd66b 100644 --- a/service/matching/configs/quotas.go +++ b/service/matching/configs/quotas.go @@ -25,31 +25,41 @@ package configs import ( + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/quotas" ) +const ( + // OperatorPriority is used to give precedence to calls coming from web UI or tctl + OperatorPriority = 0 + // OperatorQPSRatio is the percentage of the rate provided to priority rate limiters that + // should be used for operator API calls. Operator API calls have a lower rate limit to + // prevent users from abusing this to get high priority for all requests. + OperatorQPSRatio = 0.2 +) + var ( APIToPriority = map[string]int{ - "AddActivityTask": 0, - "AddWorkflowTask": 0, - "CancelOutstandingPoll": 0, - "DescribeTaskQueue": 0, - "ListTaskQueuePartitions": 0, - "PollActivityTaskQueue": 0, - "PollWorkflowTaskQueue": 0, - "QueryWorkflow": 0, - "RespondQueryTaskCompleted": 0, - "GetWorkerBuildIdCompatibility": 0, - "UpdateWorkerBuildIdCompatibility": 0, - "GetTaskQueueUserData": 0, - "ApplyTaskQueueUserDataReplicationEvent": 0, - "GetBuildIdTaskQueueMapping": 0, - "ForceUnloadTaskQueue": 0, - "UpdateTaskQueueUserData": 0, - "ReplicateTaskQueueUserData": 0, + "AddActivityTask": 1, + "AddWorkflowTask": 1, + "CancelOutstandingPoll": 1, + "DescribeTaskQueue": 1, + "ListTaskQueuePartitions": 1, + "PollActivityTaskQueue": 1, + "PollWorkflowTaskQueue": 1, + "QueryWorkflow": 1, + "RespondQueryTaskCompleted": 1, + "GetWorkerBuildIdCompatibility": 1, + "UpdateWorkerBuildIdCompatibility": 1, + "GetTaskQueueUserData": 1, + "ApplyTaskQueueUserDataReplicationEvent": 1, + "GetBuildIdTaskQueueMapping": 1, + "ForceUnloadTaskQueue": 1, + "UpdateTaskQueueUserData": 1, + "ReplicateTaskQueueUserData": 1, } - APIPrioritiesOrdered = []int{0} + APIPrioritiesOrdered = []int{0, 1} ) func NewPriorityRateLimiter( @@ -57,12 +67,27 @@ func NewPriorityRateLimiter( ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range APIPrioritiesOrdered { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) + if priority == OperatorPriority { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(operatorRateFn(rateFn))) + } else { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) + } } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return OperatorPriority + } if priority, ok := APIToPriority[req.API]; ok { return priority } return APIPrioritiesOrdered[len(APIPrioritiesOrdered)-1] }, rateLimiters) } + +func operatorRateFn( + rateFn quotas.RateFn, +) quotas.RateFn { + return func() float64 { + return rateFn() * OperatorQPSRatio + } +} From 0c2fb8bb2bb42984864c7c5238d002c7c773d2a0 Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Thu, 13 Jul 2023 10:26:53 -0700 Subject: [PATCH 3/6] tests --- common/persistence/client/quotas_test.go | 33 ++++++++++++++ service/frontend/configs/quotas_test.go | 56 ++++++++++++++++++++++++ service/history/configs/quotas_test.go | 35 +++++++++++++++ service/matching/configs/quotas_test.go | 35 +++++++++++++++ 4 files changed, 159 insertions(+) diff --git a/common/persistence/client/quotas_test.go b/common/persistence/client/quotas_test.go index 04078435c3a..c84ef07856a 100644 --- a/common/persistence/client/quotas_test.go +++ b/common/persistence/client/quotas_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/quotas" "golang.org/x/exp/slices" @@ -156,3 +157,35 @@ func (s *quotasSuite) TestPerShardNamespaceRateLimiter_DoesLimit() { s.True(wasLimited) } + +func (s *quotasSuite) TestOperatorPrioritized() { + rateFn := func() float64 { return 5 } + limiter := newPriorityRateLimiter(rateFn, RequestPriorityFn) + + operatorRequest := quotas.NewRequest( + "DescribeWorkflowExecution", + 1, + "test-namespace", + headers.CallerTypeOperator, + -1, + "DescribeWorkflowExecution") + + apiRequest := quotas.NewRequest( + "DescribeWorkflowExecution", + 1, + "test-namespace", + headers.CallerTypeAPI, + -1, + "DescribeWorkflowExecution") + + requestTime := time.Now() + wasLimited := false + + for i := 0; i < 6; i++ { + if !limiter.Allow(requestTime, apiRequest) { + wasLimited = true + s.True(limiter.Allow(requestTime, operatorRequest)) + } + } + s.True(wasLimited) +} diff --git a/service/frontend/configs/quotas_test.go b/service/frontend/configs/quotas_test.go index c7bf63c3b0e..591442e981f 100644 --- a/service/frontend/configs/quotas_test.go +++ b/service/frontend/configs/quotas_test.go @@ -27,13 +27,20 @@ package configs import ( "reflect" "testing" + "time" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/quotas" "golang.org/x/exp/slices" ) +var ( + testRateBurstFn = quotas.NewDefaultIncomingRateBurst(func() float64 { return 5 }) +) + type ( quotasSuite struct { suite.Suite @@ -262,3 +269,52 @@ func (s *quotasSuite) TestAllAPIs() { } s.Equal(expectedAPIs, actualAPIs) } + +func (s *quotasSuite) TestOperatorPriority_Execution() { + limiter := NewExecutionPriorityRateLimiter(testRateBurstFn) + s.testOperatorPrioritized(limiter, "DescribeWorkflowExecution") +} + +func (s *quotasSuite) TestOperatorPriority_Visibility() { + limiter := NewVisibilityPriorityRateLimiter(testRateBurstFn) + s.testOperatorPrioritized(limiter, "ListOpenWorkflowExecutions") +} + +func (s *quotasSuite) TestOperatorPriority_NamespaceReplicationInducing() { + limiter := NewNamespaceReplicationInducingAPIPriorityRateLimiter(testRateBurstFn) + s.testOperatorPrioritized(limiter, "RegisterNamespace") +} + +func (s *quotasSuite) TestOperatorPriority_Other() { + limiter := NewOtherAPIPriorityRateLimiter(testRateBurstFn) + s.testOperatorPrioritized(limiter, "DescribeNamespace") +} + +func (s *quotasSuite) testOperatorPrioritized(limiter quotas.RequestRateLimiter, api string) { + operatorRequest := quotas.NewRequest( + api, + 1, + "test-namespace", + headers.CallerTypeOperator, + -1, + "") + + apiRequest := quotas.NewRequest( + api, + 1, + "test-namespace", + headers.CallerTypeAPI, + -1, + "") + + requestTime := time.Now() + limitCount := 0 + + for i := 0; i < 12; i++ { + if !limiter.Allow(requestTime, apiRequest) { + limitCount++ + s.True(limiter.Allow(requestTime, operatorRequest)) + } + } + s.Equal(2, limitCount) +} diff --git a/service/history/configs/quotas_test.go b/service/history/configs/quotas_test.go index 6ea7b6c0da7..7a3378975ae 100644 --- a/service/history/configs/quotas_test.go +++ b/service/history/configs/quotas_test.go @@ -27,9 +27,12 @@ package configs import ( "reflect" "testing" + "time" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/quotas" "golang.org/x/exp/slices" "go.temporal.io/server/api/historyservice/v1" @@ -83,3 +86,35 @@ func (s *quotasSuite) TestAPIs() { } s.Equal(apiToPriority, APIToPriority) } + +func (s *quotasSuite) TestOperatorPrioritized() { + rateFn := func() float64 { return 5 } + limiter := NewPriorityRateLimiter(rateFn) + + operatorRequest := quotas.NewRequest( + "StartWorkflowExecution", + 1, + "", + headers.CallerTypeOperator, + -1, + "") + + apiRequest := quotas.NewRequest( + "StartWorkflowExecution", + 1, + "", + headers.CallerTypeAPI, + -1, + "") + + requestTime := time.Now() + limitCount := 0 + + for i := 0; i < 12; i++ { + if !limiter.Allow(requestTime, apiRequest) { + limitCount++ + s.True(limiter.Allow(requestTime, operatorRequest)) + } + } + s.Equal(2, limitCount) +} diff --git a/service/matching/configs/quotas_test.go b/service/matching/configs/quotas_test.go index 863cc50fc13..79965117ce0 100644 --- a/service/matching/configs/quotas_test.go +++ b/service/matching/configs/quotas_test.go @@ -27,9 +27,12 @@ package configs import ( "reflect" "testing" + "time" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/quotas" "golang.org/x/exp/slices" "go.temporal.io/server/api/matchingservice/v1" @@ -83,3 +86,35 @@ func (s *quotasSuite) TestAPIs() { } s.Equal(apiToPriority, APIToPriority) } + +func (s *quotasSuite) TestOperatorPrioritized() { + rateFn := func() float64 { return 5 } + limiter := NewPriorityRateLimiter(rateFn) + + operatorRequest := quotas.NewRequest( + "QueryWorkflow", + 1, + "", + headers.CallerTypeOperator, + -1, + "") + + apiRequest := quotas.NewRequest( + "QueryWorkflow", + 1, + "", + headers.CallerTypeAPI, + -1, + "") + + requestTime := time.Now() + limitCount := 0 + + for i := 0; i < 12; i++ { + if !limiter.Allow(requestTime, apiRequest) { + limitCount++ + s.True(limiter.Allow(requestTime, operatorRequest)) + } + } + s.Equal(2, limitCount) +} From c12fa5054deb8d2969565161f40cf51de311512e Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Fri, 14 Jul 2023 14:07:55 -0700 Subject: [PATCH 4/6] feedback --- common/constants.go | 5 ++ common/dynamicconfig/constants.go | 3 + common/persistence/client/fx.go | 3 + common/persistence/client/quotas.go | 59 +++++++-------- common/persistence/client/quotas_test.go | 21 +++--- .../visibility_persistence_suite_test.go | 1 + common/persistence/visibility/factory.go | 9 ++- common/persistence/visibility/quotas.go | 73 +++++++++++++++++++ .../visibility_manager_rate_limited.go | 69 +++++++++++------- .../visibility/visibility_manager_test.go | 1 + service/frontend/configs/quotas.go | 32 ++++---- service/frontend/configs/quotas_test.go | 11 +-- service/frontend/fx.go | 4 + service/frontend/service.go | 2 + service/fx.go | 3 + service/history/configs/config.go | 2 + service/history/configs/quotas.go | 11 ++- service/history/configs/quotas_test.go | 7 +- service/history/fx.go | 4 +- service/matching/config.go | 3 + service/matching/configs/quotas.go | 11 ++- service/matching/configs/quotas_test.go | 7 +- service/matching/fx.go | 4 +- service/worker/fx.go | 2 + service/worker/service.go | 3 + 25 files changed, 244 insertions(+), 106 deletions(-) create mode 100644 common/persistence/visibility/quotas.go diff --git a/common/constants.go b/common/constants.go index 776f772f0f3..666c826644e 100644 --- a/common/constants.go +++ b/common/constants.go @@ -102,3 +102,8 @@ const ( // DefaultQueueReaderID is the default readerID when loading history tasks DefaultQueueReaderID int64 = 0 ) + +const ( + // DefaultOperatorRPSRatio is the default percentage of rate limit that should be used for operator priority requests + DefaultOperatorRPSRatio float64 = 0.2 +) diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index f05024c1674..904f7d29fc1 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -119,6 +119,9 @@ const ( // ShardPerNsRPSWarnPercent is the per-shard per-namespace RPS limit for warning as a percentage of ShardRPSWarnLimit // these warning are not emitted if the value is set to 0 or less ShardPerNsRPSWarnPercent = "system.shardPerNsRPSWarnPercent" + // OperatorRPSRatio is the percentage of the rate limit provided to priority rate limiters that should be used for + // operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20% if not specified) + OperatorRPSRatio = "system.operatorRPSRatio" // Whether the deadlock detector should dump goroutines DeadlockDumpGoroutines = "system.deadlock.DumpGoroutines" diff --git a/common/persistence/client/fx.go b/common/persistence/client/fx.go index 244a29ac0c5..dc2942e6e76 100644 --- a/common/persistence/client/fx.go +++ b/common/persistence/client/fx.go @@ -45,6 +45,7 @@ type ( PersistenceNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePriorityRateLimiting dynamicconfig.BoolPropertyFn + OperatorRPSRatio dynamicconfig.FloatPropertyFn DynamicRateLimitingParams dynamicconfig.MapPropertyFn @@ -59,6 +60,7 @@ type ( PersistenceNamespaceMaxQPS PersistenceNamespaceMaxQps PersistencePerShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS EnablePriorityRateLimiting EnablePriorityRateLimiting + OperatorRPSRatio OperatorRPSRatio ClusterName ClusterName ServiceName primitives.ServiceName MetricsHandler metrics.Handler @@ -92,6 +94,7 @@ func FactoryProvider( params.PersistenceMaxQPS, params.PersistencePerShardNamespaceMaxQPS, RequestPriorityFn, + params.OperatorRPSRatio, params.HealthSignals, params.DynamicRateLimitingParams, params.Logger, diff --git a/common/persistence/client/quotas.go b/common/persistence/client/quotas.go index 31164710b75..806d4821d99 100644 --- a/common/persistence/client/quotas.go +++ b/common/persistence/client/quotas.go @@ -39,39 +39,32 @@ type ( } ) -const ( - // OperatorQPSRatio is the percentage of the rate provided to priority rate limiters that - // should be used for operator API calls. Operator API calls have a lower rate limit to - // prevent users from abusing this to get high priority for all requests. - OperatorQPSRatio = 0.2 -) - var ( CallerTypeDefaultPriority = map[string]int{ - headers.CallerTypeOperator: 1, + headers.CallerTypeOperator: 0, headers.CallerTypeAPI: 2, headers.CallerTypeBackground: 4, headers.CallerTypePreemptable: 5, } APITypeCallOriginPriorityOverride = map[string]int{ - "StartWorkflowExecution": 0, - "SignalWithStartWorkflowExecution": 0, - "SignalWorkflowExecution": 0, - "RequestCancelWorkflowExecution": 0, - "TerminateWorkflowExecution": 0, - "GetWorkflowExecutionHistory": 0, - "UpdateWorkflowExecution": 0, + "StartWorkflowExecution": 1, + "SignalWithStartWorkflowExecution": 1, + "SignalWorkflowExecution": 1, + "RequestCancelWorkflowExecution": 1, + "TerminateWorkflowExecution": 1, + "GetWorkflowExecutionHistory": 1, + "UpdateWorkflowExecution": 1, } BackgroundTypeAPIPriorityOverride = map[string]int{ - "GetOrCreateShard": 0, - "UpdateShard": 0, + "GetOrCreateShard": 1, + "UpdateShard": 1, // This is a preprequisite for checkpointing queue process progress - p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTransfer): 0, - p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTimer): 0, - p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryVisibility): 0, + p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTransfer): 1, + p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryTimer): 1, + p.ConstructHistoryTaskAPI("RangeCompleteHistoryTasks", tasks.CategoryVisibility): 1, // Task resource isolation assumes task can always be loaded. // When one namespace has high load, all task processing goroutines @@ -94,6 +87,7 @@ func NewPriorityRateLimiter( hostMaxQPS PersistenceMaxQps, perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS, requestPriorityFn quotas.RequestPriorityFn, + operatorRPSRatio OperatorRPSRatio, healthSignals p.HealthSignalAggregator, dynamicParams DynamicRateLimitingParams, logger log.Logger, @@ -102,13 +96,13 @@ func NewPriorityRateLimiter( return quotas.NewMultiRequestRateLimiter( // per shardID+namespaceID rate limiters - newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxQPS, hostMaxQPS, requestPriorityFn), + newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxQPS, hostMaxQPS, requestPriorityFn, operatorRPSRatio), // per namespaceID rate limiters - newPriorityNamespaceRateLimiter(namespaceMaxQPS, hostMaxQPS, requestPriorityFn), + newPriorityNamespaceRateLimiter(namespaceMaxQPS, hostMaxQPS, requestPriorityFn, operatorRPSRatio), // host-level dynamic rate limiter - newPriorityDynamicRateLimiter(hostRateFn, requestPriorityFn, healthSignals, dynamicParams, logger), + newPriorityDynamicRateLimiter(hostRateFn, requestPriorityFn, operatorRPSRatio, healthSignals, dynamicParams, logger), // basic host-level rate limiter - newPriorityRateLimiter(hostRateFn, requestPriorityFn), + newPriorityRateLimiter(hostRateFn, requestPriorityFn, operatorRPSRatio), ) } @@ -116,6 +110,7 @@ func newPerShardPerNamespacePriorityRateLimiter( perShardNamespaceMaxQPS PersistencePerShardNamespaceMaxQPS, hostMaxQPS PersistenceMaxQps, requestPriorityFn quotas.RequestPriorityFn, + operatorRPSRatio OperatorRPSRatio, ) quotas.RequestRateLimiter { return quotas.NewMapRequestRateLimiter(func(req quotas.Request) quotas.RequestRateLimiter { if hasCaller(req) && hasCallerSegment(req) { @@ -126,6 +121,7 @@ func newPerShardPerNamespacePriorityRateLimiter( return float64(perShardNamespaceMaxQPS(req.Caller)) }, requestPriorityFn, + operatorRPSRatio, ) } return quotas.NoopRequestRateLimiter @@ -145,6 +141,7 @@ func newPriorityNamespaceRateLimiter( namespaceMaxQPS PersistenceNamespaceMaxQps, hostMaxQPS PersistenceMaxQps, requestPriorityFn quotas.RequestPriorityFn, + operatorRPSRatio OperatorRPSRatio, ) quotas.RequestRateLimiter { return quotas.NewNamespaceRequestRateLimiter(func(req quotas.Request) quotas.RequestRateLimiter { if hasCaller(req) { @@ -162,6 +159,7 @@ func newPriorityNamespaceRateLimiter( return namespaceQPS }, requestPriorityFn, + operatorRPSRatio, ) } return quotas.NoopRequestRateLimiter @@ -171,11 +169,12 @@ func newPriorityNamespaceRateLimiter( func newPriorityRateLimiter( rateFn quotas.RateFn, requestPriorityFn quotas.RequestPriorityFn, + operatorRPSRatio OperatorRPSRatio, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range RequestPrioritiesOrdered { if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(operatorRateFn(rateFn))) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(operatorRateFn(rateFn, operatorRPSRatio))) } else { rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn)) } @@ -190,6 +189,7 @@ func newPriorityRateLimiter( func newPriorityDynamicRateLimiter( rateFn quotas.RateFn, requestPriorityFn quotas.RequestPriorityFn, + operatorRPSRatio OperatorRPSRatio, healthSignals p.HealthSignalAggregator, dynamicParams DynamicRateLimitingParams, logger log.Logger, @@ -198,7 +198,7 @@ func newPriorityDynamicRateLimiter( for priority := range RequestPrioritiesOrdered { // TODO: refactor this so dynamic rate adjustment is global for all priorities if priority == CallerTypeDefaultPriority[headers.CallerTypeOperator] { - rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, operatorRateFn(rateFn), dynamicParams, logger) + rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, operatorRateFn(rateFn, operatorRPSRatio), dynamicParams, logger) } else { rateLimiters[priority] = NewHealthRequestRateLimiterImpl(healthSignals, rateFn, dynamicParams, logger) } @@ -228,9 +228,6 @@ func NewNoopPriorityRateLimiter( func RequestPriorityFn(req quotas.Request) int { switch req.CallerType { case headers.CallerTypeOperator: - if priority, ok := APITypeCallOriginPriorityOverride[req.Initiation]; ok { - return priority - } return CallerTypeDefaultPriority[req.CallerType] case headers.CallerTypeAPI: if priority, ok := APITypeCallOriginPriorityOverride[req.Initiation]; ok { @@ -250,9 +247,9 @@ func RequestPriorityFn(req quotas.Request) int { } } -func operatorRateFn(rateFn quotas.RateFn) quotas.RateFn { +func operatorRateFn(rateFn quotas.RateFn, operatorRPSRatio OperatorRPSRatio) quotas.RateFn { return func() float64 { - return rateFn() * OperatorQPSRatio + return operatorRPSRatio() * rateFn() } } diff --git a/common/persistence/client/quotas_test.go b/common/persistence/client/quotas_test.go index c84ef07856a..c0704facca9 100644 --- a/common/persistence/client/quotas_test.go +++ b/common/persistence/client/quotas_test.go @@ -105,12 +105,13 @@ func (s *quotasSuite) TestCallOriginDefined() { } func (s *quotasSuite) TestPriorityNamespaceRateLimiter_DoesLimit() { - var namespaceMaxRPS = func(namespace string) int { return 1 } - var hostMaxRPS = func() int { return 1 } + namespaceMaxRPS := func(namespace string) int { return 1 } + hostMaxRPS := func() int { return 1 } + operatorRPSRatioFn := func() float64 { return 0.2 } - var limiter = newPriorityNamespaceRateLimiter(namespaceMaxRPS, hostMaxRPS, RequestPriorityFn) + limiter := newPriorityNamespaceRateLimiter(namespaceMaxRPS, hostMaxRPS, RequestPriorityFn, operatorRPSRatioFn) - var request = quotas.NewRequest( + request := quotas.NewRequest( "test-api", 1, "test-namespace", @@ -132,12 +133,13 @@ func (s *quotasSuite) TestPriorityNamespaceRateLimiter_DoesLimit() { } func (s *quotasSuite) TestPerShardNamespaceRateLimiter_DoesLimit() { - var perShardNamespaceMaxRPS = func(namespace string) int { return 1 } - var hostMaxRPS = func() int { return 1 } + perShardNamespaceMaxRPS := func(namespace string) int { return 1 } + hostMaxRPS := func() int { return 1 } + operatorRPSRatioFn := func() float64 { return 0.2 } - var limiter = newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxRPS, hostMaxRPS, RequestPriorityFn) + limiter := newPerShardPerNamespacePriorityRateLimiter(perShardNamespaceMaxRPS, hostMaxRPS, RequestPriorityFn, operatorRPSRatioFn) - var request = quotas.NewRequest( + request := quotas.NewRequest( "test-api", 1, "test-namespace", @@ -160,7 +162,8 @@ func (s *quotasSuite) TestPerShardNamespaceRateLimiter_DoesLimit() { func (s *quotasSuite) TestOperatorPrioritized() { rateFn := func() float64 { return 5 } - limiter := newPriorityRateLimiter(rateFn, RequestPriorityFn) + operatorRPSRatioFn := func() float64 { return 0.2 } + limiter := newPriorityRateLimiter(rateFn, RequestPriorityFn, operatorRPSRatioFn) operatorRequest := quotas.NewRequest( "DescribeWorkflowExecution", diff --git a/common/persistence/tests/visibility_persistence_suite_test.go b/common/persistence/tests/visibility_persistence_suite_test.go index c266d8c6889..46e8305b7b4 100644 --- a/common/persistence/tests/visibility_persistence_suite_test.go +++ b/common/persistence/tests/visibility_persistence_suite_test.go @@ -87,6 +87,7 @@ func (s *VisibilityPersistenceSuite) SetupSuite() { s.SearchAttributesMapperProvider, dynamicconfig.GetIntPropertyFn(1000), dynamicconfig.GetIntPropertyFn(1000), + dynamicconfig.GetFloatPropertyFn(0.2), dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false), dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false), diff --git a/common/persistence/visibility/factory.go b/common/persistence/visibility/factory.go index ac6fc0fa16c..c89ae50b421 100644 --- a/common/persistence/visibility/factory.go +++ b/common/persistence/visibility/factory.go @@ -56,6 +56,7 @@ func NewManager( maxReadQPS dynamicconfig.IntPropertyFn, maxWriteQPS dynamicconfig.IntPropertyFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, enableReadFromSecondaryVisibility dynamicconfig.BoolPropertyFnWithNamespaceFilter, secondaryVisibilityWritingMode dynamicconfig.StringPropertyFn, visibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter, @@ -76,6 +77,7 @@ func NewManager( searchAttributesMapperProvider, maxReadQPS, maxWriteQPS, + operatorRPSRatio, visibilityDisableOrderByClause, visibilityEnableManualPagination, metricsHandler, @@ -98,6 +100,7 @@ func NewManager( searchAttributesMapperProvider, maxReadQPS, maxWriteQPS, + operatorRPSRatio, visibilityDisableOrderByClause, visibilityEnableManualPagination, metricsHandler, @@ -143,6 +146,7 @@ func newVisibilityManager( visStore store.VisibilityStore, maxReadQPS dynamicconfig.IntPropertyFn, maxWriteQPS dynamicconfig.IntPropertyFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, metricsHandler metrics.Handler, tag metrics.Tag, logger log.Logger, @@ -156,7 +160,8 @@ func newVisibilityManager( visManager = NewVisibilityManagerRateLimited( visManager, maxReadQPS, - maxWriteQPS) + maxWriteQPS, + operatorRPSRatio) // wrap with metrics client visManager = NewVisibilityManagerMetrics( visManager, @@ -179,6 +184,7 @@ func newVisibilityManagerFromDataStoreConfig( maxReadQPS dynamicconfig.IntPropertyFn, maxWriteQPS dynamicconfig.IntPropertyFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, visibilityDisableOrderByClause dynamicconfig.BoolPropertyFnWithNamespaceFilter, visibilityEnableManualPagination dynamicconfig.BoolPropertyFnWithNamespaceFilter, @@ -207,6 +213,7 @@ func newVisibilityManagerFromDataStoreConfig( visStore, maxReadQPS, maxWriteQPS, + operatorRPSRatio, metricsHandler, metrics.AdvancedVisibilityTypeTag(), logger, diff --git a/common/persistence/visibility/quotas.go b/common/persistence/visibility/quotas.go new file mode 100644 index 00000000000..d661d55d95c --- /dev/null +++ b/common/persistence/visibility/quotas.go @@ -0,0 +1,73 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package visibility + +import ( + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/quotas" +) + +const ( + // OperatorPriority is used to give precedence to calls coming from web UI or tctl + OperatorPriority = 0 +) + +var ( + PrioritiesOrdered = []int{OperatorPriority, 1} +) + +func newPriorityRateLimiter( + maxQPS dynamicconfig.IntPropertyFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, +) quotas.RequestRateLimiter { + rateLimiters := make(map[int]quotas.RequestRateLimiter) + for priority := range PrioritiesOrdered { + if priority == OperatorPriority { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn(maxQPS))) + } else { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(operatorRateFn(maxQPS, operatorRPSRatio))) + } + } + return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { + if req.CallerType == headers.CallerTypeOperator { + return OperatorPriority + } + // default to lowest priority + return PrioritiesOrdered[len(PrioritiesOrdered)-1] + }, rateLimiters) +} + +func rateFn(maxQPS dynamicconfig.IntPropertyFn) quotas.RateFn { + return func() float64 { + return float64(maxQPS()) + } +} + +func operatorRateFn(maxQPS dynamicconfig.IntPropertyFn, operatorRPSRatio dynamicconfig.FloatPropertyFn) quotas.RateFn { + return func() float64 { + return float64(maxQPS()) * operatorRPSRatio() + } +} diff --git a/common/persistence/visibility/visibility_manager_rate_limited.go b/common/persistence/visibility/visibility_manager_rate_limited.go index 62446259b36..1f82af694d9 100644 --- a/common/persistence/visibility/visibility_manager_rate_limited.go +++ b/common/persistence/visibility/visibility_manager_rate_limited.go @@ -26,37 +26,39 @@ package visibility import ( "context" + "time" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/headers" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/visibility/manager" "go.temporal.io/server/common/quotas" ) +const ( + RateLimitDefaultToken = 1 + CallerSegmentMissing = -1 +) + var _ manager.VisibilityManager = (*visibilityManagerRateLimited)(nil) type visibilityManagerRateLimited struct { delegate manager.VisibilityManager - readRateLimiter quotas.RateLimiter - writeRateLimiter quotas.RateLimiter + readRateLimiter quotas.RequestRateLimiter + writeRateLimiter quotas.RequestRateLimiter } func NewVisibilityManagerRateLimited( delegate manager.VisibilityManager, readMaxQPS dynamicconfig.IntPropertyFn, writeMaxQPS dynamicconfig.IntPropertyFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) *visibilityManagerRateLimited { - readRateLimiter := quotas.NewDefaultOutgoingRateLimiter( - func() float64 { return float64(readMaxQPS()) }, - ) - writeRateLimiter := quotas.NewDefaultOutgoingRateLimiter( - func() float64 { return float64(writeMaxQPS()) }, - ) return &visibilityManagerRateLimited{ delegate: delegate, - readRateLimiter: readRateLimiter, - writeRateLimiter: writeRateLimiter, + readRateLimiter: newPriorityRateLimiter(readMaxQPS, operatorRPSRatio), + writeRateLimiter: newPriorityRateLimiter(writeMaxQPS, operatorRPSRatio), } } @@ -86,7 +88,7 @@ func (m *visibilityManagerRateLimited) RecordWorkflowExecutionStarted( ctx context.Context, request *manager.RecordWorkflowExecutionStartedRequest, ) error { - if ok := m.writeRateLimiter.Allow(); !ok { + if ok := allow(ctx, "RecordWorkflowExecutionStarted", request.ShardID, m.writeRateLimiter); !ok { return persistence.ErrPersistenceLimitExceeded } return m.delegate.RecordWorkflowExecutionStarted(ctx, request) @@ -96,7 +98,7 @@ func (m *visibilityManagerRateLimited) RecordWorkflowExecutionClosed( ctx context.Context, request *manager.RecordWorkflowExecutionClosedRequest, ) error { - if ok := m.writeRateLimiter.Allow(); !ok { + if ok := allow(ctx, "RecordWorkflowExecutionClosed", request.ShardID, m.writeRateLimiter); !ok { return persistence.ErrPersistenceLimitExceeded } return m.delegate.RecordWorkflowExecutionClosed(ctx, request) @@ -106,7 +108,7 @@ func (m *visibilityManagerRateLimited) UpsertWorkflowExecution( ctx context.Context, request *manager.UpsertWorkflowExecutionRequest, ) error { - if ok := m.writeRateLimiter.Allow(); !ok { + if ok := allow(ctx, "UpsertWorkflowExecution", request.ShardID, m.writeRateLimiter); !ok { return persistence.ErrPersistenceLimitExceeded } return m.delegate.UpsertWorkflowExecution(ctx, request) @@ -116,7 +118,7 @@ func (m *visibilityManagerRateLimited) DeleteWorkflowExecution( ctx context.Context, request *manager.VisibilityDeleteWorkflowExecutionRequest, ) error { - if ok := m.writeRateLimiter.Allow(); !ok { + if ok := allow(ctx, "DeleteWorkflowExecution", CallerSegmentMissing, m.writeRateLimiter); !ok { return persistence.ErrPersistenceLimitExceeded } return m.delegate.DeleteWorkflowExecution(ctx, request) @@ -128,7 +130,7 @@ func (m *visibilityManagerRateLimited) ListOpenWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "ListOpenWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListOpenWorkflowExecutions(ctx, request) @@ -138,7 +140,7 @@ func (m *visibilityManagerRateLimited) ListClosedWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "ListClosedWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListClosedWorkflowExecutions(ctx, request) @@ -148,7 +150,7 @@ func (m *visibilityManagerRateLimited) ListOpenWorkflowExecutionsByType( ctx context.Context, request *manager.ListWorkflowExecutionsByTypeRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "ListOpenWorkflowExecutionsByType", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListOpenWorkflowExecutionsByType(ctx, request) @@ -158,7 +160,7 @@ func (m *visibilityManagerRateLimited) ListClosedWorkflowExecutionsByType( ctx context.Context, request *manager.ListWorkflowExecutionsByTypeRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "ListClosedWorkflowExecutionsByType", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListClosedWorkflowExecutionsByType(ctx, request) @@ -168,7 +170,7 @@ func (m *visibilityManagerRateLimited) ListOpenWorkflowExecutionsByWorkflowID( ctx context.Context, request *manager.ListWorkflowExecutionsByWorkflowIDRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "ListOpenWorkflowExecutionsByWorkflowID", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListOpenWorkflowExecutionsByWorkflowID(ctx, request) @@ -178,7 +180,7 @@ func (m *visibilityManagerRateLimited) ListClosedWorkflowExecutionsByWorkflowID( ctx context.Context, request *manager.ListWorkflowExecutionsByWorkflowIDRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "ListClosedWorkflowExecutionsByWorkflowID", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListClosedWorkflowExecutionsByWorkflowID(ctx, request) @@ -188,7 +190,7 @@ func (m *visibilityManagerRateLimited) ListClosedWorkflowExecutionsByStatus( ctx context.Context, request *manager.ListClosedWorkflowExecutionsByStatusRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "ListClosedWorkflowExecutionsByStatus", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListClosedWorkflowExecutionsByStatus(ctx, request) @@ -198,7 +200,7 @@ func (m *visibilityManagerRateLimited) ListWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequestV2, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "ListWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListWorkflowExecutions(ctx, request) @@ -208,7 +210,7 @@ func (m *visibilityManagerRateLimited) ScanWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequestV2, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "ScanWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ScanWorkflowExecutions(ctx, request) @@ -218,7 +220,7 @@ func (m *visibilityManagerRateLimited) CountWorkflowExecutions( ctx context.Context, request *manager.CountWorkflowExecutionsRequest, ) (*manager.CountWorkflowExecutionsResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "CountWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.CountWorkflowExecutions(ctx, request) @@ -228,8 +230,25 @@ func (m *visibilityManagerRateLimited) GetWorkflowExecution( ctx context.Context, request *manager.GetWorkflowExecutionRequest, ) (*manager.GetWorkflowExecutionResponse, error) { - if ok := m.readRateLimiter.Allow(); !ok { + if ok := allow(ctx, "GetWorkflowExecution", CallerSegmentMissing, m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.GetWorkflowExecution(ctx, request) } + +func allow( + ctx context.Context, + api string, + shardID int32, + rateLimiter quotas.RequestRateLimiter, +) bool { + callerInfo := headers.GetCallerInfo(ctx) + return rateLimiter.Allow(time.Now().UTC(), quotas.NewRequest( + api, + RateLimitDefaultToken, + callerInfo.CallerName, + callerInfo.CallerType, + shardID, + callerInfo.CallOrigin, + )) +} diff --git a/common/persistence/visibility/visibility_manager_test.go b/common/persistence/visibility/visibility_manager_test.go index 0d050d5709b..e1f17427d1c 100644 --- a/common/persistence/visibility/visibility_manager_test.go +++ b/common/persistence/visibility/visibility_manager_test.go @@ -80,6 +80,7 @@ func (s *VisibilityManagerSuite) SetupTest() { s.visibilityStore, dynamicconfig.GetIntPropertyFn(1), dynamicconfig.GetIntPropertyFn(1), + dynamicconfig.GetFloatPropertyFn(0.2), s.metricsHandler, metrics.StandardVisibilityTypeTag(), log.NewNoopLogger()) diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index 9f2a725d265..02b273835cd 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -35,10 +35,6 @@ import ( const ( // OperatorPriority is used to give precedence to calls coming from web UI or tctl OperatorPriority = 0 - // OperatorQPSRatio is the percentage of the rate provided to priority rate limiters that - // should be used for operator API calls. Operator API calls have a lower rate limit to - // prevent users from abusing this to get high priority for all requests. - OperatorQPSRatio = 0.2 ) var ( @@ -150,7 +146,7 @@ type ( } operatorRateBurstImpl struct { - operatorRateRatio float64 + operatorRateRatio dynamicconfig.FloatPropertyFn baseRateBurstFn quotas.RateBurst } ) @@ -180,15 +176,16 @@ func (c *NamespaceRateBurstImpl) Burst() int { func newOperatorRateBurst( baseRateBurstFn quotas.RateBurst, + operatorRateRatio dynamicconfig.FloatPropertyFn, ) *operatorRateBurstImpl { return &operatorRateBurstImpl{ - operatorRateRatio: OperatorQPSRatio, + operatorRateRatio: operatorRateRatio, baseRateBurstFn: baseRateBurstFn, } } func (c *operatorRateBurstImpl) Rate() float64 { - return c.operatorRateRatio * c.baseRateBurstFn.Rate() + return c.operatorRateRatio() * c.baseRateBurstFn.Rate() } func (c *operatorRateBurstImpl) Burst() int { @@ -200,13 +197,14 @@ func NewRequestToRateLimiter( visibilityRateBurstFn quotas.RateBurst, namespaceReplicationInducingRateBurstFn quotas.RateBurst, otherRateBurstFn quotas.RateBurst, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { mapping := make(map[string]quotas.RequestRateLimiter) - executionRateLimiter := NewExecutionPriorityRateLimiter(executionRateBurstFn) - visibilityRateLimiter := NewVisibilityPriorityRateLimiter(visibilityRateBurstFn) - namespaceReplicationInducingRateLimiter := NewNamespaceReplicationInducingAPIPriorityRateLimiter(namespaceReplicationInducingRateBurstFn) - otherRateLimiter := NewOtherAPIPriorityRateLimiter(otherRateBurstFn) + executionRateLimiter := NewExecutionPriorityRateLimiter(executionRateBurstFn, operatorRPSRatio) + visibilityRateLimiter := NewVisibilityPriorityRateLimiter(visibilityRateBurstFn, operatorRPSRatio) + namespaceReplicationInducingRateLimiter := NewNamespaceReplicationInducingAPIPriorityRateLimiter(namespaceReplicationInducingRateBurstFn, operatorRPSRatio) + otherRateLimiter := NewOtherAPIPriorityRateLimiter(otherRateBurstFn, operatorRPSRatio) for api := range ExecutionAPIToPriority { mapping[api] = executionRateLimiter @@ -226,11 +224,12 @@ func NewRequestToRateLimiter( func NewExecutionPriorityRateLimiter( rateBurstFn quotas.RateBurst, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range ExecutionAPIPrioritiesOrdered { if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn), time.Minute)) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn, operatorRPSRatio), time.Minute)) } else { rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) } @@ -248,11 +247,12 @@ func NewExecutionPriorityRateLimiter( func NewVisibilityPriorityRateLimiter( rateBurstFn quotas.RateBurst, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range VisibilityAPIPrioritiesOrdered { if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn), time.Minute)) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn, operatorRPSRatio), time.Minute)) } else { rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) } @@ -270,11 +270,12 @@ func NewVisibilityPriorityRateLimiter( func NewNamespaceReplicationInducingAPIPriorityRateLimiter( rateBurstFn quotas.RateBurst, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range NamespaceReplicationInducingAPIPrioritiesOrdered { if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn), time.Minute)) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn, operatorRPSRatio), time.Minute)) } else { rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) } @@ -292,11 +293,12 @@ func NewNamespaceReplicationInducingAPIPriorityRateLimiter( func NewOtherAPIPriorityRateLimiter( rateBurstFn quotas.RateBurst, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range OtherAPIPrioritiesOrdered { if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn), time.Minute)) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(newOperatorRateBurst(rateBurstFn, operatorRPSRatio), time.Minute)) } else { rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDynamicRateLimiter(rateBurstFn, time.Minute)) } diff --git a/service/frontend/configs/quotas_test.go b/service/frontend/configs/quotas_test.go index 591442e981f..ff7a747aff3 100644 --- a/service/frontend/configs/quotas_test.go +++ b/service/frontend/configs/quotas_test.go @@ -38,7 +38,8 @@ import ( ) var ( - testRateBurstFn = quotas.NewDefaultIncomingRateBurst(func() float64 { return 5 }) + testRateBurstFn = quotas.NewDefaultIncomingRateBurst(func() float64 { return 5 }) + testOperatorRPSRatioFn = func() float64 { return 0.2 } ) type ( @@ -271,22 +272,22 @@ func (s *quotasSuite) TestAllAPIs() { } func (s *quotasSuite) TestOperatorPriority_Execution() { - limiter := NewExecutionPriorityRateLimiter(testRateBurstFn) + limiter := NewExecutionPriorityRateLimiter(testRateBurstFn, testOperatorRPSRatioFn) s.testOperatorPrioritized(limiter, "DescribeWorkflowExecution") } func (s *quotasSuite) TestOperatorPriority_Visibility() { - limiter := NewVisibilityPriorityRateLimiter(testRateBurstFn) + limiter := NewVisibilityPriorityRateLimiter(testRateBurstFn, testOperatorRPSRatioFn) s.testOperatorPrioritized(limiter, "ListOpenWorkflowExecutions") } func (s *quotasSuite) TestOperatorPriority_NamespaceReplicationInducing() { - limiter := NewNamespaceReplicationInducingAPIPriorityRateLimiter(testRateBurstFn) + limiter := NewNamespaceReplicationInducingAPIPriorityRateLimiter(testRateBurstFn, testOperatorRPSRatioFn) s.testOperatorPrioritized(limiter, "RegisterNamespace") } func (s *quotasSuite) TestOperatorPriority_Other() { - limiter := NewOtherAPIPriorityRateLimiter(testRateBurstFn) + limiter := NewOtherAPIPriorityRateLimiter(testRateBurstFn, testOperatorRPSRatioFn) s.testOperatorPrioritized(limiter, "DescribeNamespace") } diff --git a/service/frontend/fx.go b/service/frontend/fx.go index e3afdbcc255..a7f4cf8acc0 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -300,6 +300,7 @@ func RateLimitInterceptorProvider( quotas.NewDefaultIncomingRateLimiter(rateFn), quotas.NewDefaultIncomingRateLimiter(namespaceReplicationInducingRateFn), quotas.NewDefaultIncomingRateLimiter(rateFn), + serviceConfig.OperatorRPSRatio, ), map[string]int{}, ) @@ -359,6 +360,7 @@ func NamespaceRateLimitInterceptorProvider( configs.NewNamespaceRateBurst(req.Caller, visibilityRateFn, serviceConfig.MaxNamespaceVisibilityBurstPerInstance), configs.NewNamespaceRateBurst(req.Caller, namespaceReplicationInducingRateFn, serviceConfig.MaxNamespaceNamespaceReplicationInducingAPIsBurstPerInstance), configs.NewNamespaceRateBurst(req.Caller, rateFn, serviceConfig.MaxNamespaceBurstPerInstance), + serviceConfig.OperatorRPSRatio, ) }, ) @@ -408,6 +410,7 @@ func PersistenceRateLimitingParamsProvider( serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistencePerShardNamespaceMaxQPS, serviceConfig.EnablePersistencePriorityRateLimiting, + serviceConfig.OperatorRPSRatio, serviceConfig.PersistenceDynamicRateLimitingParams, ) } @@ -431,6 +434,7 @@ func VisibilityManagerProvider( searchAttributesMapperProvider, serviceConfig.VisibilityPersistenceMaxReadQPS, serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.OperatorRPSRatio, serviceConfig.EnableReadFromSecondaryVisibility, dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), // frontend visibility never write serviceConfig.VisibilityDisableOrderByClause, diff --git a/service/frontend/service.go b/service/frontend/service.go index 9605ba5647c..5332970acbb 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -71,6 +71,7 @@ type Config struct { HistoryMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter RPS dynamicconfig.IntPropertyFn GlobalRPS dynamicconfig.IntPropertyFn + OperatorRPSRatio dynamicconfig.FloatPropertyFn NamespaceReplicationInducingAPIsRPS dynamicconfig.IntPropertyFn MaxNamespaceRPSPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter MaxNamespaceBurstPerInstance dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -206,6 +207,7 @@ func NewConfig( HistoryMaxPageSize: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize), RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 2400), GlobalRPS: dc.GetIntProperty(dynamicconfig.FrontendGlobalRPS, 0), + OperatorRPSRatio: dc.GetFloat64Property(dynamicconfig.OperatorRPSRatio, common.DefaultOperatorRPSRatio), NamespaceReplicationInducingAPIsRPS: dc.GetIntProperty(dynamicconfig.FrontendNamespaceReplicationInducingAPIsRPS, 20), MaxNamespaceRPSPerInstance: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.FrontendMaxNamespaceRPSPerInstance, 2400), diff --git a/service/fx.go b/service/fx.go index 4ab67d281a1..3618a630dcd 100644 --- a/service/fx.go +++ b/service/fx.go @@ -47,6 +47,7 @@ type ( PersistenceNamespaceMaxQps persistenceClient.PersistenceNamespaceMaxQps PersistencePerShardNamespaceMaxQPS persistenceClient.PersistencePerShardNamespaceMaxQPS EnablePriorityRateLimiting persistenceClient.EnablePriorityRateLimiting + OperatorRPSRatio persistenceClient.OperatorRPSRatio DynamicRateLimitingParams persistenceClient.DynamicRateLimitingParams } ) @@ -57,6 +58,7 @@ func NewPersistenceRateLimitingParams( namespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter, perShardNamespaceMaxQps dynamicconfig.IntPropertyFnWithNamespaceFilter, enablePriorityRateLimiting dynamicconfig.BoolPropertyFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, dynamicRateLimitingParams dynamicconfig.MapPropertyFn, ) PersistenceRateLimitingParams { return PersistenceRateLimitingParams{ @@ -64,6 +66,7 @@ func NewPersistenceRateLimitingParams( PersistenceNamespaceMaxQps: persistenceClient.PersistenceNamespaceMaxQps(namespaceMaxQps), PersistencePerShardNamespaceMaxQPS: persistenceClient.PersistencePerShardNamespaceMaxQPS(perShardNamespaceMaxQps), EnablePriorityRateLimiting: persistenceClient.EnablePriorityRateLimiting(enablePriorityRateLimiting), + OperatorRPSRatio: persistenceClient.OperatorRPSRatio(operatorRPSRatio), DynamicRateLimitingParams: persistenceClient.DynamicRateLimitingParams(dynamicRateLimitingParams), } } diff --git a/service/history/configs/config.go b/service/history/configs/config.go index e9096deb22d..7f09dd19a62 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -42,6 +42,7 @@ type Config struct { EnableReplicationStream dynamicconfig.BoolPropertyFn RPS dynamicconfig.IntPropertyFn + OperatorRPSRatio dynamicconfig.FloatPropertyFn MaxIDLengthLimit dynamicconfig.IntPropertyFn PersistenceMaxQPS dynamicconfig.IntPropertyFn PersistenceGlobalMaxQPS dynamicconfig.IntPropertyFn @@ -326,6 +327,7 @@ func NewConfig( EnableReplicationStream: dc.GetBoolProperty(dynamicconfig.EnableReplicationStream, false), RPS: dc.GetIntProperty(dynamicconfig.HistoryRPS, 3000), + OperatorRPSRatio: dc.GetFloat64Property(dynamicconfig.OperatorRPSRatio, common.DefaultOperatorRPSRatio), MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000), PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.HistoryPersistenceMaxQPS, 9000), PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.HistoryPersistenceGlobalMaxQPS, 0), diff --git a/service/history/configs/quotas.go b/service/history/configs/quotas.go index c72541bf23a..eff56821c03 100644 --- a/service/history/configs/quotas.go +++ b/service/history/configs/quotas.go @@ -25,6 +25,7 @@ package configs import ( + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/quotas" ) @@ -32,10 +33,6 @@ import ( const ( // OperatorPriority is used to give precedence to calls coming from web UI or tctl OperatorPriority = 0 - // OperatorQPSRatio is the percentage of the rate provided to priority rate limiters that - // should be used for operator API calls. Operator API calls have a lower rate limit to - // prevent users from abusing this to get high priority for all requests. - OperatorQPSRatio = 0.2 ) var ( @@ -95,11 +92,12 @@ var ( func NewPriorityRateLimiter( rateFn quotas.RateFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range APIPrioritiesOrdered { if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(operatorRateFn(rateFn))) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(operatorRateFn(rateFn, operatorRPSRatio))) } else { rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) } @@ -117,8 +115,9 @@ func NewPriorityRateLimiter( func operatorRateFn( rateFn quotas.RateFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RateFn { return func() float64 { - return rateFn() * OperatorQPSRatio + return operatorRPSRatio() * rateFn() } } diff --git a/service/history/configs/quotas_test.go b/service/history/configs/quotas_test.go index 7a3378975ae..85595af3812 100644 --- a/service/history/configs/quotas_test.go +++ b/service/history/configs/quotas_test.go @@ -31,11 +31,11 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "go.temporal.io/server/common/headers" - "go.temporal.io/server/common/quotas" "golang.org/x/exp/slices" "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/quotas" ) type ( @@ -89,7 +89,8 @@ func (s *quotasSuite) TestAPIs() { func (s *quotasSuite) TestOperatorPrioritized() { rateFn := func() float64 { return 5 } - limiter := NewPriorityRateLimiter(rateFn) + operatorRPSRatioFn := func() float64 { return 0.2 } + limiter := NewPriorityRateLimiter(rateFn, operatorRPSRatioFn) operatorRequest := quotas.NewRequest( "StartWorkflowExecution", diff --git a/service/history/fx.go b/service/history/fx.go index 2fe676359ab..352db0ce8dc 100644 --- a/service/history/fx.go +++ b/service/history/fx.go @@ -198,7 +198,7 @@ func RateLimitInterceptorProvider( serviceConfig *configs.Config, ) *interceptor.RateLimitInterceptor { return interceptor.NewRateLimitInterceptor( - configs.NewPriorityRateLimiter(func() float64 { return float64(serviceConfig.RPS()) }), + configs.NewPriorityRateLimiter(func() float64 { return float64(serviceConfig.RPS()) }, serviceConfig.OperatorRPSRatio), map[string]int{}, ) } @@ -225,6 +225,7 @@ func PersistenceRateLimitingParamsProvider( serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistencePerShardNamespaceMaxQPS, serviceConfig.EnablePersistencePriorityRateLimiting, + serviceConfig.OperatorRPSRatio, serviceConfig.PersistenceDynamicRateLimitingParams, ) } @@ -249,6 +250,7 @@ func VisibilityManagerProvider( searchAttributesMapperProvider, serviceConfig.VisibilityPersistenceMaxReadQPS, serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.OperatorRPSRatio, serviceConfig.EnableReadFromSecondaryVisibility, serviceConfig.SecondaryVisibilityWritingMode, serviceConfig.VisibilityDisableOrderByClause, diff --git a/service/matching/config.go b/service/matching/config.go index d7877391d03..bc72cb39357 100644 --- a/service/matching/config.go +++ b/service/matching/config.go @@ -27,6 +27,7 @@ package matching import ( "time" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/namespace" @@ -47,6 +48,7 @@ type ( SyncMatchWaitDuration dynamicconfig.DurationPropertyFnWithTaskQueueInfoFilters TestDisableSyncMatch dynamicconfig.BoolPropertyFn RPS dynamicconfig.IntPropertyFn + OperatorRPSRatio dynamicconfig.FloatPropertyFn ShutdownDrainDuration dynamicconfig.DurationPropertyFn // taskQueueManager configuration @@ -163,6 +165,7 @@ func NewConfig( TestDisableSyncMatch: dc.GetBoolProperty(dynamicconfig.TestMatchingDisableSyncMatch, false), LoadUserData: dc.GetBoolPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingLoadUserData, true), RPS: dc.GetIntProperty(dynamicconfig.MatchingRPS, 1200), + OperatorRPSRatio: dc.GetFloat64Property(dynamicconfig.OperatorRPSRatio, common.DefaultOperatorRPSRatio), RangeSize: 100000, GetTasksBatchSize: dc.GetIntPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingGetTasksBatchSize, 1000), UpdateAckInterval: dc.GetDurationPropertyFilteredByTaskQueueInfo(dynamicconfig.MatchingUpdateAckInterval, defaultUpdateAckInterval), diff --git a/service/matching/configs/quotas.go b/service/matching/configs/quotas.go index 7413c9bd66b..2146e9d8913 100644 --- a/service/matching/configs/quotas.go +++ b/service/matching/configs/quotas.go @@ -25,6 +25,7 @@ package configs import ( + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/quotas" ) @@ -32,10 +33,6 @@ import ( const ( // OperatorPriority is used to give precedence to calls coming from web UI or tctl OperatorPriority = 0 - // OperatorQPSRatio is the percentage of the rate provided to priority rate limiters that - // should be used for operator API calls. Operator API calls have a lower rate limit to - // prevent users from abusing this to get high priority for all requests. - OperatorQPSRatio = 0.2 ) var ( @@ -64,11 +61,12 @@ var ( func NewPriorityRateLimiter( rateFn quotas.RateFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RequestRateLimiter { rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range APIPrioritiesOrdered { if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(operatorRateFn(rateFn))) + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(operatorRateFn(rateFn, operatorRPSRatio))) } else { rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultIncomingRateLimiter(rateFn)) } @@ -86,8 +84,9 @@ func NewPriorityRateLimiter( func operatorRateFn( rateFn quotas.RateFn, + operatorRPSRatio dynamicconfig.FloatPropertyFn, ) quotas.RateFn { return func() float64 { - return rateFn() * OperatorQPSRatio + return operatorRPSRatio() * rateFn() } } diff --git a/service/matching/configs/quotas_test.go b/service/matching/configs/quotas_test.go index 79965117ce0..7973cc1e2b3 100644 --- a/service/matching/configs/quotas_test.go +++ b/service/matching/configs/quotas_test.go @@ -31,11 +31,11 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "go.temporal.io/server/common/headers" - "go.temporal.io/server/common/quotas" "golang.org/x/exp/slices" "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common/headers" + "go.temporal.io/server/common/quotas" ) type ( @@ -89,7 +89,8 @@ func (s *quotasSuite) TestAPIs() { func (s *quotasSuite) TestOperatorPrioritized() { rateFn := func() float64 { return 5 } - limiter := NewPriorityRateLimiter(rateFn) + operatorRPSRatioFn := func() float64 { return 0.2 } + limiter := NewPriorityRateLimiter(rateFn, operatorRPSRatioFn) operatorRequest := quotas.NewRequest( "QueryWorkflow", diff --git a/service/matching/fx.go b/service/matching/fx.go index b992bd1af4a..19357fbe095 100644 --- a/service/matching/fx.go +++ b/service/matching/fx.go @@ -105,7 +105,7 @@ func RateLimitInterceptorProvider( serviceConfig *Config, ) *interceptor.RateLimitInterceptor { return interceptor.NewRateLimitInterceptor( - configs.NewPriorityRateLimiter(func() float64 { return float64(serviceConfig.RPS()) }), + configs.NewPriorityRateLimiter(func() float64 { return float64(serviceConfig.RPS()) }, serviceConfig.OperatorRPSRatio), map[string]int{}, ) } @@ -121,6 +121,7 @@ func PersistenceRateLimitingParamsProvider( serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistencePerShardNamespaceMaxQPS, serviceConfig.EnablePersistencePriorityRateLimiting, + serviceConfig.OperatorRPSRatio, serviceConfig.PersistenceDynamicRateLimitingParams, ) } @@ -163,6 +164,7 @@ func VisibilityManagerProvider( searchAttributesMapperProvider, serviceConfig.VisibilityPersistenceMaxReadQPS, serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.OperatorRPSRatio, serviceConfig.EnableReadFromSecondaryVisibility, dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), // matching visibility never writes serviceConfig.VisibilityDisableOrderByClause, diff --git a/service/worker/fx.go b/service/worker/fx.go index 89ab84d56c2..119db927e92 100644 --- a/service/worker/fx.go +++ b/service/worker/fx.go @@ -76,6 +76,7 @@ func PersistenceRateLimitingParamsProvider( serviceConfig.PersistenceNamespaceMaxQPS, serviceConfig.PersistencePerShardNamespaceMaxQPS, serviceConfig.EnablePersistencePriorityRateLimiting, + serviceConfig.OperatorRPSRatio, serviceConfig.PersistenceDynamicRateLimitingParams, ) } @@ -111,6 +112,7 @@ func VisibilityManagerProvider( searchAttributesMapperProvider, serviceConfig.VisibilityPersistenceMaxReadQPS, serviceConfig.VisibilityPersistenceMaxWriteQPS, + serviceConfig.OperatorRPSRatio, serviceConfig.EnableReadFromSecondaryVisibility, dynamicconfig.GetStringPropertyFn(visibility.SecondaryVisibilityWritingModeOff), // worker visibility never write serviceConfig.VisibilityDisableOrderByClause, diff --git a/service/worker/service.go b/service/worker/service.go index 3d48b983534..3c50cffa56b 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -30,6 +30,7 @@ import ( "time" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/api/matchingservice/v1" @@ -109,6 +110,7 @@ type ( PersistencePerShardNamespaceMaxQPS dynamicconfig.IntPropertyFnWithNamespaceFilter EnablePersistencePriorityRateLimiting dynamicconfig.BoolPropertyFn PersistenceDynamicRateLimitingParams dynamicconfig.MapPropertyFn + OperatorRPSRatio dynamicconfig.FloatPropertyFn EnableBatcher dynamicconfig.BoolPropertyFn BatcherRPS dynamicconfig.IntPropertyFnWithNamespaceFilter BatcherConcurrency dynamicconfig.IntPropertyFnWithNamespaceFilter @@ -362,6 +364,7 @@ func NewConfig( true, ), PersistenceDynamicRateLimitingParams: dc.GetMapProperty(dynamicconfig.WorkerPersistenceDynamicRateLimitingParams, dynamicconfig.DefaultDynamicRateLimitingParams), + OperatorRPSRatio: dc.GetFloat64Property(dynamicconfig.OperatorRPSRatio, common.DefaultOperatorRPSRatio), VisibilityPersistenceMaxReadQPS: visibility.GetVisibilityPersistenceMaxReadQPS(dc, enableReadFromES), VisibilityPersistenceMaxWriteQPS: visibility.GetVisibilityPersistenceMaxWriteQPS(dc, enableReadFromES), From 29cd4ab476f35054873387805f2c418ce13afe0c Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Fri, 14 Jul 2023 14:54:32 -0700 Subject: [PATCH 5/6] test --- service/frontend/fx_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/service/frontend/fx_test.go b/service/frontend/fx_test.go index 25dd32c1ce7..a941f718ecd 100644 --- a/service/frontend/fx_test.go +++ b/service/frontend/fx_test.go @@ -49,6 +49,8 @@ type testCase struct { globalRPSLimit int // perInstanceRPSLimit is the RPS limit for each frontend host perInstanceRPSLimit int + // operatorRPSRatio is the ratio of the global RPS limit that is reserved for operator requests + operatorRPSRatio float64 // expectRateLimit is true if the interceptor should return a rate limit error expectRateLimit bool // numRequests is the number of requests to send to the interceptor @@ -74,6 +76,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { numHosts := 10 lowGlobalRPSLimit := lowPerInstanceRPSLimit * numHosts highGlobalRPSLimit := highPerInstanceRPSLimit * numHosts + operatorRPSRatio := 0.2 testCases := []testCase{ { @@ -81,6 +84,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { configure: func(tc *testCase) { tc.globalRPSLimit = lowGlobalRPSLimit tc.perInstanceRPSLimit = lowPerInstanceRPSLimit + tc.operatorRPSRatio = operatorRPSRatio tc.expectRateLimit = true }, }, @@ -89,6 +93,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { configure: func(tc *testCase) { tc.globalRPSLimit = lowGlobalRPSLimit tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.operatorRPSRatio = operatorRPSRatio tc.expectRateLimit = true }, }, @@ -97,6 +102,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { configure: func(tc *testCase) { tc.globalRPSLimit = highGlobalRPSLimit tc.perInstanceRPSLimit = lowPerInstanceRPSLimit + tc.operatorRPSRatio = operatorRPSRatio tc.expectRateLimit = false }, }, @@ -105,6 +111,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { configure: func(tc *testCase) { tc.globalRPSLimit = highGlobalRPSLimit tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.operatorRPSRatio = operatorRPSRatio tc.expectRateLimit = false }, }, @@ -113,6 +120,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { configure: func(tc *testCase) { tc.globalRPSLimit = 0 tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.operatorRPSRatio = operatorRPSRatio tc.expectRateLimit = false }, }, @@ -121,6 +129,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { configure: func(tc *testCase) { tc.globalRPSLimit = 0 tc.perInstanceRPSLimit = lowPerInstanceRPSLimit + tc.operatorRPSRatio = operatorRPSRatio tc.expectRateLimit = true }, }, @@ -129,6 +138,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { configure: func(tc *testCase) { tc.globalRPSLimit = 0 tc.perInstanceRPSLimit = 0 + tc.operatorRPSRatio = operatorRPSRatio tc.expectRateLimit = true }, }, @@ -137,6 +147,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { configure: func(tc *testCase) { tc.globalRPSLimit = lowPerInstanceRPSLimit tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.operatorRPSRatio = operatorRPSRatio tc.expectRateLimit = false tc.serviceResolver = nil }, @@ -146,6 +157,7 @@ func TestRateLimitInterceptorProvider(t *testing.T) { configure: func(tc *testCase) { tc.globalRPSLimit = lowPerInstanceRPSLimit tc.perInstanceRPSLimit = highPerInstanceRPSLimit + tc.operatorRPSRatio = operatorRPSRatio tc.expectRateLimit = true serviceResolver := membership.NewMockServiceResolver(gomock.NewController(tc.t)) serviceResolver.EXPECT().MemberCount().Return(0).AnyTimes() @@ -184,6 +196,9 @@ func TestRateLimitInterceptorProvider(t *testing.T) { // this is not used in this test return 0 }, + OperatorRPSRatio: func() float64 { + return tc.operatorRPSRatio + }, }, tc.serviceResolver) // Create a gRPC server for the fake workflow service. From af0eed7880e095349bc6ddf02c6854c27c7b0019 Mon Sep 17 00:00:00 2001 From: PJ Doerner Date: Mon, 17 Jul 2023 15:58:14 -0700 Subject: [PATCH 6/6] comments --- common/persistence/visibility/quotas.go | 4 +-- .../visibility_manager_rate_limited.go | 35 +++++++++---------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/common/persistence/visibility/quotas.go b/common/persistence/visibility/quotas.go index d661d55d95c..f0c1446bdbc 100644 --- a/common/persistence/visibility/quotas.go +++ b/common/persistence/visibility/quotas.go @@ -46,9 +46,9 @@ func newPriorityRateLimiter( rateLimiters := make(map[int]quotas.RequestRateLimiter) for priority := range PrioritiesOrdered { if priority == OperatorPriority { - rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn(maxQPS))) - } else { rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(operatorRateFn(maxQPS, operatorRPSRatio))) + } else { + rateLimiters[priority] = quotas.NewRequestRateLimiterAdapter(quotas.NewDefaultOutgoingRateLimiter(rateFn(maxQPS))) } } return quotas.NewPriorityRateLimiter(func(req quotas.Request) int { diff --git a/common/persistence/visibility/visibility_manager_rate_limited.go b/common/persistence/visibility/visibility_manager_rate_limited.go index 1f82af694d9..5745c88d14f 100644 --- a/common/persistence/visibility/visibility_manager_rate_limited.go +++ b/common/persistence/visibility/visibility_manager_rate_limited.go @@ -38,7 +38,6 @@ import ( const ( RateLimitDefaultToken = 1 - CallerSegmentMissing = -1 ) var _ manager.VisibilityManager = (*visibilityManagerRateLimited)(nil) @@ -88,7 +87,7 @@ func (m *visibilityManagerRateLimited) RecordWorkflowExecutionStarted( ctx context.Context, request *manager.RecordWorkflowExecutionStartedRequest, ) error { - if ok := allow(ctx, "RecordWorkflowExecutionStarted", request.ShardID, m.writeRateLimiter); !ok { + if ok := allow(ctx, "RecordWorkflowExecutionStarted", m.writeRateLimiter); !ok { return persistence.ErrPersistenceLimitExceeded } return m.delegate.RecordWorkflowExecutionStarted(ctx, request) @@ -98,7 +97,7 @@ func (m *visibilityManagerRateLimited) RecordWorkflowExecutionClosed( ctx context.Context, request *manager.RecordWorkflowExecutionClosedRequest, ) error { - if ok := allow(ctx, "RecordWorkflowExecutionClosed", request.ShardID, m.writeRateLimiter); !ok { + if ok := allow(ctx, "RecordWorkflowExecutionClosed", m.writeRateLimiter); !ok { return persistence.ErrPersistenceLimitExceeded } return m.delegate.RecordWorkflowExecutionClosed(ctx, request) @@ -108,7 +107,7 @@ func (m *visibilityManagerRateLimited) UpsertWorkflowExecution( ctx context.Context, request *manager.UpsertWorkflowExecutionRequest, ) error { - if ok := allow(ctx, "UpsertWorkflowExecution", request.ShardID, m.writeRateLimiter); !ok { + if ok := allow(ctx, "UpsertWorkflowExecution", m.writeRateLimiter); !ok { return persistence.ErrPersistenceLimitExceeded } return m.delegate.UpsertWorkflowExecution(ctx, request) @@ -118,7 +117,7 @@ func (m *visibilityManagerRateLimited) DeleteWorkflowExecution( ctx context.Context, request *manager.VisibilityDeleteWorkflowExecutionRequest, ) error { - if ok := allow(ctx, "DeleteWorkflowExecution", CallerSegmentMissing, m.writeRateLimiter); !ok { + if ok := allow(ctx, "DeleteWorkflowExecution", m.writeRateLimiter); !ok { return persistence.ErrPersistenceLimitExceeded } return m.delegate.DeleteWorkflowExecution(ctx, request) @@ -130,7 +129,7 @@ func (m *visibilityManagerRateLimited) ListOpenWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "ListOpenWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "ListOpenWorkflowExecutions", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListOpenWorkflowExecutions(ctx, request) @@ -140,7 +139,7 @@ func (m *visibilityManagerRateLimited) ListClosedWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "ListClosedWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "ListClosedWorkflowExecutions", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListClosedWorkflowExecutions(ctx, request) @@ -150,7 +149,7 @@ func (m *visibilityManagerRateLimited) ListOpenWorkflowExecutionsByType( ctx context.Context, request *manager.ListWorkflowExecutionsByTypeRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "ListOpenWorkflowExecutionsByType", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "ListOpenWorkflowExecutionsByType", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListOpenWorkflowExecutionsByType(ctx, request) @@ -160,7 +159,7 @@ func (m *visibilityManagerRateLimited) ListClosedWorkflowExecutionsByType( ctx context.Context, request *manager.ListWorkflowExecutionsByTypeRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "ListClosedWorkflowExecutionsByType", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "ListClosedWorkflowExecutionsByType", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListClosedWorkflowExecutionsByType(ctx, request) @@ -170,7 +169,7 @@ func (m *visibilityManagerRateLimited) ListOpenWorkflowExecutionsByWorkflowID( ctx context.Context, request *manager.ListWorkflowExecutionsByWorkflowIDRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "ListOpenWorkflowExecutionsByWorkflowID", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "ListOpenWorkflowExecutionsByWorkflowID", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListOpenWorkflowExecutionsByWorkflowID(ctx, request) @@ -180,7 +179,7 @@ func (m *visibilityManagerRateLimited) ListClosedWorkflowExecutionsByWorkflowID( ctx context.Context, request *manager.ListWorkflowExecutionsByWorkflowIDRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "ListClosedWorkflowExecutionsByWorkflowID", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "ListClosedWorkflowExecutionsByWorkflowID", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListClosedWorkflowExecutionsByWorkflowID(ctx, request) @@ -190,7 +189,7 @@ func (m *visibilityManagerRateLimited) ListClosedWorkflowExecutionsByStatus( ctx context.Context, request *manager.ListClosedWorkflowExecutionsByStatusRequest, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "ListClosedWorkflowExecutionsByStatus", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "ListClosedWorkflowExecutionsByStatus", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListClosedWorkflowExecutionsByStatus(ctx, request) @@ -200,7 +199,7 @@ func (m *visibilityManagerRateLimited) ListWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequestV2, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "ListWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "ListWorkflowExecutions", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ListWorkflowExecutions(ctx, request) @@ -210,7 +209,7 @@ func (m *visibilityManagerRateLimited) ScanWorkflowExecutions( ctx context.Context, request *manager.ListWorkflowExecutionsRequestV2, ) (*manager.ListWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "ScanWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "ScanWorkflowExecutions", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.ScanWorkflowExecutions(ctx, request) @@ -220,7 +219,7 @@ func (m *visibilityManagerRateLimited) CountWorkflowExecutions( ctx context.Context, request *manager.CountWorkflowExecutionsRequest, ) (*manager.CountWorkflowExecutionsResponse, error) { - if ok := allow(ctx, "CountWorkflowExecutions", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "CountWorkflowExecutions", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.CountWorkflowExecutions(ctx, request) @@ -230,7 +229,7 @@ func (m *visibilityManagerRateLimited) GetWorkflowExecution( ctx context.Context, request *manager.GetWorkflowExecutionRequest, ) (*manager.GetWorkflowExecutionResponse, error) { - if ok := allow(ctx, "GetWorkflowExecution", CallerSegmentMissing, m.readRateLimiter); !ok { + if ok := allow(ctx, "GetWorkflowExecution", m.readRateLimiter); !ok { return nil, persistence.ErrPersistenceLimitExceeded } return m.delegate.GetWorkflowExecution(ctx, request) @@ -239,16 +238,16 @@ func (m *visibilityManagerRateLimited) GetWorkflowExecution( func allow( ctx context.Context, api string, - shardID int32, rateLimiter quotas.RequestRateLimiter, ) bool { callerInfo := headers.GetCallerInfo(ctx) + // Currently only CallerType is used. See common/persistence/visibility/quotas.go for rate limiter details. return rateLimiter.Allow(time.Now().UTC(), quotas.NewRequest( api, RateLimitDefaultToken, callerInfo.CallerName, callerInfo.CallerType, - shardID, + -1, callerInfo.CallOrigin, )) }