Skip to content

Commit

Permalink
Add a domain cluster filter during replication (cadence-workflow#4069)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed May 4, 2021
1 parent 5e84ce4 commit 85376fb
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 3 deletions.
21 changes: 20 additions & 1 deletion service/history/replication/task_ack_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -155,14 +156,23 @@ func (t *taskAckManagerImpl) GetTasks(
readLevel := lastReadTaskID
TaskInfoLoop:
for _, taskInfo := range taskInfoList {
// filter task info by domain clusters.
domainEntity, err := t.shard.GetDomainCache().GetDomainByID(taskInfo.GetDomainID())
if err != nil {
return nil, err
}
if skipTask(pollingCluster, domainEntity) {
continue
}

// construct replication task from DB
_ = t.rateLimiter.Wait(ctx)
var replicationTask *types.ReplicationTask
op := func() error {
var err error
replicationTask, err = t.toReplicationTask(ctx, taskInfo)
return err
}

err = backoff.Retry(op, t.retryPolicy, common.IsPersistenceTransientError)
switch err.(type) {
case nil:
Expand Down Expand Up @@ -614,6 +624,15 @@ func (t *taskAckManagerImpl) generateHistoryReplicationTask(
)
}

// skipTask reports whether a replication task should be withheld from
// pollingCluster. It walks the clusters listed in the domain's replication
// config and returns false as soon as one of them is named pollingCluster;
// if no listed cluster matches, it returns true.
func skipTask(pollingCluster string, domainEntity *cache.DomainCacheEntry) bool {
	replicationClusters := domainEntity.GetReplicationConfig().Clusters
	for idx := range replicationClusters {
		if replicationClusters[idx].ClusterName == pollingCluster {
			// The polling cluster is part of this domain: do not skip.
			return false
		}
	}
	return true
}

func getVersionHistoryItems(
versionHistories *persistence.VersionHistories,
eventID int64,
Expand Down
53 changes: 51 additions & 2 deletions service/history/replication/task_ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ func (s *taskAckManagerSuite) TestGetTasks() {
domainID := uuid.New()
workflowID := uuid.New()
runID := uuid.New()
clusterName := "cluster"
clusterName := cluster.TestCurrentClusterName
taskInfo := &persistence.ReplicationTaskInfo{
TaskType: persistence.ReplicationTaskTypeFailoverMarker,
DomainID: domainID,
Expand All @@ -930,6 +930,19 @@ func (s *taskAckManagerSuite) TestGetTasks() {
NextPageToken: []byte{1},
}, nil)
s.mockShard.Resource.ShardMgr.On("UpdateShard", mock.Anything, mock.Anything).Return(nil)
s.mockDomainCache.EXPECT().GetDomainByID(domainID).Return(cache.NewGlobalDomainCacheEntryForTest(
&persistence.DomainInfo{ID: domainID, Name: "domainName"},
&persistence.DomainConfig{Retention: 1},
&persistence.DomainReplicationConfig{
ActiveClusterName: cluster.TestCurrentClusterName,
Clusters: []*persistence.ClusterReplicationConfig{
{ClusterName: cluster.TestCurrentClusterName},
{ClusterName: cluster.TestAlternativeClusterName},
},
},
1,
nil,
), nil).AnyTimes()

_, err := s.ackManager.GetTasks(context.Background(), clusterName, 10)
s.NoError(err)
Expand All @@ -941,7 +954,7 @@ func (s *taskAckManagerSuite) TestGetTasks_ReturnDataErrors() {
domainID := uuid.New()
workflowID := uuid.New()
runID := uuid.New()
clusterName := "cluster"
clusterName := cluster.TestCurrentClusterName
taskID := int64(10)
taskInfo := &persistence.ReplicationTaskInfo{
TaskType: persistence.ReplicationTaskTypeHistory,
Expand Down Expand Up @@ -1024,3 +1037,39 @@ func (s *taskAckManagerSuite) TestGetTasks_ReturnDataErrors() {
s.NoError(err)
s.Equal(taskID+1, msg.GetLastRetrievedMessageID())
}

// TestSkipTask_ReturnTrue builds a global domain cache entry replicated to the
// current and alternative test clusters, and expects skipTask to return true
// for the polling cluster name "test", which the test treats as not being one
// of the configured clusters.
func (s *taskAckManagerSuite) TestSkipTask_ReturnTrue() {
	info := &persistence.DomainInfo{ID: uuid.New(), Name: "domainName"}
	config := &persistence.DomainConfig{Retention: 1}
	replicationConfig := &persistence.DomainReplicationConfig{
		ActiveClusterName: cluster.TestCurrentClusterName,
		Clusters: []*persistence.ClusterReplicationConfig{
			{ClusterName: cluster.TestCurrentClusterName},
			{ClusterName: cluster.TestAlternativeClusterName},
		},
	}
	entry := cache.NewGlobalDomainCacheEntryForTest(info, config, replicationConfig, 1, nil)

	skipped := skipTask("test", entry)
	s.True(skipped)
}

// TestSkipTask_ReturnFalse builds a global domain cache entry replicated to the
// current and alternative test clusters, and expects skipTask to return false
// when the polling cluster is the alternative cluster listed in that config.
func (s *taskAckManagerSuite) TestSkipTask_ReturnFalse() {
	info := &persistence.DomainInfo{ID: uuid.New(), Name: "domainName"}
	config := &persistence.DomainConfig{Retention: 1}
	replicationConfig := &persistence.DomainReplicationConfig{
		ActiveClusterName: cluster.TestCurrentClusterName,
		Clusters: []*persistence.ClusterReplicationConfig{
			{ClusterName: cluster.TestCurrentClusterName},
			{ClusterName: cluster.TestAlternativeClusterName},
		},
	}
	entry := cache.NewGlobalDomainCacheEntryForTest(info, config, replicationConfig, 1, nil)

	skipped := skipTask(cluster.TestAlternativeClusterName, entry)
	s.False(skipped)
}

0 comments on commit 85376fb

Please sign in to comment.