diff --git a/common/persistence/persistence-tests/visibilityPersistenceTest.go b/common/persistence/persistence-tests/visibilityPersistenceTest.go index 482d1d744b0..887df25ec93 100644 --- a/common/persistence/persistence-tests/visibilityPersistenceTest.go +++ b/common/persistence/persistence-tests/visibilityPersistenceTest.go @@ -43,7 +43,7 @@ type ( // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, // not merely log an error *require.Assertions - VisibilityMgr p.VisibilityManager + VisibilityMgr p.VisibilityManager } ) diff --git a/service/history/replication/task_ack_manager.go b/service/history/replication/task_ack_manager.go index 3e5cbd84f6e..dfbc83de233 100644 --- a/service/history/replication/task_ack_manager.go +++ b/service/history/replication/task_ack_manager.go @@ -33,6 +33,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/collection" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" @@ -155,6 +156,16 @@ func (t *taskAckManagerImpl) GetTasks( readLevel := lastReadTaskID TaskInfoLoop: for _, taskInfo := range taskInfoList { + // filter task info by domain clusters. 
+ domainEntity, err := t.shard.GetDomainCache().GetDomainByID(taskInfo.GetDomainID()) + if err != nil { + return nil, err /* NOTE(review): a failed domain lookup aborts the whole GetTasks call here; confirm that is intended rather than moving on to the next task */ + } + if skipTask(pollingCluster, domainEntity) { + continue /* NOTE(review): this bypasses the rest of the loop body for filtered tasks; where readLevel is advanced is not visible in this hunk, so confirm skipped task IDs still move it forward */ + } + + // construct replication task from DB _ = t.rateLimiter.Wait(ctx) var replicationTask *types.ReplicationTask op := func() error { @@ -162,7 +173,6 @@ TaskInfoLoop: replicationTask, err = t.toReplicationTask(ctx, taskInfo) return err } - err = backoff.Retry(op, t.retryPolicy, common.IsPersistenceTransientError) switch err.(type) { case nil: @@ -614,6 +624,15 @@ func (t *taskAckManagerImpl) generateHistoryReplicationTask( ) } +/* skipTask reports whether GetTasks should skip a task for pollingCluster: it returns false as soon as a cluster in domainEntity's replication config has that ClusterName, and true when none does. */ func skipTask(pollingCluster string, domainEntity *cache.DomainCacheEntry) bool { + for _, cluster := range domainEntity.GetReplicationConfig().Clusters { + if cluster.ClusterName == pollingCluster { + return false + } + } + return true +} + func getVersionHistoryItems( versionHistories *persistence.VersionHistories, eventID int64, diff --git a/service/history/replication/task_ack_manager_test.go b/service/history/replication/task_ack_manager_test.go index 07db14c6bf7..f326fea49a7 100644 --- a/service/history/replication/task_ack_manager_test.go +++ b/service/history/replication/task_ack_manager_test.go @@ -916,7 +916,7 @@ func (s *taskAckManagerSuite) TestGetTasks() { domainID := uuid.New() workflowID := uuid.New() runID := uuid.New() - clusterName := "cluster" + clusterName := cluster.TestCurrentClusterName taskInfo := &persistence.ReplicationTaskInfo{ TaskType: persistence.ReplicationTaskTypeFailoverMarker, DomainID: domainID, @@ -930,6 +930,19 @@ func (s *taskAckManagerSuite) TestGetTasks() { NextPageToken: []byte{1}, }, nil) s.mockShard.Resource.ShardMgr.On("UpdateShard", mock.Anything, mock.Anything).Return(nil) + s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(cache.NewGlobalDomainCacheEntryForTest( + &persistence.DomainInfo{ID: domainID, Name: "domainName"}, + &persistence.DomainConfig{Retention: 1}, + &persistence.DomainReplicationConfig{ 
+ ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ /* includes cluster.TestCurrentClusterName, the cluster name this test passes to GetTasks */ + {ClusterName: cluster.TestCurrentClusterName}, + {ClusterName: cluster.TestAlternativeClusterName}, + }, + }, + 1, + nil, + ), nil).AnyTimes() _, err := s.ackManager.GetTasks(context.Background(), clusterName, 10) s.NoError(err) @@ -941,7 +954,7 @@ func (s *taskAckManagerSuite) TestGetTasks_ReturnDataErrors() { domainID := uuid.New() workflowID := uuid.New() runID := uuid.New() - clusterName := "cluster" + clusterName := cluster.TestCurrentClusterName taskID := int64(10) taskInfo := &persistence.ReplicationTaskInfo{ TaskType: persistence.ReplicationTaskTypeHistory, @@ -1024,3 +1037,39 @@ func (s *taskAckManagerSuite) TestGetTasks_ReturnDataErrors() { s.NoError(err) s.Equal(taskID+1, msg.GetLastRetrievedMessageID()) } + +/* TestSkipTask_ReturnTrue asserts that skipTask("test", domainEntity) is true for a domain whose Clusters list holds only the two test cluster constants; "test" is presumably not the value of either constant (TODO confirm). */ func (s *taskAckManagerSuite) TestSkipTask_ReturnTrue() { + domainID := uuid.New() + domainEntity := cache.NewGlobalDomainCacheEntryForTest( + &persistence.DomainInfo{ID: domainID, Name: "domainName"}, + &persistence.DomainConfig{Retention: 1}, + &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + {ClusterName: cluster.TestCurrentClusterName}, + {ClusterName: cluster.TestAlternativeClusterName}, + }, + }, + 1, + nil, + ) + s.True(skipTask("test", domainEntity)) +} + +/* TestSkipTask_ReturnFalse asserts that skipTask is false when the polling cluster, cluster.TestAlternativeClusterName, appears in the domain's Clusters list. */ func (s *taskAckManagerSuite) TestSkipTask_ReturnFalse() { + domainID := uuid.New() + domainEntity := cache.NewGlobalDomainCacheEntryForTest( + &persistence.DomainInfo{ID: domainID, Name: "domainName"}, + &persistence.DomainConfig{Retention: 1}, + &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + {ClusterName: cluster.TestCurrentClusterName}, + {ClusterName: cluster.TestAlternativeClusterName}, + }, + }, + 1, + nil, + ) + s.False(skipTask(cluster.TestAlternativeClusterName, domainEntity)) +} 
diff --git a/tools/cli/flags.go b/tools/cli/flags.go index 7445cf8571a..66878598f2d 100644 --- a/tools/cli/flags.go +++ b/tools/cli/flags.go @@ -358,7 +358,7 @@ func getFlagsForStart() []cli.Flag { cli.IntFlag{ Name: FlagWorkflowIDReusePolicyAlias, Usage: "Optional input to configure if the same workflow ID is allow to use for new workflow execution. " + - "Available options: 0: AllowDuplicateFailedOnly, 1: AllowDuplicate, 2: RejectDuplicate, 3:TerminateIfRunning", + "Available options: 0: AllowDuplicateFailedOnly, 1: AllowDuplicate, 2: RejectDuplicate, 3:TerminateIfRunning", }, cli.StringFlag{ Name: FlagInputWithAlias,