From 02b9dd6b8c3eb49bb8e6e537d0432c1eff35baad Mon Sep 17 00:00:00 2001 From: Stan Zhai Date: Tue, 28 Feb 2017 20:44:14 +0800 Subject: [PATCH 1/6] fix inner join --- .../sql/catalyst/optimizer/expressions.scala | 8 -- .../resources/sql-tests/inputs/inner-join.sql | 25 ++++ .../sql-tests/results/inner-join.sql.out | 119 ++++++++++++++++++ 3 files changed, 144 insertions(+), 8 deletions(-) create mode 100644 sql/core/src/test/resources/sql-tests/inputs/inner-join.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/inner-join.sql.out diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 4f593c894acd..f2e05680da13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -452,14 +452,6 @@ object FoldablePropagation extends Rule[LogicalPlan] { case u: UnaryNode if !stop && canPropagateFoldables(u) => u.transformExpressions(replaceFoldable) - // Allow inner joins. We do not allow outer join, although its output attributes are - // derived from its children, they are actually different attributes: the output of outer - // join is not always picked from its children, but can also be null. - // TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes - // of outer join. - case j @ Join(_, _, Inner, _) => - j.transformExpressions(replaceFoldable) - // We can fold the projections an expand holds. However expand changes the output columns // and often reuses the underlying attributes; so we cannot assume that a column is still // foldable after the expand has been applied. diff --git a/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql new file mode 100644 index 000000000000..0be3a072eaf0 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql @@ -0,0 +1,25 @@ +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a); +CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a); +CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a); +CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a); + +CREATE TEMPORARY VIEW ta AS +SELECT a, 'a' AS tag FROM t1 +UNION ALL +SELECT a, 'b' AS tag FROM t2; + +CREATE TEMPORARY VIEW tb AS +SELECT a, 'a' AS tag FROM t3 +UNION ALL +SELECT a, 'b' AS tag FROM t4; + +-- +SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag; + +-- Clean up +DROP VIEW IF EXISTS t1; +DROP VIEW IF EXISTS t2; +DROP VIEW IF EXISTS t3; +DROP VIEW IF EXISTS t4; +DROP VIEW IF EXISTS ta; +DROP VIEW IF EXISTS tb; diff --git a/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out b/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out new file mode 100644 index 000000000000..98b5c7d9a4cb --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out @@ -0,0 +1,119 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 13 + + +-- !query 0 +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (1) AS GROUPING(a) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (1) AS GROUPING(a) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +CREATE TEMPORARY VIEW t3 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a) +-- !query 2 schema +struct<> +-- !query 2 output + + + +-- !query 3 +CREATE TEMPORARY VIEW t4 AS SELECT * FROM VALUES (1), (1) AS GROUPING(a) +-- !query 3 schema +struct<> +-- !query 3 output + + + +-- !query 4 +CREATE TEMPORARY VIEW ta AS +SELECT a, 'a' AS tag FROM t1 +UNION ALL +SELECT a, 'b' AS tag FROM t2 +-- !query 4 schema +struct<> +-- !query 4 output + + + +-- !query 5 +CREATE TEMPORARY VIEW tb AS +SELECT a, 'a' AS tag FROM t3 +UNION ALL +SELECT a, 'b' AS tag FROM t4 +-- !query 5 schema +struct<> +-- !query 5 output + + + +-- !query 6 +SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag +-- !query 6 schema +struct +-- !query 6 output +1 a +1 a +1 a +1 a +1 b +1 b +1 b +1 b + + +-- !query 7 +DROP VIEW IF EXISTS t1 +-- !query 7 schema +struct<> +-- !query 7 output + + + +-- !query 8 +DROP VIEW IF EXISTS t2 +-- !query 8 schema +struct<> +-- !query 8 output + + + +-- !query 9 +DROP VIEW IF EXISTS t3 +-- !query 9 schema +struct<> +-- !query 9 output + + + +-- !query 10 +DROP VIEW IF EXISTS t4 +-- !query 10 schema +struct<> +-- !query 10 output + + + +-- !query 11 +DROP VIEW IF EXISTS ta +-- !query 11 schema +struct<> +-- !query 11 output + + + +-- !query 12 +DROP VIEW IF EXISTS tb +-- !query 12 schema +struct<> +-- !query 12 output + From 112dd2379bf9febdc9ed81925326b61d2a34efdd Mon Sep 17 00:00:00 2001 From: Stan Zhai Date: Tue, 28 Feb 2017 20:55:33 +0800 Subject: [PATCH 2/6] fix inner-join.sql.out --- .../src/test/resources/sql-tests/results/inner-join.sql.out | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out b/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out index 98b5c7d9a4cb..fbede7f2bb17 100644 --- a/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out @@ -63,10 +63,6 @@ struct -- !query 6 output 1 a 1 a -1 a -1 a -1 b -1 b 1 b 1 b From 44636483bb1b87d7e4746ae98df47b3f9dc7e8ce Mon Sep 17 00:00:00 2001 From: Stan Zhai Date: Tue, 28 Feb 2017 21:13:23 +0800 Subject: [PATCH 3/6] update test --- sql/core/src/test/resources/sql-tests/inputs/inner-join.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql index 0be3a072eaf0..850c6ea07776 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql @@ -13,7 +13,7 @@ SELECT a, 'a' AS tag FROM t3 UNION ALL SELECT a, 'b' AS tag FROM t4; --- +-- SPARK-19766 Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag; -- Clean up From fc819f8e4c670d7c31d96be6e628b07b2ebc3509 Mon Sep 17 00:00:00 2001 From: Stan Zhai Date: Tue, 28 Feb 2017 22:40:01 +0800 Subject: [PATCH 4/6] revert and add check stop --- .../apache/spark/sql/catalyst/optimizer/expressions.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index f2e05680da13..21d1cd593262 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -452,6 +452,14 @@ object FoldablePropagation extends Rule[LogicalPlan] { case u: UnaryNode if !stop && canPropagateFoldables(u) => u.transformExpressions(replaceFoldable) + // Allow inner joins. We do not allow outer join, although its output attributes are + // derived from its children, they are actually different attributes: the output of outer + // join is not always picked from its children, but can also be null. + // TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes + // of outer join. + case j @ Join(_, _, Inner, _) if !stop => + j.transformExpressions(replaceFoldable) + // We can fold the projections an expand holds. However expand changes the output columns // and often reuses the underlying attributes; so we cannot assume that a column is still // foldable after the expand has been applied. From 15fae5029e7c45d3b2a3108e24ca235a6a6fccfc Mon Sep 17 00:00:00 2001 From: Stan Zhai Date: Wed, 1 Mar 2017 12:16:24 +0800 Subject: [PATCH 5/6] add `Propagate in inner join` test case into FoldablePropagationSuite --- .../optimizer/FoldablePropagationSuite.scala | 14 ++++++ .../resources/sql-tests/inputs/inner-join.sql | 8 ---- .../sql-tests/results/inner-join.sql.out | 47 ------------------- 3 files changed, 14 insertions(+), 55 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala index 82756f545a8c..2af52eac1a76 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala @@ -130,6 +130,20 @@ class FoldablePropagationSuite extends PlanTest { comparePlans(optimized, correctAnswer) } + test("Propagate in inner join") { + val ta = testRelation.select('a, Literal("a").as('tag)) + .union(testRelation.select('a, Literal("b").as('tag))) + .subquery('ta) + val tb = testRelation.select('a, Literal("a").as('tag)) + .union(testRelation.select('a, Literal("b").as('tag))) + .subquery('tb) + val query = ta.join(tb, Inner, + Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag")) + val optimized = Optimize.execute(query.analyze) + val correctAnswer = query.analyze + comparePlans(optimized, correctAnswer) + } + test("Propagate in expand") { val c1 = Literal(1).as('a) val c2 = Literal(2).as('b) diff --git a/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql index 850c6ea07776..38739cb95058 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inner-join.sql @@ -15,11 +15,3 @@ SELECT a, 'b' AS tag FROM t4; -- SPARK-19766 Constant alias columns in INNER JOIN should not be folded by FoldablePropagation rule SELECT tb.* FROM ta INNER JOIN tb ON ta.a = tb.a AND ta.tag = tb.tag; - --- Clean up -DROP VIEW IF EXISTS t1; -DROP VIEW IF EXISTS t2; -DROP VIEW IF EXISTS t3; -DROP VIEW IF EXISTS t4; -DROP VIEW IF EXISTS ta; -DROP VIEW IF EXISTS tb; diff --git a/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out b/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out index fbede7f2bb17..aa20537d449e 100644 --- a/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inner-join.sql.out @@ -66,50 +66,3 @@ struct 1 b 1 b - --- !query 7 -DROP VIEW IF EXISTS t1 --- !query 7 schema -struct<> --- !query 7 output - - - --- !query 8 -DROP VIEW IF EXISTS t2 --- !query 8 schema -struct<> --- !query 8 output - - - --- !query 9 -DROP VIEW IF EXISTS t3 --- !query 9 schema -struct<> --- !query 9 output - - - --- !query 10 -DROP VIEW IF EXISTS t4 --- !query 10 schema -struct<> --- !query 10 output - - - --- !query 11 -DROP VIEW IF EXISTS ta --- !query 11 schema -struct<> --- !query 11 output - - - --- !query 12 -DROP VIEW IF EXISTS tb --- !query 12 schema -struct<> --- !query 12 output - From 5611861d171f240e7ae7a722cb761c5a6af7610e Mon Sep 17 00:00:00 2001 From: Stan Zhai Date: Wed, 1 Mar 2017 14:12:18 +0800 Subject: [PATCH 6/6] fix test case --- .../catalyst/optimizer/FoldablePropagationSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala index 2af52eac1a76..d128315b6886 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FoldablePropagationSuite.scala @@ -131,14 +131,14 @@ class FoldablePropagationSuite extends PlanTest { } test("Propagate in inner join") { - val ta = testRelation.select('a, Literal("a").as('tag)) - .union(testRelation.select('a, Literal("b").as('tag))) + val ta = testRelation.select('a, Literal(1).as('tag)) + .union(testRelation.select('a, Literal(2).as('tag))) .subquery('ta) - val tb = testRelation.select('a, Literal("a").as('tag)) - .union(testRelation.select('a, Literal("b").as('tag))) + val tb = testRelation.select('a, Literal(1).as('tag)) + .union(testRelation.select('a, Literal(2).as('tag))) .subquery('tb) val query = ta.join(tb, Inner, - Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag")) + Some("ta.a".attr === "tb.a".attr && "ta.tag".attr === "tb.tag".attr)) val optimized = Optimize.execute(query.analyze) val correctAnswer = query.analyze comparePlans(optimized, correctAnswer)