From 5384f7ac7fb4b9b55a5c9b3521fdedf81ad19c76 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Mon, 12 Jun 2017 16:36:28 -0400 Subject: [PATCH 01/23] [SPARK-21213][SQL] Support collecting partition-level statistics: rowCount and sizeInBytes --- .../sql/catalyst/catalog/interface.scala | 4 +- .../spark/sql/execution/SparkSqlParser.scala | 27 ++- .../command/AnalyzeTableCommand.scala | 71 ++++++-- .../sql/execution/command/CommandUtils.scala | 9 +- .../sql/execution/SparkSqlParserSuite.scala | 23 ++- .../spark/sql/hive/HiveExternalCatalog.scala | 161 ++++++++++++------ .../sql/hive/client/HiveClientImpl.scala | 2 + .../spark/sql/hive/StatisticsSuite.scala | 108 ++++++++++++ 8 files changed, 325 insertions(+), 80 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 5a8c4e7610ff..f225f1af76bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -96,7 +96,8 @@ object CatalogStorageFormat { case class CatalogTablePartition( spec: CatalogTypes.TablePartitionSpec, storage: CatalogStorageFormat, - parameters: Map[String, String] = Map.empty) { + parameters: Map[String, String] = Map.empty, + stats: Option[CatalogStatistics] = None) { def toLinkedHashMap: mutable.LinkedHashMap[String, String] = { val map = new mutable.LinkedHashMap[String, String]() @@ -106,6 +107,7 @@ case class CatalogTablePartition( if (parameters.nonEmpty) { map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}") } + stats.foreach(s => map.put("Partition Statistics", s.simpleString)) map } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d4414b6f78ca..680de22ad0da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -101,19 +101,30 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * }}} */ override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) { - if (ctx.partitionSpec != null) { - logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}") - } - if (ctx.identifier != null) { + val noscan = if (ctx.identifier != null) { if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") { throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx) } - AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier)) - } else if (ctx.identifierSeq() == null) { - AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false) + true + } else { + false + } + + val partitionSpec = if (ctx.partitionSpec != null) { + Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) } else { + None + } + + val table = visitTableIdentifier(ctx.tableIdentifier) + if (ctx.identifierSeq() == null) { + AnalyzeTableCommand(table, noscan, partitionSpec) + } else { + if (partitionSpec.isDefined) { + logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}") + } AnalyzeColumnCommand( - visitTableIdentifier(ctx.tableIdentifier), + table, visitIdentifierSeq(ctx.identifierSeq())) } } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index cba147c35dd9..4e065611327e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -17,17 +17,22 @@ package org.apache.spark.sql.execution.command -import org.apache.spark.sql.{AnalysisException, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, Literal} /** - * Analyzes the given table to generate statistics, which will be used in query optimizations. + * Analyzes the given table or partition to generate statistics, which will be used in + * query optimizations. */ case class AnalyzeTableCommand( tableIdent: TableIdentifier, - noscan: Boolean = true) extends RunnableCommand { + noscan: Boolean = true, + partitionSpec: Option[TablePartitionSpec] = None) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val sessionState = sparkSession.sessionState @@ -37,10 +42,57 @@ case class AnalyzeTableCommand( if (tableMeta.tableType == CatalogTableType.VIEW) { throw new AnalysisException("ANALYZE TABLE is not supported on views.") } - val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta) - val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L) - val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) + val partitionMeta = partitionSpec.map( + p => sessionState.catalog.getPartition(tableMeta.identifier, partitionSpec.get)) + + val oldStats = + if (partitionMeta.isDefined) { + partitionMeta.get.stats + } else { + tableMeta.stats + } + + def calculateTotalSize(): BigInt = { + if (partitionMeta.isDefined) { + CommandUtils.calculateTotalSize(sessionState, tableMeta, partitionMeta.get) + } else { + CommandUtils.calculateTotalSize(sessionState, tableMeta) + } + } + + def calculateRowCount(): Long = { + if (partitionSpec.isDefined) { + val filters = partitionSpec.get.map { + case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value)) + } + val filter = filters match { + case head::tail => + if (tail.isEmpty) head + else tail.foldLeft(head : Expression)((a, b) => And(a, b)) + } + sparkSession.table(tableIdentWithDB).filter(Column(filter)).count() + } else { + sparkSession.table(tableIdentWithDB).count() + } + } + + def updateStats(newStats: CatalogStatistics): Unit = { + if (partitionMeta.isDefined) { + sessionState.catalog.alterPartitions(tableMeta.identifier, + List(partitionMeta.get.copy(stats = Some(newStats)))) + } else { + sessionState.catalog.alterTableStats(tableIdentWithDB, Some(newStats)) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) + } + } + + val newTotalSize = calculateTotalSize() + + val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L) + val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) + var newStats: Option[CatalogStatistics] = None if (newTotalSize >= 0 && newTotalSize != oldTotalSize) { newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize)) @@ -50,7 +102,7 @@ case class AnalyzeTableCommand( // 2. when total size is changed, `oldRowCount` becomes invalid. // This is to make sure that we only record the right statistics. if (!noscan) { - val newRowCount = sparkSession.table(tableIdentWithDB).count() + val newRowCount = calculateRowCount() if (newRowCount >= 0 && newRowCount != oldRowCount) { newStats = if (newStats.isDefined) { newStats.map(_.copy(rowCount = Some(BigInt(newRowCount)))) @@ -63,11 +115,8 @@ case class AnalyzeTableCommand( // Update the metastore if the above statistics of the table are different from those // recorded in the metastore. if (newStats.isDefined) { - sessionState.catalog.alterTableStats(tableIdentWithDB, newStats) - // Refresh the cached data source table in the catalog. - sessionState.catalog.refreshTable(tableIdentWithDB) + updateStats(newStats.get) } - Seq.empty[Row] } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index de45be85220e..1c27b80acbe4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition} import org.apache.spark.sql.internal.SessionState @@ -59,6 +59,13 @@ object CommandUtils extends Logging { } } + def calculateTotalSize( + sessionState: SessionState, + catalogTable: CatalogTable, + partition: CatalogTablePartition): Long = { + calculateLocationSize(sessionState, catalogTable.identifier, partition.storage.locationUri) + } + def calculateLocationSize( sessionState: SessionState, identifier: TableIdentifier, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index d238c76fbeef..125f3ddfad60 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -258,18 +258,20 @@ class SparkSqlParserSuite extends AnalysisTest { AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) assertEqual("analyze table t compute statistics noscan", AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) - assertEqual("analyze table t partition (a) compute statistics nOscAn", + assertEqual("analyze table t compute statistics nOscAn", AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) - // Partitions specified - we currently parse them but don't do anything with it + // Partitions specified assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS", - AnalyzeTableCommand(TableIdentifier("t"), 
noscan = false)) + AnalyzeTableCommand(TableIdentifier("t"), noscan = false, + partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11")))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) - assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS", - AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) - assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) + AnalyzeTableCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11")))) + intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS", + "empty partition key 'ds'") + intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan", + "empty partition key 'ds'") intercept("analyze table t compute statistics xxxx", "Expected `NOSCAN` instead of `xxxx`") @@ -282,6 +284,11 @@ class SparkSqlParserSuite extends AnalysisTest { assertEqual("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value", AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value"))) + + // Partition specified - should be ignored + assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " + + "COMPUTE STATISTICS FOR COLUMNS key, value", + AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value"))) } test("query organization") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index e9d48f95aa90..946d3afa5e96 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -639,26 +639,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat requireTableExists(db, table) val rawTable = getRawTable(db, table) - // convert table statistics to properties so that we can persist them through hive client - val statsProperties = new mutable.HashMap[String, String]() - if (stats.isDefined) { - statsProperties += STATISTICS_TOTAL_SIZE -> stats.get.sizeInBytes.toString() - if (stats.get.rowCount.isDefined) { - statsProperties += STATISTICS_NUM_ROWS -> stats.get.rowCount.get.toString() - } - - // For datasource tables and hive serde tables created by spark 2.1 or higher, - // the data schema is stored in the table properties. - val schema = restoreTableMetadata(rawTable).schema + // For datasource tables and hive serde tables created by spark 2.1 or higher, + // the data schema is stored in the table properties. 
+ val schema = restoreTableMetadata(rawTable).schema - val colNameTypeMap: Map[String, DataType] = - schema.fields.map(f => (f.name, f.dataType)).toMap - stats.get.colStats.foreach { case (colName, colStat) => - colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => - statsProperties += (columnStatKeyPropName(colName, k) -> v) - } + // convert table statistics to properties so that we can persist them through hive client + var statsProperties = + if (stats.isDefined) { + statsToHiveProperties(stats.get, rawTable.schema) + } else { + new mutable.HashMap[String, String]() } - } val oldTableNonStatsProps = rawTable.properties.filterNot(_._1.startsWith(STATISTICS_PREFIX)) val updatedTable = rawTable.copy(properties = oldTableNonStatsProps ++ statsProperties) @@ -704,36 +695,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val version: String = table.properties.getOrElse(CREATED_SPARK_VERSION, "2.2 or prior") // Restore Spark's statistics from information in Metastore. - val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX)) - - // Currently we have two sources of statistics: one from Hive and the other from Spark. - // In our design, if Spark's statistics is available, we respect it over Hive's statistics. - if (statsProps.nonEmpty) { - val colStats = new mutable.HashMap[String, ColumnStat] - - // For each column, recover its column stats. Note that this is currently a O(n^2) operation, - // but given the number of columns it usually not enormous, this is probably OK as a start. - // If we want to map this a linear operation, we'd need a stronger contract between the - // naming convention used for serialization. - table.schema.foreach { field => - if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) { - // If "version" field is defined, then the column stat is defined. - val keyPrefix = columnStatKeyPropName(field.name, "") - val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) => - (k.drop(keyPrefix.length), v) - } - - ColumnStat.fromMap(table.identifier.table, field, colStatMap).foreach { - colStat => colStats += field.name -> colStat - } - } - } - - table = table.copy( - stats = Some(CatalogStatistics( - sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)), - rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)), - colStats = colStats.toMap))) + val restoredStats = + statsFromHiveProperties(table.properties, table.identifier.table, table.schema) + if (restoredStats.isDefined) { + table = table.copy(stats = restoredStats) } // Get the original table properties as defined by the user. 
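For readers following the refactoring in the hunks above and below: the existing table-level logic is being factored into statsToHiveProperties and statsFromHiveProperties so that the same string-property conversion can be reused for partition parameters in alterPartitions and restorePartitionMetadata further down. A minimal, self-contained Scala sketch of that round-trip idea follows; the SimpleStats case class and the exact property keys are assumptions for illustration, not Spark's real CatalogStatistics or HiveExternalCatalog constants.

    // Simplified stand-in for the stats <-> string-properties round trip.
    // Key names mirror the spark.sql.statistics.* convention (an assumption here).
    case class SimpleStats(sizeInBytes: BigInt, rowCount: Option[BigInt])

    val statsPrefix = "spark.sql.statistics."

    def statsToProps(stats: SimpleStats): Map[String, String] = {
      // Total size is always written; row count only when it is known.
      val sizeProp = Map(statsPrefix + "totalSize" -> stats.sizeInBytes.toString)
      val rowCountProp = stats.rowCount.map(n => statsPrefix + "numRows" -> n.toString)
      sizeProp ++ rowCountProp
    }

    def statsFromProps(props: Map[String, String]): Option[SimpleStats] = {
      // No totalSize property means no Spark statistics were recorded.
      props.get(statsPrefix + "totalSize").map { size =>
        SimpleStats(BigInt(size), props.get(statsPrefix + "numRows").map(BigInt(_)))
      }
    }

    // Round trip: statsFromProps(statsToProps(s)) == Some(s) for stats of this shape,
    // which is what lets the same helpers serve both table and partition metadata.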
@@ -1037,17 +1002,92 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat currentFullPath } + private def statsToHiveProperties( + stats: CatalogStatistics, + schema: StructType): Map[String, String] = { + + var statsProperties: Map[String, String] = + Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()) + if (stats.rowCount.isDefined) { + statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() + } + + val colNameTypeMap: Map[String, DataType] = + schema.fields.map(f => (f.name, f.dataType)).toMap + stats.colStats.foreach { case (colName, colStat) => + colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => + statsProperties += (columnStatKeyPropName(colName, k) -> v) + } + } + + statsProperties + } + + private def statsFromHiveProperties( + properties: Map[String, String], + table: String, + schema: StructType): Option[CatalogStatistics] = { + + val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX)) + if (statsProps.isEmpty) { + None + } else { + + val colStats = new mutable.HashMap[String, ColumnStat] + + // For each column, recover its column stats. Note that this is currently a O(n^2) operation, + // but given the number of columns it usually not enormous, this is probably OK as a start. + // If we want to map this a linear operation, we'd need a stronger contract between the + // naming convention used for serialization. + schema.foreach { field => + if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) { + // If "version" field is defined, then the column stat is defined. + val keyPrefix = columnStatKeyPropName(field.name, "") + val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) => + (k.drop(keyPrefix.length), v) + } + + ColumnStat.fromMap(table, field, colStatMap).foreach { + colStat => colStats += field.name -> colStat + } + } + } + + Some(CatalogStatistics( + sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)), + rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)), + colStats = colStats.toMap)) + } + } + override def alterPartitions( db: String, table: String, newParts: Seq[CatalogTablePartition]): Unit = withClient { val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec))) + + val rawTable = getRawTable(db, table) + + // For datasource tables and hive serde tables created by spark 2.1 or higher, + // the data schema is stored in the table properties. + val schema = restoreTableMetadata(rawTable).schema + + // convert partition statistics to properties so that we can persist them through hive api + val withStatsProps = lowerCasedParts.map(p => { + if (p.stats.isDefined) { + val statsProperties = statsToHiveProperties(p.stats.get, rawTable.schema) + p.copy(parameters = p.parameters ++ statsProperties) + } else { + p + } + }) + // Note: Before altering table partitions in Hive, you *must* set the current database // to the one that contains the table of interest. Otherwise you will end up with the // most helpful error message ever: "Unable to alter partition. alter is not possible." // See HIVE-2742 for more detail. 
client.setCurrentDatabase(db) - client.alterPartitions(db, table, lowerCasedParts) + client.alterPartitions(db, table, withStatsProps) } override def getPartition( @@ -1055,7 +1095,26 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table: String, spec: TablePartitionSpec): CatalogTablePartition = withClient { val part = client.getPartition(db, table, lowerCasePartitionSpec(spec)) - part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames)) + restorePartitionMetadata(part, getTable(db, table)) + } + + private def restorePartitionMetadata( + partition: CatalogTablePartition, + table: CatalogTable): CatalogTablePartition = { + val restoredSpec = restorePartitionSpec(partition.spec, table.partitionColumnNames) + + // construct Spark's statistics from information in Hive metastore + val restoredStats = + statsFromHiveProperties(partition.parameters, table.identifier.table, table.schema) + if (restoredStats.isDefined) { + partition.copy( + spec = restoredSpec, + stats = restoredStats, + parameters = partition.parameters.filterNot { + case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }) + } else { + partition.copy(spec = restoredSpec) + } } /** @@ -1066,7 +1125,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table: String, spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient { client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part => - part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames)) + restorePartitionMetadata(part, getTable(db, table)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index bde9a81c65a4..723f678c7a99 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -21,6 +21,7 @@ import java.io.{File, PrintStream} import java.util.Locale import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.conf.Configuration @@ -986,6 +987,7 @@ private[hive] object HiveClientImpl { tpart.setTableName(ht.getTableName) tpart.setValues(partValues.asJava) tpart.setSd(storageDesc) + tpart.setParameters(mutable.Map(p.parameters.toSeq: _*).asJava) new HivePartition(ht, tpart) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 71cf79c473b4..76f7a9e18802 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation} import org.apache.spark.sql.catalyst.plans.logical.ColumnStat import org.apache.spark.sql.catalyst.util.StringUtils @@ -256,6 +257,113 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("analyze single partition") { + val tableName = "analyzeTable_part" + + def queryStats(ds: String): CatalogStatistics = { + val partition = + 
spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds)) + partition.stats.get + } + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src") + sql( + s""" + |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02') + |SELECT * FROM src + |UNION ALL + |SELECT * FROM src + """.stripMargin) + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') SELECT * FROM src") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS").collect() + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS").collect() + + assert(queryStats("2010-01-01").rowCount.get === 500) + assert(queryStats("2010-01-01").sizeInBytes === 5812) + + assert(queryStats("2010-01-02").rowCount.get === 2*500) + assert(queryStats("2010-01-02").sizeInBytes === 2*5812) + } + } + + test("analyze single partition noscan") { + val tableName = "analyzeTable_part" + + def queryStats(ds: String): CatalogStatistics = { + val partition = + spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds)) + partition.stats.get + } + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src") + sql( + s""" + |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02') + |SELECT * FROM src + |UNION ALL + |SELECT * FROM src + """.stripMargin) + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') SELECT * FROM src") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN") + .collect() + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN") + .collect() + + assert(queryStats("2010-01-01").rowCount === None) + assert(queryStats("2010-01-01").sizeInBytes === 5812) + + assert(queryStats("2010-01-02").rowCount === None) + assert(queryStats("2010-01-02").sizeInBytes === 2*5812) + } + } + + test("analyze non-existent partition") { + val tableName = "analyzeTable_part" + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src") + + intercept[AnalysisException] { + sql(s"ANALYZE TABLE $tableName PARTITION (hour=20) COMPUTE STATISTICS") + } + + intercept[NoSuchPartitionException] { + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2011-02-30') COMPUTE STATISTICS") + } + } + } + + test("analyzing views is not supported") { + def assertAnalyzeUnsupported(analyzeCommand: String): Unit = { + val err = intercept[AnalysisException] { + sql(analyzeCommand) + } + assert(err.message.contains("ANALYZE TABLE is not supported")) + } + + val tableName = "tbl" + withTable(tableName) { + spark.range(10).write.saveAsTable(tableName) + val viewName = "view" + withView(viewName) { + sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName") + assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") + assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") + } + } + } + test("test table-level statistics for hive tables created in HiveExternalCatalog") { val textTable = "textTable" withTable(textTable) { From 3ee5ebf2b8ab35b4122849f0e15153bfd079585e Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 28 Jun 2017 00:15:55 -0400 
Subject: [PATCH 02/23] [SPARK-21213][SQL] review comments --- .../sql/catalyst/catalog/interface.scala | 3 ++- .../spark/sql/execution/SparkSqlParser.scala | 22 ++++++++----------- .../command/AnalyzeTableCommand.scala | 2 ++ 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index f225f1af76bd..1965144e8119 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -91,7 +91,8 @@ object CatalogStorageFormat { * * @param spec partition spec values indexed by column name * @param storage storage format of the partition - * @param parameters some parameters for the partition, for example, stats. + * @param parameters some parameters for the partition + * @param stats optional statistics (number of rows, total size, etc.) */ case class CatalogTablePartition( spec: CatalogTypes.TablePartitionSpec, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 680de22ad0da..151417950de6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -95,30 +95,26 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * {{{ * ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN]; * }}} + * Example SQL for analyzing a single partition : + * {{{ + * ANALYZE TABLE table PARTITION (key=value,..) COMPUTE STATISTICS [NOSCAN]; + * }}} * Example SQL for analyzing columns : * {{{ * ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2; * }}} */ override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) { - val noscan = if (ctx.identifier != null) { - if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") { - throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx) - } - true - } else { - false + if (ctx.identifier != null && + ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") { + throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx) } - val partitionSpec = if (ctx.partitionSpec != null) { - Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) - } else { - None - } + val partitionSpec = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) val table = visitTableIdentifier(ctx.tableIdentifier) if (ctx.identifierSeq() == null) { - AnalyzeTableCommand(table, noscan, partitionSpec) + AnalyzeTableCommand(table, noscan = ctx.identifier != null, partitionSpec) } else { if (partitionSpec.isDefined) { logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index 4e065611327e..f81203530d7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -28,6 +28,8 @@ import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, Lite /** * Analyzes the given table or partition to generate 
statistics, which will be used in * query optimizations. + * + * If certain partition spec is specified, then statistics are gathered for only that partition. */ case class AnalyzeTableCommand( tableIdent: TableIdentifier, From d17aa4b54e2fdffcf69d968a9004d92a29a077bb Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 28 Jun 2017 09:56:00 -0400 Subject: [PATCH 03/23] [SPARK-21213][SQL] improved comments per review feedback --- .../spark/sql/execution/SparkSqlParser.scala | 14 +++++++------- .../execution/command/AnalyzeTableCommand.scala | 3 +++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 151417950de6..e4d9baa8b834 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -91,17 +91,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { /** * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. - * Example SQL for analyzing table : + * Example SQL for analyzing table or a single partition : * {{{ - * ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN]; - * }}} - * Example SQL for analyzing a single partition : - * {{{ - * ANALYZE TABLE table PARTITION (key=value,..) COMPUTE STATISTICS [NOSCAN]; + * ANALYZE TABLE [db_name.]tablename [PARTITION (parcol1=val1, partcol2=val2, ...)] + * COMPUTE STATISTICS [NOSCAN]; * }}} + * Partitions spec, if provided, must identify a single partition by specifying values + * for all partition columns. + * * Example SQL for analyzing columns : * {{{ - * ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2; + * ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2; * }}} */ override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index f81203530d7b..224211eb9ed0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -30,6 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, Lite * query optimizations. * * If certain partition spec is specified, then statistics are gathered for only that partition. + * Partition spec must identify a single partition by specifying values for all partition columns. 
+ * + * TODO Support a range of partitions by allowing partial partition specs */ case class AnalyzeTableCommand( tableIdent: TableIdentifier, From e0e351e1c5e38f193cc894dbbe0d74b2a2ff8bf3 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 28 Jun 2017 10:01:46 -0400 Subject: [PATCH 04/23] [SPARK-21213][SQL] typo --- .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e4d9baa8b834..864afe21fcbc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -93,7 +93,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. * Example SQL for analyzing table or a single partition : * {{{ - * ANALYZE TABLE [db_name.]tablename [PARTITION (parcol1=val1, partcol2=val2, ...)] + * ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1=val1, partcol2=val2, ...)] * COMPUTE STATISTICS [NOSCAN]; * }}} * Partitions spec, if provided, must identify a single partition by specifying values From 8dad9bc6f0416a1de6acec1aa6efc5c5c32f878a Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 29 Jun 2017 17:08:57 -0400 Subject: [PATCH 05/23] [SPARK-21213][SQL] add support for partial partition specs --- .../spark/sql/execution/SparkSqlParser.scala | 4 +- .../command/AnalyzeTableCommand.scala | 126 +++++++++++------- .../spark/sql/hive/StatisticsSuite.scala | 100 ++++++++++++++ 3 files changed, 179 insertions(+), 51 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 864afe21fcbc..d19233cb9242 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -91,13 +91,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { /** * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. - * Example SQL for analyzing table or a single partition : + * Example SQL for analyzing table or a set of partitions : * {{{ * ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1=val1, partcol2=val2, ...)] * COMPUTE STATISTICS [NOSCAN]; * }}} - * Partitions spec, if provided, must identify a single partition by specifying values - * for all partition columns. 
* * Example SQL for analyzing columns : * {{{ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index 224211eb9ed0..47b17669f8fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType} +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, Literal} @@ -29,10 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, Lite * Analyzes the given table or partition to generate statistics, which will be used in * query optimizations. * - * If certain partition spec is specified, then statistics are gathered for only that partition. - * Partition spec must identify a single partition by specifying values for all partition columns. - * - * TODO Support a range of partitions by allowing partial partition specs + * If certain partition specs are specified, then statistics are gathered for only those partitions. */ case class AnalyzeTableCommand( tableIdent: TableIdentifier, @@ -48,52 +45,88 @@ case class AnalyzeTableCommand( throw new AnalysisException("ANALYZE TABLE is not supported on views.") } - val partitionMeta = partitionSpec.map( - p => sessionState.catalog.getPartition(tableMeta.identifier, partitionSpec.get)) + if (!partitionSpec.isDefined) { + // Compute stats for the whole table + val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta) + val newRowCount = + if (noscan) { + None + } else { + Some(BigInt(sparkSession.table(tableIdentWithDB).count())) + } - val oldStats = - if (partitionMeta.isDefined) { - partitionMeta.get.stats - } else { - tableMeta.stats + def updateStats(newStats: CatalogStatistics): Unit = { + sessionState.catalog.alterTableStats(tableIdentWithDB, Some(newStats)) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) } - def calculateTotalSize(): BigInt = { - if (partitionMeta.isDefined) { - CommandUtils.calculateTotalSize(sessionState, tableMeta, partitionMeta.get) - } else { - CommandUtils.calculateTotalSize(sessionState, tableMeta) + calculateAndUpdateStats(tableMeta.stats, newTotalSize, newRowCount, updateStats) + } else { + val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionSpec) + + if (partitionSpec.isDefined && partitions.isEmpty) { + throw new NoSuchPartitionException(db, tableIdent.table, partitionSpec.get) } - } - def calculateRowCount(): Long = { - if (partitionSpec.isDefined) { - val filters = partitionSpec.get.map { - case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value)) + // Compute stats for individual partitions + val rowCounts: Map[TablePartitionSpec, BigInt] = + if (noscan) { + Map.empty + } else { + calculateRowCountsPerPartition(sparkSession, tableMeta) } - val filter = filters match { - case head::tail => - if (tail.isEmpty) head - else tail.foldLeft(head : Expression)((a, b) => And(a, b)) + + partitions.foreach(p => { + val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta, p) + val newRowCount = rowCounts.get(p.spec) + + def updateStats(newStats: CatalogStatistics): Unit = { + sessionState.catalog.alterPartitions(tableMeta.identifier, + List(p.copy(stats = Some(newStats)))) } - sparkSession.table(tableIdentWithDB).filter(Column(filter)).count() - } else { - sparkSession.table(tableIdentWithDB).count() - } + + calculateAndUpdateStats(p.stats, newTotalSize, newRowCount, updateStats) + }) } - def updateStats(newStats: CatalogStatistics): Unit = { - if (partitionMeta.isDefined) { - sessionState.catalog.alterPartitions(tableMeta.identifier, - List(partitionMeta.get.copy(stats = Some(newStats)))) - } else { - sessionState.catalog.alterTableStats(tableIdentWithDB, Some(newStats)) - // Refresh the cached data source table in the catalog. - sessionState.catalog.refreshTable(tableIdentWithDB) - } + Seq.empty[Row] + } + + private def calculateRowCountsPerPartition( + sparkSession: SparkSession, + tableMeta: CatalogTable): Map[TablePartitionSpec, BigInt] = { + val filters = partitionSpec.get.map { + case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value)) + } + val filter = filters match { + case head :: tail => + if (tail.isEmpty) head + else tail.foldLeft(head: Expression)((a, b) => And(a, b)) } - val newTotalSize = calculateTotalSize() + val partitionColumns = tableMeta.partitionColumnNames.map(c => Column(c)) + + val df = sparkSession.table(tableMeta.identifier).filter(Column(filter)) + .groupBy(partitionColumns: _*).count() + + val numPartitionColumns = partitionColumns.size + val partitionColumnIndexes = 0 to (numPartitionColumns - 1) + + df.collect().map(r => { + val partitionColumnValues = partitionColumnIndexes.map(i => r.get(i).toString) + val spec: TablePartitionSpec = + tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap + val count = BigInt(r.getLong(numPartitionColumns)) + (spec, count) + }).toMap + } + + private def calculateAndUpdateStats( + oldStats: Option[CatalogStatistics], + newTotalSize: BigInt, + newRowCount: Option[BigInt], + updateStats: CatalogStatistics => Unit): Unit = { val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L) val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) @@ -106,14 +139,12 @@ case class AnalyzeTableCommand( // 1. 
when total size is not changed, we don't need to alter the table; // 2. when total size is changed, `oldRowCount` becomes invalid. // This is to make sure that we only record the right statistics. - if (!noscan) { - val newRowCount = calculateRowCount() - if (newRowCount >= 0 && newRowCount != oldRowCount) { + if (newRowCount.isDefined) { + if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) { newStats = if (newStats.isDefined) { - newStats.map(_.copy(rowCount = Some(BigInt(newRowCount)))) + newStats.map(_.copy(rowCount = newRowCount)) } else { - Some(CatalogStatistics( - sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount)))) + Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount)) } } } @@ -122,6 +153,5 @@ case class AnalyzeTableCommand( if (newStats.isDefined) { updateStats(newStats.get) } - Seq.empty[Row] } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 76f7a9e18802..486ad1c436ec 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -327,6 +327,106 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("analyze a set of partitions") { + val tableName = "analyzeTable_part" + + def queryStats(ds: String, hr: String): Option[CatalogStatistics] = { + val tableId = TableIdentifier(tableName) + val partition = + spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, "hr" -> hr)) + partition.stats + } + + def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: BigInt): Unit = { + val stats = queryStats(ds, hr).get + assert(stats.rowCount === Some(rowCount)) + assert(stats.sizeInBytes === sizeInBytes) + } + + def assertNoStats(ds: String, hr: String): Unit = { + assert(queryStats(ds, hr) === None) + } + + def createPartition(ds: String, hr: Int, query: String): Unit = { + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query") + } + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)") + + createPartition("2010-01-01", 10, "SELECT * FROM SRC") + createPartition("2010-01-01", 11, "SELECT * FROM SRC") + createPartition("2010-01-02", 10, "SELECT * FROM SRC") + createPartition("2010-01-02", 11, "SELECT * FROM SRC UNION ALL SELECT * FROM SRC") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS") + .collect() + + assertStats("2010-01-01", "10", 500, 5812) + assertStats("2010-01-01", "11", 500, 5812) + assertNoStats("2010-01-02", "10") + assertNoStats("2010-01-02", "11") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS") + .collect() + + assertStats("2010-01-01", "10", 500, 5812) + assertStats("2010-01-01", "11", 500, 5812) + assertStats("2010-01-02", "10", 500, 5812) + assertStats("2010-01-02", "11", 2*500, 2*5812) + } + } + + test("analyze a set of partitions noscan") { + val tableName = "analyzeTable_part" + + def queryStats(ds: String, hr: String): Option[CatalogStatistics] = { + val tableId = TableIdentifier(tableName) + val partition = + spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, "hr" -> hr)) + partition.stats + } + + def assertStats(ds: String, hr: String, sizeInBytes: BigInt): Unit = { + val stats = queryStats(ds, hr).get + assert(stats.rowCount === None) + assert(stats.sizeInBytes === 
sizeInBytes) + } + + def assertNoStats(ds: String, hr: String): Unit = { + assert(queryStats(ds, hr) === None) + } + + def createPartition(ds: String, hr: Int, query: String): Unit = { + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query") + } + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)") + + createPartition("2010-01-01", 10, "SELECT * FROM SRC") + createPartition("2010-01-01", 11, "SELECT * FROM SRC") + createPartition("2010-01-02", 10, "SELECT * FROM SRC") + createPartition("2010-01-02", 11, "SELECT * FROM SRC UNION ALL SELECT * FROM SRC") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN") + .collect() + + assertStats("2010-01-01", "10", 5812) + assertStats("2010-01-01", "11", 5812) + assertNoStats("2010-01-02", "10") + assertNoStats("2010-01-02", "11") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN") + .collect() + + assertStats("2010-01-01", "10", 5812) + assertStats("2010-01-01", "11", 5812) + assertStats("2010-01-02", "10", 5812) + assertStats("2010-01-02", "11", 2*5812) + } + } + test("analyze non-existent partition") { val tableName = "analyzeTable_part" withTable(tableName) { From 4fdefd504782b2eff859efbf79eec3676ab2d187 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 29 Jun 2017 17:55:47 -0400 Subject: [PATCH 06/23] [SPARK-21213][SQL] add support for partition specs where some partition columns specified with no values --- .../spark/sql/execution/SparkSqlParser.scala | 12 +++++++++++- .../sql/execution/SparkSqlParserSuite.scala | 19 ++++++++++++++----- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index d19233cb9242..64c5793467e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -108,7 +108,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx) } - val partitionSpec = Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec) + val partitionSpec = + if (ctx.partitionSpec != null) { + val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(x => x._2.isDefined) + if (filteredSpec.isEmpty) { + None + } else { + Some(filteredSpec.mapValues(v => v.get)) + } + } else { + None + } val table = visitTableIdentifier(ctx.tableIdentifier) if (ctx.identifierSeq() == null) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 125f3ddfad60..d7c725e32c10 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -258,7 +258,7 @@ class SparkSqlParserSuite extends AnalysisTest { AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) assertEqual("analyze table t compute statistics noscan", AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) - assertEqual("analyze table t compute statistics nOscAn", + assertEqual("analyze table t partition (a) compute statistics nOscAn", AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) // Partitions 
specified @@ -268,10 +268,19 @@ class SparkSqlParserSuite extends AnalysisTest { assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan", AnalyzeTableCommand(TableIdentifier("t"), noscan = true, partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11")))) - intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS", - "empty partition key 'ds'") - intercept("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan", - "empty partition key 'ds'") + assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan", + AnalyzeTableCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Some(Map("ds" -> "2008-04-09")))) + assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS", + AnalyzeTableCommand(TableIdentifier("t"), noscan = false, + partitionSpec = Some(Map("ds" -> "2008-04-09")))) + assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan", + AnalyzeTableCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Some(Map("ds" -> "2008-04-09")))) + assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS", + AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) + assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan", + AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) intercept("analyze table t compute statistics xxxx", "Expected `NOSCAN` instead of `xxxx`") From 1d696c3102f4e011d77737e520470321da186c70 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 29 Jun 2017 17:57:28 -0400 Subject: [PATCH 07/23] [SPARK-21213][SQL] comment update --- .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 64c5793467e3..6b6908430f48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -93,7 +93,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. 
* Example SQL for analyzing table or a set of partitions : * {{{ - * ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1=val1, partcol2=val2, ...)] + * ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)] * COMPUTE STATISTICS [NOSCAN]; * }}} * From 89c07671c9ab7f4bb98cfc7d51045ecad9046b73 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 29 Jun 2017 18:00:16 -0400 Subject: [PATCH 08/23] [SPARK-21213][SQL] removed extra space --- .../org/apache/spark/sql/execution/SparkSqlParserSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index d7c725e32c10..62bf1e3bb296 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -280,7 +280,7 @@ class SparkSqlParserSuite extends AnalysisTest { assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS", AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) + AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) intercept("analyze table t compute statistics xxxx", "Expected `NOSCAN` instead of `xxxx`") From 7210568198e3b60ce3e255a1c8c5f46faa64b41f Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 5 Jul 2017 12:09:46 -0400 Subject: [PATCH 09/23] [SPARK-21213][SQL] addressed easy review comments --- .../spark/sql/execution/SparkSqlParser.scala | 4 ++-- .../command/AnalyzeTableCommand.scala | 22 ++++++++----------- .../sql/execution/command/CommandUtils.scala | 7 ------ .../spark/sql/hive/HiveExternalCatalog.scala | 12 +++++----- .../spark/sql/hive/StatisticsSuite.scala | 4 ++-- 5 files changed, 19 insertions(+), 30 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 6b6908430f48..04902e8a8de9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -110,11 +110,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { val partitionSpec = if (ctx.partitionSpec != null) { - val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(x => x._2.isDefined) + val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(_._2.isDefined) if (filteredSpec.isEmpty) { None } else { - Some(filteredSpec.mapValues(v => v.get)) + Some(filteredSpec.mapValues(_.get)) } } else { None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index 47b17669f8fc..809bdc93ea9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -77,8 +77,9 @@ case class AnalyzeTableCommand( calculateRowCountsPerPartition(sparkSession, tableMeta) } - partitions.foreach(p => { - val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta, p) + partitions.foreach { p => + val newTotalSize = 
CommandUtils.calculateLocationSize(sessionState, + tableMeta.identifier, p.storage.locationUri) val newRowCount = rowCounts.get(p.spec) def updateStats(newStats: CatalogStatistics): Unit = { @@ -87,7 +88,7 @@ case class AnalyzeTableCommand( } calculateAndUpdateStats(p.stats, newTotalSize, newRowCount, updateStats) - }) + } } Seq.empty[Row] @@ -99,27 +100,22 @@ case class AnalyzeTableCommand( val filters = partitionSpec.get.map { case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value)) } - val filter = filters match { - case head :: tail => - if (tail.isEmpty) head - else tail.foldLeft(head: Expression)((a, b) => And(a, b)) - } + val filter = filters.reduce(And) - val partitionColumns = tableMeta.partitionColumnNames.map(c => Column(c)) + val partitionColumns = tableMeta.partitionColumnNames.map(Column(_)) val df = sparkSession.table(tableMeta.identifier).filter(Column(filter)) .groupBy(partitionColumns: _*).count() val numPartitionColumns = partitionColumns.size - val partitionColumnIndexes = 0 to (numPartitionColumns - 1) - df.collect().map(r => { - val partitionColumnValues = partitionColumnIndexes.map(i => r.get(i).toString) + df.collect().map { r => + val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString) val spec: TablePartitionSpec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap val count = BigInt(r.getLong(numPartitionColumns)) (spec, count) - }).toMap + }.toMap } private def calculateAndUpdateStats( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 1c27b80acbe4..842dd0dace1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -59,13 +59,6 @@ object CommandUtils extends Logging { } } - def calculateTotalSize( - sessionState: SessionState, - catalogTable: CatalogTable, - partition: CatalogTablePartition): Long = { - calculateLocationSize(sessionState, catalogTable.identifier, partition.storage.locationUri) - } - def calculateLocationSize( sessionState: SessionState, identifier: TableIdentifier, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 946d3afa5e96..9eac4a14a027 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -646,7 +646,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // convert table statistics to properties so that we can persist them through hive client var statsProperties = if (stats.isDefined) { - statsToHiveProperties(stats.get, rawTable.schema) + statsToProperties(stats.get, rawTable.schema) } else { new mutable.HashMap[String, String]() } @@ -696,7 +696,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Restore Spark's statistics from information in Metastore. 
val restoredStats = - statsFromHiveProperties(table.properties, table.identifier.table, table.schema) + statsFromProperties(table.properties, table.identifier.table, table.schema) if (restoredStats.isDefined) { table = table.copy(stats = restoredStats) } @@ -1002,7 +1002,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat currentFullPath } - private def statsToHiveProperties( + private def statsToProperties( stats: CatalogStatistics, schema: StructType): Map[String, String] = { @@ -1023,7 +1023,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat statsProperties } - private def statsFromHiveProperties( + private def statsFromProperties( properties: Map[String, String], table: String, schema: StructType): Option[CatalogStatistics] = { @@ -1075,7 +1075,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // convert partition statistics to properties so that we can persist them through hive api val withStatsProps = lowerCasedParts.map(p => { if (p.stats.isDefined) { - val statsProperties = statsToHiveProperties(p.stats.get, rawTable.schema) + val statsProperties = statsToProperties(p.stats.get, rawTable.schema) p.copy(parameters = p.parameters ++ statsProperties) } else { p @@ -1105,7 +1105,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // construct Spark's statistics from information in Hive metastore val restoredStats = - statsFromHiveProperties(partition.parameters, table.identifier.table, table.schema) + statsFromProperties(partition.parameters, table.identifier.table, table.schema) if (restoredStats.isDefined) { partition.copy( spec = restoredSpec, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 486ad1c436ec..f9a7524f7549 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -279,9 +279,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto """.stripMargin) sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') SELECT * FROM src") - sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS").collect() + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS") - sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS").collect() + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS") assert(queryStats("2010-01-01").rowCount.get === 500) assert(queryStats("2010-01-01").sizeInBytes === 5812) From 9aa2a1ea18adc6e9ca2b226a28c2659039e5c990 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 5 Jul 2017 18:02:24 -0400 Subject: [PATCH 10/23] [SPARK-21213][SQL] addressed remaining review comments --- .../spark/sql/execution/SparkSqlParser.scala | 9 +- .../command/AnalyzeTableCommand.scala | 144 ++++++++-------- .../sql/execution/command/CommandUtils.scala | 27 ++- .../sql/execution/SparkSqlParserSuite.scala | 20 +-- .../spark/sql/hive/StatisticsSuite.scala | 154 ++++++------------ 5 files changed, 155 insertions(+), 199 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 04902e8a8de9..64fbddf31bb5 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -90,7 +90,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { } /** - * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command. + * Create an [[AnalyzeTableCommand]] command, or an [[AnalyzePartitionCommand]] + * or an [[AnalyzeColumnCommand]] command. * Example SQL for analyzing table or a set of partitions : * {{{ * ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)] @@ -122,7 +123,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { val table = visitTableIdentifier(ctx.tableIdentifier) if (ctx.identifierSeq() == null) { - AnalyzeTableCommand(table, noscan = ctx.identifier != null, partitionSpec) + if (partitionSpec.isDefined) { + AnalyzePartitionCommand(table, partitionSpec.get, noscan = ctx.identifier != null) + } else { + AnalyzeTableCommand(table, noscan = ctx.identifier != null) + } } else { if (partitionSpec.isDefined) { logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index 809bdc93ea9d..4dd157d280d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -26,15 +26,12 @@ import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, Lite /** - * Analyzes the given table or partition to generate statistics, which will be used in + * Analyzes the given table to generate statistics, which will be used in * query optimizations. - * - * If certain partition specs are specified, then statistics are gathered for only those partitions. */ case class AnalyzeTableCommand( tableIdent: TableIdentifier, - noscan: Boolean = true, - partitionSpec: Option[TablePartitionSpec] = None) extends RunnableCommand { + noscan: Boolean = true) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val sessionState = sparkSession.sessionState @@ -45,50 +42,74 @@ case class AnalyzeTableCommand( throw new AnalysisException("ANALYZE TABLE is not supported on views.") } - if (!partitionSpec.isDefined) { - // Compute stats for the whole table - val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta) - val newRowCount = - if (noscan) { - None - } else { - Some(BigInt(sparkSession.table(tableIdentWithDB).count())) - } - - def updateStats(newStats: CatalogStatistics): Unit = { - sessionState.catalog.alterTableStats(tableIdentWithDB, Some(newStats)) - // Refresh the cached data source table in the catalog. 
- sessionState.catalog.refreshTable(tableIdentWithDB) + // Compute stats for the whole table + val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta) + val newRowCount = + if (noscan) { + None + } else { + Some(BigInt(sparkSession.table(tableIdentWithDB).count())) } - calculateAndUpdateStats(tableMeta.stats, newTotalSize, newRowCount, updateStats) - } else { - val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionSpec) + val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount) + if (newStats.isDefined) { + sessionState.catalog.alterTableStats(tableIdentWithDB, newStats) + // Refresh the cached data source table in the catalog. + sessionState.catalog.refreshTable(tableIdentWithDB) + } - if (partitionSpec.isDefined && partitions.isEmpty) { - throw new NoSuchPartitionException(db, tableIdent.table, partitionSpec.get) - } + Seq.empty[Row] + } +} + +/** + * Analyzes a given set of partitions to generate per-partition statistics, which will be used in + * query optimizations. + */ +case class AnalyzePartitionCommand( + tableIdent: TableIdentifier, + partitionSpec: TablePartitionSpec, + noscan: Boolean = true) extends RunnableCommand { - // Compute stats for individual partitions - val rowCounts: Map[TablePartitionSpec, BigInt] = - if (noscan) { - Map.empty - } else { - calculateRowCountsPerPartition(sparkSession, tableMeta) - } - - partitions.foreach { p => - val newTotalSize = CommandUtils.calculateLocationSize(sessionState, - tableMeta.identifier, p.storage.locationUri) - val newRowCount = rowCounts.get(p.spec) - - def updateStats(newStats: CatalogStatistics): Unit = { - sessionState.catalog.alterPartitions(tableMeta.identifier, - List(p.copy(stats = Some(newStats)))) - } - - calculateAndUpdateStats(p.stats, newTotalSize, newRowCount, updateStats) + override def run(sparkSession: SparkSession): Seq[Row] = { + val sessionState = sparkSession.sessionState + val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) + val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) + if (tableMeta.tableType == CatalogTableType.VIEW) { + throw new AnalysisException("ANALYZE TABLE is not supported on views.") + } + + val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, Some(partitionSpec)) + + if (partitions.isEmpty) { + throw new NoSuchPartitionException(db, tableIdent.table, partitionSpec) + } + + // Compute statistics for individual partitions + val rowCounts: Map[TablePartitionSpec, BigInt] = + if (noscan) { + Map.empty + } else { + calculateRowCountsPerPartition(sparkSession, tableMeta) } + + // Update the metastore if newly computed statistics are different from those + // recorded in the metastore. 
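A hedged mini-example, not part of the patch, of the compare-and-update rule described by the comment above; the starting statistics are made up:

    import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
    import org.apache.spark.sql.execution.command.CommandUtils

    val oldStats = Some(CatalogStatistics(sizeInBytes = 2000, rowCount = Some(500)))

    // Size unchanged and no row count scanned: returns None, so the metastore is not touched.
    CommandUtils.compareAndGetNewStats(oldStats, newTotalSize = 2000, newRowCount = None)

    // Size changed: returns Some(CatalogStatistics(sizeInBytes = 4000)); the stale row count
    // is dropped because it can no longer be trusted once the size has changed.
    CommandUtils.compareAndGetNewStats(oldStats, newTotalSize = 4000, newRowCount = None)
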
+ val partitionStats = partitions.map { p => + val newTotalSize = CommandUtils.calculateLocationSize(sessionState, + tableMeta.identifier, p.storage.locationUri) + val newRowCount = rowCounts.get(p.spec) + val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount) + (p, newStats) + } + + val newPartitions = partitionStats.filter(_._2.isDefined).map { case (p, newStats) => + p.copy(stats = newStats) + }.toList + + if (newPartitions.nonEmpty) { + sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions) } Seq.empty[Row] @@ -97,7 +118,7 @@ case class AnalyzeTableCommand( private def calculateRowCountsPerPartition( sparkSession: SparkSession, tableMeta: CatalogTable): Map[TablePartitionSpec, BigInt] = { - val filters = partitionSpec.get.map { + val filters = partitionSpec.map { case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value)) } val filter = filters.reduce(And) @@ -117,37 +138,4 @@ case class AnalyzeTableCommand( (spec, count) }.toMap } - - private def calculateAndUpdateStats( - oldStats: Option[CatalogStatistics], - newTotalSize: BigInt, - newRowCount: Option[BigInt], - updateStats: CatalogStatistics => Unit): Unit = { - - val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L) - val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) - - var newStats: Option[CatalogStatistics] = None - if (newTotalSize >= 0 && newTotalSize != oldTotalSize) { - newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize)) - } - // We only set rowCount when noscan is false, because otherwise: - // 1. when total size is not changed, we don't need to alter the table; - // 2. when total size is changed, `oldRowCount` becomes invalid. - // This is to make sure that we only record the right statistics. - if (newRowCount.isDefined) { - if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) { - newStats = if (newStats.isDefined) { - newStats.map(_.copy(rowCount = newRowCount)) - } else { - Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount)) - } - } - } - // Update the metastore if the above statistics of the table are different from those - // recorded in the metastore. 
- if (newStats.isDefined) { - updateStats(newStats.get) - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index 842dd0dace1d..a3889f6f9e9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition} import org.apache.spark.sql.internal.SessionState @@ -112,4 +112,29 @@ object CommandUtils extends Logging { size } + def compareAndGetNewStats( + oldStats: Option[CatalogStatistics], + newTotalSize: BigInt, + newRowCount: Option[BigInt]): Option[CatalogStatistics] = { + val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L) + val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) + var newStats: Option[CatalogStatistics] = None + if (newTotalSize > 0 && newTotalSize != oldTotalSize) { + newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize)) + } + // We only set rowCount when noscan is false, because otherwise: + // 1. when total size is not changed, we don't need to alter the table; + // 2. when total size is changed, `oldRowCount` becomes invalid. + // This is to make sure that we only record the right statistics. + if (newRowCount.isDefined) { + if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) { + newStats = if (newStats.isDefined) { + newStats.map(_.copy(rowCount = newRowCount)) + } else { + Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount)) + } + } + } + newStats + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 62bf1e3bb296..7f9c2731a2a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -263,20 +263,20 @@ class SparkSqlParserSuite extends AnalysisTest { // Partitions specified assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS", - AnalyzeTableCommand(TableIdentifier("t"), noscan = false, - partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11")))) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = false, + partitionSpec = Map("ds" -> "2008-04-09", "hr" -> "11"))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true, - partitionSpec = Some(Map("ds" -> "2008-04-09", "hr" -> "11")))) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("ds" -> "2008-04-09", "hr" -> "11"))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true, - partitionSpec = Some(Map("ds" -> "2008-04-09")))) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("ds" -> "2008-04-09"))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE 
STATISTICS", - AnalyzeTableCommand(TableIdentifier("t"), noscan = false, - partitionSpec = Some(Map("ds" -> "2008-04-09")))) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = false, + partitionSpec = Map("ds" -> "2008-04-09"))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true, - partitionSpec = Some(Map("ds" -> "2008-04-09")))) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("ds" -> "2008-04-09"))) assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS", AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan", diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index f9a7524f7549..1f73ccf98d03 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -257,6 +257,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + private val SELECT_FROM_SRC = "SELECT '1', 'A' from src" + test("analyze single partition") { val tableName = "analyzeTable_part" @@ -266,64 +268,36 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto partition.stats.get } - withTable(tableName) { - sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") - - sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src") - sql( - s""" - |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02') - |SELECT * FROM src - |UNION ALL - |SELECT * FROM src - """.stripMargin) - sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') SELECT * FROM src") - - sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS") - - sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS") - - assert(queryStats("2010-01-01").rowCount.get === 500) - assert(queryStats("2010-01-01").sizeInBytes === 5812) - - assert(queryStats("2010-01-02").rowCount.get === 2*500) - assert(queryStats("2010-01-02").sizeInBytes === 2*5812) - } - } - - test("analyze single partition noscan") { - val tableName = "analyzeTable_part" - - def queryStats(ds: String): CatalogStatistics = { - val partition = - spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds)) - partition.stats.get + def createPartition(ds: String, query: String): Unit = { + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query") } withTable(tableName) { sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") - sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src") - sql( - s""" - |INSERT INTO TABLE $tableName PARTITION (ds='2010-01-02') - |SELECT * FROM src - |UNION ALL - |SELECT * FROM src - """.stripMargin) - sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-03') SELECT * FROM src") + createPartition("2010-01-01", SELECT_FROM_SRC) + createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL $SELECT_FROM_SRC") + createPartition("2010-01-03", SELECT_FROM_SRC) sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN") - .collect() sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN") - .collect() assert(queryStats("2010-01-01").rowCount === None) - 
assert(queryStats("2010-01-01").sizeInBytes === 5812) + assert(queryStats("2010-01-01").sizeInBytes === 2000) assert(queryStats("2010-01-02").rowCount === None) - assert(queryStats("2010-01-02").sizeInBytes === 2*5812) + assert(queryStats("2010-01-02").sizeInBytes === 2*2000) + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS") + + assert(queryStats("2010-01-01").rowCount.get === 500) + assert(queryStats("2010-01-01").sizeInBytes === 2000) + + assert(queryStats("2010-01-02").rowCount.get === 2*500) + assert(queryStats("2010-01-02").sizeInBytes === 2*2000) } } @@ -343,60 +317,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(stats.sizeInBytes === sizeInBytes) } - def assertNoStats(ds: String, hr: String): Unit = { - assert(queryStats(ds, hr) === None) - } - - def createPartition(ds: String, hr: Int, query: String): Unit = { - sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query") - } - - withTable(tableName) { - sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)") - - createPartition("2010-01-01", 10, "SELECT * FROM SRC") - createPartition("2010-01-01", 11, "SELECT * FROM SRC") - createPartition("2010-01-02", 10, "SELECT * FROM SRC") - createPartition("2010-01-02", 11, "SELECT * FROM SRC UNION ALL SELECT * FROM SRC") - - sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS") - .collect() - - assertStats("2010-01-01", "10", 500, 5812) - assertStats("2010-01-01", "11", 500, 5812) - assertNoStats("2010-01-02", "10") - assertNoStats("2010-01-02", "11") - - sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS") - .collect() - - assertStats("2010-01-01", "10", 500, 5812) - assertStats("2010-01-01", "11", 500, 5812) - assertStats("2010-01-02", "10", 500, 5812) - assertStats("2010-01-02", "11", 2*500, 2*5812) - } - } - - test("analyze a set of partitions noscan") { - val tableName = "analyzeTable_part" - - def queryStats(ds: String, hr: String): Option[CatalogStatistics] = { - val tableId = TableIdentifier(tableName) - val partition = - spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, "hr" -> hr)) - partition.stats - } - - def assertStats(ds: String, hr: String, sizeInBytes: BigInt): Unit = { + def assertSizeInBytesStats(ds: String, hr: String, sizeInBytes: BigInt): Unit = { val stats = queryStats(ds, hr).get assert(stats.rowCount === None) assert(stats.sizeInBytes === sizeInBytes) } - def assertNoStats(ds: String, hr: String): Unit = { - assert(queryStats(ds, hr) === None) - } - def createPartition(ds: String, hr: Int, query: String): Unit = { sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query") } @@ -404,26 +330,38 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto withTable(tableName) { sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)") - createPartition("2010-01-01", 10, "SELECT * FROM SRC") - createPartition("2010-01-01", 11, "SELECT * FROM SRC") - createPartition("2010-01-02", 10, "SELECT * FROM SRC") - createPartition("2010-01-02", 11, "SELECT * FROM SRC UNION ALL SELECT * FROM SRC") + createPartition("2010-01-01", 10, SELECT_FROM_SRC) + createPartition("2010-01-01", 11, SELECT_FROM_SRC) + createPartition("2010-01-02", 10, SELECT_FROM_SRC) + createPartition("2010-01-02", 11, s"$SELECT_FROM_SRC UNION ALL 
$SELECT_FROM_SRC") sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN") - .collect() - assertStats("2010-01-01", "10", 5812) - assertStats("2010-01-01", "11", 5812) - assertNoStats("2010-01-02", "10") - assertNoStats("2010-01-02", "11") + assertSizeInBytesStats("2010-01-01", "10", 2000) + assertSizeInBytesStats("2010-01-01", "11", 2000) + assert(queryStats("2010-01-02", "10") === None) + assert(queryStats("2010-01-02", "11") === None) sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN") - .collect() - assertStats("2010-01-01", "10", 5812) - assertStats("2010-01-01", "11", 5812) - assertStats("2010-01-02", "10", 5812) - assertStats("2010-01-02", "11", 2*5812) + assertSizeInBytesStats("2010-01-01", "10", 2000) + assertSizeInBytesStats("2010-01-01", "11", 2000) + assertSizeInBytesStats("2010-01-02", "10", 2000) + assertSizeInBytesStats("2010-01-02", "11", 2*2000) + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS") + + assertStats("2010-01-01", "10", 500, 2000) + assertStats("2010-01-01", "11", 500, 2000) + assertSizeInBytesStats("2010-01-02", "10", 2000) + assertSizeInBytesStats("2010-01-02", "11", 2*2000) + + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS") + + assertStats("2010-01-01", "10", 500, 2000) + assertStats("2010-01-01", "11", 500, 2000) + assertStats("2010-01-02", "10", 500, 2000) + assertStats("2010-01-02", "11", 2*500, 2*2000) } } From fa218603ff0ffc1aa04b81e2731a88796af116af Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Wed, 5 Jul 2017 18:25:53 -0400 Subject: [PATCH 11/23] [SPARK-21213][SQL] added test case for (ds, hr=11) partition spec --- .../org/apache/spark/sql/execution/SparkSqlParserSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 7f9c2731a2a1..f1f7b535d47d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -277,6 +277,9 @@ class SparkSqlParserSuite extends AnalysisTest { assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan", AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, partitionSpec = Map("ds" -> "2008-04-09"))) + assertEqual("ANALYZE TABLE t PARTITION(ds, hr=11) COMPUTE STATISTICS noscan", + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("hr" -> "11"))) assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS", AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan", From f76f49ffc4358208651fc8876ba978ed73f336ba Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Tue, 11 Jul 2017 11:42:16 -0400 Subject: [PATCH 12/23] [SPARK-21213][SQL] addressed review comments; fixed PARTITION (ds, hr) case --- .../spark/sql/execution/SparkSqlParser.scala | 22 +-- .../command/AnalyzePartitionCommand.scala | 138 ++++++++++++++++++ .../command/AnalyzeTableCommand.scala | 88 +---------- .../sql/execution/SparkSqlParserSuite.scala | 20 +-- .../spark/sql/hive/StatisticsSuite.scala | 132 ++++++++++------- 5 files changed, 239 insertions(+), 161 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala diff 
--git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 64fbddf31bb5..7a4debcde32c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -109,28 +109,18 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx) } - val partitionSpec = - if (ctx.partitionSpec != null) { - val filteredSpec = visitPartitionSpec(ctx.partitionSpec).filter(_._2.isDefined) - if (filteredSpec.isEmpty) { - None - } else { - Some(filteredSpec.mapValues(_.get)) - } - } else { - None - } - val table = visitTableIdentifier(ctx.tableIdentifier) if (ctx.identifierSeq() == null) { - if (partitionSpec.isDefined) { - AnalyzePartitionCommand(table, partitionSpec.get, noscan = ctx.identifier != null) + if (ctx.partitionSpec != null) { + AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec), + noscan = ctx.identifier != null) } else { AnalyzeTableCommand(table, noscan = ctx.identifier != null) } } else { - if (partitionSpec.isDefined) { - logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}") + if (ctx.partitionSpec != null) { + logWarning("Partition specification is ignored when collecting column statistics: " + + ctx.partitionSpec.getText) } AnalyzeColumnCommand( table, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala new file mode 100644 index 000000000000..383674e89b55 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command + +import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal} + +/** + * Analyzes a given set of partitions to generate per-partition statistics, which will be used in + * query optimizations. 
+ */ +case class AnalyzePartitionCommand( + tableIdent: TableIdentifier, + partitionSpec: Map[String, Option[String]], + noscan: Boolean = true) extends RunnableCommand { + + private def validatePartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = { + val partitionColumnNames = table.partitionColumnNames.toSet + val invalidColumnNames = partitionSpec.keys.filterNot(partitionColumnNames.contains(_)) + if (invalidColumnNames.nonEmpty) { + val tableId = table.identifier + throw new AnalysisException(s"Partition specification for table '${tableId.table}' " + + s"in database '${tableId.database}' refers to unknown partition column(s): " + + invalidColumnNames.mkString(",")) + } + + val filteredSpec = partitionSpec.filter(_._2.isDefined) + if (filteredSpec.isEmpty) { + None + } else { + Some(filteredSpec.mapValues(_.get)) + } + } + + override def run(sparkSession: SparkSession): Seq[Row] = { + val sessionState = sparkSession.sessionState + val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) + val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) + if (tableMeta.tableType == CatalogTableType.VIEW) { + throw new AnalysisException("ANALYZE TABLE is not supported on views.") + } + + val partitionValueSpec = validatePartitionSpec(tableMeta) + + val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec) + + if (partitions.isEmpty) { + if (partitionValueSpec.isDefined) { + throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get) + } else { + // the user requested to analyze all partitions for a table which has no partitions + // return normally, since there is nothing to do + return Seq.empty[Row] + } + } + + // Compute statistics for individual partitions + val rowCounts: Map[TablePartitionSpec, BigInt] = + if (noscan) { + Map.empty + } else { + calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec) + } + + // Update the metastore if newly computed statistics are different from those + // recorded in the metastore. 
+ val partitionStats = partitions.map { p => + val newTotalSize = CommandUtils.calculateLocationSize(sessionState, + tableMeta.identifier, p.storage.locationUri) + val newRowCount = rowCounts.get(p.spec) + val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount) + (p, newStats) + } + + val newPartitions = partitionStats.filter(_._2.isDefined).map { case (p, newStats) => + p.copy(stats = newStats) + }.toList + + if (newPartitions.nonEmpty) { + sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions) + } + + Seq.empty[Row] + } + + private def calculateRowCountsPerPartition( + sparkSession: SparkSession, + tableMeta: CatalogTable, + partitionValueSpec: Option[TablePartitionSpec]): Map[TablePartitionSpec, BigInt] = { + val filter = if (partitionValueSpec.isDefined) { + val filters = partitionValueSpec.get.map { + case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value)) + } + Some(filters.reduce(And)) + } else { + None + } + + val tableDf = sparkSession.table(tableMeta.identifier) + val partitionColumns = tableMeta.partitionColumnNames.map(Column(_)) + + val df = if (filter.isDefined) { + tableDf.filter(Column(filter.get)).groupBy(partitionColumns: _*).count() + } else { + tableDf.groupBy(partitionColumns: _*).count() + } + + val numPartitionColumns = partitionColumns.size + + df.collect().map { r => + val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString) + val spec: TablePartitionSpec = + tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap + val count = BigInt(r.getLong(numPartitionColumns)) + (spec, count) + }.toMap + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index 4dd157d280d7..a848572c634c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -17,17 +17,13 @@ package org.apache.spark.sql.execution.command -import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute} -import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, Literal} +import org.apache.spark.sql.catalyst.catalog.{CatalogTableType} /** - * Analyzes the given table to generate statistics, which will be used in - * query optimizations. + * Analyzes the given table to generate statistics, which will be used in query optimizations. */ case class AnalyzeTableCommand( tableIdent: TableIdentifier, @@ -61,81 +57,3 @@ case class AnalyzeTableCommand( Seq.empty[Row] } } - -/** - * Analyzes a given set of partitions to generate per-partition statistics, which will be used in - * query optimizations. 
- */ -case class AnalyzePartitionCommand( - tableIdent: TableIdentifier, - partitionSpec: TablePartitionSpec, - noscan: Boolean = true) extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val sessionState = sparkSession.sessionState - val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) - val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) - val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) - if (tableMeta.tableType == CatalogTableType.VIEW) { - throw new AnalysisException("ANALYZE TABLE is not supported on views.") - } - - val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, Some(partitionSpec)) - - if (partitions.isEmpty) { - throw new NoSuchPartitionException(db, tableIdent.table, partitionSpec) - } - - // Compute statistics for individual partitions - val rowCounts: Map[TablePartitionSpec, BigInt] = - if (noscan) { - Map.empty - } else { - calculateRowCountsPerPartition(sparkSession, tableMeta) - } - - // Update the metastore if newly computed statistics are different from those - // recorded in the metastore. - val partitionStats = partitions.map { p => - val newTotalSize = CommandUtils.calculateLocationSize(sessionState, - tableMeta.identifier, p.storage.locationUri) - val newRowCount = rowCounts.get(p.spec) - val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount) - (p, newStats) - } - - val newPartitions = partitionStats.filter(_._2.isDefined).map { case (p, newStats) => - p.copy(stats = newStats) - }.toList - - if (newPartitions.nonEmpty) { - sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions) - } - - Seq.empty[Row] - } - - private def calculateRowCountsPerPartition( - sparkSession: SparkSession, - tableMeta: CatalogTable): Map[TablePartitionSpec, BigInt] = { - val filters = partitionSpec.map { - case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value)) - } - val filter = filters.reduce(And) - - val partitionColumns = tableMeta.partitionColumnNames.map(Column(_)) - - val df = sparkSession.table(tableMeta.identifier).filter(Column(filter)) - .groupBy(partitionColumns: _*).count() - - val numPartitionColumns = partitionColumns.size - - df.collect().map { r => - val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString) - val spec: TablePartitionSpec = - tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap - val count = BigInt(r.getLong(numPartitionColumns)) - (spec, count) - }.toMap - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index f1f7b535d47d..fa7a866f4d55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -259,31 +259,33 @@ class SparkSqlParserSuite extends AnalysisTest { assertEqual("analyze table t compute statistics noscan", AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) assertEqual("analyze table t partition (a) compute statistics nOscAn", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) + AnalyzePartitionCommand(TableIdentifier("t"), Map("a" -> None), noscan = true)) // Partitions specified assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS", AnalyzePartitionCommand(TableIdentifier("t"), noscan = false, - partitionSpec 
= Map("ds" -> "2008-04-09", "hr" -> "11"))) + partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan", AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, - partitionSpec = Map("ds" -> "2008-04-09", "hr" -> "11"))) + partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11")))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan", AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, - partitionSpec = Map("ds" -> "2008-04-09"))) + partitionSpec = Map("ds" -> Some("2008-04-09")))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS", AnalyzePartitionCommand(TableIdentifier("t"), noscan = false, - partitionSpec = Map("ds" -> "2008-04-09"))) + partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None))) assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan", AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, - partitionSpec = Map("ds" -> "2008-04-09"))) + partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None))) assertEqual("ANALYZE TABLE t PARTITION(ds, hr=11) COMPUTE STATISTICS noscan", AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, - partitionSpec = Map("hr" -> "11"))) + partitionSpec = Map("ds" -> None, "hr" -> Some("11")))) assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS", - AnalyzeTableCommand(TableIdentifier("t"), noscan = false)) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = false, + partitionSpec = Map("ds" -> None, "hr" -> None))) assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan", - AnalyzeTableCommand(TableIdentifier("t"), noscan = true)) + AnalyzePartitionCommand(TableIdentifier("t"), noscan = true, + partitionSpec = Map("ds" -> None, "hr" -> None))) intercept("analyze table t compute statistics xxxx", "Expected `NOSCAN` instead of `xxxx`") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 1f73ccf98d03..340ab9ddc4b1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -257,8 +257,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } - private val SELECT_FROM_SRC = "SELECT '1', 'A' from src" - test("analyze single partition") { val tableName = "analyzeTable_part" @@ -275,9 +273,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto withTable(tableName) { sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") - createPartition("2010-01-01", SELECT_FROM_SRC) - createPartition("2010-01-02", s"$SELECT_FROM_SRC UNION ALL $SELECT_FROM_SRC") - createPartition("2010-01-03", SELECT_FROM_SRC) + createPartition("2010-01-01", "SELECT '1', 'A' from src") + createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src") + createPartition("2010-01-03", "SELECT '1', 'A' from src") sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN") @@ -311,15 +309,13 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto partition.stats } - def assertStats(ds: String, hr: String, rowCount: BigInt, sizeInBytes: BigInt): Unit = { - val stats = queryStats(ds, hr).get - assert(stats.rowCount === 
Some(rowCount)) - assert(stats.sizeInBytes === sizeInBytes) - } - - def assertSizeInBytesStats(ds: String, hr: String, sizeInBytes: BigInt): Unit = { + def assertPartitionStats( + ds: String, + hr: String, + rowCount: Option[BigInt], + sizeInBytes: BigInt): Unit = { val stats = queryStats(ds, hr).get - assert(stats.rowCount === None) + assert(stats.rowCount === rowCount) assert(stats.sizeInBytes === sizeInBytes) } @@ -330,38 +326,88 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto withTable(tableName) { sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)") - createPartition("2010-01-01", 10, SELECT_FROM_SRC) - createPartition("2010-01-01", 11, SELECT_FROM_SRC) - createPartition("2010-01-02", 10, SELECT_FROM_SRC) - createPartition("2010-01-02", 11, s"$SELECT_FROM_SRC UNION ALL $SELECT_FROM_SRC") + createPartition("2010-01-01", 10, "SELECT '1', 'A' from src") + createPartition("2010-01-01", 11, "SELECT '1', 'A' from src") + createPartition("2010-01-02", 10, "SELECT '1', 'A' from src") + createPartition("2010-01-02", 11, + "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src") sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN") - assertSizeInBytesStats("2010-01-01", "10", 2000) - assertSizeInBytesStats("2010-01-01", "11", 2000) + assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000) assert(queryStats("2010-01-02", "10") === None) assert(queryStats("2010-01-02", "11") === None) sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN") - assertSizeInBytesStats("2010-01-01", "10", 2000) - assertSizeInBytesStats("2010-01-01", "11", 2000) - assertSizeInBytesStats("2010-01-02", "10", 2000) - assertSizeInBytesStats("2010-01-02", "11", 2*2000) + assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000) sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS") - assertStats("2010-01-01", "10", 500, 2000) - assertStats("2010-01-01", "11", 500, 2000) - assertSizeInBytesStats("2010-01-02", "10", 2000) - assertSizeInBytesStats("2010-01-02", "11", 2*2000) + assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000) sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS") - assertStats("2010-01-01", "10", 500, 2000) - assertStats("2010-01-01", "11", 500, 2000) - assertStats("2010-01-02", "10", 500, 2000) - assertStats("2010-01-02", "11", 2*500, 2*2000) + assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "10", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = Some(2*500), sizeInBytes = 2*2000) + } + } + + test("analyze all partitions") { + val tableName = "analyzeTable_part" + + def queryStats(ds: 
String, hr: String): Option[CatalogStatistics] = { + val tableId = TableIdentifier(tableName) + val partition = + spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, "hr" -> hr)) + partition.stats + } + + def assertPartitionStats( + ds: String, + hr: String, + rowCount: Option[BigInt], + sizeInBytes: BigInt): Unit = { + val stats = queryStats(ds, hr).get + assert(stats.rowCount === rowCount) + assert(stats.sizeInBytes === sizeInBytes) + } + + def createPartition(ds: String, hr: Int, query: String): Unit = { + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query") + } + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)") + + createPartition("2010-01-01", 10, "SELECT '1', 'A' from src") + createPartition("2010-01-01", 11, "SELECT '1', 'A' from src") + createPartition("2010-01-02", 10, "SELECT '1', 'A' from src") + createPartition("2010-01-02", 11, + "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src") + + sql(s"ANALYZE TABLE $tableName PARTITION (ds, hr) COMPUTE STATISTICS NOSCAN") + + assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000) + + sql(s"ANALYZE TABLE $tableName PARTITION (ds, hr) COMPUTE STATISTICS") + + assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000) + assertPartitionStats("2010-01-02", "11", rowCount = Some(2*500), sizeInBytes = 2*2000) } } @@ -376,28 +422,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"ANALYZE TABLE $tableName PARTITION (hour=20) COMPUTE STATISTICS") } - intercept[NoSuchPartitionException] { - sql(s"ANALYZE TABLE $tableName PARTITION (ds='2011-02-30') COMPUTE STATISTICS") - } - } - } - - test("analyzing views is not supported") { - def assertAnalyzeUnsupported(analyzeCommand: String): Unit = { - val err = intercept[AnalysisException] { - sql(analyzeCommand) + intercept[AnalysisException] { + sql(s"ANALYZE TABLE $tableName PARTITION (hour) COMPUTE STATISTICS") } - assert(err.message.contains("ANALYZE TABLE is not supported")) - } - val tableName = "tbl" - withTable(tableName) { - spark.range(10).write.saveAsTable(tableName) - val viewName = "view" - withView(viewName) { - sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName") - assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") - assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") + intercept[NoSuchPartitionException] { + sql(s"ANALYZE TABLE $tableName PARTITION (ds='2011-02-30') COMPUTE STATISTICS") } } } From 8f31f53b2174203f98246b518f1d2d3c303993b7 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Tue, 11 Jul 2017 12:04:08 -0400 Subject: [PATCH 13/23] [SPARK-21213][SQL] shorted new test --- .../org/apache/spark/sql/hive/StatisticsSuite.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 340ab9ddc4b1..cd791009022a 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -365,19 +365,13 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto test("analyze all partitions") { val tableName = "analyzeTable_part" - def queryStats(ds: String, hr: String): Option[CatalogStatistics] = { - val tableId = TableIdentifier(tableName) - val partition = - spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, "hr" -> hr)) - partition.stats - } - def assertPartitionStats( ds: String, hr: String, rowCount: Option[BigInt], sizeInBytes: BigInt): Unit = { - val stats = queryStats(ds, hr).get + val stats = spark.sessionState.catalog.getPartition(TableIdentifier(tableName), + Map("ds" -> ds, "hr" -> hr)).stats.get assert(stats.rowCount === rowCount) assert(stats.sizeInBytes === sizeInBytes) } From fae6d4965b4b79298a49255d02e833889f63f829 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Tue, 11 Jul 2017 13:04:18 -0400 Subject: [PATCH 14/23] [SPARK-21213][SQL] added documentation; added test for an empty table --- .../command/AnalyzePartitionCommand.scala | 11 +++++++++++ .../apache/spark/sql/hive/StatisticsSuite.scala | 14 ++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 383674e89b55..78255578b362 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -27,6 +27,17 @@ import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal} /** * Analyzes a given set of partitions to generate per-partition statistics, which will be used in * query optimizations. + * + * When `partitionSpec` is empty, statistics for all partitions are collected and stored in + * Metastore. + * + * When `partitionSpec` mentions only some of the partition columns, all partitions with + * matching values for specified columns are processed. + * + * If `partitionSpec` mentions unknown partition column, an `AnalysisException` is raised. + * + * By default, total number of rows and total size in bytes is calculated. When `noscan` + * is `false`, only total size in bytes is computed. 
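As an illustration, not part of the patch, of the cases this documentation describes; the SparkSession value `spark`, the table name `sales`, and its (ds, hr) partitioning are assumptions:

    // Column-only spec: statistics are collected for every partition of the table.
    spark.sql("ANALYZE TABLE sales PARTITION (ds, hr) COMPUTE STATISTICS")

    // Partial spec: every partition with ds='2017-01-01' is analyzed, whatever its hr value.
    spark.sql("ANALYZE TABLE sales PARTITION (ds='2017-01-01', hr) COMPUTE STATISTICS")

    // NOSCAN: only sizeInBytes is collected; rowCount is left unset.
    spark.sql("ANALYZE TABLE sales PARTITION (ds='2017-01-01', hr=10) COMPUTE STATISTICS NOSCAN")

    // A spec that names a non-partition column fails with an AnalysisException:
    // spark.sql("ANALYZE TABLE sales PARTITION (hour=20) COMPUTE STATISTICS")
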
*/ case class AnalyzePartitionCommand( tableIdent: TableIdentifier, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index cd791009022a..47e0b1fb9175 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -405,6 +405,20 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("analyze partitions for an empty table") { + val tableName = "analyzeTable_part" + + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + + // make sure there is no exception + sql(s"ANALYZE TABLE $tableName PARTITION (ds) COMPUTE STATISTICS NOSCAN") + + // make sure there is no exception + sql(s"ANALYZE TABLE $tableName PARTITION (ds) COMPUTE STATISTICS") + } + } + test("analyze non-existent partition") { val tableName = "analyzeTable_part" withTable(tableName) { From 8880fbdef2f34dd11c040ad266368e326e0af326 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Mon, 31 Jul 2017 13:45:15 -0400 Subject: [PATCH 15/23] [SPARK-21213][SQL] review comments --- .../spark/sql/execution/SparkSqlParser.scala | 2 +- .../command/AnalyzePartitionCommand.scala | 31 ++++++------------- .../command/AnalyzeTableCommand.scala | 2 ++ .../sql/execution/command/CommandUtils.scala | 4 +-- .../spark/sql/hive/HiveExternalCatalog.scala | 9 +++++- .../spark/sql/hive/StatisticsSuite.scala | 26 +++++++++++----- 6 files changed, 42 insertions(+), 32 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 7a4debcde32c..8379e740a071 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -92,7 +92,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { /** * Create an [[AnalyzeTableCommand]] command, or an [[AnalyzePartitionCommand]] * or an [[AnalyzeColumnCommand]] command. - * Example SQL for analyzing table or a set of partitions : + * Example SQL for analyzing a table or a set of partitions : * {{{ * ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)] * COMPUTE STATISTICS [NOSCAN]; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 78255578b362..f1d66787fbbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -50,7 +50,7 @@ case class AnalyzePartitionCommand( if (invalidColumnNames.nonEmpty) { val tableId = table.identifier throw new AnalysisException(s"Partition specification for table '${tableId.table}' " + - s"in database '${tableId.database}' refers to unknown partition column(s): " + + s"in database '${tableId.database.get}' refers to unknown partition column(s): " + invalidColumnNames.mkString(",")) } @@ -95,18 +95,14 @@ case class AnalyzePartitionCommand( // Update the metastore if newly computed statistics are different from those // recorded in the metastore. 
- val partitionStats = partitions.map { p => - val newTotalSize = CommandUtils.calculateLocationSize(sessionState, - tableMeta.identifier, p.storage.locationUri) + val newPartitions = partitions.flatMap { p => + val newTotalSize = CommandUtils.calculateLocationSize( + sessionState, tableMeta.identifier, p.storage.locationUri) val newRowCount = rowCounts.get(p.spec) val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount) - (p, newStats) + newStats.map(_ => p.copy(stats = newStats)) } - val newPartitions = partitionStats.filter(_._2.isDefined).map { case (p, newStats) => - p.copy(stats = newStats) - }.toList - if (newPartitions.nonEmpty) { sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions) } @@ -122,27 +118,20 @@ case class AnalyzePartitionCommand( val filters = partitionValueSpec.get.map { case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value)) } - Some(filters.reduce(And)) + filters.reduce(And) } else { - None + Literal.TrueLiteral } val tableDf = sparkSession.table(tableMeta.identifier) val partitionColumns = tableMeta.partitionColumnNames.map(Column(_)) - val df = if (filter.isDefined) { - tableDf.filter(Column(filter.get)).groupBy(partitionColumns: _*).count() - } else { - tableDf.groupBy(partitionColumns: _*).count() - } - - val numPartitionColumns = partitionColumns.size + val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count() df.collect().map { r => val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString) - val spec: TablePartitionSpec = - tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap - val count = BigInt(r.getLong(numPartitionColumns)) + val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap + val count = BigInt(r.getLong(partitionColumns.size)) (spec, count) }.toMap } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index a848572c634c..f354d2d79954 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -47,6 +47,8 @@ case class AnalyzeTableCommand( Some(BigInt(sparkSession.table(tableIdentWithDB).count())) } + // Update the metastore if the above statistics of the table are different from those + // recorded in the metastore. 
val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount) if (newStats.isDefined) { sessionState.catalog.alterTableStats(tableIdentWithDB, newStats) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index a3889f6f9e9b..b22958d59336 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -116,10 +116,10 @@ object CommandUtils extends Logging { oldStats: Option[CatalogStatistics], newTotalSize: BigInt, newRowCount: Option[BigInt]): Option[CatalogStatistics] = { - val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(0L) + val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(-1L) val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) var newStats: Option[CatalogStatistics] = None - if (newTotalSize > 0 && newTotalSize != oldTotalSize) { + if (newTotalSize >= 0 && newTotalSize != oldTotalSize) { newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize)) } // We only set rowCount when noscan is false, because otherwise: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 9eac4a14a027..b8fe755c990b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -1098,12 +1098,19 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat restorePartitionMetadata(part, getTable(db, table)) } + /** + * Restores partition metadata from the partition properties. + * + * Reads partition-level statistics from partition properties, puts these + * into [[CatalogTablePartition#stats]] and removes these special entries + * from the partition properties. + */ private def restorePartitionMetadata( partition: CatalogTablePartition, table: CatalogTable): CatalogTablePartition = { val restoredSpec = restorePartitionSpec(partition.spec, table.partitionColumnNames) - // construct Spark's statistics from information in Hive metastore + // Restore Spark's statistics from information in Metastore. 
val restoredStats = statsFromProperties(partition.parameters, table.identifier.table, table.schema) if (restoredStats.isDefined) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 47e0b1fb9175..2b8d4c7bcc10 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -420,19 +420,31 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } test("analyze non-existent partition") { + + def assertAnalysisException(analyzeCommand: String, errorMessage: String): Unit = { + val message = intercept[AnalysisException] { + sql(analyzeCommand) + }.getMessage + assert(message.contains(errorMessage)) + } + val tableName = "analyzeTable_part" withTable(tableName) { sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src") - intercept[AnalysisException] { - sql(s"ANALYZE TABLE $tableName PARTITION (hour=20) COMPUTE STATISTICS") - } - - intercept[AnalysisException] { - sql(s"ANALYZE TABLE $tableName PARTITION (hour) COMPUTE STATISTICS") - } + assertAnalysisException( + s"ANALYZE TABLE $tableName PARTITION (hour=20) COMPUTE STATISTICS", + s"Partition specification for table '${tableName.toLowerCase}' " + + "in database 'default' refers to unknown partition column(s): hour" + ) + + assertAnalysisException( + s"ANALYZE TABLE $tableName PARTITION (hour) COMPUTE STATISTICS", + s"Partition specification for table '${tableName.toLowerCase}' " + + "in database 'default' refers to unknown partition column(s): hour" + ) intercept[NoSuchPartitionException] { sql(s"ANALYZE TABLE $tableName PARTITION (ds='2011-02-30') COMPUTE STATISTICS") From 10539914acec68bef6d1598849ab8d8d5b6d2cf1 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Mon, 7 Aug 2017 12:03:34 -0400 Subject: [PATCH 16/23] [SPARK-21213][SQL] fixed bad merge of SPARK-21599 --- .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index b8fe755c990b..50e985eba1ad 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -646,7 +646,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // convert table statistics to properties so that we can persist them through hive client var statsProperties = if (stats.isDefined) { - statsToProperties(stats.get, rawTable.schema) + statsToProperties(stats.get, schema) } else { new mutable.HashMap[String, String]() } @@ -1075,7 +1075,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // convert partition statistics to properties so that we can persist them through hive api val withStatsProps = lowerCasedParts.map(p => { if (p.stats.isDefined) { - val statsProperties = statsToProperties(p.stats.get, rawTable.schema) + val statsProperties = statsToProperties(p.stats.get, schema) p.copy(parameters = p.parameters ++ statsProperties) } else { p From 41ab30d7319d4437e0cbdb30b3c9c5b791fb0d19 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Tue, 8 Aug 2017 
11:57:50 -0400 Subject: [PATCH 17/23] [SPARK-21213][SQL] added support for spark.sql.caseSensitive; addressed other review comments --- .../command/AnalyzePartitionCommand.scala | 16 ++++++++------ .../spark/sql/hive/StatisticsSuite.scala | 21 +++++++++++++++++++ 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index f1d66787fbbb..e8ec18a40703 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -36,17 +36,20 @@ import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal} * * If `partitionSpec` mentions unknown partition column, an `AnalysisException` is raised. * - * By default, total number of rows and total size in bytes is calculated. When `noscan` - * is `false`, only total size in bytes is computed. + * By default, total number of rows and total size in bytes are calculated. When `noscan` + * is `true`, only total size in bytes is computed. */ case class AnalyzePartitionCommand( tableIdent: TableIdentifier, partitionSpec: Map[String, Option[String]], noscan: Boolean = true) extends RunnableCommand { - private def validatePartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = { + private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = { val partitionColumnNames = table.partitionColumnNames.toSet - val invalidColumnNames = partitionSpec.keys.filterNot(partitionColumnNames.contains(_)) + val keys = + if (conf.caseSensitiveAnalysis) partitionSpec.keys + else partitionSpec.keys.map(_.toLowerCase) + val invalidColumnNames = keys.filterNot(partitionColumnNames.contains(_)) if (invalidColumnNames.nonEmpty) { val tableId = table.identifier throw new AnalysisException(s"Partition specification for table '${tableId.table}' " + @@ -58,7 +61,8 @@ case class AnalyzePartitionCommand( if (filteredSpec.isEmpty) { None } else { - Some(filteredSpec.mapValues(_.get)) + if (conf.caseSensitiveAnalysis) Some(filteredSpec.mapValues(_.get)) + else Some(filteredSpec.map { case (key, value) => (key.toLowerCase, value.get) }) } } @@ -71,7 +75,7 @@ case class AnalyzePartitionCommand( throw new AnalysisException("ANALYZE TABLE is not supported on views.") } - val partitionValueSpec = validatePartitionSpec(tableMeta) + val partitionValueSpec = getPartitionSpec(tableMeta) val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 2b8d4c7bcc10..fcb04f91e94f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -419,6 +419,27 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("analyze partitions case sensitivity") { + val tableName = "analyzeTable_part" + withTable(tableName) { + sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)") + + sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src") + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { + sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') 
COMPUTE STATISTICS") + } + + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + val message = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS") + }.getMessage + assert(message.contains(s"Partition specification for table '${tableName.toLowerCase}' " + + "in database 'default' refers to unknown partition column(s): DS")) + } + } + } + test("analyze non-existent partition") { def assertAnalysisException(analyzeCommand: String, errorMessage: String): Unit = { From dc488e53fa7ef6211a4ae1bb38256373f4378255 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Tue, 8 Aug 2017 12:07:54 -0400 Subject: [PATCH 18/23] [SPARK-21213][SQL] addressed remaining review comments --- .../spark/sql/execution/command/AnalyzeTableCommand.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index f354d2d79954..04715bd314d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogTableType} +import org.apache.spark.sql.catalyst.catalog.CatalogTableType /** @@ -41,11 +41,7 @@ case class AnalyzeTableCommand( // Compute stats for the whole table val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta) val newRowCount = - if (noscan) { - None - } else { - Some(BigInt(sparkSession.table(tableIdentWithDB).count())) - } + if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count())) // Update the metastore if the above statistics of the table are different from those // recorded in the metastore. 
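A standalone note, not part of the patch series: the per-partition row counts in AnalyzePartitionCommand above come from a single aggregation rather than one count job per partition — the table is filtered by the user-supplied partition values, grouped by the partition columns, and each result row yields one (partition spec -> row count) entry. The sketch below mirrors that grouping pattern with the public DataFrame API; the object name, column names, and sample data are made up for illustration only.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    // Sketch of the single-pass per-partition row-count computation.
    object RowCountsPerPartitionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("row-counts-sketch")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        // Hypothetical data for a table partitioned by (ds, hr).
        val df = Seq(
          ("k1", 100, "2017-08-01", 10),
          ("k2", 200, "2017-08-01", 10),
          ("k1", 101, "2017-08-01", 11),
          ("k2", 201, "2017-08-01", 11)
        ).toDF("key", "value", "ds", "hr")

        val partitionColumnNames = Seq("ds", "hr")

        // One aggregation over the (optionally filtered) table: count rows per
        // combination of partition column values.
        val grouped = df.groupBy(partitionColumnNames.map(col): _*).count()

        // Each result row becomes one (partition spec -> row count) entry, the shape
        // AnalyzePartitionCommand passes into compareAndGetNewStats for each partition.
        val rowCounts: Map[Map[String, String], BigInt] = grouped.collect().map { r =>
          val values = partitionColumnNames.indices.map(i => r.get(i).toString)
          (partitionColumnNames.zip(values).toMap, BigInt(r.getLong(partitionColumnNames.size)))
        }.toMap

        rowCounts.foreach { case (spec, count) => println(s"$spec -> $count rows") }
        spark.stop()
      }
    }

Doing the aggregation once keeps ANALYZE TABLE ... PARTITION COMPUTE STATISTICS to a single job over the matching data, regardless of how many partitions the specification covers.
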
From c83985594b8daf64005c02e4bd464f42645f3867 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 10 Aug 2017 11:42:38 -0400 Subject: [PATCH 19/23] [SPARK-21213][SQL] Added a test for DESC PARTITION after ANALYZE; review comments --- .../inputs/describe-part-after-analyze.sql | 31 +++ .../describe-part-after-analyze.sql.out | 236 ++++++++++++++++++ .../spark/sql/hive/HiveExternalCatalog.scala | 1 + .../spark/sql/hive/StatisticsSuite.scala | 2 +- 4 files changed, 269 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql new file mode 100644 index 000000000000..7aa552a8b200 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql @@ -0,0 +1,31 @@ +CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet + PARTITIONED BY (ds, hr); + +INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10) +VALUES ('k1', 100), ('k2', 200), ('k3', 300); + +INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11) +VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401); + +INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5) +VALUES ('k1', 102), ('k2', 202); + +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10); + +-- Collect stats for a single partition +ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS; + +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10); + +-- Collect stats for 2 partitions +ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS; + +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10); +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11); + +-- Collect stats for all partitions +ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS; + +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10); +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11); +DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5); diff --git a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out new file mode 100644 index 000000000000..62b476c48967 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out @@ -0,0 +1,236 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 14 + + +-- !query 0 +CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet + PARTITIONED BY (ds, hr) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10) +VALUES ('k1', 100), ('k2', 200), ('k3', 300) +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11) +VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401) +-- !query 2 schema +struct<> +-- !query 2 output + + + +-- !query 3 +INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5) +VALUES ('k1', 102), ('k2', 202) +-- !query 3 schema +struct<> +-- !query 3 output + + + +-- !query 4 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10) +-- !query 4 schema +struct +-- !query 4 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table 
t +Partition Values [ds=2017-08-01, hr=10] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10 + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 5 +ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS +-- !query 5 schema +struct<> +-- !query 5 output + + + +-- !query 6 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10) +-- !query 6 schema +struct +-- !query 6 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=10] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10 +Partition Statistics 1067 bytes, 3 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 7 +ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS +-- !query 7 schema +struct<> +-- !query 7 output + + + +-- !query 8 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10) +-- !query 8 schema +struct +-- !query 8 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=10] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10 +Partition Statistics 1067 bytes, 3 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 9 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11) +-- !query 9 schema +struct +-- !query 9 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=11] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11 +Partition Statistics 1080 bytes, 4 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 10 +ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS +-- !query 10 schema +struct<> +-- !query 10 output + + + +-- !query 11 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10) +-- !query 11 schema +struct +-- !query 11 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=10] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10 +Partition Statistics 1067 bytes, 3 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 12 +DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11) +-- !query 12 schema +struct +-- !query 12 output +key string +value string +ds string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-08-01, hr=11] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11 +Partition Statistics 1080 bytes, 4 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 13 +DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5) +-- !query 13 schema +struct +-- !query 13 output +key string +value string +ds 
string +hr int +# Partition Information +# col_name data_type comment +ds string +hr int + +# Detailed Partition Information +Database default +Table t +Partition Values [ds=2017-09-01, hr=5] +Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5 +Partition Statistics 1054 bytes, 2 rows + +# Storage Information +Location [not included in comparison]sql/core/spark-warehouse/t diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 50e985eba1ad..547447b31f0a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -1111,6 +1111,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val restoredSpec = restorePartitionSpec(partition.spec, table.partitionColumnNames) // Restore Spark's statistics from information in Metastore. + // Note: partition-level statistics were introduced in 2.3. val restoredStats = statsFromProperties(partition.parameters, table.identifier.table, table.schema) if (restoredStats.isDefined) { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index fcb04f91e94f..f9472253435e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -435,7 +435,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS") }.getMessage assert(message.contains(s"Partition specification for table '${tableName.toLowerCase}' " + - "in database 'default' refers to unknown partition column(s): DS")) + "in database 'default' refers to unknown partition column(s): DS")) } } } From 72e2cd562afe40b7f78447e1f8229651ba7a7e9b Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 10 Aug 2017 15:35:46 -0400 Subject: [PATCH 20/23] [SPARK-21213][SQL] added DROP TABLE to describe-part-after-analyze.sql --- .../sql-tests/inputs/describe-part-after-analyze.sql | 3 +++ .../results/describe-part-after-analyze.sql.out | 10 +++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql index 7aa552a8b200..f4239da90627 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql @@ -29,3 +29,6 @@ ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS; DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10); DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11); DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5); + +-- DROP TEST TABLES/VIEWS +DROP TABLE t; diff --git a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out index 62b476c48967..51dac111029e 100644 --- a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 14 +-- Number of 
queries: 15 -- !query 0 @@ -234,3 +234,11 @@ Partition Statistics 1054 bytes, 2 rows # Storage Information Location [not included in comparison]sql/core/spark-warehouse/t + + +-- !query 14 +DROP TABLE t +-- !query 14 schema +struct<> +-- !query 14 output + From 87594d6aee3e7e1d5a3e5e340331dd7965ea5426 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 17 Aug 2017 16:13:49 -0400 Subject: [PATCH 21/23] [SPARK-21213][SQL] check that partition columns in the partition spec form a prefix of the partition columns defined in table schema --- .../command/AnalyzePartitionCommand.scala | 28 +++++++++---- .../spark/sql/hive/StatisticsSuite.scala | 39 +++++++++++++++++++ 2 files changed, 60 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index e8ec18a40703..73692e259c5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -46,10 +46,10 @@ case class AnalyzePartitionCommand( private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = { val partitionColumnNames = table.partitionColumnNames.toSet - val keys = - if (conf.caseSensitiveAnalysis) partitionSpec.keys - else partitionSpec.keys.map(_.toLowerCase) - val invalidColumnNames = keys.filterNot(partitionColumnNames.contains(_)) + val partitionSpecWithCase = + if (conf.caseSensitiveAnalysis) partitionSpec + else partitionSpec.map { case (k, v) => (k.toLowerCase, v)} + val invalidColumnNames = partitionSpecWithCase.keys.filterNot(partitionColumnNames.contains(_)) if (invalidColumnNames.nonEmpty) { val tableId = table.identifier throw new AnalysisException(s"Partition specification for table '${tableId.table}' " + @@ -57,12 +57,26 @@ case class AnalyzePartitionCommand( invalidColumnNames.mkString(",")) } - val filteredSpec = partitionSpec.filter(_._2.isDefined) + // Report an error if partition columns in partition specification do not form + // a prefix of the list of partition columns defined in the table schema + val isSpecified = + table.partitionColumnNames.map(partitionSpecWithCase.getOrElse(_, None).isEmpty) + if (isSpecified.init.zip(isSpecified.tail).contains((true, false))) { + val tableId = table.identifier + val schemaColumns = table.partitionColumnNames.mkString(",") + val specColumns = partitionSpecWithCase.keys.mkString(",") + throw new AnalysisException("The list of partition columns with values " + + s"in partition specification for table '${tableId.table}' " + + s"in database '${tableId.database.get}' is not a prefix of the list of " + + "partition columns defined in the table schema. 
" + + s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].") + } + + val filteredSpec = partitionSpecWithCase.filter(_._2.isDefined).mapValues(_.get) if (filteredSpec.isEmpty) { None } else { - if (conf.caseSensitiveAnalysis) Some(filteredSpec.mapValues(_.get)) - else Some(filteredSpec.map { case (key, value) => (key.toLowerCase, value.get) }) + Some(filteredSpec) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index f9472253435e..02712525e55a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -440,6 +440,45 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto } } + test("analyze partial partition specifications") { + + val tableName = "analyzeTable_part" + + def assertAnalysisException(partitionSpec: String): Unit = { + val message = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $tableName $partitionSpec COMPUTE STATISTICS") + }.getMessage + assert(message.contains("The list of partition columns with values " + + s"in partition specification for table '${tableName.toLowerCase}' in database 'default' " + + "is not a prefix of the list of partition columns defined in the table schema")) + } + + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName (key STRING, value STRING) + |PARTITIONED BY (a STRING, b INT, c STRING) + """.stripMargin) + + sql(s"INSERT INTO TABLE $tableName PARTITION (a='a1', b=10, c='c1') SELECT * FROM src") + + sql(s"ANALYZE TABLE $tableName PARTITION (a='a1') COMPUTE STATISTICS") + sql(s"ANALYZE TABLE $tableName PARTITION (a='a1', b=10) COMPUTE STATISTICS") + sql(s"ANALYZE TABLE $tableName PARTITION (A='a1', b=10) COMPUTE STATISTICS") + sql(s"ANALYZE TABLE $tableName PARTITION (b=10, a='a1') COMPUTE STATISTICS") + sql(s"ANALYZE TABLE $tableName PARTITION (b=10, A='a1') COMPUTE STATISTICS") + + assertAnalysisException("PARTITION (b=10)") + assertAnalysisException("PARTITION (a, b=10)") + assertAnalysisException("PARTITION (b=10, c='c1')") + assertAnalysisException("PARTITION (a, b=10, c='c1')") + assertAnalysisException("PARTITION (c='c1')") + assertAnalysisException("PARTITION (a, b, c='c1')") + assertAnalysisException("PARTITION (a='a1', c='c1')") + assertAnalysisException("PARTITION (a='a1', b, c='c1')") + } + } + test("analyze non-existent partition") { def assertAnalysisException(analyzeCommand: String, errorMessage: String): Unit = { From 3353afadde5ba684372d2ea16ef38ccf22d59d09 Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Thu, 17 Aug 2017 21:21:14 -0400 Subject: [PATCH 22/23] [SPARK-21213][SQL] use PartitioningUtils.normalizePartitionSpec to handle conf.caseSensitiveAnalysis --- .../command/AnalyzePartitionCommand.scala | 21 +++++++------------ .../spark/sql/hive/StatisticsSuite.scala | 10 ++++----- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 73692e259c5c..b20c93ddd8ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -23,6 +23,7 @@ import 
org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, Unresol import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal} +import org.apache.spark.sql.execution.datasources.PartitioningUtils /** * Analyzes a given set of partitions to generate per-partition statistics, which will be used in @@ -45,26 +46,18 @@ case class AnalyzePartitionCommand( noscan: Boolean = true) extends RunnableCommand { private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = { - val partitionColumnNames = table.partitionColumnNames.toSet - val partitionSpecWithCase = - if (conf.caseSensitiveAnalysis) partitionSpec - else partitionSpec.map { case (k, v) => (k.toLowerCase, v)} - val invalidColumnNames = partitionSpecWithCase.keys.filterNot(partitionColumnNames.contains(_)) - if (invalidColumnNames.nonEmpty) { - val tableId = table.identifier - throw new AnalysisException(s"Partition specification for table '${tableId.table}' " + - s"in database '${tableId.database.get}' refers to unknown partition column(s): " + - invalidColumnNames.mkString(",")) - } + val normalizedPartitionSpec = + PartitioningUtils.normalizePartitionSpec(partitionSpec, table.partitionColumnNames, + table.identifier.quotedString, conf.resolver); // Report an error if partition columns in partition specification do not form // a prefix of the list of partition columns defined in the table schema val isSpecified = - table.partitionColumnNames.map(partitionSpecWithCase.getOrElse(_, None).isEmpty) + table.partitionColumnNames.map(normalizedPartitionSpec.getOrElse(_, None).isEmpty) if (isSpecified.init.zip(isSpecified.tail).contains((true, false))) { val tableId = table.identifier val schemaColumns = table.partitionColumnNames.mkString(",") - val specColumns = partitionSpecWithCase.keys.mkString(",") + val specColumns = normalizedPartitionSpec.keys.mkString(",") throw new AnalysisException("The list of partition columns with values " + s"in partition specification for table '${tableId.table}' " + s"in database '${tableId.database.get}' is not a prefix of the list of " + @@ -72,7 +65,7 @@ case class AnalyzePartitionCommand( s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].") } - val filteredSpec = partitionSpecWithCase.filter(_._2.isDefined).mapValues(_.get) + val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get) if (filteredSpec.isEmpty) { None } else { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 02712525e55a..dc6140756d51 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -434,8 +434,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto val message = intercept[AnalysisException] { sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS") }.getMessage - assert(message.contains(s"Partition specification for table '${tableName.toLowerCase}' " + - "in database 'default' refers to unknown partition column(s): DS")) + assert(message.contains( + s"DS is not a valid partition column in table `default`.`${tableName.toLowerCase}`")) } } } @@ -496,14 +496,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with 
TestHiveSingleto assertAnalysisException( s"ANALYZE TABLE $tableName PARTITION (hour=20) COMPUTE STATISTICS", - s"Partition specification for table '${tableName.toLowerCase}' " + - "in database 'default' refers to unknown partition column(s): hour" + s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`" ) assertAnalysisException( s"ANALYZE TABLE $tableName PARTITION (hour) COMPUTE STATISTICS", - s"Partition specification for table '${tableName.toLowerCase}' " + - "in database 'default' refers to unknown partition column(s): hour" + s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`" ) intercept[NoSuchPartitionException] { From 8ffb140eb76dd1c41a1c16b2f657cfc0e1ed653c Mon Sep 17 00:00:00 2001 From: Masha Basmanova Date: Fri, 18 Aug 2017 10:04:01 -0400 Subject: [PATCH 23/23] [SPARK-21213][SQL] review comments --- .../sql/execution/command/AnalyzePartitionCommand.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index b20c93ddd8ca..5b54b2270b5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -48,13 +48,13 @@ case class AnalyzePartitionCommand( private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = { val normalizedPartitionSpec = PartitioningUtils.normalizePartitionSpec(partitionSpec, table.partitionColumnNames, - table.identifier.quotedString, conf.resolver); + table.identifier.quotedString, conf.resolver) // Report an error if partition columns in partition specification do not form // a prefix of the list of partition columns defined in the table schema - val isSpecified = + val isNotSpecified = table.partitionColumnNames.map(normalizedPartitionSpec.getOrElse(_, None).isEmpty) - if (isSpecified.init.zip(isSpecified.tail).contains((true, false))) { + if (isNotSpecified.init.zip(isNotSpecified.tail).contains((true, false))) { val tableId = table.identifier val schemaColumns = table.partitionColumnNames.mkString(",") val specColumns = normalizedPartitionSpec.keys.mkString(",")