From 448eae83c190789e1946f010eecb36ee49d99e4f Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Jul 2015 16:03:43 -0700
Subject: [PATCH 1/2] Add regression test

---
 .../scala/org/apache/spark/FailureSuite.scala | 25 +++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index b099cd3fb7965..69cb4b44cf7ef 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -141,5 +141,30 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
     FailureSuiteState.clear()
   }
 
+  test("managed memory leak error should not mask other failures (SPARK-9266)") {
+    val conf = new SparkConf().set("spark.unsafe.exceptionOnMemoryLeak", "true")
+    sc = new SparkContext("local[1,1]", "test", conf)
+
+    // If a task leaks memory but fails due to some other cause, then make sure that the original
+    // cause is preserved.
+    val thrownDueToTaskFailure = intercept[SparkException] {
+      sc.parallelize(Seq(0)).mapPartitions { iter =>
+        TaskContext.get().taskMemoryManager().allocate(128)
+        throw new Exception("intentional task failure")
+        iter
+      }.count()
+    }
+    assert(thrownDueToTaskFailure.getMessage.contains("intentional task failure"))
+
+    // If the task succeeded but memory was leaked, then the task should fail due to that leak.
+    val thrownDueToMemoryLeak = intercept[SparkException] {
+      sc.parallelize(Seq(0)).mapPartitions { iter =>
+        TaskContext.get().taskMemoryManager().allocate(128)
+        iter
+      }.count()
+    }
+    assert(thrownDueToMemoryLeak.getMessage.contains("memory leak"))
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
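The masking behavior that the regression test above guards against comes from standard JVM try/finally semantics: an exception thrown from a finally block silently replaces whatever the try block threw. A minimal standalone Scala sketch of that behavior (not part of the patch, illustration only):

object FinallyMasking {
  def main(args: Array[String]): Unit = {
    try {
      try {
        throw new Exception("original failure")      // the error we care about
      } finally {
        throw new Exception("cleanup-time failure")  // silently replaces it
      }
    } catch {
      // Prints "cleanup-time failure": the original failure is lost, which is
      // exactly how the memory-leak check in the finally block masked task failures.
      case e: Exception => println(e.getMessage)
    }
  }
}
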
From c1f01672d3a3862b0a4e99c5658f454f554cfd71 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 22 Jul 2015 16:10:47 -0700
Subject: [PATCH 2/2] Fix the error masking problem

---
 .../main/scala/org/apache/spark/executor/Executor.scala   | 7 +++++--
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala   | 5 ++++-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 66624ffbe4790..3c05651b66723 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -209,18 +209,21 @@ private[spark] class Executor(
 
         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
+        var threwException = true
         val (value, accumUpdates) = try {
-          task.run(
+          val res = task.run(
             taskAttemptId = taskId,
             attemptNumber = attemptNumber,
             metricsSystem = env.metricsSystem)
+          threwException = false
+          res
         } finally {
           // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
           // when changing this, make sure to update both copies.
           val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
           if (freedMemory > 0) {
             val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
-            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+            if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
               throw new SparkException(errMsg)
             } else {
               logError(errMsg)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b829d06923404..9f8cb0a527de6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -685,9 +685,11 @@ class DAGScheduler(
           metricsSystem = env.metricsSystem,
           runningLocally = true)
       TaskContext.setTaskContext(taskContext)
+      var threwException = true
       try {
         val result = job.func(taskContext, rdd.iterator(split, taskContext))
         job.listener.taskSucceeded(0, result)
+        threwException = false
       } finally {
         taskContext.markTaskCompleted()
         TaskContext.unset()
@@ -695,7 +697,8 @@ class DAGScheduler(
         // make sure to update both copies.
         val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
         if (freedMemory > 0) {
-          if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
+          if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)
+              && !threwException) {
             throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
           } else {
             logError(s"Managed memory leak detected; size = $freedMemory bytes")
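
Both fixes rely on the same sentinel-flag pattern: threwException starts out true and is flipped to false only as the last statement of the try body, so the finally block can distinguish a normal completion from an in-flight failure, and escalates the leak to an exception only in the former case. A minimal standalone sketch of the pattern, with hypothetical stand-ins (runBody for task.run(), leakedBytes for taskMemoryManager.cleanUpAllAllocatedMemory(); the spark.unsafe.exceptionOnMemoryLeak config check is elided):

object LeakCheckSketch {
  def runWithLeakCheck[T](runBody: => T, leakedBytes: () => Long): T = {
    var threwException = true
    try {
      val res = runBody
      threwException = false  // reached only if the body completed normally
      res
    } finally {
      val freed = leakedBytes()
      if (freed > 0) {
        val errMsg = s"Managed memory leak detected; size = $freed bytes"
        if (!threwException) {
          throw new RuntimeException(errMsg)  // safe: no in-flight exception to mask
        } else {
          System.err.println(errMsg)  // a failure is already propagating; just log
        }
      }
    }
  }
}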