@@ -594,7 +594,7 @@ class DAGScheduler(
     val activeInGroup = activeJobs.filter(activeJob =>
       groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
     val jobIds = activeInGroup.map(_.jobId)
-    jobIds.foreach(handleJobCancellation(_, "part of cancel job group"))
+    jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
     submitWaitingStages()
   }
 
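The change above only touches the cancellation reason string, so each job killed through a group cancellation now carries a message naming the group. A minimal, self-contained sketch of the same idea, using hypothetical Job and cancelJobGroup stand-ins rather than the real DAGScheduler API:

// Hypothetical stand-ins, not the real Spark classes.
object CancelGroupSketch {
  final case class Job(jobId: Int, groupId: String)

  // Find all active jobs in the given group and build a cancellation reason
  // that names the group, mirroring the new reason string in the hunk above.
  def cancelJobGroup(activeJobs: Seq[Job], groupId: String): Seq[String] = {
    val jobIds = activeJobs.filter(_.groupId == groupId).map(_.jobId)
    jobIds.map(id => "job %d cancelled: part of cancelled job group %s".format(id, groupId))
  }

  def main(args: Array[String]): Unit = {
    val jobs = Seq(Job(1, "etl"), Job(2, "etl"), Job(3, "adhoc"))
    cancelJobGroup(jobs, "etl").foreach(println)  // prints one line per job in group "etl"
  }
}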
@@ -641,13 +641,14 @@ class DAGScheduler(
   }
 
   private[scheduler] def handleJobSubmitted(jobId: Int,
-    finalRDD: RDD[_],
-    func: (TaskContext, Iterator[_]) => _,
-    partitions: Array[Int],
-    allowLocal: Boolean,
-    callSite: String,
-    listener: JobListener,
-    properties: Properties = null) {
+      finalRDD: RDD[_],
+      func: (TaskContext, Iterator[_]) => _,
+      partitions: Array[Int],
+      allowLocal: Boolean,
+      callSite: String,
+      listener: JobListener,
+      properties: Properties = null)
+  {
     var finalStage: Stage = null
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a
@@ -657,12 +658,13 @@ class DAGScheduler(
       case e: Exception =>
         logWarning("Creating new stage failed due to exception - job: " + jobId, e)
         listener.jobFailed(e)
+        return
     }
     if (finalStage != null) {
       val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
       clearCacheLocs()
-      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".
-        format(job.jobId, callSite, partitions.length, allowLocal))
+      logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
+        job.jobId, callSite, partitions.length, allowLocal))
       logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
       logInfo("Parents of final stage: " + finalStage.parents)
       logInfo("Missing parents: " + getMissingParentStages(finalStage))
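The added return is the substantive change in the last hunk: once newStage throws and the listener has been told the job failed, the handler should stop rather than fall through to the rest of the method for a job that was never turned into a stage. A minimal, self-contained sketch of that early-return pattern, with hypothetical Stage, JobListener and handleJobSubmitted stand-ins rather than the real scheduler types:

// Hypothetical stand-ins, not the real Spark classes.
object EarlyReturnSketch {
  final case class Stage(id: Int)
  trait JobListener { def jobFailed(e: Exception): Unit }

  def handleJobSubmitted(jobId: Int, listener: JobListener, newStage: Int => Stage): Unit = {
    var finalStage: Stage = null
    try {
      finalStage = newStage(jobId)   // may throw, e.g. if an input no longer exists
    } catch {
      case e: Exception =>
        listener.jobFailed(e)        // report the failure to whoever submitted the job
        return                       // and stop; nothing below should run for this job
    }
    println("Got job %s with final stage %s".format(jobId, finalStage.id))
  }

  def main(args: Array[String]): Unit = {
    val listener = new JobListener {
      def jobFailed(e: Exception): Unit = println("job failed: " + e.getMessage)
    }
    handleJobSubmitted(1, listener, id => Stage(id))                           // normal path
    handleJobSubmitted(2, listener, _ => throw new Exception("missing input")) // early return
  }
}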