Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] [Engine] Fix Checkpoint failed reason can't be found. #3769

Merged
merged 3 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static class RetryMaterial {
/**
* An arbitrary absolute maximum practical retry time.
*/
public static final long MAX_RETRY_TIME_MS = TimeUnit.MINUTES.toMillis(1);
public static final long MAX_RETRY_TIME_MS = TimeUnit.SECONDS.toMillis(20);

/**
* The maximum retry time.
Expand All @@ -93,6 +93,8 @@ public static class RetryMaterial {
// this is the exception condition, can add result condition in the future.
private final RetryCondition<Exception> retryCondition;

private final boolean sleepTimeIncrease;

/**
* The interval between each retry
*/
Expand All @@ -104,10 +106,16 @@ public RetryMaterial(int retryTimes, boolean shouldThrowException, RetryConditio

/**
 * Creates retry settings with a fixed wait between attempts.
 * Delegates to the five-argument constructor with {@code sleepTimeIncrease}
 * set to {@code false}, so {@code computeRetryWaitTimeMillis} returns
 * {@code sleepTimeMillis} unchanged (or 0 when it is negative).
 *
 * @param retryTimes           value later exposed by {@code getRetryTimes()}
 * @param shouldThrowException stored as given
 *                             (NOTE(review): presumably decides whether the final failure is rethrown; confirm against the retry loop)
 * @param retryCondition       the exception condition used to decide on a retry
 * @param sleepTimeMillis      the interval between each retry, in milliseconds
 */
public RetryMaterial(int retryTimes, boolean shouldThrowException,
RetryCondition<Exception> retryCondition, long sleepTimeMillis) {
this(retryTimes, shouldThrowException, retryCondition, sleepTimeMillis, false);
}

/**
 * Creates retry settings, storing every argument in its matching field.
 *
 * @param retryTimes           value later exposed by {@code getRetryTimes()}
 * @param shouldThrowException stored as given
 *                             (NOTE(review): presumably decides whether the final failure is rethrown; confirm against the retry loop)
 * @param retryCondition       the exception condition used to decide on a retry
 * @param sleepTimeMillis      the interval between each retry, in milliseconds;
 *                             a negative value makes the computed wait 0
 * @param sleepTimeIncrease    when {@code false}, {@code computeRetryWaitTimeMillis}
 *                             returns {@code sleepTimeMillis} for every attempt;
 *                             when {@code true}, the wait depends on the attempt number
 */
public RetryMaterial(int retryTimes, boolean shouldThrowException,
                     RetryCondition<Exception> retryCondition, long sleepTimeMillis, boolean sleepTimeIncrease) {
    // Wait-time settings.
    this.sleepTimeMillis = sleepTimeMillis;
    this.sleepTimeIncrease = sleepTimeIncrease;

    // Retry-policy settings.
    this.retryCondition = retryCondition;
    this.shouldThrowException = shouldThrowException;
    this.retryTimes = retryTimes;
}

public int getRetryTimes() {
Expand All @@ -130,6 +138,9 @@ public long computeRetryWaitTimeMillis(int retryAttempts) {
if (sleepTimeMillis < 0) {
return 0;
}
if (!sleepTimeIncrease) {
return sleepTimeMillis;
}
if (retryAttempts > MAX_RETRY_TIME) {
// This would overflow the exponential algorithm ...
return MAX_RETRY_TIME_MS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -135,7 +136,7 @@ private void submitThreadShareTask(TaskGroupExecutionTracker taskGroupExecutionT
return taskTracker;
} catch (Exception e) {
taskGroupExecutionTracker.exception(e);
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(t);
}
}
return null;
Expand All @@ -154,7 +155,7 @@ private void submitBlockingTask(TaskGroupExecutionTracker taskGroupExecutionTrac
.map(executorService::submit)
.collect(toList());

// Do not return from this method until all workers have started. Otherwise
// Do not return from this method until all workers have started. Otherwise,
// on cancellation there is a race where the executor might not have started
// the worker yet. This would result in taskletDone() never being called for
// a worker.
Expand Down Expand Up @@ -292,7 +293,11 @@ private void notifyTaskStatusToMaster(TaskGroupLocation taskGroupLocation, TaskE
public void cancelTaskGroup(TaskGroupLocation taskGroupLocation) {
logger.info(String.format("Task (%s) need cancel.", taskGroupLocation));
if (cancellationFutures.containsKey(taskGroupLocation)) {
cancellationFutures.get(taskGroupLocation).cancel(false);
try {
cancellationFutures.get(taskGroupLocation).cancel(false);
} catch (CancellationException ignore) {
// ignore
}
} else {
logger.warning(String.format("need cancel taskId : %s is not exist", taskGroupLocation));
}
Expand Down Expand Up @@ -352,11 +357,16 @@ public void run() {
result = t.call();
} while (!result.isDone() && isRunning &&
!taskGroupExecutionTracker.executionCompletedExceptionally());
} catch (InterruptedException e) {
logger.warning(String.format("Interrupted task %d - %s", t.getTaskID(), t));
if (taskGroupExecutionTracker.executionException.get() == null && !taskGroupExecutionTracker.isCancel.get()) {
taskGroupExecutionTracker.exception(e);
}
} catch (Throwable e) {
logger.warning("Exception in " + t, e);
taskGroupExecutionTracker.exception(e);
} finally {
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(t);
}
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
Expand Down Expand Up @@ -402,7 +412,7 @@ public void run() {
taskqueue.takeFirst();
TaskGroupExecutionTracker taskGroupExecutionTracker = taskTracker.taskGroupExecutionTracker;
if (taskGroupExecutionTracker.executionCompletedExceptionally()) {
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(taskTracker.task);
if (null != exclusiveTaskTracker.get()) {
// If it's exclusive need to end the work
break;
Expand All @@ -426,7 +436,7 @@ public void run() {
} catch (Throwable e) {
//task Failure and complete
taskGroupExecutionTracker.exception(e);
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(taskTracker.task);
//If it's exclusive need to end the work
logger.warning("Exception in " + taskTracker.task, e);
if (null != exclusiveTaskTracker.get()) {
Expand All @@ -440,7 +450,7 @@ public void run() {
if (null != call) {
if (call.isDone()) {
//If it's exclusive, you need to end the work
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(taskTracker.task);
if (null != exclusiveTaskTracker.get()) {
break;
}
Expand Down Expand Up @@ -504,31 +514,42 @@ public final class TaskGroupExecutionTracker {
e = new IllegalStateException("cancellationFuture should be completed exceptionally");
}
exception(e);
// Don't interrupt the threads. We require that they do not block for too long,
// interrupting them might make the termination faster, but can also cause troubles.
blockingFutures.forEach(f -> f.cancel(false));
cancelAllTask();
}));
}

/**
 * Records {@code t} as this task group's execution exception, but only if
 * no exception has been recorded yet; a later call never replaces an
 * earlier value.
 *
 * @param t the failure to record
 */
void exception(Throwable t) {
// compareAndSet(null, t) only succeeds while the holder is still empty.
executionException.compareAndSet(null, t);
}

void taskDone() {
logger.info("taskDone: " + taskGroup.getTaskGroupLocation());
/**
 * Calls {@code cancel(true)} on every entry of {@code blockingFutures}.
 * A {@link CancellationException} raised while doing so is deliberately
 * swallowed; note that it would end the iteration at that entry.
 */
private void cancelAllTask() {
    try {
        blockingFutures.forEach(blockingFuture -> blockingFuture.cancel(true));
    } catch (CancellationException ignored) {
        // Cancellation is best-effort here; nothing to recover.
    }
}

void taskDone(Task task) {
TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupLocation();
logger.info(String.format("taskDone, taskId = %d, taskGroup = %s", task.getTaskID(), taskGroupLocation));
Throwable ex = executionException.get();
if (completionLatch.decrementAndGet() == 0) {
TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupLocation();
finishedExecutionContexts.put(taskGroupLocation, executionContexts.remove(taskGroupLocation));
cancellationFutures.remove(taskGroupLocation);
Throwable ex = executionException.get();
if (ex == null) {
future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FINISHED, null));
return;
} else if (isCancel.get()) {
future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.CANCELED, null));
return;
} else {
future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, ex));
}
}
if (!isCancel.get() && ex != null) {
cancelAllTask();
}
}

boolean executionCompletedExceptionally() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,22 @@ protected void reportedTask(TaskReportStatusOperation operation) {
default:
break;
}
}).exceptionally(error -> {
handleCoordinatorError("task running failed", error, CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
return null;
});
}

/**
 * Logs the failure at error level, then runs the reason-only overload,
 * which cleans pending checkpoints and reports the error for this pipeline.
 *
 * @param message log message describing where the failure happened
 * @param e       the underlying error; it is logged but not attached to the
 *                {@code CheckpointException} that gets reported
 * @param reason  the checkpoint failure reason to report
 */
private void handleCoordinatorError(String message, Throwable e, CheckpointFailureReason reason) {
LOG.error(message, e);
handleCoordinatorError(reason);
}

/**
 * Cleans pending checkpoints for the given reason and then notifies the
 * checkpoint manager of the failure for this coordinator's pipeline.
 *
 * @param reason the checkpoint failure reason, used both for the cleanup
 *               and for the reported {@code CheckpointException}
 */
private void handleCoordinatorError(CheckpointFailureReason reason) {
    // Clean up first so the manager is notified after pending state is gone.
    cleanPendingCheckpoint(reason);

    CheckpointException failure = new CheckpointException(reason);
    checkpointManager.handleCheckpointError(pipelineId, failure);
}

private void restoreTaskState(TaskLocation taskLocation) {
List<ActionSubtaskState> states = new ArrayList<>();
if (latestCompletedCheckpoint != null) {
Expand All @@ -205,7 +218,7 @@ private void restoreTaskState(TaskLocation taskLocation) {
}
});
}
checkpointManager.sendOperationToMemberNode(new NotifyTaskRestoreOperation(taskLocation, states));
checkpointManager.sendOperationToMemberNode(new NotifyTaskRestoreOperation(taskLocation, states)).join();
}

private void allTaskReady() {
Expand Down Expand Up @@ -253,7 +266,7 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
return;
}
final long currentTimestamp = Instant.now().toEpochMilli();
if (checkpointType.equals(CHECKPOINT_TYPE)) {
if (notFinalCheckpoint(checkpointType)) {
if (currentTimestamp - latestTriggerTimestamp.get() < coordinatorConfig.getCheckpointInterval() ||
pendingCounter.get() >= coordinatorConfig.getMaxConcurrentCheckpoints() || !isAllTaskReady) {
return;
Expand All @@ -264,10 +277,17 @@ protected void tryTriggerPendingCheckpoint(CheckpointType checkpointType) {
CompletableFuture<PendingCheckpoint> pendingCheckpoint = createPendingCheckpoint(currentTimestamp, checkpointType);
startTriggerPendingCheckpoint(pendingCheckpoint);
pendingCounter.incrementAndGet();
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
// if the checkpoint type is a final type, we don't need to trigger the next checkpoint
if (notFinalCheckpoint(checkpointType)) {
scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
}
}
}

/**
 * Tells whether the given checkpoint type is the regular
 * {@code CHECKPOINT_TYPE}; callers treat every other type as final.
 *
 * @param checkpointType the type to test; must not be {@code null}, because
 *                       {@code equals} is invoked on it
 * @return {@code true} if {@code checkpointType} equals {@code CHECKPOINT_TYPE}
 */
private boolean notFinalCheckpoint(CheckpointType checkpointType) {
return checkpointType.equals(CHECKPOINT_TYPE);
}

@SuppressWarnings("checkstyle:MagicNumber")
private void waitingPendingCheckpointDone() {
while (pendingCounter.get() != 0) {
Expand Down Expand Up @@ -307,14 +327,12 @@ private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint>
PassiveCompletableFuture<CompletedCheckpoint> completableFuture = pendingCheckpoint.getCompletableFuture();
completableFuture.whenComplete((completedCheckpoint, error) -> {
if (error != null) {
LOG.error("trigger checkpoint failed", error);
checkpointManager.handleCheckpointError(pipelineId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR));
handleCoordinatorError("trigger checkpoint failed", error, CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
} else {
try {
completePendingCheckpoint(completedCheckpoint);
} catch (Throwable e) {
LOG.error("complete checkpoint failed", e);
checkpointManager.handleCheckpointError(pipelineId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR));
handleCoordinatorError("complete checkpoint failed", e, CheckpointFailureReason.CHECKPOINT_INSIDE_ERROR);
}
}
});
Expand All @@ -341,8 +359,7 @@ private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint>
// If any task is not acked within the checkpoint timeout
if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0) {
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
checkpointManager.handleCheckpointError(pipelineId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
handleCoordinatorError(CheckpointFailureReason.CHECKPOINT_EXPIRED);
}
}
}, coordinatorConfig.getCheckpointTimeout(),
Expand Down Expand Up @@ -392,12 +409,8 @@ CompletableFuture<PendingCheckpoint> triggerPendingCheckpoint(long triggerTimest

private Set<Long> getNotYetAcknowledgedTasks() {
// TODO: some tasks have completed and don't need to be ack
Set<Long> set = new CopyOnWriteArraySet<>();
Set<Long> threadUnsafe = plan.getPipelineSubtasks()
.stream().map(TaskLocation::getTaskID)
.collect(Collectors.toSet());
set.addAll(threadUnsafe);
return set;
return plan.getPipelineSubtasks()
.stream().map(TaskLocation::getTaskID).collect(Collectors.toCollection(CopyOnWriteArraySet::new));
}

private Map<Long, ActionState> getActionStates() {
Expand Down Expand Up @@ -479,12 +492,14 @@ public void completePendingCheckpoint(CompletedCheckpoint completedCheckpoint) {
LOG.debug("pending checkpoint({}/{}@{}) completed! cost: {}, trigger: {}, completed: {}",
completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), completedCheckpoint.getJobId(),
completedCheckpoint.getCompletedTimestamp() - completedCheckpoint.getCheckpointTimestamp(), completedCheckpoint.getCheckpointTimestamp(), completedCheckpoint.getCompletedTimestamp());
pendingCounter.decrementAndGet();
final long checkpointId = completedCheckpoint.getCheckpointId();
pendingCheckpoints.remove(checkpointId);
pendingCounter.decrementAndGet();
if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
// latest checkpoint completed time > checkpoint interval
tryTriggerPendingCheckpoint();
if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
tryTriggerPendingCheckpoint();
}
}
completedCheckpoints.addLast(completedCheckpoint);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
*/
public void reportedTask(TaskReportStatusOperation reportStatusOperation) {
// task address may change during restore.
log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(),
log.debug("reported task({}) status {}", reportStatusOperation.getLocation().getTaskID(),
reportStatusOperation.getStatus());
getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import lombok.NonNull;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -71,14 +69,10 @@ public class PhysicalPlan {
*/
private CompletableFuture<JobStatus> jobEndFuture;

private final ExecutorService executorService;

private final String jobFullName;

private final long jobId;

private final Map<Integer, CompletableFuture> pipelineSchedulerFutureMap;

private JobMaster jobMaster;

/**
Expand All @@ -97,7 +91,6 @@ public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
long initializationTimestamp,
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap) {
this.executorService = executorService;
this.jobImmutableInformation = jobImmutableInformation;
this.jobId = jobImmutableInformation.getJobId();
Long[] stateTimestamps = new Long[JobStatus.values().length];
Expand All @@ -123,7 +116,6 @@ public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId());

pipelineSchedulerFutureMap = new ConcurrentHashMap<>(pipelineList.size());
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
}
Expand Down Expand Up @@ -220,12 +212,8 @@ public void cancelJob() {
}

private void cancelJobPipelines() {
List<CompletableFuture<Void>> collect = pipelineList.stream().map(pipeline -> {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
pipeline.cancelPipeline();
});
return future;
}).filter(x -> x != null).collect(Collectors.toList());
List<CompletableFuture<Void>> collect = pipelineList.stream()
.map(pipeline -> CompletableFuture.runAsync(pipeline::cancelPipeline)).collect(Collectors.toList());

try {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
Expand Down Expand Up @@ -309,10 +297,6 @@ public boolean updateJobState(@NonNull JobStatus current, @NonNull JobStatus tar
}
}

public PassiveCompletableFuture<JobStatus> getJobEndCompletableFuture() {
return new PassiveCompletableFuture<>(jobEndFuture);
}

/**
 * Returns the job's immutable information that was supplied to this plan's
 * constructor.
 *
 * @return the stored {@code JobImmutableInformation}
 */
public JobImmutableInformation getJobImmutableInformation() {
return jobImmutableInformation;
}
Expand Down
Loading