From a97563e210b72c8800b9eb4adb2697e8e3ac7b9a Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Mon, 18 Jul 2016 22:38:31 -0700 Subject: [PATCH 1/6] [SPARK-5847][CORE] Allow for configuring MetricsSystem's use of app ID to namespace all metrics Adding a new property to SparkConf called spark.metrics.namespace that allows users to set a custom namespace for executor and driver metrics in the metrics systems. By default, the root namespace used for driver or executor metrics is the value of `spark.app.id`. However, often times, users want to be able to track the metrics across apps for driver and executor metrics, which is hard to do with application ID (i.e. `spark.app.id`) since it changes with every invocation of the app. For such use cases, users can set the `spark.metrics.namespace` property to another spark configuration key like `spark.app.name` which is then used to populate the root namespace of the metrics system (with the app name in our example). `spark.metrics.namespace` property can be set to any arbitrary spark property key, whose value would be used to set the root namespace of the metrics system. Non driver and executor metrics are never prefixed with `spark.app.id`, nor does the `spark.metrics.namespace` property have any such affect on such metrics. --- .../apache/spark/metrics/MetricsConfig.scala | 59 ++++++++++++-- .../apache/spark/metrics/MetricsSystem.scala | 38 +++++---- .../spark/metrics/MetricsConfigSuite.scala | 18 ++++- .../spark/metrics/MetricsSystemSuite.scala | 81 +++++++++++++++++++ docs/monitoring.md | 12 +++ 5 files changed, 182 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 979782ea40fd..327d4c4897e9 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -33,9 +33,16 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { private val DEFAULT_PREFIX = "*" private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties" + // This is intetionally made to not fall the prefix (spark.metrics.conf) because it's not + // intended to be a _real_ metrics property, for example, its first part before the dot + // doesn't represent the instance (master, worker, etc.). Instead, it's used to configure the + // metrics systems. Where accessed, this should be namespace property should be directly accessed + // from SparkConf using its full name, represented here. 
+ private val NAMESPACE_CONFIG_PROPERTY = "spark.metrics.namespace" private[metrics] val properties = new Properties() - private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null + private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null + private[metrics] var metricsNamespace: String = null private def setDefaultProperties(prop: Properties) { prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet") @@ -44,6 +51,10 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json") } + /** + * Load properties from various places, based on precedence + * If the same property is set again latter on in the method, it overwrites the previous value + */ def initialize() { // Add default properties in case there's no properties file setDefaultProperties(properties) @@ -58,16 +69,48 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { case _ => } - propertyCategories = subProperties(properties, INSTANCE_REGEX) - if (propertyCategories.contains(DEFAULT_PREFIX)) { - val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala - for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX); - (k, v) <- defaultProperty if (prop.get(k) == null)) { + // Now, let's populate a list of sub-properties per instance, instance being the prefix that + // appears before the first dot in the property name. + // Add to the sub-properties per instance, the default properties (those with prefix "*"), as + // as they don't have a more specific sub-property already defined. + // + // For example, if properties has ("*.class"->"default_class", "*.path"->"default_path, + // "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be + // ("driver"->Map("path"->"driver_path", "class"->"default_class") + // Note how class got added to based on the default property, but path remained the same + // since "driver.path" already existed and took precedence over "*.path" + // + perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX) + if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) { + val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala + for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX); + (k, v) <- defaultSubProperties if (prop.get(k) == null)) { prop.put(k, v) } } + metricsNamespace = conf.getOption(NAMESPACE_CONFIG_PROPERTY).getOrElse("spark.app.id") } + /** + * Take a simple set of properties and a regex that the property names have to conform to. + * And, return a map of the first order prefix (before the first dot) to the subproperties under + * that prefix. + * + * For example, if the properties sent were Properties("*.sink.servlet.class"->"class1", + * "*.sink.servlet.path"->"path1"), the returned map would be + * Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1")) + * Note in the subProperties (value of the returned Map), only the suffixes are used as property + * keys. + * If, in the passed properties, there is only one property with a given prefix, it is still + * "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1" + * the returned Map would contain one key-value pair + * Map("*" -> Properties("sink.servlet.class" -> "class1")) + * Any passed in properties, not complying with the regex are ignored. 
+ * + * @param prop the flat list of properties to "unflatten" based on prefixes + * @param regex the regex that the prefix has to comply with + * @return an unflatted map, mapping prefix with sub-properties under that prefix + */ def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = { val subProperties = new mutable.HashMap[String, Properties] prop.asScala.foreach { kv => @@ -80,9 +123,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { } def getInstance(inst: String): Properties = { - propertyCategories.get(inst) match { + perInstanceSubProperties.get(inst) match { case Some(s) => s - case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties) + case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties) } } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 9b16c116ae5a..a071b311db96 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -32,24 +32,24 @@ import org.apache.spark.metrics.source.{Source, StaticSources} import org.apache.spark.util.Utils /** - * Spark Metrics System, created by specific "instance", combined by source, - * sink, periodically poll source metrics data to sink destinations. + * Spark Metrics System, created by a specific "instance", combined by source, + * sink, periodically polls source metrics data to sink destinations. * - * "instance" specify "who" (the role) use metrics system. In spark there are several roles - * like master, worker, executor, client driver, these roles will create metrics system - * for monitoring. So instance represents these roles. Currently in Spark, several instances + * "instance" specifies "who" (the role) uses the metrics system. In Spark, there are several roles + * like master, worker, executor, client driver. These roles will create metrics system + * for monitoring. So, "instance" represents these roles. Currently in Spark, several instances * have already implemented: master, worker, executor, driver, applications. * - * "source" specify "where" (source) to collect metrics data. In metrics system, there exists + * "source" specifies "where" (source) to collect metrics data from. In metrics system, there exists * two kinds of source: * 1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect * Spark component's internal state, these sources are related to instance and will be - * added after specific metrics system is created. + * added after a specific metrics system is created. * 2. Common source, like JvmSource, which will collect low level state, is configured by * configuration and loaded through reflection. * - * "sink" specify "where" (destination) to output metrics data to. Several sinks can be - * coexisted and flush metrics to all these sinks. + * "sink" specifies "where" (destination) to output metrics data to. Several sinks can + * coexist and metrics can be flushed to all these sinks. * * Metrics configuration format is like below: * [instance].[sink|source].[name].[options] = xxxx @@ -62,9 +62,9 @@ import org.apache.spark.util.Utils * [sink|source] means this property belongs to source or sink. This field can only be * source or sink. * - * [name] specify the name of sink or source, it is custom defined. + * [name] specify the name of sink or source, if it is custom defined. 
* - * [options] is the specific property of this source or sink. + * [options] represent the specific property of this source or sink. */ private[spark] class MetricsSystem private ( val instance: String, @@ -125,19 +125,23 @@ private[spark] class MetricsSystem private ( * application, executor/driver and metric source. */ private[spark] def buildRegistryName(source: Source): String = { - val appId = conf.getOption("spark.app.id") + val metricsNamespace = conf.getOption(metricsConfig.metricsNamespace) val executorId = conf.getOption("spark.executor.id") val defaultName = MetricRegistry.name(source.sourceName) if (instance == "driver" || instance == "executor") { - if (appId.isDefined && executorId.isDefined) { - MetricRegistry.name(appId.get, executorId.get, source.sourceName) + if (metricsNamespace.isDefined && executorId.isDefined) { + MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName) } else { // Only Driver and Executor set spark.app.id and spark.executor.id. // Other instance types, e.g. Master and Worker, are not related to a specific application. - val warningMsg = s"Using default name $defaultName for source because %s is not set." - if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) } - if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) } + val warningMsgFormat = s"Using default name $defaultName for source because %s is not set." + if (metricsNamespace.isEmpty) { + logWarning(warningMsgFormat.format(metricsConfig.metricsNamespace)) + } + if (executorId.isEmpty) { + logWarning(warningMsgFormat.format("spark.executor.id")) + } defaultName } } else { defaultName } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala index b24f5d732f29..85fb3d2cc8f4 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala @@ -37,6 +37,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(conf.properties.size() === 4) assert(conf.properties.getProperty("test-for-dummy") === null) + assert(conf.metricsNamespace === "spark.app.id") val property = conf.getInstance("random") assert(property.size() === 2) @@ -70,6 +71,8 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") + + assert(conf.metricsNamespace === "spark.app.id") } test("MetricsConfig with properties set from a Spark configuration") { @@ -101,6 +104,8 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") + + assert(conf.metricsNamespace === "spark.app.id") } test("MetricsConfig with properties set from a file and a Spark configuration") { @@ -131,6 +136,8 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") + + assert(conf.metricsNamespace === "spark.app.id") } test("MetricsConfig with subProperties") { @@ -139,7 +146,7 @@ class MetricsConfigSuite extends SparkFunSuite with 
BeforeAndAfter { val conf = new MetricsConfig(sparkConf) conf.initialize() - val propCategories = conf.propertyCategories + val propCategories = conf.perInstanceSubProperties assert(propCategories.size === 3) val masterProp = conf.getInstance("master") @@ -159,6 +166,15 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(servletProps.size() === 2) } + test("MetricsConfig with alternate namespace set") { + val sparkConf = new SparkConf(loadDefaults = false) + sparkConf.set("spark.metrics.conf", filePath) + sparkConf.set("spark.metrics.namespace", "spark.app.name") + val conf = new MetricsConfig(sparkConf) + conf.initialize() + assert(conf.metricsNamespace == "spark.app.name") + } + private def setMetricsProperty(conf: SparkConf, name: String, value: String): Unit = { conf.set(s"spark.metrics.conf.$name", value) } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 2400832f6eea..865a4f5c6dbf 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -183,4 +183,85 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricName != s"$appId.$executorId.${source.sourceName}") assert(metricName === source.sourceName) } + + test("MetricsSystem with Executor instance, with custom namespace") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + val appName = "testName" + val executorId = "1" + conf.set("spark.app.id", appId) + conf.set("spark.app.name", appName) + conf.set("spark.executor.id", executorId) + conf.set("spark.metrics.namespace", "spark.app.name") + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === s"$appName.$executorId.${source.sourceName}") + } + + test("MetricsSystem with Executor instance and custom namespace which is not set") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val executorId = "1" + conf.set("spark.executor.id", executorId) + conf.set("spark.metrics.namespace", "spark.app.name") + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === source.sourceName) + } + + test("MetricsSystem with Executor instance and custom namespace and spark.executor.id is not " + + "set") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + conf.set("spark.app.name", appId) + conf.set("spark.metrics.namespace", "spark.app.name") + + val instanceName = "executor" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === source.sourceName) + } + + test("MetricsSystem with instance which is neither Driver nor Executor and custom namespace") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = 
"testId" + val appName = "testName" + val executorId = "dummyExecutorId" + conf.set("spark.app.id", appId) + conf.set("spark.app.name", appName) + conf.set("spark.metrics.namespace", "spark.app.name") + conf.set("spark.executor.id", executorId) + + val instanceName = "testInstance" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + + // Even if spark.app.id and spark.executor.id are set, they are not used for the metric name. + assert(metricName != s"$appId.$executorId.${source.sourceName}") + assert(metricName === source.sourceName) + } + } diff --git a/docs/monitoring.md b/docs/monitoring.md index c8694762ffd7..75f284e93bb5 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -346,6 +346,18 @@ This allows users to report Spark metrics to a variety of sinks including HTTP, files. The metrics system is configured via a configuration file that Spark expects to be present at `$SPARK_HOME/conf/metrics.properties`. A custom file location can be specified via the `spark.metrics.conf` [configuration property](configuration.html#spark-properties). +Also, a custom namespace can be specified to report metrics using `spark.metrics.namespace` +configuration property. By default, the root namespace used for driver or executor metrics is +the value of `spark.app.id`. However, often times, users want to be able to track the metrics +across apps for driver and executor metrics, which is hard to do with application ID +(i.e. `spark.app.id`) since it changes with every invocation of the app. For such use cases, +users can set the `spark.metrics.namespace` property to another spark configuration key like +`spark.app.name` which is then used to populate the root namespace of the metrics system +(with the app name in our example). `spark.metrics.namespace` property can be set to any +arbitrary spark property key, whose value would be used to set the root namespace of the +metrics system. Non driver and executor metrics are never prefixed with `spark.app.id`, nor +does the `spark.metrics.namespace` property have any such affect on such metrics. + Spark's metrics are decoupled into different _instances_ corresponding to Spark components. Within each instance, you can configure a set of sinks to which metrics are reported. 
The following instances are currently supported: From 95356ca83e1b6838aa2fccaff8c1c2526fddbf56 Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Tue, 19 Jul 2016 12:44:55 -0700 Subject: [PATCH 2/6] Minor fixes based on self-reviewing --- .../org/apache/spark/metrics/MetricsConfig.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 327d4c4897e9..046561c63c5f 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -33,10 +33,10 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { private val DEFAULT_PREFIX = "*" private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties" - // This is intetionally made to not fall the prefix (spark.metrics.conf) because it's not + // This is intetionally made to not fall in the prefix (spark.metrics.conf) because it's not // intended to be a _real_ metrics property, for example, its first part before the dot // doesn't represent the instance (master, worker, etc.). Instead, it's used to configure the - // metrics systems. Where accessed, this should be namespace property should be directly accessed + // metrics systems. Where accessed, this namespace property should be directly accessed // from SparkConf using its full name, represented here. private val NAMESPACE_CONFIG_PROPERTY = "spark.metrics.namespace" @@ -71,8 +71,8 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { // Now, let's populate a list of sub-properties per instance, instance being the prefix that // appears before the first dot in the property name. - // Add to the sub-properties per instance, the default properties (those with prefix "*"), as - // as they don't have a more specific sub-property already defined. + // Add to the sub-properties per instance, the default properties (those with prefix "*"), if + // they don't have that exact same sub-property already defined. // // For example, if properties has ("*.class"->"default_class", "*.path"->"default_path, // "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be @@ -92,9 +92,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { } /** - * Take a simple set of properties and a regex that the property names have to conform to. - * And, return a map of the first order prefix (before the first dot) to the subproperties under - * that prefix. + * Take a simple set of properties and a regex that the instance names (part before the first dot) + * have to conform to. And, return a map of the first order prefix (before the first dot) to the + * sub-properties under that prefix. 
* * For example, if the properties sent were Properties("*.sink.servlet.class"->"class1", * "*.sink.servlet.path"->"path1"), the returned map would be From 6b02fe189482146a9f4b44b39de8c12745249125 Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Fri, 22 Jul 2016 18:59:51 -0700 Subject: [PATCH 3/6] Basing the work to use expansion capabilities brought in by SPARK-16272 --- .../apache/spark/internal/config/package.scala | 5 +++++ .../apache/spark/metrics/MetricsConfig.scala | 8 -------- .../apache/spark/metrics/MetricsSystem.scala | 12 ++++++++---- .../spark/metrics/MetricsConfigSuite.scala | 16 ---------------- .../spark/metrics/MetricsSystemSuite.scala | 15 ++++++++++----- docs/monitoring.md | 18 +++++++++--------- 6 files changed, 32 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ebb21e9efd38..cb75716d1027 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -108,4 +108,9 @@ package object config { ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size") .intConf .createWithDefault(10000) + + // This property sets the root namespace for metrics reporting + private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace") + .stringConf + .createOptional } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 046561c63c5f..a4056508c181 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -33,16 +33,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { private val DEFAULT_PREFIX = "*" private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties" - // This is intetionally made to not fall in the prefix (spark.metrics.conf) because it's not - // intended to be a _real_ metrics property, for example, its first part before the dot - // doesn't represent the instance (master, worker, etc.). Instead, it's used to configure the - // metrics systems. Where accessed, this namespace property should be directly accessed - // from SparkConf using its full name, represented here. 
- private val NAMESPACE_CONFIG_PROPERTY = "spark.metrics.namespace" private[metrics] val properties = new Properties() private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null - private[metrics] var metricsNamespace: String = null private def setDefaultProperties(prop: Properties) { prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet") @@ -88,7 +81,6 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { prop.put(k, v) } } - metricsNamespace = conf.getOption(NAMESPACE_CONFIG_PROPERTY).getOrElse("spark.app.id") } /** diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index a071b311db96..cd781ff017b5 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -26,6 +26,7 @@ import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry} import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.internal.config._ import org.apache.spark.internal.Logging import org.apache.spark.metrics.sink.{MetricsServlet, Sink} import org.apache.spark.metrics.source.{Source, StaticSources} @@ -125,7 +126,9 @@ private[spark] class MetricsSystem private ( * application, executor/driver and metric source. */ private[spark] def buildRegistryName(source: Source): String = { - val metricsNamespace = conf.getOption(metricsConfig.metricsNamespace) + val metricsNamespace = conf.get(METRICS_NAMESPACE).map(Some(_)) + .getOrElse(conf.getOption("spark.app.id")) + val executorId = conf.getOption("spark.executor.id") val defaultName = MetricRegistry.name(source.sourceName) @@ -135,12 +138,13 @@ private[spark] class MetricsSystem private ( } else { // Only Driver and Executor set spark.app.id and spark.executor.id. // Other instance types, e.g. Master and Worker, are not related to a specific application. - val warningMsgFormat = s"Using default name $defaultName for source because %s is not set." 
if (metricsNamespace.isEmpty) { - logWarning(warningMsgFormat.format(metricsConfig.metricsNamespace)) + logWarning(s"Using default name $defaultName for source because neither " + + s"${METRICS_NAMESPACE.key} nor spark.app.id is set.") } if (executorId.isEmpty) { - logWarning(warningMsgFormat.format("spark.executor.id")) + logWarning(s"Using default name $defaultName for source because spark.executor.id is " + + s"not set.") } defaultName } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala index 85fb3d2cc8f4..a85011b42bbc 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala @@ -37,7 +37,6 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(conf.properties.size() === 4) assert(conf.properties.getProperty("test-for-dummy") === null) - assert(conf.metricsNamespace === "spark.app.id") val property = conf.getInstance("random") assert(property.size() === 2) @@ -71,8 +70,6 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") - - assert(conf.metricsNamespace === "spark.app.id") } test("MetricsConfig with properties set from a Spark configuration") { @@ -104,8 +101,6 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") - - assert(conf.metricsNamespace === "spark.app.id") } test("MetricsConfig with properties set from a file and a Spark configuration") { @@ -136,8 +131,6 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") - - assert(conf.metricsNamespace === "spark.app.id") } test("MetricsConfig with subProperties") { @@ -166,15 +159,6 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(servletProps.size() === 2) } - test("MetricsConfig with alternate namespace set") { - val sparkConf = new SparkConf(loadDefaults = false) - sparkConf.set("spark.metrics.conf", filePath) - sparkConf.set("spark.metrics.namespace", "spark.app.name") - val conf = new MetricsConfig(sparkConf) - conf.initialize() - assert(conf.metricsNamespace == "spark.app.name") - } - private def setMetricsProperty(conf: SparkConf, name: String, value: String): Unit = { conf.set(s"spark.metrics.conf.$name", value) } diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 865a4f5c6dbf..be516edfcd1d 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.master.MasterSource +import org.apache.spark.internal.config._ import org.apache.spark.metrics.source.{Source, StaticSources} class MetricsSystemSuite extends SparkFunSuite with 
BeforeAndAfter with PrivateMethodTester{ @@ -196,7 +197,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM conf.set("spark.app.id", appId) conf.set("spark.app.name", appName) conf.set("spark.executor.id", executorId) - conf.set("spark.metrics.namespace", "spark.app.name") + conf.set(METRICS_NAMESPACE, "${spark.app.name}") val instanceName = "executor" val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) @@ -212,14 +213,18 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM } val executorId = "1" + val namespaceToResolve = "${spark.doesnotexist}" conf.set("spark.executor.id", executorId) - conf.set("spark.metrics.namespace", "spark.app.name") + conf.set(METRICS_NAMESPACE, namespaceToResolve) val instanceName = "executor" val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) val metricName = driverMetricsSystem.buildRegistryName(source) - assert(metricName === source.sourceName) + // If the user set the spark.metrics.namespace property to an expansion of another property + // (say ${spark.doesnotexist}, the unresolved name (i.e. litterally ${spark.doesnot}) + // is used as the root logger name. + assert(metricName === s"$namespaceToResolve.$executorId.${source.sourceName}") } test("MetricsSystem with Executor instance and custom namespace and spark.executor.id is not " + @@ -231,7 +236,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM val appId = "testId" conf.set("spark.app.name", appId) - conf.set("spark.metrics.namespace", "spark.app.name") + conf.set(METRICS_NAMESPACE, "${spark.app.name}") val instanceName = "executor" val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) @@ -251,7 +256,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM val executorId = "dummyExecutorId" conf.set("spark.app.id", appId) conf.set("spark.app.name", appName) - conf.set("spark.metrics.namespace", "spark.app.name") + conf.set("spark.metrics.namespace", "${spark.app.name}") conf.set("spark.executor.id", executorId) val instanceName = "testInstance" diff --git a/docs/monitoring.md b/docs/monitoring.md index 75f284e93bb5..6fdf87b4be57 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -346,17 +346,17 @@ This allows users to report Spark metrics to a variety of sinks including HTTP, files. The metrics system is configured via a configuration file that Spark expects to be present at `$SPARK_HOME/conf/metrics.properties`. A custom file location can be specified via the `spark.metrics.conf` [configuration property](configuration.html#spark-properties). -Also, a custom namespace can be specified to report metrics using `spark.metrics.namespace` -configuration property. By default, the root namespace used for driver or executor metrics is +By default, the root namespace used for driver or executor metrics is the value of `spark.app.id`. However, often times, users want to be able to track the metrics -across apps for driver and executor metrics, which is hard to do with application ID +across apps for driver and executors, which is hard to do with application ID (i.e. `spark.app.id`) since it changes with every invocation of the app. 
For such use cases, -users can set the `spark.metrics.namespace` property to another spark configuration key like -`spark.app.name` which is then used to populate the root namespace of the metrics system -(with the app name in our example). `spark.metrics.namespace` property can be set to any -arbitrary spark property key, whose value would be used to set the root namespace of the -metrics system. Non driver and executor metrics are never prefixed with `spark.app.id`, nor -does the `spark.metrics.namespace` property have any such affect on such metrics. +a custom namespace can be specified for metrics reporting using `spark.metrics.namespace` +configuration property. +If, say, users wanted to set the metrics namespace to the name of the application, they +can set the `spark.metrics.namespace` property to a value like `${spark.app.name}`. This value is +then expanded appropriately by Spark and is used as the root namespace of the metrics system. +Non driver and executor metrics are never prefixed with `spark.app.id`, nor does the +`spark.metrics.namespace` property have any such affect on such metrics. Spark's metrics are decoupled into different _instances_ corresponding to Spark components. Within each instance, you can configure a From de1d431755a4d3606cc1f354a59dc4da916f2570 Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Tue, 26 Jul 2016 16:21:27 -0700 Subject: [PATCH 4/6] Incorporating review feedback --- .../main/scala/org/apache/spark/metrics/MetricsSystem.scala | 3 +-- .../scala/org/apache/spark/metrics/MetricsSystemSuite.scala | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index cd781ff017b5..1d494500cdb5 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -126,8 +126,7 @@ private[spark] class MetricsSystem private ( * application, executor/driver and metric source. */ private[spark] def buildRegistryName(source: Source): String = { - val metricsNamespace = conf.get(METRICS_NAMESPACE).map(Some(_)) - .getOrElse(conf.getOption("spark.app.id")) + val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id")) val executorId = conf.getOption("spark.executor.id") val defaultName = MetricRegistry.name(source.sourceName) diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index be516edfcd1d..ee36d87aaf44 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -222,7 +222,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM val metricName = driverMetricsSystem.buildRegistryName(source) // If the user set the spark.metrics.namespace property to an expansion of another property - // (say ${spark.doesnotexist}, the unresolved name (i.e. litterally ${spark.doesnot}) + // (say ${spark.doesnotexist}, the unresolved name (i.e. literally ${spark.doesnotexist}) // is used as the root logger name. 
assert(metricName === s"$namespaceToResolve.$executorId.${source.sourceName}") } From b9c9a7aa2b831247ae04d655f537223a02bc8440 Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Tue, 26 Jul 2016 16:36:13 -0700 Subject: [PATCH 5/6] Shorterning test names --- .../org/apache/spark/metrics/MetricsSystemSuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index ee36d87aaf44..878e18c8affc 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -206,7 +206,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricName === s"$appName.$executorId.${source.sourceName}") } - test("MetricsSystem with Executor instance and custom namespace which is not set") { + test("MetricsSystem with Executor instance, custom namespace which is not set") { val source = new Source { override val sourceName = "dummySource" override val metricRegistry = new MetricRegistry() @@ -227,8 +227,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricName === s"$namespaceToResolve.$executorId.${source.sourceName}") } - test("MetricsSystem with Executor instance and custom namespace and spark.executor.id is not " + - "set") { + test("MetricsSystem with Executor instance, custom namespace, spark.executor.id not set") { val source = new Source { override val sourceName = "dummySource" override val metricRegistry = new MetricRegistry() @@ -245,7 +244,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricName === source.sourceName) } - test("MetricsSystem with instance which is neither Driver nor Executor and custom namespace") { + test("MetricsSystem with non-driver, non-executor instance with custom namespace") { val source = new Source { override val sourceName = "dummySource" override val metricRegistry = new MetricRegistry() From 8923c58d324b8083ffb423d165f4707ec4395db2 Mon Sep 17 00:00:00 2001 From: Mark Grover Date: Tue, 26 Jul 2016 16:54:34 -0700 Subject: [PATCH 6/6] Making the last test use set(ConfigEntry) as well, just like the other newly added tests --- .../scala/org/apache/spark/metrics/MetricsSystemSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 878e18c8affc..61db6af830cc 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -255,7 +255,7 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM val executorId = "dummyExecutorId" conf.set("spark.app.id", appId) conf.set("spark.app.name", appName) - conf.set("spark.metrics.namespace", "${spark.app.name}") + conf.set(METRICS_NAMESPACE, "${spark.app.name}") conf.set("spark.executor.id", executorId) val instanceName = "testInstance"
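As a closing illustration of how the final form of this change (after the rework on top of SPARK-16272 in the later patches) might be used, here is a minimal sketch; the application name, application ID, and the resulting metric names below are assumptions for illustration, not values taken from the patches:

```scala
import org.apache.spark.SparkConf

// Hypothetical user configuration after this patch series: the value of
// spark.metrics.namespace is a config reference ("${spark.app.name}") that Spark
// expands, so every run of the same application reports driver and executor
// metrics under the same root namespace.
val conf = new SparkConf()
  .setAppName("PageViewCounter")
  .set("spark.metrics.namespace", "${spark.app.name}")

// Expected registry names, following buildRegistryName above (illustrative):
//   driver "jvm" source      -> PageViewCounter.driver.jvm
//   executor "1" "jvm" source -> PageViewCounter.1.jvm
//
// Without spark.metrics.namespace, the prefix falls back to spark.app.id, e.g.
//   app-20160726163613-0001.driver.jvm
// and, per the tests above, non-driver/non-executor instances (master, worker, ...)
// keep the plain source name with no namespace prefix. An unresolvable reference
// such as "${spark.doesnotexist}" is used literally as the prefix.
```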