Skip to content

Commit

Permalink
feat(core): allow flow to define and expose outputs (#2133)
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Jan 19, 2024
1 parent 7584998 commit 2134aca
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 8 deletions.
9 changes: 8 additions & 1 deletion core/src/main/java/io/kestra/core/models/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kestra.core.models.DeletedInterface;
import io.kestra.core.models.Label;
import io.kestra.core.models.TenantInterface;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.listeners.Listener;
import io.kestra.core.models.tasks.FlowableTask;
Expand Down Expand Up @@ -98,7 +99,6 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
List<AbstractTrigger> triggers;


@Valid
List<TaskDefault> taskDefaults;

Expand All @@ -113,6 +113,13 @@ public boolean hasIgnoreMarker(final AnnotatedMember m) {
@Valid
Concurrency concurrency;

@Schema(
title = "Output values available and exposes to other flows.",
description = "Output values make information about the execution of your Flow available and expose for other Kestra flows to use. Output values are similar to return values in programming languages."
)
@PluginProperty(dynamic = true)
private List<Output> outputs;

public Logger logger() {
return LoggerFactory.getLogger("flow." + this.id);
}
Expand Down
36 changes: 36 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/Output.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.kestra.core.models.flows;

import io.micronaut.core.annotation.Introspected;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;

/**
* Definition of a flow's output.
*/
@SuperBuilder
@Getter
@NoArgsConstructor
@Introspected
public class Output {
/**
* The output's unique id.
*/
@NotNull
@NotBlank
@Pattern(regexp = "^[a-zA-Z0-9][.a-zA-Z0-9_-]*")
String id;
/**
* Short description of the output.
*/
String description;
/**
* The output value. Can be a dynamic expression.
*/
@NotNull
Object value;
}
44 changes: 37 additions & 7 deletions core/src/main/java/io/kestra/core/tasks/flows/Subflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,28 @@
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.ExecutableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.*;
import io.kestra.core.runners.ExecutableUtils;
import io.kestra.core.runners.FlowExecutorInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecution;
import io.kestra.core.runners.SubflowExecutionResult;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

@SuperBuilder
@ToString
Expand Down Expand Up @@ -101,11 +115,15 @@ public class Subflow extends Task implements ExecutableTask<Subflow.Output> {
@PluginProperty
private final Boolean inheritLabels = false;

/**
* @deprecated Output value should now be defined part of the Flow definition.
*/
@Schema(
title = "Outputs from the subflow executions.",
description = "Allows to specify outputs as key-value pairs to extract any outputs from the subflow execution into output of this task execution."
)
@PluginProperty(dynamic = true)
@Deprecated(since = "0.15.0'")
private Map<String, Object> outputs;

@Override
Expand All @@ -125,7 +143,7 @@ public List<SubflowExecution<?>> createSubflowExecutions(RunContext runContext,
}

if (this.labels != null) {
for (Map.Entry<String, String> entry: this.labels.entrySet()) {
for (Map.Entry<String, String> entry : this.labels.entrySet()) {
labels.add(new Label(entry.getKey(), runContext.render(entry.getValue())));
}
}
Expand Down Expand Up @@ -154,12 +172,24 @@ public Optional<SubflowExecutionResult> createSubflowExecutionResult(
return Optional.empty();
}

Output.OutputBuilder builder = Output.builder()
final Output.OutputBuilder builder = Output.builder()
.executionId(execution.getId())
.state(execution.getState().getCurrent());
if (this.getOutputs() != null) {

final Map<String, Object> subflowOutputs = Optional
.ofNullable(flow.getOutputs())
.map(outputs -> outputs
.stream()
.collect(Collectors.toMap(
io.kestra.core.models.flows.Output::getId,
io.kestra.core.models.flows.Output::getValue)
)
)
.orElse(this.getOutputs());

if (subflowOutputs != null) {
try {
builder.outputs(runContext.render(this.getOutputs()));
builder.outputs(runContext.render(subflowOutputs));
} catch (Exception e) {
runContext.logger().warn("Failed to extract outputs with the error: '" + e.getMessage() + "'", e);
var state = this.isAllowFailure() ? State.Type.WARNING : State.Type.FAILED;
Expand Down
92 changes: 92 additions & 0 deletions core/src/test/java/io/kestra/core/tasks/flows/SubflowTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package io.kestra.core.tasks.flows;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Output;
import io.kestra.core.models.flows.State;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.SubflowExecutionResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
class SubflowTest {

private static final Logger LOG = LoggerFactory.getLogger(SubflowTest.class);

private static final State DEFAULT_SUCCESS_STATE = State.of(State.Type.SUCCESS, Collections.emptyList());
public static final String EXECUTION_ID = "executionId";

@Mock
private RunContext runContext;

@BeforeEach
void beforeEach() {
Mockito.when(runContext.logger()).thenReturn(LOG);
}

@Test
void shouldNotReturnResultForExecutionNotTerminated() {
// Given
TaskRun taskRun = TaskRun
.builder()
.state(State.of(State.Type.CREATED, Collections.emptyList()))
.build();
// When
Optional<SubflowExecutionResult> result = new Subflow().createSubflowExecutionResult(
runContext,
taskRun,
Flow.builder().build(),
Execution.builder().build()
);
// Then
Assertions.assertEquals(Optional.empty(), result);
}

@Test
void shouldOnlyReturnOutputsFromFlowOutputs() throws IllegalVariableEvaluationException {
// Given
Output output = Output.builder().id("key").value("value").build();
Mockito.when(runContext.render(Mockito.anyMap())).thenReturn(Map.of(output.getId(), output.getValue()));
Flow flow = Flow.builder()
.outputs(List.of(output))
.build();

// When
Optional<SubflowExecutionResult> result = new Subflow().createSubflowExecutionResult(
runContext,
TaskRun.builder().state(DEFAULT_SUCCESS_STATE).build(),
flow,
Execution.builder().id(EXECUTION_ID).state(DEFAULT_SUCCESS_STATE).build()
);

// Then
Assertions.assertTrue(result.isPresent());
Map<String, Object> outputs = result.get().getParentTaskRun().getOutputs();

Map<String, Object> expected = Subflow.Output.builder()
.executionId(EXECUTION_ID)
.state(DEFAULT_SUCCESS_STATE.getCurrent())
.outputs(Map.of(output.getId(), output.getValue()))
.build()
.toMap();
Assertions.assertEquals(expected, outputs);
}
}

0 comments on commit 2134aca

Please sign in to comment.