From 1f6086ffc210105d0158deee7b7db548a51e72ac Mon Sep 17 00:00:00 2001
From: Parth Brahmbhatt
Date: Tue, 17 May 2016 10:29:10 -0700
Subject: [PATCH 1/4] [SPARK-15365] [SQL]: When table size statistics are not available from metastore, fall back to HDFS.

---
 .../spark/sql/hive/MetastoreRelation.scala | 23 ++++++++++++++-----
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 1671228fd9b42..6485f6c7ae49b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.collection.JavaConverters._
 
 import com.google.common.base.Objects
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.FieldSchema
@@ -114,17 +115,27 @@ private[hive] case class MetastoreRelation(
       val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
       // TODO: check if this estimate is valid for tables after partition pruning.
       // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
-      // relatively cheap if parameters for the table are populated into the metastore. An
-      // alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
-      // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
-      // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
+      // relatively cheap if parameters for the table are populated into the metastore.
+      // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys
+      // (see StatsSetupConst in Hive) that we can look at in the future.
       BigInt(
         // When table is external,`totalSize` is always zero, which will influence join strategy
         // so when `totalSize` is zero, use `rawDataSize` instead
-        // if the size is still less than zero, we use default size
+        // if the size is still less than zero, we try to get the file size from HDFS.
+        // given this is only needed for optimization, if the HDFS call fails we return the default.
         Option(totalSize).map(_.toLong).filter(_ > 0)
           .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
+          .getOrElse({
+            val hadoopConf = sparkSession.sessionState.newHadoopConf()
+            val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+            try {
+              fs.getContentSummary(hiveQlTable.getPath).getLength
+            } catch {
+              case e: Exception =>
+                log.warn("Failed to get table size from hdfs.", e)
+                sparkSession.sessionState.conf.defaultSizeInBytes
+            }
+          })))
     }
   )
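The fallback added here hinges on a single Hadoop call, `FileSystem.getContentSummary`, which walks the table's directory tree and sums the file lengths. A minimal standalone sketch of that call, with illustrative names (`TableSizeProbe`, `sizeOnDisk`, `tablePath`, `default`) that are not part of the patch:

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object TableSizeProbe {
  // Sums the length of every file under `tablePath`; returns `default` when the
  // filesystem call fails (missing path, permission error, unreachable namenode).
  def sizeOnDisk(tablePath: Path, conf: Configuration, default: Long): Long = {
    val fs: FileSystem = tablePath.getFileSystem(conf)
    try {
      fs.getContentSummary(tablePath).getLength
    } catch {
      case _: IOException => default
    }
  }
}
```

Because `getContentSummary` issues namenode RPCs proportional to the number of files and directories under the table location (the cost the removed comment warned about), the later patches in this series put the call behind a configuration flag rather than running it unconditionally.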
From 9729d7222ff1264a44be431a20f644b0ab6ef75c Mon Sep 17 00:00:00 2001
From: Parth Brahmbhatt
Date: Thu, 19 May 2016 15:58:42 -0700
Subject: [PATCH 2/4] Addressing review comments.

---
 .../spark/sql/hive/MetastoreRelation.scala | 30 +++++++++++--------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 6485f6c7ae49b..18a95d028c590 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -17,6 +17,8 @@ package org.apache.spark.sql.hive
+import java.io.IOException
+
 import scala.collection.JavaConverters._
 
 import com.google.common.base.Objects
@@ -123,19 +125,21 @@ private[hive] case class MetastoreRelation(
         // so when `totalSize` is zero, use `rawDataSize` instead
         // if the size is still less than zero, we try to get the file size from HDFS.
         // given this is only needed for optimization, if the HDFS call fails we return the default.
-        Option(totalSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-          .getOrElse({
-            val hadoopConf = sparkSession.sessionState.newHadoopConf()
-            val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
-            try {
-              fs.getContentSummary(hiveQlTable.getPath).getLength
-            } catch {
-              case e: Exception =>
-                log.warn("Failed to get table size from hdfs.", e)
-                sparkSession.sessionState.conf.defaultSizeInBytes
-            }
-          })))
+        if (Option(totalSize).map(_.toLong).getOrElse(0L) > 0) {
+          totalSize.toLong
+        } else if (Option(rawDataSize).map(_.toLong).getOrElse(0L) > 0) {
+          rawDataSize.toLong
+        } else {
+          try {
+            val hadoopConf = sparkSession.sessionState.newHadoopConf()
+            val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
+            fs.getContentSummary(hiveQlTable.getPath).getLength
+          } catch {
+            case e: IOException =>
+              logWarning("Failed to get table size from hdfs.", e)
+              sparkSession.sessionState.conf.defaultSizeInBytes
+          }
+        })
     }
   )

From f5d5ddec9386489400ca947f2c095086e757e3c4 Mon Sep 17 00:00:00 2001
From: Parth Brahmbhatt
Date: Mon, 23 May 2016 08:45:29 -0700
Subject: [PATCH 3/4] Adding a config to control if we should fall back to HDFS in absence of stats or not. Default is false.

---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala | 9 +++++++++
 .../org/apache/spark/sql/hive/MetastoreRelation.scala | 4 +++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f3064eb6ac6d6..516a9759e484c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -113,6 +113,13 @@ object SQLConf {
     .longConf
     .createWithDefault(10L * 1024 * 1024)
 
+  val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
+    SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats")
+    .doc("If the table statistics are not available from table metadata enable fall back to hdfs" +
+      " This is useful in determining if a table is small enough to use auto broadcast joins.")
+    .booleanConf
+    .createWithDefault(false)
+
   val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
     .internal()
     .doc("The default table size used in query planning. By default, it is set to a larger " +
@@ -596,6 +603,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
+  def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
+
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 18a95d028c590..dc0772af01290 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -129,7 +129,7 @@ private[hive] case class MetastoreRelation(
           totalSize.toLong
         } else if (Option(rawDataSize).map(_.toLong).getOrElse(0L) > 0) {
           rawDataSize.toLong
-        } else {
+        } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
           try {
             val hadoopConf = sparkSession.sessionState.newHadoopConf()
             val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
@@ -139,6 +139,8 @@ private[hive] case class MetastoreRelation(
               logWarning("Failed to get table size from hdfs.", e)
               sparkSession.sessionState.conf.defaultSizeInBytes
           }
+        } else {
+          sparkSession.sessionState.conf.defaultSizeInBytes
         })
     }
   )
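From a user's perspective the flag introduced in this patch is an ordinary SQL conf keyed by `spark.sql.enableFallBackToHdfsForStats`. A usage sketch, assuming a Hive-enabled session; the session setup and the table names `small_external_table` and `big_table` are illustrative, only the conf key comes from the patch:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative Hive-enabled session; any existing SparkSession works the same way.
val spark = SparkSession.builder()
  .appName("hdfs-stats-fallback-example")
  .enableHiveSupport()
  .getOrCreate()

// Opt in: tables with no totalSize/rawDataSize in the metastore are sized by
// listing their files on HDFS instead of getting spark.sql.defaultSizeInBytes,
// which is deliberately set larger than the broadcast threshold.
spark.conf.set("spark.sql.enableFallBackToHdfsForStats", "true")

// A small external table that was never analyzed can now come in under
// spark.sql.autoBroadcastJoinThreshold and be planned as a broadcast join.
spark.sql("SELECT * FROM small_external_table s JOIN big_table b ON s.id = b.id").explain()
```

The default stays `false`, so existing deployments keep the old behavior unless they explicitly opt in.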
From ff69f91e273580547b6d86a6dfc87f2a94066507 Mon Sep 17 00:00:00 2001
From: Parth Brahmbhatt
Date: Tue, 24 May 2016 18:36:11 -0700
Subject: [PATCH 4/4] Addressing more review comments, adding a test case.

---
 .../apache/spark/sql/internal/SQLConf.scala   |  2 +-
 .../spark/sql/hive/MetastoreRelation.scala    |  4 +-
 .../spark/sql/hive/StatisticsSuite.scala      | 50 ++++++++++++++++++-
 3 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 516a9759e484c..32d44763bdc83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -115,7 +115,7 @@ object SQLConf {
   val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
     SQLConfigBuilder("spark.sql.enableFallBackToHdfsForStats")
-    .doc("If the table statistics are not available from table metadata enable fall back to hdfs" +
+    .doc("If the table statistics are not available from table metadata enable fall back to hdfs." +
       " This is useful in determining if a table is small enough to use auto broadcast joins.")
     .booleanConf
     .createWithDefault(false)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index dc0772af01290..9c820144aee12 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -125,9 +125,9 @@ private[hive] case class MetastoreRelation(
         // so when `totalSize` is zero, use `rawDataSize` instead
         // if the size is still less than zero, we try to get the file size from HDFS.
         // given this is only needed for optimization, if the HDFS call fails we return the default.
-        if (Option(totalSize).map(_.toLong).getOrElse(0L) > 0) {
+        if (totalSize != null && totalSize.toLong > 0L) {
           totalSize.toLong
-        } else if (Option(rawDataSize).map(_.toLong).getOrElse(0L) > 0) {
+        } else if (rawDataSize != null && rawDataSize.toLong > 0) {
           rawDataSize.toLong
         } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
           try {
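Taken together, the four patches leave `MetastoreRelation` with a four-step estimation order that the new test below exercises: metastore `totalSize`, then `rawDataSize`, then the HDFS content summary (only when the new flag is on), and finally `spark.sql.defaultSizeInBytes`. A simplified sketch of that ordering; the object and parameter names are invented for illustration and are not the literal members of `MetastoreRelation`:

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object SizeEstimationSketch {
  // `totalSize` and `rawDataSize` mirror the string-valued Hive table parameters;
  // the remaining arguments stand in for session and Hadoop configuration state.
  def estimatedSizeInBytes(
      totalSize: String,
      rawDataSize: String,
      tablePath: Path,
      hadoopConf: Configuration,
      fallBackToHdfs: Boolean,
      defaultSizeInBytes: Long): BigInt = BigInt(
    if (totalSize != null && totalSize.toLong > 0L) {
      totalSize.toLong // 1. totalSize recorded in the metastore
    } else if (rawDataSize != null && rawDataSize.toLong > 0L) {
      rawDataSize.toLong // 2. rawDataSize recorded in the metastore
    } else if (fallBackToHdfs) {
      try {
        // 3. optional fallback: sum the file sizes under the table location
        tablePath.getFileSystem(hadoopConf).getContentSummary(tablePath).getLength
      } catch {
        case _: IOException => defaultSizeInBytes
      }
    } else {
      defaultSizeInBytes // 4. conservative default when the fallback is disabled
    })
}
```

The test that follows creates a small external CSV table, turns the flag on, and checks that `statistics.sizeInBytes` equals the combined length of the data files on disk.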
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 1a7b6c0112279..f1415bbabac19 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -17,6 +17,8 @@ package org.apache.spark.sql.hive
+import java.io.{File, PrintWriter}
+
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.{QueryTest, Row}
@@ -25,8 +27,9 @@ import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
 
-class StatisticsSuite extends QueryTest with TestHiveSingleton {
+class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   import hiveContext.sql
 
   test("parse analyze commands") {
@@ -68,6 +71,51 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
       classOf[AnalyzeTableCommand])
   }
 
+  test("MetastoreRelations fallback to HDFS for size estimation") {
+    val enableFallBackToHdfsForStats = hiveContext.conf.fallBackToHdfsForStatsEnabled
+    try {
+      withTempDir { tempDir =>
+
+        // EXTERNAL OpenCSVSerde table pointing to LOCATION
+
+        val file1 = new File(tempDir + "/data1")
+        val writer1 = new PrintWriter(file1)
+        writer1.write("1,2")
+        writer1.close()
+
+        val file2 = new File(tempDir + "/data2")
+        val writer2 = new PrintWriter(file2)
+        writer2.write("1,2")
+        writer2.close()
+
+        sql(
+          s"""CREATE EXTERNAL TABLE csv_table(page_id INT, impressions INT)
+            ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
+            WITH SERDEPROPERTIES (
+              \"separatorChar\" = \",\",
+              \"quoteChar\" = \"\\\"\",
+              \"escapeChar\" = \"\\\\\")
+            LOCATION '$tempDir'
+          """)
+
+        hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, true)
+
+        val relation = hiveContext.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+          .asInstanceOf[MetastoreRelation]
+
+        val properties = relation.hiveQlTable.getParameters
+        assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0")
+        assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
+
+        val sizeInBytes = relation.statistics.sizeInBytes
+        assert(sizeInBytes === BigInt(file1.length() + file2.length()))
+      }
+    } finally {
+      hiveContext.setConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS, enableFallBackToHdfsForStats)
+      sql("DROP TABLE csv_table ")
+    }
+  }
+
   ignore("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
       hiveContext.sessionState.catalog.lookupRelation(