core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (10 additions, 25 deletions)
@@ -726,7 +726,6 @@ class DAGScheduler(
reason = "as part of cancellation of all jobs"))
activeJobs.clear() // These should already be empty by this point,
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
-    submitWaitingStages()
}

/**
@@ -752,23 +751,21 @@ class DAGScheduler(
submitStage(stage)
}
}
-    submitWaitingStages()
}

  /**
-   * Check for waiting stages which are now eligible for resubmission.
-   * Ordinarily run on every iteration of the event loop.
+   * Submits stages that depend on the given parent stage. Called when the parent stage completes
+   * successfully.
    */
-  private def submitWaitingStages() {
-    // TODO: We might want to run this less often, when we are sure that something has become
-    // runnable that wasn't before.
-    logTrace("Checking for newly runnable parent stages")
+  private def submitWaitingChildStages(parent: Stage) {
+    logTrace(s"Checking if any dependencies of $parent are now runnable")
     logTrace("running: " + runningStages)
     logTrace("waiting: " + waitingStages)
     logTrace("failed: " + failedStages)
-    val waitingStagesCopy = waitingStages.toArray
-    waitingStages.clear()
-    for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
+    val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
+    waitingStages --= childStages
+    for (stage <- childStages.sortBy(_.firstJobId)) {
       submitStage(stage)
Member: Seems submitWaitingChildStages is called to submit child stages when the given parent stage is available. From this observation, do we have to re-check missing parents inside submitStage?

Member Author: Yes, and the re-check is done in submitStage(). If any parent stages are still missing, the child goes back onto waitingStages.

Member: Ah, I see.
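For reference, the re-check the author mentions is the one inside submitStage itself; a condensed sketch of that method as it stands around this change (paraphrased, logging elided):

```scala
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      // The re-check: missing parents are recomputed on every submission.
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        submitMissingTasks(stage, jobId.get)
      } else {
        // Parents still missing: submit them first and park this stage on
        // waitingStages; submitWaitingChildStages(parent) picks it up again
        // once the last missing parent completes.
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
```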

}
}
@@ -793,23 +790,20 @@ class DAGScheduler(
}
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
-    submitWaitingStages()
}

private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
-    submitWaitingStages()
}

private[scheduler] def handleTaskSetFailed(
taskSet: TaskSet,
reason: String,
exception: Option[Throwable]): Unit = {
stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
-    submitWaitingStages()
}

private[scheduler] def cleanUpAfterSchedulerStop() {
@@ -832,7 +826,6 @@ class DAGScheduler(

private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
-    submitWaitingStages()
}

private[scheduler] def handleJobSubmitted(jobId: Int,
@@ -871,8 +864,6 @@ class DAGScheduler(
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
-
-    submitWaitingStages()
}

private[scheduler] def handleMapStageSubmitted(jobId: Int,
@@ -916,8 +907,6 @@ class DAGScheduler(
if (finalStage.isAvailable) {
markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
    }
-
-    submitWaitingStages()
}

/** Submits stage, but first recursively submits any missing parents. */
@@ -1073,6 +1062,8 @@ class DAGScheduler(
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
      logDebug(debugString)
+
+      submitWaitingChildStages(stage)
}
}

@@ -1238,9 +1229,8 @@ class DAGScheduler(
markMapStageJobAsFinished(job, stats)
}
}
+        submitWaitingChildStages(shuffleStage)
Member: Do we need to call this submitWaitingChildStages(shuffleStage) when the given shuffleStage is not available?

Member Author: I don't think we need to. The shuffleStage is re-submitted if it is not available, so submitWaitingChildStages(shuffleStage) will be called once the stage is processed again and becomes available.
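In other words, the call sits on the success path of the ShuffleMapTask completion handler. A paraphrased sketch of that branch (assumed shape; the real handleTaskCompletion also registers map outputs and does more bookkeeping):

```scala
// Paraphrased fragment of handleTaskCompletion for a finished ShuffleMapTask.
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
  markStageAsFinished(shuffleStage)
  if (!shuffleStage.isAvailable) {
    // Some map output is missing: resubmit this stage. Its children stay in
    // waitingStages and are only woken up on the branch below, after the
    // resubmitted attempt succeeds.
    submitStage(shuffleStage)
  } else {
    // All outputs are registered: notify the waiting children now.
    submitWaitingChildStages(shuffleStage)
  }
}
```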

}

-    // Note: newly runnable stages will be submitted below when we submit waiting stages
}
}

@@ -1315,7 +1305,6 @@ class DAGScheduler(
// Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
// will abort the job.
}
-    submitWaitingStages()
}

/**
@@ -1357,7 +1346,6 @@ class DAGScheduler(
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
}
Contributor: Is it necessary to submit some newly-waiting stages here (e.g., if shuffle output was lost for a map stage, so that map stage now needs to be re-run)?

Contributor: This appears to be a non-issue, because we handle lost shuffle output separately, when we get a FetchFailure from a task that tries to fetch the output.
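The separate path referred to is the FetchFailed arm of handleTaskCompletion; a heavily paraphrased fragment (assumed shape, guards and logging elided):

```scala
// Lost shuffle output surfaces as a FetchFailed task result, and recovery
// happens here rather than in handleExecutorLost.
case FetchFailed(bmAddress, shuffleId, mapId, _, _) =>
  val failedStage = stageIdToStage(task.stageId) // reducer stage that saw the failure
  val mapStage = shuffleToMapStage(shuffleId)    // map stage whose output is gone
  // Forget the lost output so the map stage counts as "missing" again...
  mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
  // ...then queue both stages; a delayed ResubmitFailedStages event re-runs
  // them through resubmitFailedStages() and submitStage().
  failedStages += failedStage
  failedStages += mapStage
  messageScheduler.schedule(new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
```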

-    submitWaitingStages()
}

private[scheduler] def handleExecutorAdded(execId: String, host: String) {
@@ -1366,7 +1354,6 @@ class DAGScheduler(
logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
-    submitWaitingStages()
}

private[scheduler] def handleStageCancellation(stageId: Int) {
@@ -1379,7 +1366,6 @@ class DAGScheduler(
case None =>
logInfo("No active jobs to kill for Stage " + stageId)
}
-    submitWaitingStages()
}

private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
@@ -1389,7 +1375,6 @@ class DAGScheduler(
failJobAndIndependentStages(
jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
}
-    submitWaitingStages()
}

/**
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

@@ -370,6 +370,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
assert(mapStageB.parents === List(mapStageA))
assert(mapStageC.parents === List(mapStageA, mapStageB))
    assert(finalStage.parents === List(mapStageC))
+
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(3), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
}

test("zero split job") {