From 1bb78b13fec3db9256bf323f1dc2638223c3bbf7 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Wed, 14 Jan 2015 20:45:35 -0800 Subject: [PATCH 01/11] [SPARK-4874] [CORE] Collect record count metrics Collects record counts for both Input/Output and Shuffle Metrics. For the input/output metrics, it just appends the counter everytime the iterators get accessed. For shuffle on the write side, we count the metrics post aggregation (after a map side combine) and on the read side we count the metrics pre aggregation. This allows both the bytes read/written metrics and the records read/written to line up. For backwards compatibiliy, if we deserialize an older event that doesn't have record metrics, we set the metric to -1. --- .../scala/org/apache/spark/CacheManager.scala | 8 +- .../apache/spark/executor/TaskMetrics.scala | 26 ++- .../org/apache/spark/rdd/HadoopRDD.scala | 6 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 4 +- .../apache/spark/rdd/PairRDDFunctions.scala | 16 +- .../hash/BlockStoreShuffleFetcher.scala | 12 +- .../spark/storage/BlockObjectWriter.scala | 10 +- .../scala/org/apache/spark/ui/ToolTips.scala | 2 + .../apache/spark/ui/exec/ExecutorsTab.scala | 6 + .../apache/spark/ui/jobs/ExecutorTable.scala | 6 + .../spark/ui/jobs/JobProgressListener.scala | 12 ++ .../org/apache/spark/ui/jobs/StagePage.scala | 55 ++++-- .../org/apache/spark/ui/jobs/UIData.scala | 4 + .../org/apache/spark/util/JsonProtocol.scala | 16 +- .../util/collection/ExternalSorter.scala | 1 + .../spark/util/interceptingIterator.scala | 82 ++++++++ .../metrics/InputOutputMetricsSuite.scala | 176 +++++++++++++++--- .../storage/BlockObjectWriterSuite.scala | 7 + .../apache/spark/util/JsonProtocolSuite.scala | 53 +++++- 19 files changed, 435 insertions(+), 67 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/interceptingIterator.scala diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index a0c0372b7f0ef..48193c8c950d5 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -17,6 +17,8 @@ package org.apache.spark +import org.apache.spark.util.AfterNextInterceptingIterator + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -49,7 +51,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { .getInputMetricsForReadMethod(inputMetrics.readMethod) existingMetrics.addBytesRead(inputMetrics.bytesRead) - new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) + val iter = blockResult.data.asInstanceOf[Iterator[T]] + new InterruptibleIterator(context, AfterNextInterceptingIterator(iter, (next: T) => { + existingMetrics.addRecordsRead(1) + next + })) case None => // Acquire a lock for loading this partition diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 97912c68c5982..9849d1ed87f69 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -201,6 +201,7 @@ class TaskMetrics extends Serializable { merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) merged.incRemoteBytesRead(depMetrics.remoteBytesRead) + merged.recordsRead += depMetrics.recordsRead } _shuffleReadMetrics = Some(merged) } @@ -243,11 +244,17 @@ 
object DataWriteMethod extends Enumeration with Serializable { case class InputMetrics(readMethod: DataReadMethod.Value) { private val _bytesRead: AtomicLong = new AtomicLong() + private val _recordsRead: AtomicLong = new AtomicLong() /** * Total bytes read. */ def bytesRead: Long = _bytesRead.get() + + /** + * Total records read. + */ + def recordsRead: Long = _recordsRead.get() @volatile @transient var bytesReadCallback: Option[() => Long] = None /** @@ -257,6 +264,10 @@ case class InputMetrics(readMethod: DataReadMethod.Value) { _bytesRead.addAndGet(bytes) } + def addRecordsRead(records: Long) = { + _recordsRead.addAndGet(records) + } + /** * Invoke the bytesReadCallback and mutate bytesRead. */ @@ -287,6 +298,11 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) { private var _bytesWritten: Long = _ def bytesWritten = _bytesWritten private[spark] def setBytesWritten(value : Long) = _bytesWritten = value + + /** + * Total records written + */ + var recordsWritten: Long = 0L } /** @@ -334,6 +350,11 @@ class ShuffleReadMetrics extends Serializable { * Number of blocks fetched in this shuffle by this task (remote or local) */ def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched + + /** + * Total number of records read from the shuffle by this task + */ + var recordsRead: Long = _ } /** @@ -358,5 +379,8 @@ class ShuffleWriteMetrics extends Serializable { private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value - + /** + * Total number of records written from the shuffle by this task + */ + var recordsWritten: Long = _ } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 89adddcf0ac36..f30f2a15a1155 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.DataReadMethod import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD -import org.apache.spark.util.{NextIterator, Utils} +import org.apache.spark.util.{AfterNextInterceptingIterator, NextIterator, Utils} import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation} import org.apache.spark.storage.StorageLevel @@ -247,7 +247,9 @@ class HadoopRDD[K, V]( case eof: EOFException => finished = true } - + if (!finished) { + inputMetrics.addRecordsRead(1) + } (key, value) } diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 44b9ffd2a53fd..8d6142ab6583a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -151,7 +151,9 @@ class NewHadoopRDD[K, V]( throw new java.util.NoSuchElementException("End of stream") } havePair = false - + if (!finished) { + inputMetrics.addRecordsRead(1) + } (reader.getCurrentKey, reader.getCurrentValue) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 49b88a90ab5af..2089af56f0338 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -34,7 +34,7 @@ import 
org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, -RecordWriter => NewRecordWriter} + RecordWriter => NewRecordWriter} import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner @@ -993,8 +993,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context) val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] + var recordsWritten = 0L try { - var recordsWritten = 0L while (iter.hasNext) { val pair = iter.next() writer.write(pair._1, pair._2) @@ -1008,6 +1008,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } committer.commitTask(hadoopContext) bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } + outputMetrics.recordsWritten = recordsWritten 1 } : Int @@ -1065,8 +1066,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.setup(context.stageId, context.partitionId, taskAttemptId) writer.open() + var recordsWritten = 0L try { - var recordsWritten = 0L while (iter.hasNext) { val record = iter.next() writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) @@ -1080,6 +1081,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } writer.commit() bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } + outputMetrics.recordsWritten = recordsWritten } self.context.runJob(self, writeToFile) @@ -1089,17 +1091,15 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = { val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback() val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) - if (bytesWrittenCallback.isDefined) { - context.taskMetrics.outputMetrics = Some(outputMetrics) - } + context.taskMetrics.outputMetrics = Some(outputMetrics) (outputMetrics, bytesWrittenCallback) } private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long], outputMetrics: OutputMetrics, recordsWritten: Long): Unit = { - if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0 - && bytesWrittenCallback.isDefined) { + if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } + outputMetrics.recordsWritten = recordsWritten } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index e3e7434df45b0..3465ed14c7bce 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -25,7 +25,7 @@ import org.apache.spark._ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId} -import org.apache.spark.util.CompletionIterator +import org.apache.spark.util.{InterceptingIterator, CompletionIterator} private[hash] object BlockStoreShuffleFetcher extends Logging { def fetch[T]( @@ -82,7 +82,15 @@ private[hash] object BlockStoreShuffleFetcher extends 
Logging { SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024) val itr = blockFetcherItr.flatMap(unpackBlock) - val completionIter = CompletionIterator[T, Iterator[T]](itr, { + val itr2 = new InterceptingIterator[T](itr) { + val readMetrics = context.taskMetrics().createShuffleReadMetricsForDependency() + override def afterNext(next: T) : T = { + readMetrics.recordsRead += 1 + next + } + } + + val completionIter = CompletionIterator[T, Iterator[T]](itr2, { context.taskMetrics.updateShuffleReadMetrics() }) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 3198d766fca37..8f55ca75bf26c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -117,7 +117,7 @@ private[spark] class DiskBlockObjectWriter( /** Calling channel.position() to update the write metrics can be a little bit expensive, so we * only call it every N writes */ - private var writesSinceMetricsUpdate = 0 + private var numRecordsWritten = 0 override def open(): BlockObjectWriter = { fos = new FileOutputStream(file, true) @@ -168,6 +168,7 @@ private[spark] class DiskBlockObjectWriter( override def revertPartialWritesAndClose() { try { writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition) + writeMetrics.recordsWritten -= numRecordsWritten if (initialized) { objOut.flush() @@ -193,12 +194,11 @@ private[spark] class DiskBlockObjectWriter( } objOut.writeObject(value) + numRecordsWritten += 1 + writeMetrics.recordsWritten += 1 - if (writesSinceMetricsUpdate == 32) { - writesSinceMetricsUpdate = 0 + if (numRecordsWritten % 32 == 0) { updateBytesWritten() - } else { - writesSinceMetricsUpdate += 1 } } diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index 4307029d44fbb..e8a6781ecc477 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -30,8 +30,10 @@ private[spark] object ToolTips { "Time that the task spent blocked waiting for shuffle data to be read from remote machines." val INPUT = "Bytes read from Hadoop or from Spark storage." + val INPUT_RECORDS = "Number of records read from Hadoop or from Spark storage." val OUTPUT = "Bytes written to Hadoop." + val OUTPUT_RECORDS = "Number of records written to Hadoop." val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage." 
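(For orientation only; this block is not part of the patch.) A minimal sketch of how the record counters added above can be observed from a SparkListener once the patch is applied. It mirrors the listener used in InputOutputMetricsSuite later in this series; the class name RecordCountListener is only illustrative.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Illustrative only: sums the new record counters exposed on TaskMetrics.
    class RecordCountListener extends SparkListener {
      @volatile var recordsRead = 0L
      @volatile var recordsWritten = 0L

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val metrics = taskEnd.taskMetrics
        metrics.inputMetrics.foreach(recordsRead += _.recordsRead)
        metrics.outputMetrics.foreach(recordsWritten += _.recordsWritten)
      }
    }

    // Usage (assuming an active SparkContext named sc):
    //   sc.addSparkListener(new RecordCountListener)
    //   ... run a job, then read recordsRead / recordsWritten from the listener
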
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index dd1c2b78c4094..19e1fb3379817 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -48,7 +48,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp val executorToTasksFailed = HashMap[String, Int]() val executorToDuration = HashMap[String, Long]() val executorToInputBytes = HashMap[String, Long]() + val executorToInputRecords = HashMap[String, Long]() val executorToOutputBytes = HashMap[String, Long]() + val executorToOutputRecords = HashMap[String, Long]() val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() @@ -78,10 +80,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp metrics.inputMetrics.foreach { inputMetrics => executorToInputBytes(eid) = executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead + executorToInputRecords(eid) = + executorToInputRecords.getOrElse(eid, 0L) + inputMetrics.recordsRead } metrics.outputMetrics.foreach { outputMetrics => executorToOutputBytes(eid) = executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten + executorToOutputRecords(eid) = + executorToOutputRecords.getOrElse(eid, 0L) + outputMetrics.recordsWritten } metrics.shuffleReadMetrics.foreach { shuffleRead => executorToShuffleRead(eid) = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 9836d11a6d85f..ac07b396208ba 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -45,7 +45,9 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage Failed Tasks Succeeded Tasks Input + Input Records Output + Output Records Shuffle Read Shuffle Write Shuffle Spill (Memory) @@ -78,8 +80,12 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage {v.succeededTasks} {Utils.bytesToString(v.inputBytes)} + + {v.inputRecords} {Utils.bytesToString(v.outputBytes)} + + {v.outputRecords} {Utils.bytesToString(v.shuffleRead)} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 4d200eeda86b9..45ce0821e5dbd 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -406,12 +406,24 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.inputBytes += inputBytesDelta execSummary.inputBytes += inputBytesDelta + val inputRecordsDelta = + (taskMetrics.inputMetrics.map(_.recordsRead).getOrElse(0L) + - oldMetrics.flatMap(_.inputMetrics).map(_.recordsRead).getOrElse(0L)) + stageData.inputRecords += inputRecordsDelta + execSummary.inputRecords += inputRecordsDelta + val outputBytesDelta = (taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L) - oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L)) stageData.outputBytes += outputBytesDelta execSummary.outputBytes += outputBytesDelta + val outputRecordsDelta = + (taskMetrics.outputMetrics.map(_.recordsWritten).getOrElse(0L) + - 
oldMetrics.flatMap(_.outputMetrics).map(_.recordsWritten).getOrElse(0L)) + stageData.outputRecords += outputRecordsDelta + execSummary.outputRecords += outputRecordsDelta + val diskSpillDelta = taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L) stageData.diskBytesSpilled += diskSpillDelta diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index d8be1b20b3acd..f182f3a140964 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -74,12 +74,20 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { Input: {Utils.bytesToString(stageData.inputBytes)} +
+ Input Records: + {stageData.inputRecords} +
}} {if (hasOutput) {
Output: {Utils.bytesToString(stageData.outputBytes)}
+
+ Output Records: + {stageData.outputRecords} +
}} {if (hasShuffleRead) {
  • @@ -174,15 +182,16 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME), ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++ {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++ - {if (hasInput) Seq(("Input", "")) else Nil} ++ - {if (hasOutput) Seq(("Output", "")) else Nil} ++ + {if (hasInput) Seq(("Input", ""), ("Input Records","")) else Nil} ++ + {if (hasOutput) Seq(("Output", ""), ("Output Records", "")) else Nil} ++ {if (hasShuffleRead) { Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME), - ("Shuffle Read", "")) - } else { + ("Shuffle Read", ""), ("Shuffle Read Records", "")) + } else { Nil }} ++ - {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) else Nil} ++ + {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", ""), + ("Shuffle Write Records", "")) else Nil} ++ {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) else Nil} ++ Seq(("Errors", "")) @@ -397,26 +406,30 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val inputReadable = maybeInput .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") .getOrElse("") + val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("") val maybeOutput = metrics.flatMap(_.outputMetrics) val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("") val outputReadable = maybeOutput .map(m => s"${Utils.bytesToString(m.bytesWritten)}") .getOrElse("") + val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("") - val maybeShuffleReadBlockedTime = metrics.flatMap(_.shuffleReadMetrics).map(_.fetchWaitTime) - val shuffleReadBlockedTimeSortable = maybeShuffleReadBlockedTime.map(_.toString).getOrElse("") + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics) + val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime.toString).getOrElse("") val shuffleReadBlockedTimeReadable = - maybeShuffleReadBlockedTime.map(ms => UIUtils.formatDuration(ms)).getOrElse("") + maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("") - val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) - val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") - val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") + val shuffleReadSortable = maybeShuffleRead.map(_.remoteBytesRead.toString).getOrElse("") + val shuffleReadReadable = maybeShuffleRead + .map(m => s"${Utils.bytesToString(m.remoteBytesRead)}").getOrElse("") + val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("") - val maybeShuffleWrite = - metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten) - val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("") - val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("") + val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics) + val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("") + val shuffleWriteReadable = maybeShuffleWrite + .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("") + val shuffleWriteRecords = maybeShuffleWrite.map(_.recordsWritten.toString).getOrElse("") val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) val writeTimeSortable = 
maybeWriteTime.map(_.toString).getOrElse("") @@ -474,11 +487,17 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {inputReadable} + + {inputRecords} + }} {if (hasOutput) { {outputReadable} + + {outputRecords} + }} {if (hasShuffleRead) { {shuffleReadReadable} + + {shuffleReadRecords} + }} {if (hasShuffleWrite) { @@ -496,6 +518,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {shuffleWriteReadable} + + {shuffleWriteRecords} + }} {if (hasBytesSpilled) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 01f7e23212c3d..b25dd56ecf54f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -31,7 +31,9 @@ private[jobs] object UIData { var failedTasks : Int = 0 var succeededTasks : Int = 0 var inputBytes : Long = 0 + var inputRecords : Long = 0 var outputBytes : Long = 0 + var outputRecords : Long = 0 var shuffleRead : Long = 0 var shuffleWrite : Long = 0 var memoryBytesSpilled : Long = 0 @@ -73,7 +75,9 @@ private[jobs] object UIData { var executorRunTime: Long = _ var inputBytes: Long = _ + var inputRecords: Long = _ var outputBytes: Long = _ + var outputRecords: Long = _ var shuffleReadBytes: Long = _ var shuffleWriteBytes: Long = _ var memoryBytesSpilled: Long = _ diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8e0e41ad3782e..f151457664038 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -293,22 +293,26 @@ private[spark] object JsonProtocol { ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~ ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~ ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~ - ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) + ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~ + ("Records Read" -> shuffleReadMetrics.recordsRead) } def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = { ("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~ - ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) + ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~ + ("Shuffle Records Written" -> shuffleWriteMetrics.recordsWritten) } def inputMetricsToJson(inputMetrics: InputMetrics): JValue = { ("Data Read Method" -> inputMetrics.readMethod.toString) ~ - ("Bytes Read" -> inputMetrics.bytesRead) + ("Bytes Read" -> inputMetrics.bytesRead) ~ + ("Records Read" -> inputMetrics.recordsRead) } def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = { ("Data Write Method" -> outputMetrics.writeMethod.toString) ~ - ("Bytes Written" -> outputMetrics.bytesWritten) + ("Bytes Written" -> outputMetrics.bytesWritten) ~ + ("Records Written" -> outputMetrics.recordsWritten) } def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { @@ -669,6 +673,7 @@ private[spark] object JsonProtocol { metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) + metrics.recordsRead = (json \ "Records Read").extractOpt[Long].getOrElse(-1) metrics } @@ -676,6 +681,7 @@ private[spark] object JsonProtocol { val metrics = 
new ShuffleWriteMetrics metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) + metrics.recordsWritten = (json \ "Shuffle Records Written").extractOpt[Long].getOrElse(-1) metrics } @@ -683,6 +689,7 @@ private[spark] object JsonProtocol { val metrics = new InputMetrics( DataReadMethod.withName((json \ "Data Read Method").extract[String])) metrics.addBytesRead((json \ "Bytes Read").extract[Long]) + metrics.addRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(-1)) metrics } @@ -690,6 +697,7 @@ private[spark] object JsonProtocol { val metrics = new OutputMetrics( DataWriteMethod.withName((json \ "Data Write Method").extract[String])) metrics.setBytesWritten((json \ "Bytes Written").extract[Long]) + metrics.recordsWritten = (json \ "Records Written").extractOpt[Long].getOrElse(-1) metrics } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 6ba03841f746b..f5b8cf6d9b8ec 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -763,6 +763,7 @@ private[spark] class ExternalSorter[K, V, C]( if (curWriteMetrics != null) { m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten) m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime) + m.recordsWritten += curWriteMetrics.recordsWritten } } diff --git a/core/src/main/scala/org/apache/spark/util/interceptingIterator.scala b/core/src/main/scala/org/apache/spark/util/interceptingIterator.scala new file mode 100644 index 0000000000000..f0a7abfc51161 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/interceptingIterator.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +/** + * A Wrapper for iterators where a caller can get hooks before/after + * iterator methods. + * @param sub The iterator being wrapped. + * @tparam A the iterable type + */ +private[spark] +class InterceptingIterator[A](sub: Iterator[A]) extends Iterator[A] { + + override def next() :A = { + beforeNext() + val next = sub.next() + afterNext(next) + } + + override def hasNext: Boolean = { + beforeHasNext() + val r = sub.hasNext + afterHasNext(r) + } + + def beforeNext() = {} + def afterNext(next : A) : A = next + + def beforeHasNext() = {} + def afterHasNext(hasNext : Boolean) : Boolean = hasNext +} + +/** + * Helpers if only needing to intercept one of the methods. 
+ */ + +private[spark] object BeforeNextInterceptingIterator { + def apply[A](sub: Iterator[A], function: => Unit) : InterceptingIterator[A] = { + new InterceptingIterator[A](sub) { + override def beforeNext() = function + } + } +} + +private[spark] object AfterNextInterceptingIterator { + def apply[A](sub: Iterator[A], function: (A) => A) : InterceptingIterator[A] = { + new InterceptingIterator[A](sub) { + override def afterNext(next : A) = function(next) + } + } +} + +private[spark] object BeforeHasNextInterceptingIterator { + def apply[A](sub: Iterator[A], function: => Unit) : InterceptingIterator[A] = { + new InterceptingIterator[A](sub) { + override def beforeHasNext() = function + } + } +} + +private[spark] object AfterHasNextInterceptingIterator { + def apply[A](sub: Iterator[A], function: (Boolean) => Boolean) : InterceptingIterator[A] = { + new InterceptingIterator[A](sub) { + override def afterHasNext(hasNext : Boolean) = function(hasNext) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index 81db66ae17464..c87f9a7389155 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -21,44 +21,46 @@ import java.io.{File, FileWriter, PrintWriter} import scala.collection.mutable.ArrayBuffer -import org.scalatest.FunSuite - +import org.apache.commons.lang.math.RandomUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{LongWritable, Text} -import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf, - LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter, - TextInputFormat => OldTextInputFormat} import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat, - CombineFileSplit => OldCombineFileSplit, CombineFileRecordReader => OldCombineFileRecordReader} -import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader, - TaskAttemptContext} + CombineFileRecordReader => OldCombineFileRecordReader, CombineFileSplit => OldCombineFileSplit} +import org.apache.hadoop.mapred.{JobConf, Reporter, FileSplit => OldFileSplit, + InputSplit => OldInputSplit, LineRecordReader => OldLineRecordReader, + RecordReader => OldRecordReader, TextInputFormat => OldTextInputFormat} import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat, CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit, FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} +import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} +import org.apache.hadoop.mapreduce.{TaskAttemptContext, InputSplit => NewInputSplit, + RecordReader => NewRecordReader} +import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.SharedSparkContext import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.util.Utils -class InputOutputMetricsSuite extends FunSuite with SharedSparkContext { +class InputOutputMetricsSuite extends FunSuite with SharedSparkContext + with BeforeAndAfter { @transient var tmpDir: File = _ @transient var tmpFile: File = _ @transient var tmpFilePath: String = _ + @transient val 
numRecords: Int = 100000 + @transient val numBuckets: Int = 10 - override def beforeAll() { - super.beforeAll() - + before { tmpDir = Utils.createTempDir() val testTempDir = new File(tmpDir, "test") testTempDir.mkdir() tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt") val pw = new PrintWriter(new FileWriter(tmpFile)) - for (x <- 1 to 1000000) { - pw.println("s") + for (x <- 1 to numRecords) { + pw.println(RandomUtils.nextInt(numBuckets)) } pw.close() @@ -66,8 +68,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext { tmpFilePath = "file://" + tmpFile.getAbsolutePath } - override def afterAll() { - super.afterAll() + after { Utils.deleteRecursively(tmpDir) } @@ -155,6 +156,97 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext { assert(bytesRead >= tmpFile.length()) } + test("input metrics on records read - simple") { + val records = runAndReturnRecordsRead { + sc.textFile(tmpFilePath, 4).count() + } + assert(records == numRecords) + } + + test("input metrics on records read - more stages") { + val records = runAndReturnRecordsRead { + sc.textFile(tmpFilePath, 4) + .map(key => (key.length, 1)) + .reduceByKey(_ + _) + .count() + } + assert(records == numRecords) + } + + test("input metrics on records - New Hadoop API") { + val records = runAndReturnRecordsRead { + sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable], + classOf[Text]).count() + } + assert(records == numRecords) + } + + test("input metrics on recordsd read with cache") { + // prime the cache manager + val rdd = sc.textFile(tmpFilePath, 4).cache() + rdd.collect() + + val records = runAndReturnRecordsRead { + rdd.count() + } + + assert(records == numRecords) + } + + test("shuffle records read metrics") { + val recordsRead = runAndReturnShuffleRecordsRead { + sc.textFile(tmpFilePath, 4) + .map(key => (key, 1)) + .groupByKey() + .collect() + } + assert(recordsRead == numRecords) + } + + test("shuffle records written metrics") { + val recordsWritten = runAndReturnShuffleRecordsWritten { + sc.textFile(tmpFilePath, 4) + .map(key => (key, 1)) + .groupByKey() + .collect() + } + assert(recordsWritten == numRecords) + } + + /** + * Tests the metrics from end to end. + * 1) reading a hadoop file + * 2) shuffle and writing to a hadoop file. + * 3) writing to hadoop file. 
+ */ + test("input read/write and shuffle read/write metrics all line up") { + var inputRead = 0L + var outputWritten = 0L + var shuffleRead = 0L + var shuffleWritten = 0L + sc.addSparkListener(new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val metrics = taskEnd.taskMetrics + metrics.inputMetrics.foreach(inputRead += _.recordsRead) + metrics.outputMetrics.foreach(outputWritten += _.recordsWritten) + metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead) + metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.recordsWritten) + } + }) + + val tmpFile = new File(tmpDir, getClass.getSimpleName) + + sc.textFile(tmpFilePath, 4) + .map(key => (key, 1)) + .reduceByKey(_+_) + .saveAsTextFile("file://" + tmpFile.getAbsolutePath) + + sc.listenerBus.waitUntilEmpty(500) + assert(inputRead == numRecords) + assert(outputWritten == numBuckets) + assert(shuffleRead == shuffleWritten) + } + test("input metrics with interleaved reads") { val numPartitions = 2 val cartVector = 0 to 9 @@ -193,18 +285,60 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext { assert(cartesianBytes == firstSize * numPartitions + (cartVector.length * secondSize)) } - private def runAndReturnBytesRead(job : => Unit): Long = { - val taskBytesRead = new ArrayBuffer[Long]() + private def runAndReturnBytesRead(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.bytesRead)) + } + + private def runAndReturnRecordsRead(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.recordsRead)) + } + + private def runAndReturnRecordsWritten(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.outputMetrics.map(_.recordsWritten)) + } + + private def runAndReturnShuffleRecordsRead(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.shuffleReadMetrics.map(_.recordsRead)) + } + + private def runAndReturnShuffleRecordsWritten(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.shuffleWriteMetrics.map(_.recordsWritten)) + } + + private def runAndReturnMetrics(job: => Unit, + collector: (SparkListenerTaskEnd) => Option[Long]): Long = { + val taskMetrics = new ArrayBuffer[Long]() sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead + collector(taskEnd).foreach(taskMetrics += _) } }) job sc.listenerBus.waitUntilEmpty(500) - taskBytesRead.sum + taskMetrics.sum + } + + test("output metrics on records written") { + val file = new File(tmpDir, getClass.getSimpleName) + val filePath = "file://" + file.getAbsolutePath + + val records = runAndReturnRecordsWritten { + sc.parallelize(1 to numRecords).saveAsTextFile(filePath) + } + assert(records == numRecords) + } + + test("output metrics on records written - new Hadoop API") { + val file = new File(tmpDir, getClass.getSimpleName) + val filePath = "file://" + file.getAbsolutePath + + val records = runAndReturnRecordsWritten { + sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString)) + .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath) + } + assert(records == numRecords) } test("output metrics when writing text file") { @@ -318,4 +452,4 @@ class NewCombineTextRecordReaderWrapper( override def getCurrentValue(): Text = delegate.getCurrentValue override def getProgress(): Float = delegate.getProgress override def close(): Unit = delegate.close() -} \ No newline at end of file +} diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala index bbc7e1357b90d..dc937d49edee3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala @@ -31,6 +31,8 @@ class BlockObjectWriterSuite extends FunSuite { new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics) writer.write(Long.box(20)) + // Record metrics update on every write + assert(writeMetrics.recordsWritten == 1) // Metrics don't update on every write assert(writeMetrics.shuffleBytesWritten == 0) // After 32 writes, metrics should update @@ -39,6 +41,7 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(i)) } assert(writeMetrics.shuffleBytesWritten > 0) + assert(writeMetrics.recordsWritten == 33) writer.commitAndClose() assert(file.length() == writeMetrics.shuffleBytesWritten) } @@ -51,6 +54,8 @@ class BlockObjectWriterSuite extends FunSuite { new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics) writer.write(Long.box(20)) + // Record metrics update on every write + assert(writeMetrics.recordsWritten == 1) // Metrics don't update on every write assert(writeMetrics.shuffleBytesWritten == 0) // After 32 writes, metrics should update @@ -59,7 +64,9 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(i)) } assert(writeMetrics.shuffleBytesWritten > 0) + assert(writeMetrics.recordsWritten == 33) writer.revertPartialWritesAndClose() assert(writeMetrics.shuffleBytesWritten == 0) + assert(writeMetrics.recordsWritten == 0) } } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 6577ebaa2e9a8..86fe1005f1f57 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -187,6 +187,34 @@ class JsonProtocolSuite extends FunSuite { assert(newMetrics.inputMetrics.isEmpty) } + test("Input/Output records backwards compatibility") { + // records read were added after 1.2 + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, + hasHadoopInput = true, hasOutput = true, hasRecords = false) + assert(metrics.inputMetrics.nonEmpty) + assert(metrics.outputMetrics.nonEmpty) + val newJson = JsonProtocol.taskMetricsToJson(metrics) + val oldJson = newJson.removeField { case (field, _) => field == "Records Read" } + .removeField { case (field, _) => field == "Records Written" } + val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) + assert(newMetrics.inputMetrics.get.recordsRead == -1) + assert(newMetrics.outputMetrics.get.recordsWritten == -1) + } + + test("Shuffle Read/Write records backwards compatibility") { + // records read were added after 1.2 + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, + hasHadoopInput = false, hasOutput = false, hasRecords = false) + assert(metrics.shuffleReadMetrics.nonEmpty) + assert(metrics.shuffleWriteMetrics.nonEmpty) + val newJson = JsonProtocol.taskMetricsToJson(metrics) + val oldJson = newJson.removeField { case (field, _) => field == "Records Read" } + .removeField { case (field, _) => field == "Shuffle Records Written" } + val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) + assert(newMetrics.shuffleReadMetrics.get.recordsRead == -1) + assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == -1) + } + test("OutputMetrics 
backward compatibility") { // OutputMetrics were added after 1.1 val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true) @@ -642,7 +670,8 @@ class JsonProtocolSuite extends FunSuite { e: Int, f: Int, hasHadoopInput: Boolean, - hasOutput: Boolean) = { + hasOutput: Boolean, + hasRecords: Boolean = true) = { val t = new TaskMetrics t.setHostname("localhost") t.setExecutorDeserializeTime(a) @@ -655,6 +684,7 @@ class JsonProtocolSuite extends FunSuite { if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) inputMetrics.addBytesRead(d + e + f) + inputMetrics.addRecordsRead(if(hasRecords) (d + e + f) / 100 else -1) t.setInputMetrics(Some(inputMetrics)) } else { val sr = new ShuffleReadMetrics @@ -662,16 +692,19 @@ class JsonProtocolSuite extends FunSuite { sr.incLocalBlocksFetched(e) sr.incFetchWaitTime(a + d) sr.incRemoteBlocksFetched(f) + sr.recordsRead = if(hasRecords) (b + d) / 100 else -1 t.setShuffleReadMetrics(Some(sr)) } if (hasOutput) { val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) outputMetrics.setBytesWritten(a + b + c) + outputMetrics.recordsWritten = if(hasRecords) (a + b + c)/100 else -1 t.outputMetrics = Some(outputMetrics) } else { val sw = new ShuffleWriteMetrics sw.incShuffleBytesWritten(a + b + c) sw.incShuffleWriteTime(b + c + d) + sw.recordsWritten = if(hasRecords) (a + b + c) / 100 else -1 t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks @@ -905,11 +938,13 @@ class JsonProtocolSuite extends FunSuite { | "Remote Blocks Fetched": 800, | "Local Blocks Fetched": 700, | "Fetch Wait Time": 900, - | "Remote Bytes Read": 1000 + | "Remote Bytes Read": 1000, + | "Records Read" : 10 | }, | "Shuffle Write Metrics": { | "Shuffle Bytes Written": 1200, - | "Shuffle Write Time": 1500 + | "Shuffle Write Time": 1500, + | "Shuffle Records Written": 12 | }, | "Updated Blocks": [ | { @@ -986,11 +1021,13 @@ class JsonProtocolSuite extends FunSuite { | "Disk Bytes Spilled": 0, | "Shuffle Write Metrics": { | "Shuffle Bytes Written": 1200, - | "Shuffle Write Time": 1500 + | "Shuffle Write Time": 1500, + | "Shuffle Records Written": 12 | }, | "Input Metrics": { | "Data Read Method": "Hadoop", - | "Bytes Read": 2100 + | "Bytes Read": 2100, + | "Records Read": 21 | }, | "Updated Blocks": [ | { @@ -1067,11 +1104,13 @@ class JsonProtocolSuite extends FunSuite { | "Disk Bytes Spilled": 0, | "Input Metrics": { | "Data Read Method": "Hadoop", - | "Bytes Read": 2100 + | "Bytes Read": 2100, + | "Records Read": 21 | }, | "Output Metrics": { | "Data Write Method": "Hadoop", - | "Bytes Written": 1200 + | "Bytes Written": 1200, + | "Records Written": 12 | }, | "Updated Blocks": [ | { From 1aa273c90c6ad99d576df398cee83ccd6fa89aeb Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Thu, 15 Jan 2015 23:18:06 -0800 Subject: [PATCH 02/11] CR Feedback --- .../scala/org/apache/spark/executor/TaskMetrics.scala | 4 ++-- .../org/apache/spark/storage/BlockObjectWriterSuite.scala | 8 ++++---- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 8 ++++---- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 9849d1ed87f69..493abcc727f9c 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -194,7 +194,7 @@ class TaskMetrics extends Serializable { /** * Aggregates shuffle read metrics for all 
registered dependencies into shuffleReadMetrics. */ - private[spark] def updateShuffleReadMetrics() = synchronized { + private[spark] def updateShuffleReadMetrics(): Unit = synchronized { val merged = new ShuffleReadMetrics() for (depMetrics <- depsShuffleReadMetrics) { merged.incFetchWaitTime(depMetrics.fetchWaitTime) @@ -206,7 +206,7 @@ class TaskMetrics extends Serializable { _shuffleReadMetrics = Some(merged) } - private[spark] def updateInputMetrics() = synchronized { + private[spark] def updateInputMetrics(): Unit = synchronized { inputMetrics.foreach(_.updateBytesRead()) } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala index dc937d49edee3..7ec3559bb7d78 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala @@ -32,7 +32,7 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(20)) // Record metrics update on every write - assert(writeMetrics.recordsWritten == 1) + assert(writeMetrics.recordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.shuffleBytesWritten == 0) // After 32 writes, metrics should update @@ -41,7 +41,7 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(i)) } assert(writeMetrics.shuffleBytesWritten > 0) - assert(writeMetrics.recordsWritten == 33) + assert(writeMetrics.recordsWritten === 33) writer.commitAndClose() assert(file.length() == writeMetrics.shuffleBytesWritten) } @@ -55,7 +55,7 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(20)) // Record metrics update on every write - assert(writeMetrics.recordsWritten == 1) + assert(writeMetrics.recordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.shuffleBytesWritten == 0) // After 32 writes, metrics should update @@ -64,7 +64,7 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(i)) } assert(writeMetrics.shuffleBytesWritten > 0) - assert(writeMetrics.recordsWritten == 33) + assert(writeMetrics.recordsWritten === 33) writer.revertPartialWritesAndClose() assert(writeMetrics.shuffleBytesWritten == 0) assert(writeMetrics.recordsWritten == 0) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 86fe1005f1f57..7531abb82a16d 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -684,7 +684,7 @@ class JsonProtocolSuite extends FunSuite { if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) inputMetrics.addBytesRead(d + e + f) - inputMetrics.addRecordsRead(if(hasRecords) (d + e + f) / 100 else -1) + inputMetrics.addRecordsRead(if (hasRecords) (d + e + f) / 100 else -1) t.setInputMetrics(Some(inputMetrics)) } else { val sr = new ShuffleReadMetrics @@ -692,19 +692,19 @@ class JsonProtocolSuite extends FunSuite { sr.incLocalBlocksFetched(e) sr.incFetchWaitTime(a + d) sr.incRemoteBlocksFetched(f) - sr.recordsRead = if(hasRecords) (b + d) / 100 else -1 + sr.recordsRead = if (hasRecords) (b + d) / 100 else -1 t.setShuffleReadMetrics(Some(sr)) } if (hasOutput) { val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) outputMetrics.setBytesWritten(a + b + c) - outputMetrics.recordsWritten = if(hasRecords) (a + b + c)/100 else -1 + 
outputMetrics.recordsWritten = if (hasRecords) (a + b + c)/100 else -1 t.outputMetrics = Some(outputMetrics) } else { val sw = new ShuffleWriteMetrics sw.incShuffleBytesWritten(a + b + c) sw.incShuffleWriteTime(b + c + d) - sw.recordsWritten = if(hasRecords) (a + b + c) / 100 else -1 + sw.recordsWritten = if (hasRecords) (a + b + c) / 100 else -1 t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks From 6cdb44eca2775716ec7e0187da1c139d1fff6f1c Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Thu, 15 Jan 2015 23:33:08 -0800 Subject: [PATCH 03/11] Removed the generic InterceptingIterator and repalced it with specific implementation --- .../org/apache/spark/rdd/HadoopRDD.scala | 2 +- .../hash/BlockStoreShuffleFetcher.scala | 4 +- .../util/AfterNextInterceptingIterator.scala | 49 +++++++++++ .../spark/util/interceptingIterator.scala | 82 ------------------- 4 files changed, 52 insertions(+), 85 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/AfterNextInterceptingIterator.scala delete mode 100644 core/src/main/scala/org/apache/spark/util/interceptingIterator.scala diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index f30f2a15a1155..445040fd0bfce 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.DataReadMethod import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD -import org.apache.spark.util.{AfterNextInterceptingIterator, NextIterator, Utils} +import org.apache.spark.util.{NextIterator, Utils} import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation} import org.apache.spark.storage.StorageLevel diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index 3465ed14c7bce..5fb1cca62ed92 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -25,7 +25,7 @@ import org.apache.spark._ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId} -import org.apache.spark.util.{InterceptingIterator, CompletionIterator} +import org.apache.spark.util.{AfterNextInterceptingIterator, CompletionIterator} private[hash] object BlockStoreShuffleFetcher extends Logging { def fetch[T]( @@ -82,7 +82,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024) val itr = blockFetcherItr.flatMap(unpackBlock) - val itr2 = new InterceptingIterator[T](itr) { + val itr2 = new AfterNextInterceptingIterator[T](itr) { val readMetrics = context.taskMetrics().createShuffleReadMetricsForDependency() override def afterNext(next: T) : T = { readMetrics.recordsRead += 1 diff --git a/core/src/main/scala/org/apache/spark/util/AfterNextInterceptingIterator.scala b/core/src/main/scala/org/apache/spark/util/AfterNextInterceptingIterator.scala new file mode 100644 index 0000000000000..1ddebb3cbdd3c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/AfterNextInterceptingIterator.scala @@ 
-0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +/** + * A Wrapper for iterators where a caller can get a hook after next() + * has been called. + * + * @param sub The iterator being wrapped. + * @tparam A the iterable type + */ +private[spark] +class AfterNextInterceptingIterator[A](sub: Iterator[A]) extends Iterator[A] { + + def hasNext: Boolean = sub.hasNext + override def next() :A = { + val next = sub.next() + afterNext(next) + } + + def afterNext(next : A) : A = next +} + +/** + * A shortcut if running a closure is all you need to do. + */ +private[spark] object AfterNextInterceptingIterator { + def apply[A](sub: Iterator[A], function: (A) => A) : AfterNextInterceptingIterator[A] = { + new AfterNextInterceptingIterator[A](sub) { + override def afterNext(next : A) = function(next) + } + } +} + diff --git a/core/src/main/scala/org/apache/spark/util/interceptingIterator.scala b/core/src/main/scala/org/apache/spark/util/interceptingIterator.scala deleted file mode 100644 index f0a7abfc51161..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/interceptingIterator.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -/** - * A Wrapper for iterators where a caller can get hooks before/after - * iterator methods. - * @param sub The iterator being wrapped. - * @tparam A the iterable type - */ -private[spark] -class InterceptingIterator[A](sub: Iterator[A]) extends Iterator[A] { - - override def next() :A = { - beforeNext() - val next = sub.next() - afterNext(next) - } - - override def hasNext: Boolean = { - beforeHasNext() - val r = sub.hasNext - afterHasNext(r) - } - - def beforeNext() = {} - def afterNext(next : A) : A = next - - def beforeHasNext() = {} - def afterHasNext(hasNext : Boolean) : Boolean = hasNext -} - -/** - * Helpers if only needing to intercept one of the methods. 
- */ - -private[spark] object BeforeNextInterceptingIterator { - def apply[A](sub: Iterator[A], function: => Unit) : InterceptingIterator[A] = { - new InterceptingIterator[A](sub) { - override def beforeNext() = function - } - } -} - -private[spark] object AfterNextInterceptingIterator { - def apply[A](sub: Iterator[A], function: (A) => A) : InterceptingIterator[A] = { - new InterceptingIterator[A](sub) { - override def afterNext(next : A) = function(next) - } - } -} - -private[spark] object BeforeHasNextInterceptingIterator { - def apply[A](sub: Iterator[A], function: => Unit) : InterceptingIterator[A] = { - new InterceptingIterator[A](sub) { - override def beforeHasNext() = function - } - } -} - -private[spark] object AfterHasNextInterceptingIterator { - def apply[A](sub: Iterator[A], function: (Boolean) => Boolean) : InterceptingIterator[A] = { - new InterceptingIterator[A](sub) { - override def afterHasNext(hasNext : Boolean) = function(hasNext) - } - } -} From 57551c168ab0aeddb91e8f31ab794314fd9900b8 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Fri, 30 Jan 2015 16:34:07 -0500 Subject: [PATCH 04/11] Conforms to SPARK-3288 --- .../scala/org/apache/spark/CacheManager.scala | 4 +- .../apache/spark/executor/TaskMetrics.scala | 47 +++++++++---------- .../org/apache/spark/rdd/HadoopRDD.scala | 4 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 4 +- .../apache/spark/rdd/PairRDDFunctions.scala | 6 +-- .../hash/BlockStoreShuffleFetcher.scala | 6 +-- .../apache/spark/storage/BlockManager.scala | 2 +- .../spark/storage/BlockObjectWriter.scala | 4 +- .../org/apache/spark/ui/jobs/StagePage.scala | 3 +- .../org/apache/spark/util/JsonProtocol.scala | 10 ++-- .../util/collection/ExternalSorter.scala | 2 +- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 10 ++-- 13 files changed, 52 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index 48193c8c950d5..2a0518872f13f 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -49,11 +49,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val inputMetrics = blockResult.inputMetrics val existingMetrics = context.taskMetrics .getInputMetricsForReadMethod(inputMetrics.readMethod) - existingMetrics.addBytesRead(inputMetrics.bytesRead) + existingMetrics.incBytesRead(inputMetrics.bytesRead) val iter = blockResult.data.asInstanceOf[Iterator[T]] new InterruptibleIterator(context, AfterNextInterceptingIterator(iter, (next: T) => { - existingMetrics.addRecordsRead(1) + existingMetrics.incRecordsRead(1) next })) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 493abcc727f9c..b985c9a9b60ee 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -201,7 +201,7 @@ class TaskMetrics extends Serializable { merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) merged.incRemoteBytesRead(depMetrics.remoteBytesRead) - merged.recordsRead += depMetrics.recordsRead + merged.incRecordsRead(depMetrics.recordsRead) } _shuffleReadMetrics = Some(merged) } @@ -242,31 +242,22 @@ object DataWriteMethod extends Enumeration with Serializable { */ @DeveloperApi 
case class InputMetrics(readMethod: DataReadMethod.Value) { - - private val _bytesRead: AtomicLong = new AtomicLong() - private val _recordsRead: AtomicLong = new AtomicLong() + + @volatile @transient var bytesReadCallback: Option[() => Long] = None /** * Total bytes read. */ + private val _bytesRead: AtomicLong = new AtomicLong() def bytesRead: Long = _bytesRead.get() + def incBytesRead(bytes: Long) = _bytesRead.addAndGet(bytes) /** * Total records read. */ - def recordsRead: Long = _recordsRead.get() - @volatile @transient var bytesReadCallback: Option[() => Long] = None - - /** - * Adds additional bytes read for this read method. - */ - def addBytesRead(bytes: Long) = { - _bytesRead.addAndGet(bytes) - } - - def addRecordsRead(records: Long) = { - _recordsRead.addAndGet(records) - } + def recordsRead: Long = _recordsRead.get() + private val _recordsRead: AtomicLong = new AtomicLong() + def incRecordsRead(records: Long) = _recordsRead.addAndGet(records) /** * Invoke the bytesReadCallback and mutate bytesRead. @@ -302,7 +293,9 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) { /** * Total records written */ - var recordsWritten: Long = 0L + private var _recordsWritten: Long = 0L + def recordsWritten = _recordsWritten + private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value } /** @@ -317,7 +310,7 @@ class ShuffleReadMetrics extends Serializable { private var _remoteBlocksFetched: Int = _ def remoteBlocksFetched = _remoteBlocksFetched private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value - private[spark] def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value + private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value /** * Number of local blocks fetched in this shuffle by this task @@ -325,8 +318,7 @@ class ShuffleReadMetrics extends Serializable { private var _localBlocksFetched: Int = _ def localBlocksFetched = _localBlocksFetched private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value - private[spark] def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value - + private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value /** * Time the task spent waiting for remote shuffle blocks. 
This only includes the time @@ -354,7 +346,10 @@ class ShuffleReadMetrics extends Serializable { /** * Total number of records read from the shuffle by this task */ - var recordsRead: Long = _ + private var _recordsRead: Long = _ + def recordsRead = _recordsRead + private[spark] def incRecordsRead(value: Long) = _recordsRead += value + private[spark] def decRecordsRead(value: Long) = _recordsRead -= value } /** @@ -380,7 +375,11 @@ class ShuffleWriteMetrics extends Serializable { private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value /** - * Total number of records written from the shuffle by this task + * Total number of records written to the shuffle by this task */ - var recordsWritten: Long = _ + @volatile private var _recordsWritten: Long = _ + def recordsWritten = _recordsWritten + private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value + private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value + private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value } diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 445040fd0bfce..486e86ce1bb19 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -248,7 +248,7 @@ class HadoopRDD[K, V]( finished = true } if (!finished) { - inputMetrics.addRecordsRead(1) + inputMetrics.incRecordsRead(1) } (key, value) } @@ -263,7 +263,7 @@ class HadoopRDD[K, V]( // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. try { - inputMetrics.addBytesRead(split.inputSplit.value.getLength) + inputMetrics.incBytesRead(split.inputSplit.value.getLength) } catch { case e: java.io.IOException => logWarning("Unable to get input size to set InputMetrics for task", e) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 8d6142ab6583a..7fb94840df99c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -152,7 +152,7 @@ class NewHadoopRDD[K, V]( } havePair = false if (!finished) { - inputMetrics.addRecordsRead(1) + inputMetrics.incRecordsRead(1) } (reader.getCurrentKey, reader.getCurrentValue) } @@ -167,7 +167,7 @@ class NewHadoopRDD[K, V]( // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. 
try { - inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength) + inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength) } catch { case e: java.io.IOException => logWarning("Unable to get input size to set InputMetrics for task", e) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 2089af56f0338..3e938c546c0bb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1008,7 +1008,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } committer.commitTask(hadoopContext) bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } - outputMetrics.recordsWritten = recordsWritten + outputMetrics.setRecordsWritten(recordsWritten) 1 } : Int @@ -1081,7 +1081,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } writer.commit() bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } - outputMetrics.recordsWritten = recordsWritten + outputMetrics.setRecordsWritten(recordsWritten) } self.context.runJob(self, writeToFile) @@ -1099,7 +1099,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) outputMetrics: OutputMetrics, recordsWritten: Long): Unit = { if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } - outputMetrics.recordsWritten = recordsWritten + outputMetrics.setRecordsWritten(recordsWritten) } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index 5fb1cca62ed92..47e9eb898ccc9 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -83,9 +83,9 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { val itr = blockFetcherItr.flatMap(unpackBlock) val itr2 = new AfterNextInterceptingIterator[T](itr) { - val readMetrics = context.taskMetrics().createShuffleReadMetricsForDependency() - override def afterNext(next: T) : T = { - readMetrics.recordsRead += 1 + val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency() + override def afterNext(next: T): T = { + readMetrics.incRecordsRead(1) next } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 8bc5a1cd18b64..86dbd89f0ffb8 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -53,7 +53,7 @@ private[spark] class BlockResult( readMethod: DataReadMethod.Value, bytes: Long) { val inputMetrics = new InputMetrics(readMethod) - inputMetrics.addBytesRead(bytes) + inputMetrics.incBytesRead(bytes) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 8f55ca75bf26c..ce02fbd4bd804 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -168,7 +168,7 @@ private[spark] class DiskBlockObjectWriter( override def revertPartialWritesAndClose() { try { writeMetrics.decShuffleBytesWritten(reportedPosition - 
initialPosition) - writeMetrics.recordsWritten -= numRecordsWritten + writeMetrics.decRecordsWritten(numRecordsWritten) if (initialized) { objOut.flush() @@ -195,7 +195,7 @@ private[spark] class DiskBlockObjectWriter( objOut.writeObject(value) numRecordsWritten += 1 - writeMetrics.recordsWritten += 1 + writeMetrics.incRecordsWritten(1) if (numRecordsWritten % 32 == 0) { updateBytesWritten() diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index f182f3a140964..8d505eb46322c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -416,7 +416,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("") val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics) - val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime.toString).getOrElse("") + val shuffleReadBlockedTimeSortable = maybeShuffleRead + .map(_.fetchWaitTime.toString).getOrElse("") val shuffleReadBlockedTimeReadable = maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("") diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index f151457664038..497bb68d7bff0 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -673,7 +673,7 @@ private[spark] object JsonProtocol { metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) - metrics.recordsRead = (json \ "Records Read").extractOpt[Long].getOrElse(-1) + metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(-1)) metrics } @@ -681,15 +681,15 @@ private[spark] object JsonProtocol { val metrics = new ShuffleWriteMetrics metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) - metrics.recordsWritten = (json \ "Shuffle Records Written").extractOpt[Long].getOrElse(-1) + metrics.setRecordsWritten((json \ "Shuffle Records Written").extractOpt[Long].getOrElse(-1)) metrics } def inputMetricsFromJson(json: JValue): InputMetrics = { val metrics = new InputMetrics( DataReadMethod.withName((json \ "Data Read Method").extract[String])) - metrics.addBytesRead((json \ "Bytes Read").extract[Long]) - metrics.addRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(-1)) + metrics.incBytesRead((json \ "Bytes Read").extract[Long]) + metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(-1)) metrics } @@ -697,7 +697,7 @@ private[spark] object JsonProtocol { val metrics = new OutputMetrics( DataWriteMethod.withName((json \ "Data Write Method").extract[String])) metrics.setBytesWritten((json \ "Bytes Written").extract[Long]) - metrics.recordsWritten = (json \ "Records Written").extractOpt[Long].getOrElse(-1) + metrics.setRecordsWritten((json \ "Records Written").extractOpt[Long].getOrElse(-1)) metrics } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index f5b8cf6d9b8ec..a98bd22f7c24c 100644 --- 
a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -763,7 +763,7 @@ private[spark] class ExternalSorter[K, V, C]( if (curWriteMetrics != null) { m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten) m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime) - m.recordsWritten += curWriteMetrics.recordsWritten + m.incRecordsWritten(curWriteMetrics.recordsWritten) } } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 68074ae32a672..e8405baa8e3ea 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -234,7 +234,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskMetrics.incMemoryBytesSpilled(base + 6) val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.setInputMetrics(Some(inputMetrics)) - inputMetrics.addBytesRead(base + 7) + inputMetrics.incBytesRead(base + 7) val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) taskMetrics.outputMetrics = Some(outputMetrics) outputMetrics.setBytesWritten(base + 8) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 7531abb82a16d..c5bb399415114 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -683,8 +683,8 @@ class JsonProtocolSuite extends FunSuite { if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - inputMetrics.addBytesRead(d + e + f) - inputMetrics.addRecordsRead(if (hasRecords) (d + e + f) / 100 else -1) + inputMetrics.incBytesRead(d + e + f) + inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1) t.setInputMetrics(Some(inputMetrics)) } else { val sr = new ShuffleReadMetrics @@ -692,19 +692,19 @@ class JsonProtocolSuite extends FunSuite { sr.incLocalBlocksFetched(e) sr.incFetchWaitTime(a + d) sr.incRemoteBlocksFetched(f) - sr.recordsRead = if (hasRecords) (b + d) / 100 else -1 + sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1) t.setShuffleReadMetrics(Some(sr)) } if (hasOutput) { val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) outputMetrics.setBytesWritten(a + b + c) - outputMetrics.recordsWritten = if (hasRecords) (a + b + c)/100 else -1 + outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1) t.outputMetrics = Some(outputMetrics) } else { val sw = new ShuffleWriteMetrics sw.incShuffleBytesWritten(a + b + c) sw.incShuffleWriteTime(b + c + d) - sw.recordsWritten = if (hasRecords) (a + b + c) / 100 else -1 + sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks From 46c81866a97dd39a9da7b2e174a6a1bb702f5258 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Fri, 30 Jan 2015 18:16:47 -0500 Subject: [PATCH 05/11] Combined Bytes and # records into one column Also made the availabiliy of the # records more complete. 
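To illustrate the combined column, here is a minimal sketch of the formatting pattern the tables below use (the helper name formatSizeWithRecords is hypothetical; the UI code builds the string inline with Utils.bytesToString):

    // Hypothetical helper: render "human-readable bytes / record count" in one cell,
    // mirroring the inline s"${Utils.bytesToString(bytes)} / $records" pattern below.
    import org.apache.spark.util.Utils

    def formatSizeWithRecords(bytes: Long, records: Long): String =
      s"${Utils.bytesToString(bytes)} / $records"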
--- .../scala/org/apache/spark/ui/ToolTips.scala | 11 +-- .../apache/spark/ui/jobs/ExecutorTable.scala | 24 ++--- .../spark/ui/jobs/JobProgressListener.scala | 12 +++ .../org/apache/spark/ui/jobs/StagePage.scala | 98 +++++++++++-------- .../org/apache/spark/ui/jobs/UIData.scala | 4 + 5 files changed, 87 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index e8a6781ecc477..1c0921e2b9971 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -29,16 +29,15 @@ private[spark] object ToolTips { val SHUFFLE_READ_BLOCKED_TIME = "Time that the task spent blocked waiting for shuffle data to be read from remote machines." - val INPUT = "Bytes read from Hadoop or from Spark storage." - val INPUT_RECORDS = "Number of records read from Hadoop or from Spark storage." + val INPUT = "Bytes and # records read from Hadoop or from Spark storage." - val OUTPUT = "Bytes written to Hadoop." - val OUTPUT_RECORDS = "Number of records written to Hadoop." + val OUTPUT = "Bytes and # records written to Hadoop." - val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage." + val SHUFFLE_WRITE = + "Bytes and # records written to disk in order to be read by a shuffle in a future stage." val SHUFFLE_READ = - """Bytes read from remote executors. Typically less than shuffle write bytes + """Bytes and # records read from remote executors. Typically less than shuffle write bytes because this does not include shuffle data read locally.""" val GETTING_RESULT_TIME = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index ac07b396208ba..5b7ed322ef72d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -44,12 +44,12 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage Total Tasks Failed Tasks Succeeded Tasks - Input - Input Records - Output - Output Records - Shuffle Read - Shuffle Write + Input Size / Records + Output Size / Records + + Shuffle Read Size / Records + + Shuffle Write Size / Records Shuffle Spill (Memory) Shuffle Spill (Disk) @@ -79,17 +79,13 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage {v.failedTasks} {v.succeededTasks} - {Utils.bytesToString(v.inputBytes)} - - {v.inputRecords} + {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"} - {Utils.bytesToString(v.outputBytes)} - - {v.outputRecords} + {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"} - {Utils.bytesToString(v.shuffleRead)} + {s"${Utils.bytesToString(v.shuffleRead)} / ${v.shuffleReadRecords}"} - {Utils.bytesToString(v.shuffleWrite)} + {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"} {Utils.bytesToString(v.memoryBytesSpilled)} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 45ce0821e5dbd..df68da493035d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -394,12 +394,24 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.shuffleWriteBytes += shuffleWriteDelta 
execSummary.shuffleWrite += shuffleWriteDelta + val shuffleWriteRecordsDelta = + (taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L)) + stageData.shuffleWriteRecords += shuffleWriteRecordsDelta + execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta + val shuffleReadDelta = (taskMetrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L) - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)) stageData.shuffleReadBytes += shuffleReadDelta execSummary.shuffleRead += shuffleReadDelta + val shuffleReadRecordsDelta = + (taskMetrics.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.recordsRead).getOrElse(0L)) + stageData.shuffleReadRecords += shuffleReadRecordsDelta + execSummary.shuffleReadRecords += shuffleReadRecordsDelta + val inputBytesDelta = (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L) - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 8d505eb46322c..0791b218c1406 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -71,34 +71,28 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
             </li>
             {if (hasInput) {
               <li>
-                <strong>Input: </strong>
-                {Utils.bytesToString(stageData.inputBytes)}
-              </li>
-              <li>
-                <strong>Input Records: </strong>
-                {stageData.inputRecords}
+                <strong>Input Size / Records: </strong>
+                {s"${Utils.bytesToString(stageData.inputBytes)} / ${stageData.inputRecords}"}
               </li>
             }}
             {if (hasOutput) {
               <li>
                 <strong>Output: </strong>
-                {Utils.bytesToString(stageData.outputBytes)}
-              </li>
-              <li>
-                <strong>Output Records: </strong>
-                {stageData.outputRecords}
+                {s"${Utils.bytesToString(stageData.outputBytes)} / ${stageData.outputRecords}"}
               </li>
             }}
             {if (hasShuffleRead) {
               <li>
                 <strong>Shuffle read: </strong>
-                {Utils.bytesToString(stageData.shuffleReadBytes)}
+                {s"${Utils.bytesToString(stageData.shuffleReadBytes)} / " +
+                  s"${stageData.shuffleReadRecords}"}
               </li>
             }}
             {if (hasShuffleWrite) {
               <li>
                 <strong>Shuffle write: </strong>
-                {Utils.bytesToString(stageData.shuffleWriteBytes)}
+                {s"${Utils.bytesToString(stageData.shuffleWriteBytes)} / " +
+                  s"${stageData.shuffleWriteRecords}"}
               </li>
  • }} {if (hasBytesSpilled) { @@ -182,16 +176,16 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME), ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++ {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++ - {if (hasInput) Seq(("Input", ""), ("Input Records","")) else Nil} ++ - {if (hasOutput) Seq(("Output", ""), ("Output Records", "")) else Nil} ++ + {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++ + {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++ {if (hasShuffleRead) { Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME), - ("Shuffle Read", ""), ("Shuffle Read Records", "")) + ("Shuffle Read Size / Records", "")) } else { Nil }} ++ - {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", ""), - ("Shuffle Write Records", "")) else Nil} ++ + {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write Size / Records", "")) + else Nil} ++ {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) else Nil} ++ Seq(("Errors", "")) @@ -212,8 +206,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { None } else { + def getDistributionQuantities(data: Seq[Double]): IndexedSeq[Double] = + Distribution(data).get.getQuantiles() + def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = { - Distribution(times).get.getQuantiles().map { millis => + getDistributionQuantities(times).map { millis => {UIUtils.formatDuration(millis.toLong)} } } @@ -282,17 +279,36 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { getFormattedTimeQuantiles(schedulerDelays) def getFormattedSizeQuantiles(data: Seq[Double]) = - Distribution(data).get.getQuantiles().map(d => {Utils.bytesToString(d.toLong)}) + getDistributionQuantities(data).map(d => {Utils.bytesToString(d.toLong)}) + + def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double]) = { + val recordDist = getDistributionQuantities(records).iterator + getDistributionQuantities(data).map(d => + {s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"} + ) + } val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble } - val inputQuantiles = Input +: getFormattedSizeQuantiles(inputSizes) + + val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble + } + + val inputQuantiles = Input Size / Records +: + getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords) val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble } - val outputQuantiles = Output +: getFormattedSizeQuantiles(outputSizes) + + val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble + } + + val outputQuantiles = Output Size / Records +: + getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords) val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble @@ -303,14 +319,24 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) => 
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } - val shuffleReadQuantiles = Shuffle Read (Remote) +: - getFormattedSizeQuantiles(shuffleReadSizes) + + val shuffleReadRecords = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble + } + + val shuffleReadQuantiles = Shuffle Read Size / Records (Remote) +: + getFormattedSizeQuantilesWithRecords(shuffleReadSizes, shuffleReadRecords) val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble } - val shuffleWriteQuantiles = Shuffle Write +: - getFormattedSizeQuantiles(shuffleWriteSizes) + + val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble + } + + val shuffleWriteQuantiles = Shuffle Write Size / Records +: + getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords) val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.memoryBytesSpilled.toDouble @@ -486,18 +512,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { }} {if (hasInput) { - {inputReadable} - - - {inputRecords} + {s"$inputReadable / $inputRecords"} }} {if (hasOutput) { - {outputReadable} - - - {outputRecords} + {s"$outputReadable / $outputRecords"} }} {if (hasShuffleRead) { @@ -506,10 +526,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {shuffleReadBlockedTimeReadable} - {shuffleReadReadable} - - - {shuffleReadRecords} + {s"$shuffleReadReadable / $shuffleReadRecords"} }} {if (hasShuffleWrite) { @@ -517,10 +534,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {writeTimeReadable} - {shuffleWriteReadable} - - - {shuffleWriteRecords} + {s"$shuffleWriteReadable / $shuffleWriteRecords"} }} {if (hasBytesSpilled) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index b25dd56ecf54f..47ddc8b8851a5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -35,7 +35,9 @@ private[jobs] object UIData { var outputBytes : Long = 0 var outputRecords : Long = 0 var shuffleRead : Long = 0 + var shuffleReadRecords : Long = 0 var shuffleWrite : Long = 0 + var shuffleWriteRecords : Long = 0 var memoryBytesSpilled : Long = 0 var diskBytesSpilled : Long = 0 } @@ -79,7 +81,9 @@ private[jobs] object UIData { var outputBytes: Long = _ var outputRecords: Long = _ var shuffleReadBytes: Long = _ + var shuffleReadRecords : Long = _ var shuffleWriteBytes: Long = _ + var shuffleWriteRecords: Long = _ var memoryBytesSpilled: Long = _ var diskBytesSpilled: Long = _ From b6f9923d9e81fd9e5e376a5058e4fc455cc97b1f Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Mon, 2 Feb 2015 14:54:12 -0500 Subject: [PATCH 06/11] Merge AfterNextInterceptingIterator with InterruptableIterator to save a function call --- .../scala/org/apache/spark/CacheManager.scala | 13 +++-- .../hash/BlockStoreShuffleFetcher.scala | 18 +++---- .../util/AfterNextInterceptingIterator.scala | 49 ------------------- 3 files changed, 14 insertions(+), 66 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/util/AfterNextInterceptingIterator.scala diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala 
b/core/src/main/scala/org/apache/spark/CacheManager.scala index 2a0518872f13f..a96d754744a05 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -17,8 +17,6 @@ package org.apache.spark -import org.apache.spark.util.AfterNextInterceptingIterator - import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -52,11 +50,12 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { existingMetrics.incBytesRead(inputMetrics.bytesRead) val iter = blockResult.data.asInstanceOf[Iterator[T]] - new InterruptibleIterator(context, AfterNextInterceptingIterator(iter, (next: T) => { - existingMetrics.incRecordsRead(1) - next - })) - + new InterruptibleIterator[T](context, iter) { + override def next(): T = { + existingMetrics.incRecordsRead(1) + delegate.next() + } + } case None => // Acquire a lock for loading this partition // If another thread already holds the lock, wait for it to finish return its results diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index 47e9eb898ccc9..3cf8272b70399 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -25,7 +25,7 @@ import org.apache.spark._ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId} -import org.apache.spark.util.{AfterNextInterceptingIterator, CompletionIterator} +import org.apache.spark.util.{CompletionIterator} private[hash] object BlockStoreShuffleFetcher extends Logging { def fetch[T]( @@ -82,18 +82,16 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024) val itr = blockFetcherItr.flatMap(unpackBlock) - val itr2 = new AfterNextInterceptingIterator[T](itr) { + val completionIter = CompletionIterator[T, Iterator[T]](itr, { + context.taskMetrics.updateShuffleReadMetrics() + }) + + new InterruptibleIterator[T](context, completionIter) { val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency() - override def afterNext(next: T): T = { + override def next(): T = { readMetrics.incRecordsRead(1) - next + delegate.next() } } - - val completionIter = CompletionIterator[T, Iterator[T]](itr2, { - context.taskMetrics.updateShuffleReadMetrics() - }) - - new InterruptibleIterator[T](context, completionIter) } } diff --git a/core/src/main/scala/org/apache/spark/util/AfterNextInterceptingIterator.scala b/core/src/main/scala/org/apache/spark/util/AfterNextInterceptingIterator.scala deleted file mode 100644 index 1ddebb3cbdd3c..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/AfterNextInterceptingIterator.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -/** - * A Wrapper for iterators where a caller can get a hook after next() - * has been called. - * - * @param sub The iterator being wrapped. - * @tparam A the iterable type - */ -private[spark] -class AfterNextInterceptingIterator[A](sub: Iterator[A]) extends Iterator[A] { - - def hasNext: Boolean = sub.hasNext - override def next() :A = { - val next = sub.next() - afterNext(next) - } - - def afterNext(next : A) : A = next -} - -/** - * A shortcut if running a closure is all you need to do. - */ -private[spark] object AfterNextInterceptingIterator { - def apply[A](sub: Iterator[A], function: (A) => A) : AfterNextInterceptingIterator[A] = { - new AfterNextInterceptingIterator[A](sub) { - override def afterNext(next : A) = function(next) - } - } -} - From 17faa3a16d35540f1267ee89b89746883ca6e45c Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Tue, 3 Feb 2015 13:43:42 -0800 Subject: [PATCH 07/11] Removed AtomicLong in favour of using Long --- .../org/apache/spark/executor/TaskMetrics.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index b985c9a9b60ee..d8a112167d80e 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -248,23 +248,23 @@ case class InputMetrics(readMethod: DataReadMethod.Value) { /** * Total bytes read. */ - private val _bytesRead: AtomicLong = new AtomicLong() - def bytesRead: Long = _bytesRead.get() - def incBytesRead(bytes: Long) = _bytesRead.addAndGet(bytes) + private var _bytesRead: Long = _ + def bytesRead: Long = _bytesRead + def incBytesRead(bytes: Long) = _bytesRead += bytes /** * Total records read. */ - def recordsRead: Long = _recordsRead.get() - private val _recordsRead: AtomicLong = new AtomicLong() - def incRecordsRead(records: Long) = _recordsRead.addAndGet(records) + private var _recordsRead: Long = _ + def recordsRead: Long = _recordsRead + def incRecordsRead(records: Long) = _recordsRead += records /** * Invoke the bytesReadCallback and mutate bytesRead. */ def updateBytesRead() { bytesReadCallback.foreach { c => - _bytesRead.set(c()) + _bytesRead = c() } } From 70620a0cb22803563ee5073029faa9fcd0e32e9b Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Thu, 5 Feb 2015 15:50:23 -0800 Subject: [PATCH 08/11] CR Feedback - Hide columns in executor summary table if no data - revert change to show output metrics for hadoop < 2.4 - other cr feedback. 
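A simplified sketch of the Hadoop-version gating applied below (it mirrors the change to initHadoopOutputMetrics in PairRDDFunctions; context is the TaskContext and the callback comes from SparkHadoopUtil):

    // Only attach OutputMetrics when the filesystem bytes-written callback exists;
    // on older Hadoop versions getFSBytesWrittenOnThreadCallback() returns None,
    // so no output metrics are reported for the task.
    val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback()
    val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop)
    if (bytesWrittenCallback.isDefined) {
      context.taskMetrics.outputMetrics = Some(outputMetrics)
    }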
--- .../apache/spark/executor/TaskMetrics.scala | 5 +- .../apache/spark/rdd/PairRDDFunctions.scala | 4 +- .../spark/storage/BlockObjectWriter.scala | 6 +- .../scala/org/apache/spark/ui/ToolTips.scala | 8 +- .../apache/spark/ui/jobs/ExecutorTable.scala | 88 ++++++++++++++----- .../org/apache/spark/ui/jobs/StagePage.scala | 65 +++++++------- .../org/apache/spark/ui/jobs/UIData.scala | 6 ++ .../org/apache/spark/util/JsonProtocol.scala | 8 +- .../metrics/InputOutputMetricsSuite.scala | 34 ++++--- .../apache/spark/util/JsonProtocolSuite.scala | 8 +- 10 files changed, 152 insertions(+), 80 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index d8a112167d80e..bc972a2267b77 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -242,7 +242,10 @@ object DataWriteMethod extends Enumeration with Serializable { */ @DeveloperApi case class InputMetrics(readMethod: DataReadMethod.Value) { - + + /** + * This is volatile so that it is visible to the updater thread. + */ @volatile @transient var bytesReadCallback: Option[() => Long] = None /** diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 3e938c546c0bb..955b42c3baaa1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -1091,7 +1091,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) private def initHadoopOutputMetrics(context: TaskContext): (OutputMetrics, Option[() => Long]) = { val bytesWrittenCallback = SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback() val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) - context.taskMetrics.outputMetrics = Some(outputMetrics) + if (bytesWrittenCallback.isDefined) { + context.taskMetrics.outputMetrics = Some(outputMetrics) + } (outputMetrics, bytesWrittenCallback) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index ce02fbd4bd804..c6fea66d5d21b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -115,8 +115,10 @@ private[spark] class DiskBlockObjectWriter( private var finalPosition: Long = -1 private var reportedPosition = initialPosition - /** Calling channel.position() to update the write metrics can be a little bit expensive, so we - * only call it every N writes */ + /** + * Keep track of number of records written and also use this to periodically + * output bytes written since the latter is expensive to do for each record. + */ private var numRecordsWritten = 0 override def open(): BlockObjectWriter = { diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index 1c0921e2b9971..3a15e603b1969 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -29,15 +29,15 @@ private[spark] object ToolTips { val SHUFFLE_READ_BLOCKED_TIME = "Time that the task spent blocked waiting for shuffle data to be read from remote machines." - val INPUT = "Bytes and # records read from Hadoop or from Spark storage." 
+ val INPUT = "Bytes and records read from Hadoop or from Spark storage." - val OUTPUT = "Bytes and # records written to Hadoop." + val OUTPUT = "Bytes and records written to Hadoop." val SHUFFLE_WRITE = - "Bytes and # records written to disk in order to be read by a shuffle in a future stage." + "Bytes and records written to disk in order to be read by a shuffle in a future stage." val SHUFFLE_READ = - """Bytes and # records read from remote executors. Typically less than shuffle write bytes + """Bytes and records read from remote executors. Typically less than shuffle write bytes because this does not include shuffle data read locally.""" val GETTING_RESULT_TIME = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 5b7ed322ef72d..1f8536d1b7195 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -36,6 +36,20 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage /** Special table which merges two header cells. */ private def executorTable[T](): Seq[Node] = { + val stageData = listener.stageIdToData.get((stageId, stageAttemptId)) + var hasInput = false + var hasOutput = false + var hasShuffleWrite = false + var hasShuffleRead = false + var hasBytesSpilled = false + stageData.foreach(data => { + hasInput = data.hasInput + hasOutput = data.hasOutput + hasShuffleRead = data.hasShuffleRead + hasShuffleWrite = data.hasShuffleWrite + hasBytesSpilled = data.hasBytesSpilled + }) + @@ -44,14 +58,32 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage - - - - - - + {if (hasInput) { + + }} + {if (hasOutput) { + + }} + {if (hasShuffleRead) { + + }} + {if (hasShuffleWrite) { + + }} + {if (hasBytesSpilled) { + + + }} {createExecutorTable()} @@ -78,18 +110,34 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage - - - - - - + {if (stageData.hasInput) { + + }} + {if (stageData.hasOutput) { + + }} + {if (stageData.hasShuffleRead) { + + }} + {if (stageData.hasShuffleWrite) { + + }} + {if (stageData.hasBytesSpilled) { + + + }} } case None => diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 0791b218c1406..ea4e125d71c8f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -56,11 +56,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val numCompleted = tasks.count(_.taskInfo.finished) val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables val hasAccumulators = accumulables.size > 0 - val hasInput = stageData.inputBytes > 0 - val hasOutput = stageData.outputBytes > 0 - val hasShuffleRead = stageData.shuffleReadBytes > 0 - val hasShuffleWrite = stageData.shuffleWriteBytes > 0 - val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0 val summary =
    @@ -69,33 +64,33 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { Total task time across all tasks: {UIUtils.formatDuration(stageData.executorRunTime)} - {if (hasInput) { + {if (stageData.hasInput) {
               <li>
                 <strong>Input Size / Records: </strong>
                 {s"${Utils.bytesToString(stageData.inputBytes)} / ${stageData.inputRecords}"}
               </li>
             }}
-            {if (hasOutput) {
+            {if (stageData.hasOutput) {
               <li>
                 <strong>Output: </strong>
                 {s"${Utils.bytesToString(stageData.outputBytes)} / ${stageData.outputRecords}"}
               </li>
             }}
-            {if (hasShuffleRead) {
+            {if (stageData.hasShuffleRead) {
               <li>
                 <strong>Shuffle read: </strong>
                 {s"${Utils.bytesToString(stageData.shuffleReadBytes)} / " +
                   s"${stageData.shuffleReadRecords}"}
               </li>
             }}
-            {if (hasShuffleWrite) {
+            {if (stageData.hasShuffleWrite) {
               <li>
                 <strong>Shuffle write: </strong>
                 {s"${Utils.bytesToString(stageData.shuffleWriteBytes)} / " +
                   s"${stageData.shuffleWriteRecords}"}
               </li>
             }}
-            {if (hasBytesSpilled) {
+            {if (stageData.hasBytesSpilled) {
  • Shuffle spill (memory): {Utils.bytesToString(stageData.memoryBytesSpilled)} @@ -134,7 +129,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { Task Deserialization Time
-              {if (hasShuffleRead) {
+              {if (stageData.hasShuffleRead) {
  • @@ -176,26 +171,32 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME), ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++ {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++ - {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++ - {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++ - {if (hasShuffleRead) { + {if (stageData.hasInput) Seq(("Input Size / Records", "")) else Nil} ++ + {if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++ + {if (stageData.hasShuffleRead) { Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME), ("Shuffle Read Size / Records", "")) - } else { + } else { + Nil + }} ++ + {if (stageData.hasShuffleWrite) { + Seq(("Write Time", ""), ("Shuffle Write Size / Records", "")) + } else { + Nil + }} ++ + {if (stageData.hasBytesSpilled) { + Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) + } else { Nil }} ++ - {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write Size / Records", "")) - else Nil} ++ - {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) - else Nil} ++ Seq(("Errors", "")) val unzipped = taskHeadersAndCssClasses.unzip val taskTable = UIUtils.listingTable( unzipped._1, - taskRow(hasAccumulators, hasInput, hasOutput, hasShuffleRead, hasShuffleWrite, - hasBytesSpilled), + taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput, + stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled), tasks, headerClasses = unzipped._2) // Excludes tasks which failed and have incomplete metrics @@ -206,11 +207,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { None } else { - def getDistributionQuantities(data: Seq[Double]): IndexedSeq[Double] = + def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] = Distribution(data).get.getQuantiles() def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = { - getDistributionQuantities(times).map { millis => + getDistributionQuantiles(times).map { millis =>
  • } } @@ -279,11 +280,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { getFormattedTimeQuantiles(schedulerDelays) def getFormattedSizeQuantiles(data: Seq[Double]) = - getDistributionQuantities(data).map(d => ) + getDistributionQuantiles(data).map(d => ) def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double]) = { - val recordDist = getDistributionQuantities(records).iterator - getDistributionQuantities(data).map(d => + val recordDist = getDistributionQuantiles(records).iterator + getDistributionQuantiles(data).map(d => ) } @@ -361,9 +362,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {serializationQuantiles} , {gettingResultQuantiles}, - if (hasInput) {inputQuantiles} else Nil, - if (hasOutput) {outputQuantiles} else Nil, - if (hasShuffleRead) { + if (stageData.hasInput) {inputQuantiles} else Nil, + if (stageData.hasOutput) {outputQuantiles} else Nil, + if (stageData.hasShuffleRead) { {shuffleReadBlockedQuantiles} @@ -371,9 +372,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { } else { Nil }, - if (hasShuffleWrite) {shuffleWriteQuantiles} else Nil, - if (hasBytesSpilled) {memoryBytesSpilledQuantiles} else Nil, - if (hasBytesSpilled) {diskBytesSpilledQuantiles} else Nil) + if (stageData.hasShuffleWrite) {shuffleWriteQuantiles} else Nil, + if (stageData.hasBytesSpilled) {memoryBytesSpilledQuantiles} else Nil, + if (stageData.hasBytesSpilled) {diskBytesSpilledQuantiles} else Nil) val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile", "Max") diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 47ddc8b8851a5..69aac6c862de5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -93,6 +93,12 @@ private[jobs] object UIData { var accumulables = new HashMap[Long, AccumulableInfo] var taskData = new HashMap[Long, TaskUIData] var executorSummary = new HashMap[String, ExecutorSummary] + + def hasInput = inputBytes > 0 + def hasOutput = outputBytes > 0 + def hasShuffleRead = shuffleReadBytes > 0 + def hasShuffleWrite = shuffleWriteBytes > 0 + def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0 } /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 497bb68d7bff0..5144d74afc817 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -673,7 +673,7 @@ private[spark] object JsonProtocol { metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) - metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(-1)) + metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(0)) metrics } @@ -681,7 +681,7 @@ private[spark] object JsonProtocol { val metrics = new ShuffleWriteMetrics metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) - metrics.setRecordsWritten((json \ "Shuffle Records Written").extractOpt[Long].getOrElse(-1)) + metrics.setRecordsWritten((json \ "Shuffle Records 
Written").extractOpt[Long].getOrElse(0)) metrics } @@ -689,7 +689,7 @@ private[spark] object JsonProtocol { val metrics = new InputMetrics( DataReadMethod.withName((json \ "Data Read Method").extract[String])) metrics.incBytesRead((json \ "Bytes Read").extract[Long]) - metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(-1)) + metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(0)) metrics } @@ -697,7 +697,7 @@ private[spark] object JsonProtocol { val metrics = new OutputMetrics( DataWriteMethod.withName((json \ "Data Write Method").extract[String])) metrics.setBytesWritten((json \ "Bytes Written").extract[Long]) - metrics.setRecordsWritten((json \ "Records Written").extractOpt[Long].getOrElse(-1)) + metrics.setRecordsWritten((json \ "Records Written").extractOpt[Long].getOrElse(0)) metrics } diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index c87f9a7389155..e9427ba392a22 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -243,7 +243,11 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext sc.listenerBus.waitUntilEmpty(500) assert(inputRead == numRecords) - assert(outputWritten == numBuckets) + + // Only supported on newer Hadoop + if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { + assert(outputWritten == numBuckets) + } assert(shuffleRead == shuffleWritten) } @@ -321,24 +325,30 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext } test("output metrics on records written") { - val file = new File(tmpDir, getClass.getSimpleName) - val filePath = "file://" + file.getAbsolutePath + // Only supported on newer Hadoop + if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { + val file = new File(tmpDir, getClass.getSimpleName) + val filePath = "file://" + file.getAbsolutePath - val records = runAndReturnRecordsWritten { - sc.parallelize(1 to numRecords).saveAsTextFile(filePath) + val records = runAndReturnRecordsWritten { + sc.parallelize(1 to numRecords).saveAsTextFile(filePath) + } + assert(records == numRecords) } - assert(records == numRecords) } test("output metrics on records written - new Hadoop API") { - val file = new File(tmpDir, getClass.getSimpleName) - val filePath = "file://" + file.getAbsolutePath + // Only supported on newer Hadoop + if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { + val file = new File(tmpDir, getClass.getSimpleName) + val filePath = "file://" + file.getAbsolutePath - val records = runAndReturnRecordsWritten { - sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString)) - .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath) + val records = runAndReturnRecordsWritten { + sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString)) + .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath) + } + assert(records == numRecords) } - assert(records == numRecords) } test("output metrics when writing text file") { diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index c5bb399415114..db5016e4e35b6 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -197,8 +197,8 @@ class JsonProtocolSuite extends FunSuite { val oldJson = newJson.removeField { case (field, _) => field == "Records Read" } .removeField { case (field, _) => field == "Records Written" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) - assert(newMetrics.inputMetrics.get.recordsRead == -1) - assert(newMetrics.outputMetrics.get.recordsWritten == -1) + assert(newMetrics.inputMetrics.get.recordsRead == 0) + assert(newMetrics.outputMetrics.get.recordsWritten == 0) } test("Shuffle Read/Write records backwards compatibility") { @@ -211,8 +211,8 @@ class JsonProtocolSuite extends FunSuite { val oldJson = newJson.removeField { case (field, _) => field == "Records Read" } .removeField { case (field, _) => field == "Shuffle Records Written" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) - assert(newMetrics.shuffleReadMetrics.get.recordsRead == -1) - assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == -1) + assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0) + assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0) } test("OutputMetrics backward compatibility") { From 6f236a182ba66e4b85308dc0d48450e90dafe612 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Thu, 5 Feb 2015 21:57:58 -0800 Subject: [PATCH 09/11] Renamed _recordsWritten in ShuffleWriteMetrics to be more consistent --- .../scala/org/apache/spark/executor/TaskMetrics.scala | 10 +++++----- .../org/apache/spark/storage/BlockObjectWriter.scala | 4 ++-- .../org/apache/spark/ui/jobs/JobProgressListener.scala | 4 ++-- .../scala/org/apache/spark/ui/jobs/StagePage.scala | 5 +++-- .../scala/org/apache/spark/util/JsonProtocol.scala | 5 +++-- .../apache/spark/util/collection/ExternalSorter.scala | 2 +- .../apache/spark/metrics/InputOutputMetricsSuite.scala | 4 ++-- .../apache/spark/storage/BlockObjectWriterSuite.scala | 10 +++++----- .../org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- 9 files changed, 25 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index bc972a2267b77..d05659193b334 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -380,9 +380,9 @@ class ShuffleWriteMetrics extends Serializable { /** * Total number of records written to the shuffle by this task */ - @volatile private var _recordsWritten: Long = _ - def recordsWritten = _recordsWritten - private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value - private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value - private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value + @volatile private var _shuffleRecordsWritten: Long = _ + def shuffleRecordsWritten = _shuffleRecordsWritten + private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value + private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value + private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index c6fea66d5d21b..01727ff950b2a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -170,7 +170,7 @@ private[spark] class DiskBlockObjectWriter( override def revertPartialWritesAndClose() { try { writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition) - writeMetrics.decRecordsWritten(numRecordsWritten) + writeMetrics.decShuffleRecordsWritten(numRecordsWritten) if (initialized) { objOut.flush() @@ -197,7 +197,7 @@ private[spark] class DiskBlockObjectWriter( objOut.writeObject(value) numRecordsWritten += 1 - writeMetrics.incRecordsWritten(1) + writeMetrics.incShuffleRecordsWritten(1) if (numRecordsWritten % 32 == 0) { updateBytesWritten() diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index df68da493035d..f463f8d7c7215 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -395,8 +395,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { execSummary.shuffleWrite += shuffleWriteDelta val shuffleWriteRecordsDelta = - (taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L) - - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L)) + (taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L)) stageData.shuffleWriteRecords += shuffleWriteRecordsDelta execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index ea4e125d71c8f..02a3cc3e43c25 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -333,7 +333,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { } val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble + metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble } val shuffleWriteQuantiles = +: @@ -457,7 +457,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("") val shuffleWriteReadable = maybeShuffleWrite .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("") - val shuffleWriteRecords = maybeShuffleWrite.map(_.recordsWritten.toString).getOrElse("") + val shuffleWriteRecords = maybeShuffleWrite + .map(_.shuffleRecordsWritten.toString).getOrElse("") val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 5144d74afc817..f852a6d9f945c 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -300,7 +300,7 @@ private[spark] object JsonProtocol { def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = { ("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~ ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~ - 
("Shuffle Records Written" -> shuffleWriteMetrics.recordsWritten) + ("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten) } def inputMetricsToJson(inputMetrics: InputMetrics): JValue = { @@ -681,7 +681,8 @@ private[spark] object JsonProtocol { val metrics = new ShuffleWriteMetrics metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) - metrics.setRecordsWritten((json \ "Shuffle Records Written").extractOpt[Long].getOrElse(0)) + metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written") + .extractOpt[Long].getOrElse(0)) metrics } diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index a98bd22f7c24c..eaec5a71e6819 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -763,7 +763,7 @@ private[spark] class ExternalSorter[K, V, C]( if (curWriteMetrics != null) { m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten) m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime) - m.incRecordsWritten(curWriteMetrics.recordsWritten) + m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten) } } diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index e9427ba392a22..78fa98a3b9065 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -230,7 +230,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext metrics.inputMetrics.foreach(inputRead += _.recordsRead) metrics.outputMetrics.foreach(outputWritten += _.recordsWritten) metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead) - metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.recordsWritten) + metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.shuffleRecordsWritten) } }) @@ -306,7 +306,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext } private def runAndReturnShuffleRecordsWritten(job: => Unit): Long = { - runAndReturnMetrics(job, _.taskMetrics.shuffleWriteMetrics.map(_.recordsWritten)) + runAndReturnMetrics(job, _.taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten)) } private def runAndReturnMetrics(job: => Unit, diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala index 7ec3559bb7d78..3f32785d8a173 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala @@ -32,7 +32,7 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(20)) // Record metrics update on every write - assert(writeMetrics.recordsWritten === 1) + assert(writeMetrics.shuffleRecordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.shuffleBytesWritten == 0) // After 32 writes, metrics should update @@ -41,7 +41,7 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(i)) } assert(writeMetrics.shuffleBytesWritten > 0) - assert(writeMetrics.recordsWritten === 33) + assert(writeMetrics.shuffleRecordsWritten === 33) writer.commitAndClose() 
    assert(file.length() == writeMetrics.shuffleBytesWritten) } @@ -55,7 +55,7 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(20)) // Record metrics update on every write - assert(writeMetrics.recordsWritten === 1) + assert(writeMetrics.shuffleRecordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.shuffleBytesWritten == 0) // After 32 writes, metrics should update @@ -64,9 +64,9 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(i)) } assert(writeMetrics.shuffleBytesWritten > 0) - assert(writeMetrics.recordsWritten === 33) + assert(writeMetrics.shuffleRecordsWritten === 33) writer.revertPartialWritesAndClose() assert(writeMetrics.shuffleBytesWritten == 0) - assert(writeMetrics.recordsWritten == 0) + assert(writeMetrics.shuffleRecordsWritten == 0) } } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index db5016e4e35b6..1482bd341be61 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -212,7 +212,7 @@ class JsonProtocolSuite extends FunSuite { .removeField { case (field, _) => field == "Shuffle Records Written" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0) - assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0) + assert(newMetrics.shuffleWriteMetrics.get.shuffleRecordsWritten == 0) } test("OutputMetrics backward compatibility") { @@ -704,7 +704,7 @@ class JsonProtocolSuite extends FunSuite { val sw = new ShuffleWriteMetrics sw.incShuffleBytesWritten(a + b + c) sw.incShuffleWriteTime(b + c + d) - sw.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) + sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks From dad4d5782bf6cde7f655d44bd29ee31041d3af4a Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Thu, 5 Feb 2015 21:59:05 -0800 Subject: [PATCH 10/11] Add a comment and check to BlockObjectWriter so that it cannot be reopened. --- .../apache/spark/storage/BlockObjectWriter.scala | 8 +++++++- .../spark/storage/BlockObjectWriterSuite.scala | 14 ++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 01727ff950b2a..81164178b9e8e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -29,7 +29,8 @@ import org.apache.spark.executor.ShuffleWriteMetrics * appending data to an existing block, and can guarantee atomicity in the case of faults * as it allows the caller to revert partial writes. * - * This interface does not support concurrent writes. + * This interface does not support concurrent writes. Also, once the writer has + * been opened, it cannot be reopened again. */ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) { @@ -95,6 +96,7 @@ private[spark] class DiskBlockObjectWriter( private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null private var initialized = false + private var hasBeenClosed = false /** * Cursors used to represent positions in the file.
@@ -122,6 +124,9 @@ private[spark] class DiskBlockObjectWriter( private var numRecordsWritten = 0 override def open(): BlockObjectWriter = { + if (hasBeenClosed) { + throw new IllegalStateException("Writer already closed. Cannot be reopened.") + } fos = new FileOutputStream(file, true) ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() @@ -147,6 +152,7 @@ private[spark] class DiskBlockObjectWriter( ts = null objOut = null initialized = false + hasBeenClosed = true } } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala index 3f32785d8a173..c21c92b63ad13 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala @@ -69,4 +69,18 @@ class BlockObjectWriterSuite extends FunSuite { assert(writeMetrics.shuffleBytesWritten == 0) assert(writeMetrics.shuffleRecordsWritten == 0) } + + test("Reopening a closed block writer") { + val file = new File("somefile") + file.deleteOnExit() + val writeMetrics = new ShuffleWriteMetrics() + val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics) + + writer.open() + writer.close() + intercept[IllegalStateException] { + writer.open() + } + } } From bd919be5817e29dad476213a0b3b407d28ee0f24 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Thu, 5 Feb 2015 23:14:48 -0800 Subject: [PATCH 11/11] Changed 'Records Read' in shuffleReadMetrics json output to 'Total Records Read' --- .../apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala | 2 +- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 4 ++-- .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index 3cf8272b70399..7a2c5ae32d98b 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -25,7 +25,7 @@ import org.apache.spark._ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId} -import org.apache.spark.util.{CompletionIterator} +import org.apache.spark.util.CompletionIterator private[hash] object BlockStoreShuffleFetcher extends Logging { def fetch[T]( diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index f852a6d9f945c..3311cbe6b02ee 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -294,7 +294,7 @@ private[spark] object JsonProtocol { ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~ ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~ ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~ - ("Records Read" -> shuffleReadMetrics.recordsRead) + ("Total Records Read" -> shuffleReadMetrics.recordsRead) } def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = { @@ -673,7 +673,7 @@ private[spark] object JsonProtocol { 
metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) - metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(0)) + metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0)) metrics } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 1482bd341be61..7ad8685d5aaf2 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -208,7 +208,7 @@ class JsonProtocolSuite extends FunSuite { assert(metrics.shuffleReadMetrics.nonEmpty) assert(metrics.shuffleWriteMetrics.nonEmpty) val newJson = JsonProtocol.taskMetricsToJson(metrics) - val oldJson = newJson.removeField { case (field, _) => field == "Records Read" } + val oldJson = newJson.removeField { case (field, _) => field == "Total Records Read" } .removeField { case (field, _) => field == "Shuffle Records Written" } val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0) @@ -939,7 +939,7 @@ class JsonProtocolSuite extends FunSuite { | "Local Blocks Fetched": 700, | "Fetch Wait Time": 900, | "Remote Bytes Read": 1000, - | "Records Read" : 10 + | "Total Records Read" : 10 | }, | "Shuffle Write Metrics": { | "Shuffle Bytes Written": 1200,
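The "Total Records Read" rename stays backward compatible because the reader uses extractOpt with a default: event logs written before this series lack the field, and the test above checks that they deserialize with a record count of 0 rather than failing. A minimal standalone json4s sketch of that fallback, assuming only json4s-jackson on the classpath; the object name and the JSON snippets are illustrative and not part of the patch:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Illustrative only: shows why extractOpt[Long].getOrElse(0) lets older
// event logs, which lack the new counter field, still deserialize cleanly.
object TotalRecordsReadCompat {
  implicit val formats: Formats = DefaultFormats

  // Mirrors the read path above: a missing "Total Records Read" maps to 0
  // instead of failing the parse.
  def recordsRead(shuffleReadJson: JValue): Long =
    (shuffleReadJson \ "Total Records Read").extractOpt[Long].getOrElse(0L)

  def main(args: Array[String]): Unit = {
    val newEvent = parse("""{"Remote Bytes Read": 1000, "Total Records Read": 10}""")
    val oldEvent = parse("""{"Remote Bytes Read": 1000}""")
    println(recordsRead(newEvent)) // 10
    println(recordsRead(oldEvent)) // 0 -- old event without the field
  }
}

The same extractOpt pattern is what keeps the "Shuffle Records Written" reader earlier in this series tolerant of old logs.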
[The ExecutorTable.scala and StagePage.scala hunks that followed here lost their Scala XML markup (the <th>/<td> tags were stripped in extraction). What remains recoverable: the executor summary table carries the columns Executor ID, Total Tasks, Failed Tasks, Succeeded Tasks, Input Size / Records, Output Size / Records, Shuffle Read Size / Records, Shuffle Write Size / Records, Shuffle Spill (Memory), and Shuffle Spill (Disk); its data cells and the stage-page quantile cells render combined values such as {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"}, {s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"}, {UIUtils.formatDuration(millis.toLong)}, and {Utils.bytesToString(v.memoryBytesSpilled)}.]
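PATCH 10 makes DiskBlockObjectWriter single-use: close() latches a hasBeenClosed flag and open() rejects any later call with an IllegalStateException. A stripped-down sketch of that guard; the OneShotFileWriter class below is illustrative and not Spark code:

import java.io.{File, FileOutputStream}

// Illustrative one-shot writer: once closed it can never be reopened,
// in the spirit of the hasBeenClosed check PATCH 10 adds.
class OneShotFileWriter(file: File) {
  private var out: FileOutputStream = null
  private var initialized = false
  private var hasBeenClosed = false

  def open(): this.type = {
    if (hasBeenClosed) {
      throw new IllegalStateException("Writer already closed. Cannot be reopened.")
    }
    out = new FileOutputStream(file, true) // append, like the real writer
    initialized = true
    this
  }

  def close(): Unit = {
    if (initialized) {
      out.close()
      out = null
      initialized = false
      hasBeenClosed = true // latched; open() will refuse from now on
    }
  }
}

object OneShotFileWriterDemo {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("one-shot", ".tmp")
    file.deleteOnExit()
    val writer = new OneShotFileWriter(file)
    writer.open()
    writer.close()
    try writer.open() catch {
      case e: IllegalStateException => println(s"reopen rejected: ${e.getMessage}")
    }
  }
}

Latching a separate flag on close, rather than only clearing initialized, distinguishes "never opened yet" from "already used", so a stale writer fails fast instead of silently appending to a file whose metrics have already been committed.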