diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 6dc69cfff6c..c1c1f9c7de0 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -553,6 +553,8 @@ const ( // ReplicatorTaskBatchSize is batch size for ReplicatorProcessor ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize" + // ReplicatorMaxSkipTaskCount is maximum number of tasks that can be skipped during tasks pagination due to not meeting filtering conditions (e.g. missed namespace). + ReplicatorMaxSkipTaskCount = "history.replicatorMaxSkipTaskCount" // ReplicatorTaskWorkerCount is number of worker for ReplicatorProcessor ReplicatorTaskWorkerCount = "history.replicatorTaskWorkerCount" // ReplicatorTaskMaxRetryCount is max times of retry for ReplicatorProcessor diff --git a/common/namespace/namespace.go b/common/namespace/namespace.go index f6779bdd9f8..ec711634e6e 100644 --- a/common/namespace/namespace.go +++ b/common/namespace/namespace.go @@ -177,6 +177,16 @@ func (ns *Namespace) ClusterNames() []string { return out } +// IsOnCluster returns true is namespace is registered on cluster otherwise false. +func (ns *Namespace) IsOnCluster(clusterName string) bool { + for _, namespaceCluster := range ns.replicationConfig.Clusters { + if namespaceCluster == clusterName { + return true + } + } + return false +} + // ConfigVersion return the namespace config version func (ns *Namespace) ConfigVersion() int64 { return ns.configVersion diff --git a/service/history/configs/config.go b/service/history/configs/config.go index fa2687b0840..f02995e2eb2 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -158,6 +158,7 @@ type Config struct { ReplicatorProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn ReplicatorProcessorEnablePriorityTaskProcessor dynamicconfig.BoolPropertyFn ReplicatorProcessorFetchTasksBatchSize dynamicconfig.IntPropertyFn + ReplicatorProcessorMaxSkipTaskCount dynamicconfig.IntPropertyFn // System Limits MaximumBufferedEventsBatch dynamicconfig.IntPropertyFn @@ -417,6 +418,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis ReplicatorProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxReschedulerSize, 10000), ReplicatorProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.ReplicatorProcessorEnablePriorityTaskProcessor, false), ReplicatorProcessorFetchTasksBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 25), + ReplicatorProcessorMaxSkipTaskCount: dc.GetIntProperty(dynamicconfig.ReplicatorMaxSkipTaskCount, 250), ReplicationTaskProcessorHostQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorHostQPS, 1500), ReplicationTaskProcessorShardQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 30), diff --git a/service/history/replication/ack_manager.go b/service/history/replication/ack_manager.go index 0be5bf3cea2..6a91c766a47 100644 --- a/service/history/replication/ack_manager.go +++ b/service/history/replication/ack_manager.go @@ -43,6 +43,7 @@ import ( "go.temporal.io/server/common/collection" "go.temporal.io/server/common/convert" "go.temporal.io/server/common/definition" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" @@ -73,7 +74,9 @@ type ( metricsHandler metrics.MetricsHandler logger log.Logger retryPolicy backoff.RetryPolicy - pageSize int + namespaceRegistry namespace.Registry + pageSize dynamicconfig.IntPropertyFn + maxSkipTaskCount dynamicconfig.IntPropertyFn sync.Mutex // largest replication task ID generated @@ -85,7 +88,6 @@ type ( var ( errUnknownReplicationTask = serviceerror.NewInternal("unknown replication task") - emptyReplicationTasks = []*replicationspb.ReplicationTask{} ) func NewAckManager( @@ -111,7 +113,9 @@ func NewAckManager( metricsHandler: shard.GetMetricsHandler().WithTags(metrics.OperationTag(metrics.ReplicatorQueueProcessorScope)), logger: log.With(logger, tag.ComponentReplicatorQueue), retryPolicy: retryPolicy, - pageSize: config.ReplicatorProcessorFetchTasksBatchSize(), + namespaceRegistry: shard.GetNamespaceRegistry(), + pageSize: config.ReplicatorProcessorFetchTasksBatchSize, + maxSkipTaskCount: config.ReplicatorProcessorMaxSkipTaskCount, maxTaskID: nil, sanityCheckTime: time.Time{}, @@ -223,9 +227,9 @@ func (p *ackMgrImpl) GetTasks( minTaskID, maxTaskID := p.taskIDsRange(queryMessageID) replicationTasks, lastTaskID, err := p.getTasks( ctx, + pollingCluster, minTaskID, maxTaskID, - p.pageSize, ) if err != nil { return nil, err @@ -254,49 +258,65 @@ func (p *ackMgrImpl) GetTasks( func (p *ackMgrImpl) getTasks( ctx context.Context, + pollingCluster string, minTaskID int64, maxTaskID int64, - batchSize int, ) ([]*replicationspb.ReplicationTask, int64, error) { if minTaskID > maxTaskID { - return nil, 0, serviceerror.NewUnavailable("min task ID < max task ID, probably due to shard re-balancing") + return nil, 0, serviceerror.NewUnavailable("min task ID > max task ID, probably due to shard re-balancing") } else if minTaskID == maxTaskID { - return []*replicationspb.ReplicationTask{}, maxTaskID, nil + return nil, maxTaskID, nil } - replicationTasks := make([]*replicationspb.ReplicationTask, 0, batchSize) - iter := collection.NewPagingIterator(p.getPaginationFn(ctx, minTaskID, maxTaskID, batchSize)) - for iter.HasNext() && len(replicationTasks) < batchSize { + replicationTasks := make([]*replicationspb.ReplicationTask, 0, p.pageSize()) + skippedTaskCount := 0 + lastTaskID := maxTaskID // If no tasks are returned, then it means there are no tasks bellow maxTaskID. + iter := collection.NewPagingIterator(p.getReplicationTasksFn(ctx, minTaskID, maxTaskID, p.pageSize())) + // iter.HasNext() should be the last check to avoid extra page read in case if replicationTasks is already full. + for len(replicationTasks) < p.pageSize() && skippedTaskCount <= p.maxSkipTaskCount() && iter.HasNext() { task, err := iter.Next() if err != nil { - p.logger.Error("replication task reader encounter error, return earlier", tag.Error(err)) - if len(replicationTasks) == 0 { - return nil, 0, err - } else { - return replicationTasks, replicationTasks[len(replicationTasks)-1].GetSourceTaskId(), nil - } + return p.swallowPartialResultsError(replicationTasks, lastTaskID, err) } - if replicationTask, err := p.toReplicationTask(ctx, task); err != nil { - p.logger.Error("replication task reader encounter error, return earlier", tag.Error(err)) - if len(replicationTasks) == 0 { - return nil, 0, err - } else { - return replicationTasks, replicationTasks[len(replicationTasks)-1].GetSourceTaskId(), nil + // If, for any reason, task is skipped: + // - lastTaskID needs to be updated because this task should not be read next time, + // - skippedTaskCount needs to be incremented to prevent timeout on caller side (too many tasks are skipped). + // If error has occurred though, lastTaskID shouldn't be updated, and next time task needs to be read again. + + ns, err := p.namespaceRegistry.GetNamespaceByID(namespace.ID(task.GetNamespaceID())) + if err != nil { + if _, isNotFound := err.(*serviceerror.NamespaceNotFound); !isNotFound { + return p.swallowPartialResultsError(replicationTasks, lastTaskID, err) } - } else if replicationTask != nil { - replicationTasks = append(replicationTasks, replicationTask) + // Namespace doesn't exist on this cluster (i.e. deleted). It is safe to skip the task. + lastTaskID = task.GetTaskID() + skippedTaskCount++ + continue + } + // If namespace doesn't exist on polling cluster, there is no reason to send the task. + if !ns.IsOnCluster(pollingCluster) { + lastTaskID = task.GetTaskID() + skippedTaskCount++ + continue } - } - if len(replicationTasks) == 0 { - return emptyReplicationTasks, maxTaskID, nil - } else { - return replicationTasks, replicationTasks[len(replicationTasks)-1].GetSourceTaskId(), nil + replicationTask, err := p.toReplicationTask(ctx, task) + if err != nil { + return p.swallowPartialResultsError(replicationTasks, lastTaskID, err) + } else if replicationTask == nil { + lastTaskID = task.GetTaskID() + skippedTaskCount++ + continue + } + lastTaskID = task.GetTaskID() + replicationTasks = append(replicationTasks, replicationTask) } + + return replicationTasks, lastTaskID, nil } -func (p *ackMgrImpl) getPaginationFn( +func (p *ackMgrImpl) getReplicationTasksFn( ctx context.Context, minTaskID int64, maxTaskID int64, @@ -318,6 +338,19 @@ func (p *ackMgrImpl) getPaginationFn( } } +func (p *ackMgrImpl) swallowPartialResultsError( + replicationTasks []*replicationspb.ReplicationTask, + lastTaskID int64, + err error, +) ([]*replicationspb.ReplicationTask, int64, error) { + + p.logger.Error("Replication tasks reader encountered error, return earlier.", tag.Error(err), tag.Value(len(replicationTasks))) + if len(replicationTasks) == 0 { + return nil, 0, err + } + return replicationTasks, lastTaskID, nil +} + func (p *ackMgrImpl) taskIDsRange( lastReadMessageID int64, ) (minTaskID int64, maxTaskID int64) { diff --git a/service/history/replication/ack_manager_test.go b/service/history/replication/ack_manager_test.go index 61077d5403d..a39c70f4bbc 100644 --- a/service/history/replication/ack_manager_test.go +++ b/service/history/replication/ack_manager_test.go @@ -26,7 +26,7 @@ package replication import ( "context" - "math/rand" + "strconv" "testing" "time" @@ -35,7 +35,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" commonpb "go.temporal.io/api/common/v1" - enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" enumsspb "go.temporal.io/server/api/enums/v1" @@ -52,6 +51,7 @@ import ( "go.temporal.io/server/common/payloads" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -63,11 +63,11 @@ type ( suite.Suite *require.Assertions - controller *gomock.Controller - mockShard *shard.ContextTest - mockNamespaceCache *namespace.MockRegistry - mockMutableState *workflow.MockMutableState - mockClusterMetadata *cluster.MockMetadata + controller *gomock.Controller + mockShard *shard.ContextTest + mockNamespaceRegistry *namespace.MockRegistry + mockMutableState *workflow.MockMutableState + mockClusterMetadata *cluster.MockMetadata mockExecutionMgr *persistence.MockExecutionManager @@ -106,11 +106,14 @@ func (s *ackManagerSuite) SetupTest() { tests.NewDynamicConfig(), ) - s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache + s.mockNamespaceRegistry = s.mockShard.Resource.NamespaceCache + s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() + s.mockExecutionMgr = s.mockShard.Resource.ExecutionMgr s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes() + s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, gomock.Any()).Return(cluster.TestCurrentClusterName).AnyTimes() s.logger = s.mockShard.GetLogger() historyCache := workflow.NewCache(s.mockShard) @@ -233,7 +236,7 @@ func (s *ackManagerSuite) TestSyncActivity_WorkflowMissing() { WorkflowID: workflowID, RunID: runID, }).Return(nil, serviceerror.NewNotFound("")) - s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() + s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() result, err := s.replicationAckManager.generateSyncActivityTask(ctx, task) s.NoError(err) @@ -273,7 +276,7 @@ func (s *ackManagerSuite) TestSyncActivity_WorkflowCompleted() { release(nil) s.mockMutableState.EXPECT().StartTransaction(gomock.Any()).Return(false, nil) s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes() - s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() + s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() result, err := s.replicationAckManager.generateSyncActivityTask(ctx, task) s.NoError(err) @@ -315,7 +318,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityCompleted() { s.mockMutableState.EXPECT().StartTransaction(gomock.Any()).Return(false, nil) s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() s.mockMutableState.EXPECT().GetActivityInfo(scheduledEventID).Return(nil, false).AnyTimes() - s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() + s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() result, err := s.replicationAckManager.generateSyncActivityTask(ctx, task) s.NoError(err) @@ -394,7 +397,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityRetry() { }, } s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes() - s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() + s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() result, err := s.replicationAckManager.generateSyncActivityTask(ctx, task) s.NoError(err) @@ -497,7 +500,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityRunning() { }, } s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes() - s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() + s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() result, err := s.replicationAckManager.generateSyncActivityTask(ctx, task) s.NoError(err) @@ -549,236 +552,313 @@ func (s *ackManagerSuite) Test_GetMaxTaskInfo() { s.Equal(now.Add(time.Hour), maxVisibilityTimestamp) } -func (s *ackManagerSuite) TestGetTasks_Empty() { +func (s *ackManagerSuite) TestGetTasks_NoTasksInDB() { ctx := context.Background() - minTaskID := rand.Int63() + minTaskID := int64(220878) maxTaskID := minTaskID + 100 - batchSize := 100 s.mockExecutionMgr.EXPECT().GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), - BatchSize: batchSize, + BatchSize: s.replicationAckManager.pageSize(), NextPageToken: nil, - }).Return(&persistence.GetHistoryTasksResponse{ - Tasks: nil, - NextPageToken: nil, - }, nil) + }).Return(s.getHistoryTasksResponse(0), nil) - replicationTasks, lastTaskID, err := s.replicationAckManager.getTasks(ctx, minTaskID, maxTaskID, batchSize) + replicationTasks, lastTaskID, err := s.replicationAckManager.getTasks(ctx, cluster.TestCurrentClusterName, minTaskID, maxTaskID) s.NoError(err) s.Empty(replicationTasks) s.Equal(maxTaskID, lastTaskID) } -func (s *ackManagerSuite) TestGetTasks_PartialResult_Case1() { +func (s *ackManagerSuite) TestGetTasks_FirstPersistenceErrorReturnsErrorAndEmptyResult() { ctx := context.Background() - minTaskID := rand.Int63() + minTaskID := int64(220878) maxTaskID := minTaskID + 100 - batchSize := 100 - - namespaceID := tests.NamespaceID - workflowID := "some random workflow ID" - - historyTask := &tasks.HistoryReplicationTask{ - TaskID: minTaskID + 10, - WorkflowKey: definition.NewWorkflowKey(namespaceID.String(), workflowID, uuid.New()), - FirstEventID: rand.Int63(), - NextEventID: rand.Int63(), - } - activityTask := &tasks.SyncActivityTask{ - TaskID: minTaskID + 20, - WorkflowKey: definition.NewWorkflowKey(namespaceID.String(), workflowID, uuid.New()), - } - s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() + tasksResponse := s.getHistoryTasksResponse(2) s.mockExecutionMgr.EXPECT().GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), - BatchSize: batchSize, + BatchSize: s.replicationAckManager.pageSize(), NextPageToken: nil, - }).Return(&persistence.GetHistoryTasksResponse{ - Tasks: []tasks.Task{historyTask, activityTask}, - NextPageToken: nil, - }, nil) + }).Return(tasksResponse, nil) + + gweErr := serviceerror.NewUnavailable("random error") s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), &persistence.GetWorkflowExecutionRequest{ ShardID: s.mockShard.GetShardID(), - NamespaceID: historyTask.NamespaceID, - WorkflowID: historyTask.WorkflowID, - RunID: historyTask.RunID, - }).Return(nil, serviceerror.NewUnavailable("")) + NamespaceID: tasksResponse.Tasks[0].GetNamespaceID(), + WorkflowID: tasksResponse.Tasks[0].GetWorkflowID(), + RunID: tasksResponse.Tasks[0].GetRunID(), + }).Return(nil, gweErr) - _, _, err := s.replicationAckManager.getTasks(ctx, minTaskID, maxTaskID, batchSize) + replicationTasks, lastTaskID, err := s.replicationAckManager.getTasks(ctx, cluster.TestCurrentClusterName, minTaskID, maxTaskID) s.Error(err) + s.ErrorIs(err, gweErr) + s.Empty(replicationTasks) + s.EqualValues(0, lastTaskID) } -func (s *ackManagerSuite) TestGetTasks_PartialResult_Case2() { +func (s *ackManagerSuite) TestGetTasks_SecondPersistenceErrorReturnsPartialResult() { ctx := context.Background() - minTaskID := rand.Int63() + minTaskID := int64(220878) maxTaskID := minTaskID + 100 - batchSize := 100 - - namespaceID := tests.NamespaceID - workflowID := "some random workflow ID" - - historyTask := &tasks.HistoryReplicationTask{ - TaskID: minTaskID + 10, - WorkflowKey: definition.NewWorkflowKey(namespaceID.String(), workflowID, uuid.New()), - FirstEventID: 1, - NextEventID: 1 + rand.Int63(), - Version: rand.Int63(), - BranchToken: []byte("random history branch token"), - } - activityTask := &tasks.SyncActivityTask{ - TaskID: minTaskID + 20, - WorkflowKey: definition.NewWorkflowKey(namespaceID.String(), workflowID, uuid.New()), - ScheduledEventID: rand.Int63(), - Version: rand.Int63(), - } - s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() + tasksResponse := s.getHistoryTasksResponse(2) s.mockExecutionMgr.EXPECT().GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), - BatchSize: batchSize, + BatchSize: s.replicationAckManager.pageSize(), NextPageToken: nil, - }).Return(&persistence.GetHistoryTasksResponse{ - Tasks: []tasks.Task{historyTask, activityTask}, - NextPageToken: nil, - }, nil) - - context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution( - ctx, - namespace.ID(historyTask.NamespaceID), - commonpb.WorkflowExecution{ - WorkflowId: historyTask.WorkflowID, - RunId: historyTask.RunID, - }, - workflow.CallerTypeTask, + }).Return(tasksResponse, nil) + + eventsCache := events.NewEventsCache( + s.mockShard.GetShardID(), + s.mockShard.GetConfig().EventsCacheInitialSize(), + s.mockShard.GetConfig().EventsCacheMaxSize(), + s.mockShard.GetConfig().EventsCacheTTL(), + s.mockShard.GetExecutionManager(), + false, + s.mockShard.GetLogger(), + s.mockShard.GetMetricsHandler(), ) - context.(*workflow.ContextImpl).MutableState = s.mockMutableState - s.mockMutableState.EXPECT().StartTransaction(gomock.Any()).Return(false, nil) - versionHistories := &historyspb.VersionHistories{ - CurrentVersionHistoryIndex: 0, - Histories: []*historyspb.VersionHistory{{ - BranchToken: historyTask.BranchToken, - Items: []*historyspb.VersionHistoryItem{ - { - EventId: historyTask.NextEventID - 1, - Version: historyTask.Version, + ms := workflow.TestLocalMutableState(s.mockShard, eventsCache, tests.GlobalNamespaceEntry, log.NewTestLogger(), tests.RunID) + ei := ms.GetExecutionInfo() + ei.NamespaceId = tests.NamespaceID.String() + ei.VersionHistories = &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + Items: []*historyspb.VersionHistoryItem{ + { + EventId: 1, + Version: 1, + }, }, }, - }}, + }, } - s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes() - s.mockExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), &persistence.ReadHistoryBranchRequest{ - ShardID: s.mockShard.GetShardID(), - MinEventID: historyTask.FirstEventID, - MaxEventID: historyTask.NextEventID, - BranchToken: historyTask.BranchToken, - PageSize: 1, - NextPageToken: nil, - }).Return(&persistence.ReadRawHistoryBranchResponse{ - HistoryEventBlobs: []*commonpb.DataBlob{{ - EncodingType: enumspb.ENCODING_TYPE_PROTO3, - Data: []byte("some random events blob"), - }}, - NextPageToken: nil, - }, nil) - release(nil) - s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), &persistence.GetWorkflowExecutionRequest{ - ShardID: s.mockShard.GetShardID(), - NamespaceID: activityTask.NamespaceID, - WorkflowID: activityTask.WorkflowID, - RunID: activityTask.RunID, - }).Return(nil, serviceerror.NewUnavailable("")) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{ + State: workflow.TestCloneToProto(ms)}, nil) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil, serviceerror.NewUnavailable("some random error")) + s.mockExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadRawHistoryBranchResponse{ + HistoryEventBlobs: []*commonpb.DataBlob{{}}}, nil) - replicationTasks, lastTaskID, err := s.replicationAckManager.getTasks(ctx, minTaskID, maxTaskID, batchSize) + replicationTasks, lastTaskID, err := s.replicationAckManager.getTasks(ctx, cluster.TestCurrentClusterName, minTaskID, maxTaskID) s.NoError(err) s.Equal(1, len(replicationTasks)) - s.Equal(historyTask.TaskID, lastTaskID) + s.Equal(tasksResponse.Tasks[0].GetTaskID(), lastTaskID) } -func (s *ackManagerSuite) TestGetTasks_FullResult() { - ctx := context.Background() - minTaskID := rand.Int63() - maxTaskID := minTaskID + 100 - batchSize := 100 +func (s *ackManagerSuite) TestGetTasks_FullPage() { + tasksResponse := s.getHistoryTasksResponse(s.replicationAckManager.pageSize()) + tasksResponse.NextPageToken = []byte{22, 3, 83} // There is more in DB. + minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(22) + s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ + ShardID: s.mockShard.GetShardID(), + TaskCategory: tasks.CategoryReplication, + InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), + ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), + BatchSize: s.replicationAckManager.pageSize(), + NextPageToken: nil, + }).Return(tasksResponse, nil) + + eventsCache := events.NewEventsCache( + s.mockShard.GetShardID(), + s.mockShard.GetConfig().EventsCacheInitialSize(), + s.mockShard.GetConfig().EventsCacheMaxSize(), + s.mockShard.GetConfig().EventsCacheTTL(), + s.mockShard.GetExecutionManager(), + false, + s.mockShard.GetLogger(), + s.mockShard.GetMetricsHandler(), + ) + ms := workflow.TestLocalMutableState(s.mockShard, eventsCache, tests.GlobalNamespaceEntry, log.NewTestLogger(), tests.RunID) + ei := ms.GetExecutionInfo() + ei.NamespaceId = tests.NamespaceID.String() + ei.VersionHistories = &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + Items: []*historyspb.VersionHistoryItem{ + { + EventId: 1, + Version: 1, + }, + }, + }, + }, + } - namespaceID := tests.NamespaceID - workflowID := "some random workflow ID" + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{ + State: workflow.TestCloneToProto(ms)}, nil).Times(s.replicationAckManager.pageSize()) + s.mockExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadRawHistoryBranchResponse{ + HistoryEventBlobs: []*commonpb.DataBlob{{}}}, nil).Times(s.replicationAckManager.pageSize()) - historyTask := &tasks.HistoryReplicationTask{ - TaskID: minTaskID + 10, - WorkflowKey: definition.NewWorkflowKey(namespaceID.String(), workflowID, uuid.New()), - FirstEventID: 1, - NextEventID: 1 + rand.Int63(), - Version: rand.Int63(), - BranchToken: []byte("random history branch token"), + replicationMessages, err := s.replicationAckManager.GetTasks(context.Background(), cluster.TestCurrentClusterName, 22) + s.NoError(err) + s.NotNil(replicationMessages) + s.Len(replicationMessages.ReplicationTasks, s.replicationAckManager.pageSize()) + s.Equal(tasksResponse.Tasks[len(tasksResponse.Tasks)-1].GetTaskID(), replicationMessages.LastRetrievedMessageId) + +} +func (s *ackManagerSuite) TestGetTasks_PartialPage() { + numTasks := s.replicationAckManager.pageSize() / 2 + tasksResponse := s.getHistoryTasksResponse(numTasks) + minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(22) + s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ + ShardID: s.mockShard.GetShardID(), + TaskCategory: tasks.CategoryReplication, + InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), + ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), + BatchSize: s.replicationAckManager.pageSize(), + NextPageToken: nil, + }).Return(tasksResponse, nil) + + eventsCache := events.NewEventsCache( + s.mockShard.GetShardID(), + s.mockShard.GetConfig().EventsCacheInitialSize(), + s.mockShard.GetConfig().EventsCacheMaxSize(), + s.mockShard.GetConfig().EventsCacheTTL(), + s.mockShard.GetExecutionManager(), + false, + s.mockShard.GetLogger(), + s.mockShard.GetMetricsHandler(), + ) + ms := workflow.TestLocalMutableState(s.mockShard, eventsCache, tests.GlobalNamespaceEntry, log.NewTestLogger(), tests.RunID) + ei := ms.GetExecutionInfo() + ei.NamespaceId = tests.NamespaceID.String() + ei.VersionHistories = &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + Items: []*historyspb.VersionHistoryItem{ + { + EventId: 1, + Version: 1, + }, + }, + }, + }, } - s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes() - s.mockExecutionMgr.EXPECT().GetHistoryTasks(ctx, &persistence.GetHistoryTasksRequest{ + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{ + State: workflow.TestCloneToProto(ms)}, nil).Times(numTasks) + s.mockExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadRawHistoryBranchResponse{ + HistoryEventBlobs: []*commonpb.DataBlob{{}}}, nil).Times(numTasks) + + replicationMessages, err := s.replicationAckManager.GetTasks(context.Background(), cluster.TestCurrentClusterName, 22) + s.NoError(err) + s.NotNil(replicationMessages) + s.Len(replicationMessages.ReplicationTasks, numTasks) + s.Equal(tasksResponse.Tasks[len(tasksResponse.Tasks)-1].GetTaskID(), replicationMessages.LastRetrievedMessageId) +} + +func (s *ackManagerSuite) TestGetTasks_FilterNamespace() { + notExistOnTestClusterNamespaceID := namespace.ID("not-exist-on-" + cluster.TestCurrentClusterName) + notExistOnTestClusterNamespaceEntry := namespace.NewLocalNamespaceForTest( + &persistencespb.NamespaceInfo{}, + &persistencespb.NamespaceConfig{}, + "not-a-"+cluster.TestCurrentClusterName, + ) + s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(notExistOnTestClusterNamespaceID).Return(notExistOnTestClusterNamespaceEntry, nil).AnyTimes() + + minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(22) + + tasksResponse1 := s.getHistoryTasksResponse(s.replicationAckManager.pageSize()) + // 2 of 25 tasks are for namespace that doesn't exist on poll cluster. + tasksResponse1.Tasks[1].(*tasks.HistoryReplicationTask).NamespaceID = notExistOnTestClusterNamespaceID.String() + tasksResponse1.Tasks[3].(*tasks.HistoryReplicationTask).NamespaceID = notExistOnTestClusterNamespaceID.String() + tasksResponse1.NextPageToken = []byte{22, 3, 83} // There is more in DB. + s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ ShardID: s.mockShard.GetShardID(), TaskCategory: tasks.CategoryReplication, InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), - BatchSize: batchSize, + BatchSize: s.replicationAckManager.pageSize(), NextPageToken: nil, - }).Return(&persistence.GetHistoryTasksResponse{ - Tasks: []tasks.Task{historyTask}, - NextPageToken: nil, - }, nil) + }).Return(tasksResponse1, nil) - context, release, _ := s.replicationAckManager.workflowCache.GetOrCreateWorkflowExecution( - ctx, - namespace.ID(historyTask.NamespaceID), - commonpb.WorkflowExecution{ - WorkflowId: historyTask.WorkflowID, - RunId: historyTask.RunID, - }, - workflow.CallerTypeTask, + tasksResponse2 := s.getHistoryTasksResponse(2) + // 1 of 2 task is for namespace that doesn't exist on poll cluster. + tasksResponse2.Tasks[1].(*tasks.HistoryReplicationTask).NamespaceID = notExistOnTestClusterNamespaceID.String() + tasksResponse2.NextPageToken = []byte{22, 8, 78} // There is more in DB. + s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ + ShardID: s.mockShard.GetShardID(), + TaskCategory: tasks.CategoryReplication, + InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), + ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), + BatchSize: s.replicationAckManager.pageSize(), + NextPageToken: []byte{22, 3, 83}, // previous token + }).Return(tasksResponse2, nil) + + tasksResponse3 := s.getHistoryTasksResponse(1) + s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{ + ShardID: s.mockShard.GetShardID(), + TaskCategory: tasks.CategoryReplication, + InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1), + ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1), + BatchSize: s.replicationAckManager.pageSize(), + NextPageToken: []byte{22, 8, 78}, // previous token + }).Return(tasksResponse3, nil) + + eventsCache := events.NewEventsCache( + s.mockShard.GetShardID(), + s.mockShard.GetConfig().EventsCacheInitialSize(), + s.mockShard.GetConfig().EventsCacheMaxSize(), + s.mockShard.GetConfig().EventsCacheTTL(), + s.mockShard.GetExecutionManager(), + false, + s.mockShard.GetLogger(), + s.mockShard.GetMetricsHandler(), ) - context.(*workflow.ContextImpl).MutableState = s.mockMutableState - s.mockMutableState.EXPECT().StartTransaction(gomock.Any()).Return(false, nil) - versionHistories := &historyspb.VersionHistories{ - CurrentVersionHistoryIndex: 0, - Histories: []*historyspb.VersionHistory{{ - BranchToken: historyTask.BranchToken, - Items: []*historyspb.VersionHistoryItem{ - { - EventId: historyTask.NextEventID - 1, - Version: historyTask.Version, + ms := workflow.TestLocalMutableState(s.mockShard, eventsCache, tests.GlobalNamespaceEntry, log.NewTestLogger(), tests.RunID) + ei := ms.GetExecutionInfo() + ei.NamespaceId = tests.NamespaceID.String() + ei.VersionHistories = &historyspb.VersionHistories{ + Histories: []*historyspb.VersionHistory{ + { + Items: []*historyspb.VersionHistoryItem{ + { + EventId: 1, + Version: 1, + }, }, }, - }}, + }, } - s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes() - s.mockExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), &persistence.ReadHistoryBranchRequest{ - ShardID: s.mockShard.GetShardID(), - MinEventID: historyTask.FirstEventID, - MaxEventID: historyTask.NextEventID, - BranchToken: historyTask.BranchToken, - PageSize: 1, - NextPageToken: nil, - }).Return(&persistence.ReadRawHistoryBranchResponse{ - HistoryEventBlobs: []*commonpb.DataBlob{{ - EncodingType: enumspb.ENCODING_TYPE_PROTO3, - Data: []byte("some random events blob"), - }}, - NextPageToken: nil, - }, nil) - release(nil) - replicationTasks, lastTaskID, err := s.replicationAckManager.getTasks(ctx, minTaskID, maxTaskID, batchSize) + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{ + State: workflow.TestCloneToProto(ms)}, nil).Times(s.replicationAckManager.pageSize()) + s.mockExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadRawHistoryBranchResponse{ + HistoryEventBlobs: []*commonpb.DataBlob{{}}}, nil).Times(s.replicationAckManager.pageSize()) + + replicationMessages, err := s.replicationAckManager.GetTasks(context.Background(), cluster.TestCurrentClusterName, 22) s.NoError(err) - s.Equal(1, len(replicationTasks)) - s.Equal(historyTask.TaskID, lastTaskID) + s.NotNil(replicationMessages) + s.Len(replicationMessages.ReplicationTasks, s.replicationAckManager.pageSize()) + s.Equal(tasksResponse3.Tasks[len(tasksResponse3.Tasks)-1].GetTaskID(), replicationMessages.LastRetrievedMessageId) +} + +func (s *ackManagerSuite) getHistoryTasksResponse(size int) *persistence.GetHistoryTasksResponse { + result := &persistence.GetHistoryTasksResponse{} + for i := 1; i <= size; i++ { + result.Tasks = append(result.Tasks, &tasks.HistoryReplicationTask{ + WorkflowKey: definition.WorkflowKey{ + NamespaceID: tests.NamespaceID.String(), + WorkflowID: tests.WorkflowID + strconv.Itoa(i), + RunID: uuid.New(), + }, + TaskID: int64(i), + FirstEventID: 1, + NextEventID: 1, + Version: 1, + }, + ) + } + + return result }