Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
* If the elements in RDD do not vary (max == min) always returns a single bucket.
*/
def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
// Compute the minimum and the maxium
// Scala's built-in range has issues. See #SI-8782
def customRange(min: Double, max: Double, steps: Int): IndexedSeq[Double] = {
val span = max - min
Range.Int(0, steps, 1).map(s => min + (s * span) / steps) :+ max
}
// Compute the minimum and the maximum
val (max: Double, min: Double) = self.mapPartitions { items =>
Iterator(items.foldRight(Double.NegativeInfinity,
Double.PositiveInfinity)((e: Double, x: Pair[Double, Double]) =>
Expand All @@ -107,9 +112,11 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
throw new UnsupportedOperationException(
"Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
}
val increment = (max-min)/bucketCount.toDouble
val range = if (increment != 0) {
Range.Double.inclusive(min, max, increment)
val range = if (min != max) {
// Range.Double.inclusive(min, max, increment)
// The above code doesn't always work. See Scala bug #SI-8782.
// https://issues.scala-lang.org/browse/SI-8782
customRange(min, max, bucketCount)
} else {
List(min, min)
}
Expand Down
23 changes: 23 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,29 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
assert(histogramBuckets === expectedHistogramBuckets)
}

test("WorksWithoutBucketsForLargerDatasets") {
// Verify the case of slighly larger datasets
val rdd = sc.parallelize(6 to 99)
val (histogramBuckets, histogramResults) = rdd.histogram(8)
val expectedHistogramResults =
Array(12, 12, 11, 12, 12, 11, 12, 12)
val expectedHistogramBuckets =
Array(6.0, 17.625, 29.25, 40.875, 52.5, 64.125, 75.75, 87.375, 99.0)
assert(histogramResults === expectedHistogramResults)
assert(histogramBuckets === expectedHistogramBuckets)
}

test("WorksWithoutBucketsWithIrrationalBucketEdges") {
// Verify the case of buckets with irrational edges. See #SPARK-2862.
val rdd = sc.parallelize(6 to 99)
val (histogramBuckets, histogramResults) = rdd.histogram(9)
val expectedHistogramResults =
Array(11, 10, 11, 10, 10, 11, 10, 10, 11)
assert(histogramResults === expectedHistogramResults)
assert(histogramBuckets(0) === 6.0)
assert(histogramBuckets(9) === 99.0)
}

// Test the failure mode with an invalid RDD
test("ThrowsExceptionOnInvalidRDDs") {
// infinity
Expand Down