Skip to content

Commit

Permalink
feat(core): allow flow to define and expose outputs (#2133)
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Jan 23, 2024
1 parent ab62ba9 commit 5591a55
Show file tree
Hide file tree
Showing 20 changed files with 670 additions and 118 deletions.
54 changes: 34 additions & 20 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,38 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.Label;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
import io.swagger.v3.oas.annotations.Hidden;
import lombok.Builder;
import lombok.Value;
import lombok.With;
import lombok.extern.slf4j.Slf4j;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ResolvedTask;
import io.kestra.core.runners.FlowableUtils;
import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.utils.MapUtils;

import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import java.time.Instant;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.CRC32;
import io.micronaut.core.annotation.Nullable;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;

@Value
@Builder(toBuilder = true)
Expand Down Expand Up @@ -65,6 +71,10 @@ public class Execution implements DeletedInterface, TenantInterface {
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Map<String, Object> inputs;

@With
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Map<String, Object> outputs;

@With
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
Expand Down Expand Up @@ -114,6 +124,7 @@ public Execution withState(State.Type state) {
this.flowRevision,
this.taskRunList,
this.inputs,
this.outputs,
this.labels,
this.variables,
this.state.withState(state),
Expand Down Expand Up @@ -145,6 +156,7 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
this.flowRevision,
newTaskRunList,
this.inputs,
this.outputs,
this.labels,
this.variables,
this.state,
Expand All @@ -164,6 +176,7 @@ public Execution childExecution(String childExecutionId, List<TaskRun> taskRunLi
this.flowRevision,
taskRunList,
this.inputs,
this.outputs,
this.labels,
this.variables,
state,
Expand Down Expand Up @@ -442,7 +455,7 @@ public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun paren
}

@JsonIgnore
public boolean hasTaskRunJoinable(TaskRun taskRun) {
public boolean hasTaskRunJoinable(TaskRun taskRun) {
if (this.taskRunList == null) {
return true;
}
Expand Down Expand Up @@ -532,17 +545,17 @@ public FailedExecutionWithLog failedExecutionFromExecutor(Exception e) {
})
.filter(Objects::nonNull)
.orElseGet(() -> new FailedExecutionWithLog(
this.state.getCurrent() != State.Type.FAILED ? this.withState(State.Type.FAILED) : this,
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(this))
)
);
this.state.getCurrent() != State.Type.FAILED ? this.withState(State.Type.FAILED) : this,
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(this))
)
);
}

/**
* Create a new attempt for failed worker execution
*
* @param taskRun the task run where we need to add an attempt
* @param e the exception raise
* @param e the exception raise
* @return new taskRun with added attempt
*/
private static FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun taskRun, Exception e) {
Expand All @@ -562,9 +575,9 @@ private static FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun
/**
* Add exception log to last attempts
*
* @param taskRun the task run where we need to add an attempt
* @param taskRun the task run where we need to add an attempt
* @param lastAttempt the lastAttempt found to add
* @param e the exception raise
* @param e the exception raise
* @return new taskRun with updated attempt with logs
*/
private static FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun, TaskRunAttempt lastAttempt, Exception e) {
Expand Down Expand Up @@ -599,6 +612,7 @@ public static class FailedExecutionWithLog {

/**
* Transform an exception to {@link ILoggingEvent}
*
* @param e the current execption
* @return the {@link ILoggingEvent} waited to generate {@link LogEntry}
*/
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.Label;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
Expand Down Expand Up @@ -98,7 +99,6 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
List<AbstractTrigger> triggers;


@Valid
List<TaskDefault> taskDefaults;

Expand All @@ -113,6 +113,13 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
Concurrency concurrency;

@Schema(
title = "Output values available and exposes to other flows.",
description = "Output values make information about the execution of your Flow available and expose for other Kestra flows to use. Output values are similar to return values in programming languages."
)
@PluginProperty(dynamic = true)
List<Output> outputs;

public Logger logger() {
return LoggerFactory.getLogger("flow." + this.id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public Flow toFlow() {
.description(this.description)
.labels(this.labels)
.inputs(this.inputs)
.outputs(this.outputs)
.variables(this.variables)
.tasks(this.tasks)
.errors(this.errors)
Expand Down Expand Up @@ -66,6 +67,7 @@ public static FlowWithSource of(Flow flow, String source) {
.description(flow.description)
.labels(flow.labels)
.inputs(flow.inputs)
.outputs(flow.outputs)
.variables(flow.variables)
.tasks(flow.tasks)
.errors(flow.errors)
Expand Down
36 changes: 36 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/Output.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.kestra.core.models.flows;

import io.micronaut.core.annotation.Introspected;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;

/**
* Definition of a flow's output.
*/
@SuperBuilder
@Getter
@NoArgsConstructor
@Introspected
public class Output {
/**
* The output's unique id.
*/
@NotNull
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
/**
* Short description of the output.
*/
String description;
/**
* The output value. Can be a dynamic expression.
*/
@NotNull
Object value;
}
79 changes: 67 additions & 12 deletions core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,28 @@
import io.kestra.core.models.tasks.Task;
import lombok.extern.slf4j.Slf4j;

import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.URI;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Vector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static io.kestra.core.utils.Rethrow.throwFunction;

@Slf4j
public final class ExecutableUtils {

private static final String TASK_VARIABLE_ITERATIONS = "iterations";
private static final String TASK_VARIABLE_NUMBER_OF_BATCHES = "numberOfBatches";
private static final String TASK_VARIABLE_URI = "uri";

private ExecutableUtils() {
// prevent initialization
}
Expand Down Expand Up @@ -109,45 +122,87 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
}

@SuppressWarnings("unchecked")
public static TaskRun manageIterations(TaskRun taskRun, Execution execution, boolean transmitFailed, boolean allowFailure) throws InternalException {
Integer numberOfBatches = (Integer) taskRun.getOutputs().get("numberOfBatches");
public static TaskRun manageIterations(RunContext runContext, TaskRun taskRun, Execution execution, boolean transmitFailed, boolean allowFailure) throws InternalException {
Integer numberOfBatches = (Integer) taskRun.getOutputs().get(TASK_VARIABLE_NUMBER_OF_BATCHES);
var previousTaskRun = execution.findTaskRunByTaskRunId(taskRun.getId());
if (previousTaskRun == null) {
throw new IllegalStateException("Should never happen");
}

State.Type currentState = taskRun.getState().getCurrent();
Optional<State.Type> previousState = taskRun.getState().getHistories().size() > 1 ? Optional.of(taskRun.getState().getHistories().get(taskRun.getState().getHistories().size() - 2).getState()) : Optional.empty();
Optional<State.Type> previousState = taskRun.getState().getHistories().size() > 1 ?
Optional.of(taskRun.getState().getHistories().get(taskRun.getState().getHistories().size() - 2).getState()) :
Optional.empty();

// search for the previous iterations, if not found, we init it with an empty map
Map<String, Integer> iterations = previousTaskRun.getOutputs() != null ? (Map<String, Integer>) previousTaskRun.getOutputs().get("iterations") : new HashMap<>();
Map<String, Integer> iterations = previousTaskRun.getOutputs() != null ?
(Map<String, Integer>) previousTaskRun.getOutputs().get(TASK_VARIABLE_ITERATIONS) :
new HashMap<>();

int currentStateIteration = iterations.getOrDefault(currentState.toString(), 0);
int currentStateIteration = iterations.getOrDefault(currentState.toString(), 0);
iterations.put(currentState.toString(), currentStateIteration + 1);
if (previousState.isPresent() && previousState.get() != currentState) {
int previousStateIterations = iterations.getOrDefault(previousState.get().toString(), numberOfBatches);
int previousStateIterations = iterations.getOrDefault(previousState.get().toString(), numberOfBatches);
iterations.put(previousState.get().toString(), previousStateIterations - 1);
}

// update the state to success if terminatedIterations == numberOfBatches
int terminatedIterations = iterations.getOrDefault(State.Type.SUCCESS.toString(), 0) +
int terminatedIterations = iterations.getOrDefault(State.Type.SUCCESS.toString(), 0) +
iterations.getOrDefault(State.Type.FAILED.toString(), 0) +
iterations.getOrDefault(State.Type.KILLED.toString(), 0) +
iterations.getOrDefault(State.Type.WARNING.toString(), 0) +
iterations.getOrDefault(State.Type.CANCELLED.toString(), 0);
if (terminatedIterations == numberOfBatches) {
var state = transmitFailed ? findTerminalState(iterations, allowFailure) : State.Type.SUCCESS;

if (terminatedIterations == numberOfBatches) {
State.Type state = transmitFailed ? findTerminalState(iterations, allowFailure) : State.Type.SUCCESS;
final Map<String, Object> outputs = new HashMap<>();
outputs.put(TASK_VARIABLE_ITERATIONS, iterations);
outputs.put(TASK_VARIABLE_NUMBER_OF_BATCHES, numberOfBatches);

try {
// Build URIs for each sub-flow outputs.
List<URI> outputsURIs = IntStream.rangeClosed(1, terminatedIterations)
.mapToObj(it -> "kestra://" + runContext.getStorageOutputPrefix().toString() + "/" + it + "/outputs.ion")
.map(throwFunction(URI::create))
.filter(runContext::isFileExist)
.toList();

if (!outputsURIs.isEmpty()) {
// Merge outputs from each sub-flow into a single stored in the internal storage.
Enumeration<InputStream> streams = outputsURIs.stream()
.map(throwFunction(runContext::uriToInputStream))
.collect(Collectors.toCollection(Vector::new))
.elements();
try (InputStream is = new SequenceInputStream(streams)) {
outputs.put(TASK_VARIABLE_URI, runContext.putFile(is, "outputs.ion"));
}
}
} catch (Exception e) {
runContext
.logger()
.error("Failed to collect and merge outputs from each sub-flow with error: {}",
e.getLocalizedMessage(),
e
);
if (transmitFailed) {
state = State.Type.FAILED;
}
}

return previousTaskRun
.withIteration(taskRun.getIteration())
.withOutputs(Map.of("iterations", iterations, "numberOfBatches", numberOfBatches))
.withOutputs(outputs)
.withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build()))
.withState(state);
}

// else we update the previous taskRun as it's the same taskRun that is still running
// else we update the previous taskRun as it's the same taskRun that is still running
return previousTaskRun
.withIteration(taskRun.getIteration())
.withOutputs(Map.of("iterations", iterations, "numberOfBatches", numberOfBatches));
.withOutputs(Map.of(
TASK_VARIABLE_ITERATIONS, iterations,
TASK_VARIABLE_NUMBER_OF_BATCHES, numberOfBatches
));
}

private static State.Type findTerminalState(Map<String, Integer> iterations, boolean allowFailure) {
Expand Down
Loading

0 comments on commit 5591a55

Please sign in to comment.