Skip to content

Commit 7d0204e

Browse files
committed
allow configuring of MetricsSystem namespace
previously always namespaced metrics with app ID; now metrics- config-file can specify a different Spark conf param to use, e.g. app name.
1 parent 072896d commit 7d0204e

File tree

6 files changed

+98
-3
lines changed

6 files changed

+98
-3
lines changed

core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
3636
private[metrics] val properties = new Properties()
3737
private[metrics] var perInstanceProperties: mutable.HashMap[String, Properties] = null
3838

39+
private[metrics] var metricsNamespaceConfParam: String = null
40+
3941
private def setDefaultProperties(prop: Properties) {
4042
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
4143
prop.setProperty("*.sink.servlet.path", "/metrics/json")
@@ -65,6 +67,8 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
6567
prop.put(k, v)
6668
}
6769
}
70+
71+
metricsNamespaceConfParam = properties.getProperty("namespace-conf-param", "spark.app.id")
6872
}
6973

7074
def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {

core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ private[spark] class MetricsSystem private (
124124
* application, executor/driver and metric source.
125125
*/
126126
private[spark] def buildRegistryName(source: Source): String = {
127-
val appId = conf.getOption("spark.app.id")
127+
val appId = conf.getOption(metricsConfig.metricsNamespaceConfParam)
128128
val executorId = conf.getOption("spark.executor.id")
129129
val defaultName = MetricRegistry.name(source.sourceName)
130130

@@ -135,8 +135,12 @@ private[spark] class MetricsSystem private (
135135
// Only Driver and Executor set spark.app.id and spark.executor.id.
136136
// Other instance types, e.g. Master and Worker, are not related to a specific application.
137137
val warningMsg = s"Using default name $defaultName for source because %s is not set."
138-
if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
139-
if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
138+
if (appId.isEmpty) {
139+
logWarning(warningMsg.format(metricsConfig.metricsNamespaceConfParam))
140+
}
141+
if (executorId.isEmpty) {
142+
logWarning(warningMsg.format("spark.executor.id"))
143+
}
140144
defaultName
141145
}
142146
} else { defaultName }
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
namespace-conf-param = spark.app.name
19+
20+
*.sink.console.period = 10
21+
*.sink.console.unit = seconds
22+
*.source.jvm.class = org.apache.spark.metrics.source.JvmSource
23+
master.sink.console.period = 20
24+
master.sink.console.unit = minutes
25+
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
namespace-conf-param = spark.app.name
19+
20+
*.sink.console.period = 10
21+
*.sink.console.unit = seconds
22+
test.sink.console.class = org.apache.spark.metrics.sink.ConsoleSink
23+
test.sink.console.period = 20
24+
test.sink.console.unit = minutes

core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
8282
setMetricsProperty(sparkConf, "master.sink.console.unit", "minutes")
8383
val conf = new MetricsConfig(sparkConf)
8484
conf.initialize()
85+
assert(conf.metricsNamespaceConfParam == "spark.app.id")
8586

8687
val masterProp = conf.getInstance("master")
8788
assert(masterProp.size() === 5)
@@ -134,6 +135,17 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
134135
assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
135136
}
136137

138+
test("MetricsConfig with alternate namespace set") {
139+
val sparkConf = new SparkConf
140+
sparkConf.set(
141+
"spark.metrics.conf",
142+
getClass.getClassLoader.getResource("test_metrics_config_namespace.properties").getFile()
143+
)
144+
val conf = new MetricsConfig(sparkConf)
145+
conf.initialize()
146+
assert(conf.metricsNamespaceConfParam == "spark.app.name")
147+
}
148+
137149
test("MetricsConfig with subProperties") {
138150
val sparkConf = new SparkConf(loadDefaults = false)
139151
sparkConf.set("spark.metrics.conf", filePath)

core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,32 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
114114
assert(metricName === source.sourceName)
115115
}
116116

117+
test("MetricsSystem with Driver instance and alternate namespace set") {
118+
val source = new Source {
119+
override val sourceName = "dummySource"
120+
override val metricRegistry = new MetricRegistry()
121+
}
122+
123+
val appId = "testId"
124+
conf.set("spark.app.id", appId)
125+
126+
val appName = "testName"
127+
conf.set("spark.app.name", appName)
128+
129+
val executorId = "driver"
130+
conf.set("spark.executor.id", executorId)
131+
132+
val alternateNamespaceFile =
133+
getClass.getClassLoader.getResource("test_metrics_system_namespace.properties").getFile
134+
conf.set("spark.metrics.conf", alternateNamespaceFile)
135+
136+
val instanceName = "driver"
137+
val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
138+
139+
val metricName = driverMetricsSystem.buildRegistryName(source)
140+
assert(metricName === s"$appName.$executorId.${source.sourceName}")
141+
}
142+
117143
test("MetricsSystem with Executor instance") {
118144
val source = new Source {
119145
override val sourceName = "dummySource"

0 commit comments

Comments
 (0)