
Commit
Support for procfs metrics.
1. Use the ExecutorPlugin interface from apache/spark#22192
for SPARK-24918. The interface has no support for taskStart, onTaskFailure, or
onTaskCompletion, so MemoryMonitorExecutorExtension cannot support polling
for thread dumps.
2. Add support for procfs memory metrics.
wypoon committed Sep 12, 2018
1 parent 8efac4b commit 55d45de
Showing 5 changed files with 309 additions and 32 deletions.
57 changes: 57 additions & 0 deletions core/src/main/java/org/apache/spark/ExecutorPlugin.java
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.annotation.DeveloperApi;

/**
* A plugin which can be automatically instantiated within each Spark executor. Users can specify
* plugins which should be created with the "spark.executor.plugins" configuration. An instance
* of each plugin will be created for every executor, including those created by dynamic allocation,
* before the executor starts running any tasks.
*
* The specific api exposed to end users is still considered to be very unstable. We will
* hopefully be able to keep compatibility by providing default implementations for any methods
* added, but make no guarantees this will always be possible across all Spark releases.
*
* Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources
* it uses. A plugin acquires the same privileges as the user running the task. A bad plugin
* could also interfere with task execution and make the executor fail in unexpected ways.
*/
@DeveloperApi
public interface ExecutorPlugin {

/**
* Initialize the executor plugin.
*
* <p>Each executor will, during its initialization, invoke this method on each
* plugin provided in the spark.executor.plugins configuration.</p>
*
* <p>Plugins should create threads in their implementation of this method for
* any polling, blocking, or intensive computation.</p>
*/
default void init() {}

/**
* Clean up and terminate this plugin.
*
* <p>This function is called during the executor shutdown phase. The executor
* will wait for the plugin to terminate before continuing its own shutdown.</p>
*/
default void shutdown() {}
}
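
For illustration only (not part of this commit), a minimal plugin implementing this interface might look like the following; the package, class name, and the logging it does are invented for the example:

// Hypothetical example plugin; the package and class name are made up.
package com.example

import org.apache.spark.ExecutorPlugin

class LoggingPlugin extends ExecutorPlugin {

  // Runs once per executor, before any tasks; per the javadoc above,
  // polling or blocking work should be moved onto threads created here.
  override def init(): Unit = System.err.println("LoggingPlugin initialized")

  // Runs during executor shutdown; the executor waits for it to return.
  override def shutdown(): Unit = System.err.println("LoggingPlugin shutting down")
}

It would then be enabled with, for example, --conf spark.executor.plugins=com.example.LoggingPlugin.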
32 changes: 32 additions & 0 deletions core/src/main/scala/com/cloudera/spark/MemoryGetter.scala
@@ -3,6 +3,8 @@ package com.cloudera.spark

import java.lang.management.{BufferPoolMXBean, MemoryMXBean, MemoryPoolMXBean}

import org.apache.spark.executor.ProcfsBasedMetrics

trait MemoryGetter {
def namesAndReporting: Seq[(String, PeakReporting)]
def values(dest: Array[Long], offset: Int): Unit
@@ -47,3 +49,33 @@ class BufferPoolGetter(bean: BufferPoolMXBean) extends MemoryGetter {
dest(offset + 1) = bean.getMemoryUsed()
}
}

class ProcfsBasedMetricsGetter extends MemoryGetter {
// TODO: PAGESIZE should be obtained from the system.
// This should be done in ProcfsBasedMetrics. In which case, the RSS numbers
// will be converted to bytes there, and no conversion will be needed here.
final val PAGESIZE = 4096L

val pTreeInfo = new ProcfsBasedMetrics

val namesAndReporting = Seq(
("jvmrssmem", IncrementBytes),
("jvmvmem", IncrementBytes),
("pythonrssmem", IncrementBytes),
("pythonvmem", IncrementBytes),
("otherrssmem", IncrementBytes),
("othervmem", IncrementBytes)
)

def values(dest: Array[Long], offset: Int): Unit = {
val memInfo = pTreeInfo.getMemoryUsage()
if (memInfo != null) {
dest(offset) = memInfo.javaRSSTotal * PAGESIZE
dest(offset + 1) = memInfo.javaVmemTotal
dest(offset + 2) = memInfo.pythonRSSTotal * PAGESIZE
dest(offset + 3) = memInfo.pythonVmemTotal
dest(offset + 4) = memInfo.otherRSSTotal * PAGESIZE
dest(offset + 5) = memInfo.otherVmemTotal
}
}
}
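
One way to address the PAGESIZE TODO above, sketched here as an assumption rather than anything this commit does, is to query the page size from the OS once, mirroring how ProcfsBasedMetrics already shells out to compute its pid:

import scala.io.Source
import scala.util.Try

// Hypothetical helper, not in this commit: ask the system for its page
// size via getconf(1), falling back to the common 4096-byte default.
def systemPageSize(): Long = Try {
  val proc = Runtime.getRuntime.exec(Array("getconf", "PAGESIZE"))
  Source.fromInputStream(proc.getInputStream, "UTF-8").mkString.trim.toLong
}.getOrElse(4096L)

Doing this inside ProcfsBasedMetrics, as the TODO suggests, would let it report RSS in bytes and remove the conversion from values() above.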
13 changes: 9 additions & 4 deletions core/src/main/scala/com/cloudera/spark/MemoryMonitor.scala
@@ -13,7 +13,8 @@ import scala.collection.JavaConverters._
import com.quantifind.sumac.FieldArgs

import org.apache.spark.{TaskContext, SparkContext}
import org.apache.spark.executor.ExecutorPlugin
import org.apache.spark.ExecutorPlugin
import org.apache.spark.executor.ProcfsBasedMetrics
import org.apache.spark.memory.SparkMemoryManagerHandle

class MemoryMonitor(val args: MemoryMonitorArgs) {
@@ -37,7 +38,8 @@ class MemoryMonitor(val args: MemoryMonitorArgs) {
offHeapPoolBeans.map(new PoolGetter(_)) ++
bufferPoolsBeans.map(new BufferPoolGetter(_)) ++
nettyMemoryHandle.toSeq ++
sparkMemManagerHandle.toSeq
sparkMemManagerHandle.toSeq ++
Seq(new ProcfsBasedMetricsGetter)

val namesAndReporting = getters.flatMap(_.namesAndReporting)
val names = namesAndReporting.map(_._1)
@@ -328,9 +330,10 @@ object MemoryMonitor {
}

class MemoryMonitorExecutorExtension extends ExecutorPlugin {
// the "extension class" api just lets you invoke a constructor. We really just want to
// call this static method, so that's good enough.
// Each Spark executor will create an instance of this plugin. When this class
// is instantiated, this static method is called, which is good enough for us.
MemoryMonitor.installIfSysProps()
/*

[Review comment from @squito, Sep 12, 2018: you could also just remove the
override, but maybe that would be confusing. Can you at least put in a comment
here that these methods were only in a custom plugin implementation that goes
beyond what's offered in the current upstream api, and you're leaving them here
for now till this code is restructured to allow both to exist side-by-side?]

val args = MemoryMonitorArgs.sysPropsArgs
val monitoredTaskCount = new AtomicInteger(0)
@@ -347,6 +350,7 @@ class MemoryMonitorExecutorExtension extends ExecutorPlugin {
} else {
null
}
val pollingTask = new AtomicReference[ScheduledFuture[_]]()
override def taskStart(taskContext: TaskContext): Unit = {
@@ -380,6 +384,7 @@ class MemoryMonitorExecutorExtension extends ExecutorPlugin {
}
}
}
*/
}

class MemoryMonitorArgs extends FieldArgs {
28 changes: 0 additions & 28 deletions core/src/main/scala/org/apache/spark/executor/ExecutorPlugin.scala

This file was deleted.

211 changes: 211 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/ProcfsBasedMetrics.scala
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.executor

import java.io._
import java.nio.charset.Charset
import java.nio.file.{Files, Paths}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue
import scala.util.control.NonFatal

import org.apache.spark.internal.Logging

case class MemoryUsage(
val javaRSSTotal: Long,
val javaVmemTotal: Long,
val pythonRSSTotal: Long,
val pythonVmemTotal: Long,
val otherRSSTotal: Long,
val otherVmemTotal: Long)

class ProcfsBasedMetrics extends Logging {

val PROCFS_DIR = "/proc"
var isAvailable: Boolean = isProcfsFound
val pid: Int = computePid()
val ptree: scala.collection.mutable.Map[Int, Set[Int]] =
scala.collection.mutable.Map[Int, Set[Int]]()
val PROCFS_STAT_FILE = "stat"
var javaVmemTotal: Long = 0
var javaRSSTotal: Long = 0
var pythonVmemTotal: Long = 0
var pythonRSSTotal: Long = 0
var otherVmemTotal: Long = 0
var otherRSSTotal: Long = 0

createProcessTree()

def isProcfsFound: Boolean = {
try {
if (!Files.exists(Paths.get(PROCFS_DIR))) {
return false
}
} catch {
case e: FileNotFoundException => return false
}
true
}

def computePid(): Int = {
if (!isAvailable) {
return -1
}
try {
// This can be simplified in java9:
// https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
val cmd = Array("bash", "-c", "echo $PPID")
val length = 10
var out: Array[Byte] = Array.fill[Byte](length)(0)
Runtime.getRuntime.exec(cmd).getInputStream.read(out)
val pid = Integer.parseInt(new String(out, "UTF-8").trim)
pid
} catch {
case NonFatal(e) => logDebug("An error occurred when trying to compute the process tree. " +
"As a result, reporting of process tree metrics is stopped.")
isAvailable = false
-1
}
}

def createProcessTree(): Unit = {
if (!isAvailable) {
return
}
val queue: Queue[Int] = new Queue[Int]()
queue += pid
while (!queue.isEmpty) {
val p = queue.dequeue()
val c = getChildPIds(p)
if (!c.isEmpty) {
queue ++= c
ptree += (p -> c.toSet)
} else {
ptree += (p -> Set[Int]())
}
}
}

def updateProcessTree(): Unit = {
if (!isAvailable) {
return
}
val queue: Queue[Int] = new Queue[Int]()
queue += pid
while (!queue.isEmpty) {
val p = queue.dequeue()
val c = getChildPIds(p)
if (!c.isEmpty) {
queue ++= c
val preChildren = ptree.get(p)
preChildren match {
case Some(children) => if (!c.toSet.equals(children)) {
val diff: Set[Int] = children -- c.toSet
ptree.update(p, c.toSet)
diff.foreach(ptree.remove(_))
}
case None => ptree.update(p, c.toSet)
}
} else {
ptree.update(p, Set[Int]())
}
}
}

/**
* The computation of RSS and Vmem is based on proc(5):
* http://man7.org/linux/man-pages/man5/proc.5.html
*/
def getProcessInfo(pid: Int): Unit = {
try {
val pidDir = new File(PROCFS_DIR, pid.toString)
val statFile = new File(pidDir, PROCFS_STAT_FILE)
val in = new BufferedReader(new InputStreamReader(
new FileInputStream(statFile), Charset.forName("UTF-8")))
val procInfo = in.readLine
in.close
val procInfoSplit = procInfo.split(" ")
if (procInfoSplit != null) {
if (procInfoSplit(1).toLowerCase.contains("java")) {
javaVmemTotal += procInfoSplit(22).toLong
javaRSSTotal += procInfoSplit(23).toLong
} else if (procInfoSplit(1).toLowerCase.contains("python")) {
pythonVmemTotal += procInfoSplit(22).toLong
pythonRSSTotal += procInfoSplit(23).toLong
} else {
otherVmemTotal += procInfoSplit(22).toLong
otherRSSTotal += procInfoSplit(23).toLong
}
}
} catch {
case e: FileNotFoundException =>
}
}

def getMemoryUsage(): MemoryUsage = {
if (!isAvailable) {
return null
}
updateProcessTree()
val pids = ptree.keySet
javaRSSTotal = 0
javaVmemTotal = 0
pythonRSSTotal = 0
pythonVmemTotal = 0
otherRSSTotal = 0
otherVmemTotal = 0
for (p <- pids) {
getProcessInfo(p)
}
MemoryUsage(
javaRSSTotal,
javaVmemTotal,
pythonRSSTotal,
pythonVmemTotal,
otherRSSTotal,
otherVmemTotal)
}

def getChildPIds(pid: Int): ArrayBuffer[Int] = {
try {
val cmd = Array("pgrep", "-P", pid.toString)
val input = Runtime.getRuntime.exec(cmd).getInputStream
val childPidsInByte: mutable.ArrayBuffer[Byte] = new mutable.ArrayBuffer()
var d = input.read()
while (d != -1) {
childPidsInByte.append(d.asInstanceOf[Byte])
d = input.read()
}
input.close()
val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n")
val childPidsInInt: ArrayBuffer[Int] = new ArrayBuffer[Int]()
for (p <- childPids) {
if (p != "") {
childPidsInInt += Integer.parseInt(p)
}
}
childPidsInInt
} catch {
case NonFatal(e) => logDebug("An error occurred when trying to compute the process tree. " +
"As a result, reporting of process tree metrics is stopped.")
isAvailable = false
new mutable.ArrayBuffer()
}
}
}
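
For reference, getProcessInfo above indexes into a space-split /proc/[pid]/stat line. The following illustrative snippet (the pid and the numbers are invented) shows which fields it reads, per proc(5):

// Illustrative sample of a /proc/[pid]/stat line; values are made up.
val sampleStat = "1234 (java) S 1 1234 1234 0 -1 4194560 1407 0 0 0 " +
  "120 30 0 0 20 0 57 0 12345 4563402752 41306"
val fields = sampleStat.split(" ")
val comm  = fields(1)          // "(java)" -> counted in the java totals
val vsize = fields(22).toLong  // virtual memory size, in bytes
val rss   = fields(23).toLong  // resident set size, in pages

One caveat of splitting on spaces: comm is the executable name in parentheses and may itself contain spaces, which would shift every later index; robust parsers therefore locate the last closing parenthesis and split from there.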
