@@ -132,6 +132,12 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val ENABLE_PARTITION_PRUNER_FOR_STATS = SQLConfigBuilder("spark.sql.statistics.partitionPruner")
.doc("When true, some predicates will be pushed down into MetastoreRelation so that " +
"determining if partitions that are involved are small enough to use auto broadcast joins.")
.booleanConf
.createWithDefault(false)

val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
.internal()
.doc("The default table size used in query planning. By default, it is set to Long.MaxValue " +
@@ -710,6 +716,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)

def partitionPrunerForStatsEnabled: Boolean = getConf(ENABLE_PARTITION_PRUNER_FOR_STATS)

def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)

def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
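For reviewers who want to try the flag, a minimal sketch of how it would be exercised from an existing SparkSession named `spark`, against the largeTbl/partTbl tables created in the test further below. The new key comes from this patch; the other key is assumed here to be the one behind ENABLE_FALL_BACK_TO_HDFS_FOR_STATS (spark.sql.statistics.fallBackToHdfs). This is an illustration, not part of the change.

// Both flags are needed: the HDFS fallback gates the size estimation, and the new flag
// restricts it to the partitions selected by the pushed-down predicates.
spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
spark.conf.set("spark.sql.statistics.partitionPruner", "true")
spark.sql(
  "SELECT * FROM largeTbl JOIN partTbl " +
    "ON largeTbl.col1 = partTbl.col1 AND partTbl.part1 = 'a' AND partTbl.part2 = 1")
  .explain()  // expect a broadcast join once the pruned partition size is below the threshold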
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import org.apache.spark.sql.{ExperimentalMethods, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkOptimizer
import org.apache.spark.sql.internal.SQLConf

class HiveOptimizer(
sparkSession: SparkSession,
catalog: HiveSessionCatalog,
conf: SQLConf,
experimentalMethods: ExperimentalMethods)
extends SparkOptimizer(catalog, conf, experimentalMethods) {

override def batches: Seq[Batch] = super.batches :+
Batch("Push filter into relation", Once, PushFilterIntoRelation(conf))
}

case class PushFilterIntoRelation(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper {

override def apply(plan: LogicalPlan): LogicalPlan = {
if (!conf.partitionPrunerForStatsEnabled) {
return plan
}

plan.transform {
case filter @ Filter(condition, relation: MetastoreRelation)
if relation.partitionKeys.nonEmpty && condition.deterministic =>
val partitionKeyIds = AttributeSet(relation.partitionKeys)
val predicates = splitConjunctivePredicates(condition)
val pruningPredicates = predicates.filter { predicate =>
!predicate.references.isEmpty &&
predicate.references.subsetOf(partitionKeyIds)
}
if (pruningPredicates.nonEmpty) {
relation.partitionPruningPred = pruningPredicates
}
filter
}
}
}
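To make the predicate split above concrete: a conjunct is recorded as a pruning predicate only when every attribute it references is a partition key. Below is a self-contained sketch of that classification, with a hypothetical Pred case class standing in for Catalyst expressions; it is not part of the patch.

object PruningPredicateSketch {
  // Hypothetical stand-in for a Catalyst expression: its SQL text plus the columns it references.
  final case class Pred(sql: String, references: Set[String])

  // Mirrors the filter in PushFilterIntoRelation: non-empty references, all of them partition keys.
  def pruningPredicates(conjuncts: Seq[Pred], partitionKeys: Set[String]): Seq[Pred] =
    conjuncts.filter(p => p.references.nonEmpty && p.references.subsetOf(partitionKeys))

  def main(args: Array[String]): Unit = {
    val partitionKeys = Set("part1", "part2")
    val conjuncts = Seq(
      Pred("part1 = 'a'", Set("part1")),
      Pred("part2 = 1", Set("part2")),
      Pred("col1 > 10", Set("col1")),
      Pred("part1 = col2", Set("part1", "col2")))
    // Prints List(part1 = 'a', part2 = 1); only these would be kept in partitionPruningPred.
    println(pruningPredicates(conjuncts, partitionKeys).map(_.sql))
  }
}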
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.client.HiveClient
@@ -70,6 +71,13 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
}
}

/**
* Logical query plan optimizer that takes into account Hive-specific rules.
*/
override lazy val optimizer: Optimizer =
new HiveOptimizer(sparkSession, catalog, conf, experimentalMethods)


/**
* Planner that takes into account Hive-specific strategies.
*/
@@ -30,27 +30,30 @@ import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.StructField


private[hive] case class MetastoreRelation(
databaseName: String,
tableName: String)
tableName: String,
var partitionPruningPred: Seq[Expression] = Seq.empty)
(val catalogTable: CatalogTable,
@transient private val sparkSession: SparkSession)
@transient val sparkSession: SparkSession)
extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {

override def equals(other: Any): Boolean = other match {
case relation: MetastoreRelation =>
databaseName == relation.databaseName &&
tableName == relation.tableName &&
output == relation.output
output == relation.output &&
partitionPruningPred.size == relation.partitionPruningPred.size &&
(partitionPruningPred, relation.partitionPruningPred).zipped.forall(_ semanticEquals _)
case _ => false
}

@@ -107,41 +110,48 @@ private[hive] case class MetastoreRelation(
new HiveTable(tTable)
}

  @transient override lazy val statistics: Statistics = {
    catalogTable.stats.getOrElse(Statistics(
      sizeInBytes = {
        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
        // TODO: check if this estimate is valid for tables after partition pruning.
        // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
        // relatively cheap if parameters for the table are populated into the metastore.
        // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
        // (see StatsSetupConst in Hive) that we can look at in the future.
        BigInt(
          // When table is external,`totalSize` is always zero, which will influence join strategy
          // so when `totalSize` is zero, use `rawDataSize` instead
          // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`,
          // which is generated by analyze command.
          if (totalSize != null && totalSize.toLong > 0L) {
            totalSize.toLong
          } else if (rawDataSize != null && rawDataSize.toLong > 0) {
            rawDataSize.toLong
          } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
            try {
              val hadoopConf = sparkSession.sessionState.newHadoopConf()
              val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
              fs.getContentSummary(hiveQlTable.getPath).getLength
            } catch {
              case e: IOException =>
                logWarning("Failed to get table size from hdfs.", e)
                sparkSession.sessionState.conf.defaultSizeInBytes
            }
          } else {
            sparkSession.sessionState.conf.defaultSizeInBytes
          })
      }
    ))
  }
  @transient override lazy val statistics: Statistics = Statistics(
    sizeInBytes = {
      val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
      val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
      // TODO: check if this estimate is valid for tables after partition pruning.
      // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
      // relatively cheap if parameters for the table are populated into the metastore.
      // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
      // (see StatsSetupConst in Hive) that we can look at in the future.
      BigInt(
        // When table is external,`totalSize` is always zero, which will influence join strategy
        // so when `totalSize` is zero, use `rawDataSize` instead
        // if the size is still less than zero, we try to get the file size from HDFS.
        // given this is only needed for optimization, if the HDFS call fails we return the default.
        if (totalSize != null && totalSize.toLong > 0L) {
          totalSize.toLong
        } else if (rawDataSize != null && rawDataSize.toLong > 0) {
          rawDataSize.toLong
        } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
          try {
            val hadoopConf = sparkSession.sessionState.newHadoopConf()
            if (partitionPruningPred.isEmpty ||
              !sparkSession.sessionState.conf.partitionPrunerForStatsEnabled) {
              val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
              fs.getContentSummary(hiveQlTable.getPath).getLength
            } else {
              val partitions = prunePartitions(getHiveQlPartitions(partitionPruningPred))
              partitions.map { partition =>
                val fs: FileSystem = partition.getDataLocation.getFileSystem(hadoopConf)
                fs.getContentSummary(partition.getDataLocation).getLength
              }.sum
            }
          } catch {
            case e: IOException =>
              logWarning("Failed to get table size from hdfs.", e)
              sparkSession.sessionState.conf.defaultSizeInBytes
          }
        } else {
          sparkSession.sessionState.conf.defaultSizeInBytes
        })
    }
  )

// When metastore partition pruning is turned off, we cache the list of all partitions to
// mimic the behavior of Spark < 1.5
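The rewritten statistics block keeps the old precedence and only changes the HDFS step. A compact, self-contained sketch of that precedence, with assumed values and separate from the patch:

object SizePrecedenceSketch {
  // totalSize > 0 wins, then rawDataSize > 0, then the (possibly partition-pruned) HDFS size,
  // and finally spark.sql.defaultSizeInBytes.
  def estimatedSize(
      totalSize: Option[Long],
      rawDataSize: Option[Long],
      hdfsSize: => Option[Long],
      defaultSize: Long): BigInt =
    BigInt(
      totalSize.filter(_ > 0L)
        .orElse(rawDataSize.filter(_ > 0L))
        .orElse(hdfsSize)
        .getOrElse(defaultSize))

  def main(args: Array[String]): Unit = {
    // The metastore reports a zero totalSize and no rawDataSize, so the pruned HDFS size
    // (4000 bytes, assumed) is used instead of Long.MaxValue.
    println(estimatedSize(Some(0L), None, Some(4000L), Long.MaxValue))
  }
}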
@@ -194,11 +204,41 @@ private[hive] case class MetastoreRelation(
}
}

/**
* Prunes partitions that are not involved in the query plan.
*
* @param partitions All partitions of the relation.
* @return Partitions that are involved in the query plan.
*/
private[hive] def prunePartitions(partitions: Seq[Partition]): Seq[Partition] = {
val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
require(
pred.dataType == BooleanType,
s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
BindReferences.bindReference(pred, partitionKeys)
}
boundPruningPred match {
case None => partitions
case Some(shouldKeep) => partitions.filter { part =>
val dataTypes = partitionKeys.map(_.dataType)
val castedValues = part.getValues.asScala.zip(dataTypes)
.map { case (value, dataType) => Cast(Literal(value), dataType).eval(null) }

// Only partitioned values are needed here, since the predicate has already been bound to
// partition key attribute references.
val row = InternalRow.fromSeq(castedValues)
shouldKeep.eval(row).asInstanceOf[Boolean]
}
}
}

/** Only compare database and tablename, not alias. */
override def sameResult(plan: LogicalPlan): Boolean = {
plan.canonicalized match {
case mr: MetastoreRelation =>
mr.databaseName == databaseName && mr.tableName == tableName
mr.databaseName == databaseName && mr.tableName == tableName &&
partitionPruningPred.size == mr.partitionPruningPred.size &&
(partitionPruningPred, mr.partitionPruningPred).zipped.forall(_ semanticEquals _)
case _ => false
}
}
@@ -253,6 +293,6 @@ private[hive] case class MetastoreRelation(
}

override def newInstance(): MetastoreRelation = {
MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession)
MetastoreRelation(databaseName, tableName, partitionPruningPred)(catalogTable, sparkSession)
}
}
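And the pruning itself, reduced to plain Scala: prunePartitions keeps only the partitions whose key values satisfy the bound predicate, and statistics then sums only those partitions' sizes. The sketch below replaces Hive Partition objects and bound Catalyst expressions with maps and an ordinary function, purely for illustration.

object PrunePartitionsSketch {
  type PartitionValues = Map[String, String]

  // The real code builds an InternalRow from the casted partition values and evaluates the
  // bound predicate; here the predicate is just a function over the partition-key values.
  def prunePartitions(
      partitions: Seq[PartitionValues],
      shouldKeep: PartitionValues => Boolean): Seq[PartitionValues] =
    partitions.filter(shouldKeep)

  def main(args: Array[String]): Unit = {
    val partitions = for (p1 <- Seq("a", "b", "c", "d"); p2 <- Seq("1", "2"))
      yield Map("part1" -> p1, "part2" -> p2)
    // Analogue of part1 = 'a' AND part2 = 1: one of the eight partitions survives, so only its
    // HDFS size feeds the broadcast-join decision.
    println(prunePartitions(partitions, p => p("part1") == "a" && p("part2") == "1"))
  }
}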
@@ -34,7 +34,7 @@ class MetastoreRelationSuite extends QueryTest with SQLTestUtils with TestHiveSi
val relation = MetastoreRelation("db", "test")(table, null)

// No exception should be thrown
relation.makeCopy(Array("db", "test"))
relation.makeCopy(Array("db", "test", Seq.empty))
// No exception should be thrown
relation.toJSON
}
@@ -118,6 +118,41 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
}
}

test("MetastoreRelations fallback to hdfs of scanned partitions for size estimation") {
withTempView("tempTbl", "largeTbl", "partTbl") {
spark.range(0, 1000, 1, 2).selectExpr("id as col1", "id as col2")
.createOrReplaceTempView("tempTbl")
spark.range(0, 100000, 1, 2).selectExpr("id as col1", "id as col2")
.createOrReplaceTempView("largeTbl")
sql("CREATE TABLE partTbl (col1 INT, col2 STRING) " +
"PARTITIONED BY (part1 STRING, part2 INT) STORED AS textfile")
for (part1 <- Seq("a", "b", "c", "d"); part2 <- Seq(1, 2)) {
sql(
s"""
|INSERT OVERWRITE TABLE partTbl PARTITION (part1='$part1',part2='$part2')
|select col1, col2 from tempTbl
""".stripMargin)
}
val query = "select * from largeTbl join partTbl on (largeTbl.col1 = partTbl.col1 " +
"and partTbl.part1 = 'a' and partTbl.part2 = 1)"
withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "8001") {

withSQLConf(SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "true") {
val broadcastJoins =
sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
assert(broadcastJoins.nonEmpty)
}

withSQLConf(SQLConf.ENABLE_PARTITION_PRUNER_FOR_STATS.key -> "false") {
val broadcastJoins =
sql(query).queryExecution.sparkPlan.collect { case j: BroadcastHashJoinExec => j }
assert(broadcastJoins.isEmpty)
}
}
}
}

test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes