diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 38d7319b1f0ef..047f165457ba6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2833,6 +2833,7 @@ object SparkContext extends Logging {
         sc.conf.setIfMissing(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)
 
         val scheduler = new TaskSchedulerImpl(sc)
+        sc.conf.set("syncUnregisterApplicationInLocalCluster", true.toString)
         val localCluster = new LocalSparkCluster(
           numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
         val masterUrls = localCluster.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index fc849d7f4372f..5641ce5452376 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -75,11 +75,12 @@ class LocalSparkCluster(
   def stop(): Unit = {
     logInfo("Shutting down local Spark cluster.")
     // Stop the workers before the master so they don't get upset that it disconnected
-    workerRpcEnvs.foreach(_.shutdown())
-    masterRpcEnvs.foreach(_.shutdown())
-    workerRpcEnvs.foreach(_.awaitTermination())
-    masterRpcEnvs.foreach(_.awaitTermination())
-    masterRpcEnvs.clear()
-    workerRpcEnvs.clear()
+    Seq(workerRpcEnvs, masterRpcEnvs).foreach { rpcEnvArr =>
+      rpcEnvArr.foreach { rpcEnv => Utils.tryLog {
+        rpcEnv.shutdown()
+        rpcEnv.awaitTermination()
+      }}
+      rpcEnvArr.clear()
+    }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index eedf5e969e291..85447e16816ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -141,9 +141,15 @@ private[spark] class StandaloneAppClient(
      * Send a message to the current master. If we have not yet registered successfully with any
      * master, the message will be dropped.
      */
-    private def sendToMaster(message: Any): Unit = {
+    private def sendToMaster(message: Any, sync: Boolean): Unit = {
       master match {
-        case Some(masterRef) => masterRef.send(message)
+        case Some(masterRef) =>
+          if (sync) {
+            masterRef.askSync[Boolean](message)
+            ()
+          } else {
+            masterRef.send(message)
+          }
         case None => logWarning(s"Drop $message because has not yet connected to master")
       }
     }
@@ -198,7 +204,8 @@ private[spark] class StandaloneAppClient(
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case StopAppClient =>
         markDead("Application has been stopped.")
-        sendToMaster(UnregisterApplication(appId.get))
+        sendToMaster(UnregisterApplication(appId.get),
+          conf.getBoolean("syncUnregisterApplicationInLocalCluster", false))
         context.reply(true)
         stop()
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 71df5dfa423a9..f2ded2b969d2e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -337,7 +337,9 @@ private[deploy] class Master(
           }
           schedule()
         case None =>
-          logWarning(s"Got status update for unknown executor $appId/$execId")
+          if (completedApps.find(_.id == appId).map(_.executors.contains(execId)).isEmpty) {
+            logWarning(s"Got status update for unknown executor $appId/$execId")
+          }
       }
 
     case DriverStateChanged(driverId, state, exception) =>
@@ -433,8 +435,7 @@ private[deploy] class Master(
       }
 
     case UnregisterApplication(applicationId) =>
-      logInfo(s"Received unregister request from application $applicationId")
-      idToApp.get(applicationId).foreach(finishApplication)
+      unregisterApplication(applicationId)
 
     case CheckForWorkerTimeOut =>
       timeOutDeadWorkers()
@@ -442,6 +443,10 @@ private[deploy] class Master(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case UnregisterApplication(applicationId) =>
+      unregisterApplication(applicationId)
+      context.reply(true)
+
     case RequestSubmitDriver(description) =>
       if (state != RecoveryState.ALIVE) {
         val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
@@ -527,6 +532,11 @@ private[deploy] class Master(
       context.reply(handleKillExecutors(appId, formattedExecutorIds))
   }
 
+  private def unregisterApplication(applicationId: String) = {
+    logInfo(s"Received unregister request from application $applicationId")
+    idToApp.get(applicationId).foreach(finishApplication)
+  }
+
   override def onDisconnected(address: RpcAddress): Unit = {
     // The disconnected client could've been either a worker or an app; remove whichever it was
     logInfo(s"$address got disassociated, removing it.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 67638a5f9593c..119b8429d676f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -557,7 +557,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
-  override def reviveOffers(): Unit = {
+  override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
     driverEndpoint.send(ReviveOffers)
   }
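
For context, a minimal self-contained sketch of the send-vs-askSync choice the patched sendToMaster introduces. FakeMasterRef, Unregister, and AppClientSketch below are hypothetical stand-ins for illustration only, not Spark's RpcEndpointRef or deploy messages; the point is that when syncUnregisterApplicationInLocalCluster is set (as SparkContext now does for local-cluster mode), UnregisterApplication is delivered via a blocking ask, so the master finishes the application before LocalSparkCluster.stop() tears down the RpcEnvs.

// Illustrative sketch only; these types are hypothetical stand-ins, not Spark classes.
case class Unregister(appId: String)

trait FakeMasterRef {
  def send(message: Any): Unit      // fire-and-forget, analogous to RpcEndpointRef.send
  def askSync[T](message: Any): T   // blocking request/reply, analogous to RpcEndpointRef.askSync
}

class AppClientSketch(master: Option[FakeMasterRef], syncUnregister: Boolean) {
  // Mirrors the patched sendToMaster(message, sync): block on askSync when sync is requested,
  // otherwise keep the original fire-and-forget send.
  private def sendToMaster(message: Any, sync: Boolean): Unit = master match {
    case Some(ref) =>
      if (sync) { ref.askSync[Boolean](message); () } else ref.send(message)
    case None =>
      println(s"Drop $message because has not yet connected to master")
  }

  // With syncUnregister = true, this only returns after the master acknowledges the request.
  def stop(appId: String): Unit = sendToMaster(Unregister(appId), sync = syncUnregister)
}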