diff --git a/core/src/main/java/io/kestra/core/models/executions/TaskRun.java b/core/src/main/java/io/kestra/core/models/executions/TaskRun.java index 120d8129b7..43abe06476 100644 --- a/core/src/main/java/io/kestra/core/models/executions/TaskRun.java +++ b/core/src/main/java/io/kestra/core/models/executions/TaskRun.java @@ -41,6 +41,7 @@ public class TaskRun { String parentTaskRunId; + @With String value; @With @@ -52,6 +53,9 @@ public class TaskRun { @NotNull State state; + @With + String items; + public void destroyOutputs() { // DANGER ZONE: this method is only used to deals with issues with messages too big that must be stripped down // to avoid crashing the platform. Don't use it for anything else. @@ -71,7 +75,8 @@ public TaskRun withState(State.Type state) { this.value, this.attempts, this.outputs, - this.state.withState(state) + this.state.withState(state), + this.items ); } @@ -88,6 +93,7 @@ public TaskRun forChildExecution(Map remapTaskRunId, String exec .attempts(this.getAttempts()) .outputs(this.getOutputs()) .state(state == null ? this.getState() : state) + .items(this.getItems()) .build(); } @@ -128,7 +134,7 @@ public TaskRunAttempt lastAttempt() { public TaskRun onRunningResend() { TaskRunBuilder taskRunBuilder = this.toBuilder(); - if (taskRunBuilder.attempts == null || taskRunBuilder.attempts.size() == 0) { + if (taskRunBuilder.attempts == null || taskRunBuilder.attempts.isEmpty()) { taskRunBuilder.attempts = new ArrayList<>(); taskRunBuilder.attempts.add(TaskRunAttempt.builder() @@ -172,7 +178,7 @@ public String toString(boolean pretty) { ", parentTaskRunId=" + this.getParentTaskRunId() + ", state=" + this.getState().getCurrent().toString() + ", outputs=" + this.getOutputs() + - ", attemps=" + this.getAttempts() + + ", attempts=" + this.getAttempts() + ")"; } diff --git a/core/src/main/java/io/kestra/core/models/hierarchies/SubflowGraphTask.java b/core/src/main/java/io/kestra/core/models/hierarchies/SubflowGraphTask.java index f29ed12baf..91c9e664ba 100644 --- a/core/src/main/java/io/kestra/core/models/hierarchies/SubflowGraphTask.java +++ b/core/src/main/java/io/kestra/core/models/hierarchies/SubflowGraphTask.java @@ -1,19 +1,19 @@ package io.kestra.core.models.hierarchies; import io.kestra.core.models.executions.TaskRun; -import io.kestra.core.tasks.flows.Flow; +import io.kestra.core.models.tasks.ExecutableTask; +import io.kestra.core.models.tasks.Task; import lombok.Getter; import java.util.List; @Getter public class SubflowGraphTask extends AbstractGraphTask { - public SubflowGraphTask(Flow task, TaskRun taskRun, List values, RelationType relationType) { - super(task, taskRun, values, relationType); + public SubflowGraphTask(ExecutableTask task, TaskRun taskRun, List values, RelationType relationType) { + super((Task) task, taskRun, values, relationType); } - @Override - public Flow getTask() { - return (Flow) super.getTask(); + public ExecutableTask getExecutableTask() { + return (ExecutableTask) super.getTask(); } } diff --git a/core/src/main/java/io/kestra/core/models/tasks/ExecutableTask.java b/core/src/main/java/io/kestra/core/models/tasks/ExecutableTask.java new file mode 100644 index 0000000000..87c0bdc817 --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/tasks/ExecutableTask.java @@ -0,0 +1,57 @@ +package io.kestra.core.models.tasks; + +import io.kestra.core.exceptions.InternalException; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.runners.FlowExecutorInterface; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.WorkerTaskExecution; +import io.kestra.core.runners.WorkerTaskResult; + +import java.util.List; +import java.util.Optional; + +/** + * Interface for tasks that generates subflow execution(s). Those tasks are handled in the Executor. + */ +public interface ExecutableTask { + /** + * Creates a list of WorkerTaskExecution for this task definition. + * Each WorkerTaskExecution will generate a subflow execution. + */ + List> createWorkerTaskExecutions(RunContext runContext, + FlowExecutorInterface flowExecutorInterface, + Flow currentFlow, Execution currentExecution, + TaskRun currentTaskRun) throws InternalException; + + /** + * Creates a WorkerTaskResult for a given WorkerTaskExecution + */ + Optional createWorkerTaskResult(RunContext runContext, + WorkerTaskExecution workerTaskExecution, + Flow flow, + Execution execution); + + /** + * Whether to wait for the execution(s) of the subflow before terminating this tasks + */ + boolean waitForExecution(); + + /** + * @return the subflow identifier, used by the flow topology and related dependency code. + */ + SubflowId subflowId(); + + record SubflowId(String namespace, String flowId, Optional revision) { + public String flowUid() { + // as the Flow task can only be used in the same tenant we can hardcode null here + return Flow.uid(null, this.namespace, this.flowId, this.revision); + } + + public String flowUidWithoutRevision() { + // as the Flow task can only be used in the same tenant we can hardcode null here + return Flow.uidWithoutRevision(null, this.namespace, this.flowId); + } + } +} diff --git a/core/src/main/java/io/kestra/core/models/tasks/FlowableTask.java b/core/src/main/java/io/kestra/core/models/tasks/FlowableTask.java index ec68372d30..e004978101 100644 --- a/core/src/main/java/io/kestra/core/models/tasks/FlowableTask.java +++ b/core/src/main/java/io/kestra/core/models/tasks/FlowableTask.java @@ -14,6 +14,9 @@ import java.util.List; import java.util.Optional; +/** + * Interface for tasks that orchestrate other tasks. Those tasks are handled by the Executor. + */ public interface FlowableTask { @Schema( title = "List of tasks to run if any tasks failed on this FlowableTask" @@ -21,14 +24,37 @@ public interface FlowableTask { @PluginProperty List getErrors(); + /** + * Create the topology representation of a flowable task. + *

+ * A flowable task always contains subtask to it returns a cluster that displays the subtasks. + */ GraphCluster tasksTree(Execution execution, TaskRun taskRun, List parentValues) throws IllegalVariableEvaluationException; + /** + * @return all child tasks including errors + */ List allChildTasks(); + /** + * Resolve child tasks of a flowable task. + *

+ * For a normal flowable, it should be the list of its tasks, for an iterative flowable (such as EachSequential, ForEachItem, ...), + * it should be the list of its tasks for all iterations. + */ List childTasks(RunContext runContext, TaskRun parentTaskRun) throws IllegalVariableEvaluationException; + /** + * Resolve next tasks to run for an execution. + *

+ * For a normal flowable, it should be the subsequent task, for a parallel flowable (such as Parallel, ForEachItem, ...), + * it should be a list of the next subsequent tasks of the size of the concurrency of the task. + */ List resolveNexts(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException; + /** + * Resolve the state of a flowable task. + */ default Optional resolveState(RunContext runContext, Execution execution, TaskRun parentTaskRun) throws IllegalVariableEvaluationException { return FlowableUtils.resolveState( execution, diff --git a/core/src/main/java/io/kestra/core/models/tasks/RunnableTask.java b/core/src/main/java/io/kestra/core/models/tasks/RunnableTask.java index e08a37a9c9..eb25d136d0 100644 --- a/core/src/main/java/io/kestra/core/models/tasks/RunnableTask.java +++ b/core/src/main/java/io/kestra/core/models/tasks/RunnableTask.java @@ -2,6 +2,12 @@ import io.kestra.core.runners.RunContext; +/** + * Interface for tasks that are run in the Worker. + */ public interface RunnableTask { + /** + * Thsis method is called inside the Worker to run (execute) the task. + */ T run(RunContext runContext) throws Exception; } diff --git a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java new file mode 100644 index 0000000000..2118a1e464 --- /dev/null +++ b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java @@ -0,0 +1,105 @@ +package io.kestra.core.runners; + +import com.google.common.collect.ImmutableMap; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.Label; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.ExecutionTrigger; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.executions.TaskRunAttempt; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.flows.FlowWithException; +import io.kestra.core.models.flows.State; +import io.kestra.core.models.tasks.ExecutableTask; +import io.kestra.core.models.tasks.Task; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public final class ExecutableUtils { + + private ExecutableUtils() { + // prevent initialization + } + + public static State.Type guessState(Execution execution, boolean transmitFailed) { + if (transmitFailed && + (execution.getState().isFailed() || execution.getState().isPaused() || execution.getState().getCurrent() == State.Type.KILLED || execution.getState().getCurrent() == State.Type.WARNING) + ) { + return execution.getState().getCurrent(); + } else { + return State.Type.SUCCESS; + } + } + + public static WorkerTaskResult workerTaskResult(TaskRun taskRun) { + return WorkerTaskResult.builder() + .taskRun(taskRun.withAttempts( + Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(taskRun.getState().getCurrent())).build()) + )) + .build(); + } + + public static WorkerTaskExecution workerTaskExecution( + RunContext runContext, + FlowExecutorInterface flowExecutorInterface, + Execution currentExecution, + Flow currentFlow, + T currentTask, + TaskRun currentTaskRun, + Map inputs, + List