From 1cb444d4afc5355625d26fee7bd40ecb5532b8ca Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 9 Sep 2014 23:32:37 -0700
Subject: [PATCH 1/5] [SPARK-3469] Call all TaskCompletionListeners even if some fail.

Note that this also changes the fault semantics of TaskCompletionListener.
Previously, failures in TaskCompletionListeners would result in the task
being reported as failed. With this change, tasks won't be reported as
failed simply because the execution of a TaskCompletionListener fails.
---
 .../main/scala/org/apache/spark/TaskContext.scala  | 11 ++++++++++-
 .../apache/spark/scheduler/TaskContextSuite.scala  | 13 ++++++++++++-
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 2b99b8a5af250..a5a27f7bf6382 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -77,6 +77,8 @@ class TaskContext(
   /**
    * Add a listener in the form of a Scala closure to be executed on task completion.
    * This will be called in all situations - success, failure, or cancellation.
+   * Exceptions in callbacks are, however, not thrown back upstream, i.e. tasks won't be
+   * marked as failed even if completion callbacks fail to execute.
    *
    * An example use is for HadoopRDD to register a callback to close the input stream.
    */
@@ -104,7 +106,14 @@ class TaskContext(
   private[spark] def markTaskCompleted(): Unit = {
     completed = true
     // Process complete callbacks in the reverse order of registration
-    onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) }
+    onCompleteCallbacks.reverse.foreach { listener =>
+      try {
+        listener.onTaskCompletion(this)
+      } catch {
+        case e: Throwable =>
+          logError("Error in TaskCompletionListener", e)
+      }
+    }
   }
 
   /** Marks the task for interruption, i.e. cancellation. */
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index db2ad829a48f9..17adad82affe8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
 
 class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
-  test("Calls executeOnCompleteCallbacks after failure") {
+  test("calls TaskCompletionListener after failure") {
     TaskContextSuite.completed = false
     sc = new SparkContext("local", "test")
     val rdd = new RDD[String](sc, List()) {
@@ -45,10 +45,21 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
     }
     assert(TaskContextSuite.completed === true)
   }
+
+  test("all TaskCompletionListeners should be called even if some fail") {
+    val context = new TaskContext(0, 0, 0)
+    context.addTaskCompletionListener(_ => throw new Exception("blah"))
+    context.addTaskCompletionListener(_ => TaskContextSuite.callbackInvoked = true)
+    context.addTaskCompletionListener(_ => throw new Exception("blah"))
+    context.markTaskCompleted()
+    assert(TaskContextSuite.callbackInvoked === true)
+  }
 }
 
 private object TaskContextSuite {
   @volatile var completed = false
+
+  var callbackInvoked = false
 }
 
 private case class StubPartition(index: Int) extends Partition

From 29b61621e9b8d1e58ecca004c66304d2e944e726 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 10 Sep 2014 19:13:54 -0700
Subject: [PATCH 2/5] oops compilation failed.
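
markTaskCompleted now calls logError, which only exists once the Logging
trait is mixed in, hence the one-line fix below. A minimal sketch of the
shape of the change (constructor parameters abbreviated; Logging here is
the internal org.apache.spark.Logging trait):

    import org.apache.spark.Logging

    // Sketch only: mixing in Logging is what brings logError into scope
    // for markTaskCompleted's catch block.
    class TaskContext(val stageId: Int, val partitionId: Int, val attemptId: Long)
      extends Serializable with Logging {

      private[spark] def markTaskCompleted(): Unit =
        try {
          () // run the registered TaskCompletionListeners (elided)
        } catch {
          case e: Throwable => logError("Error in TaskCompletionListener", e)
        }
    }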
---
 core/src/main/scala/org/apache/spark/TaskContext.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index a5a27f7bf6382..b06f5652b22fb 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -41,7 +41,7 @@ class TaskContext(
     val attemptId: Long,
     val runningLocally: Boolean = false,
     private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty)
-  extends Serializable {
+  extends Serializable with Logging {
 
   @deprecated("use partitionId", "0.8.1")
   def splitId = partitionId

From aa68ea4f6083c1698b42161fba1ba65e49341412 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 12 Sep 2014 00:55:02 -0700
Subject: [PATCH 3/5] Throw an exception if task completion callback fails.

---
 .../scala/org/apache/spark/TaskContext.scala       |  7 +++-
 .../TaskCompletionListenerException.scala          | 33 +++++++++++++++++++
 .../spark/scheduler/TaskContextSuite.scala         | 19 +++++++----
 3 files changed, 52 insertions(+), 7 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala

diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index b06f5652b22fb..7f0256a5cfb9a 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.TaskCompletionListener
+import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener}
 
 
 /**
@@ -105,15 +105,20 @@ class TaskContext(
   /** Marks the task as completed and triggers the listeners. */
   private[spark] def markTaskCompleted(): Unit = {
     completed = true
+    val errorMsgs = new ArrayBuffer[String](2)
     // Process complete callbacks in the reverse order of registration
     onCompleteCallbacks.reverse.foreach { listener =>
       try {
         listener.onTaskCompletion(this)
       } catch {
         case e: Throwable =>
+          errorMsgs += e.getMessage
           logError("Error in TaskCompletionListener", e)
       }
     }
+    if (errorMsgs.nonEmpty) {
+      throw new TaskCompletionListenerException(errorMsgs)
+    }
   }
 
   /** Marks the task for interruption, i.e. cancellation. */
diff --git a/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
new file mode 100644
index 0000000000000..7fc2023856feb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Exception thrown when one or more TaskCompletionListener callbacks fail
+ * to execute while marking a task as completed.
+ */
+class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception {
+
+  override def getMessage: String = {
+    if (errorMessages.size == 1) {
+      errorMessages.head
+    } else {
+      errorMessages.zipWithIndex.map { case (msg, i) => s"Exception $i: $msg" }.mkString("\n")
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 17adad82affe8..faba5508c906c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -17,12 +17,16 @@
 
 package org.apache.spark.scheduler
 
+import org.mockito.Mockito._
+import org.mockito.Matchers.any
+
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener}
+
 
 class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
@@ -48,18 +52,21 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
 
   test("all TaskCompletionListeners should be called even if some fail") {
     val context = new TaskContext(0, 0, 0)
+    val listener = mock(classOf[TaskCompletionListener])
     context.addTaskCompletionListener(_ => throw new Exception("blah"))
-    context.addTaskCompletionListener(_ => TaskContextSuite.callbackInvoked = true)
+    context.addTaskCompletionListener(listener)
     context.addTaskCompletionListener(_ => throw new Exception("blah"))
-    context.markTaskCompleted()
-    assert(TaskContextSuite.callbackInvoked === true)
+
+    intercept[TaskCompletionListenerException] {
+      context.markTaskCompleted()
+    }
+
+    verify(listener, times(1)).onTaskCompletion(any())
   }
 }
 
 private object TaskContextSuite {
   @volatile var completed = false
-
-  var callbackInvoked = false
 }
 
 private case class StubPartition(index: Int) extends Partition

From ac5baeade915c11423c151ca827168954d1ddb92 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 12 Sep 2014 17:40:40 -0700
Subject: [PATCH 4/5] Removed obsolete comment.

---
 core/src/main/scala/org/apache/spark/TaskContext.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 7f0256a5cfb9a..51b3e4d5e0936 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -77,8 +77,6 @@ class TaskContext(
   /**
    * Add a listener in the form of a Scala closure to be executed on task completion.
    * This will be called in all situations - success, failure, or cancellation.
-   * Exceptions in callbacks are, however, not thrown back upstream, i.e. tasks won't be
-   * marked as failed even if completion callbacks fail to execute.
    *
    * An example use is for HadoopRDD to register a callback to close the input stream.
    */

From a3845b28cad4102b0c2ba3a239a6836d4e214d04 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Fri, 12 Sep 2014 21:54:14 -0700
Subject: [PATCH 5/5] Mark TaskCompletionListenerException as private[spark].
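
The exception is an implementation detail of how the executor surfaces
listener failures, so it should not become public API. As a hypothetical
illustration of the qualified-private semantics (the Caller object below
is not part of this patch):

    package org.apache.spark {
      object Caller {
        // Compiles: org.apache.spark is the package named by the qualifier,
        // so everything under it can still see the class.
        def fail(): Nothing =
          throw new util.TaskCompletionListenerException(Seq("boom"))
      }
    }

    // In any package outside org.apache.spark, the same reference no longer
    // compiles, because private[spark] hides the class from user code.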
---
 .../org/apache/spark/util/TaskCompletionListenerException.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
index 7fc2023856feb..f64e069cd1724 100644
--- a/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
+++ b/core/src/main/scala/org/apache/spark/util/TaskCompletionListenerException.scala
@@ -21,6 +21,7 @@ package org.apache.spark.util
  * Exception thrown when one or more TaskCompletionListener callbacks fail
  * to execute while marking a task as completed.
  */
+private[spark]
 class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception {
 
   override def getMessage: String = {
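
Taken together, the series leaves markTaskCompleted with the following
contract: every registered listener runs (in reverse order of registration),
each failure is logged as it happens, and a single
TaskCompletionListenerException summarizing all failures is thrown at the
end. A sketch of what a caller observes (hypothetical ListenerDemo object;
it must live inside the org.apache.spark package because the exception and
markTaskCompleted are both private[spark] after this series):

    package org.apache.spark

    import org.apache.spark.util.TaskCompletionListenerException

    object ListenerDemo {
      def main(args: Array[String]): Unit = {
        val context = new TaskContext(0, 0, 0)
        context.addTaskCompletionListener(_ => throw new Exception("first"))
        context.addTaskCompletionListener(_ => println("still runs"))
        context.addTaskCompletionListener(_ => throw new Exception("second"))

        try {
          context.markTaskCompleted() // all three listeners execute
        } catch {
          case e: TaskCompletionListenerException =>
            // Listeners run in reverse order of registration, so with two
            // failures getMessage aggregates to:
            //   Exception 0: second
            //   Exception 1: first
            println(e.getMessage)
        }
      }
    }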