
Commit 0d24fe0

ueshin authored and kayousterhout committed
[SPARK-13902][SCHEDULER] Make DAGScheduler not create duplicate stages.
## What changes were proposed in this pull request?

`DAGScheduler` sometimes generates an incorrect stage graph. Suppose you have the following DAG:

```
[A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
  \                           /
   <-------------------------
```

Note: [] means an RDD, () means a shuffle dependency.

Here, RDD `B` has a shuffle dependency on RDD `A`, and RDD `C` has a shuffle dependency on both `B` and `A`. The shuffle dependency IDs are numbers in the `DAGScheduler`, but to make the example easier to understand, let's call the shuffled data from `A` shuffle dependency ID `s_A` and the shuffled data from `B` shuffle dependency ID `s_B`.

The `getAncestorShuffleDependencies` method in `DAGScheduler` incorrectly does not check for duplicates when it adds ShuffleDependencies to the parents data structure. So for this DAG, when `getAncestorShuffleDependencies` is called on `C` (the parent of the final RDD), it returns `s_A`, `s_B`, `s_A`: `s_A` gets added twice, once when the method visits RDD `C` and once when it visits RDD `B`. This is problematic because this line of code: https://github.com/apache/spark/blob/8ef3399/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289 then generates a new shuffle stage for each dependency returned by `getAncestorShuffleDependencies`, resulting in duplicate map stages that compute the map output from RDD `A`.

As a result, `DAGScheduler` generates the following stages and their parents for each shuffle:

| shuffle | stage | parents |
|---------|-------|---------|
| s_A | ShuffleMapStage 2 | List() |
| s_B | ShuffleMapStage 1 | List(ShuffleMapStage 0) |
| s_C | ShuffleMapStage 3 | List(ShuffleMapStage 1, ShuffleMapStage 2) |
| - | ResultStage 4 | List(ShuffleMapStage 3) |

The stage for `s_A` should be `ShuffleMapStage 0`, but the stage for `s_A` is generated twice: `ShuffleMapStage 0` is overwritten by `ShuffleMapStage 2`, while `ShuffleMapStage 1` keeps referring to the overwritten `ShuffleMapStage 0`. This patch fixes the duplicate registration.

## How was this patch tested?

I added the sample RDD graph that exposes the illegal stage graph to `DAGSchedulerSuite`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #12655 from ueshin/issues/SPARK-13902.
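To make the failure mode concrete, here is a minimal, self-contained Scala sketch of the same mechanism. This is not Spark's actual code: `Node`, `Dep`, `collectAncestors`, and the string-valued stage map are hypothetical stand-ins for RDDs, `ShuffleDependency`, `getAncestorShuffleDependencies`, and `shuffleToMapStage`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for RDDs and shuffle dependencies (not Spark's classes).
case class Node(name: String, deps: Seq[Dep])
case class Dep(shuffleId: Int, rdd: Node)

// Mimics getAncestorShuffleDependencies: a DFS over the RDD graph that collects
// every shuffle dependency it encounters, with no duplicate check.
def collectAncestors(root: Node): Seq[Dep] = {
  val parents = mutable.ArrayBuffer[Dep]()
  val stack = mutable.Stack(root)
  while (stack.nonEmpty) {
    val node = stack.pop()
    node.deps.foreach { dep =>
      parents += dep        // no membership test here, so s_A can be added twice
      stack.push(dep.rdd)
    }
  }
  parents.toSeq
}

// The DAG from the description: B depends on A via s_A (= 0),
// and C depends on A via s_A and on B via s_B (= 1).
val a = Node("A", Nil)
val b = Node("B", Seq(Dep(0, a)))
val c = Node("C", Seq(Dep(0, a), Dep(1, b)))

// The traversal from C yields s_A, s_B, s_A -- s_A once for C and once for B.
assert(collectAncestors(c).map(_.shuffleId) == Seq(0, 1, 0))

// Stage registration. The old code assigned unconditionally, so the duplicate
// s_A created a second stage and overwrote the first entry; the guard below
// (the essence of this patch) keeps the first stage.
val shuffleToMapStage = mutable.HashMap[Int, String]()
var nextStageId = 0
def newStage(): String = { val s = s"ShuffleMapStage $nextStageId"; nextStageId += 1; s }

collectAncestors(c).foreach { dep =>
  if (!shuffleToMapStage.contains(dep.shuffleId)) {
    shuffleToMapStage(dep.shuffleId) = newStage()
  }
}
assert(shuffleToMapStage(0) == "ShuffleMapStage 0")  // s_A keeps its first stage
assert(shuffleToMapStage(1) == "ShuffleMapStage 1")
```

With the unconditional assignment instead of the guard, the final map would hold `ShuffleMapStage 2` for `s_A` while any stage that had already recorded `ShuffleMapStage 0` as a parent would keep the stale reference, which is exactly the inconsistent stage graph shown in the table above.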
1 parent 31ea3c7 · commit 0d24fe0

2 files changed: +50 −1 lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 3 additions & 1 deletion
```diff
@@ -286,7 +286,9 @@ class DAGScheduler(
       case None =>
         // We are going to register ancestor shuffle dependencies
         getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
-          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
+          if (!shuffleToMapStage.contains(dep.shuffleId)) {
+            shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
+          }
         }
         // Then register current shuffleDep
         val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
```
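A side note on the shape of the fix (an observation, not part of the patch): the `contains` guard is equivalent to `mutable.Map#getOrElseUpdate`, which evaluates its second argument only when the key is absent. A toy illustration, with a string-valued map standing in for `shuffleToMapStage`:

```scala
import scala.collection.mutable

val stages = mutable.HashMap[Int, String]()
stages.getOrElseUpdate(0, "ShuffleMapStage 0")  // key absent: entry is created
stages.getOrElseUpdate(0, "ShuffleMapStage 2")  // key present: existing entry wins
assert(stages(0) == "ShuffleMapStage 0")        // no overwrite, unlike the old code
```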

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 47 additions & 0 deletions
```diff
@@ -325,6 +325,53 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
   }
 
+  /**
+   * This test ensures that DAGScheduler builds the stage graph correctly.
+   *
+   * Suppose you have the following DAG:
+   *
+   *   [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
+   *     \                           /
+   *      <-------------------------
+   *
+   * Here, RDD B has a shuffle dependency on RDD A, and RDD C has a shuffle dependency on both
+   * B and A. The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example
+   * easier to understand, let's call the shuffled data from A shuffle dependency ID s_A and the
+   * shuffled data from B shuffle dependency ID s_B.
+   *
+   * Note: [] means an RDD, () means a shuffle dependency.
+   */
+  test("[SPARK-13902] Ensure no duplicate stages are created") {
+    val rddA = new MyRDD(sc, 1, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
+    val s_A = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
+    val s_B = shuffleDepB.shuffleId
+
+    val rddC = new MyRDD(sc, 1, List(shuffleDepA, shuffleDepB), tracker = mapOutputTracker)
+    val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
+    val s_C = shuffleDepC.shuffleId
+
+    val rddD = new MyRDD(sc, 1, List(shuffleDepC), tracker = mapOutputTracker)
+
+    submit(rddD, Array(0))
+
+    assert(scheduler.shuffleToMapStage.size === 3)
+    assert(scheduler.activeJobs.size === 1)
+
+    val mapStageA = scheduler.shuffleToMapStage(s_A)
+    val mapStageB = scheduler.shuffleToMapStage(s_B)
+    val mapStageC = scheduler.shuffleToMapStage(s_C)
+    val finalStage = scheduler.activeJobs.head.finalStage
+
+    assert(mapStageA.parents.isEmpty)
+    assert(mapStageB.parents === List(mapStageA))
+    assert(mapStageC.parents === List(mapStageA, mapStageB))
+    assert(finalStage.parents === List(mapStageC))
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None
```
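To reproduce locally (assuming a checkout of apache/spark at this commit; the invocation below is a Spark build convention, not taken from this page), the suite can be run in isolation with sbt's `testOnly` task, e.g. `build/sbt "core/testOnly *DAGSchedulerSuite"`. Without the one-line guard in `DAGScheduler`, the new test should fail at `assert(mapStageB.parents === List(mapStageA))`, because `shuffleToMapStage(s_A)` then holds the duplicate stage while `mapStageB` still references the overwritten one.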
