Skip to content

Commit

Permalink
feat(core): Add displayName to flow level outputs(backend)
Browse files Browse the repository at this point in the history
  • Loading branch information
kratosmy committed Oct 23, 2024
1 parent 48da2a7 commit 663c819
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 10 deletions.
5 changes: 5 additions & 0 deletions core/src/main/java/io/kestra/core/models/flows/Output.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public class Output implements Data {
@NotNull
@Valid
Type type;
/**
* The display name of the output.
*/
@NotNull
String displayName;
}
14 changes: 13 additions & 1 deletion core/src/main/java/io/kestra/core/runners/ExecutorService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kestra.core.runners;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.kestra.core.exceptions.InternalException;
import io.kestra.core.metrics.MetricRegistry;
Expand Down Expand Up @@ -383,7 +385,17 @@ private Executor onEnd(Executor executor) {
try {
Map<String, Object> outputs = flow.getOutputs()
.stream()
.collect(HashMap::new, (map, entry) -> map.put(entry.getId(), entry.getValue()), Map::putAll);
.collect(HashMap::new, (map, entry) -> {
final ObjectMapper mapper = new ObjectMapper();
final HashMap<String, Object> entryInfo = new HashMap<>();
entryInfo.put("value", entry.getValue());
entryInfo.put("displayName", entry.getDisplayName());
try {
map.put(entry.getId(), mapper.writeValueAsString(entryInfo));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}, Map::putAll);
outputs = runContext.render(outputs);
outputs = flowInputOutput.typedOutputs(flow, executor.getExecution(), outputs);
newExecution = newExecution.withOutputs(outputs);
Expand Down
20 changes: 16 additions & 4 deletions core/src/main/java/io/kestra/core/runners/FlowInputOutput.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.kestra.core.runners;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -31,6 +33,8 @@
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;

import org.jcodings.util.Hash;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,6 +50,7 @@
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.AbstractMap;
import java.util.AbstractMap.SimpleEntry;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -345,24 +350,31 @@ public Map<String, Object> typedOutputs(
if (flow.getOutputs() == null) {
return ImmutableMap.of();
}
final ObjectMapper mapper = new ObjectMapper();
Map<String, Object> results = flow
.getOutputs()
.stream()
.map(output -> {
Object current = in == null ? null : in.get(output.getId());
String current = in == null ? null : in.get(output.getId()).toString();
Object currentValue;
try {
currentValue = mapper.readValue(current, new TypeReference<HashMap<String, Object>>() {}).get("value");
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
try {
return parseData(execution, output, current)
return parseData(execution, output, currentValue)
.map(entry -> {
if (output.getType().equals(Type.SECRET)) {
return new AbstractMap.SimpleEntry<>(
return new SimpleEntry<>(
entry.getKey(),
EncryptedString.from(entry.getValue().toString())
);
}
return entry;
});
} catch (Exception e) {
throw output.toConstraintViolationException(e.getMessage(), current);
throw output.toConstraintViolationException(e.getMessage(), currentValue);
}
})
.filter(Optional::isPresent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class FlowOutputTest extends AbstractMemoryRunnerTest {
void shouldGetSuccessExecutionForFlowWithOutputs() throws TimeoutException, QueueException {
Execution execution = runnerUtils.runOne(null, NAMESPACE, "flow-with-outputs", null, null);
assertThat(execution.getOutputs(), aMapWithSize(1));
assertThat(execution.getOutputs().get("key"), is("{\"value\":\"flow-with-outputs\"}"));
assertThat(execution.getOutputs().get("key"), is("{\"displayName\":\"Final returned value\",\"value\":\"{\"value\":\"flow-with-outputs\"}\"}"));
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ outputs:
type: ARRAY
value:
- "{{ outputs.return.values.one }}"
- "{{ outputs.return.values.two }}"
- "{{ outputs.return.values.two }}"
displayName: Final returned value
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ tasks:
outputs:
- id: "key"
value: "{{ invalid }}"
type: STRING
type: STRING
displayName: Final returned value
3 changes: 2 additions & 1 deletion core/src/test/resources/flows/valids/flow-with-outputs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ tasks:
outputs:
- id: "key"
value: "{{ outputs.return }}"
type: STRING
type: STRING
displayName: Final returned value
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ tasks:
outputs:
- id: value
value: "{{ outputs.return.value }}"
type: STRING
type: STRING
displayName: Final returned value
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ outputs:
- id: status
type: STRING
value: "{{trigger.state}}"
displayName: Final returned value

tasks:
- id: hello
Expand Down

0 comments on commit 663c819

Please sign in to comment.