From cc913269b959d3d08549c14e3da3a59203c94e89 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Wed, 13 Jul 2016 15:43:38 +0800 Subject: [PATCH 1/5] [SPARK-16522][MESOS] Spark application throws exception on exit. --- .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 263e6197a6f4..d300d230b926 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -553,7 +553,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( taskId: String, reason: String): Unit = { stateLock.synchronized { - removeExecutor(taskId, SlaveLost(reason)) + if (!stopCalled) { + removeExecutor(taskId, SlaveLost(reason)) + } slaves(slaveId).taskIDs.remove(taskId) } } From 2aee78ff2c90a402fd35ea308f62f02ba51de6e8 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Tue, 19 Jul 2016 15:25:21 +0800 Subject: [PATCH 2/5] Add comment. --- .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index d300d230b926..5177557132db 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -553,6 +553,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( taskId: String, reason: String): Unit = { stateLock.synchronized { + // Do not call removeExecutor() after this scheduler backend was stopped because + // removeExecutor() internally will send a message to the driver endpoint but + // the driver endpoint is not available now, otherwise an exception will be thrown. if (!stopCalled) { removeExecutor(taskId, SlaveLost(reason)) } From f848530cbc4fe5372e912a62d11e9cc8bf646522 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Sat, 6 Aug 2016 14:55:47 +0800 Subject: [PATCH 3/5] Add regression test case. --- ...osCoarseGrainedSchedulerBackendSuite.scala | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index a74fdf79a13c..014e4016f0c4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -21,6 +21,7 @@ import java.util.Collections import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} import org.apache.mesos.Protos._ @@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.mesos.Utils._ @@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite private var backend: MesosCoarseGrainedSchedulerBackend = _ private var externalShuffleClient: MesosExternalShuffleClient = _ private var driverEndpoint: RpcEndpointRef = _ + @volatile private var stopCalled = false test("mesos supports killing and limiting executors") { setBackend() @@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(!dockerInfo.getForcePullImage) } + test("Do not call parent methods like removeExecutor() after backend is stopped") { + setBackend() + + // launches a task on a valid offer + val offers = List((backend.executorMemory(sc), 1)) + offerResources(offers) + verifyTaskLaunched("o1") + + // launches a thread simulating status update + val statusUpdateThread = new Thread { + override def run(): Unit = { + while (!stopCalled) { + Thread.sleep(100) + } + + val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED) + backend.statusUpdate(driver, status) + } + }.start + + backend.stop + // Any method of the backend involving sending messages to the driver endpoint should not + // be called after the backend is stopped. + verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]]) + } + private def verifyDeclinedOffer(driver: SchedulerDriver, offerId: OfferID, filter: Boolean = false): Unit = { @@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite mesosDriver = newDriver } + override def stopExecutors(): Unit = { + stopCalled = true + } + markRegistered() } backend.start() From 3526f53ad762479b966f69a3c6444cdb1165f1a8 Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Sun, 7 Aug 2016 21:35:33 +0800 Subject: [PATCH 4/5] Fix a small bug after rebase. --- .../cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 014e4016f0c4..b2df10f14e0e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -350,7 +350,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite // launches a task on a valid offer val offers = List((backend.executorMemory(sc), 1)) offerResources(offers) - verifyTaskLaunched("o1") + verifyTaskLaunched(driver, "o1") // launches a thread simulating status update val statusUpdateThread = new Thread { From 534813810247039ded781fc4a6b2bb4baec626fa Mon Sep 17 00:00:00 2001 From: Sun Rui Date: Mon, 8 Aug 2016 14:00:25 +0800 Subject: [PATCH 5/5] Fix comments. --- .../mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index b2df10f14e0e..0e6697990154 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -344,7 +344,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(!dockerInfo.getForcePullImage) } - test("Do not call parent methods like removeExecutor() after backend is stopped") { + test("Do not call removeExecutor() after backend is stopped") { setBackend() // launches a task on a valid offer @@ -364,7 +364,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } }.start - backend.stop + backend.stop() // Any method of the backend involving sending messages to the driver endpoint should not // be called after the backend is stopped. verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])