@@ -1403,6 +1403,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS =
buildConf("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber")
.doc("If the number of table (can be either hive table or data source table ) partitions " +
"exceed this value, statistic calculation via file system is not allowed. This is to " +
"avoid calculating size of large number of partitions via file system, eg. HDFS, which " +
"is very time consuming. Setting this value to negative will disable statistic " +
"calculation via file system.")
.intConf
.createWithDefault(100)
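
(Aside, not part of this diff: a user could bound or disable the file-system based calculation per session through this key; a minimal sketch, assuming a running SparkSession named `spark`, is below.)

// Minimal sketch, not part of this diff: assumes a running SparkSession named `spark`.
// Allow size calculation via the file system for at most 50 partitions...
spark.conf.set("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber", "50")
// ...or disable it entirely with a negative value.
spark.conf.set("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber", "-1")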

val NDV_MAX_ERROR =
buildConf("spark.sql.statistics.ndv.maxError")
.internal()
@@ -2564,6 +2574,9 @@ class SQLConf extends Serializable with Logging {

def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)

def maxPartNumForStatsCalculateViaFS: Int =
getConf(MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS)

def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES)

def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan, FileTable}
import org.apache.spark.sql.types.StructType

private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression,
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.internal.SQLConf

@@ -73,19 +74,30 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession)
private def updateTableMeta(
tableMeta: CatalogTable,
prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
val sizeOfPartitions = prunedPartitions.map { partition =>
val partitionsWithSize = prunedPartitions.map { partition =>
val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
if (rawDataSize.isDefined && rawDataSize.get > 0) {
rawDataSize.get
(partition, rawDataSize.get)
} else if (totalSize.isDefined && totalSize.get > 0L) {
totalSize.get
(partition, totalSize.get)
} else {
0L
(partition, 0L)
}
}
if (sizeOfPartitions.forall(_ > 0)) {
val sizeInBytes = sizeOfPartitions.sum
if (partitionsWithSize.forall(_._2 > 0)) {
val sizeInBytes = partitionsWithSize.map(_._2).sum
tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
} else if (partitionsWithSize.count(_._2 == 0) <= conf.maxPartNumForStatsCalculateViaFS) {
Member:

Why do we need to do the calculation here? I think it introduces extra cost.

Contributor Author:

You mean just leave the tableMeta unchanged (which is the table-level meta without partition pruning) if there is at least one partition whose size is not available in the metastore?

Member:

yes
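
(Editor's aside: the fallback the reviewer suggests, sketched below against the names already used in this method; this is an illustration only, not the code proposed in this PR.)

// Hedged sketch of the reviewer's suggestion (illustration, not this PR's change):
// keep the unpruned table-level stats whenever any partition lacks a size in the metastore.
if (partitionsWithSize.forall(_._2 > 0)) {
  val sizeInBytes = partitionsWithSize.map(_._2).sum
  tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
} else {
  tableMeta
}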

Contributor Author (@fuwhu, Jan 31, 2020):

some discussion about this was in #26805 (comment)

Member:

@fuwhu, are you proposing a configuration to automatically calculate the size? Why don't you just manually run the ANALYZE command to calculate the stats? It's weird to do this based on the number of partitions.
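
(Editor's aside: the manual route the reviewer refers to is the ANALYZE command; a hedged example against the partitioned `test` table used in the suites below.)

// Illustration of the manual alternative (assumes the partitioned `test` table from the suites below):
// collect per-partition size statistics up front so the metastore already has them at planning time.
spark.sql("ANALYZE TABLE test PARTITION (p) COMPUTE STATISTICS NOSCAN")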

val sizeInBytes = partitionsWithSize.map { case (partition, size) =>
if (size == 0) {
CommandUtils.calculateLocationSize(
session.sessionState, tableMeta.identifier, partition.storage.locationUri)
} else {
size
}
}.sum
tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
} else {
tableMeta
@@ -42,35 +42,31 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te

test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
withTable("test") {
withTempDir { dir =>
sql(
s"""
|CREATE EXTERNAL TABLE test(i int)
|PARTITIONED BY (p int)
|STORED AS parquet
|LOCATION '${dir.toURI}'""".stripMargin)
sql(
s"""
|CREATE TABLE test(i int)
|PARTITIONED BY (p int)
|STORED AS parquet""".stripMargin)

val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
location = catalogFileIndex,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)

val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)
})
val relation = HadoopFsRelation(
location = catalogFileIndex,
partitionSchema = tableMeta.partitionSchema,
dataSchema = dataSchema,
bucketSpec = None,
fileFormat = new ParquetFileFormat(),
options = Map.empty)(sparkSession = spark)
val logicalRelation = LogicalRelation(relation, tableMeta)
val query = Project(Seq(Symbol("i"), Symbol("p")),
Filter(Symbol("p") === 1, logicalRelation)).analyze

val logicalRelation = LogicalRelation(relation, tableMeta)
val query = Project(Seq(Symbol("i"), Symbol("p")),
Filter(Symbol("p") === 1, logicalRelation)).analyze

val optimized = Optimize.execute(query)
assert(optimized.missingInput.isEmpty)
}
val optimized = Optimize.execute(query)
assert(optimized.missingInput.isEmpty)
}
}

@@ -18,10 +18,15 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils

class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
@@ -32,7 +37,7 @@ class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with Tes
EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
}

test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") {
test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
withTable("test", "temp") {
sql(
s"""
@@ -54,4 +59,55 @@ class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with Tes
Optimize.execute(analyzed2).stats.sizeInBytes)
}
}

test("SPARK-30427 spark.sql.statistics.fallBackToFs.maxPartitionNumber can not control " +
"the action of rule PruneHiveTablePartitions when sizeInBytes of all partitions are " +
"available in meta data.") {
withTable("test", "temp") {
sql(
s"""
|CREATE TABLE test(i int)
|PARTITIONED BY (p int)
|STORED AS textfile""".stripMargin)

spark.range(0, 1000, 1).selectExpr("id as col")
.createOrReplaceTempView("temp")
for (part <- Seq(1, 2, 3, 4)) {
sql(s"""
|INSERT OVERWRITE TABLE test PARTITION (p='$part')
|select col from temp""".stripMargin)
}

val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
val relation =
HiveTableRelation(tableMeta,
tableMeta.dataSchema.asNullable.toAttributes,
tableMeta.partitionSchema.asNullable.toAttributes)
val query = Project(Seq(Symbol("i"), Symbol("p")),
Filter(Symbol("p") === 1, relation)).analyze
var plan1: LogicalPlan = null
var plan2: LogicalPlan = null
var plan3: LogicalPlan = null
var plan4: LogicalPlan = null
withSQLConf(
SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> "-1") {
plan1 = Optimize.execute(query)
}
withSQLConf(
SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> "1") {
plan2 = Optimize.execute(query)
}
withSQLConf(
SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> "2") {
plan3 = Optimize.execute(query)
}
withSQLConf(
SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> "3") {
plan4 = Optimize.execute(query)
}
assert(plan1.stats.sizeInBytes === plan2.stats.sizeInBytes)
assert(plan2.stats.sizeInBytes === plan3.stats.sizeInBytes)
assert(plan3.stats.sizeInBytes === plan4.stats.sizeInBytes)
}
}
}