diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index 3b684bbeceaf..0a2c7cc30a13 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -18,6 +18,7 @@ package org.apache.spark import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap} /** @@ -56,14 +57,12 @@ case class Aggregator[K, V, C] ( } combiners.iterator } else { - val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners) + // TODO: Make context non-optional in a future release + val spillMetrics = Option(context).map( + _.taskMetrics.getOrCreateShuffleWriteSpillMetrics()).getOrElse(new ShuffleWriteMetrics()) + val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners, + spillMetrics = spillMetrics) combiners.insertAll(iter) - // Update task metrics if context is not null - // TODO: Make context non optional in a future release - Option(context).foreach { c => - c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled) - c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled) - } combiners.iterator } } @@ -87,17 +86,15 @@ case class Aggregator[K, V, C] ( } combiners.iterator } else { - val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners) + // TODO: Make context non-optional in a future release + val spillMetrics = Option(context).map( + _.taskMetrics.getOrCreateShuffleReadSpillMetrics()).getOrElse(new ShuffleWriteMetrics()) + val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners, + spillMetrics = spillMetrics) while (iter.hasNext) { val pair = iter.next() combiners.insert(pair._1, pair._2) } - // Update task metrics if context is not null - // TODO: Make context non-optional in a future release - Option(context).foreach { c => - c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled) - c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled) - } combiners.iterator } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index b89effc16d36..bf27d49a4bbb 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -124,8 +124,9 @@ private[spark] class PythonRDD( init, finish)) val memoryBytesSpilled = stream.readLong() val diskBytesSpilled = stream.readLong() - context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) - context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) + val spillMetrics = context.taskMetrics.getOrCreateShuffleReadSpillMetrics() + spillMetrics.incMemorySize(memoryBytesSpilled) + spillMetrics.incShuffleBytesWritten(diskBytesSpilled) read() case SpecialLengths.PYTHON_EXCEPTION_THROWN => // Signals that an exception has been thrown in python diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index bf3f1e4fc783..ad7fccaa4d5e 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -84,21 +84,41 @@ class TaskMetrics extends Serializable { def resultSerializationTime = _resultSerializationTime private[spark] def 
setResultSerializationTime(value: Long) = _resultSerializationTime = value + private var _shuffleReadSpillMetrics: Option[ShuffleWriteMetrics] = None + def shuffleReadSpillMetrics = _shuffleReadSpillMetrics + + /** + * This should only be used when recreating TaskMetrics, not when updating read spill metrics in + * executors. + */ + private[spark] def setShuffleReadSpillMetrics(value: Option[ShuffleWriteMetrics]) { + _shuffleReadSpillMetrics = value + } + + private var _shuffleWriteSpillMetrics: Option[ShuffleWriteMetrics] = None + def shuffleWriteSpillMetrics = _shuffleWriteSpillMetrics + + /** + * This should only be used when recreating TaskMetrics, not when updating write spill metrics in + * executors. + */ + private[spark] def setShuffleWriteSpillMetrics(value: Option[ShuffleWriteMetrics]) { + _shuffleWriteSpillMetrics = value + } + /** * The number of in-memory bytes spilled by this task */ - private var _memoryBytesSpilled: Long = _ - def memoryBytesSpilled = _memoryBytesSpilled - private[spark] def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value - private[spark] def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value + @deprecated("use shuffleReadSpillMetrics and shuffleWriteSpillMetrics instead") + def memoryBytesSpilled = _shuffleReadSpillMetrics.map(_.memorySize).getOrElse(0L) + + _shuffleWriteSpillMetrics.map(_.memorySize).getOrElse(0L) /** * The number of on-disk bytes spilled by this task */ - private var _diskBytesSpilled: Long = _ - def diskBytesSpilled = _diskBytesSpilled - def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value - def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value + @deprecated("use shuffleReadSpillMetrics and shuffleWriteSpillMetrics instead") + def diskBytesSpilled = _shuffleReadSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L) + + _shuffleWriteSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L) /** * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read @@ -211,6 +231,26 @@ class TaskMetrics extends Serializable { private[spark] def updateInputMetrics(): Unit = synchronized { inputMetrics.foreach(_.updateBytesRead()) } + + /** + * Get or create metrics for spills that occurred while this task was writing out shuffle data. + */ + def getOrCreateShuffleWriteSpillMetrics(): ShuffleWriteMetrics = { + if (_shuffleWriteSpillMetrics.isEmpty) { + _shuffleWriteSpillMetrics = Some(new ShuffleWriteMetrics()) + } + _shuffleWriteSpillMetrics.get + } + + /** + * Get or create metrics for spills that occurred while this task was aggregating shuffle data. + */ + def getOrCreateShuffleReadSpillMetrics(): ShuffleWriteMetrics = { + if (_shuffleReadSpillMetrics.isEmpty) { + _shuffleReadSpillMetrics = Some(new ShuffleWriteMetrics()) + } + _shuffleReadSpillMetrics.get + } } private[spark] object TaskMetrics { @@ -363,6 +403,14 @@ class ShuffleReadMetrics extends Serializable { */ @DeveloperApi class ShuffleWriteMetrics extends Serializable { + /** + * Size the data took up in memory.
+ */ + @volatile var _memorySize: Long = _ + def memorySize = _memorySize + private[spark] def incMemorySize(value: Long) = _memorySize += value + private[spark] def setMemorySize(value: Long) = _memorySize = value + /** * Number of bytes written for the shuffle by this task */ @@ -387,4 +435,12 @@ class ShuffleWriteMetrics extends Serializable { private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value + + def +=(writeMetrics: ShuffleWriteMetrics): ShuffleWriteMetrics = { + incMemorySize(writeMetrics.memorySize) + incShuffleBytesWritten(writeMetrics.shuffleBytesWritten) + incShuffleWriteTime(writeMetrics.shuffleWriteTime) + incShuffleRecordsWritten(writeMetrics.shuffleRecordsWritten) + this + } } diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index 07398a6fa62f..99c958423ea6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -26,10 +26,11 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext} import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer} -import org.apache.spark.util.Utils +import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleHandle +import org.apache.spark.util.Utils +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer} private[spark] sealed trait CoGroupSplitDep extends Serializable @@ -155,18 +156,17 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: new InterruptibleIterator(context, map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) } else { - val map = createExternalMap(numRdds) + val spillMetrics = context.taskMetrics.getOrCreateShuffleReadSpillMetrics() + val map = createExternalMap(numRdds, spillMetrics) for ((it, depNum) <- rddIterators) { map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) } - context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled) - context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled) new InterruptibleIterator(context, map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) } } - private def createExternalMap(numRdds: Int) + private def createExternalMap(numRdds: Int, spillMetrics: ShuffleWriteMetrics) : ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = { val createCombiner: (CoGroupValue => CoGroupCombiner) = value => { @@ -189,7 +189,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: combiner1 } new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner]( - createCombiner, mergeValue, mergeCombiners) + createCombiner, mergeValue, mergeCombiners, spillMetrics = spillMetrics) } override def clearDependencies() { diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 41bafabde05b..07eba189e353 100644 --- 
a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -57,10 +57,10 @@ private[spark] class HashShuffleReader[K, C]( case Some(keyOrd: Ordering[K]) => // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled, // the ExternalSorter won't spill to disk. - val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) + val spillMetrics = context.taskMetrics.getOrCreateShuffleReadSpillMetrics() + val sorter = new ExternalSorter[K, C, C]( + ordering = Some(keyOrd), serializer = Some(ser), spillMetrics = spillMetrics) sorter.insertAll(aggregatedIter) - context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled) - context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled) sorter.iterator case None => aggregatedIter diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 27496c5a289c..ab88a5291f67 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -49,17 +49,19 @@ private[spark] class SortShuffleWriter[K, V, C]( /** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { + val spillMetrics = context.taskMetrics.getOrCreateShuffleWriteSpillMetrics() + if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") sorter = new ExternalSorter[K, V, C]( - dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) + dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer, spillMetrics) sorter.insertAll(records) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. 
sorter = new ExternalSorter[K, V, V]( - None, Some(dep.partitioner), None, dep.serializer) + None, Some(dep.partitioner), None, dep.serializer, spillMetrics) sorter.insertAll(records) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 1f8536d1b719..16e9dca2e6ea 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -41,13 +41,15 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage var hasOutput = false var hasShuffleWrite = false var hasShuffleRead = false - var hasBytesSpilled = false + var hasReadBytesSpilled = false + var hasWriteBytesSpilled = false stageData.foreach(data => { hasInput = data.hasInput hasOutput = data.hasOutput hasShuffleRead = data.hasShuffleRead hasShuffleWrite = data.hasShuffleWrite - hasBytesSpilled = data.hasBytesSpilled + hasReadBytesSpilled = data.hasReadBytesSpilled + hasWriteBytesSpilled = data.hasWriteBytesSpilled }) @@ -80,9 +82,13 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage Shuffle Write Size / Records }} - {if (hasBytesSpilled) { - - + {if (hasReadBytesSpilled) { + + + }} + {if (hasWriteBytesSpilled) { + + }} @@ -130,12 +136,20 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"} }} - {if (stageData.hasBytesSpilled) { - + + }} + {if (stageData.hasWriteBytesSpilled) { + - }} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index f463f8d7c721..43da9d1b2cd1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -436,15 +436,29 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.outputRecords += outputRecordsDelta execSummary.outputRecords += outputRecordsDelta - val diskSpillDelta = - taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L) - stageData.diskBytesSpilled += diskSpillDelta - execSummary.diskBytesSpilled += diskSpillDelta - - val memorySpillDelta = - taskMetrics.memoryBytesSpilled - oldMetrics.map(_.memoryBytesSpilled).getOrElse(0L) - stageData.memoryBytesSpilled += memorySpillDelta - execSummary.memoryBytesSpilled += memorySpillDelta + val shuffleWriteDiskSpillDelta = + (taskMetrics.shuffleWriteSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleWriteSpillMetrics).map(_.shuffleBytesWritten).getOrElse(0L)) + stageData.shuffleWriteDiskSpillBytes += shuffleWriteDiskSpillDelta + execSummary.shuffleWriteDiskBytesSpilled += shuffleWriteDiskSpillDelta + + val shuffleReadDiskSpillDelta = + (taskMetrics.shuffleReadSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleReadSpillMetrics).map(_.shuffleBytesWritten).getOrElse(0L)) + stageData.shuffleReadDiskSpillBytes += shuffleReadDiskSpillDelta + execSummary.shuffleReadDiskBytesSpilled += shuffleReadDiskSpillDelta + + val shuffleWriteMemorySpillDelta = + (taskMetrics.shuffleWriteSpillMetrics.map(_.memorySize).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleWriteSpillMetrics).map(_.memorySize).getOrElse(0L)) + stageData.shuffleWriteMemorySpillBytes += shuffleWriteMemorySpillDelta + 
execSummary.shuffleWriteMemoryBytesSpilled += shuffleWriteMemorySpillDelta + + val shuffleReadMemorySpillDelta = + (taskMetrics.shuffleReadSpillMetrics.map(_.memorySize).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleReadSpillMetrics).map(_.memorySize).getOrElse(0L)) + stageData.shuffleReadMemorySpillBytes += shuffleReadMemorySpillDelta + execSummary.shuffleReadMemoryBytesSpilled += shuffleReadMemorySpillDelta val timeDelta = taskMetrics.executorRunTime - oldMetrics.map(_.executorRunTime).getOrElse(0L) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 05ffd5bc58fb..31ee8a8fa1d1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -96,14 +96,24 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { s"${stageData.shuffleWriteRecords}"} }} - {if (stageData.hasBytesSpilled) { + {if (stageData.hasReadBytesSpilled) {
- Shuffle spill (memory): - {Utils.bytesToString(stageData.memoryBytesSpilled)} + Shuffle read spill (memory): + {Utils.bytesToString(stageData.shuffleReadMemorySpillBytes)}
- Shuffle spill (disk): - {Utils.bytesToString(stageData.diskBytesSpilled)} + Shuffle read spill (disk): + {Utils.bytesToString(stageData.shuffleReadDiskSpillBytes)} +
+ }} + {if (stageData.hasWriteBytesSpilled) {
+ Shuffle write spill (memory): + {Utils.bytesToString(stageData.shuffleWriteMemorySpillBytes)}
+
+ Shuffle write spill (disk): + {Utils.bytesToString(stageData.shuffleWriteDiskSpillBytes)}
}} @@ -190,19 +200,24 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { } else { Nil }} ++ - {if (stageData.hasBytesSpilled) { - Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) + {if (stageData.hasReadBytesSpilled) { + Seq(("Shuffle Read Spill (Memory)", ""), ("Shuffle Read Spill (Disk)", "")) } else { Nil }} ++ - Seq(("Errors", "")) + {if (stageData.hasWriteBytesSpilled) { + Seq(("Shuffle Write Spill (Memory)", ""), ("Shuffle Write Spill (Disk)", "")) + } else { + Nil + }} ++ + Seq(("Errors", "")) val unzipped = taskHeadersAndCssClasses.unzip val taskTable = UIUtils.listingTable( unzipped._1, - taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput, - stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled), + taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput, stageData.hasShuffleRead, + stageData.hasShuffleWrite, stageData.hasReadBytesSpilled, stageData.hasWriteBytesSpilled), tasks, headerClasses = unzipped._2) // Excludes tasks which failed and have incomplete metrics @@ -345,17 +360,33 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val shuffleWriteQuantiles = +: getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords) - val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.memoryBytesSpilled.toDouble + val readMemorySpillSizes = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.shuffleReadSpillMetrics.map(_.memorySize).getOrElse(0L).toDouble + } + + val readMemorySpillQuantiles = +: + getFormattedSizeQuantiles(readMemorySpillSizes) + + val readDiskSpillSizes = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.shuffleReadSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble } - val memoryBytesSpilledQuantiles = +: - getFormattedSizeQuantiles(memoryBytesSpilledSizes) - val diskBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) => - metrics.get.diskBytesSpilled.toDouble + val readDiskSpillQuantiles = +: + getFormattedSizeQuantiles(readDiskSpillSizes) + + val writeMemorySpillSizes = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.shuffleWriteSpillMetrics.map(_.memorySize).getOrElse(0L).toDouble + } + + val writeMemorySpillQuantiles = +: + getFormattedSizeQuantiles(writeMemorySpillSizes) + + val writeDiskSpillSizes = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.shuffleWriteSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble } - val diskBytesSpilledQuantiles = +: - getFormattedSizeQuantiles(diskBytesSpilledSizes) + + val writeDiskSpillQuantiles = +: + getFormattedSizeQuantiles(writeDiskSpillSizes) val listings: Seq[Seq[Node]] = Seq( {serviceQuantiles}, @@ -379,8 +410,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { Nil }, if (stageData.hasShuffleWrite) {shuffleWriteQuantiles} else Nil, - if (stageData.hasBytesSpilled) {memoryBytesSpilledQuantiles} else Nil, - if (stageData.hasBytesSpilled) {diskBytesSpilledQuantiles} else Nil) + if (stageData.hasReadBytesSpilled) {readMemorySpillQuantiles} else Nil, + if (stageData.hasReadBytesSpilled) {readDiskSpillQuantiles} else Nil, + if (stageData.hasWriteBytesSpilled) {writeMemorySpillQuantiles} else Nil, + if (stageData.hasWriteBytesSpilled) {writeDiskSpillQuantiles} else Nil) val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile", "Max") @@ -419,7 +452,8 @@ private[ui] class StagePage(parent:
StagesTab) extends WebUIPage("stage") { hasOutput: Boolean, hasShuffleRead: Boolean, hasShuffleWrite: Boolean, - hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { + hasReadBytesSpilled: Boolean, + hasWriteBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = { taskData match { case TaskUIData(info, metrics, errorMessage) => val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) else metrics.map(_.executorRunTime).getOrElse(1L) @@ -472,14 +506,29 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { if (ms == 0) "" else UIUtils.formatDuration(ms) }.getOrElse("") - val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) - val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("") - val memoryBytesSpilledReadable = - maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") - - val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled) - val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("") - val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") + val maybeReadMemoryBytesSpilled = metrics.flatMap(_.shuffleReadSpillMetrics).map(_.memorySize) + val readMemoryBytesSpilledSortable = maybeReadMemoryBytesSpilled.map(_.toString).getOrElse("") + val readMemoryBytesSpilledReadable = + maybeReadMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") + + val maybeWriteMemoryBytesSpilled = + metrics.flatMap(_.shuffleWriteSpillMetrics).map(_.memorySize) + val writeMemoryBytesSpilledSortable = + maybeWriteMemoryBytesSpilled.map(_.toString).getOrElse("") + val writeMemoryBytesSpilledReadable = + maybeWriteMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("") + + val maybeReadDiskBytesSpilled = + metrics.flatMap(_.shuffleReadSpillMetrics).map(_.shuffleBytesWritten) + val readDiskBytesSpilledSortable = maybeReadDiskBytesSpilled.map(_.toString).getOrElse("") + val readDiskBytesSpilledReadable = + maybeReadDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") + + val maybeWriteDiskBytesSpilled = + metrics.flatMap(_.shuffleWriteSpillMetrics).map(_.shuffleBytesWritten) + val writeDiskBytesSpilledSortable = maybeWriteDiskBytesSpilled.map(_.toString).getOrElse("") + val writeDiskBytesSpilledReadable = + maybeWriteDiskBytesSpilled.map(Utils.bytesToString).getOrElse("") @@ -545,12 +594,20 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {s"$shuffleWriteReadable / $shuffleWriteRecords"} }} - {if (hasBytesSpilled) { - + - + }} {errorMessageCell(errorMessage)} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 69aac6c862de..44aec3a1a432 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -38,8 +38,10 @@ private[jobs] object UIData { var shuffleReadRecords : Long = 0 var shuffleWrite : Long = 0 var shuffleWriteRecords : Long = 0 - var memoryBytesSpilled : Long = 0 - var diskBytesSpilled : Long = 0 + var shuffleReadMemoryBytesSpilled: Long = _ + var shuffleReadDiskBytesSpilled: Long = _ + var shuffleWriteMemoryBytesSpilled: Long = _ + var shuffleWriteDiskBytesSpilled: Long = _ } class JobUIData( @@ -84,8 +86,10 @@ private[jobs] object UIData { var shuffleReadRecords : Long = _ var shuffleWriteBytes: Long = _ var shuffleWriteRecords: Long = _ - var memoryBytesSpilled: Long = _ - var diskBytesSpilled: Long = _ + var
shuffleReadMemorySpillBytes: Long = _ + var shuffleReadDiskSpillBytes: Long = _ + var shuffleWriteMemorySpillBytes: Long = _ + var shuffleWriteDiskSpillBytes: Long = _ var schedulingPool: String = "" var description: Option[String] = None @@ -98,7 +102,8 @@ private[jobs] object UIData { def hasOutput = outputBytes > 0 def hasShuffleRead = shuffleReadBytes > 0 def hasShuffleWrite = shuffleWriteBytes > 0 - def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0 + def hasReadBytesSpilled = shuffleReadMemorySpillBytes > 0 && shuffleReadDiskSpillBytes > 0 + def hasWriteBytesSpilled = shuffleWriteMemorySpillBytes > 0 && shuffleWriteDiskSpillBytes > 0 } /** diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index b0b545640f5a..c97a5b6c0c52 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -263,6 +263,10 @@ private[spark] object JsonProtocol { taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing) val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) + val shuffleReadSpillMetrics = + taskMetrics.shuffleReadSpillMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) + val shuffleWriteSpillMetrics = + taskMetrics.shuffleWriteSpillMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) val inputMetrics = taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing) val outputMetrics = @@ -280,8 +284,8 @@ private[spark] object JsonProtocol { ("Result Size" -> taskMetrics.resultSize) ~ ("JVM GC Time" -> taskMetrics.jvmGCTime) ~ ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~ - ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~ - ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~ + ("Shuffle Read Spill Metrics" -> shuffleReadSpillMetrics) ~ + ("Shuffle Write Spill Metrics" -> shuffleWriteSpillMetrics) ~ ("Shuffle Read Metrics" -> shuffleReadMetrics) ~ ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~ ("Input Metrics" -> inputMetrics) ~ @@ -647,8 +651,10 @@ private[spark] object JsonProtocol { metrics.setResultSize((json \ "Result Size").extract[Long]) metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long]) metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long]) - metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long]) - metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long]) + metrics.setShuffleReadSpillMetrics( + Utils.jsonOption(json \ "Shuffle Read Spill Metrics").map(shuffleWriteMetricsFromJson)) + metrics.setShuffleWriteSpillMetrics( + Utils.jsonOption(json \ "Shuffle Write Spill Metrics").map(shuffleWriteMetricsFromJson)) metrics.setShuffleReadMetrics( Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)) metrics.shuffleWriteMetrics = diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 8a0f5a602de1..7ee307346eb7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -65,7 +65,8 @@ class ExternalAppendOnlyMap[K, V, C]( mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, serializer: Serializer = SparkEnv.get.serializer, - 
blockManager: BlockManager = SparkEnv.get.blockManager) + blockManager: BlockManager = SparkEnv.get.blockManager, + val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics) extends Iterable[(K, C)] with Serializable with Logging @@ -87,9 +88,6 @@ class ExternalAppendOnlyMap[K, V, C]( */ private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000) - // Number of bytes spilled in total - private var _diskBytesSpilled = 0L - private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024 // Write metrics for current spill @@ -163,7 +161,8 @@ class ExternalAppendOnlyMap[K, V, C]( val w = writer writer = null w.commitAndClose() - _diskBytesSpilled += curWriteMetrics.shuffleBytesWritten + spillMetrics += curWriteMetrics + spillMetrics.setMemorySize(memoryBytesSpilled) batchSizes.append(curWriteMetrics.shuffleBytesWritten) objectsWritten = 0 } @@ -207,8 +206,6 @@ class ExternalAppendOnlyMap[K, V, C]( spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes)) } - def diskBytesSpilled: Long = _diskBytesSpilled - /** * Return an iterator that merges the in-memory map with the spilled maps. * If no spill has occurred, simply return the in-memory map's iterator. diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index eaec5a71e681..9853e07348db 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -84,7 +84,8 @@ private[spark] class ExternalSorter[K, V, C]( aggregator: Option[Aggregator[K, V, C]] = None, partitioner: Option[Partitioner] = None, ordering: Option[Ordering[K]] = None, - serializer: Option[Serializer] = None) + serializer: Option[Serializer] = None, + val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics) extends Logging with Spillable[SizeTrackingPairCollection[(Int, K), C]] { private val numPartitions = partitioner.map(_.numPartitions).getOrElse(1) @@ -119,9 +120,6 @@ private[spark] class ExternalSorter[K, V, C]( private var map = new SizeTrackingAppendOnlyMap[(Int, K), C] private var buffer = new SizeTrackingPairBuffer[(Int, K), C] - // Total spilling statistics - private var _diskBytesSpilled = 0L - // Write metrics for current spill private var curWriteMetrics: ShuffleWriteMetrics = _ @@ -287,7 +285,8 @@ private[spark] class ExternalSorter[K, V, C]( val w = writer writer = null w.commitAndClose() - _diskBytesSpilled += curWriteMetrics.shuffleBytesWritten + spillMetrics += curWriteMetrics + spillMetrics.setMemorySize(memoryBytesSpilled) batchSizes.append(curWriteMetrics.shuffleBytesWritten) objectsWritten = 0 } @@ -757,14 +756,8 @@ private[spark] class ExternalSorter[K, V, C]( } } - context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled) - context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled) - context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m => - if (curWriteMetrics != null) { - m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten) - m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime) - m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten) - } + if (bypassMergeSort && curWriteMetrics != null) { + context.taskMetrics.shuffleWriteMetrics.foreach(_ += curWriteMetrics) } lengths @@ -792,8 +785,6 @@ private[spark] class ExternalSorter[K, V, C]( } } - def diskBytesSpilled: Long = _diskBytesSpilled - /** * Given a stream 
of ((partition, key), combiner) pairs *assumed to be sorted by partition ID*, * group together the pairs for each partition into a sub-iterator. diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index 9f5431207485..f9a464c6f27c 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -84,11 +84,12 @@ private[spark] trait Spillable[C] extends Logging { _spillCount += 1 logSpillage(currentMemory) + _memoryBytesSpilled += currentMemory + spill(collection) _elementsRead = 0 - // Keep track of spills, and release memory - _memoryBytesSpilled += currentMemory + releaseMemoryForThisThread() return true } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index e8405baa8e3e..076fc56465e7 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -230,8 +230,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc shuffleReadMetrics.incRemoteBlocksFetched(base + 2) shuffleWriteMetrics.incShuffleBytesWritten(base + 3) taskMetrics.setExecutorRunTime(base + 4) - taskMetrics.incDiskBytesSpilled(base + 5) - taskMetrics.incMemoryBytesSpilled(base + 6) + val shuffleReadSpillMetrics = taskMetrics.getOrCreateShuffleReadSpillMetrics() + val shuffleWriteSpillMetrics = taskMetrics.getOrCreateShuffleWriteSpillMetrics() + shuffleReadSpillMetrics.incShuffleBytesWritten(base + 5) + shuffleReadSpillMetrics.setMemorySize(base + 6) + shuffleWriteSpillMetrics.incShuffleBytesWritten(base + 5) + shuffleWriteSpillMetrics.setMemorySize(base + 6) val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.setInputMetrics(Some(inputMetrics)) inputMetrics.incBytesRead(base + 7) @@ -266,10 +270,14 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc assert(stage1Data.shuffleWriteBytes == 203) assert(stage0Data.executorRunTime == 108) assert(stage1Data.executorRunTime == 204) - assert(stage0Data.diskBytesSpilled == 110) - assert(stage1Data.diskBytesSpilled == 205) - assert(stage0Data.memoryBytesSpilled == 112) - assert(stage1Data.memoryBytesSpilled == 206) + assert(stage0Data.shuffleReadDiskSpillBytes == 110) + assert(stage0Data.shuffleWriteDiskSpillBytes == 110) + assert(stage1Data.shuffleReadDiskSpillBytes == 205) + assert(stage1Data.shuffleWriteDiskSpillBytes == 205) + assert(stage0Data.shuffleReadMemorySpillBytes == 112) + assert(stage0Data.shuffleWriteMemorySpillBytes == 112) + assert(stage1Data.shuffleReadMemorySpillBytes == 206) + assert(stage1Data.shuffleWriteMemorySpillBytes == 206) assert(stage0Data.inputBytes == 114) assert(stage1Data.inputBytes == 207) assert(stage0Data.outputBytes == 116) @@ -296,10 +304,14 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc assert(stage1Data.shuffleWriteBytes == 606) assert(stage0Data.executorRunTime == 408) assert(stage1Data.executorRunTime == 608) - assert(stage0Data.diskBytesSpilled == 410) - assert(stage1Data.diskBytesSpilled == 610) - assert(stage0Data.memoryBytesSpilled == 412) - assert(stage1Data.memoryBytesSpilled == 612) + assert(stage0Data.shuffleReadDiskSpillBytes == 410) + assert(stage0Data.shuffleWriteDiskSpillBytes == 410) + 
assert(stage1Data.shuffleReadDiskSpillBytes == 610) + assert(stage1Data.shuffleWriteDiskSpillBytes == 610) + assert(stage0Data.shuffleReadMemorySpillBytes == 412) + assert(stage0Data.shuffleWriteMemorySpillBytes == 412) + assert(stage1Data.shuffleReadMemorySpillBytes == 612) + assert(stage1Data.shuffleWriteMemorySpillBytes == 612) assert(stage0Data.inputBytes == 414) assert(stage1Data.inputBytes == 614) assert(stage0Data.outputBytes == 416) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index f3017dc42cd5..416a4ab8ee40 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -681,7 +681,8 @@ class JsonProtocolSuite extends FunSuite { t.setResultSize(c) t.setJvmGCTime(d) t.setResultSerializationTime(a + b) - t.incMemoryBytesSpilled(a + c) + // TODO +// t.incMemoryBytesSpilled(a + c) if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 48f79ea65101..f450432746b9 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.FunSuite import org.apache.spark._ +import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.io.CompressionCodec class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { @@ -31,8 +32,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { private def mergeCombiners[T](buf1: ArrayBuffer[T], buf2: ArrayBuffer[T]): ArrayBuffer[T] = buf1 ++= buf2 - private def createExternalMap[T] = new ExternalAppendOnlyMap[T, T, ArrayBuffer[T]]( - createCombiner[T], mergeValue[T], mergeCombiners[T]) + private def createExternalMap[T](spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics) = + new ExternalAppendOnlyMap[T, T, ArrayBuffer[T]]( + createCombiner[T], mergeValue[T], mergeCombiners[T], spillMetrics = spillMetrics) private def createSparkConf(loadDefaults: Boolean, codec: Option[String] = None): SparkConf = { val conf = new SparkConf(loadDefaults) @@ -51,7 +53,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { test("simple insert") { val conf = createSparkConf(loadDefaults = false) sc = new SparkContext("local", "test", conf) - val map = createExternalMap[Int] + val spillMetrics = new ShuffleWriteMetrics() + val map = createExternalMap[Int](spillMetrics) // Single insert map.insert(1, 10) @@ -71,12 +74,15 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { (2, ArrayBuffer[Int](20)), (3, ArrayBuffer[Int](30)))) sc.stop() + assert(spillMetrics.shuffleRecordsWritten === 0) + assert(spillMetrics.shuffleBytesWritten === 0) + assert(spillMetrics.memorySize === 0) } test("insert with collision") { val conf = createSparkConf(loadDefaults = false) sc = new SparkContext("local", "test", conf) - val map = createExternalMap[Int] + val map = createExternalMap[Int]() map.insertAll(Seq( (1, 10), @@ -99,17 +105,17 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { val conf = createSparkConf(loadDefaults = false) sc = new 
SparkContext("local", "test", conf) - val map1 = createExternalMap[Int] + val map1 = createExternalMap[Int]() map1.insert(1, 10) map1.insert(2, 20) map1.insert(3, 30) - val map2 = createExternalMap[Int] + val map2 = createExternalMap[Int]() map2.insert(2, 20) map2.insert(3, 30) map2.insert(1, 10) - val map3 = createExternalMap[Int] + val map3 = createExternalMap[Int]() map3.insert(3, 30) map3.insert(1, 10) map3.insert(2, 20) @@ -142,7 +148,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { val conf = createSparkConf(loadDefaults = false) sc = new SparkContext("local", "test", conf) - val map = createExternalMap[Int] + val map = createExternalMap[Int]() map.insert(1, 5) map.insert(2, 6) map.insert(3, 7) @@ -295,7 +301,8 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { val conf = createSparkConf(loadDefaults = true) conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,512]", "test", conf) - val map = createExternalMap[String] + val spillMetrics = new ShuffleWriteMetrics() + val map = createExternalMap[String](spillMetrics) val collisionPairs = Seq( ("Aa", "BB"), // 2112 @@ -338,6 +345,10 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { } assert(count === 100000 + collisionPairs.size * 2) sc.stop() + + assert(spillMetrics.shuffleRecordsWritten > 0) + assert(spillMetrics.shuffleBytesWritten > 0) + assert(spillMetrics.memorySize > 0) } test("spilling with many hash collisions") { @@ -369,7 +380,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { val conf = createSparkConf(loadDefaults = true) conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,512]", "test", conf) - val map = createExternalMap[Int] + val map = createExternalMap[Int]() (1 to 100000).foreach { i => map.insert(i, i) } map.insert(Int.MaxValue, Int.MaxValue) @@ -386,7 +397,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext { val conf = createSparkConf(loadDefaults = true) conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,512]", "test", conf) - val map = createExternalMap[Int] + val map = createExternalMap[Int]() map.insertAll((1 to 100000).iterator.map(i => (i, i))) map.insert(null.asInstanceOf[Int], 1) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 72d96798b114..a673999a6423 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.scalatest.{PrivateMethodTester, FunSuite} import org.apache.spark._ +import org.apache.spark.executor.ShuffleWriteMetrics import scala.util.Random @@ -57,10 +58,15 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe val ord = implicitly[Ordering[Int]] // Both aggregator and ordering + val spillMetrics = new ShuffleWriteMetrics() val sorter = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(3)), Some(ord), None) + Some(agg), Some(new HashPartitioner(3)), Some(ord), None, spillMetrics) assert(sorter.iterator.toSeq === Seq()) sorter.stop() + assert(spillMetrics.memorySize === 0) + assert(spillMetrics.shuffleBytesWritten === 0) + assert(spillMetrics.shuffleRecordsWritten === 0) + 
assert(spillMetrics.shuffleWriteTime === 0) // Only aggregator val sorter2 = new ExternalSorter[Int, Int, Int]( @@ -95,11 +101,16 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe (5, Set((5, 5))), (6, Set())) // Both aggregator and ordering + val spillMetrics = new ShuffleWriteMetrics() val sorter = new ExternalSorter[Int, Int, Int]( - Some(agg), Some(new HashPartitioner(7)), Some(ord), None) + Some(agg), Some(new HashPartitioner(7)), Some(ord), None, spillMetrics) sorter.insertAll(elements.iterator) assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) sorter.stop() + assert(spillMetrics.memorySize === 0) + assert(spillMetrics.shuffleBytesWritten === 0) + assert(spillMetrics.shuffleRecordsWritten === 0) + assert(spillMetrics.shuffleWriteTime === 0) // Only aggregator val sorter2 = new ExternalSorter[Int, Int, Int]( @@ -133,8 +144,9 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe val ord = implicitly[Ordering[Int]] val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2)) + val spillMetrics = new ShuffleWriteMetrics() val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(7)), Some(ord), None) + None, Some(new HashPartitioner(7)), Some(ord), None, spillMetrics) assertDidNotBypassMergeSort(sorter) sorter.insertAll(elements) assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled @@ -147,6 +159,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe assert(iter.next() === (5, List((5, 5)))) assert(iter.next() === (6, Nil)) sorter.stop() + + assert(spillMetrics.shuffleRecordsWritten > 0) + assert(spillMetrics.shuffleBytesWritten > 0) + assert(spillMetrics.memorySize > 0) } test("empty partitions with spilling, bypass merge-sort") { @@ -158,8 +174,9 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2)) + val spillMetrics = new ShuffleWriteMetrics() val sorter = new ExternalSorter[Int, Int, Int]( - None, Some(new HashPartitioner(7)), None, None) + None, Some(new HashPartitioner(7)), None, None, spillMetrics) assertBypassedMergeSort(sorter) sorter.insertAll(elements) assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled @@ -172,6 +189,10 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe assert(iter.next() === (5, List((5, 5)))) assert(iter.next() === (6, Nil)) sorter.stop() + + assert(spillMetrics.shuffleRecordsWritten === 0) + assert(spillMetrics.shuffleBytesWritten === 0) + assert(spillMetrics.memorySize === 0) } test("spilling in local cluster") {
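
For reference, a minimal sketch (not part of the patch) of how a caller threads a per-task spill accumulator into ExternalAppendOnlyMap, mirroring the Aggregator, CoGroupedRDD and suite changes above. The object name, the "local" master, the tiny spark.shuffle.memoryFraction and the key/value counts are illustrative assumptions; in a real task the accumulator would come from context.taskMetrics.getOrCreateShuffleWriteSpillMetrics() or getOrCreateShuffleReadSpillMetrics(). The file is assumed to sit in the org.apache.spark package so package-private Spark internals remain accessible.

package org.apache.spark

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.util.collection.ExternalAppendOnlyMap

object ExternalMapSpillSketch {
  def main(args: Array[String]): Unit = {
    // Tiny shuffle memory fraction to encourage spilling (illustrative value).
    val conf = new SparkConf(false).set("spark.shuffle.memoryFraction", "0.001")
    val sc = new SparkContext("local", "spill-sketch", conf)
    try {
      // In a real task this accumulator would come from context.taskMetrics.
      val spillMetrics = new ShuffleWriteMetrics()
      val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
        createCombiner = v => ArrayBuffer(v),
        mergeValue = (buf, v) => buf += v,
        mergeCombiners = (b1, b2) => b1 ++= b2,
        spillMetrics = spillMetrics)

      map.insertAll((1 to 100000).iterator.map(i => (i % 1000, i)))
      map.iterator.size // drain the merged iterator

      // Each spill folds its disk writer metrics into the shared accumulator;
      // both values stay zero if nothing actually spilled.
      println(s"spilled ${spillMetrics.shuffleBytesWritten} bytes to disk, " +
        s"held ${spillMetrics.memorySize} bytes in memory before spilling")
    } finally {
      sc.stop()
    }
  }
}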
[Displaced table markup from the ExecutorTable and StagePage hunks above, with its tags stripped: the "Shuffle Spill (Memory)" / "Shuffle Spill (Disk)" header cells are replaced by "Shuffle Read Spill (Memory)", "Shuffle Read Spill (Disk)", "Shuffle Write Spill (Memory)" and "Shuffle Write Spill (Disk)"; the executor-row cells render v.shuffleReadMemoryBytesSpilled, v.shuffleReadDiskBytesSpilled, v.shuffleWriteMemoryBytesSpilled and v.shuffleWriteDiskBytesSpilled in place of v.memoryBytesSpilled / v.diskBytesSpilled; the quantile rows gain "Shuffle read spill (memory/disk)" and "Shuffle write spill (memory/disk)" labels; and the per-task row cells render the read/write memory/disk *BytesSpilledReadable values guarded by hasReadBytesSpilled / hasWriteBytesSpilled.]
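
A second standalone sketch (also not part of the patch) shows how the new TaskMetrics pieces compose: the get-or-create accessors, the ShuffleWriteMetrics += merge operator, and the deprecated aggregate getters that now sum the read-side and write-side spill metrics. It assumes compilation inside the org.apache.spark package so the private[spark] mutators used here are callable, as the test suites above rely on; all byte counts are arbitrary.

package org.apache.spark

import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}

object SpillMetricsSketch {
  def main(args: Array[String]): Unit = {
    val taskMetrics = new TaskMetrics

    // Spills recorded while writing out shuffle data (e.g. SortShuffleWriter).
    val writeSpill = taskMetrics.getOrCreateShuffleWriteSpillMetrics()
    writeSpill.incShuffleBytesWritten(512L) // bytes that hit disk during spills
    writeSpill.setMemorySize(4096L)         // in-memory size of the spilled data

    // Spills recorded while aggregating fetched shuffle data (e.g. HashShuffleReader).
    val readSpill = taskMetrics.getOrCreateShuffleReadSpillMetrics()
    readSpill.incShuffleBytesWritten(256L)
    readSpill.setMemorySize(2048L)

    // The deprecated aggregate accessors sum the read- and write-side spill metrics.
    assert(taskMetrics.diskBytesSpilled == 512L + 256L)
    assert(taskMetrics.memoryBytesSpilled == 4096L + 2048L)

    // Per-spill writer metrics can be folded into an accumulator with +=,
    // as ExternalAppendOnlyMap and ExternalSorter now do after each spill.
    val perSpill = new ShuffleWriteMetrics()
    perSpill.incShuffleBytesWritten(128L)
    perSpill.incShuffleRecordsWritten(10L)
    writeSpill += perSpill
    assert(writeSpill.shuffleBytesWritten == 512L + 128L)
  }
}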