@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.util
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.apache.spark.sql.catalyst.util.QuantileSummaries.Stats
 
@@ -61,7 +61,12 @@ class QuantileSummaries(
   def insert(x: Double): QuantileSummaries = {
     headSampled += x
     if (headSampled.size >= defaultHeadSize) {
-      this.withHeadBufferInserted
+      val result = this.withHeadBufferInserted
+      if (result.sampled.length >= compressThreshold) {
+        result.compress()
Member Author:
CC @thunterdb -- is this the right fix?
I also 'adjusted' calls to .append(), which is actually a varargs method; += appends a single element.
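To illustrate the distinction (a sketch, assuming the Scala 2.11-era mutable collections this branch builds against, where `ArrayBuffer.append` is declared as `append(elems: A*)`):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch: `+=` takes exactly one element, while under Scala 2.11/2.12
// `append(elems: A*)` is varargs, so each call wraps its arguments in a
// Seq before copying them into the buffer. The results are identical;
// only the per-call overhead differs.
val buf = ArrayBuffer.empty[Double]
buf += 1.0      // single-element append, no varargs wrapping
buf.append(2.0) // same effect; under 2.11/2.12 the argument is boxed into a Seq
assert(buf.toList == List(1.0, 2.0))
```

(Scala 2.13 later redefined `append` to take a single element, which removes this overhead, but that postdates the code under review.)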

Contributor @clockfly, Sep 6, 2016:
@srowen
I think the compression decision needs to be tied to the relative-error setting (the smaller the relative error, the less frequently we should compress).

When implementing the aggregate function percentile_approx, I implemented compression like this:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala#L214

Contributor:
By the way, would you mind changing ApproximatePercentile altogether? The compression in ApproximatePercentile will not be necessary if compression is already done in QuantileSummaries.

Contributor:
Yes, this is the fix, thanks for doing it. I had never realized that .append takes a vararg input, thanks for the hint.

Member Author:
@clockfly oh, hm, this code just landed recently? It seems to call compress() itself rather than leave it to QuantileSummaries, in which case I'm not clear why there's a compressThreshold in QuantileSummaries at all; it seems like the new class is trying to manage it. What's the right way to rationalize this? Are you saying QuantileSummaries shouldn't manage compression at all? That's fine too (in which case this can just turn into a very small optimization change).

Contributor:
@clockfly @srowen the compression threshold is just there to amortize the cost of performing compression. If you wanted to, you could run compression every iteration (it is an idempotent operation). Internally, the compress method uses a merging threshold that indeed depends on the number of elements seen, but it operates on a number of samples that is bounded by O(1/ε).
This patch will work. @clockfly I suspect some of the wrappers in ApproximatePercentile will not be required either once I submit a PR that fixes an off-by-one error.
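The amortization/idempotence point can be sketched with a toy summary (hypothetical names and a placeholder merge rule, not the actual QuantileSummaries API):

```scala
// Toy sketch: compress() is idempotent, so *when* it runs affects only
// cost, never correctness; the threshold merely amortizes the expense.
final case class TinySummary(sampled: List[(Double, Int)], threshold: Int) {
  // Placeholder "compression": merge duplicate sample values, summing counts.
  def compress(): TinySummary =
    copy(sampled = sampled.groupBy(_._1).toList.sortBy(_._1)
      .map { case (v, xs) => (v, xs.map(_._2).sum) })
  // Compress only once the buffer has grown past the threshold.
  def insert(x: Double): TinySummary = {
    val grown = copy(sampled = (x, 1) :: sampled)
    if (grown.sampled.length >= threshold) grown.compress() else grown
  }
}
```

Because compress() on an already-compressed value returns an equal value, the threshold is purely a cost knob: raising it trades memory for fewer merge passes.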

Contributor:
@srowen @thunterdb

I think compression still needs to be done in QuantileSummaries. I added some compression logic in the wrapper class ApproximatePercentile because ApproximatePercentile needs to know whether the QuantileSummaries is compressed; otherwise it doesn't know whether it is OK to call def query(quantile: Double) on QuantileSummaries.

Maybe QuantileSummaries should expose an API like "isCompressed", so that the caller can skip calling compress if the QuantileSummaries is already compressed.
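One possible shape for that suggestion (purely a sketch with hypothetical names, not the real class):

```scala
// Hypothetical sketch of an `isCompressed` flag as suggested above.
// The real QuantileSummaries would maintain the flag inside its own
// insert/compress methods; query() can then guard on it cheaply.
final class FlaggedSummary {
  private var samples: Vector[Double] = Vector.empty
  private var compressedFlag: Boolean = true
  def insert(x: Double): Unit = { samples = samples :+ x; compressedFlag = false }
  def compress(): Unit = { /* real merge pass elided */ compressedFlag = true }
  def isCompressed: Boolean = compressedFlag
  def query(quantile: Double): Double = {
    if (!isCompressed) compress() // the caller no longer has to guess
    samples((quantile * (samples.length - 1)).toInt) // placeholder lookup
  }
}
```

With such a flag, a wrapper like ApproximatePercentile could call compress() only when needed instead of duplicating the compression decision.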

+      } else {
+        result
+      }
     } else {
       this
     }
@@ -236,7 +241,7 @@ object QuantileSummaries {
     if (currentSamples.isEmpty) {
       return Array.empty[Stats]
     }
-    val res: ArrayBuffer[Stats] = ArrayBuffer.empty
+    val res = ListBuffer.empty[Stats]
Member Author:
I also couldn't help but change this -- ArrayBuffer is inefficient here because all the new elements are prepended.
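A quick sketch of why prepending favors ListBuffer (illustration only, not part of the patch):

```scala
import scala.collection.mutable.ListBuffer

// ArrayBuffer.prepend must shift every existing element one slot to the
// right, so building an n-element result by prepending costs O(n^2).
// ListBuffer.prepend just links a new head node: O(1) per call, O(n)
// total, which matches building the result back-to-front as this loop does.
val lb = ListBuffer.empty[Int]
(1 to 3).foreach(i => lb.prepend(i))
assert(lb.toList == List(3, 2, 1))
```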

Contributor:
Ah, thanks, I thought it was smart enough to use a circular buffer! It would be nice to have more control over the memory eventually, but this is fine for now.

     // Start for the last element, which is always part of the set.
     // The head contains the current new head, that may be merged with the current element.
     var head = currentSamples.last