Skip to content

Commit

Permalink
[Improve] [Engine] Fix Checkpoint failed reason can't be found.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X committed Dec 21, 2022
1 parent 5112eb0 commit d90e4a4
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -135,7 +136,7 @@ private void submitThreadShareTask(TaskGroupExecutionTracker taskGroupExecutionT
return taskTracker;
} catch (Exception e) {
taskGroupExecutionTracker.exception(e);
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(t);
}
}
return null;
Expand Down Expand Up @@ -292,7 +293,11 @@ private void notifyTaskStatusToMaster(TaskGroupLocation taskGroupLocation, TaskE
public void cancelTaskGroup(TaskGroupLocation taskGroupLocation) {
logger.info(String.format("Task (%s) need cancel.", taskGroupLocation));
if (cancellationFutures.containsKey(taskGroupLocation)) {
cancellationFutures.get(taskGroupLocation).cancel(false);
try {
cancellationFutures.get(taskGroupLocation).cancel(false);
} catch (CancellationException ignore) {
// ignore
}
} else {
logger.warning(String.format("need cancel taskId : %s is not exist", taskGroupLocation));
}
Expand Down Expand Up @@ -352,11 +357,16 @@ public void run() {
result = t.call();
} while (!result.isDone() && isRunning &&
!taskGroupExecutionTracker.executionCompletedExceptionally());
} catch (InterruptedException e) {
logger.warning(String.format("Interrupted task %d - %s", t.getTaskID(), t));
if (taskGroupExecutionTracker.executionException.get() == null && !taskGroupExecutionTracker.isCancel.get()) {
taskGroupExecutionTracker.exception(e);
}
} catch (Throwable e) {
logger.warning("Exception in " + t, e);
taskGroupExecutionTracker.exception(e);
} finally {
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(t);
}
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
Expand Down Expand Up @@ -402,7 +412,7 @@ public void run() {
taskqueue.takeFirst();
TaskGroupExecutionTracker taskGroupExecutionTracker = taskTracker.taskGroupExecutionTracker;
if (taskGroupExecutionTracker.executionCompletedExceptionally()) {
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(taskTracker.task);
if (null != exclusiveTaskTracker.get()) {
// If it's exclusive need to end the work
break;
Expand All @@ -426,7 +436,7 @@ public void run() {
} catch (Throwable e) {
//task Failure and complete
taskGroupExecutionTracker.exception(e);
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(taskTracker.task);
//If it's exclusive need to end the work
logger.warning("Exception in " + taskTracker.task, e);
if (null != exclusiveTaskTracker.get()) {
Expand All @@ -440,7 +450,7 @@ public void run() {
if (null != call) {
if (call.isDone()) {
//If it's exclusive, you need to end the work
taskGroupExecutionTracker.taskDone();
taskGroupExecutionTracker.taskDone(taskTracker.task);
if (null != exclusiveTaskTracker.get()) {
break;
}
Expand Down Expand Up @@ -504,19 +514,25 @@ public final class TaskGroupExecutionTracker {
e = new IllegalStateException("cancellationFuture should be completed exceptionally");
}
exception(e);
// Don't interrupt the threads. We require that they do not block for too long,
// interrupting them might make the termination faster, but can also cause troubles.
blockingFutures.forEach(f -> f.cancel(true));
cancelAllTask();
}));
}

/**
 * Records {@code t} as the execution exception, but only if none has been recorded yet:
 * the compare-and-set against {@code null} leaves an already stored exception untouched.
 *
 * @param t the failure to record
 */
void exception(Throwable t) {
executionException.compareAndSet(null, t);
}

void taskDone() {
/**
 * Calls {@code cancel(true)} on every future in {@code blockingFutures}.
 * A {@link CancellationException} raised while doing so is deliberately swallowed.
 */
private void cancelAllTask() {
try {
// NOTE(review): an exception thrown by one cancel() call ends the forEach, so any
// futures after it would not be cancelled — confirm this is acceptable.
blockingFutures.forEach(f -> f.cancel(true));
} catch (CancellationException ignore) {
// ignore
}
}

void taskDone(Task task) {
TaskGroupLocation taskGroupLocation = taskGroup.getTaskGroupLocation();
logger.info("taskDone: " + taskGroupLocation);
logger.info(String.format("taskDone, taskId = %d, taskGroup = %s", task.getTaskID(), taskGroupLocation));
Throwable ex = executionException.get();
if (completionLatch.decrementAndGet() == 0) {
finishedExecutionContexts.put(taskGroupLocation, executionContexts.remove(taskGroupLocation));
Expand All @@ -529,9 +545,9 @@ void taskDone() {
return;
}
}
if (ex != null) {
if (!isCancel.get() && ex != null) {
future.complete(new TaskExecutionState(taskGroupLocation, ExecutionState.FAILED, ex));
blockingFutures.forEach(f -> f.cancel(true));
cancelAllTask();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ private void handleCoordinatorError(String message, Throwable e, CheckpointFailu
}

/**
 * Handles a coordinator failure that is identified only by {@code reason}.
 * First calls {@code cleanPendingCheckpoint(reason)}, then reports a
 * {@link CheckpointException} built from the same reason to the checkpoint manager
 * for this coordinator's {@code pipelineId}.
 *
 * @param reason the checkpoint failure reason used for both the cleanup and the report
 */
private void handleCoordinatorError(CheckpointFailureReason reason) {
cleanPendingCheckpoint(reason);
checkpointManager.handleCheckpointError(pipelineId, new CheckpointException(reason));
}

Expand Down Expand Up @@ -358,7 +359,6 @@ private void startTriggerPendingCheckpoint(CompletableFuture<PendingCheckpoint>
// If any task is not acked within the checkpoint timeout
if (pendingCheckpoints.get(pendingCheckpoint.getCheckpointId()) != null && !pendingCheckpoint.isFullyAcknowledged()) {
if (tolerableFailureCheckpoints-- <= 0) {
cleanPendingCheckpoint(CheckpointFailureReason.CHECKPOINT_EXPIRED);
handleCoordinatorError(CheckpointFailureReason.CHECKPOINT_EXPIRED);
}
}
Expand Down Expand Up @@ -409,12 +409,8 @@ CompletableFuture<PendingCheckpoint> triggerPendingCheckpoint(long triggerTimest

private Set<Long> getNotYetAcknowledgedTasks() {
// TODO: some tasks have completed and don't need to be ack
Set<Long> set = new CopyOnWriteArraySet<>();
Set<Long> threadUnsafe = plan.getPipelineSubtasks()
.stream().map(TaskLocation::getTaskID)
.collect(Collectors.toSet());
set.addAll(threadUnsafe);
return set;
return plan.getPipelineSubtasks()
.stream().map(TaskLocation::getTaskID).collect(Collectors.toCollection(CopyOnWriteArraySet::new));
}

private Map<Long, ActionState> getActionStates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private CheckpointCoordinator getCheckpointCoordinator(int pipelineId) {
*/
public void reportedTask(TaskReportStatusOperation reportStatusOperation) {
// task address may change during restore.
log.debug("reported task({}) status{}", reportStatusOperation.getLocation().getTaskID(),
log.debug("reported task({}) status {}", reportStatusOperation.getLocation().getTaskID(),
reportStatusOperation.getStatus());
getCheckpointCoordinator(reportStatusOperation.getLocation()).reportedTask(reportStatusOperation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import lombok.NonNull;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -71,14 +69,10 @@ public class PhysicalPlan {
*/
private CompletableFuture<JobStatus> jobEndFuture;

private final ExecutorService executorService;

private final String jobFullName;

private final long jobId;

private final Map<Integer, CompletableFuture> pipelineSchedulerFutureMap;

private JobMaster jobMaster;

/**
Expand All @@ -97,7 +91,6 @@ public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
long initializationTimestamp,
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap) {
this.executorService = executorService;
this.jobImmutableInformation = jobImmutableInformation;
this.jobId = jobImmutableInformation.getJobId();
Long[] stateTimestamps = new Long[JobStatus.values().length];
Expand All @@ -123,7 +116,6 @@ public PhysicalPlan(@NonNull List<SubPlan> pipelineList,
this.jobFullName = String.format("Job %s (%s)", jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId());

pipelineSchedulerFutureMap = new ConcurrentHashMap<>(pipelineList.size());
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
}
Expand Down Expand Up @@ -220,12 +212,8 @@ public void cancelJob() {
}

private void cancelJobPipelines() {
List<CompletableFuture<Void>> collect = pipelineList.stream().map(pipeline -> {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
pipeline.cancelPipeline();
});
return future;
}).filter(x -> x != null).collect(Collectors.toList());
List<CompletableFuture<Void>> collect = pipelineList.stream()
.map(pipeline -> CompletableFuture.runAsync(pipeline::cancelPipeline)).collect(Collectors.toList());

try {
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(
Expand Down Expand Up @@ -309,10 +297,6 @@ public boolean updateJobState(@NonNull JobStatus current, @NonNull JobStatus tar
}
}

public PassiveCompletableFuture<JobStatus> getJobEndCompletableFuture() {
return new PassiveCompletableFuture<>(jobEndFuture);
}

/**
 * Accessor for the job's immutable information.
 *
 * @return the {@code jobImmutableInformation} field of this plan
 */
public JobImmutableInformation getJobImmutableInformation() {
return jobImmutableInformation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import lombok.NonNull;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -45,15 +46,11 @@ public class SubPlan {

private final int pipelineId;

private final int totalPipelineNum;
private final AtomicInteger finishedTaskNum = new AtomicInteger(0);

private final JobImmutableInformation jobImmutableInformation;
private final AtomicInteger canceledTaskNum = new AtomicInteger(0);

private AtomicInteger finishedTaskNum = new AtomicInteger(0);

private AtomicInteger canceledTaskNum = new AtomicInteger(0);

private AtomicInteger failedTaskNum = new AtomicInteger(0);
private final AtomicInteger failedTaskNum = new AtomicInteger(0);

private final String pipelineFullName;

Expand Down Expand Up @@ -95,7 +92,6 @@ public SubPlan(int pipelineId,
this.pipelineId = pipelineId;
this.pipelineLocation = new PipelineLocation(jobImmutableInformation.getJobId(), pipelineId);
this.pipelineFuture = new CompletableFuture<>();
this.totalPipelineNum = totalPipelineNum;
this.physicalVertexList = physicalVertexList;
this.coordinatorVertexList = coordinatorVertexList;
pipelineRestoreNum = 0;
Expand All @@ -115,7 +111,6 @@ public SubPlan(int pipelineId,
runningJobStateIMap.put(pipelineLocation, PipelineStatus.CREATED);
}

this.jobImmutableInformation = jobImmutableInformation;
this.pipelineFullName = String.format(
"Job %s (%s), Pipeline: [(%d/%d)]",
jobImmutableInformation.getJobConfig().getName(),
Expand All @@ -129,11 +124,11 @@ public SubPlan(int pipelineId,
}

public PassiveCompletableFuture<PipelineStatus> initStateFuture() {
physicalVertexList.stream().forEach(physicalVertex -> {
physicalVertexList.forEach(physicalVertex -> {
addPhysicalVertexCallBack(physicalVertex.initStateFuture());
});

coordinatorVertexList.stream().forEach(coordinator -> {
coordinatorVertexList.forEach(coordinator -> {
addPhysicalVertexCallBack(coordinator.initStateFuture());
});

Expand Down Expand Up @@ -250,19 +245,19 @@ public void cancelPipeline() {
}
// If an active Master Node is done and another Master Node becomes active, we cannot know whether the
// canceled pipeline has completed. So we need to cancel the running pipeline again.
if (!PipelineStatus.CANCELING.equals((PipelineStatus) runningJobStateIMap.get(pipelineLocation))) {
if (!PipelineStatus.CANCELING.equals(runningJobStateIMap.get(pipelineLocation))) {
updatePipelineState(getPipelineState(), PipelineStatus.CANCELING);
}
cancelPipelineTasks();
}

private void cancelPipelineTasks() {
List<CompletableFuture<Void>> coordinatorCancelList =
coordinatorVertexList.stream().map(coordinator -> cancelTask(coordinator)).filter(x -> x != null)
coordinatorVertexList.stream().map(this::cancelTask).filter(Objects::nonNull)
.collect(Collectors.toList());

List<CompletableFuture<Void>> taskCancelList =
physicalVertexList.stream().map(task -> cancelTask(task)).filter(x -> x != null)
physicalVertexList.stream().map(this::cancelTask).filter(Objects::nonNull)
.collect(Collectors.toList());

try {
Expand All @@ -279,11 +274,10 @@ private void cancelPipelineTasks() {
private CompletableFuture<Void> cancelTask(@NonNull PhysicalVertex task) {
if (!task.getExecutionState().isEndState() &&
!ExecutionState.CANCELING.equals(task.getExecutionState())) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return CompletableFuture.supplyAsync(() -> {
task.cancel();
return null;
}, executorService);
return future;
}
return null;
}
Expand All @@ -297,13 +291,9 @@ private void reset() {
canceledTaskNum.set(0);
failedTaskNum.set(0);

coordinatorVertexList.forEach(coordinate -> {
coordinate.reset();
});
coordinatorVertexList.forEach(PhysicalVertex::reset);

physicalVertexList.forEach(task -> {
task.reset();
});
physicalVertexList.forEach(PhysicalVertex::reset);
}

private void updateStateTimestamps(@NonNull PipelineStatus targetState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "STREAMING"
execution.checkpoint.interval = 5000
checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

Expand Down

0 comments on commit d90e4a4

Please sign in to comment.