Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: Recreate activity heartbeat timeout after first timer fire #658

Merged
merged 1 commit into from
Apr 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,183 @@ func (s *integrationSuite) TestActivityTimeouts() {
s.True(workflowComplete)
}

func (s *integrationSuite) TestActivityHeartbeatTimeouts() {
id := "integration-activity-heartbeat-timeout-test"
wt := "integration-activity-heartbeat-timeout-test-type"
tl := "integration-activity-heartbeat-timeout-test-tasklist"
identity := "worker1"
activityName := "timeout_activity"

workflowType := &workflow.WorkflowType{}
workflowType.Name = common.StringPtr(wt)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(id),
WorkflowType: workflowType,
TaskList: taskList,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(300),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Identity: common.StringPtr(identity),
}

we, err0 := s.engine.StartWorkflowExecution(createContext(), request)
s.Nil(err0)

s.logger.Infof("StartWorkflowExecution: response: %v \n", *we.RunId)

workflowComplete := false
activitiesScheduled := false
activitiesMap := map[int64]*workflow.HistoryEvent{}
failWorkflow := false
failReason := ""
var activityATimedout bool
var activityALastHeartbeat int
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
if !activitiesScheduled {
activitiesScheduled = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask),
ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{
ActivityId: common.StringPtr("A"),
ActivityType: &workflow.ActivityType{Name: common.StringPtr(activityName)},
TaskList: &workflow.TaskList{Name: &tl},
Input: []byte("Heartbeat"),
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(35),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(20),
StartToCloseTimeoutSeconds: common.Int32Ptr(15),
HeartbeatTimeoutSeconds: common.Int32Ptr(3), // ActivityID A is expected to timeout using Heartbeat
},
}}, nil
} else if previousStartedEventID > 0 {
for _, event := range history.Events[previousStartedEventID:] {
if event.GetEventType() == workflow.EventTypeActivityTaskScheduled {
activitiesMap[event.GetEventId()] = event
}

if event.GetEventType() == workflow.EventTypeActivityTaskTimedOut {
timeoutEvent := event.ActivityTaskTimedOutEventAttributes
scheduledEvent, ok := activitiesMap[timeoutEvent.GetScheduledEventId()]
if !ok {
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution),
FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{
Reason: common.StringPtr("ScheduledEvent not found."),
},
}}, nil
}

switch timeoutEvent.GetTimeoutType() {
case workflow.TimeoutTypeHeartbeat:
if scheduledEvent.ActivityTaskScheduledEventAttributes.GetActivityId() == "A" {
activityATimedout = true
activityALastHeartbeat, _ = strconv.Atoi(string(timeoutEvent.Details))
} else {
failWorkflow = true
failReason = "ActivityID A is expected to timeout with Heartbeat"
}
}
}
}
}

if failWorkflow {
s.logger.Errorf("Failing workflow.")
workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeFailWorkflowExecution),
FailWorkflowExecutionDecisionAttributes: &workflow.FailWorkflowExecutionDecisionAttributes{
Reason: common.StringPtr(failReason),
},
}}, nil
}

if activityATimedout {
s.logger.Info("Completing Workflow.")
workflowComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Done."),
},
}}, nil
}

return nil, []*workflow.Decision{}, nil
}

atHandler := func(execution *workflow.WorkflowExecution, activityType *workflow.ActivityType,
activityID string, input []byte, taskToken []byte) ([]byte, bool, error) {
s.Equal(id, *execution.WorkflowId)
s.Equal(activityName, *activityType.Name)
timeoutType := string(input)
switch timeoutType {
case "Heartbeat":
s.logger.Info("Starting hearbeat activity.")
go func() {
for i := 0; i < 6; i++ {
s.logger.Infof("Heartbeating for activity: %s, count: %d", activityID, i)
_, err := s.engine.RecordActivityTaskHeartbeat(createContext(), &workflow.RecordActivityTaskHeartbeatRequest{
TaskToken: taskToken, Details: []byte(strconv.Itoa(i))})
s.Nil(err)
time.Sleep(1 * time.Second)
}
s.logger.Info("End Heartbeating.")
}()
s.logger.Info("Sleeping hearbeat activity.")
time.Sleep(12 * time.Second)
}

return []byte("Activity Result."), false, nil
}

poller := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskList,
identity: identity,
decisionHandler: dtHandler,
activityHandler: atHandler,
logger: s.logger,
suite: s,
}

_, err := poller.pollAndProcessDecisionTask(false, false)
s.True(err == nil || err == matching.ErrNoTasks)

for i := 0; i < 1; i++ {
go func() {
err = poller.pollAndProcessActivityTask(false)
s.logger.Infof("Activity Processing Completed. Error: %v", err)
}()
}

s.logger.Infof("Waiting for workflow to complete: RunId: %v", *we.RunId)
for i := 0; i < 10; i++ {
s.logger.Infof("Processing decision task: %v", i)
_, err := poller.pollAndProcessDecisionTask(false, false)
s.Nil(err, "Poll for decision task failed.")

if workflowComplete {
break
}
}

s.printWorkflowHistory(s.domainName, &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(id),
RunId: common.StringPtr(we.GetRunId()),
})
s.True(workflowComplete)
s.True(activityATimedout)
s.Equal(5, activityALastHeartbeat)
}

func (s *integrationSuite) TestSequential_UserTimers() {
id := "interation-sequential-user-timers-test"
wt := "interation-sequential-user-timers-test-type"
Expand Down
4 changes: 2 additions & 2 deletions service/history/timerBuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) {
td := &timerDetails{
TimerSequenceID: TimerSequenceID{VisibilityTimestamp: startToCloseExpiry},
ActivityID: v.ScheduleID,
EventID: v.StartedID,
EventID: v.ScheduleID,
TimeoutType: w.TimeoutTypeStartToClose,
TimeoutSec: v.StartToCloseTimeout,
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedStartToClose) != 0}
Expand All @@ -303,7 +303,7 @@ func (tb *timerBuilder) loadActivityTimers(msBuilder *mutableStateBuilder) {
td := &timerDetails{
TimerSequenceID: TimerSequenceID{VisibilityTimestamp: heartBeatExpiry},
ActivityID: v.ScheduleID,
EventID: v.StartedID,
EventID: v.ScheduleID,
TimeoutType: w.TimeoutTypeHeartbeat,
TimeoutSec: v.HeartbeatTimeout,
TaskCreated: (v.TimerTaskStatus & TimerTaskStatusCreatedHeartbeat) != 0}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is after first heartbeat timer is created, the bit for heartbeat timer will be set, and after that this TaskCreated will always be true so firstActivityTimerTask() will not return the timer task and it won't be created.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i actually have some doubt:
here: https://github.com/uber/cadence/pull/658/files#diff-b3d25d1c01ad6e7393fd3958caea1333R287
the start event ID is actually checked
so when the timer detail is created (by loading activity into memory), we are sure that the start event ID is a valid event ID

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think this fix works.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is the startedID is buffered event id which changed to real start event after it is flushed.

Expand Down