Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/main' into taskmapper_refactoring
Browse files Browse the repository at this point in the history
# Conflicts:
#	core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java
  • Loading branch information
aravindanr committed Mar 28, 2022
2 parents 6e6195b + a0e7314 commit 52b03c8
Show file tree
Hide file tree
Showing 45 changed files with 2,015 additions and 970 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public enum TaskType {
JOIN,
DO_WHILE,
SUB_WORKFLOW,
START_WORKFLOW,
EVENT,
WAIT,
USER_DEFINED,
Expand All @@ -54,6 +55,7 @@ public enum TaskType {
public static final String TASK_TYPE_EVENT = "EVENT";
public static final String TASK_TYPE_WAIT = "WAIT";
public static final String TASK_TYPE_SUB_WORKFLOW = "SUB_WORKFLOW";
public static final String TASK_TYPE_START_WORKFLOW = "START_WORKFLOW";
public static final String TASK_TYPE_FORK_JOIN = "FORK_JOIN";
public static final String TASK_TYPE_SIMPLE = "SIMPLE";
public static final String TASK_TYPE_HTTP = "HTTP";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ public static boolean isEnvironmentVariable(String test) {
}
}
String value =
Optional.ofNullable(System.getProperty(test))
.orElseGet(() -> Optional.ofNullable(System.getenv(test)).orElse(null));
Optional.ofNullable(System.getProperty(test)).orElseGet(() -> System.getenv(test));
return value != null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.mapper;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.model.TaskModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.START_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_START_WORKFLOW;

@Component
public class StartWorkflowTaskMapper implements TaskMapper {

private static final Logger LOGGER = LoggerFactory.getLogger(StartWorkflowTaskMapper.class);

@Override
public TaskType getTaskType() {
return START_WORKFLOW;
}

@Override
public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext)
throws TerminateWorkflowException {
WorkflowTask workflowTask = taskMapperContext.getWorkflowTask();

TaskModel startWorkflowTask = taskMapperContext.createTaskModel();
startWorkflowTask.setTaskType(TASK_TYPE_START_WORKFLOW);
startWorkflowTask.addInput(taskMapperContext.getTaskInput());
startWorkflowTask.setStatus(TaskModel.Status.SCHEDULED);
startWorkflowTask.setCallbackAfterSeconds(workflowTask.getStartDelay());
LOGGER.debug("{} created", startWorkflowTask);
return List.of(startWorkflowTask);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ SubWorkflowParams getSubWorkflowParams(WorkflowTask workflowTask) {
String reason =
String.format(
"Task %s is defined as sub-workflow and is missing subWorkflowParams. "
+ "Please check the blueprint",
+ "Please check the workflow definition",
workflowTask.getName());
LOGGER.error(reason);
return new TerminateWorkflowException(reason);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks;

import java.util.Map;

import javax.validation.Validator;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import com.fasterxml.jackson.databind.ObjectMapper;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_START_WORKFLOW;
import static com.netflix.conductor.model.TaskModel.Status.COMPLETED;
import static com.netflix.conductor.model.TaskModel.Status.FAILED;

@Component(TASK_TYPE_START_WORKFLOW)
public class StartWorkflow extends WorkflowSystemTask {

private static final Logger LOGGER = LoggerFactory.getLogger(StartWorkflow.class);

private static final String WORKFLOW_ID = "workflowId";
private static final String START_WORKFLOW_PARAMETER = "startWorkflow";

private final ObjectMapper objectMapper;
private final Validator validator;

public StartWorkflow(ObjectMapper objectMapper, Validator validator) {
super(TASK_TYPE_START_WORKFLOW);
this.objectMapper = objectMapper;
this.validator = validator;
}

@Override
public void start(
WorkflowModel workflow, TaskModel taskModel, WorkflowExecutor workflowExecutor) {
StartWorkflowRequest request = getRequest(taskModel);
if (request == null) {
return;
}

// set the correlation id of starter workflow, if its empty in the StartWorkflowRequest
request.setCorrelationId(
StringUtils.defaultIfBlank(
request.getCorrelationId(), workflow.getCorrelationId()));

try {
String workflowId = startWorkflow(request, workflowExecutor);
taskModel.addOutput(WORKFLOW_ID, workflowId);
taskModel.setStatus(COMPLETED);
} catch (ApplicationException ae) {
if (ae.isRetryable()) {
LOGGER.info(
"A transient backend error happened when task {} in {} tried to start workflow {}.",
taskModel.getTaskId(),
workflow.toShortString(),
request.getName());
} else {
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(ae.getMessage());
LOGGER.error(
"Error starting workflow: {} from workflow: {}",
request.getName(),
workflow.toShortString(),
ae);
}
} catch (Exception e) {
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(e.getMessage());
LOGGER.error(
"Error starting workflow: {} from workflow: {}",
request.getName(),
workflow.toShortString(),
e);
}
}

private StartWorkflowRequest getRequest(TaskModel taskModel) {
Map<String, Object> taskInput = taskModel.getInputData();

StartWorkflowRequest startWorkflowRequest = null;

if (taskInput.get(START_WORKFLOW_PARAMETER) == null) {
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(
"Missing '" + START_WORKFLOW_PARAMETER + "' in input data.");
} else {
try {
startWorkflowRequest =
objectMapper.convertValue(
taskInput.get(START_WORKFLOW_PARAMETER),
StartWorkflowRequest.class);

var violations = validator.validate(startWorkflowRequest);
if (!violations.isEmpty()) {
StringBuilder reasonForIncompletion =
new StringBuilder(START_WORKFLOW_PARAMETER)
.append(" validation failed. ");
for (var violation : violations) {
reasonForIncompletion
.append("'")
.append(violation.getPropertyPath().toString())
.append("' -> ")
.append(violation.getMessage())
.append(". ");
}
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(reasonForIncompletion.toString());
startWorkflowRequest = null;
}
} catch (IllegalArgumentException e) {
LOGGER.error("Error reading StartWorkflowRequest for {}", taskModel, e);
taskModel.setStatus(FAILED);
taskModel.setReasonForIncompletion(
"Error reading StartWorkflowRequest. " + e.getMessage());
}
}

return startWorkflowRequest;
}

private String startWorkflow(StartWorkflowRequest request, WorkflowExecutor workflowExecutor) {
if (request.getWorkflowDef() == null) {
return workflowExecutor.startWorkflow(
request.getName(),
request.getVersion(),
request.getCorrelationId(),
request.getPriority(),
request.getInput(),
request.getExternalInputPayloadStoragePath(),
null,
request.getTaskToDomain());
} else {
return workflowExecutor.startWorkflow(
request.getWorkflowDef(),
request.getInput(),
request.getExternalInputPayloadStoragePath(),
request.getCorrelationId(),
request.getPriority(),
null,
request.getTaskToDomain());
}
}

@Override
public boolean isAsync() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,6 @@ public String startWorkflow(
WorkflowDef workflowDef) {

if (workflowDef == null) {
workflowDef = metadataService.getWorkflowDef(name, version);
if (workflowDef == null) {
throw new ApplicationException(
ApplicationException.Code.NOT_FOUND,
String.format(
"No such workflow found by name: %s, version: %d", name, version));
}

return workflowExecutor.startWorkflow(
name,
version,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2022 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks


import javax.validation.ConstraintViolation
import javax.validation.Validator

import com.netflix.conductor.common.config.ObjectMapperProvider
import com.netflix.conductor.core.exception.ApplicationException
import com.netflix.conductor.core.execution.WorkflowExecutor
import com.netflix.conductor.model.TaskModel
import com.netflix.conductor.model.WorkflowModel

import spock.lang.Specification
import spock.lang.Subject

import static com.netflix.conductor.core.execution.tasks.StartWorkflow.START_WORKFLOW_PARAMETER
import static com.netflix.conductor.model.TaskModel.Status.FAILED
import static com.netflix.conductor.model.TaskModel.Status.SCHEDULED

/**
* Unit test for StartWorkflow. Success and Javax validation cases are covered by the StartWorkflowSpec in test-harness module.
*/
class StartWorkflowSpec extends Specification {

@Subject
StartWorkflow startWorkflow

WorkflowExecutor workflowExecutor
Validator validator
WorkflowModel workflowModel
TaskModel taskModel

def setup() {
workflowExecutor = Mock(WorkflowExecutor.class)
validator = Mock(Validator.class) {
validate(_) >> new HashSet<ConstraintViolation<Object>>()
}

def inputData = [:]
inputData[START_WORKFLOW_PARAMETER] = ['name': 'some_workflow']
taskModel = new TaskModel(status: SCHEDULED, inputData: inputData)
workflowModel = new WorkflowModel()

startWorkflow = new StartWorkflow(new ObjectMapperProvider().getObjectMapper(), validator)
}

def "StartWorkflow task is asynchronous"() {
expect:
startWorkflow.isAsync()
}

def "startWorkflow parameter is missing"() {
given: "a task with no start_workflow in input"
taskModel.inputData = [:]

when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == FAILED
taskModel.reasonForIncompletion != null
}

def "ObjectMapper throws an IllegalArgumentException"() {
given: "a task with no start_workflow in input"
taskModel.inputData[START_WORKFLOW_PARAMETER] = "I can't be converted to StartWorkflowRequest"

when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == FAILED
taskModel.reasonForIncompletion != null
}

def "WorkflowExecutor throws a retryable exception"() {
when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == SCHEDULED
1 * workflowExecutor.startWorkflow(*_) >> { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "") }
}

def "WorkflowExecutor throws a non-retryable ApplicationException"() {
when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == FAILED
taskModel.reasonForIncompletion != null
1 * workflowExecutor.startWorkflow(*_) >> { throw new ApplicationException(ApplicationException.Code.NOT_FOUND, "") }
}

def "WorkflowExecutor throws a RuntimeException"() {
when:
startWorkflow.start(workflowModel, taskModel, workflowExecutor)

then:
taskModel.status == FAILED
taskModel.reasonForIncompletion != null
1 * workflowExecutor.startWorkflow(*_) >> { throw new RuntimeException("I am an unexpected exception") }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void getExceptionWhenNoSubWorkflowParamsPassed() {
expectedException.expectMessage(
String.format(
"Task %s is defined as sub-workflow and is missing subWorkflowParams. "
+ "Please check the blueprint",
+ "Please check the workflow definition",
workflowTask.getName()));

subWorkflowTaskMapper.getSubWorkflowParams(workflowTask);
Expand Down
Loading

0 comments on commit 52b03c8

Please sign in to comment.