diff --git a/common/persistence/nosql/nosql_execution_store_util_test.go b/common/persistence/nosql/nosql_execution_store_util_test.go index 8249754429b..01b15c5cfb8 100644 --- a/common/persistence/nosql/nosql_execution_store_util_test.go +++ b/common/persistence/nosql/nosql_execution_store_util_test.go @@ -23,6 +23,7 @@ package nosql import ( + "errors" "testing" "time" @@ -223,7 +224,6 @@ func TestPrepareTasksForWorkflowTxn(t *testing.T) { setupStore: func(store *nosqlExecutionStore) ([]*nosqlplugin.TimerTask, error) { timerTasks := []persistence.Task{ &persistence.DecisionTimeoutTask{VisibilityTimestamp: time.Now(), TaskID: 1, EventID: 2, TimeoutType: 1, ScheduleAttempt: 1}, - // Add more tasks as needed to fully test functionality } tasks, err := store.prepareTimerTasksForWorkflowTxn("domainID", "workflowID", "runID", timerTasks) assert.NoError(t, err) @@ -236,7 +236,7 @@ func TestPrepareTasksForWorkflowTxn(t *testing.T) { name: "PrepareTimerTasksForWorkflowTxn - Unsupported Timer Task Type", setupStore: func(store *nosqlExecutionStore) ([]*nosqlplugin.TimerTask, error) { timerTasks := []persistence.Task{ - &dummyTaskType{ // Using the dummy task type + &dummyTaskType{ VisibilityTimestamp: time.Now(), TaskID: 1, }, @@ -244,10 +244,11 @@ func TestPrepareTasksForWorkflowTxn(t *testing.T) { return store.prepareTimerTasksForWorkflowTxn("domainID-unsupported", "workflowID-unsupported", "runID-unsupported", timerTasks) }, validate: func(t *testing.T, tasks []*nosqlplugin.TimerTask, err error) { - assert.Error(t, err) // Expecting an error due to unsupported timer task type - assert.Nil(t, tasks) // No tasks should be returned due to the error + assert.Error(t, err) + assert.Nil(t, tasks) }, - }} + }, + } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -263,6 +264,156 @@ func TestPrepareTasksForWorkflowTxn(t *testing.T) { } } +func TestPrepareReplicationTasksForWorkflowTxn(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockDB := nosqlplugin.NewMockDB(mockCtrl) + store := newTestNosqlExecutionStore(mockDB, log.NewNoop()) + + testCases := []struct { + name string + setupStore func(*nosqlExecutionStore) ([]*nosqlplugin.ReplicationTask, error) + validate func(*testing.T, []*nosqlplugin.ReplicationTask, error) + }{ + { + name: "Successful Replication Tasks Preparation", + setupStore: func(store *nosqlExecutionStore) ([]*nosqlplugin.ReplicationTask, error) { + replicationTasks := []persistence.Task{ + &persistence.HistoryReplicationTask{ + Version: 1, + }, + } + return store.prepareReplicationTasksForWorkflowTxn("domainID", "workflowID", "runID", replicationTasks) + }, + validate: func(t *testing.T, tasks []*nosqlplugin.ReplicationTask, err error) { + assert.NoError(t, err) + assert.NotEmpty(t, tasks) + }, + }, + { + name: "Handling Unknown Replication Task Type", + setupStore: func(store *nosqlExecutionStore) ([]*nosqlplugin.ReplicationTask, error) { + replicationTasks := []persistence.Task{ + &dummyTaskType{ + VisibilityTimestamp: time.Now(), + TaskID: -1, + }, + } + return store.prepareReplicationTasksForWorkflowTxn("domainID", "workflowID", "runID", replicationTasks) + }, + validate: func(t *testing.T, tasks []*nosqlplugin.ReplicationTask, err error) { + assert.Error(t, err) + assert.Nil(t, tasks) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tasks, err := tc.setupStore(store) + tc.validate(t, tasks, err) + }) + } +} + +func TestPrepareCrossClusterTasksForWorkflowTxn(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockDB := nosqlplugin.NewMockDB(mockCtrl) + store := newTestNosqlExecutionStore(mockDB, log.NewNoop()) + + testCases := []struct { + name string + setupStore func(*nosqlExecutionStore) ([]*nosqlplugin.CrossClusterTask, error) + validate func(*testing.T, []*nosqlplugin.CrossClusterTask, error) + }{ + { + name: "Successful CrossCluster Tasks Preparation", + setupStore: func(store *nosqlExecutionStore) ([]*nosqlplugin.CrossClusterTask, error) { + crossClusterTasks := []persistence.Task{ + &persistence.CrossClusterStartChildExecutionTask{ + TargetCluster: "targetCluster", + }, + } + return store.prepareCrossClusterTasksForWorkflowTxn("domainID", "workflowID", "runID", crossClusterTasks) + }, + validate: func(t *testing.T, tasks []*nosqlplugin.CrossClusterTask, err error) { + assert.NoError(t, err) + assert.NotEmpty(t, tasks) + }, + }, + { + name: "Handling Unsupported CrossCluster Task Type", + setupStore: func(store *nosqlExecutionStore) ([]*nosqlplugin.CrossClusterTask, error) { + crossClusterTasks := []persistence.Task{ + &dummyTaskType{ // Adjust this to be an unexpected type for cross-cluster tasks + VisibilityTimestamp: time.Now(), + TaskID: -1, + }, + } + return store.prepareCrossClusterTasksForWorkflowTxn("domainID", "workflowID", "runID", crossClusterTasks) + }, + validate: func(t *testing.T, tasks []*nosqlplugin.CrossClusterTask, err error) { + assert.Error(t, err) // Expecting an error due to unsupported task type + assert.Nil(t, tasks) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tasks, err := tc.setupStore(store) + tc.validate(t, tasks, err) + }) + } +} + +func TestPrepareNoSQLTasksForWorkflowTxn(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockDB := nosqlplugin.NewMockDB(mockCtrl) + store := newTestNosqlExecutionStore(mockDB, log.NewNoop()) + + testCases := []struct { + name string + setupStore func(*nosqlExecutionStore) ([]*nosqlplugin.TransferTask, []*nosqlplugin.CrossClusterTask, []*nosqlplugin.ReplicationTask, []*nosqlplugin.TimerTask, error) + validate func(*testing.T, []*nosqlplugin.TransferTask, []*nosqlplugin.CrossClusterTask, []*nosqlplugin.ReplicationTask, []*nosqlplugin.TimerTask, error) + }{ + { + name: "prepareNoSQLTasksForWorkflowTxn - Success", + setupStore: func(store *nosqlExecutionStore) ([]*nosqlplugin.TransferTask, []*nosqlplugin.CrossClusterTask, []*nosqlplugin.ReplicationTask, []*nosqlplugin.TimerTask, error) { + return nil, nil, nil, nil, nil + }, + validate: func(t *testing.T, transferTasks []*nosqlplugin.TransferTask, crossClusterTasks []*nosqlplugin.CrossClusterTask, replicationTasks []*nosqlplugin.ReplicationTask, timerTasks []*nosqlplugin.TimerTask, err error) { + assert.NoError(t, err) + }, + }, + { + name: "prepareNoSQLTasksForWorkflowTxn - Task Preparation Failure", + setupStore: func(store *nosqlExecutionStore) ([]*nosqlplugin.TransferTask, []*nosqlplugin.CrossClusterTask, []*nosqlplugin.ReplicationTask, []*nosqlplugin.TimerTask, error) { + return nil, nil, nil, nil, errors.New("task preparation failed") + }, + validate: func(t *testing.T, transferTasks []*nosqlplugin.TransferTask, crossClusterTasks []*nosqlplugin.CrossClusterTask, replicationTasks []*nosqlplugin.ReplicationTask, timerTasks []*nosqlplugin.TimerTask, err error) { + assert.Error(t, err) + assert.Nil(t, transferTasks) + assert.Nil(t, crossClusterTasks) + assert.Nil(t, replicationTasks) + assert.Nil(t, timerTasks) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + transferTasks, crossClusterTasks, replicationTasks, timerTasks, err := tc.setupStore(store) + tc.validate(t, transferTasks, crossClusterTasks, replicationTasks, timerTasks, err) + }) + } +} + type dummyTaskType struct { persistence.Task VisibilityTimestamp time.Time