Skip to content

Commit

Permalink
Implemented ratelimiting for external calls pr wfid (guarded by featu…
Browse files Browse the repository at this point in the history
…re flag) (cadence-workflow#5704)

What changed?
Implemented the actual rate limiting on the pr workflowID project

Why?
We can now roll it out and test it without needed to wait for rollouts

How did you test it?
Tested locally and with unit tests

Potential risks
Should be very low as it is guarded by a feature flag

Release notes

Documentation Changes
  • Loading branch information
jakobht authored and ketsiambaku committed Mar 6, 2024
1 parent 60784a1 commit edc381e
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 40 deletions.
8 changes: 7 additions & 1 deletion service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,13 @@ func (s *Service) Start() {
})

rawHandler := handler.NewHandler(s.Resource, s.config, wfIDCache)
s.handler = ratelimited.NewHistoryHandler(rawHandler, wfIDCache)
s.handler = ratelimited.NewHistoryHandler(
rawHandler,
wfIDCache,
s.config.WorkflowIDExternalRateLimitEnabled,
s.Resource.GetDomainCache(),
s.Resource.GetLogger(),
)

thriftHandler := thrift.NewThriftHandler(s.handler)
thriftHandler.Register(s.GetDispatcher())
Expand Down
26 changes: 22 additions & 4 deletions service/history/templates/ratelimited.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history"
"github.com/uber/cadence/common/log"
)

{{ $ratelimitTypeMap := dict "StartWorkflowExecution" (
Expand Down Expand Up @@ -39,19 +40,32 @@ import (

// {{$decorator}} implements {{.Interface.Type}} interface instrumented with rate limiter.
type {{$decorator}} struct {
wrapped {{.Interface.Type}}
workflowIDCache workflowcache.WFCache
wrapped {{.Interface.Type}}
workflowIDCache workflowcache.WFCache
ratelimitExternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter
domainCache cache.DomainCache
logger log.Logger
allowFunc func (domainID string, workflowID string) bool
}

// New{{$Decorator}} creates a new instance of {{$interfaceName}} with ratelimiter.
func New{{$Decorator}}(
wrapped {{.Interface.Type}},
workflowIDCache workflowcache.WFCache,
ratelimitExternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
domainCache cache.DomainCache,
logger log.Logger,
) {{.Interface.Type}} {
return &{{$decorator}}{
wrapper := &{{$decorator}}{
wrapped: wrapped,
workflowIDCache: workflowIDCache,
ratelimitExternalPerWorkflowID: ratelimitExternalPerWorkflowID,
domainCache: domainCache,
logger: logger,
}
wrapper.allowFunc = wrapper.allowWfID

return wrapper
}

{{range $method := .Interface.Methods}}
Expand Down Expand Up @@ -82,7 +96,11 @@ func (h *{{$decorator}}) {{$method.Declaration}} {
return
}

h.workflowIDCache.AllowExternal({{$domainID}}, {{$workflowID}})
if !h.allowFunc({{$domainID}}, {{$workflowID}}) {
err = &types.ServiceBusyError{"Too many requests for the workflow ID"}
return
}

{{- end}}
{{- end}}
{{$method.Pass "h.wrapped."}}
Expand Down
46 changes: 37 additions & 9 deletions service/history/wrappers/ratelimited/handler_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 39 additions & 26 deletions service/history/wrappers/ratelimited/handler_generated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,64 +24,75 @@ package ratelimited

import (
"context"
"fmt"
"testing"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/handler"
"github.com/uber/cadence/service/history/workflowcache"
)

const (
testDomainID = "test-domain-id"
testWorkflowID = "test-workflow-id"
testDomainName = "test-domain-name"
)

func TestRatelimitedEndpoints_Table(t *testing.T) {
controller := gomock.NewController(t)

workflowIDCache := workflowcache.NewMockWFCache(controller)
handlerMock := handler.NewMockHandler(controller)

wrapper := NewHistoryHandler(handlerMock, workflowIDCache)

tests := []struct {
var rateLimitingEnabled bool

wrapper := NewHistoryHandler(
handlerMock,
nil,
func(domainName string) bool { return rateLimitingEnabled },
nil,
log.NewNoop(),
)

// We define the calls that should be ratelimited
limitedCalls := []struct {
name string
call func() (interface{}, error)
mock func()
// Defines how to call the wrapper function (correct request type, and call)
callWrapper func() (interface{}, error)
// Defines the expected call to the wrapped handler (what to call if the call is not ratelimited)
expectCallToEndpoint func()
}{
{
name: "StartWorkflowExecution",
call: func() (interface{}, error) {
callWrapper: func() (interface{}, error) {
startRequest := &types.HistoryStartWorkflowExecutionRequest{
DomainUUID: testDomainID,
StartRequest: &types.StartWorkflowExecutionRequest{WorkflowID: testWorkflowID},
}
return wrapper.StartWorkflowExecution(context.Background(), startRequest)
},
mock: func() {
expectCallToEndpoint: func() {
handlerMock.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
},
},
{
name: "SignalWithStartWorkflowExecution",
call: func() (interface{}, error) {
callWrapper: func() (interface{}, error) {
signalWithStartRequest := &types.HistorySignalWithStartWorkflowExecutionRequest{
DomainUUID: testDomainID,
SignalWithStartRequest: &types.SignalWithStartWorkflowExecutionRequest{WorkflowID: testWorkflowID},
}

return wrapper.SignalWithStartWorkflowExecution(context.Background(), signalWithStartRequest)
},
mock: func() {
expectCallToEndpoint: func() {
handlerMock.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
},
},
{
name: "SignalWorkflowExecution",
call: func() (interface{}, error) {
callWrapper: func() (interface{}, error) {
signalRequest := &types.HistorySignalWorkflowExecutionRequest{
DomainUUID: testDomainID,
SignalRequest: &types.SignalWorkflowExecutionRequest{
Expand All @@ -91,13 +102,13 @@ func TestRatelimitedEndpoints_Table(t *testing.T) {

return nil, wrapper.SignalWorkflowExecution(context.Background(), signalRequest)
},
mock: func() {
expectCallToEndpoint: func() {
handlerMock.EXPECT().SignalWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil).Times(1)
},
},
{
name: "DescribeWorkflowExecution",
call: func() (interface{}, error) {
callWrapper: func() (interface{}, error) {
describeRequest := &types.HistoryDescribeWorkflowExecutionRequest{
DomainUUID: testDomainID,
Request: &types.DescribeWorkflowExecutionRequest{
Expand All @@ -107,24 +118,26 @@ func TestRatelimitedEndpoints_Table(t *testing.T) {

return wrapper.DescribeWorkflowExecution(context.Background(), describeRequest)
},
mock: func() {
expectCallToEndpoint: func() {
handlerMock.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// For now true and false needs to do the same as we are only shadowing
workflowIDCache.EXPECT().AllowExternal(testDomainID, testWorkflowID).Return(true).Times(1)
tt.mock()
_, err := tt.call()
for _, endpoint := range limitedCalls {
t.Run(fmt.Sprintf("%s, %s", endpoint.name, "not limited"), func(t *testing.T) {
wrapper.(*historyHandler).allowFunc = func(string, string) bool { return true }
endpoint.expectCallToEndpoint()
_, err := endpoint.callWrapper()
assert.NoError(t, err)
})

workflowIDCache.EXPECT().AllowExternal(testDomainID, testWorkflowID).Return(false).Times(1)
tt.mock()
_, err = tt.call()
assert.NoError(t, err)
t.Run(fmt.Sprintf("%s, %s", endpoint.name, "limited"), func(t *testing.T) {
wrapper.(*historyHandler).allowFunc = func(string, string) bool { return false }
_, err := endpoint.callWrapper()
var sbErr *types.ServiceBusyError
assert.ErrorAs(t, err, &sbErr)
assert.ErrorContains(t, err, "Too many requests for the workflow ID")
})
}
}
39 changes: 39 additions & 0 deletions service/history/wrappers/ratelimited/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package ratelimited

import "github.com/uber/cadence/common/log/tag"

func (h *historyHandler) allowWfID(domainUUID, workflowID string) bool {
domainName, err := h.domainCache.GetDomainName(domainUUID)
if err != nil {
h.logger.Error("Error when getting domain name", tag.Error(err))
// Fail open
return true
}

allow := h.workflowIDCache.AllowExternal(domainUUID, workflowID)
enabled := h.ratelimitExternalPerWorkflowID(domainName)

return allow || !enabled
}
Loading

0 comments on commit edc381e

Please sign in to comment.