diff --git a/cli/src/main/resources/application.yml b/cli/src/main/resources/application.yml index 5a4bdc416f4..d61cb49fbfd 100644 --- a/cli/src/main/resources/application.yml +++ b/cli/src/main/resources/application.yml @@ -204,4 +204,10 @@ kestra: heartbeat: frequency: 10s - heartbeat-missed: 3 \ No newline at end of file + heartbeat-missed: 3 + + tasks: + # Global configuration for task 'Subflow' + subflow: + # Enable/Disable the task parameter's outputs. + allow-parameter-outputs: false \ No newline at end of file diff --git a/core/src/main/java/io/kestra/core/models/executions/Execution.java b/core/src/main/java/io/kestra/core/models/executions/Execution.java index 5155ff095ce..448b25ba198 100644 --- a/core/src/main/java/io/kestra/core/models/executions/Execution.java +++ b/core/src/main/java/io/kestra/core/models/executions/Execution.java @@ -9,32 +9,38 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; +import io.kestra.core.exceptions.InternalException; +import io.kestra.core.models.DeletedInterface; import io.kestra.core.models.Label; import io.kestra.core.models.TenantInterface; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.flows.State; +import io.kestra.core.models.tasks.ResolvedTask; +import io.kestra.core.runners.FlowableUtils; +import io.kestra.core.runners.RunContextLogger; import io.kestra.core.serializers.ListOrMapOfLabelDeserializer; import io.kestra.core.serializers.ListOrMapOfLabelSerializer; +import io.kestra.core.utils.MapUtils; +import io.micronaut.core.annotation.Nullable; import io.swagger.v3.oas.annotations.Hidden; import lombok.Builder; import lombok.Value; import lombok.With; import lombok.extern.slf4j.Slf4j; -import io.kestra.core.exceptions.InternalException; -import io.kestra.core.models.DeletedInterface; -import io.kestra.core.models.flows.Flow; -import io.kestra.core.models.flows.State; -import io.kestra.core.models.tasks.ResolvedTask; -import io.kestra.core.runners.FlowableUtils; -import io.kestra.core.runners.RunContextLogger; -import io.kestra.core.utils.MapUtils; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Pattern; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.zip.CRC32; -import io.micronaut.core.annotation.Nullable; -import javax.validation.constraints.NotNull; -import javax.validation.constraints.Pattern; @Value @Builder(toBuilder = true) @@ -65,6 +71,10 @@ public class Execution implements DeletedInterface, TenantInterface { @JsonInclude(JsonInclude.Include.NON_EMPTY) Map inputs; + @With + @JsonInclude(JsonInclude.Include.NON_EMPTY) + Map outputs; + @With @JsonSerialize(using = ListOrMapOfLabelSerializer.class) @JsonDeserialize(using = ListOrMapOfLabelDeserializer.class) @@ -114,6 +124,7 @@ public Execution withState(State.Type state) { this.flowRevision, this.taskRunList, this.inputs, + this.outputs, this.labels, this.variables, this.state.withState(state), @@ -145,6 +156,7 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException { this.flowRevision, newTaskRunList, this.inputs, + this.outputs, this.labels, this.variables, this.state, @@ -164,6 +176,7 @@ public Execution childExecution(String childExecutionId, List taskRunLi this.flowRevision, taskRunList, this.inputs, + this.outputs, this.labels, this.variables, state, @@ -442,7 +455,7 @@ public State.Type guessFinalState(List currentTasks, TaskRun paren } @JsonIgnore - public boolean hasTaskRunJoinable(TaskRun taskRun) { + public boolean hasTaskRunJoinable(TaskRun taskRun) { if (this.taskRunList == null) { return true; } @@ -532,17 +545,17 @@ public FailedExecutionWithLog failedExecutionFromExecutor(Exception e) { }) .filter(Objects::nonNull) .orElseGet(() -> new FailedExecutionWithLog( - this.state.getCurrent() != State.Type.FAILED ? this.withState(State.Type.FAILED) : this, - RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(this)) - ) - ); + this.state.getCurrent() != State.Type.FAILED ? this.withState(State.Type.FAILED) : this, + RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(this)) + ) + ); } /** * Create a new attempt for failed worker execution * * @param taskRun the task run where we need to add an attempt - * @param e the exception raise + * @param e the exception raise * @return new taskRun with added attempt */ private static FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun taskRun, Exception e) { @@ -562,9 +575,9 @@ private static FailedTaskRunWithLog newAttemptsTaskRunForFailedExecution(TaskRun /** * Add exception log to last attempts * - * @param taskRun the task run where we need to add an attempt + * @param taskRun the task run where we need to add an attempt * @param lastAttempt the lastAttempt found to add - * @param e the exception raise + * @param e the exception raise * @return new taskRun with updated attempt with logs */ private static FailedTaskRunWithLog lastAttemptsTaskRunForFailedExecution(TaskRun taskRun, TaskRunAttempt lastAttempt, Exception e) { @@ -599,6 +612,7 @@ public static class FailedExecutionWithLog { /** * Transform an exception to {@link ILoggingEvent} + * * @param e the current execption * @return the {@link ILoggingEvent} waited to generate {@link LogEntry} */ diff --git a/core/src/main/java/io/kestra/core/models/flows/Flow.java b/core/src/main/java/io/kestra/core/models/flows/Flow.java index 739db003f6d..8929efa9ca2 100644 --- a/core/src/main/java/io/kestra/core/models/flows/Flow.java +++ b/core/src/main/java/io/kestra/core/models/flows/Flow.java @@ -11,6 +11,7 @@ import io.kestra.core.models.DeletedInterface; import io.kestra.core.models.Label; import io.kestra.core.models.TenantInterface; +import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.listeners.Listener; import io.kestra.core.models.tasks.FlowableTask; @@ -98,7 +99,6 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) { @Valid List triggers; - @Valid List taskDefaults; @@ -113,6 +113,13 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) { @Valid Concurrency concurrency; + @Schema( + title = "Output values available and exposes to other flows.", + description = "Output values make information about the execution of your Flow available and expose for other Kestra flows to use. Output values are similar to return values in programming languages." + ) + @PluginProperty(dynamic = true) + List outputs; + public Logger logger() { return LoggerFactory.getLogger("flow." + this.id); } diff --git a/core/src/main/java/io/kestra/core/models/flows/FlowWithSource.java b/core/src/main/java/io/kestra/core/models/flows/FlowWithSource.java index 95226d61480..90f98c44fa1 100644 --- a/core/src/main/java/io/kestra/core/models/flows/FlowWithSource.java +++ b/core/src/main/java/io/kestra/core/models/flows/FlowWithSource.java @@ -24,6 +24,7 @@ public Flow toFlow() { .description(this.description) .labels(this.labels) .inputs(this.inputs) + .outputs(this.outputs) .variables(this.variables) .tasks(this.tasks) .errors(this.errors) @@ -66,6 +67,7 @@ public static FlowWithSource of(Flow flow, String source) { .description(flow.description) .labels(flow.labels) .inputs(flow.inputs) + .outputs(flow.outputs) .variables(flow.variables) .tasks(flow.tasks) .errors(flow.errors) diff --git a/core/src/main/java/io/kestra/core/models/flows/Output.java b/core/src/main/java/io/kestra/core/models/flows/Output.java new file mode 100644 index 00000000000..e05a50448fb --- /dev/null +++ b/core/src/main/java/io/kestra/core/models/flows/Output.java @@ -0,0 +1,36 @@ +package io.kestra.core.models.flows; + +import io.micronaut.core.annotation.Introspected; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.experimental.SuperBuilder; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Pattern; + +/** + * Definition of a flow's output. + */ +@SuperBuilder +@Getter +@NoArgsConstructor +@Introspected +public class Output { + /** + * The output's unique id. + */ + @NotNull + @NotBlank + @Pattern(regexp = "^[a-zA-Z0-9][.a-zA-Z0-9_-]*") + String id; + /** + * Short description of the output. + */ + String description; + /** + * The output value. Can be a dynamic expression. + */ + @NotNull + Object value; +} diff --git a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java index d150334b2f0..28b8369f977 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutableUtils.java @@ -15,15 +15,28 @@ import io.kestra.core.models.tasks.Task; import lombok.extern.slf4j.Slf4j; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.net.URI; import java.util.Collections; +import java.util.Enumeration; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Vector; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static io.kestra.core.utils.Rethrow.throwFunction; @Slf4j public final class ExecutableUtils { + private static final String TASK_VARIABLE_ITERATIONS = "iterations"; + private static final String TASK_VARIABLE_NUMBER_OF_BATCHES = "numberOfBatches"; + private static final String TASK_VARIABLE_URI = "uri"; + private ExecutableUtils() { // prevent initialization } @@ -109,45 +122,87 @@ public static > SubflowExecution subflowEx } @SuppressWarnings("unchecked") - public static TaskRun manageIterations(TaskRun taskRun, Execution execution, boolean transmitFailed, boolean allowFailure) throws InternalException { - Integer numberOfBatches = (Integer) taskRun.getOutputs().get("numberOfBatches"); + public static TaskRun manageIterations(RunContext runContext, TaskRun taskRun, Execution execution, boolean transmitFailed, boolean allowFailure) throws InternalException { + Integer numberOfBatches = (Integer) taskRun.getOutputs().get(TASK_VARIABLE_NUMBER_OF_BATCHES); var previousTaskRun = execution.findTaskRunByTaskRunId(taskRun.getId()); if (previousTaskRun == null) { throw new IllegalStateException("Should never happen"); } State.Type currentState = taskRun.getState().getCurrent(); - Optional previousState = taskRun.getState().getHistories().size() > 1 ? Optional.of(taskRun.getState().getHistories().get(taskRun.getState().getHistories().size() - 2).getState()) : Optional.empty(); + Optional previousState = taskRun.getState().getHistories().size() > 1 ? + Optional.of(taskRun.getState().getHistories().get(taskRun.getState().getHistories().size() - 2).getState()) : + Optional.empty(); // search for the previous iterations, if not found, we init it with an empty map - Map iterations = previousTaskRun.getOutputs() != null ? (Map) previousTaskRun.getOutputs().get("iterations") : new HashMap<>(); + Map iterations = previousTaskRun.getOutputs() != null ? + (Map) previousTaskRun.getOutputs().get(TASK_VARIABLE_ITERATIONS) : + new HashMap<>(); - int currentStateIteration = iterations.getOrDefault(currentState.toString(), 0); + int currentStateIteration = iterations.getOrDefault(currentState.toString(), 0); iterations.put(currentState.toString(), currentStateIteration + 1); if (previousState.isPresent() && previousState.get() != currentState) { - int previousStateIterations = iterations.getOrDefault(previousState.get().toString(), numberOfBatches); + int previousStateIterations = iterations.getOrDefault(previousState.get().toString(), numberOfBatches); iterations.put(previousState.get().toString(), previousStateIterations - 1); } // update the state to success if terminatedIterations == numberOfBatches - int terminatedIterations = iterations.getOrDefault(State.Type.SUCCESS.toString(), 0) + + int terminatedIterations = iterations.getOrDefault(State.Type.SUCCESS.toString(), 0) + iterations.getOrDefault(State.Type.FAILED.toString(), 0) + iterations.getOrDefault(State.Type.KILLED.toString(), 0) + iterations.getOrDefault(State.Type.WARNING.toString(), 0) + iterations.getOrDefault(State.Type.CANCELLED.toString(), 0); - if (terminatedIterations == numberOfBatches) { - var state = transmitFailed ? findTerminalState(iterations, allowFailure) : State.Type.SUCCESS; + + if (terminatedIterations == numberOfBatches) { + State.Type state = transmitFailed ? findTerminalState(iterations, allowFailure) : State.Type.SUCCESS; + final Map outputs = new HashMap<>(); + outputs.put(TASK_VARIABLE_ITERATIONS, iterations); + outputs.put(TASK_VARIABLE_NUMBER_OF_BATCHES, numberOfBatches); + + try { + // Build URIs for each sub-flow outputs. + List outputsURIs = IntStream.rangeClosed(1, terminatedIterations) + .mapToObj(it -> "kestra://" + runContext.getStorageOutputPrefix().toString() + "/" + it + "/outputs.ion") + .map(throwFunction(URI::create)) + .filter(runContext::isFileExist) + .toList(); + + if (!outputsURIs.isEmpty()) { + // Merge outputs from each sub-flow into a single stored in the internal storage. + Enumeration streams = outputsURIs.stream() + .map(throwFunction(runContext::uriToInputStream)) + .collect(Collectors.toCollection(Vector::new)) + .elements(); + try (InputStream is = new SequenceInputStream(streams)) { + outputs.put(TASK_VARIABLE_URI, runContext.putFile(is, "outputs.ion")); + } + } + } catch (Exception e) { + runContext + .logger() + .error("Failed to collect and merge outputs from each sub-flow with error: {}", + e.getLocalizedMessage(), + e + ); + if (transmitFailed) { + state = State.Type.FAILED; + } + } + return previousTaskRun .withIteration(taskRun.getIteration()) - .withOutputs(Map.of("iterations", iterations, "numberOfBatches", numberOfBatches)) + .withOutputs(outputs) .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build())) .withState(state); } - // else we update the previous taskRun as it's the same taskRun that is still running + // else we update the previous taskRun as it's the same taskRun that is still running return previousTaskRun .withIteration(taskRun.getIteration()) - .withOutputs(Map.of("iterations", iterations, "numberOfBatches", numberOfBatches)); + .withOutputs(Map.of( + TASK_VARIABLE_ITERATIONS, iterations, + TASK_VARIABLE_NUMBER_OF_BATCHES, numberOfBatches + )); } private static State.Type findTerminalState(Map iterations, boolean allowFailure) { diff --git a/core/src/main/java/io/kestra/core/runners/ExecutorService.java b/core/src/main/java/io/kestra/core/runners/ExecutorService.java index 4a66d838f2e..37c64e99ecb 100644 --- a/core/src/main/java/io/kestra/core/runners/ExecutorService.java +++ b/core/src/main/java/io/kestra/core/runners/ExecutorService.java @@ -1,6 +1,7 @@ package io.kestra.core.runners; import com.google.common.collect.ImmutableMap; +import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.exceptions.InternalException; import io.kestra.core.metrics.MetricRegistry; import io.kestra.core.models.executions.Execution; @@ -10,7 +11,12 @@ import io.kestra.core.models.executions.TaskRunAttempt; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; -import io.kestra.core.models.tasks.*; +import io.kestra.core.models.tasks.ExecutableTask; +import io.kestra.core.models.tasks.ExecutionUpdatableTask; +import io.kestra.core.models.tasks.FlowableTask; +import io.kestra.core.models.tasks.Output; +import io.kestra.core.models.tasks.ResolvedTask; +import io.kestra.core.models.tasks.Task; import io.kestra.core.services.ConditionService; import io.kestra.core.tasks.flows.Pause; import io.kestra.core.tasks.flows.WorkingDirectory; @@ -24,6 +30,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -83,8 +90,10 @@ public Executor checkConcurrencyLimit(Executor executor, Flow flow, Execution ex .withExecutionQueued(executionQueued) .withExecution(newExecution, "checkConcurrencyLimit"); } - case CANCEL -> executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit"); - case FAIL -> executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit"); + case CANCEL -> + executor.withExecution(execution.withState(State.Type.CANCELLED), "checkConcurrencyLimit"); + case FAIL -> + executor.withException(new IllegalStateException("Flow is FAILED due to concurrency limit exceeded"), "checkConcurrencyLimit"); }; } @@ -322,10 +331,37 @@ private List saveFlowableOutput( } private Executor onEnd(Executor executor) { + final Flow flow = executor.getFlow(); + + Logger logger = flow.logger(); + Execution newExecution = executor.getExecution() - .withState(executor.getExecution().guessFinalState(executor.getFlow())); + .withState(executor.getExecution().guessFinalState(flow)); - Logger logger = executor.getFlow().logger(); + if (flow.getOutputs() != null) { + try { + Map outputs = flow.getOutputs() + .stream() + .collect(Collectors.toMap( + io.kestra.core.models.flows.Output::getId, + io.kestra.core.models.flows.Output::getValue) + ); + + RunContext runContext = runContextFactory.of(executor.getFlow(), executor.getExecution()); + outputs = runContext.render(outputs); + newExecution = newExecution + .withOutputs(outputs); + } catch (IllegalVariableEvaluationException e) { + logger.error("[namespace: {}] [flow: {}] [execution: {}] Failed to render output values", + executor.getExecution().getNamespace(), + executor.getExecution().getFlowId(), + executor.getExecution().getId(), + e + ); + newExecution = newExecution + .withState(State.Type.FAILED); + } + } logger.info( "[namespace: {}] [flow: {}] [execution: {}] Flow completed with state {} in {}", @@ -470,8 +506,8 @@ private Executor handleCreatedKilling(Executor executor) { .stream() .filter(taskRun -> taskRun.getState().getCurrent().isCreated()) .map(t -> childWorkerTaskTypeToWorkerTask( - Optional.of(State.Type.KILLED), - t + Optional.of(State.Type.KILLED), + t )) .filter(Optional::isPresent) .map(Optional::get) @@ -617,8 +653,7 @@ private Executor handleExecutableTask(final Executor executor) { .withTaskRun(executableTaskRun.withState(State.Type.SUCCESS)), "handleExecutableTaskRunning.noExecution" ); - } - else { + } else { executions.addAll(subflowExecutions); if (!executableTask.waitForExecution()) { // send immediately all workerTaskResult to ends the executable task @@ -633,8 +668,7 @@ private Executor handleExecutableTask(final Executor executor) { } } } - } - catch (Exception e) { + } catch (Exception e) { WorkerTaskResult failed = WorkerTaskResult.builder() .taskRun(workerTask.getTaskRun().withState(State.Type.FAILED) .withAttempts(Collections.singletonList( @@ -683,9 +717,9 @@ private Executor handleExecutionUpdatingTask(final Executor executor) { workerTaskResults.add( WorkerTaskResult.builder() .taskRun(workerTask.getTaskRun().withAttempts( - Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(State.Type.SUCCESS)).build()) - ) - .withState(State.Type.SUCCESS) + Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(State.Type.SUCCESS)).build()) + ) + .withState(State.Type.SUCCESS) ) .build() ); @@ -757,8 +791,7 @@ public void log(Logger log, Boolean in, WorkerJob value) { workerTask.getClass().getSimpleName(), workerTask.getTaskRun().toStringState() ); - } - else if (value instanceof WorkerTrigger workerTrigger){ + } else if (value instanceof WorkerTrigger workerTrigger) { log.debug( "{} {} : {}", in ? "<< IN " : ">> OUT", diff --git a/core/src/main/java/io/kestra/core/runners/RunContext.java b/core/src/main/java/io/kestra/core/runners/RunContext.java index 623ee9de025..ff06dfac78f 100644 --- a/core/src/main/java/io/kestra/core/runners/RunContext.java +++ b/core/src/main/java/io/kestra/core/runners/RunContext.java @@ -27,11 +27,23 @@ import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.StringUtils; -import java.io.*; +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; -import java.util.*; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -46,7 +58,6 @@ public class RunContext { private MetricRegistry meterRegistry; private Path tempBasedPath; private RunContextCache runContextCache; - private URI storageOutputPrefix; private URI storageExecutionPrefix; private Map variables; @@ -62,8 +73,8 @@ public class RunContext { * Only used by {@link io.kestra.core.models.triggers.types.Flow} * * @param applicationContext the current {@link ApplicationContext} - * @param flow the current {@link Flow} - * @param execution the current {@link Execution} + * @param flow the current {@link Flow} + * @param execution the current {@link Execution} */ public RunContext(ApplicationContext applicationContext, Flow flow, Execution execution) { this.initBean(applicationContext); @@ -75,10 +86,10 @@ public RunContext(ApplicationContext applicationContext, Flow flow, Execution ex * Normal usage * * @param applicationContext the current {@link ApplicationContext} - * @param flow the current {@link Flow} - * @param task the current {@link io.kestra.core.models.tasks.Task} - * @param execution the current {@link Execution} - * @param taskRun the current {@link TaskRun} + * @param flow the current {@link Flow} + * @param task the current {@link io.kestra.core.models.tasks.Task} + * @param execution the current {@link Execution} + * @param taskRun the current {@link TaskRun} */ public RunContext(ApplicationContext applicationContext, Flow flow, Task task, Execution execution, TaskRun taskRun) { this.initBean(applicationContext); @@ -102,7 +113,7 @@ public RunContext(ApplicationContext applicationContext, Flow flow, AbstractTrig * Only used by Unit Test * * @param applicationContext the current {@link ApplicationContext} - * @param variables The variable to inject + * @param variables The variable to inject */ @VisibleForTesting public RunContext(ApplicationContext applicationContext, Map variables) { @@ -192,6 +203,8 @@ public String getTriggerExecutionId() { return triggerExecutionId; } + + public Map getVariables() { return variables; } @@ -242,8 +255,7 @@ protected Map variables(Flow flow, Task task, Execution executio "namespace", flow.getNamespace(), "revision", flow.getRevision() )); - } - else { + } else { builder .put("flow", ImmutableMap.of( "id", flow.getId(), @@ -543,9 +555,9 @@ public URI putTempFile(File file, String name) throws IOException { * This method is meant to be used by polling triggers, the name of the destination file is derived from the * executionId and the trigger passed as parameters. * - * @param file the temporary file to upload to storage + * @param file the temporary file to upload to storage * @param executionId overwrite file name - * @param trigger the trigger + * @param trigger the trigger * @return the {@code StorageObject} created * @throws IOException If the temporary file can't be read */ @@ -565,6 +577,41 @@ public URI putTempFile(File file, String executionId, AbstractTrigger trigger) t ); } + /** + * Stores a file with the given name for the given {@link InputStream} into Kestra's storage. + * + * @param inputStream the {@link InputStream} of the file content. + * @param name the target name of the file to be stored in the storage. + * @return the URI of the file/object in the internal storage. + * @throws IOException if an error occurred while storing the file. + */ + public URI putFile(InputStream inputStream, String name) throws IOException { + URI uri = storageOutputPrefix.resolve(storageOutputPrefix + "/" + name); + return this.storageInterface.put(tenantId(), uri, new BufferedInputStream(inputStream)); + } + + /** + * Stores a file with the given name for the given {@link InputStream} into Kestra's storage. + * + * @param inputStream the {@link InputStream} of the file content. + * @param uri the target URI of the file to be stored in the storage. + * @return the URI of the file/object in the internal storage. + * @throws IOException if an error occurred while storing the file. + */ + public URI putFile(InputStream inputStream, URI uri) throws IOException { + return this.storageInterface.put(tenantId(), uri, new BufferedInputStream(inputStream)); + } + + /** + * Checks whether the given URI points to an exiting file/object in the internal storage. + * + * @param uri the URI of the file/object in the internal storage. + * @return {@code true} if the URI points to a file/object that exists in the internal storage. + */ + public boolean isFileExist(URI uri) { + return this.storageInterface.exists(tenantId(), uri); + } + private URI putTempFile(InputStream inputStream, String prefix, String name) throws IOException { URI uri = URI.create(prefix); URI resolve = uri.resolve(uri.getPath() + "/" + name); @@ -606,7 +653,7 @@ public InputStream getTaskStateFile(String state, String name, Boolean isNamespa URI uri = URI.create(this.taskStateFilePathPrefix(state, isNamespace, useTaskRun)); URI resolve = uri.resolve(uri.getPath() + "/" + name); - return this.storageInterface.get(tenantId(), resolve); + return this.storageInterface.get(tenantId(), resolve); } public URI putTaskStateFile(byte[] content, String state, String name) throws IOException { @@ -651,10 +698,9 @@ public boolean deleteTaskStateFile(String state, String name, Boolean isNamespac * If the cache file didn't exist, an empty Optional is returned. * * @param namespace the flow namespace - * @param flowId the flow identifier - * @param taskId the task identifier - * @param value optional, the task run value - * + * @param flowId the flow identifier + * @param taskId the task identifier + * @param value optional, the task run value * @return an Optional with the cache input stream or empty. */ public Optional getTaskCacheFile(String namespace, String flowId, String taskId, String value) throws IOException { @@ -670,12 +716,11 @@ public Optional getTaskCacheFileLastModifiedTime(String namespace, String /** * Put into the internal storage the cache file corresponding to this task. * - * @param file the cache as a ZIP archive + * @param file the cache as a ZIP archive * @param namespace the flow namespace - * @param flowId the flow identifier - * @param taskId the task identifier - * @param value optional, the task run value - * + * @param flowId the flow identifier + * @param taskId the task identifier + * @param value optional, the task run value * @return the URI of the file inside the internal storage. */ public URI putTaskCacheFile(File file, String namespace, String flowId, String taskId, String value) throws IOException { @@ -834,6 +879,7 @@ public Path tempFile(byte[] content, String extension) throws IOException { /** * Get the file extension including the '.' to be used with the various methods that took a suffix. + * * @param fileName the name of the file * @return the file extension including the '.' or null */ diff --git a/core/src/main/java/io/kestra/core/tasks/flows/ForEachItem.java b/core/src/main/java/io/kestra/core/tasks/flows/ForEachItem.java index 092216f2762..6cc4731479c 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/ForEachItem.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/ForEachItem.java @@ -7,6 +7,7 @@ import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.Execution; import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.executions.TaskRunAttempt; import io.kestra.core.models.flows.Flow; import io.kestra.core.models.flows.State; import io.kestra.core.models.tasks.ExecutableTask; @@ -16,6 +17,7 @@ import io.kestra.core.runners.RunContext; import io.kestra.core.runners.SubflowExecution; import io.kestra.core.runners.SubflowExecutionResult; +import io.kestra.core.serializers.FileSerde; import io.kestra.core.services.StorageService; import io.kestra.core.storages.StorageSplitInterface; import io.swagger.v3.oas.annotations.media.Schema; @@ -26,16 +28,20 @@ import lombok.ToString; import lombok.experimental.SuperBuilder; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; -import javax.validation.constraints.NotEmpty; -import javax.validation.constraints.NotNull; +import java.util.stream.Collectors; import static io.kestra.core.utils.Rethrow.throwFunction; @@ -56,30 +62,29 @@ examples = { @Example( title = """ - Execute a subflow for each batch of items. The subflow `orders` is called from the parent flow `orders_parallel` using the `ForEachItem` task in order to start one subflow execution for each batch of items. - ```yaml - id: orders - namespace: prod - - inputs: - - name: order - type: STRING - - tasks: - - id: read_file - type: io.kestra.plugin.scripts.shell.Commands - runner: PROCESS - commands: - - cat "{{ inputs.order }}" - - - id: read_file_content - type: io.kestra.core.tasks.log.Log - message: "{{ read(inputs.order) }}" - ``` - """, + Execute a subflow for each batch of items. The subflow `orders` is called from the parent flow `orders_parallel` using the `ForEachItem` task in order to start one subflow execution for each batch of items. + ```yaml + id: orders + namespace: prod + + inputs: + - name: order + type: STRING + + tasks: + - id: read_file + type: io.kestra.plugin.scripts.shell.Commands + runner: PROCESS + commands: + - cat "{{ inputs.order }}" + + - id: read_file_content + type: io.kestra.core.tasks.log.Log + message: "{{ read(inputs.order) }}" + ``` + """, full = true, - code = - """ + code = """ id: orders_parallel namespace: prod @@ -103,7 +108,8 @@ wait: true # wait for the subflow execution transmitFailed: true # fail the task run if the subflow execution fails inputs: - order: "{{ taskrun.items }}" # special variable that contains the items of the batch""" + order: "{{ taskrun.items }}" # special variable that contains the items of the batch + """ ) } ) @@ -213,13 +219,17 @@ public List> createSubflowExecutions( } if (this.labels != null) { - for (Map.Entry entry: this.labels.entrySet()) { + for (Map.Entry entry : this.labels.entrySet()) { labels.add(new Label(entry.getKey(), runContext.render(entry.getValue()))); } } // these are special outputs to be able to compute iteration map of the parent taskrun - var outputs = Output.builder().numberOfBatches(splits.size()).build(); + var outputs = Output.builder() + .numberOfBatches(splits.size()) + // the passed URI may be used by the subflow to write execution outputs. + .uri(URI.create(runContext.getStorageOutputPrefix().toString() + "/" + iteration + "/outputs.ion")) + .build(); return ExecutableUtils.subflowExecution( runContext, flowExecutorInterface, @@ -248,6 +258,42 @@ public Optional createSubflowExecutionResult( Flow flow, Execution execution ) { + if (flow.getOutputs() != null && waitForExecution()) { + final Map outputs = flow.getOutputs() + .stream() + .collect(Collectors.toMap( + io.kestra.core.models.flows.Output::getId, + io.kestra.core.models.flows.Output::getValue) + ); + final ForEachItem.Output.OutputBuilder builder = Output + .builder() + .iterations((Map) taskRun.getOutputs().get("iterations")) + .numberOfBatches((Integer) taskRun.getOutputs().get("numberOfBatches")); + + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { + FileSerde.write(bos, runContext.render(outputs)); + URI uri = runContext.putFile( + new ByteArrayInputStream(bos.toByteArray()), + URI.create((String) taskRun.getOutputs().get("uri")) + ); + builder.uri(uri); + } catch (Exception e) { + runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e); + var state = this.isAllowFailure() ? State.Type.WARNING : State.Type.FAILED; + taskRun = taskRun + .withState(state) + .withAttempts(Collections.singletonList(TaskRunAttempt.builder().state(new State().withState(state)).build())) + .withOutputs(builder.build().toMap()); + + return Optional.of(SubflowExecutionResult.builder() + .executionId(execution.getId()) + .state(State.Type.FAILED) + .parentTaskRun(taskRun) + .build()); + } + taskRun = taskRun.withOutputs(builder.build().toMap()); + } + // ForEachItem is an iterative task, the terminal state will be computed in the executor while counting on the task run execution list return Optional.of(ExecutableUtils.subflowExecutionResult(taskRun, execution)); } @@ -292,5 +338,10 @@ public static class Output implements io.kestra.core.models.tasks.Output { title = "The number of batches." ) private final Integer numberOfBatches; + + @Schema( + title = "The URI of the file gathering outputs from each subflow execution." + ) + private final URI uri; } } diff --git a/core/src/main/java/io/kestra/core/tasks/flows/Subflow.java b/core/src/main/java/io/kestra/core/tasks/flows/Subflow.java index 3b94e3a8761..06712b83504 100644 --- a/core/src/main/java/io/kestra/core/tasks/flows/Subflow.java +++ b/core/src/main/java/io/kestra/core/tasks/flows/Subflow.java @@ -11,14 +11,28 @@ import io.kestra.core.models.flows.State; import io.kestra.core.models.tasks.ExecutableTask; import io.kestra.core.models.tasks.Task; -import io.kestra.core.runners.*; +import io.kestra.core.runners.ExecutableUtils; +import io.kestra.core.runners.FlowExecutorInterface; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.SubflowExecution; +import io.kestra.core.runners.SubflowExecutionResult; import io.swagger.v3.oas.annotations.media.Schema; -import lombok.*; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; import lombok.experimental.SuperBuilder; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; @SuperBuilder @ToString @@ -101,11 +115,15 @@ public class Subflow extends Task implements ExecutableTask { @PluginProperty private final Boolean inheritLabels = false; + /** + * @deprecated Output value should now be defined part of the Flow definition. + */ @Schema( title = "Outputs from the subflow executions.", description = "Allows to specify outputs as key-value pairs to extract any outputs from the subflow execution into output of this task execution." ) @PluginProperty(dynamic = true) + @Deprecated(since = "0.15.0'") private Map outputs; @Override @@ -125,7 +143,7 @@ public List> createSubflowExecutions(RunContext runContext, } if (this.labels != null) { - for (Map.Entry entry: this.labels.entrySet()) { + for (Map.Entry entry : this.labels.entrySet()) { labels.add(new Label(entry.getKey(), runContext.render(entry.getValue()))); } } @@ -154,14 +172,33 @@ public Optional createSubflowExecutionResult( return Optional.empty(); } - Output.OutputBuilder builder = Output.builder() + // Ugly hack to get access to worker's configuration. + // MUST be changed when the RunContext class will be refactored, or remove when + // the `outputs` parameter is removed after being deprecated. + boolean isOutputsAllowed = runContext.getApplicationContext() + .getProperty("kestra.tasks.subflow.allow-parameter-outputs", Boolean.class) + .orElse(false); + + final Output.OutputBuilder builder = Output.builder() .executionId(execution.getId()) .state(execution.getState().getCurrent()); - if (this.getOutputs() != null) { + + final Map subflowOutputs = Optional + .ofNullable(flow.getOutputs()) + .map(outputs -> outputs + .stream() + .collect(Collectors.toMap( + io.kestra.core.models.flows.Output::getId, + io.kestra.core.models.flows.Output::getValue) + ) + ) + .orElseGet(() -> isOutputsAllowed ? this.getOutputs() : null); + + if (subflowOutputs != null) { try { - builder.outputs(runContext.render(this.getOutputs())); + builder.outputs(runContext.render(subflowOutputs)); } catch (Exception e) { - runContext.logger().warn("Failed to extract outputs with the error: '" + e.getMessage() + "'", e); + runContext.logger().warn("Failed to extract outputs with the error: '{}'", e.getLocalizedMessage(), e); var state = this.isAllowFailure() ? State.Type.WARNING : State.Type.FAILED; taskRun = taskRun .withState(state) diff --git a/core/src/test/java/io/kestra/core/tasks/flows/FlowOutputTest.java b/core/src/test/java/io/kestra/core/tasks/flows/FlowOutputTest.java new file mode 100644 index 00000000000..938fc026e28 --- /dev/null +++ b/core/src/test/java/io/kestra/core/tasks/flows/FlowOutputTest.java @@ -0,0 +1,33 @@ +package io.kestra.core.tasks.flows; + +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.flows.State; +import io.kestra.core.runners.AbstractMemoryRunnerTest; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +class FlowOutputTest extends AbstractMemoryRunnerTest { + + static final String NAMESPACE = "io.kestra.tests"; + + @Test + void shouldGetSuccessExecutionForFlowFlowWithOutputs() throws TimeoutException { + Execution execution = runnerUtils.runOne(null, NAMESPACE, "flow-with-outputs", null, null); + assertThat(execution.getOutputs(), aMapWithSize(1)); + assertThat(execution.getOutputs().get("key"), is("{\"value\":\"flow-with-outputs\"}")); + assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); + } + + @Test + void shouldGetFailExecutionForFlowWithInvalidOutputs() throws TimeoutException { + Execution execution = runnerUtils.runOne(null, NAMESPACE, "flow-with-outputs-failed", null, null); + assertThat(execution.getOutputs(), nullValue()); + assertThat(execution.getState().getCurrent(), is(State.Type.FAILED)); + } +} diff --git a/core/src/test/java/io/kestra/core/tasks/flows/ForEachItemCaseTest.java b/core/src/test/java/io/kestra/core/tasks/flows/ForEachItemCaseTest.java index 5a6c7d2e75b..0de3f09ba2d 100644 --- a/core/src/test/java/io/kestra/core/tasks/flows/ForEachItemCaseTest.java +++ b/core/src/test/java/io/kestra/core/tasks/flows/ForEachItemCaseTest.java @@ -12,9 +12,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Files; @@ -29,11 +32,18 @@ import java.util.stream.IntStream; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.matchesRegex; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; @Slf4j @Singleton public class ForEachItemCaseTest { + static final String TEST_NAMESPACE = "io.kestra.tests"; + @Inject @Named(QueueFactoryInterface.EXECUTION_NAMED) private QueueInterface executionQueue; @@ -58,7 +68,7 @@ public void forEachItem() throws TimeoutException, InterruptedException, URISynt URI file = storageUpload(10); Map inputs = Map.of("file", file.toString()); - Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "for-each-item", null, + Execution execution = runnerUtils.runOne(null, TEST_NAMESPACE, "for-each-item", null, (flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs), Duration.ofSeconds(30)); @@ -102,7 +112,7 @@ public void forEachItemNoWait() throws TimeoutException, InterruptedException, U URI file = storageUpload(10); Map inputs = Map.of("file", file.toString()); - Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "for-each-item-no-wait", null, + Execution execution = runnerUtils.runOne(null, TEST_NAMESPACE, "for-each-item-no-wait", null, (flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs), Duration.ofSeconds(30)); @@ -147,7 +157,7 @@ public void forEachItemFailed() throws TimeoutException, InterruptedException, U URI file = storageUpload(10); Map inputs = Map.of("file", file.toString()); - Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "for-each-item-failed", null, + Execution execution = runnerUtils.runOne(null, TEST_NAMESPACE, "for-each-item-failed", null, (flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs), Duration.ofSeconds(30)); @@ -174,6 +184,57 @@ public void forEachItemFailed() throws TimeoutException, InterruptedException, U assertThat(triggered.get().getTaskRunList(), hasSize(1)); } + public void forEachItemWithSubflowOutputs() throws TimeoutException, InterruptedException, URISyntaxException, IOException { + CountDownLatch countDownLatch = new CountDownLatch(3); + AtomicReference triggered = new AtomicReference<>(); + + executionQueue.receive(either -> { + Execution execution = either.getLeft(); + if (execution.getFlowId().equals("for-each-item-outputs-subflow") && execution.getState().getCurrent().isTerminated()) { + countDownLatch.countDown(); + triggered.set(execution); + } + }); + + URI file = storageUpload(10); + Map inputs = Map.of("file", file.toString()); + Execution execution = runnerUtils.runOne(null, TEST_NAMESPACE, "for-each-item-outputs", null, + (flow, execution1) -> runnerUtils.typedInputs(flow, execution1, inputs), + Duration.ofSeconds(30)); + + // we should have triggered 3 subflows + assertThat(countDownLatch.await(1, TimeUnit.MINUTES), is(true)); + + // assert on the main flow execution + assertThat(execution.getTaskRunList(), hasSize(2)); + assertThat(execution.getTaskRunList().get(0).getAttempts(), hasSize(1)); + assertThat(execution.getTaskRunList().get(0).getAttempts().get(0).getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS)); + Map outputs = execution.getTaskRunList().get(0).getOutputs(); + assertThat(outputs.get("numberOfBatches"), is(3)); + assertThat(outputs.get("iterations"), notNullValue()); + + Map iterations = (Map) outputs.get("iterations"); + assertThat(iterations.get("CREATED"), is(0)); + assertThat(iterations.get("RUNNING"), is(0)); + assertThat(iterations.get("SUCCESS"), is(3)); + + // assert on the last subflow execution + assertThat(triggered.get().getState().getCurrent(), is(State.Type.SUCCESS)); + assertThat(triggered.get().getFlowId(), is("for-each-item-outputs-subflow")); + assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item-outputs/executions/.*/tasks/each/.*\\.txt")); + assertThat(triggered.get().getTaskRunList(), hasSize(1)); + + // asserts for subflow merged outputs + assertThat(outputs.get("uri"), notNullValue()); + InputStream stream = storageInterface.get(null, URI.create((String) outputs.get("uri"))); + + try (var br = new BufferedReader(new InputStreamReader(stream))) { + // one line per sub-flows + assertThat(br.lines().count(), is(3L)); + } + } + private URI storageUpload(int count) throws URISyntaxException, IOException { File tempFile = File.createTempFile("file", ".txt"); diff --git a/core/src/test/java/io/kestra/core/tasks/flows/SubflowTest.java b/core/src/test/java/io/kestra/core/tasks/flows/SubflowTest.java new file mode 100644 index 00000000000..a8236152fd3 --- /dev/null +++ b/core/src/test/java/io/kestra/core/tasks/flows/SubflowTest.java @@ -0,0 +1,94 @@ +package io.kestra.core.tasks.flows; + +import io.kestra.core.exceptions.IllegalVariableEvaluationException; +import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.executions.TaskRun; +import io.kestra.core.models.flows.Flow; +import io.kestra.core.models.flows.Output; +import io.kestra.core.models.flows.State; +import io.kestra.core.runners.RunContext; +import io.kestra.core.runners.SubflowExecutionResult; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class SubflowTest { + + private static final Logger LOG = LoggerFactory.getLogger(SubflowTest.class); + + private static final State DEFAULT_SUCCESS_STATE = State.of(State.Type.SUCCESS, Collections.emptyList()); + public static final String EXECUTION_ID = "executionId"; + + @Mock + private RunContext runContext; + + @BeforeEach + void beforeEach() { + Mockito.when(runContext.logger()).thenReturn(LOG); + } + + @Test + void shouldNotReturnResultForExecutionNotTerminated() { + TaskRun taskRun = TaskRun + .builder() + .state(State.of(State.Type.CREATED, Collections.emptyList())) + .build(); + + Optional result = new Subflow().createSubflowExecutionResult( + runContext, + taskRun, + Flow.builder().build(), + Execution.builder().build() + ); + + assertThat(result, is(Optional.empty())); + } + + @Test + void shouldOnlyReturnOutputsFromFlowOutputs() throws IllegalVariableEvaluationException { + + Output output = Output.builder().id("key").value("value").build(); + Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue())); + Flow flow = Flow.builder() + .outputs(List.of(output)) + .build(); + + Optional result = new Subflow().createSubflowExecutionResult( + runContext, + TaskRun.builder().state(DEFAULT_SUCCESS_STATE).build(), + flow, + Execution.builder().id(EXECUTION_ID).state(DEFAULT_SUCCESS_STATE).build() + ); + + assertTrue(result.isPresent()); + Map outputs = result.get().getParentTaskRun().getOutputs(); + + Map expected = Subflow.Output.builder() + .executionId(EXECUTION_ID) + .state(DEFAULT_SUCCESS_STATE.getCurrent()) + .outputs(Map.of(output.getId(), output.getValue())) + .build() + .toMap(); + assertThat(outputs, is(expected)); + } +} \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/flow-with-outputs-failed.yml b/core/src/test/resources/flows/valids/flow-with-outputs-failed.yml new file mode 100644 index 00000000000..2b64c938228 --- /dev/null +++ b/core/src/test/resources/flows/valids/flow-with-outputs-failed.yml @@ -0,0 +1,11 @@ +id: flow-with-outputs-failed +namespace: io.kestra.tests + +tasks: +- id: return + type: io.kestra.core.tasks.debugs.Return + format: "{{ flow.id }}" + +outputs: +- id: "key" + value: "{{ invalid }}" \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/flow-with-outputs.yml b/core/src/test/resources/flows/valids/flow-with-outputs.yml new file mode 100644 index 00000000000..9eb708d71d6 --- /dev/null +++ b/core/src/test/resources/flows/valids/flow-with-outputs.yml @@ -0,0 +1,11 @@ +id: flow-with-outputs +namespace: io.kestra.tests + +tasks: +- id: return + type: io.kestra.core.tasks.debugs.Return + format: "{{ flow.id }}" + +outputs: +- id: "key" + value: "{{ outputs.return }}" \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/for-each-item-outputs-subflow.yaml b/core/src/test/resources/flows/valids/for-each-item-outputs-subflow.yaml new file mode 100644 index 00000000000..d1115a2f822 --- /dev/null +++ b/core/src/test/resources/flows/valids/for-each-item-outputs-subflow.yaml @@ -0,0 +1,15 @@ +id: for-each-item-outputs-subflow +namespace: io.kestra.tests + +inputs: + - name: items + type: STRING + +tasks: + - id: return + type: io.kestra.core.tasks.debugs.Return + format: "{{ read(inputs.items) }}" + +outputs: + - id: value + value: "{{ outputs.return.value }}" \ No newline at end of file diff --git a/core/src/test/resources/flows/valids/for-each-item-outputs.yaml b/core/src/test/resources/flows/valids/for-each-item-outputs.yaml new file mode 100644 index 00000000000..89c318f0d25 --- /dev/null +++ b/core/src/test/resources/flows/valids/for-each-item-outputs.yaml @@ -0,0 +1,23 @@ +id: for-each-item-outputs +namespace: io.kestra.tests + +inputs: + - name: file + type: FILE + +tasks: + - id: each + type: io.kestra.core.tasks.flows.ForEachItem + items: "{{ inputs.file }}" + batch: + rows: 4 + namespace: io.kestra.tests + flowId: for-each-item-outputs-subflow + wait: true + transmitFailed: true + inputs: + items: "{{ taskrun.items }}" + + - id: return + type: io.kestra.core.tasks.debugs.Return + format: "{{ outputs.each.uri }}" \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java index 7aa66f4bb82..9216d95601f 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcExecutor.java @@ -642,7 +642,14 @@ private void subflowExecutionResultQueue(Either { if (executionState == null) { - return new ExecutionState(execution); + return new ExecutionState(runContextFactory, execution); } else { return executionState.from(execution); } @@ -536,17 +536,19 @@ private void killQueue(Either either) private static class ExecutionState { + + private RunContextFactory runContextFactory; private final Execution execution; private Map taskRuns = new ConcurrentHashMap<>(); private Map workerTaskDeduplication = new ConcurrentHashMap<>(); private Map childDeduplication = new ConcurrentHashMap<>(); - public ExecutionState(Execution execution) { + public ExecutionState(RunContextFactory runContextFactory, Execution execution) { this.execution = execution; } - public ExecutionState(ExecutionState executionState, Execution execution) { - this.execution = execution; + public ExecutionState( ExecutionState executionState, Execution execution) { + this(executionState.runContextFactory, execution); this.taskRuns = executionState.taskRuns; this.workerTaskDeduplication = executionState.workerTaskDeduplication; this.childDeduplication = executionState.childDeduplication; @@ -615,7 +617,19 @@ public ExecutionState from(SubflowExecutionResult subflowExecutionResult, FlowRe Task task = flow.findTaskByTaskId(subflowExecutionResult.getParentTaskRun().getTaskId()); TaskRun taskRun; if (task instanceof ForEachItem forEachItem) { - taskRun = ExecutableUtils.manageIterations(subflowExecutionResult.getParentTaskRun(), this.execution, forEachItem.getTransmitFailed(), forEachItem.isAllowFailure()); + RunContext runContext = runContextFactory.of( + flow, + task, + execution, + subflowExecutionResult.getParentTaskRun() + ); + taskRun = ExecutableUtils.manageIterations( + runContext, + subflowExecutionResult.getParentTaskRun(), + this.execution, + forEachItem.getTransmitFailed(), + forEachItem.isAllowFailure() + ); } else { taskRun = subflowExecutionResult.getParentTaskRun(); } diff --git a/ui/src/components/executions/Overview.vue b/ui/src/components/executions/Overview.vue index 8ea1dd007d9..1ac557ca39f 100644 --- a/ui/src/components/executions/Overview.vue +++ b/ui/src/components/executions/Overview.vue @@ -59,6 +59,11 @@
{{ $t('variables') }}
+ +
+
{{ $t('outputs') }}
+ +