Skip to content

Commit

Permalink
Merge branch 'develop' into feature/settings-save-button
Browse files Browse the repository at this point in the history
  • Loading branch information
MilosPaunovic authored Oct 24, 2024
2 parents 976be96 + d33d6a7 commit 5ca0ee7
Show file tree
Hide file tree
Showing 95 changed files with 1,224 additions and 358 deletions.
1 change: 0 additions & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ updates:
timezone: "Europe/Paris"
open-pull-requests-limit: 50
labels: ["dependency-upgrade"]
reviewers: ["MilosPaunovic"]
ignore:
# Ignore major versions greater than 8, as it's still known to be flaky
- dependency-name: "eslint"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/generate_translations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,4 @@ jobs:
exit 0
fi
git commit -m "chore(translations): auto generate values for languages other than english"
git push origin $BRANCH_NAME
git push origin $BRANCH_NAME
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<br />

<p align="center">
<a href="https://twitter.com/kestra_io"><img height="25" src="https://kestra.io/twitter.svg" alt="twitter" /></a> &nbsp;
<a href="https://x.com/kestra_io"><img height="25" src="https://kestra.io/twitter.svg" alt="X(formerly Twitter)" /></a> &nbsp;
<a href="https://www.linkedin.com/company/kestra/"><img height="25" src="https://kestra.io/linkedin.svg" alt="linkedin" /></a> &nbsp;
<a href="https://www.youtube.com/@kestra-io"><img height="25" src="https://kestra.io/youtube.svg" alt="youtube" /></a> &nbsp;
</p>
Expand Down Expand Up @@ -167,7 +167,7 @@ Create custom plugins to extend Kestra's capabilities. Check out our [Plugin Dev
Stay connected and get support:
- **Slack:** Join our [Slack community](https://kestra.io/slack) to ask questions and share ideas.
- **Twitter:** Follow us on [Twitter](https://twitter.com/kestra_io) for the latest updates.
- **X(formerly Twitter):** Follow us on [X(formerly Twitter)](https://x.com/kestra_io) for the latest updates.
- **YouTube:** Subscribe to our [YouTube channel](https://www.youtube.com/@kestra-io) for tutorials and webinars.
- **LinkedIn:** Connect with us on [LinkedIn](https://www.linkedin.com/company/kestra/).
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ plugins {
id "com.github.ben-manes.versions" version "0.51.0"

// front
id 'org.siouan.frontend-jdk17' version '8.1.0' apply false
id 'org.siouan.frontend-jdk21' version '9.0.0' apply false

// release
id "io.github.gradle-nexus.publish-plugin" version "2.0.0"
Expand Down
6 changes: 4 additions & 2 deletions cli/src/main/java/io/kestra/cli/AbstractValidateCommand.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kestra.cli;

import io.kestra.cli.commands.flows.FlowValidateCommand;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.models.validations.ValidateConstraintViolation;
import io.kestra.core.serializers.YamlFlowParser;
Expand Down Expand Up @@ -75,7 +74,8 @@ public Integer call(
YamlFlowParser yamlFlowParser,
ModelValidator modelValidator,
Function<Object, String> identity,
Function<Object, List<String>> warningsFunction
Function<Object, List<String>> warningsFunction,
Function<Object, List<String>> infosFunction
) throws Exception {
super.call();

Expand All @@ -93,6 +93,8 @@ public Integer call(
stdOut("@|green \u2713|@ - " + identity.apply(parse));
List<String> warnings = warningsFunction.apply(parse);
warnings.forEach(warning -> stdOut("@|bold,yellow \u26A0|@ - " + warning));
List<String> infos = infosFunction.apply(parse);
infos.forEach(info -> stdOut("@|bold,blue \u2139|@ - " + info));
} catch (ConstraintViolationException e) {
stdErr("@|red \u2718|@ - " + path);
AbstractValidateCommand.handleException(e, clsName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,12 @@ public Integer call() throws Exception {
Flow flow = (Flow) object;
List<String> warnings = new ArrayList<>();
warnings.addAll(flowService.deprecationPaths(flow).stream().map(deprecation -> deprecation + " is deprecated").toList());
warnings.addAll(flowService.relocations(flow.generateSource()).stream().map(relocation -> relocation.from() + " is replaced by " + relocation.to()).toList());
warnings.addAll(flowService.warnings(flow));
return warnings;
},
(Object object) -> {
Flow flow = (Flow) object;
return flowService.relocations(flow.generateSource()).stream().map(relocation -> relocation.from() + " is replaced by " + relocation.to()).toList();
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public Integer call() throws Exception {
Template template = (Template) object;
return template.getNamespace() + " / " + template.getId();
},
(Object object) -> Collections.emptyList(),
(Object object) -> Collections.emptyList()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void startListening(List<Path> paths) throws IOException, InterruptedExce
WatchEvent.Kind<?> kind = watchEvent.kind();
Path entry = (Path) watchEvent.context();

if (entry.endsWith(".yml") || entry.endsWith(".yaml")) {
if (entry.toString().endsWith(".yml") || entry.toString().endsWith(".yaml")) {

if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ public FlowWithSource createOrUpdateFlow(Flow flow, String content) {

public void deleteFlow(FlowWithSource toDelete) {
flowRepositoryInterface.findByIdWithSource(toDelete.getTenantId(), toDelete.getNamespace(), toDelete.getId()).ifPresent(flowRepositoryInterface::delete);
log.debug("Flow {} has been deleted", toDelete.getId());
log.error("Flow {} has been deleted", toDelete.getId());
}

@Override
public void deleteFlow(String tenantId, String namespace, String id) {
flowRepositoryInterface.findByIdWithSource(tenantId, namespace, id).ifPresent(flowRepositoryInterface::delete);
log.debug("Flow {} has been deleted", id);
log.error("Flow {} has been deleted", id);
}
}
5 changes: 5 additions & 0 deletions cli/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,8 @@ kestra:
uri: https://api.kestra.io/v1/reports/usages
initial-delay: 5m
fixed-delay: 1h

hidden-labels:
prefixes:
- system_
- internal_
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ void warning() {
Integer call = PicocliRunner.call(FlowValidateCommand.class, ctx, args);

assertThat(call, is(0));
assertThat(out.toString(), containsString("tasks[0] is deprecated"));
assertThat(out.toString(), containsString("✓ - system / warning"));
assertThat(out.toString(), containsString("⚠ - tasks[0] is deprecated"));
assertThat(out.toString(), containsString("ℹ - io.kestra.core.tasks.log.Log is replaced by io.kestra.plugin.core.log.Log"));
}
}
}
5 changes: 4 additions & 1 deletion cli/src/test/resources/warning/flow-with-warning.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@ namespace: system
tasks:
- id: deprecated
type: io.kestra.plugin.core.debug.Echo
format: Hello World
format: Hello World
- id: alias
type: io.kestra.core.tasks.log.Log
message: I'm an alias
106 changes: 54 additions & 52 deletions core/src/main/java/io/kestra/core/metrics/MetricRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,60 @@
@Singleton
@Slf4j
public class MetricRegistry {
public final static String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
public final static String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
public final static String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
public final static String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
public final static String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
public final static String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
public final static String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public final static String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public final static String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
public final static String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
public final static String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
public final static String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
public final static String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
public final static String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
public final static String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";

public final static String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public final static String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public final static String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public final static String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public final static String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public final static String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public final static String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";

public final static String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public final static String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
public final static String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
public final static String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
public final static String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
public final static String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count";
public final static String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count";

public final static String SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public final static String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public final static String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public final static String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public final static String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public final static String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";

public final static String STREAMS_STATE_COUNT = "stream.state.count";


public final static String JDBC_QUERY_DURATION = "jdbc.query.duration";

public final static String TAG_TASK_TYPE = "task_type";
public final static String TAG_TRIGGER_TYPE = "trigger_type";
public final static String TAG_FLOW_ID = "flow_id";
public final static String TAG_NAMESPACE_ID = "namespace_id";
public final static String TAG_STATE = "state";
public final static String TAG_ATTEMPT_COUNT = "attempt_count";
public final static String TAG_WORKER_GROUP = "worker_group";
public final static String TAG_TENANT_ID = "tenant_id";
public static final String METRIC_WORKER_JOB_PENDING_COUNT = "worker.job.pending";
public static final String METRIC_WORKER_JOB_RUNNING_COUNT = "worker.job.running";
public static final String METRIC_WORKER_JOB_THREAD_COUNT = "worker.job.thread";
public static final String METRIC_WORKER_RUNNING_COUNT = "worker.running.count";
public static final String METRIC_WORKER_QUEUED_DURATION = "worker.queued.duration";
public static final String METRIC_WORKER_STARTED_COUNT = "worker.started.count";
public static final String METRIC_WORKER_TIMEOUT_COUNT = "worker.timeout.count";
public static final String METRIC_WORKER_ENDED_COUNT = "worker.ended.count";
public static final String METRIC_WORKER_ENDED_DURATION = "worker.ended.duration";
public static final String METRIC_WORKER_TRIGGER_DURATION = "worker.trigger.duration";
public static final String METRIC_WORKER_TRIGGER_RUNNING_COUNT = "worker.trigger.running.count";
public static final String METRIC_WORKER_TRIGGER_STARTED_COUNT = "worker.trigger.started.count";
public static final String METRIC_WORKER_TRIGGER_ENDED_COUNT = "worker.trigger.ended.count";
public static final String METRIC_WORKER_TRIGGER_ERROR_COUNT = "worker.trigger.error.count";
public static final String METRIC_WORKER_TRIGGER_EXECUTION_COUNT = "worker.trigger.execution.count";

public static final String EXECUTOR_TASKRUN_NEXT_COUNT = "executor.taskrun.next.count";
public static final String EXECUTOR_TASKRUN_ENDED_COUNT = "executor.taskrun.ended.count";
public static final String EXECUTOR_TASKRUN_ENDED_DURATION = "executor.taskrun.ended.duration";
public static final String EXECUTOR_WORKERTASKRESULT_COUNT = "executor.workertaskresult.count";
public static final String EXECUTOR_EXECUTION_STARTED_COUNT = "executor.execution.started.count";
public static final String EXECUTOR_EXECUTION_END_COUNT = "executor.execution.end.count";
public static final String EXECUTOR_EXECUTION_DURATION = "executor.execution.duration";

public static final String METRIC_INDEXER_REQUEST_COUNT = "indexer.request.count";
public static final String METRIC_INDEXER_REQUEST_DURATION = "indexer.request.duration";
public static final String METRIC_INDEXER_REQUEST_RETRY_COUNT = "indexer.request.retry.count";
public static final String METRIC_INDEXER_SERVER_DURATION = "indexer.server.duration";
public static final String METRIC_INDEXER_MESSAGE_FAILED_COUNT = "indexer.message.failed.count";
public static final String METRIC_INDEXER_MESSAGE_IN_COUNT = "indexer.message.in.count";
public static final String METRIC_INDEXER_MESSAGE_OUT_COUNT = "indexer.message.out.count";

public static final String SCHEDULER_LOOP_COUNT = "scheduler.loop.count";
public static final String SCHEDULER_TRIGGER_COUNT = "scheduler.trigger.count";
public static final String SCHEDULER_TRIGGER_DELAY_DURATION = "scheduler.trigger.delay.duration";
public static final String SCHEDULER_EVALUATE_COUNT = "scheduler.evaluate.count";
public static final String SCHEDULER_EXECUTION_RUNNING_DURATION = "scheduler.execution.running.duration";
public static final String SCHEDULER_EXECUTION_MISSING_DURATION = "scheduler.execution.missing.duration";

public static final String STREAMS_STATE_COUNT = "stream.state.count";

public static final String JDBC_QUERY_DURATION = "jdbc.query.duration";

public static final String QUEUE_BIG_MESSAGE_COUNT = "queue.big_message.count";

public static final String TAG_TASK_TYPE = "task_type";
public static final String TAG_TRIGGER_TYPE = "trigger_type";
public static final String TAG_FLOW_ID = "flow_id";
public static final String TAG_NAMESPACE_ID = "namespace_id";
public static final String TAG_STATE = "state";
public static final String TAG_ATTEMPT_COUNT = "attempt_count";
public static final String TAG_WORKER_GROUP = "worker_group";
public static final String TAG_TENANT_ID = "tenant_id";
public static final String TAG_CLASS_NAME = "class_name";

@Inject
private MeterRegistry meterRegistry;
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/io/kestra/core/models/Label.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,11 @@

import jakarta.validation.constraints.NotNull;

public record Label(@NotNull String key, @NotNull String value) {}
public record Label(@NotNull String key, @NotNull String value) {
public static final String SYSTEM_PREFIX = "system_";

// system labels
public static final String CORRELATION_ID = SYSTEM_PREFIX + "correlationId";
public static final String USERNAME = SYSTEM_PREFIX + "username";
public static final String APP = SYSTEM_PREFIX + "app";
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.stream.Stream;
import java.util.zip.CRC32;

import static io.kestra.core.models.Label.SYSTEM_PREFIX;

@Builder(toBuilder = true)
@Slf4j
@Getter
Expand Down Expand Up @@ -76,7 +78,6 @@ public class Execution implements DeletedInterface, TenantInterface {
@JsonInclude(JsonInclude.Include.NON_EMPTY)
Map<String, Object> outputs;

@With
@JsonSerialize(using = ListOrMapOfLabelSerializer.class)
@JsonDeserialize(using = ListOrMapOfLabelDeserializer.class)
List<Label> labels;
Expand Down Expand Up @@ -180,6 +181,12 @@ public Execution build() {
this.prebuild();
return super.build();
}

@Override
public ExecutionBuilder labels(List<Label> labels) {
checkForSystemLabels(labels);
return super.labels(labels);
}
}

public Execution withState(State.Type state) {
Expand All @@ -205,6 +212,75 @@ public Execution withState(State.Type state) {
);
}

/**
* This method replaces labels with new ones.
* It refuses system labels as they must be passed via the withSystemLabels method.
*/
public Execution withLabels(List<Label> labels) {
checkForSystemLabels(labels);

return new Execution(
this.tenantId,
this.id,
this.namespace,
this.flowId,
this.flowRevision,
this.taskRunList,
this.inputs,
this.outputs,
labels,
this.variables,
this.state,
this.parentId,
this.originalId,
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate,
this.error
);
}

private static void checkForSystemLabels(List<Label> labels) {
if (labels != null) {
Optional<Label> first = labels.stream().filter(label -> label.key() != null && label.key().startsWith(SYSTEM_PREFIX)).findFirst();
if (first.isPresent()) {
throw new IllegalArgumentException("System labels can only be set by Kestra itself, offending label: " + first.get().key() + "=" + first.get().value());
}
}
}

/**
* This method in <b>only to be used</b> to add system labels to an execution.
* It will not replace exisiting labels but add new one (possibly duplicating).
*/
public Execution withSystemLabels(List<Label> labels) {
List<Label> newLabels = this.labels == null ? new ArrayList<>() : this.labels;
if (labels != null) {
newLabels.addAll(labels);
}
return new Execution(
this.tenantId,
this.id,
this.namespace,
this.flowId,
this.flowRevision,
this.taskRunList,
this.inputs,
this.outputs,
newLabels,
this.variables,
this.state,
this.parentId,
this.originalId,
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate,
this.error
);
}

public Execution withTaskRun(TaskRun taskRun) throws InternalException {
ArrayList<TaskRun> newTaskRunList = new ArrayList<>(this.taskRunList);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import dev.failsafe.RetryPolicyBuilder;
import io.kestra.core.validations.ConstantRetryValidation;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -14,6 +15,7 @@
@SuperBuilder
@Getter
@NoArgsConstructor
@ConstantRetryValidation
public class Constant extends AbstractRetry {
@NotNull
@JsonInclude
Expand Down
Loading

0 comments on commit 5ca0ee7

Please sign in to comment.