Skip to content

Commit

Permalink
fix(core): fix inputs for execution resume (#5494)
Browse files Browse the repository at this point in the history
fix: #5494
  • Loading branch information
fhussonnois committed Oct 16, 2024
1 parent 5acb6d0 commit 548e058
Showing 1 changed file with 18 additions and 24 deletions.
42 changes: 18 additions & 24 deletions core/src/main/java/io/kestra/core/services/ExecutionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,19 +459,16 @@ public Execution resume(Execution execution, Flow flow, State.Type newState) thr
* @param flow the flow of the execution
* @param inputs the onResume inputs
* @return the execution in the new state.
* @throws Exception if the state of the execution cannot be updated
*/
public Mono<List<InputAndValue>> validateForResume(final Execution execution, Flow flow, @Nullable Publisher<CompletedPart> inputs) {
return getFirstPausedTaskOrThrow(execution, flow).handle((task, sink) -> {
if (task instanceof Pause pauseTask) {
flowInputOutput.validateExecutionInputs(
pauseTask.getOnResume(),
execution,
inputs
).subscribe(sink::next, sink::error);
}
sink.next(Collections.emptyList());
});
return getFirstPausedTaskOrThrow(execution, flow)
.flatMap(task -> {
if (task instanceof Pause pauseTask) {
return flowInputOutput.validateExecutionInputs(pauseTask.getOnResume(), execution, inputs);
} else {
return Mono.just(Collections.emptyList());
}
});
}

/**
Expand All @@ -483,26 +480,23 @@ public Mono<List<InputAndValue>> validateForResume(final Execution execution, Fl
* @param flow the flow of the execution
* @param inputs the onResume inputs
* @return the execution in the new state.
* @throws Exception if the state of the execution cannot be updated
*/
public Mono<Execution> resume(final Execution execution, Flow flow, State.Type newState, @Nullable Publisher<CompletedPart> inputs) {
return getFirstPausedTaskOrThrow(execution, flow).handle((task, sink) -> {
Mono<Map<String, Object>> monoOutputs;

if (task instanceof Pause pauseTask) {
monoOutputs = flowInputOutput.readExecutionInputs(pauseTask.getOnResume(), execution, inputs);
} else {
monoOutputs = Mono.just(Collections.emptyMap());
}
Mono<Execution> monoExecution = monoOutputs.handle((outputs, monoSink) -> {
return getFirstPausedTaskOrThrow(execution, flow)
.flatMap(task -> {
if (task instanceof Pause pauseTask) {
return flowInputOutput.readExecutionInputs(pauseTask.getOnResume(), execution, inputs);
} else {
return Mono.just(Collections.<String, Object>emptyMap());
}
})
.handle((resumeInputs, sink) -> {
try {
sink.next(resume(execution, flow, newState, outputs));
sink.next(resume(execution, flow, newState, resumeInputs));
} catch (Exception e) {
sink.error(e);
}
});
monoExecution.subscribe(sink::next, sink::error);
});
}

private static Mono<Task> getFirstPausedTaskOrThrow(Execution execution, Flow flow){
Expand Down

0 comments on commit 548e058

Please sign in to comment.