diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 353e5954fd5b7..f862854abda1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -163,6 +163,10 @@ case class BucketSpec(
  * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
  *                                  catalog. If false, it is inferred automatically based on file
  *                                  structure.
+ * @param schemaFromTableProps Whether the schema field was obtained by parsing a case-sensitive
+ *                             schema embedded in the table properties. Used to trigger schema
+ *                             inference when using a Hive Metastore, if configured. Defaults to
+ *                             false.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -180,7 +184,8 @@ case class CatalogTable(
     viewText: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
-    tracksPartitionsInCatalog: Boolean = false) {
+    tracksPartitionsInCatalog: Boolean = false,
+    schemaFromTableProps: Boolean = false) {
 
   import CatalogTable._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dc0f130406932..0d5064e2dd927 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -296,6 +296,17 @@ object SQLConf {
     .longConf
     .createWithDefault(250 * 1024 * 1024)
 
+  val HIVE_SCHEMA_INFERENCE_MODE = buildConf("spark.sql.hive.schemaInferenceMode")
+    .doc("Configures the action to take when a case-sensitive schema cannot be read from a Hive " +
+      "table's properties. Valid options include INFER_AND_SAVE (infer the case-sensitive " +
+      "schema from the underlying data files and write it back to the table properties), " +
+      "INFER_ONLY (infer the schema but don't attempt to write it to the table properties) and " +
+      "NEVER_INFER (fall back to using the case-insensitive metastore schema instead of inferring).")
+    .stringConf
+    .transform(_.toUpperCase())
+    .checkValues(Set("INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER"))
+    .createWithDefault("INFER_AND_SAVE")
+
   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -774,6 +785,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
 
+  def schemaInferenceMode: String = getConf(HIVE_SCHEMA_INFERENCE_MODE)
+
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index a283ff971adcd..ed82753bbadc7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -270,4 +270,8 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key))
     assert(e2.message.contains("Cannot modify the value of a static config"))
   }
+
+  test("Default value of HIVE_SCHEMA_INFERENCE_MODE") {
+    assert(spark.sessionState.conf.schemaInferenceMode === "INFER_AND_SAVE")
+  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ea48256147857..2e8142e17773e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -510,8 +510,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireTableExists(db, tableDefinition.identifier.table)
     verifyTableProperties(tableDefinition)
 
+    // Add table metadata such as table schema, partition columns, etc. if they aren't already
+    // present.
+    val withMetaProps = tableDefinition.copy(
+      properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
+
     // convert table statistics to properties so that we can persist them through hive api
-    val withStatsProps = if (tableDefinition.stats.isDefined) {
+    val withStatsProps = if (withMetaProps.stats.isDefined) {
       val stats = tableDefinition.stats.get
       var statsProperties: Map[String, String] =
         Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
@@ -523,9 +528,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           statsProperties += (columnStatKeyPropName(colName, k) -> v)
         }
       }
-      tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties)
+      withMetaProps.copy(properties = withMetaProps.properties ++ statsProperties)
     } else {
-      tableDefinition
+      withMetaProps
     }
 
     if (tableDefinition.tableType == VIEW) {
@@ -680,7 +685,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       hiveTable.copy(
         schema = schemaFromTableProps,
         partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-        bucketSpec = getBucketSpecFromTableProperties(table))
+        bucketSpec = getBucketSpecFromTableProperties(table),
+        schemaFromTableProps = true)
     } else {
       // Hive metastore may change the table schema, e.g. schema inference. If the table
      // schema we read back is different(ignore case and nullability) from the one in table
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 677da0dbdc654..88cf47c08c96e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -161,12 +161,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         bucketSpec,
         Some(partitionSchema))
 
+      val catalogTable = metastoreRelation.catalogTable
       val logicalRelation = cached.getOrElse {
         val sizeInBytes = metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
         val fileIndex = {
-          val index = new CatalogFileIndex(
-            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+          val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             index
           } else {
@@ -174,10 +174,36 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-        val dataSchema =
-          StructType(metastoreSchema
+        val filteredMetastoreSchema = StructType(metastoreSchema
            .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
 
+        val inferenceMode = sparkSession.sessionState.conf.schemaInferenceMode
+        val dataSchema = if (inferenceMode != "NEVER_INFER" &&
+            !catalogTable.schemaFromTableProps) {
+          val fileStatuses = fileIndex.listFiles(Nil).flatMap(_.files)
+          val inferred = defaultSource.inferSchema(sparkSession, options, fileStatuses)
+          val merged = if (fileType.equals("parquet")) {
+            inferred.map(ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, _))
+          } else {
+            inferred
+          }
+          if (inferenceMode == "INFER_AND_SAVE") {
+            // If a case-sensitive schema was successfully inferred, execute an alterTable
+            // operation to save the schema to the table properties.
+            merged.foreach { mergedSchema =>
+              val updatedTable = catalogTable.copy(schema = mergedSchema)
+              sparkSession.sharedState.externalCatalog.alterTable(updatedTable)
+            }
+          }
+          merged.getOrElse {
+            logWarning(s"Unable to infer schema for table $tableIdentifier from file format " +
+              s"$defaultSource; using metastore schema.")
+            filteredMetastoreSchema
+          }
+        } else {
+          filteredMetastoreSchema
+        }
+
         val relation = HadoopFsRelation(
           location = fileIndex,
           partitionSchema = partitionSchema,
@@ -186,8 +212,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           fileFormat = defaultSource,
           options = options)(sparkSession = sparkSession)
 
-        val created = LogicalRelation(relation,
-          catalogTable = Some(metastoreRelation.catalogTable))
+        val created = LogicalRelation(relation, catalogTable = Some(catalogTable))
         tableRelationCache.put(tableIdentifier, created)
         created
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
new file mode 100644
index 0000000000000..afd0145279c52
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.metrics.source.HiveCatalogMetrics
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.datasources.FileStatusCache
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types._
+
+class HiveSchemaInferenceSuite
+  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
+
+  import HiveSchemaInferenceSuite._
+
+  // Create a CatalogTable instance modeling an external Hive table in a metastore that isn't
+  // controlled by Spark (i.e. has no Spark-specific table properties set).
+  private def hiveExternalCatalogTable(
+      tableName: String,
+      location: String,
+      schema: StructType,
+      partitionColumns: Seq[String],
+      properties: Map[String, String] = Map.empty): CatalogTable = {
+    CatalogTable(
+      identifier = TableIdentifier(table = tableName, database = Option("default")),
+      tableType = CatalogTableType.EXTERNAL,
+      storage = CatalogStorageFormat(
+        locationUri = Option(location),
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
+        compressed = false,
+        properties = Map("serialization.format" -> "1")),
+      schema = schema,
+      provider = Option("hive"),
+      partitionColumnNames = partitionColumns,
+      properties = properties)
+  }
+
+  // Creates CatalogTablePartition instances for adding partitions of data to our test table.
+  private def hiveCatalogPartition(location: String, index: Int): CatalogTablePartition
+    = CatalogTablePartition(
+      spec = Map("partcol1" -> index.toString, "partcol2" -> index.toString),
+      storage = CatalogStorageFormat(
+        locationUri = Option(s"${location}/partCol1=$index/partCol2=$index/"),
+        inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+        outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+        serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
+        compressed = false,
+        properties = Map("serialization.format" -> "1")))
+
+  // Creates a case-sensitive external Hive table for testing schema inference options. Table
+  // will not have Spark-specific table properties set.
+  private def setupCaseSensitiveTable(
+      tableName: String,
+      dir: File): Unit = {
+    spark.range(NUM_RECORDS)
+      .selectExpr("id as fieldOne", "id as partCol1", "id as partCol2")
+      .write
+      .partitionBy("partCol1", "partCol2")
+      .mode("overwrite")
+      .parquet(dir.getAbsolutePath)
+
+    val lowercaseSchema = StructType(Seq(
+      StructField("fieldone", LongType),
+      StructField("partcol1", IntegerType),
+      StructField("partcol2", IntegerType)))
+
+    val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+
+    val catalogTable = hiveExternalCatalogTable(
+      tableName,
+      dir.getAbsolutePath,
+      lowercaseSchema,
+      Seq("partcol1", "partcol2"))
+    client.createTable(catalogTable, true)
+
+    val partitions = (0 until NUM_RECORDS).map(hiveCatalogPartition(dir.getAbsolutePath, _)).toSeq
+    client.createPartitions("default", tableName, partitions, true)
+  }
+
+  // Create a test table used for a single unit test, with data stored in the specified directory.
+  private def withTestTable(dir: File)(f: File => Unit): Unit = {
+    setupCaseSensitiveTable(TEST_TABLE_NAME, dir)
+    try f(dir) finally spark.sql(s"DROP TABLE IF EXISTS $TEST_TABLE_NAME")
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    FileStatusCache.resetForTesting()
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    FileStatusCache.resetForTesting()
+  }
+
+  test("Queries against case-sensitive tables with no schema in table properties should work " +
+    "when schema inference is enabled") {
+    withSQLConf("spark.sql.hive.schemaInferenceMode" -> "INFER_AND_SAVE") {
+      withTempDir { dir =>
+        withTestTable(dir) { dir =>
+          val expectedSchema = StructType(Seq(
+            StructField("fieldOne", LongType),
+            // Partition columns remain case-insensitive
+            StructField("partcol1", IntegerType),
+            StructField("partcol2", IntegerType)))
+          assert(spark.sql(FIELD_QUERY).count == NUM_RECORDS)
+          assert(spark.sql(PARTITION_COLUMN_QUERY).count == NUM_RECORDS)
+          // Test that the case-sensitive schema was stored as a table property after inference
+          assert(spark.sql(SELECT_ALL_QUERY).schema == expectedSchema)
+        }
+      }
+    }
+  }
+
+  test("Schema should be inferred but not stored when the inference mode is INFER_ONLY") {
+    withSQLConf("spark.sql.hive.schemaInferenceMode" -> "INFER_ONLY") {
+      withTempDir { dir =>
+        withTestTable(dir) { dir =>
+          val existingSchema = spark.sql(SELECT_ALL_QUERY).schema
+          assert(spark.sql(FIELD_QUERY).count == NUM_RECORDS)
+          assert(spark.sql(PARTITION_COLUMN_QUERY).count == NUM_RECORDS)
+          assert(spark.sql(SELECT_ALL_QUERY).schema == existingSchema)
+        }
+      }
+    }
+  }
+}
+
+object HiveSchemaInferenceSuite {
+  private val NUM_RECORDS = 10
+  private val TEST_TABLE_NAME = "test_table"
+  private val FIELD_QUERY = s"SELECT * FROM $TEST_TABLE_NAME WHERE fieldOne >= 0"
+  private val PARTITION_COLUMN_QUERY = s"SELECT * FROM $TEST_TABLE_NAME WHERE partCol1 >= 0"
+  private val SELECT_ALL_QUERY = s"SELECT * FROM $TEST_TABLE_NAME"
+}
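
A minimal usage sketch of the configuration introduced above, assuming a pre-existing case-sensitive, Parquet-backed external Hive table named test_table with a mixed-case column fieldOne (the same shape as the table built in HiveSchemaInferenceSuite); the table name and column are illustrative only:

    import org.apache.spark.sql.SparkSession

    // Build a Hive-enabled session with the inference mode set explicitly.
    // INFER_ONLY infers the case-sensitive schema from the data files but does not
    // write it back to the metastore table properties; INFER_AND_SAVE (the default)
    // additionally persists the inferred schema via an alterTable call.
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .config("spark.sql.hive.schemaInferenceMode", "INFER_ONLY")
      .getOrCreate()

    // A filter on the mixed-case column resolves against the underlying Parquet files
    // only when the case-sensitive schema has been inferred (or previously saved).
    spark.sql("SELECT * FROM test_table WHERE fieldOne >= 0").show()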