
Commit bde5452

thunterdb authored and srowen committed
[SPARK-17439][SQL] Fixing compression issues with approximate quantiles and adding more tests
This PR builds on #14976 and fixes a correctness bug that would cause the wrong quantile to be returned for small target errors. It adds 8 unit tests that were failing without the fix.

Author: Timothy Hunter <timhunter@databricks.com>
Author: Sean Owen <sowen@cloudera.com>

Closes #15002 from thunterdb/ml-1783.

(cherry picked from commit 180796e)
Signed-off-by: Sean Owen <sowen@cloudera.com>
1 parent c2378a6 commit bde5452
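
For context on what this fix affects, here is a minimal, hypothetical sketch of how the patched code path is reached from the public DataFrame API. It assumes Spark 2.x with a local SparkSession; the object name, the column name "value", and the generated data are illustrative and not part of the commit. A tight relativeError such as 1e-4 is the "small target error" case the commit message describes.

import org.apache.spark.sql.SparkSession

// Hypothetical end-to-end usage; approxQuantile delegates to QuantileSummaries internally.
object ApproxQuantileExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("approx-quantile").getOrCreate()
    import spark.implicits._

    // A simple numeric column; the bug showed up when relativeError was small.
    val df = (1 to 100000).map(_.toDouble).toDF("value")

    val Array(median, p99) = df.stat.approxQuantile("value", Array(0.5, 0.99), 1e-4)
    println(s"median ~= $median, p99 ~= $p99")

    spark.stop()
  }
}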

File tree

2 files changed: +44 -11 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala

Lines changed: 17 additions & 9 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.stat
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
@@ -119,7 +119,7 @@ object StatFunctions extends Logging {
   class QuantileSummaries(
       val compressThreshold: Int,
       val relativeError: Double,
-      val sampled: ArrayBuffer[Stats] = ArrayBuffer.empty,
+      val sampled: Array[Stats] = Array.empty,
       private[stat] var count: Long = 0L,
       val headSampled: ArrayBuffer[Double] = ArrayBuffer.empty) extends Serializable {
 
@@ -134,7 +134,12 @@ object StatFunctions extends Logging {
     def insert(x: Double): QuantileSummaries = {
       headSampled.append(x)
       if (headSampled.size >= defaultHeadSize) {
-        this.withHeadBufferInserted
+        val result = this.withHeadBufferInserted
+        if (result.sampled.length >= compressThreshold) {
+          result.compress()
+        } else {
+          result
+        }
       } else {
         this
       }
@@ -186,7 +191,7 @@ object StatFunctions extends Logging {
         newSamples.append(sampled(sampleIdx))
         sampleIdx += 1
       }
-      new QuantileSummaries(compressThreshold, relativeError, newSamples, currentCount)
+      new QuantileSummaries(compressThreshold, relativeError, newSamples.toArray, currentCount)
     }
 
     /**
@@ -305,10 +310,10 @@ object StatFunctions extends Logging {
 
   private def compressImmut(
       currentSamples: IndexedSeq[Stats],
-      mergeThreshold: Double): ArrayBuffer[Stats] = {
-    val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+      mergeThreshold: Double): Array[Stats] = {
+    val res = ListBuffer.empty[Stats]
     if (currentSamples.isEmpty) {
-      return res
+      return res.toArray
     }
     // Start for the last element, which is always part of the set.
     // The head contains the current new head, that may be merged with the current element.
@@ -331,8 +336,11 @@
     }
     res.prepend(head)
     // If necessary, add the minimum element:
-    res.prepend(currentSamples.head)
-    res
+    val currHead = currentSamples.head
+    if (currHead.value < head.value) {
+      res.prepend(currentSamples.head)
+    }
+    res.toArray
   }
 }
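
The last hunk above is the core of the correctness fix: after compression, the original minimum is re-added only when it is strictly smaller than the surviving head, so the smallest sample can no longer be stored twice and skew the ranks used for low quantiles. A standalone sketch of that guard follows; the Stat case class and object name are simplified stand-ins for Spark's internal Stats type, not the real thing.

import scala.collection.mutable.ListBuffer

object CompressTailSketch {
  // Simplified stand-in for the summary's samples.
  case class Stat(value: Double)

  // Mirrors the patched tail of compressImmut: prepend the surviving head, then add the
  // original minimum only when it is strictly smaller, so it is never duplicated.
  def finishCompression(res: ListBuffer[Stat], head: Stat, minimum: Stat): Array[Stat] = {
    res.prepend(head)
    if (minimum.value < head.value) {
      res.prepend(minimum)
    }
    res.toArray
  }

  def main(args: Array[String]): Unit = {
    // Before the fix, an unconditional prepend would keep two copies of 1.0 here.
    val samples = finishCompression(ListBuffer.empty[Stat], head = Stat(1.0), minimum = Stat(1.0))
    println(samples.toList) // List(Stat(1.0))
  }
}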

sql/core/src/test/scala/org/apache/spark/sql/execution/stat/ApproxQuantileSuite.scala

Lines changed: 27 additions & 2 deletions
@@ -42,6 +42,20 @@ class ApproxQuantileSuite extends SparkFunSuite {
     summary.compress()
   }
 
+  /**
+   * Interleaves compression and insertions.
+   */
+  private def buildCompressSummary(
+      data: Seq[Double],
+      epsi: Double,
+      threshold: Int): QuantileSummaries = {
+    var summary = new QuantileSummaries(threshold, epsi)
+    data.foreach { x =>
+      summary = summary.insert(x).compress()
+    }
+    summary
+  }
+
   private def checkQuantile(quant: Double, data: Seq[Double], summary: QuantileSummaries): Unit = {
     val approx = summary.query(quant)
     // The rank of the approximation.
@@ -56,8 +70,8 @@
 
   for {
     (seq_name, data) <- Seq(increasing, decreasing, random)
-    epsi <- Seq(0.1, 0.0001)
-    compression <- Seq(1000, 10)
+    epsi <- Seq(0.1, 0.0001) // With a significant value and with full precision
+    compression <- Seq(1000, 10) // This interleaves n so that we test without and with compression
   } {
 
     test(s"Extremas with epsi=$epsi and seq=$seq_name, compression=$compression") {
@@ -77,6 +91,17 @@
       checkQuantile(0.1, data, s)
       checkQuantile(0.001, data, s)
     }
+
+    test(s"Some quantile values with epsi=$epsi and seq=$seq_name, compression=$compression " +
+      s"(interleaved)") {
+      val s = buildCompressSummary(data, epsi, compression)
+      assert(s.count == data.size, s"Found count=${s.count} but data size=${data.size}")
+      checkQuantile(0.9999, data, s)
+      checkQuantile(0.9, data, s)
+      checkQuantile(0.5, data, s)
+      checkQuantile(0.1, data, s)
+      checkQuantile(0.001, data, s)
+    }
   }
 
   // Tests for merging procedure
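
The new interleaved tests reuse checkQuantile, which asserts that the rank of the returned value is close enough to the requested quantile. As a rough, standalone illustration of that guarantee (the helper name and the exact slack term are assumptions, not the suite's code):

object RankCheckSketch {
  // Approximate-quantile guarantee being tested: for a query at quantile q over n points,
  // the rank of the returned value should sit within roughly epsi * n of q * n.
  def rankWithinBound(approx: Double, quant: Double, epsi: Double, data: Seq[Double]): Boolean = {
    val n = data.size
    val rank = data.count(_ <= approx)          // rank of the approximation in the data
    math.abs(rank - quant * n) <= epsi * n + 1  // +1 slack for discreteness (an assumption)
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 1000).map(_.toDouble)
    println(rankWithinBound(approx = 500.0, quant = 0.5, epsi = 0.01, data = data)) // true
  }
}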
