Skip to content

Commit

Permalink
fix(core): refacto
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Oct 25, 2023
1 parent 97f534a commit 1342549
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.utils.IdUtils;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -45,9 +44,6 @@ public class TaskRun {
@With
String value;

@With
URI items;

@With
List<TaskRunAttempt> attempts;

Expand All @@ -74,7 +70,6 @@ public TaskRun withState(State.Type state) {
this.taskId,
this.parentTaskRunId,
this.value,
this.items,
this.attempts,
this.outputs,
this.state.withState(state)
Expand All @@ -91,7 +86,6 @@ public TaskRun forChildExecution(Map<String, String> remapTaskRunId, String exec
.taskId(this.getTaskId())
.parentTaskRunId(this.getParentTaskRunId() != null ? remapTaskRunId.get(this.getParentTaskRunId()) : null)
.value(this.getValue())
.items(this.getItems())
.attempts(this.getAttempts())
.outputs(this.getOutputs())
.state(state == null ? this.getState() : state)
Expand Down
30 changes: 21 additions & 9 deletions core/src/main/java/io/kestra/core/models/tasks/ExecutableTask.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kestra.core.models.tasks;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
Expand All @@ -11,17 +10,30 @@
import io.kestra.core.runners.WorkerTaskResult;

import java.util.List;
import java.util.Optional;

public interface ExecutableTask<T extends Output> {
List<TaskRun> createTaskRun(RunContext runContext, Execution currentExecution, TaskRun executionTaskRun);

Execution createExecution(RunContext runContext, FlowExecutorInterface flowExecutorInterface, Execution currentExecution) throws InternalException;
/**
* Interface for tasks that generates subflow execution(s). Those tasks are handled in the Executor.
*/
public interface ExecutableTask {
/**
* Creates a list of WorkerTaskExecution for this task definition.
* Each WorkerTaskExecution will generate a subflow execution.
*/
List<WorkerTaskExecution<?>> createWorkerTaskExecutions(RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Flow currentFlow, Execution currentExecution,
TaskRun currentTaskRun) throws InternalException;

/**
* Creates a WorkerTaskResult for a given WorkerTaskExecution
*/
WorkerTaskResult createWorkerTaskResult(RunContext runContext,
WorkerTaskExecution<?> workerTaskExecution,
Flow flow,
Execution execution);
WorkerTaskExecution<?> workerTaskExecution,
Flow flow,
Execution execution);

/**
* Whether to wait for the execution(s) of the subflow before terminating this tasks
*/
boolean waitForExecution();
}
45 changes: 14 additions & 31 deletions core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Executor process(Executor executor) {
executor = this.handleChildWorkerTaskResult(executor);

// search for flow task
executor = this.handleFlowTask(executor);
executor = this.handleExecutableTask(executor);
} catch (Exception e) {
return executor.withException(e, "process");
}
Expand Down Expand Up @@ -561,17 +561,17 @@ private Executor handleWorkerTask(Executor executor) throws InternalException {
return executor.withWorkerTasks(workerTasks, "handleWorkerTask");
}

private Executor handleFlowTask(final Executor executor) {
private Executor handleExecutableTask(final Executor executor) {
List<WorkerTaskExecution<?>> executions = new ArrayList<>();
List<WorkerTaskResult> workerTaskResults = new ArrayList<>();

boolean haveFlows = executor.getWorkerTasks()
.removeIf(workerTask -> {
if (!(workerTask.getTask() instanceof ExecutableTask<?>)) {
if (!(workerTask.getTask() instanceof ExecutableTask)) {
return false;
}

var executableTask = (Task & ExecutableTask<?>) workerTask.getTask();
var executableTask = (Task & ExecutableTask) workerTask.getTask();
try {
// mark taskrun as running to avoid multiple try for failed
TaskRun executableTaskRun = executor.getExecution()
Expand All @@ -580,42 +580,25 @@ private Executor handleFlowTask(final Executor executor) {
executor
.getExecution()
.withTaskRun(executableTaskRun.withState(State.Type.RUNNING)),
"handleFlowTaskRunning"
"handleExecutableTaskRunning"
);

// create the task run
RunContext executableRunContext = runContextFactory.of(
RunContext runContext = runContextFactory.of(
executor.getFlow(),
executableTask,
executor.getExecution(),
executableTaskRun
);
List<TaskRun> taskRuns = executableTask.createTaskRun(executableRunContext, executor.getExecution(), executableTaskRun);

for (TaskRun taskRun : taskRuns) {
// create the execution
RunContext runContext = runContextFactory.of(
executor.getFlow(),
executableTask,
executor.getExecution(),
taskRun
);
Execution execution = executableTask.createExecution(runContext, flowExecutorInterface(), executor.getExecution());

WorkerTaskExecution<?> workerTaskExecution = WorkerTaskExecution.builder()
.task(executableTask)
.taskRun(taskRun)
.execution(execution)
.build();

executions.add(workerTaskExecution);
List<WorkerTaskExecution<?>> workerTaskExecutions = executableTask.createWorkerTaskExecutions(runContext, flowExecutorInterface(), executor.getFlow(), executor.getExecution(), executableTaskRun);
executions.addAll(workerTaskExecutions);

if (!executableTask.waitForExecution()) {
if (!executableTask.waitForExecution()) {
for (WorkerTaskExecution<?> workerTaskExecution : workerTaskExecutions) {
WorkerTaskResult workerTaskResult = executableTask.createWorkerTaskResult(
runContext,
workerTaskExecution,
executor.getFlow(),
execution
workerTaskExecution.getExecution()
);
workerTaskResults.add(workerTaskResult);
}
Expand All @@ -630,7 +613,7 @@ private Executor handleFlowTask(final Executor executor) {
)
.build()
);
executor.withException(e, "handleFlowTask");
executor.withException(e, "handleExecutableTask");
}
return true;
});
Expand All @@ -639,10 +622,10 @@ private Executor handleFlowTask(final Executor executor) {
return executor;
}

Executor resultExecutor = executor.withWorkerTaskExecutions(executions, "handleFlowTask");
Executor resultExecutor = executor.withWorkerTaskExecutions(executions, "handleExecutableTask");

if (!workerTaskResults.isEmpty()) {
resultExecutor = executor.withWorkerTaskResults(workerTaskResults, "handleFlowTaskWorkerTaskResults");
resultExecutor = executor.withWorkerTaskResults(workerTaskResults, "handleExecutableTaskWorkerTaskResults");
}

return resultExecutor;
Expand Down
6 changes: 1 addition & 5 deletions core/src/main/java/io/kestra/core/runners/RunContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,6 @@ private Map<String, Object> variables(TaskRun taskRun) {
builder.put("value", taskRun.getValue());
}

if(taskRun.getItems() != null) {
builder.put("items", taskRun.getItems());
}

return builder.build();
}

Expand Down Expand Up @@ -611,7 +607,7 @@ public InputStream getTaskStateFile(String state, String name, Boolean isNamespa
URI uri = URI.create(this.taskStateFilePathPrefix(state, isNamespace, useTaskRun));
URI resolve = uri.resolve(uri.getPath() + "/" + name);

return this.storageInterface.get(getTenantId(), resolve);
return this.storageInterface.get(getTenantId(), resolve);
}

public URI putTaskStateFile(byte[] content, String state, String name) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

@Data
@Builder
public class WorkerTaskExecution<T extends Task & ExecutableTask<?>> {
public class WorkerTaskExecution<T extends Task & ExecutableTask> {
@NotNull
private TaskRun taskRun;

Expand Down
38 changes: 21 additions & 17 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
)
}
)
public class Flow extends Task implements ExecutableTask<Flow.Output> {
public class Flow extends Task implements ExecutableTask {
@NotNull
@Schema(
title = "The namespace of the flow to trigger"
Expand Down Expand Up @@ -123,7 +123,11 @@ public String flowUidWithoutRevision() {

@SuppressWarnings("unchecked")
@Override
public Execution createExecution(RunContext runContext, FlowExecutorInterface flowExecutorInterface, Execution currentExecution) throws InternalException {
public List<WorkerTaskExecution<?>> createWorkerTaskExecutions(RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
io.kestra.core.models.flows.Flow currentFlow,
Execution currentExecution,
TaskRun currentTaskRun) throws InternalException {
RunnerUtils runnerUtils = runContext.getApplicationContext().getBean(RunnerUtils.class);

Map<String, Object> inputs = new HashMap<>();
Expand All @@ -141,20 +145,18 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
}
}

Map<String, String> flowVars = (Map<String, String>) runContext.getVariables().get("flow");

String namespace = runContext.render(this.namespace);
String flowId = runContext.render(this.flowId);
Optional<Integer> revision = this.revision != null ? Optional.of(this.revision) : Optional.empty();

io.kestra.core.models.flows.Flow flow = flowExecutorInterface.findByIdFromFlowTask(
flowVars.get("tenantId"),
currentExecution.getTenantId(),
namespace,
flowId,
revision,
flowVars.get("tenantId"),
flowVars.get("namespace"),
flowVars.get("id")
currentExecution.getTenantId(),
currentFlow.getNamespace(),
currentFlow.getId()
)
.orElseThrow(() -> new IllegalStateException("Unable to find flow '" + namespace + "'.'" + flowId + "' with revision + '" + revision + "'"));

Expand All @@ -166,7 +168,7 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
}

return runnerUtils
Execution execution = runnerUtils
.newExecution(
flow,
(f, e) -> runnerUtils.typedInputs(f, e, inputs),
Expand All @@ -175,18 +177,20 @@ public Execution createExecution(RunContext runContext, FlowExecutorInterface fl
.id(this.getId())
.type(this.getType())
.variables(ImmutableMap.of(
"executionId", ((Map<String, Object>) runContext.getVariables().get("execution")).get("id"),
"namespace", flowVars.get("namespace"),
"flowId", flowVars.get("id"),
"flowRevision", flowVars.get("revision")
"executionId", currentExecution.getId(),
"namespace", flow.getNamespace(),
"flowId", flow.getId(),
"flowRevision", flow.getRevision()
))
.build()
);
}
WorkerTaskExecution<?> workerTaskExecution = WorkerTaskExecution.builder()
.task(this)
.taskRun(currentTaskRun)
.execution(execution)
.build();

@Override
public List<TaskRun> createTaskRun(RunContext runContext, Execution currentExecution, TaskRun executionTaskRun) {
return List.of(executionTaskRun);
return List.of(workerTaskExecution);
}

@Override
Expand Down
Loading

0 comments on commit 1342549

Please sign in to comment.