Skip to content

Commit

Permalink
Fix flaky TestChildWorkflowExecution (#448)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Dec 7, 2017
1 parent 7ad6d5b commit 5a5e6ff
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 38 deletions.
85 changes: 48 additions & 37 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,7 @@ retry:
return nil
}

// process decision
executionCtx, decisions, err := p.decisionHandler(response.WorkflowExecution, response.WorkflowType,
common.Int64Default(response.PreviousStartedEventId), common.Int64Default(response.StartedEventId), response.History)
if err != nil {
Expand Down Expand Up @@ -2682,7 +2683,8 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
childID := "integration-child-workflow-child-test"
wtParent := "integration-child-workflow-test-parent-type"
wtChild := "integration-child-workflow-test-child-type"
tl := "integration-child-workflow-test-tasklist"
tlParent := "integration-child-workflow-test-parent-tasklist"
tlChild := "integration-child-workflow-test-child-tasklist"
identity := "worker1"

parentWorkflowType := &workflow.WorkflowType{}
Expand All @@ -2691,15 +2693,17 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
childWorkflowType := &workflow.WorkflowType{}
childWorkflowType.Name = common.StringPtr(wtChild)

taskList := &workflow.TaskList{}
taskList.Name = common.StringPtr(tl)
taskListParent := &workflow.TaskList{}
taskListParent.Name = common.StringPtr(tlParent)
taskListChild := &workflow.TaskList{}
taskListChild.Name = common.StringPtr(tlChild)

request := &workflow.StartWorkflowExecutionRequest{
RequestId: common.StringPtr(uuid.New()),
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(parentID),
WorkflowType: parentWorkflowType,
TaskList: taskList,
TaskList: taskListParent,
Input: nil,
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1),
Expand All @@ -2716,22 +2720,12 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
childExecutionStarted := false
var startedEvent *workflow.HistoryEvent
var completedEvent *workflow.HistoryEvent
dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,

// Parent Decider Logic
dtHandlerParent := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {
s.logger.Infof("Processing decision task for WorkflowID: %v", *execution.WorkflowId)

// Child Decider Logic
if *execution.WorkflowId == childID {
childComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Child Done."),
},
}}, nil
}

// Parent Decider Logic
if *execution.WorkflowId == parentID {
if !childExecutionStarted {
s.logger.Info("Starting child execution.")
Expand All @@ -2743,7 +2737,7 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
Domain: common.StringPtr(s.domainName),
WorkflowId: common.StringPtr(childID),
WorkflowType: childWorkflowType,
TaskList: taskList,
TaskList: taskListChild,
Input: []byte("child-workflow-input"),
ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(200),
TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2),
Expand Down Expand Up @@ -2775,46 +2769,63 @@ func (s *integrationSuite) TestChildWorkflowExecution() {
return nil, nil, nil
}

poller := &taskPoller{
// Child Decider Logic
dtHandlerChild := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType,
previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) {

s.logger.Infof("Processing decision task for Child WorkflowID: %v", *execution.WorkflowId)
childComplete = true
return nil, []*workflow.Decision{{
DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution),
CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{
Result: []byte("Child Done."),
},
}}, nil

return nil, nil, nil
}

pollerParent := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskList,
taskList: taskListParent,
identity: identity,
decisionHandler: dtHandler,
decisionHandler: dtHandlerParent,
logger: s.logger,
suite: s,
}

pollerChild := &taskPoller{
engine: s.engine,
domain: s.domainName,
taskList: taskListChild,
identity: identity,
decisionHandler: dtHandlerChild,
logger: s.logger,
suite: s,
}

// Make first decision to start child execution
err := poller.pollAndProcessDecisionTask(false, false)
err := pollerParent.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)
s.True(childExecutionStarted)

// Process ChildExecution Started event and Process Child Execution and complete it
err = poller.pollAndProcessDecisionTask(false, false)
// Process ChildExecution Started event
err = pollerParent.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)
err = poller.pollAndProcessDecisionTask(false, false)

// Process Child Execution and complete it
err = pollerChild.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
s.Nil(err)

s.NotNil(startedEvent)
s.True(childComplete)

// Process ChildExecution completed event and complete parent execution
err = poller.pollAndProcessDecisionTask(false, false)
err = pollerParent.pollAndProcessDecisionTask(false, false)
s.logger.Infof("pollAndProcessDecisionTask: %v", err)
if err != nil {
s.logger.Info("Parent Workflow Execution History: ")
s.printWorkflowHistory(s.domainName, &workflow.WorkflowExecution{
WorkflowId: common.StringPtr(parentID),
RunId: common.StringPtr(*we.RunId),
})
s.logger.Info("Child Workflow Execution History: ")
s.printWorkflowHistory(s.domainName,
startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution)
}
s.Nil(err)
s.NotNil(completedEvent)
completedAttributes := completedEvent.ChildWorkflowExecutionCompletedEventAttributes
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow
// DecisionTask is only created when it is not a Child Workflow Execution
di := msBuilder.AddDecisionTaskScheduledEvent()
if di == nil {
return nil, &workflow.InternalServiceError{Message: "Failed to add decision started event."}
return nil, &workflow.InternalServiceError{Message: "Failed to add decision scheduled event."}
}

transferTasks = []persistence.Task{&persistence.DecisionTask{
Expand Down

0 comments on commit 5a5e6ff

Please sign in to comment.