Skip to content

Commit

Permalink
add Utils.isLocalSparkCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
Ngone51 committed Jan 14, 2025
1 parent 0ae466a commit 55eb730
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ final class ShuffleBlockFetcherIterator(
private[this] val isTestingExternalShuffleInLocalClusterMode = {
Utils.isTesting &&
blockManager.externalShuffleServiceEnabled &&
blockManager.conf.getOption("spark.master").exists(_.startsWith("local-cluster"))
Utils.isLocalSparkCluster(blockManager.conf)
}

initialize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ private[spark] object StorageUtils extends Logging {
* set through the Hadoop configuration as the server is launched in the Yarn NM.
*/
def externalShuffleServicePort(conf: SparkConf): Int = {
if (Utils.isTesting && sys.env.contains(ExternalShuffleService.TESTING_ESS_PORT_ENV)) {
if (Utils.isTesting && Utils.isLocalSparkCluster(conf)) {
assert(sys.env.contains(ExternalShuffleService.TESTING_ESS_PORT_ENV),
s"${ExternalShuffleService.TESTING_ESS_PORT_ENV} is not available")
val port = sys.env(ExternalShuffleService.TESTING_ESS_PORT_ENV).toInt
if (conf.contains(config.SHUFFLE_SERVICE_PORT.key)) {
logWarning(log"Setting external shuffle service port in Spark local cluster " +
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ import org.eclipse.jetty.util.MultiException
import org.slf4j.Logger

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.LogKeys._
Expand Down Expand Up @@ -2527,6 +2527,14 @@ private[spark] object Utils
master == "local" || master.startsWith("local[")
}

def isLocalSparkCluster(conf: ReadOnlySparkConf): Boolean = {
if (LocalSparkCluster.get.isDefined) {
true
} else {
conf.getOption("spark.master").exists(_.startsWith("local-cluster"))
}
}

/**
* Push based shuffle can only be enabled when below conditions are met:
* - the application is submitted to run in YARN mode
Expand Down

0 comments on commit 55eb730

Please sign in to comment.