diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index bad1cc9b0819e..9a1a252fcedf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -1035,14 +1035,24 @@ private static int getCumulativeParallelism(VertexParallelism potentialNewParall @Override public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) { + + @Nullable + final Throwable optionalFailure = + archivedExecutionGraph.getFailureInfo() != null + ? archivedExecutionGraph.getFailureInfo().getException() + : null; + LOG.info( + "Job {} reached terminal state {}.", + archivedExecutionGraph.getJobID(), + archivedExecutionGraph.getState(), + optionalFailure); + if (jobStatusListener != null) { jobStatusListener.jobStatusChanges( jobInformation.getJobID(), archivedExecutionGraph.getState(), archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()), - archivedExecutionGraph.getFailureInfo() != null - ? archivedExecutionGraph.getFailureInfo().getException() - : null); + optionalFailure); } jobTerminationFuture.complete(archivedExecutionGraph.getState()); @@ -1058,7 +1068,7 @@ public Executing.FailureResult howToHandleFailure(Throwable failure) { restartBackoffTimeStrategy.notifyFailure(failure); if (restartBackoffTimeStrategy.canRestart()) { return Executing.FailureResult.canRestart( - Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime())); + failure, Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime())); } else { return Executing.FailureResult.canNotRestart( new JobException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java index 54ddabcc7b717..f5d82a1d39996 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Canceling.java @@ -56,7 +56,11 @@ public void cancel() { @Override public void handleGlobalFailure(Throwable cause) { - // ignore global failures + getLogger() + .debug( + "Ignored global failure because we are already canceling the job {}.", + getJobId(), + cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index dcb0366e01063..6b44c51dc9612 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -85,12 +86,14 @@ private void handleAnyFailure(Throwable cause) { final FailureResult failureResult = context.howToHandleFailure(cause); if (failureResult.canRestart()) { + getLogger().info("Restarting job.", failureResult.getFailureCause()); context.goToRestarting( getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), failureResult.getBackoffTime()); } else { + getLogger().info("Failing job.", failureResult.getFailureCause()); context.goToFailing( getExecutionGraph(), getExecutionGraphHandler(), @@ -106,7 +109,11 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState if (successfulUpdate) { if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { Throwable cause = taskExecutionState.getError(userCodeClassLoader); - handleAnyFailure(cause); + handleAnyFailure( + cause == null + ? new FlinkException( + "Unknown failure cause. Probably related to FLINK-21376.") + : cause); } } @@ -281,9 +288,9 @@ CompletableFuture goToStopWithSavepoint( static final class FailureResult { @Nullable private final Duration backoffTime; - @Nullable private final Throwable failureCause; + private final Throwable failureCause; - private FailureResult(@Nullable Duration backoffTime, @Nullable Throwable failureCause) { + private FailureResult(Throwable failureCause, @Nullable Duration backoffTime) { this.backoffTime = backoffTime; this.failureCause = failureCause; } @@ -299,20 +306,18 @@ Duration getBackoffTime() { } Throwable getFailureCause() { - Preconditions.checkState( - failureCause != null, - "Failure result must not be restartable to return a failure cause."); return failureCause; } /** * Creates a FailureResult which allows to restart the job. * + * @param failureCause failureCause for restarting the job * @param backoffTime backoffTime to wait before restarting the job * @return FailureResult which allows to restart the job */ - static FailureResult canRestart(Duration backoffTime) { - return new FailureResult(backoffTime, null); + static FailureResult canRestart(Throwable failureCause, Duration backoffTime) { + return new FailureResult(failureCause, backoffTime); } /** @@ -322,7 +327,7 @@ static FailureResult canRestart(Duration backoffTime) { * @return FailureResult which does not allow to restart the job */ static FailureResult canNotRestart(Throwable failureCause) { - return new FailureResult(null, failureCause); + return new FailureResult(failureCause, null); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java index dc44c3c081dac..c8361617b733c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java @@ -58,7 +58,11 @@ public void cancel() { @Override public void handleGlobalFailure(Throwable cause) { - // nothing to do since we are already failing + getLogger() + .debug( + "Ignored global failure because we are already failing the job {}.", + getJobId(), + cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java index 7a7c9f236caa7..f36a98b1dd258 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Finished.java @@ -54,7 +54,12 @@ public ArchivedExecutionGraph getJob() { } @Override - public void handleGlobalFailure(Throwable cause) {} + public void handleGlobalFailure(Throwable cause) { + logger.debug( + "Ignore global failure because we already finished the job {}.", + archivedExecutionGraph.getJobID(), + cause); + } @Override public Logger getLogger() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java index ffd7eb6ed68e4..ed5146ac05b1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java @@ -77,7 +77,11 @@ public void cancel() { @Override public void handleGlobalFailure(Throwable cause) { - // don't do anything + getLogger() + .debug( + "Ignored global failure because we are already restarting the job {}.", + getJobId(), + cause); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java index a85caac413550..9962c78d60532 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraph.java @@ -112,6 +112,10 @@ ExecutionGraph getExecutionGraph() { return executionGraph; } + JobID getJobId() { + return executionGraph.getJobID(); + } + protected OperatorCoordinatorHandler getOperatorCoordinatorHandler() { return operatorCoordinatorHandler; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java index dcde71c25ceb5..6c8dc4fe9570b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepoint.java @@ -147,7 +147,11 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState if (successfulUpdate) { if (taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED) { Throwable cause = taskExecutionStateTransition.getError(userCodeClassLoader); - handleAnyFailure(cause); + handleAnyFailure( + cause == null + ? new FlinkException( + "Unknown failure cause. Probably related to FLINK-21376.") + : cause); } } @@ -179,12 +183,14 @@ private void handleAnyFailure(Throwable cause) { final Executing.FailureResult failureResult = context.howToHandleFailure(cause); if (failureResult.canRestart()) { + getLogger().info("Restarting job.", failureResult.getFailureCause()); context.goToRestarting( getExecutionGraph(), getExecutionGraphHandler(), getOperatorCoordinatorHandler(), failureResult.getBackoffTime()); } else { + getLogger().info("Failing job.", failureResult.getFailureCause()); context.goToFailing( getExecutionGraph(), getExecutionGraphHandler(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index df5f2332846ec..2852bdab7281e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -140,7 +140,7 @@ public void testRecoverableGlobalFailureTransitionsToRestarting() throws Excepti ctx.setExpectRestarting( (restartingArguments -> assertThat(restartingArguments.getBackoffTime(), is(duration)))); - ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(duration)); + ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(t, duration)); exec.handleGlobalFailure(new RuntimeException("Recoverable error")); } } @@ -234,7 +234,8 @@ public void testFailureReportedViaUpdateTaskExecutionStateCausesRestart() throws new ExecutingStateBuilder() .setExecutionGraph(returnsFailedStateExecutionGraph) .build(ctx); - ctx.setHowToHandleFailure((ign) -> Executing.FailureResult.canRestart(Duration.ZERO)); + ctx.setHowToHandleFailure( + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); exec.updateTaskExecutionState(createFailingStateTransition()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java index e0383fa33dcad..e9a1157b430dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StopWithSavepointTest.java @@ -174,7 +174,7 @@ public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception { StopWithSavepoint sws = createStopWithSavepoint(ctx); ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure( - (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); @@ -229,7 +229,7 @@ public void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Excepti createStopWithSavepoint(ctx, new StateTrackingMockExecutionGraph()); ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure( - (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); @@ -277,7 +277,7 @@ public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception ctx.setStopWithSavepoint(sws); ctx.setHowToHandleFailure( - (ignore) -> Executing.FailureResult.canRestart(Duration.ZERO)); + (throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO)); ctx.setExpectRestarting(assertNonNull());