
Commit ced9c4d
Author: Budde
[SPARK-19611][SQL] Introduce configurable table schema inference
- Add spark.sql.hive.schemaInferenceMode param to SQLConf
- Add schemaFromTableProps field to CatalogTable (set to true when the schema is successfully read from table props)
- Perform schema inference in HiveMetastoreCatalog if schemaFromTableProps is false, depending on spark.sql.hive.schemaInferenceMode
- Update table metadata properties in HiveExternalCatalog.alterTable()
- Add HiveSchemaInferenceSuite tests
1 parent 3755da7 commit ced9c4d
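For context, a minimal usage sketch of the option this commit introduces. Only the spark.sql.hive.schemaInferenceMode key, its three values, and the INFER_AND_SAVE default come from this change; the SparkSession setup and table name below are illustrative assumptions.

import org.apache.spark.sql.SparkSession

// Sketch only: the app name and table name are hypothetical.
val spark = SparkSession.builder()
  .appName("schema-inference-example")
  .enableHiveSupport()
  // Valid values: INFER_AND_SAVE (default), INFER_ONLY, NEVER_INFER
  .config("spark.sql.hive.schemaInferenceMode", "INFER_ONLY")
  .getOrCreate()

// Reading a Hive table whose properties carry no case-sensitive schema now triggers
// inference from the underlying data files (unless NEVER_INFER is set).
spark.table("default.some_parquet_table").printSchema()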

6 files changed, +226 −11 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 6 additions & 1 deletion
@@ -163,6 +163,10 @@ case class BucketSpec(
  * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
  *                                  catalog. If false, it is inferred automatically based on file
  *                                  structure.
+ * @param schemaFromTableProps Whether the schema field was obtained by parsing a case-sensitive
+ *                             schema embedded in the table properties. Used to trigger schema
+ *                             inference when using a Hive Metastore, if configured. Defaults to
+ *                             false.
  */
 case class CatalogTable(
     identifier: TableIdentifier,
@@ -180,7 +184,8 @@ case class CatalogTable(
     viewText: Option[String] = None,
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty,
-    tracksPartitionsInCatalog: Boolean = false) {
+    tracksPartitionsInCatalog: Boolean = false,
+    schemaFromTableProps: Boolean = false) {
 
   import CatalogTable._
 
sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 0 deletions
@@ -296,6 +296,17 @@ object SQLConf {
     .longConf
     .createWithDefault(250 * 1024 * 1024)
 
+  val HIVE_SCHEMA_INFERENCE_MODE = buildConf("spark.sql.hive.schemaInferenceMode")
+    .doc("Configures the action to take when a case-sensitive schema cannot be read from a Hive " +
+      "table's properties. Valid options include INFER_AND_SAVE (infer the case-sensitive " +
+      "schema from the underlying data files and write it back to the table properties), " +
+      "INFER_ONLY (infer the schema but don't attempt to write it to the table properties) and " +
+      "NEVER_INFER (fallback to using the case-insensitive metastore schema instead of inferring).")
+    .stringConf
+    .transform(_.toUpperCase())
+    .checkValues(Set("INFER_AND_SAVE", "INFER_ONLY", "NEVER_INFER"))
+    .createWithDefault("INFER_AND_SAVE")
+
   val OPTIMIZER_METADATA_ONLY = buildConf("spark.sql.optimizer.metadataOnly")
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
@@ -774,6 +785,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE)
 
+  def schemaInferenceMode: String = getConf(HIVE_SCHEMA_INFERENCE_MODE)
+
   def gatherFastStats: Boolean = getConf(GATHER_FASTSTAT)
 
   def optimizerMetadataOnly: Boolean = getConf(OPTIMIZER_METADATA_ONLY)
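As defined above, the conf upper-cases its value before validating it against the allowed set. A short sketch of what that implies for callers; the `spark` session variable is assumed and this snippet is not part of the commit:

// A lowercase value is accepted because of .transform(_.toUpperCase()) and is read
// back as INFER_ONLY via SQLConf.schemaInferenceMode.
spark.conf.set("spark.sql.hive.schemaInferenceMode", "infer_only")

// A value outside {INFER_AND_SAVE, INFER_ONLY, NEVER_INFER} should fail the
// .checkValues validation (left commented out; it would raise an error when set or read).
// spark.conf.set("spark.sql.hive.schemaInferenceMode", "ALWAYS_INFER")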

sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala

Lines changed: 4 additions & 0 deletions
@@ -270,4 +270,8 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
     val e2 = intercept[AnalysisException](spark.conf.unset(SCHEMA_STRING_LENGTH_THRESHOLD.key))
     assert(e2.message.contains("Cannot modify the value of a static config"))
   }
+
+  test("Default value of HIVE_SCHEMA_INFERENCE_MODE") {
+    assert(spark.sessionState.conf.schemaInferenceMode === "INFER_AND_SAVE")
+  }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala

Lines changed: 10 additions & 4 deletions
@@ -510,8 +510,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireTableExists(db, tableDefinition.identifier.table)
     verifyTableProperties(tableDefinition)
 
+    // Add table metadata such as table schema, partition columns, etc. if they aren't already
+    // present.
+    val withMetaProps = tableDefinition.copy(
+      properties = tableDefinition.properties ++ tableMetaToTableProps(tableDefinition))
+
     // convert table statistics to properties so that we can persist them through hive api
-    val withStatsProps = if (tableDefinition.stats.isDefined) {
+    val withStatsProps = if (withMetaProps.stats.isDefined) {
       val stats = tableDefinition.stats.get
       var statsProperties: Map[String, String] =
         Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
@@ -523,9 +528,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           statsProperties += (columnStatKeyPropName(colName, k) -> v)
         }
       }
-      tableDefinition.copy(properties = tableDefinition.properties ++ statsProperties)
+      withMetaProps.copy(properties = withMetaProps.properties ++ statsProperties)
     } else {
-      tableDefinition
+      withMetaProps
     }
 
     if (tableDefinition.tableType == VIEW) {
@@ -680,7 +685,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       hiveTable.copy(
         schema = schemaFromTableProps,
         partitionColumnNames = getPartitionColumnsFromTableProperties(table),
-        bucketSpec = getBucketSpecFromTableProperties(table))
+        bucketSpec = getBucketSpecFromTableProperties(table),
+        schemaFromTableProps = true)
     } else {
       // Hive metastore may change the table schema, e.g. schema inference. If the table
       // schema we read back is different(ignore case and nullability) from the one in table
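The net effect of the alterTable change above is a simple layering of property maps; a condensed sketch follows (the helper and parameter names are invented for illustration, not the actual implementation):

// Existing table properties, then regenerated Spark metadata props (schema, partition
// columns, bucket spec), then statistics props; later entries win on key collisions.
def mergedProperties(
    existing: Map[String, String],
    metaProps: Map[String, String],
    statsProps: Map[String, String]): Map[String, String] =
  existing ++ metaProps ++ statsProps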

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 31 additions & 6 deletions
@@ -161,23 +161,49 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         bucketSpec,
         Some(partitionSchema))
 
+      val catalogTable = metastoreRelation.catalogTable
       val logicalRelation = cached.getOrElse {
         val sizeInBytes =
           metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong
         val fileIndex = {
-          val index = new CatalogFileIndex(
-            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
+          val index = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             index
           } else {
             index.filterPartitions(Nil) // materialize all the partitions in memory
           }
         }
         val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet
-        val dataSchema =
-          StructType(metastoreSchema
+        val filteredMetastoreSchema = StructType(metastoreSchema
           .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase)))
 
+        val inferenceMode = sparkSession.sessionState.conf.schemaInferenceMode
+        val dataSchema = if (inferenceMode != "NEVER_INFER" &&
+            !catalogTable.schemaFromTableProps) {
+          val fileStatuses = fileIndex.listFiles(Nil).flatMap(_.files)
+          val inferred = defaultSource.inferSchema(sparkSession, options, fileStatuses)
+          val merged = if (fileType.equals("parquet")) {
+            inferred.map(ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, _))
+          } else {
+            inferred
+          }
+          if (inferenceMode == "INFER_AND_SAVE") {
+            // If a case-sensitive schema was successfully inferred, execute an alterTable
+            // operation to save the schema to the table properties.
+            merged.foreach { mergedSchema =>
+              val updatedTable = catalogTable.copy(schema = mergedSchema)
+              sparkSession.sharedState.externalCatalog.alterTable(updatedTable)
+            }
+          }
+          merged.getOrElse {
+            logWarning(s"Unable to infer schema for table $tableIdentifier from file format " +
+              s"$defaultSource; using metastore schema.")
+            filteredMetastoreSchema
+          }
+        } else {
+          filteredMetastoreSchema
+        }
+
         val relation = HadoopFsRelation(
           location = fileIndex,
           partitionSchema = partitionSchema,
@@ -186,8 +212,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           fileFormat = defaultSource,
           options = options)(sparkSession = sparkSession)
 
-        val created = LogicalRelation(relation,
-          catalogTable = Some(metastoreRelation.catalogTable))
+        val created = LogicalRelation(relation, catalogTable = Some(catalogTable))
         tableRelationCache.put(tableIdentifier, created)
         created
       }
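To make the new control flow easier to follow, here is a simplified, self-contained sketch of the schema-selection decision above. The function and parameter names are invented for illustration; only the branching logic mirrors the diff.

import org.apache.spark.sql.types.StructType

def chooseDataSchema(
    inferenceMode: String,                  // "INFER_AND_SAVE" | "INFER_ONLY" | "NEVER_INFER"
    schemaFromTableProps: Boolean,          // table props already carried a case-sensitive schema
    inferSchema: () => Option[StructType],  // inference from the data files; None if it fails
    saveSchema: StructType => Unit,         // persists the schema back via alterTable
    metastoreSchema: StructType): StructType = {
  if (inferenceMode != "NEVER_INFER" && !schemaFromTableProps) {
    val inferred = inferSchema()
    if (inferenceMode == "INFER_AND_SAVE") {
      // Write the inferred case-sensitive schema back to the table properties.
      inferred.foreach(saveSchema)
    }
    // Fall back to the case-insensitive metastore schema if inference failed.
    inferred.getOrElse(metastoreSchema)
  } else {
    metastoreSchema
  }
}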
HiveSchemaInferenceSuite.scala (new file)

Lines changed: 162 additions & 0 deletions
@@ -0,0 +1,162 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

import org.scalatest.BeforeAndAfterEach

import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.datasources.FileStatusCache
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._

class HiveSchemaInferenceSuite
  extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {

  import HiveSchemaInferenceSuite._

  // Create a CatalogTable instance modeling an external Hive table in a metastore that isn't
  // controlled by Spark (i.e. has no Spark-specific table properties set).
  private def hiveExternalCatalogTable(
      tableName: String,
      location: String,
      schema: StructType,
      partitionColumns: Seq[String],
      properties: Map[String, String] = Map.empty): CatalogTable = {
    CatalogTable(
      identifier = TableIdentifier(table = tableName, database = Option("default")),
      tableType = CatalogTableType.EXTERNAL,
      storage = CatalogStorageFormat(
        locationUri = Option(location),
        inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
        outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
        serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
        compressed = false,
        properties = Map("serialization.format" -> "1")),
      schema = schema,
      provider = Option("hive"),
      partitionColumnNames = partitionColumns,
      properties = properties)
  }

  // Creates CatalogTablePartition instances for adding partitions of data to our test table.
  private def hiveCatalogPartition(location: String, index: Int): CatalogTablePartition
    = CatalogTablePartition(
      spec = Map("partcol1" -> index.toString, "partcol2" -> index.toString),
      storage = CatalogStorageFormat(
        locationUri = Option(s"${location}/partCol1=$index/partCol2=$index/"),
        inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
        outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
        serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
        compressed = false,
        properties = Map("serialization.format" -> "1")))

  // Creates a case-sensitive external Hive table for testing schema inference options. Table
  // will not have Spark-specific table properties set.
  private def setupCaseSensitiveTable(
      tableName: String,
      dir: File): Unit = {
    spark.range(NUM_RECORDS)
      .selectExpr("id as fieldOne", "id as partCol1", "id as partCol2")
      .write
      .partitionBy("partCol1", "partCol2")
      .mode("overwrite")
      .parquet(dir.getAbsolutePath)

    val lowercaseSchema = StructType(Seq(
      StructField("fieldone", LongType),
      StructField("partcol1", IntegerType),
      StructField("partcol2", IntegerType)))

    val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client

    val catalogTable = hiveExternalCatalogTable(
      tableName,
      dir.getAbsolutePath,
      lowercaseSchema,
      Seq("partcol1", "partcol2"))
    client.createTable(catalogTable, true)

    val partitions = (0 until NUM_RECORDS).map(hiveCatalogPartition(dir.getAbsolutePath, _)).toSeq
    client.createPartitions("default", tableName, partitions, true)
  }

  // Create a test table used for a single unit test, with data stored in the specified directory.
  private def withTestTable(dir: File)(f: File => Unit): Unit = {
    setupCaseSensitiveTable(TEST_TABLE_NAME, dir)
    try f(dir) finally spark.sql(s"DROP TABLE IF EXISTS $TEST_TABLE_NAME")
  }

  override def beforeEach(): Unit = {
    super.beforeEach()
    FileStatusCache.resetForTesting()
  }

  override def afterEach(): Unit = {
    super.afterEach()
    FileStatusCache.resetForTesting()
  }

  test("Queries against case-sensitive tables with no schema in table properties should work " +
    "when schema inference is enabled") {
    withSQLConf("spark.sql.hive.schemaInferenceMode" -> "INFER_AND_SAVE") {
      withTempDir { dir =>
        withTestTable(dir) { dir =>
          val expectedSchema = StructType(Seq(
            StructField("fieldOne", LongType),
            // Partition columns remain case-insensitive
            StructField("partcol1", IntegerType),
            StructField("partcol2", IntegerType)))
          assert(spark.sql(FIELD_QUERY).count == NUM_RECORDS)
          assert(spark.sql(PARTITION_COLUMN_QUERY).count == NUM_RECORDS)
          // Test that the case-sensitive schema was stored as a table property after inference
          assert(spark.sql(SELECT_ALL_QUERY).schema == expectedSchema)
        }
      }
    }
  }

  test("Schema should be inferred but not stored when ...") {
    withSQLConf("spark.sql.hive.schemaInferenceMode" -> "INFER_ONLY") {
      withTempDir { dir =>
        withTestTable(dir) { dir =>
          val existingSchema = spark.sql(SELECT_ALL_QUERY).schema
          assert(spark.sql(FIELD_QUERY).count == NUM_RECORDS)
          assert(spark.sql(PARTITION_COLUMN_QUERY).count == NUM_RECORDS)
          assert(spark.sql(SELECT_ALL_QUERY).schema == existingSchema)
        }
      }
    }
  }
}

object HiveSchemaInferenceSuite {
  private val NUM_RECORDS = 10
  private val TEST_TABLE_NAME = "test_table"
  private val FIELD_QUERY = s"SELECT * FROM $TEST_TABLE_NAME WHERE fieldOne >= 0"
  private val PARTITION_COLUMN_QUERY = s"SELECT * FROM $TEST_TABLE_NAME WHERE partCol1 >= 0"
  private val SELECT_ALL_QUERY = s"SELECT * FROM $TEST_TABLE_NAME"
}
