diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index fc849d7f4372..8d183026515b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -43,6 +43,7 @@ class LocalSparkCluster( private val localHostname = Utils.localHostName() private val masterRpcEnvs = ArrayBuffer[RpcEnv]() private val workerRpcEnvs = ArrayBuffer[RpcEnv]() + private val shutdownWaitTimeout = conf.get(config.LOCAL_CLUSTER_SHUTDOWN_WAIT_TIMEOUT) // exposed for testing var masterWebUIPort = -1 @@ -75,6 +76,7 @@ class LocalSparkCluster( def stop(): Unit = { logInfo("Shutting down local Spark cluster.") // Stop the workers before the master so they don't get upset that it disconnected + Thread.sleep(shutdownWaitTimeout) workerRpcEnvs.foreach(_.shutdown()) masterRpcEnvs.foreach(_.shutdown()) workerRpcEnvs.foreach(_.awaitTermination()) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ee437c696b47..0d27b0438e74 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1873,4 +1873,12 @@ package object config { .version("3.1.0") .booleanConf .createWithDefault(false) + + private[spark] val LOCAL_CLUSTER_SHUTDOWN_WAIT_TIMEOUT = + ConfigBuilder("spark.localCluster.shutdown.wait.timeoutMs") + .internal() + .doc("Timeout in milliseconds for waiting for local cluster to shutdown.") + .version("3.1.0") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(1000) }