Commit ff0ddff
[SPARK-7878] Rename Stage.jobId to firstJobId
The previous name was confusing, because each stage can be associated with many jobs, and jobId is just the ID of the first job that was associated with the Stage. This commit also renames some of the method parameters in DAGScheduler.scala to clarify when the jobId refers to the first job ID associated with the stage (as opposed to the jobId associated with a job that's currently being scheduled).

cc markhamstra JoshRosen (hopefully this will help prevent future bugs like SPARK-6880)

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #6418 from kayousterhout/SPARK-7878 and squashes the following commits:

b71a9b8 [Kay Ousterhout] [SPARK-7878] Rename Stage.jobId to firstJobId
1 parent 4615081 commit ff0ddff
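Why "firstJobId"? A stage created for one job is reused by any later job that depends on the same shuffle, so the ID stored on the Stage only identifies the first such job. The following user-level sketch (a hypothetical example, not part of this commit; the input path and app name are made up) shows two jobs sharing one shuffle map stage:

import org.apache.spark.{SparkConf, SparkContext}

object FirstJobIdExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("firstJobId").setMaster("local[2]"))
    // One shuffle dependency -> one ShuffleMapStage, created when the first action runs.
    val counts = sc.textFile("README.md") // input path is a placeholder
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.count()   // job 0: creates the shuffle map stage, so its firstJobId is 0
    counts.collect() // job 1: depends on the same shuffle and reuses that stage
    sc.stop()
  }
}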

4 files changed: +33 −37 lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 27 additions & 31 deletions
@@ -208,19 +208,17 @@ class DAGScheduler(
 
   /**
    * Get or create a shuffle map stage for the given shuffle dependency's map side.
-   * The jobId value passed in will be used if the stage doesn't already exist with
-   * a lower jobId (jobId always increases across jobs.)
    */
   private def getShuffleMapStage(
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int): ShuffleMapStage = {
+      firstJobId: Int): ShuffleMapStage = {
     shuffleToMapStage.get(shuffleDep.shuffleId) match {
       case Some(stage) => stage
       case None =>
         // We are going to register ancestor shuffle dependencies
-        registerShuffleDependencies(shuffleDep, jobId)
+        registerShuffleDependencies(shuffleDep, firstJobId)
         // Then register current shuffleDep
-        val stage = newOrUsedShuffleStage(shuffleDep, jobId)
+        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
 
         stage
@@ -230,37 +228,35 @@ class DAGScheduler(
   /**
    * Helper function to eliminate some code re-use when creating new stages.
    */
-  private def getParentStagesAndId(rdd: RDD[_], jobId: Int): (List[Stage], Int) = {
-    val parentStages = getParentStages(rdd, jobId)
+  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
+    val parentStages = getParentStages(rdd, firstJobId)
     val id = nextStageId.getAndIncrement()
     (parentStages, id)
   }
 
   /**
    * Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
-   * newOrUsedShuffleStage. The stage will be associated with the provided jobId.
+   * newOrUsedShuffleStage. The stage will be associated with the provided firstJobId.
    * Production of shuffle map stages should always use newOrUsedShuffleStage, not
    * newShuffleMapStage directly.
    */
   private def newShuffleMapStage(
       rdd: RDD[_],
       numTasks: Int,
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int,
+      firstJobId: Int,
       callSite: CallSite): ShuffleMapStage = {
-    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
+    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
     val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
-      jobId, callSite, shuffleDep)
+      firstJobId, callSite, shuffleDep)
 
     stageIdToStage(id) = stage
-    updateJobIdStageIdMaps(jobId, stage)
+    updateJobIdStageIdMaps(firstJobId, stage)
     stage
   }
 
   /**
-   * Create a ResultStage -- either directly for use as a result stage, or as part of the
-   * (re)-creation of a shuffle map stage in newOrUsedShuffleStage. The stage will be associated
-   * with the provided jobId.
+   * Create a ResultStage associated with the provided jobId.
    */
   private def newResultStage(
       rdd: RDD[_],
@@ -277,16 +273,16 @@ class DAGScheduler(
 
   /**
    * Create a shuffle map Stage for the given RDD. The stage will also be associated with the
-   * provided jobId. If a stage for the shuffleId existed previously so that the shuffleId is
+   * provided firstJobId. If a stage for the shuffleId existed previously so that the shuffleId is
    * present in the MapOutputTracker, then the number and location of available outputs are
    * recovered from the MapOutputTracker
    */
   private def newOrUsedShuffleStage(
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int): ShuffleMapStage = {
+      firstJobId: Int): ShuffleMapStage = {
     val rdd = shuffleDep.rdd
     val numTasks = rdd.partitions.size
-    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
+    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
@@ -304,10 +300,10 @@ class DAGScheduler(
   }
 
   /**
-   * Get or create the list of parent stages for a given RDD. The stages will be assigned the
-   * provided jobId if they haven't already been created with a lower jobId.
+   * Get or create the list of parent stages for a given RDD. The new Stages will be created with
+   * the provided firstJobId.
    */
-  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
+  private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
     val parents = new HashSet[Stage]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
@@ -321,7 +317,7 @@ class DAGScheduler(
       for (dep <- r.dependencies) {
         dep match {
           case shufDep: ShuffleDependency[_, _, _] =>
-            parents += getShuffleMapStage(shufDep, jobId)
+            parents += getShuffleMapStage(shufDep, firstJobId)
           case _ =>
             waitingForVisit.push(dep.rdd)
         }
@@ -336,11 +332,11 @@ class DAGScheduler(
   }
 
   /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
-  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) {
+  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
     val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
     while (parentsWithNoMapStage.nonEmpty) {
       val currentShufDep = parentsWithNoMapStage.pop()
-      val stage = newOrUsedShuffleStage(currentShufDep, jobId)
+      val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
       shuffleToMapStage(currentShufDep.shuffleId) = stage
     }
   }
@@ -390,7 +386,7 @@ class DAGScheduler(
       for (dep <- rdd.dependencies) {
         dep match {
           case shufDep: ShuffleDependency[_, _, _] =>
-            val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+            val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
             if (!mapStage.isAvailable) {
               missing += mapStage
             }
@@ -577,7 +573,7 @@ class DAGScheduler(
 
   private[scheduler] def doCancelAllJobs() {
     // Cancel all running jobs.
-    runningStages.map(_.jobId).foreach(handleJobCancellation(_,
+    runningStages.map(_.firstJobId).foreach(handleJobCancellation(_,
       reason = "as part of cancellation of all jobs"))
     activeJobs.clear() // These should already be empty by this point,
     jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
@@ -603,7 +599,7 @@ class DAGScheduler(
     clearCacheLocs()
     val failedStagesCopy = failedStages.toArray
     failedStages.clear()
-    for (stage <- failedStagesCopy.sortBy(_.jobId)) {
+    for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
       submitStage(stage)
     }
   }
@@ -623,7 +619,7 @@ class DAGScheduler(
     logTrace("failed: " + failedStages)
     val waitingStagesCopy = waitingStages.toArray
     waitingStages.clear()
-    for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
+    for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
       submitStage(stage)
     }
   }
@@ -843,7 +839,7 @@ class DAGScheduler(
       }
     }
 
-    val properties = jobIdToActiveJob.get(stage.jobId).map(_.properties).orNull
+    val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
 
     runningStages += stage
     // SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -909,7 +905,7 @@ class DAGScheduler(
       stage.pendingTasks ++= tasks
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(
-        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
+        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
@@ -1323,7 +1319,7 @@ class DAGScheduler(
         for (dep <- rdd.dependencies) {
           dep match {
             case shufDep: ShuffleDependency[_, _, _] =>
-              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
               if (!mapStage.isAvailable) {
                 waitingForVisit.push(mapStage.rdd)
               } // Otherwise there's no need to follow the dependency back
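The get-or-create logic in getShuffleMapStage is untouched by this patch; only the parameter name changes. As a minimal standalone sketch of that pattern (hypothetical MiniStage and MiniScheduler names, not Spark's API): because the cache is keyed by shuffleId, a later job asking for the same shuffle receives the stage stamped with the first job's ID.

import scala.collection.mutable

// Hypothetical stand-ins for Spark's Stage and DAGScheduler internals.
case class MiniStage(id: Int, firstJobId: Int)

class MiniScheduler {
  private val shuffleToMapStage = mutable.Map.empty[Int, MiniStage]
  private var nextStageId = 0

  // Return the cached stage for this shuffle if one exists; otherwise create
  // one and stamp it with the ID of the job that asked first.
  def getShuffleMapStage(shuffleId: Int, firstJobId: Int): MiniStage =
    shuffleToMapStage.getOrElseUpdate(shuffleId, {
      val stage = MiniStage(nextStageId, firstJobId)
      nextStageId += 1
      stage
    })
}

object MiniSchedulerDemo extends App {
  val scheduler = new MiniScheduler
  println(scheduler.getShuffleMapStage(shuffleId = 7, firstJobId = 0)) // MiniStage(0,0)
  println(scheduler.getShuffleMapStage(shuffleId = 7, firstJobId = 1)) // MiniStage(0,0): firstJobId stays 0
}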

core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala

Lines changed: 2 additions & 2 deletions
@@ -28,9 +28,9 @@ private[spark] class ResultStage(
     rdd: RDD[_],
     numTasks: Int,
     parents: List[Stage],
-    jobId: Int,
+    firstJobId: Int,
     callSite: CallSite)
-  extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
 
   // The active job for this result stage. Will be empty if the job has already finished
   // (e.g., because the job was cancelled).

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala

Lines changed: 2 additions & 2 deletions
@@ -30,10 +30,10 @@ private[spark] class ShuffleMapStage(
     rdd: RDD[_],
     numTasks: Int,
     parents: List[Stage],
-    jobId: Int,
+    firstJobId: Int,
     callSite: CallSite,
     val shuffleDep: ShuffleDependency[_, _, _])
-  extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
 
   override def toString: String = "ShuffleMapStage " + id
 

core/src/main/scala/org/apache/spark/scheduler/Stage.scala

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.util.CallSite
3434
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
3535
* that each output partition is on.
3636
*
37-
* Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
37+
* Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO
3838
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
3939
* faster on failure.
4040
*
@@ -51,7 +51,7 @@ private[spark] abstract class Stage(
5151
val rdd: RDD[_],
5252
val numTasks: Int,
5353
val parents: List[Stage],
54-
val jobId: Int,
54+
val firstJobId: Int,
5555
val callSite: CallSite)
5656
extends Logging {
5757
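The FIFO property described in the Stage doc comment above falls out of sorting by firstJobId, as the failedStagesCopy.sortBy(_.firstJobId) and waitingStagesCopy.sortBy(_.firstJobId) calls in DAGScheduler do. A small illustration with hypothetical stub data:

// Hypothetical stub; the real Stage carries much more state.
case class StubStage(id: Int, firstJobId: Int)

object FifoOrderDemo extends App {
  val failed = Seq(StubStage(id = 5, firstJobId = 2), StubStage(id = 3, firstJobId = 0))
  // Mirrors failedStagesCopy.sortBy(_.firstJobId) in DAGScheduler:
  // stages first submitted by earlier jobs are resubmitted first.
  for (stage <- failed.sortBy(_.firstJobId)) {
    println(s"resubmitting stage ${stage.id} (first submitted by job ${stage.firstJobId})")
  }
}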
