Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read Only Flow #5655

Merged
merged 3 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/java/io/kestra/core/models/Label.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ public record Label(@NotNull String key, @NotNull String value) {
public static final String CORRELATION_ID = SYSTEM_PREFIX + "correlationId";
public static final String USERNAME = SYSTEM_PREFIX + "username";
public static final String APP = SYSTEM_PREFIX + "app";
public static final String READ_ONLY = SYSTEM_PREFIX + "readOnly";
}
62 changes: 5 additions & 57 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.kestra.core.runners.RunContextLogger;
import io.kestra.core.serializers.ListOrMapOfLabelDeserializer;
import io.kestra.core.serializers.ListOrMapOfLabelSerializer;
import io.kestra.core.services.LabelService;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.MapUtils;
import io.micronaut.core.annotation.Nullable;
Expand All @@ -39,8 +40,6 @@
import java.util.stream.Stream;
import java.util.zip.CRC32;

import static io.kestra.core.models.Label.SYSTEM_PREFIX;

@Builder(toBuilder = true)
@Slf4j
@Getter
Expand Down Expand Up @@ -143,10 +142,7 @@ public static Execution newExecution(final Flow flow,
.scheduleDate(scheduleDate.map(ChronoZonedDateTime::toInstant).orElse(null))
.build();

List<Label> executionLabels = new ArrayList<>();
if (flow.getLabels() != null) {
executionLabels.addAll(flow.getLabels());
}
List<Label> executionLabels = new ArrayList<>(LabelService.labelsExcludingSystem(flow));

if (labels != null) {
executionLabels.addAll(labels);
Expand All @@ -162,6 +158,8 @@ public static Execution newExecution(final Flow flow,
return execution;
}



public static class ExecutionBuilder {
void prebuild() {
this.originalId = this.id;
Expand All @@ -181,12 +179,6 @@ public Execution build() {
this.prebuild();
return super.build();
}

@Override
public ExecutionBuilder labels(List<Label> labels) {
checkForSystemLabels(labels);
return super.labels(labels);
}
}

public Execution withState(State.Type state) {
Expand All @@ -212,53 +204,9 @@ public Execution withState(State.Type state) {
);
}

/**
* This method replaces labels with new ones.
* It refuses system labels as they must be passed via the withSystemLabels method.
*/
public Execution withLabels(List<Label> labels) {
checkForSystemLabels(labels);

return new Execution(
this.tenantId,
this.id,
this.namespace,
this.flowId,
this.flowRevision,
this.taskRunList,
this.inputs,
this.outputs,
labels,
this.variables,
this.state,
this.parentId,
this.originalId,
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate,
this.error
);
}

private static void checkForSystemLabels(List<Label> labels) {
if (labels != null) {
Optional<Label> first = labels.stream().filter(label -> label.key() != null && label.key().startsWith(SYSTEM_PREFIX)).findFirst();
if (first.isPresent()) {
throw new IllegalArgumentException("System labels can only be set by Kestra itself, offending label: " + first.get().key() + "=" + first.get().value());
}
}
}

/**
* This method in <b>only to be used</b> to add system labels to an execution.
* It will not replace exisiting labels but add new one (possibly duplicating).
*/
public Execution withSystemLabels(List<Label> labels) {
List<Label> newLabels = this.labels == null ? new ArrayList<>() : this.labels;
if (labels != null) {
newLabels.addAll(labels);
}
return new Execution(
this.tenantId,
this.id,
Expand All @@ -268,7 +216,7 @@ public Execution withSystemLabels(List<Label> labels) {
this.taskRunList,
this.inputs,
this.outputs,
newLabels,
labels,
this.variables,
this.state,
this.parentId,
Expand Down
14 changes: 8 additions & 6 deletions core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,14 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
);

// propagate system labels and compute correlation ID if not already existing
List<Label> systemLabels = Streams.of(currentExecution.getLabels())
List<Label> newLabels = Streams.of(currentExecution.getLabels())
.filter(label -> label.key().startsWith(Label.SYSTEM_PREFIX))
.collect(Collectors.toList());
if (systemLabels.stream().noneMatch(label -> label.key().equals(Label.CORRELATION_ID))) {
systemLabels.add(new Label(Label.CORRELATION_ID, currentExecution.getId()));
if (newLabels.stream().noneMatch(label -> label.key().equals(Label.CORRELATION_ID))) {
newLabels.add(new Label(Label.CORRELATION_ID, currentExecution.getId()));
}
if (labels != null) {
newLabels.addAll(labels);
}

FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Expand All @@ -107,16 +110,15 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
.newExecution(
flow,
(f, e) -> flowInputOutput.readExecutionInputs(f, e, inputs),
labels,
newLabels,
Optional.empty())
.withTrigger(ExecutionTrigger.builder()
.id(currentTask.getId())
.type(currentTask.getType())
.variables(variables)
.build()
)
.withScheduleDate(scheduleOnDate)
.withSystemLabels(systemLabels);
.withScheduleDate(scheduleOnDate);
return SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/io/kestra/core/runners/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.kestra.core.server.ServerConfig;
import io.kestra.core.server.Service;
import io.kestra.core.server.ServiceStateChangeEvent;
import io.kestra.core.services.LabelService;
import io.kestra.core.services.LogService;
import io.kestra.core.services.WorkerGroupService;
import io.kestra.core.utils.Await;
Expand Down Expand Up @@ -361,11 +362,11 @@ private void publishTriggerExecution(WorkerTrigger workerTrigger, Optional<Execu
);
}

var flowLabels = workerTrigger.getConditionContext().getFlow().getLabels();
if (flowLabels != null) {
var flow = workerTrigger.getConditionContext().getFlow();
if (flow.getLabels() != null) {
evaluate = evaluate.map(execution -> {
List<Label> executionLabels = execution.getLabels() != null ? execution.getLabels() : new ArrayList<>();
executionLabels.addAll(flowLabels);
executionLabels.addAll(LabelService.labelsExcludingSystem(flow));
return execution.withLabels(executionLabels);
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ private void handle() {
.namespace(f.getTriggerContext().getNamespace())
.flowId(f.getTriggerContext().getFlowId())
.flowRevision(f.getFlow().getRevision())
.labels(f.getFlow().getLabels())
.labels(LabelService.labelsExcludingSystem(f.getFlow()))
.state(new State().withState(State.Type.FAILED))
.error(ExecutionError.from(ie))
.build();
Expand Down
56 changes: 56 additions & 0 deletions core/src/main/java/io/kestra/core/services/LabelService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.kestra.core.services;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.Label;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.triggers.AbstractTrigger;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.ListUtils;

import java.util.ArrayList;
import java.util.List;

public final class LabelService {
private LabelService() {}

/**
* Return flow labels excluding system labels.
*/
public static List<Label> labelsExcludingSystem(Flow flow) {
return ListUtils.emptyOnNull(flow.getLabels()).stream().filter(label -> !label.key().startsWith(Label.SYSTEM_PREFIX)).toList();
}

/**
* Return flow labels excluding system labels concatenated with trigger labels.
*
* Trigger labels will be rendered via the run context but not flow labels.
* In case rendering is not possible, the label will be omitted.
*/
public static List<Label> fromTrigger(RunContext runContext, Flow flow, AbstractTrigger trigger) {
final List<Label> labels = new ArrayList<>();

if (flow.getLabels() != null) {
labels.addAll(LabelService.labelsExcludingSystem(flow)); // no need for rendering
}

if (trigger.getLabels() != null) {
for (Label label : trigger.getLabels()) {
final var value = renderLabelValue(runContext, label);
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}

return labels;
}

private static String renderLabelValue(RunContext runContext, Label label) {
try {
return runContext.render(label.value());
} catch (IllegalVariableEvaluationException e) {
runContext.logger().warn("Failed to render label '{}', it will be omitted", label.key(), e);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Optional;
import java.util.Set;

import static io.kestra.core.models.Label.READ_ONLY;
import static io.kestra.core.models.Label.SYSTEM_PREFIX;

@Singleton
Expand Down Expand Up @@ -79,7 +80,7 @@ public boolean isValid(

// system labels
ListUtils.emptyOnNull(value.getLabels()).stream()
.filter(label -> label.key() != null && label.key().startsWith(SYSTEM_PREFIX))
.filter(label -> label.key() != null && label.key().startsWith(SYSTEM_PREFIX) && !label.key().equals(READ_ONLY))
.forEach(label -> violations.add("System labels can only be set by Kestra itself, offending label: " + label.key() + "=" + label.value()));

if (!violations.isEmpty()) {
Expand Down
14 changes: 11 additions & 3 deletions core/src/main/java/io/kestra/plugin/core/execution/Labels.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static io.kestra.core.models.Label.SYSTEM_PREFIX;
import static io.kestra.core.utils.Rethrow.throwBiConsumer;
import static io.kestra.core.utils.Rethrow.throwFunction;

Expand All @@ -35,7 +36,8 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Allow to add or overwrite labels for the current execution at runtime."
title = "Allow to add or overwrite labels for the current execution at runtime.",
description = "Trying to pass a system label (a label starting with `system_`) will fail the task."
)
@Plugin(
examples = {
Expand Down Expand Up @@ -131,11 +133,17 @@ public Execution update(Execution execution, RunContext runContext) throws Excep
);
}));

// check for system labels: none can be passed at runtime
Optional<Map.Entry<String, String>> first = newLabels.entrySet().stream().filter(entry -> entry.getKey().startsWith(SYSTEM_PREFIX)).findFirst();
if (first.isPresent()) {
throw new IllegalArgumentException("System labels can only be set by Kestra itself, offending label: " + first.get().getKey() + "=" + first.get().getValue());
}

return execution.withLabels(newLabels.entrySet().stream()
.map(throwFunction(entry -> new Label(
.map(entry -> new Label(
entry.getKey(),
entry.getValue()
)))
))
.toList());
} else {
throw new IllegalVariableEvaluationException("Unknown value type: " + labels.getClass());
Expand Down
35 changes: 4 additions & 31 deletions core/src/main/java/io/kestra/plugin/core/trigger/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.kestra.core.models.Label;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.services.LabelService;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -60,7 +61,7 @@
outputs:
- id: last_ingested_date
type: STRING
value: "{{ outputs.final_date.value }}"
value: "{{ outputs.final_date.value }}"
```
Below is the `transform` flow triggered in response to the `extract` flow's successful completion.""",
code = """
Expand All @@ -74,7 +75,7 @@

variables:
result: |
Ingestion done in {{ trigger.executionId }}.
Ingestion done in {{ trigger.executionId }}.
Now transforming data up to {{ inputs.last_ingested_date }}

tasks:
Expand Down Expand Up @@ -144,7 +145,7 @@ public Optional<Execution> evaluate(RunContext runContext, io.kestra.core.models
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.labels(generateLabels(runContext, flow))
.labels(LabelService.fromTrigger(runContext, flow, this))
.state(new State())
.trigger(ExecutionTrigger.of(
this,
Expand Down Expand Up @@ -181,34 +182,6 @@ public Optional<Execution> evaluate(RunContext runContext, io.kestra.core.models
}
}

private List<Label> generateLabels(RunContext runContext, io.kestra.core.models.flows.Flow flow) {
final List<Label> labels = new ArrayList<>();

if (flow.getLabels() != null) {
labels.addAll(flow.getLabels()); // no need for rendering
}

if (this.getLabels() != null) {
for (Label label : this.getLabels()) {
final var value = renderLabelValue(runContext, label);
if (value != null) {
labels.add(new Label(label.key(), value));
}
}
}

return labels;
}

private String renderLabelValue(RunContext runContext, Label label) {
try {
return runContext.render(label.value());
} catch (IllegalVariableEvaluationException e) {
runContext.logger().warn("Failed to render label '{}', it will be omitted", label.key(), e);
return null;
}
}

@Builder
@ToString
@EqualsAndHashCode
Expand Down
Loading