From d764cf3b8ad2d01feedefc40574f16df0d9d3598 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Thu, 25 Jan 2018 11:11:30 -0800 Subject: [PATCH 1/2] Fixes various issues with ActivityTimeout Processing Heartbeat timeout fixes to create next timeout if first heartbeat timer fires and activity is heartbeating. Always include ScheduleToClose timeout when creating next activity timeout. Fix logic for correctly setting TimerCreated flag on TimerDetails. Remove redundant code for all activity APIs from history engine. --- host/integration_test.go | 238 ++++++++++ service/history/historyEngine.go | 621 +++++++++---------------- service/history/timerBuilder.go | 37 +- service/history/timerQueueProcessor.go | 13 +- 4 files changed, 484 insertions(+), 425 deletions(-) diff --git a/host/integration_test.go b/host/integration_test.go index fc04ff46d95..5e192d8a252 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -1028,6 +1028,244 @@ func (s *integrationSuite) TestActivityHeartBeatWorkflow_Timeout() { s.True(workflowComplete) } +func (s *integrationSuite) TestActivityTimeouts() { + id := "integration-activity-timeout-test" + wt := "integration-activity-timeout-test-type" + tl := "integration-activity-timeout-test-tasklist" + identity := "worker1" + activityName := "timeout_activity" + + workflowType := &workflow.WorkflowType{} + workflowType.Name = common.StringPtr(wt) + + taskList := &workflow.TaskList{} + taskList.Name = common.StringPtr(tl) + + request := &workflow.StartWorkflowExecutionRequest{ + RequestId: common.StringPtr(uuid.New()), + Domain: common.StringPtr(s.domainName), + WorkflowId: common.StringPtr(id), + WorkflowType: workflowType, + TaskList: taskList, + Input: nil, + ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(300), + TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), + Identity: common.StringPtr(identity), + } + + we, err0 := s.engine.StartWorkflowExecution(createContext(), request) + s.Nil(err0) + + 
s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId) + + workflowComplete := false + activitiesScheduled := false + activitiesMap := map[int64]*workflow.HistoryEvent{} + failWorkflow := false + failReason := "" + var activityATimedout, activityBTimedout, activityCTimedout, activityDTimedout bool + dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) { + if !activitiesScheduled { + activitiesScheduled = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("A"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: common.StringPtr("NoWorker")}, + Input: []byte("ScheduleToStart"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(3), // ActivityID A is expected to timeout using ScheduleToStart + StartToCloseTimeoutSeconds: common.Int32Ptr(30), + HeartbeatTimeoutSeconds: common.Int32Ptr(0), + }, + }, { + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("B"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: []byte("ScheduleClose"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(7), // ActivityID B is expected to timeout using ScheduleClose + ScheduleToStartTimeoutSeconds: common.Int32Ptr(5), + StartToCloseTimeoutSeconds: common.Int32Ptr(10), + HeartbeatTimeoutSeconds: common.Int32Ptr(0), + }, + }, { + DecisionType: 
common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("C"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: []byte("StartToClose"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(15), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(1), + StartToCloseTimeoutSeconds: common.Int32Ptr(5), // ActivityID C is expected to timeout using StartToClose + HeartbeatTimeoutSeconds: common.Int32Ptr(0), + }, + }, { + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("D"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: []byte("Heartbeat"), + ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35), + ScheduleToStartTimeoutSeconds: common.Int32Ptr(20), + StartToCloseTimeoutSeconds: common.Int32Ptr(15), + HeartbeatTimeoutSeconds: common.Int32Ptr(3), // ActivityID D is expected to timeout using Heartbeat + }, + }}, nil + } else if previousStartedEventID > 0 { + for _, event := range history.Events[previousStartedEventID:] { + if event.GetEventType() == workflow.EventTypeActivityTaskScheduled { + activitiesMap[event.GetEventId()] = event + } + + if event.GetEventType() == workflow.EventTypeActivityTaskTimedOut { + timeoutEvent := event.ActivityTaskTimedOutEventAttributes + scheduledEvent, ok := activitiesMap[timeoutEvent.GetScheduledEventId()] + if !ok { + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution), + FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{ + Reason: common.StringPtr("ScheduledEvent not found."), + }, + }}, nil + } + 
+ switch timeoutEvent.GetTimeoutType() { + case workflow.TimeoutTypeScheduleToStart: + if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "A" { + activityATimedout = true + } else { + failWorkflow = true + failReason = "ActivityID A is expected to timeout with ScheduleToStart" + } + case workflow.TimeoutTypeScheduleToClose: + if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "B" { + activityBTimedout = true + } else { + failWorkflow = true + failReason = "ActivityID B is expected to timeout with ScheduleToClose" + } + case workflow.TimeoutTypeStartToClose: + if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "C" { + activityCTimedout = true + } else { + failWorkflow = true + failReason = "ActivityID C is expected to timeout with StartToClose" + } + case workflow.TimeoutTypeHeartbeat: + if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "D" { + activityDTimedout = true + } else { + failWorkflow = true + failReason = "ActivityID D is expected to timeout with Heartbeat" + } + } + } + } + } + + if failWorkflow { + s.logger.Errorf("Failing workflow.") + workflowComplete = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution), + FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{ + Reason: common.StringPtr(failReason), + }, + }}, nil + } + + if activityATimedout && activityBTimedout && activityCTimedout && activityDTimedout { + s.logger.Info("Completing Workflow.") + workflowComplete = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), + CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte("Done."), + }, + }}, nil + } + + return nil, []*workflow.Decision{}, nil + } + + atHandler := func(execution *workflow.WorkflowExecution, 
activityType *workflow.ActivityType, + activityID string, input []byte, taskToken []byte) ([]byte, bool, error) { + s.Equal(id, *execution.WorkflowId) + s.Equal(activityName, *activityType.Name) + timeoutType := string(input) + switch timeoutType { + case "ScheduleToStart": + s.Fail("Activity A not expected to be started.") + case "ScheduleClose": + s.logger.Infof("Sleeping activityB for 6 seconds.") + time.Sleep(7 * time.Second) + case "StartToClose": + s.logger.Infof("Sleeping activityC for 6 seconds.") + time.Sleep(8 * time.Second) + case "Heartbeat": + s.logger.Info("Starting hearbeat activity.") + go func() { + for i := 0; i < 6; i++ { + s.logger.Infof("Heartbeating for activity: %s, count: %d", activityID, i) + _, err := s.engine.RecordActivityTaskHeartbeat(createContext(), &workflow.RecordActivityTaskHeartbeatRequest{ + TaskToken: taskToken, Details: []byte(string(i))}) + s.Nil(err) + time.Sleep(1 * time.Second) + } + s.logger.Info("End Heartbeating.") + }() + s.logger.Info("Sleeping hearbeat activity.") + time.Sleep(10 * time.Second) + } + + return []byte("Activity Result."), false, nil + } + + poller := &taskPoller{ + engine: s.engine, + domain: s.domainName, + taskList: taskList, + identity: identity, + decisionHandler: dtHandler, + activityHandler: atHandler, + logger: s.logger, + suite: s, + } + + _, err := poller.pollAndProcessDecisionTask(false, false) + s.True(err == nil || err == matching.ErrNoTasks) + + for i := 0; i < 3; i++ { + go func() { + err = poller.pollAndProcessActivityTask(false) + s.logger.Infof("Activity Processing Completed. 
Error: %v", err) + }() + } + + s.logger.Infof("Waiting for workflow to complete: RunId: %v", *we.RunId) + for i := 0; i < 10; i++ { + s.logger.Infof("Processing decision task: %v", i) + _, err := poller.pollAndProcessDecisionTask(false, false) + s.Nil(err, "Poll for decision task failed.") + + if workflowComplete { + break + } + } + + s.printWorkflowHistory(s.domainName, &workflow.WorkflowExecution{ + WorkflowId: common.StringPtr(id), + RunId: common.StringPtr(we.GetRunId()), + }) + s.True(workflowComplete) +} + func (s *integrationSuite) TestSequential_UserTimers() { id := "interation-sequential-user-timers-test" wt := "interation-sequential-user-timers-test-type" diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index ed2c52af853..f88292905fe 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -82,6 +82,14 @@ var ( ErrConflict = errors.New("Conditional update failed") // ErrMaxAttemptsExceeded is exported temporarily for integration test ErrMaxAttemptsExceeded = errors.New("Maximum attempts exceeded to update history") + // ErrStaleState is the error returned during state update indicating that cached mutable state could be stale + ErrStaleState = errors.New("Cache mutable state could potentially be stale") + // ErrActivityTaskNotFound is the error to indicate activity task could be duplicate and activity already completed + ErrActivityTaskNotFound = &workflow.EntityNotExistsError{Message: "Activity task not found."} + // ErrWorkflowCompleted is the error to indicate workflow execution already completed + ErrWorkflowCompleted = &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + // ErrDeserializingToken is the error to indicate task token is invalid + ErrDeserializingToken = &workflow.BadRequestError{Message: "Error deserializing task token."} // FailedWorkflowCloseState is a set of failed workflow close states, used for start workflow policy // for start workflow 
execution API @@ -617,108 +625,87 @@ func (e *historyEngineImpl) RecordActivityTaskStarted( if err != nil { return nil, err } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, *request.WorkflowExecution) - if err0 != nil { - return nil, err0 - } - defer release() - scheduleID := *request.ScheduleId - requestID := common.StringDefault(request.RequestId) - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err0 := context.loadWorkflowExecution() - if err0 != nil { - return nil, err0 - } - tBuilder := e.getTimerBuilder(&context.workflowExecution) - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) + execution := workflow.WorkflowExecution{ + WorkflowId: request.WorkflowExecution.WorkflowId, + RunId: request.WorkflowExecution.RunId, + } - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. - if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } + response := &h.RecordActivityTaskStartedResponse{} + err = e.updateWorkflowExecution(domainID, execution, false, false, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + return nil, ErrWorkflowCompleted + } - // Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If - // task is not outstanding than it is most probably a duplicate and complete the task. - if !msBuilder.isWorkflowExecutionRunning() || !isRunning { - // Looks like ActivityTask already completed as a result of another call. - // It is OK to drop the task at this point. 
- logging.LogDuplicateTaskEvent(context.logger, persistence.TransferTaskTypeActivityTask, common.Int64Default(request.TaskId), requestID, - scheduleID, emptyEventID, isRunning) + scheduleID := request.GetScheduleId() + requestID := request.GetRequestId() + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - return nil, &workflow.EntityNotExistsError{Message: "Activity task not found."} - } + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. + if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState + } - scheduledEvent, exists := msBuilder.GetActivityScheduledEvent(scheduleID) - if !exists { - return nil, &workflow.InternalServiceError{Message: "Corrupted workflow execution state."} - } + // Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If + // task is not outstanding than it is most probably a duplicate and complete the task. + if !isRunning { + // Looks like ActivityTask already completed as a result of another call. + // It is OK to drop the task at this point. 
+ logging.LogDuplicateTaskEvent(e.logger, persistence.TransferTaskTypeActivityTask, + common.Int64Default(request.TaskId), requestID, scheduleID, emptyEventID, isRunning) - if ai.StartedID != emptyEventID { - // If activity is started as part of the current request scope then return a positive response - if ai.RequestID == requestID { - response := &h.RecordActivityTaskStartedResponse{} - startedEvent, exists := msBuilder.GetActivityStartedEvent(scheduleID) - if !exists { - return nil, &workflow.InternalServiceError{Message: "Corrupted workflow execution state."} - } - response.ScheduledEvent = scheduledEvent - response.StartedEvent = startedEvent - return response, nil + return nil, ErrActivityTaskNotFound } - // Looks like ActivityTask already started as a result of another call. - // It is OK to drop the task at this point. - logging.LogDuplicateTaskEvent(context.logger, persistence.TransferTaskTypeActivityTask, common.Int64Default(request.TaskId), requestID, - scheduleID, ai.StartedID, isRunning) - - return nil, &h.EventAlreadyStartedError{Message: "Activity task already started."} - } + scheduledEvent, exists := msBuilder.GetActivityScheduledEvent(scheduleID) + if !exists { + return nil, &workflow.InternalServiceError{Message: "Corrupted workflow execution state."} + } + response.ScheduledEvent = scheduledEvent + + if ai.StartedID != emptyEventID { + // If activity is started as part of the current request scope then return a positive response + if ai.RequestID == requestID { + startedEvent, exists := msBuilder.GetActivityStartedEvent(scheduleID) + if !exists { + return nil, &workflow.InternalServiceError{Message: "Corrupted workflow execution state."} + } + response.StartedEvent = startedEvent + return nil, nil + } - startedEvent := msBuilder.AddActivityTaskStartedEvent(ai, scheduleID, requestID, request.PollRequest) - if startedEvent == nil { - // Unable to add ActivityTaskStarted event to history - return nil, &workflow.InternalServiceError{Message: "Unable 
to add ActivityTaskStarted event to history."} - } + // Looks like ActivityTask already started as a result of another call. + // It is OK to drop the task at this point. + logging.LogDuplicateTaskEvent(e.logger, persistence.TransferTaskTypeActivityTask, + common.Int64Default(request.TaskId), requestID, scheduleID, ai.StartedID, isRunning) - // Start a timer for the activity task. - timerTasks := []persistence.Task{} - if tt := tBuilder.GetActivityTimerTaskIfNeeded(msBuilder); tt != nil { - timerTasks = append(timerTasks, tt) - } + return nil, &h.EventAlreadyStartedError{Message: "Activity task already started."} + } - // Generate a transaction ID for appending events to history - transactionID, err2 := e.shard.GetNextTransferTaskID() - if err2 != nil { - return nil, err2 - } + startedEvent := msBuilder.AddActivityTaskStartedEvent(ai, scheduleID, requestID, request.PollRequest) + if startedEvent == nil { + // Unable to add ActivityTaskStarted event to history + return nil, &workflow.InternalServiceError{Message: "Unable to add ActivityTaskStarted event to history."} + } + response.StartedEvent = startedEvent - // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload - // the history and try the operationi again. - if err3 := context.updateWorkflowExecution(nil, timerTasks, transactionID); err3 != nil { - if err3 == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop + // Start a timer for the activity task. 
+ timerTasks := []persistence.Task{} + if tt := tBuilder.GetActivityTimerTaskIfNeeded(msBuilder); tt != nil { + timerTasks = append(timerTasks, tt) } - return nil, err3 - } - defer e.timerProcessor.NotifyNewTimer(timerTasks) + return timerTasks, nil + }) - response := &h.RecordActivityTaskStartedResponse{} - response.ScheduledEvent = scheduledEvent - response.StartedEvent = startedEvent - return response, nil + if err != nil { + return nil, err } - return nil, ErrMaxAttemptsExceeded + return response, err } // RespondDecisionTaskCompleted completes a decision task @@ -730,7 +717,7 @@ func (e *historyEngineImpl) RespondDecisionTaskCompleted(ctx context.Context, re request := req.CompleteRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1203,7 +1190,7 @@ func (e *historyEngineImpl) RespondDecisionTaskFailed(req *h.RespondDecisionTask request := req.FailedRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1212,21 +1199,21 @@ func (e *historyEngineImpl) RespondDecisionTaskFailed(req *h.RespondDecisionTask } return e.updateWorkflowExecution(domainID, workflowExecution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } scheduleID := token.ScheduleID di, isRunning := msBuilder.GetPendingDecision(scheduleID) if !isRunning || di.Attempt != token.ScheduleAttempt || di.StartedID == emptyEventID { - return 
&workflow.EntityNotExistsError{Message: "Decision task not found."} + return nil, &workflow.EntityNotExistsError{Message: "Decision task not found."} } msBuilder.AddDecisionTaskFailedEvent(di.ScheduleID, di.StartedID, request.GetCause(), request.Details, request.GetIdentity()) - return nil + return nil, nil }) } @@ -1239,7 +1226,7 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(req *h.RespondActivityT request := req.CompleteRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1247,86 +1234,40 @@ func (e *historyEngineImpl) RespondActivityTaskCompleted(req *h.RespondActivityT RunId: common.StringPtr(token.RunID), } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, workflowExecution) - if err0 != nil { - return err0 - } - defer release() - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err1 := context.loadWorkflowExecution() - if err1 != nil { - return err1 - } - - scheduleID := token.ScheduleID - if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID - scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) - if err0 != nil { - return err0 + return e.updateWorkflowExecution(domainID, workflowExecution, false, true, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + return nil, ErrWorkflowCompleted } - } - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. 
- if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCompletedScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } - - if !msBuilder.isWorkflowExecutionRunning() || !isRunning || ai.StartedID == emptyEventID { - return &workflow.EntityNotExistsError{Message: "Activity task not found."} - } - - startedID := ai.StartedID - if msBuilder.AddActivityTaskCompletedEvent(scheduleID, startedID, request) == nil { - // Unable to add ActivityTaskCompleted event to history - return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCompleted event to history."} - } - var transferTasks []persistence.Task - var timerTasks []persistence.Task - if !msBuilder.HasPendingDecisionTask() { - di := msBuilder.AddDecisionTaskScheduledEvent() - transferTasks = []persistence.Task{&persistence.DecisionTask{ - DomainID: domainID, - TaskList: di.Tasklist, - ScheduleID: di.ScheduleID, - }} - if msBuilder.isStickyTaskListEnabled() { - tBuilder := e.getTimerBuilder(&context.workflowExecution) - stickyTaskTimeoutTimer := tBuilder.AddScheduleToStartDecisionTimoutTask(di.ScheduleID, di.Attempt, - msBuilder.executionInfo.StickyScheduleToStartTimeout) - timerTasks = []persistence.Task{stickyTaskTimeoutTimer} + scheduleID := token.ScheduleID + if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID + scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) + if err0 != nil { + return nil, err0 + } } - } + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - // Generate a transaction ID for appending events to history - transactionID, err2 := e.shard.GetNextTransferTaskID() - if err2 != nil { - return err2 - } + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. 
+ if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCompletedScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState + } - // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload - // the history and try the operation again. - if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil { - if err == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCompletedScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop + if !isRunning || ai.StartedID == emptyEventID { + return nil, ErrActivityTaskNotFound } - return err - } - e.timerProcessor.NotifyNewTimer(timerTasks) - return nil - } + startedID := ai.StartedID + if msBuilder.AddActivityTaskCompletedEvent(scheduleID, startedID, request) == nil { + // Unable to add ActivityTaskCompleted event to history + return nil, &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCompleted event to history."} + } - return ErrMaxAttemptsExceeded + return nil, nil + }) } // RespondActivityTaskFailed completes an activity task failure. 
@@ -1338,7 +1279,7 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(req *h.RespondActivityTask request := req.FailedRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1346,87 +1287,40 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(req *h.RespondActivityTask RunId: common.StringPtr(token.RunID), } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, workflowExecution) - if err0 != nil { - return err0 - } - defer release() - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err1 := context.loadWorkflowExecution() - if err1 != nil { - return err1 - } - - scheduleID := token.ScheduleID - if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID - scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) - if err0 != nil { - return err0 + return e.updateWorkflowExecution(domainID, workflowExecution, false, true, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + return nil, ErrWorkflowCompleted } - } - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. 
- if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskFailedScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } - - if !msBuilder.isWorkflowExecutionRunning() || !isRunning || ai.StartedID == emptyEventID { - return &workflow.EntityNotExistsError{Message: "Activity task not found."} - } - - startedID := ai.StartedID - if msBuilder.AddActivityTaskFailedEvent(scheduleID, startedID, request) == nil { - // Unable to add ActivityTaskFailed event to history - return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskFailed event to history."} - } - var transferTasks []persistence.Task - var timerTasks []persistence.Task - if !msBuilder.HasPendingDecisionTask() { - di := msBuilder.AddDecisionTaskScheduledEvent() - transferTasks = []persistence.Task{&persistence.DecisionTask{ - DomainID: domainID, - TaskList: di.Tasklist, - ScheduleID: di.ScheduleID, - }} - if msBuilder.isStickyTaskListEnabled() { - tBuilder := e.getTimerBuilder(&context.workflowExecution) - stickyTaskTimeoutTimer := tBuilder.AddScheduleToStartDecisionTimoutTask(di.ScheduleID, di.Attempt, - msBuilder.executionInfo.StickyScheduleToStartTimeout) - timerTasks = []persistence.Task{stickyTaskTimeoutTimer} + scheduleID := token.ScheduleID + if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID + scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) + if err0 != nil { + return nil, err0 + } } - } + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - // Generate a transaction ID for appending events to history - transactionID, err3 := e.shard.GetNextTransferTaskID() - if err3 != nil { - return err3 - } - - // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload - // the history and try the operation again. 
- if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil { - if err == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskFailedScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. + if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskFailedScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState } - return err - } - e.timerProcessor.NotifyNewTimer(timerTasks) + if !isRunning || ai.StartedID == emptyEventID { + return nil, ErrActivityTaskNotFound + } - return nil - } + startedID := ai.StartedID + if msBuilder.AddActivityTaskFailedEvent(scheduleID, startedID, request) == nil { + // Unable to add ActivityTaskFailed event to history + return nil, &workflow.InternalServiceError{Message: "Unable to add ActivityTaskFailed event to history."} + } - return ErrMaxAttemptsExceeded + return nil, nil + }) } // RespondActivityTaskCanceled completes an activity task failure. 
@@ -1438,7 +1332,7 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(req *h.RespondActivityTa request := req.CancelRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return &workflow.BadRequestError{Message: "Error deserializing task token."} + return ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1446,90 +1340,41 @@ func (e *historyEngineImpl) RespondActivityTaskCanceled(req *h.RespondActivityTa RunId: common.StringPtr(token.RunID), } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, workflowExecution) - if err0 != nil { - return err0 - } - defer release() - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err1 := context.loadWorkflowExecution() - if err1 != nil { - return err1 - } - - scheduleID := token.ScheduleID - if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID - scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) - if err0 != nil { - return err0 + return e.updateWorkflowExecution(domainID, workflowExecution, false, true, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + return nil, ErrWorkflowCompleted } - } - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. - if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCanceledScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } - - // Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. 
If - // task is not outstanding than it is most probably a duplicate and complete the task. - if !msBuilder.isWorkflowExecutionRunning() || !isRunning || ai.StartedID == emptyEventID { - return &workflow.EntityNotExistsError{Message: "Activity task not found."} - } - if msBuilder.AddActivityTaskCanceledEvent(scheduleID, ai.StartedID, ai.CancelRequestID, request.Details, - common.StringDefault(request.Identity)) == nil { - // Unable to add ActivityTaskCanceled event to history - return &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCanceled event to history."} - } - - var transferTasks []persistence.Task - var timerTasks []persistence.Task - if !msBuilder.HasPendingDecisionTask() { - di := msBuilder.AddDecisionTaskScheduledEvent() - transferTasks = []persistence.Task{&persistence.DecisionTask{ - DomainID: domainID, - TaskList: di.Tasklist, - ScheduleID: di.ScheduleID, - }} - if msBuilder.isStickyTaskListEnabled() { - tBuilder := e.getTimerBuilder(&context.workflowExecution) - stickyTaskTimeoutTimer := tBuilder.AddScheduleToStartDecisionTimoutTask(di.ScheduleID, di.Attempt, - msBuilder.executionInfo.StickyScheduleToStartTimeout) - timerTasks = []persistence.Task{stickyTaskTimeoutTimer} + scheduleID := token.ScheduleID + if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID + scheduleID, err0 = getScheduleID(token.ActivityID, msBuilder) + if err0 != nil { + return nil, err0 + } } - } + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - // Generate a transaction ID for appending events to history - transactionID, err3 := e.shard.GetNextTransferTaskID() - if err3 != nil { - return err3 - } - - // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload - // the history and try the operation again. 
- if err := context.updateWorkflowExecution(transferTasks, timerTasks, transactionID); err != nil { - if err == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCanceledScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. + if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCanceledScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState } - return err - } + if !isRunning || ai.StartedID == emptyEventID { + return nil, ErrActivityTaskNotFound + } - e.timerProcessor.NotifyNewTimer(timerTasks) + if msBuilder.AddActivityTaskCanceledEvent(scheduleID, ai.StartedID, ai.CancelRequestID, request.Details, + common.StringDefault(request.Identity)) == nil { + // Unable to add ActivityTaskCanceled event to history + return nil, &workflow.InternalServiceError{Message: "Unable to add ActivityTaskCanceled event to history."} + } - return nil - } + return nil, nil + }) - return ErrMaxAttemptsExceeded } // RecordActivityTaskHeartbeat records an hearbeat for a task. 
@@ -1545,7 +1390,7 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat( request := req.HeartbeatRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { - return nil, &workflow.BadRequestError{Message: "Error deserializing task token."} + return nil, ErrDeserializingToken } workflowExecution := workflow.WorkflowExecution{ @@ -1553,66 +1398,46 @@ func (e *historyEngineImpl) RecordActivityTaskHeartbeat( RunId: common.StringPtr(token.RunID), } - context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, workflowExecution) - if err0 != nil { - return nil, err0 - } - defer release() - -Update_History_Loop: - for attempt := 0; attempt < conditionalRetryCount; attempt++ { - msBuilder, err1 := context.loadWorkflowExecution() - if err1 != nil { - return nil, err1 - } - - scheduleID := token.ScheduleID - ai, isRunning := msBuilder.GetActivityInfo(scheduleID) + var cancelRequested bool + err = e.updateWorkflowExecution(domainID, workflowExecution, false, false, + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { + if !msBuilder.isWorkflowExecutionRunning() { + e.logger.Errorf("Heartbeat failed ") + return nil, ErrWorkflowCompleted + } - // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in - // some extreme cassandra failure cases. 
- if !isRunning && scheduleID >= msBuilder.GetNextEventID() { - e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskHeartbeatScope, metrics.StaleMutableStateCounter) - // Reload workflow execution history - context.clear() - continue Update_History_Loop - } + scheduleID := token.ScheduleID + ai, isRunning := msBuilder.GetActivityInfo(scheduleID) - if !msBuilder.isWorkflowExecutionRunning() || !isRunning || ai.StartedID == emptyEventID { - e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, Exist: %v", - scheduleID, ai, isRunning) - return nil, &workflow.EntityNotExistsError{Message: "Activity task not found."} - } + // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in + // some extreme cassandra failure cases. + if !isRunning && scheduleID >= msBuilder.GetNextEventID() { + e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskHeartbeatScope, metrics.StaleMutableStateCounter) + return nil, ErrStaleState + } - cancelRequested := ai.CancelRequested + if !isRunning || ai.StartedID == emptyEventID { + e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, Exist: %v", scheduleID, ai, + isRunning) + return nil, ErrActivityTaskNotFound + } - e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, CancelRequested: %v", - scheduleID, ai, cancelRequested) + cancelRequested = ai.CancelRequested - // Save progress and last HB reported time. - msBuilder.updateActivityProgress(ai, request) + e.logger.Debugf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, CancelRequested: %v", + scheduleID, ai, cancelRequested) - // Generate a transaction ID for appending events to history - transactionID, err2 := e.shard.GetNextTransferTaskID() - if err2 != nil { - return nil, err2 - } + // Save progress and last HB reported time. + msBuilder.updateActivityProgress(ai, request) - // We apply the update to execution using optimistic concurrency. 
If it fails due to a conflict than reload - // the history and try the operation again. - if err := context.updateWorkflowExecution(nil, nil, transactionID); err != nil { - if err == ErrConflict { - e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskHeartbeatScope, - metrics.ConcurrencyUpdateFailureCounter) - continue Update_History_Loop - } + return nil, nil + }) - return nil, err - } - return &workflow.RecordActivityTaskHeartbeatResponse{CancelRequested: common.BoolPtr(cancelRequested)}, nil + if err != nil { + return &workflow.RecordActivityTaskHeartbeatResponse{}, err } - return &workflow.RecordActivityTaskHeartbeatResponse{}, ErrMaxAttemptsExceeded + return &workflow.RecordActivityTaskHeartbeatResponse{CancelRequested: common.BoolPtr(cancelRequested)}, nil } // RequestCancelWorkflowExecution records request cancellation event for workflow execution @@ -1630,9 +1455,9 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution( } return e.updateWorkflowExecution(domainID, workflowExecution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } isCancelRequested, cancelRequestID := msBuilder.isCancelRequested() @@ -1641,20 +1466,20 @@ func (e *historyEngineImpl) RequestCancelWorkflowExecution( if cancelRequest.RequestId != nil { requestID := *cancelRequest.RequestId if requestID != "" && cancelRequestID == requestID { - return nil + return nil, nil } } - return &workflow.CancellationAlreadyRequestedError{ + return nil, &workflow.CancellationAlreadyRequestedError{ Message: "Cancellation already requested for this workflow execution.", } } if msBuilder.AddWorkflowExecutionCancelRequestedEvent("", req) == nil { - return &workflow.InternalServiceError{Message: "Unable to cancel workflow 
execution."} + return nil, &workflow.InternalServiceError{Message: "Unable to cancel workflow execution."} } - return nil + return nil, nil }) } @@ -1670,24 +1495,24 @@ func (e *historyEngineImpl) SignalWorkflowExecution(signalRequest *h.SignalWorkf } return e.updateWorkflowExecution(domainID, execution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } // deduplicate by request id for signal decision if requestID := request.GetRequestId(); requestID != "" { if msBuilder.isSignalRequested(requestID) { - return nil + return nil, nil } msBuilder.addSignalRequested(requestID) } if msBuilder.AddWorkflowExecutionSignaled(request) == nil { - return &workflow.InternalServiceError{Message: "Unable to signal workflow execution."} + return nil, &workflow.InternalServiceError{Message: "Unable to signal workflow execution."} } - return nil + return nil, nil }) } @@ -1703,14 +1528,14 @@ func (e *historyEngineImpl) RemoveSignalMutableState(request *h.RemoveSignalMuta } return e.updateWorkflowExecution(domainID, execution, false, false, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } msBuilder.deleteSignalRequested(request.GetRequestId()) - return nil + return nil, nil }) } @@ -1726,16 +1551,16 @@ func (e *historyEngineImpl) TerminateWorkflowExecution(terminateRequest *h.Termi } return e.updateWorkflowExecution(domainID, execution, true, false, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder 
*timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } if msBuilder.AddWorkflowExecutionTerminatedEvent(request) == nil { - return &workflow.InternalServiceError{Message: "Unable to terminate workflow execution."} + return nil, &workflow.InternalServiceError{Message: "Unable to terminate workflow execution."} } - return nil + return nil, nil }) } @@ -1751,14 +1576,14 @@ func (e *historyEngineImpl) ScheduleDecisionTask(scheduleRequest *h.ScheduleDeci } return e.updateWorkflowExecution(domainID, execution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } // Noop - return nil + return nil, nil }) } @@ -1774,9 +1599,9 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(completionRequest *h.R } return e.updateWorkflowExecution(domainID, execution, false, true, - func(msBuilder *mutableStateBuilder) error { + func(msBuilder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error) { if !msBuilder.isWorkflowExecutionRunning() { - return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} + return nil, ErrWorkflowCompleted } initiatedID := *completionRequest.InitiatedId @@ -1786,7 +1611,7 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(completionRequest *h.R // Check mutable state to make sure child execution is in pending child executions ci, isRunning := msBuilder.GetChildExecutionInfo(initiatedID) if !isRunning || ci.StartedID == emptyEventID { - return &workflow.EntityNotExistsError{Message: "Pending child execution not found."} + return nil, 
&workflow.EntityNotExistsError{Message: "Pending child execution not found."} } switch *completionEvent.EventType { @@ -1807,13 +1632,13 @@ func (e *historyEngineImpl) RecordChildExecutionCompleted(completionRequest *h.R msBuilder.AddChildWorkflowExecutionTimedOutEvent(initiatedID, completedExecution, attributes) } - return nil + return nil, nil }) } func (e *historyEngineImpl) updateWorkflowExecution(domainID string, execution workflow.WorkflowExecution, createDeletionTask, createDecisionTask bool, - action func(builder *mutableStateBuilder) error) error { + action func(builder *mutableStateBuilder, tBuilder *timerBuilder) ([]persistence.Task, error)) error { context, release, err0 := e.historyCache.getOrCreateWorkflowExecution(domainID, execution) if err0 != nil { @@ -1829,13 +1654,23 @@ Update_History_Loop: } tBuilder := e.getTimerBuilder(&context.workflowExecution) + var timerTasks []persistence.Task + var err error // conduct caller action - if err := action(msBuilder); err != nil { + if timerTasks, err = action(msBuilder, tBuilder); err != nil { + if err == ErrStaleState { + // Handler detected that cached workflow mutable could potentially be stale + // Reload workflow execution history + context.clear() + continue Update_History_Loop + } + + // Returned error back to the caller return err } var transferTasks []persistence.Task - var timerTasks []persistence.Task + if createDeletionTask { tranT, timerT, err := e.getDeleteWorkflowTasks(domainID, tBuilder) if err != nil { diff --git a/service/history/timerBuilder.go b/service/history/timerBuilder.go index 8459f1ac6af..64da27dd8ea 100644 --- a/service/history/timerBuilder.go +++ b/service/history/timerBuilder.go @@ -152,11 +152,6 @@ func (tb *timerBuilder) AddScheduleToStartDecisionTimoutTask(scheduleID, schedul return timeOutTask } -func (tb *timerBuilder) AddScheduleToStartActivityTimeout( - ai *persistence.ActivityInfo) *persistence.ActivityTimeoutTask { - return tb.AddActivityTimeoutTask(ai.ScheduleID, 
w.TimeoutTypeScheduleToStart, ai.ScheduleToStartTimeout, nil) -} - func (tb *timerBuilder) AddScheduleToCloseActivityTimeout( ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask, error) { return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutTypeScheduleToClose, ai.ScheduleToCloseTimeout, nil), nil @@ -167,17 +162,6 @@ func (tb *timerBuilder) AddStartToCloseActivityTimeout(ai *persistence.ActivityI return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutTypeStartToClose, ai.StartToCloseTimeout, nil), nil } -func (tb *timerBuilder) AddHeartBeatActivityTimeout(ai *persistence.ActivityInfo) (*persistence.ActivityTimeoutTask, - error) { - // We want to create the timer starting from the last heart beat time stamp but - // avoid creating timers before the current timer frame. - targetTime := common.AddSecondsToBaseTime(ai.LastHeartBeatUpdatedTime.UnixNano(), int64(ai.HeartbeatTimeout)) - if targetTime > tb.timeSource.Now().UnixNano() { - return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutTypeHeartbeat, ai.HeartbeatTimeout, &ai.LastHeartBeatUpdatedTime), nil - } - return tb.AddActivityTimeoutTask(ai.ScheduleID, w.TimeoutTypeHeartbeat, ai.HeartbeatTimeout, nil), nil -} - // AddActivityTimeoutTask - Adds an activity timeout task. 
func (tb *timerBuilder) AddActivityTimeoutTask(scheduleID int64, timeoutType w.TimeoutType, fireTimeout int32, baseTime *time.Time) *persistence.ActivityTimeoutTask { @@ -257,7 +241,7 @@ func (tb *timerBuilder) GetActivityTimerTaskIfNeeded(msBuilder *mutableStateBuil td := tb.activityTimers[0] ai := tb.pendingActivityTimers[td.ActivityID] at := timerTask.(*persistence.ActivityTimeoutTask) - ai.TimerTaskStatus = ai.TimerTaskStatus & getActivityTimerStatus(w.TimeoutType(at.TimeoutType)) + ai.TimerTaskStatus = ai.TimerTaskStatus | getActivityTimerStatus(w.TimeoutType(at.TimeoutType)) msBuilder.UpdateActivity(ai) tb.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v", @@ -287,6 +271,16 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) { tb.activityTimers = make(timers, 0, len(msBuilder.pendingActivityInfoIDs)) for _, v := range msBuilder.pendingActivityInfoIDs { if v.ScheduleID != emptyEventID { + scheduleToCloseExpiry := v.ScheduledTime.Add(time.Duration(v.ScheduleToCloseTimeout) * time.Second) + td := &timerDetails{ + SequenceID: SequenceID{VisibilityTimestamp: scheduleToCloseExpiry}, + ActivityID: v.ScheduleID, + EventID: v.ScheduleID, + TimeoutSec: v.ScheduleToCloseTimeout, + TimeoutType: w.TimeoutTypeScheduleToClose, + TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToClose) != 0} + tb.activityTimers = append(tb.activityTimers, td) + if v.StartedID != emptyEventID { startToCloseExpiry := v.StartedTime.Add(time.Duration(v.StartToCloseTimeout) * time.Second) td := &timerDetails{ @@ -322,15 +316,6 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) { TimeoutType: w.TimeoutTypeScheduleToStart, TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToStart) != 0} tb.activityTimers = append(tb.activityTimers, td) - scheduleToCloseExpiry := v.ScheduledTime.Add(time.Duration(v.ScheduleToCloseTimeout) * time.Second) - td = 
&timerDetails{ - SequenceID: SequenceID{VisibilityTimestamp: scheduleToCloseExpiry}, - ActivityID: v.ScheduleID, - EventID: v.ScheduleID, - TimeoutSec: v.ScheduleToCloseTimeout, - TimeoutType: w.TimeoutTypeScheduleToClose, - TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedScheduleToClose) != 0} - tb.activityTimers = append(tb.activityTimers, td) } } } diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 015903525fb..a905161ab90 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -608,8 +608,8 @@ Update_History_Loop: } var timerTasks []persistence.Task - scheduleNewDecision := false updateHistory := false + createNewTimer := false ExpireActivityTimers: for _, td := range tBuilder.GetActivityTimers(msBuilder) { @@ -650,7 +650,7 @@ Update_History_Loop: t.metricsClient.IncCounter(metrics.TimerTaskActivityTimeoutScope, metrics.HeartbeatTimeoutCounter) t.logger.Debugf("Activity Heartbeat expired: %+v", *ai) - if msBuilder.AddActivityTaskTimedOutEvent(ai.ScheduleID, ai.StartedID, timeoutType, nil) == nil { + if msBuilder.AddActivityTaskTimedOutEvent(ai.ScheduleID, ai.StartedID, timeoutType, ai.Details) == nil { return errFailedToAddTimeoutEvent } updateHistory = true @@ -675,13 +675,14 @@ Update_History_Loop: // if current one is HB task and we need to create next HB task for the same. // NOTE: When record activity HB comes in we only update last heartbeat timestamp, this is the place // where we create next timer task based on that new updated timestamp. 
- if !td.TaskCreated || (isHeartBeatTask && td.ActivityID == scheduleID) { + if !td.TaskCreated || (isHeartBeatTask && td.EventID == scheduleID) { nextTask := tBuilder.createNewTask(td) timerTasks = []persistence.Task{nextTask} at := nextTask.(*persistence.ActivityTimeoutTask) - ai.TimerTaskStatus = ai.TimerTaskStatus & getActivityTimerStatus(workflow.TimeoutType(at.TimeoutType)) + ai.TimerTaskStatus = ai.TimerTaskStatus | getActivityTimerStatus(workflow.TimeoutType(at.TimeoutType)) msBuilder.UpdateActivity(ai) + createNewTimer = true t.logger.Debugf("%s: Adding Activity Timeout: with timeout: %v sec, ExpiryTime: %s, TimeoutType: %v, EventID: %v", time.Now(), td.TimeoutSec, at.VisibilityTimestamp, td.TimeoutType.String(), at.EventID) @@ -692,10 +693,10 @@ Update_History_Loop: } } - if updateHistory { + if updateHistory || createNewTimer { // We apply the update to execution using optimistic concurrency. If it fails due to a conflict than reload // the history and try the operation again. 
- scheduleNewDecision = !msBuilder.HasPendingDecisionTask() + scheduleNewDecision := updateHistory && !msBuilder.HasPendingDecisionTask() err := t.updateWorkflowExecution(context, msBuilder, scheduleNewDecision, false, timerTasks, nil) if err != nil { if err == ErrConflict { From 1b5b27b85c1b9553746aa45fe8578b4987f5b361 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Thu, 25 Jan 2018 12:14:15 -0800 Subject: [PATCH 2/2] fix gofmt --- host/integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/host/integration_test.go b/host/integration_test.go index 5e192d8a252..80a2100ed44 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -1076,7 +1076,7 @@ func (s *integrationSuite) TestActivityTimeouts() { TaskList: &workflow.TaskList{Name: common.StringPtr("NoWorker")}, Input: []byte("ScheduleToStart"), ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35), - ScheduleToStartTimeoutSeconds: common.Int32Ptr(3), // ActivityID A is expected to timeout using ScheduleToStart + ScheduleToStartTimeoutSeconds: common.Int32Ptr(3), // ActivityID A is expected to timeout using ScheduleToStart StartToCloseTimeoutSeconds: common.Int32Ptr(30), HeartbeatTimeoutSeconds: common.Int32Ptr(0), }, @@ -1114,7 +1114,7 @@ func (s *integrationSuite) TestActivityTimeouts() { ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35), ScheduleToStartTimeoutSeconds: common.Int32Ptr(20), StartToCloseTimeoutSeconds: common.Int32Ptr(15), - HeartbeatTimeoutSeconds: common.Int32Ptr(3), // ActivityID D is expected to timeout using Heartbeat + HeartbeatTimeoutSeconds: common.Int32Ptr(3), // ActivityID D is expected to timeout using Heartbeat }, }}, nil } else if previousStartedEventID > 0 {