From 009944fcc1492890026997da5f56a5ac7a02e3e1 Mon Sep 17 00:00:00 2001 From: Xianjin YE Date: Tue, 24 Sep 2019 23:22:49 +0800 Subject: [PATCH 1/8] [SPARK-15616][SQL] Hive table supports partition pruning in JoinSelection --- .../sql/hive/HiveSessionStateBuilder.scala | 17 +++++- .../spark/sql/hive/HiveStrategies.scala | 61 ++++++++++++++++++- .../spark/sql/hive/StatisticsSuite.scala | 31 ++++++++++ 3 files changed, 106 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 3df77fec2099..7f9f0ff78217 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -21,9 +21,10 @@ import org.apache.spark.annotation.Unstable import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, ResolveSessionCatalog} import org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener +import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.SparkPlanner +import org.apache.spark.sql.execution.{SparkOptimizer, SparkPlanner} import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck @@ -93,6 +94,20 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session customCheckRules } + /** + * Logical query plan optimizer that takes into account Hive. + */ + override protected def optimizer: Optimizer = { + new SparkOptimizer(catalog, experimentalMethods) { + override def postHocOptimizationBatches: Seq[Batch] = Seq( + Batch("Prune Hive Table Partitions", Once, PruneHiveTablePartitions(session)) + ) + + override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = + super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules + } + } + /** * Planner that takes into account Hive-specific strategies. 
*/ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 33ca1889e944..b96070223fc6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -21,15 +21,16 @@ import java.io.IOException import java.util.Locale import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} +import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -231,6 +232,62 @@ case class RelationConversions( } } +/** + * TODO: merge this with PruneFileSourcePartitions after we completely make hive as a data source. + */ +case class PruneHiveTablePartitions( + session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case filter @ Filter(condition, relation: HiveTableRelation) if relation.isPartitioned => + val predicates = splitConjunctivePredicates(condition) + val normalizedFilters = predicates.map { e => + e transform { + case a: AttributeReference => + a.withName(relation.output.find(_.semanticEquals(a)).get.name) + } + } + val partitionSet = AttributeSet(relation.partitionCols) + val pruningPredicates = normalizedFilters.filter { predicate => + !predicate.references.isEmpty && + predicate.references.subsetOf(partitionSet) + } + val conf = session.sessionState.conf + if (pruningPredicates.nonEmpty && conf.fallBackToHdfsForStatsEnabled && + conf.metastorePartitionPruning) { + val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( + relation.tableMeta.database, + relation.tableMeta.identifier.table, + pruningPredicates, + conf.sessionLocalTimeZone) + val sizeInBytes = try { + prunedPartitions.map { part => + val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + if (rawDataSize.isDefined && rawDataSize.get > 0) { + rawDataSize.get + } else if (totalSize.isDefined && totalSize.get > 0L) { + totalSize.get + } else { + CommandUtils.calculateLocationSize( + session.sessionState, relation.tableMeta.identifier, part.storage.locationUri) + } + }.sum + } catch { + case e: IOException => + logWarning("Failed to get table size from HDFS.", e) + conf.defaultSizeInBytes + } + val withStats = relation.tableMeta.copy( + stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) + val prunedCatalogRelation = relation.copy(tableMeta = withStats) + val filterExpression = predicates.reduceLeft(And) + Filter(filterExpression, 
prunedCatalogRelation)
+      } else {
+        filter
+      }
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 40581066c62b..8b483441970f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1514,4 +1514,35 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       }
     }
   }
+
+  test("Broadcast join can be inferred if partitioned table can be pruned under threshold") {
+    withTempView("tempTbl", "largeTbl") {
+      withTable("partTbl") {
+        spark.range(0, 1000, 1, 2).selectExpr("id as col1", "id as col2")
+          .createOrReplaceTempView("tempTbl")
+        spark.range(0, 100000, 1, 2).selectExpr("id as col1", "id as col2")
+          .createOrReplaceTempView("largeTbl")
+        sql("CREATE TABLE partTbl (col1 INT, col2 STRING) " +
+          "PARTITIONED BY (part1 STRING, part2 INT) STORED AS textfile")
+        for (part1 <- Seq("a", "b", "c", "d"); part2 <- Seq(1, 2)) {
+          sql(
+            s"""
+               |INSERT OVERWRITE TABLE partTbl PARTITION (part1='$part1',part2='$part2')
+               |select col1, col2 from tempTbl
+             """.stripMargin)
+        }
+        val query = "select * from largeTbl join partTbl on (largeTbl.col1 = partTbl.col1 " +
+          "and partTbl.part1 = 'a' and partTbl.part2 = 1)"
+        Seq(true, false).foreach { partitionPruning =>
+          withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true",
+            SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001",
+            SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> s"$partitionPruning") {
+            val broadcastJoins =
+              sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
+            assert(broadcastJoins.nonEmpty === partitionPruning)
+          }
+        }
+      }
+    }
+  }
 }

From e744da59aad15e8d1f5c7582fab195ddfde0a18c Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Sun, 20 Oct 2019 11:59:49 +0800
Subject: [PATCH 2/8] Keep pruned partitions in HiveTableRelation to avoid multiple calls to the Hive metastore.
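
The reuse this commit enables can be sketched as follows (illustrative only; the
helper below is hypothetical and the exact representation stored on
HiveTableRelation differs slightly across the revisions in this series). The
scan should only go back to the metastore when the filters it sees no longer
match the ones the optimizer rule already pruned with:

    import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
    import org.apache.spark.sql.catalyst.expressions.Expression

    // Hypothetical helper, not part of the patch: reuse partitions already fetched
    // by PruneHiveTablePartitions instead of asking the metastore a second time.
    def partitionsToScan(
        storedFilters: Seq[Expression],
        storedPartitions: Seq[CatalogTablePartition],
        currentFilters: Seq[Expression],
        listByFilter: Seq[Expression] => Seq[CatalogTablePartition]): Seq[CatalogTablePartition] = {
      val sameFilters = currentFilters.size == storedFilters.size &&
        currentFilters.zip(storedFilters).forall { case (a, b) => a.semanticEquals(b) }
      if (sameFilters && storedPartitions.nonEmpty) storedPartitions
      else listByFilter(currentFilters)
    }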
--- .../spark/sql/catalyst/catalog/interface.scala | 7 +++++-- .../apache/spark/sql/hive/HiveStrategies.scala | 16 ++++++++++------ .../sql/hive/execution/HiveTableScanExec.scala | 14 ++++++++++---- .../sql/hive/execution/HiveTableScanSuite.scala | 14 ++++++++++---- 4 files changed, 35 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index f653bf41c162..0afbf9ce6718 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Expression, ExprId, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter} @@ -610,7 +610,10 @@ case class HiveTableRelation( tableMeta: CatalogTable, dataCols: Seq[AttributeReference], partitionCols: Seq[AttributeReference], - tableStats: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation { + tableStats: Option[Statistics] = None, + @transient normalizedFilters: Seq[Expression] = Nil, + @transient prunedPartitions: Seq[CatalogTablePartition] = Nil) + extends LeafNode with MultiInstanceRelation { assert(tableMeta.identifier.database.isDefined) assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType)) assert(tableMeta.dataSchema.sameType(dataCols.toStructType)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index b96070223fc6..62f47fe251da 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -252,26 +252,29 @@ case class PruneHiveTablePartitions( predicate.references.subsetOf(partitionSet) } val conf = session.sessionState.conf - if (pruningPredicates.nonEmpty && conf.fallBackToHdfsForStatsEnabled && - conf.metastorePartitionPruning) { + if (conf.metastorePartitionPruning && pruningPredicates.nonEmpty) { val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( relation.tableMeta.database, relation.tableMeta.identifier.table, pruningPredicates, conf.sessionLocalTimeZone) val sizeInBytes = try { - prunedPartitions.map { part => + val sizeOfPartitions = prunedPartitions.map { part => val rawDataSize = part.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) val totalSize = part.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) if (rawDataSize.isDefined && rawDataSize.get > 0) { rawDataSize.get } else if (totalSize.isDefined && totalSize.get > 0L) { totalSize.get - } else { + } else if (conf.fallBackToHdfsForStatsEnabled) { CommandUtils.calculateLocationSize( session.sessionState, relation.tableMeta.identifier, 
part.storage.locationUri)
+            } else { // we cannot get any size statistics here. Use 0 as the default size to sum up.
+              0L
             }
           }.sum
+          // If size of partitions is zero, fall back to the default size.
+          if (sizeOfPartitions == 0L) conf.defaultSizeInBytes else sizeOfPartitions
         } catch {
           case e: IOException =>
             logWarning("Failed to get table size from HDFS.", e)
@@ -279,9 +282,10 @@ case class PruneHiveTablePartitions(
       }
       val withStats = relation.tableMeta.copy(
         stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
-      val prunedCatalogRelation = relation.copy(tableMeta = withStats)
+      val prunedHiveTableRelation = relation.copy(tableMeta = withStats,
+        normalizedFilters = pruningPredicates, prunedPartitions = prunedPartitions)
       val filterExpression = predicates.reduceLeft(And)
-      Filter(filterExpression, prunedCatalogRelation)
+      Filter(filterExpression, prunedHiveTableRelation)
       } else {
         filter
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 5b00e2ebafa4..a86d784eb6a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -166,14 +166,20 @@ case class HiveTableScanExec(
   @transient lazy val rawPartitions = {
     val prunedPartitions =
       if (sparkSession.sessionState.conf.metastorePartitionPruning &&
-          partitionPruningPred.size > 0) {
+          partitionPruningPred.nonEmpty) {
         // Retrieve the original attributes based on expression ID so that capitalization matches.
         val normalizedFilters = partitionPruningPred.map(_.transform {
           case a: AttributeReference => originalAttributes(a)
         })
-        sparkSession.sessionState.catalog.listPartitionsByFilter(
-          relation.tableMeta.identifier,
-          normalizedFilters)
+        val isFiltersEqual = normalizedFilters.zip(relation.normalizedFilters)
+          .forall { case (e1, e2) => e1.semanticEquals(e2) }
+        if (isFiltersEqual) {
+          relation.prunedPartitions
+        } else {
+          sparkSession.sessionState.catalog.listPartitionsByFilter(
+            relation.tableMeta.identifier,
+            normalizedFilters)
+        }
       } else {
         sparkSession.sessionState.catalog.listPartitions(relation.tableMeta.identifier)
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 3f9bb8de42e0..e97ee83bf766 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -128,8 +128,12 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
         // If the pruning predicate is used, getHiveQlPartitions should only return the
         // qualified partition; Otherwise, it return all the partitions.
val expectedNumPartitions = if (hivePruning == "true") 1 else 2 - checkNumScannedPartitions( - stmt = s"SELECT id, p2 FROM $table WHERE p2 <= 'b'", expectedNumPartitions) + val stmt = s"SELECT id, p2 FROM $table WHERE p2 <= 'b'" + checkNumScannedPartitions(stmt = stmt, expectedNumPartitions) + // prunedPartitions are held in HiveTableRelation + val prunedNumPartitions = if (hivePruning == "true") 1 else 0 + assert( + getHiveTableScanExec(stmt).relation.prunedPartitions.size === prunedNumPartitions) } } @@ -137,8 +141,10 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> hivePruning) { // If the pruning predicate does not exist, getHiveQlPartitions should always // return all the partitions. - checkNumScannedPartitions( - stmt = s"SELECT id, p2 FROM $table WHERE id <= 3", expectedNumParts = 2) + val stmt = s"SELECT id, p2 FROM $table WHERE id <= 3" + checkNumScannedPartitions(stmt = stmt, expectedNumParts = 2) + // no pruning is triggered, no partitions are held in HiveTableRelation + assert(getHiveTableScanExec(stmt).relation.prunedPartitions.isEmpty) } } } From 12e1dc56db32ee4c939e8ad8260d484a30c7b3f8 Mon Sep 17 00:00:00 2001 From: Xianjin YE Date: Sun, 20 Oct 2019 14:10:36 +0800 Subject: [PATCH 3/8] Fix compile error --- .../spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index 5ff33b9cfbfc..f5fc2d8a4e28 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -84,7 +84,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark { queryRelations.add(alias.identifier) case LogicalRelation(_, _, Some(catalogTable), _) => queryRelations.add(catalogTable.identifier.table) - case HiveTableRelation(tableMeta, _, _, _) => + case HiveTableRelation(tableMeta, _, _, _, _, _) => queryRelations.add(tableMeta.identifier.table) case _ => } From b334e9926cc810f73e2c41934021ef7b5ffe7128 Mon Sep 17 00:00:00 2001 From: Xianjin YE Date: Sun, 20 Oct 2019 14:34:50 +0800 Subject: [PATCH 4/8] Fix compile error --- .../org/apache/spark/sql/hive/HiveSessionStateBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 7f9f0ff78217..1d71f4770899 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -98,7 +98,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session * Logical query plan optimizer that takes into account Hive. 
   */
   override protected def optimizer: Optimizer = {
-    new SparkOptimizer(catalog, experimentalMethods) {
+    new SparkOptimizer(catalogManager, catalog, experimentalMethods) {
       override def postHocOptimizationBatches: Seq[Batch] = Seq(
         Batch("Prune Hive Table Partitions", Once, PruneHiveTablePartitions(session))
       )

From 17e0ba0b01c426f67ff8f51ce2cba9bbbf6f4376 Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Mon, 21 Oct 2019 00:07:58 +0800
Subject: [PATCH 5/8] Exclude normalizedFilters and prunedPartitions in canonicalization.

---
 .../org/apache/spark/sql/catalyst/catalog/interface.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 0afbf9ce6718..c7b20bb098d2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -633,7 +633,9 @@ case class HiveTableRelation(
     },
     partitionCols = partitionCols.zipWithIndex.map { case (attr, index) =>
       attr.withExprId(ExprId(index + dataCols.length))
-    }
+    },
+    normalizedFilters = Nil,
+    prunedPartitions = Nil
   )

   override def computeStats(): Statistics = {

From 8d615f73fccd47f3690af4d4806f120e5c058d9e Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Mon, 21 Oct 2019 10:56:47 +0800
Subject: [PATCH 6/8] Remove scalar subqueries from partition expressions.

---
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 62f47fe251da..333aadff523c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -247,10 +247,12 @@ case class PruneHiveTablePartitions(
         }
       }
       val partitionSet = AttributeSet(relation.partitionCols)
-      val pruningPredicates = normalizedFilters.filter { predicate =>
-        !predicate.references.isEmpty &&
-          predicate.references.subsetOf(partitionSet)
-      }
+      // SPARK-24085: remove scalar subquery in partition expression then get normalized predicates
+      val pruningPredicates = normalizedFilters
+        .filterNot(SubqueryExpression.hasSubquery)
+        .filter { predicate =>
+          !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet)
+        }
       val conf = session.sessionState.conf
       if (conf.metastorePartitionPruning && pruningPredicates.nonEmpty) {
         val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter(
           relation.tableMeta.database,
           relation.tableMeta.identifier.table,

From 86a0d9c489501e1e7b42351154973f21a838763b Mon Sep 17 00:00:00 2001
From: Xianjin YE
Date: Sat, 26 Oct 2019 00:22:17 +0800
Subject: [PATCH 7/8] Address comments:

1. don't store pruningFilters in HiveTableRelation
2. follow PruneFileSourcePartitions's style to extract projections, predicates
   and the Hive relation (see the sketch below)
3. skip partition pruning if a scalar subquery is involved.
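
The extraction style referred to in point 2 can be illustrated with a small,
self-contained sketch (a hypothetical helper, not part of the patch):
PhysicalOperation collapses the Project/Filter chain sitting above a leaf node,
so the rule no longer has to match a bare Filter directly over the relation.

    import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
    import org.apache.spark.sql.catalyst.planning.PhysicalOperation
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Destructure the whole Project/Filter chain above the Hive relation,
    // as PruneFileSourcePartitions does for file-source relations.
    def describeScan(plan: LogicalPlan): String = plan match {
      case PhysicalOperation(projections, predicates, relation: HiveTableRelation)
          if relation.isPartitioned =>
        s"scan of ${relation.tableMeta.identifier} with ${predicates.size} predicate(s) " +
          s"and ${projections.size} projected column(s)"
      case _ => "not a simple Hive table scan"
    }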
--- .../sql/catalyst/catalog/interface.scala | 5 +--- .../benchmark/TPCDSQueryBenchmark.scala | 2 +- .../spark/sql/hive/HiveStrategies.scala | 28 +++++++++---------- .../hive/execution/HiveTableScanExec.scala | 10 ++----- 4 files changed, 18 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index c7b20bb098d2..cd87c6eca05b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -611,8 +611,7 @@ case class HiveTableRelation( dataCols: Seq[AttributeReference], partitionCols: Seq[AttributeReference], tableStats: Option[Statistics] = None, - @transient normalizedFilters: Seq[Expression] = Nil, - @transient prunedPartitions: Seq[CatalogTablePartition] = Nil) + @transient prunedPartitions: Option[Seq[CatalogTablePartition]] = None) extends LeafNode with MultiInstanceRelation { assert(tableMeta.identifier.database.isDefined) assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType)) @@ -634,8 +633,6 @@ case class HiveTableRelation( partitionCols = partitionCols.zipWithIndex.map { case (attr, index) => attr.withExprId(ExprId(index + dataCols.length)) }, - normalizedFilters = Nil, - prunedPartitions = Nil ) override def computeStats(): Statistics = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala index f5fc2d8a4e28..e8ce7aef75ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala @@ -84,7 +84,7 @@ object TPCDSQueryBenchmark extends SqlBasedBenchmark { queryRelations.add(alias.identifier) case LogicalRelation(_, _, Some(catalogTable), _) => queryRelations.add(catalogTable.identifier.table) - case HiveTableRelation(tableMeta, _, _, _, _, _) => + case HiveTableRelation(tableMeta, _, _, _, _) => queryRelations.add(tableMeta.identifier.table) case _ => } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 333aadff523c..cf806cda4119 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ -import org.apache.spark.sql.catalyst.plans.logical.{Filter, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, InsertIntoDir, InsertIntoStatement, LogicalPlan, Project, ScriptTransformation, Statistics} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils} @@ -238,8 +238,8 @@ case class RelationConversions( case class PruneHiveTablePartitions( session: SparkSession) extends Rule[LogicalPlan] with PredicateHelper { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case filter @ 
Filter(condition, relation: HiveTableRelation) if relation.isPartitioned => - val predicates = splitConjunctivePredicates(condition) + case op @ PhysicalOperation(projections, predicates, relation: HiveTableRelation) + if predicates.nonEmpty && relation.isPartitioned && relation.prunedPartitions.isEmpty => val normalizedFilters = predicates.map { e => e transform { case a: AttributeReference => @@ -247,14 +247,13 @@ case class PruneHiveTablePartitions( } } val partitionSet = AttributeSet(relation.partitionCols) - // SPARK-24085: remove scalar subquery in partition expression then get normalized predicates - val pruningPredicates = normalizedFilters - .filterNot(SubqueryExpression.hasSubquery) - .filter { predicate => - !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) - } + val pruningPredicates = normalizedFilters.filter { predicate => + !predicate.references.isEmpty && predicate.references.subsetOf(partitionSet) + } + // SPARK-24085: scalar subquery should be skipped for partition pruning + val hasScalarSubquery = pruningPredicates.exists(SubqueryExpression.hasSubquery) val conf = session.sessionState.conf - if (conf.metastorePartitionPruning && pruningPredicates.nonEmpty) { + if (conf.metastorePartitionPruning && pruningPredicates.nonEmpty && !hasScalarSubquery) { val prunedPartitions = session.sharedState.externalCatalog.listPartitionsByFilter( relation.tableMeta.database, relation.tableMeta.identifier.table, @@ -284,12 +283,13 @@ case class PruneHiveTablePartitions( } val withStats = relation.tableMeta.copy( stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) - val prunedHiveTableRelation = relation.copy(tableMeta = withStats, - normalizedFilters = pruningPredicates, prunedPartitions = prunedPartitions) + val prunedHiveTableRelation = + relation.copy(tableMeta = withStats, prunedPartitions = Some(prunedPartitions)) val filterExpression = predicates.reduceLeft(And) - Filter(filterExpression, prunedHiveTableRelation) + val filter = Filter(filterExpression, prunedHiveTableRelation) + Project(projections, filter) } else { - filter + op } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index a86d784eb6a5..d353d5c546ba 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -171,15 +171,9 @@ case class HiveTableScanExec( val normalizedFilters = partitionPruningPred.map(_.transform { case a: AttributeReference => originalAttributes(a) }) - val isFiltersEqual = normalizedFilters.zip(relation.normalizedFilters) - .forall { case (e1, e2) => e1.semanticEquals(e2) } - if (isFiltersEqual) { - relation.prunedPartitions - } else { + relation.prunedPartitions.getOrElse( sparkSession.sessionState.catalog.listPartitionsByFilter( - relation.tableMeta.identifier, - normalizedFilters) - } + relation.tableMeta.identifier, normalizedFilters)) } else { sparkSession.sessionState.catalog.listPartitions(relation.tableMeta.identifier) } From ecfbe4d975aeee1da38240753c99a6927a8aa690 Mon Sep 17 00:00:00 2001 From: Xianjin YE Date: Sat, 26 Oct 2019 12:07:50 +0800 Subject: [PATCH 8/8] Fix style issue. 
--- .../org/apache/spark/sql/catalyst/catalog/interface.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index cd87c6eca05b..96e8e7852529 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Expression, ExprId, Literal} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter} @@ -632,7 +632,7 @@ case class HiveTableRelation( }, partitionCols = partitionCols.zipWithIndex.map { case (attr, index) => attr.withExprId(ExprId(index + dataCols.length)) - }, + } ) override def computeStats(): Statistics = {
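
A rough end-to-end sketch of the behaviour this series targets, mirroring the new
StatisticsSuite test (assumptions: a Hive-enabled session, a large table largeTbl,
and a partitioned Hive table partTbl created as in that test; configuration key
strings are written out here in place of the SQLConf constants used in the test):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")
    spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "8001")

    // With pruning on, only partition (part1='a', part2=1) contributes to the size
    // estimate, which can drop below the broadcast threshold and flip the join type.
    val plan = spark.sql(
      "SELECT * FROM largeTbl JOIN partTbl ON largeTbl.col1 = partTbl.col1 " +
        "AND partTbl.part1 = 'a' AND partTbl.part2 = 1").queryExecution.sparkPlan
    val usesBroadcastJoin = plan.collect { case j: BroadcastHashJoinExec => j }.nonEmpty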