@@ -1035,14 +1035,24 @@ private static int getCumulativeParallelism(VertexParallelism potentialNewParall

@Override
public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {

@Nullable
final Throwable optionalFailure =
archivedExecutionGraph.getFailureInfo() != null
? archivedExecutionGraph.getFailureInfo().getException()
: null;
LOG.info(
"Job {} reached terminal state {}.",
archivedExecutionGraph.getJobID(),
archivedExecutionGraph.getState(),
optionalFailure);

if (jobStatusListener != null) {
jobStatusListener.jobStatusChanges(
jobInformation.getJobID(),
archivedExecutionGraph.getState(),
archivedExecutionGraph.getStatusTimestamp(archivedExecutionGraph.getState()),
archivedExecutionGraph.getFailureInfo() != null
? archivedExecutionGraph.getFailureInfo().getException()
: null);
optionalFailure);
}

jobTerminationFuture.complete(archivedExecutionGraph.getState());
@@ -1058,7 +1068,7 @@ public Executing.FailureResult howToHandleFailure(Throwable failure) {
restartBackoffTimeStrategy.notifyFailure(failure);
if (restartBackoffTimeStrategy.canRestart()) {
return Executing.FailureResult.canRestart(
Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime()));
failure, Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime()));
} else {
return Executing.FailureResult.canNotRestart(
new JobException(
@@ -56,7 +56,11 @@ public void cancel() {

@Override
public void handleGlobalFailure(Throwable cause) {
// ignore global failures
getLogger()
.debug(
"Ignored global failure because we are already canceling the job {}.",
getJobId(),
cause);
}

@Override
@@ -31,6 +31,7 @@
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
@@ -85,12 +86,14 @@ private void handleAnyFailure(Throwable cause) {
final FailureResult failureResult = context.howToHandleFailure(cause);

if (failureResult.canRestart()) {
getLogger().info("Restarting job.", failureResult.getFailureCause());
context.goToRestarting(
getExecutionGraph(),
getExecutionGraphHandler(),
getOperatorCoordinatorHandler(),
failureResult.getBackoffTime());
} else {
getLogger().info("Failing job.", failureResult.getFailureCause());
context.goToFailing(
getExecutionGraph(),
getExecutionGraphHandler(),
@@ -106,7 +109,11 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState
if (successfulUpdate) {
if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) {
Throwable cause = taskExecutionState.getError(userCodeClassLoader);
handleAnyFailure(cause);
handleAnyFailure(
cause == null
? new FlinkException(
"Unknown failure cause. Probably related to FLINK-21376.")
: cause);
}
}

@@ -281,9 +288,9 @@ CompletableFuture<String> goToStopWithSavepoint(
static final class FailureResult {
@Nullable private final Duration backoffTime;

@Nullable private final Throwable failureCause;
private final Throwable failureCause;

private FailureResult(@Nullable Duration backoffTime, @Nullable Throwable failureCause) {
private FailureResult(Throwable failureCause, @Nullable Duration backoffTime) {
Contributor: Are you sure that this is safe? There are some cases where failure causes for tasks can be null (FLINK-21376), but I'm not sure if this applies here.

Contributor Author: Let me double check.

Contributor Author: I think it applies. I will add a safety net.

this.backoffTime = backoffTime;
this.failureCause = failureCause;
}
@@ -299,20 +306,18 @@ Duration getBackoffTime() {
}

Throwable getFailureCause() {
Preconditions.checkState(
failureCause != null,
"Failure result must not be restartable to return a failure cause.");
return failureCause;
}

/**
* Creates a FailureResult which allows to restart the job.
*
* @param failureCause failureCause for restarting the job
* @param backoffTime backoffTime to wait before restarting the job
* @return FailureResult which allows to restart the job
*/
static FailureResult canRestart(Duration backoffTime) {
return new FailureResult(backoffTime, null);
static FailureResult canRestart(Throwable failureCause, Duration backoffTime) {
return new FailureResult(failureCause, backoffTime);
}

/**
@@ -322,7 +327,7 @@ static FailureResult canRestart(Duration backoffTime) {
* @return FailureResult which does not allow to restart the job
*/
static FailureResult canNotRestart(Throwable failureCause) {
return new FailureResult(null, failureCause);
return new FailureResult(failureCause, null);
}
}
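
For readers following the review thread above: the agreed-on safety net simply wraps a possibly-null task error (FLINK-21376) in a FlinkException before it reaches handleAnyFailure(...), since FailureResult now requires a non-null failure cause. Below is a minimal sketch of that guard, factored into a hypothetical helper purely for illustration; the PR itself inlines the ternary at both updateTaskExecutionState call sites.

import javax.annotation.Nullable;

import org.apache.flink.util.FlinkException;

/** Hypothetical helper, not part of the PR; shows the null-cause safety net in isolation. */
final class FailureCauses {
    private FailureCauses() {}

    /** Returns the given cause, or a placeholder FlinkException when the cause is null. */
    static Throwable orUnknown(@Nullable Throwable cause) {
        return cause != null
                ? cause
                : new FlinkException("Unknown failure cause. Probably related to FLINK-21376.");
    }
}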

@@ -58,7 +58,11 @@ public void cancel() {

@Override
public void handleGlobalFailure(Throwable cause) {
// nothing to do since we are already failing
getLogger()
.debug(
"Ignored global failure because we are already failing the job {}.",
getJobId(),
cause);
}

@Override
@@ -54,7 +54,12 @@ public ArchivedExecutionGraph getJob() {
}

@Override
public void handleGlobalFailure(Throwable cause) {}
public void handleGlobalFailure(Throwable cause) {
logger.debug(
"Ignore global failure because we already finished the job {}.",
archivedExecutionGraph.getJobID(),
cause);
}

@Override
public Logger getLogger() {
@@ -77,7 +77,11 @@ public void cancel() {

@Override
public void handleGlobalFailure(Throwable cause) {
// don't do anything
getLogger()
.debug(
"Ignored global failure because we are already restarting the job {}.",
getJobId(),
cause);
}

@Override
@@ -112,6 +112,10 @@ ExecutionGraph getExecutionGraph() {
return executionGraph;
}

JobID getJobId() {
return executionGraph.getJobID();
}

protected OperatorCoordinatorHandler getOperatorCoordinatorHandler() {
return operatorCoordinatorHandler;
}
@@ -147,7 +147,11 @@ boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState
if (successfulUpdate) {
if (taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED) {
Throwable cause = taskExecutionStateTransition.getError(userCodeClassLoader);
handleAnyFailure(cause);
handleAnyFailure(
cause == null
? new FlinkException(
"Unknown failure cause. Probably related to FLINK-21376.")
: cause);
}
}

@@ -179,12 +183,14 @@ private void handleAnyFailure(Throwable cause) {
final Executing.FailureResult failureResult = context.howToHandleFailure(cause);

if (failureResult.canRestart()) {
getLogger().info("Restarting job.", failureResult.getFailureCause());
context.goToRestarting(
getExecutionGraph(),
getExecutionGraphHandler(),
getOperatorCoordinatorHandler(),
failureResult.getBackoffTime());
} else {
getLogger().info("Failing job.", failureResult.getFailureCause());
context.goToFailing(
getExecutionGraph(),
getExecutionGraphHandler(),
@@ -140,7 +140,7 @@ public void testRecoverableGlobalFailureTransitionsToRestarting() throws Excepti
ctx.setExpectRestarting(
(restartingArguments ->
assertThat(restartingArguments.getBackoffTime(), is(duration))));
ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(duration));
ctx.setHowToHandleFailure((t) -> Executing.FailureResult.canRestart(t, duration));
exec.handleGlobalFailure(new RuntimeException("Recoverable error"));
}
}
@@ -234,7 +234,8 @@ public void testFailureReportedViaUpdateTaskExecutionStateCausesRestart() throws
new ExecutingStateBuilder()
.setExecutionGraph(returnsFailedStateExecutionGraph)
.build(ctx);
ctx.setHowToHandleFailure((ign) -> Executing.FailureResult.canRestart(Duration.ZERO));
ctx.setHowToHandleFailure(
(throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO));
ctx.setExpectRestarting(assertNonNull());

exec.updateTaskExecutionState(createFailingStateTransition());
@@ -174,7 +174,7 @@ public void testRestartOnGlobalFailureIfRestartConfigured() throws Exception {
StopWithSavepoint sws = createStopWithSavepoint(ctx);
ctx.setStopWithSavepoint(sws);
ctx.setHowToHandleFailure(
(ignore) -> Executing.FailureResult.canRestart(Duration.ZERO));
(throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO));

ctx.setExpectRestarting(assertNonNull());

@@ -229,7 +229,7 @@ public void testRestartingOnUpdateTaskExecutionStateWithRestart() throws Excepti
createStopWithSavepoint(ctx, new StateTrackingMockExecutionGraph());
ctx.setStopWithSavepoint(sws);
ctx.setHowToHandleFailure(
(ignore) -> Executing.FailureResult.canRestart(Duration.ZERO));
(throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO));

ctx.setExpectRestarting(assertNonNull());

@@ -277,7 +277,7 @@ public void testRestartOnTaskFailureAfterSavepointCompletion() throws Exception
ctx.setStopWithSavepoint(sws);

ctx.setHowToHandleFailure(
(ignore) -> Executing.FailureResult.canRestart(Duration.ZERO));
(throwable) -> Executing.FailureResult.canRestart(throwable, Duration.ZERO));

ctx.setExpectRestarting(assertNonNull());
