
Commit 4f992fc

Author: jinxing
show distribution with probabilities.
1 parent 6a96c3b commit 4f992fc

File tree

4 files changed (+37 additions, -99 deletions)


core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 12 additions & 37 deletions

@@ -47,6 +47,7 @@
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.ShuffleWriter;
 import org.apache.spark.storage.*;
+import org.apache.spark.util.Distribution;
 import org.apache.spark.util.Utils;
 
 /**
@@ -181,48 +182,22 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
         }
       }
       writeMetrics.incUnderestimatedBlocksSize(underestimatedBlocksSize);
-      if (logger.isDebugEnabled()) {
+      if (logger.isDebugEnabled() && partitionLengths.length > 0) {
         int underestimatedBlocksNum = 0;
-        // Distribution of sizes in MapStatus. The ranges are: [0, 1k), [1k, 10k), [10k, 100k),
-        // [100k, 1m), [1m, 10m), [10m, 100m), [100m, 1g), [1g, 10g), [10g, Long.MaxValue).
-        int[] lenDistribution = {0, 0, 0, 0, 0, 0, 0, 0, 0};
+        // Distribution of sizes in MapStatus.
+        double[] cp = new double[partitionLengths.length];
         for (int i = 0; i < partitionLengths.length; i++) {
-          long len = partitionLengths[i];
-          if (len > mapStatus.getSizeForBlock(i)) {
-            underestimatedBlocksNum++;
-          }
-          if (len >= 0L && len < 1024L) {
-            lenDistribution[0]++;
-          } else if (len >= 1024L && len < 10240L) {
-            lenDistribution[1]++;
-          } else if (len >= 10240L && len < 102400L) {
-            lenDistribution[2]++;
-          } else if (len >= 102400L && len < 1048576L) {
-            lenDistribution[3]++;
-          } else if (len >= 1048576L && len < 10485760L) {
-            lenDistribution[4]++;
-          } else if (len >= 10485760L && len < 104857600L) {
-            lenDistribution[5]++;
-          } else if (len >= 104857600L && len < 1073741824L) {
-            lenDistribution[6]++;
-          } else if (len >= 1073741824L && len < 10737418240L) {
-            lenDistribution[7]++;
-          } else {
-            lenDistribution[8]++;
-          }
-        }
-        String[] ranges = {"[0, 1k)", "[1k, 10k)", "[10k, 100k)", "[100k, 1m)", "[1m, 10m)",
-          "[10m, 100m)", "[100m, 1g)", "[1g, 10g)", ">10g"};
-        String[] rangesAndDistribute = new String[9];
-        for (int j = 0; j < 9; j++) {
-          rangesAndDistribute[j] = ranges[j] + ":" + lenDistribution[j];
+          cp[i] = partitionLengths[i];
         }
+        Distribution distribution = new Distribution(cp, 0, cp.length);
+        double[] probabilities = {0.0, 0.25, 0.5, 0.75, 1.0};
+        String distributionStr = distribution.getQuantiles(probabilities).mkString(", ");
         logger.debug("For task {}.{} in stage {} (TID {}), the block sizes in MapStatus are " +
           "inaccurate (average is {}, {} blocks underestimated, size of underestimated is {})," +
-          " distribution is {}.", taskContext.partitionId(), taskContext.attemptNumber(),
-          taskContext.stageId(), taskContext.taskAttemptId(), hc.getAvgSize(),
-          underestimatedBlocksNum, underestimatedBlocksSize,
-          String.join(", ", rangesAndDistribute));
+          " distribution at the given probabilities(0, 0.25, 0.5, 0.75, 1.0) is {}.",
+          taskContext.partitionId(), taskContext.attemptNumber(), taskContext.stageId(),
+          taskContext.taskAttemptId(), hc.getAvgSize(),
+          underestimatedBlocksNum, underestimatedBlocksSize, distributionStr);
       }
     }
   }
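
For reference, a minimal standalone sketch (in Scala, for brevity) of the quantile summary this writer, and UnsafeShuffleWriter below, now builds. This is not part of the commit: the partition lengths are made-up sample values, and the snippet assumes it is compiled under the org.apache.spark namespace, since Distribution is private[spark].

    import org.apache.spark.util.Distribution

    // Hypothetical block sizes in bytes, for illustration only.
    val partitionLengths: Array[Long] = Array(512L, 4096L, 65536L, 1048576L, 268435456L)

    // The Distribution constructor requires a non-empty range (and sorts the data in place),
    // which is why the writers also guard with partitionLengths.length > 0.
    if (partitionLengths.nonEmpty) {
      val data = partitionLengths.map(_.toDouble)
      val distribution = new Distribution(data, 0, data.length)
      // Quantiles at the min, the quartiles, and the max of the block sizes.
      val probabilities = Array(0.0, 0.25, 0.5, 0.75, 1.0)
      println(distribution.getQuantiles(probabilities).mkString(", "))
      // Prints something like: 512.0, 4096.0, 65536.0, 1048576.0, 2.68435456E8
    }

A single sorted pass over the sizes replaces the nine hand-maintained buckets, and the debug line stays one short string no matter how many partitions there are.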

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 12 additions & 37 deletions

@@ -54,6 +54,7 @@
 import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.TimeTrackingOutputStream;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.util.Distribution;
 import org.apache.spark.util.Utils;
 
 @Private
@@ -238,48 +239,22 @@ void closeAndWriteOutput() throws IOException {
         }
       }
       writeMetrics.incUnderestimatedBlocksSize(underestimatedBlocksSize);
-      if (logger.isDebugEnabled()) {
+      if (logger.isDebugEnabled() && partitionLengths.length > 0) {
        int underestimatedBlocksNum = 0;
-        // Distribution of sizes in MapStatus. The ranges are: [0, 1k), [1k, 10k), [10k, 100k),
-        // [100k, 1m), [1m, 10m), [10m, 100m), [100m, 1g), [1g, 10g), [10g, Long.MaxValue).
-        int[] lenDistribution = {0, 0, 0, 0, 0, 0, 0, 0, 0};
+        // Distribution of sizes in MapStatus.
+        double[] cp = new double[partitionLengths.length];
         for (int i = 0; i < partitionLengths.length; i++) {
-          long len = partitionLengths[i];
-          if (len > mapStatus.getSizeForBlock(i)) {
-            underestimatedBlocksNum++;
-          }
-          if (len >= 0L && len < 1024L) {
-            lenDistribution[0]++;
-          } else if (len >= 1024L && len < 10240L) {
-            lenDistribution[1]++;
-          } else if (len >= 10240L && len < 102400L) {
-            lenDistribution[2]++;
-          } else if (len >= 102400L && len < 1048576L) {
-            lenDistribution[3]++;
-          } else if (len >= 1048576L && len < 10485760L) {
-            lenDistribution[4]++;
-          } else if (len >= 10485760L && len < 104857600L) {
-            lenDistribution[5]++;
-          } else if (len >= 104857600L && len < 1073741824L) {
-            lenDistribution[6]++;
-          } else if (len >= 1073741824L && len < 10737418240L) {
-            lenDistribution[7]++;
-          } else {
-            lenDistribution[8]++;
-          }
-        }
-        String[] ranges = {"[0, 1k)", "[1k, 10k)", "[10k, 100k)", "[100k, 1m)", "[1m, 10m)",
-          "[10m, 100m)", "[100m, 1g)", "[1g, 10g)", ">10g"};
-        String[] rangesAndDistribute = new String[9];
-        for (int j = 0; j < 9; j++) {
-          rangesAndDistribute[j] = ranges[j] + ":" + lenDistribution[j];
+          cp[i] = partitionLengths[i];
         }
+        Distribution distribution = new Distribution(cp, 0, cp.length);
+        double[] probabilities = {0.0, 0.25, 0.5, 0.75, 1.0};
+        String distributionStr = distribution.getQuantiles(probabilities).mkString(", ");
         logger.debug("For task {}.{} in stage {} (TID {}), the block sizes in MapStatus are " +
           "inaccurate (average is {}, {} blocks underestimated, size of underestimated is {})," +
-          " distribution is {}.", taskContext.partitionId(), taskContext.attemptNumber(),
-          taskContext.stageId(), taskContext.taskAttemptId(), hc.getAvgSize(),
-          underestimatedBlocksNum, underestimatedBlocksSize,
-          String.join(", ", rangesAndDistribute));
+          " distribution at the given probabilities(0, 0.25, 0.5, 0.75, 1.0) is {}.",
+          taskContext.partitionId(), taskContext.attemptNumber(), taskContext.stageId(),
+          taskContext.taskAttemptId(), hc.getAvgSize(),
+          underestimatedBlocksNum, underestimatedBlocksSize, distributionStr);
       }
     }
  }

core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala

Lines changed: 12 additions & 24 deletions

@@ -22,7 +22,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{HighlyCompressedMapStatus, MapStatus}
 import org.apache.spark.shuffle.{BaseShuffleHandle, IndexShuffleBlockResolver, ShuffleWriter}
 import org.apache.spark.storage.ShuffleBlockId
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Distribution, Utils}
 import org.apache.spark.util.collection.ExternalSorter
 
 private[spark] class SortShuffleWriter[K, V, C](
@@ -78,30 +78,18 @@ private[spark] class SortShuffleWriter[K, V, C](
         val underestimatedLengths = partitionLengths.filter(_ > hc.getAvgSize)
         writeMetrics.incUnderestimatedBlocksSize(underestimatedLengths.sum)
         if (log.isDebugEnabled()) {
-          // Distribution of sizes in MapStatus. The ranges are: [0, 1k), [1k, 10k), [10k, 100k),
-          // [100k, 1m), [1m, 10m), [10m, 100m), [100m, 1g), [1g, 10g), [10g, Long.MaxValue).
-          val lenDistribution = Array[Int](0, 0, 0, 0, 0, 0, 0, 0, 0)
-          partitionLengths.foreach {
-            case len: Long if len >= 0L && len < 1024L => lenDistribution(0) += 1
-            case len: Long if len >= 1024L && len < 10240L => lenDistribution(1) += 1
-            case len: Long if len >= 10240L && len < 102400L => lenDistribution(2) += 1
-            case len: Long if len >= 102400L && len < 1048576L => lenDistribution(3) += 1
-            case len: Long if len >= 1048576L && len < 10485760L => lenDistribution(4) += 1
-            case len: Long if len >= 10485760L && len < 104857600L => lenDistribution(5) += 1
-            case len: Long if len >= 104857600L && len < 1073741824L => lenDistribution(6) += 1
-            case len: Long if len >= 1073741824L && len < 10737418240L => lenDistribution(7) += 1
-            case len => lenDistribution(8) += 1
+          // Distribution of sizes in MapStatus.
+          Distribution(partitionLengths.map(_.toDouble)) match {
+            case Some(distribution) =>
+              val distributionStr = distribution.getQuantiles().mkString(", ")
+              logDebug(s"For task ${context.partitionId()}.${context.attemptNumber()} in stage" +
+                s" ${context.stageId()} (TID ${context.taskAttemptId()}), the block sizes in" +
+                s" MapStatus are inaccurate (average is ${hc.getAvgSize}," +
+                s" ${underestimatedLengths.length} blocks underestimated, sum of sizes is" +
+                s" ${underestimatedLengths.sum}), distribution at the given probabilities" +
+                s" (0, 0.25, 0.5, 0.75, 1.0) is $distributionStr.")
+            case None => // no-op
           }
-          val ranges = List[String]("[0, 1k)", "[1k, 10k)", "[10k, 100k)", "[100k, 1m)",
-            "[1m, 10m)", "[10m, 100m)", "[100m, 1g)", "[1g, 10g)", ">10g")
-          val distributeStr = ranges.zip(lenDistribution).map {
-            case (range, num) => s"$range:$num"
-          }.mkString(", ")
-          logDebug(s"For task ${context.partitionId()}.${context.attemptNumber()} in stage " +
-            s"${context.stageId()} (TID ${context.taskAttemptId()}), " +
-            s"the block sizes in MapStatus are inaccurate (average is ${hc.getAvgSize}, " +
-            s"${underestimatedLengths.length} blocks underestimated, " +
-            s"sum of sizes is ${underestimatedLengths.sum}), distribution is $distributeStr.")
         }
       case _ => // no-op
     }
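
The Scala writer reaches the same summary through Distribution's companion object, which returns None for empty input instead of needing an explicit length check. A rough sketch of that pattern, again with made-up sizes and assuming code that lives under org.apache.spark (Distribution is private[spark]):

    import org.apache.spark.util.Distribution

    // Hypothetical block sizes; an empty array would simply match None below.
    val partitionLengths: Array[Long] = Array(1024L, 10240L, 102400L, 1048576L)

    Distribution(partitionLengths.map(_.toDouble)) match {
      case Some(distribution) =>
        // Called without arguments, getQuantiles uses the default probabilities
        // (0, 0.25, 0.5, 0.75, 1.0), matching what the debug message reports.
        println(distribution.getQuantiles().mkString(", "))
      case None => // nothing to log for an empty map output
    }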

core/src/main/scala/org/apache/spark/util/Distribution.scala

Lines changed: 1 addition & 1 deletion

@@ -42,7 +42,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va
    * given from 0 to 1
    * @param probabilities
    */
-  def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities)
+  def getQuantiles(probabilities: Array[Double] = defaultProbabilities)
     : IndexedSeq[Double] = {
     probabilities.toIndexedSeq.map { p: Double => data(closestIndex(p)) }
   }
