Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

joinType match {
case _: InnerLike | LeftExistence(_) =>
case _: InnerLike | LeftSemi | ExistenceJoin(_) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics of ExistenceJoin says we need to preserve all the rows from the left table through the join operation as if it is a regular LeftOuter join. The ExistenceJoin augments the LeftOuter operation with a new column called exists, set to true when the join condition in the ON clause is true and false otherwise. The filter of any rows will happen in the Filter operation above the ExistenceJoin.

Example:

A(c1, c2):
{ (1, 1), (1, 2) }

B(c1):
{ (NULL) }

// can be any value as it is irrelevant in this example

select A.*
from   A
where  exists (select 1 from B where A.c1 = A.c2)
       or A.c2=2

In this example, the correct result is all the rows from A. If the pattern ExistenceJoin at line 935 in Optimizer.scala added by the PR of this JIRA is indeed active, the code will push down the predicate A.c1 = A.c2 to be a Filter on relation A, which will filter the row (1,2) from A.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nsyca Have you tried the above example using the latest master?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have included the full comment from the JIRA. The keyword is "this is not currently exposed". Here is the first part of my comment:

"ExistenceJoin should be treated the same as LeftOuter and LeftAnti, not InnerLike and LeftSemi. This is not currently exposed because the rewrite of [NOT] EXISTS OR ... to ExistenceJoin happens in rule RewritePredicateSubquery, which is in a separate rule set and placed after the rule PushPredicateThroughJoin. During the transformation in the rule PushPredicateThroughJoin, an ExistenceJoin never exists."

// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
Expand All @@ -949,14 +949,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

Join(newLeft, newRight, RightOuter, newJoinCond)
case LeftOuter =>
case LeftOuter | LeftAnti =>
// push down the right side only join filter for right sub query
val newLeft = left
val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)

Join(newLeft, newRight, LeftOuter, newJoinCond)
Join(newLeft, newRight, joinType, newJoinCond)
case FullOuter => j
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
case UsingJoin(_, _) => sys.error("Untransformed Using join node")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

test("joins: push down where clause into left anti join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery =
x.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
.where("x.a".attr > 10)
.analyze
val optimized = Optimize.execute(originalQuery)
val correctAnswer =
x.where("x.a".attr > 10)
.join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
.analyze
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}

test("joins: only push down join conditions to the right of a left anti join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val originalQuery =
x.join(y,
LeftAnti,
Some("x.b".attr === "y.b".attr && "y.a".attr > 10 && "x.a".attr > 10)).analyze
val optimized = Optimize.execute(originalQuery)
val correctAnswer =
x.join(
y.where("y.a".attr > 10),
LeftAnti,
Some("x.b".attr === "y.b".attr && "x.a".attr > 10))
.analyze
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}


val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))

test("generate: predicate referenced no generated column") {
Expand Down
7 changes: 7 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/anti-join.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- SPARK-18597: Do not push down predicates to left hand side in an anti-join
CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2);
CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1);

SELECT *
FROM tbl_a
LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2);
29 changes: 29 additions & 0 deletions sql/core/src/test/resources/sql-tests/results/anti-join.sql.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 3


-- !query 0
CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2)
-- !query 0 schema
struct<>
-- !query 0 output



-- !query 1
CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1)
-- !query 1 schema
struct<>
-- !query 1 output



-- !query 2
SELECT *
FROM tbl_a
LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2)
-- !query 2 schema
struct<c1:int,c2:int>
-- !query 2 output
2 1
3 6