diff --git a/service/history/replication/dlq_handler.go b/service/history/replication/dlq_handler.go index 3b008c713c5..3857064122d 100644 --- a/service/history/replication/dlq_handler.go +++ b/service/history/replication/dlq_handler.go @@ -340,15 +340,10 @@ func (r *dlqHandlerImpl) getOrCreateTaskExecutor(ctx context.Context, clusterNam if executor, ok := r.taskExecutors[clusterName]; ok { return executor, nil } - engine, err := r.shard.GetEngine(ctx) - if err != nil { - return nil, err - } taskExecutor := r.taskExecutorProvider(TaskExecutorParams{ RemoteCluster: clusterName, Shard: r.shard, HistoryResender: r.resender, - HistoryEngine: engine, DeleteManager: r.deleteManager, WorkflowCache: r.workflowCache, }) diff --git a/service/history/replication/dlq_handler_test.go b/service/history/replication/dlq_handler_test.go index 5c21e1939ed..1d785f20c4c 100644 --- a/service/history/replication/dlq_handler_test.go +++ b/service/history/replication/dlq_handler_test.go @@ -127,7 +127,6 @@ func (s *dlqHandlerSuite) SetupTest() { params.RemoteCluster, params.Shard, params.HistoryResender, - params.HistoryEngine, params.DeleteManager, params.WorkflowCache, ) diff --git a/service/history/replication/fx.go b/service/history/replication/fx.go index 7881056a43b..bddb32fedd2 100644 --- a/service/history/replication/fx.go +++ b/service/history/replication/fx.go @@ -58,7 +58,6 @@ func ReplicationTaskExecutorProvider() TaskExecutorProvider { params.RemoteCluster, params.Shard, params.HistoryResender, - params.HistoryEngine, params.DeleteManager, params.WorkflowCache, ) diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index 4fe6d463737..15d46da7fe0 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -58,7 +58,6 @@ type ( RemoteCluster string // TODO: Remove this remote cluster from executor then it can use singleton. Shard shard.Context HistoryResender xdc.NDCHistoryResender - HistoryEngine shard.Engine DeleteManager deletemanager.DeleteManager WorkflowCache wcache.Cache } @@ -68,10 +67,9 @@ type ( taskExecutorImpl struct { currentCluster string remoteCluster string - shard shard.Context + shardContext shard.Context namespaceRegistry namespace.Registry nDCHistoryResender xdc.NDCHistoryResender - historyEngine shard.Engine deleteManager deletemanager.DeleteManager workflowCache wcache.Cache metricsHandler metrics.Handler @@ -83,23 +81,21 @@ type ( // The executor uses by 1) DLQ replication task handler 2) history replication task processor func NewTaskExecutor( remoteCluster string, - shard shard.Context, + shardContext shard.Context, nDCHistoryResender xdc.NDCHistoryResender, - historyEngine shard.Engine, deleteManager deletemanager.DeleteManager, workflowCache wcache.Cache, ) TaskExecutor { return &taskExecutorImpl{ - currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), + currentCluster: shardContext.GetClusterMetadata().GetCurrentClusterName(), remoteCluster: remoteCluster, - shard: shard, - namespaceRegistry: shard.GetNamespaceRegistry(), + shardContext: shardContext, + namespaceRegistry: shardContext.GetNamespaceRegistry(), nDCHistoryResender: nDCHistoryResender, - historyEngine: historyEngine, deleteManager: deleteManager, workflowCache: workflowCache, - metricsHandler: shard.GetMetricsHandler(), - logger: shard.GetLogger(), + metricsHandler: shardContext.GetMetricsHandler(), + logger: shardContext.GetLogger(), } } @@ -167,7 +163,9 @@ func (e *taskExecutorImpl) handleActivityTask( ctx, cancel := e.newTaskContext(ctx, attr.NamespaceId) defer cancel() - err = e.historyEngine.SyncActivity(ctx, request) + // This might be extra cost if the workflow belongs to local shard. + // Add a wrapper of the history client to call history engine directly if it becomes an issue. + _, err = e.shardContext.GetHistoryClient().SyncActivity(ctx, request) switch retryErr := err.(type) { case nil: return nil @@ -206,7 +204,10 @@ func (e *taskExecutorImpl) handleActivityTask( e.logger.Error("error resend history for history event", tag.Error(resendErr)) return err } - return e.historyEngine.SyncActivity(ctx, request) + // This might be extra cost if the workflow belongs to local shard. + // Add a wrapper of the history client to call history engine directly if it becomes an issue. + _, err = e.shardContext.GetHistoryClient().SyncActivity(ctx, request) + return err default: return err @@ -247,7 +248,9 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask( ctx, cancel := e.newTaskContext(ctx, attr.NamespaceId) defer cancel() - err = e.historyEngine.ReplicateEventsV2(ctx, request) + // This might be extra cost if the workflow belongs to local shard. + // Add a wrapper of the history client to call history engine directly if it becomes an issue. + _, err = e.shardContext.GetHistoryClient().ReplicateEventsV2(ctx, request) switch retryErr := err.(type) { case nil: return nil @@ -287,7 +290,10 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask( return err } - return e.historyEngine.ReplicateEventsV2(ctx, request) + // This might be extra cost if the workflow belongs to local shard. + // Add a wrapper of the history client to call history engine directly if it becomes an issue. + _, err = e.shardContext.GetHistoryClient().ReplicateEventsV2(ctx, request) + return err default: return err @@ -312,10 +318,13 @@ func (e *taskExecutorImpl) handleSyncWorkflowStateTask( ctx, cancel := e.newTaskContext(ctx, executionInfo.NamespaceId) defer cancel() - return e.historyEngine.ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{ + // This might be extra cost if the workflow belongs to local shard. + // Add a wrapper of the history client to call history engine directly if it becomes an issue. + _, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{ WorkflowState: attr.GetWorkflowState(), RemoteCluster: e.remoteCluster, }) + return err } func (e *taskExecutorImpl) filterTask( diff --git a/service/history/replication/task_executor_test.go b/service/history/replication/task_executor_test.go index 63fd8ba1b9a..bb4f29fbaf4 100644 --- a/service/history/replication/task_executor_test.go +++ b/service/history/replication/task_executor_test.go @@ -64,7 +64,6 @@ type ( remoteCluster string mockResource *resourcetest.Test mockShard *shard.ContextTest - mockEngine *shard.MockEngine config *configs.Config historyClient *historyservicemock.MockHistoryServiceClient mockNamespaceCache *namespace.MockRegistry @@ -107,7 +106,6 @@ func (s *taskExecutorSuite) SetupTest() { }}, s.config, ) - s.mockEngine = shard.NewMockEngine(s.controller) s.mockResource = s.mockShard.Resource s.mockNamespaceCache = s.mockResource.NamespaceCache s.clusterMetadata = s.mockResource.ClusterMetadata @@ -117,12 +115,11 @@ func (s *taskExecutorSuite) SetupTest() { s.clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil).AnyTimes() - + s.mockShard.SetHistoryClientForTesting(s.historyClient) s.replicationTaskExecutor = NewTaskExecutor( s.remoteCluster, s.mockShard, s.nDCHistoryResender, - s.mockEngine, deletemanager.NewMockDeleteManager(s.controller), s.workflowCache, ).(*taskExecutorImpl) @@ -231,7 +228,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() { LastWorkerIdentity: "", } - s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil) + s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(&historyservice.SyncActivityResponse{}, nil) err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -284,7 +281,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese 345, 456, ) - s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(resendErr) + s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(nil, resendErr) s.nDCHistoryResender.EXPECT().SendSingleWorkflowHistory( gomock.Any(), s.remoteCluster, @@ -296,7 +293,8 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese int64(345), int64(456), ) - s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil) + + s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(&historyservice.SyncActivityResponse{}, nil) err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -328,8 +326,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask() { Events: nil, NewRunEvents: nil, } - - s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil) + s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(&historyservice.ReplicateEventsV2Response{}, nil) err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -372,7 +369,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() { 345, 456, ) - s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(resendErr) + s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil, resendErr) s.nDCHistoryResender.EXPECT().SendSingleWorkflowHistory( gomock.Any(), s.remoteCluster, @@ -384,7 +381,8 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() { int64(345), int64(456), ) - s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil) + + s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(&historyservice.ReplicateEventsV2Response{}, nil) err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) } @@ -403,8 +401,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncWorkflowStateTask() { }, }, } - - s.mockEngine.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(nil) + s.historyClient.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(&historyservice.ReplicateWorkflowStateResponse{}, nil) err := s.replicationTaskExecutor.Execute(context.Background(), task, true) s.NoError(err) diff --git a/service/history/replication/task_processor_manager.go b/service/history/replication/task_processor_manager.go index e3b520199ce..722f02840eb 100644 --- a/service/history/replication/task_processor_manager.go +++ b/service/history/replication/task_processor_manager.go @@ -190,7 +190,6 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate( RemoteCluster: clusterName, Shard: r.shard, HistoryResender: r.resender, - HistoryEngine: r.engine, DeleteManager: r.deleteMgr, WorkflowCache: r.workflowCache, }), diff --git a/service/history/shard/context_testutil.go b/service/history/shard/context_testutil.go index b7b69e4789e..d19b824573f 100644 --- a/service/history/shard/context_testutil.go +++ b/service/history/shard/context_testutil.go @@ -31,6 +31,7 @@ import ( "github.com/golang/mock/gomock" + "go.temporal.io/server/api/historyservice/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/common/clock" "go.temporal.io/server/common/future" @@ -133,6 +134,11 @@ func (s *ContextTest) SetEventsCacheForTesting(c events.Cache) { s.eventsCache = c } +// SetHistoryClientForTesting sets history client. Only used by tests. +func (s *ContextTest) SetHistoryClientForTesting(client historyservice.HistoryServiceClient) { + s.historyClient = client +} + // StopForTest calls private method finishStop(). In general only the controller // should call that, but integration tests need to do it also to clean up any // background acquireShard goroutines that may exist.