Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Optional subworkflow task #2974

Merged
merged 6 commits into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,12 @@ private void updateAndPushParents(WorkflowModel workflow, String operation) {
// update parent's sub workflow task
TaskModel subWorkflowTask =
executionDAOFacade.getTaskModel(workflow.getParentWorkflowTaskId());
if (subWorkflowTask.getWorkflowTask().isOptional()) {
// break out
LOGGER.info(
"Sub workflow task {} is optional, skip updating parents", subWorkflowTask);
break;
}
subWorkflowTask.setSubworkflowChanged(true);
subWorkflowTask.setStatus(IN_PROGRESS);
executionDAOFacade.updateTask(subWorkflowTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public boolean execute(
boolean allDone = true;
boolean hasFailures = false;
StringBuilder failureReason = new StringBuilder();
StringBuilder optionalTaskFailures = new StringBuilder();
List<String> joinOn = (List<String>) task.getInputData().get("joinOn");
if (task.isLoopOverTask()) {
// If join is part of loop over task, wait for specific iteration to get complete
Expand Down Expand Up @@ -66,11 +67,26 @@ public boolean execute(
if (hasFailures) {
break;
}

// check for optional task failures
if (forkedTask.getWorkflowTask().isOptional()
&& taskStatus == TaskModel.Status.COMPLETED_WITH_ERRORS) {
optionalTaskFailures
.append(
String.format(
"%s/%s",
forkedTask.getTaskDefName(), forkedTask.getTaskId()))
.append(" ");
}
}
if (allDone || hasFailures) {
if (allDone || hasFailures || optionalTaskFailures.length() > 0) {
if (hasFailures) {
task.setReasonForIncompletion(failureReason.toString());
task.setStatus(TaskModel.Status.FAILED);
} else if (optionalTaskFailures.length() > 0) {
task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS);
optionalTaskFailures.append("completed with errors");
apanicker-nflx marked this conversation as resolved.
Show resolved Hide resolved
task.setReasonForIncompletion(optionalTaskFailures.toString());
} else {
task.setStatus(TaskModel.Status.COMPLETED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,9 @@ public void testOptionalWithDynamicFork() {

assertEquals(TaskModel.Status.IN_PROGRESS, outcome.tasksToBeScheduled.get(0).getStatus());
new Join().execute(workflow, outcome.tasksToBeScheduled.get(0), null);
assertEquals(TaskModel.Status.COMPLETED, outcome.tasksToBeScheduled.get(0).getStatus());
assertEquals(
TaskModel.Status.COMPLETED_WITH_ERRORS,
outcome.tasksToBeScheduled.get(0).getStatus());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,7 @@ public void testRerunSubWorkflow() {
task.setOutputData(new HashMap<>());
task.setSubWorkflowId(subWorkflowId);
task.setTaskType(TaskType.SUB_WORKFLOW.name());
task.setWorkflowTask(new WorkflowTask());

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId(parentWorkflowId);
Expand Down Expand Up @@ -1565,6 +1566,7 @@ public void testRerunSubWorkflowWithTaskId() {
task.setOutputData(new HashMap<>());
task.setSubWorkflowId(subWorkflowId);
task.setTaskType(TaskType.SUB_WORKFLOW.name());
task.setWorkflowTask(new WorkflowTask());

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId(parentWorkflowId);
Expand Down Expand Up @@ -2097,6 +2099,260 @@ public void testTerminateWorkflowWithFailureWorkflow() {
argumentCaptor.getAllValues().get(0).getInput().get("failureTaskId"));
}

@Test
public void testRerunOptionalSubWorkflow() {
IDGenerator idGenerator = new IDGenerator();
// setup
String parentWorkflowId = idGenerator.generate();
String subWorkflowId = idGenerator.generate();

// sub workflow setup
TaskModel task1 = new TaskModel();
task1.setTaskType(TaskType.SIMPLE.name());
task1.setTaskDefName("task1");
task1.setReferenceTaskName("task1_ref");
task1.setWorkflowInstanceId(subWorkflowId);
task1.setScheduledTime(System.currentTimeMillis());
task1.setTaskId(idGenerator.generate());
task1.setStatus(TaskModel.Status.COMPLETED);
task1.setWorkflowTask(new WorkflowTask());
task1.setOutputData(new HashMap<>());

TaskModel task2 = new TaskModel();
task2.setTaskType(TaskType.SIMPLE.name());
task2.setTaskDefName("task2");
task2.setReferenceTaskName("task2_ref");
task2.setWorkflowInstanceId(subWorkflowId);
task2.setScheduledTime(System.currentTimeMillis());
task2.setTaskId(idGenerator.generate());
task2.setStatus(TaskModel.Status.FAILED);
task2.setWorkflowTask(new WorkflowTask());
task2.setOutputData(new HashMap<>());

WorkflowModel subWorkflow = new WorkflowModel();
subWorkflow.setParentWorkflowId(parentWorkflowId);
subWorkflow.setWorkflowId(subWorkflowId);
WorkflowDef subworkflowDef = new WorkflowDef();
subworkflowDef.setName("subworkflow");
subworkflowDef.setVersion(1);
subWorkflow.setWorkflowDefinition(subworkflowDef);
subWorkflow.setOwnerApp("junit_testRerunWorkflowId");
subWorkflow.setStatus(WorkflowModel.Status.FAILED);
subWorkflow.getTasks().addAll(Arrays.asList(task1, task2));

// parent workflow setup
TaskModel task = new TaskModel();
task.setWorkflowInstanceId(parentWorkflowId);
task.setScheduledTime(System.currentTimeMillis());
task.setTaskId(idGenerator.generate());
task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS);
task.setOutputData(new HashMap<>());
task.setSubWorkflowId(subWorkflowId);
task.setTaskType(TaskType.SUB_WORKFLOW.name());
WorkflowTask workflowTask = new WorkflowTask();
workflowTask.setOptional(true);
task.setWorkflowTask(workflowTask);

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId(parentWorkflowId);
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("parentworkflow");
workflowDef.setVersion(1);
workflow.setWorkflowDefinition(workflowDef);
workflow.setOwnerApp("junit_testRerunWorkflowId");
workflow.setStatus(WorkflowModel.Status.COMPLETED);
workflow.getTasks().addAll(Arrays.asList(task));
// end of setup

// when:
when(executionDAOFacade.getWorkflowModel(workflow.getWorkflowId(), true))
.thenReturn(workflow);
when(executionDAOFacade.getWorkflowModel(task.getSubWorkflowId(), true))
.thenReturn(subWorkflow);
when(executionDAOFacade.getTaskModel(subWorkflow.getParentWorkflowTaskId()))
.thenReturn(task);
when(executionDAOFacade.getWorkflowModel(subWorkflow.getParentWorkflowId(), false))
.thenReturn(workflow);

RerunWorkflowRequest rerunWorkflowRequest = new RerunWorkflowRequest();
rerunWorkflowRequest.setReRunFromWorkflowId(subWorkflow.getWorkflowId());
workflowExecutor.rerun(rerunWorkflowRequest);

// then: parent workflow remains the same
assertEquals(WorkflowModel.Status.FAILED, subWorkflow.getPreviousStatus());
assertEquals(WorkflowModel.Status.RUNNING, subWorkflow.getStatus());
assertEquals(TaskModel.Status.COMPLETED_WITH_ERRORS, task.getStatus());
assertEquals(WorkflowModel.Status.COMPLETED, workflow.getStatus());
}

@Test
public void testRestartOptionalSubWorkflow() {
IDGenerator idGenerator = new IDGenerator();
// setup
String parentWorkflowId = idGenerator.generate();
String subWorkflowId = idGenerator.generate();

// sub workflow setup
TaskModel task1 = new TaskModel();
task1.setTaskType(TaskType.SIMPLE.name());
task1.setTaskDefName("task1");
task1.setReferenceTaskName("task1_ref");
task1.setWorkflowInstanceId(subWorkflowId);
task1.setScheduledTime(System.currentTimeMillis());
task1.setTaskId(idGenerator.generate());
task1.setStatus(TaskModel.Status.COMPLETED);
task1.setWorkflowTask(new WorkflowTask());
task1.setOutputData(new HashMap<>());

TaskModel task2 = new TaskModel();
task2.setTaskType(TaskType.SIMPLE.name());
task2.setTaskDefName("task2");
task2.setReferenceTaskName("task2_ref");
task2.setWorkflowInstanceId(subWorkflowId);
task2.setScheduledTime(System.currentTimeMillis());
task2.setTaskId(idGenerator.generate());
task2.setStatus(TaskModel.Status.FAILED);
task2.setWorkflowTask(new WorkflowTask());
task2.setOutputData(new HashMap<>());

WorkflowModel subWorkflow = new WorkflowModel();
subWorkflow.setParentWorkflowId(parentWorkflowId);
subWorkflow.setWorkflowId(subWorkflowId);
WorkflowDef subworkflowDef = new WorkflowDef();
subworkflowDef.setName("subworkflow");
subworkflowDef.setVersion(1);
subWorkflow.setWorkflowDefinition(subworkflowDef);
subWorkflow.setOwnerApp("junit_testRerunWorkflowId");
subWorkflow.setStatus(WorkflowModel.Status.FAILED);
subWorkflow.getTasks().addAll(Arrays.asList(task1, task2));

// parent workflow setup
TaskModel task = new TaskModel();
task.setWorkflowInstanceId(parentWorkflowId);
task.setScheduledTime(System.currentTimeMillis());
task.setTaskId(idGenerator.generate());
task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS);
task.setOutputData(new HashMap<>());
task.setSubWorkflowId(subWorkflowId);
task.setTaskType(TaskType.SUB_WORKFLOW.name());
WorkflowTask workflowTask = new WorkflowTask();
workflowTask.setOptional(true);
task.setWorkflowTask(workflowTask);

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId(parentWorkflowId);
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("parentworkflow");
workflowDef.setVersion(1);
workflow.setWorkflowDefinition(workflowDef);
workflow.setOwnerApp("junit_testRerunWorkflowId");
workflow.setStatus(WorkflowModel.Status.COMPLETED);
workflow.getTasks().addAll(Arrays.asList(task));
// end of setup

// when:
when(executionDAOFacade.getWorkflowModel(workflow.getWorkflowId(), true))
.thenReturn(workflow);
when(executionDAOFacade.getWorkflowModel(task.getSubWorkflowId(), true))
.thenReturn(subWorkflow);
when(executionDAOFacade.getTaskModel(subWorkflow.getParentWorkflowTaskId()))
.thenReturn(task);
when(executionDAOFacade.getWorkflowModel(subWorkflow.getParentWorkflowId(), false))
.thenReturn(workflow);

workflowExecutor.restart(subWorkflowId, false);

// then: parent workflow remains the same
assertEquals(WorkflowModel.Status.FAILED, subWorkflow.getPreviousStatus());
assertEquals(WorkflowModel.Status.RUNNING, subWorkflow.getStatus());
assertEquals(TaskModel.Status.COMPLETED_WITH_ERRORS, task.getStatus());
assertEquals(WorkflowModel.Status.COMPLETED, workflow.getStatus());
}

@Test
public void testRetryOptionalSubWorkflow() {
IDGenerator idGenerator = new IDGenerator();
// setup
String parentWorkflowId = idGenerator.generate();
String subWorkflowId = idGenerator.generate();

// sub workflow setup
TaskModel task1 = new TaskModel();
task1.setTaskType(TaskType.SIMPLE.name());
task1.setTaskDefName("task1");
task1.setReferenceTaskName("task1_ref");
task1.setWorkflowInstanceId(subWorkflowId);
task1.setScheduledTime(System.currentTimeMillis());
task1.setTaskId(idGenerator.generate());
task1.setStatus(TaskModel.Status.COMPLETED);
task1.setWorkflowTask(new WorkflowTask());
task1.setOutputData(new HashMap<>());

TaskModel task2 = new TaskModel();
task2.setTaskType(TaskType.SIMPLE.name());
task2.setTaskDefName("task2");
task2.setReferenceTaskName("task2_ref");
task2.setWorkflowInstanceId(subWorkflowId);
task2.setScheduledTime(System.currentTimeMillis());
task2.setTaskId(idGenerator.generate());
task2.setStatus(TaskModel.Status.FAILED);
task2.setWorkflowTask(new WorkflowTask());
task2.setOutputData(new HashMap<>());

WorkflowModel subWorkflow = new WorkflowModel();
subWorkflow.setParentWorkflowId(parentWorkflowId);
subWorkflow.setWorkflowId(subWorkflowId);
WorkflowDef subworkflowDef = new WorkflowDef();
subworkflowDef.setName("subworkflow");
subworkflowDef.setVersion(1);
subWorkflow.setWorkflowDefinition(subworkflowDef);
subWorkflow.setOwnerApp("junit_testRerunWorkflowId");
subWorkflow.setStatus(WorkflowModel.Status.FAILED);
subWorkflow.getTasks().addAll(Arrays.asList(task1, task2));

// parent workflow setup
TaskModel task = new TaskModel();
task.setWorkflowInstanceId(parentWorkflowId);
task.setScheduledTime(System.currentTimeMillis());
task.setTaskId(idGenerator.generate());
task.setStatus(TaskModel.Status.COMPLETED_WITH_ERRORS);
task.setOutputData(new HashMap<>());
task.setSubWorkflowId(subWorkflowId);
task.setTaskType(TaskType.SUB_WORKFLOW.name());
WorkflowTask workflowTask = new WorkflowTask();
workflowTask.setOptional(true);
task.setWorkflowTask(workflowTask);

WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId(parentWorkflowId);
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("parentworkflow");
workflowDef.setVersion(1);
workflow.setWorkflowDefinition(workflowDef);
workflow.setOwnerApp("junit_testRerunWorkflowId");
workflow.setStatus(WorkflowModel.Status.COMPLETED);
workflow.getTasks().addAll(Arrays.asList(task));
// end of setup

// when:
when(executionDAOFacade.getWorkflowModel(workflow.getWorkflowId(), true))
.thenReturn(workflow);
when(executionDAOFacade.getWorkflowModel(task.getSubWorkflowId(), true))
.thenReturn(subWorkflow);
when(executionDAOFacade.getTaskModel(subWorkflow.getParentWorkflowTaskId()))
.thenReturn(task);
when(executionDAOFacade.getWorkflowModel(subWorkflow.getParentWorkflowId(), false))
.thenReturn(workflow);

workflowExecutor.retry(subWorkflowId, true);

// then: parent workflow remains the same
assertEquals(WorkflowModel.Status.FAILED, subWorkflow.getPreviousStatus());
assertEquals(WorkflowModel.Status.RUNNING, subWorkflow.getStatus());
assertEquals(TaskModel.Status.COMPLETED_WITH_ERRORS, task.getStatus());
assertEquals(WorkflowModel.Status.COMPLETED, workflow.getStatus());
}

private WorkflowModel generateSampleWorkflow() {
// setup
WorkflowModel workflow = new WorkflowModel();
Expand Down
4 changes: 4 additions & 0 deletions docs/docs/reference-docs/sub-workflow-task.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,7 @@ Looking at the subworkflow (the WEBP version):

The ```subWorkflowParam``` tells conductor which workflow to call. The task is marked as completed upon the completion of the spawned workflow.
If the sub-workflow is terminated or fails the task is marked as failure and retried if configured.

### Optional Sub Workflow Task
If the Sub Workflow task is defined as optional in the parent workflow task definition, the parent workflow task will not be retried if sub-workflow is terminated or failed.
In addition, even if the sub-workflow is retried/rerun/restarted after reaching to a terminal status, the parent workflow task status will remain as it is.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired

import com.netflix.conductor.common.metadata.tasks.Task
import com.netflix.conductor.common.metadata.tasks.TaskDef
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest
import com.netflix.conductor.common.run.Workflow
import com.netflix.conductor.core.execution.tasks.SubWorkflow
import com.netflix.conductor.test.base.AbstractSpecification
Expand Down Expand Up @@ -821,7 +822,7 @@ class ForkJoinSpec extends AbstractSpecification {
}
sweep(workflowInstanceId)

and: "verify that the workflow is in a COMPLETED state"
and: "verify that the workflow is in a COMPLETED state and the join task is also marked as COMPLETED_WITH_ERRORS"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 4
Expand All @@ -832,7 +833,34 @@ class ForkJoinSpec extends AbstractSpecification {
tasks[2].taskType == 'SUB_WORKFLOW'
tasks[2].status == Task.Status.COMPLETED_WITH_ERRORS
tasks[3].taskType == 'JOIN'
tasks[3].status == Task.Status.COMPLETED
tasks[3].status == Task.Status.COMPLETED_WITH_ERRORS
}

when: "do a rerun on the sub workflow"
def reRunSubWorkflowRequest = new RerunWorkflowRequest()
reRunSubWorkflowRequest.reRunFromWorkflowId = subWorkflowInstanceId1
workflowExecutor.rerun(reRunSubWorkflowRequest)

then: "verify that the sub workflows are in a RUNNING state"
with(workflowExecutionService.getExecutionStatus(subWorkflowInstanceId1, true)) {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].status == Task.Status.SCHEDULED
tasks[0].taskType == 'simple_task_in_sub_wf'
}

and: "parent workflow remains the same"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 4
tasks[0].taskType == 'FORK'
tasks[0].status == Task.Status.COMPLETED
tasks[1].taskType == 'SUB_WORKFLOW'
tasks[1].status == Task.Status.COMPLETED_WITH_ERRORS
tasks[2].taskType == 'SUB_WORKFLOW'
tasks[2].status == Task.Status.COMPLETED_WITH_ERRORS
tasks[3].taskType == 'JOIN'
tasks[3].status == Task.Status.COMPLETED_WITH_ERRORS
}
}

Expand Down