Redirect replication requests when shard count is different (#3789)
* Redirect replication requests when shard count is different
yux0 committed Jan 13, 2023
1 parent 6c70e13 commit 58f5c79
Showing 7 changed files with 46 additions and 40 deletions.
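The essence of the change, visible throughout task_executor.go below, is that the replication task executor no longer calls the local shard's history engine directly (historyEngine.SyncActivity / ReplicateEventsV2 / ReplicateWorkflowState) and instead issues each request through the shard context's history service client, which routes the call to the host that owns the target shard. This matters when the local cluster is configured with a different shard count than the source cluster: a replication task received by one local shard may describe a workflow that hashes to a different local shard. A minimal, self-contained Go sketch of that motivation (illustrative only; the hash function, separator, and shard counts here are assumptions, not Temporal's actual mapping code):

package main

import (
	"fmt"
	"hash/fnv"
)

// workflowToShard maps a workflow to a shard ID in [1, numShards], in the spirit of
// Temporal's namespaceID+workflowID based shard mapping (hash function assumed here).
func workflowToShard(namespaceID, workflowID string, numShards uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(namespaceID + "_" + workflowID))
	return h.Sum32()%numShards + 1
}

func main() {
	const namespaceID, workflowID = "ns-id", "wf-id"
	// If the source cluster has 512 shards and the local cluster has 2048, the same
	// workflow can land on different shard IDs, so the receiving shard cannot assume
	// it owns the workflow and must let the history client redirect the request.
	fmt.Println("source cluster shard:", workflowToShard(namespaceID, workflowID, 512))
	fmt.Println("local cluster shard: ", workflowToShard(namespaceID, workflowID, 2048))
}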
5 changes: 0 additions & 5 deletions service/history/replication/dlq_handler.go
@@ -339,15 +339,10 @@ func (r *dlqHandlerImpl) getOrCreateTaskExecutor(ctx context.Context, clusterNam
if executor, ok := r.taskExecutors[clusterName]; ok {
return executor, nil
}
engine, err := r.shard.GetEngine(ctx)
if err != nil {
return nil, err
}
taskExecutor := r.taskExecutorProvider(TaskExecutorParams{
RemoteCluster: clusterName,
Shard: r.shard,
HistoryResender: r.resender,
HistoryEngine: engine,
DeleteManager: r.deleteManager,
WorkflowCache: r.workflowCache,
})
1 change: 0 additions & 1 deletion service/history/replication/dlq_handler_test.go
@@ -126,7 +126,6 @@ func (s *dlqHandlerSuite) SetupTest() {
params.RemoteCluster,
params.Shard,
params.HistoryResender,
params.HistoryEngine,
params.DeleteManager,
params.WorkflowCache,
)
1 change: 0 additions & 1 deletion service/history/replication/fx.go
@@ -58,7 +58,6 @@ func ReplicationTaskExecutorProvider() TaskExecutorProvider {
params.RemoteCluster,
params.Shard,
params.HistoryResender,
params.HistoryEngine,
params.DeleteManager,
params.WorkflowCache,
)
49 changes: 30 additions & 19 deletions service/history/replication/task_executor.go
@@ -56,48 +56,46 @@ type (
RemoteCluster string // TODO: Remove this remote cluster from the executor so it can use a singleton.
Shard shard.Context
HistoryResender xdc.NDCHistoryResender
HistoryEngine shard.Engine
DeleteManager workflow.DeleteManager
WorkflowCache workflow.Cache

DeleteManager workflow.DeleteManager
WorkflowCache workflow.Cache
}

TaskExecutorProvider func(params TaskExecutorParams) TaskExecutor

taskExecutorImpl struct {
currentCluster string
remoteCluster string
shard shard.Context
shardContext shard.Context
namespaceRegistry namespace.Registry
nDCHistoryResender xdc.NDCHistoryResender
historyEngine shard.Engine
deleteManager workflow.DeleteManager
workflowCache workflow.Cache
metricsHandler metrics.MetricsHandler
logger log.Logger

logger log.Logger
}
)

// NewTaskExecutor creates a replication task executor
// The executor is used by 1) the DLQ replication task handler and 2) the history replication task processor
func NewTaskExecutor(
remoteCluster string,
shard shard.Context,
shardContext shard.Context,
nDCHistoryResender xdc.NDCHistoryResender,
historyEngine shard.Engine,
deleteManager workflow.DeleteManager,
workflowCache workflow.Cache,
) TaskExecutor {
return &taskExecutorImpl{
currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(),
currentCluster: shardContext.GetClusterMetadata().GetCurrentClusterName(),
remoteCluster: remoteCluster,
shard: shard,
namespaceRegistry: shard.GetNamespaceRegistry(),
shardContext: shardContext,
namespaceRegistry: shardContext.GetNamespaceRegistry(),
nDCHistoryResender: nDCHistoryResender,
historyEngine: historyEngine,
deleteManager: deleteManager,
workflowCache: workflowCache,
metricsHandler: shard.GetMetricsHandler(),
logger: shard.GetLogger(),
metricsHandler: shardContext.GetMetricsHandler(),
logger: shardContext.GetLogger(),
}
}

@@ -165,7 +163,9 @@ func (e *taskExecutorImpl) handleActivityTask(
ctx, cancel := e.newTaskContext(ctx, attr.NamespaceId)
defer cancel()

err = e.historyEngine.SyncActivity(ctx, request)
// This might add extra cost if the workflow belongs to the local shard.
// Add a wrapper around the history client to call the history engine directly if this becomes an issue.
_, err = e.shardContext.GetHistoryClient().SyncActivity(ctx, request)
switch retryErr := err.(type) {
case nil:
return nil
@@ -204,7 +204,10 @@ func (e *taskExecutorImpl) handleActivityTask(
e.logger.Error("error resend history for history event", tag.Error(resendErr))
return err
}
return e.historyEngine.SyncActivity(ctx, request)
// This might add extra cost if the workflow belongs to the local shard.
// Add a wrapper around the history client to call the history engine directly if this becomes an issue.
_, err = e.shardContext.GetHistoryClient().SyncActivity(ctx, request)
return err

default:
return err
@@ -245,7 +248,9 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
ctx, cancel := e.newTaskContext(ctx, attr.NamespaceId)
defer cancel()

err = e.historyEngine.ReplicateEventsV2(ctx, request)
// This might add extra cost if the workflow belongs to the local shard.
// Add a wrapper around the history client to call the history engine directly if this becomes an issue.
_, err = e.shardContext.GetHistoryClient().ReplicateEventsV2(ctx, request)
switch retryErr := err.(type) {
case nil:
return nil
@@ -285,7 +290,10 @@ func (e *taskExecutorImpl) handleHistoryReplicationTask(
return err
}

return e.historyEngine.ReplicateEventsV2(ctx, request)
// This might add extra cost if the workflow belongs to the local shard.
// Add a wrapper around the history client to call the history engine directly if this becomes an issue.
_, err = e.shardContext.GetHistoryClient().ReplicateEventsV2(ctx, request)
return err

default:
return err
@@ -310,10 +318,13 @@ func (e *taskExecutorImpl) handleSyncWorkflowStateTask(
ctx, cancel := e.newTaskContext(ctx, executionInfo.NamespaceId)
defer cancel()

return e.historyEngine.ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{
// This might add extra cost if the workflow belongs to the local shard.
// Add a wrapper around the history client to call the history engine directly if this becomes an issue.
_, err = e.shardContext.GetHistoryClient().ReplicateWorkflowState(ctx, &historyservice.ReplicateWorkflowStateRequest{
WorkflowState: attr.GetWorkflowState(),
RemoteCluster: e.remoteCluster,
})
return err
}

func (e *taskExecutorImpl) filterTask(
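The comment repeated in the hunks above notes that routing every request through the history client can add an extra hop even when the workflow belongs to the local shard, and suggests wrapping the history client so it can call the history engine directly in that case. No such wrapper exists in this commit; the following is only a hypothetical sketch with stand-in types (syncActivityRequest, localEngine, remoteClient, and ownsShard are illustrative, not Temporal APIs):

package replication

import "context"

// syncActivityRequest is a stand-in for historyservice.SyncActivityRequest.
type syncActivityRequest struct{ NamespaceID, WorkflowID string }

// localEngine and remoteClient stand in for the shard's history engine and the
// routing history service client used in the diff above.
type localEngine interface {
	SyncActivity(ctx context.Context, req *syncActivityRequest) error
}

type remoteClient interface {
	SyncActivity(ctx context.Context, req *syncActivityRequest) error
}

// engineOrClient short-circuits to the local engine when this host owns the workflow's
// shard and otherwise forwards through the routing client, avoiding the extra hop.
type engineOrClient struct {
	ownsShard func(namespaceID, workflowID string) bool
	engine    localEngine
	client    remoteClient
}

func (w *engineOrClient) SyncActivity(ctx context.Context, req *syncActivityRequest) error {
	if w.ownsShard(req.NamespaceID, req.WorkflowID) {
		return w.engine.SyncActivity(ctx, req) // local call, no RPC hop
	}
	return w.client.SyncActivity(ctx, req) // redirected to the owning host
}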
23 changes: 10 additions & 13 deletions service/history/replication/task_executor_test.go
@@ -63,7 +63,6 @@ type (
remoteCluster string
mockResource *resourcetest.Test
mockShard *shard.ContextTest
mockEngine *shard.MockEngine
config *configs.Config
historyClient *historyservicemock.MockHistoryServiceClient
mockNamespaceCache *namespace.MockRegistry
@@ -106,7 +105,6 @@ func (s *taskExecutorSuite) SetupTest() {
}},
s.config,
)
s.mockEngine = shard.NewMockEngine(s.controller)
s.mockResource = s.mockShard.Resource
s.mockNamespaceCache = s.mockResource.NamespaceCache
s.clusterMetadata = s.mockResource.ClusterMetadata
@@ -116,12 +114,11 @@

s.clusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceName(gomock.Any()).Return(tests.Namespace, nil).AnyTimes()

s.mockShard.SetHistoryClientForTesting(s.historyClient)
s.replicationTaskExecutor = NewTaskExecutor(
s.remoteCluster,
s.mockShard,
s.nDCHistoryResender,
s.mockEngine,
workflow.NewMockDeleteManager(s.controller),
s.workflowCache,
).(*taskExecutorImpl)
@@ -230,7 +227,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() {
LastWorkerIdentity: "",
}

s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)
s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(&historyservice.SyncActivityResponse{}, nil)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}
@@ -283,7 +280,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese
345,
456,
)
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(resendErr)
s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(nil, resendErr)
s.nDCHistoryResender.EXPECT().SendSingleWorkflowHistory(
gomock.Any(),
s.remoteCluster,
@@ -295,7 +292,8 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese
int64(345),
int64(456),
)
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)

s.historyClient.EXPECT().SyncActivity(gomock.Any(), request).Return(&historyservice.SyncActivityResponse{}, nil)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}
@@ -327,8 +325,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask() {
Events: nil,
NewRunEvents: nil,
}

s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)
s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(&historyservice.ReplicateEventsV2Response{}, nil)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}
@@ -371,7 +368,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() {
345,
456,
)
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(resendErr)
s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil, resendErr)
s.nDCHistoryResender.EXPECT().SendSingleWorkflowHistory(
gomock.Any(),
s.remoteCluster,
@@ -383,7 +380,8 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() {
int64(345),
int64(456),
)
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)

s.historyClient.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(&historyservice.ReplicateEventsV2Response{}, nil)
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
}
@@ -402,8 +400,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncWorkflowStateTask() {
},
},
}

s.mockEngine.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(nil)
s.historyClient.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(&historyservice.ReplicateWorkflowStateResponse{}, nil)

err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
s.NoError(err)
1 change: 0 additions & 1 deletion service/history/replication/task_processor_manager.go
@@ -189,7 +189,6 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(
RemoteCluster: clusterName,
Shard: r.shard,
HistoryResender: r.resender,
HistoryEngine: r.engine,
DeleteManager: r.deleteMgr,
WorkflowCache: r.workflowCache,
}),
6 changes: 6 additions & 0 deletions service/history/shard/context_testutil.go
@@ -31,6 +31,7 @@ import (

"github.com/golang/mock/gomock"

"go.temporal.io/server/api/historyservice/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/future"
@@ -132,6 +133,11 @@ func (s *ContextTest) SetEventsCacheForTesting(c events.Cache) {
s.eventsCache = c
}

// SetHistoryClientForTesting sets history client. Only used by tests.
func (s *ContextTest) SetHistoryClientForTesting(client historyservice.HistoryServiceClient) {
s.historyClient = client
}

// StopForTest calls private method finishStop(). In general only the controller
// should call that, but integration tests need to do it also to clean up any
// background acquireShard goroutines that may exist.