diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dc31f3bc323f..6ccf1f765f37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -132,6 +132,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val ENABLE_PARTITION_PRUNER_FOR_STATS = SQLConfigBuilder("spark.sql.statistics.partitionPruner")
+    .doc("When true, partition predicates will be pushed down into MetastoreRelation so that " +
+      "we can determine whether the involved partitions are small enough for auto broadcast joins.")
+    .booleanConf
+    .createWithDefault(false)
+
   val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
     .internal()
     .doc("The default table size used in query planning. By default, it is set to Long.MaxValue " +
@@ -710,6 +716,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
 
+  def partitionPrunerForStatsEnabled: Boolean = getConf(ENABLE_PARTITION_PRUNER_FOR_STATS)
+
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala
new file mode 100644
index 000000000000..e0852f23b745
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveOptimizer.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{ExperimentalMethods, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.internal.SQLConf
+
+class HiveOptimizer(
+    sparkSession: SparkSession,
+    catalog: HiveSessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def batches: Seq[Batch] = super.batches :+
+    Batch("Push filter into relation", Once, PushFilterIntoRelation(conf))
+}
+
+case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.partitionPrunerForStatsEnabled) {
+      return plan
+    }
+
+    plan.transform {
+      case filter @ Filter(condition, relation: MetastoreRelation)
+          if relation.partitionKeys.nonEmpty && condition.deterministic =>
+        val partitionKeyIds = AttributeSet(relation.partitionKeys)
+        val predicates = splitConjunctivePredicates(condition)
+        val pruningPredicates = predicates.filter { predicate =>
+          predicate.references.nonEmpty &&
+            predicate.references.subsetOf(partitionKeyIds)
+        }
+        if (pruningPredicates.nonEmpty) {
+          relation.partitionPruningPred = pruningPredicates
+        }
+        filter
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6d4fe1a941a9..5c05942610c5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.execution.SparkPlanner
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.client.HiveClient
@@ -70,6 +71,13 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
     }
   }
 
+  /**
+   * Logical query plan optimizer that takes Hive-specific rules into account.
+   */
+  override lazy val optimizer: Optimizer =
+    new HiveOptimizer(sparkSession, catalog, conf, experimentalMethods)
+
+
   /**
    * Planner that takes into account Hive-specific strategies.
    */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index da809cf991de..feb340172cc4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -30,27 +30,30 @@ import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.types.BooleanType
 import org.apache.spark.sql.types.StructField
 
-
 private[hive] case class MetastoreRelation(
     databaseName: String,
-    tableName: String)
+    tableName: String,
+    var partitionPruningPred: Seq[Expression] = Seq.empty)
     (val catalogTable: CatalogTable,
-     @transient private val sparkSession: SparkSession)
+     @transient val sparkSession: SparkSession)
   extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
 
   override def equals(other: Any): Boolean = other match {
     case relation: MetastoreRelation =>
       databaseName == relation.databaseName &&
         tableName == relation.tableName &&
-        output == relation.output
+        output == relation.output &&
+        partitionPruningPred.size == relation.partitionPruningPred.size &&
+        (partitionPruningPred, relation.partitionPruningPred).zipped.forall(_ semanticEquals _)
     case _ => false
   }
 
@@ -107,41 +110,48 @@ private[hive] case class MetastoreRelation(
     new HiveTable(tTable)
   }
 
-  @transient override lazy val statistics: Statistics = {
-    catalogTable.stats.getOrElse(Statistics(
-      sizeInBytes = {
-        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
-        // TODO: check if this estimate is valid for tables after partition pruning.
-        // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-        // relatively cheap if parameters for the table are populated into the metastore.
-        // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
-        // (see StatsSetupConst in Hive) that we can look at in the future.
-        BigInt(
-          // When table is external,`totalSize` is always zero, which will influence join strategy
-          // so when `totalSize` is zero, use `rawDataSize` instead
-          // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
-          // which is generated by analyze command.
-          if (totalSize != null && totalSize.toLong > 0L) {
-            totalSize.toLong
-          } else if (rawDataSize != null && rawDataSize.toLong > 0) {
-            rawDataSize.toLong
-          } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-            try {
-              val hadoopConf = sparkSession.sessionState.newHadoopConf()
+  @transient override lazy val statistics: Statistics = Statistics(
+    sizeInBytes = {
+      val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
+      val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
+      // TODO: check if this estimate is valid for tables after partition pruning.
+      // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
+      // relatively cheap if parameters for the table are populated into the metastore.
+      // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+      // (see StatsSetupConst in Hive) that we can look at in the future.
+      BigInt(
+        // When table is external, `totalSize` is always zero, which will influence join strategy,
+        // so when `totalSize` is zero, use `rawDataSize` instead.
+        // If the size is still not positive, we try to get the file size from HDFS.
+        // Given this is only needed for optimization, if the HDFS call fails we return the default.
+        if (totalSize != null && totalSize.toLong > 0L) {
+          totalSize.toLong
+        } else if (rawDataSize != null && rawDataSize.toLong > 0) {
+          rawDataSize.toLong
+        } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
+          try {
+            val hadoopConf = sparkSession.sessionState.newHadoopConf()
+            if (partitionPruningPred.isEmpty ||
+              !sparkSession.sessionState.conf.partitionPrunerForStatsEnabled) {
               val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
               fs.getContentSummary(hiveQlTable.getPath).getLength
-            } catch {
-              case e: IOException =>
-                logWarning("Failed to get table size from hdfs.", e)
-                sparkSession.sessionState.conf.defaultSizeInBytes
+            } else {
+              val partitions = prunePartitions(getHiveQlPartitions(partitionPruningPred))
+              partitions.map { partition =>
+                val fs: FileSystem = partition.getDataLocation.getFileSystem(hadoopConf)
+                fs.getContentSummary(partition.getDataLocation).getLength
+              }.sum
             }
-          } else {
-            sparkSession.sessionState.conf.defaultSizeInBytes
-          })
-      }
-    ))
-  }
+          } catch {
+            case e: IOException =>
+              logWarning("Failed to get table size from hdfs.", e)
+              sparkSession.sessionState.conf.defaultSizeInBytes
+          }
+        } else {
+          sparkSession.sessionState.conf.defaultSizeInBytes
+        })
+    }
+  )
 
   // When metastore partition pruning is turned off, we cache the list of all partitions to
   // mimic the behavior of Spark < 1.5
@@ -194,11 +204,41 @@ private[hive] case class MetastoreRelation(
     }
   }
 
+  /**
+   * Prunes partitions not involved in the query plan.
+   *
+   * @param partitions All partitions of the relation.
+   * @return Partitions that are involved in the query plan.
+   */
+  private[hive] def prunePartitions(partitions: Seq[Partition]) = {
+    val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
+      require(
+        pred.dataType == BooleanType,
+        s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
+      BindReferences.bindReference(pred, partitionKeys)
+    }
+    boundPruningPred match {
+      case None => partitions
+      case Some(shouldKeep) => partitions.filter { part =>
+        val dataTypes = partitionKeys.map(_.dataType)
+        val castedValues = part.getValues.asScala.zip(dataTypes)
+          .map { case (value, dataType) => Cast(Literal(value), dataType).eval(null) }
+
+        // Only partitioned values are needed here, since the predicate has already been bound to
+        // partition key attribute references.
+        val row = InternalRow.fromSeq(castedValues)
+        shouldKeep.eval(row).asInstanceOf[Boolean]
+      }
+    }
+  }
+
   /** Only compare database and tablename, not alias. */
   override def sameResult(plan: LogicalPlan): Boolean = {
     plan.canonicalized match {
       case mr: MetastoreRelation =>
-        mr.databaseName == databaseName && mr.tableName == tableName
+        mr.databaseName == databaseName && mr.tableName == tableName &&
+          partitionPruningPred.size == mr.partitionPruningPred.size &&
+          (partitionPruningPred, mr.partitionPruningPred).zipped.forall(_ semanticEquals _)
       case _ => false
     }
   }
@@ -253,6 +293,6 @@ private[hive] case class MetastoreRelation(
   }
 
   override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession)
+    MetastoreRelation(databaseName, tableName, partitionPruningPred)(catalogTable, sparkSession)
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
index 91ff711445e8..70d497b666f5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala
@@ -34,7 +34,7 @@ class MetastoreRelationSuite extends QueryTest with SQLTestUtils with TestHiveSi
     val relation = MetastoreRelation("db", "test")(table, null)
     // No exception should be thrown
-    relation.makeCopy(Array("db", "test"))
+    relation.makeCopy(Array("db", "test", Seq.empty))
     // No exception should be thrown
     relation.toJSON
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 4f5ebc3d838b..b4433f2e4425 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -118,6 +118,41 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
     }
   }
 
+  test("MetastoreRelation falls back to HDFS size of scanned partitions for size estimation") {
+    withTempView("tempTbl", "largeTbl", "partTbl") {
+      spark.range(0, 1000, 1, 2).selectExpr("id as col1", "id as col2")
+        .createOrReplaceTempView("tempTbl")
+      spark.range(0, 100000, 1, 2).selectExpr("id as col1", "id as col2")
+        .createOrReplaceTempView("largeTbl")
+      sql("CREATE TABLE partTbl (col1 INT, col2 STRING) " +
+        "PARTITIONED BY (part1 STRING, part2 INT) STORED AS textfile")
+      for (part1 <- Seq("a", "b", "c", "d"); part2 <- Seq(1, 2)) {
+        sql(
+          s"""
+             |INSERT OVERWRITE TABLE partTbl PARTITION (part1='$part1',part2='$part2')
+             |select col1, col2 from tempTbl
+           """.stripMargin)
+      }
+      val query = "select * from largeTbl join partTbl on (largeTbl.col1 = partTbl.col1 " +
+        "and partTbl.part1 = 'a' and partTbl.part2 = 1)"
+      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001") {
+
+        withSQLConf(SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "true") {
+          val broadcastJoins =
+            sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
+          assert(broadcastJoins.nonEmpty)
+        }
+
+        withSQLConf(SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "false") {
+          val broadcastJoins =
+            sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
+          assert(broadcastJoins.isEmpty)
+        }
+      }
+    }
+  }
+
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
       spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
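
A minimal usage sketch for reviewers, not part of the patch: the table and partition names below are illustrative and assume a partitioned Hive table like the one created in the new test. With both flags set, the size estimate for a MetastoreRelation is computed from only the partitions selected by the pushed-down partition predicates, which is what allows a small slice of a large partitioned table to qualify for an auto broadcast join.

    // Sketch only: enable the existing HDFS fallback plus the new partition pruner for stats.
    spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
    spark.conf.set("spark.sql.statistics.partitionPruner", "true")
    // A filter on partition columns lets the optimizer size only the matching partitions.
    spark.sql(
      "select * from largeTbl join partTbl on largeTbl.col1 = partTbl.col1 " +
        "where partTbl.part1 = 'a' and partTbl.part2 = 1").explain()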