Skip to content

Commit f45efbb

Browse files
Chandan Kumar authored and mengxr committed
[SPARK-2862] histogram method fails on some choices of bucketCount
Author: Chandan Kumar <chandan.kumar@imaginea.com> Closes #1787 from nrchandan/spark-2862 and squashes the following commits: a76bbf6 [Chandan Kumar] [SPARK-2862] Fix for a broken test case and add new test cases 4211eea [Chandan Kumar] [SPARK-2862] Add Scala bug id 13854f1 [Chandan Kumar] [SPARK-2862] Use shorthand range notation to avoid Scala bug
1 parent c0cbbde commit f45efbb

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
9595
* If the elements in RDD do not vary (max == min) always returns a single bucket.
9696
*/
9797
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
98-
// Compute the minimum and the maxium
98+
// Scala's built-in range has issues. See #SI-8782
99+
def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
100+
val span = max - min
101+
Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
102+
}
103+
// Compute the minimum and the maximum
99104
val (max: Double, min: Double) = self.mapPartitions { items =>
100105
Iterator(items.foldRight(Double.NegativeInfinity,
101106
Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
@@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
107112
throw new UnsupportedOperationException(
108113
"Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
109114
}
110-
val increment = (max-min)/bucketCount.toDouble
111-
val range = if (increment != 0) {
112-
Range.Double.inclusive(min, max, increment)
115+
val range = if (min != max) {
116+
// Range.Double.inclusive(min, max, increment)
117+
// The above code doesn't always work. See Scala bug #SI-8782.
118+
// https://issues.scala-lang.org/browse/SI-8782
119+
customRange(min, max, bucketCount)
113120
} else {
114121
List(min, min)
115122
}

core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,29 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
245245
assert(histogramBuckets === expectedHistogramBuckets)
246246
}
247247

248+
test("WorksWithoutBucketsForLargerDatasets") {
249+
// Verify the case of slightly larger datasets
250+
val rdd = sc.parallelize(6 to 99)
251+
val (histogramBuckets, histogramResults) = rdd.histogram(8)
252+
val expectedHistogramResults =
253+
Array(12, 12, 11, 12, 12, 11, 12, 12)
254+
val expectedHistogramBuckets =
255+
Array(6.0, 17.625, 29.25, 40.875, 52.5, 64.125, 75.75, 87.375, 99.0)
256+
assert(histogramResults === expectedHistogramResults)
257+
assert(histogramBuckets === expectedHistogramBuckets)
258+
}
259+
260+
test("WorksWithoutBucketsWithIrrationalBucketEdges") {
261+
// Verify the case of buckets whose edges are not exactly representable as doubles (e.g. a step of 93/9 = 10.333...). See #SPARK-2862.
262+
val rdd = sc.parallelize(6 to 99)
263+
val (histogramBuckets, histogramResults) = rdd.histogram(9)
264+
val expectedHistogramResults =
265+
Array(11, 10, 11, 10, 10, 11, 10, 10, 11)
266+
assert(histogramResults === expectedHistogramResults)
267+
assert(histogramBuckets(0) === 6.0)
268+
assert(histogramBuckets(9) === 99.0)
269+
}
270+
248271
// Test the failure mode with an invalid RDD
249272
test("ThrowsExceptionOnInvalidRDDs") {
250273
// infinity

0 commit comments

Comments
 (0)