Conversation

wzhfy (Contributor) commented Dec 9, 2016

What changes were proposed in this pull request?

Support cardinality estimation and stats propagation for all join types.

Limitations:

  • For inner/outer joins without any equality condition, we estimate them as a cartesian product.
  • For left semi/anti joins, since we can't apply the inner-join heuristics to them, for now we just propagate the statistics from the left side. We should support them once more advanced stats (e.g. histograms) are available in Spark.

How was this patch tested?

Add a new test suite.

wzhfy (Contributor Author) commented Dec 9, 2016

cc @rxin @srinathshankar @cloud-fan

SparkQA commented Dec 9, 2016

Test build #69907 has started for PR 16228 at commit f0bbb43.

wzhfy (Contributor Author) commented Dec 9, 2016

I've left two issues undecided:

  1. Where can we turn CBO estimation on/off? I think we need such a switch, because otherwise we will use statistics whenever they exist in the metastore, even though they may be stale.
  2. Currently we use the column name as the key for column statistics, which is problematic: if the output of a join has columns from different tables with the same column name, they can't be distinguished. Can we use a combination string like table name + column name?

Tagar commented Dec 9, 2016

  1. That is great. Would it be easier to use an FK when it is available (HMS has FKs since Hive 2.1: https://issues.apache.org/jira/browse/HIVE-13076), and fall back to stats when no FK is defined between the columns?

Also, do I understand correctly that the assumption is: if two tables are joined by columns with the same name, the join columns have the same stats / set of values?

* formula:
* T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
* However, the denominator can become very large and excessively reduce the result, so we use a
* conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
wzhfy (Contributor Author) commented Dec 9, 2016

Here, Hive uses an exponential decay to compute the denominator when the number of join keys > the number of join tables, i.e. ndv1 * ndv2^(1/2) * ndv3^(1/4)... I just use a more conservative strategy: max(ndv1, ndv2, ...). I'm not sure which one is better. Do you know of any theoretical or empirical support for Hive's strategy? @rxin @srinathshankar
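A minimal sketch contrasting the two denominator strategies under discussion (this is not the PR's code; it assumes the per-key NDVs passed in are already the pairwise max(V(A.ki), V(B.ki)) values, and that Hive applies the decay to NDVs in descending order):

```scala
object JoinDenominator {
  // Hive-style exponential decay: ndv1 * ndv2^(1/2) * ndv3^(1/4) * ...
  def exponentialDecay(ndvs: Seq[Double]): Double =
    ndvs.sortBy(-_).zipWithIndex.map { case (ndv, i) =>
      math.pow(ndv, 1.0 / (1L << i))
    }.product

  // The conservative strategy used in this PR: only the largest NDV.
  def conservativeMax(ndvs: Seq[Double]): Double = ndvs.max

  def main(args: Array[String]): Unit = {
    val ndvs = Seq(1000.0, 100.0, 10.0)
    println(exponentialDecay(ndvs)) // 1000 * 100^(1/2) * 10^(1/4) ≈ 17783
    println(conservativeMax(ndvs))  // 1000.0
  }
}
```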

Contributor

They probably estimate the number of distinct values in a vector of columns assuming a uniform distribution.

SparkQA commented Dec 9, 2016

Test build #69911 has started for PR 16228 at commit 64603b5.

joinType match {
case LeftAnti | LeftSemi =>
// LeftSemi and LeftAnti won't ever be bigger than left
left.statistics.copy()
Contributor

why do we need copy here?

Contributor

Statistics is immutable, so I think it's safe without copy.

Contributor Author

ok

rightKeys: Seq[Expression]): Seq[(AttributeReference, AttributeReference)] = {
leftKeys.zip(rightKeys).flatMap {
case (ExtractAttr(left), ExtractAttr(right)) => Some((left, right))
// Currently we don't deal with equal joins like key1 = key2 + 5.
Contributor

If we are not ready for expressions, I think we should not handle Cast either, as it may be tricky to handle overflow correctly (e.g. casting long to int).

Contributor Author

Yes, but I'm a little worried about losing estimation opportunities because of this rare case (in my understanding, this kind of downgrading cast is not common).

Contributor Author

Can we define a new parent class for such downgrading casts? Would this change be big?

Contributor

I don't think we should add a new expression just for the current implementation's limitations. BTW, handling only Cast may also lose a lot of estimation opportunities; we should support expressions before we release this feature.

wzhfy (Contributor Author) commented Dec 10, 2016

@Tagar Thanks for sharing this information. Yes, it would be better to use PK/FK, but that won't be done in this PR; we need to implement PK/FK constraints in Spark first.

the assumption is: if two tables are joined by columns with the same name, the join columns have the same stats / set of values?

It is true for inner join, but not true for outer joins, right?

Tagar commented Dec 11, 2016

@wzhfy, thanks for the feedback.
For outer joins, the cardinality estimates should be (see the sketch after this list):

  • left_outer_join_cardinality(table_A, table_B) = MAX(cardinality(A), inner_join_cardinality(table_A, table_B))
    So with a left outer join, you'll get a join cardinality of at least the number of rows on the left (table A);
  • right outer join is similar to left outer join (with exception s/A/B/g);
  • full_outer_join_cardinality(table_A, table_B) = cardinality(A) + cardinality(B) - inner_join_cardinality(table_A, table_B)
    Does this sound about right?
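A minimal sketch of these bounds (hypothetical helper names; innerCard stands for the NDV-based equi-join estimate discussed above):

```scala
// Left/right outer joins: at least the preserved side's row count.
def leftOuterCard(cardA: BigInt, innerCard: BigInt): BigInt = cardA.max(innerCard)
def rightOuterCard(cardB: BigInt, innerCard: BigInt): BigInt = cardB.max(innerCard)

// Full outer join via inclusion-exclusion: |A| + |B| - |A inner-join B|.
def fullOuterCard(cardA: BigInt, cardB: BigInt, innerCard: BigInt): BigInt =
  cardA + cardB - innerCard
```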

* T(A IJ B) = T(A) * T(B) / (max(V(A.k1), V(B.k1)) * max(V(A.k2), V(B.k2)) * ... * max(V(A.kn), V(B.kn)))
* However, the denominator can become very large and excessively reduce the result, so we use a
* conservative strategy to take only the largest max(V(A.ki), V(B.ki)) as the denominator.
*
Contributor

Can we also include a short description of how column stats are computed after the join?

Contributor Author

Yes, of course

wzhfy (Contributor Author) commented Dec 15, 2016

@Tagar For full outer join, how about cardinality = MAX(card(A) + card(B), innerCard(AB))?

isBroadcastable = false))

case _ => None
}
Contributor

I have a design comment. Given a join, this function computes predicate selectivity, plan cardinality, and column statistics. I wonder if it would make sense to encapsulate predicate-selectivity computation in its own function, i.e. selectivity is a property of a predicate, while cardinality (the number of rows) is a property of the data stream. Also, there might be different ways to compute the selectivity of a predicate (e.g. uniform vs. non-uniform distribution), so it might make sense to separate the computation of the two properties. Then, in the future, a selectivity hint could be used to override the default CBO selectivity computation.
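A hypothetical sketch of that separation (Stats and all names below are illustrative stand-ins, not the PR's types): selectivity comes from a pluggable strategy, and cardinality is derived from it afterwards.

```scala
final case class Stats(rowCount: BigInt, ndv: Map[String, BigInt])

trait SelectivityEstimator {
  def selectivity(joinKeys: Seq[(String, String)], left: Stats, right: Stats): Double
}

// Default uniform-distribution strategy: 1 / max(V(A.ki), V(B.ki)),
// keeping only the largest denominator (the conservative choice above).
object UniformSelectivity extends SelectivityEstimator {
  def selectivity(joinKeys: Seq[(String, String)], left: Stats, right: Stats): Double = {
    val maxNdv = joinKeys.map { case (lk, rk) => left.ndv(lk).max(right.ndv(rk)) }.max
    1.0 / maxNdv.toDouble
  }
}

// Cardinality is a separate property of the data stream.
def joinCardinality(left: Stats, right: Stats, sel: Double): BigInt =
  (BigDecimal(left.rowCount * right.rowCount) * BigDecimal(sel))
    .setScale(0, BigDecimal.RoundingMode.CEILING).toBigInt
```

A selectivity hint could then swap in a different SelectivityEstimator without touching the cardinality logic.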

Contributor Author

Thanks for the advice. We will consider adding such a function when we check in the code for Filter estimation; then we can look at this more comprehensively. I think that PR will be submitted soon.

Tagar commented Dec 15, 2016

@wzhfy, it's easier to check the validity of these kinds of formulas when you look at extreme cases.
Your formula for full outer join cardinality,

cardinality = MAX(card(A) + card(B), innerCard(AB))

in the extreme case where set(A) and set(B) are the same set, would give a calculated cardinality 2 times the actual cardinality.

While

full_outer_join_cardinality(table_A, table_B) = cardinality(A) + cardinality(B) - inner_join_cardinality(table_A, table_B)

will produce the correct result.

ps. I find this visualization http://www.radacad.com/wp-content/uploads/2015/07/joins.jpg very helpful.
https://en.wikipedia.org/wiki/Inclusion%E2%80%93exclusion_principle: |A ∪ B| = |A| + |B| - |A ∩ B|

Hope this helps. Thanks!
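A worked instance of this extreme case (hypothetical numbers): A and B are identical 100-row tables joined on a unique key, so the inner join also has 100 rows.

```scala
val cardA, cardB, innerCard = BigInt(100)
val inclusionExclusion = cardA + cardB - innerCard // 100: the true full outer size
val maxFormula = (cardA + cardB).max(innerCard)    // 200: twice the actual cardinality
```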

wzhfy (Contributor Author) commented Dec 15, 2016

@Tagar We can always find extreme cases to which these formulas don't apply. In my opinion, it's better to over-estimate than to under-estimate, since under-estimation can lead to OOM problems, e.g. broadcasting a very large result.

If A is a big table and B is a small one, and every A.k has a match in B (a common case for PK and FK), then

cardinality(A) + cardinality(B) - inner_join_cardinality(table_A, table_B)

becomes card(B), which is dramatically smaller than the real outer join cardinality (a worked example follows below). Worse, it can even be negative: if all A.k and B.k have the same value, the inner join becomes a cartesian product.

This formula,

cardinality = MAX(card(A) + card(B), innerCard(AB))

although it sometimes over-estimates, is still obviously better than the original one in Spark: card(A) * card(B).
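The worked PK/FK case (hypothetical numbers), where every row of A matches exactly one row of B:

```scala
val cardA = BigInt(1000000) // big fact table
val cardB = BigInt(1000)    // small dimension table
val innerCard = cardA       // one match per A row, so the inner join has card(A) rows
val inclusionExclusion = cardA + cardB - innerCard // 1000 = card(B): far below the real size
val maxFormula = (cardA + cardB).max(innerCard)    // 1001000: close to the real size
```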

Tagar commented Dec 15, 2016

@wzhfy, I think overestimating cardinality could be as bad as underestimating it.
For example, the optimizer could prematurely switch to SortMergeJoin when it could have used a broadcast hash join.
But I agree, this PR is a great improvement over the current cardinality estimates.

wzhfy (Contributor Author) commented Dec 17, 2016

To solve the two issues I mentioned above, I've sent a separate PR here.
We'll need to rebase this PR after that one is resolved.

@wzhfy wzhfy changed the title [WIP] [SPARK-17076] [SQL] Cardinality estimation for join based on basic column statistics [SPARK-17076] [SQL] Cardinality estimation for join based on basic column statistics Dec 23, 2016
SparkQA commented Dec 23, 2016

Test build #70533 has finished for PR 16228 at commit c3e3a48.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class LeftSemiAntiEstimation(join: Join)

Contributor Author

@Tagar I took your advice about the full outer join, but with a small change: lower-bounding it by innerRows.
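A minimal sketch of the resulting full-outer estimate as described (variable names are illustrative; the exact code is in the PR diff):

```scala
// Inclusion-exclusion, lower-bounded by the inner-join cardinality.
def fullOuterRows(leftRows: BigInt, rightRows: BigInt, innerRows: BigInt): BigInt =
  (leftRows + rightRows - innerRows).max(innerRows)
```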

SparkQA commented Dec 23, 2016

Test build #70544 has finished for PR 16228 at commit 2c9d6c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 26, 2016

Test build #70589 has finished for PR 16228 at commit de63b59.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Dec 29, 2016

Test build #70708 has started for PR 16228 at commit df839f8.

wzhfy (Contributor Author) commented Dec 29, 2016

retest this please

SparkQA commented Dec 29, 2016

Test build #70717 has finished for PR 16228 at commit df839f8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 13, 2017

Test build #71307 has finished for PR 16228 at commit ffb9eee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wzhfy (Contributor Author) commented Jan 13, 2017

This PR is rebased and ready for review. @rxin

// 2. Estimate the number of output rows
val leftRows = leftStats.rowCount.get
val rightRows = rightStats.rowCount.get
val innerRows = ceil(BigDecimal(leftRows * rightRows) * selectivity)
Contributor

nit: joinedRows

}

// 3. Update statistics based on the output of join
val intersectedStats = if (selectivity == 0) {
Contributor

What's the difference between selectivity == 0 and outputRows == 0? Does it only matter for outer joins?

Contributor

For outer joins, if the selectivity is 0, then the number of output rows is the same as the number of left/right side rows, and the column stats should also be the same as the left/right side columns, while the other side's columns are all null.

Contributor

let's name it joinKeyStats

Contributor Author

yea good point, thanks

leftKeys: Seq[Expression],
rightKeys: Seq[Expression]): Seq[(AttributeReference, AttributeReference)] = {
leftKeys.zip(rightKeys).flatMap {
case (lk: AttributeReference, rk: AttributeReference) => Some((lk, rk))
Contributor

we can check column stats existence here, so that we don't need to do columnStatsExist((leftStats, leftKey), (rightStats, rightKey)) again and again later.

protected val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, cboEnabled = true)

def getColSize(attribute: Attribute, colStat: ColumnStat): Long = attribute.dataType match {
case StringType => colStat.avgLen + 8 + 4
Contributor

explain the + 8 + 4?
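For context, a hedged guess rather than anything stated in this PR: the + 8 + 4 most likely accounts for the fixed per-value overhead of a string in Spark's internal row format, roughly an 8-byte offset/pointer word plus a 4-byte length field, added on top of the average payload length avgLen.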

def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range = dataType match {
case StringType | BinaryType => new DefaultRange()
case _ if min.isEmpty || max.isEmpty => new NullRange()
case _ => toNumericRange(min.get, max.get, dataType)
Contributor

This doesn't work for the empty column stats you defined in https://github.com/apache/spark/pull/16228/files#diff-6387e7aaeb7d8e0cb1457b9d0fe5cd00R270

That's why I was worried about the empty stats: they break some assumptions, like numeric-type stats must have max/min.

Contributor Author

OK, let's remove empty column stats. When we know rowCount = 0, we can derive that the column stats are empty; we don't need to keep them. What do you think?

Contributor

SGTM


/** Set up tables and their columns for testing */
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
attr("key11") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), nullCount = 0,
Contributor

How about key-1-5, key-5-9, etc.? Then we can tell the key's value range directly from the name.

// table2 (key21 int, key22 int): (1, 2), (2, 3), (2, 4)
// key12 and key22 are disjoint
val join = Join(table1, table2, Inner, Some(
And(EqualTo(nameToAttr("key11"), nameToAttr("key21")),
Contributor

Actually we can just use key12 = key22, so that it's more distinct from the inner join test with multiple equi-join keys.

wzhfy (Contributor Author) commented Feb 14, 2017

This PR is updated, please review. @cloud-fan

val minNdv = leftKeyStats.distinctCount.min(rightKeyStats.distinctCount)
val (newMin1, newMax1, newMin2, newMax2) =
Range.intersect(lRange, rRange, leftKey.dataType, rightKey.dataType)
intersectedStats.put(leftKey, intersectedColumnStat(leftKeyStats, minNdv,
Contributor

Logically the join keys should have the same column stats; we can write it more explicitly:

assert(leftKey.dataType.sameType(rightKey.dataType))
val stats = ColumnStat(minNdv, newMin, newMax, nullCount = 0) // plus some more logic to update the avg/max length.
intersectedStats.put(leftKey, stats)
intersectedStats.put(rightKey, stats)

r1: Range,
r2: Range,
dt1: DataType,
dt2: DataType): (Option[Any], Option[Any], Option[Any], Option[Any]) = {
Contributor

We can simplify this: we only calculate intersections for same-type ranges.

updateAttrStats(outputRows, fromLeft, inputAttrStats, joinKeyStats) ++
fromRight.map(a => (a, inputAttrStats(a)))
case FullOuter =>
attributesWithStat.map(a => (a, inputAttrStats(a)))
Contributor

this is just inputAttrStats right?


/**
* Propagate or update column stats for output attributes.
* 1. For empty output, we don't need to keep any column stats.
Contributor

when we hit this method, the outputRows will never be 0 right?

if (joinKeyStats.contains(a)) {
outputAttrStats += a -> joinKeyStats(a)
} else {
val oldCS = oldAttrStats(a)
Contributor

nit: oldColumnStats

val inputAttrStats = AttributeMap(
leftStats.attributeStats.toSeq ++ rightStats.attributeStats.toSeq)
// Propagate the original column stats
val outputAttrStats = getOutputMap(inputAttrStats, join.output)
Contributor

is it just inputAttrStats?

if (rowCountsExist(conf, join.left)) {
val leftStats = join.left.stats(conf)
// Propagate the original column stats for cartesian product
val outputAttrStats = getOutputMap(leftStats.attributeStats, join.output)
Contributor

is it just leftStats.attributeStats?

)

/** Columns in a table with two rows */
val columnInfo2 = mutable.LinkedHashMap[Attribute, ColumnStat](
Contributor

Are these totally the same as columnInfo1? Maybe we can create a method to do this.

cloud-fan (Contributor)

LGTM except some minor comments, thanks for working on it!

SparkQA commented Feb 15, 2017

Test build #72903 has finished for PR 16228 at commit e8930d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 15, 2017

Test build #72916 has finished for PR 16228 at commit 8e2d5ae.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 15, 2017

Test build #72917 has finished for PR 16228 at commit 8182123.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor)

thanks, merging to master!

@asfgit asfgit closed this in 601b9c3 Feb 15, 2017
cmonkey pushed a commit to cmonkey/spark that referenced this pull request Feb 16, 2017
…umn statistics

Author: Zhenhua Wang <wzh_zju@163.com>
Author: wangzhenhua <wangzhenhua@huawei.com>

Closes apache#16228 from wzhfy/joinEstimate.