Skip to content

Commit

Permalink
Add integration test for respondActvityTask...ByID (#524)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Jan 26, 2018
1 parent 78c4b7a commit 9fab27a
Showing 1 changed file with 78 additions and 1 deletion.
79 changes: 78 additions & 1 deletion host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,11 @@ func (s *integrationSuite) TestSequentialWorkflow() {
_, err := poller.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)
err = poller.pollAndProcessActivityTask(false)
if i%2 == 0 {
err = poller.pollAndProcessActivityTask(false)
} else { // just for testing respondActivityTaskCompleteByID
err = poller.pollAndProcessActivityTaskWithID(false)
}
s.logger.Infof("pollAndProcessActivityTask: %v", err)
s.Nil(err)
}
Expand Down Expand Up @@ -693,6 +697,79 @@ retry:
return matching.ErrNoTasks
}

// Similar to pollAndProcessActivityTask but using RespondActivityTask...ByID
func (p *taskPoller) pollAndProcessActivityTaskWithID(dropTask bool) error {
retry:
for attempt := 0; attempt < 5; attempt++ {
response, err1 := p.engine.PollForActivityTask(createContext(), &workflow.PollForActivityTaskRequest{
Domain: common.StringPtr(p.domain),
TaskList: p.taskList,
Identity: common.StringPtr(p.identity),
})

if err1 == history.ErrDuplicate {
p.logger.Info("Duplicate Activity task: Polling again.")
continue retry
}

if err1 != nil {
return err1
}

if response == nil || len(response.TaskToken) == 0 {
p.logger.Info("Empty Activity task: Polling again.")
return nil
}

if response.GetActivityId() == "" {
p.logger.Info("Empty ActivityID")
return nil
}

if dropTask {
p.logger.Info("Dropping Activity task: ")
return nil
}
p.logger.Debugf("Received Activity task: %v", response)

result, cancel, err2 := p.activityHandler(response.WorkflowExecution, response.ActivityType, *response.ActivityId,
response.Input, response.TaskToken)
if cancel {
p.logger.Info("Executing RespondActivityTaskCanceled")
return p.engine.RespondActivityTaskCanceledByID(createContext(), &workflow.RespondActivityTaskCanceledByIDRequest{
Domain: common.StringPtr(p.domain),
WorkflowID: common.StringPtr(response.WorkflowExecution.GetWorkflowId()),
RunID: common.StringPtr(response.WorkflowExecution.GetRunId()),
ActivityID: common.StringPtr(response.GetActivityId()),
Details: []byte("details"),
Identity: common.StringPtr(p.identity),
})
}

if err2 != nil {
return p.engine.RespondActivityTaskFailedByID(createContext(), &workflow.RespondActivityTaskFailedByIDRequest{
Domain: common.StringPtr(p.domain),
WorkflowID: common.StringPtr(response.WorkflowExecution.GetWorkflowId()),
RunID: common.StringPtr(response.WorkflowExecution.GetRunId()),
ActivityID: common.StringPtr(response.GetActivityId()),
Reason: common.StringPtr(err2.Error()),
Identity: common.StringPtr(p.identity),
})
}

return p.engine.RespondActivityTaskCompletedByID(createContext(), &workflow.RespondActivityTaskCompletedByIDRequest{
Domain: common.StringPtr(p.domain),
WorkflowID: common.StringPtr(response.WorkflowExecution.GetWorkflowId()),
RunID: common.StringPtr(response.WorkflowExecution.GetRunId()),
ActivityID: common.StringPtr(response.GetActivityId()),
Identity: common.StringPtr(p.identity),
Result: result,
})
}

return matching.ErrNoTasks
}

func (s *integrationSuite) TestDecisionAndActivityTimeoutsWorkflow() {
id := "interation-timeouts-workflow-test"
wt := "interation-timeouts-workflow-test-type"
Expand Down

0 comments on commit 9fab27a

Please sign in to comment.