Conversation

@cloud-fan
Contributor

What changes were proposed in this pull request?

This is a long-standing bug and I've seen many people complaining about it in JIRA/dev list.

A typical example:

```
val df1 = …
val df2 = df1.filter(...)
df1.join(df2, df1("a") > df2("a")) // returns empty result
```

The root cause is that Dataset.apply is so powerful that users think it returns a column reference which can point to the column of the Dataset anywhere. This is not true in many cases. Dataset.apply returns an AttributeReference, and different Datasets may share the same AttributeReference. In the example above, df2 adds a Filter operator above the logical plan of df1, and the Filter operator preserves the output AttributeReference of its child. This means df1("a") is exactly the same as df2("a"), and df1("a") > df2("a") always evaluates to false.
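The shared-reference behavior described above can be sketched as a toy model in plain Scala (the names `Attr`, `df1a`, `df2a`, and `condition` are illustrative, not Spark's actual internals):

```scala
// Toy model (not Spark code): an attribute is identified by its exprId.
case class Attr(name: String, exprId: Long)

// df2 = df1.filter(...) preserves its child's output, so both Datasets hand
// out the very same attribute for column "a".
val df1a = Attr("a", exprId = 7L)
val df2a = df1a

// A condition like df1("a") > df2("a") therefore compares a column with
// itself: for any row value v it evaluates v > v, which is always false.
def condition(row: Map[Long, Int]): Boolean =
  row(df1a.exprId) > row(df2a.exprId)
```

Since `df1a` and `df2a` are one and the same reference, `condition` can never be true, which is why the join returns an empty result.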

The key problem is: when the analyzer resolves a self-join by assigning new expr IDs to the conflicting attributes on the right side of the join, how do we map the Dataset column reference to the new attributes of the right side?

The proposal here is:

  1. each Dataset has a globally unique ID.
  2. the AttributeReference returned by Dataset.apply carries the ID and column position (e.g. the 3rd column of the Dataset) via metadata.
  3. in Dataset.join, the left and right sides of the join node carry the ID via SubqueryAlias.
  4. add a new analyzer rule, which finds the special AttributeReferences that carry a dataset ID, and traverses the logical plan to find the corresponding SubqueryAlias with the same dataset ID, to get the actual AttributeReference.

When the analyzer resolves a self-join, it transforms down the right side plan to find nodes like MultiInstanceRelation and generates attributes with new exprIds. If the right side is a SubqueryAlias, its output will be the deduplicated AttributeReferences after the self-join is resolved.

When we resolve a Dataset column reference (an AttributeReference with a dataset ID and column position) and find a matching SubqueryAlias with the same dataset ID, we can replace the column reference with the corresponding AttributeReference of the SubqueryAlias.
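This resolution step can be sketched with simplified stand-ins for the real classes (`ColumnRef`, `AliasInfo`, and `resolve` are illustrative names, not Spark's API):

```scala
// The metadata that Dataset.apply attaches to the returned attribute:
// which Dataset it came from, and at which column position.
case class ColumnRef(datasetId: Long, colPos: Int)

// What a SubqueryAlias contributes: the dataset id it carries, plus its
// output attributes (already deduplicated once the self-join is resolved).
case class Attr(name: String, exprId: Long)
case class AliasInfo(datasetId: Long, output: Seq[Attr])

// Find the SubqueryAlias with the same dataset id, then pick the attribute
// at the recorded column position: that is the actual attribute to use.
def resolve(ref: ColumnRef, aliases: Seq[AliasInfo]): Option[Attr] =
  aliases.find(_.datasetId == ref.datasetId).map(_.output(ref.colPos))
```

If no alias with a matching dataset id exists, the reference stays unresolved, which mirrors the rule leaving the plan unchanged.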

How was this patch tested?

new test cases

Contributor Author

drop should not match attribute by equality. We should use semanticEquals.

@cloud-fan
Contributor Author

cloud-fan commented Apr 23, 2019

@SparkQA

SparkQA commented Apr 23, 2019

Test build #104832 has finished for PR 24442 at commit 817b955.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ResolveDatasetColumnReference(conf: SQLConf) extends Rule[LogicalPlan]

@cloud-fan cloud-fan force-pushed the self-join branch 2 times, most recently from fde7131 to 1539fa8 Compare April 23, 2019 14:20
Contributor

@mgaido91 mgaido91 left a comment


thanks for starting this and pinging me @cloud-fan. Could you please also add a Close #... for the PRs attempting to fix this? I am pretty sure there is one open by me, for instance. Thanks.

Contributor

I am missing something here. Sorry if the question is dumb, but I can't see why we need to add this "special" node for it.

Contributor Author

to attach the dataset id to the logical plan, so that we can resolve column reference later.

Contributor

I'd rather add the datasetId info to the LogicalPlan and avoid introducing a new plan here. I think it would be easier in this way to generalize this approach to other cases when the same problem may arise. What do you think?

Contributor Author

How can we add dataset id to every kind of LogicalPlan?

Contributor

I think that could be an option. And for the moment we could add it only to the child/children of a join, since we only need it there. But I see there is no guarantee that the plan(s) are not replaced/removed during the analysis/optimization phase, so it may not be doable indeed.

Contributor

do we really need this column prefix? We use it only to avoid a scan, but we could get it with a scan using indexOf, so I think we can get rid of it.

Contributor Author

when we resolve a column reference, we have no idea what the corresponding column is, even if we know the corresponding dataframe. That's why we need the column position info.

Contributor

I am not a big fan of using special strings as markers; I'd rather introduce a new attribute to carry the information.

Contributor Author

It's at plan level, we can't use attributes to carry information here.

Contributor

I meant adding a new attribute to the SubqueryAlias case class; why isn't that possible?

Contributor Author

It's possible, but I'm a little worried about changing the constructor of a widely used plan node like SubqueryAlias. It's also a little weird to put the Dataset concept into the catalyst module.

Another idea I've thought about is adding a new no-op plan node to carry the dataset id. I gave it up because no-op node can be troublesome according to the past experience with ResolvedHint and AnalysisBarrier.

Contributor

Another idea I've thought about is adding a new no-op plan node to carry the dataset id

Yes, that's exactly the reason why I'd like to avoid adding this node. Actually, I think I'd prefer a no-op node if this is really needed, since SubqueryAlias is also used in other places and we may introduce side effects by using it in a way different from what it is intended for now.

Contributor

@cloud-fan I was thinking: in order to avoid adding another "placeholder" plan, since here we are dealing only with joins, what about adding a leftDatasetId and rightDatasetId to the join operator?

Contributor Author

This works, but I'd rather hack into SubqueryAlias instead of Join, as Join node is more widely used...

Contributor Author

and things can get tricky if join reorder happens.

Contributor

and things can get tricky if join reorder happens.

yes, I also thought of this, but still I am not very confident about using SubqueryAlias: that node is meant for a different purpose, and I think we already have things used in a hacky way. This makes any future change more error-prone, because we need to remember not only the logic but also all the "hacks" done with it. Honestly, rather than using a SubqueryAlias, if no other option works, I'd prefer adding a new node (maybe a subclass of it, if we need common functionality).

Member

IMO, SubqueryAlias might be safer than a new node, because it is well known to the Spark SQL community. A new node that others aren't aware of is more dangerous.

@SparkQA

SparkQA commented Apr 23, 2019

Test build #104835 has finished for PR 24442 at commit 29ae5fb.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ResolveDatasetColumnReference(conf: SQLConf) extends Rule[LogicalPlan]

@SparkQA

SparkQA commented Apr 23, 2019

Test build #104834 has finished for PR 24442 at commit 1539fa8.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class ResolveDatasetColumnReference(conf: SQLConf) extends Rule[LogicalPlan]

@dongjoon-hyun
Member

Oh, @cloud-fan. The SparkR failure seems to be consistent and might be relevant.

1. Error: test multi-dimensional aggregations with cube and rollup (@test_sparkSQL.R#2132) 
error in evaluating the argument 'x' in selecting a method for function 'collect': 
  error in evaluating the argument 'x' in selecting a method for function 'orderBy': 
  org.apache.spark.sql.AnalysisException: Columns of grouping_id (year#28476,department#28478) does not match grouping columns 

Member

How can a column reference point to more than one actual column? A column reference is determined by a dataset id and a column position. It can't have more than one column position or dataset id.

Contributor Author

e.g. df1.join(df1).select(df1("id"))

Member

It might point to multiple columns, but those columns are the same column. For example, df1("id") points to columns in both the left and right plans, but the pointed-to columns are the same. Can a column reference point to different columns? If not, this can work without ambiguousColRefs, and the code can be simplified.

Contributor Author

think about outer joins. Columns from different join sides are always different columns.

Contributor

@cloud-fan Could we add some tests for nested columns? In the case of nested columns, do we need to carry more metadata?

Contributor Author

The problem only occurs for AttributeReferences, which are top-level columns. I think nested columns are totally unrelated here.

Contributor

@cloud-fan Ah.. ok. Thanks..

Member

Could you add SPARK-27547 to the test case name?

@SparkQA

SparkQA commented Apr 24, 2019

Test build #104856 has finished for PR 24442 at commit f2f102a.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

Contributor

@cloud-fan Nit: could this check be done by the caller once for the join, or do you anticipate more callers to this function in the future other than prepareJoinPlan?

@mgaido91
Contributor

sorry @cloud-fan, just to get a better understanding of your solution, could you please explain why this is a better approach than the one in #21449?
Moreover, if this is a better approach, why don't we remove the hack there for self-joins?

@cloud-fan
Contributor Author

cloud-fan commented Apr 24, 2019

The basic idea is the same: assign a globally unique id to the Dataset, and carry the dataset id in the column reference (the AttributeReference returned by Dataset.col).

This PR does one more thing: carry the dataset id in the logical plan of the Dataset in case of self-join. This makes the solution more powerful. #21449 can only resolve column references against the current Datasets, e.g. df1.join(df2, cond), while this PR supports more general cases like df1.join(df2, cond).filter(...).select(df1("id")).

However, the hack in Dataset.join still has its value. For equality conditions, we can resolve the column reference even if it's ambiguous, e.g. df1.join(df1, df1("id") === df1("id")). df1("id") is actually ambiguous, but it doesn't matter here as the equality condition is symmetric.

BTW, I think we can extend the solution in this PR with more complex ambiguity-resolving logic, so that we can remove the hack in Dataset.join. This can be done in a followup.

@mgaido91
Contributor

thanks @cloud-fan, I didn't think of that case!

@SparkQA

SparkQA commented Apr 24, 2019

Test build #104861 has finished for PR 24442 at commit f2f102a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 25, 2019

Test build #104906 has finished for PR 24442 at commit b24c6bb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Contributor

@cloud-fan The changes look good to me.

Member
@viirya viirya Apr 27, 2019

As indicated by the change in Analyzer, two attributes we considered the same before this change are now different if we compare them without canonicalization.

Not sure if it will be an issue, but it is counterintuitive.

Contributor Author

Internally, I think we should never compare 2 attributes with equals instead of semanticEquals.

Externally, if users rely on the equality of Column, yes, this PR may break it in some cases. If this matters, I can add a migration guide or update Column.equals.

Member

I feel that the metadata is not widely known. It might be confusing to people that the comparison fails when they know it worked before. An update to the migration guide or to Column.equals both sound good.

Member
@dongjoon-hyun dongjoon-hyun Apr 29, 2019

Can we check this between lines 93 and 94? For the wrong ref, it seems that we don't need to keep it in ambiguousColRefs. The bug can hide due to filterNot(ambiguousColRefs.contains) without showing this warning.

Member
@dongjoon-hyun dongjoon-hyun Apr 29, 2019

This is much better than the current status.

One side effect is that we start to distinguish df2's rows in UNIONed dataframes like the following, too. Should we allow this behavior?

scala> val df1 = spark.range(3)
scala> val df2 = df1.filter($"id" > 0)
scala> df1.union(df2).join(df1, df1("id") > df2("id")).show
+---+---+
| id| id|
+---+---+
|  0|  1|
|  0|  2|
|  1|  2|
|  1|  2|
+---+---+

Contributor Author
@cloud-fan cloud-fan Apr 30, 2019

To be honest, I don't really know what the expected result here is. df1("id") should point to the right side df1 of the join, and it seems OK if df2("id") points to the unioned df. Before this PR this query returned no result, which is confusing as well.

If we change Union.output and generate new attributes for it, then this query throws analysis exception, which seems more reasonable.

@SparkQA

SparkQA commented Apr 30, 2019

Test build #105029 has finished for PR 24442 at commit f761892.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Could you fix the UT failure please?

[info] HiveDataFrameJoinSuite:
06:26:55.141 WARN org.apache.spark.sql.execution.analysis.ResolveDatasetColumnReference: [BUG] Hit an invalid Dataset column reference: ColumnReference(5462,-1)
[info] - join - self join auto resolve ambiguity with case insensitivity *** FAILED *** (19 milliseconds)
[info]   java.lang.IndexOutOfBoundsException: -1

@SparkQA

SparkQA commented May 6, 2019

Test build #105142 has finished for PR 24442 at commit 7118425.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-27547][SQL] fix DataFrame self-join problems [SPARK-27547][SQL] Fix DataFrame self-join problems May 7, 2019
Member
@dongjoon-hyun dongjoon-hyun May 7, 2019

@cloud-fan , @gatorsmile , @rxin .

Could you split this Column change into another JIRA issue?

This looks required but orthogonal. In addition, although this PR provides a new configuration, spark.sql.analyzer.resolveDatasetColumnReference, we cannot undo this behavior change of the Column class. The following is the behavior change which might affect new optimizers. I'm not saying this is good or bad; what I mean is that this needs to be considered separately.

scala> spark.version
res0: String = 2.4.3

scala> rand(0).equals(rand(0))
res1: Boolean = true

scala> ($"a" + 1 + 2 + 3).equals($"a" + 3 + 2 + 1)
res2: Boolean = false

scala> spark.version  // This PR
res0: String = 3.0.0-SNAPSHOT

scala> rand(0).equals(rand(0))
res1: Boolean = false

scala> ($"a" + 1 + 2 + 3).equals($"a" + 3 + 2 + 1)
res2: Boolean = true

Contributor Author

I made a new change to avoid this issue completely.

Member

Thanks!

@SparkQA

SparkQA commented May 7, 2019

Test build #105215 has finished for PR 24442 at commit 36fab4f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan changed the title [SPARK-27547][SQL] Fix DataFrame self-join problems [WIP][SPARK-27547][SQL] Fix DataFrame self-join problems May 9, 2019
@SparkQA

SparkQA commented May 9, 2019

Test build #105288 has finished for PR 24442 at commit 77074b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 16, 2019

Test build #105437 has finished for PR 24442 at commit e7bfcc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan changed the title [WIP][SPARK-27547][SQL] Fix DataFrame self-join problems [SPARK-27547][SQL] Fix DataFrame self-join problems May 16, 2019
@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented May 24, 2019

Test build #105748 has finished for PR 24442 at commit e7bfcc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented May 26, 2019

Test build #105788 has finished for PR 24442 at commit e7bfcc8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

closed in favor of #25107

cloud-fan added a commit to cloud-fan/spark that referenced this pull request Jan 31, 2020
This is an alternative solution to apache#24442. It fails the query if an ambiguous self-join is detected, instead of trying to disambiguate it. The problem is that it's hard to come up with a reasonable rule to disambiguate; the rule proposed by apache#24442 is mostly a heuristic.

This is a long-standing bug and I've seen many people complaining about it in JIRA/dev list.

A typical example:
```
val df1 = …
val df2 = df1.filter(...)
df1.join(df2, df1("a") > df2("a")) // returns empty result
```
The root cause is that `Dataset.apply` is so powerful that users think it returns a column reference which can point to the column of the Dataset anywhere. This is not true in many cases. `Dataset.apply` returns an `AttributeReference`, and different Datasets may share the same `AttributeReference`. In the example above, `df2` adds a Filter operator above the logical plan of `df1`, and the Filter operator preserves the output `AttributeReference` of its child. This means `df1("a")` is exactly the same as `df2("a")`, and `df1("a") > df2("a")` always evaluates to false.

We can reuse the infra in apache#24442 :
1. each Dataset has a globally unique id.
2. the `AttributeReference` returned by `Dataset.apply` carries the ID and column position(e.g. 3rd column of the Dataset) via metadata.
3. the logical plan of a `Dataset` carries the ID via `TreeNodeTag`

When self-join happens, the analyzer asks the right side plan of join to re-generate output attributes with new exprIds. Based on it, a simple rule to detect ambiguous self join is:
1. find all column references (i.e. `AttributeReference`s with Dataset ID and col position) in the root node of a query plan.
2. for each column reference, traverse the query plan tree, find a sub-plan that carries Dataset ID and the ID is the same as the one in the column reference.
3. get the corresponding output attribute of the sub-plan by the col position in the column reference.
4. if the corresponding output attribute has a different exprId than the column reference, it means this sub-plan is on the right side of a self-join and has regenerated its output attributes. This is an ambiguous self-join, because the column reference points to a table being self-joined.
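The four detection steps above could be sketched like this in plain Scala (a toy model: `Plan`, `ColRef`, and `isAmbiguous` are illustrative names, not the actual classes in apache#25107):

```scala
case class Attr(name: String, exprId: Long)
// A plan node may be tagged with a dataset id (via TreeNodeTag in the real
// implementation) and exposes its output attributes.
case class Plan(datasetId: Option[Long], output: Seq[Attr], children: Seq[Plan])
// A column reference carries the dataset id, column position, and the exprId
// it was created with (step 1).
case class ColRef(datasetId: Long, colPos: Int, exprId: Long)

def isAmbiguous(ref: ColRef, root: Plan): Boolean = {
  def allPlans(p: Plan): Seq[Plan] = p +: p.children.flatMap(allPlans)
  // Steps 2-4: find a sub-plan tagged with the same dataset id, look up the
  // attribute at the recorded column position, and flag ambiguity if its
  // exprId differs (that side of a self-join regenerated its output).
  allPlans(root).exists { p =>
    p.datasetId.contains(ref.datasetId) && p.output(ref.colPos).exprId != ref.exprId
  }
}
```

In a self-join, the right side's regenerated attributes make the exprId comparison fail, so the reference is reported as ambiguous instead of silently resolving to the wrong side.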

existing tests and new test cases

Closes apache#25107 from cloud-fan/new-self-join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
rshkv pushed a commit to palantir/spark that referenced this pull request Jan 28, 2021