diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e13d65bf8182..4c9b9f0156ee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1403,6 +1403,16 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS =
+    buildConf("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber")
+      .doc("If the number of partitions of a table (either a Hive table or a data source " +
+        "table) exceeds this value, calculating statistics via the file system is not " +
+        "allowed. This avoids computing the sizes of a large number of partitions through " +
+        "the file system, e.g. HDFS, which can be very time consuming. Setting this value " +
+        "to a negative number disables statistics calculation via the file system.")
+      .intConf
+      .createWithDefault(100)
+
   val NDV_MAX_ERROR =
     buildConf("spark.sql.statistics.ndv.maxError")
       .internal()
@@ -2564,6 +2574,9 @@ class SQLConf extends Serializable with Logging {
 
   def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
 
+  def maxPartNumForStatsCalculateViaFS: Int =
+    getConf(MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS)
+
   def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES)
 
   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 1ea19c187e51..59c55c161bc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileScan, FileTable}
 import org.apache.spark.sql.types.StructType
 
 private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
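For context, a minimal usage sketch of the knob introduced above (illustrative only, not part of the patch; the `spark` session value is assumed):

    // Allow at most 50 partitions that lack size statistics in the metastore to be
    // sized by listing their locations on the file system (50 is an arbitrary example).
    spark.conf.set("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber", "50")

    // A negative value disables the file-system fallback entirely.
    spark.conf.set("spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber", "-1")

    // The effective value can be read back through the accessor added above.
    val threshold = org.apache.spark.sql.internal.SQLConf.get.maxPartNumForStatsCalculateViaFS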
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
index a0349f627d10..2bf590dca7e6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitions.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression,
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.internal.SQLConf
 
@@ -73,19 +74,30 @@ private[sql] class PruneHiveTablePartitions(session: SparkSession)
   private def updateTableMeta(
       tableMeta: CatalogTable,
       prunedPartitions: Seq[CatalogTablePartition]): CatalogTable = {
-    val sizeOfPartitions = prunedPartitions.map { partition =>
+    val partitionsWithSize = prunedPartitions.map { partition =>
       val rawDataSize = partition.parameters.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong)
       val totalSize = partition.parameters.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
       if (rawDataSize.isDefined && rawDataSize.get > 0) {
-        rawDataSize.get
+        (partition, rawDataSize.get)
       } else if (totalSize.isDefined && totalSize.get > 0L) {
-        totalSize.get
+        (partition, totalSize.get)
       } else {
-        0L
+        (partition, 0L)
       }
     }
-    if (sizeOfPartitions.forall(_ > 0)) {
-      val sizeInBytes = sizeOfPartitions.sum
+    if (partitionsWithSize.forall(_._2 > 0)) {
+      val sizeInBytes = partitionsWithSize.map(_._2).sum
+      tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
+    } else if (partitionsWithSize.count(_._2 == 0) <= conf.maxPartNumForStatsCalculateViaFS) {
+      val sizeInBytes =
+        partitionsWithSize.map(pair => {
+          if (pair._2 == 0) {
+            CommandUtils.calculateLocationSize(
+              session.sessionState, tableMeta.identifier, pair._1.storage.locationUri)
+          } else {
+            pair._2
+          }
+        }).sum
       tableMeta.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
     } else {
       tableMeta
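As a reading aid, the estimation flow introduced above boils down to the following sketch (illustrative only, not part of the patch; the helper names are made up):

    // sizes: per-partition sizes recovered from metastore parameters, 0L when unknown.
    // sizeViaFileSystem(i): sizes partition i by listing its storage location (expensive).
    def estimateSizeInBytes(
        sizes: Seq[Long],
        sizeViaFileSystem: Int => Long,
        maxPartitionsViaFs: Int): Option[BigInt] = {
      val unknown = sizes.zipWithIndex.filter { case (size, _) => size == 0L }
      if (unknown.isEmpty) {
        // All sizes are known from the metastore: just sum them.
        Some(BigInt(sizes.sum))
      } else if (unknown.size <= maxPartitionsViaFs) {
        // Bounded fallback: only the partitions without stats touch the file system.
        Some(BigInt(sizes.sum + unknown.map { case (_, i) => sizeViaFileSystem(i) }.sum))
      } else {
        // Too many unknown partitions: leave the existing table statistics unchanged.
        None
      }
    }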
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index c9c36992906a..b5b3a08f251f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -42,35 +42,31 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
 
   test("PruneFileSourcePartitions should not change the output of LogicalRelation") {
     withTable("test") {
-      withTempDir { dir =>
-        sql(
-          s"""
-            |CREATE EXTERNAL TABLE test(i int)
-            |PARTITIONED BY (p int)
-            |STORED AS parquet
-            |LOCATION '${dir.toURI}'""".stripMargin)
+      sql(
+        s"""
+          |CREATE TABLE test(i int)
+          |PARTITIONED BY (p int)
+          |STORED AS parquet""".stripMargin)
 
-        val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
-        val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
+      val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+      val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
+      val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+        tableMeta.partitionColumnNames.contains(f.name)
+      })
+      val relation = HadoopFsRelation(
+        location = catalogFileIndex,
+        partitionSchema = tableMeta.partitionSchema,
+        dataSchema = dataSchema,
+        bucketSpec = None,
+        fileFormat = new ParquetFileFormat(),
+        options = Map.empty)(sparkSession = spark)
 
-        val dataSchema = StructType(tableMeta.schema.filterNot { f =>
-          tableMeta.partitionColumnNames.contains(f.name)
-        })
-        val relation = HadoopFsRelation(
-          location = catalogFileIndex,
-          partitionSchema = tableMeta.partitionSchema,
-          dataSchema = dataSchema,
-          bucketSpec = None,
-          fileFormat = new ParquetFileFormat(),
-          options = Map.empty)(sparkSession = spark)
+      val logicalRelation = LogicalRelation(relation, tableMeta)
+      val query = Project(Seq(Symbol("i"), Symbol("p")),
+        Filter(Symbol("p") === 1, logicalRelation)).analyze
 
-        val logicalRelation = LogicalRelation(relation, tableMeta)
-        val query = Project(Seq(Symbol("i"), Symbol("p")),
-          Filter(Symbol("p") === 1, logicalRelation)).analyze
-
-        val optimized = Optimize.execute(query)
-        assert(optimized.missingInput.isEmpty)
-      }
+      val optimized = Optimize.execute(query)
+      assert(optimized.missingInput.isEmpty)
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
index e41709841a73..7b3a1e18682f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneHiveTablePartitionsSuite.scala
@@ -18,10 +18,15 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
 class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
@@ -32,7 +37,7 @@ class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with Tes
       EliminateSubqueryAliases, new PruneHiveTablePartitions(spark)) :: Nil
   }
 
-  test("SPARK-15616 statistics pruned after going throuhg PruneHiveTablePartitions") {
+  test("SPARK-15616 statistics pruned after going through PruneHiveTablePartitions") {
     withTable("test", "temp") {
       sql(
         s"""
@@ -54,4 +59,55 @@ class PruneHiveTablePartitionsSuite extends QueryTest with SQLTestUtils with Tes
         Optimize.execute(analyzed2).stats.sizeInBytes)
     }
   }
+
+  test("SPARK-30427 spark.sql.statistics.statisticViaFileSystem.maxPartitionNumber does not " +
+    "affect the result of PruneHiveTablePartitions when the sizes of all partitions are " +
+    "available in the metastore") {
+    withTable("test", "temp") {
+      sql(
+        s"""
+          |CREATE TABLE test(i int)
+          |PARTITIONED BY (p int)
+          |STORED AS textfile""".stripMargin)
+
+      spark.range(0, 1000, 1).selectExpr("id as col")
+        .createOrReplaceTempView("temp")
+      for (part <- Seq(1, 2, 3, 4)) {
+        sql(s"""
+          |INSERT OVERWRITE TABLE test PARTITION (p='$part')
+          |select col from temp""".stripMargin)
+      }
+
+      val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+      val relation =
+        HiveTableRelation(tableMeta,
+          tableMeta.dataSchema.asNullable.toAttributes,
+          tableMeta.partitionSchema.asNullable.toAttributes)
+      val query = Project(Seq(Symbol("i"), Symbol("p")),
+        Filter(Symbol("p") === 1, relation)).analyze
+      var plan1: LogicalPlan = null
+      var plan2: LogicalPlan = null
+      var plan3: LogicalPlan = null
+      var plan4: LogicalPlan = null
+      withSQLConf(
+        SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> "-1") {
+        plan1 = Optimize.execute(query)
+      }
+      withSQLConf(
+        SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> "1") {
+        plan2 = Optimize.execute(query)
+      }
+      withSQLConf(
+        SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> "2") {
+        plan3 = Optimize.execute(query)
+      }
+      withSQLConf(
+        SQLConf.MAX_PARTITION_NUMBER_FOR_STATS_CALCULATION_VIA_FS.key -> "3") {
+        plan4 = Optimize.execute(query)
+      }
+      assert(plan1.stats.sizeInBytes === plan2.stats.sizeInBytes)
+      assert(plan2.stats.sizeInBytes === plan3.stats.sizeInBytes)
+      assert(plan3.stats.sizeInBytes === plan4.stats.sizeInBytes)
+    }
+  }
 }