-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-25150][SQL] Rewrite condition when deduplicate Join #22318
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Could you describe more in the PR description?; what's the root cause of this issue? How did you solve this by this pr? |
| checkAnswer(r, Row(2, 2, 2) :: Row(4, 4, 4) :: Nil) | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove this empty line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| } | ||
|
|
||
| test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") { | ||
| withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually we can remove this
| val b = spark.range(10) | ||
| val c = b.filter($"id" % 2 === 0) | ||
|
|
||
| val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need a df a for this test? I think a simple test is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we do need a here.
If we dropped a and the test would become like:
val b = spark.range(1, 5)
val c = b.filter($"id" % 2 === 0)
val r = b.join(c, b("id") === c("id"), "inner")
checkAnswer(r, Row(2, 2) :: Row(4, 4) :: Nil)then the test would pass even without the fix. This is because we have a special case to handle id = id like conditions in case of EqualTo and EqualNullSafe in Dataset.
My fix comes into play in some other cases where there is an AttributeReference change in the right side of a join due to deduplication.
mgaido91
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change makes sense to me, despite I quite don't get how this didn't come out earlier.
@peter-toth may you please improve the PR description as suggested by @maropu? May you please also check whether this is a regression from 2.2? In that case, it might be a blocker for the next 2.3 release.
| e: Expression, | ||
| attributeRewrites: AttributeMap[Attribute]): Expression = e match { | ||
| case a: Attribute => attributeRewrites.getOrElse(a, a) | ||
| case _ => e.mapChildren(rewriteJoinCondition(_, attributeRewrites)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: may be more clear to do:
case other => e.mapChildren(rewriteJoinCondition(other, attributeRewrites))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, we can't do that, it wouldn't mean the same
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, sorry, I see now, sorry.
Change-Id: I52ac5aa76e821f5d74cfa108e0b8269665eb081d
|
@ueshin @HyukjinKwon can you trigger tests? |
|
ok to test |
| j.copy(right = dedupRight(left, right)) | ||
| case j @ Join(left, right, _, condition) if !j.duplicateResolved => | ||
| val (dedupedRight, attributeRewrites) = dedupRight(left, right) | ||
| val changedCondition = condition.map(rewriteJoinCondition(_, attributeRewrites)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about this?
val changedCondition = condition.map { _.transform {
case attr: Attribute => attributeRewrites.getOrElse(attr, attr)
}}
Then, removes rewriteJoinCondition?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks! fixed
| case Some((oldRelation, newRelation)) => | ||
| val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) | ||
| right transformUp { | ||
| (right transformUp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(right transformUp { -> val newRight = right transformUp {?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites)) | ||
| } | ||
| } | ||
| }, attributeRewrites) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then, (newRight, attributeRewrites).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| * that resolves these conflicts. Otherwise, the analysis will fail. | ||
| */ | ||
| right | ||
| (right, AttributeMap.empty[Attribute]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since I don't want to build an empty object per call, how about defining an empty map in this class field? e.g.,
object ResolveReferences extends Rule[LogicalPlan] {
private val emptyAttrMap = new AttributeMap[Attribute](Map.empty)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just commented quite the same meanwhile :) sorry, I saw your comments only now :)
| case Some((oldRelation, newRelation)) => | ||
| val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) | ||
| right transformUp { | ||
| (right transformUp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: may be cleaner doing something like val newPlan = and then return
(newPlan, attributeRewrites)
| new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap) | ||
| } | ||
|
|
||
| def empty[A](): AttributeMap[A] = new AttributeMap(Map.empty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it'd be better if we can avoid creating a new Map every time. In Scala, this is usually handled creating an empty object and returning it on the invocation of empty (eg. refer to the Nil implementation for List). The only problem is that this would require changing AttributeMap[A] to AttributeMap[+A]. I am not sure whether this may break binary compatibility and therefore whether this is an acceptable change or not. In case it is, it would be great to do this I think.
|
Test build #95625 has finished for PR 22318 at commit
|
|
Also added missing |
|
Test build #95632 has finished for PR 22318 at commit
|
| val b = spark.range(10) | ||
| val c = b.filter($"id" % 2 === 0) | ||
|
|
||
| val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a simpler a.join(b, "id").join(c, "id")?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That simpler join doesn't hit the issue. It is handled by a different rule ResolveNaturalAndUsingJoin.
|
@mgaido91 , 2.2 also suffered from this. |
mgaido91
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@peter-toth may you please also update the title of the PR to be more precise than "FIx"? Like "Rewrite condition when deduplicate Join" or something similar which explains better what the PR does?
Just left one comment, otherwise LGTM, thanks.
| */ | ||
| object ResolveReferences extends Rule[LogicalPlan] { | ||
|
|
||
| private val emptyAttrMap = new AttributeMap[Attribute](Map.empty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer to do what I suggested previously as it would be easier to reuse if it will be needed. In this way, next time we need again an empty AttributeMap we need to create a new one and we'd end up with several of them.
cc @cloud-fan @maropu WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mgaido91 , I agree with you and happy to do it, just saw your concerns about binary compatibility.
@cloud-fan @maropu please share your thoughts on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its ok to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| * of the name, or the expected nullability). | ||
| */ | ||
| object AttributeMap { | ||
| var empty = new AttributeMap(Map.empty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var -> val
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh thanks, fixed
| * a logical plan node's children. | ||
| */ | ||
| object ResolveReferences extends Rule[LogicalPlan] { | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unneeded newline
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
Change-Id: I928e9a56410c00c4fbbd11c8e91b7993a4bc1878
|
LGTM |
|
Test build #95714 has finished for PR 22318 at commit
|
|
retest this please |
|
Test build #95720 has finished for PR 22318 at commit
|
|
Test build #95723 has finished for PR 22318 at commit
|
|
Test build #95750 has finished for PR 22318 at commit
|
|
retest this please |
|
Test build #95757 has finished for PR 22318 at commit
|
|
LGTM |
|
To make sure we have no regression by this change, I checked the |
|
retest this please |
|
How does this work? When we have duplicated attributes in the join condition, how can we know which attribute comes from which side? |
|
@cloud-fan this PR doesn't solve that question. |
|
Test build #95854 has finished for PR 22318 at commit
|
|
Can you define the scope of this PR? In which case we should change the references in the join condition? |
|
@cloud-fan , I added some explanation to the description in which cases this PR helps and also where it doesn't. |
|
@cloud-fan, does the new description defines the scope as you suggested? Is there anything I can add to this PR? |
|
@cloud-fan could you please help me with this PR and take it one step forward? |
|
For a query similar to the one in the PR description:
It is interpreted as a cartesian products now. But after this change, it is as the same as the query:
|
|
Thanks @viirya, your analysis is correct. Unfortunately an attribute doesn't have a reference to its dataset so I don't think this scenario can be solved easily. I believe the good solution would be something like #21449 But I would argue that my PR still does make sense as a |
|
Also please consider that currently (and also after this PR) using is interpreted as so my PR just extends this feature to the case with subsequent joins. @cloud-fan , @viirya what do you think? |
|
@srowen, I saw your last comment on https://issues.apache.org/jira/browse/SPARK-25150?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16645484#comment-16645484. I submitted this PR to solve that ticket and I believe the description here explains what is the real issue there. |
|
Oh I see there was indeed more discussion on this, and it does relate to resolving columns to joined DataFrames. I don't know enough to bless this change, but it seems reasonable. @maropu approves, I think, and so does @mgaido91 ? what about @cloud-fan @viirya ? It seems like it's behavior that can be improved but isn't strictly a correctness issue? because it's not quite valid to reference columns from dataframes not involved in the join as a join condition, right? |
|
@srowen the change seems fine to me as I think this does improve the behavior of attributes deduplication (I think it was a bug not rewriting the join condition with the new references). The point is that in order to have a really correct and expected behavior in all the conditions we need to keep a reference of the dataset an attribute is from. That is why I created #21449 and I know @cloud-fan mentioned he had a similar PR for the same reason. But that is another story. In general I think both this and something like #21449 is needed in order to handle properly all the cases. |
|
Can one of the admins verify this patch? |
|
@peter-toth we cannot fix the issue of the description without changing the existing behaviour? |
|
@maropu, I don't think we can. Actually this is how we deal with simpler joins |
|
In the example @viirya described above (#22318 (comment)), I think the interpretation is unclear to most users and I'm fairly concerned that it could be error-prone... |
|
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it! |
What changes were proposed in this pull request?
ResolveReferences rule modifies (deduplicates) conflicting AttributeReferences in left and right side of a Join. It changes the right side to avoid conflicts but it forgot to change the references in the condition of the join. Solution is to rewrite the condition too.
Details:
With this PR, if any of the resolved attributes in a join condition refer to an attribute that is replaced during the deduplication (
dedupRight()) then the join condition is modified to reflect the change.The modification is done regardless whether the reference is on the left or the right side of the condition.
This PR has no effect on unresolved attributes. Those attributes are resolved later by different rules.
This PR helps in those cases where same origin dataframes (
b,c) are joined to a different origin one (a):Here the result of
a join bcontains anidattribute coming fromb. That attribute conflicts with theidattribute ofcin the second join soc("id")is deduplicated to let's sayXand the join conditiona("id") === c("id")needs to be modified toa("id") === Xto get correct results.I believe it is also worth explaining some situations where this PR does't help (but also does no harm). In cases where same origin dataframes are joined to each other:
In this latter case the deduplication is applied as both
bandchave anidattribute (that actually refer to the same resolved attribute (have the sameExprId)). Deduplication resultsc("id")to be changed to let's sayX. So in this case bothb("id")andc("id")in the join condition are modified toXby the changes introduced in this PR. The reason why the result will be correct with this PR (and is correct without this PR) is that the current implementation of Spark contains a "hack" inDataframethat fixesEqualToandEqualNullSafetype of conditions with identical references on both sides. The hack rewrites these join conditions by re-resolving the attributes with the name of the attribute on both the left and right sides.Please note that until a resolved attribute doesn't contain some kind of reference to it's dataframe not all cases of join can be handled properly. Such an initiative can be found in #21449.
How was this patch tested?
Added unit test.
Please review http://spark.apache.org/contributing.html before opening a pull request.