diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index b5c52d4c6a..9969eab6ce 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -653,6 +653,12 @@ private void updateAndPushParents(WorkflowModel workflow, String operation) { // update parent's sub workflow task TaskModel subWorkflowTask = executionDAOFacade.getTaskModel(workflow.getParentWorkflowTaskId()); + if (subWorkflowTask.getWorkflowTask().isOptional()) { + // break out + LOGGER.info( + "Sub workflow task {} is optional, skip updating parents", subWorkflowTask); + break; + } subWorkflowTask.setSubworkflowChanged(true); subWorkflowTask.setStatus(IN_PROGRESS); executionDAOFacade.updateTask(subWorkflowTask); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java index 0ae000c589..eab4961c72 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Join.java @@ -39,6 +39,7 @@ public boolean execute( boolean allDone = true; boolean hasFailures = false; StringBuilder failureReason = new StringBuilder(); + StringBuilder optionalTaskFailures = new StringBuilder(); List joinOn = (List) task.getInputData().get("joinOn"); if (task.isLoopOverTask()) { // If join is part of loop over task, wait for specific iteration to get complete @@ -66,11 +67,26 @@ public boolean execute( if (hasFailures) { break; } + + // check for optional task failures + if (forkedTask.getWorkflowTask().isOptional() + && taskStatus == TaskModel.Status.COMPLETED_WITH_ERRORS) { + optionalTaskFailures + .append( + String.format( + "%s/%s", + forkedTask.getTaskDefName(), forkedTask.getTaskId())) + .append(" "); + } } - if (allDone || hasFailures) { + if (allDone || hasFailures || optionalTaskFailures.length() > 0) { if (hasFailures) { task.setReasonForIncompletion(failureReason.toString()); task.setStatus(TaskModel.Status.FAILED); + } else if (optionalTaskFailures.length() > 0) { + task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS); + optionalTaskFailures.append("completed with errors"); + task.setReasonForIncompletion(optionalTaskFailures.toString()); } else { task.setStatus(TaskModel.Status.COMPLETED); } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java index 9616c845c1..8512fef3a5 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java @@ -515,7 +515,9 @@ public void testOptionalWithDynamicFork() { assertEquals(TaskModel.Status.IN_PROGRESS, outcome.tasksToBeScheduled.get(0).getStatus()); new Join().execute(workflow, outcome.tasksToBeScheduled.get(0), null); - assertEquals(TaskModel.Status.COMPLETED, outcome.tasksToBeScheduled.get(0).getStatus()); + assertEquals( + TaskModel.Status.COMPLETED_WITH_ERRORS, + outcome.tasksToBeScheduled.get(0).getStatus()); } @Test diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 0707811a99..b8633be4bb 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -1356,6 +1356,7 @@ public void testRerunSubWorkflow() { task.setOutputData(new HashMap<>()); task.setSubWorkflowId(subWorkflowId); task.setTaskType(TaskType.SUB_WORKFLOW.name()); + task.setWorkflowTask(new WorkflowTask()); WorkflowModel workflow = new WorkflowModel(); workflow.setWorkflowId(parentWorkflowId); @@ -1565,6 +1566,7 @@ public void testRerunSubWorkflowWithTaskId() { task.setOutputData(new HashMap<>()); task.setSubWorkflowId(subWorkflowId); task.setTaskType(TaskType.SUB_WORKFLOW.name()); + task.setWorkflowTask(new WorkflowTask()); WorkflowModel workflow = new WorkflowModel(); workflow.setWorkflowId(parentWorkflowId); @@ -2097,6 +2099,260 @@ public void testTerminateWorkflowWithFailureWorkflow() { argumentCaptor.getAllValues().get(0).getInput().get("failureTaskId")); } + @Test + public void testRerunOptionalSubWorkflow() { + IDGenerator idGenerator = new IDGenerator(); + // setup + String parentWorkflowId = idGenerator.generate(); + String subWorkflowId = idGenerator.generate(); + + // sub workflow setup + TaskModel task1 = new TaskModel(); + task1.setTaskType(TaskType.SIMPLE.name()); + task1.setTaskDefName("task1"); + task1.setReferenceTaskName("task1_ref"); + task1.setWorkflowInstanceId(subWorkflowId); + task1.setScheduledTime(System.currentTimeMillis()); + task1.setTaskId(idGenerator.generate()); + task1.setStatus(TaskModel.Status.COMPLETED); + task1.setWorkflowTask(new WorkflowTask()); + task1.setOutputData(new HashMap<>()); + + TaskModel task2 = new TaskModel(); + task2.setTaskType(TaskType.SIMPLE.name()); + task2.setTaskDefName("task2"); + task2.setReferenceTaskName("task2_ref"); + task2.setWorkflowInstanceId(subWorkflowId); + task2.setScheduledTime(System.currentTimeMillis()); + task2.setTaskId(idGenerator.generate()); + task2.setStatus(TaskModel.Status.FAILED); + task2.setWorkflowTask(new WorkflowTask()); + task2.setOutputData(new HashMap<>()); + + WorkflowModel subWorkflow = new WorkflowModel(); + subWorkflow.setParentWorkflowId(parentWorkflowId); + subWorkflow.setWorkflowId(subWorkflowId); + WorkflowDef subworkflowDef = new WorkflowDef(); + subworkflowDef.setName("subworkflow"); + subworkflowDef.setVersion(1); + subWorkflow.setWorkflowDefinition(subworkflowDef); + subWorkflow.setOwnerApp("junit_testRerunWorkflowId"); + subWorkflow.setStatus(WorkflowModel.Status.FAILED); + subWorkflow.getTasks().addAll(Arrays.asList(task1, task2)); + + // parent workflow setup + TaskModel task = new TaskModel(); + task.setWorkflowInstanceId(parentWorkflowId); + task.setScheduledTime(System.currentTimeMillis()); + task.setTaskId(idGenerator.generate()); + task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS); + task.setOutputData(new HashMap<>()); + task.setSubWorkflowId(subWorkflowId); + task.setTaskType(TaskType.SUB_WORKFLOW.name()); + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setOptional(true); + task.setWorkflowTask(workflowTask); + + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowId(parentWorkflowId); + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("parentworkflow"); + workflowDef.setVersion(1); + workflow.setWorkflowDefinition(workflowDef); + workflow.setOwnerApp("junit_testRerunWorkflowId"); + workflow.setStatus(WorkflowModel.Status.COMPLETED); + workflow.getTasks().addAll(Arrays.asList(task)); + // end of setup + + // when: + when(executionDAOFacade.getWorkflowModel(workflow.getWorkflowId(), true)) + .thenReturn(workflow); + when(executionDAOFacade.getWorkflowModel(task.getSubWorkflowId(), true)) + .thenReturn(subWorkflow); + when(executionDAOFacade.getTaskModel(subWorkflow.getParentWorkflowTaskId())) + .thenReturn(task); + when(executionDAOFacade.getWorkflowModel(subWorkflow.getParentWorkflowId(), false)) + .thenReturn(workflow); + + RerunWorkflowRequest rerunWorkflowRequest = new RerunWorkflowRequest(); + rerunWorkflowRequest.setReRunFromWorkflowId(subWorkflow.getWorkflowId()); + workflowExecutor.rerun(rerunWorkflowRequest); + + // then: parent workflow remains the same + assertEquals(WorkflowModel.Status.FAILED, subWorkflow.getPreviousStatus()); + assertEquals(WorkflowModel.Status.RUNNING, subWorkflow.getStatus()); + assertEquals(TaskModel.Status.COMPLETED_WITH_ERRORS, task.getStatus()); + assertEquals(WorkflowModel.Status.COMPLETED, workflow.getStatus()); + } + + @Test + public void testRestartOptionalSubWorkflow() { + IDGenerator idGenerator = new IDGenerator(); + // setup + String parentWorkflowId = idGenerator.generate(); + String subWorkflowId = idGenerator.generate(); + + // sub workflow setup + TaskModel task1 = new TaskModel(); + task1.setTaskType(TaskType.SIMPLE.name()); + task1.setTaskDefName("task1"); + task1.setReferenceTaskName("task1_ref"); + task1.setWorkflowInstanceId(subWorkflowId); + task1.setScheduledTime(System.currentTimeMillis()); + task1.setTaskId(idGenerator.generate()); + task1.setStatus(TaskModel.Status.COMPLETED); + task1.setWorkflowTask(new WorkflowTask()); + task1.setOutputData(new HashMap<>()); + + TaskModel task2 = new TaskModel(); + task2.setTaskType(TaskType.SIMPLE.name()); + task2.setTaskDefName("task2"); + task2.setReferenceTaskName("task2_ref"); + task2.setWorkflowInstanceId(subWorkflowId); + task2.setScheduledTime(System.currentTimeMillis()); + task2.setTaskId(idGenerator.generate()); + task2.setStatus(TaskModel.Status.FAILED); + task2.setWorkflowTask(new WorkflowTask()); + task2.setOutputData(new HashMap<>()); + + WorkflowModel subWorkflow = new WorkflowModel(); + subWorkflow.setParentWorkflowId(parentWorkflowId); + subWorkflow.setWorkflowId(subWorkflowId); + WorkflowDef subworkflowDef = new WorkflowDef(); + subworkflowDef.setName("subworkflow"); + subworkflowDef.setVersion(1); + subWorkflow.setWorkflowDefinition(subworkflowDef); + subWorkflow.setOwnerApp("junit_testRerunWorkflowId"); + subWorkflow.setStatus(WorkflowModel.Status.FAILED); + subWorkflow.getTasks().addAll(Arrays.asList(task1, task2)); + + // parent workflow setup + TaskModel task = new TaskModel(); + task.setWorkflowInstanceId(parentWorkflowId); + task.setScheduledTime(System.currentTimeMillis()); + task.setTaskId(idGenerator.generate()); + task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS); + task.setOutputData(new HashMap<>()); + task.setSubWorkflowId(subWorkflowId); + task.setTaskType(TaskType.SUB_WORKFLOW.name()); + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setOptional(true); + task.setWorkflowTask(workflowTask); + + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowId(parentWorkflowId); + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("parentworkflow"); + workflowDef.setVersion(1); + workflow.setWorkflowDefinition(workflowDef); + workflow.setOwnerApp("junit_testRerunWorkflowId"); + workflow.setStatus(WorkflowModel.Status.COMPLETED); + workflow.getTasks().addAll(Arrays.asList(task)); + // end of setup + + // when: + when(executionDAOFacade.getWorkflowModel(workflow.getWorkflowId(), true)) + .thenReturn(workflow); + when(executionDAOFacade.getWorkflowModel(task.getSubWorkflowId(), true)) + .thenReturn(subWorkflow); + when(executionDAOFacade.getTaskModel(subWorkflow.getParentWorkflowTaskId())) + .thenReturn(task); + when(executionDAOFacade.getWorkflowModel(subWorkflow.getParentWorkflowId(), false)) + .thenReturn(workflow); + + workflowExecutor.restart(subWorkflowId, false); + + // then: parent workflow remains the same + assertEquals(WorkflowModel.Status.FAILED, subWorkflow.getPreviousStatus()); + assertEquals(WorkflowModel.Status.RUNNING, subWorkflow.getStatus()); + assertEquals(TaskModel.Status.COMPLETED_WITH_ERRORS, task.getStatus()); + assertEquals(WorkflowModel.Status.COMPLETED, workflow.getStatus()); + } + + @Test + public void testRetryOptionalSubWorkflow() { + IDGenerator idGenerator = new IDGenerator(); + // setup + String parentWorkflowId = idGenerator.generate(); + String subWorkflowId = idGenerator.generate(); + + // sub workflow setup + TaskModel task1 = new TaskModel(); + task1.setTaskType(TaskType.SIMPLE.name()); + task1.setTaskDefName("task1"); + task1.setReferenceTaskName("task1_ref"); + task1.setWorkflowInstanceId(subWorkflowId); + task1.setScheduledTime(System.currentTimeMillis()); + task1.setTaskId(idGenerator.generate()); + task1.setStatus(TaskModel.Status.COMPLETED); + task1.setWorkflowTask(new WorkflowTask()); + task1.setOutputData(new HashMap<>()); + + TaskModel task2 = new TaskModel(); + task2.setTaskType(TaskType.SIMPLE.name()); + task2.setTaskDefName("task2"); + task2.setReferenceTaskName("task2_ref"); + task2.setWorkflowInstanceId(subWorkflowId); + task2.setScheduledTime(System.currentTimeMillis()); + task2.setTaskId(idGenerator.generate()); + task2.setStatus(TaskModel.Status.FAILED); + task2.setWorkflowTask(new WorkflowTask()); + task2.setOutputData(new HashMap<>()); + + WorkflowModel subWorkflow = new WorkflowModel(); + subWorkflow.setParentWorkflowId(parentWorkflowId); + subWorkflow.setWorkflowId(subWorkflowId); + WorkflowDef subworkflowDef = new WorkflowDef(); + subworkflowDef.setName("subworkflow"); + subworkflowDef.setVersion(1); + subWorkflow.setWorkflowDefinition(subworkflowDef); + subWorkflow.setOwnerApp("junit_testRerunWorkflowId"); + subWorkflow.setStatus(WorkflowModel.Status.FAILED); + subWorkflow.getTasks().addAll(Arrays.asList(task1, task2)); + + // parent workflow setup + TaskModel task = new TaskModel(); + task.setWorkflowInstanceId(parentWorkflowId); + task.setScheduledTime(System.currentTimeMillis()); + task.setTaskId(idGenerator.generate()); + task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS); + task.setOutputData(new HashMap<>()); + task.setSubWorkflowId(subWorkflowId); + task.setTaskType(TaskType.SUB_WORKFLOW.name()); + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setOptional(true); + task.setWorkflowTask(workflowTask); + + WorkflowModel workflow = new WorkflowModel(); + workflow.setWorkflowId(parentWorkflowId); + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("parentworkflow"); + workflowDef.setVersion(1); + workflow.setWorkflowDefinition(workflowDef); + workflow.setOwnerApp("junit_testRerunWorkflowId"); + workflow.setStatus(WorkflowModel.Status.COMPLETED); + workflow.getTasks().addAll(Arrays.asList(task)); + // end of setup + + // when: + when(executionDAOFacade.getWorkflowModel(workflow.getWorkflowId(), true)) + .thenReturn(workflow); + when(executionDAOFacade.getWorkflowModel(task.getSubWorkflowId(), true)) + .thenReturn(subWorkflow); + when(executionDAOFacade.getTaskModel(subWorkflow.getParentWorkflowTaskId())) + .thenReturn(task); + when(executionDAOFacade.getWorkflowModel(subWorkflow.getParentWorkflowId(), false)) + .thenReturn(workflow); + + workflowExecutor.retry(subWorkflowId, true); + + // then: parent workflow remains the same + assertEquals(WorkflowModel.Status.FAILED, subWorkflow.getPreviousStatus()); + assertEquals(WorkflowModel.Status.RUNNING, subWorkflow.getStatus()); + assertEquals(TaskModel.Status.COMPLETED_WITH_ERRORS, task.getStatus()); + assertEquals(WorkflowModel.Status.COMPLETED, workflow.getStatus()); + } + private WorkflowModel generateSampleWorkflow() { // setup WorkflowModel workflow = new WorkflowModel(); diff --git a/docs/docs/reference-docs/sub-workflow-task.md b/docs/docs/reference-docs/sub-workflow-task.md index fc7a987b30..359354995f 100644 --- a/docs/docs/reference-docs/sub-workflow-task.md +++ b/docs/docs/reference-docs/sub-workflow-task.md @@ -173,3 +173,7 @@ Looking at the subworkflow (the WEBP version): The ```subWorkflowParam``` tells conductor which workflow to call. The task is marked as completed upon the completion of the spawned workflow. If the sub-workflow is terminated or fails the task is marked as failure and retried if configured. + +### Optional Sub Workflow Task +If the Sub Workflow task is defined as optional in the parent workflow task definition, the parent workflow task will not be retried if sub-workflow is terminated or failed. +In addition, even if the sub-workflow is retried/rerun/restarted after reaching to a terminal status, the parent workflow task status will remain as it is. diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy index 4cdf011fb7..2351badb2c 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/ForkJoinSpec.groovy @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.metadata.tasks.TaskDef +import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.core.execution.tasks.SubWorkflow import com.netflix.conductor.test.base.AbstractSpecification @@ -821,7 +822,7 @@ class ForkJoinSpec extends AbstractSpecification { } sweep(workflowInstanceId) - and: "verify that the workflow is in a COMPLETED state" + and: "verify that the workflow is in a COMPLETED state and the join task is also marked as COMPLETED_WITH_ERRORS" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.COMPLETED tasks.size() == 4 @@ -832,7 +833,34 @@ class ForkJoinSpec extends AbstractSpecification { tasks[2].taskType == 'SUB_WORKFLOW' tasks[2].status == Task.Status.COMPLETED_WITH_ERRORS tasks[3].taskType == 'JOIN' - tasks[3].status == Task.Status.COMPLETED + tasks[3].status == Task.Status.COMPLETED_WITH_ERRORS + } + + when: "do a rerun on the sub workflow" + def reRunSubWorkflowRequest = new RerunWorkflowRequest() + reRunSubWorkflowRequest.reRunFromWorkflowId = subWorkflowInstanceId1 + workflowExecutor.rerun(reRunSubWorkflowRequest) + + then: "verify that the sub workflows are in a RUNNING state" + with(workflowExecutionService.getExecutionStatus(subWorkflowInstanceId1, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].status == Task.Status.SCHEDULED + tasks[0].taskType == 'simple_task_in_sub_wf' + } + + and: "parent workflow remains the same" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 4 + tasks[0].taskType == 'FORK' + tasks[0].status == Task.Status.COMPLETED + tasks[1].taskType == 'SUB_WORKFLOW' + tasks[1].status == Task.Status.COMPLETED_WITH_ERRORS + tasks[2].taskType == 'SUB_WORKFLOW' + tasks[2].status == Task.Status.COMPLETED_WITH_ERRORS + tasks[3].taskType == 'JOIN' + tasks[3].status == Task.Status.COMPLETED_WITH_ERRORS } }