[SPARK-26928][CORE] Add driver CPU Time to the metrics system
## What changes were proposed in this pull request?

This PR proposes to add instrumentation for the driver's JVM CPU time via the Spark Dropwizard/Codahale metrics system. It follows directly from the previous work in SPARK-25228 and shares the same motivation: it is intended as an improvement for use in Spark performance dashboards and monitoring tools/instrumentation.

Implementation details: this PR takes the code introduced in SPARK-25228 and moves it into a new, separate Source, JVMCPUSource, which is then used to register the jvmCpuTime gauge metric for both the executor and the driver.
The registration of the jvmCpuTime metric for the driver is conditional on a new configuration parameter, `spark.metrics.cpu.time.driver.enabled` (proposed default: false), introduced for this purpose.
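For context, driver metrics reach whatever sink is configured in Spark's `metrics.properties` file (pointed to by `spark.metrics.conf`). A minimal sketch, assuming the ConsoleSink that ships with Spark, of how the new driver gauge could be inspected once registered:

```properties
# metrics.properties: report all driver metrics, including the new
# JVMCPU.jvmCpuTime gauge, to stdout every 10 seconds (illustrative sketch).
driver.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
driver.sink.console.period=10
driver.sink.console.unit=seconds
```

Any other packaged sink (Graphite, JMX, CSV) works the same way; only the sink class and its parameters change.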

## How was this patch tested?

Manually tested, in local mode and on YARN.
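As a quick standalone sanity check (not part of the patch), the MBean attribute the new gauge reads can be queried directly; if this prints a growing nanosecond counter on a given JVM, the jvmCpuTime gauge will report meaningful values there too:

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

object JvmCpuTimeCheck {
  def main(args: Array[String]): Unit = {
    val server = ManagementFactory.getPlatformMBeanServer
    val os = new ObjectName("java.lang", "type", "OperatingSystem")
    // Same attribute the gauge reads: cumulative process CPU time in
    // nanoseconds, exposed by HotSpot and IBM JVM vendor extensions.
    println(server.getAttribute(os, "ProcessCpuTime"))
  }
}
```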

Closes #23838 from LucaCanali/addCPUTimeMetricDriver.

Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
LucaCanali authored and Marcelo Vanzin committed Mar 5, 2019
1 parent 6207360 commit 25d2850
Showing 5 changed files with 58 additions and 19 deletions.
core/src/main/scala/org/apache/spark/SparkContext.scala (2 additions, 0 deletions)

@@ -48,6 +48,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.rpc.RpcEndpointRef
@@ -568,6 +569,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _taskScheduler.postStartHook()
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+    _env.metricsSystem.registerSource(new JVMCPUSource())
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
core/src/main/scala/org/apache/spark/executor/Executor.scala (2 additions, 0 deletions)

@@ -38,6 +38,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
+import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
@@ -117,6 +118,7 @@ private[spark] class Executor(
   if (!isLocal) {
     env.blockManager.initialize(conf.getAppId)
     env.metricsSystem.registerSource(executorSource)
+    env.metricsSystem.registerSource(new JVMCPUSource())
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
   }

core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala (0 additions, 18 deletions)

@@ -76,24 +76,6 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
     registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
   }

-  // Dropwizard metrics gauge measuring the executor's process CPU time.
-  // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise.
-  // The CPU time value is returned in nanoseconds.
-  // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
-  // com.ibm.lang.management.OperatingSystemMXBean, if available.
-  metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
-    val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
-    val name = new ObjectName("java.lang", "type", "OperatingSystem")
-    override def getValue: Long = {
-      try {
-        // return JVM process CPU time if the ProcessCpuTime method is available
-        mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
-      } catch {
-        case NonFatal(_) => -1L
-      }
-    }
-  })
-
   // Expose executor task metrics using the Dropwizard metrics system.
   // The list is taken from TaskMetrics.scala
   val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala (new file, 48 additions)

@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import java.lang.management.ManagementFactory
+
+import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
+import javax.management.{MBeanServer, ObjectName}
+import scala.util.control.NonFatal
+
+private[spark] class JVMCPUSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+  override val sourceName = "JVMCPU"
+
+  // Dropwizard/Codahale metrics gauge measuring the JVM process CPU time.
+  // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise.
+  // The CPU time value is returned in nanoseconds.
+  // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
+  // com.ibm.lang.management.OperatingSystemMXBean, if available.
+  metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
+    val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
+    val name = new ObjectName("java.lang", "type", "OperatingSystem")
+    override def getValue: Long = {
+      try {
+        // return JVM process CPU time if the ProcessCpuTime method is available
+        mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
+      } catch {
+        case NonFatal(_) => -1L
+      }
+    }
+  })
+}
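A note on the design: looking up the ProcessCpuTime attribute by name through the platform MBeanServer keeps the code free of compile-time dependencies on vendor classes, so the same gauge works on both HotSpot and IBM JVMs. The HotSpot-only alternative, shown here purely for comparison (a sketch, not code from this PR), would be a direct cast:

```scala
import java.lang.management.ManagementFactory

import com.sun.management.{OperatingSystemMXBean => HotSpotOsBean}

object DirectCpuTime {
  // Cast to the HotSpot extension interface: shorter, but it hard-codes
  // com.sun.management and falls back to -1 on other JVM implementations.
  def processCpuTime(): Long =
    ManagementFactory.getOperatingSystemMXBean match {
      case os: HotSpotOsBean => os.getProcessCpuTime
      case _ => -1L
    }
}
```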
docs/monitoring.md (6 additions, 1 deletion)

@@ -813,6 +813,9 @@ This is the component with the largest amount of instrumented metrics
   - states-rowsTotal
   - states-usedBytes

+- namespace=JVMCPU
+  - jvmCpuTime
+
 ### Component instance = Executor
 These metrics are exposed by Spark executors. Note, currently they are not available
 when running in local mode.
Expand All @@ -834,7 +837,6 @@ when running in local mode.
- filesystem.hdfs.read_ops
- filesystem.hdfs.write_bytes
- filesystem.hdfs.write_ops
- jvmCpuTime
- jvmGCTime.count
- memoryBytesSpilled.count
- recordsRead.count
@@ -858,6 +860,9 @@ when running in local mode.
   - threadpool.currentPool_size
   - threadpool.maxPool_size

+- namespace=JVMCPU
+  - jvmCpuTime
+
 - namespace=NettyBlockTransfer
   - shuffle-client.usedDirectMemory
   - shuffle-client.usedHeapMemory
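For orientation when wiring dashboards to these entries: metric names at a sink are prefixed with the metrics namespace (the application ID by default, overridable via `spark.metrics.namespace`) and the instance, so the new gauge should surface under names shaped roughly like the following. The exact prefix depends on the cluster manager and configuration, so treat these as illustrative, not verbatim:

```
<app-id>.driver.JVMCPU.jvmCpuTime
<app-id>.<executor-id>.JVMCPU.jvmCpuTime
```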
