From aa9baa88016c3806a9e37b350ba79040bc76a074 Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Tue, 5 Dec 2017 10:56:02 -0800 Subject: [PATCH] Fix flaky TestChildWorkflowExecution --- host/integration_test.go | 85 ++++++++++++++++++-------------- service/history/historyEngine.go | 2 +- 2 files changed, 49 insertions(+), 38 deletions(-) diff --git a/host/integration_test.go b/host/integration_test.go index 4e5bf251315..1b64b9aec2a 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -545,6 +545,7 @@ retry: return nil } + // process decision executionCtx, decisions, err := p.decisionHandler(response.WorkflowExecution, response.WorkflowType, common.Int64Default(response.PreviousStartedEventId), common.Int64Default(response.StartedEventId), response.History) if err != nil { @@ -2682,7 +2683,8 @@ func (s *integrationSuite) TestChildWorkflowExecution() { childID := "integration-child-workflow-child-test" wtParent := "integration-child-workflow-test-parent-type" wtChild := "integration-child-workflow-test-child-type" - tl := "integration-child-workflow-test-tasklist" + tlParent := "integration-child-workflow-test-parent-tasklist" + tlChild := "integration-child-workflow-test-child-tasklist" identity := "worker1" parentWorkflowType := &workflow.WorkflowType{} @@ -2691,15 +2693,17 @@ func (s *integrationSuite) TestChildWorkflowExecution() { childWorkflowType := &workflow.WorkflowType{} childWorkflowType.Name = common.StringPtr(wtChild) - taskList := &workflow.TaskList{} - taskList.Name = common.StringPtr(tl) + taskListParent := &workflow.TaskList{} + taskListParent.Name = common.StringPtr(tlParent) + taskListChild := &workflow.TaskList{} + taskListChild.Name = common.StringPtr(tlChild) request := &workflow.StartWorkflowExecutionRequest{ RequestId: common.StringPtr(uuid.New()), Domain: common.StringPtr(s.domainName), WorkflowId: common.StringPtr(parentID), WorkflowType: parentWorkflowType, - TaskList: taskList, + TaskList: taskListParent, Input: nil, ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(100), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), @@ -2716,22 +2720,12 @@ func (s *integrationSuite) TestChildWorkflowExecution() { childExecutionStarted := false var startedEvent *workflow.HistoryEvent var completedEvent *workflow.HistoryEvent - dtHandler := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + + // Parent Decider Logic + dtHandlerParent := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) { s.logger.Infof("Processing decision task for WorkflowID: %v", *execution.WorkflowId) - // Child Decider Logic - if *execution.WorkflowId == childID { - childComplete = true - return nil, []*workflow.Decision{{ - DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), - CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ - Result: []byte("Child Done."), - }, - }}, nil - } - - // Parent Decider Logic if *execution.WorkflowId == parentID { if !childExecutionStarted { s.logger.Info("Starting child execution.") @@ -2743,7 +2737,7 @@ func (s *integrationSuite) TestChildWorkflowExecution() { Domain: common.StringPtr(s.domainName), WorkflowId: common.StringPtr(childID), WorkflowType: childWorkflowType, - TaskList: taskList, + TaskList: taskListChild, Input: []byte("child-workflow-input"), ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(200), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(2), @@ -2775,46 +2769,63 @@ func (s *integrationSuite) TestChildWorkflowExecution() { return nil, nil, nil } - poller := &taskPoller{ + // Child Decider Logic + dtHandlerChild := func(execution *workflow.WorkflowExecution, wt *workflow.WorkflowType, + previousStartedEventID, startedEventID int64, history *workflow.History) ([]byte, []*workflow.Decision, error) { + + s.logger.Infof("Processing decision task for Child WorkflowID: %v", *execution.WorkflowId) + childComplete = true + return nil, []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeCompleteWorkflowExecution), + CompleteWorkflowExecutionDecisionAttributes: &workflow.CompleteWorkflowExecutionDecisionAttributes{ + Result: []byte("Child Done."), + }, + }}, nil + + return nil, nil, nil + } + + pollerParent := &taskPoller{ engine: s.engine, domain: s.domainName, - taskList: taskList, + taskList: taskListParent, identity: identity, - decisionHandler: dtHandler, + decisionHandler: dtHandlerParent, + logger: s.logger, + suite: s, + } + + pollerChild := &taskPoller{ + engine: s.engine, + domain: s.domainName, + taskList: taskListChild, + identity: identity, + decisionHandler: dtHandlerChild, logger: s.logger, suite: s, } // Make first decision to start child execution - err := poller.pollAndProcessDecisionTask(false, false) + err := pollerParent.pollAndProcessDecisionTask(false, false) s.logger.Infof("pollAndProcessDecisionTask: %v", err) s.Nil(err) s.True(childExecutionStarted) - // Process ChildExecution Started event and Process Child Execution and complete it - err = poller.pollAndProcessDecisionTask(false, false) + // Process ChildExecution Started event + err = pollerParent.pollAndProcessDecisionTask(false, false) s.logger.Infof("pollAndProcessDecisionTask: %v", err) s.Nil(err) - err = poller.pollAndProcessDecisionTask(false, false) + + // Process Child Execution and complete it + err = pollerChild.pollAndProcessDecisionTask(false, false) s.logger.Infof("pollAndProcessDecisionTask: %v", err) s.Nil(err) - s.NotNil(startedEvent) s.True(childComplete) // Process ChildExecution completed event and complete parent execution - err = poller.pollAndProcessDecisionTask(false, false) + err = pollerParent.pollAndProcessDecisionTask(false, false) s.logger.Infof("pollAndProcessDecisionTask: %v", err) - if err != nil { - s.logger.Info("Parent Workflow Execution History: ") - s.printWorkflowHistory(s.domainName, &workflow.WorkflowExecution{ - WorkflowId: common.StringPtr(parentID), - RunId: common.StringPtr(*we.RunId), - }) - s.logger.Info("Child Workflow Execution History: ") - s.printWorkflowHistory(s.domainName, - startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution) - } s.Nil(err) s.NotNil(completedEvent) completedAttributes := completedEvent.ChildWorkflowExecutionCompletedEventAttributes diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 6cc980e3704..8873e966797 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -192,7 +192,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(startRequest *h.StartWorkflow // DecisionTask is only created when it is not a Child Workflow Execution di := msBuilder.AddDecisionTaskScheduledEvent() if di == nil { - return nil, &workflow.InternalServiceError{Message: "Failed to add decision started event."} + return nil, &workflow.InternalServiceError{Message: "Failed to add decision scheduled event."} } transferTasks = []persistence.Task{&persistence.DecisionTask{