From 25d28506652625c86b569dffa99e732a191ee626 Mon Sep 17 00:00:00 2001
From: Luca Canali
Date: Tue, 5 Mar 2019 10:47:39 -0800
Subject: [PATCH] [SPARK-26928][CORE] Add driver CPU Time to the metrics system

## What changes were proposed in this pull request?

This proposes to add instrumentation for the driver's JVM CPU time via the Spark Dropwizard/Codahale metrics system. It follows directly from the previous work in SPARK-25228 and shares similar motivations: it is intended as an improvement for Spark performance dashboards and monitoring tools/instrumentation.

Implementation details: this PR takes the code introduced in SPARK-25228 and moves it into a new, separate Source, JVMCPUSource, which is then used to register the jvmCpuTime gauge metric for both the executor and the driver. A configuration parameter to make the driver registration conditional, `spark.metrics.cpu.time.driver.enabled` (proposed default: false), was considered, but in the patch below the source is registered unconditionally and no such parameter is introduced.

## How was this patch tested?

Manually tested, using local mode and using YARN.
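As a quick standalone sanity check of the mechanism used here, the same MBean attribute read can be exercised outside Spark. This is only an illustrative sketch, not part of the patch; the object name `JvmCpuTimeProbe` is invented for the example:

```scala
import java.lang.management.ManagementFactory

import javax.management.ObjectName

import scala.util.control.NonFatal

// Reads the same MBean attribute as the jvmCpuTime gauge: the JVM process
// CPU time in nanoseconds, or -1 if ProcessCpuTime is not available.
object JvmCpuTimeProbe {
  def main(args: Array[String]): Unit = {
    val mBean = ManagementFactory.getPlatformMBeanServer
    val name = new ObjectName("java.lang", "type", "OperatingSystem")
    val cpuTimeNs =
      try {
        mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
      } catch {
        case NonFatal(_) => -1L
      }
    println(s"ProcessCpuTime = $cpuTimeNs ns")
  }
}
```

On JVMs that expose the proprietary OperatingSystemMXBean extensions this prints a growing nanosecond counter; elsewhere it prints -1, which is also the gauge's fallback value.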
Closes #23838 from LucaCanali/addCPUTimeMetricDriver.

Authored-by: Luca Canali
Signed-off-by: Marcelo Vanzin
---
 .../scala/org/apache/spark/SparkContext.scala |  2 +
 .../org/apache/spark/executor/Executor.scala  |  2 +
 .../spark/executor/ExecutorSource.scala       | 18 -------
 .../apache/spark/metrics/source/JVMCPU.scala  | 48 +++++++++++++++++++
 docs/monitoring.md                            |  7 ++-
 5 files changed, 58 insertions(+), 19 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dc0ea24d29b09..01d979270c44c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests._
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.rpc.RpcEndpointRef
@@ -568,6 +569,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _taskScheduler.postStartHook()
     _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
     _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+    _env.metricsSystem.registerSource(new JVMCPUSource())
     _executorAllocationManager.foreach { e =>
       _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 6b23b26945279..740080e1ccbbb 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -38,6 +38,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
+import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
@@ -117,6 +118,7 @@ private[spark] class Executor(
   if (!isLocal) {
     env.blockManager.initialize(conf.getAppId)
     env.metricsSystem.registerSource(executorSource)
+    env.metricsSystem.registerSource(new JVMCPUSource())
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index a8264022a0aff..e8523769c5aa9 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -76,24 +76,6 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
     registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
   }
 
-  // Dropwizard metrics gauge measuring the executor's process CPU time.
-  // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise.
-  // The CPU time value is returned in nanoseconds.
-  // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
-  // com.ibm.lang.management.OperatingSystemMXBean, if available.
-  metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
-    val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
-    val name = new ObjectName("java.lang", "type", "OperatingSystem")
-    override def getValue: Long = {
-      try {
-        // return JVM process CPU time if the ProcessCpuTime method is available
-        mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
-      } catch {
-        case NonFatal(_) => -1L
-      }
-    }
-  })
-
   // Expose executor task metrics using the Dropwizard metrics system.
   // The list is taken from TaskMetrics.scala
   val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
diff --git a/core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala b/core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala
new file mode 100644
index 0000000000000..6ea86b88806af
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/source/JVMCPU.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.source
+
+import java.lang.management.ManagementFactory
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+import javax.management.{MBeanServer, ObjectName}
+import scala.util.control.NonFatal
+
+private[spark] class JVMCPUSource extends Source {
+
+  override val metricRegistry = new MetricRegistry()
+  override val sourceName = "JVMCPU"
+
+  // Dropwizard/Codahale metrics gauge measuring the JVM process CPU time.
+  // This Gauge will try to get and return the JVM Process CPU time or return -1 otherwise.
+  // The CPU time value is returned in nanoseconds.
+  // It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
+  // com.ibm.lang.management.OperatingSystemMXBean, if available.
+  metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
+    val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
+    val name = new ObjectName("java.lang", "type", "OperatingSystem")
+    override def getValue: Long = {
+      try {
+        // return JVM process CPU time if the ProcessCpuTime method is available
+        mBean.getAttribute(name, "ProcessCpuTime").asInstanceOf[Long]
+      } catch {
+        case NonFatal(_) => -1L
+      }
+    }
+  })
+}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index a92dd6f71a09d..72e4f47e197d2 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -813,6 +813,9 @@ This is the component with the largest amount of instrumented metrics
   - states-rowsTotal
   - states-usedBytes
 
+- namespace=JVMCPU
+  - jvmCpuTime
+
 ### Component instance = Executor
 These metrics are exposed by Spark executors. Note, currently they are not available
 when running in local mode.
@@ -834,7 +837,6 @@ when running in local mode.
   - filesystem.hdfs.read_ops
   - filesystem.hdfs.write_bytes
   - filesystem.hdfs.write_ops
-  - jvmCpuTime
   - jvmGCTime.count
   - memoryBytesSpilled.count
   - recordsRead.count
@@ -858,6 +860,9 @@ when running in local mode.
   - threadpool.currentPool_size
   - threadpool.maxPool_size
 
+- namespace=JVMCPU
+  - jvmCpuTime
+
 - namespace=NettyBlockTransfer
   - shuffle-client.usedDirectMemory
   - shuffle-client.usedHeapMemory
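For reference, one way to observe the new driver metric end to end is to attach a sink in `conf/metrics.properties`. The snippet below is a minimal sketch using Spark's built-in ConsoleSink; the reporting period and unit are arbitrary example values:

```properties
# Report driver metrics to stdout every 10 seconds (example values).
driver.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
driver.sink.console.period=10
driver.sink.console.unit=seconds
```

With the default metrics namespace, the driver gauge then appears as `<app-id>.driver.JVMCPU.jvmCpuTime`, alongside the executor-side `JVMCPU.jvmCpuTime` entries documented above.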