diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2b3b575b4c06..6caf9b4f661a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -195,6 +195,9 @@ case class CatalogTable(
     StructType(partitionFields)
   }
 
+  /** Returns whether the table is partitioned. */
+  def isPartitioned: Boolean = partitionColumnNames.nonEmpty
+
   /**
    * schema of this table's data columns
    */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 677da0dbdc65..87c67bf2403d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -130,11 +130,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     val bucketSpec = None  // We don't support hive bucketed tables, only ones we write out.
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
 
-    val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
+    val result = if (metastoreRelation.catalogTable.isPartitioned) {
       val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
       val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
-        Seq(metastoreRelation.hiveQlTable.getDataLocation)
+        Seq(new Path(metastoreRelation.catalogTable.location))
       } else {
         // By convention (for example, see CatalogFileIndex), the definition of a
         // partitioned table's paths depends on whether that table has any actual partitions.
@@ -145,7 +145,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           new Path(p.getLocation)
         }
         if (paths.isEmpty) {
-          Seq(metastoreRelation.hiveQlTable.getDataLocation)
+          Seq(new Path(metastoreRelation.catalogTable.location))
         } else {
           paths
         }
@@ -195,7 +195,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         logicalRelation
       })
     } else {
-      val rootPath = metastoreRelation.hiveQlTable.getDataLocation
+      val rootPath = new Path(metastoreRelation.catalogTable.location)
       withTableCreationLock(tableIdentifier, {
         val cached = getCached(tableIdentifier,
           Seq(rootPath),
@@ -231,7 +231,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
    * data source relations for better performance.
    */
   object ParquetConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = {
+    private def shouldConvertToParquet(relation: MetastoreRelation): Boolean = {
       relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") &&
         sessionState.convertMetastoreParquet
     }
@@ -251,11 +251,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       // Write path
       case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
         // Inserting into partitioned table is not supported in Parquet data source (yet).
-        if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) =>
+        if query.resolved && !r.catalogTable.isPartitioned && shouldConvertToParquet(r) =>
         InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists)
 
       // Read path
-      case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) =>
+      case relation: MetastoreRelation if shouldConvertToParquet(relation) =>
         val parquetRelation = convertToParquetRelation(relation)
         SubqueryAlias(relation.tableName, parquetRelation, None)
     }
@@ -267,7 +267,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
    * for better performance.
    */
   object OrcConversions extends Rule[LogicalPlan] {
-    private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = {
+    private def shouldConvertToOrc(relation: MetastoreRelation): Boolean = {
       relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") &&
         sessionState.convertMetastoreOrc
     }
@@ -285,11 +285,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       // Write path
       case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists)
         // Inserting into partitioned table is not supported in Orc data source (yet).
-        if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) =>
+        if query.resolved && !r.catalogTable.isPartitioned && shouldConvertToOrc(r) =>
         InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists)
 
       // Read path
-      case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) =>
+      case relation: MetastoreRelation if shouldConvertToOrc(relation) =>
         val orcRelation = convertToOrcRelation(relation)
         SubqueryAlias(relation.tableName, orcRelation, None)
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 97b120758ba4..33333cba7361 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 
 import com.google.common.base.Objects
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
@@ -57,13 +57,13 @@ private[hive] case class MetastoreRelation(
 
   override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
-  @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable)
+  @transient private val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable)
 
   @transient override def computeStats(conf: CatalystConf): Statistics = {
     catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
       sizeInBytes = {
-        val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-        val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
+        val totalSize = catalogTable.properties.getOrElse(StatsSetupConst.TOTAL_SIZE, null)
+        val rawDataSize = catalogTable.properties.getOrElse(StatsSetupConst.RAW_DATA_SIZE, null)
         // TODO: check if this estimate is valid for tables after partition pruning.
         // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
         // relatively cheap if parameters for the table are populated into the metastore.
@@ -81,8 +81,9 @@ private[hive] case class MetastoreRelation(
           } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) {
             try {
               val hadoopConf = sparkSession.sessionState.newHadoopConf()
-              val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf)
-              fs.getContentSummary(hiveQlTable.getPath).getLength
+              val location = new Path(catalogTable.location)
+              val fs: FileSystem = location.getFileSystem(hadoopConf)
+              fs.getContentSummary(location).getLength
             } catch {
               case e: IOException =>
                 logWarning("Failed to get table size from hdfs.", e)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index b4b63032ab26..b00cb9278d5b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -39,8 +39,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -48,7 +50,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  * A trait for subclasses that handle table scans.
  */
 private[hive] sealed trait TableReader {
-  def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow]
+  def makeRDDForTable(catalogTable: CatalogTable): RDD[InternalRow]
 
   def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[InternalRow]
 }
@@ -83,9 +85,9 @@ class HadoopTableReader(
   private val _broadcastedHadoopConf =
     sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
-  override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
+  override def makeRDDForTable(catalogTable: CatalogTable): RDD[InternalRow] =
     makeRDDForTable(
-      hiveTable,
+      catalogTable,
       Utils.classForName(relation.tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]],
       filterOpt = None)
 
@@ -93,15 +95,16 @@
    * Creates a Hadoop RDD to read data from the target table's data directory. Returns a transformed
    * RDD that contains deserialized rows.
    *
-   * @param hiveTable Hive metadata for the table being scanned.
+   * @param catalogTable the metadata for the table being scanned.
    * @param deserializerClass Class of the SerDe used to deserialize Writables read from Hadoop.
    * @param filterOpt If defined, then the filter is used to reject files contained in the data
    *                  directory being read. If None, then all files are accepted.
    */
   def makeRDDForTable(
-      hiveTable: HiveTable,
+      catalogTable: CatalogTable,
       deserializerClass: Class[_ <: Deserializer],
       filterOpt: Option[PathFilter]): RDD[InternalRow] = {
+    val hiveTable = HiveClientImpl.toHiveTable(catalogTable)
     assert(!hiveTable.isPartitioned,
       """makeRDDForTable() cannot be called on a partitioned table, since input formats may
        differ across partitions. Use makeRDDForTablePartitions() instead.""")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 140c352fa6f8..2b22e7314301 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -51,7 +51,7 @@ case class HiveTableScanExec(
     @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
 
-  require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
+  require(partitionPruningPred.isEmpty || relation.catalogTable.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
   override lazy val metrics = Map(
@@ -141,9 +141,9 @@ case class HiveTableScanExec(
   protected override def doExecute(): RDD[InternalRow] = {
     // Using dummyCallSite, as getCallSite can turn out to be expensive with
     // with multiple partitions.
-    val rdd = if (!relation.hiveQlTable.isPartitioned) {
+    val rdd = if (!relation.catalogTable.isPartitioned) {
       Utils.withDummyCallSite(sqlContext.sparkContext) {
-        hadoopReader.makeRDDForTable(relation.hiveQlTable)
+        hadoopReader.makeRDDForTable(relation.catalogTable)
       }
     } else {
       // The attribute name of predicate could be different than the one in schema in case of
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3e654d8eeb35..85ffb235012c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -221,7 +221,7 @@ case class InsertIntoHiveTable(
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
-    val tableLocation = table.hiveQlTable.getDataLocation
+    val tableLocation = new Path(table.catalogTable.location)
     val tmpLocation =
       getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e2fcd2fd41fa..6bd3b946402f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -21,6 +21,8 @@ import java.io.{File, PrintWriter}
 
 import scala.reflect.ClassTag
 
+import org.apache.hadoop.hive.common.StatsSetupConst
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
@@ -65,9 +67,13 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val relation = spark.table("csv_table").queryExecution.analyzed.children.head
         .asInstanceOf[MetastoreRelation]
 
-      val properties = relation.hiveQlTable.getParameters
-      assert(properties.get("totalSize").toLong <= 0, "external table totalSize must be <= 0")
-      assert(properties.get("rawDataSize").toLong <= 0, "external table rawDataSize must be <= 0")
+      val properties = relation.catalogTable.properties
+      val totalSize = properties.get(StatsSetupConst.TOTAL_SIZE)
+      val rawDataSize = properties.get(StatsSetupConst.RAW_DATA_SIZE)
+      assert(totalSize.nonEmpty && totalSize.get.toLong <= 0,
+        "external table totalSize must be <= 0")
+      assert(rawDataSize.nonEmpty && rawDataSize.get.toLong <= 0,
+        "external table rawDataSize must be <= 0")
 
       val sizeInBytes = relation.stats(conf).sizeInBytes
       assert(sizeInBytes === BigInt(file1.length() + file2.length()))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 5c460d25f372..450e5c5a5064 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -144,38 +144,4 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
       }
     }
   }
-
-  test("SPARK-16926: number of table and partition columns match for new partitioned table") {
-    val view = "src"
-    withTempView(view) {
-      spark.range(1, 5).createOrReplaceTempView(view)
-      val table = "table_with_partition"
-      withTable(table) {
-        sql(
-          s"""
-             |CREATE TABLE $table(id string)
-             |PARTITIONED BY (p1 string,p2 string,p3 string,p4 string,p5 string)
-           """.stripMargin)
-        sql(
-          s"""
-             |FROM $view v
-             |INSERT INTO TABLE $table
-             |PARTITION (p1='a',p2='b',p3='c',p4='d',p5='e')
-             |SELECT v.id
-             |INSERT INTO TABLE $table
-             |PARTITION (p1='a',p2='c',p3='c',p4='d',p5='e')
-             |SELECT v.id
-           """.stripMargin)
-        val plan = sql(
-          s"""
-             |SELECT * FROM $table
-           """.stripMargin).queryExecution.sparkPlan
-        val relation = plan.collectFirst {
-          case p: HiveTableScanExec => p.relation
-        }.get
-        val tableCols = relation.hiveQlTable.getCols
-        relation.getHiveQlPartitions().foreach(p => assert(p.getCols.size == tableCols.size))
-      }
-    }
-  }
 }
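For readers skimming the patch, here is a minimal, self-contained Scala sketch of the pattern it applies: deriving partitioning and size information from catalog-level metadata (partition column names, a location string, string properties) instead of going through the Hive `Table` object. `TableMeta`, `totalSizeHint`, and the example values below are hypothetical stand-ins for illustration, not Spark's actual `CatalogTable` API.

```scala
// Illustrative sketch only: `TableMeta` is a simplified stand-in for a catalog-level
// table description; Spark's real `CatalogTable` carries many more fields.
case class TableMeta(
    partitionColumnNames: Seq[String],
    location: String,
    properties: Map[String, String]) {

  // Same rule the patch adds to CatalogTable: a table is partitioned
  // iff it declares at least one partition column.
  def isPartitioned: Boolean = partitionColumnNames.nonEmpty

  // Same idea as the stats lookup in MetastoreRelation.computeStats:
  // read the size hint from plain string properties.
  def totalSizeHint: Option[Long] =
    properties.get("totalSize").flatMap(s => scala.util.Try(s.toLong).toOption)
}

object TableMetaExample {
  def main(args: Array[String]): Unit = {
    val partitioned = TableMeta(
      partitionColumnNames = Seq("ds"),
      location = "hdfs://nn/warehouse/events",
      properties = Map("totalSize" -> "12345"))
    val unpartitioned = partitioned.copy(partitionColumnNames = Nil)

    assert(partitioned.isPartitioned)
    assert(!unpartitioned.isPartitioned)
    assert(partitioned.totalSizeHint.contains(12345L))
  }
}
```

The `partitionColumnNames.nonEmpty` check mirrors the new `CatalogTable.isPartitioned` helper added in `interface.scala`, which is what lets the callers above drop `hiveQlTable.isPartitioned` and treat the Hive `Table` conversion as a private implementation detail.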