diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 17ceb5f1887c..eab337ec6fa0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -639,7 +639,7 @@ class SparkContext(config: SparkConf) extends Logging {
     // Post init
     _taskScheduler.postStartHook()
     if (isLocal) {
-      _env.metricsSystem.registerSource(Executor.executorSourceLocalModeOnly)
+      _env.metricsSystem.registerSource(env.executorSourceLocalModeOnly)
     }
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 9fc60ac3990f..7dfd7e407bee 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.executor.ExecutorSource
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
@@ -74,6 +75,9 @@ class SparkEnv (
   @volatile private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
+  // Used to store executorSource, intended for local mode only
+  private[spark] var executorSourceLocalModeOnly: ExecutorSource = null
+
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata =
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e7f1b8f3cf17..98c4653ec5e2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -140,10 +140,10 @@ private[spark] class Executor(
     executorMetricsSource.foreach(_.register(env.metricsSystem))
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
   } else {
-    // This enable the registration of the executor source in local mode.
+    // This enables the registration of the executor source in local mode.
     // The actual registration happens in SparkContext,
     // it cannot be done here as the appId is not available yet
-    Executor.executorSourceLocalModeOnly = executorSource
+    env.executorSourceLocalModeOnly = executorSource
   }
 
   // Whether to load classes in user jars before those in Spark jars
@@ -1019,9 +1019,6 @@ private[spark] object Executor {
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
 
-  // Used to store executorSource, for local mode only
-  var executorSourceLocalModeOnly: ExecutorSource = null
-
   /**
    * Whether a `Throwable` thrown from a task is a fatal error. We will use this to decide whether
    * to kill the executor.
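
For context, here is a minimal, self-contained sketch of the deferred-registration pattern this patch implements: in local mode the executor cannot register its metrics source itself because the appId is not available yet, so it stashes the source on the shared environment and the driver registers it later. The names below (Env, MetricsSystem, Sketch) are simplified stand-ins invented for illustration, not Spark's actual classes.

// A metrics source identified by name; stands in for Spark's Source trait.
trait MetricsSource { def sourceName: String }

class ExecutorSource extends MetricsSource {
  override val sourceName: String = "executor"
}

// Plays the role of SparkEnv: in local mode the executor runs in the same
// JVM as the driver, so it can hand its source over through this shared slot.
class Env {
  var executorSourceLocalModeOnly: ExecutorSource = null
}

// Plays the role of MetricsSystem: registration needs a concrete appId.
class MetricsSystem(appId: String) {
  def registerSource(source: MetricsSource): Unit =
    println(s"[$appId] registered ${source.sourceName}")
}

object Sketch extends App {
  val env = new Env

  // Executor construction time: no appId yet, so only stash the source,
  // mirroring `env.executorSourceLocalModeOnly = executorSource`.
  env.executorSourceLocalModeOnly = new ExecutorSource

  // SparkContext post-init: the appId now exists, so the stashed source can
  // be registered, mirroring
  // `_env.metricsSystem.registerSource(env.executorSourceLocalModeOnly)`.
  val metricsSystem = new MetricsSystem(appId = "local-1234")
  if (env.executorSourceLocalModeOnly != null) {
    metricsSystem.registerSource(env.executorSourceLocalModeOnly)
  }
}

A side effect of the move itself: the slot now lives on the per-SparkEnv instance rather than on the JVM-wide `object Executor`, so the mutable state is scoped to one environment instead of being shared by everything in the process.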