From a61e865b84e76575698679fc12e2a5a8b9c5b0fd Mon Sep 17 00:00:00 2001
From: wangzhenhua
Date: Fri, 12 May 2017 15:04:20 +0800
Subject: [PATCH 1/3] fix HiveTableScanExec canonicalization

---
 .../spark/sql/catalyst/plans/QueryPlan.scala  | 12 +++++++-
 .../sql/execution/DataSourceScanExec.scala    | 16 ++---------
 .../hive/execution/HiveTableScanExec.scala    |  2 +-
 .../hive/execution/HiveTableScanSuite.scala   | 28 ++++++++++++++-----
 4 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 2fb65bd43550..f90c7fb6e781 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -423,7 +423,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   lazy val allAttributes: AttributeSeq = children.flatMap(_.output)
 }
 
-object QueryPlan {
+object QueryPlan extends PredicateHelper {
   /**
    * Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference`
    * with its referenced ordinal from input attributes. It's similar to `BindReferences` but we
@@ -442,4 +442,14 @@ object QueryPlan {
         }
     }.canonicalized.asInstanceOf[T]
   }
+
+  /** Normalize and reorder the expressions in the given sequence. */
+  def canonicalizeExprSeq(exprSeq: Seq[Expression], output: AttributeSeq): Seq[Expression] = {
+    if (exprSeq.nonEmpty) {
+      val normalizedExprs = QueryPlan.normalizeExprId(exprSeq.reduce(And), output)
+      splitConjunctivePredicates(normalizedExprs)
+    } else {
+      Nil
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 251098c9a884..596b4c98d1d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport with PredicateHelper {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
 
@@ -519,18 +519,8 @@ case class FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
       requiredSchema,
-      canonicalizeFilters(partitionFilters, output),
-      canonicalizeFilters(dataFilters, output),
+      QueryPlan.canonicalizeExprSeq(partitionFilters, output),
+      QueryPlan.canonicalizeExprSeq(dataFilters, output),
       None)
   }
-
-  private def canonicalizeFilters(filters: Seq[Expression], output: Seq[Attribute])
-    : Seq[Expression] = {
-    if (filters.nonEmpty) {
-      val normalizedFilters = QueryPlan.normalizeExprId(filters.reduce(And), output)
-      splitConjunctivePredicates(normalizedFilters)
-    } else {
-      Nil
-    }
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 666548d1a490..df9b1da94c8e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -206,7 +206,7 @@ case class HiveTableScanExec(
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
       relation.canonicalized.asInstanceOf[CatalogRelation],
-      partitionPruningPred.map(QueryPlan.normalizeExprId(_, input)))(sparkSession)
+      QueryPlan.canonicalizeExprSeq(partitionPruningPred, input))(sparkSession)
   }
 
   override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 90e037e29279..ae64cb3210b5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -164,16 +164,30 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
             |PARTITION (p1='a',p2='c',p3='c',p4='d',p5='e')
             |SELECT v.id
           """.stripMargin)
-        val plan = sql(
-          s"""
-             |SELECT * FROM $table
-           """.stripMargin).queryExecution.sparkPlan
-        val scan = plan.collectFirst {
-          case p: HiveTableScanExec => p
-        }.get
+        val scan = getHiveTableScanExec(s"SELECT * FROM $table")
         val numDataCols = scan.relation.dataCols.length
         scan.rawPartitions.foreach(p => assert(p.getCols.size == numDataCols))
       }
     }
   }
+
+  test("HiveTableScanExec canonicalization for different orders of partition filters") {
+    val table = "hive_tbl_part"
+    withTable(table) {
+      sql(
+        s"""
+           |CREATE TABLE $table (id int)
+           |PARTITIONED BY (a int, b int)
+         """.stripMargin)
+      val scan1 = getHiveTableScanExec(s"SELECT * FROM $table WHERE a = 1 AND b = 2")
+      val scan2 = getHiveTableScanExec(s"SELECT * FROM $table WHERE b = 2 AND a = 1")
+      assert(scan1.sameResult(scan2))
+    }
+  }
+
+  private def getHiveTableScanExec(query: String): HiveTableScanExec = {
+    sql(query).queryExecution.sparkPlan.collectFirst {
+      case p: HiveTableScanExec => p
+    }.get
+  }
 }

From eee74e7b95caac4c577333c0f884098fe7a43b75 Mon Sep 17 00:00:00 2001
From: wangzhenhua
Date: Fri, 12 May 2017 16:03:28 +0800
Subject: [PATCH 2/3] fix comments about naming

---
 .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 8 ++++----
 .../apache/spark/sql/execution/DataSourceScanExec.scala | 4 ++--
 .../spark/sql/hive/execution/HiveTableScanExec.scala    | 2 +-
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index f90c7fb6e781..9f917e5e0724 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -444,10 +444,10 @@ object QueryPlan extends PredicateHelper {
   }
 
   /** Normalize and reorder the expressions in the given sequence. */
-  def canonicalizeExprSeq(exprSeq: Seq[Expression], output: AttributeSeq): Seq[Expression] = {
-    if (exprSeq.nonEmpty) {
-      val normalizedExprs = QueryPlan.normalizeExprId(exprSeq.reduce(And), output)
-      splitConjunctivePredicates(normalizedExprs)
+  def normalizePredicates(predicates: Seq[Expression], output: AttributeSeq): Seq[Expression] = {
+    if (predicates.nonEmpty) {
+      val normalized = normalizeExprId(predicates.reduce(And), output)
+      splitConjunctivePredicates(normalized)
     } else {
       Nil
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 596b4c98d1d5..74fc23a52a14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -519,8 +519,8 @@ case class FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
       requiredSchema,
-      QueryPlan.canonicalizeExprSeq(partitionFilters, output),
-      QueryPlan.canonicalizeExprSeq(dataFilters, output),
+      QueryPlan.normalizePredicates(partitionFilters, output),
+      QueryPlan.normalizePredicates(dataFilters, output),
       None)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index df9b1da94c8e..e191071efbf1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -206,7 +206,7 @@ case class HiveTableScanExec(
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
       relation.canonicalized.asInstanceOf[CatalogRelation],
-      QueryPlan.canonicalizeExprSeq(partitionPruningPred, input))(sparkSession)
+      QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
   }
 
   override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession)

From dd9dace3f829114fb96c54959d72b182d4ef8708 Mon Sep 17 00:00:00 2001
From: wangzhenhua
Date: Fri, 12 May 2017 16:53:44 +0800
Subject: [PATCH 3/3] improve comments

---
 .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 9f917e5e0724..51faa333307b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -443,7 +443,10 @@ object QueryPlan extends PredicateHelper {
     }.canonicalized.asInstanceOf[T]
   }
 
-  /** Normalize and reorder the expressions in the given sequence. */
+  /**
+   * Composes the given predicates into a conjunctive predicate, which is normalized and reordered.
+   * Then returns a new sequence of predicates by splitting the conjunctive predicate.
+   */
   def normalizePredicates(predicates: Seq[Expression], output: AttributeSeq): Seq[Expression] = {
     if (predicates.nonEmpty) {
       val normalized = normalizeExprId(predicates.reduce(And), output)
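
For reference, the idea behind QueryPlan.normalizePredicates can be shown with a small,
self-contained Scala sketch. The case classes below are simplified stand-ins for Catalyst's
Expression, EqualTo and And (not Spark's real classes), and sorting conjuncts by toString is
only an approximation of Spark's deterministic canonical ordering, so this is an illustration
of the reduce-then-split technique rather than the actual implementation:

    object NormalizePredicatesSketch extends App {
      // Simplified stand-ins for Catalyst's Expression, EqualTo and And (not Spark classes).
      sealed trait Expr
      case class EqualTo(attr: String, value: Int) extends Expr
      case class And(left: Expr, right: Expr) extends Expr

      // Split a conjunction into its conjuncts, mirroring splitConjunctivePredicates.
      def splitConjuncts(e: Expr): Seq[Expr] = e match {
        case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
        case other => Seq(other)
      }

      // Fold the predicates into one conjunction, put the conjuncts into a deterministic
      // order (sorting by toString here approximates Spark's canonicalization), then split
      // the conjunction back into a sequence.
      def normalizePredicates(predicates: Seq[Expr]): Seq[Expr] =
        if (predicates.nonEmpty) {
          val combined = predicates.reduce[Expr]((l, r) => And(l, r))
          splitConjuncts(combined).sortBy(_.toString)
        } else {
          Nil
        }

      // Filters that differ only in order normalize to the same sequence.
      val scan1 = normalizePredicates(Seq(EqualTo("a", 1), EqualTo("b", 2)))
      val scan2 = normalizePredicates(Seq(EqualTo("b", 2), EqualTo("a", 1)))
      assert(scan1 == scan2)
      println(scan1)  // List(EqualTo(a,1), EqualTo(b,2))
    }

Because the conjuncts end up in a deterministic order, predicate sequences that differ only in
order, such as the partition filters produced by WHERE a = 1 AND b = 2 versus
WHERE b = 2 AND a = 1, normalize to the same sequence; that is what lets
scan1.sameResult(scan2) hold in the new HiveTableScanSuite test.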