From 36759aa41a20d7a984457be77f811ca013abf0ac Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sun, 23 Mar 2014 14:26:36 -0400
Subject: [PATCH 01/18] add test case

---
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ce567b0cde85d..f5ab986abb860 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -375,6 +375,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assertDataStructuresEmpty
   }

+  test("accumulator is calculated only once when the rdd is recomputed") {
+
+  }
+
   test("cached post-shuffle") {
     val shuffleOneRdd = makeRdd(2, Nil)
     val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)

From 47667c7cb4e4bae552d3477d8472bf3b16d33571 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sun, 23 Mar 2014 16:22:10 -0400
Subject: [PATCH 02/18] refactor accumulator id to an atomic long

---
 .../src/main/scala/org/apache/spark/Accumulators.scala | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index d5f3e3f6ec496..c43567647d005 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
 package org.apache.spark

 import java.io.{ObjectInputStream, Serializable}
+import java.util.concurrent.atomic.AtomicLong

 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
@@ -43,7 +44,7 @@ class Accumulable[R, T] (
     param: AccumulableParam[R, T])
   extends Serializable {

-  val id = Accumulators.newId
+  val id = Accumulators.nextAccumID.getAndIncrement
   @transient private var value_ = initialValue // Current value on master
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
   private var deserialized = false
@@ -237,12 +238,7 @@ private object Accumulators {
   // TODO: Use soft references? => need to make readObject work properly then
   val originals = Map[Long, Accumulable[_, _]]()
   val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
-  var lastId: Long = 0
-
-  def newId: Long = synchronized {
-    lastId += 1
-    lastId
-  }
+  var nextAccumID = new AtomicLong(0)

   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {

From ffc7eafb796b6bb7eb99b948bbabc41e2d201fd1 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sun, 23 Mar 2014 19:00:53 -0400
Subject: [PATCH 03/18] deduplicate accumulator operations for resubmitted
 tasks

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala  | 9 +++++++--
 .../src/main/scala/org/apache/spark/scheduler/Task.scala | 3 +++
 .../org/apache/spark/scheduler/TaskSetManager.scala      | 5 +++++
 3 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ef3d24d746829..b1dcb252dd7a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -116,6 +116,8 @@ class DAGScheduler(
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)

+  private val stageIdToFinishedTasks = new HashMap[Int, HashSet[Long]]
+
   taskScheduler.setDAGScheduler(this)

   /**
@@ -808,14 +810,17 @@ class DAGScheduler(
         logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
         stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
         listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
+        stageIdToFinishedTasks -= stage.id
         runningStages -= stage
       }
       event.reason match {
         case Success =>
           logInfo("Completed " + task)
-          if (event.accumUpdates != null) {
-            Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
+          if (stageIdToFinishedTasks.contains(stage.id) &&
+            stageIdToFinishedTasks(stage.id).contains(task.tid)) {
+            Accumulators.add(event.accumUpdates)
           }
+          stageIdToFinishedTasks.getOrElseUpdate(stage.id, new HashSet[Long]) += task.tid
           pendingTasks(stage) -= task
           task match {
             case rt: ResultTask[_, _] =>

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index b85b4a50cd93a..6e81cdeebc393 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -56,6 +56,9 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex

   def preferredLocations: Seq[TaskLocation] = Nil

+
+  var tid = 0L
+
   // Map output tracker epoch. Will be set by TaskScheduler.
   var epoch: Long = -1

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 86d2050a03f18..4fce4652eed5e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -397,6 +397,7 @@ private[spark] class TaskSetManager(
         // Found a task; do some bookkeeping and return a task description
         val task = tasks(index)
         val taskId = sched.newTaskId()
+        task.tid = taskId
         // Figure out whether this should count as a preferred launch
         logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
           taskSet.id, index, taskId, execId, host, taskLocality))
@@ -568,6 +569,10 @@ private[spark] class TaskSetManager(
         failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
         logWarning(failureReason)

+      case ExecutorLostFailure =>
+        failureReason = "Executor %s lost for TID %s".format(info.executorId, tid)
+        logWarning(failureReason)
+
       case _ =>
         failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
     }

From 9d6237cdc7f8e95373a7b49231cdc7fd2cc78a51 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Mon, 24 Mar 2014 00:29:04 -0400
Subject: [PATCH 04/18] prevent recalculating accumulator when the stage is
 resubmitted

---
 .../scala/org/apache/spark/Accumulators.scala | 10 ++++-
 .../apache/spark/scheduler/DAGScheduler.scala | 39 ++++++++++++-------
 .../org/apache/spark/scheduler/Stage.scala    |  8 ++++
 .../org/apache/spark/scheduler/Task.scala     |  3 ---
 .../spark/scheduler/TaskSetManager.scala      |  1 -
 .../spark/scheduler/DAGSchedulerSuite.scala   | 39 ++++++++++++++++++-
 6 files changed, 80 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index c43567647d005..312d41c774a5b 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -44,7 +44,9 @@ class Accumulable[R, T] (
     param: AccumulableParam[R, T])
   extends Serializable {

-  val id = Accumulators.nextAccumID.getAndIncrement
+  val id = Accumulators.nextAccumID.get()
+  Accumulators.nextAccumID.getAndIncrement
+
   @transient private var value_ = initialValue // Current value on master
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
   private var deserialized = false
@@ -273,4 +275,10 @@ private object Accumulators {
       }
     }
   }
+
+  def add(value: (Long, Any)): Unit = synchronized {
+    if (originals.contains(value._1)) {
+      originals(value._1).asInstanceOf[Accumulable[Any, Any]] += value._2
+    }
+  }
 }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b1dcb252dd7a9..06c72f9891194 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger

-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, ListBuffer}
 import scala.concurrent.duration._
 import scala.reflect.ClassTag

@@ -33,6 +33,8 @@ import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import scala.collection.mutable
+

 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -116,7 +118,7 @@ class DAGScheduler(
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)

-  private val stageIdToFinishedTasks = new HashMap[Int, HashSet[Long]]
+  val stageIdToAccumulators = new HashMap[Int, ListBuffer[(Long, Any)]]

   taskScheduler.setDAGScheduler(this)

@@ -391,7 +393,6 @@ class DAGScheduler(
     // data structures based on StageId
     stageIdToStage -= stageId
     stageIdToJobIds -= stageId
-
     logDebug("After removal of stage %d, remaining stages = %d"
       .format(stageId, stageIdToStage.size))
   }
@@ -407,12 +408,14 @@ class DAGScheduler(
     independentStages.toSet
   }

-  private def jobIdToStageIdsRemove(jobId: Int) {
+  private def jobIdToStageIdsRemove(jobId: Int): Set[Int] = {
     if (!jobIdToStageIds.contains(jobId)) {
-      logDebug("Trying to remove unregistered job " + jobId)
+      logWarning("Trying to remove unregistered job " + jobId)
+      return null
     } else {
-      removeJobAndIndependentStages(jobId)
+      val removedStages = removeJobAndIndependentStages(jobId)
       jobIdToStageIds -= jobId
+      return removedStages
     }
   }

@@ -810,17 +813,18 @@ class DAGScheduler(
         logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
         stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
         listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
-        stageIdToFinishedTasks -= stage.id
         runningStages -= stage
       }
       event.reason match {
         case Success =>
           logInfo("Completed " + task)
-          if (stageIdToFinishedTasks.contains(stage.id) &&
-            stageIdToFinishedTasks(stage.id).contains(task.tid)) {
-            Accumulators.add(event.accumUpdates)
+          if (!stageIdToAccumulators.contains(stage.id) ||
+            stageIdToAccumulators(stage.id).size < stage.numPartitions) {
+            stageIdToAccumulators.getOrElseUpdate(stage.id, new ListBuffer[(Long, Any)])
+            for ((id, value) <- event.accumUpdates) {
+              stageIdToAccumulators(stage.id) += id -> value
+            }
           }
-          stageIdToFinishedTasks.getOrElseUpdate(stage.id, new HashSet[Long]) += task.tid
           pendingTasks(stage) -= task
           task match {
             case rt: ResultTask[_, _] =>
@@ -835,7 +839,13 @@ class DAGScheduler(
                     activeJobs -= job
                     resultStageToJob -= stage
                     markStageAsFinished(stage)
-                    jobIdToStageIdsRemove(job.jobId)
+                    jobIdToStageIdsRemove(job.jobId).foreach(stageId => {
+                      //accumulator operations
+                      for (accumValues <- stageIdToAccumulators(stageId)) {
+                        Accumulators.add(accumValues._1, accumValues._2)
+                      }
+                      stageIdToAccumulators -= stageId
+                    })
                     listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                   }
                   job.listener.taskSucceeded(rt.outputId, event.result)
@@ -907,7 +917,6 @@ class DAGScheduler(
         case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
           // Mark the stage that the reducer was in as unrunnable
           val failedStage = stageIdToStage(task.stageId)
-          runningStages -= failedStage
           // TODO: Cancel running tasks in the stage
           logInfo("Marking " + failedStage + " (" + failedStage.name +
             ") for resubmision due to a fetch failure")
@@ -929,6 +938,10 @@ class DAGScheduler(
           }
           failedStages += failedStage
           failedStages += mapStage
+          if (runningStages.contains(failedStage)) {
+            stageIdToAccumulators -= failedStage.id
+          }
+          runningStages -= failedStage
           // TODO: mark the executor as failed only if there were lots of fetch failures on it
           if (bmAddress != null) {
             handleExecutorLost(bmAddress.executorId, Some(task.epoch))

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 5c1fc30e4a557..fccc1677ff525 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,6 +100,14 @@ private[spark] class Stage(
     id
   }

+  def attempId = {
+    if (nextAttemptId == 0) {
+      throw new SparkException("You haven't call newAttempId " +
+        "before you access the attempId of the stage for the first time")
+    }
+    nextAttemptId - 1
+  }
+
   val name = callSite.getOrElse(rdd.getCreationSite)

   override def toString = "Stage " + id

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 6e81cdeebc393..b85b4a50cd93a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -56,9 +56,6 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex

   def preferredLocations: Seq[TaskLocation] = Nil

-
-  var tid = 0L
-
   // Map output tracker epoch. Will be set by TaskScheduler.
   var epoch: Long = -1

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 4fce4652eed5e..bc53f6354606f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -397,7 +397,6 @@ private[spark] class TaskSetManager(
         // Found a task; do some bookkeeping and return a task description
         val task = tasks(index)
         val taskId = sched.newTaskId()
-        task.tid = taskId
         // Figure out whether this should count as a preferred launch
         logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
           taskSet.id, index, taskId, execId, host, taskLocality))

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f5ab986abb860..36de49c1eee7f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -174,6 +174,15 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     }
   }

+  private def completeWithAccumulator(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
+    assert(taskSet.tasks.size >= results.size)
+    for ((result, i) <- results.zipWithIndex) {
+      if (i < taskSet.tasks.size) {
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any]((0, 1)), null, null))
+      }
+    }
+  }
+
   /** Sends the rdd to the scheduler for scheduling. */
   private def submit(
       rdd: RDD[_],
@@ -375,8 +384,33 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assertDataStructuresEmpty
   }

-  test("accumulator is calculated only once when the rdd is recomputed") {
-
+  test("accumulator is not calculated for resubmitted tasks") {
+    //just for register
+    new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+    val shuffleOneRdd = makeRdd(2, Nil)
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = makeRdd(1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+    // have the first stage complete normally
+    completeWithAccumulator(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+    // have the second stage complete normally
+    completeWithAccumulator(taskSets(1), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostC", 1))))
+    // fail the third stage because hostA went down
+    completeWithAccumulator(taskSets(2), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+    scheduler.resubmitFailedStages()
+    completeWithAccumulator(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
+    completeWithAccumulator(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
+    completeWithAccumulator(taskSets(5), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(Accumulators.originals(0).value === 5)
+    assertDataStructuresEmpty
   }

   test("cached post-shuffle") {
     val shuffleOneRdd = makeRdd(2, Nil)
     val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
@@ -441,5 +475,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assert(scheduler.runningStages.isEmpty)
     assert(scheduler.shuffleToMapStage.isEmpty)
     assert(scheduler.waitingStages.isEmpty)
+    assert(scheduler.stageIdToAccumulators.isEmpty)
   }
 }

From 02da695bcdf24db2a0049646e15a56b8eb5f2121 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Mon, 24 Mar 2014 00:31:16 -0400
Subject: [PATCH 05/18] adjust test case order

---
 .../spark/scheduler/DAGSchedulerSuite.scala | 58 +++++++++----------
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 36de49c1eee7f..f2f11a8ee4d14 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -384,35 +384,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assertDataStructuresEmpty
   }

-  test("accumulator is not calculated for resubmitted tasks") {
-    //just for register
-    new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
-    val shuffleOneRdd = makeRdd(2, Nil)
-    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
-    val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
-    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
-    val finalRdd = makeRdd(1, List(shuffleDepTwo))
-    submit(finalRdd, Array(0))
-    // have the first stage complete normally
-    completeWithAccumulator(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 2)),
-      (Success, makeMapStatus("hostB", 2))))
-    // have the second stage complete normally
-    completeWithAccumulator(taskSets(1), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostC", 1))))
-    // fail the third stage because hostA went down
-    completeWithAccumulator(taskSets(2), Seq(
(FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null))) - scheduler.resubmitFailedStages() - completeWithAccumulator(taskSets(3), Seq((Success, makeMapStatus("hostA", 2)))) - completeWithAccumulator(taskSets(4), Seq((Success, makeMapStatus("hostA", 1)))) - completeWithAccumulator(taskSets(5), Seq((Success, 42))) - assert(results === Map(0 -> 42)) - assert(Accumulators.originals(0).value === 5) - assertDataStructuresEmpty - } - test("cached post-shuffle") { val shuffleOneRdd = makeRdd(2, Nil) val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) @@ -445,6 +416,35 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont assertDataStructuresEmpty } + test("accumulator is not calculated for resubmitted stage") { + //just for register + new Accumulator[Int](0, SparkContext.IntAccumulatorParam) + val shuffleOneRdd = makeRdd(2, Nil) + val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null) + val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne)) + val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null) + val finalRdd = makeRdd(1, List(shuffleDepTwo)) + submit(finalRdd, Array(0)) + // have the first stage complete normally + completeWithAccumulator(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2)))) + // have the second stage complete normally + completeWithAccumulator(taskSets(1), Seq( + (Success, makeMapStatus("hostA", 1)), + (Success, makeMapStatus("hostC", 1)))) + // fail the third stage because hostA went down + completeWithAccumulator(taskSets(2), Seq( + (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null))) + scheduler.resubmitFailedStages() + completeWithAccumulator(taskSets(3), Seq((Success, makeMapStatus("hostA", 2)))) + completeWithAccumulator(taskSets(4), Seq((Success, makeMapStatus("hostA", 1)))) + completeWithAccumulator(taskSets(5), Seq((Success, 42))) + assert(results === Map(0 -> 42)) + assert(Accumulators.originals(0).value === 5) + assertDataStructuresEmpty + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID. 
From c7149975717cac238e1f9558ae9f2cef3045a977 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Mon, 24 Mar 2014 10:24:54 -0400
Subject: [PATCH 06/18] fix Accumulator comment

---
 core/src/main/scala/org/apache/spark/Accumulators.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 312d41c774a5b..c93c5e6b63c3d 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -47,7 +47,7 @@ class Accumulable[R, T] (
   val id = Accumulators.nextAccumID.get()
   Accumulators.nextAccumID.getAndIncrement

-  @transient private var value_ = initialValue // Current value on master
+  @transient private var value_ = initialValue // Current value on driver
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
   private var deserialized = false

From e4cd73b2193a676d85711c98633afc82fced7463 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 25 Mar 2014 14:53:36 -0400
Subject: [PATCH 07/18] remove unused attempId

---
 .../src/main/scala/org/apache/spark/scheduler/Stage.scala | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index fccc1677ff525..5c1fc30e4a557 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,14 +100,6 @@ private[spark] class Stage(
     id
   }

-  def attempId = {
-    if (nextAttemptId == 0) {
-      throw new SparkException("You haven't call newAttempId " +
-        "before you access the attempId of the stage for the first time")
-    }
-    nextAttemptId - 1
-  }
-
   val name = callSite.getOrElse(rdd.getCreationSite)

   override def toString = "Stage " + id

From 5f22c3d72d6e518af07d8298ad366dd8b10671ed Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 25 Mar 2014 19:39:27 -0400
Subject: [PATCH 08/18] fix concurrency issue in Accumulator

---
 .../scala/org/apache/spark/Accumulators.scala | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index c93c5e6b63c3d..364ad9a6d884d 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -44,10 +44,9 @@ class Accumulable[R, T] (
     param: AccumulableParam[R, T])
   extends Serializable {

-  val id = Accumulators.nextAccumID.get()
-  Accumulators.nextAccumID.getAndIncrement
+  val id = Accumulators.newId
+  @transient private var value_ = initialValue // Current value on master

-  @transient private var value_ = initialValue // Current value on driver
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
   private var deserialized = false

@@ -240,7 +239,8 @@ private object Accumulators {
   // TODO: Use soft references? => need to make readObject work properly then
   val originals = Map[Long, Accumulable[_, _]]()
   val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
-  var nextAccumID = new AtomicLong(0)
+  private val nextAccumID = new AtomicLong(0)
+  def newId(): Long = nextAccumID.getAndIncrement()

   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
@@ -268,14 +268,6 @@ private object Accumulators {
   }

   // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
-      }
-    }
-  }
-
   def add(value: (Long, Any)): Unit = synchronized {
     if (originals.contains(value._1)) {
       originals(value._1).asInstanceOf[Accumulable[Any, Any]] += value._2
     }
   }
 }

From 460650e751f1b76249efec711ddc9765be899145 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 25 Mar 2014 19:39:48 -0400
Subject: [PATCH 09/18] consider speculative case in DAGScheduler

---
 .../apache/spark/scheduler/DAGScheduler.scala | 27 ++++++++++---------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 06c72f9891194..f6d6298835a62 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -33,7 +33,6 @@ import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
-import scala.collection.mutable

 /**
@@ -118,7 +117,8 @@ class DAGScheduler(
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)

-  val stageIdToAccumulators = new HashMap[Int, ListBuffer[(Long, Any)]]
+  // stageId -> (splitId -> (accumulatorId, accumulatorValue))
+  val stageIdToAccumulators = new HashMap[Int, HashMap[Int, ListBuffer[(Long, Any)]]]

   taskScheduler.setDAGScheduler(this)

@@ -411,7 +411,7 @@ class DAGScheduler(
   private def jobIdToStageIdsRemove(jobId: Int): Set[Int] = {
     if (!jobIdToStageIds.contains(jobId)) {
       logWarning("Trying to remove unregistered job " + jobId)
-      return null
+      return Set[Int]()
     } else {
       val removedStages = removeJobAndIndependentStages(jobId)
       jobIdToStageIds -= jobId
@@ -818,11 +818,14 @@ class DAGScheduler(
       event.reason match {
         case Success =>
           logInfo("Completed " + task)
-          if (!stageIdToAccumulators.contains(stage.id) ||
-            stageIdToAccumulators(stage.id).size < stage.numPartitions) {
-            stageIdToAccumulators.getOrElseUpdate(stage.id, new ListBuffer[(Long, Any)])
+          if (event.accumUpdates != null &&
+            (!stageIdToAccumulators.contains(stage.id) ||
+            !stageIdToAccumulators(stage.id).contains(task.partitionId))) {
+            stageIdToAccumulators.getOrElseUpdate(stage.id,
+              new HashMap[Int, ListBuffer[(Long, Any)]]).
+              getOrElseUpdate(task.partitionId, new ListBuffer[(Long, Any)])
             for ((id, value) <- event.accumUpdates) {
-              stageIdToAccumulators(stage.id) += id -> value
+              stageIdToAccumulators(stage.id)(task.partitionId) += id -> value
             }
           }
           pendingTasks(stage) -= task
@@ -839,12 +842,12 @@ class DAGScheduler(
                     activeJobs -= job
                     resultStageToJob -= stage
                     markStageAsFinished(stage)
-                    jobIdToStageIdsRemove(job.jobId).foreach(stageId => {
-                      //accumulator operations
-                      for (accumValues <- stageIdToAccumulators(stageId)) {
-                        Accumulators.add(accumValues._1, accumValues._2)
+                    jobIdToStageIdsRemove(job.jobId).foreach(sid => {
+                      for (partitionIdToAccum <- stageIdToAccumulators(sid);
+                        accum <- partitionIdToAccum._2) {
+                        Accumulators.add(accum)
                       }
-                      stageIdToAccumulators -= stageId
+                      stageIdToAccumulators -= sid
                     })
                     listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                   }

From a92dc5814eecad979dc85d12f987c12b9ae9a04f Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 25 Mar 2014 19:54:24 -0400
Subject: [PATCH 10/18] build cleanup functions

---
 .../scala/org/apache/spark/Accumulators.scala |  1 -
 .../apache/spark/scheduler/DAGScheduler.scala | 20 +++++++++++--------
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 364ad9a6d884d..c3748491304a1 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -46,7 +46,6 @@ class Accumulable[R, T] (
   val id = Accumulators.newId
   @transient private var value_ = initialValue // Current value on master
-
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
   private var deserialized = false

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f6d6298835a62..152325f8b7d5f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -34,7 +34,6 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
 import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
-

 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -842,13 +841,8 @@ class DAGScheduler(
                     activeJobs -= job
                     resultStageToJob -= stage
                     markStageAsFinished(stage)
-                    jobIdToStageIdsRemove(job.jobId).foreach(sid => {
-                      for (partitionIdToAccum <- stageIdToAccumulators(sid);
-                        accum <- partitionIdToAccum._2) {
-                        Accumulators.add(accum)
-                      }
-                      stageIdToAccumulators -= sid
-                    })
+                    val stagesToRemove = jobIdToStageIdsRemove(job.jobId)
+                    cleanup(stagesToRemove.asInstanceOf[Set[Any]])
                     listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                   }
                   job.listener.taskSucceeded(rt.outputId, event.result)
@@ -1120,6 +1114,16 @@ class DAGScheduler(
     }
   }

+  private def cleanup(keys: Set[Any]) {
+    keys.asInstanceOf[Set[Int]].foreach(sid => {
+      for (partitionIdToAccum <- stageIdToAccumulators(sid);
+        accum <- partitionIdToAccum._2) {
+        Accumulators.add(accum)
+      }
+      stageIdToAccumulators -= sid
+    })
+  }
+
   def stop() {
     if (eventProcessActor != null) {
       eventProcessActor ! StopDAGScheduler
From 9732fc4aad67197e2c1d32414a7936273dbda63d Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 25 Mar 2014 20:02:42 -0400
Subject: [PATCH 11/18] remove redundant checking

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 152325f8b7d5f..dc2e893684f3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -914,6 +914,7 @@ class DAGScheduler(
         case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
           // Mark the stage that the reducer was in as unrunnable
           val failedStage = stageIdToStage(task.stageId)
+          runningStages -= failedStage
           // TODO: Cancel running tasks in the stage
           logInfo("Marking " + failedStage + " (" + failedStage.name +
             ") for resubmision due to a fetch failure")
@@ -935,10 +936,7 @@ class DAGScheduler(
           }
           failedStages += failedStage
           failedStages += mapStage
-          if (runningStages.contains(failedStage)) {
-            stageIdToAccumulators -= failedStage.id
-          }
-          runningStages -= failedStage
+          stageIdToAccumulators -= failedStage.id
           // TODO: mark the executor as failed only if there were lots of fetch failures on it
           if (bmAddress != null) {
             handleExecutorLost(bmAddress.executorId, Some(task.epoch))

From e8dcecb2d57bf0ed46f72e1e014dc3681dc70e80 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 29 Mar 2014 00:31:46 -0400
Subject: [PATCH 12/18] consider aborted stages

---
 .../apache/spark/scheduler/DAGScheduler.scala | 116 ++++++++++--------
 .../spark/scheduler/DAGSchedulerSuite.scala   |  21 +++-
 2 files changed, 81 insertions(+), 56 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dc2e893684f3b..11d897974eb60 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -347,6 +347,45 @@ class DAGScheduler(
     updateJobIdStageIdMapsList(List(stage))
   }

+  private def removeStage(stageId: Int) {
+    // data structures based on Stage
+    for (stage <- stageIdToStage.get(stageId)) {
+      if (runningStages.contains(stage)) {
+        logDebug("Removing running stage %d".format(stageId))
+        runningStages -= stage
+      }
+      stageToInfos -= stage
+      for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
+        shuffleToMapStage.remove(k)
+      }
+      if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
+        logDebug("Removing pending status for stage %d".format(stageId))
+      }
+      pendingTasks -= stage
+      if (waitingStages.contains(stage)) {
+        logDebug("Removing stage %d from waiting set.".format(stageId))
+        waitingStages -= stage
+      }
+      if (failedStages.contains(stage)) {
+        logDebug("Removing stage %d from failed set.".format(stageId))
+        failedStages -= stage
+      }
+    }
+    // data structures based on StageId
+    stageIdToStage -= stageId
+    stageIdToJobIds -= stageId
+    // accumulate acc values, if the stage is aborted, its accumulators
+    // will not be calculated, since we have removed it in aborteStage()
+    for (partitionIdToAccum <- stageIdToAccumulators.get(stageId);
+      accumulators <- partitionIdToAccum.values;
+      accum <- accumulators) {
+      Accumulators.add(accum)
+    }
+    stageIdToAccumulators -= stageId
+    logDebug("After removal of stage %d, remaining stages = %d"
+      .format(stageId, stageIdToStage.size))
+  }
+
   /**
    * Removes job and any stages that are not needed by any other job. Returns the set of ids for
    * stages that were removed. The associated tasks for those stages need to be cancelled if we
@@ -365,37 +404,6 @@ class DAGScheduler(
         "Job %d not registered for stage %d even though that stage was registered for the job"
           .format(jobId, stageId))
       } else {
-        def removeStage(stageId: Int) {
-          // data structures based on Stage
-          for (stage <- stageIdToStage.get(stageId)) {
-            if (runningStages.contains(stage)) {
-              logDebug("Removing running stage %d".format(stageId))
-              runningStages -= stage
-            }
-            stageToInfos -= stage
-            for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
-              shuffleToMapStage.remove(k)
-            }
-            if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
-              logDebug("Removing pending status for stage %d".format(stageId))
-            }
-            pendingTasks -= stage
-            if (waitingStages.contains(stage)) {
-              logDebug("Removing stage %d from waiting set.".format(stageId))
-              waitingStages -= stage
-            }
-            if (failedStages.contains(stage)) {
-              logDebug("Removing stage %d from failed set.".format(stageId))
-              failedStages -= stage
-            }
-          }
-          // data structures based on StageId
-          stageIdToStage -= stageId
-          stageIdToJobIds -= stageId
-          logDebug("After removal of stage %d, remaining stages = %d"
-            .format(stageId, stageIdToStage.size))
-        }
-
         jobSet -= jobId
         if (jobSet.isEmpty) { // no other job needs this stage
           independentStages += stageId
@@ -407,14 +415,12 @@ class DAGScheduler(
     independentStages.toSet
   }

-  private def jobIdToStageIdsRemove(jobId: Int): Set[Int] = {
+  private def jobIdToStageIdsRemove(jobId: Int) {
     if (!jobIdToStageIds.contains(jobId)) {
       logWarning("Trying to remove unregistered job " + jobId)
-      return Set[Int]()
     } else {
-      val removedStages = removeJobAndIndependentStages(jobId)
+      removeJobAndIndependentStages(jobId)
       jobIdToStageIds -= jobId
-      return removedStages
     }
   }

@@ -791,6 +797,25 @@ class DAGScheduler(
     }
   }

+  /**
+   * Detect duplicate accumulator values and save the accumulator values
+   * @param accumValue the accumulator values received from the task
+   * @param stage the stage which the task belongs to
+   * @param task the completed task
+   */
+  private def saveAccumulatorValue(accumValue: Map[Long, Any], stage: Stage, task: Task[_]) {
+    if (accumValue != null &&
+      (!stageIdToAccumulators.contains(stage.id) ||
+      !stageIdToAccumulators(stage.id).contains(task.partitionId))) {
+      stageIdToAccumulators.getOrElseUpdate(stage.id,
+        new HashMap[Int, ListBuffer[(Long, Any)]]).
+        getOrElseUpdate(task.partitionId, new ListBuffer[(Long, Any)])
+      for ((id, value) <- accumValue) {
+        stageIdToAccumulators(stage.id)(task.partitionId) += id -> value
+      }
+    }
+  }
+
   /**
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -817,16 +842,7 @@ class DAGScheduler(
       event.reason match {
         case Success =>
           logInfo("Completed " + task)
-          if (event.accumUpdates != null &&
-            (!stageIdToAccumulators.contains(stage.id) ||
-            !stageIdToAccumulators(stage.id).contains(task.partitionId))) {
-            stageIdToAccumulators.getOrElseUpdate(stage.id,
-              new HashMap[Int, ListBuffer[(Long, Any)]]).
-              getOrElseUpdate(task.partitionId, new ListBuffer[(Long, Any)])
-            for ((id, value) <- event.accumUpdates) {
-              stageIdToAccumulators(stage.id)(task.partitionId) += id -> value
-            }
-          }
+          saveAccumulatorValue(event.accumUpdates, stage, task)
           pendingTasks(stage) -= task
           task match {
             case rt: ResultTask[_, _] =>
@@ -841,8 +857,7 @@ class DAGScheduler(
                     activeJobs -= job
                     resultStageToJob -= stage
                     markStageAsFinished(stage)
-                    val stagesToRemove = jobIdToStageIdsRemove(job.jobId)
-                    cleanup(stagesToRemove.asInstanceOf[Set[Any]])
+                    jobIdToStageIdsRemove(job.jobId)
                     listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                   }
                   job.listener.taskSucceeded(rt.outputId, event.result)
@@ -1013,7 +1028,6 @@ class DAGScheduler(
   private def abortStage(failedStage: Stage, reason: String) {
     if (!stageIdToStage.contains(failedStage.id)) {
-      // Skip all the actions if the stage has been removed.
       return
     }
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
@@ -1022,6 +1036,7 @@ class DAGScheduler(
       val job = resultStageToJob(resultStage)
       val error = new SparkException("Job aborted: " + reason)
       job.listener.jobFailed(error)
+      stageIdToAccumulators -= resultStage.id
       jobIdToStageIdsRemove(job.jobId)
       jobIdToActiveJob -= resultStage.jobId
       activeJobs -= job
@@ -1114,8 +1129,9 @@ class DAGScheduler(
   private def cleanup(keys: Set[Any]) {
     keys.asInstanceOf[Set[Int]].foreach(sid => {
-      for (partitionIdToAccum <- stageIdToAccumulators(sid);
-        accum <- partitionIdToAccum._2) {
+      for (partitionIdToAccum <- stageIdToAccumulators.get(sid);
+        accumulators <- partitionIdToAccum.values;
+        accum <- accumulators) {
         Accumulators.add(accum)
       }
       stageIdToAccumulators -= sid

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index f2f11a8ee4d14..8efd588f47411 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -169,7 +169,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assert(taskSet.tasks.size >= results.size)
     for ((result, i) <- results.zipWithIndex) {
       if (i < taskSet.tasks.size) {
-        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
       }
     }
   }
@@ -314,17 +314,16 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     runEvent(ExecutorLost("exec-hostA"))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
-    val noAccum = Map[Long, Any]()
     val taskSet = taskSets(0)
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
     // should work because it's a non-failed host
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
    // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
     // should work because it's a new epoch
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -445,6 +444,16 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assertDataStructuresEmpty
   }

+  test("accumulator is cleared for aborted stages") {
+    //just for register
+    new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+    val rdd = makeRdd(2, Nil)
+    submit(rdd, Array(0))
+    failed(taskSets(0), "taskset failed")
+    assertDataStructuresEmpty
+  }
+
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.

From f4fa2f061d9112435fa7527dbfbfa152b11de0ce Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 29 Mar 2014 00:45:55 -0400
Subject: [PATCH 13/18] remove unused cleanup

---
 .../org/apache/spark/scheduler/DAGScheduler.scala | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 11d897974eb60..8f619274e1e59 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1127,17 +1127,6 @@ class DAGScheduler(
     }
   }

-  private def cleanup(keys: Set[Any]) {
-    keys.asInstanceOf[Set[Int]].foreach(sid => {
-      for (partitionIdToAccum <- stageIdToAccumulators.get(sid);
-        accumulators <- partitionIdToAccum.values;
-        accum <- accumulators) {
-        Accumulators.add(accum)
-      }
-      stageIdToAccumulators -= sid
-    })
-  }
-
   def stop() {
     if (eventProcessActor != null) {
       eventProcessActor ! StopDAGScheduler
From 1d33213f5b27f4b6c1515022c333001fb54c1418 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 29 Mar 2014 11:42:49 -0400
Subject: [PATCH 14/18] add comments in abortStage()

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8f619274e1e59..f6b0f24c30be4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1036,6 +1036,8 @@ class DAGScheduler(
       val job = resultStageToJob(resultStage)
       val error = new SparkException("Job aborted: " + reason)
       job.listener.jobFailed(error)
+      // remove stageIdToAccumulators(id) ensuring that the aborted stage
+      // accumulator is not calculated in jobIdToStageIdsRemove
       stageIdToAccumulators -= resultStage.id
       jobIdToStageIdsRemove(job.jobId)
       jobIdToActiveJob -= resultStage.jobId
       activeJobs -= job

From cf9f9085b7866ce85086d95ddee47b224ab3e882 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Sat, 29 Mar 2014 13:32:59 -0400
Subject: [PATCH 15/18] fix Accumulator add() bug

---
 core/src/main/scala/org/apache/spark/Accumulators.scala      | 2 +-
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index c3748491304a1..c9876da1e2360 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -269,7 +269,7 @@ private object Accumulators {
   // Add values to the original accumulators with some given IDs
   def add(value: (Long, Any)): Unit = synchronized {
     if (originals.contains(value._1)) {
-      originals(value._1).asInstanceOf[Accumulable[Any, Any]] += value._2
+      originals(value._1).asInstanceOf[Accumulable[Any, Any]] ++= value._2
     }
   }
 }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f6b0f24c30be4..91acd81eee64d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -375,7 +375,7 @@ class DAGScheduler(
     stageIdToStage -= stageId
     stageIdToJobIds -= stageId
     // accumulate acc values, if the stage is aborted, its accumulators
-    // will not be calculated, since we have removed it in aborteStage()
+    // will not be calculated, since we have removed it in abortStage()
     for (partitionIdToAccum <- stageIdToAccumulators.get(stageId);
       accumulators <- partitionIdToAccum.values;
       accum <- accumulators) {
       Accumulators.add(accum)
     }

From 29a0464e6f0a1b5eb1aba71da1d586586adfc6f1 Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 1 Apr 2014 08:59:55 -0400
Subject: [PATCH 16/18] avoid lookup by utilizing getOrElseUpdate return

---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 91acd81eee64d..b203f52932eaf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -117,7 +117,8 @@ class DAGScheduler(
     new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)

   // stageId -> (splitId -> (accumulatorId, accumulatorValue))
-  val stageIdToAccumulators = new HashMap[Int, HashMap[Int, ListBuffer[(Long, Any)]]]
+  private[scheduler] val stageIdToAccumulators = new HashMap[Int,
+    HashMap[Int, ListBuffer[(Long, Any)]]]

   taskScheduler.setDAGScheduler(this)
@@ -807,11 +808,11 @@ class DAGScheduler(
     if (accumValue != null &&
       (!stageIdToAccumulators.contains(stage.id) ||
       !stageIdToAccumulators(stage.id).contains(task.partitionId))) {
-      stageIdToAccumulators.getOrElseUpdate(stage.id,
+      val accum = stageIdToAccumulators.getOrElseUpdate(stage.id,
         new HashMap[Int, ListBuffer[(Long, Any)]]).
         getOrElseUpdate(task.partitionId, new ListBuffer[(Long, Any)])
       for ((id, value) <- accumValue) {
-        stageIdToAccumulators(stage.id)(task.partitionId) += id -> value
+        accum += id -> value
       }
     }
   }

From 5ad739ef3d41750fbb12a2665c2cf1bc035730ef Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 1 Apr 2014 12:54:22 -0400
Subject: [PATCH 17/18] fix Accumulator issue

---
 core/src/main/scala/org/apache/spark/Accumulators.scala | 9 ++++++---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 ++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index c9876da1e2360..9179cb90a0dc4 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,6 +24,7 @@ import scala.collection.generic.Growable
 import scala.collection.mutable.Map

 import org.apache.spark.serializer.JavaSerializer
+import scala.collection.immutable

 /**
  * A data type that can be accumulated, ie has an commutative and associative "add" operation,
@@ -267,9 +268,11 @@ private object Accumulators {
   }

   // Add values to the original accumulators with some given IDs
-  def add(value: (Long, Any)): Unit = synchronized {
-    if (originals.contains(value._1)) {
-      originals(value._1).asInstanceOf[Accumulable[Any, Any]] ++= value._2
+  def add(values: immutable.Map[Long, Any]): Unit = synchronized {
+    for ((id, value) <- values) {
+      if (originals.contains(id)) {
+        originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
+      }
     }
   }
 }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b203f52932eaf..03b22f9f3ea83 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -378,9 +378,8 @@ class DAGScheduler(
     // accumulate acc values, if the stage is aborted, its accumulators
     // will not be calculated, since we have removed it in abortStage()
     for (partitionIdToAccum <- stageIdToAccumulators.get(stageId);
-      accumulators <- partitionIdToAccum.values;
-      accum <- accumulators) {
-      Accumulators.add(accum)
+      accumulators <- partitionIdToAccum.values) {
+      Accumulators.add(accumulators.toMap)
     }
     stageIdToAccumulators -= stageId
     logDebug("After removal of stage %d, remaining stages = %d"

From 7883a13cbf8a9f9bd241d6ba883315e52f66a1bc Mon Sep 17 00:00:00 2001
From: CodingCat
Date: Tue, 1 Apr 2014 16:12:03 -0400
Subject: [PATCH 18/18] fix test bug

---
 .../scala/org/apache/spark/Accumulators.scala | 10 ++++-----
 .../apache/spark/scheduler/DAGScheduler.scala |  5 +++--
 .../spark/scheduler/DAGSchedulerSuite.scala   | 22 ++++++++++---------
 3 files changed, 19 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 9179cb90a0dc4..8781e301ddc17 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,7 +24,7 @@ import scala.collection.generic.Growable
 import scala.collection.mutable.Map

 import org.apache.spark.serializer.JavaSerializer
-import scala.collection.immutable
+

 /**
  * A data type that can be accumulated, ie has an commutative and associative "add" operation,
@@ -268,11 +268,9 @@ private object Accumulators {
   }

   // Add values to the original accumulators with some given IDs
-  def add(values: immutable.Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
-      }
+  def add(value: (Long, Any)): Unit = synchronized {
+    if (originals.contains(value._1)) {
+      originals(value._1).asInstanceOf[Accumulable[Any, Any]] ++= value._2
     }
   }
 }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 03b22f9f3ea83..b203f52932eaf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -378,8 +378,9 @@ class DAGScheduler(
     // accumulate acc values, if the stage is aborted, its accumulators
     // will not be calculated, since we have removed it in abortStage()
     for (partitionIdToAccum <- stageIdToAccumulators.get(stageId);
-      accumulators <- partitionIdToAccum.values) {
-      Accumulators.add(accumulators.toMap)
+      accumulators <- partitionIdToAccum.values;
+      accum <- accumulators) {
+      Accumulators.add(accum)
     }
     stageIdToAccumulators -= stageId
     logDebug("After removal of stage %d, remaining stages = %d"

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8efd588f47411..841a51e2ef84d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -174,11 +174,13 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     }
   }

-  private def completeWithAccumulator(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
+  private def completeWithAccumulator(accumId: Long,
+      taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
     assert(taskSet.tasks.size >= results.size)
     for ((result, i) <- results.zipWithIndex) {
       if (i < taskSet.tasks.size) {
-        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any]((0, 1)), null, null))
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
+          Map[Long, Any]((accumId, 1)), null, null))
       }
     }
   }
@@ -417,7 +419,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
   test("accumulator is not calculated for resubmitted stage") {
     //just for register
-    new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+    val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
     val shuffleOneRdd = makeRdd(2, Nil)
     val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
     val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
     val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
     val finalRdd = makeRdd(1, List(shuffleDepTwo))
     submit(finalRdd, Array(0))
     // have the first stage complete normally
-    completeWithAccumulator(taskSets(0), Seq(
+    completeWithAccumulator(accum.id, taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 2)),
       (Success, makeMapStatus("hostB", 2))))
     // have the second stage complete normally
-    completeWithAccumulator(taskSets(1), Seq(
+    completeWithAccumulator(accum.id, taskSets(1), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostC", 1))))
     // fail the third stage because hostA went down
-    completeWithAccumulator(taskSets(2), Seq(
+    completeWithAccumulator(accum.id, taskSets(2), Seq(
       (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
     scheduler.resubmitFailedStages()
-    completeWithAccumulator(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
-    completeWithAccumulator(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
-    completeWithAccumulator(taskSets(5), Seq((Success, 42)))
+    completeWithAccumulator(accum.id, taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
+    completeWithAccumulator(accum.id, taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
+    completeWithAccumulator(accum.id, taskSets(5), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
-    assert(Accumulators.originals(0).value === 5)
+    assert(Accumulators.originals(accum.id).value === 5)
     assertDataStructuresEmpty
   }
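Taken together, the series converges on one scheme: accumulator updates reported by a successful task are buffered per (stage, partition) and applied to the driver-side originals at most once per partition, so updates from resubmitted or speculative duplicates cannot be double-counted, while an aborted stage's buffer is dropped without ever being applied. A condensed sketch of that bookkeeping, using simplified names that are not Spark's actual API:

```scala
import scala.collection.mutable

// Illustrative sketch: stageId -> (partitionId -> buffered updates).
object AccumulatorBookkeeping {
  private val buffered =
    mutable.HashMap.empty[Int, mutable.HashMap[Int, Seq[(Long, Any)]]]

  // Record a successful task's accumulator updates. The first completion
  // of a partition wins; a later duplicate completion is ignored.
  def record(stageId: Int, partitionId: Int, updates: Seq[(Long, Any)]): Unit = {
    val byPartition = buffered.getOrElseUpdate(stageId, mutable.HashMap.empty)
    if (!byPartition.contains(partitionId)) {
      byPartition(partitionId) = updates
    }
  }

  // Apply everything buffered for a stage exactly once, then forget the
  // stage. `apply` stands in for Accumulators.add in the patches above.
  def flush(stageId: Int)(apply: ((Long, Any)) => Unit): Unit = {
    for (byPartition <- buffered.remove(stageId);
         updates <- byPartition.values;
         update <- updates) {
      apply(update)
    }
  }

  // Aborted stages are simply dropped without being applied, mirroring
  // the `stageIdToAccumulators -= resultStage.id` line in abortStage().
  def drop(stageId: Int): Unit = buffered.remove(stageId)
}
```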