diff --git a/core/src/main/java/io/kestra/core/services/ExecutionService.java b/core/src/main/java/io/kestra/core/services/ExecutionService.java index 16b2c316bc..0767de2ab3 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionService.java @@ -459,19 +459,16 @@ public Execution resume(Execution execution, Flow flow, State.Type newState) thr * @param flow the flow of the execution * @param inputs the onResume inputs * @return the execution in the new state. - * @throws Exception if the state of the execution cannot be updated */ public Mono> validateForResume(final Execution execution, Flow flow, @Nullable Publisher inputs) { - return getFirstPausedTaskOrThrow(execution, flow).handle((task, sink) -> { - if (task instanceof Pause pauseTask) { - flowInputOutput.validateExecutionInputs( - pauseTask.getOnResume(), - execution, - inputs - ).subscribe(sink::next, sink::error); - } - sink.next(Collections.emptyList()); - }); + return getFirstPausedTaskOrThrow(execution, flow) + .flatMap(task -> { + if (task instanceof Pause pauseTask) { + return flowInputOutput.validateExecutionInputs(pauseTask.getOnResume(), execution, inputs); + } else { + return Mono.just(Collections.emptyList()); + } + }); } /** @@ -483,26 +480,23 @@ public Mono> validateForResume(final Execution execution, Fl * @param flow the flow of the execution * @param inputs the onResume inputs * @return the execution in the new state. - * @throws Exception if the state of the execution cannot be updated */ public Mono resume(final Execution execution, Flow flow, State.Type newState, @Nullable Publisher inputs) { - return getFirstPausedTaskOrThrow(execution, flow).handle((task, sink) -> { - Mono> monoOutputs; - - if (task instanceof Pause pauseTask) { - monoOutputs = flowInputOutput.readExecutionInputs(pauseTask.getOnResume(), execution, inputs); - } else { - monoOutputs = Mono.just(Collections.emptyMap()); - } - Mono monoExecution = monoOutputs.handle((outputs, monoSink) -> { + return getFirstPausedTaskOrThrow(execution, flow) + .flatMap(task -> { + if (task instanceof Pause pauseTask) { + return flowInputOutput.readExecutionInputs(pauseTask.getOnResume(), execution, inputs); + } else { + return Mono.just(Collections.emptyMap()); + } + }) + .handle((resumeInputs, sink) -> { try { - sink.next(resume(execution, flow, newState, outputs)); + sink.next(resume(execution, flow, newState, resumeInputs)); } catch (Exception e) { sink.error(e); } }); - monoExecution.subscribe(sink::next, sink::error); - }); } private static Mono getFirstPausedTaskOrThrow(Execution execution, Flow flow){