Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"

private[metrics] val properties = new Properties()
private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null
private[metrics] var perInstanceProperties: mutable.HashMap[String, Properties] = null

private[metrics] var metricsNamespaceConfParam: String = null

private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
Expand All @@ -57,14 +59,16 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
case _ =>
}

propertyCategories = subProperties(properties, INSTANCE_REGEX)
if (propertyCategories.contains(DEFAULT_PREFIX)) {
val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala
for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX);
perInstanceProperties = subProperties(properties, INSTANCE_REGEX)
if (perInstanceProperties.contains(DEFAULT_PREFIX)) {
val defaultProperty = perInstanceProperties(DEFAULT_PREFIX).asScala
for((inst, prop) <- perInstanceProperties if (inst != DEFAULT_PREFIX);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since you're touching this... for (

(k, v) <- defaultProperty if (prop.get(k) == null)) {
prop.put(k, v)
}
}

metricsNamespaceConfParam = properties.getProperty("namespace-conf-param", "spark.app.id")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a fan of this config name. How about one of:

  • metrics.namespace
  • metrics.namePrefix
  • metrics.appName

Or something like those?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In graphite-environment you're calling something like this a prefix. How about 'metrics.appPrefix' ?

}

def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
Expand All @@ -79,9 +83,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
}

def getInstance(inst: String): Properties = {
propertyCategories.get(inst) match {
perInstanceProperties.get(inst) match {
case Some(s) => s
case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
case None => perInstanceProperties.getOrElse(DEFAULT_PREFIX, new Properties)
}
}

Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private[spark] class MetricsSystem private (
* application, executor/driver and metric source.
*/
private[spark] def buildRegistryName(source: Source): String = {
val appId = conf.getOption("spark.app.id")
val appId = conf.getOption(metricsConfig.metricsNamespaceConfParam)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably a good idea to change the name of this variable now.

val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)

Expand All @@ -135,8 +135,12 @@ private[spark] class MetricsSystem private (
// Only Driver and Executor set spark.app.id and spark.executor.id.
// Other instance types, e.g. Master and Worker, are not related to a specific application.
val warningMsg = s"Using default name $defaultName for source because %s is not set."
if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
if (appId.isEmpty) {
logWarning(warningMsg.format(metricsConfig.metricsNamespaceConfParam))
}
if (executorId.isEmpty) {
logWarning(warningMsg.format("spark.executor.id"))
}
defaultName
}
} else { defaultName }
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/resources/test_metrics_config_namespace.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

namespace-conf-param = spark.app.name

*.sink.console.period = 10
*.sink.console.unit = seconds
*.source.jvm.class = org.apache.spark.metrics.source.JvmSource
master.sink.console.period = 20
master.sink.console.unit = minutes

24 changes: 24 additions & 0 deletions core/src/test/resources/test_metrics_system_namespace.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

namespace-conf-param = spark.app.name

*.sink.console.period = 10
*.sink.console.unit = seconds
test.sink.console.class = org.apache.spark.metrics.sink.ConsoleSink
test.sink.console.period = 20
test.sink.console.unit = minutes
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
setMetricsProperty(sparkConf, "master.sink.console.unit", "minutes")
val conf = new MetricsConfig(sparkConf)
conf.initialize()
assert(conf.metricsNamespaceConfParam == "spark.app.id")

val masterProp = conf.getInstance("master")
assert(masterProp.size() === 5)
Expand Down Expand Up @@ -134,13 +135,24 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}

test("MetricsConfig with alternate namespace set") {
val sparkConf = new SparkConf
sparkConf.set(
"spark.metrics.conf",
getClass.getClassLoader.getResource("test_metrics_config_namespace.properties").getFile()
)
val conf = new MetricsConfig(sparkConf)
conf.initialize()
assert(conf.metricsNamespaceConfParam == "spark.app.name")
}

test("MetricsConfig with subProperties") {
val sparkConf = new SparkConf(loadDefaults = false)
sparkConf.set("spark.metrics.conf", filePath)
val conf = new MetricsConfig(sparkConf)
conf.initialize()

val propCategories = conf.propertyCategories
val propCategories = conf.perInstanceProperties
assert(propCategories.size === 3)

val masterProp = conf.getInstance("master")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,32 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
assert(metricName === source.sourceName)
}

test("MetricsSystem with Driver instance and alternate namespace set") {
val source = new Source {
override val sourceName = "dummySource"
override val metricRegistry = new MetricRegistry()
}

val appId = "testId"
conf.set("spark.app.id", appId)

val appName = "testName"
conf.set("spark.app.name", appName)

val executorId = "driver"
conf.set("spark.executor.id", executorId)

val alternateNamespaceFile =
getClass.getClassLoader.getResource("test_metrics_system_namespace.properties").getFile
conf.set("spark.metrics.conf", alternateNamespaceFile)

val instanceName = "driver"
val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)

val metricName = driverMetricsSystem.buildRegistryName(source)
assert(metricName === s"$appName.$executorId.${source.sourceName}")
}

test("MetricsSystem with Executor instance") {
val source = new Source {
override val sourceName = "dummySource"
Expand Down