From 5078bb2037201cbcd51223d2acf971a7a7fcc0cb Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Fri, 10 Jun 2016 16:12:02 -0700 Subject: [PATCH 1/4] [SPARK-15927] Eliminate redundant DAGScheduler code. --- .../apache/spark/scheduler/DAGScheduler.scala | 68 ++++++++----------- 1 file changed, 30 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a2eadbcbd660..e815e5ac472c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -378,31 +378,9 @@ class DAGScheduler( * the provided firstJobId. */ private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = { - val parents = new HashSet[Stage] - val visited = new HashSet[RDD[_]] - // We are manually maintaining a stack here to prevent StackOverflowError - // caused by recursively visiting - val waitingForVisit = new Stack[RDD[_]] - def visit(r: RDD[_]) { - if (!visited(r)) { - visited += r - // Kind of ugly: need to register RDDs with the cache here since - // we can't do it in its constructor because # of partitions is unknown - for (dep <- r.dependencies) { - dep match { - case shufDep: ShuffleDependency[_, _, _] => - parents += getShuffleMapStage(shufDep, firstJobId) - case _ => - waitingForVisit.push(dep.rdd) - } - } - } - } - waitingForVisit.push(rdd) - while (waitingForVisit.nonEmpty) { - visit(waitingForVisit.pop()) - } - parents.toList + getShuffleDependencies(rdd).map { shuffleDep => + getShuffleMapStage(shuffleDep, firstJobId) + }.toList } /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ @@ -412,25 +390,39 @@ class DAGScheduler( // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] - def visit(r: RDD[_]) { - if (!visited(r)) { - visited 
+= r - for (dep <- r.dependencies) { - dep match { - case shufDep: ShuffleDependency[_, _, _] => - if (!shuffleToMapStage.contains(shufDep.shuffleId)) { - parents.push(shufDep) - } - case _ => - } - waitingForVisit.push(dep.rdd) + waitingForVisit.push(rdd) + while (waitingForVisit.nonEmpty) { + val toVisit = waitingForVisit.pop() + if (!visited(toVisit)) { + visited += toVisit + getShuffleDependencies(toVisit).foreach { shuffleDep => + if (!shuffleToMapStage.contains(shuffleDep.shuffleId)) { + parents.push(shuffleDep) + waitingForVisit.push(shuffleDep.rdd) + } // Otherwise, the dependency and it's ancestors have already been registered. } } } + parents + } + /** Returns shuffle dependencies that are immediate parents of the given RDD (no ancestors). */ + private def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { + val parents = new HashSet[ShuffleDependency[_, _, _]] + val visited = new HashSet[RDD[_]] + val waitingForVisit = new Stack[RDD[_]] waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { - visit(waitingForVisit.pop()) + val toVisit = waitingForVisit.pop() + if (!visited(toVisit)) { + visited += toVisit + toVisit.dependencies.foreach { + case shuffleDep: ShuffleDependency[_, _, _] => + parents += shuffleDep + case dependency: Any => + waitingForVisit.push(dependency.rdd) + } + } } parents } From 42a8d16ed0b7e8175a58d1d6fa21685cc36c85c2 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Mon, 13 Jun 2016 12:43:32 -0700 Subject: [PATCH 2/4] Improved method comment --- .../org/apache/spark/scheduler/DAGScheduler.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e815e5ac472c..1e1f1a6fc5da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -406,7 +406,16 
@@ class DAGScheduler( parents } - /** Returns shuffle dependencies that are immediate parents of the given RDD (no ancestors). */ + /** + * Returns shuffle dependencies that are immediate parents of the given RDD. + * + * This function will not return more distant ancestors. For example, if C has a shuffle + * dependency on B which has a shuffle dependency on A: + * + * A <-- B <-- C + * + * calling this function with rdd C will only return the B <-- C dependency. + */ private def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { val parents = new HashSet[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] From 3e471665505ba0b259fcd7b43333a69d2c4ae1f5 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Mon, 13 Jun 2016 14:00:35 -0700 Subject: [PATCH 3/4] Removed unnecessary ': Any' --- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 1e1f1a6fc5da..fc27a58d73b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -428,7 +428,7 @@ class DAGScheduler( toVisit.dependencies.foreach { case shuffleDep: ShuffleDependency[_, _, _] => parents += shuffleDep - case dependency: Any => + case dependency => waitingForVisit.push(dependency.rdd) } } From edb29859a611939aba4c81ff13607ee2a74b2d75 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Tue, 14 Jun 2016 12:39:22 -0700 Subject: [PATCH 4/4] Addressed Imran's comments --- .../apache/spark/scheduler/DAGScheduler.scala | 11 ++++--- .../spark/scheduler/DAGSchedulerSuite.scala | 31 +++++++++++++++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index fc27a58d73b7..9458aef6434a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -385,7 +385,7 @@ class DAGScheduler( /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { - val parents = new Stack[ShuffleDependency[_, _, _]] + val ancestors = new Stack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting @@ -397,13 +397,13 @@ class DAGScheduler( visited += toVisit getShuffleDependencies(toVisit).foreach { shuffleDep => if (!shuffleToMapStage.contains(shuffleDep.shuffleId)) { - parents.push(shuffleDep) + ancestors.push(shuffleDep) waitingForVisit.push(shuffleDep.rdd) } // Otherwise, the dependency and it's ancestors have already been registered. } } } - parents + ancestors } /** @@ -415,8 +415,11 @@ class DAGScheduler( * A <-- B <-- C * * calling this function with rdd C will only return the B <-- C dependency. + * + * This function is scheduler-visible for the purpose of unit testing. 
*/ - private def getShuffleDependencies(rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { + private[scheduler] def getShuffleDependencies( + rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { val parents = new HashSet[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] val waitingForVisit = new Stack[RDD[_]] diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index f28f429e0c54..62d822cb037d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -2025,6 +2025,37 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou assertDataStructuresEmpty() } + /** + * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that + * getShuffleDependencies correctly returns the direct shuffle dependencies of a particular + * RDD. The test creates the following RDD graph (where n denotes a narrow dependency and s + * denotes a shuffle dependency): + * + * A <------------s---------, + * \ + * B <--s-- C <--s-- D <--n---`-- E + * + * Here, the direct shuffle dependency of C is just the shuffle dependency on B. The direct + * shuffle dependencies of E are the shuffle dependency on A and the shuffle dependency on C.
+ */ + test("getShuffleDependencies correctly returns only direct shuffle parents") { + val rddA = new MyRDD(sc, 2, Nil) + val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1)) + val rddB = new MyRDD(sc, 2, Nil) + val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1)) + val rddC = new MyRDD(sc, 1, List(shuffleDepB)) + val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1)) + val rddD = new MyRDD(sc, 1, List(shuffleDepC)) + val narrowDepD = new OneToOneDependency(rddD) + val rddE = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker = mapOutputTracker) + + assert(scheduler.getShuffleDependencies(rddA) === Set()) + assert(scheduler.getShuffleDependencies(rddB) === Set()) + assert(scheduler.getShuffleDependencies(rddC) === Set(shuffleDepB)) + assert(scheduler.getShuffleDependencies(rddD) === Set(shuffleDepC)) + assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID.