Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2833,6 +2833,7 @@ object SparkContext extends Logging {
sc.conf.setIfMissing(SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, false)

val scheduler = new TaskSchedulerImpl(sc)
sc.conf.set("syncUnregisterApplicationInLocalCluster", true.toString)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
val masterUrls = localCluster.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,12 @@ class LocalSparkCluster(
// Shuts down the local cluster's worker and master RpcEnvs and empties both buffers.
// NOTE(review): this span comes from a diff view with no +/- markers; the six statements
// after the ordering comment and the `Seq(...)` loop below look like the old and the new
// version of the same shutdown logic — confirm which one the file actually keeps.
def stop(): Unit = {
logInfo("Shutting down local Spark cluster.")
// Stop the workers before the master so they don't get upset that it disconnected
// Variant 1: signal shutdown on every env first, then wait for each to terminate, then clear.
workerRpcEnvs.foreach(_.shutdown())
masterRpcEnvs.foreach(_.shutdown())
workerRpcEnvs.foreach(_.awaitTermination())
masterRpcEnvs.foreach(_.awaitTermination())
masterRpcEnvs.clear()
workerRpcEnvs.clear()
// Variant 2: workers first, then masters. Each env is shut down and awaited one at a time
// inside Utils.tryLog, and each buffer is cleared once all of its envs have been processed.
// Presumably Utils.tryLog logs a failure instead of propagating it, so one failing env
// does not stop the remaining ones from being shut down — TODO confirm against Utils.
Seq(workerRpcEnvs, masterRpcEnvs).foreach { rpcEnvArr =>
rpcEnvArr.foreach { rpcEnv => Utils.tryLog {
rpcEnv.shutdown()
rpcEnv.awaitTermination()
}}
rpcEnvArr.clear()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,15 @@ private[spark] class StandaloneAppClient(
* Send a message to the current master. If we have not yet registered successfully with any
* master, the message will be dropped.
*/
// NOTE(review): two signatures and two `case Some` arms appear here because this span is a
// diff view without +/- markers; the one-argument form looks like the removed version.
private def sendToMaster(message: Any): Unit = {
// `sync` selects between `askSync` (true) and the one-way `send` (false).
private def sendToMaster(message: Any, sync: Boolean): Unit = {
master match {
case Some(masterRef) => masterRef.send(message)
case Some(masterRef) =>
if (sync) {
// The Boolean returned by askSync is discarded; the explicit `()` keeps this branch Unit.
// Presumably askSync blocks until the master replies — confirm against RpcEndpointRef.
masterRef.askSync[Boolean](message)
()
} else {
masterRef.send(message)
}
// No master is known yet: the message is dropped and only a warning is logged.
case None => logWarning(s"Drop $message because has not yet connected to master")
}
}
Expand Down Expand Up @@ -198,7 +204,8 @@ private[spark] class StandaloneAppClient(
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case StopAppClient =>
markDead("Application has been stopped.")
sendToMaster(UnregisterApplication(appId.get))
sendToMaster(UnregisterApplication(appId.get),
conf.getBoolean("syncUnregisterApplicationInLocalCluster", false))
context.reply(true)
stop()

Expand Down
16 changes: 13 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ private[deploy] class Master(
}
schedule()
case None =>
logWarning(s"Got status update for unknown executor $appId/$execId")
// Suppress the warning only when a completed application with this id actually lists the
// executor. The previous form, `find(...).map(_.executors.contains(execId)).isEmpty`, was
// true only when no completed app had this id: `Some(false).isEmpty` is false, so the
// `contains` result was never consulted and the warning was hidden for every executor id
// of a completed app, including ones that app never owned.
if (!completedApps.find(_.id == appId).exists(_.executors.contains(execId))) {
  logWarning(s"Got status update for unknown executor $appId/$execId")
}
}

case DriverStateChanged(driverId, state, exception) =>
Expand Down Expand Up @@ -433,15 +435,18 @@ private[deploy] class Master(
}

case UnregisterApplication(applicationId) =>
logInfo(s"Received unregister request from application $applicationId")
idToApp.get(applicationId).foreach(finishApplication)
unregisterApplication(applicationId)

case CheckForWorkerTimeOut =>
timeOutDeadWorkers()

}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case UnregisterApplication(applicationId) =>
unregisterApplication(applicationId)
context.reply(true)

case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
Expand Down Expand Up @@ -527,6 +532,11 @@ private[deploy] class Master(
context.reply(handleKillExecutors(appId, formattedExecutorIds))
}

/**
 * Logs the unregister request and, when `idToApp` holds an entry for `applicationId`,
 * passes that entry to `finishApplication`. Ids without an entry are only logged.
 */
private def unregisterApplication(applicationId: String) = {
  logInfo(s"Received unregister request from application $applicationId")
  for (registeredApp <- idToApp.get(applicationId)) {
    finishApplication(registeredApp)
  }
}

override def onDisconnected(address: RpcAddress): Unit = {
// The disconnected client could've been either a worker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
}

// Sends a ReviveOffers message to the driver endpoint.
// NOTE(review): two signatures appear because this span is a diff view without +/- markers;
// the second wraps the send in Utils.tryLogNonFatalError. Presumably that logs non-fatal
// exceptions from the send instead of propagating them to the caller — confirm against Utils.
override def reviveOffers(): Unit = {
override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
driverEndpoint.send(ReviveOffers)
}

Expand Down