Skip to content

Commit

Permalink
[AMORO-2279] Retry failed tasks when restarting AMS (#2280)
Browse files Browse the repository at this point in the history
  • Loading branch information
XBaith authored and wangtaohz committed Nov 13, 2023
1 parent 6f834ef commit 596574a
Showing 1 changed file with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,23 @@ private void initTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
schedulingPolicy.addTable(tableRuntime);
} else if (tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
TableOptimizingProcess process = new TableOptimizingProcess(tableRuntimeMeta);
process.getTaskMap().entrySet().stream()
.filter(
entry ->
entry.getValue().getStatus() == TaskRuntime.Status.SCHEDULED
|| entry.getValue().getStatus() == TaskRuntime.Status.ACKED)
.forEach(entry -> executingTaskMap.put(entry.getKey(), entry.getValue()));
process.getTaskMap().values().stream()
.filter(task -> task.getStatus() == TaskRuntime.Status.PLANNED)
.forEach(taskQueue::offer);
process
.getTaskMap()
.forEach(
(taskId, taskRuntime) -> {
switch (taskRuntime.getStatus()) {
case SCHEDULED:
case ACKED:
executingTaskMap.put(taskId, taskRuntime);
break;
case PLANNED:
taskQueue.offer(taskRuntime);
break;
case FAILED:
retryTask(taskRuntime, false);
break;
}
});
}
} else {
OptimizingProcess process = tableRuntime.getOptimizingProcess();
Expand Down

0 comments on commit 596574a

Please sign in to comment.