
Commit aadd699

Author: Reza Safi
Exposing ProcfsMetrics to the metrics system

1 parent d811369

File tree: 4 files changed, +98 -10 lines


core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala

Lines changed: 15 additions & 1 deletion
@@ -62,7 +62,21 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends Logging {
         SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
       val shouldLogStageExecutorProcessTreeMetrics =
         SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
-      procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
+      val shouldAddProcessTreeMetricsToMetricsSet =
+        SparkEnv.get.conf.get(config.METRICS_PROCESS_TREE_METRICS)
+      val pickEitherUIOrMetricsSet = shouldLogStageExecutorProcessTreeMetrics ^
+        shouldAddProcessTreeMetricsToMetricsSet
+      val areBothUIMetricsEnabled = shouldLogStageExecutorProcessTreeMetrics &&
+        shouldAddProcessTreeMetricsToMetricsSet
+      if (areBothUIMetricsEnabled) {
+        logWarning("You have enabled both " +
+          "spark.eventLog.logStageExecutorProcessTreeMetrics.enabled and " +
+          "spark.metrics.logStageExecutorProcessTreeMetrics.enabled. This isn't allowed; " +
+          "as a result, procfs metrics won't be reported to the UI or to the metrics system.")
+      }
+      (procDirExists.get && shouldLogStageExecutorMetrics && pickEitherUIOrMetricsSet) ||
+        (procDirExists.get && !shouldLogStageExecutorMetrics &&
+          pickEitherUIOrMetricsSet && shouldAddProcessTreeMetricsToMetricsSet)
     }
   }
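Note: the XOR (`^`) above makes procfs collection available only when exactly one of the two consumers (event log/UI or the metrics system) asks for process-tree metrics; with both flags set, the warning fires and the whole expression evaluates to false. A minimal standalone sketch of that decision, with an invented helper name and parameter names that merely mirror the flags (not Spark API):

// Standalone sketch of the availability rule above; the four booleans mirror
// procDirExists, EVENT_LOG_STAGE_EXECUTOR_METRICS, EVENT_LOG_PROCESS_TREE_METRICS
// and METRICS_PROCESS_TREE_METRICS. The helper is hypothetical.
def procfsEnabled(
    procDirExists: Boolean,
    stageExecutorMetrics: Boolean,
    eventLogTree: Boolean,
    metricsSystemTree: Boolean): Boolean = {
  val exactlyOne = eventLogTree ^ metricsSystemTree
  (procDirExists && stageExecutorMetrics && exactlyOne) ||
    (procDirExists && !stageExecutorMetrics && exactlyOne && metricsSystemTree)
}

// Both consumers on: disabled (this is the logWarning branch).
assert(!procfsEnabled(true, true, eventLogTree = true, metricsSystemTree = true))
// Only the metrics system on: enabled even without stage-level executor metrics logging.
assert(procfsEnabled(true, false, eventLogTree = false, metricsSystemTree = true))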

core/src/main/scala/org/apache/spark/executor/ProcfsMetricsSource.scala

Lines changed: 64 additions & 0 deletions

@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.config
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkEnv
+
+private[executor] class ProcfsMetricsSource extends Source {
+  override val sourceName = "procfs"
+  override val metricRegistry = new MetricRegistry()
+  var numMetrics: Int = 0
+  var metrics: Map[String, Long] = Map.empty
+  val shouldAddProcessTreeMetricsToMetricsSet =
+    SparkEnv.get.conf.get(config.METRICS_PROCESS_TREE_METRICS)
+
+  // Walk the process tree once, then serve the cached map for the remaining five
+  // gauge reads of the same reporting cycle before recomputing.
+  private def getProcfsMetrics: Map[String, Long] = {
+    if (numMetrics == 0) {
+      val p = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
+      metrics = Map(
+        "ProcessTreeJVMVMemory" -> p.jvmVmemTotal,
+        "ProcessTreeJVMRSSMemory" -> p.jvmRSSTotal,
+        "ProcessTreePythonVMemory" -> p.pythonVmemTotal,
+        "ProcessTreePythonRSSMemory" -> p.pythonRSSTotal,
+        "ProcessTreeOtherVMemory" -> p.otherVmemTotal,
+        "ProcessTreeOtherRSSMemory" -> p.otherRSSTotal)
+    }
+    numMetrics = numMetrics + 1
+    if (numMetrics == 6) {
+      numMetrics = 0
+    }
+    metrics
+  }
+
+  private def registerProcfsMetrics(name: String): Unit = {
+    metricRegistry.register(MetricRegistry.name("processTree", name), new Gauge[Long] {
+      override def getValue: Long = getProcfsMetrics(name)
+    })
+  }
+
+  if (shouldAddProcessTreeMetricsToMetricsSet) {
+    registerProcfsMetrics("ProcessTreeJVMVMemory")
+    registerProcfsMetrics("ProcessTreeJVMRSSMemory")
+    registerProcfsMetrics("ProcessTreePythonVMemory")
+    registerProcfsMetrics("ProcessTreePythonRSSMemory")
+    registerProcfsMetrics("ProcessTreeOtherVMemory")
+    registerProcfsMetrics("ProcessTreeOtherRSSMemory")
+  }
+}
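Note: the new file defines the Dropwizard source, but this diff does not show where it gets registered; in Spark that normally happens through the executor's MetricsSystem. A rough sketch of such wiring, assuming it lives inside Spark's own executor code (SparkEnv and MetricsSystem.registerSource are private[spark], and ProcfsMetricsSource is private[executor]):

// Hypothetical wiring, not part of this commit: register the source so its gauges
// reach whatever sinks are configured in metrics.properties.
val env = org.apache.spark.SparkEnv.get
env.metricsSystem.registerSource(new ProcfsMetricsSource)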

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 5 additions & 0 deletions
@@ -98,6 +98,11 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val METRICS_PROCESS_TREE_METRICS =
+    ConfigBuilder("spark.metrics.logStageExecutorProcessTreeMetrics.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
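For reference, a hedged example of how a user could set the two flags that the gating logic compares, using the plain SparkConf API (the property names are taken from the diffs above; this snippet is not part of the commit):

import org.apache.spark.SparkConf

// Example only: route process-tree metrics to the metrics system and keep them out
// of the event log / UI. Per the warning in ProcfsMetricsGetter, enabling both
// flags at once disables procfs reporting entirely.
val conf = new SparkConf()
  .set("spark.metrics.logStageExecutorProcessTreeMetrics.enabled", "true")
  .set("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled", "false")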

core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala

Lines changed: 14 additions & 9 deletions
@@ -22,7 +22,10 @@ import javax.management.ObjectName
 import scala.collection.mutable
 
 import org.apache.spark.executor.ProcfsMetricsGetter
+import org.apache.spark.internal.config
 import org.apache.spark.memory.MemoryManager
+import org.apache.spark.SparkEnv
+
 
 /**
  * Executor metric types for executor-level metrics stored in ExecutorMetrics.
@@ -85,16 +88,19 @@ case object ProcessTreeMetrics extends ExecutorMetricType {
     "ProcessTreePythonRSSMemory",
     "ProcessTreeOtherVMemory",
     "ProcessTreeOtherRSSMemory")
-
+  val shouldLogStageExecutorProcessTreeMetrics =
+    SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
   override private[spark] def getMetricValues(memoryManager: MemoryManager): Array[Long] = {
-    val allMetrics = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
     val processTreeMetrics = new Array[Long](names.length)
-    processTreeMetrics(0) = allMetrics.jvmVmemTotal
-    processTreeMetrics(1) = allMetrics.jvmRSSTotal
-    processTreeMetrics(2) = allMetrics.pythonVmemTotal
-    processTreeMetrics(3) = allMetrics.pythonRSSTotal
-    processTreeMetrics(4) = allMetrics.otherVmemTotal
-    processTreeMetrics(5) = allMetrics.otherRSSTotal
+    if (shouldLogStageExecutorProcessTreeMetrics) {
+      val allMetrics = ProcfsMetricsGetter.pTreeInfo.computeAllMetrics()
+      processTreeMetrics(0) = allMetrics.jvmVmemTotal
+      processTreeMetrics(1) = allMetrics.jvmRSSTotal
+      processTreeMetrics(2) = allMetrics.pythonVmemTotal
+      processTreeMetrics(3) = allMetrics.pythonRSSTotal
+      processTreeMetrics(4) = allMetrics.otherVmemTotal
+      processTreeMetrics(5) = allMetrics.otherRSSTotal
+    }
     processTreeMetrics
   }
 }
@@ -140,7 +146,6 @@ private[spark] object ExecutorMetricType {
     ProcessTreeMetrics
   )
 
-
   val (metricToOffset, numMetrics) = {
     var numberOfMetrics = 0
     val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
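Note: with the new guard in getMetricValues, turning the event-log flag off means the method never touches /proc and simply returns the freshly allocated array, which the JVM zero-initializes; the heartbeat/event-log path then reports zeros while the new Dropwizard source is the only consumer of procfs data. A trivial standalone illustration of that assumption (not Spark code):

// A new Array[Long] starts out all zeros.
val processTreeMetrics = new Array[Long](6)
assert(processTreeMetrics.forall(_ == 0L))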
