Skip to content

Commit

Permalink
Set rate limit on Async APIs (#5659)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Feb 14, 2024
1 parent 7703628 commit a78ee69
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 17 deletions.
35 changes: 35 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,12 @@ const (
// Default value: UnlimitedRPS
// Allowed filters: N/A
FrontendVisibilityRPS
// FrontendAsyncRPS is the async workflow request rate limit per second
// KeyName: frontend.asyncrps
// Value type: Int
// Default value: 10000
// Allowed filters: N/A
FrontendAsyncRPS
// FrontendMaxDomainUserRPSPerInstance is workflow domain rate limit per second
// KeyName: frontend.domainrps
// Value type: Int
Expand All @@ -544,6 +550,12 @@ const (
// Default value: UnlimitedRPS
// Allowed filters: DomainName
FrontendMaxDomainVisibilityRPSPerInstance
// FrontendMaxDomainAsyncRPSPerInstance is the per-instance async workflow request rate limit per second
// KeyName: frontend.domainasyncrps
// Value type: Int
// Default value: 10000
// Allowed filters: DomainName
FrontendMaxDomainAsyncRPSPerInstance
// FrontendGlobalDomainUserRPS is workflow domain rate limit per second for the whole Cadence cluster
// KeyName: frontend.globalDomainrps
// Value type: Int
Expand All @@ -562,6 +574,12 @@ const (
// Default value: UnlimitedRPS
// Allowed filters: DomainName
FrontendGlobalDomainVisibilityRPS
// FrontendGlobalDomainAsyncRPS is the per-domain async workflow request rate limit per second
// KeyName: frontend.globalDomainAsyncrps
// Value type: Int
// Default value: 100000
// Allowed filters: DomainName
FrontendGlobalDomainAsyncRPS
// FrontendDecisionResultCountLimit is max number of decisions per RespondDecisionTaskCompleted request
// KeyName: frontend.decisionResultCountLimit
// Value type: Int
Expand Down Expand Up @@ -2952,6 +2970,11 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "FrontendVisibilityRPS is the global workflow List*WorkflowExecutions request rate limit per second",
DefaultValue: UnlimitedRPS,
},
FrontendAsyncRPS: DynamicInt{
KeyName: "frontend.asyncrps",
Description: "FrontendAsyncRPS is the async workflow request rate limit per second",
DefaultValue: 10000,
},
FrontendMaxDomainUserRPSPerInstance: DynamicInt{
KeyName: "frontend.domainrps",
Filters: []Filter{DomainName},
Expand All @@ -2970,6 +2993,12 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "FrontendMaxDomainVisibilityRPSPerInstance is the per-instance List*WorkflowExecutions request rate limit per second",
DefaultValue: UnlimitedRPS,
},
FrontendMaxDomainAsyncRPSPerInstance: DynamicInt{
KeyName: "frontend.domainasyncrps",
Filters: []Filter{DomainName},
Description: "FrontendMaxDomainAsyncRPSPerInstance is the per-instance async workflow request rate limit per second",
DefaultValue: 10000,
},
FrontendGlobalDomainUserRPS: DynamicInt{
KeyName: "frontend.globalDomainrps",
Filters: []Filter{DomainName},
Expand All @@ -2988,6 +3017,12 @@ var IntKeys = map[IntKey]DynamicInt{
Description: "FrontendGlobalDomainVisibilityRPS is the per-domain List*WorkflowExecutions request rate limit per second",
DefaultValue: UnlimitedRPS,
},
FrontendGlobalDomainAsyncRPS: DynamicInt{
KeyName: "frontend.globalDomainAsyncrps",
Filters: []Filter{DomainName},
Description: "FrontendGlobalDomainAsyncRPS is the per-domain async workflow request rate limit per second",
DefaultValue: 100000,
},
FrontendDecisionResultCountLimit: DynamicInt{
KeyName: "frontend.decisionResultCountLimit",
Filters: []Filter{DomainName},
Expand Down
20 changes: 6 additions & 14 deletions service/frontend/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
package config

import (
"os"
"strings"

"github.com/uber/cadence/common/domain"
"github.com/uber/cadence/common/dynamicconfig"
)
Expand All @@ -50,12 +47,15 @@ type Config struct {
UserRPS dynamicconfig.IntPropertyFn
WorkerRPS dynamicconfig.IntPropertyFn
VisibilityRPS dynamicconfig.IntPropertyFn
AsyncRPS dynamicconfig.IntPropertyFn
MaxDomainUserRPSPerInstance dynamicconfig.IntPropertyFnWithDomainFilter
MaxDomainWorkerRPSPerInstance dynamicconfig.IntPropertyFnWithDomainFilter
MaxDomainVisibilityRPSPerInstance dynamicconfig.IntPropertyFnWithDomainFilter
MaxDomainAsyncRPSPerInstance dynamicconfig.IntPropertyFnWithDomainFilter
GlobalDomainUserRPS dynamicconfig.IntPropertyFnWithDomainFilter
GlobalDomainWorkerRPS dynamicconfig.IntPropertyFnWithDomainFilter
GlobalDomainVisibilityRPS dynamicconfig.IntPropertyFnWithDomainFilter
GlobalDomainAsyncRPS dynamicconfig.IntPropertyFnWithDomainFilter
EnableClientVersionCheck dynamicconfig.BoolPropertyFn
EnableQueryAttributeValidation dynamicconfig.BoolPropertyFn
DisallowQuery dynamicconfig.BoolPropertyFnWithDomainFilter
Expand Down Expand Up @@ -138,12 +138,15 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, isAdvancedVis
UserRPS: dc.GetIntProperty(dynamicconfig.FrontendUserRPS),
WorkerRPS: dc.GetIntProperty(dynamicconfig.FrontendWorkerRPS),
VisibilityRPS: dc.GetIntProperty(dynamicconfig.FrontendVisibilityRPS),
AsyncRPS: dc.GetIntProperty(dynamicconfig.FrontendAsyncRPS),
MaxDomainUserRPSPerInstance: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxDomainUserRPSPerInstance),
MaxDomainWorkerRPSPerInstance: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxDomainWorkerRPSPerInstance),
MaxDomainVisibilityRPSPerInstance: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxDomainVisibilityRPSPerInstance),
MaxDomainAsyncRPSPerInstance: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxDomainAsyncRPSPerInstance),
GlobalDomainUserRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendGlobalDomainUserRPS),
GlobalDomainWorkerRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendGlobalDomainWorkerRPS),
GlobalDomainVisibilityRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendGlobalDomainVisibilityRPS),
GlobalDomainAsyncRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendGlobalDomainAsyncRPS),
MaxIDLengthWarnLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthWarnLimit),
DomainNameMaxLength: dc.GetIntPropertyFilteredByDomain(dynamicconfig.DomainNameMaxLength),
IdentityMaxLength: dc.GetIntPropertyFilteredByDomain(dynamicconfig.IdentityMaxLength),
Expand Down Expand Up @@ -187,14 +190,3 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, isAdvancedVis
HostName: hostName,
}
}

// defaultVisibilityListMaxQPS returns the default QPS cap for visibility List
// requests: 10000 when the process command line matches the all-in-one docker
// development invocation, and 10 otherwise.
//
// TODO remove this and return 10 always, after cadence-web improves the List requests with backoff retry
// https://github.com/uber/cadence-web/issues/337
func defaultVisibilityListMaxQPS() int {
	// The exact argument sequence used to start all four services in a single docker box.
	const dockerDevInvocation = "--root /etc/cadence --env docker start --services=history,matching,frontend,worker"

	// NOTE: this is safe because only dev box should start cadence in a single box with 4 services, and only docker should use `--env docker`
	if commandLine := strings.Join(os.Args, " "); strings.Contains(commandLine, dockerDevInvocation) {
		return 10000
	}
	return 10
}
11 changes: 10 additions & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,18 @@ func (s *Service) Start() {
s.GetMembershipResolver(),
)),
)
asyncRateLimiter := quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(s.config.AsyncRPS.AsFloat64()),
quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
service.Frontend,
s.config.GlobalDomainAsyncRPS,
s.config.MaxDomainAsyncRPSPerInstance,
s.GetMembershipResolver(),
)),
)
// Additional decorations
var handler api.Handler = s.handler
handler = ratelimited.NewAPIHandler(handler, s.GetDomainCache(), userRateLimiter, workerRateLimiter, visibilityRateLimiter)
handler = ratelimited.NewAPIHandler(handler, s.GetDomainCache(), userRateLimiter, workerRateLimiter, visibilityRateLimiter, asyncRateLimiter)
handler = metered.NewAPIHandler(handler, s.GetLogger(), s.GetMetricsClient(), s.GetDomainCache(), s.config)
if s.params.ClusterRedirectionPolicy != nil {
handler = clusterredirection.NewAPIHandler(handler, s, s.config, *s.params.ClusterRedirectionPolicy)
Expand Down
19 changes: 17 additions & 2 deletions service/frontend/templates/ratelimited.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ import (
{{$ratelimitTypeMap = set $ratelimitTypeMap "ListWorkflowExecutions" "ratelimitTypeVisibility"}}
{{$ratelimitTypeMap = set $ratelimitTypeMap "ScanWorkflowExecutions" "ratelimitTypeVisibility"}}

{{$ratelimitTypeMap = set $ratelimitTypeMap "StartWorkflowExecutionAsync" "ratelimitTypeAsync"}}
{{$ratelimitTypeMap = set $ratelimitTypeMap "SignalWithStartWorkflowExecutionAsync" "ratelimitTypeAsync"}}

{{$ratelimitTypeMap = set $ratelimitTypeMap "Health" "ratelimitTypeNoop"}}
{{$ratelimitTypeMap = set $ratelimitTypeMap "DeprecateDomain" "ratelimitTypeNoop"}}
{{$ratelimitTypeMap = set $ratelimitTypeMap "DescribeDomain" "ratelimitTypeNoop"}}
{{$ratelimitTypeMap = set $ratelimitTypeMap "GetClusterInfo" "ratelimitTypeNoop"}}
{{$ratelimitTypeMap = set $ratelimitTypeMap "GetSearchAttributes" "ratelimitTypeNoop"}}
{{$ratelimitTypeMap = set $ratelimitTypeMap "ListDomains" "ratelimitTypeNoop"}}
{{$ratelimitTypeMap = set $ratelimitTypeMap "RegisterDomain" "ratelimitTypeNoop"}}
{{$ratelimitTypeMap = set $ratelimitTypeMap "UpdateDomain" "ratelimitTypeNoop"}}

{{$domainIDAPIs := list "RecordActivityTaskHeartbeat" "RespondActivityTaskCanceled" "RespondActivityTaskCompleted" "RespondActivityTaskFailed" "RespondDecisionTaskCompleted" "RespondDecisionTaskFailed" "RespondQueryTaskCompleted"}}
{{$queryTaskTokenAPIs := list "RespondQueryTaskCompleted"}}
{{$nonBlockingAPIs := list "RecordActivityTaskHeartbeat" "RecordActivityTaskHeartbeatByID" "RespondActivityTaskCompleted" "RespondActivityTaskCompletedByID" "RespondActivityTaskFailed" "RespondActivityTaskFailedByID" "RespondActivityTaskCanceled" "RespondActivityTaskCanceledByID" "RespondDecisionTaskCompleted" "RespondDecisionTaskFailed" "RespondQueryTaskCompleted" "ResetStickyTaskList"}}
Expand All @@ -62,6 +74,7 @@ type {{$decorator}} struct {
userRateLimiter quotas.Policy
workerRateLimiter quotas.Policy
visibilityRateLimiter quotas.Policy
asyncRateLimiter quotas.Policy
}

// New{{$Decorator}} creates a new instance of {{$interfaceName}} with ratelimiter.
Expand All @@ -71,6 +84,7 @@ func New{{$Decorator}}(
userRateLimiter quotas.Policy,
workerRateLimiter quotas.Policy,
visibilityRateLimiter quotas.Policy,
asyncRateLimiter quotas.Policy,
) {{.Interface.Type}} {
return &{{$decorator}}{
wrapped: wrapped,
Expand All @@ -79,17 +93,18 @@ func New{{$Decorator}}(
userRateLimiter: userRateLimiter,
workerRateLimiter: workerRateLimiter,
visibilityRateLimiter: visibilityRateLimiter,
asyncRateLimiter: asyncRateLimiter,
}
}

{{range $method := .Interface.Methods}}
func (h *{{$decorator}}) {{$method.Declaration}} {
{{- if hasKey $ratelimitTypeMap $method.Name}}
{{- $ratelimitType := get $ratelimitTypeMap $method.Name}}
{{- if not (eq $ratelimitType "ratelimitTypeNoop")}}
if {{(index $method.Params 1).Name}} == nil {
err = validate.ErrRequestNotSet
return
}
{{- $ratelimitType := get $ratelimitTypeMap $method.Name}}
{{- $domain := printf "%s.GetDomain()" (index $method.Params 1).Name}}
{{- if has $method.Name $domainIDAPIs}}
{{- $domain = "domainName"}}
Expand Down
27 changes: 27 additions & 0 deletions service/frontend/wrappers/ratelimited/api_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions service/frontend/wrappers/ratelimited/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
ratelimitTypeUser ratelimitType = iota + 1
ratelimitTypeWorker
ratelimitTypeVisibility
ratelimitTypeAsync
)

func (h *apiHandler) allowDomain(requestType ratelimitType, domain string) bool {
Expand All @@ -43,6 +44,8 @@ func (h *apiHandler) allowDomain(requestType ratelimitType, domain string) bool
return h.workerRateLimiter.Allow(quotas.Info{Domain: domain})
case ratelimitTypeVisibility:
return h.visibilityRateLimiter.Allow(quotas.Info{Domain: domain})
case ratelimitTypeAsync:
return h.asyncRateLimiter.Allow(quotas.Info{Domain: domain})
default:
panic("coding error, unrecognized request ratelimit type value")
}
Expand Down

0 comments on commit a78ee69

Please sign in to comment.