diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1827e1..396b39d816ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -20,8 +20,8 @@ package org.apache.spark.scheduler
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.Utils
+import org.apache.spark.util.collection.{BitSet, OpenHashSet}
 
 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -80,7 +80,6 @@ private[spark] object MapStatus {
   }
 }
 
-
 /**
  * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is
  * represented using a single byte.
@@ -119,6 +118,66 @@ private[spark] class CompressedMapStatus(
   }
 }
 
+/**
+ * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
+ * plus an OpenHashSet tracking either the empty blocks (dense case) or the non-empty blocks
+ * (sparse case). When the tracked set is small, this uses less memory than a BitSet.
+ *
+ * @param loc location where the task is being executed
+ * @param numNonEmptyBlocks the number of non-empty blocks
+ * @param markedBlocks the non-empty block ids if isSparse, otherwise the empty block ids
+ * @param avgSize average size of the non-empty blocks
+ */
+private[spark] class MapStatusTrackingEmptyBlocks private (
+    private[this] var loc: BlockManagerId,
+    private[this] var numNonEmptyBlocks: Int,
+    private[this] var markedBlocks: OpenHashSet[Int],
+    private[this] var avgSize: Long,
+    private[this] var isSparse: Boolean)
+  extends MapStatus with Externalizable {
+
+  // loc could be null when the default constructor is called during deserialization
+  require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
+    "Average size can only be zero for map stages that produced no output")
+
+  protected def this() = this(null, -1, null, -1, false)  // For deserialization only
+
+  override def location: BlockManagerId = loc
+
+  override def getSizeForBlock(reduceId: Int): Long = {
+    if (isSparse ^ markedBlocks.contains(reduceId)) {
+      0
+    } else {
+      avgSize
+    }
+  }
+
+  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
+    loc.writeExternal(out)
+    out.writeObject(markedBlocks)
+    out.writeLong(avgSize)
+    out.writeBoolean(isSparse)
+  }
+
+  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
+    loc = BlockManagerId(in)
+    markedBlocks = in.readObject().asInstanceOf[OpenHashSet[Int]]
+    avgSize = in.readLong()
+    isSparse = in.readBoolean()
+  }
+}
+
+private[spark] object MapStatusTrackingEmptyBlocks {
+  def apply(
+      loc: BlockManagerId,
+      numNonEmptyBlocks: Int,
+      markedBlocks: OpenHashSet[Int],
+      avgSize: Long,
+      isSparse: Boolean): MapStatusTrackingEmptyBlocks = {
+    new MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, markedBlocks, avgSize, isSparse)
+  }
+}
+
 /**
  * A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
  * plus a bitmap for tracking which blocks are empty.  During serialization, this bitmap
@@ -167,7 +226,7 @@ private[spark] class HighlyCompressedMapStatus private (
 }
 
 private[spark] object HighlyCompressedMapStatus {
-  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = {
+  def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
     // We must keep track of which blocks are empty so that we don't report a zero-sized
     // block as being non-empty (or vice-versa) when using the average block size.
     var i = 0
@@ -178,13 +237,17 @@ private[spark] object HighlyCompressedMapStatus {
    // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val totalNumBlocks = uncompressedSizes.length
     val emptyBlocks = new BitSet(totalNumBlocks)
+    val emptyBlocksHashSet = new OpenHashSet[Int]
+    val nonEmptyBlocks = new OpenHashSet[Int]
     while (i < totalNumBlocks) {
       var size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
         totalSize += size
+        nonEmptyBlocks.add(i)
       } else {
         emptyBlocks.set(i)
+        emptyBlocksHashSet.add(i)
       }
       i += 1
     }
@@ -193,6 +256,12 @@
     } else {
       0
     }
-    new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+    if (numNonEmptyBlocks * 32 < totalNumBlocks) {
+      MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, nonEmptyBlocks, avgSize, isSparse = true)
+    } else if ((totalNumBlocks - numNonEmptyBlocks) * 32 < totalNumBlocks) {
+      MapStatusTrackingEmptyBlocks(loc, numNonEmptyBlocks, emptyBlocksHashSet, avgSize, isSparse = false)
+    } else {
+      new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
+    }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f2820c..1eaa1f1a0b24 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -35,7 +35,7 @@ import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
 import org.apache.spark.broadcast.HttpBroadcast
 import org.apache.spark.network.util.ByteUnit
-import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
+import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus, MapStatusTrackingEmptyBlocks}
 import org.apache.spark.storage._
 import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
 import org.apache.spark.util.collection.{BitSet, CompactBuffer}
@@ -363,6 +363,7 @@ private[serializer] object KryoSerializer {
     classOf[CompressedMapStatus],
     classOf[HighlyCompressedMapStatus],
     classOf[BitSet],
+    classOf[MapStatusTrackingEmptyBlocks],
     classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index b8e466fab450..71ec79f258f8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -66,23 +66,52 @@ class MapStatusSuite extends SparkFunSuite {
     }
   }
 
-  test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) {
+  test("large tasks with dense non-empty blocks should use " +
+    classOf[MapStatusTrackingEmptyBlocks].getName) {
     val sizes = Array.fill[Long](2001)(150L)
     val status = MapStatus(null, sizes)
-    assert(status.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status.isInstanceOf[MapStatusTrackingEmptyBlocks])
     assert(status.getSizeForBlock(10) === 150L)
     assert(status.getSizeForBlock(50) === 150L)
     assert(status.getSizeForBlock(99) === 150L)
     assert(status.getSizeForBlock(2000) === 150L)
   }
 
-  test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") {
+  test("large tasks with sparse non-empty blocks should use " +
+    classOf[MapStatusTrackingEmptyBlocks].getName) {
+    val sizes = Array.fill[Long](2001)(0L)
+    sizes(0) = 1L
+    val status = MapStatus(null, sizes)
+    assert(status.isInstanceOf[MapStatusTrackingEmptyBlocks])
+    assert(status.getSizeForBlock(0) === 1L)
+    assert(status.getSizeForBlock(50) === 0L)
+    assert(status.getSizeForBlock(99) === 0L)
+    assert(status.getSizeForBlock(2000) === 0L)
+  }
+
+  test("large tasks with neither dense nor sparse non-empty blocks should use " +
+    classOf[HighlyCompressedMapStatus].getName) {
+    val sizes = Array.fill[Long](2001)(0L)
+    for (i <- sizes.indices) {
+      if (i % 2 == 1) {
+        sizes(i) = 1L
+      }
+    }
+    val status = MapStatus(null, sizes)
+    assert(status.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status.getSizeForBlock(0) === 0L)
+    assert(status.getSizeForBlock(1) === 1L)
+    assert(status.getSizeForBlock(1999) === 1L)
+    assert(status.getSizeForBlock(2000) === 0L)
+  }
+
+  test("MapStatusTrackingEmptyBlocks: estimated size should be the average non-empty block size") {
     val sizes = Array.tabulate[Long](3000) { i => i.toLong }
     val avg = sizes.sum / sizes.filter(_ != 0).length
     val loc = BlockManagerId("a", "b", 10)
     val status = MapStatus(loc, sizes)
     val status1 = compressAndDecompressMapStatus(status)
-    assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+    assert(status1.isInstanceOf[MapStatusTrackingEmptyBlocks])
     assert(status1.location == loc)
     for (i <- 0 until 3000) {
       val estimate = status1.getSizeForBlock(i)
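Note (not part of the patch): the selection logic in HighlyCompressedMapStatus.apply above keys off a 1/32 threshold, storing a hash set of block ids whenever either the non-empty or the empty blocks make up less than 1/32 of all blocks, and falling back to the BitSet otherwise. The self-contained Scala sketch below illustrates that heuristic and the XOR lookup used by getSizeForBlock; scala.collection.mutable.HashSet stands in for Spark's OpenHashSet, and the object and method names here are hypothetical.

// Illustrative sketch only -- not part of the patch. mutable.HashSet stands in for
// Spark's OpenHashSet; names below are hypothetical.
import scala.collection.mutable

object EmptyBlockTrackingSketch {
  // Mirrors the patch's 1/32 heuristic: track whichever block set is small,
  // otherwise fall back to a bitmap over all blocks.
  def describeChoice(sizes: Array[Long]): String = {
    val totalNumBlocks = sizes.length
    val numNonEmpty = sizes.count(_ > 0)
    val numEmpty = totalNumBlocks - numNonEmpty
    if (numNonEmpty * 32 < totalNumBlocks) {
      "sparse: hash set of the non-empty block ids"
    } else if (numEmpty * 32 < totalNumBlocks) {
      "dense: hash set of the empty block ids"
    } else {
      "mixed: bitmap of the empty blocks"
    }
  }

  // Mirrors getSizeForBlock: XOR flips the meaning of set membership depending on
  // whether the set holds the non-empty ids (isSparse = true) or the empty ids.
  def sizeForBlock(marked: mutable.HashSet[Int], isSparse: Boolean,
      avgSize: Long, reduceId: Int): Long = {
    if (isSparse ^ marked.contains(reduceId)) 0L else avgSize
  }

  def main(args: Array[String]): Unit = {
    val mostlyEmpty = Array.fill[Long](2001)(0L)
    mostlyEmpty(0) = 1L
    val allFull = Array.fill[Long](2001)(150L)
    println(describeChoice(mostlyEmpty))  // sparse: hash set of the non-empty block ids
    println(describeChoice(allFull))      // dense: hash set of the empty block ids
    println(sizeForBlock(mutable.HashSet(0), isSparse = true, avgSize = 1L, reduceId = 0))   // 1
    println(sizeForBlock(mutable.HashSet(0), isSparse = true, avgSize = 1L, reduceId = 42))  // 0
  }
}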