Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

private[spark] trait ProcessTreeMetrics {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, it's not worth creating the trait now when there is only one implementation. We can add that abstraction later if it's useful. (This isn't exposed to users at all anyway.)

If we are keeping this, some of these should be private.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about the future, when someone wants to add a new implementation, so that they use the same methods and we don't have differences across platforms.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I understand that goal, but who knows when / if that will ever happen. In the meantime, it just makes this slightly harder to follow. Whenever someone does want to put in another implementation, it's pretty easy for them to add the interface at that time.

// Whether process-tree metrics can be collected on this platform.
def isAvailable: Boolean
// The pid of the process this instance is tracking (the executor JVM).
def pid: Int
// Determines the pid of the current process; implementations return -1 when unavailable.
def computePid(): Int
// Builds the initial tree of this process and all of its descendants.
def createProcessTree()
// Re-walks the tree to pick up processes started or stopped since the last walk.
def updateProcessTree()
// Latest resident-set-size total over the JVM processes in the tree.
def getJVMRSSInfo(): Long
// Latest virtual-memory total over the JVM processes in the tree.
def getJVMVirtualMemInfo(): Long
// Latest resident-set-size total over the Python processes in the tree.
def getPythonRSSInfo(): Long
// Latest virtual-memory total over the Python processes in the tree.
def getPythonVirtualMemInfo(): Long
// Latest resident-set-size total over all remaining processes in the tree.
def getOtherRSSInfo(): Long
// Latest virtual-memory total over all remaining processes in the tree.
def getOtherVirtualMemInfo(): Long
}
271 changes: 271 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import java.io._
import java.nio.charset.Charset
import java.nio.file.{Files, Paths}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue

import org.apache.spark.internal.Logging


// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
// project.
class ProcfsBasedSystems extends ProcessTreeMetrics with Logging {
// Root of the proc filesystem from which per-process stat files are read.
val procfsDir = "/proc/"
// Flipped to false whenever procfs is missing or a tree operation throws.
var isAvailable: Boolean = isItProcfsBased
val pid: Int = computePid()
// Map from a pid to the set of its direct child pids.
val ptree: scala.collection.mutable.Map[ Int, Set[Int]] =
scala.collection.mutable.Map[ Int, Set[Int]]()
// Name of the per-process stat file under /proc/<pid>/.
val PROCFS_STAT_FILE = "stat"
// Latest memory totals, reset and re-accumulated by getOtherRSSInfo()
// via getProcessInfo(); the remaining getters just return these values.
var latestJVMVmemTotal: Long = 0
var latestJVMRSSTotal: Long = 0
var latestPythonVmemTotal: Long = 0
var latestPythonRSSTotal: Long = 0
var latestOtherVmemTotal: Long = 0
var latestOtherRSSTotal: Long = 0

// NOTE(review): this method has side effects, so by Scala convention it
// should be invoked with parens: createProcessTree()
createProcessTree
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method has a side-effect, so you should call it with parens createProcessTree()


def isItProcfsBased: Boolean = {
val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this cause problems when someone tries to run tests on windows?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't test this on a windows machine. but since I will catch exceptions during the process tree creation/update and make isAvailable false in case of the exceptions I think there won't be a problem.

if (testing) {
return true
}
try {
if (!Files.exists(Paths.get(procfsDir))) {
return false
}
}
catch {
case f: FileNotFoundException => return false
}

val shouldLogStageExecutorProcessTreeMetrics = org.apache.spark.SparkEnv.get.conf.
getBoolean("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled", true)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to use the config entry EVENT_LOG_STAGE_EXECUTOR_METRICS

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use this so that the user can disable process tree metrics even when the normal executor metrics are enabled. So that they have more freedom in case they feel reporting process tree metrics introduces a lot of overhead

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok I can see having an extra config here, but then you should create a config entry for it.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw in other places that we didn't do this. It would be a headache to pass down a config object, and I would need a lot of changes.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a config entry without the need to send a config object. Thanks for the comment.

true && shouldLogStageExecutorProcessTreeMetrics
}


def computePid(): Int = {
if (!isAvailable) {
return -1;
}
try {
// This can be simplified in java9:
// https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
val cmd = Array("bash", "-c", "echo $PPID")
val length = 10
var out: Array[Byte] = Array.fill[Byte](length)(0)
Runtime.getRuntime.exec(cmd).getInputStream.read(out)
val pid = Integer.parseInt(new String(out, "UTF-8").trim)
return pid;
}
catch {
case e: IOException => logDebug("IO Exception when trying to compute process tree." +
" As a result reporting of ProcessTree metrics is stopped")
isAvailable = false
return -1
case _ => logDebug("Some exception occurred when trying to compute process tree. " +
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe catch NonFatal instead?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much, @wypoon, for the reviews. I thought it might be better not to have an exception in the logs, to avoid frightening the user.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an exception I should be worried about? I am running Spark Core 3.0.1 and I saw this warning (at least now it is a warning). I am running Windows 10. I spent hours trying to find why this is happening.

I get the following warning

WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped

"As a result reporting of ProcessTree metrics is stopped")
isAvailable = false
return -1
}
}


/**
 * Walks the process tree rooted at this executor's pid breadth-first and
 * records, for every process discovered, the set of its direct children in
 * `ptree` (an empty set for leaf processes). No-op when procfs-based
 * metrics are unavailable.
 */
def createProcessTree(): Unit = {
  if (isAvailable) {
    val pending = new Queue[Int]()
    pending.enqueue(pid)
    while (pending.nonEmpty) {
      val current = pending.dequeue()
      val children = getChildPIds(current)
      // Leaves get an empty child set; adding an empty buffer to the queue
      // is a no-op, so one unconditional pair of statements covers both cases.
      ptree += (current -> children.toSet)
      pending ++= children
    }
  }
}


/**
 * Re-walks the process tree rooted at `pid`, bringing `ptree` up to date with
 * the processes currently alive: new processes are added, and entries for
 * direct children that have disappeared are removed.
 * NOTE(review): only the vanished children themselves are removed from
 * `ptree`; confirm that entries for their descendants cannot linger.
 */
def updateProcessTree(): Unit = {
if (!isAvailable) {
return
}
val queue: Queue[Int] = new Queue[Int]()
queue += pid
while( !queue.isEmpty ) {
val p = queue.dequeue()
val c = getChildPIds(p)
if(!c.isEmpty) {
queue ++= c
val preChildren = ptree.get(p)
preChildren match {
// Child set changed: store the new set and drop entries for vanished children.
case Some(children) => if (!c.toSet.equals(children)) {
val diff: Set[Int] = children -- c.toSet
ptree.update(p, c.toSet )
diff.foreach(ptree.remove(_))
}
// Process not seen before: record its children as-is.
case None => ptree.update(p, c.toSet )
}
}
else {
ptree.update(p, Set[Int]())
}
}
}


/**
* Hadoop's ProcfsBasedProcessTree class used regex and pattern matching to retrieve the memory
* info. I tried that but found it not correct during tests, so I used normal string analysis
* instead. The computation of RSS and Vmem are based on proc(5):
* http://man7.org/linux/man-pages/man5/proc.5.html
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this changes at all, would we know? Or would we just start reporting wrong values?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, if it changes we need to fix this; it probably won't change, though. I didn't find another way to retrieve this info.

*/
/**
 * Reads /proc/<pid>/stat for the given pid and accumulates its virtual memory
 * size (field 23) and resident set size (field 24) into the JVM, Python or
 * "other" running totals, classified by the command name (field 2).
 * Field layout per proc(5): http://man7.org/linux/man-pages/man5/proc.5.html
 *
 * NOTE(review): /proc reports RSS in pages, not bytes — confirm callers
 * account for the page size.
 *
 * A process that exits between the tree walk and this read contributes
 * nothing (its stat file no longer exists).
 */
def getProcessInfo(pid: Int): Unit = {
  try {
    val pidDir: File = new File(procfsDir, pid.toString)
    val fReader = new InputStreamReader(
      new FileInputStream(
        new File(pidDir, PROCFS_STAT_FILE)), Charset.forName("UTF-8"))
    val in: BufferedReader = new BufferedReader(fReader)
    // Close both streams even when reading fails (the original leaked them
    // on exception).
    val procInfo = try {
      in.readLine
    } finally {
      in.close()
      fReader.close()
    }
    // String.split never returns null, so the original null check was dead code.
    val procInfoSplit = procInfo.split(" ")
    if (procInfoSplit(1).toLowerCase.contains("java")) {
      latestJVMVmemTotal += procInfoSplit(22).toLong
      latestJVMRSSTotal += procInfoSplit(23).toLong
    } else if (procInfoSplit(1).toLowerCase.contains("python")) {
      latestPythonVmemTotal += procInfoSplit(22).toLong
      latestPythonRSSTotal += procInfoSplit(23).toLong
    } else {
      latestOtherVmemTotal += procInfoSplit(22).toLong
      latestOtherRSSTotal += procInfoSplit(23).toLong
    }
  } catch {
    // Process exited since the tree was built: skip it. (Replaces the
    // original `return null` in a Unit-returning method.)
    case f: FileNotFoundException =>
  }
}


/**
 * Refreshes the process tree and recomputes every memory total (JVM, Python
 * and "other") by reading /proc/<pid>/stat for each tracked process.
 * Returns the RSS total of the "other" processes, or -1 when procfs metrics
 * are unavailable.
 *
 * Side effect: the JVM/Python/virtual-memory totals are refreshed here too;
 * the remaining get*Info methods return them without re-scanning.
 */
def getOtherRSSInfo(): Long = {
  if (!isAvailable) {
    return -1
  }
  // Side-effecting 0-arity method: call with parens per Scala convention.
  updateProcessTree()
  // Reset all accumulators before re-summing over the current tree.
  latestJVMRSSTotal = 0
  latestJVMVmemTotal = 0
  latestPythonRSSTotal = 0
  latestPythonVmemTotal = 0
  latestOtherRSSTotal = 0
  latestOtherVmemTotal = 0
  ptree.keySet.foreach(getProcessInfo)
  latestOtherRSSTotal
}


/**
 * Returns the latest virtual-memory total for "other" (non-JVM, non-Python)
 * processes, or -1 when procfs metrics are unavailable. The total was already
 * refreshed by the preceding getOtherRSSInfo() call, so no tree update or
 * recomputation happens here.
 */
def getOtherVirtualMemInfo(): Long =
  if (isAvailable) latestOtherVmemTotal else -1


/**
 * Returns the latest RSS total for JVM processes in the tree (refreshed by
 * getOtherRSSInfo()), or -1 when procfs metrics are unavailable.
 */
def getJVMRSSInfo(): Long =
  if (isAvailable) latestJVMRSSTotal else -1


/**
 * Returns the latest virtual-memory total for JVM processes in the tree
 * (refreshed by getOtherRSSInfo()), or -1 when procfs metrics are unavailable.
 */
def getJVMVirtualMemInfo(): Long =
  if (isAvailable) latestJVMVmemTotal else -1


/**
 * Returns the latest RSS total for Python processes in the tree (refreshed by
 * getOtherRSSInfo()), or -1 when procfs metrics are unavailable.
 */
def getPythonRSSInfo(): Long =
  if (isAvailable) latestPythonRSSTotal else -1


/**
 * Returns the latest virtual-memory total for Python processes in the tree
 * (refreshed by getOtherRSSInfo()), or -1 when procfs metrics are unavailable.
 */
def getPythonVirtualMemInfo(): Long =
  if (isAvailable) latestPythonVmemTotal else -1


def getChildPIds(pid: Int): ArrayBuffer[Int] = {
try {
val cmd = Array("pgrep", "-P", pid.toString)
val input = Runtime.getRuntime.exec(cmd).getInputStream
val childPidsInByte: mutable.ArrayBuffer[Byte] = new mutable.ArrayBuffer()
var d = input.read()
while (d != -1) {
childPidsInByte.append(d.asInstanceOf[Byte])
d = input.read()
}
input.close()
val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n")
val childPidsInInt: ArrayBuffer[Int] = new ArrayBuffer[Int]()
for (p <- childPids) {
if (p != "") {
childPidsInInt += Integer.parseInt(p)
}
}
childPidsInInt
} catch {
case e: IOException => logDebug("IO Exception when trying to compute process tree." +
" As a result reporting of ProcessTree metrics is stopped")
isAvailable = false
return new mutable.ArrayBuffer()
case _ => logDebug("Some exception occurred when trying to compute process tree. As a result" +
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NonFatal, maybe?

" reporting of ProcessTree metrics is stopped")
isAvailable = false
return new mutable.ArrayBuffer()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.metrics
import java.lang.management.{BufferPoolMXBean, ManagementFactory}
import javax.management.ObjectName

import org.apache.spark.executor.{ProcessTreeMetrics, ProcfsBasedSystems}
import org.apache.spark.memory.MemoryManager

/**
Expand Down Expand Up @@ -59,6 +60,42 @@ case object JVMOffHeapMemory extends ExecutorMetricType {
}
}

/** RSS total of the JVM processes in the executor's process tree. */
case object ProcessTreeJVMRSSMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long =
    ExecutorMetricType.pTreeInfo.getJVMRSSInfo()
}

/** Virtual-memory total of the JVM processes in the executor's process tree. */
case object ProcessTreeJVMVMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long =
    ExecutorMetricType.pTreeInfo.getJVMVirtualMemInfo()
}

/** RSS total of the Python processes in the executor's process tree. */
case object ProcessTreePythonRSSMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long =
    ExecutorMetricType.pTreeInfo.getPythonRSSInfo()
}

/** Virtual-memory total of the Python processes in the executor's process tree. */
case object ProcessTreePythonVMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long =
    ExecutorMetricType.pTreeInfo.getPythonVirtualMemInfo()
}

/** RSS total of all remaining (non-JVM, non-Python) processes in the tree. */
case object ProcessTreeOtherRSSMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long =
    ExecutorMetricType.pTreeInfo.getOtherRSSInfo()
}

/** Virtual-memory total of all remaining (non-JVM, non-Python) processes in the tree. */
case object ProcessTreeOtherVMemory extends ExecutorMetricType {
  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long =
    ExecutorMetricType.pTreeInfo.getOtherVirtualMemInfo()
}

case object OnHeapExecutionMemory extends MemoryManagerExecutorMetricType(
_.onHeapExecutionMemoryUsed)

Expand All @@ -84,6 +121,8 @@ case object MappedPoolMemory extends MBeanExecutorMetricType(
"java.nio:type=BufferPool,name=mapped")

private[spark] object ExecutorMetricType {
// Shared process-tree metrics source used by all ProcessTree* metric types.
// NOTE(review): instantiated eagerly at class-load time — confirm this is
// safe on platforms without procfs.
final val pTreeInfo: ProcessTreeMetrics = new ProcfsBasedSystems

// List of all executor metric types
val values = IndexedSeq(
JVMHeapMemory,
Expand All @@ -95,7 +134,13 @@ private[spark] object ExecutorMetricType {
OnHeapUnifiedMemory,
OffHeapUnifiedMemory,
DirectPoolMemory,
MappedPoolMemory
MappedPoolMemory,
ProcessTreeJVMVMemory,
ProcessTreeJVMRSSMemory,
ProcessTreePythonVMemory,
ProcessTreePythonRSSMemory,
ProcessTreeOtherVMemory,
ProcessTreeOtherRSSMemory
)

// Map of executor metric type to its index in values.
Expand Down
Loading