diff --git a/host/integration_test.go b/host/integration_test.go index eea54d89172..a7881d94007 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -1360,6 +1360,183 @@ func (s *integrationSuite) TestActivityTimeouts() { s.True(workflowComplete) } +func (s *integrationSuite) TestActivityHeartbeatTimeouts() { + id := "integration-activity-heartbeat-timeout-test" + wt := "integration-activity-heartbeat-timeout-test-type" + tl := "integration-activity-heartbeat-timeout-test-tasklist" + identity := "worker1" + activityName := "timeout_activity" + + workflowType := &workflow.WorkflowType{} + workflowType.Name = common.StringPtr(wt) + + taskList := &workflow.TaskList{} + taskList.Name = common.StringPtr(tl) + + request := &workflow.StartWorkflowExecutionRequest{ + RequestId: common.StringPtr(uuid.New()), + Domain: common.StringPtr(s.domainName), + WorkflowId: common.StringPtr(id), + WorkflowType: workflowType, + TaskList: taskList, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(300), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: common.StringPtr(identity), + } + + we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + s.Nil(err0) + + s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) + + workflowComplete := false + activitiesScheduled := false + activitiesMap := map[int64]*workflow.HistoryEvent{} + failWorkflow := false + failReason := "" + var activityATimedout bool + var activityALastHeartbeat int + dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) { + if !activitiesScheduled { + activitiesScheduled = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("A"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: []byte("Heartbeat"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(20), + StartToCloseTimeoutSeconds: common.Int32Ptr(15), + HeartbeatTimeoutSeconds: common.Int32Ptr(3), // ActivityID A is expected to timeout using Heartbeat + }, + }}, nil + } else if previousStartedEventID > 0 { + for _, event := range history.Events[previousStartedEventID:] { + if event.GetEventType() == workflow.EventTypeActivityTaskScheduled { + activitiesMap[event.GetEventId()] = event + } + + if event.GetEventType() == workflow.EventTypeActivityTaskTimedOut { + timeoutEvent := event.ActivityTaskTimedOutEventAttributes + scheduledEvent, ok := activitiesMap[timeoutEvent.GetScheduledEventId()] + if !ok { + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution), + FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{ + Reason: common.StringPtr("ScheduledEvent not found."), + }, + }}, nil + } + + switch timeoutEvent.GetTimeoutType() { + case workflow.TimeoutTypeHeartbeat: + if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "A" { + activityATimedout = true + activityALastHeartbeat, _ = strconv.Atoi(string(timeoutEvent.Details)) + } else { + failWorkflow = true + failReason = "ActivityID A is expected to timeout with Heartbeat" + } + } + } + } + } + + if failWorkflow { + s.logger.Errorf("Failing workflow.") + workflowComplete = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution), + FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{ + Reason: common.StringPtr(failReason), + }, + }}, nil + } + + if activityATimedout { + s.logger.Info("Completing Workflow.") + workflowComplete = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), + CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte("Done."), + }, + }}, nil + } + + return nil, []*workflow.Decision{}, nil + } + + atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType, + activityID string, input []byte, taskToken []byte) ([]byte, bool, error) { + s.Equal(id, *execution.WorkflowId) + s.Equal(activityName, *activityType.Name) + timeoutType := string(input) + switch timeoutType { + case "Heartbeat": + s.logger.Info("Starting hearbeat activity.") + go func() { + for i := 0; i < 6; i++ { + s.logger.Infof("Heartbeating for activity: %s, count: %d", activityID, i) + _, err := s.engine.RecordActivityTaskHeartbeat(createContext(), &workflow.RecordActivityTaskHeartbeatRequest{ + TaskToken: taskToken, Details: []byte(strconv.Itoa(i))}) + s.Nil(err) + time.Sleep(1 * time.Second) + } + s.logger.Info("End Heartbeating.") + }() + s.logger.Info("Sleeping hearbeat activity.") + time.Sleep(12 * time.Second) + } + + return []byte("Activity Result."), false, nil + } + + poller := &taskPoller{ + engine: s.engine, + domain: s.domainName, + taskList: taskList, + identity: identity, + decisionHandler: dtHandler, + activityHandler: atHandler, + logger: s.logger, + suite: s, + } + + _, err := poller.pollAndProcessDecisionTask(false, false) + s.True(err == nil || err == matching.ErrNoTasks) + + for i := 0; i < 1; i++ { + go func() { + err = poller.pollAndProcessActivityTask(false) + s.logger.Infof("Activity Processing Completed. Error: %v", err) + }() + } + + s.logger.Infof("Waiting for workflow to complete: RunId: %v", *we.RunId) + for i := 0; i < 10; i++ { + s.logger.Infof("Processing decision task: %v", i) + _, err := poller.pollAndProcessDecisionTask(false, false) + s.Nil(err, "Poll for decision task failed.") + + if workflowComplete { + break + } + } + + s.printWorkflowHistory(s.domainName, &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + RunId: common.StringPtr(we.GetRunId()), + }) + s.True(workflowComplete) + s.True(activityATimedout) + s.Equal(5, activityALastHeartbeat) +} + func (s *integrationSuite) TestSequential_UserTimers() { id := "interation-sequential-user-timers-test" wt := "interation-sequential-user-timers-test-type" diff --git a/service/history/timerBuilder.go b/service/history/timerBuilder.go index e9912a9b750..5afdb5a4a30 100644 --- a/service/history/timerBuilder.go +++ b/service/history/timerBuilder.go @@ -289,7 +289,7 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) { td := &timerDetails{ TimerSequenceID: TimerSequenceID{VisibilityTimestamp: startToCloseExpiry}, ActivityID: v.ScheduleID, - EventID: v.StartedID, + EventID: v.ScheduleID, TimeoutType: w.TimeoutTypeStartToClose, TimeoutSec: v.StartToCloseTimeout, TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedStartToClose) != 0} @@ -303,7 +303,7 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) { td := &timerDetails{ TimerSequenceID: TimerSequenceID{VisibilityTimestamp: heartBeatExpiry}, ActivityID: v.ScheduleID, - EventID: v.StartedID, + EventID: v.ScheduleID, TimeoutType: w.TimeoutTypeHeartbeat, TimeoutSec: v.HeartbeatTimeout, TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedHeartbeat) != 0}