Skip to content

Commit

Permalink
feat(ui); foreachItem progress bar
Browse files Browse the repository at this point in the history
  • Loading branch information
Skraye authored and loicmathieu committed Nov 10, 2023
1 parent 85e7fd5 commit f50e861
Show file tree
Hide file tree
Showing 24 changed files with 705 additions and 399 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,9 @@ public TaskRun onRunningResend() {
}

public boolean isSame(TaskRun taskRun) {
return this.getId().equals(taskRun.getId()) && (
(this.getValue() == null && taskRun.getValue() == null) ||
(this.getValue() != null && this.getValue().equals(taskRun.getValue()))
);
return this.getId().equals(taskRun.getId()) &&
((this.getValue() == null && taskRun.getValue() == null) || (this.getValue() != null && this.getValue().equals(taskRun.getValue()))) &&
((this.getItems() == null && taskRun.getItems() == null) || (this.getItems() != null && this.getItems().equals(taskRun.getItems()))) ;
}

public String toString(boolean pretty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

@Getter
public class SubflowGraphTask extends AbstractGraphTask {
public SubflowGraphTask(ExecutableTask task, TaskRun taskRun, List<String> values, RelationType relationType) {
public SubflowGraphTask(ExecutableTask<?> task, TaskRun taskRun, List<String> values, RelationType relationType) {
super((Task) task, taskRun, values, relationType);
}

public ExecutableTask getExecutableTask() {
return (ExecutableTask) super.getTask();
public ExecutableTask<?> getExecutableTask() {
return (ExecutableTask<?>) super.getTask();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
/**
* Interface for tasks that generates subflow execution(s). Those tasks are handled in the Executor.
*/
public interface ExecutableTask {
public interface ExecutableTask<T extends Output>{
/**
* Creates a list of WorkerTaskExecution for this task definition.
* Each WorkerTaskExecution will generate a subflow execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ public static WorkerTaskResult workerTaskResult(TaskRun taskRun) {
.build();
}

public static <T extends Task & ExecutableTask> WorkerTaskExecution<?> workerTaskExecution(
public static <T extends Task & ExecutableTask<?>> WorkerTaskExecution<?> workerTaskExecution(
RunContext runContext,
FlowExecutorInterface flowExecutorInterface,
Execution currentExecution,
Flow currentFlow,
T currentTask,
TaskRun currentTaskRun,
Map<String, Object> inputs,
List<Label> labels
List<Label> labels,
Integer iteration
) throws IllegalVariableEvaluationException {
String subflowNamespace = runContext.render(currentTask.subflowId().namespace());
String subflowId = runContext.render(currentTask.subflowId().flowId());
Expand Down Expand Up @@ -100,6 +101,7 @@ public static <T extends Task & ExecutableTask> WorkerTaskExecution<?> workerTas
.task(currentTask)
.taskRun(currentTaskRun)
.execution(execution)
.iteration(iteration)
.build();
}

Expand All @@ -117,7 +119,7 @@ public static TaskRun manageIterations(TaskRun taskRun, Execution execution, boo

int currentStateIteration = getIterationCounter(iterations, currentState, maxIterations) + 1;
iterations.put(currentState.toString(), currentStateIteration);
if(previousState.isPresent() && previousState.get() != currentState) {
if (previousState.isPresent() && previousState.get() != currentState) {
int previousStateIterations = getIterationCounter(iterations, previousState.get(), maxIterations) - 1;
iterations.put(previousState.get().toString(), previousStateIterations);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/core/runners/Executor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class Executor {
private final List<WorkerTaskResult> workerTaskResults = new ArrayList<>();
private final List<ExecutionDelay> executionDelays = new ArrayList<>();
private WorkerTaskResult joined;
private final List<WorkerTaskExecution> workerTaskExecutions = new ArrayList<>();
private final List<WorkerTaskExecution<?>> workerTaskExecutions = new ArrayList<>();
private ExecutionsRunning executionsRunning;
private ExecutionQueued executionQueued;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ private Executor handleExecutableTask(final Executor executor) {
return false;
}

var executableTask = (Task & ExecutableTask) workerTask.getTask();
var executableTask = (Task & ExecutableTask<?>) workerTask.getTask();
try {
// mark taskrun as running to avoid multiple try for failed
TaskRun executableTaskRun = executor.getExecution()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

@Data
@Builder
public class WorkerTaskExecution<T extends Task & ExecutableTask> {
public class WorkerTaskExecution<T extends Task & ExecutableTask<?>> {
@NotNull
private TaskRun taskRun;

Expand All @@ -20,4 +20,6 @@ public class WorkerTaskExecution<T extends Task & ExecutableTask> {

@NotNull
private Execution execution;

private Integer iteration;
}
5 changes: 3 additions & 2 deletions core/src/main/java/io/kestra/core/tasks/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
)
}
)
public class Flow extends Task implements ExecutableTask {
public class Flow extends Task implements ExecutableTask<Flow.Output> {
@NotNull
@Schema(
title = "The namespace of the subflow to be executed"
Expand Down Expand Up @@ -137,7 +137,8 @@ public List<WorkerTaskExecution<?>> createWorkerTaskExecutions(RunContext runCon
this,
currentTaskRun,
inputs,
labels
labels,
null
));
}

Expand Down
22 changes: 15 additions & 7 deletions core/src/main/java/io/kestra/core/tasks/flows/ForEachItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.ExecutableUtils;
Expand Down Expand Up @@ -70,7 +69,7 @@
)
}
)
public class ForEachItem extends Task implements ExecutableTask {
public class ForEachItem extends Task implements ExecutableTask<ForEachItem.Output> {
@NotEmpty
@PluginProperty(dynamic = true)
@Schema(title = "The items to be split into batches and processed. Make sure to set it to Kestra's internal storage URI, e.g. output from a previous task in the format `{{ outputs.task_id.uri }}` or an input parameter of FILE type e.g. `{{ inputs.myfile }}`.")
Expand Down Expand Up @@ -171,20 +170,19 @@ public List<WorkerTaskExecution<?>> createWorkerTaskExecutions(
}

int iteration = currentIteration.getAndIncrement();
var outputs = Output.builder().iterations(Map.of("max", splits.size())).build();
return ExecutableUtils.workerTaskExecution(
runContext,
flowExecutorInterface,
currentExecution,
currentFlow,
this,
currentTaskRun
.withValue(String.valueOf(iteration))
.withOutputs(Map.of(
"iterations", Map.of("max", splits.size())
))
.withOutputs(outputs.toMap())
.withItems(split.toString()),
inputs,
labels
labels,
iteration
);
}
))
Expand Down Expand Up @@ -231,4 +229,14 @@ public static class Batch implements StorageSplitInterface {
@Builder.Default
private String separator = "\n";
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The iterations counter.",
description = "This output will be updated in real-time with the subflow executions.\n It will contains one counter by subflow execution state, plus a `max` counter that represent the maximum number of iterations (or the number of batches)."
)
private final Map<String, Integer> iterations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected boolean isFlowTaskChild(Flow parent, Flow child) {
.allTasksWithChilds()
.stream()
.filter(t -> t instanceof ExecutableTask)
.map(t -> (ExecutableTask) t)
.map(t -> (ExecutableTask<?>) t)
.anyMatch(t ->
t.subflowId().namespace().equals(child.getNamespace()) && t.subflowId().flowId().equals(child.getId())
);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/kestra/core/utils/GraphUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private static void fillGraph(
// detect kids
if (currentTask instanceof FlowableTask<?> flowableTask) {
currentGraph = flowableTask.tasksTree(execution, currentTaskRun, parentValues);
} else if (currentTask instanceof ExecutableTask subflowTask) {
} else if (currentTask instanceof ExecutableTask<?> subflowTask) {
currentGraph = new SubflowGraphTask(subflowTask, currentTaskRun, parentValues, relationType);
} else {
currentGraph = new GraphTask(currentTask, currentTaskRun, parentValues, relationType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ ConstraintValidator<FlowValidation, Flow> flowValidation() {
.stream()
.forEach(
task -> {
if (task instanceof ExecutableTask executableTask
if (task instanceof ExecutableTask<?> executableTask
&& value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace())) {
violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
import java.util.Optional;

public abstract class AbstractJdbcWorkerTaskExecutionStorage extends AbstractJdbcRepository {
protected io.kestra.jdbc.AbstractJdbcRepository<WorkerTaskExecution> jdbcRepository;
protected io.kestra.jdbc.AbstractJdbcRepository<WorkerTaskExecution<?>> jdbcRepository;

public AbstractJdbcWorkerTaskExecutionStorage(io.kestra.jdbc.AbstractJdbcRepository<WorkerTaskExecution> jdbcRepository) {
public AbstractJdbcWorkerTaskExecutionStorage(io.kestra.jdbc.AbstractJdbcRepository jdbcRepository) {
this.jdbcRepository = jdbcRepository;
}

public Optional<WorkerTaskExecution> get(String executionId) {
public Optional<WorkerTaskExecution<?>> get(String executionId) {
return this.jdbcRepository
.getDslContextWrapper()
.transactionResult(configuration -> {
Expand All @@ -35,20 +35,21 @@ public Optional<WorkerTaskExecution> get(String executionId) {
});
}

public void save(List<WorkerTaskExecution> workerTaskExecutions) {
public void save(List<WorkerTaskExecution<?>> workerTaskExecutions) {
this.jdbcRepository
.getDslContextWrapper()
.transaction(configuration -> {
DSLContext context = DSL.using(configuration);

// TODO batch insert
workerTaskExecutions.forEach(workerTaskExecution -> {
Map<Field<Object>, Object> fields = this.jdbcRepository.persistFields(workerTaskExecution);
this.jdbcRepository.persist(workerTaskExecution, context, fields);
});
});
}

public void delete(WorkerTaskExecution workerTaskExecution) {
public void delete(WorkerTaskExecution<?> workerTaskExecution) {
this.jdbcRepository.delete(workerTaskExecution);
}
}
16 changes: 8 additions & 8 deletions jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,10 @@ private void executionQueue(Either<Execution, DeserializationException> either)
if (!executor.getWorkerTaskExecutions().isEmpty()) {
workerTaskExecutionStorage.save(executor.getWorkerTaskExecutions());

List<WorkerTaskExecution> workerTasksExecutionDedup = executor
List<WorkerTaskExecution<?>> workerTasksExecutionDedup = executor
.getWorkerTaskExecutions()
.stream()
.filter(workerTaskExecution -> this.deduplicateWorkerTaskExecution(execution, executorState, workerTaskExecution.getTaskRun()))
.filter(workerTaskExecution -> this.deduplicateWorkerTaskExecution(execution, executorState, workerTaskExecution.getTaskRun(), workerTaskExecution.getIteration()))
.toList();

workerTasksExecutionDedup
Expand All @@ -405,7 +405,7 @@ private void executionQueue(Either<Execution, DeserializationException> either)
executionQueue.emit(workerTaskExecution.getExecution());

// send a running worker task result to track running vs created status
if (((ExecutableTask) workerTaskExecution.getTask()).waitForExecution()) {
if (workerTaskExecution.getTask().waitForExecution()) {
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING));
}
});
Expand All @@ -425,7 +425,7 @@ private void executionQueue(Either<Execution, DeserializationException> either)
workerTaskExecutionStorage.get(execution.getId())
.ifPresent(workerTaskExecution -> {
// If we didn't wait for the flow execution, the worker task execution has already been created by the Executor service.
if (((ExecutableTask) workerTaskExecution.getTask()).waitForExecution()) {
if (workerTaskExecution.getTask().waitForExecution()) {
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING).withState(execution.getState().getCurrent()));
}

Expand Down Expand Up @@ -456,7 +456,7 @@ private void executionQueue(Either<Execution, DeserializationException> either)
private void sendWorkerTaskResultForWorkerTaskExecution(Execution execution, WorkerTaskExecution<?> workerTaskExecution, TaskRun taskRun) {
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);

ExecutableTask executableTask = workerTaskExecution.getTask();
ExecutableTask<?> executableTask = workerTaskExecution.getTask();

RunContext runContext = runContextFactory.of(
workerTaskFlow,
Expand Down Expand Up @@ -693,9 +693,9 @@ private boolean deduplicateWorkerTask(Execution execution, ExecutorState executo
}
}

private boolean deduplicateWorkerTaskExecution(Execution execution, ExecutorState executorState, TaskRun taskRun) {
// There can be multiple executions for the same task, so we need to deduplicated with the taskrun.value
String deduplicationKey = taskRun.getId() + "-" + taskRun.getValue();
private boolean deduplicateWorkerTaskExecution(Execution execution, ExecutorState executorState, TaskRun taskRun, Integer iteration) {
// There can be multiple executions for the same task, so we need to deduplicated with the worker task execution iteration
String deduplicationKey = taskRun.getId() + (iteration == null ? "" : "-" + iteration);
State.Type current = executorState.getWorkerTaskExecutionDeduplication().get(deduplicationKey);

if (current == taskRun.getState().getCurrent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class AbstractWorkerTaskExecutionTest {
@Test
void suite() throws Exception {

WorkerTaskExecution workerTaskExecution = WorkerTaskExecution.builder()
WorkerTaskExecution<?> workerTaskExecution = WorkerTaskExecution.builder()
.execution(Execution.builder().id(IdUtils.create()).build())
.task(Flow.builder().type(Flow.class.getName()).id(IdUtils.create()).build())
.taskRun(TaskRun.builder().id(IdUtils.create()).build())
Expand All @@ -37,7 +37,7 @@ void suite() throws Exception {
workerTaskExecutionStorage.save(List.of(workerTaskExecution));


Optional<WorkerTaskExecution> find = workerTaskExecutionStorage.get(workerTaskExecution.getExecution().getId());
Optional<WorkerTaskExecution<?>> find = workerTaskExecutionStorage.get(workerTaskExecution.getExecution().getId());
assertThat(find.isPresent(), is(true));
assertThat(find.get().getExecution().getId(), is(workerTaskExecution.getExecution().getId()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ private void handleExecution(ExecutionState state) {
executionQueue.emit(workerTaskExecution.getExecution());

// send a running worker task result to track running vs created status
if (((ExecutableTask) workerTaskExecution.getTask()).waitForExecution()) {
if (workerTaskExecution.getTask().waitForExecution()) {
sendWorkerTaskResultForWorkerTaskExecution(execution, workerTaskExecution, workerTaskExecution.getTaskRun().withState(State.Type.RUNNING));
}
});
Expand Down Expand Up @@ -271,7 +271,7 @@ private void sendWorkerTaskResultForWorkerTaskExecution(Execution execution, Wor
try {
Flow workerTaskFlow = this.flowRepository.findByExecution(execution);

ExecutableTask executableTask = workerTaskExecution.getTask();
ExecutableTask<?> executableTask = workerTaskExecution.getTask();

RunContext runContext = runContextFactory.of(
workerTaskFlow,
Expand Down
Loading

0 comments on commit f50e861

Please sign in to comment.