diff --git a/service/history/execution/mutable_state_builder_methods_activity.go b/service/history/execution/mutable_state_builder_methods_activity.go index 674010be50d..cf12eb38839 100644 --- a/service/history/execution/mutable_state_builder_methods_activity.go +++ b/service/history/execution/mutable_state_builder_methods_activity.go @@ -561,8 +561,8 @@ func (e *mutableStateBuilder) AddActivityTaskTimedOutEvent( tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), - tag.WorkflowScheduleID(ai.ScheduleID), - tag.WorkflowStartedID(ai.StartedID), + tag.WorkflowScheduleID(scheduleEventID), + tag.WorkflowStartedID(startedEventID), tag.WorkflowTimeoutType(int64(timeoutType))) return nil, e.createInternalServerError(opTag) } diff --git a/service/history/execution/mutable_state_builder_methods_activity_test.go b/service/history/execution/mutable_state_builder_methods_activity_test.go index 32f534d4488..680d22ba3cf 100644 --- a/service/history/execution/mutable_state_builder_methods_activity_test.go +++ b/service/history/execution/mutable_state_builder_methods_activity_test.go @@ -59,6 +59,7 @@ func testMutableStateBuilder(t *testing.T) *mutableStateBuilder { mockShard.GetConfig().EnableRetryForChecksumFailure = func(domain string) bool { return true } logger := log.NewNoop() + mockShard.Resource.MatchingClient.EXPECT().AddActivityTask(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() mockShard.Resource.DomainCache.EXPECT().GetDomainID(constants.TestDomainName).Return(constants.TestDomainID, nil).AnyTimes() return newMutableStateBuilder(mockShard, logger, constants.TestLocalDomainEntry) } @@ -119,6 +120,7 @@ func Test__UpdateActivity(t *testing.T) { mb.pendingActivityInfoIDs[1] = ai err := mb.UpdateActivity(ai) assert.NoError(t, err) + assert.Equal(t, ai, mb.updateActivityInfos[1]) }) } @@ -191,7 +193,273 @@ func Test__AddActivityTaskCompletedEvent(t *testing.T) { mb.pendingActivityIDToEventID["1"] = 1 mb.updateActivityInfos[1] = ai mb.hBuilder = NewHistoryBuilder(mb) - _, err := mb.AddActivityTaskCompletedEvent(1, 1, request) + event, err := mb.AddActivityTaskCompletedEvent(1, 1, request) + assert.NoError(t, err) + assert.Equal(t, int64(1), event.ActivityTaskCompletedEventAttributes.ScheduledEventID) + }) +} + +func Test__tryDispatchActivityTask(t *testing.T) { + mb := testMutableStateBuilder(t) + event := &types.HistoryEvent{} + ai := &persistence.ActivityInfo{} + result := mb.tryDispatchActivityTask(context.Background(), event, ai) + assert.True(t, result) +} + +func Test__ReplicateActivityTaskCanceledEvent(t *testing.T) { + mb := testMutableStateBuilder(t) + event := &types.HistoryEvent{ + EventType: types.EventTypeActivityTaskCanceled.Ptr(), + ActivityTaskCanceledEventAttributes: &types.ActivityTaskCanceledEventAttributes{ + ScheduledEventID: 1, + }, + } + ai := &persistence.ActivityInfo{ + ActivityID: "1", + } + mb.pendingActivityInfoIDs[1] = ai + mb.pendingActivityIDToEventID["1"] = 1 + err := mb.ReplicateActivityTaskCanceledEvent(event) + assert.NoError(t, err) + _, ok := mb.pendingActivityInfoIDs[1] + assert.False(t, ok) + _, ok = mb.pendingActivityIDToEventID["1"] + assert.False(t, ok) +} + +func Test__ReplicateActivityTaskCancelRequestedEvent(t *testing.T) { + mb := testMutableStateBuilder(t) + event := &types.HistoryEvent{ + EventType: types.EventTypeActivityTaskCanceled.Ptr(), + ActivityTaskCancelRequestedEventAttributes: &types.ActivityTaskCancelRequestedEventAttributes{ + ActivityID: "1", + }, + } + ai := &persistence.ActivityInfo{ + ActivityID: "1", + ScheduleID: 1, + } + mb.pendingActivityInfoIDs[1] = ai + mb.pendingActivityIDToEventID["1"] = 1 + err := mb.ReplicateActivityTaskCancelRequestedEvent(event) + assert.NoError(t, err) + assert.Equal(t, ai, mb.updateActivityInfos[1]) +} + +func Test__ReplicateActivityTaskTimedOutEvent(t *testing.T) { + mb := testMutableStateBuilder(t) + event := &types.HistoryEvent{ + EventType: types.EventTypeActivityTaskTimedOut.Ptr(), + ActivityTaskTimedOutEventAttributes: &types.ActivityTaskTimedOutEventAttributes{ + ScheduledEventID: 1, + }, + } + ai := &persistence.ActivityInfo{ + ActivityID: "1", + } + mb.pendingActivityInfoIDs[1] = ai + mb.pendingActivityIDToEventID["1"] = 1 + err := mb.ReplicateActivityTaskTimedOutEvent(event) + assert.NoError(t, err) + _, ok := mb.pendingActivityInfoIDs[1] + assert.False(t, ok) + _, ok = mb.pendingActivityIDToEventID["1"] + assert.False(t, ok) +} + +func Test__ReplicateActivityTaskFailedEvent(t *testing.T) { + mb := testMutableStateBuilder(t) + event := &types.HistoryEvent{ + EventType: types.EventTypeActivityTaskFailed.Ptr(), + ActivityTaskFailedEventAttributes: &types.ActivityTaskFailedEventAttributes{ + ScheduledEventID: 1, + }, + } + ai := &persistence.ActivityInfo{ + ActivityID: "1", + } + mb.pendingActivityInfoIDs[1] = ai + mb.pendingActivityIDToEventID["1"] = 1 + err := mb.ReplicateActivityTaskFailedEvent(event) + assert.NoError(t, err) + _, ok := mb.pendingActivityInfoIDs[1] + assert.False(t, ok) + _, ok = mb.pendingActivityIDToEventID["1"] + assert.False(t, ok) +} + +func Test__ReplicateActivityTaskCompletedEvent(t *testing.T) { + mb := testMutableStateBuilder(t) + event := &types.HistoryEvent{ + EventType: types.EventTypeActivityTaskCompleted.Ptr(), + ActivityTaskCompletedEventAttributes: &types.ActivityTaskCompletedEventAttributes{ + ScheduledEventID: 1, + }, + } + ai := &persistence.ActivityInfo{ + ActivityID: "1", + } + mb.pendingActivityInfoIDs[1] = ai + mb.pendingActivityIDToEventID["1"] = 1 + err := mb.ReplicateActivityTaskCompletedEvent(event) + assert.NoError(t, err) + _, ok := mb.pendingActivityInfoIDs[1] + assert.False(t, ok) + _, ok = mb.pendingActivityIDToEventID["1"] + assert.False(t, ok) +} + +func Test__AddActivityTaskCanceledEvent(t *testing.T) { + t.Run("error workflow finished", func(t *testing.T) { + mbCompleted := testMutableStateBuilder(t) + mbCompleted.executionInfo.State = persistence.WorkflowStateCompleted + _, err := mbCompleted.AddActivityTaskCanceledEvent(1, 1, 1, []byte{10}, "test") + assert.Error(t, err) + assert.Equal(t, ErrWorkflowFinished, err) + }) + t.Run("error getting activity info", func(t *testing.T) { + mb := testMutableStateBuilder(t) + _, err := mb.AddActivityTaskCanceledEvent(1, 1, 1, []byte{10}, "test") + assert.Error(t, err) + assert.Equal(t, "add-activitytask-canceled-event operation failed", err.Error()) + }) + t.Run("error cancel not requested", func(t *testing.T) { + mb := testMutableStateBuilder(t) + mb.hBuilder = NewHistoryBuilder(mb) + ai := &persistence.ActivityInfo{ + StartedID: 1, + CancelRequested: false, + StartedTime: time.Now(), + } + mb.pendingActivityInfoIDs[1] = ai + _, err := mb.AddActivityTaskCanceledEvent(1, 1, 1, []byte{10}, "test") + assert.Error(t, err) + }) + t.Run("success", func(t *testing.T) { + mb := testMutableStateBuilder(t) + mb.hBuilder = NewHistoryBuilder(mb) + ai := &persistence.ActivityInfo{ + StartedID: 1, + CancelRequested: true, + StartedTime: time.Now(), + } + mb.pendingActivityInfoIDs[1] = ai + event, err := mb.AddActivityTaskCanceledEvent(1, 1, 1, []byte{10}, "test") assert.NoError(t, err) + assert.Equal(t, int64(1), event.ActivityTaskCanceledEventAttributes.ScheduledEventID) + assert.Equal(t, "test", event.ActivityTaskCanceledEventAttributes.Identity) + }) +} + +func Test__AddRequestCancelActivityTaskFailedEvent(t *testing.T) { + t.Run("error workflow finished", func(t *testing.T) { + mbCompleted := testMutableStateBuilder(t) + mbCompleted.executionInfo.State = persistence.WorkflowStateCompleted + _, err := mbCompleted.AddRequestCancelActivityTaskFailedEvent(1, "1", "test") + assert.Error(t, err) + assert.Equal(t, ErrWorkflowFinished, err) + }) + t.Run("success", func(t *testing.T) { + mb := testMutableStateBuilder(t) + mb.hBuilder = NewHistoryBuilder(mb) + event, err := mb.AddRequestCancelActivityTaskFailedEvent(1, "1", "test") + assert.NoError(t, err) + assert.Equal(t, int64(1), event.RequestCancelActivityTaskFailedEventAttributes.DecisionTaskCompletedEventID) + assert.Equal(t, "test", event.RequestCancelActivityTaskFailedEventAttributes.Cause) + }) +} + +func Test__AddActivityTaskCancelRequestedEvent(t *testing.T) { + t.Run("error workflow finished", func(t *testing.T) { + mbCompleted := testMutableStateBuilder(t) + mbCompleted.executionInfo.State = persistence.WorkflowStateCompleted + _, _, err := mbCompleted.AddActivityTaskCancelRequestedEvent(1, "1", "test") + assert.Error(t, err) + assert.Equal(t, ErrWorkflowFinished, err) + }) + t.Run("error getting activity info", func(t *testing.T) { + mb := testMutableStateBuilder(t) + mb.hBuilder = NewHistoryBuilder(mb) + _, _, err := mb.AddActivityTaskCancelRequestedEvent(1, "1", "test") + assert.Error(t, err) + assert.Equal(t, "invalid history builder state for action: add-activitytask-cancel-requested-event", err.Error()) + }) + t.Run("success", func(t *testing.T) { + mb := testMutableStateBuilder(t) + mb.hBuilder = NewHistoryBuilder(mb) + ai := &persistence.ActivityInfo{ + StartedID: 1, + StartedTime: time.Now(), + } + mb.pendingActivityInfoIDs[1] = ai + mb.pendingActivityIDToEventID["1"] = 1 + event, ai, err := mb.AddActivityTaskCancelRequestedEvent(1, "1", "test") + assert.NoError(t, err) + assert.Equal(t, "1", event.ActivityTaskCancelRequestedEventAttributes.ActivityID) + }) +} + +func Test__AddActivityTaskTimedOutEvent(t *testing.T) { + t.Run("error workflow finished", func(t *testing.T) { + mbCompleted := testMutableStateBuilder(t) + mbCompleted.executionInfo.State = persistence.WorkflowStateCompleted + _, err := mbCompleted.AddActivityTaskTimedOutEvent(1, 1, types.TimeoutTypeHeartbeat, []byte{10}) + assert.Error(t, err) + assert.Equal(t, ErrWorkflowFinished, err) + }) + t.Run("error getting activity info", func(t *testing.T) { + mb := testMutableStateBuilder(t) + _, err := mb.AddActivityTaskTimedOutEvent(1, 1, types.TimeoutTypeHeartbeat, []byte{10}) + assert.Error(t, err) + assert.Equal(t, "add-activitytask-timed-event operation failed", err.Error()) + }) + t.Run("success", func(t *testing.T) { + mb := testMutableStateBuilder(t) + ai := &persistence.ActivityInfo{ + ScheduleID: 1, + ActivityID: "1", + ScheduledEvent: &types.HistoryEvent{}, + StartedID: 1, + } + mb.pendingActivityInfoIDs[1] = ai + mb.hBuilder = NewHistoryBuilder(mb) + event, err := mb.AddActivityTaskTimedOutEvent(1, 1, types.TimeoutTypeHeartbeat, []byte{10}) + assert.NoError(t, err) + assert.Equal(t, int64(1), event.ActivityTaskTimedOutEventAttributes.ScheduledEventID) + }) +} + +func Test__AddActivityTaskFailedEvent(t *testing.T) { + t.Run("error workflow finished", func(t *testing.T) { + mbCompleted := testMutableStateBuilder(t) + mbCompleted.executionInfo.State = persistence.WorkflowStateCompleted + _, err := mbCompleted.AddActivityTaskFailedEvent(1, 1, &types.RespondActivityTaskFailedRequest{}) + assert.Error(t, err) + assert.Equal(t, ErrWorkflowFinished, err) + }) + t.Run("error getting activity info", func(t *testing.T) { + mb := testMutableStateBuilder(t) + _, err := mb.AddActivityTaskFailedEvent(1, 1, &types.RespondActivityTaskFailedRequest{}) + assert.Error(t, err) + assert.Equal(t, "add-activitytask-failed-event operation failed", err.Error()) + }) + t.Run("success", func(t *testing.T) { + mb := testMutableStateBuilder(t) + ai := &persistence.ActivityInfo{ + ScheduleID: 1, + ActivityID: "1", + ScheduledEvent: &types.HistoryEvent{}, + StartedID: 1, + } + mb.pendingActivityInfoIDs[1] = ai + mb.hBuilder = NewHistoryBuilder(mb) + event, err := mb.AddActivityTaskFailedEvent(1, 1, &types.RespondActivityTaskFailedRequest{ + Identity: "test", + Details: make([]byte, 10), + }) + assert.NoError(t, err) + assert.Equal(t, int64(1), event.ActivityTaskFailedEventAttributes.ScheduledEventID) + }) }