From 8b2084a4bec8fdd58cca809b2d2b26bdc939436d Mon Sep 17 00:00:00 2001 From: Zhenhua Wang Date: Tue, 14 Nov 2017 13:27:55 +0800 Subject: [PATCH 1/5] join estimation based on histogram --- .../statsEstimation/EstimationUtils.scala | 164 +++++++++++++- .../statsEstimation/JoinEstimation.scala | 51 ++++- .../statsEstimation/JoinEstimationSuite.scala | 204 +++++++++++++++++- 3 files changed, 413 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index 9c34a9b7aa756..220938d0eaf13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation +import scala.collection.mutable.ArrayBuffer import scala.math.BigDecimal.RoundingMode import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.{DecimalType, _} @@ -114,4 +115,165 @@ object EstimationUtils { } } + def getOverlappedRanges( + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Double, + newMax: Double): Seq[OverlappedRange] = { + val overlappedRanges = new ArrayBuffer[OverlappedRange]() + // Only bins whose range intersect [newMin, newMax] have join possibility. + val leftBins = leftHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) + val rightBins = rightHistogram.bins + .filter(b => b.lo <= newMax && b.hi >= newMin) + + leftBins.foreach { lb => + rightBins.foreach { rb => + val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, newMax) + val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax) + // Only collect overlapped ranges. + if (left.lo <= right.hi && left.hi >= right.lo) { + // Collect overlapping ranges. + val range = if (left.lo == left.hi) { + // Case1: the left bin has only one value + OverlappedRange( + lo = left.lo, + hi = left.lo, + minNdv = 1, + maxNdv = 1, + leftNumRows = leftHeight, + rightNumRows = rightHeight / right.ndv + ) + } else if (right.lo == right.hi) { + // Case2: the right bin has only one value + OverlappedRange( + lo = right.lo, + hi = right.lo, + minNdv = 1, + maxNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight + ) + } else if (right.lo >= left.lo && right.hi >= left.hi) { + // Case3: the left bin is "smaller" than the right bin + // left.lo right.lo left.hi right.hi + // --------+------------------+------------+----------------+-------> + val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) + val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) + if (leftRatio == 0) { + // The overlapping range has only one value. + OverlappedRange( + lo = right.lo, + hi = right.lo, + minNdv = 1, + maxNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight / right.ndv + ) + } else { + OverlappedRange( + lo = right.lo, + hi = left.hi, + minNdv = math.min(left.ndv * leftRatio, right.ndv * rightRatio), + maxNdv = math.max(left.ndv * leftRatio, right.ndv * rightRatio), + leftNumRows = leftHeight * leftRatio, + rightNumRows = rightHeight * rightRatio + ) + } + } else if (right.lo <= left.lo && right.hi <= left.hi) { + // Case4: the left bin is "larger" than the right bin + // right.lo left.lo right.hi left.hi + // --------+------------------+------------+----------------+-------> + val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) + val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) + if (leftRatio == 0) { + // The overlapping range has only one value. + OverlappedRange( + lo = right.hi, + hi = right.hi, + minNdv = 1, + maxNdv = 1, + leftNumRows = leftHeight / left.ndv, + rightNumRows = rightHeight / right.ndv + ) + } else { + OverlappedRange( + lo = left.lo, + hi = right.hi, + minNdv = math.min(left.ndv * leftRatio, right.ndv * rightRatio), + maxNdv = math.max(left.ndv * leftRatio, right.ndv * rightRatio), + leftNumRows = leftHeight * leftRatio, + rightNumRows = rightHeight * rightRatio + ) + } + } else if (right.lo >= left.lo && right.hi <= left.hi) { + // Case5: the left bin contains the right bin + // left.lo right.lo right.hi left.hi + // --------+------------------+------------+----------------+-------> + val leftRatio = (right.hi - right.lo) / (left.hi - left.lo) + OverlappedRange( + lo = right.lo, + hi = right.hi, + minNdv = math.min(left.ndv * leftRatio, right.ndv), + maxNdv = math.max(left.ndv * leftRatio, right.ndv), + leftNumRows = leftHeight * leftRatio, + rightNumRows = rightHeight + ) + } else { + assert(right.lo <= left.lo && right.hi >= left.hi) + // Case6: the right bin contains the left bin + // right.lo left.lo left.hi right.hi + // --------+------------------+------------+----------------+-------> + val rightRatio = (left.hi - left.lo) / (right.hi - right.lo) + OverlappedRange( + lo = left.lo, + hi = left.hi, + minNdv = math.min(left.ndv, right.ndv * rightRatio), + maxNdv = math.max(left.ndv, right.ndv * rightRatio), + leftNumRows = leftHeight, + rightNumRows = rightHeight * rightRatio + ) + } + overlappedRanges += range + } + } + } + overlappedRanges + } + + def trimBin(bin: HistogramBin, height: Double, min: Double, max: Double) + : (HistogramBin, Double) = { + val (lo, hi) = if (bin.lo <= min && bin.hi >= max) { + // bin.lo min max bin.hi + // --------+------------------+------------+-------------+-------> + (min, max) + } else if (bin.lo <= min && bin.hi >= min) { + // bin.lo min bin.hi + // --------+------------------+-----------+-------> + (min, bin.hi) + } else if (bin.lo <= max && bin.hi >= max) { + // bin.lo max bin.hi + // --------+------------------+-----------+-------> + (bin.lo, max) + } else { + (bin.lo, bin.hi) + } + + if (bin.hi == bin.lo) { + (bin, height) + } else if (hi == lo) { + (HistogramBin(lo, hi, 1), height / bin.ndv) + } else { + val ratio = (hi - lo) / (bin.hi - bin.lo) + (HistogramBin(lo, hi, math.ceil(bin.ndv * ratio).toLong), height * ratio) + } + } + + case class OverlappedRange( + lo: Double, + hi: Double, + minNdv: Double, + maxNdv: Double, + leftNumRows: Double, + rightNumRows: Double) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala index b073108c26ee5..814bec785aba9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala @@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression} import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Histogram, Join, Statistics} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ @@ -191,8 +191,16 @@ case class JoinEstimation(join: Join) extends Logging { val rInterval = ValueInterval(rightKeyStat.min, rightKeyStat.max, rightKey.dataType) if (ValueInterval.isIntersected(lInterval, rInterval)) { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) - val (card, joinStat) = computeByNdv(leftKey, rightKey, newMin, newMax) - keyStatsAfterJoin += (leftKey -> joinStat, rightKey -> joinStat) + val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { + case (Some(l: Histogram), Some(r: Histogram)) => + computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + case _ => + computeByNdv(leftKey, rightKey, newMin, newMax) + } + keyStatsAfterJoin += ( + leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), + rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) + ) // Return cardinality estimated from the most selective join keys. if (card < joinCard) joinCard = card } else { @@ -225,6 +233,43 @@ case class JoinEstimation(join: Join) extends Logging { (ceil(card), newStats) } + /** Compute join cardinality using equi-height histograms. */ + private def computeByEquiHeightHistogram( + leftKey: AttributeReference, + rightKey: AttributeReference, + leftHistogram: Histogram, + rightHistogram: Histogram, + newMin: Option[Any], + newMax: Option[Any]): (BigInt, ColumnStat) = { + val overlappedRanges = getOverlappedRanges( + leftHistogram = leftHistogram, + rightHistogram = rightHistogram, + // Only numeric values have equi-height histograms. + newMin = newMin.get.toString.toDouble, + newMax = newMax.get.toString.toDouble) + + var card: BigDecimal = 0 + var totalNdv: Double = 0 + for (i <- overlappedRanges.indices) { + val range = overlappedRanges(i) + if (i == 0 || range.hi != overlappedRanges(i - 1).hi) { + // If range.hi == overlappingRanges(i - 1).hi, that means the current range has only one + // value, and this value is already counted in the previous range. So there is no need to + // count it in this range. + totalNdv += range.minNdv + } + // Apply the formula in this overlapping range. + card += range.leftNumRows * range.rightNumRows / range.maxNdv + } + + val leftKeyStat = leftStats.attributeStats(leftKey) + val rightKeyStat = rightStats.attributeStats(rightKey) + val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen) + val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2 + val newStats = ColumnStat(ceil(totalNdv), newMin, newMax, 0, newAvgLen, newMaxLen) + (ceil(card), newStats) + } + /** * Propagate or update column stats for output attributes. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala index 097c78eb27fca..5eac2ef35164c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala @@ -20,10 +20,11 @@ package org.apache.spark.sql.catalyst.statsEstimation import java.sql.{Date, Timestamp} import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeReference, EqualTo} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeReference, EqualTo, Expression, GreaterThanOrEqual, LessThanOrEqual, Literal} import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, Project, Statistics} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types.{DateType, TimestampType, _} @@ -67,6 +68,205 @@ class JoinEstimationSuite extends StatsEstimationTestBase { rowCount = 2, attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) + private def estimateByHistogram( + histogram1: Histogram, + histogram2: Histogram, + expectedMin: Double, + expectedMax: Double, + expectedNdv: Long, + expectedRows: Long): Unit = { + val col1 = attr("key1") + val col2 = attr("key2") + val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax) + val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax) + + val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) + val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) + val expectedStatsAfterJoin = Statistics( + sizeInBytes = expectedRows * (8 + 2 * 4), + rowCount = Some(expectedRows), + attributeStats = AttributeMap(Seq( + col1 -> c1.stats.attributeStats(col1).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)), + col2 -> c2.stats.attributeStats(col2).copy( + distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)))) + ) + + // Join order should not affect estimation result. + Seq(c1JoinC2, c2JoinC1).foreach { join => + assert(join.stats == expectedStatsAfterJoin) + } + } + + private def generateJoinChild( + col: Attribute, + histogram: Histogram, + expectedMin: Double, + expectedMax: Double): LogicalPlan = { + val colStat = inferColumnStat(histogram) + val t = StatsTestPlan( + outputList = Seq(col), + rowCount = (histogram.height * histogram.bins.length).toLong, + attributeStats = AttributeMap(Seq(col -> colStat))) + + val filterCondition = new ArrayBuffer[Expression]() + if (expectedMin > colStat.min.get.toString.toDouble) { + filterCondition += GreaterThanOrEqual(col, Literal(expectedMin)) + } + if (expectedMax < colStat.max.get.toString.toDouble) { + filterCondition += LessThanOrEqual(col, Literal(expectedMax)) + } + if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t) + } + + private def inferColumnStat(histogram: Histogram): ColumnStat = { + var ndv = 0L + for (i <- histogram.bins.indices) { + val bin = histogram.bins(i) + if (i == 0 || bin.hi != histogram.bins(i - 1).hi) { + ndv += bin.ndv + } + } + ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo), + max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4, + histogram = Some(histogram)) + } + + test("equi-height histograms: a bin is contained by another one") { + val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 10, hi = 30, ndv = 10), HistogramBin(lo = 30, hi = 60, ndv = 30))) + val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) + // test bin trimming + val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) + assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80) + val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) + assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + + val expectedRanges = Seq( + OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 80*1/2), + OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2), + OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20) + ) + assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) + + estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 10D, + expectedMax = 60D, + // 10 + 20 + 8 + expectedNdv = 38L, + // 300*40/20 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value") { + val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30))) + val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) + // test bin trimming + val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 30, max = 60) + assert(t1 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h1 == 40) + val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 30, max = 60) + assert(t2 ==HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + + val expectedRanges = Seq( + OverlappedRange(30, 30, 1, 1, 300, 40/20), + OverlappedRange(30, 50, math.min(30*2/3, 20), math.max(30*2/3, 20), 300*2/3, 40), + OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20) + ) + assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 30D, newMax = 60D))) + + estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 30D, + expectedMax = 60D, + // 1 + 20 + 8 + expectedNdv = 29L, + // 300*20/1 + 200*40/20 + 100*20/10 + expectedRows = 1200L) + } + + test("equi-height histograms: a bin has only one value after trimming") { + val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 50, hi = 60, ndv = 10), HistogramBin(lo = 60, hi = 75, ndv = 3))) + val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) + // test bin trimming + val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 50, max = 75) + assert(t1 == HistogramBin(lo = 50, hi = 50, ndv = 1) && h1 == 2) + val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 50, max = 75) + assert(t2 == HistogramBin(lo = 50, hi = 75, ndv = 20) && h2 == 50) + + val expectedRanges = Seq( + OverlappedRange(50, 50, 1, 1, 300/10, 2), + OverlappedRange(50, 60, math.min(10, 20*10/25), math.max(10, 20*10/25), 300, 50*10/25), + OverlappedRange(60, 75, math.min(3, 20*15/25), math.max(3, 20*15/25), 300, 50*15/25) + ) + assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 50D, newMax = 75D))) + + estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 50D, + expectedMax = 75D, + // 1 + 8 + 3 + expectedNdv = 12L, + // 30*2/1 + 300*20/10 + 300*30/12 + expectedRows = 1410L) + } + + test("equi-height histograms: skip and trim bins by min/max") { + // This case can happen when estimating join after a filter. + val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 1, hi = 30, ndv = 10), + HistogramBin(lo = 30, hi = 60, ndv = 30), + HistogramBin(lo = 60, hi = 90, ndv = 6), + HistogramBin(lo = 90, hi = 200, ndv = 30))) + val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = -50, hi = 0, ndv = 50), + HistogramBin(lo = 0, hi = 50, ndv = 50), + HistogramBin(lo = 50, hi = 100, ndv = 40), + HistogramBin(lo = 100, hi = 150, ndv = 40))) + + // test bin trimming + val (t1, h1) = trimBin(histogram1.bins(1), height = 300, min = 50, max = 75) + assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 10) && h1 == 100) + val (t2, h2) = trimBin(histogram1.bins(2), height = 300, min = 50, max = 75) + assert(t2 == HistogramBin(lo = 60, hi = 75, ndv = 3) && h2 == 150) + val (t3, h3) = trimBin(histogram2.bins(1), height = 100, min = 50, max = 75) + assert(t3 == HistogramBin(lo = 50, hi = 50, ndv = 1) && h3 == 2) + val (t4, h4) = trimBin(histogram2.bins(2), height = 100, min = 50, max = 75) + assert(t4 == HistogramBin(lo = 50, hi = 75, ndv = 20) && h4 == 50) + + val expectedRanges = Seq( + // t1 overlaps t3 + OverlappedRange(50, 50, 1, 1, 100/10, 2), + // t1 overlaps t4 + OverlappedRange(50, 60, math.min(10, 20*10/25), math.max(10, 20*10/25), 100, 50*10/25), + // t2 overlaps t4 + OverlappedRange(60, 75, math.min(3, 20*15/25), math.max(3, 20*15/25), 150, 50*15/25) + ) + assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 50D, newMax = 75D))) + + estimateByHistogram( + histogram1 = histogram1, + histogram2 = histogram2, + expectedMin = 50D, + expectedMax = 75D, + // 1 + 8 + 3 + expectedNdv = 12L, + // 10*2/1 + 100*20/10 + 150*30/12 + expectedRows = 595L) + } + test("cross join") { // table1 (key-1-5 int, key-5-9 int): (1, 9), (2, 8), (3, 7), (4, 6), (5, 5) // table2 (key-1-2 int, key-2-4 int): (1, 2), (2, 3), (2, 4) From e69e21348b4cde2abaec9dbb46381caf1ed3a1a4 Mon Sep 17 00:00:00 2001 From: Zhenhua Wang Date: Sat, 9 Dec 2017 10:04:55 +0800 Subject: [PATCH 2/5] add test cases, add some comments and a small factor --- .../statsEstimation/EstimationUtils.scala | 60 ++++-- .../statsEstimation/JoinEstimation.scala | 8 +- .../statsEstimation/JoinEstimationSuite.scala | 198 ++++++++++-------- 3 files changed, 150 insertions(+), 116 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index 220938d0eaf13..9bb9e54ac94b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -115,6 +115,9 @@ object EstimationUtils { } } + /** + * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + */ def getOverlappedRanges( leftHistogram: Histogram, rightHistogram: Histogram, @@ -133,14 +136,14 @@ object EstimationUtils { val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax) // Only collect overlapped ranges. if (left.lo <= right.hi && left.hi >= right.lo) { - // Collect overlapping ranges. + // Collect overlapped ranges. val range = if (left.lo == left.hi) { // Case1: the left bin has only one value OverlappedRange( lo = left.lo, hi = left.lo, - minNdv = 1, - maxNdv = 1, + leftNdv = 1, + rightNdv = 1, leftNumRows = leftHeight, rightNumRows = rightHeight / right.ndv ) @@ -149,8 +152,8 @@ object EstimationUtils { OverlappedRange( lo = right.lo, hi = right.lo, - minNdv = 1, - maxNdv = 1, + leftNdv = 1, + rightNdv = 1, leftNumRows = leftHeight / left.ndv, rightNumRows = rightHeight ) @@ -161,12 +164,12 @@ object EstimationUtils { val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) if (leftRatio == 0) { - // The overlapping range has only one value. + // The overlapped range has only one value. OverlappedRange( lo = right.lo, hi = right.lo, - minNdv = 1, - maxNdv = 1, + leftNdv = 1, + rightNdv = 1, leftNumRows = leftHeight / left.ndv, rightNumRows = rightHeight / right.ndv ) @@ -174,8 +177,8 @@ object EstimationUtils { OverlappedRange( lo = right.lo, hi = left.hi, - minNdv = math.min(left.ndv * leftRatio, right.ndv * rightRatio), - maxNdv = math.max(left.ndv * leftRatio, right.ndv * rightRatio), + leftNdv = left.ndv * leftRatio, + rightNdv = right.ndv * rightRatio, leftNumRows = leftHeight * leftRatio, rightNumRows = rightHeight * rightRatio ) @@ -187,12 +190,12 @@ object EstimationUtils { val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) if (leftRatio == 0) { - // The overlapping range has only one value. + // The overlapped range has only one value. OverlappedRange( lo = right.hi, hi = right.hi, - minNdv = 1, - maxNdv = 1, + leftNdv = 1, + rightNdv = 1, leftNumRows = leftHeight / left.ndv, rightNumRows = rightHeight / right.ndv ) @@ -200,8 +203,8 @@ object EstimationUtils { OverlappedRange( lo = left.lo, hi = right.hi, - minNdv = math.min(left.ndv * leftRatio, right.ndv * rightRatio), - maxNdv = math.max(left.ndv * leftRatio, right.ndv * rightRatio), + leftNdv = left.ndv * leftRatio, + rightNdv = right.ndv * rightRatio, leftNumRows = leftHeight * leftRatio, rightNumRows = rightHeight * rightRatio ) @@ -214,8 +217,8 @@ object EstimationUtils { OverlappedRange( lo = right.lo, hi = right.hi, - minNdv = math.min(left.ndv * leftRatio, right.ndv), - maxNdv = math.max(left.ndv * leftRatio, right.ndv), + leftNdv = left.ndv * leftRatio, + rightNdv = right.ndv, leftNumRows = leftHeight * leftRatio, rightNumRows = rightHeight ) @@ -228,8 +231,8 @@ object EstimationUtils { OverlappedRange( lo = left.lo, hi = left.hi, - minNdv = math.min(left.ndv, right.ndv * rightRatio), - maxNdv = math.max(left.ndv, right.ndv * rightRatio), + leftNdv = left.ndv, + rightNdv = right.ndv * rightRatio, leftNumRows = leftHeight, rightNumRows = rightHeight * rightRatio ) @@ -241,6 +244,10 @@ object EstimationUtils { overlappedRanges } + /** + * Given an original bin and a value range [min, max], returns the trimmed bin and its number of + * rows. + */ def trimBin(bin: HistogramBin, height: Double, min: Double, max: Double) : (HistogramBin, Double) = { val (lo, hi) = if (bin.lo <= min && bin.hi >= max) { @@ -269,11 +276,22 @@ object EstimationUtils { } } + /** + * A join between two equi-height histograms may produce multiple overlapped ranges. + * Each overlapped range is produced by a part of one bin in the left histogram and a part of + * one bin in the right histogram. + * @param lo lower bound of this overlapped range. + * @param hi higher bound of this overlapped range. + * @param leftNdv ndv in the left part. + * @param rightNdv ndv in the right part. + * @param leftNumRows number of rows in the left part. + * @param rightNumRows number of rows in the right part. + */ case class OverlappedRange( lo: Double, hi: Double, - minNdv: Double, - maxNdv: Double, + leftNdv: Double, + rightNdv: Double, leftNumRows: Double, rightNumRows: Double) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala index 814bec785aba9..3bc85e2200730 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala @@ -253,13 +253,13 @@ case class JoinEstimation(join: Join) extends Logging { for (i <- overlappedRanges.indices) { val range = overlappedRanges(i) if (i == 0 || range.hi != overlappedRanges(i - 1).hi) { - // If range.hi == overlappingRanges(i - 1).hi, that means the current range has only one + // If range.hi == overlappedRanges(i - 1).hi, that means the current range has only one // value, and this value is already counted in the previous range. So there is no need to // count it in this range. - totalNdv += range.minNdv + totalNdv += math.min(range.leftNdv, range.rightNdv) } - // Apply the formula in this overlapping range. - card += range.leftNumRows * range.rightNumRows / range.maxNdv + // Apply the formula in this overlapped range. + card += range.leftNumRows * range.rightNumRows / math.max(range.leftNdv, range.rightNdv) } val leftKeyStat = leftStats.attributeStats(leftKey) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala index 5eac2ef35164c..49aa8fda23663 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala @@ -20,9 +20,8 @@ package org.apache.spark.sql.catalyst.statsEstimation import java.sql.{Date, Timestamp} import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeReference, EqualTo, Expression, GreaterThanOrEqual, LessThanOrEqual, Literal} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeMap, AttributeReference, EqualTo} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._ @@ -69,16 +68,16 @@ class JoinEstimationSuite extends StatsEstimationTestBase { attributeStats = AttributeMap(Seq("key-1-2", "key-2-3").map(nameToColInfo))) private def estimateByHistogram( - histogram1: Histogram, - histogram2: Histogram, + leftHistogram: Histogram, + rightHistogram: Histogram, expectedMin: Double, expectedMax: Double, expectedNdv: Long, expectedRows: Long): Unit = { val col1 = attr("key1") val col2 = attr("key2") - val c1 = generateJoinChild(col1, histogram1, expectedMin, expectedMax) - val c2 = generateJoinChild(col2, histogram2, expectedMin, expectedMax) + val c1 = generateJoinChild(col1, leftHistogram, expectedMin, expectedMax) + val c2 = generateJoinChild(col2, rightHistogram, expectedMin, expectedMax) val c1JoinC2 = Join(c1, c2, Inner, Some(EqualTo(col1, col2))) val c2JoinC1 = Join(c2, c1, Inner, Some(EqualTo(col2, col1))) @@ -104,21 +103,13 @@ class JoinEstimationSuite extends StatsEstimationTestBase { expectedMin: Double, expectedMax: Double): LogicalPlan = { val colStat = inferColumnStat(histogram) - val t = StatsTestPlan( + StatsTestPlan( outputList = Seq(col), rowCount = (histogram.height * histogram.bins.length).toLong, attributeStats = AttributeMap(Seq(col -> colStat))) - - val filterCondition = new ArrayBuffer[Expression]() - if (expectedMin > colStat.min.get.toString.toDouble) { - filterCondition += GreaterThanOrEqual(col, Literal(expectedMin)) - } - if (expectedMax < colStat.max.get.toString.toDouble) { - filterCondition += LessThanOrEqual(col, Literal(expectedMax)) - } - if (filterCondition.isEmpty) t else Filter(filterCondition.reduce(And), t) } + /** Column statistics should be consistent with histograms in tests. */ private def inferColumnStat(histogram: Histogram): ColumnStat = { var ndv = 0L for (i <- histogram.bins.indices) { @@ -138,22 +129,25 @@ class JoinEstimationSuite extends StatsEstimationTestBase { val histogram2 = Histogram(height = 100, Array( HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) // test bin trimming - val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) - assert(t1 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h1 == 80) - val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) - assert(t2 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + val (t0, h0) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) + assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80) + val (t1, h1) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) + assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) val expectedRanges = Seq( - OverlappedRange(10, 30, math.min(10, 40*1/2), math.max(10, 40*1/2), 300, 80*1/2), - OverlappedRange(30, 50, math.min(30*2/3, 40*1/2), math.max(30*2/3, 40*1/2), 300*2/3, 80*1/2), - OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20) + // histogram1.bins(0) overlaps t0 + OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2), + // histogram1.bins(1) overlaps t0 + OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2), + // histogram1.bins(1) overlaps t1 + OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) ) assert(expectedRanges.equals( getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) estimateByHistogram( - histogram1 = histogram1, - histogram2 = histogram2, + leftHistogram = histogram1, + rightHistogram = histogram2, expectedMin = 10D, expectedMax = 60D, // 10 + 20 + 8 @@ -162,109 +156,131 @@ class JoinEstimationSuite extends StatsEstimationTestBase { expectedRows = 1200L) } - test("equi-height histograms: a bin has only one value") { + test("equi-height histograms: a bin has only one value after trimming") { val histogram1 = Histogram(height = 300, Array( - HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30))) + HistogramBin(lo = 50, hi = 60, ndv = 10), HistogramBin(lo = 60, hi = 75, ndv = 3))) val histogram2 = Histogram(height = 100, Array( HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) // test bin trimming - val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 30, max = 60) - assert(t1 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h1 == 40) - val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 30, max = 60) - assert(t2 ==HistogramBin(lo = 50, hi = 60, ndv = 8) && h2 == 20) + val (t0, h0) = trimBin(histogram2.bins(0), height = 100, min = 50, max = 75) + assert(t0 == HistogramBin(lo = 50, hi = 50, ndv = 1) && h0 == 2) + val (t1, h1) = trimBin(histogram2.bins(1), height = 100, min = 50, max = 75) + assert(t1 == HistogramBin(lo = 50, hi = 75, ndv = 20) && h1 == 50) + + val expectedRanges = Seq( + // histogram1.bins(0) overlaps t0 + OverlappedRange(50, 50, 1, 1, 300/10, 2), + // histogram1.bins(0) overlaps t1 + OverlappedRange(50, 60, 10, 20*10/25, 300, 50*10/25), + // histogram1.bins(1) overlaps t1 + OverlappedRange(60, 75, 3, 20*15/25, 300, 50*15/25) + ) + assert(expectedRanges.equals( + getOverlappedRanges(histogram1, histogram2, newMin = 50D, newMax = 75D))) + + estimateByHistogram( + leftHistogram = histogram1, + rightHistogram = histogram2, + expectedMin = 50D, + expectedMax = 75D, + // 1 + 8 + 3 + expectedNdv = 12L, + // 30*2/1 + 300*20/10 + 300*30/12 + expectedRows = 1410L) + } + + test("equi-height histograms: skew distribution (some bins have only one value)") { + val histogram1 = Histogram(height = 300, Array( + HistogramBin(lo = 30, hi = 30, ndv = 1), + HistogramBin(lo = 30, hi = 30, ndv = 1), + HistogramBin(lo = 30, hi = 60, ndv = 30))) + val histogram2 = Histogram(height = 100, Array( + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) + // test bin trimming + val (t0, h0) = trimBin(histogram2.bins(0), height = 100, min = 30, max = 60) + assert(t0 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h0 == 40) + val (t1, h1) = trimBin(histogram2.bins(1), height = 100, min = 30, max = 60) + assert(t1 ==HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) val expectedRanges = Seq( OverlappedRange(30, 30, 1, 1, 300, 40/20), - OverlappedRange(30, 50, math.min(30*2/3, 20), math.max(30*2/3, 20), 300*2/3, 40), - OverlappedRange(50, 60, math.min(30*1/3, 8), math.max(30*1/3, 8), 300*1/3, 20) + OverlappedRange(30, 30, 1, 1, 300, 40/20), + OverlappedRange(30, 50, 30*2/3, 20, 300*2/3, 40), + OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) ) assert(expectedRanges.equals( getOverlappedRanges(histogram1, histogram2, newMin = 30D, newMax = 60D))) estimateByHistogram( - histogram1 = histogram1, - histogram2 = histogram2, + leftHistogram = histogram1, + rightHistogram = histogram2, expectedMin = 30D, expectedMax = 60D, // 1 + 20 + 8 expectedNdv = 29L, - // 300*20/1 + 200*40/20 + 100*20/10 - expectedRows = 1200L) + // 300*2/1 + 300*2/1 + 200*40/20 + 100*20/10 + expectedRows = 1800L) } - test("equi-height histograms: a bin has only one value after trimming") { + test("equi-height histograms: skew distribution (histograms have different skewed values") { val histogram1 = Histogram(height = 300, Array( - HistogramBin(lo = 50, hi = 60, ndv = 10), HistogramBin(lo = 60, hi = 75, ndv = 3))) + HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30))) val histogram2 = Histogram(height = 100, Array( - HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) + HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 50, ndv = 1))) // test bin trimming - val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 50, max = 75) - assert(t1 == HistogramBin(lo = 50, hi = 50, ndv = 1) && h1 == 2) - val (t2, h2) = trimBin(histogram2.bins(1), height = 100, min = 50, max = 75) - assert(t2 == HistogramBin(lo = 50, hi = 75, ndv = 20) && h2 == 50) + val (t0, h0) = trimBin(histogram1.bins(1), height = 300, min = 30, max = 50) + assert(t0 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h0 == 200) + val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 30, max = 50) + assert(t1 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h1 == 40) val expectedRanges = Seq( - OverlappedRange(50, 50, 1, 1, 300/10, 2), - OverlappedRange(50, 60, math.min(10, 20*10/25), math.max(10, 20*10/25), 300, 50*10/25), - OverlappedRange(60, 75, math.min(3, 20*15/25), math.max(3, 20*15/25), 300, 50*15/25) + OverlappedRange(30, 30, 1, 1, 300, 40/20), + OverlappedRange(30, 50, 20, 20, 200, 40), + OverlappedRange(50, 50, 1, 1, 200/20, 100) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, newMin = 50D, newMax = 75D))) + getOverlappedRanges(histogram1, histogram2, newMin = 30D, newMax = 50D))) estimateByHistogram( - histogram1 = histogram1, - histogram2 = histogram2, - expectedMin = 50D, - expectedMax = 75D, - // 1 + 8 + 3 - expectedNdv = 12L, - // 30*2/1 + 300*20/10 + 300*30/12 - expectedRows = 1410L) + leftHistogram = histogram1, + rightHistogram = histogram2, + expectedMin = 30D, + expectedMax = 50D, + // 1 + 20 + expectedNdv = 21L, + // 300*2/1 + 200*40/20 + 10*100/1 + expectedRows = 2000L) } - test("equi-height histograms: skip and trim bins by min/max") { - // This case can happen when estimating join after a filter. + test("equi-height histograms: skew distribution (both histograms have the same skewed value") { val histogram1 = Histogram(height = 300, Array( - HistogramBin(lo = 1, hi = 30, ndv = 10), - HistogramBin(lo = 30, hi = 60, ndv = 30), - HistogramBin(lo = 60, hi = 90, ndv = 6), - HistogramBin(lo = 90, hi = 200, ndv = 30))) - val histogram2 = Histogram(height = 100, Array( - HistogramBin(lo = -50, hi = 0, ndv = 50), - HistogramBin(lo = 0, hi = 50, ndv = 50), - HistogramBin(lo = 50, hi = 100, ndv = 40), - HistogramBin(lo = 100, hi = 150, ndv = 40))) - + HistogramBin(lo = 30, hi = 30, ndv = 1), HistogramBin(lo = 30, hi = 60, ndv = 30))) + val histogram2 = Histogram(height = 150, Array( + HistogramBin(lo = 0, hi = 30, ndv = 30), HistogramBin(lo = 30, hi = 30, ndv = 1))) // test bin trimming - val (t1, h1) = trimBin(histogram1.bins(1), height = 300, min = 50, max = 75) - assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 10) && h1 == 100) - val (t2, h2) = trimBin(histogram1.bins(2), height = 300, min = 50, max = 75) - assert(t2 == HistogramBin(lo = 60, hi = 75, ndv = 3) && h2 == 150) - val (t3, h3) = trimBin(histogram2.bins(1), height = 100, min = 50, max = 75) - assert(t3 == HistogramBin(lo = 50, hi = 50, ndv = 1) && h3 == 2) - val (t4, h4) = trimBin(histogram2.bins(2), height = 100, min = 50, max = 75) - assert(t4 == HistogramBin(lo = 50, hi = 75, ndv = 20) && h4 == 50) + val (t0, h0) = trimBin(histogram1.bins(1), height = 300, min = 30, max = 30) + assert(t0 == HistogramBin(lo = 30, hi = 30, ndv = 1) && h0 == 10) + val (t1, h1) = trimBin(histogram2.bins(0), height = 150, min = 30, max = 30) + assert(t1 == HistogramBin(lo = 30, hi = 30, ndv = 1) && h1 == 5) val expectedRanges = Seq( - // t1 overlaps t3 - OverlappedRange(50, 50, 1, 1, 100/10, 2), - // t1 overlaps t4 - OverlappedRange(50, 60, math.min(10, 20*10/25), math.max(10, 20*10/25), 100, 50*10/25), - // t2 overlaps t4 - OverlappedRange(60, 75, math.min(3, 20*15/25), math.max(3, 20*15/25), 150, 50*15/25) + OverlappedRange(30, 30, 1, 1, 300, 5), + OverlappedRange(30, 30, 1, 1, 300, 150), + OverlappedRange(30, 30, 1, 1, 10, 5), + OverlappedRange(30, 30, 1, 1, 10, 150) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, newMin = 50D, newMax = 75D))) + getOverlappedRanges(histogram1, histogram2, newMin = 30D, newMax = 30D))) estimateByHistogram( - histogram1 = histogram1, - histogram2 = histogram2, - expectedMin = 50D, - expectedMax = 75D, - // 1 + 8 + 3 - expectedNdv = 12L, - // 10*2/1 + 100*20/10 + 150*30/12 - expectedRows = 595L) + leftHistogram = histogram1, + rightHistogram = histogram2, + expectedMin = 30D, + expectedMax = 30D, + // only one value: 30 + expectedNdv = 1L, + // 300*5/1 + 300*150/1 + 10*5/1 + 10*150/1 + expectedRows = 48050L) } test("cross join") { From 2a4ee99526c654834f3a50ef66e674bda673f926 Mon Sep 17 00:00:00 2001 From: Zhenhua Wang Date: Sat, 16 Dec 2017 10:25:13 +0800 Subject: [PATCH 3/5] fix comments --- .../statsEstimation/EstimationUtils.scala | 59 ++++++++++--------- .../statsEstimation/JoinEstimation.scala | 4 +- .../statsEstimation/JoinEstimationSuite.scala | 30 +++++----- 3 files changed, 48 insertions(+), 45 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index 118f799f94769..da792105df9c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -214,24 +214,25 @@ object EstimationUtils { } /** - * Returns overlapped ranges between two histograms, in the given value range [newMin, newMax]. + * Returns overlapped ranges between two histograms, in the given value range + * [lowerBound, upperBound]. */ def getOverlappedRanges( leftHistogram: Histogram, rightHistogram: Histogram, - newMin: Double, - newMax: Double): Seq[OverlappedRange] = { + lowerBound: Double, + upperBound: Double): Seq[OverlappedRange] = { val overlappedRanges = new ArrayBuffer[OverlappedRange]() - // Only bins whose range intersect [newMin, newMax] have join possibility. + // Only bins whose range intersect [lowerBound, upperBound] have join possibility. val leftBins = leftHistogram.bins - .filter(b => b.lo <= newMax && b.hi >= newMin) + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) val rightBins = rightHistogram.bins - .filter(b => b.lo <= newMax && b.hi >= newMin) + .filter(b => b.lo <= upperBound && b.hi >= lowerBound) leftBins.foreach { lb => rightBins.foreach { rb => - val (left, leftHeight) = trimBin(lb, leftHistogram.height, newMin, newMax) - val (right, rightHeight) = trimBin(rb, rightHistogram.height, newMin, newMax) + val (left, leftHeight) = trimBin(lb, leftHistogram.height, lowerBound, upperBound) + val (right, rightHeight) = trimBin(rb, rightHistogram.height, lowerBound, upperBound) // Only collect overlapped ranges. if (left.lo <= right.hi && left.hi >= right.lo) { // Collect overlapped ranges. @@ -259,9 +260,7 @@ object EstimationUtils { // Case3: the left bin is "smaller" than the right bin // left.lo right.lo left.hi right.hi // --------+------------------+------------+----------------+-------> - val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) - val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) - if (leftRatio == 0) { + if (left.hi == right.lo) { // The overlapped range has only one value. OverlappedRange( lo = right.lo, @@ -272,6 +271,8 @@ object EstimationUtils { rightNumRows = rightHeight / right.ndv ) } else { + val leftRatio = (left.hi - right.lo) / (left.hi - left.lo) + val rightRatio = (left.hi - right.lo) / (right.hi - right.lo) OverlappedRange( lo = right.lo, hi = left.hi, @@ -285,9 +286,7 @@ object EstimationUtils { // Case4: the left bin is "larger" than the right bin // right.lo left.lo right.hi left.hi // --------+------------------+------------+----------------+-------> - val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) - val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) - if (leftRatio == 0) { + if (right.hi == left.lo) { // The overlapped range has only one value. OverlappedRange( lo = right.hi, @@ -298,6 +297,8 @@ object EstimationUtils { rightNumRows = rightHeight / right.ndv ) } else { + val leftRatio = (right.hi - left.lo) / (left.hi - left.lo) + val rightRatio = (right.hi - left.lo) / (right.hi - right.lo) OverlappedRange( lo = left.lo, hi = right.hi, @@ -343,24 +344,26 @@ object EstimationUtils { } /** - * Given an original bin and a value range [min, max], returns the trimmed bin and its number of - * rows. + * Given an original bin and a value range [lowerBound, upperBound], returns the trimmed part + * of the bin in that range and its number of rows. */ - def trimBin(bin: HistogramBin, height: Double, min: Double, max: Double) + def trimBin(bin: HistogramBin, height: Double, lowerBound: Double, upperBound: Double) : (HistogramBin, Double) = { - val (lo, hi) = if (bin.lo <= min && bin.hi >= max) { - // bin.lo min max bin.hi + val (lo, hi) = if (bin.lo <= lowerBound && bin.hi >= upperBound) { + // bin.lo lowerBound upperBound bin.hi + // --------+------------------+------------+-------------+-------> + (lowerBound, upperBound) + } else if (bin.lo <= lowerBound && bin.hi >= lowerBound) { + // bin.lo lowerBound bin.hi upperBound // --------+------------------+------------+-------------+-------> - (min, max) - } else if (bin.lo <= min && bin.hi >= min) { - // bin.lo min bin.hi - // --------+------------------+-----------+-------> - (min, bin.hi) - } else if (bin.lo <= max && bin.hi >= max) { - // bin.lo max bin.hi - // --------+------------------+-----------+-------> - (bin.lo, max) + (lowerBound, bin.hi) + } else if (bin.lo <= upperBound && bin.hi >= upperBound) { + // lowerBound bin.lo upperBound bin.hi + // --------+------------------+------------+-------------+-------> + (bin.lo, upperBound) } else { + // lowerBound bin.lo bin.hi upperBound + // --------+------------------+------------+-------------+-------> (bin.lo, bin.hi) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala index 3bc85e2200730..f02c93ef42729 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala @@ -245,8 +245,8 @@ case class JoinEstimation(join: Join) extends Logging { leftHistogram = leftHistogram, rightHistogram = rightHistogram, // Only numeric values have equi-height histograms. - newMin = newMin.get.toString.toDouble, - newMax = newMax.get.toString.toDouble) + lowerBound = newMin.get.toString.toDouble, + upperBound = newMax.get.toString.toDouble) var card: BigDecimal = 0 var totalNdv: Double = 0 diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala index 49aa8fda23663..7fe2376663633 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala @@ -129,9 +129,9 @@ class JoinEstimationSuite extends StatsEstimationTestBase { val histogram2 = Histogram(height = 100, Array( HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) // test bin trimming - val (t0, h0) = trimBin(histogram2.bins(0), height = 100, min = 10, max = 60) + val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 10, upperBound = 60) assert(t0 == HistogramBin(lo = 10, hi = 50, ndv = 40) && h0 == 80) - val (t1, h1) = trimBin(histogram2.bins(1), height = 100, min = 10, max = 60) + val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 10, upperBound = 60) assert(t1 == HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) val expectedRanges = Seq( @@ -143,7 +143,7 @@ class JoinEstimationSuite extends StatsEstimationTestBase { OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, newMin = 10D, newMax = 60D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D))) estimateByHistogram( leftHistogram = histogram1, @@ -162,9 +162,9 @@ class JoinEstimationSuite extends StatsEstimationTestBase { val histogram2 = Histogram(height = 100, Array( HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) // test bin trimming - val (t0, h0) = trimBin(histogram2.bins(0), height = 100, min = 50, max = 75) + val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 50, upperBound = 75) assert(t0 == HistogramBin(lo = 50, hi = 50, ndv = 1) && h0 == 2) - val (t1, h1) = trimBin(histogram2.bins(1), height = 100, min = 50, max = 75) + val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 50, upperBound = 75) assert(t1 == HistogramBin(lo = 50, hi = 75, ndv = 20) && h1 == 50) val expectedRanges = Seq( @@ -176,7 +176,7 @@ class JoinEstimationSuite extends StatsEstimationTestBase { OverlappedRange(60, 75, 3, 20*15/25, 300, 50*15/25) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, newMin = 50D, newMax = 75D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 50D, upperBound = 75D))) estimateByHistogram( leftHistogram = histogram1, @@ -197,9 +197,9 @@ class JoinEstimationSuite extends StatsEstimationTestBase { val histogram2 = Histogram(height = 100, Array( HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 100, ndv = 40))) // test bin trimming - val (t0, h0) = trimBin(histogram2.bins(0), height = 100, min = 30, max = 60) + val (t0, h0) = trimBin(histogram2.bins(0), height = 100, lowerBound = 30, upperBound = 60) assert(t0 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h0 == 40) - val (t1, h1) = trimBin(histogram2.bins(1), height = 100, min = 30, max = 60) + val (t1, h1) = trimBin(histogram2.bins(1), height = 100, lowerBound = 30, upperBound = 60) assert(t1 ==HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) val expectedRanges = Seq( @@ -209,7 +209,7 @@ class JoinEstimationSuite extends StatsEstimationTestBase { OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, newMin = 30D, newMax = 60D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 30D, upperBound = 60D))) estimateByHistogram( leftHistogram = histogram1, @@ -228,9 +228,9 @@ class JoinEstimationSuite extends StatsEstimationTestBase { val histogram2 = Histogram(height = 100, Array( HistogramBin(lo = 0, hi = 50, ndv = 50), HistogramBin(lo = 50, hi = 50, ndv = 1))) // test bin trimming - val (t0, h0) = trimBin(histogram1.bins(1), height = 300, min = 30, max = 50) + val (t0, h0) = trimBin(histogram1.bins(1), height = 300, lowerBound = 30, upperBound = 50) assert(t0 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h0 == 200) - val (t1, h1) = trimBin(histogram2.bins(0), height = 100, min = 30, max = 50) + val (t1, h1) = trimBin(histogram2.bins(0), height = 100, lowerBound = 30, upperBound = 50) assert(t1 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h1 == 40) val expectedRanges = Seq( @@ -239,7 +239,7 @@ class JoinEstimationSuite extends StatsEstimationTestBase { OverlappedRange(50, 50, 1, 1, 200/20, 100) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, newMin = 30D, newMax = 50D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 30D, upperBound = 50D))) estimateByHistogram( leftHistogram = histogram1, @@ -258,9 +258,9 @@ class JoinEstimationSuite extends StatsEstimationTestBase { val histogram2 = Histogram(height = 150, Array( HistogramBin(lo = 0, hi = 30, ndv = 30), HistogramBin(lo = 30, hi = 30, ndv = 1))) // test bin trimming - val (t0, h0) = trimBin(histogram1.bins(1), height = 300, min = 30, max = 30) + val (t0, h0) = trimBin(histogram1.bins(1), height = 300, lowerBound = 30, upperBound = 30) assert(t0 == HistogramBin(lo = 30, hi = 30, ndv = 1) && h0 == 10) - val (t1, h1) = trimBin(histogram2.bins(0), height = 150, min = 30, max = 30) + val (t1, h1) = trimBin(histogram2.bins(0), height = 150, lowerBound = 30, upperBound = 30) assert(t1 == HistogramBin(lo = 30, hi = 30, ndv = 1) && h1 == 5) val expectedRanges = Seq( @@ -270,7 +270,7 @@ class JoinEstimationSuite extends StatsEstimationTestBase { OverlappedRange(30, 30, 1, 1, 10, 150) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, newMin = 30D, newMax = 30D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 30D, upperBound = 30D))) estimateByHistogram( leftHistogram = histogram1, From 263742914e21ba607904acb0ad35ced32aad48ab Mon Sep 17 00:00:00 2001 From: Zhenhua Wang Date: Sat, 16 Dec 2017 10:52:27 +0800 Subject: [PATCH 4/5] add comment --- .../plans/logical/statsEstimation/JoinEstimation.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala index f02c93ef42729..1e2f67e36867b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala @@ -198,6 +198,9 @@ case class JoinEstimation(join: Join) extends Logging { computeByNdv(leftKey, rightKey, newMin, newMax) } keyStatsAfterJoin += ( + // Histograms are propagated as unchanged. During future estimation, they should be + // truncated by the updated max/min. In this way, only pointers of the histograms are + // propagated and thus reduce memory consumption. leftKey -> joinStat.copy(histogram = leftKeyStat.histogram), rightKey -> joinStat.copy(histogram = rightKeyStat.histogram) ) From 16797d2d02565616cf24e4509e43d3233c7a4714 Mon Sep 17 00:00:00 2001 From: Zhenhua Wang Date: Tue, 19 Dec 2017 18:47:59 +0800 Subject: [PATCH 5/5] fix more comments --- .../statsEstimation/EstimationUtils.scala | 42 ++++------ .../statsEstimation/JoinEstimation.scala | 4 +- .../statsEstimation/JoinEstimationSuite.scala | 83 +++++++++---------- 3 files changed, 53 insertions(+), 76 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index da792105df9c9..71e852afe0659 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -236,28 +236,8 @@ object EstimationUtils { // Only collect overlapped ranges. if (left.lo <= right.hi && left.hi >= right.lo) { // Collect overlapped ranges. - val range = if (left.lo == left.hi) { - // Case1: the left bin has only one value - OverlappedRange( - lo = left.lo, - hi = left.lo, - leftNdv = 1, - rightNdv = 1, - leftNumRows = leftHeight, - rightNumRows = rightHeight / right.ndv - ) - } else if (right.lo == right.hi) { - // Case2: the right bin has only one value - OverlappedRange( - lo = right.lo, - hi = right.lo, - leftNdv = 1, - rightNdv = 1, - leftNumRows = leftHeight / left.ndv, - rightNumRows = rightHeight - ) - } else if (right.lo >= left.lo && right.hi >= left.hi) { - // Case3: the left bin is "smaller" than the right bin + val range = if (right.lo >= left.lo && right.hi >= left.hi) { + // Case1: the left bin is "smaller" than the right bin // left.lo right.lo left.hi right.hi // --------+------------------+------------+----------------+-------> if (left.hi == right.lo) { @@ -283,7 +263,7 @@ object EstimationUtils { ) } } else if (right.lo <= left.lo && right.hi <= left.hi) { - // Case4: the left bin is "larger" than the right bin + // Case2: the left bin is "larger" than the right bin // right.lo left.lo right.hi left.hi // --------+------------------+------------+----------------+-------> if (right.hi == left.lo) { @@ -309,7 +289,7 @@ object EstimationUtils { ) } } else if (right.lo >= left.lo && right.hi <= left.hi) { - // Case5: the left bin contains the right bin + // Case3: the left bin contains the right bin // left.lo right.lo right.hi left.hi // --------+------------------+------------+----------------+-------> val leftRatio = (right.hi - right.lo) / (left.hi - left.lo) @@ -323,7 +303,7 @@ object EstimationUtils { ) } else { assert(right.lo <= left.lo && right.hi >= left.hi) - // Case6: the right bin contains the left bin + // Case4: the right bin contains the left bin // right.lo left.lo left.hi right.hi // --------+------------------+------------+----------------+-------> val rightRatio = (left.hi - left.lo) / (right.hi - right.lo) @@ -346,6 +326,11 @@ object EstimationUtils { /** * Given an original bin and a value range [lowerBound, upperBound], returns the trimmed part * of the bin in that range and its number of rows. + * @param bin the input histogram bin. + * @param height the number of rows of the given histogram bin inside an equi-height histogram. + * @param lowerBound lower bound of the given range. + * @param upperBound upper bound of the given range. + * @return trimmed part of the given bin and its number of rows. */ def trimBin(bin: HistogramBin, height: Double, lowerBound: Double, upperBound: Double) : (HistogramBin, Double) = { @@ -364,14 +349,15 @@ object EstimationUtils { } else { // lowerBound bin.lo bin.hi upperBound // --------+------------------+------------+-------------+-------> + assert(bin.lo >= lowerBound && bin.hi <= upperBound) (bin.lo, bin.hi) } - if (bin.hi == bin.lo) { - (bin, height) - } else if (hi == lo) { + if (hi == lo) { + // Note that bin.hi == bin.lo also falls into this branch. (HistogramBin(lo, hi, 1), height / bin.ndv) } else { + assert(bin.hi != bin.lo) val ratio = (hi - lo) / (bin.hi - bin.lo) (HistogramBin(lo, hi, math.ceil(bin.ndv * ratio).toLong), height * ratio) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala index 1e2f67e36867b..f0294a4246703 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala @@ -193,7 +193,7 @@ case class JoinEstimation(join: Join) extends Logging { val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType) val (card, joinStat) = (leftKeyStat.histogram, rightKeyStat.histogram) match { case (Some(l: Histogram), Some(r: Histogram)) => - computeByEquiHeightHistogram(leftKey, rightKey, l, r, newMin, newMax) + computeByHistogram(leftKey, rightKey, l, r, newMin, newMax) case _ => computeByNdv(leftKey, rightKey, newMin, newMax) } @@ -237,7 +237,7 @@ case class JoinEstimation(join: Join) extends Logging { } /** Compute join cardinality using equi-height histograms. */ - private def computeByEquiHeightHistogram( + private def computeByHistogram( leftKey: AttributeReference, rightKey: AttributeReference, leftHistogram: Histogram, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala index 7fe2376663633..26139d85d25fb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala @@ -136,24 +136,22 @@ class JoinEstimationSuite extends StatsEstimationTestBase { val expectedRanges = Seq( // histogram1.bins(0) overlaps t0 - OverlappedRange(10, 30, 10, 40*1/2, 300, 80*1/2), + OverlappedRange(10, 30, 10, 40 * 1 / 2, 300, 80 * 1 / 2), // histogram1.bins(1) overlaps t0 - OverlappedRange(30, 50, 30*2/3, 40*1/2, 300*2/3, 80*1/2), + OverlappedRange(30, 50, 30 * 2 / 3, 40 * 1 / 2, 300 * 2 / 3, 80 * 1 / 2), // histogram1.bins(1) overlaps t1 - OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) + OverlappedRange(50, 60, 30 * 1 / 3, 8, 300 * 1 / 3, 20) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, lowerBound = 10D, upperBound = 60D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 10, upperBound = 60))) estimateByHistogram( leftHistogram = histogram1, rightHistogram = histogram2, - expectedMin = 10D, - expectedMax = 60D, - // 10 + 20 + 8 - expectedNdv = 38L, - // 300*40/20 + 200*40/20 + 100*20/10 - expectedRows = 1200L) + expectedMin = 10, + expectedMax = 60, + expectedNdv = 10 + 20 + 8, + expectedRows = 300 * 40 / 20 + 200 * 40 / 20 + 100 * 20 / 10) } test("equi-height histograms: a bin has only one value after trimming") { @@ -169,24 +167,22 @@ class JoinEstimationSuite extends StatsEstimationTestBase { val expectedRanges = Seq( // histogram1.bins(0) overlaps t0 - OverlappedRange(50, 50, 1, 1, 300/10, 2), + OverlappedRange(50, 50, 1, 1, 300 / 10, 2), // histogram1.bins(0) overlaps t1 - OverlappedRange(50, 60, 10, 20*10/25, 300, 50*10/25), + OverlappedRange(50, 60, 10, 20 * 10 / 25, 300, 50 * 10 / 25), // histogram1.bins(1) overlaps t1 - OverlappedRange(60, 75, 3, 20*15/25, 300, 50*15/25) + OverlappedRange(60, 75, 3, 20 * 15 / 25, 300, 50 * 15 / 25) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, lowerBound = 50D, upperBound = 75D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 50, upperBound = 75))) estimateByHistogram( leftHistogram = histogram1, rightHistogram = histogram2, - expectedMin = 50D, - expectedMax = 75D, - // 1 + 8 + 3 - expectedNdv = 12L, - // 30*2/1 + 300*20/10 + 300*30/12 - expectedRows = 1410L) + expectedMin = 50, + expectedMax = 75, + expectedNdv = 1 + 8 + 3, + expectedRows = 30 * 2 / 1 + 300 * 20 / 10 + 300 * 30 / 12) } test("equi-height histograms: skew distribution (some bins have only one value)") { @@ -203,23 +199,21 @@ class JoinEstimationSuite extends StatsEstimationTestBase { assert(t1 ==HistogramBin(lo = 50, hi = 60, ndv = 8) && h1 == 20) val expectedRanges = Seq( - OverlappedRange(30, 30, 1, 1, 300, 40/20), - OverlappedRange(30, 30, 1, 1, 300, 40/20), - OverlappedRange(30, 50, 30*2/3, 20, 300*2/3, 40), - OverlappedRange(50, 60, 30*1/3, 8, 300*1/3, 20) + OverlappedRange(30, 30, 1, 1, 300, 40 / 20), + OverlappedRange(30, 30, 1, 1, 300, 40 / 20), + OverlappedRange(30, 50, 30 * 2 / 3, 20, 300 * 2 / 3, 40), + OverlappedRange(50, 60, 30 * 1 / 3, 8, 300 * 1 / 3, 20) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, lowerBound = 30D, upperBound = 60D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 30, upperBound = 60))) estimateByHistogram( leftHistogram = histogram1, rightHistogram = histogram2, - expectedMin = 30D, - expectedMax = 60D, - // 1 + 20 + 8 - expectedNdv = 29L, - // 300*2/1 + 300*2/1 + 200*40/20 + 100*20/10 - expectedRows = 1800L) + expectedMin = 30, + expectedMax = 60, + expectedNdv = 1 + 20 + 8, + expectedRows = 300 * 2 / 1 + 300 * 2 / 1 + 200 * 40 / 20 + 100 * 20 / 10) } test("equi-height histograms: skew distribution (histograms have different skewed values") { @@ -234,22 +228,20 @@ class JoinEstimationSuite extends StatsEstimationTestBase { assert(t1 == HistogramBin(lo = 30, hi = 50, ndv = 20) && h1 == 40) val expectedRanges = Seq( - OverlappedRange(30, 30, 1, 1, 300, 40/20), + OverlappedRange(30, 30, 1, 1, 300, 40 / 20), OverlappedRange(30, 50, 20, 20, 200, 40), - OverlappedRange(50, 50, 1, 1, 200/20, 100) + OverlappedRange(50, 50, 1, 1, 200 / 20, 100) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, lowerBound = 30D, upperBound = 50D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 30, upperBound = 50))) estimateByHistogram( leftHistogram = histogram1, rightHistogram = histogram2, - expectedMin = 30D, - expectedMax = 50D, - // 1 + 20 - expectedNdv = 21L, - // 300*2/1 + 200*40/20 + 10*100/1 - expectedRows = 2000L) + expectedMin = 30, + expectedMax = 50, + expectedNdv = 1 + 20, + expectedRows = 300 * 2 / 1 + 200 * 40 / 20 + 10 * 100 / 1) } test("equi-height histograms: skew distribution (both histograms have the same skewed value") { @@ -270,17 +262,16 @@ class JoinEstimationSuite extends StatsEstimationTestBase { OverlappedRange(30, 30, 1, 1, 10, 150) ) assert(expectedRanges.equals( - getOverlappedRanges(histogram1, histogram2, lowerBound = 30D, upperBound = 30D))) + getOverlappedRanges(histogram1, histogram2, lowerBound = 30, upperBound = 30))) estimateByHistogram( leftHistogram = histogram1, rightHistogram = histogram2, - expectedMin = 30D, - expectedMax = 30D, + expectedMin = 30, + expectedMax = 30, // only one value: 30 - expectedNdv = 1L, - // 300*5/1 + 300*150/1 + 10*5/1 + 10*150/1 - expectedRows = 48050L) + expectedNdv = 1, + expectedRows = 300 * 5 / 1 + 300 * 150 / 1 + 10 * 5 / 1 + 10 * 150 / 1) } test("cross join") {