@@ -127,14 +127,13 @@ private[spark] class CompressedMapStatus(
 
 /**
  * A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
- * than both spark.shuffle.accurateBlockThreshold and
- * spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. It stores the
- * average size of other non-empty blocks, plus a bitmap for tracking which blocks are empty.
+ * than spark.shuffle.accurateBlockThreshold. It stores the average size of other non-empty
+ * blocks, plus a bitmap for tracking which blocks are empty.
  *
  * @param loc location where the task is being executed
  * @param numNonEmptyBlocks the number of non-empty blocks
  * @param emptyBlocks a bitmap tracking which blocks are empty
- * @param avgSize average size of the non-empty blocks
+ * @param avgSize average size of the non-empty and non-huge blocks
  * @param hugeBlockSizes sizes of huge blocks by their reduceId.
  */
 private[spark] class HighlyCompressedMapStatus private (
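
To make the doc comment above concrete, here is a minimal, hedged sketch of how a reducer-side size lookup behaves under this layout: empty blocks report 0, huge blocks report their accurately recorded size, and everything else falls back to the stored average. The standalone method, the `Map[Int, Byte]` shape of `hugeBlockSizes`, and the injected `decompress` function are assumptions for illustration, not the class's actual API.

```scala
import org.roaringbitmap.RoaringBitmap

// Minimal sketch (assumed shapes) of a size lookup under the scheme described above.
def sizeForBlock(
    reduceId: Int,
    emptyBlocks: RoaringBitmap,
    hugeBlockSizes: Map[Int, Byte],      // assumed shape, per the @param doc above
    avgSize: Long,
    decompress: Byte => Long): Long = {  // e.g. a MapStatus-style size decoder
  if (emptyBlocks.contains(reduceId)) {
    0L
  } else {
    hugeBlockSizes.get(reduceId) match {
      case Some(compressed) => decompress(compressed)  // accurate size for huge blocks
      case None => avgSize                             // average for the rest
    }
  }
}
```
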
@@ -146,7 +145,7 @@ private[spark] class HighlyCompressedMapStatus private (
   extends MapStatus with Externalizable {
 
   // loc could be null when the default constructor is called during deserialization
-  require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
+  require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0,
     "Average size can only be zero for map stages that produced no output")
 
   protected def this() = this(null, -1, null, -1, null)  // For deserialization only
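
The new `hugeBlockSizes.size > 0` disjunct covers the case where every non-empty block exceeds the threshold, so nothing feeds the average and `avgSize` is legitimately zero even though output was produced. A small illustrative calculation of that edge case (the 100 MB threshold is an assumed default, and the partitioning here only mimics the apply() logic below):

```scala
// Illustrative only: every non-empty block is above the threshold, so nothing
// contributes to the average, yet the map output is clearly non-empty.
val threshold = 100L * 1024 * 1024                                  // assumed default
val uncompressedSizes = Array(0L, 200L * 1024 * 1024, 300L * 1024 * 1024)

val nonEmpty = uncompressedSizes.filter(_ > 0)
val (small, huge) = nonEmpty.partition(_ < threshold)
val avgSize = if (small.nonEmpty) small.sum / small.length else 0L

// Exactly the case the relaxed require() now admits.
assert(avgSize == 0L && huge.nonEmpty)
```
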
@@ -204,11 +203,21 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
+    val threshold = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
     while (i < totalNumBlocks) {
-      var size = uncompressedSizes(i)
+      val size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
-        totalSize += size
+        // Exclude huge blocks from the average-size calculation; they are recorded with
+        // accurate sizes below, so the average stays representative of the smaller blocks.
+        if (size < threshold) {
+          totalSize += size
+        } else {
+          hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
+        }
       } else {
         emptyBlocks.add(i)
       }
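
With the second knob gone, the cut-off is governed solely by spark.shuffle.accurateBlockThreshold (100 MB by default). A hedged tuning sketch; the chosen value is only an example, and the trade-off is a larger serialized HighlyCompressedMapStatus in exchange for more blocks reporting exact sizes:

```scala
import org.apache.spark.SparkConf

// Illustrative tuning only: lower the threshold so more shuffle blocks are
// recorded with their accurate size instead of the per-status average.
val conf = new SparkConf()
  .set("spark.shuffle.accurateBlockThreshold", "50m")  // example value, not a recommendation
```
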
@@ -219,24 +228,6 @@ private[spark] object HighlyCompressedMapStatus {
     } else {
       0
     }
-    val threshold1 = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
-    val threshold2 = avgSize * Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.defaultValue.get)
-    val threshold = math.max(threshold1, threshold2)
-    val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
-    if (numNonEmptyBlocks > 0) {
-      i = 0
-      while (i < totalNumBlocks) {
-        if (uncompressedSizes(i) > threshold) {
-          hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
-
-        }
-        i += 1
-      }
-    }
     emptyBlocks.trim()
     emptyBlocks.runOptimize()
     new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
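
Each huge block's size is still funneled through MapStatus.compressSize before being stored, so every entry in hugeBlockSizesArray costs a single byte. For context, a sketch of that style of log-scale single-byte encoding; the constants and rounding mirror my reading of Spark's encoder and should be treated as illustrative rather than authoritative:

```scala
// Sketch of a log-scale single-byte size codec in the spirit of
// MapStatus.compressSize / decompressSize (constants are illustrative).
object SizeCodec {
  private val LogBase = 1.1

  def compress(size: Long): Byte = {
    if (size == 0) {
      0
    } else if (size <= 1L) {
      1
    } else {
      // Sizes are bucketed on a logarithmic scale so one byte covers a huge range.
      math.min(255, math.ceil(math.log(size) / math.log(LogBase)).toInt).toByte
    }
  }

  def decompress(compressed: Byte): Long = {
    if (compressed == 0) 0L else math.pow(LogBase, compressed & 0xFF).toLong
  }
}
```
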