Skip to content

Commit

Permalink
feat(core): ForEachItem task (#2359)
Browse files Browse the repository at this point in the history
close #2131
  • Loading branch information
loicmathieu authored Nov 3, 2023
1 parent 382a5df commit a570cc6
Show file tree
Hide file tree
Showing 25 changed files with 851 additions and 207 deletions.
12 changes: 9 additions & 3 deletions core/src/main/java/io/kestra/core/models/executions/TaskRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class TaskRun {

String parentTaskRunId;

@With
String value;

@With
Expand All @@ -52,6 +53,9 @@ public class TaskRun {
@NotNull
State state;

@With
String items;

public void destroyOutputs() {
// DANGER ZONE: this method is only used to deals with issues with messages too big that must be stripped down
// to avoid crashing the platform. Don't use it for anything else.
Expand All @@ -71,7 +75,8 @@ public TaskRun withState(State.Type state) {
this.value,
this.attempts,
this.outputs,
this.state.withState(state)
this.state.withState(state),
this.items
);
}

Expand All @@ -88,6 +93,7 @@ public TaskRun forChildExecution(Map<String, String> remapTaskRunId, String exec
.attempts(this.getAttempts())
.outputs(this.getOutputs())
.state(state == null ? this.getState() : state)
.items(this.getItems())
.build();
}

Expand Down Expand Up @@ -128,7 +134,7 @@ public TaskRunAttempt lastAttempt() {
public TaskRun onRunningResend() {
TaskRunBuilder taskRunBuilder = this.toBuilder();

if (taskRunBuilder.attempts == null || taskRunBuilder.attempts.size() == 0) {
if (taskRunBuilder.attempts == null || taskRunBuilder.attempts.isEmpty()) {
taskRunBuilder.attempts = new ArrayList<>();

taskRunBuilder.attempts.add(TaskRunAttempt.builder()
Expand Down Expand Up @@ -172,7 +178,7 @@ public String toString(boolean pretty) {
", parentTaskRunId=" + this.getParentTaskRunId() +
", state=" + this.getState().getCurrent().toString() +
", outputs=" + this.getOutputs() +
", attemps=" + this.getAttempts() +
", attempts=" + this.getAttempts() +
")";
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package io.kestra.core.models.hierarchies;

import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.tasks.flows.Flow;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import lombok.Getter;

import java.util.List;

@Getter
public class SubflowGraphTask extends AbstractGraphTask {
public SubflowGraphTask(Flow task, TaskRun taskRun, List<String> values, RelationType relationType) {
super(task, taskRun, values, relationType);
public SubflowGraphTask(ExecutableTask task, TaskRun taskRun, List<String> values, RelationType relationType) {
super((Task) task, taskRun, values, relationType);
}

@Override
public Flow getTask() {
return (Flow) super.getTask();
public ExecutableTask getExecutableTask() {
return (ExecutableTask) super.getTask();
}
}
57 changes: 57 additions & 0 deletions core/src/main/java/io/kestra/core/models/tasks/ExecutableTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.kestra.core.models.tasks;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.WorkerTaskExecution;
import io.kestra.core.runners.WorkerTaskResult;

import java.util.List;
import java.util.Optional;

/**
* Interface for tasks that generates subflow execution(s). Those tasks are handled in the Executor.
*/
public interface ExecutableTask {
/**
* Creates a list of WorkerTaskExecution for this task definition.
* Each WorkerTaskExecution will generate a subflow execution.
*/
List<WorkerTaskExecution<?>> createWorkerTaskExecutions(RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Flow currentFlow, Execution currentExecution,
TaskRun currentTaskRun) throws InternalException;

/**
* Creates a WorkerTaskResult for a given WorkerTaskExecution
*/
Optional<WorkerTaskResult> createWorkerTaskResult(RunContext runContext,
WorkerTaskExecution<?> workerTaskExecution,
Flow flow,
Execution execution);

/**
* Whether to wait for the execution(s) of the subflow before terminating this tasks
*/
boolean waitForExecution();

/**
* @return the subflow identifier, used by the flow topology and related dependency code.
*/
SubflowId subflowId();

record SubflowId(String namespace, String flowId, Optional<Integer> revision) {
public String flowUid() {
// as the Flow task can only be used in the same tenant we can hardcode null here
return Flow.uid(null, this.namespace, this.flowId, this.revision);
}

public String flowUidWithoutRevision() {
// as the Flow task can only be used in the same tenant we can hardcode null here
return Flow.uidWithoutRevision(null, this.namespace, this.flowId);
}
}
}
26 changes: 26 additions & 0 deletions core/src/main/java/io/kestra/core/models/tasks/FlowableTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,47 @@
import java.util.List;
import java.util.Optional;

/**
* Interface for tasks that orchestrate other tasks. Those tasks are handled by the Executor.
*/
public interface FlowableTask <T extends Output> {
@Schema(
title = "List of tasks to run if any tasks failed on this FlowableTask"
)
@PluginProperty
List<Task> getErrors();

/**
* Create the topology representation of a flowable task.
* <p>
* A flowable task always contains subtask to it returns a cluster that displays the subtasks.
*/
GraphCluster tasksTree(Execution execution, TaskRun taskRun, List<String> parentValues) throws IllegalVariableEvaluationException;

/**
* @return all child tasks including errors
*/
List<Task> allChildTasks();

/**
* Resolve child tasks of a flowable task.
* <p>
* For a normal flowable, it should be the list of its tasks, for an iterative flowable (such as EachSequential, ForEachItem, ...),
* it should be the list of its tasks for all iterations.
*/
List<ResolvedTask> childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException;

/**
* Resolve next tasks to run for an execution.
* <p>
* For a normal flowable, it should be <b>the</b> subsequent task, for a parallel flowable (such as Parallel, ForEachItem, ...),
* it should be a list of the next subsequent tasks of the size of the concurrency of the task.
*/
List<NextTaskRun> resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException;

/**
* Resolve the state of a flowable task.
*/
default Optional<State.Type> resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException {
return FlowableUtils.resolveState(
execution,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

import io.kestra.core.runners.RunContext;

/**
* Interface for tasks that are run in the Worker.
*/
public interface RunnableTask <T extends Output> {
/**
* Thsis method is called inside the Worker to run (execute) the task.
*/
T run(RunContext runContext) throws Exception;
}
105 changes: 105 additions & 0 deletions core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package io.kestra.core.runners;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.ExecutionTrigger;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.executions.TaskRunAttempt;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public final class ExecutableUtils {

private ExecutableUtils() {
// prevent initialization
}

public static State.Type guessState(Execution execution, boolean transmitFailed) {
if (transmitFailed &&
(execution.getState().isFailed() || execution.getState().isPaused() || execution.getState().getCurrent() == State.Type.KILLED || execution.getState().getCurrent() == State.Type.WARNING)
) {
return execution.getState().getCurrent();
} else {
return State.Type.SUCCESS;
}
}

public static WorkerTaskResult workerTaskResult(TaskRun taskRun) {
return WorkerTaskResult.builder()
.taskRun(taskRun.withAttempts(
Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(taskRun.getState().getCurrent())).build())
))
.build();
}

public static <T extends Task & ExecutableTask> WorkerTaskExecution<?> workerTaskExecution(
RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Execution currentExecution,
Flow currentFlow,
T currentTask,
TaskRun currentTaskRun,
Map<String, Object> inputs,
List<Label> labels
) throws IllegalVariableEvaluationException {
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
String subflowId = runContext.render(currentTask.subflowId().flowId());
Optional<Integer> subflowRevision = currentTask.subflowId().revision();

io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
currentExecution.getTenantId(),
subflowNamespace,
subflowId,
subflowRevision,
currentExecution.getTenantId(),
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + subflowNamespace + "'.'" + subflowId + "' with revision + '" + subflowRevision.orElse(0) + "'"));

if (flow.isDisabled()) {
throw new IllegalStateException("Cannot execute a flow which is disabled");
}

if (flow instanceof FlowWithException fwe) {
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}

Map<String, Object> variables = ImmutableMap.of(
"executionId", currentExecution.getId(),
"namespace", currentFlow.getNamespace(),
"flowId", currentFlow.getId(),
"flowRevision", currentFlow.getRevision()
);

RunnerUtils runnerUtils = runContext.getApplicationContext().getBean(RunnerUtils.class);
Execution execution = runnerUtils
.newExecution(
flow,
(f, e) -> runnerUtils.typedInputs(f, e, inputs),
labels)
.withTrigger(ExecutionTrigger.builder()
.id(currentTask.getId())
.type(currentTask.getType())
.variables(variables)
.build()
);

return WorkerTaskExecution.builder()
.task(currentTask)
.taskRun(currentTaskRun)
.execution(execution)
.build();
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/core/runners/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Executor withWorkerTaskDelays(List<ExecutionDelay> executionDelays, Strin
return this;
}

public Executor withWorkerTaskExecutions(List<WorkerTaskExecution> newExecutions, String from) {
public Executor withWorkerTaskExecutions(List<WorkerTaskExecution<?>> newExecutions, String from) {
this.workerTaskExecutions.addAll(newExecutions);
this.from.add(from);

Expand Down
Loading

0 comments on commit a570cc6

Please sign in to comment.