From 457572018f154399e4f36b8cec8089010faeedc9 Mon Sep 17 00:00:00 2001 From: pankaj arora Date: Mon, 13 Apr 2015 21:57:58 +0530 Subject: [PATCH 1/2] [CORE] SPARK-6880: Fixed null check when all the dependent stages are cancelled due to previous stage failure --- .../org/apache/spark/scheduler/DAGScheduler.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 508fe7b3303c..04ef9e706323 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -818,13 +818,12 @@ class DAGScheduler( } } - val properties = if (jobIdToActiveJob.contains(jobId)) { - jobIdToActiveJob(stage.jobId).properties - } else { - // this stage will be assigned to "default" pool - null + val activeJob = jobIdToActiveJob.get(stage.jobId).getOrElse(null) + val properties = if (activeJob != null) { + activeJob.properties + } else { + null } - runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are // serializable. If tasks are not serializable, a SparkListenerStageCompleted event From 55ba5e350fc922696342aa38b99ad0bdfad64f92 Mon Sep 17 00:00:00 2001 From: pankaj arora Date: Tue, 14 Apr 2015 19:35:17 +0530 Subject: [PATCH 2/2] [CORE] SPARK-6880: Fixed null check when all the dependent stages are cancelled due to previous stage failure --- .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 04ef9e706323..0e4f4a1793f0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -818,12 +818,8 @@ class DAGScheduler( } } - val activeJob = jobIdToActiveJob.get(stage.jobId).getOrElse(null) - val properties = if (activeJob != null) { - activeJob.properties - } else { - null - } + val properties = jobIdToActiveJob.get(stage.jobId).map(_.properties).getOrElse(null) + runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are // serializable. If tasks are not serializable, a SparkListenerStageCompleted event