diff --git a/service/history/engine/engineimpl/history_engine2_test.go b/service/history/engine/engineimpl/history_engine2_test.go index 6aab47487f6..d8fd9438c5a 100644 --- a/service/history/engine/engineimpl/history_engine2_test.go +++ b/service/history/engine/engineimpl/history_engine2_test.go @@ -802,6 +802,176 @@ func (s *engine2Suite) TestRecordActivityTaskStartedResurrected() { s.Equal(err, workflow.ErrActivityTaskNotFound) } +func (s *engine2Suite) TestRecordActivityTaskStartedStaleState() { + domainID := constants.TestDomainID + workflowExecution := types.WorkflowExecution{ + WorkflowID: "wId", + RunID: constants.TestRunID, + } + + identity := "testIdentity" + tl := "testTaskList" + + msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, true) + + ms1 := execution.CreatePersistenceMutableState(msBuilder) + gwmsResponse1 := &p.GetWorkflowExecutionResponse{State: ms1} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse1, nil).Times(workflow.ConditionalRetryCount) + + _, err := s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 5, + TaskID: 100, + RequestID: "reqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + + s.Equal(workflow.ErrMaxAttemptsExceeded, err) +} + +func (s *engine2Suite) TestRecordActivityTaskStartedActivityNotPending() { + domainID := constants.TestDomainID + workflowExecution := types.WorkflowExecution{ + WorkflowID: "wId", + RunID: constants.TestRunID, + } + + identity := "testIdentity" + tl := "testTaskList" + + msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, true) + + ms1 := execution.CreatePersistenceMutableState(msBuilder) + gwmsResponse1 := &p.GetWorkflowExecutionResponse{State: ms1} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse1, nil).Once() + + _, err := s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 3, + TaskID: 100, + RequestID: "reqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + + s.Equal(workflow.ErrActivityTaskNotFound, err) +} + +func (s *engine2Suite) TestRecordActivityTaskStartedActivityAlreadyStarted() { + domainID := constants.TestDomainID + workflowExecution := types.WorkflowExecution{ + WorkflowID: "wId", + RunID: constants.TestRunID, + } + + identity := "testIdentity" + tl := "testTaskList" + + activityID := "activity1_id" + activityType := "activity_type1" + activityInput := []byte("input1") + + msBuilder := s.createExecutionStartedState(workflowExecution, tl, identity, true) + decisionCompletedEvent := test.AddDecisionTaskCompletedEvent(msBuilder, int64(2), int64(3), nil, identity) + scheduledEvent, _ := test.AddActivityTaskScheduledEvent(msBuilder, decisionCompletedEvent.ID, activityID, + activityType, tl, activityInput, 100, 10, 1, 5) + + ms1 := execution.CreatePersistenceMutableState(msBuilder) + gwmsResponse1 := &p.GetWorkflowExecutionResponse{State: ms1} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(gwmsResponse1, nil).Once() + s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything, mock.Anything).Return(&p.AppendHistoryNodesResponse{}, nil).Once() + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{ + MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}, + }, nil).Once() + + s.mockEventsCache.EXPECT().GetEvent( + gomock.Any(), gomock.Any(), domainID, workflowExecution.GetWorkflowID(), workflowExecution.GetRunID(), + decisionCompletedEvent.ID, scheduledEvent.ID, gomock.Any(), + ).Return(scheduledEvent, nil).Times(1) + + // start activity + response, err := s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 5, + TaskID: 100, + RequestID: "reqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + s.Nil(err) + s.NotNil(response) + s.Equal(scheduledEvent, response.ScheduledEvent) + + // another request made with the same scheduleID and same requestID + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything, mock.Anything).Return(&p.UpdateWorkflowExecutionResponse{ + MutableStateUpdateSessionStats: &p.MutableStateUpdateSessionStats{}, + }, nil).Once() + + s.mockEventsCache.EXPECT().GetEvent( + gomock.Any(), gomock.Any(), domainID, workflowExecution.GetWorkflowID(), workflowExecution.GetRunID(), + decisionCompletedEvent.ID, scheduledEvent.ID, gomock.Any(), + ).Return(scheduledEvent, nil).Times(1) + + response, err = s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 5, + TaskID: 100, + RequestID: "reqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + s.Nil(err) + s.NotNil(response) + s.Equal(scheduledEvent, response.ScheduledEvent) + + // another request made with the same scheduleID and different requestID + s.mockEventsCache.EXPECT().GetEvent( + gomock.Any(), gomock.Any(), domainID, workflowExecution.GetWorkflowID(), workflowExecution.GetRunID(), + decisionCompletedEvent.ID, scheduledEvent.ID, gomock.Any(), + ).Return(scheduledEvent, nil).Times(1) + + response, err = s.historyEngine.RecordActivityTaskStarted(context.Background(), &types.RecordActivityTaskStartedRequest{ + DomainUUID: domainID, + WorkflowExecution: &workflowExecution, + ScheduleID: 5, + TaskID: 100, + RequestID: "otherReqId", + PollRequest: &types.PollForActivityTaskRequest{ + TaskList: &types.TaskList{ + Name: tl, + }, + Identity: identity, + }, + }) + s.Error(err) + s.Nil(response) + s.Equal(&types.EventAlreadyStartedError{Message: "Activity task already started."}, err) +} + func (s *engine2Suite) TestRequestCancelWorkflowExecutionSuccess() { domainID := constants.TestDomainID workflowExecution := types.WorkflowExecution{