diff --git a/service/history/decision/task_handler_test.go b/service/history/decision/task_handler_test.go index 9574836d911..38471130106 100644 --- a/service/history/decision/task_handler_test.go +++ b/service/history/decision/task_handler_test.go @@ -25,6 +25,7 @@ package decision import ( "context" "errors" + "fmt" "testing" "time" @@ -35,15 +36,20 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/mocks" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/common/types/testdata" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/constants" "github.com/uber/cadence/service/history/execution" + "github.com/uber/cadence/service/history/workflow" ) const ( @@ -1131,6 +1137,361 @@ func TestHandleDecisionFailWorkflow(t *testing.T) { } } +func TestHandleDecisionCancelWorkflow(t *testing.T) { + tests := []struct { + name string + expectMockCalls func(taskHandler *taskHandlerImpl, attr *types.CancelWorkflowExecutionDecisionAttributes) + attributes *types.CancelWorkflowExecutionDecisionAttributes + asserts func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.CancelWorkflowExecutionDecisionAttributes, err error) + }{ + { + name: "handler has unhandled events", + attributes: &types.CancelWorkflowExecutionDecisionAttributes{}, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.CancelWorkflowExecutionDecisionAttributes) { + taskHandler.hasUnhandledEventsBeforeDecisions = true + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.CancelWorkflowExecutionDecisionAttributes, err error) { + assert.Equal(t, types.DecisionTaskFailedCauseUnhandledDecision, *taskHandler.failDecisionCause) + assert.Equal(t, "cannot process cancellation, new pending decisions were scheduled while this decision was processing", *taskHandler.failMessage) + }, + }, + { + name: "attributes validation failure", + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.CancelWorkflowExecutionDecisionAttributes, err error) { + assert.Equal(t, types.DecisionTaskFailedCauseBadCancelWorkflowExecutionAttributes, *taskHandler.failDecisionCause) + assert.Nil(t, err) + assert.True(t, taskHandler.stopProcessing) + }, + }, + { + name: "workflow not running", + attributes: &types.CancelWorkflowExecutionDecisionAttributes{Details: []byte("some-details")}, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.CancelWorkflowExecutionDecisionAttributes) { + taskHandler.metricsClient = new(mocks.Client) + taskHandler.logger = new(log.MockLogger) + taskHandler.metricsClient.(*mocks.Client).On("IncCounter", metrics.HistoryRespondDecisionTaskCompletedScope, metrics.DecisionTypeCancelWorkflowCounter) + taskHandler.metricsClient.(*mocks.Client).On("IncCounter", metrics.HistoryRespondDecisionTaskCompletedScope, metrics.MultipleCompletionDecisionsCounter) + taskHandler.logger.(*log.MockLogger).On("Warn", "Multiple completion decisions", []tag.Tag{tag.WorkflowDecisionType(int64(types.DecisionTypeCancelWorkflowExecution)), tag.ErrorTypeMultipleCompletionDecisions}) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().IsWorkflowExecutionRunning().Return(false) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.CancelWorkflowExecutionDecisionAttributes, err error) { + assert.Nil(t, err) + }, + }, + { + name: "success", + attributes: &types.CancelWorkflowExecutionDecisionAttributes{}, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.CancelWorkflowExecutionDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().IsWorkflowExecutionRunning().Return(true) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddWorkflowExecutionCanceledEvent(taskHandler.decisionTaskCompletedID, attr) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.CancelWorkflowExecutionDecisionAttributes, err error) { + assert.Nil(t, err) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + taskHandler := newTaskHandlerForTest(t) + if test.expectMockCalls != nil { + test.expectMockCalls(taskHandler, test.attributes) + } + decision := &types.Decision{ + DecisionType: func(i int32) *types.DecisionType { + decisionType := new(types.DecisionType) + *decisionType = types.DecisionType(i) + return decisionType + }(6), //types.DecisionTypeCancelWorkflowExecution + CancelWorkflowExecutionDecisionAttributes: test.attributes, + } + err := taskHandler.handleDecision(context.Background(), decision) + test.asserts(t, taskHandler, test.attributes, err) + }) + } +} + +func TestHandleDecisionRecordMarker(t *testing.T) { + tests := []struct { + name string + expectMockCalls func(taskHandler *taskHandlerImpl, attr *types.RecordMarkerDecisionAttributes) + attributes *types.RecordMarkerDecisionAttributes + asserts func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.RecordMarkerDecisionAttributes, err error) + }{ + { + name: "blob size limit check failure", + attributes: &types.RecordMarkerDecisionAttributes{MarkerName: "some-marker", Details: []byte("some-details")}, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.RecordMarkerDecisionAttributes) { + taskHandler.sizeLimitChecker.blobSizeLimitError = 5 + taskHandler.sizeLimitChecker.blobSizeLimitWarn = 3 + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddFailWorkflowEvent(testTaskCompletedID, &types.FailWorkflowExecutionDecisionAttributes{ + Reason: common.StringPtr(common.FailureReasonDecisionBlobSizeExceedsLimit), + Details: []byte("RecordMarkerDecisionAttributes.Details exceeds size limit."), + }) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.RecordMarkerDecisionAttributes, err error) { + assert.True(t, taskHandler.stopProcessing) + assert.Nil(t, err) + }, + }, + { + name: "attributes validation failure", + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.RecordMarkerDecisionAttributes, err error) { + assert.Equal(t, types.DecisionTaskFailedCauseBadRecordMarkerAttributes, *taskHandler.failDecisionCause) + assert.Nil(t, err) + assert.True(t, taskHandler.stopProcessing) + }, + }, + { + name: "success", + attributes: &types.RecordMarkerDecisionAttributes{MarkerName: "some-marker", Details: []byte("some-details")}, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.RecordMarkerDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddRecordMarkerEvent(taskHandler.decisionTaskCompletedID, attr) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.RecordMarkerDecisionAttributes, err error) { + assert.Nil(t, err) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + taskHandler := newTaskHandlerForTest(t) + if test.expectMockCalls != nil { + test.expectMockCalls(taskHandler, test.attributes) + } + decision := &types.Decision{ + DecisionType: func(i int32) *types.DecisionType { + decisionType := new(types.DecisionType) + *decisionType = types.DecisionType(i) + return decisionType + }(8), //types.DecisionTypeRecordMarker + RecordMarkerDecisionAttributes: test.attributes, + } + err := taskHandler.handleDecision(context.Background(), decision) + test.asserts(t, taskHandler, test.attributes, err) + }) + } +} + +func TestHandleDecisionScheduleActivity(t *testing.T) { + domainEntry := cache.NewLocalDomainCacheEntryForTest( + &persistence.DomainInfo{ID: testdata.DomainID, Name: testdata.DomainName}, + &persistence.DomainConfig{ + Retention: 1, + BadBinaries: types.BadBinaries{Binaries: map[string]*types.BadBinaryInfo{"test-binary-checksum": {Reason: "some reason"}}}, + }, + cluster.TestCurrentClusterName) + executionInfo := &persistence.WorkflowExecutionInfo{ + DomainID: testdata.DomainID, + WorkflowID: testdata.WorkflowID, + WorkflowTimeout: 100, + } + validAttr := &types.ScheduleActivityTaskDecisionAttributes{ + Domain: testdata.DomainName, + TaskList: &types.TaskList{Name: testdata.TaskListName}, + ActivityID: "some-activity-id", + ActivityType: &types.ActivityType{Name: testdata.ActivityTypeName}, + ScheduleToCloseTimeoutSeconds: func(i int32) *int32 { return &i }(100), + ScheduleToStartTimeoutSeconds: func(i int32) *int32 { return &i }(20), + StartToCloseTimeoutSeconds: func(i int32) *int32 { return &i }(80), + Input: []byte("some-input"), + } + + tests := []struct { + name string + expectMockCalls func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) + attributes *types.ScheduleActivityTaskDecisionAttributes + asserts func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) + }{ + { + name: "internal service error", + attributes: &types.ScheduleActivityTaskDecisionAttributes{Domain: testdata.DomainName}, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(&persistence.WorkflowExecutionInfo{}) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(nil, errors.New("some radom error")) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.NotNil(t, err) + assert.Nil(t, res) + assert.Equal(t, &types.InternalServiceError{Message: fmt.Sprintf("Unable to schedule activity across domain %v.", attr.GetDomain())}, err) + }, + }, + { + name: "attributes validation failure", + attributes: &types.ScheduleActivityTaskDecisionAttributes{Domain: testdata.DomainName}, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(executionInfo) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(domainEntry, nil) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.Nil(t, err) + assert.Nil(t, res) + assert.True(t, taskHandler.stopProcessing) + }, + }, + { + name: "blob size limit check failure", + attributes: validAttr, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.sizeLimitChecker.blobSizeLimitWarn = 3 + taskHandler.sizeLimitChecker.blobSizeLimitError = 5 + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(executionInfo).Times(2) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(domainEntry, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddFailWorkflowEvent(testTaskCompletedID, + &types.FailWorkflowExecutionDecisionAttributes{ + Reason: common.StringPtr(common.FailureReasonDecisionBlobSizeExceedsLimit), + Details: []byte("ScheduleActivityTaskDecisionAttributes.Input exceeds size limit."), + }) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.Nil(t, err) + assert.Nil(t, res) + assert.True(t, taskHandler.stopProcessing) + }, + }, + { + name: "success - activity started", + attributes: validAttr, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(executionInfo).Times(2) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(domainEntry, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskScheduledEvent(context.Background(), taskHandler.decisionTaskCompletedID, attr, taskHandler.activityCountToDispatch > 0). + Return(&types.HistoryEvent{}, &persistence.ActivityInfo{}, &types.ActivityLocalDispatchInfo{}, true, true, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskStartedEvent(&persistence.ActivityInfo{}, int64(0), gomock.Any(), taskHandler.identity) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.Nil(t, err) + assert.Nil(t, res) + }, + }, + { + name: "success - token serialization failure", + attributes: validAttr, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(executionInfo).Times(2) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(domainEntry, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskScheduledEvent(context.Background(), taskHandler.decisionTaskCompletedID, attr, taskHandler.activityCountToDispatch > 0). + Return(&types.HistoryEvent{}, &persistence.ActivityInfo{}, &types.ActivityLocalDispatchInfo{}, true, false, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskStartedEvent(&persistence.ActivityInfo{}, int64(0), gomock.Any(), taskHandler.identity) + taskHandler.tokenSerializer.(*common.MockTaskTokenSerializer).EXPECT().Serialize(&common.TaskToken{ + DomainID: testdata.DomainID, + WorkflowID: testdata.WorkflowID, + ActivityType: testdata.ActivityTypeName, + }).Return(nil, errors.New("some error")) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.Equal(t, workflow.ErrSerializingToken, err) + assert.Nil(t, res) + }, + }, + { + name: "success - decisionResult non nil", + attributes: validAttr, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(executionInfo).Times(2) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(domainEntry, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskScheduledEvent(context.Background(), taskHandler.decisionTaskCompletedID, attr, taskHandler.activityCountToDispatch > 0). + Return(&types.HistoryEvent{}, &persistence.ActivityInfo{}, &types.ActivityLocalDispatchInfo{}, true, false, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskStartedEvent(&persistence.ActivityInfo{}, int64(0), gomock.Any(), taskHandler.identity) + taskHandler.tokenSerializer.(*common.MockTaskTokenSerializer).EXPECT().Serialize(&common.TaskToken{ + DomainID: testdata.DomainID, + WorkflowID: testdata.WorkflowID, + ActivityType: testdata.ActivityTypeName, + }).Return([]byte("some-serialized-data"), nil) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.Nil(t, err) + assert.NotNil(t, res) + assert.Equal(t, []byte("some-serialized-data"), res.activityDispatchInfo.TaskToken) + }, + }, + { + name: "failure to add activity task started", + attributes: validAttr, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(executionInfo).Times(2) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(domainEntry, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskScheduledEvent(context.Background(), taskHandler.decisionTaskCompletedID, attr, taskHandler.activityCountToDispatch > 0). + Return(&types.HistoryEvent{}, &persistence.ActivityInfo{}, &types.ActivityLocalDispatchInfo{}, true, true, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskStartedEvent(&persistence.ActivityInfo{}, int64(0), gomock.Any(), taskHandler.identity).Return(nil, errors.New("some error")) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.NotNil(t, err) + assert.Equal(t, "some error", err.Error()) + assert.Nil(t, res) + }, + }, + { + name: "activity scheduled - nil activityDispatchInfo", + attributes: validAttr, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(executionInfo).Times(2) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(domainEntry, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskScheduledEvent(context.Background(), taskHandler.decisionTaskCompletedID, attr, taskHandler.activityCountToDispatch > 0). + Return(&types.HistoryEvent{}, &persistence.ActivityInfo{}, nil, true, false, nil) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.Nil(t, err) + assert.Nil(t, res) + }, + }, + { + name: "bad request error", + attributes: validAttr, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(executionInfo).Times(2) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(domainEntry, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskScheduledEvent(context.Background(), taskHandler.decisionTaskCompletedID, attr, taskHandler.activityCountToDispatch > 0). + Return(nil, nil, nil, false, false, &types.BadRequestError{ + Message: "some bad request error", + }) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.Nil(t, err) + assert.Nil(t, res) + }, + }, + { + name: "default error case", + attributes: validAttr, + expectMockCalls: func(taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes) { + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().GetExecutionInfo().Return(executionInfo).Times(2) + taskHandler.domainCache.(*cache.MockDomainCache).EXPECT().GetDomain(attr.GetDomain()).Return(domainEntry, nil) + taskHandler.mutableState.(*execution.MockMutableState).EXPECT().AddActivityTaskScheduledEvent(context.Background(), taskHandler.decisionTaskCompletedID, attr, taskHandler.activityCountToDispatch > 0). + Return(nil, nil, nil, false, false, errors.New("some default error")) + }, + asserts: func(t *testing.T, taskHandler *taskHandlerImpl, attr *types.ScheduleActivityTaskDecisionAttributes, res *decisionResult, err error) { + assert.NotNil(t, err) + assert.Equal(t, "some default error", err.Error()) + assert.Nil(t, res) + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + taskHandler := newTaskHandlerForTest(t) + if test.expectMockCalls != nil { + test.expectMockCalls(taskHandler, test.attributes) + } + decision := &types.Decision{ + DecisionType: func(i int32) *types.DecisionType { + decisionType := new(types.DecisionType) + *decisionType = types.DecisionType(i) + return decisionType + }(0), //types.DecisionTypeScheduleActivityTask + ScheduleActivityTaskDecisionAttributes: test.attributes, + } + err := taskHandler.handleDecision(context.Background(), decision) + assert.NotNil(t, err) + assert.Equal(t, &types.BadRequestError{Message: fmt.Sprintf("Unknown decision type: %v", decision.GetDecisionType())}, err) + + decisionRes, err := taskHandler.handleDecisionWithResult(context.Background(), decision) + test.asserts(t, taskHandler, test.attributes, decisionRes, err) + }) + } +} + func newTaskHandlerForTest(t *testing.T) *taskHandlerImpl { ctrl := gomock.NewController(t) testLogger := testlogger.New(t)