Skip to content

Commit

Permalink
Merge branch 'develop' of https://github.com/MRADUL9936/kestra into d…
Browse files Browse the repository at this point in the history
…evelop
  • Loading branch information
MRADUL9936 committed Oct 23, 2024
2 parents b4b1473 + 46962c9 commit 14a4ac8
Show file tree
Hide file tree
Showing 94 changed files with 2,109 additions and 718 deletions.
4 changes: 2 additions & 2 deletions .github/ISSUE_TEMPLATE/bug.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ body:
- type: markdown
attributes:
value: |
Thanks for reporting an issue! Please provide a [Minima Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example)
Thanks for reporting an issue! Please provide a [Minimal Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example)
and share any additional information that may help reproduce, troubleshoot, and hopefully fix the issue, including screenshots, error traceback, and your Kestra server logs.
NOTE: If your issue is more of a question, please ping us directly on [Slack](https://kestra.io/slack).
- type: textarea
Expand All @@ -25,4 +25,4 @@ body:
validations:
required: false
labels:
- bug
- bug
4 changes: 4 additions & 0 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ jobs:
run: |
regctl image copy ${{ format('kestra/kestra:{0}{1}', steps.vars.outputs.tag, matrix.image.name) }} ${{ format('kestra/kestra:latest{0}', matrix.image.name) }}
- name: Retag latest to latest-full
if: github.event.inputs.retag-latest == 'true'
run: |
regctl image copy kestra/kestra:latest kestra/kestra:latest-full
end:
runs-on: ubuntu-latest
needs:
Expand Down
27 changes: 20 additions & 7 deletions core/src/main/java/io/kestra/core/models/executions/Execution.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ public class Execution implements DeletedInterface, TenantInterface {
@Nullable
Instant scheduleDate;

@With
@Nullable
ExecutionError error;

/**
* Factory method for constructing a new {@link Execution} object for the given {@link Flow}.
*
Expand Down Expand Up @@ -196,7 +200,8 @@ public Execution withState(State.Type state) {
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.error
);
}

Expand Down Expand Up @@ -230,7 +235,8 @@ public Execution withTaskRun(TaskRun taskRun) throws InternalException {
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
taskRun.getError() != null ? taskRun.getError() : this.error
);
}

Expand All @@ -252,7 +258,8 @@ public Execution childExecution(String childExecutionId, List<TaskRun> taskRunLi
this.trigger,
this.deleted,
this.metadata,
this.scheduleDate
this.scheduleDate,
this.error
);
}

Expand Down Expand Up @@ -515,10 +522,10 @@ public boolean hasRunning(List<ResolvedTask> resolvedTasks, TaskRun parentTaskRu
}

public State.Type guessFinalState(Flow flow) {
return this.guessFinalState(ResolvedTask.of(flow.getTasks()), null, false);
return this.guessFinalState(ResolvedTask.of(flow.getTasks()), null, false, false);
}

public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun parentTaskRun, boolean allowFailure) {
public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun parentTaskRun, boolean allowFailure, boolean allowWarning) {
List<TaskRun> taskRuns = this.findTaskRunByTasks(currentTasks, parentTaskRun);
var state = this
.findLastByState(taskRuns, State.Type.KILLED)
Expand All @@ -538,8 +545,14 @@ public State.Type guessFinalState(List<ResolvedTask> currentTasks, TaskRun paren
.orElse(State.Type.SUCCESS);

if (state == State.Type.FAILED && allowFailure) {
if (allowWarning) {
return State.Type.SUCCESS;
}
return State.Type.WARNING;
}
if (State.Type.WARNING.equals(state) && allowWarning) {
return State.Type.SUCCESS;
}
return state;
}

Expand Down Expand Up @@ -625,15 +638,15 @@ public FailedExecutionWithLog failedExecutionFromExecutor(Exception e) {
.map(t -> {
try {
return new FailedExecutionWithLog(
this.withTaskRun(t.getTaskRun()),
this.withTaskRun(t.getTaskRun()).withError(ExecutionError.from(e)),
t.getLogs()
);
} catch (InternalException ex) {
return null;
}
})
.orElseGet(() -> new FailedExecutionWithLog(
this.state.getCurrent() != State.Type.FAILED ? this.withState(State.Type.FAILED) : this,
this.state.getCurrent() != State.Type.FAILED ? this.withState(State.Type.FAILED).withError(ExecutionError.from(e)) : this.withError(ExecutionError.from(e)),
RunContextLogger.logEntries(loggingEventFromException(e), LogEntry.of(this))
)
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package io.kestra.core.models.executions;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Builder
@Setter
@Getter
public class ExecutionError {
private static final int MAX_NB_FRAMES = 10;

private String message;
private String stacktrace;

public static ExecutionError from(Throwable throwable) {
if (throwable == null) {
return ExecutionError.builder().message("Unknown error").build();
}

String firstLine = throwable.getClass().getName() + ": " + throwable.getMessage() + "\n";
StackTraceElement[] stackTraces = throwable.getStackTrace();
String stackTraceStr;
if (stackTraces.length > 10) {
// keep only the top 10 frames
stackTraceStr = stackTraceToString(firstLine, Arrays.copyOf(stackTraces, 10)) + "\n\t[...]";
} else {
stackTraceStr = stackTraceToString(firstLine, stackTraces);
}
return ExecutionError.builder()
.message(throwable.getMessage())
.stacktrace(stackTraceStr)
.build();
}

private static String stackTraceToString(String firstLine, StackTraceElement[] stackTraces) {
return Stream.of(stackTraces)
.map(stackTraceElement -> stackTraceElement.toString())
.collect(Collectors.joining("\n\t", firstLine, ""));
}
}
12 changes: 9 additions & 3 deletions core/src/main/java/io/kestra/core/models/executions/TaskRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class TaskRun implements TenantInterface {
@With
Integer iteration;

@With
ExecutionError error;

@Deprecated
public void setItems(String items) {
// no-op for backward compatibility
Expand All @@ -75,7 +78,8 @@ public TaskRun withState(State.Type state) {
this.attempts,
this.outputs,
this.state.withState(state),
this.iteration
this.iteration,
this.error
);
}

Expand All @@ -92,7 +96,8 @@ public TaskRun replaceState(State newState) {
this.attempts,
this.outputs,
newState,
this.iteration
this.iteration,
this.error
);
}

Expand All @@ -113,7 +118,8 @@ public TaskRun fail() {
newAttempts,
this.outputs,
this.state.withState(State.Type.FAILED),
this.iteration
this.iteration,
this.error
);
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/kestra/core/models/flows/Input.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
@JsonSubTypes.Type(value = TimeInput.class, name = "TIME"),
@JsonSubTypes.Type(value = URIInput.class, name = "URI"),
@JsonSubTypes.Type(value = MultiselectInput.class, name = "MULTISELECT"),
@JsonSubTypes.Type(value = YamlInput.class, name = "YAML")
@JsonSubTypes.Type(value = YamlInput.class, name = "YAML"),
@JsonSubTypes.Type(value = EmailInput.class, name = "EMAIL"),
})
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class Input<T> implements Data {
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/io/kestra/core/models/flows/State.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,11 @@ public enum Type {
CANCELLED,
QUEUED,
RETRYING,
RETRIED;
RETRIED,
SKIPPED;

public boolean isTerminated() {
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED;
return this == Type.FAILED || this == Type.WARNING || this == Type.SUCCESS || this == Type.KILLED || this == Type.CANCELLED || this == Type.RETRIED || this == Type.SKIPPED;
}

public boolean isCreated() {
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/io/kestra/core/models/flows/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public enum Type {
SECRET(SecretInput.class.getName()),
ARRAY(ArrayInput.class.getName()),
MULTISELECT(MultiselectInput.class.getName()),
YAML(YamlInput.class.getName());
YAML(YamlInput.class.getName()),
EMAIL(EmailInput.class.getName());

private final String clsName;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.kestra.core.models.flows.input;

import io.kestra.core.models.flows.Input;
import io.kestra.core.models.validations.ManualConstraintViolation;
import jakarta.validation.ConstraintViolationException;

import java.util.regex.Pattern;

public class EmailInput extends Input<String> {

private static final String EMAIL_PATTERN = "^$|^[a-zA-Z0-9_!#$%&’*+/=?`{|}~^.-]+@[a-zA-Z0-9.-]+$";

@Override
public void validate(String input) throws ConstraintViolationException {
if(!Pattern.matches(EMAIL_PATTERN, input)){
throw ManualConstraintViolation.toConstraintViolationException(
"The input must be a valid email",
this,
EmailInput.class,
getId(),
input
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public interface FlowableTask <T extends Output> {
*/
boolean isAllowFailure();

/**
* Whether the task is allowed to be in warning state.
*/
boolean isAllowWarning();

/**
* Resolve the state of a flowable task.
*/
Expand All @@ -68,7 +73,8 @@ default Optional<State.Type> resolveState(RunContext runContext, Execution execu
FlowableUtils.resolveTasks(this.getErrors(), parentTaskRun),
parentTaskRun,
runContext,
isAllowFailure()
isAllowFailure(),
isAllowWarning()
);
}

Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/io/kestra/core/models/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ abstract public class Task implements TaskInterface {
@Builder.Default
private boolean logToFile = false;

@Builder.Default
private String runIf = "true";

@Builder.Default
private boolean allowWarning = false;

public Optional<Task> findById(String id) {
if (this.getId().equals(id)) {
return Optional.of(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.kestra.core.models.validations;

import io.kestra.core.models.flows.Input;
import io.kestra.core.models.tasks.Task;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;

import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KestraConstraintViolationException extends ConstraintViolationException {

public KestraConstraintViolationException(Set<? extends ConstraintViolation<?>> constraintViolations) {
super(constraintViolations);
}

@Override
public String getMessage() {
StringBuilder message = new StringBuilder();
for (ConstraintViolation<?> violation : getConstraintViolations()) {
String errorMessage = violation.getPropertyPath() + ": " + violation.getMessage();
try {
if (violation.getLeafBean() instanceof Task) {
errorMessage = replaceId("tasks", violation.getPropertyPath().toString(), ((Task) violation.getLeafBean()).getId()) + ": " + violation.getMessage();
}
if (violation.getLeafBean() instanceof Input) {
errorMessage = replaceId("inputs", violation.getPropertyPath().toString(), ((Input) violation.getLeafBean()).getId()) + ": " + violation.getMessage();

}
} catch (Exception e) {
// In case we don't succeed at replacing the id, we just use the default message
}
message.append(errorMessage).append("\n");
}
return message.toString();
}

private String replaceId(String type, String errorMessage, String taskId) {
String regex = type + "\\[\\d+\\]";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(errorMessage);

return matcher.replaceAll(taskId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import io.micronaut.validation.validator.Validator;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;

import java.util.Optional;
import java.util.Set;
import jakarta.validation.ConstraintViolation;
import jakarta.validation.ConstraintViolationException;

@Singleton
public class ModelValidator {
Expand All @@ -25,7 +25,7 @@ public <T> Optional<ConstraintViolationException> isValid(T model) {
Set<ConstraintViolation<T>> violations = validator.validate(model);

if (violations.size() > 0) {
return Optional.of(new ConstraintViolationException(violations));
return Optional.of(new KestraConstraintViolationException(violations));
}

return Optional.empty();
Expand Down
Loading

0 comments on commit 14a4ac8

Please sign in to comment.