From 55d45de12574e5430c58000a9d84cca0e171e81f Mon Sep 17 00:00:00 2001
From: Wing Yew Poon
Date: Wed, 12 Sep 2018 10:47:15 -0700
Subject: [PATCH] Support for procfs metrics.

1. Use the ExecutorPlugin interface from
   https://github.com/apache/spark/pull/22192 for SPARK-24918. The interface
   provides no taskStart, onTaskFailure, or onTaskCompletion hooks, so
   MemoryMonitorExecutorExtension cannot support polling for thread dumps.
2. Add support for procfs memory metrics.
---
 .../java/org/apache/spark/ExecutorPlugin.java |  57 +++++
 .../com/cloudera/spark/MemoryGetter.scala     |  32 +++
 .../com/cloudera/spark/MemoryMonitor.scala    |  13 +-
 .../spark/executor/ExecutorPlugin.scala       |  28 ---
 .../spark/executor/ProcfsBasedMetrics.scala   | 211 ++++++++++++++++++
 5 files changed, 309 insertions(+), 32 deletions(-)
 create mode 100644 core/src/main/java/org/apache/spark/ExecutorPlugin.java
 delete mode 100644 core/src/main/scala/org/apache/spark/executor/ExecutorPlugin.scala
 create mode 100644 core/src/main/scala/org/apache/spark/executor/ProcfsBasedMetrics.scala

diff --git a/core/src/main/java/org/apache/spark/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
new file mode 100644
index 0000000..ec0b57f
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/ExecutorPlugin.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * A plugin which can be automatically instantiated within each Spark executor. Users can specify
+ * plugins which should be created with the "spark.executor.plugins" configuration. An instance
+ * of each plugin will be created for every executor, including those created by dynamic allocation,
+ * before the executor starts running any tasks.
+ *
+ * The specific API exposed to end users is still considered to be very unstable. We will
+ * hopefully be able to keep compatibility by providing default implementations for any methods
+ * added, but make no guarantees this will always be possible across all Spark releases.
+ *
+ * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
+ * it uses. A plugin acquires the same privileges as the user running the task. A bad plugin
+ * could also interfere with task execution and make the executor fail in unexpected ways.
+ */
+@DeveloperApi
+public interface ExecutorPlugin {
+
+  /**
+   * Initialize the executor plugin.
+   *
+   * <p>Each executor will, during its initialization, invoke this method on each
+   * plugin provided in the spark.executor.plugins configuration.</p>
+   *
+   * <p>Plugins should create threads in their implementation of this method for
+   * any polling, blocking, or intensive computation.</p>
+   */
+  default void init() {}
+
+  /**
+   * Clean up and terminate this plugin.
+   *
+   * <p>This function is called during the executor shutdown phase. The executor
+   * will wait for the plugin to terminate before continuing its own shutdown.</p>
+ */ + default void shutdown() {} +} diff --git a/core/src/main/scala/com/cloudera/spark/MemoryGetter.scala b/core/src/main/scala/com/cloudera/spark/MemoryGetter.scala index 62e78af..b21ce00 100644 --- a/core/src/main/scala/com/cloudera/spark/MemoryGetter.scala +++ b/core/src/main/scala/com/cloudera/spark/MemoryGetter.scala @@ -3,6 +3,8 @@ package com.cloudera.spark import java.lang.management.{BufferPoolMXBean, MemoryMXBean, MemoryPoolMXBean} +import org.apache.spark.executor.ProcfsBasedMetrics + trait MemoryGetter { def namesAndReporting: Seq[(String, PeakReporting)] def values(dest: Array[Long], offset: Int): Unit @@ -47,3 +49,33 @@ class BufferPoolGetter(bean: BufferPoolMXBean) extends MemoryGetter { dest(offset + 1) = bean.getMemoryUsed() } } + +class ProcfsBasedMetricsGetter extends MemoryGetter { + // TODO: PAGESIZE should be obtained from the system. + // This should be done in ProcfsBasedMetrics. In which case, the RSS numbers + // will be converted to bytes there, and no conversion will be needed here. + final val PAGESIZE = 4096L + + val pTreeInfo = new ProcfsBasedMetrics + + val namesAndReporting = Seq( + ("jvmrssmem", IncrementBytes), + ("jvmvmem", IncrementBytes), + ("pythonrssmem", IncrementBytes), + ("pythonvmem", IncrementBytes), + ("otherrssmem", IncrementBytes), + ("othervmem", IncrementBytes) + ) + + def values(dest: Array[Long], offset: Int): Unit = { + val memInfo = pTreeInfo.getMemoryUsage() + if (memInfo != null) { + dest(offset) = memInfo.javaRSSTotal * PAGESIZE + dest(offset + 1) = memInfo.javaVmemTotal + dest(offset + 2) = memInfo.pythonRSSTotal * PAGESIZE + dest(offset + 3) = memInfo.pythonVmemTotal + dest(offset + 4) = memInfo.otherRSSTotal * PAGESIZE + dest(offset + 5) = memInfo.otherVmemTotal + } + } +} diff --git a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala index 9a85c11..e0e117c 100644 --- a/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala +++ b/core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala @@ -13,7 +13,8 @@ import scala.collection.JavaConverters._ import com.quantifind.sumac.FieldArgs import org.apache.spark.{TaskContext, SparkContext} -import org.apache.spark.executor.ExecutorPlugin +import org.apache.spark.ExecutorPlugin +import org.apache.spark.executor.ProcfsBasedMetrics import org.apache.spark.memory.SparkMemoryManagerHandle class MemoryMonitor(val args: MemoryMonitorArgs) { @@ -37,7 +38,8 @@ class MemoryMonitor(val args: MemoryMonitorArgs) { offHeapPoolBeans.map(new PoolGetter(_)) ++ bufferPoolsBeans.map(new BufferPoolGetter(_)) ++ nettyMemoryHandle.toSeq ++ - sparkMemManagerHandle.toSeq + sparkMemManagerHandle.toSeq ++ + Seq(new ProcfsBasedMetricsGetter) val namesAndReporting = getters.flatMap(_.namesAndReporting) val names = namesAndReporting.map(_._1) @@ -328,9 +330,10 @@ object MemoryMonitor { } class MemoryMonitorExecutorExtension extends ExecutorPlugin { - // the "extension class" api just lets you invoke a constructor. We really just want to - // call this static method, so that's good enough. + // Each Spark executor will create an instance of this plugin. When this class + // is instantiated, this static method is called, which is good enough for us. 
MemoryMonitor.installIfSysProps() + /* val args = MemoryMonitorArgs.sysPropsArgs val monitoredTaskCount = new AtomicInteger(0) @@ -347,6 +350,7 @@ class MemoryMonitorExecutorExtension extends ExecutorPlugin { } else { null } + val pollingTask = new AtomicReference[ScheduledFuture[_]]() override def taskStart(taskContext: TaskContext): Unit = { @@ -380,6 +384,7 @@ class MemoryMonitorExecutorExtension extends ExecutorPlugin { } } } + */ } class MemoryMonitorArgs extends FieldArgs { diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorPlugin.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorPlugin.scala deleted file mode 100644 index 625ecb0..0000000 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorPlugin.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.executor - -import org.apache.spark.TaskContext -import org.apache.spark.util.{TaskFailureListener, TaskCompletionListener} - -/** - * This really needs to be in the spark source. But because I don't have a release artifact to - * build against with this, I - */ -trait ExecutorPlugin extends TaskCompletionListener with TaskFailureListener { - def taskStart(taskContext: TaskContext): Unit -} diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsBasedMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsBasedMetrics.scala new file mode 100644 index 0000000..29e44f8 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ProcfsBasedMetrics.scala @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.executor

+import java.io._
+import java.nio.charset.Charset
+import java.nio.file.{Files, Paths}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.Queue
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+
+case class MemoryUsage(
+    javaRSSTotal: Long,
+    javaVmemTotal: Long,
+    pythonRSSTotal: Long,
+    pythonVmemTotal: Long,
+    otherRSSTotal: Long,
+    otherVmemTotal: Long)
+
+class ProcfsBasedMetrics extends Logging {
+
+  val PROCFS_DIR = "/proc"
+  var isAvailable: Boolean = isProcfsFound
+  val pid: Int = computePid()
+  val ptree: mutable.Map[Int, Set[Int]] = mutable.Map[Int, Set[Int]]()
+  val PROCFS_STAT_FILE = "stat"
+  var javaVmemTotal: Long = 0
+  var javaRSSTotal: Long = 0
+  var pythonVmemTotal: Long = 0
+  var pythonRSSTotal: Long = 0
+  var otherVmemTotal: Long = 0
+  var otherRSSTotal: Long = 0
+
+  createProcessTree()
+
+  def isProcfsFound: Boolean = {
+    try {
+      if (!Files.exists(Paths.get(PROCFS_DIR))) {
+        return false
+      }
+    } catch {
+      case e: FileNotFoundException => return false
+    }
+    true
+  }
+
+  def computePid(): Int = {
+    if (!isAvailable) {
+      return -1
+    }
+    try {
+      // This can be simplified in java9:
+      // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+      val cmd = Array("bash", "-c", "echo $PPID")
+      val length = 10
+      val out: Array[Byte] = Array.fill[Byte](length)(0)
+      Runtime.getRuntime.exec(cmd).getInputStream.read(out)
+      Integer.parseInt(new String(out, "UTF-8").trim)
+    } catch {
+      case NonFatal(e) =>
+        logDebug("An error occurred when trying to determine the pid of the executor " +
+          "process. As a result, reporting of process tree metrics is stopped.")
+        isAvailable = false
+        -1
+    }
+  }
+
+  def createProcessTree(): Unit = {
+    if (!isAvailable) {
+      return
+    }
+    val queue: Queue[Int] = new Queue[Int]()
+    queue += pid
+    while (!queue.isEmpty) {
+      val p = queue.dequeue()
+      val c = getChildPIds(p)
+      if (!c.isEmpty) {
+        queue ++= c
+        ptree += (p -> c.toSet)
+      } else {
+        ptree += (p -> Set[Int]())
+      }
+    }
+  }
+
+  def updateProcessTree(): Unit = {
+    if (!isAvailable) {
+      return
+    }
+    val queue: Queue[Int] = new Queue[Int]()
+    queue += pid
+    while (!queue.isEmpty) {
+      val p = queue.dequeue()
+      val c = getChildPIds(p)
+      if (!c.isEmpty) {
+        queue ++= c
+        val preChildren = ptree.get(p)
+        preChildren match {
+          case Some(children) => if (!c.toSet.equals(children)) {
+            // Drop entries for children that no longer exist.
+            val diff: Set[Int] = children -- c.toSet
+            ptree.update(p, c.toSet)
+            diff.foreach(ptree.remove(_))
+          }
+          case None => ptree.update(p, c.toSet)
+        }
+      } else {
+        ptree.update(p, Set[Int]())
+      }
+    }
+  }
+
+  /**
+   * The computation of RSS and Vmem is based on proc(5):
+   * http://man7.org/linux/man-pages/man5/proc.5.html
+   */
+  def getProcessInfo(pid: Int): Unit = {
+    try {
+      val pidDir = new File(PROCFS_DIR, pid.toString)
+      val statFile = new File(pidDir, PROCFS_STAT_FILE)
+      val in = new BufferedReader(new InputStreamReader(
+        new FileInputStream(statFile), Charset.forName("UTF-8")))
+      val procInfo = in.readLine()
+      in.close()
+      val procInfoSplit = procInfo.split(" ")
+      if (procInfoSplit != null) {
+        if (procInfoSplit(1).toLowerCase.contains("java")) {
+          javaVmemTotal += procInfoSplit(22).toLong
+          javaRSSTotal += procInfoSplit(23).toLong
+        } else if (procInfoSplit(1).toLowerCase.contains("python")) {
+          pythonVmemTotal += procInfoSplit(22).toLong
+          pythonRSSTotal += procInfoSplit(23).toLong
+        } else {
+          otherVmemTotal += procInfoSplit(22).toLong
+          otherRSSTotal += procInfoSplit(23).toLong
+        }
+      }
+    } catch {
+      case e: FileNotFoundException =>
+        // The process exited between the tree scan and the read; skip it.
+    }
+  }
+
+  def getMemoryUsage(): MemoryUsage = {
+    if (!isAvailable) {
+      return null
+    }
+    updateProcessTree()
+    val pids = ptree.keySet
+    javaRSSTotal = 0
+    javaVmemTotal = 0
+    pythonRSSTotal = 0
+    pythonVmemTotal = 0
+    otherRSSTotal = 0
+    otherVmemTotal = 0
+    for (p <- pids) {
+      getProcessInfo(p)
+    }
+    MemoryUsage(
+      javaRSSTotal,
+      javaVmemTotal,
+      pythonRSSTotal,
+      pythonVmemTotal,
+      otherRSSTotal,
+      otherVmemTotal)
+  }
+
+  def getChildPIds(pid: Int): ArrayBuffer[Int] = {
+    try {
+      val cmd = Array("pgrep", "-P", pid.toString)
+      val input = Runtime.getRuntime.exec(cmd).getInputStream
+      val childPidsInByte: mutable.ArrayBuffer[Byte] = new mutable.ArrayBuffer()
+      var d = input.read()
+      while (d != -1) {
+        childPidsInByte.append(d.toByte)
+        d = input.read()
+      }
+      input.close()
+      val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n")
+      val childPidsInInt: ArrayBuffer[Int] = new ArrayBuffer[Int]()
+      for (p <- childPids) {
+        if (p != "") {
+          childPidsInInt += Integer.parseInt(p)
+        }
+      }
+      childPidsInInt
+    } catch {
+      case NonFatal(e) =>
+        logDebug("An error occurred when trying to compute the process tree. " +
+          "As a result, reporting of process tree metrics is stopped.")
+        isAvailable = false
+        new mutable.ArrayBuffer[Int]()
+    }
+  }
+}
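
Illustrative note (not part of the patch): the ExecutorPlugin javadoc above
says that plugins are instantiated from the "spark.executor.plugins"
configuration, and that init() implementations should do any polling work on
threads of their own. A minimal sketch of a conforming plugin, with a made-up
package and class name:

    package com.example

    import org.apache.spark.ExecutorPlugin

    class HeartbeatPlugin extends ExecutorPlugin {
      // Poll on a dedicated daemon thread, as the init() javadoc advises.
      private val worker = new Thread(new Runnable {
        override def run(): Unit = {
          try {
            while (true) {
              println(s"executor alive at ${System.currentTimeMillis()}")
              Thread.sleep(10000)
            }
          } catch {
            case _: InterruptedException => // shutdown requested; exit quietly
          }
        }
      }, "heartbeat-plugin")
      worker.setDaemon(true)

      override def init(): Unit = worker.start()
      override def shutdown(): Unit = worker.interrupt()
    }

Such a plugin would be enabled with something like
--conf spark.executor.plugins=com.example.HeartbeatPlugin on spark-submit.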
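
Illustrative note (not part of the patch): the TODO in ProcfsBasedMetricsGetter
says PAGESIZE should be obtained from the system rather than hardcoded to 4096.
One possible approach, sketched under the assumption that shelling out to
getconf is acceptable:

    import scala.sys.process._
    import scala.util.Try

    // Ask the OS for its page size; fall back to the common 4 KiB on failure.
    val pageSize: Long =
      Try(Seq("getconf", "PAGESIZE").!!.trim.toLong).getOrElse(4096L)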
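
Illustrative note (not part of the patch): getProcessInfo indexes the split of
/proc/[pid]/stat at 22 and 23. Per proc(5), these are vsize (field 23, in
bytes) and rss (field 24, in pages), which is why ProcfsBasedMetricsGetter
multiplies only the RSS totals by the page size. A self-contained sketch of the
same parsing against the current process (the simple split on spaces, like the
patch's, assumes the comm field contains no spaces):

    import scala.io.Source

    // Split index = proc(5) field number - 1.
    val src = Source.fromFile("/proc/self/stat")
    val stat = try src.mkString.trim.split(" ") finally src.close()
    val comm = stat(1)               // field 2: executable name, in parentheses
    val vsizeBytes = stat(22).toLong // field 23: virtual memory size, in bytes
    val rssPages = stat(23).toLong   // field 24: resident set size, in pages
    println(s"$comm vsize=$vsizeBytes bytes, rss=$rssPages pages")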