From 2c3540fd9b7a59a4a2b749dfaa19a6eaa3bb3776 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 20 Apr 2015 16:59:09 +0800 Subject: [PATCH 1/4] Add a metric source for ExecutorAllocationManager --- .../spark/ExecutorAllocationManager.scala | 23 +++++++++++++++++++ .../scala/org/apache/spark/SparkContext.scala | 3 +++ 2 files changed, 26 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 228d9149df2a2..8967fd8d45d11 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable +import com.codahale.metrics.{Gauge, MetricRegistry} + import org.apache.spark.scheduler._ +import org.apache.spark.metrics.source.Source import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils} /** @@ -144,6 +147,26 @@ private[spark] class ExecutorAllocationManager( private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation") + // Metric source for ExecutorAllocationManager to expose the its internal executor allocation + // status to MetricsSystem. 
+ private[spark] val executorAllocationManagerSource = new Source { + val sourceName = "ExecutorAllocationManager" + val metricRegistry = new MetricRegistry() + + private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = { + metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] { + override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) } + }) + } + + registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0) + registerGauge("numberExecutorsPending", numExecutorsPending, 0) + registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0) + registerGauge("numberAllExecutors", executorIds.size, 0) + registerGauge("numberTargetExecutors", targetNumExecutors(), 0) + registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0) + } + /** * Verify that the settings specified through the config are valid. * If not, throw an appropriate exception. diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4ef90546a2452..3cc153b3e0e0e 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _taskScheduler.postStartHook() _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler)) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) + _executorAllocationManager.foreach { e => + _env.metricsSystem.registerSource(e.executorAllocationManagerSource) + } // Make sure the context is stopped if the user forgets about it. This avoids leaving // unfinished event logs around after the JVM exits cleanly. 
It doesn't help if the JVM From d237ba587b01d6fe297ba6702369e17cc58e2f78 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 29 Apr 2015 10:08:20 +0800 Subject: [PATCH 2/4] Address the comments --- .../spark/ExecutorAllocationManager.scala | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 8967fd8d45d11..1c07a2cb06500 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -147,25 +147,8 @@ private[spark] class ExecutorAllocationManager( private val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation") - // Metric source for ExecutorAllocationManager to expose the its internal executor allocation - // status to MetricsSystem. - private[spark] val executorAllocationManagerSource = new Source { - val sourceName = "ExecutorAllocationManager" - val metricRegistry = new MetricRegistry() - - private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = { - metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] { - override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) } - }) - } - - registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0) - registerGauge("numberExecutorsPending", numExecutorsPending, 0) - registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0) - registerGauge("numberAllExecutors", executorIds.size, 0) - registerGauge("numberTargetExecutors", targetNumExecutors(), 0) - registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0) - } + // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem. 
+ private[spark] val executorAllocationManagerSource = new ExecutorAllocationManagerSource /** * Verify that the settings specified through the config are valid. @@ -602,6 +585,26 @@ private[spark] class ExecutorAllocationManager( } } + // Metric source for ExecutorAllocationManager to expose the its internal executor allocation + // status to MetricsSystem. + // Note: these metrics may not be stable across Spark version. + private[spark] class ExecutorAllocationManagerSource extends Source { + val sourceName = "ExecutorAllocationManager" + val metricRegistry = new MetricRegistry() + + private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = { + metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] { + override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) } + }) + } + + registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0) + registerGauge("numberExecutorsPending", numExecutorsPending, 0) + registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0) + registerGauge("numberAllExecutors", executorIds.size, 0) + registerGauge("numberTargetExecutors", targetNumExecutors(), 0) + registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0) + } } private object ExecutorAllocationManager { From c501a2ce2e64bfd2ef21006f1c4bbc991cc8ea3f Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 30 Apr 2015 09:22:05 +0800 Subject: [PATCH 3/4] Address the comments --- .../org/apache/spark/ExecutorAllocationManager.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 1c07a2cb06500..b6749367f311c 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -148,7 +148,7 @@ private[spark] class 
ExecutorAllocationManager( ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation") // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem. - private[spark] val executorAllocationManagerSource = new ExecutorAllocationManagerSource + val executorAllocationManagerSource = new ExecutorAllocationManagerSource /** * Verify that the settings specified through the config are valid. @@ -585,9 +585,11 @@ private[spark] class ExecutorAllocationManager( } } - // Metric source for ExecutorAllocationManager to expose the its internal executor allocation - // status to MetricsSystem. - // Note: these metrics may not be stable across Spark version. + /** + * Metric source for ExecutorAllocationManager to expose its internal executor allocation + * status to MetricsSystem. + * Note: These metrics may not be stable across Spark version. + */ private[spark] class ExecutorAllocationManagerSource extends Source { val sourceName = "ExecutorAllocationManager" val metricRegistry = new MetricRegistry() From 104d155200cd759be2619e1a1e46cf43c76cfe45 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Sat, 2 May 2015 13:26:55 +0800 Subject: [PATCH 4/4] rebase and address the comments --- .../scala/org/apache/spark/ExecutorAllocationManager.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b6749367f311c..66bda68088502 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -588,7 +588,9 @@ private[spark] class ExecutorAllocationManager( /** * Metric source for ExecutorAllocationManager to expose its internal executor allocation * status to MetricsSystem. - * Note: These metrics may not be stable across Spark version. 
+ * Note: These metrics rely heavily on the internal implementation of + * ExecutorAllocationManager; the metrics or their values may change whenever that + * implementation changes, so they are not stable across Spark versions. */ private[spark] class ExecutorAllocationManagerSource extends Source { val sourceName = "ExecutorAllocationManager" @@ -601,10 +603,9 @@ private[spark] class ExecutorAllocationManager( } registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0) - registerGauge("numberExecutorsPending", numExecutorsPending, 0) registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0) registerGauge("numberAllExecutors", executorIds.size, 0) - registerGauge("numberTargetExecutors", targetNumExecutors(), 0) + registerGauge("numberTargetExecutors", numExecutorsTarget, 0) registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0) } }