diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 85a24acb97c0..7774ce6890a8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.executor.{ExecutorMetrics, ExecutorMetricsSource}
+import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource}
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -623,6 +623,9 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Post init
     _taskScheduler.postStartHook()
+    if (isLocal) {
+      _env.metricsSystem.registerSource(Executor.executorSourceLocalModeOnly)
+    }
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
     _env.metricsSystem.registerSource(new JVMCPUSource())
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 54b50e6d2fa4..4035daf5449f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -135,6 +135,11 @@ private[spark] class Executor(
     env.metricsSystem.registerSource(new JVMCPUSource())
     executorMetricsSource.foreach(_.register(env.metricsSystem))
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
+  } else {
+    // This enables the registration of the executor source in local mode.
+    // The actual registration happens in SparkContext; it cannot be done
+    // here because the appId is not available yet.
+    Executor.executorSourceLocalModeOnly = executorSource
   }
 
   // Whether to load classes in user jars before those in Spark jars
@@ -979,4 +984,7 @@ private[spark] object Executor {
   // task is fully deserialized. When possible, the TaskContext.getLocalProperty call should be
   // used instead.
   val taskDeserializationProps: ThreadLocal[Properties] = new ThreadLocal[Properties]
+
+  // Used to store the ExecutorSource instance; set in local mode only
+  var executorSourceLocalModeOnly: ExecutorSource = null
 }
diff --git a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
index 8f5ab7419d4f..7da1403ecd4b 100644
--- a/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/source/SourceConfigSuite.scala
@@ -80,4 +80,16 @@ class SourceConfigSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("SPARK-31711: Test executor source registration in local mode") {
+    val conf = new SparkConf()
+    val sc = new SparkContext("local", "test", conf)
+    try {
+      val metricsSystem = sc.env.metricsSystem
+
+      // Executor source should be registered
+      assert(metricsSystem.getSourcesByName("executor").nonEmpty)
+    } finally {
+      sc.stop()
+    }
+  }
 }
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 97948f6fac4d..9e9eda0e109c 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -1153,6 +1153,11 @@ This is the component with the largest amount of instrumented metrics
 - namespace=JVMCPU
   - jvmCpuTime
 
+- namespace=executor
+  - **note:** These metrics are available in the driver in local mode only.
+  - A full list of the metrics available in this namespace can be found in
+    the corresponding entry for the Executor component instance.
+
 - namespace=ExecutorMetrics
   - **note:** these metrics are conditional to a configuration parameter:
     `spark.metrics.executorMetricsSource.enabled` (default is true)
@@ -1165,8 +1170,7 @@ This is the component with the largest amount of instrumented metrics
   custom plugins into Spark.
 
 ### Component instance = Executor
-These metrics are exposed by Spark executors. Note, currently they are not available
-when running in local mode.
+These metrics are exposed by Spark executors.
 
 - namespace=executor (metrics are of type counter or gauge)
   - bytesRead.count
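
Usage illustration (not part of the patch): with this change, the "executor" metrics namespace becomes visible on the driver in local mode, so it can be exported through any configured metrics sink. Below is a minimal, self-contained sketch that wires up Spark's built-in ConsoleSink via spark.metrics.conf properties; the object name, job, and reporting period are invented for the demo.

import org.apache.spark.{SparkConf, SparkContext}

object LocalExecutorMetricsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("executor-metrics-demo")
      // Report all registered sources (now including the "executor"
      // namespace in local mode) to stdout every 5 seconds.
      .set("spark.metrics.conf.*.sink.console.class",
        "org.apache.spark.metrics.sink.ConsoleSink")
      .set("spark.metrics.conf.*.sink.console.period", "5")
      .set("spark.metrics.conf.*.sink.console.unit", "seconds")

    val sc = new SparkContext(conf)
    try {
      // Run a small job so the executor counters have activity to report.
      sc.parallelize(1 to 100000, 8).map(_ * 2).count()
      Thread.sleep(6000) // give the sink time to fire at least once
    } finally {
      sc.stop()
    }
  }
}

Before this patch the console output in local mode would contain no executor.* entries; afterwards, counters and gauges such as executor.bytesRead.count appear alongside the other driver sources.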