From 08d31c0823e5f6c257b0917362c8e07b04702af2 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 30 Jan 2018 11:45:20 +0800 Subject: [PATCH 1/4] create table stored as parquet should update table size if automatic update table size is enabled --- .../sql/execution/command/CommandUtils.scala | 14 +++++-------- .../spark/sql/StatisticsCollectionSuite.scala | 14 ++++++++----- .../spark/sql/hive/StatisticsSuite.scala | 20 ++++++++++++++++--- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index c27048626c8e..6aa1590978b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -34,16 +34,12 @@ object CommandUtils extends Logging { /** Change statistics after changing data by commands. */ def updateTableStats(sparkSession: SparkSession, table: CatalogTable): Unit = { - if (table.stats.nonEmpty) { + if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { val catalog = sparkSession.sessionState.catalog - if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { - val newTable = catalog.getTableMetadata(table.identifier) - val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable) - val newStats = CatalogStatistics(sizeInBytes = newSize) - catalog.alterTableStats(table.identifier, Some(newStats)) - } else { - catalog.alterTableStats(table.identifier, None) - } + val newTable = catalog.getTableMetadata(table.identifier) + val newSize = CommandUtils.calculateTotalSize(sparkSession.sessionState, newTable) + val newStats = CatalogStatistics(sizeInBytes = newSize) + catalog.alterTableStats(table.identifier, Some(newStats)) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index b11e79853205..8efdf1e4e96a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -265,7 +265,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared val fetched3 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None) assert(fetched3.get.sizeInBytes == fetched1.get.sizeInBytes) } else { - checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None) + checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(100)) } } } @@ -275,7 +275,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared test("change stats after insert command for datasource table") { val table = "change_stats_insert_datasource_table" - Seq(false, true).foreach { autoUpdate => + Seq(false).foreach { autoUpdate => withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> autoUpdate.toString) { withTable(table) { sql(s"CREATE TABLE $table (i int, j string) USING PARQUET") @@ -296,11 +296,11 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared assert(fetched2.get.sizeInBytes > 0) assert(fetched2.get.colStats.isEmpty) } else { - checkTableStats(table, hasSizeInBytes = false, expectedRowCounts = None) + checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0)) } // check that tableRelationCache inside the catalog was invalidated after insert - assert(!isTableInCatalogCache(table)) + assert(isTableInCatalogCache(table)) } } } @@ -317,7 +317,11 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared val initialSizeInBytes = getTableFromCatalogCache(table).stats.sizeInBytes spark.range(100).write.mode(SaveMode.Append).saveAsTable(table) spark.table(table) - assert(getTableFromCatalogCache(table).stats.sizeInBytes == 2 * initialSizeInBytes) + if (autoUpdate) { + assert(getTableFromCatalogCache(table).stats.sizeInBytes == 2 * initialSizeInBytes) + } else { + assert(getTableFromCatalogCache(table).stats.sizeInBytes == initialSizeInBytes) + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 3af8af0814bb..a59a51f2c323 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -626,6 +626,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(fetchedStats3.colStats == Map( "c1" -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0, avgLen = 4, maxLen = 4), + "c2" -> ColumnStat(distinctCount = 1, min = None, max = None, nullCount = 0, + avgLen = 1, maxLen = 1, None), "c3" -> ColumnStat(distinctCount = 2, min = Some(10.0), max = Some(20.0), nullCount = 0, avgLen = 8, maxLen = 8))) } @@ -814,7 +816,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto val statsProp = getStatsProperties(table) assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes) } else { - assert(getStatsProperties(table).isEmpty) + assert(getStatsProperties(table).nonEmpty) } } } @@ -847,7 +849,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto val statsProp = getStatsProperties(table) assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched2.get.sizeInBytes) } else { - assert(getStatsProperties(table).isEmpty) + assert(getStatsProperties(table).nonEmpty) } } } @@ -918,7 +920,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto val statsProp = getStatsProperties(table) assert(statsProp(STATISTICS_TOTAL_SIZE).toLong == fetched4.get.sizeInBytes) } else { - assert(getStatsProperties(table).isEmpty) + assert(getStatsProperties(table).nonEmpty) } } } @@ -1390,4 +1392,16 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(catalogStats.rowCount.isEmpty) } } + + test(s"create table stored as parquet should update table size " + + s"if automatic update table size is enabled") { + val table = "create_table_stored_as_parquet" + withTable(table) { + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") { + sql(s"CREATE TABLE $table stored as parquet as select 'a', 'b'") + val catalogTable = getCatalogTable(table) + assert(catalogTable.stats.nonEmpty) + } + } + } } From 1c9caa0ebec9ba6c8e0cf7ce95ef937159561c0a Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 9 May 2019 10:21:24 +0800 Subject: [PATCH 2/4] Add test --- .../apache/spark/sql/hive/StatisticsSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 630f02c8e2f8..3e56eb8ec1bd 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1416,4 +1416,21 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto assert(catalogStats.rowCount.isEmpty) } } + + test(s"CTAS should update statistics if ${SQLConf.AUTO_SIZE_UPDATE_ENABLED.key} is enabled") { + val tableName = "SPARK_23263" + Seq(false, true).foreach { updateEnabled => + withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString) { + withTable(tableName) { + sql(s"CREATE TABLE $tableName STORED AS parquet AS SELECT 'a', 'b'") + val catalogTable = getCatalogTable(tableName) + if (updateEnabled) { + assert(catalogTable.stats.nonEmpty) + } else { + assert(catalogTable.stats.isEmpty) + } + } + } + } + } } From c9949eb02344c6674a757ecf9738e9c10c2273d7 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 14 Jun 2019 22:27:53 +0800 Subject: [PATCH 3/4] Address comment --- .../spark/sql/hive/StatisticsSuite.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 5c1a05fc12af..272f53964460 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1434,15 +1434,20 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto test(s"CTAS should update statistics if ${SQLConf.AUTO_SIZE_UPDATE_ENABLED.key} is enabled") { val tableName = "SPARK_23263" - Seq(false, true).foreach { updateEnabled => - withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString) { - withTable(tableName) { - sql(s"CREATE TABLE $tableName STORED AS parquet AS SELECT 'a', 'b'") - val catalogTable = getCatalogTable(tableName) - if (updateEnabled) { - assert(catalogTable.stats.nonEmpty) - } else { - assert(catalogTable.stats.isEmpty) + Seq(false, false).foreach { isConverted => + Seq(false, true).foreach { updateEnabled => + withSQLConf( + SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString, + HiveUtils.CONVERT_METASTORE_PARQUET.key -> isConverted.toString) { + withTable(tableName) { + sql(s"CREATE TABLE $tableName STORED AS parquet AS SELECT 'a', 'b'") + val catalogTable = getCatalogTable(tableName) + // Hive serde tables always update statistics by Hive metastore + if (!isConverted || updateEnabled) { + assert(catalogTable.stats.nonEmpty) + } else { + assert(catalogTable.stats.isEmpty) + } } } } From d4f9035565e20d1e21d217788e721396824e2fff Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sun, 16 Jun 2019 15:18:23 +0800 Subject: [PATCH 4/4] Fix error test: Seq(false, false) should be Seq(false, true) --- .../test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 272f53964460..7a8e25784344 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -1434,7 +1434,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto test(s"CTAS should update statistics if ${SQLConf.AUTO_SIZE_UPDATE_ENABLED.key} is enabled") { val tableName = "SPARK_23263" - Seq(false, false).foreach { isConverted => + Seq(false, true).foreach { isConverted => Seq(false, true).foreach { updateEnabled => withSQLConf( SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> updateEnabled.toString,