From 374bb063160a762574f5debee96ae8512f3eaf07 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Fri, 9 Feb 2024 14:05:27 +0100 Subject: [PATCH 1/3] Emit metrics when transfer tasks could be ratelimited --- config/dynamicconfig/development.yaml | 2 +- service/history/handler.go | 1 + service/history/historyEngine.go | 5 +++++ service/history/queue/transfer_queue_processor.go | 4 ++++ service/history/task/transfer_active_task_executor.go | 3 +++ service/history/task/transfer_active_task_executor_test.go | 6 +++++- service/history/task/transfer_standby_task_executor.go | 3 +++ .../history/task/transfer_standby_task_executor_test.go | 6 +++++- service/history/task/transfer_task_executor_base.go | 7 +++++++ 9 files changed, 34 insertions(+), 3 deletions(-) diff --git a/config/dynamicconfig/development.yaml b/config/dynamicconfig/development.yaml index ca6b0b202e9..6805841d27f 100644 --- a/config/dynamicconfig/development.yaml +++ b/config/dynamicconfig/development.yaml @@ -46,4 +46,4 @@ frontend.validSearchAttributes: service: 1 user: 1 IsDeleted: 4 - constraints: {} \ No newline at end of file + constraints: {} diff --git a/service/history/handler.go b/service/history/handler.go index 881606b6a00..e773dd2c03d 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -245,6 +245,7 @@ func (h *handlerImpl) CreateEngine( h.GetMatchingRawClient(), h.queueTaskProcessor, h.failoverCoordinator, + h.workflowIDCache, ) } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 424eff20bd8..8acf77165ef 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -27,6 +27,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/uber/cadence/service/history/workflowcache" "time" "github.com/pborman/uuid" @@ -123,6 +124,7 @@ type ( clientChecker client.VersionChecker replicationDLQHandler replication.DLQHandler failoverMarkerNotifier failover.MarkerNotifier + wfIDCache workflowcache.WFCache } ) @@ -152,6 +154,7 @@ func NewEngineWithShardContext( rawMatchingClient matching.Client, queueTaskProcessor task.Processor, failoverCoordinator failover.Coordinator, + wfIDCache workflowcache.WFCache, ) engine.Engine { currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() @@ -234,6 +237,7 @@ func NewEngineWithShardContext( replicationTaskStore: replicationTaskStore, replicationMetricsEmitter: replication.NewMetricsEmitter( shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()), + wfIDCache: wfIDCache, } historyEngImpl.decisionHandler = decision.NewHandler( shard, @@ -255,6 +259,7 @@ func NewEngineWithShardContext( historyEngImpl.workflowResetter, historyEngImpl.archivalClient, openExecutionCheck, + historyEngImpl.wfIDCache, ) historyEngImpl.timerProcessor = queue.NewTimerQueueProcessor( diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go index 54c5af68b65..208ad9f3f98 100644 --- a/service/history/queue/transfer_queue_processor.go +++ b/service/history/queue/transfer_queue_processor.go @@ -24,6 +24,7 @@ import ( "context" "errors" "fmt" + "github.com/uber/cadence/service/history/workflowcache" "math" "sync" "sync/atomic" @@ -87,6 +88,7 @@ func NewTransferQueueProcessor( workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, + wfIDCache workflowcache.WFCache, ) Processor { logger := shard.GetLogger().WithTags(tag.ComponentTransferQueue) currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName() @@ -100,6 +102,7 @@ func NewTransferQueueProcessor( workflowResetter, logger, config, + wfIDCache, ) activeQueueProcessor := newTransferQueueActiveProcessor( @@ -131,6 +134,7 @@ func NewTransferQueueProcessor( logger, clusterName, config, + wfIDCache, ) standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor( clusterName, diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 68284953384..6887a690adb 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -25,6 +25,7 @@ import ( "context" "errors" "fmt" + "github.com/uber/cadence/service/history/workflowcache" "time" "github.com/pborman/uuid" @@ -84,6 +85,7 @@ func NewTransferActiveTaskExecutor( workflowResetter reset.WorkflowResetter, logger log.Logger, config *config.Config, + wfIDCache workflowcache.WFCache, ) Executor { return &transferActiveTaskExecutor{ @@ -93,6 +95,7 @@ func NewTransferActiveTaskExecutor( executionCache, logger, config, + wfIDCache, ), historyClient: shard.GetService().GetHistoryClient(), parentClosePolicyClient: parentclosepolicy.NewClient( diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index c6a3c72b5a5..d5af1d6bd39 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -22,6 +22,7 @@ package task import ( "context" + "github.com/uber/cadence/service/history/workflowcache" "math/rand" "strconv" "testing" @@ -66,6 +67,7 @@ type ( mockShard *shard.TestContext mockEngine *engine.MockEngine mockDomainCache *cache.MockDomainCache + mockWFCache *workflowcache.MockWFCache mockHistoryClient *hclient.MockClient mockMatchingClient *matching.MockClient @@ -168,6 +170,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() { s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider s.mockDomainCache = s.mockShard.Resource.DomainCache + s.mockWFCache = workflowcache.NewMockWFCache(s.controller) s.mockDomainCache.EXPECT().GetDomainByID(s.domainID).Return(s.domainEntry, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomainName(s.domainID).Return(s.domainName, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomainID(s.domainName).Return(s.domainID, nil).AnyTimes() @@ -197,6 +200,7 @@ func (s *transferActiveTaskExecutorSuite) SetupTest() { nil, s.logger, config, + s.mockWFCache, ).(*transferActiveTaskExecutor) s.transferActiveTaskExecutor.parentClosePolicyClient = s.mockParentClosePolicyClient } @@ -239,7 +243,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessActivityTask_Success() { s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1) - + s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1) err = s.transferActiveTaskExecutor.Execute(transferTask, true) s.Nil(err) } diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index c13e6594768..ae5fa488fea 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -23,6 +23,7 @@ package task import ( "context" "fmt" + "github.com/uber/cadence/service/history/workflowcache" "time" "github.com/uber/cadence/common" @@ -56,6 +57,7 @@ func NewTransferStandbyTaskExecutor( logger log.Logger, clusterName string, config *config.Config, + wfIDCache workflowcache.WFCache, ) Executor { return &transferStandbyTaskExecutor{ transferTaskExecutorBase: newTransferTaskExecutorBase( @@ -64,6 +66,7 @@ func NewTransferStandbyTaskExecutor( executionCache, logger, config, + wfIDCache, ), clusterName: clusterName, historyResender: historyResender, diff --git a/service/history/task/transfer_standby_task_executor_test.go b/service/history/task/transfer_standby_task_executor_test.go index eb364951429..ee492c53409 100644 --- a/service/history/task/transfer_standby_task_executor_test.go +++ b/service/history/task/transfer_standby_task_executor_test.go @@ -22,6 +22,7 @@ package task import ( "context" + "github.com/uber/cadence/service/history/workflowcache" "testing" "time" @@ -60,6 +61,7 @@ type ( controller *gomock.Controller mockShard *shard.TestContext mockDomainCache *cache.MockDomainCache + mockWFCache *workflowcache.MockWFCache mockNDCHistoryResender *ndc.MockHistoryResender mockMatchingClient *matching.MockClient @@ -136,6 +138,7 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() { s.mockArchivalMetadata = s.mockShard.Resource.ArchivalMetadata s.mockArchiverProvider = s.mockShard.Resource.ArchiverProvider s.mockDomainCache = s.mockShard.Resource.DomainCache + s.mockWFCache = workflowcache.NewMockWFCache(s.controller) s.mockDomainCache.EXPECT().GetDomainByID(constants.TestDomainID).Return(constants.TestGlobalDomainEntry, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomainName(constants.TestDomainID).Return(constants.TestDomainName, nil).AnyTimes() s.mockDomainCache.EXPECT().GetDomain(constants.TestDomainName).Return(constants.TestGlobalDomainEntry, nil).AnyTimes() @@ -159,6 +162,7 @@ func (s *transferStandbyTaskExecutorSuite) SetupTest() { s.logger, s.clusterName, config, + s.mockWFCache, ).(*transferStandbyTaskExecutor) } @@ -236,7 +240,7 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessActivityTask_Pending_PushT s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) s.mockMatchingClient.EXPECT().AddActivityTask(gomock.Any(), createAddActivityTaskRequest(transferTask, ai, mutableState.GetExecutionInfo().PartitionConfig)).Return(nil).Times(1) - + s.mockWFCache.EXPECT().AllowInternal(constants.TestDomainID, constants.TestWorkflowID).Return(true).Times(1) s.mockShard.SetCurrentTime(s.clusterName, now) err = s.transferStandbyTaskExecutor.Execute(transferTask, true) s.Nil(err) diff --git a/service/history/task/transfer_task_executor_base.go b/service/history/task/transfer_task_executor_base.go index da9470c5ac0..1ec1cf60bc3 100644 --- a/service/history/task/transfer_task_executor_base.go +++ b/service/history/task/transfer_task_executor_base.go @@ -22,6 +22,7 @@ package task import ( "context" + "github.com/uber/cadence/service/history/workflowcache" "time" "github.com/uber/cadence/client/matching" @@ -59,6 +60,7 @@ type ( visibilityMgr persistence.VisibilityManager config *config.Config throttleRetry *backoff.ThrottleRetry + wfIDCache workflowcache.WFCache } ) @@ -68,6 +70,7 @@ func newTransferTaskExecutorBase( executionCache *execution.Cache, logger log.Logger, config *config.Config, + wfIDCache workflowcache.WFCache, ) *transferTaskExecutorBase { return &transferTaskExecutorBase{ shard: shard, @@ -82,6 +85,7 @@ func newTransferTaskExecutorBase( backoff.WithRetryPolicy(taskRetryPolicy), backoff.WithRetryableError(common.IsServiceTransientError), ), + wfIDCache: wfIDCache, } } @@ -99,6 +103,9 @@ func (t *transferTaskExecutorBase) pushActivity( t.logger.Fatal("Cannot process non activity task", tag.TaskType(task.GetTaskType())) } + // Ratelimiting is not done. This is only to count the number of requests via metrics + t.wfIDCache.AllowInternal(task.DomainID, task.WorkflowID) + return t.matchingClient.AddActivityTask(ctx, &types.AddActivityTaskRequest{ DomainUUID: task.TargetDomainID, SourceDomainUUID: task.DomainID, From bab331e4290b207c680a442bbaef070f47bccfe5 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Fri, 9 Feb 2024 14:57:13 +0100 Subject: [PATCH 2/3] lint --- service/history/historyEngine.go | 2 +- service/history/queue/transfer_queue_processor.go | 2 +- service/history/task/transfer_active_task_executor.go | 2 +- service/history/task/transfer_active_task_executor_test.go | 2 +- service/history/task/transfer_standby_task_executor.go | 3 ++- service/history/task/transfer_standby_task_executor_test.go | 2 +- service/history/task/transfer_task_executor_base.go | 2 +- 7 files changed, 8 insertions(+), 7 deletions(-) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 8acf77165ef..8a0d9d8d51c 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -27,7 +27,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/uber/cadence/service/history/workflowcache" "time" "github.com/pborman/uuid" @@ -67,6 +66,7 @@ import ( "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/task" "github.com/uber/cadence/service/history/workflow" + "github.com/uber/cadence/service/history/workflowcache" warchiver "github.com/uber/cadence/service/worker/archiver" ) diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go index 208ad9f3f98..21830bd9af8 100644 --- a/service/history/queue/transfer_queue_processor.go +++ b/service/history/queue/transfer_queue_processor.go @@ -24,7 +24,6 @@ import ( "context" "errors" "fmt" - "github.com/uber/cadence/service/history/workflowcache" "math" "sync" "sync/atomic" @@ -47,6 +46,7 @@ import ( "github.com/uber/cadence/service/history/reset" "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/task" + "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" ) diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 6887a690adb..714bca3b0ff 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -25,7 +25,6 @@ import ( "context" "errors" "fmt" - "github.com/uber/cadence/service/history/workflowcache" "time" "github.com/pborman/uuid" @@ -44,6 +43,7 @@ import ( "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/reset" "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" "github.com/uber/cadence/service/worker/parentclosepolicy" ) diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index d5af1d6bd39..cf6a92d695f 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -22,7 +22,6 @@ package task import ( "context" - "github.com/uber/cadence/service/history/workflowcache" "math/rand" "strconv" "testing" @@ -54,6 +53,7 @@ import ( "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" test "github.com/uber/cadence/service/history/testing" + "github.com/uber/cadence/service/history/workflowcache" warchiver "github.com/uber/cadence/service/worker/archiver" "github.com/uber/cadence/service/worker/parentclosepolicy" ) diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index ae5fa488fea..3380b6ba1e0 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -23,7 +23,7 @@ package task import ( "context" "fmt" - "github.com/uber/cadence/service/history/workflowcache" + "time" "github.com/uber/cadence/common" @@ -36,6 +36,7 @@ import ( "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" ) diff --git a/service/history/task/transfer_standby_task_executor_test.go b/service/history/task/transfer_standby_task_executor_test.go index ee492c53409..6149f6fa1aa 100644 --- a/service/history/task/transfer_standby_task_executor_test.go +++ b/service/history/task/transfer_standby_task_executor_test.go @@ -22,7 +22,6 @@ package task import ( "context" - "github.com/uber/cadence/service/history/workflowcache" "testing" "time" @@ -50,6 +49,7 @@ import ( "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" test "github.com/uber/cadence/service/history/testing" + "github.com/uber/cadence/service/history/workflowcache" warchiver "github.com/uber/cadence/service/worker/archiver" ) diff --git a/service/history/task/transfer_task_executor_base.go b/service/history/task/transfer_task_executor_base.go index 1ec1cf60bc3..d2c2782ca05 100644 --- a/service/history/task/transfer_task_executor_base.go +++ b/service/history/task/transfer_task_executor_base.go @@ -22,7 +22,6 @@ package task import ( "context" - "github.com/uber/cadence/service/history/workflowcache" "time" "github.com/uber/cadence/client/matching" @@ -37,6 +36,7 @@ import ( "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/shard" + "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" ) From 8d799f128633181cc01ea796e0a75cefe4d86276 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Fri, 9 Feb 2024 15:09:17 +0100 Subject: [PATCH 3/3] Update transfer_standby_task_executor.go --- service/history/task/transfer_standby_task_executor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index 3380b6ba1e0..56f432b318d 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -23,7 +23,6 @@ package task import ( "context" "fmt" - "time" "github.com/uber/cadence/common"