Skip to content

Commit

Permalink
fix(core): failed expression on a trigger condition crash the scheduler
Browse files Browse the repository at this point in the history
Fixes #4629
  • Loading branch information
loicmathieu committed Aug 21, 2024
1 parent 823a934 commit a10da02
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ default Flow findByExecutionWithoutAcl(Execution execution) {
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
Optional.of(execution.getFlowRevision())
Optional.ofNullable(execution.getFlowRevision())
);

if (find.isEmpty()) {
Expand All @@ -50,7 +50,7 @@ default Flow findByExecution(Execution execution) {
execution.getTenantId(),
execution.getNamespace(),
execution.getFlowId(),
Optional.of(execution.getFlowRevision())
Optional.ofNullable(execution.getFlowRevision())
);

if (find.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ private void handle() {
var trigger = f.getTriggerContext().toBuilder().nextExecutionDate(nextExecutionDate).build().checkBackfill();
this.triggerState.save(trigger, scheduleContext);
}
} catch (InternalException ie) {
} catch (Exception ie) {
// validate schedule condition can fail to render variables
// in this case, we send a failed execution so the trigger is not evaluated each second.
logger.error("Unable to evaluate the trigger '{}'", f.getAbstractTrigger().getId(), ie);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import io.kestra.core.Helpers;
import io.kestra.core.events.CrudEvent;
import io.kestra.core.events.CrudEventType;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.FlowWithException;
import io.kestra.core.models.flows.FlowWithSource;
import io.kestra.core.models.flows.Type;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.*;
import io.kestra.core.models.flows.input.StringInput;
import io.kestra.core.models.triggers.Trigger;
import io.kestra.core.queues.QueueFactoryInterface;
Expand Down Expand Up @@ -55,6 +53,9 @@ public abstract class AbstractFlowRepositoryTest {
@Inject
protected FlowRepositoryInterface flowRepository;

@Inject
protected ExecutionRepositoryInterface executionRepository;

@Inject
private LocalFlowRepositoryLoader repositoryLoader;

Expand Down Expand Up @@ -546,6 +547,67 @@ protected void lastRevision() {
}
}

@Test
void findByExecution() {
Flow flow = builder()
.revision(1)
.build();
flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.flowRevision(flow.getRevision())
.state(new State())
.build();
execution = executionRepository.save(execution);

try {
Flow full = flowRepository.findByExecution(execution);
assertThat(full, notNullValue());
assertThat(full.getNamespace(), is(flow.getNamespace()));
assertThat(full.getId(), is(flow.getId()));

full = flowRepository.findByExecutionWithoutAcl(execution);
assertThat(full, notNullValue());
assertThat(full.getNamespace(), is(flow.getNamespace()));
assertThat(full.getId(), is(flow.getId()));
} finally {
deleteFlow(flow);
executionRepository.delete(execution);
}
}

@Test
void findByExecutionNoRevision() {
Flow flow = builder()
.revision(3)
.build();
flowRepository.create(flow, flow.generateSource(), pluginDefaultService.injectDefaults(flow));
Execution execution = Execution.builder()
.id(IdUtils.create())
.namespace(flow.getNamespace())
.flowId(flow.getId())
.state(new State())
.build();
executionRepository.save(execution);

try {
Flow full = flowRepository.findByExecution(execution);
assertThat(full, notNullValue());
assertThat(full.getNamespace(), is(flow.getNamespace()));
assertThat(full.getId(), is(flow.getId()));

full = flowRepository.findByExecutionWithoutAcl(execution);
assertThat(full, notNullValue());
assertThat(full.getNamespace(), is(flow.getNamespace()));
assertThat(full.getId(), is(flow.getId()));
} finally {
deleteFlow(flow);
executionRepository.delete(execution);
}
}

private void deleteFlow(Flow flow) {
Integer revision = flowRepository.lastRevision(flow.getTenantId(), flow.getNamespace(), flow.getId());
flowRepository.delete(flow.toBuilder().revision(revision).build());
Expand Down

0 comments on commit a10da02

Please sign in to comment.