
[SPARK-21979][SQL] Improve QueryPlanConstraints framework #19201

Closed

Conversation

gengliangwang (Member)

What changes were proposed in this pull request?

Improve the QueryPlanConstraints framework to make it robust and simple.
In #15319, constraint inference for expressions like `a = f(b, c)` was added. However, for expressions like

a = f(b, c) && c = g(a, b)

the current QueryPlanConstraints framework produces non-converging constraints. Essentially, the problem is caused by keeping both the alias name and the aliased child expression in the same constraint set: we infer constraints, push them down as filter predicates, and later those predicates are propagated as constraints again, and so on.
Using only the alias names resolves the problem. The size of the constraint set is reduced without losing any information, since the inferred constraints on the aliased child expressions can always be recovered when pushing down filters.
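To see why the old scheme cannot converge, here is a minimal sketch in plain Scala (not Catalyst; all names are illustrative): substituting the two alias definitions into each other grows the expression at every step, so there is no fixed point.

```scala
sealed trait Expr
case class Attr(name: String) extends Expr
case class Func(name: String, args: List[Expr]) extends Expr

// Replace every attribute that has a definition with its defining expression.
def substitute(e: Expr, defs: Map[String, Expr]): Expr = e match {
  case Attr(n) if defs.contains(n) => defs(n)
  case Func(n, args)               => Func(n, args.map(substitute(_, defs)))
  case other                       => other
}

// a = f(b, c) and c = g(a, b): each definition mentions the other's name.
val defs = Map(
  "a" -> Func("f", List(Attr("b"), Attr("c"))),
  "c" -> Func("g", List(Attr("a"), Attr("b"))))

var e: Expr = Attr("a")
for (_ <- 1 to 3) {
  e = substitute(e, defs)
  println(e) // nests one level deeper on every round -- no fixed point
}
```

Tracking constraints only in terms of the alias names sidesteps this loop entirely.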

Also, the EqualNullSafe constraint between the alias name and its child expression, added when propagating aliases (e.g. `(a + b) <=> c` for `SELECT a + b AS c`), is meaningless:

```scala
allConstraints += EqualNullSafe(e, a.toAttribute)
```

It only produces redundant constraints.

How was this patch tested?

Unit test

@SparkQA commented Sep 12, 2017

Test build #81668 has finished for PR 19201 at commit 036e846.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 12, 2017

Test build #81671 has finished for PR 19201 at commit 7b414fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
// Rewrite each constraint, replacing every sub-expression that matches
// `source` with the attribute `destination`.
private def replaceConstraints(
    constraints: Set[Expression],
    source: Expression,
    destination: Attribute): Set[Expression] = constraints.map(_ transform {
```
@gatorsmile (Member)

LGTM except a minor comment.

@cloud-fan (Contributor) left a comment:

LGTM except 2 minor comments

```scala
case _ => // Skip
```
```scala
private def eliminateAliasedExpressionInConstraints(constraints: Set[Expression])
  : Set[Expression] = {
  // Attributes appearing on either side of an EqualTo constraint.
  val attributesInEqualTo = constraints.flatMap {
```
Contributor:

make it a set?

@gengliangwang (Member, Author):

It is a set :)

```scala
 * to a selected attribute.
 * Replace the aliased expression in [[Alias]] with the alias name if both exist in constraints.
 * Thus non-converging inference can be prevented.
 * E.g. `a = f(a, b)`, `a = f(b, c) && c = g(a, b)`.
```
Contributor:

This example doesn't even have an alias...

@jiangxb1987 (Contributor) left a comment:

LGTM

@SparkQA commented Sep 12, 2017

Test build #81689 has finished for PR 19201 at commit d456876.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile (Member)

Thanks! Merged to master.

asfgit closed this in 1a98574 on Sep 12, 2017
asfgit pushed a commit that referenced this pull request Jan 5, 2018
…onstraints

## What changes were proposed in this pull request?

#19201 introduced the following regression: given something like `df.withColumn("c", lit(2))`, we no longer pick up `c === 2` as a constraint or infer filters from it when joins are involved, which may lead to noticeable performance degradation.

This patch re-enables this optimization by picking up Aliases of Literals in Projection lists as constraints and making sure they're not treated as aliased columns.
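
A sketch of the affected pattern (illustrative only; the tables and column names are hypothetical, assuming a spark-shell session):

```scala
import org.apache.spark.sql.functions.lit

// The literal alias yields the constraint c = 2. With this fix the optimizer
// can again combine it with the join condition to infer, and push down,
// c2 = 2 on the right side instead of joining first and filtering later.
val left  = spark.range(10).withColumn("c", lit(2))
val right = spark.range(10).toDF("c2")
left.join(right, left("c") === right("c2")).explain(true)
```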

## How was this patch tested?

Unit test was added.

Author: Adrian Ionescu <adrian@databricks.com>

Closes #20155 from adrian-ionescu/constant_constraints.

(cherry picked from commit 51c33bd)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
asfgit pushed a commit that referenced this pull request Jan 17, 2018
## What changes were proposed in this pull request?

Previously, PR #19201 fixed the problem of non-converging constraints. After that, PR #19149 improved the loop so that constraints are inferred only once, and the problem of non-converging constraints was gone.

However, the case below fails:

```scala
spark.range(5).write.saveAsTable("t")
val t = spark.read.table("t")
val left = t.withColumn("xid", $"id" + lit(1)).as("x")
val right = t.withColumnRenamed("id", "xid").as("y")
val df = left.join(right, "xid").filter("id = 3").toDF()
checkAnswer(df, Row(4, 3))
```

This happens because `aliasMap` replaces all occurrences of the aliased child expressions; see the test case in the PR for details.

This PR fixes the bug by removing the now-unnecessary code that prevented non-converging constraints.
It could also be fixed with #20270, but this approach is much simpler and cleans up the code.

## How was this patch tested?

Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #20278 from gengliangwang/FixConstraintSimple.

(cherry picked from commit 8598a98)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
asfgit pushed a commit that referenced this pull request Sep 9, 2018
## What changes were proposed in this pull request?
How to reproduce:
```scala
val df1 = spark.createDataFrame(Seq(
   (1, 1)
)).toDF("a", "b").withColumn("c", lit(null).cast("int"))
val df2 = df1.union(df1).withColumn("d", spark_partition_id).filter($"c".isNotNull)
df2.show

+---+---+----+---+
|  a|  b|   c|  d|
+---+---+----+---+
|  1|  1|null|  0|
|  1|  1|null|  1|
+---+---+----+---+
```
`filter($"c".isNotNull)` was transformed to `(null <=> c#10)` before #19201, but has been transformed to `(c#10 = null)` since #20155. This PR reverts it to `(null <=> c#10)` to fix the issue.
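
For context, this follows from standard Spark SQL null semantics (not specific to this patch): `=` with a null operand never evaluates to true, while `<=>` is null-safe, so the two forms are not interchangeable as constraints:

```scala
spark.sql("SELECT CAST(NULL AS INT) = NULL").show()   // null (treated as false in a filter)
spark.sql("SELECT CAST(NULL AS INT) <=> NULL").show() // true
```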

## How was this patch tested?

unit tests

Closes #22368 from wangyum/SPARK-25368.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
(cherry picked from commit 77c9964)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>