diff --git a/core/src/main/scala/org/apache/spark/executor/ProcessTreeMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ProcessTreeMetrics.scala new file mode 100644 index 0000000000000..100ba6c282ce2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ProcessTreeMetrics.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +private[spark] trait ProcessTreeMetrics { + def isAvailable: Boolean + def pid: Int + def computePid(): Int + def createProcessTree() + def updateProcessTree() + def getJVMRSSInfo(): Long + def getJVMVirtualMemInfo(): Long + def getPythonRSSInfo(): Long + def getPythonVirtualMemInfo(): Long + def getOtherRSSInfo(): Long + def getOtherVirtualMemInfo(): Long +} diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala new file mode 100644 index 0000000000000..91420dab3b5a1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ProcfsBasedSystems.scala @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import java.io._ +import java.nio.charset.Charset +import java.nio.file.{Files, Paths} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Queue + +import org.apache.spark.internal.Logging + + +// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop +// project. +class ProcfsBasedSystems extends ProcessTreeMetrics with Logging { + val procfsDir = "/proc/" + var isAvailable: Boolean = isItProcfsBased + val pid: Int = computePid() + val ptree: scala.collection.mutable.Map[ Int, Set[Int]] = + scala.collection.mutable.Map[ Int, Set[Int]]() + val PROCFS_STAT_FILE = "stat" + var latestJVMVmemTotal: Long = 0 + var latestJVMRSSTotal: Long = 0 + var latestPythonVmemTotal: Long = 0 + var latestPythonRSSTotal: Long = 0 + var latestOtherVmemTotal: Long = 0 + var latestOtherRSSTotal: Long = 0 + + createProcessTree + + def isItProcfsBased: Boolean = { + val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") + if (testing) { + return true + } + try { + if (!Files.exists(Paths.get(procfsDir))) { + return false + } + } + catch { + case f: FileNotFoundException => return false + } + + val shouldLogStageExecutorProcessTreeMetrics = org.apache.spark.SparkEnv.get.conf. + getBoolean("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled", true) + true && shouldLogStageExecutorProcessTreeMetrics + } + + + def computePid(): Int = { + if (!isAvailable) { + return -1; + } + try { + // This can be simplified in java9: + // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html + val cmd = Array("bash", "-c", "echo $PPID") + val length = 10 + var out: Array[Byte] = Array.fill[Byte](length)(0) + Runtime.getRuntime.exec(cmd).getInputStream.read(out) + val pid = Integer.parseInt(new String(out, "UTF-8").trim) + return pid; + } + catch { + case e: IOException => logDebug("IO Exception when trying to compute process tree." + + " As a result reporting of ProcessTree metrics is stopped") + isAvailable = false + return -1 + case _ => logDebug("Some exception occurred when trying to compute process tree. " + + "As a result reporting of ProcessTree metrics is stopped") + isAvailable = false + return -1 + } + } + + + def createProcessTree(): Unit = { + if (!isAvailable) { + return + } + val queue: Queue[Int] = new Queue[Int]() + queue += pid + while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPIds(p) + if(!c.isEmpty) { + queue ++= c + ptree += (p -> c.toSet) + } + else { + ptree += (p -> Set[Int]()) + } + } + } + + + def updateProcessTree(): Unit = { + if (!isAvailable) { + return + } + val queue: Queue[Int] = new Queue[Int]() + queue += pid + while( !queue.isEmpty ) { + val p = queue.dequeue() + val c = getChildPIds(p) + if(!c.isEmpty) { + queue ++= c + val preChildren = ptree.get(p) + preChildren match { + case Some(children) => if (!c.toSet.equals(children)) { + val diff: Set[Int] = children -- c.toSet + ptree.update(p, c.toSet ) + diff.foreach(ptree.remove(_)) + } + case None => ptree.update(p, c.toSet ) + } + } + else { + ptree.update(p, Set[Int]()) + } + } + } + + + /** + * Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrive the memory + * info. I tried that but found it not correct during tests, so I used normal string analysis + * instead. The computation of RSS and Vmem are based on proc(5): + * http://man7.org/linux/man-pages/man5/proc.5.html + */ + def getProcessInfo(pid: Int): Unit = { + try { + val pidDir: File = new File(procfsDir, pid.toString) + val fReader = new InputStreamReader( + new FileInputStream( + new File(pidDir, PROCFS_STAT_FILE)), Charset.forName("UTF-8")) + val in: BufferedReader = new BufferedReader(fReader) + val procInfo = in.readLine + in.close + fReader.close + val procInfoSplit = procInfo.split(" ") + if ( procInfoSplit != null ) { + if (procInfoSplit(1).toLowerCase.contains("java")) { + latestJVMVmemTotal += procInfoSplit(22).toLong + latestJVMRSSTotal += procInfoSplit(23).toLong + } + else if (procInfoSplit(1).toLowerCase.contains("python")) { + latestPythonVmemTotal += procInfoSplit(22).toLong + latestPythonRSSTotal += procInfoSplit(23).toLong + } + else { + latestOtherVmemTotal += procInfoSplit(22).toLong + latestOtherRSSTotal += procInfoSplit(23).toLong } + } + } catch { + case f: FileNotFoundException => return null + } + } + + + def getOtherRSSInfo(): Long = { + if (!isAvailable) { + return -1 + } + updateProcessTree + val pids = ptree.keySet + latestJVMRSSTotal = 0 + latestJVMVmemTotal = 0 + latestPythonRSSTotal = 0 + latestPythonVmemTotal = 0 + latestOtherRSSTotal = 0 + latestOtherVmemTotal = 0 + for (p <- pids) { + getProcessInfo(p) + } + latestOtherRSSTotal + } + + + def getOtherVirtualMemInfo(): Long = { + if (!isAvailable) { + return -1 + } + // We won't call updateProcessTree and also compute total virtual memory here + // since we already did all of this when we computed RSS info + latestOtherVmemTotal + } + + + def getJVMRSSInfo(): Long = { + if (!isAvailable) { + return -1 + } + latestJVMRSSTotal + } + + + def getJVMVirtualMemInfo(): Long = { + if (!isAvailable) { + return -1 + } + latestJVMVmemTotal + } + + + def getPythonRSSInfo(): Long = { + if (!isAvailable) { + return -1 + } + latestPythonRSSTotal + } + + + def getPythonVirtualMemInfo(): Long = { + if (!isAvailable) { + return -1 + } + latestPythonVmemTotal + } + + + def getChildPIds(pid: Int): ArrayBuffer[Int] = { + try { + val cmd = Array("pgrep", "-P", pid.toString) + val input = Runtime.getRuntime.exec(cmd).getInputStream + val childPidsInByte: mutable.ArrayBuffer[Byte] = new mutable.ArrayBuffer() + var d = input.read() + while (d != -1) { + childPidsInByte.append(d.asInstanceOf[Byte]) + d = input.read() + } + input.close() + val childPids = new String(childPidsInByte.toArray, "UTF-8").split("\n") + val childPidsInInt: ArrayBuffer[Int] = new ArrayBuffer[Int]() + for (p <- childPids) { + if (p != "") { + childPidsInInt += Integer.parseInt(p) + } + } + childPidsInInt + } catch { + case e: IOException => logDebug("IO Exception when trying to compute process tree." + + " As a result reporting of ProcessTree metrics is stopped") + isAvailable = false + return new mutable.ArrayBuffer() + case _ => logDebug("Some exception occurred when trying to compute process tree. As a result" + + " reporting of ProcessTree metrics is stopped") + isAvailable = false + return new mutable.ArrayBuffer() + } + } +} diff --git a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala index cd10dad25e87b..8ca7b5f79cfa0 100644 --- a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala +++ b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala @@ -19,6 +19,7 @@ package org.apache.spark.metrics import java.lang.management.{BufferPoolMXBean, ManagementFactory} import javax.management.ObjectName +import org.apache.spark.executor.{ProcessTreeMetrics, ProcfsBasedSystems} import org.apache.spark.memory.MemoryManager /** @@ -59,6 +60,42 @@ case object JVMOffHeapMemory extends ExecutorMetricType { } } +case object ProcessTreeJVMRSSMemory extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { + ExecutorMetricType.pTreeInfo.getJVMRSSInfo() + } +} + +case object ProcessTreeJVMVMemory extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { + ExecutorMetricType.pTreeInfo.getJVMVirtualMemInfo() + } +} + +case object ProcessTreePythonRSSMemory extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { + ExecutorMetricType.pTreeInfo.getPythonRSSInfo() + } +} + +case object ProcessTreePythonVMemory extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { + ExecutorMetricType.pTreeInfo.getPythonVirtualMemInfo() + } +} + +case object ProcessTreeOtherRSSMemory extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { + ExecutorMetricType.pTreeInfo.getOtherRSSInfo() + } +} + +case object ProcessTreeOtherVMemory extends ExecutorMetricType { + override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = { + ExecutorMetricType.pTreeInfo.getOtherVirtualMemInfo() + } +} + case object OnHeapExecutionMemory extends MemoryManagerExecutorMetricType( _.onHeapExecutionMemoryUsed) @@ -84,6 +121,8 @@ case object MappedPoolMemory extends MBeanExecutorMetricType( "java.nio:type=BufferPool,name=mapped") private[spark] object ExecutorMetricType { + final val pTreeInfo: ProcessTreeMetrics = new ProcfsBasedSystems + // List of all executor metric types val values = IndexedSeq( JVMHeapMemory, @@ -95,7 +134,13 @@ private[spark] object ExecutorMetricType { OnHeapUnifiedMemory, OffHeapUnifiedMemory, DirectPoolMemory, - MappedPoolMemory + MappedPoolMemory, + ProcessTreeJVMVMemory, + ProcessTreeJVMRSSMemory, + ProcessTreePythonVMemory, + ProcessTreePythonRSSMemory, + ProcessTreeOtherVMemory, + ProcessTreeOtherRSSMemory ) // Map of executor metric type to its index in values. diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json index 9bf2086cc8e72..bdee36a5a97e0 100644 --- a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json +++ b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json @@ -37,7 +37,13 @@ "DirectPoolMemory" : 397602, "MappedPoolMemory" : 0, "JVMHeapMemory" : 629553808, - "OffHeapStorageMemory" : 0 + "OffHeapStorageMemory" : 0, + "ProcessTreeJVMVMemory": 500000000, + "ProcessTreeJVMRSSMemory": 200000000, + "ProcessTreePythonVMemory": 80000000, + "ProcessTreePythonRSSMemory": 20000000, + "ProcessTreeOtherVMemory": 80000000, + "ProcessTreeOtherRSSMemory": 20000000 } }, { "id" : "7", @@ -177,7 +183,13 @@ "DirectPoolMemory" : 126261, "MappedPoolMemory" : 0, "JVMHeapMemory" : 518613056, - "OffHeapStorageMemory" : 0 + "OffHeapStorageMemory" : 0, + "ProcessTreeJVMVMemory": 500000000, + "ProcessTreeJVMRSSMemory": 100000000, + "ProcessTreePythonVMemory": 80000000, + "ProcessTreePythonRSSMemory": 20000000, + "ProcessTreeOtherVMemory": 80000000, + "ProcessTreeOtherRSSMemory": 20000000 } }, { "id" : "3", @@ -221,7 +233,13 @@ "DirectPoolMemory" : 87796, "MappedPoolMemory" : 0, "JVMHeapMemory" : 726805712, - "OffHeapStorageMemory" : 0 + "OffHeapStorageMemory" : 0, + "ProcessTreeJVMVMemory": 600000000, + "ProcessTreeJVMRSSMemory": 200000000, + "ProcessTreePythonVMemory": 200000000, + "ProcessTreePythonRSSMemory": 70000000, + "ProcessTreeOtherVMemory": 200000000, + "ProcessTreeOtherRSSMemory": 70000000 } }, { "id" : "2", @@ -265,7 +283,13 @@ "DirectPoolMemory" : 87796, "MappedPoolMemory" : 0, "JVMHeapMemory" : 595946552, - "OffHeapStorageMemory" : 0 + "OffHeapStorageMemory" : 0, + "ProcessTreeJVMVMemory": 400000000, + "ProcessTreeJVMRSSMemory": 90000000, + "ProcessTreePythonVMemory": 100000000, + "ProcessTreePythonRSSMemory": 8000000, + "ProcessTreeOtherVMemory": 100000000, + "ProcessTreeOtherRSSMemory": 8000000 } }, { "id" : "1", @@ -309,6 +333,12 @@ "DirectPoolMemory" : 98230, "MappedPoolMemory" : 0, "JVMHeapMemory" : 755008624, - "OffHeapStorageMemory" : 0 + "OffHeapStorageMemory" : 0, + "ProcessTreeJVMVMemory": 500000000, + "ProcessTreeJVMRSSMemory": 100000000, + "ProcessTreePythonVMemory": 400000000, + "ProcessTreePythonRSSMemory": 40000000, + "ProcessTreeOtherVMemory": 400000000, + "ProcessTreeOtherRSSMemory": 40000000 } } ] diff --git a/core/src/test/resources/spark-events/application_1506645932520_24630151 b/core/src/test/resources/spark-events/application_1506645932520_24630151 index c48ed741c56e0..ce28c43bb909c 100644 --- a/core/src/test/resources/spark-events/application_1506645932520_24630151 +++ b/core/src/test/resources/spark-events/application_1506645932520_24630151 @@ -33,11 +33,11 @@ {"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":4,"Index":0,"Attempt":0,"Launch Time":1524182142251,"Executor ID":"1","Host":"node1404.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":1524182144246,"Failed":false,"Killed":false,"Accumulables":[{"ID":8,"Name":"data size total (min, med, max)","Update":"1920975","Value":"1920974","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":23,"Name":"number of output rows","Update":"3562","Value":"3562","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":25,"Name":"peak memory total (min, med, max)","Update":"41943039","Value":"41943038","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":24,"Name":"sort time total (min, med, max)","Update":"38","Value":"37","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":27,"Name":"duration total (min, med, max)","Update":"1813","Value":"1812","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":28,"Name":"number of output rows","Update":"195602","Value":"195602","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":29,"Name":"number of output rows","Update":"3563","Value":"3563","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":33,"Name":"duration total (min, med, max)","Update":"1558","Value":"1557","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":84,"Name":"internal.metrics.input.recordsRead","Update":3563,"Value":3563,"Internal":true,"Count Failed Values":true},{"ID":83,"Name":"internal.metrics.input.bytesRead","Update":36845111,"Value":36845111,"Internal":true,"Count Failed Values":true},{"ID":82,"Name":"internal.metrics.shuffle.write.writeTime","Update":27318908,"Value":27318908,"Internal":true,"Count Failed Values":true},{"ID":81,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":3562,"Value":3562,"Internal":true,"Count Failed Values":true},{"ID":80,"Name":"internal.metrics.shuffle.write.bytesWritten","Update":349287,"Value":349287,"Internal":true,"Count Failed Values":true},{"ID":71,"Name":"internal.metrics.peakExecutionMemory","Update":41943040,"Value":41943040,"Internal":true,"Count Failed Values":true},{"ID":67,"Name":"internal.metrics.jvmGCTime","Update":33,"Value":33,"Internal":true,"Count Failed Values":true},{"ID":66,"Name":"internal.metrics.resultSize","Update":2394,"Value":2394,"Internal":true,"Count Failed Values":true},{"ID":65,"Name":"internal.metrics.executorCpuTime","Update":1498974375,"Value":1498974375,"Internal":true,"Count Failed Values":true},{"ID":64,"Name":"internal.metrics.executorRunTime","Update":1922,"Value":1922,"Internal":true,"Count Failed Values":true},{"ID":63,"Name":"internal.metrics.executorDeserializeCpuTime","Update":49547405,"Value":49547405,"Internal":true,"Count Failed Values":true},{"ID":62,"Name":"internal.metrics.executorDeserializeTime","Update":56,"Value":56,"Internal":true,"Count Failed Values":true}]},"Task Metrics":{"Executor Deserialize Time":56,"Executor Deserialize CPU Time":49547405,"Executor Run Time":1922,"Executor CPU Time":1498974375,"Result Size":2394,"JVM GC Time":33,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":349287,"Shuffle Write Time":27318908,"Shuffle Records Written":3562},"Input Metrics":{"Bytes Read":36845111,"Records Read":3563},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}} {"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1524182130331,"Executor ID":"2","Host":"node4045.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":1524182144444,"Failed":false,"Killed":false,"Accumulables":[{"ID":7,"Name":"data size total (min, med, max)","Update":"204058975","Value":"564814764","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":16,"Name":"number of output rows","Update":"616897","Value":"1707779","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":1,"Name":"number of output rows","Update":"616897","Value":"1707779","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":5,"Name":"duration total (min, med, max)","Update":"23365","Value":"63634","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":59,"Name":"internal.metrics.input.recordsRead","Update":616897,"Value":1707779,"Internal":true,"Count Failed Values":true},{"ID":58,"Name":"internal.metrics.input.bytesRead","Update":50423423,"Value":138656729,"Internal":true,"Count Failed Values":true},{"ID":57,"Name":"internal.metrics.shuffle.write.writeTime","Update":105575962,"Value":301246724,"Internal":true,"Count Failed Values":true},{"ID":56,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":616897,"Value":1707779,"Internal":true,"Count Failed Values":true},{"ID":55,"Name":"internal.metrics.shuffle.write.bytesWritten","Update":22950296,"Value":69377072,"Internal":true,"Count Failed Values":true},{"ID":43,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":5,"Internal":true,"Count Failed Values":true},{"ID":42,"Name":"internal.metrics.jvmGCTime","Update":326,"Value":1107,"Internal":true,"Count Failed Values":true},{"ID":41,"Name":"internal.metrics.resultSize","Update":1856,"Value":5568,"Internal":true,"Count Failed Values":true},{"ID":40,"Name":"internal.metrics.executorCpuTime","Update":11931694025,"Value":31991331624,"Internal":true,"Count Failed Values":true},{"ID":39,"Name":"internal.metrics.executorRunTime","Update":13454,"Value":36578,"Internal":true,"Count Failed Values":true},{"ID":38,"Name":"internal.metrics.executorDeserializeCpuTime","Update":531799977,"Value":1633060096,"Internal":true,"Count Failed Values":true},{"ID":37,"Name":"internal.metrics.executorDeserializeTime","Update":594,"Value":1930,"Internal":true,"Count Failed Values":true}]},"Task Metrics":{"Executor Deserialize Time":594,"Executor Deserialize CPU Time":531799977,"Executor Run Time":13454,"Executor CPU Time":11931694025,"Result Size":1856,"JVM GC Time":326,"Result Serialization Time":2,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":22950296,"Shuffle Write Time":105575962,"Shuffle Records Written":616897},"Input Metrics":{"Bytes Read":50423423,"Records Read":616897},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}} {"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1524182130349,"Executor ID":"3","Host":"node0998.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":1524182144840,"Failed":false,"Killed":false,"Accumulables":[{"ID":7,"Name":"data size total (min, med, max)","Update":"207338935","Value":"772153699","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":16,"Name":"number of output rows","Update":"626277","Value":"2334056","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":1,"Name":"number of output rows","Update":"626277","Value":"2334056","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":5,"Name":"duration total (min, med, max)","Update":"24254","Value":"87888","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":59,"Name":"internal.metrics.input.recordsRead","Update":626277,"Value":2334056,"Internal":true,"Count Failed Values":true},{"ID":58,"Name":"internal.metrics.input.bytesRead","Update":50409514,"Value":189066243,"Internal":true,"Count Failed Values":true},{"ID":57,"Name":"internal.metrics.shuffle.write.writeTime","Update":106963069,"Value":408209793,"Internal":true,"Count Failed Values":true},{"ID":56,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":626277,"Value":2334056,"Internal":true,"Count Failed Values":true},{"ID":55,"Name":"internal.metrics.shuffle.write.bytesWritten","Update":31362123,"Value":100739195,"Internal":true,"Count Failed Values":true},{"ID":43,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":7,"Internal":true,"Count Failed Values":true},{"ID":42,"Name":"internal.metrics.jvmGCTime","Update":342,"Value":1449,"Internal":true,"Count Failed Values":true},{"ID":41,"Name":"internal.metrics.resultSize","Update":1856,"Value":7424,"Internal":true,"Count Failed Values":true},{"ID":40,"Name":"internal.metrics.executorCpuTime","Update":12267596062,"Value":44258927686,"Internal":true,"Count Failed Values":true},{"ID":39,"Name":"internal.metrics.executorRunTime","Update":13858,"Value":50436,"Internal":true,"Count Failed Values":true},{"ID":38,"Name":"internal.metrics.executorDeserializeCpuTime","Update":519573839,"Value":2152633935,"Internal":true,"Count Failed Values":true},{"ID":37,"Name":"internal.metrics.executorDeserializeTime","Update":573,"Value":2503,"Internal":true,"Count Failed Values":true}]},"Task Metrics":{"Executor Deserialize Time":573,"Executor Deserialize CPU Time":519573839,"Executor Run Time":13858,"Executor CPU Time":12267596062,"Result Size":1856,"JVM GC Time":342,"Result Serialization Time":2,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":31362123,"Shuffle Write Time":106963069,"Shuffle Records Written":626277},"Input Metrics":{"Bytes Read":50409514,"Records Read":626277},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"driver","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":592412824,"JVMOffHeapMemory":202907152,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":905801,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":905801,"OffHeapUnifiedMemory":0,"DirectPoolMemory":355389,"MappedPoolMemory":0}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"2","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":523121272,"JVMOffHeapMemory":88280720,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":52050147,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":52050147,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"1","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":214174608,"JVMOffHeapMemory":91548704,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":47399168,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":47399168,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"4","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":518613056,"JVMOffHeapMemory":95657456,"OnHeapExecutionMemory":37748736,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":63104457,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":100853193,"OffHeapUnifiedMemory":0,"DirectPoolMemory":126261,"MappedPoolMemory":0}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"3","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":726805712,"JVMOffHeapMemory":90709624,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":69535048,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":69535048,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"driver","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":592412824,"JVMOffHeapMemory":202907152,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":905801,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":905801,"OffHeapUnifiedMemory":0,"DirectPoolMemory":355389,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":500000000,"ProcessTreeJVMRSSMemory":90000000,"ProcessTreePythonVMemory":70000000,"ProcessTreePythonRSSMemory":10000000,"ProcessTreeOtherVMemory":70000000,"ProcessTreeOtherRSSMemory":10000000}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"2","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":523121272,"JVMOffHeapMemory":88280720,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":52050147,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":52050147,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":400000000,"ProcessTreeJVMRSSMemory":90000000,"ProcessTreePythonVMemory":100000000,"ProcessTreePythonRSSMemory":8000000,"ProcessTreeOtherVMemory":100000000,"ProcessTreeOtherRSSMemory":8000000}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"1","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":214174608,"JVMOffHeapMemory":91548704,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":47399168,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":47399168,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":450000000,"ProcessTreeJVMRSSMemory":85000000,"ProcessTreePythonVMemory":300000000,"ProcessTreePythonRSSMemory":30000000,"ProcessTreeOtherVMemory":300000000,"ProcessTreeOtherRSSMemory":30000000}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"4","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":518613056,"JVMOffHeapMemory":95657456,"OnHeapExecutionMemory":37748736,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":63104457,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":100853193,"OffHeapUnifiedMemory":0,"DirectPoolMemory":126261,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":480000000,"ProcessTreeJVMRSSMemory":95000000,"ProcessTreePythonVMemory":80000000,"ProcessTreePythonRSSMemory":20000000,"ProcessTreeOtherVMemory":80000000,"ProcessTreeOtherRSSMemory":20000000}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"3","Stage ID":0,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":726805712,"JVMOffHeapMemory":90709624,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":69535048,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":69535048,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":500000000,"ProcessTreeJVMRSSMemory":100000000,"ProcessTreePythonVMemory":100000000,"ProcessTreePythonRSSMemory":60000000,"ProcessTreeOtherVMemory":100000000,"ProcessTreeOtherRSSMemory":60000000}} {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"cache at :41","Number of Tasks":4,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"11\",\"name\":\"Exchange\"}","Callsite":"cache at :41","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"FileScanRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at :39","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":2,"Name":"*(1) Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n+- *(1) FileScan avro [appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] Batched: false, Format: com.databricks.spark.avro.DefaultSource@7006b304, Location: InMemoryFileIndex[hdfs://clusternn01.grid.company.com:9000/data/hadoopdev/sparkmetrics/ltx1-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct:39","Parent IDs":[1],"Storage Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at :39","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":5,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"12\",\"name\":\"InMemoryTableScan\"}","Callsite":"cache at :41","Parent IDs":[4],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":4,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"12\",\"name\":\"InMemoryTableScan\"}","Callsite":"cache at :41","Parent IDs":[2],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.sql.Dataset.cache(Dataset.scala:2912)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:41)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:46)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:48)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:50)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:52)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:54)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw.(:56)\n$line49.$read$$iw$$iw$$iw$$iw$$iw.(:58)\n$line49.$read$$iw$$iw$$iw$$iw.(:60)\n$line49.$read$$iw$$iw$$iw.(:62)\n$line49.$read$$iw$$iw.(:64)\n$line49.$read$$iw.(:66)\n$line49.$read.(:68)\n$line49.$read$.(:72)\n$line49.$read$.()\n$line49.$eval$.$print$lzycompute(:7)\n$line49.$eval$.$print(:6)\n$line49.$eval.$print()\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","Submission Time":1524182130229,"Completion Time":1524182144852,"Accumulables":[{"ID":41,"Name":"internal.metrics.resultSize","Value":7424,"Internal":true,"Count Failed Values":true},{"ID":59,"Name":"internal.metrics.input.recordsRead","Value":2334056,"Internal":true,"Count Failed Values":true},{"ID":38,"Name":"internal.metrics.executorDeserializeCpuTime","Value":2152633935,"Internal":true,"Count Failed Values":true},{"ID":56,"Name":"internal.metrics.shuffle.write.recordsWritten","Value":2334056,"Internal":true,"Count Failed Values":true},{"ID":5,"Name":"duration total (min, med, max)","Value":"87888","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":55,"Name":"internal.metrics.shuffle.write.bytesWritten","Value":100739195,"Internal":true,"Count Failed Values":true},{"ID":40,"Name":"internal.metrics.executorCpuTime","Value":44258927686,"Internal":true,"Count Failed Values":true},{"ID":58,"Name":"internal.metrics.input.bytesRead","Value":189066243,"Internal":true,"Count Failed Values":true},{"ID":7,"Name":"data size total (min, med, max)","Value":"772153699","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":16,"Name":"number of output rows","Value":"2334056","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":43,"Name":"internal.metrics.resultSerializationTime","Value":7,"Internal":true,"Count Failed Values":true},{"ID":1,"Name":"number of output rows","Value":"2334056","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":37,"Name":"internal.metrics.executorDeserializeTime","Value":2503,"Internal":true,"Count Failed Values":true},{"ID":57,"Name":"internal.metrics.shuffle.write.writeTime","Value":408209793,"Internal":true,"Count Failed Values":true},{"ID":39,"Name":"internal.metrics.executorRunTime","Value":50436,"Internal":true,"Count Failed Values":true},{"ID":42,"Name":"internal.metrics.jvmGCTime","Value":1449,"Internal":true,"Count Failed Values":true}]}} {"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5,"Index":1,"Attempt":0,"Launch Time":1524182142997,"Executor ID":"4","Host":"node4243.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":1524182145327,"Failed":false,"Killed":false,"Accumulables":[{"ID":8,"Name":"data size total (min, med, max)","Update":"1953295","Value":"3874269","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":23,"Name":"number of output rows","Update":"3575","Value":"7137","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":25,"Name":"peak memory total (min, med, max)","Update":"41943039","Value":"83886077","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":24,"Name":"sort time total (min, med, max)","Update":"49","Value":"86","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":27,"Name":"duration total (min, med, max)","Update":"2002","Value":"3814","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":28,"Name":"number of output rows","Update":"196587","Value":"392189","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":29,"Name":"number of output rows","Update":"3575","Value":"7138","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":33,"Name":"duration total (min, med, max)","Update":"1755","Value":"3312","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":84,"Name":"internal.metrics.input.recordsRead","Update":3575,"Value":7138,"Internal":true,"Count Failed Values":true},{"ID":83,"Name":"internal.metrics.input.bytesRead","Update":36849246,"Value":73694357,"Internal":true,"Count Failed Values":true},{"ID":82,"Name":"internal.metrics.shuffle.write.writeTime","Update":32035583,"Value":59354491,"Internal":true,"Count Failed Values":true},{"ID":81,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":3575,"Value":7137,"Internal":true,"Count Failed Values":true},{"ID":80,"Name":"internal.metrics.shuffle.write.bytesWritten","Update":349006,"Value":698293,"Internal":true,"Count Failed Values":true},{"ID":71,"Name":"internal.metrics.peakExecutionMemory","Update":41943040,"Value":83886080,"Internal":true,"Count Failed Values":true},{"ID":67,"Name":"internal.metrics.jvmGCTime","Update":31,"Value":64,"Internal":true,"Count Failed Values":true},{"ID":66,"Name":"internal.metrics.resultSize","Update":2394,"Value":4788,"Internal":true,"Count Failed Values":true},{"ID":65,"Name":"internal.metrics.executorCpuTime","Update":1785119941,"Value":3284094316,"Internal":true,"Count Failed Values":true},{"ID":64,"Name":"internal.metrics.executorRunTime","Update":2182,"Value":4104,"Internal":true,"Count Failed Values":true},{"ID":63,"Name":"internal.metrics.executorDeserializeCpuTime","Update":71500541,"Value":121047946,"Internal":true,"Count Failed Values":true},{"ID":62,"Name":"internal.metrics.executorDeserializeTime","Update":136,"Value":192,"Internal":true,"Count Failed Values":true}]},"Task Metrics":{"Executor Deserialize Time":136,"Executor Deserialize CPU Time":71500541,"Executor Run Time":2182,"Executor CPU Time":1785119941,"Result Size":2394,"JVM GC Time":31,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":349006,"Shuffle Write Time":32035583,"Shuffle Records Written":3575},"Input Metrics":{"Bytes Read":36849246,"Records Read":3575},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}} {"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":7,"Index":3,"Attempt":0,"Launch Time":1524182144237,"Executor ID":"1","Host":"node1404.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":1524182145971,"Failed":false,"Killed":false,"Accumulables":[{"ID":8,"Name":"data size total (min, med, max)","Update":"1337999","Value":"5212268","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":23,"Name":"number of output rows","Update":"2435","Value":"9572","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":25,"Name":"peak memory total (min, med, max)","Update":"37748735","Value":"121634812","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":24,"Name":"sort time total (min, med, max)","Update":"9","Value":"95","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":27,"Name":"duration total (min, med, max)","Update":"1703","Value":"5517","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":28,"Name":"number of output rows","Update":"133759","Value":"525948","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":29,"Name":"number of output rows","Update":"2435","Value":"9573","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":33,"Name":"duration total (min, med, max)","Update":"1609","Value":"4921","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":84,"Name":"internal.metrics.input.recordsRead","Update":2435,"Value":9573,"Internal":true,"Count Failed Values":true},{"ID":83,"Name":"internal.metrics.input.bytesRead","Update":24250210,"Value":97944567,"Internal":true,"Count Failed Values":true},{"ID":82,"Name":"internal.metrics.shuffle.write.writeTime","Update":20055909,"Value":79410400,"Internal":true,"Count Failed Values":true},{"ID":81,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":2435,"Value":9572,"Internal":true,"Count Failed Values":true},{"ID":80,"Name":"internal.metrics.shuffle.write.bytesWritten","Update":242714,"Value":941007,"Internal":true,"Count Failed Values":true},{"ID":71,"Name":"internal.metrics.peakExecutionMemory","Update":37748736,"Value":121634816,"Internal":true,"Count Failed Values":true},{"ID":67,"Name":"internal.metrics.jvmGCTime","Update":31,"Value":95,"Internal":true,"Count Failed Values":true},{"ID":66,"Name":"internal.metrics.resultSize","Update":2394,"Value":7182,"Internal":true,"Count Failed Values":true},{"ID":65,"Name":"internal.metrics.executorCpuTime","Update":896878991,"Value":4180973307,"Internal":true,"Count Failed Values":true},{"ID":64,"Name":"internal.metrics.executorRunTime","Update":1722,"Value":5826,"Internal":true,"Count Failed Values":true},{"ID":63,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2787355,"Value":123835301,"Internal":true,"Count Failed Values":true},{"ID":62,"Name":"internal.metrics.executorDeserializeTime","Update":3,"Value":195,"Internal":true,"Count Failed Values":true}]},"Task Metrics":{"Executor Deserialize Time":3,"Executor Deserialize CPU Time":2787355,"Executor Run Time":1722,"Executor CPU Time":896878991,"Result Size":2394,"JVM GC Time":31,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":242714,"Shuffle Write Time":20055909,"Shuffle Records Written":2435},"Input Metrics":{"Bytes Read":24250210,"Records Read":2435},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}} @@ -46,11 +46,11 @@ {"Event":"SparkListenerExecutorAdded","Timestamp":1524182149826,"Executor ID":"7","Executor Info":{"Host":"node6340.grid.company.com","Total Cores":1,"Log Urls":{"stdout":"http://node6340.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000009/edlu/stdout?start=-4096","stderr":"http://node6340.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000009/edlu/stderr?start=-4096"}}} {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"7","Host":"node6340.grid.company.com","Port":5933},"Maximum Memory":956615884,"Timestamp":1524182149983,"Maximum Onheap Memory":956615884,"Maximum Offheap Memory":0} {"Event":"SparkListenerTaskEnd","Stage ID":1,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":6,"Index":2,"Attempt":0,"Launch Time":1524182143166,"Executor ID":"5","Host":"node2477.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":1524182152418,"Failed":false,"Killed":false,"Accumulables":[{"ID":8,"Name":"data size total (min, med, max)","Update":"1910103","Value":"7122371","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":23,"Name":"number of output rows","Update":"3541","Value":"13113","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":25,"Name":"peak memory total (min, med, max)","Update":"41943039","Value":"163577851","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":24,"Name":"sort time total (min, med, max)","Update":"48","Value":"143","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":27,"Name":"duration total (min, med, max)","Update":"6093","Value":"11610","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":28,"Name":"number of output rows","Update":"194553","Value":"720501","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":29,"Name":"number of output rows","Update":"3541","Value":"13114","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":33,"Name":"duration total (min, med, max)","Update":"5951","Value":"10872","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":84,"Name":"internal.metrics.input.recordsRead","Update":3541,"Value":13114,"Internal":true,"Count Failed Values":true},{"ID":83,"Name":"internal.metrics.input.bytesRead","Update":36838295,"Value":134782862,"Internal":true,"Count Failed Values":true},{"ID":82,"Name":"internal.metrics.shuffle.write.writeTime","Update":49790497,"Value":129200897,"Internal":true,"Count Failed Values":true},{"ID":81,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":3541,"Value":13113,"Internal":true,"Count Failed Values":true},{"ID":80,"Name":"internal.metrics.shuffle.write.bytesWritten","Update":355051,"Value":1296058,"Internal":true,"Count Failed Values":true},{"ID":71,"Name":"internal.metrics.peakExecutionMemory","Update":41943040,"Value":163577856,"Internal":true,"Count Failed Values":true},{"ID":68,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":2,"Internal":true,"Count Failed Values":true},{"ID":67,"Name":"internal.metrics.jvmGCTime","Update":920,"Value":1015,"Internal":true,"Count Failed Values":true},{"ID":66,"Name":"internal.metrics.resultSize","Update":2437,"Value":9619,"Internal":true,"Count Failed Values":true},{"ID":65,"Name":"internal.metrics.executorCpuTime","Update":5299274511,"Value":9480247818,"Internal":true,"Count Failed Values":true},{"ID":64,"Name":"internal.metrics.executorRunTime","Update":7847,"Value":13673,"Internal":true,"Count Failed Values":true},{"ID":63,"Name":"internal.metrics.executorDeserializeCpuTime","Update":687811857,"Value":811647158,"Internal":true,"Count Failed Values":true},{"ID":62,"Name":"internal.metrics.executorDeserializeTime","Update":1037,"Value":1232,"Internal":true,"Count Failed Values":true}]},"Task Metrics":{"Executor Deserialize Time":1037,"Executor Deserialize CPU Time":687811857,"Executor Run Time":7847,"Executor CPU Time":5299274511,"Result Size":2437,"JVM GC Time":920,"Result Serialization Time":2,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":355051,"Shuffle Write Time":49790497,"Shuffle Records Written":3541},"Input Metrics":{"Bytes Read":36838295,"Records Read":3541},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"driver","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":629553808,"JVMOffHeapMemory":205304696,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":905801,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":905801,"OffHeapUnifiedMemory":0,"DirectPoolMemory":397602,"MappedPoolMemory":0}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"2","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":595946552,"JVMOffHeapMemory":91208368,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":58468944,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":58468944,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"1","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":755008624,"JVMOffHeapMemory":100519936,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":47962185,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":47962185,"OffHeapUnifiedMemory":0,"DirectPoolMemory":98230,"MappedPoolMemory":0}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"4","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":518613056,"JVMOffHeapMemory":95657456,"OnHeapExecutionMemory":37748736,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":63104457,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":100853193,"OffHeapUnifiedMemory":0,"DirectPoolMemory":126261,"MappedPoolMemory":0}} -{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"3","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":726805712,"JVMOffHeapMemory":90709624,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":69535048,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":69535048,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"driver","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":629553808,"JVMOffHeapMemory":205304696,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":905801,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":905801,"OffHeapUnifiedMemory":0,"DirectPoolMemory":397602,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":400000000,"ProcessTreeJVMRSSMemory":200000000,"ProcessTreePythonVMemory":80000000,"ProcessTreePythonRSSMemory":20000000,"ProcessTreeOtherVMemory":80000000,"ProcessTreeOtherRSSMemory":20000000}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"2","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":595946552,"JVMOffHeapMemory":91208368,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":58468944,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":58468944,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":300000000,"ProcessTreeJVMRSSMemory":80000000,"ProcessTreePythonVMemory":90000000,"ProcessTreePythonRSSMemory":7000000,"ProcessTreeOtherVMemory":90000000,"ProcessTreeOtherRSSMemory":7000000}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"1","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":755008624,"JVMOffHeapMemory":100519936,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":47962185,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":47962185,"OffHeapUnifiedMemory":0,"DirectPoolMemory":98230,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":500000000,"ProcessTreeJVMRSSMemory":100000000,"ProcessTreePythonVMemory":400000000,"ProcessTreePythonRSSMemory":40000000,"ProcessTreeOtherVMemory":400000000,"ProcessTreeOtherRSSMemory":40000000}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"4","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":518613056,"JVMOffHeapMemory":95657456,"OnHeapExecutionMemory":37748736,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":63104457,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":100853193,"OffHeapUnifiedMemory":0,"DirectPoolMemory":126261,"MappedPoolMemory":0,"ProcessTreeJVMVMemory": 500000000,"ProcessTreeJVMRSSMemory": 100000000,"ProcessTreePythonVMemory":60000000,"ProcessTreePythonRSSMemory":10000000,"ProcessTreeOtherVMemory":60000000,"ProcessTreeOtherRSSMemory":10000000}} +{"Event":"SparkListenerStageExecutorMetrics","Executor ID":"3","Stage ID":1,"Stage Attempt ID":0,"Executor Metrics":{"JVMHeapMemory":726805712,"JVMOffHeapMemory":90709624,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":69535048,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":69535048,"OffHeapUnifiedMemory":0,"DirectPoolMemory":87796,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":600000000,"ProcessTreeJVMRSSMemory":200000000,"ProcessTreePythonVMemory":200000000,"ProcessTreePythonRSSMemory":70000000,"ProcessTreeOtherVMemory":200000000,"ProcessTreeOtherRSSMemory":70000000}} {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"cache at :41","Number of Tasks":4,"RDD Info":[{"RDD ID":14,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"17\",\"name\":\"Exchange\"}","Callsite":"cache at :41","Parent IDs":[13],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":10,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"24\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at :41","Parent IDs":[9],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":9,"Name":"FileScanRDD","Scope":"{\"id\":\"24\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at :41","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":12,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"19\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at :41","Parent IDs":[11],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":11,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"23\",\"name\":\"Generate\"}","Callsite":"cache at :41","Parent IDs":[10],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":13,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"18\",\"name\":\"SortAggregate\"}","Callsite":"cache at :41","Parent IDs":[12],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.sql.Dataset.cache(Dataset.scala:2912)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:41)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:46)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:48)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:50)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:52)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:54)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw.(:56)\n$line49.$read$$iw$$iw$$iw$$iw$$iw.(:58)\n$line49.$read$$iw$$iw$$iw$$iw.(:60)\n$line49.$read$$iw$$iw$$iw.(:62)\n$line49.$read$$iw$$iw.(:64)\n$line49.$read$$iw.(:66)\n$line49.$read.(:68)\n$line49.$read$.(:72)\n$line49.$read$.()\n$line49.$eval$.$print$lzycompute(:7)\n$line49.$eval$.$print(:6)\n$line49.$eval.$print()\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","Submission Time":1524182130328,"Completion Time":1524182152419,"Accumulables":[{"ID":83,"Name":"internal.metrics.input.bytesRead","Value":134782862,"Internal":true,"Count Failed Values":true},{"ID":23,"Name":"number of output rows","Value":"13113","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":68,"Name":"internal.metrics.resultSerializationTime","Value":2,"Internal":true,"Count Failed Values":true},{"ID":8,"Name":"data size total (min, med, max)","Value":"7122371","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":62,"Name":"internal.metrics.executorDeserializeTime","Value":1232,"Internal":true,"Count Failed Values":true},{"ID":80,"Name":"internal.metrics.shuffle.write.bytesWritten","Value":1296058,"Internal":true,"Count Failed Values":true},{"ID":71,"Name":"internal.metrics.peakExecutionMemory","Value":163577856,"Internal":true,"Count Failed Values":true},{"ID":29,"Name":"number of output rows","Value":"13114","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":65,"Name":"internal.metrics.executorCpuTime","Value":9480247818,"Internal":true,"Count Failed Values":true},{"ID":64,"Name":"internal.metrics.executorRunTime","Value":13673,"Internal":true,"Count Failed Values":true},{"ID":82,"Name":"internal.metrics.shuffle.write.writeTime","Value":129200897,"Internal":true,"Count Failed Values":true},{"ID":67,"Name":"internal.metrics.jvmGCTime","Value":1015,"Internal":true,"Count Failed Values":true},{"ID":25,"Name":"peak memory total (min, med, max)","Value":"163577851","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":28,"Name":"number of output rows","Value":"720501","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":63,"Name":"internal.metrics.executorDeserializeCpuTime","Value":811647158,"Internal":true,"Count Failed Values":true},{"ID":27,"Name":"duration total (min, med, max)","Value":"11610","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":81,"Name":"internal.metrics.shuffle.write.recordsWritten","Value":13113,"Internal":true,"Count Failed Values":true},{"ID":84,"Name":"internal.metrics.input.recordsRead","Value":13114,"Internal":true,"Count Failed Values":true},{"ID":66,"Name":"internal.metrics.resultSize","Value":9619,"Internal":true,"Count Failed Values":true},{"ID":24,"Name":"sort time total (min, med, max)","Value":"143","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":33,"Name":"duration total (min, med, max)","Value":"10872","Internal":true,"Count Failed Values":true,"Metadata":"sql"}]}} {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"show at :40","Number of Tasks":1,"RDD Info":[{"RDD ID":26,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"33\",\"name\":\"map\"}","Callsite":"show at :40","Parent IDs":[25],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":25,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"32\",\"name\":\"mapPartitionsInternal\"}","Callsite":"show at :40","Parent IDs":[24],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":8,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at :41","Parent IDs":[7],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":24,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"27\",\"name\":\"WholeStageCodegen\"}","Callsite":"show at :40","Parent IDs":[23],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":22,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"31\",\"name\":\"InMemoryTableScan\"}","Callsite":"show at :40","Parent IDs":[20],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":20,"Name":"*(5) Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165]\n+- SortMergeJoin [appId#0], [appId#137], LeftOuter\n :- *(1) Sort [appId#0 ASC NULLS FIRST], false, 0\n : +- Exchange hashpartitioning(appId#0, 200)\n : +- InMemoryTableScan [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28]\n : +- InMemoryRelation [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28], true, 10000, StorageLevel(disk, memory, deserialized, 1 rep...","Scope":"{\"id\":\"26\",\"name\":\"mapPartitionsInternal\"}","Callsite":"cache at :41","Parent IDs":[19],"Storage Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":23,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"31\",\"name\":\"InMemoryTableScan\"}","Callsite":"show at :40","Parent IDs":[22],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":18,"Name":"ZippedPartitionsRDD2","Scope":"{\"id\":\"7\",\"name\":\"SortMergeJoin\"}","Callsite":"cache at :41","Parent IDs":[8,17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":17,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"13\",\"name\":\"SortAggregate\"}","Callsite":"cache at :41","Parent IDs":[16],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":7,"Name":"ShuffledRowRDD","Scope":"{\"id\":\"11\",\"name\":\"Exchange\"}","Callsite":"cache at :41","Parent IDs":[6],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":16,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"14\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at :41","Parent IDs":[15],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":15,"Name":"ShuffledRowRDD","Scope":"{\"id\":\"17\",\"name\":\"Exchange\"}","Callsite":"cache at :41","Parent IDs":[14],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":19,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"4\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at :41","Parent IDs":[18],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[0,1],"Details":"org.apache.spark.sql.Dataset.show(Dataset.scala:691)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:40)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:45)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:47)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:49)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:51)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.(:53)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw.(:55)\n$line50.$read$$iw$$iw$$iw$$iw$$iw.(:57)\n$line50.$read$$iw$$iw$$iw$$iw.(:59)\n$line50.$read$$iw$$iw$$iw.(:61)\n$line50.$read$$iw$$iw.(:63)\n$line50.$read$$iw.(:65)\n$line50.$read.(:67)\n$line50.$read$.(:71)\n$line50.$read$.()\n$line50.$eval$.$print$lzycompute(:7)\n$line50.$eval$.$print(:6)\n$line50.$eval.$print()\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","Submission Time":1524182152430,"Accumulables":[]},"Properties":{"spark.sql.execution.id":"2"}} {"Event":"SparkListenerTaskStart","Stage ID":2,"Stage Attempt ID":0,"Task Info":{"Task ID":8,"Index":0,"Attempt":0,"Launch Time":1524182152447,"Executor ID":"4","Host":"node4243.grid.company.com","Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index cecd6996df7bd..f2d851b8920d9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -282,53 +282,67 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // receive 3 metric updates from each executor with just stage 0 running, // with different peak updates for each executor createExecutorMetricsUpdateEvent(1, - new ExecutorMetrics(Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L))), + new ExecutorMetrics(Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L, 7500L, 3500L, + 6500L, 2500L, 5500L, 1500L))), createExecutorMetricsUpdateEvent(2, - new ExecutorMetrics(Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L))), - // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6 + new ExecutorMetrics(Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L, 8500L, 3500L, + 7500L, 2500L, 6500L, 1500L))), + // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6 createExecutorMetricsUpdateEvent(1, - new ExecutorMetrics(Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L))), + new ExecutorMetrics(Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L, 8000L, 4000L, + 7000L, 3000L, 6000L, 2000L))), // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6 createExecutorMetricsUpdateEvent(2, - new ExecutorMetrics(Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L))), + new ExecutorMetrics(Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L, 9000L, 4000L, + 8000L, 3000L, 7000L, 2000L))), // exec 1: new stage 0 peaks for metrics at indexes: 5, 7 createExecutorMetricsUpdateEvent(1, - new ExecutorMetrics(Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L))), + new ExecutorMetrics(Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L, 8000L, 3500L, + 7000L, 2500L, 6000L, 1500L))), // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8 createExecutorMetricsUpdateEvent(2, - new ExecutorMetrics(Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L))), + new ExecutorMetrics(Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L, 8500L, 3500L, + 7500L, 2500L, 6500L, 1500L))), // now start stage 1, one more metric update for each executor, and new // peaks for some stage 1 metrics (as listed), initialize stage 1 peaks createStageSubmittedEvent(1), // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7; initialize stage 1 peaks createExecutorMetricsUpdateEvent(1, - new ExecutorMetrics(Array(5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L))), - // exec 2: new stage 0 peaks for metrics at indexes: 0, 1, 2, 3, 6, 7, 9; + new ExecutorMetrics(Array(5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, + 0L, 5000L, 3000L, 4000L, 2000L, 3000L, 1000L))), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 1, 3, 6, 7, 9; // initialize stage 1 peaks createExecutorMetricsUpdateEvent(2, - new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L))), + new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, + 40L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L))), // complete stage 0, and 3 more updates for each executor with just // stage 1 running createStageCompletedEvent(0), // exec 1: new stage 1 peaks for metrics at indexes: 0, 1, 3 createExecutorMetricsUpdateEvent(1, - new ExecutorMetrics(Array(6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L))), - // enew ExecutorMetrics(xec 2: new stage 1 peaks for metrics at indexes: 3, 4, 7, 8 + new ExecutorMetrics(Array(6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L, 5000L, 3000L, + 4000L, 2000L, 3000L, 1000L))), + // exec 2: new stage 1 peaks for metrics at indexes: 3, 4, 7, 8 createExecutorMetricsUpdateEvent(2, - new ExecutorMetrics(Array(5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L))), + new ExecutorMetrics(Array(5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, + 20L, 8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 5000L, 2000L))), // exec 1: new stage 1 peaks for metrics at indexes: 0, 4, 5, 7 createExecutorMetricsUpdateEvent(1, - new ExecutorMetrics(Array(7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L))), + new ExecutorMetrics(Array(7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L, 3000L, 2500L, + 2000L, 1500L, 1000L, 500L))), // exec 2: new stage 1 peak for metrics at index: 7 createExecutorMetricsUpdateEvent(2, - new ExecutorMetrics(Array(5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L))), + new ExecutorMetrics(Array(5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, + 20L, 7000L, 3000L, 6000L, 2000L, 5000L, 1000L))), // exec 1: no new stage 1 peaks createExecutorMetricsUpdateEvent(1, - new ExecutorMetrics(Array(5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L))), + new ExecutorMetrics(Array(5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, + 0L, 4000L, 2500L, 3000L, 1500L, 2000L, 500L))), createExecutorRemovedEvent(1), // exec 2: new stage 1 peak for metrics at index: 6 createExecutorMetricsUpdateEvent(2, - new ExecutorMetrics(Array(4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L))), + new ExecutorMetrics(Array(4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L, 7000L, + 4000L, 6000L, 3000L, 5000L, 2000L))), createStageCompletedEvent(1), SparkListenerApplicationEnd(1000L)) @@ -342,19 +356,23 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // expected StageExecutorMetrics, for the given stage id and executor id val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] = - Map( - ((0, "1"), - new SparkListenerStageExecutorMetrics("1", 0, 0, - new ExecutorMetrics(Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L)))), - ((0, "2"), - new SparkListenerStageExecutorMetrics("2", 0, 0, - new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L)))), - ((1, "1"), - new SparkListenerStageExecutorMetrics("1", 1, 0, - new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L)))), - ((1, "2"), - new SparkListenerStageExecutorMetrics("2", 1, 0, - new ExecutorMetrics(Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L))))) + Map( + ((0, "1"), + new SparkListenerStageExecutorMetrics("1", 0, 0, + new ExecutorMetrics(Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, + 70L, 20L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L)))), + ((0, "2"), + new SparkListenerStageExecutorMetrics("2", 0, 0, + new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, + 80L, 40L, 9000L, 4000L, 8000L, 3000L, 7000L, 2000L)))), + ((1, "1"), + new SparkListenerStageExecutorMetrics("1", 1, 0, + new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, + 50L, 0L, 5000L, 3000L, 4000L, 2000L, 3000L, 1000L)))), + ((1, "2"), + new SparkListenerStageExecutorMetrics("2", 1, 0, + new ExecutorMetrics(Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, + 40L, 40L, 8000L, 5000L, 7000L, 4000L, 6000L, 3000L))))) // Verify the log file contains the expected events. // Posted events should be logged, except for ExecutorMetricsUpdate events -- these diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 016570b9cba73..d34d5a952e4cf 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -1220,58 +1220,74 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // receive 3 metric updates from each executor with just stage 0 running, // with different peak updates for each executor listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1, - Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L))) + Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L, 7500L, 3500L, + 6500L, 2500L, 5500L, 1500L))) listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2, - Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L))) + Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L, 8500L, 3500L, + 7500L, 2500L, 6500L, 1500L))) // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1, - Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L))) + Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L, 8000L, 4000L, + 7000L, 3000L, 6000L, 2000L))) // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2, - Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L))) + Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L, 9000L, 4000L, + 8000L, 3000L, 7000L, 2000L))) // exec 1: new stage 0 peaks for metrics at indexes: 5, 7 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1, - Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L))) + Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L, 8000L, 3500L, + 7000L, 2500L, 6000L, 1500L))) // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2, - Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L))) + Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L, 8500L, 3500L, + 7500L, 2500L, 6500L, 1500L))) // now start stage 1, one more metric update for each executor, and new // peaks for some stage 1 metrics (as listed), initialize stage 1 peaks listener.onStageSubmitted(createStageSubmittedEvent(1)) // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1, - Array(5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L))) + Array(5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L, 5000L, 3000L, + 4000L, 2000L, 3000L, 1000L))) // exec 2: new stage 0 peaks for metrics at indexes: 0, 1, 2, 3, 6, 7, 9 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2, - Array(7000L, 80L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L))) + Array(7000L, 80L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L, 8000L, 4000L, + 7000L, 3000L, 6000L, 2000L))) // complete stage 0, and 3 more updates for each executor with just // stage 1 running listener.onStageCompleted(createStageCompletedEvent(0)) // exec 1: new stage 1 peaks for metrics at indexes: 0, 1, 3 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1, - Array(6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L))) + Array(6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L, 5000L, 3000L, + 4000L, 2000L, 3000L, 1000L))) // exec 2: new stage 1 peaks for metrics at indexes: 3, 4, 7, 8 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2, - Array(5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L))) + Array(5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L, 8000L, 5000L, + 7000L, 4000L, 6000L, 3000L))) // exec 1: new stage 1 peaks for metrics at indexes: 0, 4, 5, 7 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1, - Array(7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L))) + Array(7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L, 3000L, 2500L, 2000L, + 1500L, 1000L, 500L))) // exec 2: new stage 1 peak for metrics at index: 7 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2, - Array(5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L))) + Array(5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L, 7000L, 3000L, + 6000L, 2000L, 5000L, 1000L))) // exec 1: no new stage 1 peaks listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1, - Array(5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L))) + Array(5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L, 4000L, 2500L, + 3000L, 1500, 2000L, 500L))) listener.onExecutorRemoved(createExecutorRemovedEvent(1)) // exec 2: new stage 1 peak for metrics at index: 6 listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2, - Array(4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L))) + Array(4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L, 7000L, 4000L, 6000L, + 3000L, 5000L, 2000L))) listener.onStageCompleted(createStageCompletedEvent(1)) // expected peak values for each executor val expectedValues = Map( - "1" -> new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 100L, 55L, 70L, 20L)), - "2" -> new ExecutorMetrics(Array(7000L, 80L, 50L, 40L, 10L, 30L, 50L, 60L, 80L, 40L))) + "1" -> new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 100L, 55L, + 70L, 20L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L)), + "2" -> new ExecutorMetrics(Array(7000L, 80L, 50L, 40L, 10L, 30L, 50L, 60L, + 80L, 40L, 9000L, 5000L, 8000L, 4000L, 7000L, 3000L))) // check that the stored peak values match the expected values expectedValues.foreach { case (id, metrics) => @@ -1299,23 +1315,29 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { listener.onStageSubmitted(createStageSubmittedEvent(0)) listener.onStageSubmitted(createStageSubmittedEvent(1)) listener.onStageExecutorMetrics(SparkListenerStageExecutorMetrics("1", 0, 0, - new ExecutorMetrics(Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L)))) + new ExecutorMetrics(Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, + 70L, 20L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L)))) listener.onStageExecutorMetrics(SparkListenerStageExecutorMetrics("2", 0, 0, - new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L)))) + new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L, 9000L, + 4000L, 8000L, 3000L, 7000L, 2000L)))) listener.onStageCompleted(createStageCompletedEvent(0)) // executor 1 is removed before stage 1 has finished, the stage executor metrics // are logged afterwards and should still be used to update the executor metrics. listener.onExecutorRemoved(createExecutorRemovedEvent(1)) listener.onStageExecutorMetrics(SparkListenerStageExecutorMetrics("1", 1, 0, - new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L)))) + new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L, 5000L, 3000L, + 4000L, 2000L, 3000L, 1000L)))) listener.onStageExecutorMetrics(SparkListenerStageExecutorMetrics("2", 1, 0, - new ExecutorMetrics(Array(7000L, 80L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L)))) + new ExecutorMetrics(Array(7000L, 80L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L, 8000L, 5000L, + 7000L, 4000L, 6000L, 3000L)))) listener.onStageCompleted(createStageCompletedEvent(1)) // expected peak values for each executor val expectedValues = Map( - "1" -> new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 100L, 55L, 70L, 20L)), - "2" -> new ExecutorMetrics(Array(7000L, 80L, 50L, 40L, 10L, 30L, 50L, 60L, 80L, 40L))) + "1" -> new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 100L, 55L, + 70L, 20L, 8000L, 4000L, 7000L, 3000L, 6000L, 2000L)), + "2" -> new ExecutorMetrics(Array(7000L, 80L, 50L, 40L, 10L, 30L, 50L, 60L, + 80L, 40L, 9000L, 5000L, 8000L, 4000L, 7000L, 3000L))) // check that the stored peak values match the expected values for ((id, metrics) <- expectedValues) { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 1e0d2af9a4711..b2ebfc4642522 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -96,7 +96,8 @@ class JsonProtocolSuite extends SparkFunSuite { .accumulators().map(AccumulatorSuite.makeInfo) .zipWithIndex.map { case (a, i) => a.copy(id = i) } val executorUpdates = new ExecutorMetrics( - Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L, 321L, 654L, 765L)) + Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L, + 321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L, 30364L, 15182L)) SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)), Some(executorUpdates)) } @@ -105,8 +106,8 @@ class JsonProtocolSuite extends SparkFunSuite { "In your multitude...", 300), RDDBlockId(0, 0), StorageLevel.MEMORY_ONLY, 100L, 0L)) val stageExecutorMetrics = SparkListenerStageExecutorMetrics("1", 2, 3, - new ExecutorMetrics(Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L, 321L, 654L, 765L))) - + new ExecutorMetrics(Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L, + 321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L, 30364L, 15182L))) testEvent(stageSubmitted, stageSubmittedJsonString) testEvent(stageCompleted, stageCompletedJsonString) testEvent(taskStart, taskStartJsonString) @@ -440,14 +441,14 @@ class JsonProtocolSuite extends SparkFunSuite { test("executorMetricsFromJson backward compatibility: handle missing metrics") { // any missing metrics should be set to 0 - val executorMetrics = new ExecutorMetrics( - Array(12L, 23L, 45L, 67L, 78L, 89L, 90L, 123L, 456L, 789L)) + val executorMetrics = new ExecutorMetrics(Array(12L, 23L, 45L, 67L, 78L, 89L, + 90L, 123L, 456L, 789L, 40L, 20L, 20L, 10L, 20L, 10L)) val oldExecutorMetricsJson = JsonProtocol.executorMetricsToJson(executorMetrics) .removeField( _._1 == "MappedPoolMemory") - val expectedExecutorMetrics = new ExecutorMetrics( - Array(12L, 23L, 45L, 67L, 78L, 89L, 90L, 123L, 456L, 0L)) - assertEquals(expectedExecutorMetrics, + val exepectedExecutorMetrics = new ExecutorMetrics(Array(12L, 23L, 45L, 67L, + 78L, 89L, 90L, 123L, 456L, 0L, 40L, 20L, 20L, 10L, 20L, 10L)) + assertEquals(exepectedExecutorMetrics, JsonProtocol.executorMetricsFromJson(oldExecutorMetricsJson)) } @@ -872,13 +873,14 @@ private[spark] object JsonProtocolSuite extends Assertions { if (includeTaskMetrics) { Seq((1L, 1, 1, Seq(makeAccumulableInfo(1, false, false, None), makeAccumulableInfo(2, false, false, None)))) - } else { + } else { Seq() } val executorMetricsUpdate = if (includeExecutorMetrics) { - Some(new ExecutorMetrics(Array(123456L, 543L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L))) - } else { + Some(new ExecutorMetrics(Array(123456L, 543L, 0L, 0L, 0L, 0L, 0L, + 0L, 0L, 0L, 256912L, 123456L, 123456L, 61728L, 30364L, 15182L))) + } else { None } SparkListenerExecutorMetricsUpdate(execId, taskMetrics, executorMetricsUpdate) @@ -2082,7 +2084,13 @@ private[spark] object JsonProtocolSuite extends Assertions { | "OnHeapUnifiedMemory" : 432, | "OffHeapUnifiedMemory" : 321, | "DirectPoolMemory" : 654, - | "MappedPoolMemory" : 765 + | "MappedPoolMemory" : 765, + | "ProcessTreeJVMVMemory": 256912, + | "ProcessTreeJVMRSSMemory": 123456, + | "ProcessTreePythonVMemory": 123456, + | "ProcessTreePythonRSSMemory": 61728, + | "ProcessTreeOtherVMemory": 30364, + | "ProcessTreeOtherRSSMemory": 15182 | } | |} @@ -2105,7 +2113,13 @@ private[spark] object JsonProtocolSuite extends Assertions { | "OnHeapUnifiedMemory" : 432, | "OffHeapUnifiedMemory" : 321, | "DirectPoolMemory" : 654, - | "MappedPoolMemory" : 765 + | "MappedPoolMemory" : 765, + | "ProcessTreeJVMVMemory": 256912, + | "ProcessTreeJVMRSSMemory": 123456, + | "ProcessTreePythonVMemory": 123456, + | "ProcessTreePythonRSSMemory": 61728, + | "ProcessTreeOtherVMemory": 30364, + | "ProcessTreeOtherRSSMemory": 15182 | } |} """.stripMargin