Redirect replication requests when shard count is different #3789

Merged: 4 commits, Jan 13, 2023
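At a high level, this change removes the replication task executor's direct dependency on the shard-local history engine and sends replication requests through the shard context's history client instead, so a task whose workflow maps to a different shard (as the title notes, the two clusters' shard counts can differ) is redirected by normal request routing rather than applied to the wrong shard. A minimal before/after sketch, condensed from the handleActivityTask hunk below:

// Before: applied directly to this shard's history engine, which assumes the
// workflow belongs to the local shard.
err = e.historyEngine.SyncActivity(ctx, request)

// After: sent through the history client obtained from the shard context,
// which routes the request to the shard that owns the workflow.
_, err = e.shardContext.GetHistoryClient().SyncActivity(ctx, request)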
5 changes: 0 additions & 5 deletions service/history/replication/dlq_handler.go
@@ -340,15 +340,10 @@ func (r *dlqHandlerImpl) getOrCreateTaskExecutor(ctx context.Context, clusterNam
if executor, ok := r.taskExecutors[clusterName]; ok {
return executor, nil
}
engine, err := r.shard.GetEngine(ctx)
if err != nil {
return nil, err
}
taskExecutor := r.taskExecutorProvider(TaskExecutorParams{
RemoteCluster: clusterName,
Shard: r.shard,
HistoryResender: r.resender,
HistoryEngine: engine,
DeleteManager: r.deleteManager,
WorkflowCache: r.workflowCache,
})
1 change: 0 additions & 1 deletion service/history/replication/dlq_handler_test.go
@@ -127,7 +127,6 @@ func (s *dlqHandlerSuite) SetupTest() {
params.RemoteCluster,
params.Shard,
params.HistoryResender,
params.HistoryEngine,
params.DeleteManager,
params.WorkflowCache,
)
1 change: 0 additions & 1 deletion service/history/replication/fx.go
@@ -58,7 +58,6 @@ func ReplicationTaskExecutorProvider() TaskExecutorProvider {
params.RemoteCluster,
params.Shard,
params.HistoryResender,
params.HistoryEngine,
params.DeleteManager,
params.WorkflowCache,
)
41 changes: 25 additions & 16 deletions service/history/replication/task_executor.go
@@ -58,7 +58,6 @@ type (
RemoteCluster string // TODO: Remove this remote cluster from executor then it can use singleton.
Shard shard.Context
HistoryResender xdc.NDCHistoryResender
HistoryEngine shard.Engine
DeleteManager deletemanager.DeleteManager
WorkflowCache wcache.Cache
}
@@ -68,10 +67,9 @@ type (
taskExecutorImpl struct {
currentCluster string
remoteCluster string
shard shard.Context
shardContext shard.Context
namespaceRegistry namespace.Registry
nDCHistoryResender xdc.NDCHistoryResender
historyEngine shard.Engine
deleteManager deletemanager.DeleteManager
workflowCache wcache.Cache
metricsHandler metrics.Handler
@@ -83,23 +81,21 @@ type (
// The executor uses by 1) DLQ replication task handler 2) history replication task processor
func NewTaskExecutor(
remoteCluster string,
shard shard.Context,
shardContext shard.Context,
nDCHistoryResender xdc.NDCHistoryResender,
historyEngine shard.Engine,
deleteManager deletemanager.DeleteManager,
workflowCache wcache.Cache,
) TaskExecutor {
return &taskExecutorImpl{
currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(),
currentCluster: shardContext.GetClusterMetadata().GetCurrentClusterName(),
remoteCluster: remoteCluster,
shard: shard,
namespaceRegistry: shard.GetNamespaceRegistry(),
shardContext: shardContext,
namespaceRegistry: shardContext.GetNamespaceRegistry(),
nDCHistoryResender: nDCHistoryResender,
historyEngine: historyEngine,
deleteManager: deleteManager,
workflowCache: workflowCache,
metricsHandler: shard.GetMetricsHandler(),
logger: shard.GetLogger(),
metricsHandler: shardContext.GetMetricsHandler(),
logger: shardContext.GetLogger(),
}
}

@@ -167,7 +163,9 @@ func (e *taskExecutorImpl) handleActivityTask(
ctx, cancel := e.newTaskContext(ctx, attr.NamespaceId)
defer cancel()

err = e.historyEngine.SyncActivity(ctx, request)
// This might be extra cost if the workflow belongs to local shard.
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
_, err = e.shardContext.GetHistoryClient().SyncActivity(ctx, request)
Review thread on this line:

Contributor:
Will there be additional cost when we always go through the history client? Should we still go through the history engine if the request is for the local shard?

Contributor:
We can certainly add an additional wrapper, but the ROI is low?

Contributor:
Actually, if we decide to do perf optimization, the optimization should be done within the wrapper of the client.

Contributor (author), quoting the above:
Yes. There will be additional network cost, but I don't think it is a concern at this point. I can also add an additional check for now and move it to the wrapper of the client.

Contributor:
OK, let's leave it for now and address it in case it becomes an issue in prod.

Contributor (author):
Will add a TODO there.

switch retryErr := err.(type) {
case nil:
return nil
@@ -206,7 +204,10 @@ func (e *taskExecutorImpl) handleActivityTask(
e.logger.Error("error resend history for history event", tag.Error(resendErr))
return err
}
return e.historyEngine.SyncActivity(ctx, request)
// This might be extra cost if the workflow belongs to local shard.
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
_, err = e.shardContext.GetHistoryClient().SyncActivity(ctx, request)
return err

default:
return err
@@ -247,7 +248,9 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
ctx, cancel := e.newTaskContext(ctx, attr.NamespaceId)
defer cancel()

err = e.historyEngine.ReplicateEventsV2(ctx, request)
// This might be extra cost if the workflow belongs to local shard.
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
_, err = e.shardContext.GetHistoryClient().ReplicateEventsV2(ctx, request)
switch retryErr := err.(type) {
case nil:
return nil
@@ -287,7 +290,10 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
return err
}

return e.historyEngine.ReplicateEventsV2(ctx, request)
// This might be extra cost if the workflow belongs to local shard.
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
_, err = e.shardContext.GetHistoryClient().ReplicateEventsV2(ctx, request)
return err

default:
return err
@@ -312,10 +318,13 @@ func (e *taskExecutorImpl) handleSyncWorkflowStateTask(
ctx, cancel := e.newTaskContext(ctx, executionInfo.NamespaceId)
defer cancel()

return e.historyEngine.ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{
// This might be extra cost if the workflow belongs to local shard.
// Add a wrapper of the history client to call history engine directly if it becomes an issue.
_, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{
WorkflowState: attr.GetWorkflowState(),
RemoteCluster: e.remoteCluster,
})
return err
}

func (e *taskExecutorImpl) filterTask(
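The review thread above defers the local-shard optimization to a possible client wrapper. A hypothetical sketch of that idea, for illustration only; it is not part of this PR, and the ownsShard helper is invented:

package replication

import (
	"context"

	"go.temporal.io/server/api/historyservice/v1"
	"go.temporal.io/server/service/history/shard"
)

// localFirstSyncActivity sketches the optimization discussed in the review:
// use the shard-local history engine when this shard owns the workflow, and
// fall back to the routing history client otherwise.
func localFirstSyncActivity(
	ctx context.Context,
	shardContext shard.Context,
	request *historyservice.SyncActivityRequest,
) error {
	if ownsShard(shardContext, request.GetNamespaceId(), request.GetWorkflowId()) {
		engine, err := shardContext.GetEngine(ctx)
		if err != nil {
			return err
		}
		return engine.SyncActivity(ctx, request)
	}
	_, err := shardContext.GetHistoryClient().SyncActivity(ctx, request)
	return err
}

// ownsShard is a placeholder: a real implementation would reuse the same
// workflow-ID-to-shard hashing that request routing uses and compare the
// result against this shard's ID.
func ownsShard(shardContext shard.Context, namespaceID string, workflowID string) bool {
	return false
}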
23 changes: 10 additions & 13 deletions service/history/replication/task_executor_test.go
@@ -64,7 +64,6 @@ type (
remoteCluster string
mockResource *resourcetest.Test
mockShard *shard.ContextTest
mockEngine *shard.MockEngine
config *configs.Config
historyClient *historyservicemock.MockHistoryServiceClient
mockNamespaceCache *namespace.MockRegistry
@@ -107,7 +106,6 @@ func (s *taskExecutorSuite) SetupTest() {
}},
s.config,
)
s.mockEngine = shard.NewMockEngine(s.controller)
s.mockResource = s.mockShard.Resource
s.mockNamespaceCache = s.mockResource.NamespaceCache
s.clusterMetadata = s.mockResource.ClusterMetadata
@@ -117,12 +115,11 @@

s.clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil).AnyTimes()

s.mockShard.SetHistoryClientForTesting(s.historyClient)
s.replicationTaskExecutor = NewTaskExecutor(
s.remoteCluster,
s.mockShard,
s.nDCHistoryResender,
s.mockEngine,
deletemanager.NewMockDeleteManager(s.controller),
s.workflowCache,
).(*taskExecutorImpl)
@@ -231,7 +228,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() {
LastWorkerIdentity: "",
}

s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)
s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(&historyservice.SyncActivityResponse{}, nil)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}
@@ -284,7 +281,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese
345,
456,
)
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(resendErr)
s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(nil, resendErr)
s.nDCHistoryResender.EXPECT().SendSingleWorkflowHistory(
gomock.Any(),
s.remoteCluster,
@@ -296,7 +293,8 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese
int64(345),
int64(456),
)
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)

s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(&historyservice.SyncActivityResponse{}, nil)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}
@@ -328,8 +326,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask() {
Events: nil,
NewRunEvents: nil,
}

s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)
s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(&historyservice.ReplicateEventsV2Response{}, nil)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}
@@ -372,7 +369,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() {
345,
456,
)
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(resendErr)
s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil, resendErr)
s.nDCHistoryResender.EXPECT().SendSingleWorkflowHistory(
gomock.Any(),
s.remoteCluster,
@@ -384,7 +381,8 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() {
int64(345),
int64(456),
)
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)

s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(&historyservice.ReplicateEventsV2Response{}, nil)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}
@@ -403,8 +401,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncWorkflowStateTask() {
},
},
}

s.mockEngine.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(nil)
s.historyClient.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(&historyservice.ReplicateWorkflowStateResponse{}, nil)

err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
1 change: 0 additions & 1 deletion service/history/replication/task_processor_manager.go
@@ -190,7 +190,6 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(
RemoteCluster: clusterName,
Shard: r.shard,
HistoryResender: r.resender,
HistoryEngine: r.engine,
DeleteManager: r.deleteMgr,
WorkflowCache: r.workflowCache,
}),
6 changes: 6 additions & 0 deletions service/history/shard/context_testutil.go
@@ -31,6 +31,7 @@ import (

"github.com/golang/mock/gomock"

"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/future"
@@ -133,6 +134,11 @@ func (s *ContextTest) SetEventsCacheForTesting(c events.Cache) {
s.eventsCache = c
}

// SetHistoryClientForTesting sets history client. Only used by tests.
func (s *ContextTest) SetHistoryClientForTesting(client historyservice.HistoryServiceClient) {
s.historyClient = client
}

// StopForTest calls private method finishStop(). In general only the controller
// should call that, but integration tests need to do it also to clean up any
// background acquireShard goroutines that may exist.
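For reference, a short sketch of how the new SetHistoryClientForTesting hook is meant to be used, mirroring the SetupTest changes in task_executor_test.go above; the gomock controller and the test shard context are assumed to already exist as they do in that suite:

// Swap a gomock history client into the test shard context so the executor's
// outgoing calls can be intercepted and asserted on.
historyClient := historyservicemock.NewMockHistoryServiceClient(controller)
mockShard.SetHistoryClientForTesting(historyClient)

// Any executor built from this shard context now talks to the mock instead of
// a real history service.
historyClient.EXPECT().
	SyncActivity(gomock.Any(), gomock.Any()).
	Return(&historyservice.SyncActivityResponse{}, nil)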