Update SortMergeJoinExec.scala #18836
Conversation
fix a bug in outputOrdering
You didn't read the link above, I take it?

Can one of the admins verify this patch?

Thanks for fixing this. Please follow the contribution guidelines. Also, you need to add a test case. You can follow what we did in this PR: #17339
A test case to make the existing code fail:

```Scala
import org.apache.spark.sql.SparkSession

object Test extends App {
  val spark = SparkSession.builder().master("local").appName("test").getOrCreate()
  import spark.sqlContext.implicits._

  case class T(i: Int)
  spark.sparkContext.parallelize(List(T(1), T(3), T(3))).toDF.createOrReplaceTempView("T")

  val in = "select distinct a.i + 1,a.* from T a cross join T t where a.i > 1 and t.i = a.i group by a.i having a.i > 2"
  val sql = spark.sql(in)
  sql.queryExecution.executedPlan.children.map { x =>
    x.children.map { x =>
      x.children.map { x =>
        x.children.map { x =>
          x.children.map { x =>
            x.children.map { x =>
              println(x.outputOrdering)
            }
          }
        }
      }
    }
  }
}
```
@BoleynSu Do you want to continue the PR, or do you want us to take it over?

@gatorsmile I am not familiar with the PR process, so it would be great if you could take it over. Thanks.

@BoleynSu Sure, I can do it. I will give all the credit to you. Please continue to help us by reporting new issues or fixes. Thanks!
```diff
   override def outputOrdering: Seq[SortOrder] = joinType match {
     // For inner join, orders of both sides keys should be kept.
-    case Inner =>
+    case _: InnerLike =>
```
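To make the diff above concrete, here is a minimal, self-contained sketch rather than the actual Spark source: the type names mirror Catalyst's join-type hierarchy (where both Inner and Cross extend InnerLike), but the ordering values and the `OutputOrderingSketch` object are made up for illustration. The point is that matching on `Inner` alone lets `Cross` fall through to the catch-all that throws, while `_: InnerLike` covers both.

```Scala
// Simplified stand-in for Catalyst's join-type hierarchy: in Spark,
// both Inner and Cross extend InnerLike, which is what makes the fix work.
sealed trait JoinType
sealed trait InnerLike extends JoinType
case object Inner extends InnerLike
case object Cross extends InnerLike
case object FullOuter extends JoinType

object OutputOrderingSketch extends App {
  // Before the fix: only the exact Inner type is handled, so Cross hits the catch-all.
  def orderingBeforeFix(joinType: JoinType): Seq[String] = joinType match {
    case Inner     => Seq("leftKey ASC")   // orders of both sides' keys are kept
    case FullOuter => Nil                  // null rows on both sides: no order
    case x => throw new IllegalArgumentException(
      s"SortMergeJoinExec should not take $x as the JoinType")
  }

  // After the fix: any InnerLike join (Inner or Cross) keeps the key ordering.
  def orderingAfterFix(joinType: JoinType): Seq[String] = joinType match {
    case _: InnerLike => Seq("leftKey ASC")
    case FullOuter    => Nil
    case x => throw new IllegalArgumentException(
      s"SortMergeJoinExec should not take $x as the JoinType")
  }

  println(orderingAfterFix(Cross))                              // List(leftKey ASC)
  println(scala.util.Try(orderingBeforeFix(Cross)).isFailure)   // true: Cross hits the catch-all
}
```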
Can someone explain to me what is being fixed here? The other InnerLike variant, Cross, does not get planned using a SortMergeJoin.
I think we can get a SortMergeJoin plan with Cross, e.g. select distinct a.i + 1, a.* from T a cross join T t where a.i > 1 and t.i = a.i group by a.i having a.i > 2.
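One way to check this claim empirically is to collect the sort-merge join nodes from the executed plan and look at the join type they carry. The following is a sketch using Spark's public APIs plus the internal SortMergeJoinExec node (the class path is confirmed by the stack trace below); the `CheckPlan` object name and the tiny input data are assumptions, and whether the planner actually picks a sort-merge join depends on the data and on spark.sql.autoBroadcastJoinThreshold, which is disabled here.

```Scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.joins.SortMergeJoinExec

object CheckPlan extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("smj-cross-check")
    // Disable broadcast joins so the planner has to fall back to a sort-merge join.
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()
  import spark.implicits._

  Seq(1, 2, 3).toDF("i").createOrReplaceTempView("T")

  val df = spark.sql(
    "select distinct a.i + 1, a.* from T a cross join T t " +
    "where a.i > 1 and t.i = a.i group by a.i having a.i > 2")

  // Collect any sort-merge join nodes and print the join type they carry.
  // Note: on builds without the fix, preparing the executed plan may already
  // throw the IllegalArgumentException reported in this thread.
  val smjJoinTypes = df.queryExecution.executedPlan.collect {
    case smj: SortMergeJoinExec => smj.joinType
  }
  println(smjJoinTypes)  // expected to contain Cross if the report holds

  spark.stop()
}
```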
Even worse, this could cause an exception:

```Scala
val df = Seq((1, 1)).toDF("i", "j")
df.createOrReplaceTempView("T")
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
  sql("select * from (select a.i from T a cross join T t where t.i = a.i) as t1 " +
    "cross join T t2 where t2.i = t1.i").explain(true)
}
```

It will return the following error:

```
SortMergeJoinExec should not take Cross as the JoinType
java.lang.IllegalArgumentException: SortMergeJoinExec should not take Cross as the JoinType
  at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputOrdering(SortMergeJoinExec.scala:100)
  at org.apache.spark.sql.execution.ProjectExec
```
We need to backport it to 2.2
Yeah that makes sense
### What changes were proposed in this pull request?

author: BoleynSu
closes #18836

```Scala
val df = Seq((1, 1)).toDF("i", "j")
df.createOrReplaceTempView("T")
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
  sql("select * from (select a.i from T a cross join T t where t.i = a.i) as t1 " +
    "cross join T t2 where t2.i = t1.i").explain(true)
}
```

The above code could cause the following exception:

```
SortMergeJoinExec should not take Cross as the JoinType
java.lang.IllegalArgumentException: SortMergeJoinExec should not take Cross as the JoinType
  at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputOrdering(SortMergeJoinExec.scala:100)
```

Our SortMergeJoinExec supports CROSS. We should not hit such an exception. This PR is to fix the issue.

### How was this patch tested?

Modified the two existing test cases.

Author: Xiao Li <gatorsmile@gmail.com>
Author: Boleyn Su <boleyn.su@gmail.com>

Closes #18863 from gatorsmile/pr-18836.

(cherry picked from commit bbfd6b5)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
fix a bug in outputOrdering
What changes were proposed in this pull request?
Change `case Inner` to `case _: InnerLike` so that Cross will be handled properly.

How was this patch tested?
No unit tests are needed.
Please review http://spark.apache.org/contributing.html before opening a pull request.
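For completeness, here is a self-contained way to reproduce the exception reported in the review thread. This is a sketch, not the test that was merged (the merged change modified two existing test cases instead): the `ReproduceSmjCross` object name is an assumption, and because the `withSQLConf` helper used above belongs to Spark's internal test utilities, the broadcast threshold is disabled directly on the session here.

```Scala
import org.apache.spark.sql.SparkSession

object ReproduceSmjCross extends App {
  val spark = SparkSession.builder()
    .master("local")
    .appName("smj-cross-repro")
    // Stands in for withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1").
    .config("spark.sql.autoBroadcastJoinThreshold", "-1")
    .getOrCreate()
  import spark.implicits._

  Seq((1, 1)).toDF("i", "j").createOrReplaceTempView("T")

  // Before the fix this throws:
  //   java.lang.IllegalArgumentException:
  //   SortMergeJoinExec should not take Cross as the JoinType
  spark.sql(
    "select * from (select a.i from T a cross join T t where t.i = a.i) as t1 " +
    "cross join T t2 where t2.i = t1.i").explain(true)

  spark.stop()
}
```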