From 3947488420bfadd082508a0e18ba33f7589f57f4 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Wed, 9 Jan 2019 22:46:10 -0800 Subject: [PATCH 1/3] [SPARK-26576][SQL] Broadcast hint not applied to partitioned table --- .../datasources/PruneFileSourcePartitions.scala | 10 ++++++++-- .../PruneFileSourcePartitionsSuite.scala | 15 ++++++++++++++- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 16b2367bfdd5..0055729c6f4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.catalog.CatalogStatistics import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, ResolvedHint} import org.apache.spark.sql.catalyst.rules.Rule private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { @@ -71,7 +71,13 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { // Keep partition-pruning predicates so that they are visible in physical planning val filterExpression = filters.reduceLeft(And) val filter = Filter(filterExpression, prunedLogicalRelation) - Project(projects, filter) + op match { + case h: ResolvedHint => + // Restore the ResolvedHint removed by PhysicalOperation.collectProjectsAndFilters + h.copy(child = Project(projects, filter)) + case _ => + Project(projects, filter) + } } else { op } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 94384185d190..583861615d0a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -21,10 +21,11 @@ import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, ResolvedHint} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.functions.broadcast import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.StructType @@ -91,4 +92,16 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te assert(size2 < tableStats.get.sizeInBytes) } } + + test("SPARK-26576 Broadcast hint not applied to partitioned table") { + withTable("tbl") { + spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl") + + val df = spark.table("tbl") + val hints = df.join(broadcast(df), "p").queryExecution.optimizedPlan.collect { + case h: ResolvedHint => h + } + assert(hints.size === 1, "ResolvedHint removed in optimized plan") + } + } } From 2fcfb792ec8c894e4846712da9603db39695d4d1 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Thu, 10 Jan 2019 12:10:58 -0800 Subject: [PATCH 2/3] Update unit test --- .../PruneFileSourcePartitionsSuite.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 583861615d0a..8a9adf7ca6ae 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.execution +import org.scalatest.Matchers._ + import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.dsl.expressions._ @@ -25,8 +27,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.functions.broadcast import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.StructType @@ -95,13 +99,13 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te test("SPARK-26576 Broadcast hint not applied to partitioned table") { withTable("tbl") { - spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl") - - val df = spark.table("tbl") - val hints = df.join(broadcast(df), "p").queryExecution.optimizedPlan.collect { - case h: ResolvedHint => h + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + spark.range(10).selectExpr("id", "id % 3 as p").write.partitionBy("p").saveAsTable("tbl") + val df = spark.table("tbl") + val qe = df.join(broadcast(df), "p").queryExecution + qe.optimizedPlan.collect { case _: ResolvedHint => } should have size 1 + qe.sparkPlan.collect { case j: BroadcastHashJoinExec => j } should have size 1 } - assert(hints.size === 1, "ResolvedHint removed in optimized plan") } } } From 2189ce013b384a2f54df491ee14f815c2c932f20 Mon Sep 17 00:00:00 2001 From: John Zhuge Date: Thu, 10 Jan 2019 20:19:44 -0800 Subject: [PATCH 3/3] Remove ResolvedHint case in PhysicalOperation.unapply instead --- .../apache/spark/sql/catalyst/planning/patterns.scala | 3 --- .../datasources/PruneFileSourcePartitions.scala | 10 ++-------- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 84be677e438a..d91b89057b97 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -66,9 +66,6 @@ object PhysicalOperation extends PredicateHelper { val substitutedCondition = substitute(aliases)(condition) (fields, filters ++ splitConjunctivePredicates(substitutedCondition), other, aliases) - case h: ResolvedHint => - collectProjectsAndFilters(h.child) - case other => (None, Nil, other, Map.empty) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index 0055729c6f4b..16b2367bfdd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.catalog.CatalogStatistics import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, ResolvedHint} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.catalyst.rules.Rule private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { @@ -71,13 +71,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { // Keep partition-pruning predicates so that they are visible in physical planning val filterExpression = filters.reduceLeft(And) val filter = Filter(filterExpression, prunedLogicalRelation) - op match { - case h: ResolvedHint => - // Restore the ResolvedHint removed by PhysicalOperation.collectProjectsAndFilters - h.copy(child = Project(projects, filter)) - case _ => - Project(projects, filter) - } + Project(projects, filter) } else { op }