Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] [Engine] Fix: checkpoint failure reason can't be found. #3769

Merged
merged 3 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static class RetryMaterial {
/**
* An arbitrary absolute maximum practical retry time.
*/
public static final long MAX_RETRY_TIME_MS = TimeUnit.MINUTES.toMillis(1);
public static final long MAX_RETRY_TIME_MS = TimeUnit.SECONDS.toMillis(20);

/**
* The maximum retry time.
Expand All @@ -93,6 +93,8 @@ public static class RetryMaterial {
// This is the exception condition; a result condition can be added in the future.
private final RetryCondition<Exception> retryCondition;

private final boolean sleepTimeIncrease;

/**
* The interval between each retry
*/
Expand All @@ -104,10 +106,16 @@ public RetryMaterial(int retryTimes, boolean shouldThrowException, RetryConditio

/**
 * Creates a retry material with a fixed sleep interval between attempts.
 *
 * <p>Delegates to the full constructor with {@code sleepTimeIncrease = false},
 * so every retry waits the same {@code sleepTimeMillis}.
 *
 * @param retryTimes           maximum number of retry attempts
 * @param shouldThrowException whether the final failure should be rethrown to the caller
 * @param retryCondition       condition deciding whether a thrown exception is retryable
 * @param sleepTimeMillis      fixed wait time between attempts, in milliseconds
 */
public RetryMaterial(int retryTimes, boolean shouldThrowException,
RetryCondition<Exception> retryCondition, long sleepTimeMillis) {
this(retryTimes, shouldThrowException, retryCondition, sleepTimeMillis, false);
}

/**
 * Creates a retry material, optionally with a growing wait time between attempts.
 *
 * <p>When {@code sleepTimeIncrease} is {@code false},
 * {@code computeRetryWaitTimeMillis} returns the fixed {@code sleepTimeMillis};
 * when {@code true}, the wait time grows with the attempt count
 * (capped at {@code MAX_RETRY_TIME_MS}).
 *
 * @param retryTimes           maximum number of retry attempts
 * @param shouldThrowException whether the final failure should be rethrown to the caller
 * @param retryCondition       condition deciding whether a thrown exception is retryable
 * @param sleepTimeMillis      base wait time between attempts, in milliseconds
 * @param sleepTimeIncrease    whether the wait time should increase on each attempt
 */
public RetryMaterial(int retryTimes, boolean shouldThrowException,
RetryCondition<Exception> retryCondition, long sleepTimeMillis, boolean sleepTimeIncrease) {
this.retryTimes = retryTimes;
this.shouldThrowException = shouldThrowException;
this.retryCondition = retryCondition;
this.sleepTimeMillis = sleepTimeMillis;
this.sleepTimeIncrease = sleepTimeIncrease;
}

public int getRetryTimes() {
Expand All @@ -130,6 +138,9 @@ public long computeRetryWaitTimeMillis(int retryAttempts) {
if (sleepTimeMillis < 0) {
return 0;
}
if (!sleepTimeIncrease) {
return sleepTimeMillis;
}
if (retryAttempts > MAX_RETRY_TIME) {
// This would overflow the exponential algorithm ...
return MAX_RETRY_TIME_MS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ public final class TaskGroupExecutionTracker {
exception(e);
// Don't interrupt the threads. We require that they do not block for too long;
// interrupting them might make the termination faster, but can also cause trouble.
// NOTE(review): the change below to f.cancel(true) DOES interrupt the running tasks,
// which contradicts this comment — update or remove the comment to match the new behavior.
blockingFutures.forEach(f -> f.cancel(false));
blockingFutures.forEach(f -> f.cancel(true));
}));
}

Expand All @@ -515,20 +515,24 @@ void exception(Throwable t) {
}

void taskDone() {
logger.info("taskDone: " + taskGroup.getTaskGroupLocation());
TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupLocation();
logger.info("taskDone: " + taskGroupLocation);
Throwable ex = executionException.get();
if (completionLatch.decrementAndGet() == 0) {
TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupLocation();
finishedExecutionContexts.put(taskGroupLocation, executionContexts.remove(taskGroupLocation));
cancellationFutures.remove(taskGroupLocation);
Throwable ex = executionException.get();
if (ex == null) {
future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FINISHED, null));
return;
} else if (isCancel.get()) {
future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.CANCELED, null));
} else {
future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, ex));
return;
}
}
if (ex != null) {
future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, ex));
blockingFutures.forEach(f -> f.cancel(true));
}
}

boolean executionCompletedExceptionally() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,21 @@ protected void reportedTask(TaskReportStatusOperation operation) {
default:
break;
}
}).exceptionally(error -> {
handleCoordinatorError("task running failed", error, CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
return null;
});
}

/**
 * Logs a coordinator failure with its cause, then reports the checkpoint error
 * for this pipeline to the checkpoint manager.
 *
 * @param message human-readable description of what failed
 * @param e       the underlying cause, logged with its stack trace
 * @param reason  the checkpoint failure reason forwarded to the checkpoint manager
 */
private void handleCoordinatorError(String message, Throwable e, CheckpointFailureReason reason) {
    // Record the full stack trace first so the root cause is never lost,
    // then escalate to the checkpoint manager for this pipeline.
    LOG.error(message, e);
    checkpointManager.handleCheckpointError(pipelineId, new CheckpointException(reason));
}

/**
 * Reports a checkpoint error for this pipeline to the checkpoint manager.
 *
 * @param reason the checkpoint failure reason to wrap and forward
 */
private void handleCoordinatorError(CheckpointFailureReason reason) {
    // Wrap the reason so downstream handlers receive a CheckpointException.
    final CheckpointException failure = new CheckpointException(reason);
    checkpointManager.handleCheckpointError(pipelineId, failure);
}

private void restoreTaskState(TaskLocation taskLocation) {
List<ActionSubtaskState> states = new ArrayList<>();
if (latestCompletedCheckpoint != null) {
Expand All @@ -205,7 +217,7 @@ private void restoreTaskState(TaskLocation taskLocation) {
}
});
}
checkpointManager.sendOperationToMemberNode(new NotifyTaskRestoreOperation(taskLocation, states));
checkpointManager.sendOperationToMemberNode(new NotifyTaskRestoreOperation(taskLocation, states)).join();
}

private void allTaskReady() {
Expand Down Expand Up @@ -253,7 +265,7 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
return;
}
final long currentTimestamp = Instant.now().toEpochMilli();
if (checkpointType.equals(CHECKPOINT_TYPE)) {
if (notFinalCheckpoint(checkpointType)) {
if (currentTimestamp - latestTriggerTimestamp.get() < coordinatorConfig.getCheckpointInterval() ||
pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints() || !isAllTaskReady) {
return;
Expand All @@ -264,10 +276,17 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
CompletableFuture<PendingCheckpoint> pendingCheckpoint = createPendingCheckpoint(currentTimestamp, checkpointType);
startTriggerPendingCheckpoint(pendingCheckpoint);
pendingCounter.incrementAndGet();
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
// If the checkpoint type is a final type, we don't need to trigger the next checkpoint.
if (notFinalCheckpoint(checkpointType)) {
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
}
}
}

/**
 * Returns whether the given checkpoint type is a regular (non-final) checkpoint.
 *
 * <p>A checkpoint is "not final" exactly when its type equals the
 * {@code CHECKPOINT_TYPE} constant; final checkpoint types compare unequal.
 *
 * @param checkpointType the checkpoint type to classify; must not be {@code null}
 * @return {@code true} for the regular checkpoint type, {@code false} otherwise
 */
private boolean notFinalCheckpoint(CheckpointType checkpointType) {
    // Keep checkpointType as the equals() receiver so a null argument
    // fails fast exactly as before.
    final boolean isRegularCheckpoint = checkpointType.equals(CHECKPOINT_TYPE);
    return isRegularCheckpoint;
}

@SuppressWarnings("checkstyle:MagicNumber")
private void waitingPendingCheckpointDone() {
while (pendingCounter.get() != 0) {
Expand Down Expand Up @@ -307,14 +326,12 @@ private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint>
PassiveCompletableFuture<CompletedCheckpoint> completableFuture = pendingCheckpoint.getCompletableFuture();
completableFuture.whenComplete((completedCheckpoint, error) -> {
if (error != null) {
LOG.error("trigger checkpoint failed", error);
checkpointManager.handleCheckpointError(pipelineId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR));
handleCoordinatorError("trigger checkpoint failed", error, CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
} else {
try {
completePendingCheckpoint(completedCheckpoint);
} catch (Throwable e) {
LOG.error("complete checkpoint failed", e);
checkpointManager.handleCheckpointError(pipelineId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR));
handleCoordinatorError("complete checkpoint failed", e, CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
}
}
});
Expand Down Expand Up @@ -342,7 +359,7 @@ private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint>
if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0) {
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
checkpointManager.handleCheckpointError(pipelineId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
handleCoordinatorError(CheckpointFailureReason.CHECKPOINT_EXPIRED);
}
}
}, coordinatorConfig.getCheckpointTimeout(),
Expand Down Expand Up @@ -479,12 +496,14 @@ public void completePendingCheckpoint(CompletedCheckpoint completedCheckpoint) {
LOG.debug("pending checkpoint({}/{}@{}) completed! cost: {}, trigger: {}, completed: {}",
completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId(),
completedCheckpoint.getCompletedTimestamp() - completedCheckpoint.getCheckpointTimestamp(), completedCheckpoint.getCheckpointTimestamp(), completedCheckpoint.getCompletedTimestamp());
pendingCounter.decrementAndGet();
final long checkpointId = completedCheckpoint.getCheckpointId();
pendingCheckpoints.remove(checkpointId);
pendingCounter.decrementAndGet();
if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
// latest checkpoint completed time > checkpoint interval
tryTriggerPendingCheckpoint();
if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
tryTriggerPendingCheckpoint();
}
}
completedCheckpoints.addLast(completedCheckpoint);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -149,7 +150,7 @@ public void triggerBarrier(Barrier barrier) throws Exception {
public void restoreState(List<ActionSubtaskState> actionStateList) throws Exception {
Optional<Serializable> state = actionStateList.stream()
.map(ActionSubtaskState::getState)
.flatMap(Collection::stream)
.flatMap(Collection::stream).filter(Objects::nonNull)
.map(bytes -> sneaky(() -> enumeratorStateSerializer.deserialize(bytes)))
.findFirst();
if (state.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

public class TaskGroupWithIntermediateQueue extends TaskGroupDefaultImpl {

public static final int QUEUE_SIZE = 1000;
public static final int QUEUE_SIZE = 100000;

public TaskGroupWithIntermediateQueue(TaskGroupLocation taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
super(taskGroupLocation, taskGroupName, tasks);
Expand Down