25 changes: 11 additions & 14 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
@@ -56,14 +57,12 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
// TODO: Make context non-optional in a future release
val spillMetrics = Option(context).map(
_.taskMetrics.getOrCreateShuffleWriteSpillMetrics()).getOrElse(new ShuffleWriteMetrics())
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners,
spillMetrics = spillMetrics)
combiners.insertAll(iter)
// Update task metrics if context is not null
// TODO: Make context non optional in a future release
Option(context).foreach { c =>
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
}
@@ -87,17 +86,15 @@ case class Aggregator[K, V, C] (
}
combiners.iterator
} else {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
// TODO: Make context non-optional in a future release
val spillMetrics = Option(context).map(
_.taskMetrics.getOrCreateShuffleReadSpillMetrics()).getOrElse(new ShuffleWriteMetrics())
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners,
spillMetrics = spillMetrics)
while (iter.hasNext) {
val pair = iter.next()
combiners.insert(pair._1, pair._2)
}
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
}
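The counterpart of this change lives in ExternalAppendOnlyMap, which is outside this diff: the map now receives the caller's ShuffleWriteMetrics and charges every spill against it, so spills are attributed to the read or write side instead of a single task-wide counter. A minimal sketch of that accounting under that assumption; the class and method names below (SpillAccounting, recordSpill) are illustrative, not Spark APIs:

package org.apache.spark.util.collection

import org.apache.spark.executor.ShuffleWriteMetrics

// Hypothetical sketch only: how a spillable collection could charge each spill against the
// ShuffleWriteMetrics handed in by its caller (Aggregator, CoGroupedRDD, the shuffle reader/writer).
private[spark] class SpillAccounting(spillMetrics: ShuffleWriteMetrics) {
  def recordSpill(inMemorySize: Long, bytesWrittenToDisk: Long): Unit = {
    spillMetrics.incMemorySize(inMemorySize)                // size the spilled data took up in memory
    spillMetrics.incShuffleBytesWritten(bytesWrittenToDisk) // bytes actually written to disk
  }
}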
@@ -124,8 +124,9 @@ private[spark] class PythonRDD(
init, finish))
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
val spillMetrics = context.taskMetrics.getOrCreateShuffleReadSpillMetrics()
spillMetrics.incMemorySize(memoryBytesSpilled)
spillMetrics.incShuffleBytesWritten(diskBytesSpilled)
read()
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
// Signals that an exception has been thrown in python
72 changes: 64 additions & 8 deletions core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -84,21 +84,41 @@ class TaskMetrics extends Serializable {
def resultSerializationTime = _resultSerializationTime
private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value

private var _shuffleReadSpillMetrics: Option[ShuffleWriteMetrics] = None
def shuffleReadSpillMetrics = _shuffleReadSpillMetrics

/**
* This should only be used when recreating TaskMetrics, not when updating read spill metrics in
* executors.
*/
private[spark] def setShuffleReadSpillMetrics(value: Option[ShuffleWriteMetrics]) {
_shuffleReadSpillMetrics = value
}

private var _shuffleWriteSpillMetrics: Option[ShuffleWriteMetrics] = None
def shuffleWriteSpillMetrics = _shuffleWriteSpillMetrics

/**
* This should only be used when recreating TaskMetrics, not when updating write spill metrics in
* executors.
*/
private[spark] def setShuffleWriteSpillMetrics(value: Option[ShuffleWriteMetrics]) {
_shuffleWriteSpillMetrics = value
}

/**
* The number of in-memory bytes spilled by this task
*/
private var _memoryBytesSpilled: Long = _
def memoryBytesSpilled = _memoryBytesSpilled
private[spark] def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value
private[spark] def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value
@deprecated
def memoryBytesSpilled = _shuffleReadSpillMetrics.map(_.memorySize).getOrElse(0L) +
_shuffleWriteSpillMetrics.map(_.memorySize).getOrElse(0L)

/**
* The number of on-disk bytes spilled by this task
*/
private var _diskBytesSpilled: Long = _
def diskBytesSpilled = _diskBytesSpilled
def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value
def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value
@deprecated
def diskBytesSpilled = _shuffleReadSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L) +
_shuffleWriteSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L)

/**
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
@@ -211,6 +231,26 @@ class TaskMetrics extends Serializable {
private[spark] def updateInputMetrics(): Unit = synchronized {
inputMetrics.foreach(_.updateBytesRead())
}

/**
* Get or create metrics for spills while this task was writing out shuffle data.
*/
def getOrCreateShuffleWriteSpillMetrics(): ShuffleWriteMetrics = {
if (_shuffleWriteSpillMetrics.isEmpty) {
_shuffleWriteSpillMetrics = Some(new ShuffleWriteMetrics())
}
_shuffleWriteSpillMetrics.get
}

/**
* Get or create metrics for spills while this task was aggregating shuffle data it read.
*/
def getOrCreateShuffleReadSpillMetrics(): ShuffleWriteMetrics = {
if (_shuffleReadSpillMetrics.isEmpty) {
_shuffleReadSpillMetrics = Some(new ShuffleWriteMetrics())
}
_shuffleReadSpillMetrics.get
}
}

private[spark] object TaskMetrics {
@@ -363,6 +403,14 @@ class ShuffleReadMetrics extends Serializable {
*/
@DeveloperApi
class ShuffleWriteMetrics extends Serializable {
/**
* Size that the data took up in memory.
*/
@volatile private var _memorySize: Long = _
def memorySize = _memorySize
private[spark] def incMemorySize(value: Long) = _memorySize += value
private[spark] def setMemorySize(value: Long) = _memorySize = value

/**
* Number of bytes written for the shuffle by this task
*/
@@ -387,4 +435,12 @@ class ShuffleWriteMetrics extends Serializable {
private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value

def +=(writeMetrics: ShuffleWriteMetrics): ShuffleWriteMetrics = {
incMemorySize(writeMetrics.memorySize)
incShuffleBytesWritten(writeMetrics.shuffleBytesWritten)
incShuffleWriteTime(writeMetrics.shuffleWriteTime)
incShuffleRecordsWritten(writeMetrics.shuffleRecordsWritten)
this
}
}
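For context, a minimal usage sketch of the API added above (the ReportSpill object is illustrative, not part of the patch): executors record spills against one of the two per-side ShuffleWriteMetrics objects, and the deprecated task-wide totals now resolve to the sum of both sides.

package org.apache.spark.executor

// Illustrative only: exercises the spill-metrics accessors introduced in TaskMetrics above.
private[spark] object ReportSpill {
  def example(metrics: TaskMetrics): String = {
    // A write-side spill of 1 MB of in-memory data that produced 256 KB on disk.
    val writeSpill = metrics.getOrCreateShuffleWriteSpillMetrics()
    writeSpill.incMemorySize(1024L * 1024L)
    writeSpill.incShuffleBytesWritten(256L * 1024L)

    // The deprecated aggregate getters sum the read-side and write-side spill metrics.
    s"memoryBytesSpilled=${metrics.memoryBytesSpilled}, diskBytesSpilled=${metrics.diskBytesSpilled}"
  }
}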
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -26,10 +26,11 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
import org.apache.spark.util.Utils
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}

private[spark] sealed trait CoGroupSplitDep extends Serializable

@@ -155,18 +156,17 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
} else {
val map = createExternalMap(numRdds)
val spillMetrics = context.taskMetrics.getOrCreateShuffleReadSpillMetrics()
val map = createExternalMap(numRdds, spillMetrics)
for ((it, depNum) <- rddIterators) {
map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
}

private def createExternalMap(numRdds: Int)
private def createExternalMap(numRdds: Int, spillMetrics: ShuffleWriteMetrics)
: ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {

val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
@@ -189,7 +189,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
combiner1
}
new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
createCombiner, mergeValue, mergeCombiners)
createCombiner, mergeValue, mergeCombiners, spillMetrics = spillMetrics)
}

override def clearDependencies() {
@@ -57,10 +57,10 @@ private[spark] class HashShuffleReader[K, C](
case Some(keyOrd: Ordering[K]) =>
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
val spillMetrics = context.taskMetrics.getOrCreateShuffleReadSpillMetrics()
val sorter = new ExternalSorter[K, C, C](
ordering = Some(keyOrd), serializer = Some(ser), spillMetrics = spillMetrics)
sorter.insertAll(aggregatedIter)
context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
sorter.iterator
case None =>
aggregatedIter
@@ -49,17 +49,19 @@ private[spark] class SortShuffleWriter[K, V, C](

/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
val spillMetrics = context.taskMetrics.getOrCreateShuffleWriteSpillMetrics()

if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer, spillMetrics)
sorter.insertAll(records)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
sorter = new ExternalSorter[K, V, V](
None, Some(dep.partitioner), None, dep.serializer)
None, Some(dep.partitioner), None, dep.serializer, spillMetrics)
sorter.insertAll(records)
}

34 changes: 24 additions & 10 deletions core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -41,13 +41,15 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
var hasOutput = false
var hasShuffleWrite = false
var hasShuffleRead = false
var hasBytesSpilled = false
var hasReadBytesSpilled = false
var hasWriteBytesSpilled = false
stageData.foreach(data => {
hasInput = data.hasInput
hasOutput = data.hasOutput
hasShuffleRead = data.hasShuffleRead
hasShuffleWrite = data.hasShuffleWrite
hasBytesSpilled = data.hasBytesSpilled
hasReadBytesSpilled = data.hasReadBytesSpilled
hasWriteBytesSpilled = data.hasWriteBytesSpilled
})

<table class={UIUtils.TABLE_CLASS_STRIPED}>
@@ -80,9 +82,13 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
Shuffle Write Size / Records</span>
</th>
}}
{if (hasBytesSpilled) {
<th>Shuffle Spill (Memory)</th>
<th>Shuffle Spill (Disk)</th>
{if (hasReadBytesSpilled) {
<th>Shuffle Read Spill (Memory)</th>
<th>Shuffle Read Spill (Disk)</th>
}}
{if (hasWriteBytesSpilled) {
<th>Shuffle Write Spill (Memory)</th>
<th>Shuffle Write Spill (Disk)</th>
}}
</thead>
<tbody>
@@ -130,12 +136,20 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
{s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"}
</td>
}}
{if (stageData.hasBytesSpilled) {
<td sorttable_customkey={v.memoryBytesSpilled.toString}>
{Utils.bytesToString(v.memoryBytesSpilled)}
{if (stageData.hasReadBytesSpilled) {
<td sorttable_customkey={v.shuffleReadMemoryBytesSpilled.toString}>
{Utils.bytesToString(v.shuffleReadMemoryBytesSpilled)}
</td>
<td sorttable_customkey={v.shuffleReadDiskBytesSpilled.toString}>
{Utils.bytesToString(v.shuffleReadDiskBytesSpilled)}
</td>
}}
{if (stageData.hasWriteBytesSpilled) {
<td sorttable_customkey={v.shuffleWriteMemoryBytesSpilled.toString}>
{Utils.bytesToString(v.shuffleWriteMemoryBytesSpilled)}
</td>
<td sorttable_customkey={v.diskBytesSpilled.toString}>
{Utils.bytesToString(v.diskBytesSpilled)}
<td sorttable_customkey={v.shuffleWriteDiskBytesSpilled.toString}>
{Utils.bytesToString(v.shuffleWriteDiskBytesSpilled)}
</td>
}}
</tr>
@@ -436,15 +436,29 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.outputRecords += outputRecordsDelta
execSummary.outputRecords += outputRecordsDelta

val diskSpillDelta =
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskSpillDelta
execSummary.diskBytesSpilled += diskSpillDelta

val memorySpillDelta =
taskMetrics.memoryBytesSpilled - oldMetrics.map(_.memoryBytesSpilled).getOrElse(0L)
stageData.memoryBytesSpilled += memorySpillDelta
execSummary.memoryBytesSpilled += memorySpillDelta
val shuffleWriteDiskSpillDelta =
(taskMetrics.shuffleWriteSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteSpillMetrics).map(_.shuffleBytesWritten).getOrElse(0L))
stageData.shuffleWriteDiskSpillBytes += shuffleWriteDiskSpillDelta
execSummary.shuffleWriteDiskBytesSpilled += shuffleWriteDiskSpillDelta

val shuffleReadDiskSpillDelta =
(taskMetrics.shuffleReadSpillMetrics.map(_.shuffleBytesWritten).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleReadSpillMetrics).map(_.shuffleBytesWritten).getOrElse(0L))
stageData.shuffleReadDiskSpillBytes += shuffleReadDiskSpillDelta
execSummary.shuffleReadDiskBytesSpilled += shuffleReadDiskSpillDelta

val shuffleWriteMemorySpillDelta =
(taskMetrics.shuffleWriteSpillMetrics.map(_.memorySize).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleWriteSpillMetrics).map(_.memorySize).getOrElse(0L))
stageData.shuffleWriteMemorySpillBytes += shuffleWriteMemorySpillDelta
execSummary.shuffleWriteMemoryBytesSpilled += shuffleWriteMemorySpillDelta

val shuffleReadMemorySpillDelta =
(taskMetrics.shuffleReadSpillMetrics.map(_.memorySize).getOrElse(0L)
- oldMetrics.flatMap(_.shuffleReadSpillMetrics).map(_.memorySize).getOrElse(0L))
stageData.shuffleReadMemorySpillBytes += shuffleReadMemorySpillDelta
execSummary.shuffleReadMemoryBytesSpilled += shuffleReadMemorySpillDelta

val timeDelta =
taskMetrics.executorRunTime - oldMetrics.map(_.executorRunTime).getOrElse(0L)
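The four spill deltas above share the same new-minus-old shape with a zero default for missing metrics; a helper along these lines could factor that out (illustrative sketch, not part of the patch):

package org.apache.spark.ui.jobs

import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}

// Illustrative sketch only: new-minus-old for one spill side and one field, treating a
// missing ShuffleWriteMetrics as zero, exactly as the inline delta code above does.
private[jobs] object SpillDelta {
  def apply(
      newMetrics: TaskMetrics,
      oldMetrics: Option[TaskMetrics],
      side: TaskMetrics => Option[ShuffleWriteMetrics],
      field: ShuffleWriteMetrics => Long): Long = {
    side(newMetrics).map(field).getOrElse(0L) -
      oldMetrics.flatMap(side).map(field).getOrElse(0L)
  }
}

// Example: SpillDelta(taskMetrics, oldMetrics, _.shuffleWriteSpillMetrics, _.shuffleBytesWritten)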