Skip to content

Commit

Permalink
feat(core,webserver): correlation ID
Browse files Browse the repository at this point in the history
Automatically set the 'system_correlationId' label to the parent execution id for all child, recursively.
This label is the only one that can be set once when starting an execution.
  • Loading branch information
loicmathieu committed Oct 22, 2024
1 parent 81c1c3e commit f169c68
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 40 deletions.
5 changes: 5 additions & 0 deletions core/src/main/java/io/kestra/core/models/Label.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,9 @@

public record Label(@NotNull String key, @NotNull String value) {
public static final String SYSTEM_PREFIX = "system_";

// system labels
public static final String CORRELATION_ID = SYSTEM_PREFIX + "correlationId";
public static final String USERNAME = SYSTEM_PREFIX + "username";
public static final String APP = SYSTEM_PREFIX + "app";
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ public Execution withState(State.Type state) {
);
}

/**
* This method replaces labels with new ones.
* It refuses system labels as they must be passed via the withSystemLabels method.
*/
public Execution withLabels(List<Label> labels) {
checkForSystemLabels(labels);

Expand Down Expand Up @@ -246,7 +250,15 @@ private static void checkForSystemLabels(List<Label> labels) {
}
}

/**
* This method in <b>only to be used</b> to add system labels to an execution.
* It will not replace exisiting labels but add new one (possibly duplicating).
*/
public Execution withSystemLabels(List<Label> labels) {
List<Label> newLabels = this.labels == null ? new ArrayList<>() : this.labels;
if (labels != null) {
newLabels.addAll(labels);
}
return new Execution(
this.tenantId,
this.id,
Expand All @@ -256,7 +268,7 @@ public Execution withSystemLabels(List<Label> labels) {
this.taskRunList,
this.inputs,
this.outputs,
labels,
newLabels,
this.variables,
this.state,
this.parentId,
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/java/io/kestra/core/runners/ExecutableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import io.kestra.core.models.tasks.Task;
import io.kestra.core.storages.Storage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.stream.Streams;

import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;

@Slf4j
public final class ExecutableUtils {
Expand Down Expand Up @@ -91,6 +93,14 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
"flowRevision", currentFlow.getRevision()
);

// propagate system labels and compute correlation ID if not already existing
List<Label> systemLabels = Streams.of(currentExecution.getLabels())
.filter(label -> label.key().startsWith(Label.SYSTEM_PREFIX))
.collect(Collectors.toList());
if (systemLabels.stream().noneMatch(label -> label.key().equals(Label.CORRELATION_ID))) {
systemLabels.add(new Label(Label.CORRELATION_ID, currentExecution.getId()));
}

FlowInputOutput flowInputOutput = ((DefaultRunContext)runContext).getApplicationContext().getBean(FlowInputOutput.class);
Instant scheduleOnDate = scheduleDate != null ? scheduleDate.as(runContext, ZonedDateTime.class).toInstant() : null;
Execution execution = Execution
Expand All @@ -105,7 +115,8 @@ public static <T extends Task & ExecutableTask<?>> SubflowExecution<?> subflowEx
.variables(variables)
.build()
)
.withScheduleDate(scheduleOnDate);
.withScheduleDate(scheduleOnDate)
.withSystemLabels(systemLabels);
return SubflowExecution.builder()
.parentTask(currentTask)
.parentTaskRun(currentTaskRun.withState(State.Type.RUNNING))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kestra.core.validations.validator;

import io.kestra.core.models.Label;
import io.kestra.core.models.flows.Data;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.Input;
Expand Down Expand Up @@ -60,38 +59,28 @@ public boolean isValid(
}

value.allTasksWithChilds()
.forEach(
task -> {
if (task instanceof ExecutableTask<?> executableTask
&& value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace())) {
violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]");
}
}
);
.stream().filter(task -> task instanceof ExecutableTask<?> executableTask
&& value.getId().equals(executableTask.subflowId().flowId())
&& value.getNamespace().equals(executableTask.subflowId().namespace()))
.forEach(task -> violations.add("Recursive call to flow [" + value.getNamespace() + "." + value.getId() + "]"));

// input unique name
if (value.getInputs() != null) {
List<String> duplicates = getDuplicates(value.getInputs().stream().map(Data::getId).toList());
if (!duplicates.isEmpty()) {
violations.add("Duplicate input with name [" + String.join(", ", duplicates) + "]");
}
checkFlowInputsDependencyGraph(value, violations);
duplicateIds = getDuplicates(ListUtils.emptyOnNull(value.getInputs()).stream().map(Data::getId).toList());
if (!duplicateIds.isEmpty()) {
violations.add("Duplicate input with name [" + String.join(", ", duplicateIds) + "]");
}
checkFlowInputsDependencyGraph(value, violations);

// output unique name
if (value.getOutputs() != null) {
List<String> duplicates = getDuplicates(value.getOutputs().stream().map(Data::getId).toList());
if (!duplicates.isEmpty()) {
violations.add("Duplicate output with name [" + String.join(", ", duplicates) + "]");
}
duplicateIds = getDuplicates(ListUtils.emptyOnNull(value.getOutputs()).stream().map(Data::getId).toList());
if (!duplicateIds.isEmpty()) {
violations.add("Duplicate output with name [" + String.join(", ", duplicateIds) + "]");
}

// system labels
if (value.getLabels() != null) {
value.getLabels().stream()
.filter(label -> label.key() != null && label.key().startsWith(SYSTEM_PREFIX))
.forEach(label -> violations.add("System labels can only be set by Kestra itself, offending label: " + label.key() + "=" + label.value()));
}
ListUtils.emptyOnNull(value.getLabels()).stream()
.filter(label -> label.key() != null && label.key().startsWith(SYSTEM_PREFIX))
.forEach(label -> violations.add("System labels can only be set by Kestra itself, offending label: " + label.key() + "=" + label.value()));

if (!violations.isEmpty()) {
context.disableDefaultConstraintViolation();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,12 @@ public Execution update(Execution execution, RunContext runContext) throws Excep
);
}));

return execution.toBuilder()
.labels(newLabels.entrySet().stream()
return execution.withLabels(newLabels.entrySet().stream()
.map(throwFunction(entry -> new Label(
entry.getKey(),
entry.getValue()
)))
.toList())
.build();
.toList());
} else {
throw new IllegalVariableEvaluationException("Unknown value type: " + labels.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kestra.plugin.core.flow;

import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.AbstractMemoryRunnerTest;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;

class CorrelationIdTest extends AbstractMemoryRunnerTest {
@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;

@Test
void shouldHaveCorrelationId() throws QueueException, TimeoutException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
AtomicReference<Execution> child = new AtomicReference<>();
AtomicReference<Execution> grandChild = new AtomicReference<>();

Flux<Execution> receive = TestsUtils.receive(executionQueue, either -> {
Execution execution = either.getLeft();
if (execution.getFlowId().equals("subflow-child") && execution.getState().getCurrent().isTerminated()) {
countDownLatch.countDown();
child.set(execution);
}
if (execution.getFlowId().equals("subflow-grand-child") && execution.getState().getCurrent().isTerminated()) {
countDownLatch.countDown();
grandChild.set(execution);
}
});

Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "subflow-parent");
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

assertTrue(countDownLatch.await(1, TimeUnit.MINUTES));
receive.blockLast();

assertThat(child.get(), notNullValue());
assertThat(child.get().getState().getCurrent(), is(State.Type.SUCCESS));
Optional<Label> correlationId = child.get().getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
assertThat(correlationId.isPresent(), is(true));
assertThat(correlationId.get().value(), is(execution.getId()));

assertThat(grandChild.get(), notNullValue());
assertThat(grandChild.get().getState().getCurrent(), is(State.Type.SUCCESS));
correlationId = grandChild.get().getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
assertThat(correlationId.isPresent(), is(true));
assertThat(correlationId.get().value(), is(execution.getId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,16 @@ void run(String input, State.Type fromState, State.Type triggerState, int count,

if (testInherited) {
assertThat(triggered.get().getLabels(), hasItems(
new Label(Label.CORRELATION_ID, execution.getId()),
new Label("mainFlowExecutionLabel", "execFoo"),
new Label("mainFlowLabel", "flowFoo"),
new Label("launchTaskLabel", "launchFoo"),
new Label("switchFlowLabel", "switchFoo")
));
} else {
assertThat(triggered.get().getLabels().size(), is(2));
assertThat(triggered.get().getLabels().size(), is(3));
assertThat(triggered.get().getLabels(), hasItems(
new Label(Label.CORRELATION_ID, execution.getId()),
new Label("launchTaskLabel", "launchFoo"),
new Label("switchFlowLabel", "switchFoo")
));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.core.flow;

import io.kestra.core.models.Label;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
Expand Down Expand Up @@ -28,6 +29,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -101,6 +103,9 @@ public void forEachItem() throws TimeoutException, InterruptedException, URISynt
assertThat(triggered.get().getFlowId(), is("for-each-item-subflow"));
assertThat((String) triggered.get().getInputs().get("items"), matchesRegex("kestra:///io/kestra/tests/for-each-item/executions/.*/tasks/each-split/.*\\.txt"));
assertThat(triggered.get().getTaskRunList(), hasSize(1));
Optional<Label> correlationId = triggered.get().getLabels().stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).findAny();
assertThat(correlationId.isPresent(), is(true));
assertThat(correlationId.get().value(), is(execution.getId()));
}

public void forEachItemEmptyItems() throws TimeoutException, URISyntaxException, IOException, QueueException {
Expand Down
8 changes: 8 additions & 0 deletions core/src/test/resources/flows/valids/subflow-child.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
id: subflow-child
namespace: io.kestra.tests

tasks:
- id: subflow
type: io.kestra.plugin.core.flow.Subflow
namespace: io.kestra.tests
flowId: subflow-grand-child
7 changes: 7 additions & 0 deletions core/src/test/resources/flows/valids/subflow-grand-child.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
id: subflow-grand-child
namespace: io.kestra.tests

tasks:
- id: firstLevel
type: io.kestra.plugin.core.log.Log
message: "My grandparent is {{labels.system_correlationId}}"
8 changes: 8 additions & 0 deletions core/src/test/resources/flows/valids/subflow-parent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
id: subflow-parent
namespace: io.kestra.tests

tasks:
- id: subflow
type: io.kestra.plugin.core.flow.Subflow
namespace: io.kestra.tests
flowId: subflow-child
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
Expand Down Expand Up @@ -582,7 +583,8 @@ public Publisher<ApiValidateExecutionInputsResponse> validateInputsOnCreate(
@Parameter(description = "The flow revision or latest if null") @QueryValue Optional<Integer> revision
) {
Flow flow = flowService.getFlowIfExecutableOrThrow(tenantService.resolveTenant(), namespace, id, revision);
Execution execution = Execution.newExecution(flow, parseLabels(labels));
Pair<List<Label>, List<Label>> parsedLabels = parseLabels(labels);
Execution execution = Execution.newExecution(flow, parsedLabels.getLeft()).withSystemLabels(parsedLabels.getRight());
return flowInputOutput
.validateExecutionInputs(flow.getInputs(), execution, inputs)
.map(values -> ApiValidateExecutionInputsResponse.of(id, namespace, values));
Expand All @@ -603,7 +605,8 @@ public Publisher<ExecutionResponse> create(
@Parameter(description = "Schedule the flow on a specific date") @QueryValue Optional<ZonedDateTime> scheduleDate
) throws IOException {
Flow flow = flowService.getFlowIfExecutableOrThrow(tenantService.resolveTenant(), namespace, id, revision);
Execution current = Execution.newExecution(flow, null, parseLabels(labels), scheduleDate);
Pair<List<Label>, List<Label>> parsedLabels = parseLabels(labels);
Execution current = Execution.newExecution(flow, null, parsedLabels.getLeft(), scheduleDate).withSystemLabels(parsedLabels.getRight());
Mono<CompletableFuture<ExecutionResponse>> handle = flowInputOutput.readExecutionInputs(flow, current, inputs)
.handle((executionInputs, sink) -> {
Execution executionWithInputs = current.withInputs(executionInputs);
Expand Down Expand Up @@ -682,10 +685,14 @@ public static ExecutionResponse fromExecution(Execution execution, URI url) {
}
}

private List<Label> parseLabels(List<String> labels) {
return labels == null ? null : RequestUtils.toMap(labels).entrySet().stream()
protected Pair<List<Label>, List<Label>> parseLabels(List<String> labels) {
// We allow passing the correlation id from the API but only this one, other system labels will go throught and fail at execution creation.
List<Label> parsedLabels = labels == null ? Collections.emptyList() : RequestUtils.toMap(labels).entrySet().stream()
.map(entry -> new Label(entry.getKey(), entry.getValue()))
.toList();
List<Label> standardLabels = parsedLabels.stream().filter(label -> !label.key().equals(Label.CORRELATION_ID)).toList();
List<Label> systemLabels = parsedLabels.stream().filter(label -> label.key().equals(Label.CORRELATION_ID)).toList();
return Pair.of(standardLabels, systemLabels);
}

protected <T> HttpResponse<T> validateFile(String executionId, URI path, String redirect) {
Expand Down Expand Up @@ -1705,9 +1712,7 @@ private Execution setLabels(Execution execution, List<Label> labels) {
}

Execution newExecution = execution
.toBuilder()
.labels(newLabels.entrySet().stream().map(entry -> new Label(entry.getKey(), entry.getValue())).filter(label -> !label.key().isEmpty() || !label.value().isEmpty()).toList())
.build();
.withLabels(newLabels.entrySet().stream().map(entry -> new Label(entry.getKey(), entry.getValue())).filter(label -> !label.key().isEmpty() || !label.value().isEmpty()).toList());
eventPublisher.publishEvent(new CrudEvent<>(newExecution, execution, CrudEventType.UPDATE));

return executionRepository.save(newExecution);
Expand Down

0 comments on commit f169c68

Please sign in to comment.