
Commit 740922f

Merge pull request alteryx#219 from sundeepn/schedulerexception
Scheduler quits when newStage fails. The scheduler thread does not handle exceptions thrown by newStage while launching new jobs; any exception raised at that level kills the thread, leaving the cluster hanging with no scheduler.
2 parents: 60e23a5 + be3ea23
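
For context, the failure mode is the generic one for a single-threaded event loop: an exception that escapes run() terminates the thread, and every later event is silently dropped. Below is a minimal, self-contained Scala sketch of that behavior and of the guard the patch adds; the names (SchedulerLoopSketch, the queue of job ids) are hypothetical stand-ins, not Spark's actual event-loop classes.

import java.util.concurrent.LinkedBlockingQueue

object SchedulerLoopSketch {

  // Stand-in for DAGScheduler.newStage, which can throw, e.g. when the HDFS
  // files backing a HadoopRDD have been deleted.
  def newStage(jobId: Int): String =
    if (jobId == 1) throw new RuntimeException("underlying HDFS file deleted")
    else "stage-for-job-" + jobId

  def main(args: Array[String]): Unit = {
    val events = new LinkedBlockingQueue[Integer]()
    val scheduler = new Thread(new Runnable {
      def run(): Unit = {
        while (true) {
          val jobId: Int = events.take()
          // The fix amounts to this guard: without the try/catch, job 1's
          // exception would end the loop and jobs 2, 3, ... would never run.
          try {
            println("launched " + newStage(jobId))
          } catch {
            case e: Exception => println(s"job $jobId failed: ${e.getMessage}")
          }
        }
      }
    })
    scheduler.setDaemon(true) // let the JVM exit once main returns
    scheduler.start()
    (0 to 2).foreach(i => events.put(i))
    Thread.sleep(500) // give the loop time to drain the queue
  }
}

Running the sketch prints that jobs 0 and 2 were launched while job 1 failed; remove the try/catch and job 1's exception ends the thread, so job 2 is never processed, which is exactly the hang this commit fixes.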


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 11 additions & 1 deletion
@@ -413,7 +413,17 @@ class DAGScheduler(
   private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
     event match {
       case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
-        val finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+        var finalStage: Stage = null
+        try {
+          // New stage creation can fail, e.g. when jobs are run on a HadoopRDD whose
+          // underlying HDFS files have been deleted; unguarded, it kills the scheduler thread.
+          finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
+        } catch {
+          case e: Exception =>
+            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+            listener.jobFailed(e)
+            return false
+        }
         val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
         clearCacheLocs()
         logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
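
The shape of the fix is worth noting: the guard catches Exception rather than Throwable, so truly fatal errors still propagate; it reports the failure back to the submitting job through listener.jobFailed(e), so the caller observes a failed job rather than a hung cluster; and it returns false so that processEvent, whose Boolean result in this version of the scheduler appears to tell the event loop whether to shut down, keeps the loop alive for subsequent events.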
