From 85b0725134edaaa178dd125babd0724272935b20 Mon Sep 17 00:00:00 2001 From: seayi <405078363@qq.com> Date: Wed, 20 Apr 2016 17:00:34 +0800 Subject: [PATCH] Update DAGScheduler.scala When an executor is lost, DAGScheduler may submit one stage twice even if the first running taskset for this stage is not finished, because the finished tasks from the failed stage should not be removed from the pending partition list. --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index b805bde97e861..b5cf1bf13dc75 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1139,11 +1139,11 @@ class DAGScheduler( case Success => listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType, event.reason, event.taskInfo, event.taskMetrics)) - stage.pendingPartitions -= task.partitionId task match { case rt: ResultTask[_, _] => // Cast to ResultStage here because it's part of the ResultTask // TODO Refactor this out to a function that accepts a ResultStage + stage.pendingPartitions -= task.partitionId val resultStage = stage.asInstanceOf[ResultStage] resultStage.activeJob match { case Some(job) => @@ -1182,6 +1182,7 @@ class DAGScheduler( if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) { logInfo(s"Ignoring possibly bogus $smt completion from executor $execId") } else { + stage.pendingPartitions -= task.partitionId shuffleStage.addOutputLoc(smt.partitionId, status) }