Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Nov 22, 2022
1 parent 5359b4e commit 7089d94
Showing 1 changed file with 204 additions and 36 deletions.
240 changes: 204 additions & 36 deletions service/history/replication/ack_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package replication

import (
"context"
"strconv"
"testing"
"time"

Expand All @@ -50,6 +51,7 @@ import (
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives/timestamp"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
"go.temporal.io/server/service/history/tests"
Expand All @@ -61,11 +63,11 @@ type (
suite.Suite
*require.Assertions

controller *gomock.Controller
mockShard *shard.ContextTest
mockNamespaceCache *namespace.MockRegistry
mockMutableState *workflow.MockMutableState
mockClusterMetadata *cluster.MockMetadata
controller *gomock.Controller
mockShard *shard.ContextTest
mockNamespaceRegistry *namespace.MockRegistry
mockMutableState *workflow.MockMutableState
mockClusterMetadata *cluster.MockMetadata

mockExecutionMgr *persistence.MockExecutionManager

Expand Down Expand Up @@ -104,11 +106,14 @@ func (s *ackManagerSuite) SetupTest() {
tests.NewDynamicConfig(),
)

s.mockNamespaceCache = s.mockShard.Resource.NamespaceCache
s.mockNamespaceRegistry = s.mockShard.Resource.NamespaceCache
s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(tests.GlobalNamespaceEntry, nil).AnyTimes()

s.mockExecutionMgr = s.mockShard.Resource.ExecutionMgr
s.mockClusterMetadata = s.mockShard.Resource.ClusterMetadata
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalNamespaceEnabled().Return(true).AnyTimes()
s.mockClusterMetadata.EXPECT().ClusterNameForFailoverVersion(true, gomock.Any()).Return(cluster.TestCurrentClusterName, nil).AnyTimes()

s.logger = s.mockShard.GetLogger()
historyCache := workflow.NewCache(s.mockShard)
Expand Down Expand Up @@ -232,7 +237,7 @@ func (s *ackManagerSuite) TestSyncActivity_WorkflowMissing() {
WorkflowID: workflowID,
RunID: runID,
}).Return(nil, serviceerror.NewNotFound(""))
s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
&persistencespb.NamespaceInfo{Id: namespaceID.String(), Name: namespaceName.String()},
&persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)},
&persistencespb.NamespaceReplicationConfig{
Expand Down Expand Up @@ -284,7 +289,7 @@ func (s *ackManagerSuite) TestSyncActivity_WorkflowCompleted() {
release(nil)
s.mockMutableState.EXPECT().StartTransaction(gomock.Any()).Return(false, nil)
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(false).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
&persistencespb.NamespaceInfo{Id: namespaceID.String(), Name: namespaceName.String()},
&persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)},
&persistencespb.NamespaceReplicationConfig{
Expand Down Expand Up @@ -338,7 +343,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityCompleted() {
s.mockMutableState.EXPECT().StartTransaction(gomock.Any()).Return(false, nil)
s.mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes()
s.mockMutableState.EXPECT().GetActivityInfo(scheduledEventID).Return(nil, false).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
&persistencespb.NamespaceInfo{Id: namespaceID.String(), Name: namespaceName.String()},
&persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)},
&persistencespb.NamespaceReplicationConfig{
Expand Down Expand Up @@ -429,7 +434,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityRetry() {
},
}
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
&persistencespb.NamespaceInfo{Id: namespaceID.String(), Name: namespaceName.String()},
&persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)},
&persistencespb.NamespaceReplicationConfig{
Expand Down Expand Up @@ -544,7 +549,7 @@ func (s *ackManagerSuite) TestSyncActivity_ActivityRunning() {
},
}
s.mockMutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{VersionHistories: versionHistories}).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(namespaceID).Return(namespace.NewGlobalNamespaceForTest(
&persistencespb.NamespaceInfo{Id: namespaceID.String(), Name: namespaceName.String()},
&persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)},
&persistencespb.NamespaceReplicationConfig{
Expand Down Expand Up @@ -606,43 +611,206 @@ func (s *ackManagerSuite) Test_GetMaxTaskInfo() {
s.Equal(int64(6), maxTaskID)
s.Equal(now.Add(time.Hour), maxVisibilityTimestamp)
}
func (s *ackManagerSuite) getHistoryTasksResponse(size int) *persistence.GetHistoryTasksResponse {
result := &persistence.GetHistoryTasksResponse{}
for i := 0; i < size; i++ {
result.Tasks = append(result.Tasks, &tasks.HistoryReplicationTask{
WorkflowKey: definition.WorkflowKey{
NamespaceID: tests.NamespaceID.String(),
WorkflowID: tests.WorkflowID + strconv.Itoa(i),
RunID: uuid.New(),
},
TaskID: int64(i),
FirstEventID: 1,
NextEventID: 1,
Version: 1,
},
)
}

func (s *ackManagerSuite) Test_GetTasks_Success() {
return result
}
func (s *ackManagerSuite) Test_GetTasks_FullPage() {
tasksResponse := s.getHistoryTasksResponse(s.replicationAckManager.pageSize)
tasksResponse.NextPageToken = []byte{22, 3, 83} // There is mor in DB.
minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(22)

s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{
ShardID: s.mockShard.GetShardID(),
TaskCategory: tasks.CategoryReplication,
InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1),
ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1),
BatchSize: 10,
BatchSize: s.replicationAckManager.pageSize,
NextPageToken: nil,
}).Return(&persistence.GetHistoryTasksResponse{
Tasks: []tasks.Task{
&tasks.HistoryReplicationTask{
WorkflowKey: definition.WorkflowKey{},
VisibilityTimestamp: time.Time{},
TaskID: 1,
FirstEventID: 0,
NextEventID: 0,
Version: 0,
NewRunID: "",
}).Return(tasksResponse, nil)

eventsCache := events.NewEventsCache(
s.mockShard.GetShardID(),
s.mockShard.GetConfig().EventsCacheInitialSize(),
s.mockShard.GetConfig().EventsCacheMaxSize(),
s.mockShard.GetConfig().EventsCacheTTL(),
s.mockShard.GetExecutionManager(),
false,
s.mockShard.GetLogger(),
s.mockShard.GetMetricsHandler(),
)
ms := workflow.TestLocalMutableState(s.mockShard, eventsCache, tests.GlobalNamespaceEntry, log.NewTestLogger(), tests.RunID)
ei := ms.GetExecutionInfo()
ei.NamespaceId = tests.NamespaceID.String()
ei.VersionHistories = &historyspb.VersionHistories{
Histories: []*historyspb.VersionHistory{
{
Items: []*historyspb.VersionHistoryItem{
{
EventId: 1,
Version: 1,
},
},
},
&tasks.HistoryReplicationTask{
WorkflowKey: definition.WorkflowKey{},
VisibilityTimestamp: time.Time{},
TaskID: 1,
FirstEventID: 0,
NextEventID: 0,
Version: 0,
NewRunID: "",
},
}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{
State: workflow.TestCloneToProto(ms)}, nil).Times(s.replicationAckManager.pageSize)
s.mockExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadRawHistoryBranchResponse{
HistoryEventBlobs: []*commonpb.DataBlob{{}}}, nil).Times(s.replicationAckManager.pageSize)

replicationMessages, err := s.replicationAckManager.GetTasks(context.Background(), cluster.TestCurrentClusterName, 22)
s.NoError(err)
s.NotNil(replicationMessages)
s.Len(replicationMessages.ReplicationTasks, s.replicationAckManager.pageSize)
}

func (s *ackManagerSuite) Test_GetTasks_PartialPage() {
numTasks := s.replicationAckManager.pageSize / 2
minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(22)
s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{
ShardID: s.mockShard.GetShardID(),
TaskCategory: tasks.CategoryReplication,
InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1),
ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1),
BatchSize: s.replicationAckManager.pageSize,
NextPageToken: nil,
}).Return(s.getHistoryTasksResponse(numTasks), nil)

eventsCache := events.NewEventsCache(
s.mockShard.GetShardID(),
s.mockShard.GetConfig().EventsCacheInitialSize(),
s.mockShard.GetConfig().EventsCacheMaxSize(),
s.mockShard.GetConfig().EventsCacheTTL(),
s.mockShard.GetExecutionManager(),
false,
s.mockShard.GetLogger(),
s.mockShard.GetMetricsHandler(),
)
ms := workflow.TestLocalMutableState(s.mockShard, eventsCache, tests.GlobalNamespaceEntry, log.NewTestLogger(), tests.RunID)
ei := ms.GetExecutionInfo()
ei.NamespaceId = tests.NamespaceID.String()
ei.VersionHistories = &historyspb.VersionHistories{
Histories: []*historyspb.VersionHistory{
{
Items: []*historyspb.VersionHistoryItem{
{
EventId: 1,
Version: 1,
},
},
},
},
NextPageToken: nil,
}, nil)
}

msgs, err := s.replicationAckManager.GetTasks(context.Background(), "cluster1", 22)
s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{
State: workflow.TestCloneToProto(ms)}, nil).Times(numTasks)
s.mockExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadRawHistoryBranchResponse{
HistoryEventBlobs: []*commonpb.DataBlob{{}}}, nil).Times(numTasks)

replicationMessages, err := s.replicationAckManager.GetTasks(context.Background(), cluster.TestCurrentClusterName, 22)
s.NoError(err)
s.Len(msgs, 1)
s.NotNil(replicationMessages)
s.Len(replicationMessages.ReplicationTasks, numTasks)
}

func (s *ackManagerSuite) Test_GetTasks_FilterNamespace() {
notExistOnTestClusterNamespaceID := namespace.ID("not-exist-on-" + cluster.TestCurrentClusterName)
notExistOnTestClusterNamespaceEntry := namespace.NewLocalNamespaceForTest(
&persistencespb.NamespaceInfo{},
&persistencespb.NamespaceConfig{},
"not-a-"+cluster.TestCurrentClusterName,
)
s.mockNamespaceRegistry.EXPECT().GetNamespaceByID(notExistOnTestClusterNamespaceID).Return(notExistOnTestClusterNamespaceEntry, nil).AnyTimes()

minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(22)

tasksResponse1 := s.getHistoryTasksResponse(s.replicationAckManager.pageSize)
// 2 of 25 tasks are for namespace that not exist on poll cluster
tasksResponse1.Tasks[1].(*tasks.HistoryReplicationTask).NamespaceID = notExistOnTestClusterNamespaceID.String()
tasksResponse1.Tasks[3].(*tasks.HistoryReplicationTask).NamespaceID = notExistOnTestClusterNamespaceID.String()
tasksResponse1.NextPageToken = []byte{22, 3, 83} // There is more in DB.
s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{
ShardID: s.mockShard.GetShardID(),
TaskCategory: tasks.CategoryReplication,
InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1),
ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1),
BatchSize: s.replicationAckManager.pageSize,
NextPageToken: nil,
}).Return(tasksResponse1, nil)

tasksResponse2 := s.getHistoryTasksResponse(2)
// 1 of 2 task is for namespace that not exist on poll cluster
tasksResponse2.Tasks[1].(*tasks.HistoryReplicationTask).NamespaceID = notExistOnTestClusterNamespaceID.String()
tasksResponse2.NextPageToken = []byte{22, 8, 78} // There is more in DB.
s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{
ShardID: s.mockShard.GetShardID(),
TaskCategory: tasks.CategoryReplication,
InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1),
ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1),
BatchSize: 2, // 2 tasks are filtered out
NextPageToken: []byte{22, 3, 83}, // previous token
}).Return(tasksResponse2, nil)

tasksResponse3 := s.getHistoryTasksResponse(1)
tasksResponse3.NextPageToken = []byte{21, 4, 16} // There is more in DB, but we've got all we need.
s.mockExecutionMgr.EXPECT().GetHistoryTasks(gomock.Any(), &persistence.GetHistoryTasksRequest{
ShardID: s.mockShard.GetShardID(),
TaskCategory: tasks.CategoryReplication,
InclusiveMinTaskKey: tasks.NewImmediateKey(minTaskID + 1),
ExclusiveMaxTaskKey: tasks.NewImmediateKey(maxTaskID + 1),
BatchSize: 1, // 1 task is filtered out
NextPageToken: []byte{22, 8, 78}, // previous token
}).Return(tasksResponse3, nil)

eventsCache := events.NewEventsCache(
s.mockShard.GetShardID(),
s.mockShard.GetConfig().EventsCacheInitialSize(),
s.mockShard.GetConfig().EventsCacheMaxSize(),
s.mockShard.GetConfig().EventsCacheTTL(),
s.mockShard.GetExecutionManager(),
false,
s.mockShard.GetLogger(),
s.mockShard.GetMetricsHandler(),
)
ms := workflow.TestLocalMutableState(s.mockShard, eventsCache, tests.GlobalNamespaceEntry, log.NewTestLogger(), tests.RunID)
ei := ms.GetExecutionInfo()
ei.NamespaceId = tests.NamespaceID.String()
ei.VersionHistories = &historyspb.VersionHistories{
Histories: []*historyspb.VersionHistory{
{
Items: []*historyspb.VersionHistoryItem{
{
EventId: 1,
Version: 1,
},
},
},
},
}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).Return(&persistence.GetWorkflowExecutionResponse{
State: workflow.TestCloneToProto(ms)}, nil).Times(s.replicationAckManager.pageSize)
s.mockExecutionMgr.EXPECT().ReadRawHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadRawHistoryBranchResponse{
HistoryEventBlobs: []*commonpb.DataBlob{{}}}, nil).Times(s.replicationAckManager.pageSize)

replicationMessages, err := s.replicationAckManager.GetTasks(context.Background(), cluster.TestCurrentClusterName, 22)
s.NoError(err)
s.NotNil(replicationMessages)
s.Len(replicationMessages.ReplicationTasks, s.replicationAckManager.pageSize)
}

0 comments on commit 7089d94

Please sign in to comment.