Conversation

@cloud-fan
Contributor

@cloud-fan cloud-fan commented Jul 11, 2019

What changes were proposed in this pull request?

This is an alternative solution of #24442 . It fails the query if an ambiguous self join is detected, instead of trying to disambiguate it. The problem is that it's hard to come up with a reasonable rule to disambiguate; the rule proposed by #24442 is mostly a heuristic.

Background of the self-join problem:

This is a long-standing bug and I've seen many people complaining about it in JIRA/dev list.

A typical example:

val df1 = …
val df2 = df1.filter(...)
df1.join(df2, df1("a") > df2("a")) // returns empty result

The root cause is that Dataset.apply is so powerful that users think it returns a column reference which can point to the column of the Dataset anywhere. This is not true in many cases. Dataset.apply returns an AttributeReference. Different Datasets may share the same AttributeReference. In the example above, df2 adds a Filter operator above the logical plan of df1, and the Filter operator preserves the output AttributeReference of its child. This means df1("a") is exactly the same as df2("a"), and df1("a") > df2("a") always evaluates to false.
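The failure can be modeled without Spark. A minimal Python sketch (toy classes, not Spark's real API) of why the predicate is degenerate:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class AttributeReference:
    """Toy stand-in for Spark's AttributeReference; identity is the expr id."""
    name: str
    expr_id: int

# df1's logical plan outputs column 'a' with expr id 1.
df1_a = AttributeReference("a", expr_id=1)

# df2 = df1.filter(...) only adds a Filter node on top; the Filter
# preserves its child's output attributes, so df2("a") resolves to
# the very same reference.
df2_a = df1_a

# The condition df1("a") > df2("a") therefore compares a column with
# itself and can never hold for any row.
print(df1_a == df2_a)  # True: both sides are the same attribute
```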

The rule to detect ambiguous column references caused by self join:

We can reuse the infra in #24442 :

  1. each Dataset has a globally unique id.
  2. the AttributeReference returned by Dataset.apply carries the ID and column position (e.g., the 3rd column of the Dataset) via metadata.
  3. the logical plan of a Dataset carries the ID via TreeNodeTag.
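The bookkeeping in steps 1-3 can be sketched with plain Python (a simplified model; the names below are illustrative, and in Spark the id lives in column metadata and a TreeNodeTag rather than plain fields):

```python
import itertools

_dataset_id_counter = itertools.count()

class Dataset:
    """Toy model of the id/metadata infra."""
    def __init__(self, columns):
        # step 1: globally unique id; in Spark the logical plan
        # carries it via a TreeNodeTag (step 3)
        self.id = next(_dataset_id_counter)
        self.columns = columns

    def apply(self, name):
        pos = self.columns.index(name)
        # step 2: the returned reference carries (dataset id, position),
        # mimicking the metadata on AttributeReference
        return {"name": name, "dataset_id": self.id, "col_pos": pos}

df = Dataset(["a", "b", "c"])
ref = df.apply("c")
print(ref)  # {'name': 'c', 'dataset_id': 0, 'col_pos': 2}
```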

When a self-join happens, the analyzer asks the right side plan of the join to re-generate output attributes with new exprIds. Based on this, a simple rule to detect ambiguous self joins is:

  1. find all column references (i.e. AttributeReferences with Dataset ID and col position) in the root node of a query plan.
  2. for each column reference, traverse the query plan tree, find a sub-plan that carries Dataset ID and the ID is the same as the one in the column reference.
  3. get the corresponding output attribute of the sub-plan by the col position in the column reference.
  4. if the corresponding output attribute has a different exprID than the column reference, then it means this sub-plan is on the right side of a self-join and has regenerated its output attributes. This is an ambiguous self join because the column reference points to a table being self-joined.
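The four steps above can be sketched over a toy plan tree (illustrative Python; real Spark traverses LogicalPlan nodes and their tags):

```python
class PlanNode:
    """Toy logical plan node: an optional dataset-id tag (modeling the
    TreeNodeTag), output expr ids by column position, and children."""
    def __init__(self, outputs, dataset_id=None, children=()):
        self.outputs = outputs
        self.dataset_id = dataset_id
        self.children = list(children)

def collect_by_dataset_id(node, ds_id, acc):
    # step 2: traverse the plan tree, collecting sub-plans tagged with ds_id
    if node.dataset_id == ds_id:
        acc.append(node)
    for child in node.children:
        collect_by_dataset_id(child, ds_id, acc)
    return acc

def is_ambiguous(root, col_ref):
    """col_ref carries (dataset_id, col_pos, expr_id), i.e. the metadata
    from step 1 plus the reference's original expr id."""
    for plan in collect_by_dataset_id(root, col_ref["dataset_id"], []):
        # step 3: the output attribute at the recorded column position
        actual_expr_id = plan.outputs[col_ref["col_pos"]]
        # step 4: a different expr id means this sub-plan regenerated its
        # output (right side of a self-join) -> the reference is ambiguous
        if actual_expr_id != col_ref["expr_id"]:
            return True
    return False

# Both join sides come from the same Dataset (id 10); the right side
# regenerated its output attribute (expr id 1 -> 2) during analysis.
left = PlanNode(outputs=[1], dataset_id=10)
right = PlanNode(outputs=[2], dataset_id=10)
join = PlanNode(outputs=[1, 2], children=[left, right])

ref = {"dataset_id": 10, "col_pos": 0, "expr_id": 1}
print(is_ambiguous(join, ref))  # True: ref points at a self-joined table
```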

How was this patch tested?

Existing tests and new test cases.

Contributor Author

I'll move this out as a separate PR.

@SparkQA

SparkQA commented Jul 11, 2019

Test build #107505 has finished for PR 25107 at commit 1898674.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan]

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 11, 2019

Test build #107524 has finished for PR 25107 at commit 1898674.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan]

@SparkQA

SparkQA commented Jul 11, 2019

Test build #107535 has finished for PR 25107 at commit 69627b4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan]


It seems that, based on my experience, aliasing the dataset before joining still results in an ambiguous reference exception when a certain column is selected. For instance, joined_df = df.as("a").join(df.as("b"), $"a.id" > $"b.id") and then joined_df.select('certain_column') gave an exception.

Providing the alias name didn't help either -> joined_df.select(a.certain_column).

However, deep copying the dataframes gave the correct result.

Contributor Author

Can you give a concrete example? It looks to me that joined_df.select($"a.certain_column") should work.


I think it's better to move this comment to .doc() like the one above.

Contributor Author

This is unrelated to this patch, we can fix it and other similar configs in another PR.

@SparkQA

SparkQA commented Jul 24, 2019

Test build #108076 has finished for PR 25107 at commit 6bff6f3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan]

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Jul 24, 2019

Test build #108093 has finished for PR 25107 at commit 6bff6f3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan]


- Since Spark 3.0, 0-argument Java UDF is executed in the executor side identically with other UDFs. In Spark version 2.4 and earlier, 0-argument Java UDF alone was executed in the driver side, and the result was propagated to executors, which might be more performant in some cases but caused inconsistency with a correctness issue in some cases.

- Since Spark 3.0, a Dataset query fails if it contains an ambiguous column reference that is caused by a self join. A typical example: `val df1 = ...; val df2 = df1.filter(...);`, then `df1.join(df2, df1("a") > df2("a"))` returns an empty result, which is quite confusing. This is because Spark cannot resolve Dataset column references that point to tables being self joined, and `df1("a")` is exactly the same as `df2("a")` in Spark. To restore the behavior before Spark 3.0, you can set `spark.sql.analyzer.failAmbiguousSelfJoin` to `false`.
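The escape hatch mentioned above can be set in spark-defaults.conf. A minimal fragment (assuming the standard Spark configuration mechanism; the property name is the one introduced by this PR):

```
spark.sql.analyzer.failAmbiguousSelfJoin false
```

Setting it at runtime via `spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")` should also work, since it is a session-level SQL config.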
Member

empty result -> an empty result

Contributor

@cloud-fan Should we take the opportunity to describe the other way to correct the problem is by using column aliases (with an example) ? Or do you think that would make the text too long for users to consume ?

Contributor Author

describe the other way to correct the problem is by using column aliases (with an example)

The error message contains it, see https://github.com/apache/spark/pull/25107/files#diff-72682666ae0e00b0be514f6867838be5R143

Contributor

@cloud-fan Ah.. got it.. Thanks Wenchen.

Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil)
}

test("SPARK-28344: fail ambiguous self join") {
Member

Create a dedicated test suite for all these test cases? I believe we might have more cases. It would be good to move all of them into the same suite.

Contributor

Let's do a dedicated suite with individual cases for these scenarios.

} else {
val expr = resolve(colName)
Column(expr)
Column(addDataFrameIdToCol(expr))
Member

This is the only place that requires calling addDataFrameIdToCol. How about colRegex?

We might need a clear comment to explain in which cases we should call it.

Contributor

Same question: doesn't it apply to colRegex?

Contributor Author

Good catch! We should apply it in colRegex as well.

.booleanConf
.createWithDefault(true)

val FAIL_AMBIGUOUS_SELF_JOIN =
Contributor

Would FAIL_AMBIGUOUS_DATASET_COLUMN_REF be more accurate?
And in the doc below, we can describe when this usually happens (e.g., with self-join).

Contributor Author

A self-join that contains an ambiguous column reference is an ambiguous self-join. I think "ambiguous self-join" is shorter and easier to understand as a config name.

Contributor Author

BTW there are no other places that can make a column reference ambiguous. Self-join is the only one.

assertAmbiguousSelfJoin(df3.join(df1.join(df2), df2("id") < df3("id")))
}
}

Contributor

Can we add more tests to cover:

  1. aliased references work fine;
  2. suppose df4 has a different attribute set, and test df1.join(df4).df2.select(df1("id"))?

@SparkQA

SparkQA commented Jul 29, 2019

Test build #108305 has finished for PR 25107 at commit 4870abc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 30, 2019

Test build #108378 has finished for PR 25107 at commit 65b6762.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

How about the nested column?

    val df = spark.read
      .json(Seq("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""").toDS())
    val df1 = df
    df1.join(df, df("a.b") > df1("a.b"))

@cloud-fan
Contributor Author

The ambiguity comes from AttributeReferences with the same expr ID. A nested column in Spark is just an expression: ExtractValue(Attr, ...). Like any other expression, it's not itself related to ambiguous self-join detection.

Your example is the same as

val df1 = ...
val df2 = df1.filter...
df1.join(df2, df1("a") * 3 > df2("a") + 2)

which is definitely an ambiguous self-join because the column a is ambiguous in the query.

@cloud-fan
Contributor Author

df("a.b") is different from things like df("a") + 1, because we need to take special care of df("a.b") when attaching the dataframe id. I've fixed it and added a test. @gatorsmile thanks for catching it!

@SparkQA

SparkQA commented Jul 31, 2019

Test build #108474 has finished for PR 25107 at commit 6b18efe.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

override def hashCode: Int = this.expr.hashCode()
override def hashCode: Int = this.normalizedExpr().hashCode()

private def normalizedExpr(): Expression = expr match {
Member

When we add the metadata in addDataFrameIdToCol, we use transform. Do we need to use it here when removing it?

Can our test cases cover it?

Contributor Author

good catch!

However, I'm not able to test it. The normalizedExpr is only used in equals and hashCode. For df("a.b"), we create a new alias every time; see AttributeSeq.resolve.

That said, if we get multiple Column instances by calling df("a.b") multiple times, these Column instances will never be equal to each other, because the aliases are different.
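The concern can be illustrated with a toy expression tree: if the metadata was attached with a recursive transform, stripping it only at the root misses nested attributes (illustrative Python, not Spark's Expression API):

```python
def transform(expr, f):
    """Recursively rewrite a toy expression tree bottom-up, like
    TreeNode.transform; expr is a dict with an optional 'children' list."""
    children = [transform(c, f) for c in expr.get("children", [])]
    new_expr = dict(expr, children=children) if children else dict(expr)
    return f(new_expr)

def strip_metadata(expr):
    # remove the dataset-id metadata from a single node
    expr = dict(expr)
    expr.pop("dataset_id", None)
    return expr

# df("a.b") resolves to something like Alias(ExtractValue(attr 'a', 'b')):
# the dataset-id metadata sits on the nested attribute, not the root.
nested = {"node": "Alias",
          "children": [{"node": "ExtractValue",
                        "children": [{"node": "AttributeReference",
                                      "name": "a", "dataset_id": 42}]}]}

top_level_only = strip_metadata(nested)            # misses the nested tag
fully_normalized = transform(nested, strip_metadata)

def has_metadata(expr):
    return "dataset_id" in expr or any(has_metadata(c)
                                       for c in expr.get("children", []))

print(has_metadata(top_level_only), has_metadata(fully_normalized))  # True False
```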

@SparkQA

SparkQA commented Aug 1, 2019

Test build #108516 has finished for PR 25107 at commit eec92f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
new DetermineTableStats(session) +:
new DetectAmbiguousSelfJoin(conf) +:
Member

How about adding a test case to ensure all these rules are included? So far, we do not have a test case covering the situation where a rule is missing.

Contributor Author

I need to know the details of each rule so that I can write a test to make sure these rules are included. Maybe do it in a follow-up instead of blocking this PR? BTW the new tests added in this PR do guarantee that the DetectAmbiguousSelfJoin rule is there.

Member

* [[AttributeReference]] with new expr IDs for the right side plan of the join. If the Dataset
* column reference points to a column in the right side plan of a self-join, users will get
* unexpected result because the column reference can't match the newly generated
* [[AttributeReference]].
Member

In the description, we need to also document that the metadata is removed by this rule.


// Attach the dataset id and column position to the column reference, so that we can detect
// ambiguous self-join correctly. See the rule `DetectAmbiguousSelfJoin`.
// This must be called before we return a `Column` that contains `AttributeReference`.
Member

Also, document that this metadata will be removed in the rule DetectAmbiguousSelfJoin.

@gatorsmile
Member

LGTM except the above comments. cc @maryannxue

@SparkQA

SparkQA commented Aug 5, 2019

Test build #108645 has finished for PR 25107 at commit 62228e8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 5, 2019

Test build #108657 has finished for PR 25107 at commit 62228e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

LGTM

@cloud-fan
Contributor Author

thanks, merging to master!

@nchammas
Contributor

nchammas commented Dec 4, 2019

I wonder if this patch fixes the correctness issue reported (with repro) in SPARK-25150. I'll give it a try later this week.

@dongjoon-hyun
Member

Hi, All.
Can we have this in branch-2.4?
(Also, cc @tgravescs )

@cloud-fan
Contributor Author

This is not a complete fix; it fails the query if we are not able to resolve the ambiguity. Is it OK to backport? It can break existing queries.

@dongjoon-hyun
Member

I think we are safe because we have spark.sql.analyzer.failAmbiguousSelfJoin.

Do we have false positives?

  1. If this PR makes a correct existing query fail, we can backport this patch with spark.sql.analyzer.failAmbiguousSelfJoin=false by default.
  2. If this PR prevents only wrong queries, we can backport this patch like 3.0, with spark.sql.analyzer.failAmbiguousSelfJoin=true by default.

In both cases (1) and (2), we can claim that the correctness issue is resolved.
What do you think? (Also, cc @gatorsmile )

@cloud-fan
Contributor Author

I'm fine with backporting. I'm going on vacation soon; can someone help with the backport?

@dongjoon-hyun
Member

Hi, @cloud-fan . Did you come back from the vacation? :)

cloud-fan added a commit to cloud-fan/spark that referenced this pull request Jan 31, 2020
Closes apache#25107 from cloud-fan/new-self-join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor Author

done in #27417

@dongjoon-hyun
Member

It turns out there are at least 5 commits required. Please see the discussion on #27417 . We are re-evaluating the risks.

rshkv pushed a commit to palantir/spark that referenced this pull request Jan 28, 2021
Closes apache#25107 from cloud-fan/new-self-join.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
rshkv pushed a commit to palantir/spark that referenced this pull request Jan 29, 2021
…nd planner

Query plans were designed to be immutable, but sometimes we do allow them to carry mutable state, because of the complexity of the SQL system. One example is `TreeNodeTag`. It's a state of `TreeNode` and can be carried over during copy and transform. The adaptive execution framework relies on it to link the logical and physical plans.

This leads to a problem: when we get `QueryExecution#analyzed`, the plan can be changed unexpectedly because it's mutable. I hit a real issue in apache#25107 : I use `TreeNodeTag` to carry the dataset id in logical plans. However, the analyzed plan ends up with many duplicated dataset id tags in different nodes. It turns out that the optimizer transforms the logical plan and adds the tag to more nodes.

For example, the logical plan is `SubqueryAlias(Filter(...))`, and I expect only the `SubqueryAlias` to have the dataset id tag. However, the optimizer removes `SubqueryAlias` and carries over the dataset id tag to `Filter`. When I go back to the analyzed plan, both `SubqueryAlias` and `Filter` have the dataset id tag, which breaks my assumption.

Since the query plan is now mutable, I think it's better to limit the life cycle of a query plan instance. We can clone the query plan between the analyzer, optimizer and planner, so that the life cycle is limited to one stage.

new test

Closes apache#25111 from cloud-fan/clone.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>