From 9a1724de0287b5ca41e30f3d3401fd721a2e1520 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 15 Mar 2016 11:21:09 +0900
Subject: [PATCH 01/16] Add a test to check if the stage graph is properly
 built.

---
 .../spark/scheduler/DAGSchedulerSuite.scala   | 41 +++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 55f4190680dd..67377cb12129 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -321,6 +321,47 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
   }
 
+  /*
+   *                <----------------------
+   *               /                        \
+   * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E]
+   *                             \                        /
+   *                              <----------------------
+   */
+  test("parent stages") {
+    val rddA = new MyRDD(sc, 1, Nil)
+
+    val shuffleDef1 = new ShuffleDependency(rddA, new HashPartitioner(1))
+    val rddB = new MyRDD(sc, 1, List(shuffleDef1), tracker = mapOutputTracker)
+
+    val shuffleDef2 = new ShuffleDependency(rddB, new HashPartitioner(1))
+    val rddC = new MyRDD(sc, 1, List(shuffleDef2), tracker = mapOutputTracker)
+
+    val shuffleDef3 = new ShuffleDependency(rddC, new HashPartitioner(1))
+    val rddD = new MyRDD(sc, 1, List(shuffleDef3, new OneToOneDependency(rddB)),
+      tracker = mapOutputTracker)
+
+    val shuffleDef4 = new ShuffleDependency(rddD, new HashPartitioner(1))
+    val rddE = new MyRDD(sc, 1, List(shuffleDef4, new OneToOneDependency(rddC)),
+      tracker = mapOutputTracker)
+    submit(rddE, Array(0))
+
+    assert(scheduler.shuffleToMapStage.size === 4)
+    assert(scheduler.activeJobs.size === 1)
+
+    val mapStage1 = scheduler.shuffleToMapStage(shuffleDef1.shuffleId)
+    val mapStage2 = scheduler.shuffleToMapStage(shuffleDef2.shuffleId)
+    val mapStage3 = scheduler.shuffleToMapStage(shuffleDef3.shuffleId)
+    val mapStage4 = scheduler.shuffleToMapStage(shuffleDef4.shuffleId)
+    val finalStage = scheduler.activeJobs.head.finalStage
+
+    assert(mapStage1.parents.isEmpty)
+    assert(mapStage2.parents === List(mapStage1))
+    assert(mapStage3.parents === List(mapStage2))
+    assert(mapStage4.parents === List(mapStage1, mapStage3))
+    assert(finalStage.parents === List(mapStage2, mapStage4))
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None

From f8b7910ecb52a5954de091ed79d5de9c19ba2744 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 15 Mar 2016 11:22:42 +0900
Subject: [PATCH 02/16] Make DAGScheduler.getAncestorShuffleDependencies()
 return in topological order to ensure building ancestor stages first.
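
Previously the returned dependencies could appear in an order where a
descendant shuffle dependency precedes one of its ancestors, so the caller
could register a child stage before its parent stage existed. Returning the
dependencies in topological order guarantees that ancestor stages are built
first.

The following is a minimal standalone sketch of the ordering idea only; the
`Node` type and `topologicalOrder` helper are hypothetical stand-ins for
RDDs and their shuffle dependencies, not the real DAGScheduler API:

    import scala.collection.mutable

    case class Node(id: Int, deps: Seq[Node])

    // Emit a node only after all of its dependencies have been emitted
    // (post-order DFS), so ancestors always precede descendants.
    def topologicalOrder(root: Node): Seq[Node] = {
      val result = mutable.ArrayBuffer.empty[Node]
      val visited = mutable.HashSet.empty[Node]
      // Manual stack instead of recursion, mirroring the StackOverflowError
      // concern in getAncestorShuffleDependencies().
      val stack = mutable.Stack(root)
      while (stack.nonEmpty) {
        val n = stack.pop()
        if (!visited(n)) {
          if (n.deps.forall(visited)) {
            visited += n
            result += n // every ancestor of n is already in result
          } else {
            stack.push(n) // revisit n once its dependencies are done
            n.deps.filterNot(visited).foreach(stack.push)
          }
        }
      }
      result.toSeq
    }
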
---
 .../apache/spark/scheduler/DAGScheduler.scala | 26 +++++++++++++------
 1 file changed, 18 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8a36af27bdd2..bcdf9127e2c4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -403,24 +403,34 @@ class DAGScheduler(
     parents.toList
   }
 
-  /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
-  private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
+  /**
+   * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet,
+   * in topological order to ensure building ancestor stages first.
+   */
+  private def getAncestorShuffleDependencies(rdd: RDD[_]): List[ShuffleDependency[_, _, _]] = {
     val parents = new Stack[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
     val waitingForVisit = new Stack[RDD[_]]
     def visit(r: RDD[_]) {
-      if (!visited(r)) {
+      val deps = r.dependencies.filter {
+        case shufDep: ShuffleDependency[_, _, _] =>
+          !shuffleToMapStage.contains(shufDep.shuffleId)
+        case _ => true
+      }
+      if (deps.forall(dep => visited(dep.rdd))) {
         visited += r
-        for (dep <- r.dependencies) {
+        for (dep <- deps) {
           dep match {
             case shufDep: ShuffleDependency[_, _, _] =>
-              if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
-                parents.push(shufDep)
-              }
+              parents.push(shufDep)
             case _ =>
           }
+        }
+      } else {
+        waitingForVisit.push(r)
+        for (dep <- deps) {
           waitingForVisit.push(dep.rdd)
         }
       }
     }
@@ -430,7 +440,7 @@ class DAGScheduler(
     while (waitingForVisit.nonEmpty) {
       visit(waitingForVisit.pop())
     }
-    parents
+    parents.toList.reverse
   }
 
   private def getMissingParentStages(stage: Stage): List[Stage] = {

From 0ea3fc838f689729794b6ea3aaf0b88a339ec20c Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 16 Mar 2016 11:04:45 +0900
Subject: [PATCH 03/16] Refactor getAncestorShuffleDependencies.

---
 .../org/apache/spark/scheduler/DAGScheduler.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bcdf9127e2c4..803af76b9053 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.annotation.tailrec
 import scala.collection.Map
-import scala.collection.mutable.{HashMap, HashSet, Stack}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Stack}
 import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.language.existentials
@@ -407,8 +407,8 @@ class DAGScheduler(
    * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet,
    * in topological order to ensure building ancestor stages first.
    */
-  private def getAncestorShuffleDependencies(rdd: RDD[_]): List[ShuffleDependency[_, _, _]] = {
-    val parents = new Stack[ShuffleDependency[_, _, _]]
+  private def getAncestorShuffleDependencies(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = {
+    val parents = new ArrayBuffer[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
@@ -424,13 +424,13 @@ class DAGScheduler(
       for (dep <- deps) {
         dep match {
           case shufDep: ShuffleDependency[_, _, _] =>
-            parents.push(shufDep)
+            parents += shufDep
           case _ =>
         }
       }
     } else {
       waitingForVisit.push(r)
-      for (dep <- deps) {
+      for (dep <- deps if !visited(dep.rdd)) {
         waitingForVisit.push(dep.rdd)
       }
     }
@@ -440,7 +440,7 @@ class DAGScheduler(
     while (waitingForVisit.nonEmpty) {
       visit(waitingForVisit.pop())
     }
-    parents.toList.reverse
+    parents
   }
 
   private def getMissingParentStages(stage: Stage): List[Stage] = {

From 697b32208262b3c1c10bc2cae43b891c7970811d Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 16 Mar 2016 21:55:50 +0900
Subject: [PATCH 04/16] Fix topological sort.

---
 .../apache/spark/scheduler/DAGScheduler.scala | 36 ++++++++++---------
 .../spark/scheduler/DAGSchedulerSuite.scala   | 16 ++++++---
 2 files changed, 30 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 803af76b9053..d75e6a46879c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -414,24 +414,26 @@ class DAGScheduler(
     // caused by recursively visiting
     val waitingForVisit = new Stack[RDD[_]]
     def visit(r: RDD[_]) {
-      val deps = r.dependencies.filter {
-        case shufDep: ShuffleDependency[_, _, _] =>
-          !shuffleToMapStage.contains(shufDep.shuffleId)
-        case _ => true
-      }
-      if (deps.forall(dep => visited(dep.rdd))) {
-        visited += r
-        for (dep <- deps) {
-          dep match {
-            case shufDep: ShuffleDependency[_, _, _] =>
-              parents += shufDep
-            case _ =>
-          }
+      if (!visited(r)) {
+        val deps = r.dependencies.filter {
+          case shufDep: ShuffleDependency[_, _, _] =>
+            !shuffleToMapStage.contains(shufDep.shuffleId)
+          case _ => true
         }
-      } else {
-        waitingForVisit.push(r)
-        for (dep <- deps if !visited(dep.rdd)) {
-          waitingForVisit.push(dep.rdd)
+        if (deps.forall(dep => visited(dep.rdd))) {
+          visited += r
+          for (dep <- deps) {
+            dep match {
+              case shufDep: ShuffleDependency[_, _, _] =>
+                parents += shufDep
+              case _ =>
+            }
+          }
+        } else {
+          waitingForVisit.push(r)
+          for (dep <- deps if !visited(dep.rdd)) {
+            waitingForVisit.push(dep.rdd)
+          }
         }
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 67377cb12129..02fe5b065234 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -324,7 +324,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
   /*
    *                <----------------------
    *               /                        \
-   * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E]
+   * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F]
    *                             \                        /
    *                              <----------------------
   test("parent stages") {
     val rddA = new MyRDD(sc, 1, Nil)
@@ -342,24 +342,30 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
       tracker = mapOutputTracker)
 
     val shuffleDef4 = new ShuffleDependency(rddD, new HashPartitioner(1))
-    val rddE = new MyRDD(sc, 1, List(shuffleDef4, new OneToOneDependency(rddC)),
+    val rddE = new MyRDD(sc, 1, List(new OneToOneDependency(rddC), shuffleDef4),
       tracker = mapOutputTracker)
-    submit(rddE, Array(0))
 
-    assert(scheduler.shuffleToMapStage.size === 4)
+    val shuffleDef5 = new ShuffleDependency(rddE, new HashPartitioner(1))
+    val rddF = new MyRDD(sc, 1, List(shuffleDef5),
+      tracker = mapOutputTracker)
+    submit(rddF, Array(0))
+
+    assert(scheduler.shuffleToMapStage.size === 5)
     assert(scheduler.activeJobs.size === 1)
 
     val mapStage1 = scheduler.shuffleToMapStage(shuffleDef1.shuffleId)
     val mapStage2 = scheduler.shuffleToMapStage(shuffleDef2.shuffleId)
     val mapStage3 = scheduler.shuffleToMapStage(shuffleDef3.shuffleId)
     val mapStage4 = scheduler.shuffleToMapStage(shuffleDef4.shuffleId)
+    val mapStage5 = scheduler.shuffleToMapStage(shuffleDef5.shuffleId)
     val finalStage = scheduler.activeJobs.head.finalStage
 
     assert(mapStage1.parents.isEmpty)
     assert(mapStage2.parents === List(mapStage1))
     assert(mapStage3.parents === List(mapStage2))
     assert(mapStage4.parents === List(mapStage1, mapStage3))
-    assert(finalStage.parents === List(mapStage2, mapStage4))
+    assert(mapStage5.parents === List(mapStage2, mapStage4))
+    assert(finalStage.parents === List(mapStage5))
   }
 
   test("zero split job") {

From 1636531c65912bbfb68e4c669690a9f9107d9cd1 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 28 Mar 2016 16:01:27 +0900
Subject: [PATCH 05/16] Add an assertion to check that stages are not
 illegally overwritten.

---
 .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3b730804fe28..b04da419ac72 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -286,6 +286,7 @@ class DAGScheduler(
       case None =>
         // We are going to register ancestor shuffle dependencies
         getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
+          assert(!shuffleToMapStage.get(dep.shuffleId).isDefined)
           shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
         }
         // Then register current shuffleDep

From 92e9f4484b09f65829f6e9300042cc2b57979278 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 28 Mar 2016 16:19:09 +0900
Subject: [PATCH 06/16] Modify to mitigate extra push & pop operations.
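
With the previous shape of the loop, a node whose dependencies were not all
visited yet was popped and then pushed right back, adding an extra push & pop
per retry. Peeking at the top of the stack instead, and popping only once the
node is finished (or already visited), avoids that churn.

A hedged sketch of the revised loop, reusing the hypothetical Node type from
the PATCH 02 message (an illustration, not the real DAGScheduler code):

    import scala.collection.mutable

    case class Node(id: Int, deps: Seq[Node])

    def topologicalOrder(root: Node): Seq[Node] = {
      val result = mutable.ArrayBuffer.empty[Node]
      val visited = mutable.HashSet.empty[Node]
      val stack = mutable.Stack(root)
      while (stack.nonEmpty) {
        val n = stack.top // peek: n stays on the stack until it is finished
        if (visited(n)) {
          stack.pop() // duplicate entry, already emitted
        } else if (n.deps.forall(visited)) {
          stack.pop() // all dependencies emitted: finish n
          visited += n
          result += n
        } else {
          n.deps.filterNot(visited).foreach(stack.push)
        }
      }
      result.toSeq
    }
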
---
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b04da419ac72..e1a5c8eb0a25 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -422,6 +422,7 @@ class DAGScheduler(
           case _ => true
         }
         if (deps.forall(dep => visited(dep.rdd))) {
+          waitingForVisit.pop()
           visited += r
           for (dep <- deps) {
             dep match {
@@ -431,17 +432,18 @@ class DAGScheduler(
             }
           }
         } else {
-          waitingForVisit.push(r)
           for (dep <- deps if !visited(dep.rdd)) {
             waitingForVisit.push(dep.rdd)
           }
         }
+      } else {
+        waitingForVisit.pop()
       }
     }
 
     waitingForVisit.push(rdd)
     while (waitingForVisit.nonEmpty) {
-      visit(waitingForVisit.pop())
+      visit(waitingForVisit.top)
     }
     parents
   }

From 4b412f5e73ca9cf5ab2de1a51f6c30f01286e89a Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 28 Mar 2016 16:48:42 +0900
Subject: [PATCH 07/16] Modify comment.

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e1a5c8eb0a25..9959a1a69953 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -405,8 +405,9 @@ class DAGScheduler(
   }
 
   /**
-   * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet,
-   * in topological order to ensure building ancestor stages first.
+   * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet.
+   * This is done in topological order to create ancestor stages first to ensure that the
+   * resulting stage graph is correctly built.
    */

From 8fb9a149a03543a35c2a08c79edc53d49f66b5c2 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Mon, 28 Mar 2016 17:11:37 +0900
Subject: [PATCH 08/16] Add a comment to explain what the test is doing.

---
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index afbd00ff4bf0..037f1766f27f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -322,12 +322,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
   }
 
-  /*
+  /**
+   * This test ensures that DAGScheduler builds the stage graph correctly.
+   * Here, we submit RDD F, which has a lineage of RDDs as follows:
+   *
    *                <----------------------
    *               /                        \
    * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F]
   *                             \                        /
    *                              <----------------------
+   *
+   * and then check that all stages have the correct parent stages.
+   * Note: [] means an RDD, () means a shuffle dependency.
*/ test("parent stages") { val rddA = new MyRDD(sc, 1, Nil) From e2cfeaf3ef5a7291a235bbcbb968d88959e52e93 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 29 Mar 2016 12:22:36 +0900 Subject: [PATCH 09/16] Revert "Add assertion to check not to overwrite illegally." This reverts commit 1636531c65912bbfb68e4c669690a9f9107d9cd1. --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 9959a1a69953..88911e120d6b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -286,7 +286,6 @@ class DAGScheduler( case None => // We are going to register ancestor shuffle dependencies getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep => - assert(!shuffleToMapStage.get(dep.shuffleId).isDefined) shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId) } // Then register current shuffleDep From e3c0de33290aaccdd826d5ca38b87ace73a01fb5 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 11 Mar 2016 14:45:24 +0900 Subject: [PATCH 10/16] Eliminate unnecessary `submitWaitingStages()` call. --- .../apache/spark/scheduler/DAGScheduler.scala | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 5cdc91316b69..975a853549b6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -724,7 +724,6 @@ class DAGScheduler( reason = "as part of cancellation of all jobs")) activeJobs.clear() // These should already be empty by this point, jobIdToActiveJob.clear() // but just in case we lost track of some jobs... - submitWaitingStages() } /** @@ -750,7 +749,6 @@ class DAGScheduler( submitStage(stage) } } - submitWaitingStages() } /** @@ -791,7 +789,6 @@ class DAGScheduler( } val jobIds = activeInGroup.map(_.jobId) jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId))) - submitWaitingStages() } private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) { @@ -799,7 +796,6 @@ class DAGScheduler( // In that case, we wouldn't have the stage anymore in stageIdToStage. 
     val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
-    submitWaitingStages()
   }
 
   private[scheduler] def handleTaskSetFailed(
@@ -807,7 +803,6 @@ class DAGScheduler(
       reason: String,
       exception: Option[Throwable]): Unit = {
     stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
-    submitWaitingStages()
   }
 
   private[scheduler] def cleanUpAfterSchedulerStop() {
@@ -830,7 +825,6 @@ class DAGScheduler(
 
   private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
     listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
-    submitWaitingStages()
   }
 
   private[scheduler] def handleJobSubmitted(jobId: Int,
@@ -869,8 +863,6 @@ class DAGScheduler(
     listenerBus.post(
       SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
     submitStage(finalStage)
-
-    submitWaitingStages()
   }
 
   private[scheduler] def handleMapStageSubmitted(jobId: Int,
@@ -914,8 +906,6 @@ class DAGScheduler(
     if (finalStage.isAvailable) {
       markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
     }
-
-    submitWaitingStages()
   }
 
   /** Submits stage, but first recursively submits any missing parents. */
@@ -1247,7 +1237,7 @@ class DAGScheduler(
         }
       }
 
-      // Note: newly runnable stages will be submitted below when we submit waiting stages
+      submitWaitingStages()
     }
   }
@@ -1322,7 +1312,6 @@ class DAGScheduler(
         // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
         // will abort the job.
     }
-    submitWaitingStages()
   }
 
   /**
@@ -1364,7 +1353,6 @@ class DAGScheduler(
       logDebug("Additional executor lost message for " + execId +
         "(epoch " + currentEpoch + ")")
     }
-    submitWaitingStages()
   }
 
   private[scheduler] def handleExecutorAdded(execId: String, host: String) {
@@ -1373,7 +1361,6 @@ class DAGScheduler(
       logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
-    submitWaitingStages()
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int) {
@@ -1386,7 +1373,6 @@ class DAGScheduler(
       case None =>
         logInfo("No active jobs to kill for Stage " + stageId)
     }
-    submitWaitingStages()
   }
 
   private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
@@ -1396,7 +1382,6 @@ class DAGScheduler(
       failJobAndIndependentStages(
         jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
     }
-    submitWaitingStages()
   }
 
   /**

From 88c4bc1dd1c36b456432de2c895054799ff97a20 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Wed, 30 Mar 2016 17:47:20 +0900
Subject: [PATCH 11/16] Add some checks.
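
The new checks run the job to completion: each taskSet is completed in the
order the stages are expected to be submitted, the final result is verified,
and assertDataStructuresEmpty() confirms the scheduler cleaned up after the
job finished.

For reference, the mapping between taskSets and stages implied by the
asserted stage graph (a reading aid, not code from this patch):

    // Stages can only be submitted once their parents have completed, so for
    // the graph [A]..[F] above the expected submission order is:
    //   taskSets(0) -> mapStage1   (no parents)
    //   taskSets(1) -> mapStage2   (parent: mapStage1)
    //   taskSets(2) -> mapStage3   (parent: mapStage2)
    //   taskSets(3) -> mapStage4   (parents: mapStage1, mapStage3)
    //   taskSets(4) -> mapStage5   (parents: mapStage2, mapStage4)
    //   taskSets(5) -> finalStage  (parent: mapStage5)
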
---
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 037f1766f27f..c3701dccbcbf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -373,6 +373,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(mapStage4.parents === List(mapStage1, mapStage3))
     assert(mapStage5.parents === List(mapStage2, mapStage4))
     assert(finalStage.parents === List(mapStage5))
+
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(5), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
   }
 
   test("zero split job") {

From a304235c4b086469aa5b5ac8a7d2f0d25addc86f Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 15 Mar 2016 12:19:03 +0900
Subject: [PATCH 12/16] Try to submit only child stages of the completed stage.

---
 .../apache/spark/scheduler/DAGScheduler.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 455709bb9e09..750749734e95 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -768,18 +768,16 @@ class DAGScheduler(
 
   /**
    * Check for waiting stages which are now eligible for resubmission.
-   * Ordinarily run on every iteration of the event loop.
+   * Ordinarily run after the parent stage completed successfully.
    */
-  private def submitWaitingStages() {
-    // TODO: We might want to run this less often, when we are sure that something has become
-    // runnable that wasn't before.
+  private def submitWaitingChildStages(parent: Stage) {
     logTrace("Checking for newly runnable parent stages")
     logTrace("running: " + runningStages)
     logTrace("waiting: " + waitingStages)
     logTrace("failed: " + failedStages)
-    val waitingStagesCopy = waitingStages.toArray
-    waitingStages.clear()
-    for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
+    val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
+    waitingStages --= childStages
+    for (stage <- childStages.sortBy(_.firstJobId)) {
       submitStage(stage)
     }
   }
@@ -1083,6 +1081,8 @@ class DAGScheduler(
         s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
       }
       logDebug(debugString)
+
+      submitWaitingChildStages(stage)
     }
   }
 
@@ -1252,7 +1252,7 @@ class DAGScheduler(
       }
     }
 
-    submitWaitingStages()
+    submitWaitingChildStages(shuffleStage)
   }
 }

From f1407c0bb302355f7f06aad9ece00541063bde6e Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Thu, 31 Mar 2016 11:46:50 +0900
Subject: [PATCH 13/16] Modify to call `submitWaitingChildStages` only when
 the completed stage becomes available.
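
A ShuffleMapStage can finish its current task set and still be unavailable,
e.g. when some map outputs were lost with an executor and the stage has to be
resubmitted. In that case its child stages must not be woken up yet, so
submitWaitingChildStages() is now called only on the branch where the
completed stage is actually available.

A rough sketch of the intended control flow in handleTaskCompletion (field
and method names abbreviated from DAGScheduler; shown for illustration only,
this is not the literal code):

    // if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    //   markStageAsFinished(shuffleStage)
    //   if (!shuffleStage.isAvailable) {
    //     // Some map output is missing: resubmit this stage, not its children.
    //     submitStage(shuffleStage)
    //   } else {
    //     // All outputs are in place: now the waiting children may run.
    //     submitWaitingChildStages(shuffleStage)
    //   }
    // }
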
---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 750749734e95..5e21c3e75194 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1250,9 +1250,8 @@ class DAGScheduler(
           markMapStageJobAsFinished(job, stats)
         }
       }
+      submitWaitingChildStages(shuffleStage)
     }
-
-    submitWaitingChildStages(shuffleStage)
   }
 }

From b0511f7f413a2e9214adb0928849cb58e750596a Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 5 Apr 2016 11:58:53 +0900
Subject: [PATCH 14/16] Modify to cut down on the repeated scanning of data
 structures.

---
 .../apache/spark/scheduler/DAGScheduler.scala | 32 +++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5e21c3e75194..3311d4d31d2b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -415,29 +415,29 @@ class DAGScheduler(
     // caused by recursively visiting
     val waitingForVisit = new Stack[RDD[_]]
     def visit(r: RDD[_]) {
-      if (!visited(r)) {
-        val deps = r.dependencies.filter {
-          case shufDep: ShuffleDependency[_, _, _] =>
-            !shuffleToMapStage.contains(shufDep.shuffleId)
-          case _ => true
+      if (visited(r)) {
+        waitingForVisit.pop()
+      } else {
+        val visitedShuffleDeps = new ArrayBuffer[ShuffleDependency[_, _, _]]
+        val unvisitedDeps = new ArrayBuffer[Dependency[_]]
+
+        r.dependencies.foreach {
+          case dep: ShuffleDependency[_, _, _] if !shuffleToMapStage.contains(dep.shuffleId) =>
+            if (visited(dep.rdd)) visitedShuffleDeps += dep
+            else unvisitedDeps += dep
+          case dep if !visited(dep.rdd) => unvisitedDeps += dep
+          case _ =>
         }
-        if (deps.forall(dep => visited(dep.rdd))) {
+
+        if (unvisitedDeps.isEmpty) {
           waitingForVisit.pop()
           visited += r
-          for (dep <- deps) {
-            dep match {
-              case shufDep: ShuffleDependency[_, _, _] =>
-                parents += shufDep
-              case _ =>
-            }
-          }
+          for (shufDep <- visitedShuffleDeps) { parents += shufDep }
        } else {
-          for (dep <- deps if !visited(dep.rdd)) {
+          for (dep <- unvisitedDeps) {
             waitingForVisit.push(dep.rdd)
           }
         }
-      } else {
-        waitingForVisit.pop()
       }
     }

From eb230bb21fd2783b30aac16cc8ffaa8b5947bb22 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 13 May 2016 09:06:55 +0900
Subject: [PATCH 15/16] Revert DAGScheduler.getAncestorShuffleDependencies.
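
This restores getAncestorShuffleDependencies to its original, simpler shape:
a plain DFS that records unregistered shuffle dependencies in discovery order
and returns them on a Stack.

For comparison with the sketches in the earlier messages, this is roughly
what the restored traversal computes (same hypothetical Node type; an
illustration, not the actual code):

    import scala.collection.mutable

    case class Node(id: Int, deps: Seq[Node])

    // Pre-order discovery: a node is recorded the first time it is seen,
    // so ancestors are not guaranteed to precede their descendants.
    def discoveryOrder(root: Node): Seq[Node] = {
      val out = mutable.ArrayBuffer.empty[Node]
      val visited = mutable.HashSet.empty[Node]
      val stack = mutable.Stack(root)
      while (stack.nonEmpty) {
        val n = stack.pop()
        if (!visited(n)) {
          visited += n
          out += n
          n.deps.foreach(stack.push)
        }
      }
      out.toSeq
    }
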
---
 .../apache/spark/scheduler/DAGScheduler.scala | 45 +++++++------------
 1 file changed, 15 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3311d4d31d2b..b5b49294e4e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.annotation.tailrec
 import scala.collection.Map
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Stack}
+import scala.collection.mutable.{HashMap, HashSet, Stack}
 import scala.concurrent.duration._
 import scala.language.existentials
 import scala.language.postfixOps
@@ -403,47 +403,32 @@ class DAGScheduler(
     parents.toList
   }
 
-  /**
-   * Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet.
-   * This is done in topological order to create ancestor stages first to ensure that the
-   * resulting stage graph is correctly built.
-   */
-  private def getAncestorShuffleDependencies(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = {
-    val parents = new ArrayBuffer[ShuffleDependency[_, _, _]]
+  /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
+  private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
+    val parents = new Stack[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
     val waitingForVisit = new Stack[RDD[_]]
     def visit(r: RDD[_]) {
-      if (visited(r)) {
-        waitingForVisit.pop()
-      } else {
-        val visitedShuffleDeps = new ArrayBuffer[ShuffleDependency[_, _, _]]
-        val unvisitedDeps = new ArrayBuffer[Dependency[_]]
-
-        r.dependencies.foreach {
-          case dep: ShuffleDependency[_, _, _] if !shuffleToMapStage.contains(dep.shuffleId) =>
-            if (visited(dep.rdd)) visitedShuffleDeps += dep
-            else unvisitedDeps += dep
-          case dep if !visited(dep.rdd) => unvisitedDeps += dep
-          case _ =>
-        }
-
-        if (unvisitedDeps.isEmpty) {
-          waitingForVisit.pop()
-          visited += r
-          for (shufDep <- visitedShuffleDeps) { parents += shufDep }
-        } else {
-          for (dep <- unvisitedDeps) {
-            waitingForVisit.push(dep.rdd)
+      if (!visited(r)) {
+        visited += r
+        for (dep <- r.dependencies) {
+          dep match {
+            case shufDep: ShuffleDependency[_, _, _] =>
+              if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
+                parents.push(shufDep)
+              }
+            case _ =>
          }
+          waitingForVisit.push(dep.rdd)
        }
      }
    }
 
     waitingForVisit.push(rdd)
     while (waitingForVisit.nonEmpty) {
-      visit(waitingForVisit.top)
+      visit(waitingForVisit.pop())
     }
     parents
   }

From 4eb8c05841d14c9ecb306669508d6d0eab7543f5 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Tue, 17 May 2016 10:13:28 +0900
Subject: [PATCH 16/16] Update comments.

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0a17ade8aa6f..766e9797f902 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -755,10 +755,11 @@ class DAGScheduler(
 
   /**
    * Check for waiting stages which are now eligible for resubmission.
-   * Ordinarily run after the parent stage completed successfully.
+   * Submits stages that depend on the given parent stage. Called when the parent stage completes
+   * successfully.
    */
   private def submitWaitingChildStages(parent: Stage) {
-    logTrace("Checking for newly runnable parent stages")
+    logTrace(s"Checking if any dependencies of $parent are now runnable")
     logTrace("running: " + runningStages)
     logTrace("waiting: " + waitingStages)
     logTrace("failed: " + failedStages)