diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b913a9618d6e..21bf926af50d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1697,6 +1697,8 @@ class Analyzer( // Only a few unary nodes (Project/Filter/Aggregate) can contain subqueries. case q: UnaryNode if q.childrenResolved => resolveSubQueries(q, q.children) + case j: Join if j.childrenResolved => + resolveSubQueries(j, Seq(j, j.left, j.right)) case s: SupportsSubquery if s.childrenResolved => resolveSubQueries(s, s.children) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 6a5d938f0fdc..d9dc9ebbcaf3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -601,10 +601,10 @@ trait CheckAnalysis extends PredicateHelper { case inSubqueryOrExistsSubquery => plan match { - case _: Filter | _: SupportsSubquery => // Ok + case _: Filter | _: SupportsSubquery | _: Join => // Ok case _ => failAnalysis(s"IN/EXISTS predicate sub-queries can only be used in" + - s" Filter and a few commands: $plan") + s" Filter/Join and a few commands: $plan") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala index 74a8590b5eef..5aa80e1a9bd7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala @@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, Project} +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical._ /** * Unit tests for [[ResolveSubquery]]. @@ -29,8 +30,10 @@ class ResolveSubquerySuite extends AnalysisTest { val a = 'a.int val b = 'b.int + val c = 'c.int val t1 = LocalRelation(a) val t2 = LocalRelation(b) + val t3 = LocalRelation(c) test("SPARK-17251 Improve `OuterReference` to be `NamedExpression`") { val expr = Filter( @@ -41,4 +44,13 @@ class ResolveSubquerySuite extends AnalysisTest { assert(m.contains( "Expressions referencing the outer query are not supported outside of WHERE/HAVING clauses")) } + + test("SPARK-29145 Support subquery in join condition") { + val expr = Join(t1, + t2, + Inner, + Some(InSubquery(Seq(a), ListQuery(Project(Seq(UnresolvedAttribute("c")), t3)))), + JoinHint.NONE) + assertAnalysisSuccess(expr) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index a1d7792941ed..266f8e23712d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -204,6 +204,154 @@ class SubquerySuite extends QueryTest with SharedSparkSession { } } + test("SPARK-29145: JOIN Condition use QueryList") { + withTempView("s1", "s2", "s3") { + Seq(1, 3, 5, 7, 9).toDF("id").createOrReplaceTempView("s1") + Seq(1, 3, 4, 6, 9).toDF("id").createOrReplaceTempView("s2") + Seq(3, 4, 6, 9).toDF("id").createOrReplaceTempView("s3") + + checkAnswer( + sql( + """ + | SELECT s1.id FROM s1 + | JOIN s2 ON s1.id = s2.id + | AND s1.id IN (SELECT 9) + """.stripMargin), + Row(9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id FROM s1 + | JOIN s2 ON s1.id = s2.id + | AND s1.id NOT IN (SELECT 9) + """.stripMargin), + Row(1) :: Row(3) :: Nil) + + // case `IN` + checkAnswer( + sql( + """ + | SELECT s1.id FROM s1 + | JOIN s2 ON s1.id = s2.id + | AND s1.id IN (SELECT id FROM s3) + """.stripMargin), + Row(3) :: Row(9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id AS id2 FROM s1 + | LEFT SEMI JOIN s2 + | ON s1.id = s2.id + | AND s1.id IN (SELECT id FROM s3) + """.stripMargin), + Row(3) :: Row(9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id as id2 FROM s1 + | LEFT ANTI JOIN s2 + | ON s1.id = s2.id + | AND s1.id IN (SELECT id FROM s3) + """.stripMargin), + Row(1) :: Row(5) :: Row(7) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id as id2 FROM s1 + | LEFT OUTER JOIN s2 + | ON s1.id = s2.id + | AND s1.id IN (SELECT id FROM s3) + """.stripMargin), + Row(1, null) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(9, 9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id as id2 FROM s1 + | RIGHT OUTER JOIN s2 + | ON s1.id = s2.id + | AND s1.id IN (SELECT id FROM s3) + """.stripMargin), + Row(null, 1) :: Row(3, 3) :: Row(null, 4) :: Row(null, 6) :: Row(9, 9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id AS id2 FROM s1 + | FULL OUTER JOIN s2 + | ON s1.id = s2.id + | AND s1.id IN (SELECT id FROM s3) + """.stripMargin), + Row(1, null) :: Row(3, 3) :: Row(5, null) :: Row(7, null) :: Row(9, 9) :: + Row(null, 1) :: Row(null, 4) :: Row(null, 6) :: Nil) + + // case `NOT IN` + checkAnswer( + sql( + """ + | SELECT s1.id FROM s1 + | JOIN s2 ON s1.id = s2.id + | AND s1.id NOT IN (SELECT id FROM s3) + """.stripMargin), + Row(1) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id AS id2 FROM s1 + | LEFT SEMI JOIN s2 + | ON s1.id = s2.id + | AND s1.id NOT IN (SELECT id FROM s3) + """.stripMargin), + Row(1) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id AS id2 FROM s1 + | LEFT ANTI JOIN s2 + | ON s1.id = s2.id + | AND s1.id NOT IN (SELECT id FROM s3) + """.stripMargin), + Row(3) :: Row(5) :: Row(7) :: Row(9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id AS id2 FROM s1 + | LEFT OUTER JOIN s2 + | ON s1.id = s2.id + | AND s1.id NOT IN (SELECT id FROM s3) + """.stripMargin), + Row(1, 1) :: Row(3, null) :: Row(5, null) :: Row(7, null) :: Row(9, null) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id AS id2 FROM s1 + | RIGHT OUTER JOIN s2 + | ON s1.id = s2.id + | AND s1.id NOT IN (SELECT id FROM s3) + """.stripMargin), + Row(1, 1) :: Row(null, 3) :: Row(null, 4) :: Row(null, 6) :: Row(null, 9) :: Nil) + + checkAnswer( + sql( + """ + | SELECT s1.id, s2.id AS id2 FROM s1 + | FULL OUTER JOIN s2 + | ON s1.id = s2.id + | AND s1.id NOT IN (SELECT id FROM s3) + """.stripMargin), + Row(1, 1) :: Row(3, null) :: Row(5, null) :: Row(7, null) :: Row(9, null) :: + Row(null, 3) :: Row(null, 4) :: Row(null, 6) :: Row(null, 9) :: Nil) + } + } + test("SPARK-14791: scalar subquery inside broadcast join") { val df = sql("select a, sum(b) as s from l group by a having a > (select avg(a) from l)") val expected = Row(3, 2.0, 3, 3.0) :: Row(6, null, 6, null) :: Nil