@@ -163,6 +163,10 @@ case class BucketSpec(
* @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
* catalog. If false, it is inferred automatically based on file
* structure.
* @param schemaFromTableProps Whether the schema field was obtained by parsing a case-sensitive
* schema embedded in the table properties. Used to trigger schema
* inference when using a Hive Metastore, if configured. Defaults to
* false.
*/
case class CatalogTable(
identifier: TableIdentifier,
@@ -180,7 +184,8 @@ case class CatalogTable(
viewText: Option[String] = None,
comment: Option[String] = None,
unsupportedFeatures: Seq[String] = Seq.empty,
tracksPartitionsInCatalog: Boolean = false) {
tracksPartitionsInCatalog: Boolean = false,
schemaFromTableProps: Boolean = false) {

import CatalogTable._
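
For reference, a minimal sketch (not part of this patch) of how a caller might construct a CatalogTable with the new flag and consult it; the identifier, schema, and storage values below are illustrative only.

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical table; only the fields relevant to the new flag are spelled out.
val table = CatalogTable(
  identifier = TableIdentifier("example_table", Some("default")),
  tableType = CatalogTableType.EXTERNAL,
  storage = CatalogStorageFormat.empty,
  schema = StructType(Seq(StructField("fieldOne", LongType))),
  // Set only when the case-sensitive schema was recovered from the table properties.
  schemaFromTableProps = true)

// Readers of the catalog can use the flag to decide whether schema inference is needed.
val needsInference = !table.schemaFromTableProps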

@@ -296,6 +296,17 @@ object SQLConf {
.longConf
.createWithDefault(250 * 1024 * 1024)

val HIVE_SCHEMA_INFERENCE_MODE = buildConf("spark.sql.hive.schemaInferenceMode")
.doc("Configures the action to take when a case-sensitive schema cannot be read from a Hive " +
"table's properties. Valid options include INFER_AND_SAVE (infer the case-sensitive " +
"schema from the underlying data files and write it back to the table properties), " +
"INFER_ONLY (infer the schema but don't attempt to write it to the table properties) and " +
"NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).")
.stringConf
.transform(_.toUpperCase())
.checkValues(Set("INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER"))
Author comment:
As mentioned in the PR, I'm looking for a good place to store these values as constants or an enum.

.createWithDefault("INFER_AND_SAVE")
Author comment:
I'm open for discussion on whether or not this should be the default behavior.


val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
.doc("When true, enable the metadata-only query optimization that use the table's metadata " +
"to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -774,6 +785,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)

def schemaInferenceMode: String = getConf(HIVE_SCHEMA_INFERENCE_MODE)

def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)

def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
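
As a usage sketch (not part of the patch), the new configuration can be set on a running SparkSession with Hive support and read back through the accessor added above; the table name below is hypothetical.

// Assumes an existing SparkSession named `spark` built with enableHiveSupport().
spark.conf.set("spark.sql.hive.schemaInferenceMode", "INFER_ONLY")

// The session-level accessor added above returns the same (upper-cased) value.
assert(spark.sessionState.conf.schemaInferenceMode == "INFER_ONLY")

// Reading a Hive table whose properties carry no case-sensitive schema will now infer the
// schema from the data files without writing it back to the metastore.
spark.table("default.some_hive_table").printSchema()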
@@ -270,4 +270,8 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key))
assert(e2.message.contains("Cannot modify the value of a static config"))
}

test("Default value of HIVE_SCHEMA_INFERENCE_MODE") {
assert(spark.sessionState.conf.schemaInferenceMode === "INFER_AND_SAVE")
}
}
@@ -510,8 +510,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
requireTableExists(db, tableDefinition.identifier.table)
verifyTableProperties(tableDefinition)

// Add table metadata such as table schema, partition columns, etc. if they aren't already
// present.
val withMetaProps = tableDefinition.copy(
properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))

// convert table statistics to properties so that we can persist them through hive api
val withStatsProps = if (tableDefinition.stats.isDefined) {
val withStatsProps = if (withMetaProps.stats.isDefined) {
val stats = tableDefinition.stats.get
var statsProperties: Map[String, String] =
Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
@@ -523,9 +528,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
statsProperties += (columnStatKeyPropName(colName, k) -> v)
}
}
tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties)
withMetaProps.copy(properties = withMetaProps.properties ++ statsProperties)
} else {
tableDefinition
withMetaProps
}

if (tableDefinition.tableType == VIEW) {
@@ -680,7 +685,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
hiveTable.copy(
schema = schemaFromTableProps,
partitionColumnNames = getPartitionColumnsFromTableProperties(table),
bucketSpec = getBucketSpecFromTableProperties(table))
bucketSpec = getBucketSpecFromTableProperties(table),
schemaFromTableProps = true)
} else {
// Hive metastore may change the table schema, e.g. schema inference. If the table
// schema we read back is different(ignore case and nullability) from the one in table
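
A simplified sketch (not the actual restore method) of the decision this hunk implements: prefer a schema recovered from the Spark-specific table properties and record that fact via the new flag, otherwise keep whatever the Hive metastore returned.

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.types.StructType

// `propsSchema` stands in for the case-sensitive schema parsed from the table properties, if any.
def restoreSchema(hiveTable: CatalogTable, propsSchema: Option[StructType]): CatalogTable =
  propsSchema match {
    // Found in the table properties: use it and mark the table accordingly.
    case Some(schema) => hiveTable.copy(schema = schema, schemaFromTableProps = true)
    // Not found: keep the (case-insensitive) metastore schema; the flag stays false, which
    // lets HiveMetastoreCatalog trigger inference later if it is enabled.
    case None => hiveTable
  }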
@@ -161,23 +161,49 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
bucketSpec,
Some(partitionSchema))

val catalogTable = metastoreRelation.catalogTable
val logicalRelation = cached.getOrElse {
val sizeInBytes =
metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
val fileIndex = {
val index = new CatalogFileIndex(
sparkSession, metastoreRelation.catalogTable, sizeInBytes)
val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
if (lazyPruningEnabled) {
index
} else {
index.filterPartitions(Nil) // materialize all the partitions in memory
}
}
val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
val dataSchema =
StructType(metastoreSchema
val filteredMetastoreSchema = StructType(metastoreSchema
.filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))

val inferenceMode = sparkSession.sessionState.conf.schemaInferenceMode
val dataSchema = if (inferenceMode != "NEVER_INFER" &&
!catalogTable.schemaFromTableProps) {
val fileStatuses = fileIndex.listFiles(Nil).flatMap(_.files)
val inferred = defaultSource.inferSchema(sparkSession, options, fileStatuses)
val merged = if (fileType.equals("parquet")) {
inferred.map(ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, _))
} else {
inferred
}
if (inferenceMode == "INFER_AND_SAVE") {
// If a case-sensitive schema was successfully inferred, execute an alterTable
// operation to save the schema to the table properties.
merged.foreach { mergedSchema =>
val updatedTable = catalogTable.copy(schema = mergedSchema)
sparkSession.sharedState.externalCatalog.alterTable(updatedTable)
}
}
merged.getOrElse {
logWarning(s"Unable to infer schema for table $tableIdentifier from file format " +
s"$defaultSource; using metastore schema.")
filteredMetastoreSchema
}
} else {
filteredMetastoreSchema
}

val relation = HadoopFsRelation(
location = fileIndex,
partitionSchema = partitionSchema,
@@ -186,8 +212,7 @@
fileFormat = defaultSource,
options = options)(sparkSession = sparkSession)

val created = LogicalRelation(relation,
catalogTable = Some(metastoreRelation.catalogTable))
val created = LogicalRelation(relation, catalogTable = Some(catalogTable))
tableRelationCache.put(tableIdentifier, created)
created
}
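
To summarize the mode handling above, here is a condensed sketch (not the shipped code) of how the three configuration values decide the data schema; `inferred` stands for the schema inferred from the data files and `metastoreSchema` for the fallback schema with partition columns filtered out.

import org.apache.spark.sql.types.StructType

def resolveDataSchema(
    mode: String,
    schemaWasInTableProps: Boolean,
    inferred: => Option[StructType],
    metastoreSchema: StructType,
    saveToCatalog: StructType => Unit): StructType = {
  if (schemaWasInTableProps || mode == "NEVER_INFER") {
    // Either the case-sensitive schema is already known or inference is disabled.
    metastoreSchema
  } else {
    val result = inferred
    // INFER_AND_SAVE additionally persists a successful inference back to the catalog.
    if (mode == "INFER_AND_SAVE") result.foreach(saveToCatalog)
    result.getOrElse(metastoreSchema)
  }
}

The patch performs the same branching inline in HiveMetastoreCatalog, merging the inferred schema with the metastore schema for Parquet tables before saving it via alterTable.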
@@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._

class HiveSchemaInferenceSuite
extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {

import HiveSchemaInferenceSuite._

// Create a CatalogTable instance modeling an external Hive table in a metastore that isn't
// controlled by Spark (i.e. has no Spark-specific table properties set).
private def hiveExternalCatalogTable(
tableName: String,
location: String,
schema: StructType,
partitionColumns: Seq[String],
properties: Map[String, String] = Map.empty): CatalogTable = {
CatalogTable(
identifier = TableIdentifier(table = tableName, database = Option("default")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat(
locationUri = Option(location),
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
compressed = false,
properties = Map("serialization.format" -> "1")),
schema = schema,
provider = Option("hive"),
partitionColumnNames = partitionColumns,
properties = properties)
}

// Creates CatalogTablePartition instances for adding partitions of data to our test table.
private def hiveCatalogPartition(location: String, index: Int): CatalogTablePartition
= CatalogTablePartition(
spec = Map("partcol1" -> index.toString, "partcol2" -> index.toString),
storage = CatalogStorageFormat(
locationUri = Option(s"${location}/partCol1=$index/partCol2=$index/"),
inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
compressed = false,
properties = Map("serialization.format" -> "1")))

// Creates a case-sensitive external Hive table for testing schema inference options. Table
// will not have Spark-specific table properties set.
private def setupCaseSensitiveTable(
tableName: String,
dir: File): Unit = {
spark.range(NUM_RECORDS)
.selectExpr("id as fieldOne", "id as partCol1", "id as partCol2")
.write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)

val lowercaseSchema = StructType(Seq(
StructField("fieldone", LongType),
StructField("partcol1", IntegerType),
StructField("partcol2", IntegerType)))

val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

val catalogTable = hiveExternalCatalogTable(
tableName,
dir.getAbsolutePath,
lowercaseSchema,
Seq("partcol1", "partcol2"))
client.createTable(catalogTable, true)

val partitions = (0 until NUM_RECORDS).map(hiveCatalogPartition(dir.getAbsolutePath, _)).toSeq
client.createPartitions("default", tableName, partitions, true)
}

// Create a test table used for a single unit test, with data stored in the specified directory.
private def withTestTable(dir: File)(f: File => Unit): Unit = {
setupCaseSensitiveTable(TEST_TABLE_NAME, dir)
try f(dir) finally spark.sql(s"DROP TABLE IF EXISTS $TEST_TABLE_NAME")
}

override def beforeEach(): Unit = {
super.beforeEach()
FileStatusCache.resetForTesting()
}

override def afterEach(): Unit = {
super.afterEach()
FileStatusCache.resetForTesting()
}

test("Queries against case-sensitive tables with no schema in table properties should work " +
"when schema inference is enabled") {
withSQLConf("spark.sql.hive.schemaInferenceMode" -> "INFER_AND_SAVE") {
Author comment (budde, Feb 15, 2017):
Will change this to reference the key via the constant in SQLConf rather than "spark.sql.hive.schemaInferenceMode".

withTempDir { dir =>
withTestTable(dir) { dir =>
val expectedSchema = StructType(Seq(
StructField("fieldOne", LongType),
// Partition columns remain case-insensitive
StructField("partcol1", IntegerType),
StructField("partcol2", IntegerType)))
assert(spark.sql(FIELD_QUERY).count == NUM_RECORDS)
assert(spark.sql(PARTITION_COLUMN_QUERY).count == NUM_RECORDS)
// Test that the case-sensitive schema was stored as a table property after inference
assert(spark.sql(SELECT_ALL_QUERY).schema == expectedSchema)
}
}
}
}

test("Schema should be inferred but not stored when ...") {
withSQLConf("spark.sql.hive.schemaInferenceMode" -> "INFER_ONLY") {
withTempDir { dir =>
withTestTable(dir) { dir =>
val existingSchema = spark.sql(SELECT_ALL_QUERY).schema
assert(spark.sql(FIELD_QUERY).count == NUM_RECORDS)
assert(spark.sql(PARTITION_COLUMN_QUERY).count == NUM_RECORDS)
assert(spark.sql(SELECT_ALL_QUERY).schema == existingSchema)
}
}
}
}
}

object HiveSchemaInferenceSuite {
private val NUM_RECORDS = 10
private val TEST_TABLE_NAME = "test_table"
private val FIELD_QUERY = s"SELECT * FROM $TEST_TABLE_NAME WHERE fieldOne >= 0"
private val PARTITION_COLUMN_QUERY = s"SELECT * FROM $TEST_TABLE_NAME WHERE partCol1 >= 0"
private val SELECT_ALL_QUERY = s"SELECT * FROM $TEST_TABLE_NAME"
}