diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index dd2d325d8703..bb30313d2f61 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -34,7 +34,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties" private[metrics] val properties = new Properties() - private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null + private[metrics] var perInstanceProperties: mutable.HashMap[String, Properties] = null + + private[metrics] var metricsNamespaceConfParam: String = null private def setDefaultProperties(prop: Properties) { prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet") @@ -57,14 +59,16 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { case _ => } - propertyCategories = subProperties(properties, INSTANCE_REGEX) - if (propertyCategories.contains(DEFAULT_PREFIX)) { - val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala - for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX); + perInstanceProperties = subProperties(properties, INSTANCE_REGEX) + if (perInstanceProperties.contains(DEFAULT_PREFIX)) { + val defaultProperty = perInstanceProperties(DEFAULT_PREFIX).asScala + for((inst, prop) <- perInstanceProperties if (inst != DEFAULT_PREFIX); (k, v) <- defaultProperty if (prop.get(k) == null)) { prop.put(k, v) } } + + metricsNamespaceConfParam = properties.getProperty("namespace-conf-param", "spark.app.id") } def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = { @@ -79,9 +83,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { } def getInstance(inst: String): Properties = { - propertyCategories.get(inst) match { + perInstanceProperties.get(inst) match { case Some(s) => s - case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties) + case None => perInstanceProperties.getOrElse(DEFAULT_PREFIX, new Properties) } } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 4517f465ebd3..47664fb8017d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -124,7 +124,7 @@ private[spark] class MetricsSystem private ( * application, executor/driver and metric source. */ private[spark] def buildRegistryName(source: Source): String = { - val appId = conf.getOption("spark.app.id") + val appId = conf.getOption(metricsConfig.metricsNamespaceConfParam) val executorId = conf.getOption("spark.executor.id") val defaultName = MetricRegistry.name(source.sourceName) @@ -135,8 +135,12 @@ private[spark] class MetricsSystem private ( // Only Driver and Executor set spark.app.id and spark.executor.id. // Other instance types, e.g. Master and Worker, are not related to a specific application. val warningMsg = s"Using default name $defaultName for source because %s is not set." - if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) } - if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) } + if (appId.isEmpty) { + logWarning(warningMsg.format(metricsConfig.metricsNamespaceConfParam)) + } + if (executorId.isEmpty) { + logWarning(warningMsg.format("spark.executor.id")) + } defaultName } } else { defaultName } diff --git a/core/src/test/resources/test_metrics_config_namespace.properties b/core/src/test/resources/test_metrics_config_namespace.properties new file mode 100644 index 000000000000..aaf48800ddd9 --- /dev/null +++ b/core/src/test/resources/test_metrics_config_namespace.properties @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +namespace-conf-param = spark.app.name + +*.sink.console.period = 10 +*.sink.console.unit = seconds +*.source.jvm.class = org.apache.spark.metrics.source.JvmSource +master.sink.console.period = 20 +master.sink.console.unit = minutes + diff --git a/core/src/test/resources/test_metrics_system_namespace.properties b/core/src/test/resources/test_metrics_system_namespace.properties new file mode 100644 index 000000000000..5e47cc2f9c35 --- /dev/null +++ b/core/src/test/resources/test_metrics_system_namespace.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +namespace-conf-param = spark.app.name + +*.sink.console.period = 10 +*.sink.console.unit = seconds +test.sink.console.class = org.apache.spark.metrics.sink.ConsoleSink +test.sink.console.period = 20 +test.sink.console.unit = minutes diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala index 41f2ff725a17..ef715b9623ac 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala @@ -82,6 +82,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { setMetricsProperty(sparkConf, "master.sink.console.unit", "minutes") val conf = new MetricsConfig(sparkConf) conf.initialize() + assert(conf.metricsNamespaceConfParam == "spark.app.id") val masterProp = conf.getInstance("master") assert(masterProp.size() === 5) @@ -134,13 +135,24 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter { assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") } + test("MetricsConfig with alternate namespace set") { + val sparkConf = new SparkConf + sparkConf.set( + "spark.metrics.conf", + getClass.getClassLoader.getResource("test_metrics_config_namespace.properties").getFile() + ) + val conf = new MetricsConfig(sparkConf) + conf.initialize() + assert(conf.metricsNamespaceConfParam == "spark.app.name") + } + test("MetricsConfig with subProperties") { val sparkConf = new SparkConf(loadDefaults = false) sparkConf.set("spark.metrics.conf", filePath) val conf = new MetricsConfig(sparkConf) conf.initialize() - val propCategories = conf.propertyCategories + val propCategories = conf.perInstanceProperties assert(propCategories.size === 3) val masterProp = conf.getInstance("master") diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala index 9c389c76bf3b..480fc4a4ba5d 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala @@ -114,6 +114,32 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM assert(metricName === source.sourceName) } + test("MetricsSystem with Driver instance and alternate namespace set") { + val source = new Source { + override val sourceName = "dummySource" + override val metricRegistry = new MetricRegistry() + } + + val appId = "testId" + conf.set("spark.app.id", appId) + + val appName = "testName" + conf.set("spark.app.name", appName) + + val executorId = "driver" + conf.set("spark.executor.id", executorId) + + val alternateNamespaceFile = + getClass.getClassLoader.getResource("test_metrics_system_namespace.properties").getFile + conf.set("spark.metrics.conf", alternateNamespaceFile) + + val instanceName = "driver" + val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr) + + val metricName = driverMetricsSystem.buildRegistryName(source) + assert(metricName === s"$appName.$executorId.${source.sourceName}") + } + test("MetricsSystem with Executor instance") { val source = new Source { override val sourceName = "dummySource"