Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ class SparkContext(config: SparkConf) extends Logging {
// Post init
_taskScheduler.postStartHook()
if (isLocal) {
_env.metricsSystem.registerSource(Executor.executorSourceLocalModeOnly)
_env.metricsSystem.registerSource(env.executorSourceLocalModeOnly)
}
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.executor.ExecutorSource
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
Expand Down Expand Up @@ -74,6 +75,9 @@ class SparkEnv (
@volatile private[spark] var isStopped = false
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()

// Used to store executorSource, intended for local mode only
private[spark] var executorSourceLocalModeOnly: ExecutorSource = null

// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata =
Expand Down
7 changes: 2 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ private[spark] class Executor(
executorMetricsSource.foreach(_.register(env.metricsSystem))
env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
} else {
// This enable the registration of the executor source in local mode.
// This enables the registration of the executor source in local mode.
// The actual registration happens in SparkContext,
// it cannot be done here as the appId is not available yet
Executor.executorSourceLocalModeOnly = executorSource
env.executorSourceLocalModeOnly = executorSource
}

// Whether to load classes in user jars before those in Spark jars
Expand Down Expand Up @@ -1019,9 +1019,6 @@ private[spark] object Executor {
// used instead.
val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]

// Used to store executorSource, for local mode only
var executorSourceLocalModeOnly: ExecutorSource = null

/**
* Whether a `Throwable` thrown from a task is a fatal error. We will use this to decide whether
* to kill the executor.
Expand Down