From 7b5bff517b8933a466a9e200b13c7b246d09dbab Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sat, 13 May 2017 10:55:43 +0900
Subject: [PATCH 1/3] Add an optimizer rule to combine nested Concat

---
 .../sql/catalyst/optimizer/Optimizer.scala    |  3 +-
 .../sql/catalyst/optimizer/expressions.scala  | 26 +++++++
 .../optimizer/CombineConcatSuite.scala        | 75 +++++++++++++++++++
 .../sql-tests/inputs/string-functions.sql     |  4 +
 .../results/string-functions.sql.out          | 28 ++++++-
 5 files changed, 134 insertions(+), 2 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f2b9764b0f08..3090e22bd2a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -111,7 +111,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
       RemoveRedundantProject,
       SimplifyCreateStructOps,
       SimplifyCreateArrayOps,
-      SimplifyCreateMapOps) ++
+      SimplifyCreateMapOps,
+      CombineConcat) ++
       extendedOperatorOptimizationRules: _*) ::
     Batch("Check Cartesian Products", Once,
       CheckCartesianProducts(conf)) ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 34382bd27240..e609754dd762 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.immutable.HashSet
+import scala.collection.mutable.{ArrayBuffer, Stack}
 
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
@@ -543,3 +544,28 @@ object SimplifyCaseConversionExpressions extends Rule[LogicalPlan] {
     }
   }
 }
+
+/**
+ * Combine nested [[Concat]] expressions.
+ */
+object CombineConcat extends Rule[LogicalPlan] {
+
+  private def flattenConcat(concat: Concat): Concat = {
+    val stack = Stack[Expression](concat)
+    val flattened = ArrayBuffer.empty[Expression]
+    while (stack.nonEmpty) {
+      stack.pop() match {
+        case Concat(children) =>
+          stack.pushAll(children.reverse)
+        case child =>
+          flattened += child
+      }
+    }
+    Concat(flattened)
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformExpressionsDown {
+    case concat: Concat if concat.children.exists(_.isInstanceOf[Concat]) =>
+      flattenConcat(concat)
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatSuite.scala
new file mode 100644
index 000000000000..0f5a0df30d99
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatSuite.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.types.StringType
+
+
+class CombineConcatSuite extends PlanTest with PredicateHelper {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches = Batch("CombineConcatSuite", FixedPoint(50), CombineConcat) :: Nil
+  }
+
+  protected def assertEquivalent(e1: Expression, e2: Expression): Unit = {
+    val correctAnswer = Project(Alias(e2, "out")() :: Nil, OneRowRelation).analyze
+    val actual = Optimize.execute(Project(Alias(e1, "out")() :: Nil, OneRowRelation).analyze)
+    comparePlans(actual, correctAnswer)
+  }
+
+  test("combine nested Concat exprs") {
+    def str(s: String): Literal = Literal(s, StringType)
+    assertEquivalent(
+      Concat(
+        Concat(str("a") :: str("b") :: Nil) ::
+        str("c") ::
+        str("d") ::
+        Nil),
+      Concat(str("a") :: str("b") :: str("c") :: str("d") :: Nil))
+    assertEquivalent(
+      Concat(
+        str("a") ::
+        Concat(str("b") :: str("c") :: Nil) ::
+        str("d") ::
+        Nil),
+      Concat(str("a") :: str("b") :: str("c") :: str("d") :: Nil))
+    assertEquivalent(
+      Concat(
+        str("a") ::
+        str("b") ::
+        Concat(str("c") :: str("d") :: Nil) ::
+        Nil),
+      Concat(str("a") :: str("b") :: str("c") :: str("d") :: Nil))
+    assertEquivalent(
+      Concat(
+        Concat(
+          str("a") ::
+          Concat(
+            str("b") ::
+            Concat(str("c") :: str("d") :: Nil) ::
+            Nil) ::
+          Nil) ::
+        Nil),
+      Concat(str("a") :: str("b") :: str("c") :: str("d") :: Nil))
+  }
+}
diff --git a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql
index 7005cafe35ca..f685779cd34a 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/string-functions.sql
@@ -4,3 +4,7 @@ select format_string();
 
 -- A pipe operator for string concatenation
 select 'a' || 'b' || 'c';
+
+-- Check if Catalyst combines nested `Concat`s
+EXPLAIN EXTENDED SELECT (col1 || col2 || col3 || col4) col
+FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10));
diff --git a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out
index 8ee075118e10..d48d1a80c03b 100644
--- a/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/string-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 3
+-- Number of queries: 4
 
 
 -- !query 0
@@ -26,3 +26,29 @@ select 'a' || 'b' || 'c'
 struct<concat(concat(a, b), c):string>
 -- !query 2 output
 abc
+
+
+-- !query 3
+EXPLAIN EXTENDED SELECT (col1 || col2 || col3 || col4) col
+FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10))
+-- !query 3 schema
+struct<plan:string>
+-- !query 3 output
+== Parsed Logical Plan ==
+'Project [concat(concat(concat('col1, 'col2), 'col3), 'col4) AS col#x]
++- 'Project ['id AS col1#x, 'id AS col2#x, 'id AS col3#x, 'id AS col4#x]
+   +- 'UnresolvedTableValuedFunction range, [10]
+
+== Analyzed Logical Plan ==
+col: string
+Project [concat(concat(concat(cast(col1#xL as string), cast(col2#xL as string)), cast(col3#xL as string)), cast(col4#xL as string)) AS col#x]
++- Project [id#xL AS col1#xL, id#xL AS col2#xL, id#xL AS col3#xL, id#xL AS col4#xL]
+   +- Range (0, 10, step=1, splits=None)
+
+== Optimized Logical Plan ==
+Project [concat(cast(id#xL as string), cast(id#xL as string), cast(id#xL as string), cast(id#xL as string)) AS col#x]
++- Range (0, 10, step=1, splits=None)
+
+== Physical Plan ==
+*Project [concat(cast(id#xL as string), cast(id#xL as string), cast(id#xL as string), cast(id#xL as string)) AS col#x]
++- *Range (0, 10, step=1, splits=2)

From 4a9869327311a073b7c6e2197605f8422c2154ba Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sun, 14 May 2017 00:10:18 +0900
Subject: [PATCH 2/3] Rename CombineConcat to CombineConcats

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +-
 .../apache/spark/sql/catalyst/optimizer/expressions.scala   | 6 +++---
 .../{CombineConcatSuite.scala => CombineConcatsSuite.scala} | 4 ++--
 3 files changed, 6 insertions(+), 6 deletions(-)
 rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/{CombineConcatSuite.scala => CombineConcatsSuite.scala} (94%)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3090e22bd2a2..1802cd4bb131 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -112,7 +112,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf)
       SimplifyCreateStructOps,
       SimplifyCreateArrayOps,
       SimplifyCreateMapOps,
-      CombineConcat) ++
+      CombineConcats) ++
       extendedOperatorOptimizationRules: _*) ::
     Batch("Check Cartesian Products", Once,
       CheckCartesianProducts(conf)) ::
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index e609754dd762..d3ef5ea84091 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -548,9 +548,9 @@ object SimplifyCaseConversionExpressions extends Rule[LogicalPlan] {
 /**
  * Combine nested [[Concat]] expressions.
  */
-object CombineConcat extends Rule[LogicalPlan] {
+object CombineConcats extends Rule[LogicalPlan] {
 
-  private def flattenConcat(concat: Concat): Concat = {
+  private def flattenConcats(concat: Concat): Concat = {
     val stack = Stack[Expression](concat)
     val flattened = ArrayBuffer.empty[Expression]
     while (stack.nonEmpty) {
@@ -566,6 +566,6 @@ object CombineConcat extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan.transformExpressionsDown {
     case concat: Concat if concat.children.exists(_.isInstanceOf[Concat]) =>
-      flattenConcat(concat)
+      flattenConcats(concat)
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatsSuite.scala
similarity index 94%
rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatSuite.scala
rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatsSuite.scala
index 0f5a0df30d99..528ced39cf26 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatsSuite.scala
@@ -25,10 +25,10 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.StringType
 
 
-class CombineConcatSuite extends PlanTest with PredicateHelper {
+class CombineConcatsSuite extends PlanTest with PredicateHelper {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("CombineConcatSuite", FixedPoint(50), CombineConcat) :: Nil
+    val batches = Batch("CombineConcatsSuite", FixedPoint(50), CombineConcats) :: Nil
   }
 
   protected def assertEquivalent(e1: Expression, e2: Expression): Unit = {

From 676d4838b8f6d79193cc68ca44b2ae115a392ab2 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Mon, 15 May 2017 00:40:35 +0900
Subject: [PATCH 3/3] Remove PredicateHelper

---
 .../spark/sql/catalyst/optimizer/CombineConcatsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatsSuite.scala
index 528ced39cf26..7aa9fbba9a10 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombineConcatsSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.StringType
 
 
-class CombineConcatsSuite extends PlanTest with PredicateHelper {
+class CombineConcatsSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("CombineConcatsSuite", FixedPoint(50), CombineConcats) ::