
[SPARK-21984] [SQL] Join estimation based on equi-height histogram #19594

Closed
wants to merge 8 commits

Conversation

wzhfy (Contributor) commented Oct 28, 2017

What changes were proposed in this pull request?

The equi-height histogram is one of the state-of-the-art statistics for cardinality estimation; it provides better estimation accuracy and handles skewed data well.

This PR improves join estimation by using equi-height histograms. It differs from the basic (ndv-based) estimation in how it computes the join cardinality and the ndv of the join keys after the join.

The main idea is as follows:

  1. find the overlapped ranges between the two histograms of the two join keys;
  2. apply the formula T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)) in each overlapped range, then sum the per-range estimates (see the sketch below).
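
For illustration, here is a minimal sketch of that per-range computation; the OverlappedRange field layout below is an assumption inferred from the test snippets later in this thread, not necessarily the PR's exact definition:

```scala
// Hypothetical field layout; the actual case class in the PR may differ.
case class OverlappedRange(
    lo: Double, hi: Double,
    leftNdv: Double, rightNdv: Double,
    leftNumRows: Double, rightNumRows: Double)

// Apply T(A IJ B) = T(A) * T(B) / max(V(A.k1), V(B.k1)) within each
// overlapped range, then sum the per-range estimates.
def estimateJoinCardinality(ranges: Seq[OverlappedRange]): Double =
  ranges.map { r =>
    r.leftNumRows * r.rightNumRows / math.max(r.leftNdv, r.rightNdv)
  }.sum
```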

How was this patch tested?

Added new test cases.

SparkQA commented Oct 28, 2017

Test build #83146 has finished for PR 19594 at commit 67bd651.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class OverlappedRange(

SparkQA commented Nov 14, 2017

Test build #83825 has finished for PR 19594 at commit 96776ce.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class OverlappedRange(

wzhfy force-pushed the join_estimation_histogram branch 2 times, most recently from 67bd651 to 8b2084a on November 15, 2017 01:17
wzhfy changed the title from "[WIP] [SPARK-21984] Join estimation based on equi-height histogram" to "[SPARK-21984] [SQL] Join estimation based on equi-height histogram" on Nov 15, 2017
SparkQA commented Nov 15, 2017

Test build #83871 has finished for PR 19594 at commit 8b2084a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class OverlappedRange(

SparkQA commented Nov 15, 2017

Test build #83870 has finished for PR 19594 at commit 67bd651.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • case class OverlappedRange(

wzhfy (Contributor Author) commented Nov 15, 2017

cc @cloud-fan @gatorsmile @ron8hu

val histogram1 = Histogram(height = 300, Array(
HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30)))
val histogram2 = Histogram(height = 100, Array(
HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
Contributor:

Histograms are supposed to handle skewed distributions effectively. In this test case, histogram2 has a skewed distribution, as one bin has only one distinct value. Can you add a test case in which both join columns have skewed distributions, i.e. both join columns have at least one bin with a single distinct value?

Contributor Author:

OK, I've added test cases for joins of skewed histograms (same skewed value and different skewed values).
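
For reference, a hypothetical fixture for such a test, reusing the Histogram/HistogramBin constructors shown above (values chosen for illustration, not the PR's actual test data): both join columns are skewed at value 30, i.e. each histogram has a bin with a single distinct value.

```scala
// Hypothetical skewed test data, not the PR's actual test case.
val skewedLeft = Histogram(height = 300, Array(
  HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30)))
val skewedRight = Histogram(height = 100, Array(
  HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 100, ndv = 40)))
```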

val histogram1 = Histogram(height = 300, Array(
HistogramBin(lo = 50, hi = 60, ndv = 10), HistogramBin(lo = 60, hi = 75, ndv = 3)))
val histogram2 = Histogram(height = 100, Array(
HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40)))
Contributor:

For very skewed cases, multiple bins in a histogram may hold the same single distinct value. We may want to add one more test case to cover this situation.

Contributor Author (wzhfy, Dec 9, 2017):

OK, I've added such a test case.

SparkQA commented Dec 9, 2017

Test build #84676 has finished for PR 19594 at commit e69e213.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wzhfy (Contributor Author) commented Dec 9, 2017

retest this please

SparkQA commented Dec 9, 2017

Test build #84679 has finished for PR 19594 at commit e69e213.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

wzhfy (Contributor Author) commented Dec 9, 2017

retest this please

SparkQA commented Dec 9, 2017

Test build #84682 has finished for PR 19594 at commit e69e213.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wzhfy (Contributor Author) commented Dec 9, 2017

retest this please

SparkQA commented Dec 9, 2017

Test build #84683 has finished for PR 19594 at commit e69e213.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wzhfy (Contributor Author) commented Dec 10, 2017

ping @cloud-fan

leftHistogram: Histogram,
rightHistogram: Histogram,
newMin: Double,
newMax: Double): Seq[OverlappedRange] = {
Contributor:

how about upperBound/lowerBound? It's hard to understand what "new" means just by looking at this method.

Contributor:

max/min is also fine

Contributor Author:

yea I think upperBound/lowerBound is better.

.filter(b => b.lo <= newMax && b.hi >= newMin)

leftBins.foreach { lb =>
rightBins.foreach { rb =>
Contributor:

nit:

for {
  leftBin <- leftBins
  rightBin <- rightBins
} yield {
  ...
  OverlappedRange ...
}

Then we can omit val overlappedRanges = new ArrayBuffer[OverlappedRange]()

Contributor Author:

We only collect an OverlappedRange when the left part and the right part intersect, and that decision is based on some computation, so it's not very convenient to express as guards. The yield form therefore doesn't seem well suited to this case (see the sketch below).
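
A simplified, self-contained sketch of the pattern being discussed (names and types are illustrative, not the PR's actual code): the overlap decision depends on values computed inside the loop body, so a conditional append to an ArrayBuffer reads more naturally than a for/yield with guards.

```scala
import scala.collection.mutable.ArrayBuffer

// Each bin is reduced to a (lo, hi) pair for brevity.
def overlappedRanges(
    leftBins: Seq[(Double, Double)],
    rightBins: Seq[(Double, Double)]): Seq[(Double, Double)] = {
  val ranges = new ArrayBuffer[(Double, Double)]()
  leftBins.foreach { case (leftLo, leftHi) =>
    rightBins.foreach { case (rightLo, rightHi) =>
      // Whether the two bins intersect is only known after this computation,
      // which is why a conditional append is used instead of a yield guard.
      val lo = math.max(leftLo, rightLo)
      val hi = math.min(leftHi, rightHi)
      if (lo <= hi) ranges += ((lo, hi))
    }
  }
  ranges.toSeq
}
```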

// --------+------------------+------------+-------------+------->
(min, max)
} else if (bin.lo <= min && bin.hi >= min) {
// bin.lo min bin.hi
Contributor:

what if the max is after the bin.hi?

Contributor Author:

In this case, max is after bin.hi, so the trimmed part is (min, bin.hi). I'll update the figure to indicate that.

// --------+------------------+------------+----------------+------->
val leftRatio = (left.hi - right.lo) / (left.hi - left.lo)
val rightRatio = (left.hi - right.lo) / (right.hi - right.lo)
if (leftRatio == 0) {
Contributor:

it's more understandable to write if (right.lo == left.hi)

}
keyStatsAfterJoin += (
leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
Contributor:

should we update the histogram after join?

Contributor Author:

Currently we don't update the histogram, since min/max tells us which bins are still valid, so correctness is not affected. Updating the histograms would, however, help reduce memory usage for histogram propagation. We can do this in both filter and join estimation in follow-up PRs.

Contributor:

Actually, keeping it unchanged is more memory-efficient: we just pass around pointers, whereas updating the histogram means creating a new one.

Let's keep it, and add some comments to explain that.

Contributor Author:

ah right, we can keep it.

SparkQA commented Dec 16, 2017

Test build #84989 has finished for PR 19594 at commit 2a4ee99.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 16, 2017

Test build #84991 has finished for PR 19594 at commit 2637429.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wzhfy (Contributor Author) commented Dec 16, 2017

retest this please

SparkQA commented Dec 16, 2017

Test build #85001 has finished for PR 19594 at commit 2637429.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wzhfy (Contributor Author) commented Dec 18, 2017

retest this please

* Given an original bin and a value range [lowerBound, upperBound], returns the trimmed part
* of the bin in that range and its number of rows.
*/
def trimBin(bin: HistogramBin, height: Double, lowerBound: Double, upperBound: Double)
Contributor:

maybe explain in the comment that height means the average number of rows of the given bin inside an equi-height histogram.
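
For illustration only, a simplified sketch of what trimBin does according to the doc comment above, under the assumption stated in this review comment; the real implementation also adjusts the bin's ndv, which is omitted here.

```scala
// Hypothetical simplified bin type; the PR's HistogramBin also needs its ndv adjusted.
case class HistogramBin(lo: Double, hi: Double, ndv: Long)

// Clamp the bin to [lowerBound, upperBound] and scale its row count by the
// fraction of the bin that remains; `height` is the average number of rows
// per bin of the equi-height histogram. A single-value bin keeps all its rows.
def trimBinSketch(
    bin: HistogramBin,
    height: Double,
    lowerBound: Double,
    upperBound: Double): (Double, Double, Double) = {
  val lo = math.max(bin.lo, lowerBound)
  val hi = math.min(bin.hi, upperBound)
  val keptFraction = if (bin.hi == bin.lo) 1.0 else (hi - lo) / (bin.hi - bin.lo)
  (lo, hi, height * keptFraction)
}
```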

(bin.lo, bin.hi)
}

if (bin.hi == bin.lo) {
Contributor:

do we really need this branch? I think the else if branch can also cover it, if we assume bin.ndv must be 1 when bin.hi == bin.lo.

rightNumRows = rightHeight / right.ndv
)
} else if (right.lo == right.hi) {
// Case2: the right bin has only one value
Contributor:

do we really need cases 1 and 2? Aren't they covered by the branches below?

// Case3: the left bin is "smaller" than the right bin
// left.lo right.lo left.hi right.hi
// --------+------------------+------------+----------------+------->
if (left.hi == right.lo) {
Contributor (cloud-fan, Dec 18, 2017):

yea, this branch is needed; otherwise we get a 0 ratio, which leads to a wrong result.

} else {
// lowerBound bin.lo bin.hi upperBound
// --------+------------------+------------+-------------+------->
(bin.lo, bin.hi)
Contributor:

add an assert to make sure that if we reach here, the case is what we expect.

@@ -225,6 +236,43 @@ case class JoinEstimation(join: Join) extends Logging {
(ceil(card), newStats)
}

/** Compute join cardinality using equi-height histograms. */
private def computeByEquiHeightHistogram(
Contributor:

I think it's OK to just say Histogram in method names and explain that it's equi-height in the comments.

rightHistogram = rightHistogram,
// Only numeric values have equi-height histograms.
lowerBound = newMin.get.toString.toDouble,
upperBound = newMax.get.toString.toDouble)
Contributor:

if we assume min/max must be defined here, I think the parameter type should be Double instead of Option[Any]

Contributor Author:

that's because we need to update the column stats' min and max at the end of the method.

val leftKeyStat = leftStats.attributeStats(leftKey)
val rightKeyStat = rightStats.attributeStats(rightKey)
val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
Contributor:

shall we count left/right numRows when calculating this?

Contributor Author:

how do we use left/right numRows to calculate this? Ideally avgLen would be calculated as the total length of the keys divided by numRowsAfterJoin. For string types we don't know the exact length of the matched keys (we don't support string histograms yet), and for numeric types the avgLen of both sides should be the same. So the equation is a fair approximation.

// truncated by the updated max/min. In this way, only pointers of the histograms are
// propagated and thus reduce memory consumption.
leftKey -> joinStat.copy(histogram = leftKeyStat.histogram),
rightKey -> joinStat.copy(histogram = rightKeyStat.histogram)
Contributor:

shall we do this inside computeByEquiHeightHistogram?

Contributor Author:

I put it here because computeByEquiHeightHistogram returns a single stats object, whereas here we keep the histograms for leftKey and rightKey respectively.


val expectedRanges = Seq(
// histogram1.bins(0) overlaps t0
OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2),
Contributor:

space between operators.

OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20)
)
assert(expectedRanges.equals(
getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D)))
Contributor:

10D looks weird, how about 10.0

Contributor:

actually we can just write 10 right?

expectedMin = 10D,
expectedMax = 60D,
// 10 + 20 + 8
expectedNdv = 38L,
Contributor:

expectedNdv = 10 + 20 + 8?

SparkQA commented Dec 18, 2017

Test build #85061 has finished for PR 19594 at commit 2637429.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// 10 + 20 + 8
expectedNdv = 38L,
// 300*40/20 + 200*40/20 + 100*20/10
expectedRows = 1200L)
Contributor:

ditto

cloud-fan (Contributor): LGTM except for some minor comments

SparkQA commented Dec 19, 2017

Test build #85106 has finished for PR 19594 at commit 16797d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor): thanks, merging to master!

asfgit closed this in 571aa27 on Dec 19, 2017