diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index d5f3e3f6ec496..8781e301ddc17 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,12 +18,14 @@ package org.apache.spark
 
 import java.io.{ObjectInputStream, Serializable}
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
 
 import org.apache.spark.serializer.JavaSerializer
 
+
 /**
  * A data type that can be accumulated, ie has an commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
@@ -237,12 +239,8 @@ private object Accumulators {
   // TODO: Use soft references? => need to make readObject work properly then
   val originals = Map[Long, Accumulable[_, _]]()
   val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
-  var lastId: Long = 0
-
-  def newId: Long = synchronized {
-    lastId += 1
-    lastId
-  }
+  private val nextAccumID = new AtomicLong(0)
+  def newId(): Long = nextAccumID.getAndIncrement()
 
   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
@@ -270,11 +268,9 @@ private object Accumulators {
   }
 
   // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
-      }
+  def add(value: (Long, Any)): Unit = synchronized {
+    if (originals.contains(value._1)) {
+      originals(value._1).asInstanceOf[Accumulable[Any, Any]] ++= value._2
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ef3d24d746829..b203f52932eaf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,7 @@ import java.io.NotSerializableException
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, ListBuffer}
 import scala.concurrent.duration._
 import scala.reflect.ClassTag
 
@@ -116,6 +116,10 @@ class DAGScheduler(
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)
 
+  // stageId -> (splitId -> (accumulatorId, accumulatorValue))
+  private[scheduler] val stageIdToAccumulators = new HashMap[Int,
+    HashMap[Int, ListBuffer[(Long, Any)]]]
+
   taskScheduler.setDAGScheduler(this)
 
   /**
@@ -344,6 +348,45 @@ class DAGScheduler(
     updateJobIdStageIdMapsList(List(stage))
   }
 
+  private def removeStage(stageId: Int) {
+    // data structures based on Stage
+    for (stage <- stageIdToStage.get(stageId)) {
+      if (runningStages.contains(stage)) {
+        logDebug("Removing running stage %d".format(stageId))
+        runningStages -= stage
+      }
+      stageToInfos -= stage
+      for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
+        shuffleToMapStage.remove(k)
+      }
+      if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
+        logDebug("Removing pending status for stage %d".format(stageId))
+      }
+      pendingTasks -= stage
+      if (waitingStages.contains(stage)) {
+        logDebug("Removing stage %d from waiting set.".format(stageId))
+        waitingStages -= stage
+      }
+      if (failedStages.contains(stage)) {
+        logDebug("Removing stage %d from failed set.".format(stageId))
+        failedStages -= stage
+      }
+    }
+    // data structures based on StageId
+    stageIdToStage -= stageId
+    stageIdToJobIds -= stageId
+    // accumulate acc values; if the stage was aborted, its accumulators
+    // will not be applied, since we removed them in abortStage()
+    for (partitionIdToAccum <- stageIdToAccumulators.get(stageId);
+         accumulators <- partitionIdToAccum.values;
+         accum <- accumulators) {
+      Accumulators.add(accum)
+    }
+    stageIdToAccumulators -= stageId
+    logDebug("After removal of stage %d, remaining stages = %d"
+      .format(stageId, stageIdToStage.size))
+  }
+
   /**
    * Removes job and any stages that are not needed by any other job. Returns the set of ids for
    * stages that were removed. The associated tasks for those stages need to be cancelled if we
@@ -362,38 +405,6 @@ class DAGScheduler(
               "Job %d not registered for stage %d even though that stage was registered for the job"
               .format(jobId, stageId))
           } else {
-            def removeStage(stageId: Int) {
-              // data structures based on Stage
-              for (stage <- stageIdToStage.get(stageId)) {
-                if (runningStages.contains(stage)) {
-                  logDebug("Removing running stage %d".format(stageId))
-                  runningStages -= stage
-                }
-                stageToInfos -= stage
-                for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
-                  shuffleToMapStage.remove(k)
-                }
-                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
-                  logDebug("Removing pending status for stage %d".format(stageId))
-                }
-                pendingTasks -= stage
-                if (waitingStages.contains(stage)) {
-                  logDebug("Removing stage %d from waiting set.".format(stageId))
-                  waitingStages -= stage
-                }
-                if (failedStages.contains(stage)) {
-                  logDebug("Removing stage %d from failed set.".format(stageId))
-                  failedStages -= stage
-                }
-              }
-              // data structures based on StageId
-              stageIdToStage -= stageId
-              stageIdToJobIds -= stageId
-
-              logDebug("After removal of stage %d, remaining stages = %d"
-                .format(stageId, stageIdToStage.size))
-            }
-
             jobSet -= jobId
             if (jobSet.isEmpty) { // no other job needs this stage
               independentStages += stageId
@@ -407,7 +418,7 @@ class DAGScheduler(
 
   private def jobIdToStageIdsRemove(jobId: Int) {
     if (!jobIdToStageIds.contains(jobId)) {
-      logDebug("Trying to remove unregistered job " + jobId)
+      logWarning("Trying to remove unregistered job " + jobId)
    } else {
       removeJobAndIndependentStages(jobId)
       jobIdToStageIds -= jobId
@@ -787,6 +798,25 @@ class DAGScheduler(
     }
   }
 
+  /**
+   * Detect duplicate accumulator values and save the accumulator values.
+   * @param accumValue the accumulator values received from the task
+   * @param stage the stage which the task belongs to
+   * @param task the completed task
+   */
+  private def saveAccumulatorValue(accumValue: Map[Long, Any], stage: Stage, task: Task[_]) {
+    if (accumValue != null &&
+      (!stageIdToAccumulators.contains(stage.id) ||
+      !stageIdToAccumulators(stage.id).contains(task.partitionId))) {
+      val accum = stageIdToAccumulators.getOrElseUpdate(stage.id,
+        new HashMap[Int, ListBuffer[(Long, Any)]]).
+        getOrElseUpdate(task.partitionId, new ListBuffer[(Long, Any)])
+      for ((id, value) <- accumValue) {
+        accum += id -> value
+      }
+    }
+  }
+
   /**
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -813,9 +843,7 @@ class DAGScheduler(
     event.reason match {
       case Success =>
         logInfo("Completed " + task)
-        if (event.accumUpdates != null) {
-          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
-        }
+        saveAccumulatorValue(event.accumUpdates, stage, task)
         pendingTasks(stage) -= task
         task match {
           case rt: ResultTask[_, _] =>
@@ -924,6 +952,7 @@ class DAGScheduler(
         }
         failedStages += failedStage
         failedStages += mapStage
+        stageIdToAccumulators -= failedStage.id
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))
@@ -1000,7 +1029,6 @@ class DAGScheduler(
    */
   private def abortStage(failedStage: Stage, reason: String) {
     if (!stageIdToStage.contains(failedStage.id)) {
-      // Skip all the actions if the stage has been removed.
       return
     }
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
@@ -1009,6 +1037,9 @@ class DAGScheduler(
       val job = resultStageToJob(resultStage)
       val error = new SparkException("Job aborted: " + reason)
       job.listener.jobFailed(error)
+      // remove stageIdToAccumulators(resultStage.id) so that the aborted stage's
+      // accumulators are not applied in jobIdToStageIdsRemove
+      stageIdToAccumulators -= resultStage.id
       jobIdToStageIdsRemove(job.jobId)
       jobIdToActiveJob -= resultStage.jobId
       activeJobs -= job
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 86d2050a03f18..bc53f6354606f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -568,6 +568,10 @@ private[spark] class TaskSetManager(
         failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
         logWarning(failureReason)
 
+      case ExecutorLostFailure =>
+        failureReason = "Executor %s lost for TID %s".format(info.executorId, tid)
+        logWarning(failureReason)
+
       case _ =>
         failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ce567b0cde85d..841a51e2ef84d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -169,7 +169,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assert(taskSet.tasks.size >= results.size)
     for ((result, i) <- results.zipWithIndex) {
       if (i < taskSet.tasks.size) {
-        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, Map[Long, Any](), null, null))
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
+      }
+    }
+  }
+
+  private def completeWithAccumulator(accumId: Long,
+      taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]) {
+    assert(taskSet.tasks.size >= results.size)
+    for ((result, i) <- results.zipWithIndex) {
+      if (i < taskSet.tasks.size) {
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2,
+          Map[Long, Any]((accumId, 1)), null, null))
       }
     }
   }
@@ -305,17 +316,16 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     runEvent(ExecutorLost("exec-hostA"))
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
-    val noAccum = Map[Long, Any]()
     val taskSet = taskSets(0)
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
     // should work because it's a non-failed host
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, null, null))
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, null, null))
     // should work because it's a new epoch
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
+    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, null, null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -407,6 +417,45 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assertDataStructuresEmpty
   }
 
+  test("accumulator is not calculated for resubmitted stage") {
+    // just for registering the accumulator
+    val accum = new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+    val shuffleOneRdd = makeRdd(2, Nil)
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = makeRdd(1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+    // have the first stage complete normally
+    completeWithAccumulator(accum.id, taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+    // have the second stage complete normally
+    completeWithAccumulator(accum.id, taskSets(1), Seq(
+      (Success, makeMapStatus("hostA", 1)),
+      (Success, makeMapStatus("hostC", 1))))
+    // fail the third stage because hostA went down
+    completeWithAccumulator(accum.id, taskSets(2), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
+    scheduler.resubmitFailedStages()
+    completeWithAccumulator(accum.id, taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
+    completeWithAccumulator(accum.id, taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
+    completeWithAccumulator(accum.id, taskSets(5), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assert(Accumulators.originals(accum.id).value === 5)
+    assertDataStructuresEmpty
+  }
+
+  test("accumulator is cleared for aborted stages") {
+    // just for registering the accumulator
+    new Accumulator[Int](0, SparkContext.IntAccumulatorParam)
+    val rdd = makeRdd(2, Nil)
+    submit(rdd, Array(0))
+    failed(taskSets(0), "taskset failed")
+    assertDataStructuresEmpty
+  }
+
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
@@ -437,5 +486,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     assert(scheduler.runningStages.isEmpty)
     assert(scheduler.shuffleToMapStage.isEmpty)
     assert(scheduler.waitingStages.isEmpty)
+    assert(scheduler.stageIdToAccumulators.isEmpty)
   }
 }
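Below is a minimal standalone sketch (not part of the patch; the object and method names here are invented for illustration) of the bookkeeping idea the DAGScheduler changes implement: accumulator updates from successful tasks are buffered per (stageId, partitionId), only the first update for a partition is kept, and the buffered updates are applied exactly once when the stage is removed, so a task resubmitted after a fetch failure cannot double-count. Aborted stages drop their buffered entries instead of flushing them.

import scala.collection.mutable

// Illustrative sketch only; mirrors the roles of stageIdToAccumulators,
// saveAccumulatorValue, removeStage and abortStage in the patch above.
object AccumulatorBufferSketch {
  // stageId -> (partitionId -> buffered (accumulatorId, value) pairs)
  private val buffered =
    mutable.HashMap[Int, mutable.HashMap[Int, List[(Long, Any)]]]()

  // Keep only the first successful update per (stageId, partitionId); a
  // resubmitted or speculative copy of the same partition is ignored.
  def save(stageId: Int, partitionId: Int, updates: Map[Long, Any]): Unit =
    if (updates != null) {
      val byPartition = buffered.getOrElseUpdate(stageId, mutable.HashMap.empty)
      if (!byPartition.contains(partitionId)) {
        byPartition(partitionId) = updates.toList
      }
    }

  // Drop a stage's buffered updates without applying them (the abortStage path).
  def clear(stageId: Int): Unit = buffered -= stageId

  // Apply the buffered updates exactly once when the stage is removed
  // (the removeStage path, which feeds each pair to Accumulators.add).
  def flush(stageId: Int)(apply: ((Long, Any)) => Unit): Unit =
    for (byPartition <- buffered.remove(stageId);
         updates <- byPartition.values;
         pair <- updates) {
      apply(pair)
    }
}

With Accumulators.add taking a single (Long, Any) pair, as it does after this patch, the flush call in the sketch would look like flush(stage.id)(Accumulators.add).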