
Commit a043ca2

address comments
1 parent 55c2c5e commit a043ca2

6 files changed (+20, -115 lines)

sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala

Lines changed: 0 additions & 3 deletions
@@ -223,9 +223,6 @@ abstract class Catalog {
    * If this table is cached as an InMemoryRelation, drop the original cached version and make the
    * new version cached lazily.
    *
-   * If the table's schema is inferred at runtime, infer the schema again and update the schema
-   * in the external catalog.
-   *
    * @since 2.0.0
    */
   def refreshTable(tableName: String): Unit
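With this change, refreshTable only invalidates cached metadata and data; it no longer re-infers the schema or rewrites it in the external catalog. A minimal usage sketch of the retained behavior (the table name "logs" is hypothetical):

  // Files under the table's location were rewritten outside of Spark.
  // refreshTable drops any cached metadata/data for the table; the schema
  // recorded in the catalog at table-creation time is left untouched.
  spark.catalog.refreshTable("logs")
  val df = spark.table("logs")   // re-resolved against the unchanged catalog schema
  df.show()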

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 13 additions & 29 deletions
@@ -127,7 +127,6 @@ case class CreateDataSourceTableCommand(
       sparkSession = sparkSession,
       tableIdent = tableIdent,
       schema = dataSource.schema,
-      isSchemaInferred = userSpecifiedSchema.isEmpty,
       partitionColumns = partitionColumns,
       bucketSpec = bucketSpec,
       provider = provider,
@@ -279,7 +278,6 @@ case class CreateDataSourceTableAsSelectCommand(
       sparkSession = sparkSession,
       tableIdent = tableIdent,
       schema = result.schema,
-      isSchemaInferred = false,
       partitionColumns = partitionColumns,
       bucketSpec = bucketSpec,
       provider = provider,
@@ -293,6 +291,7 @@ case class CreateDataSourceTableAsSelectCommand(
   }
 }
 
+
 object CreateDataSourceTableUtils extends Logging {
 
   val DATASOURCE_PREFIX = "spark.sql.sources."
@@ -301,7 +300,6 @@ object CreateDataSourceTableUtils extends Logging {
   val DATASOURCE_OUTPUTPATH = DATASOURCE_PREFIX + "output.path"
   val DATASOURCE_SCHEMA = DATASOURCE_PREFIX + "schema"
   val DATASOURCE_SCHEMA_PREFIX = DATASOURCE_SCHEMA + "."
-  val DATASOURCE_SCHEMA_ISINFERRED = DATASOURCE_SCHEMA_PREFIX + "isInferred"
   val DATASOURCE_SCHEMA_NUMPARTS = DATASOURCE_SCHEMA_PREFIX + "numParts"
   val DATASOURCE_SCHEMA_NUMPARTCOLS = DATASOURCE_SCHEMA_PREFIX + "numPartCols"
   val DATASOURCE_SCHEMA_NUMSORTCOLS = DATASOURCE_SCHEMA_PREFIX + "numSortCols"
@@ -326,18 +324,21 @@ object CreateDataSourceTableUtils extends Logging {
     matcher.matches()
   }
 
-  /**
-   * Saves the schema (including partition info) into the table properties.
-   * Overwrites the schema, if already existed.
-   */
-  def saveSchema(
+  def createDataSourceTable(
       sparkSession: SparkSession,
+      tableIdent: TableIdentifier,
       schema: StructType,
       partitionColumns: Array[String],
-      tableProperties: mutable.HashMap[String, String]): Unit = {
-    // Serialized JSON schema string may be too long to be stored into a single
-    // metastore SerDe property. In this case, we split the JSON string and store each part as
-    // a separate table property.
+      bucketSpec: Option[BucketSpec],
+      provider: String,
+      options: Map[String, String],
+      isExternal: Boolean): Unit = {
+    val tableProperties = new mutable.HashMap[String, String]
+    tableProperties.put(DATASOURCE_PROVIDER, provider)
+
+    // Saves optional user specified schema. Serialized JSON schema string may be too long to be
+    // stored into a single metastore SerDe property. In this case, we split the JSON string and
+    // store each part as a separate SerDe property.
     val threshold = sparkSession.sessionState.conf.schemaStringLengthThreshold
     val schemaJsonString = schema.json
     // Split the JSON string.
@@ -353,23 +354,6 @@ object CreateDataSourceTableUtils extends Logging {
         tableProperties.put(s"$DATASOURCE_SCHEMA_PARTCOL_PREFIX$index", partCol)
       }
     }
-  }
-
-  def createDataSourceTable(
-      sparkSession: SparkSession,
-      tableIdent: TableIdentifier,
-      schema: StructType,
-      isSchemaInferred: Boolean,
-      partitionColumns: Array[String],
-      bucketSpec: Option[BucketSpec],
-      provider: String,
-      options: Map[String, String],
-      isExternal: Boolean): Unit = {
-    val tableProperties = new mutable.HashMap[String, String]
-    tableProperties.put(DATASOURCE_PROVIDER, provider)
-
-    tableProperties.put(DATASOURCE_SCHEMA_ISINFERRED, isSchemaInferred.toString.toUpperCase)
-    saveSchema(sparkSession, schema, partitionColumns, tableProperties)
 
     if (bucketSpec.isDefined) {
       val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get
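The schema handling that used to live in saveSchema is now inlined in createDataSourceTable, and the storage scheme itself is unchanged: the schema's JSON form is chopped into threshold-sized chunks stored under numbered table properties. A hedged, self-contained sketch of that scheme; the "numParts" suffix follows the constant above, while the "part." suffix is assumed from DATASOURCE_SCHEMA_PART_PREFIX, and the helper names here are illustrative rather than the exact Spark code:

  import scala.collection.mutable
  import org.apache.spark.sql.types.{DataType, StructType}

  // Store a possibly very long schema JSON string as several bounded-size properties.
  def putSchemaJson(schema: StructType, threshold: Int,
      props: mutable.HashMap[String, String]): Unit = {
    val parts = schema.json.grouped(threshold).toSeq
    props.put("spark.sql.sources.schema.numParts", parts.size.toString)
    parts.zipWithIndex.foreach { case (part, index) =>
      props.put(s"spark.sql.sources.schema.part.$index", part)
    }
  }

  // Reading it back concatenates part.0 .. part.(numParts - 1) and re-parses the JSON.
  def getSchemaJson(props: Map[String, String]): Option[StructType] = {
    props.get("spark.sql.sources.schema.numParts").map { n =>
      val json = (0 until n.toInt).map(i => props(s"spark.sql.sources.schema.part.$i")).mkString
      DataType.fromJson(json).asInstanceOf[StructType]
    }
  }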

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 0 additions & 4 deletions
@@ -487,10 +487,6 @@ object DDLUtils {
     isDatasourceTable(table.properties)
   }
 
-  def isSchemaInferred(table: CatalogTable): Boolean = {
-    table.properties.get(DATASOURCE_SCHEMA_ISINFERRED) == Option(true.toString.toUpperCase)
-  }
-
   /**
    * If the command ALTER VIEW is to alter a table or ALTER TABLE is to alter a view,
    * issue an exception [[AnalysisException]].

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 2 additions & 57 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.internal
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.annotation.Experimental
@@ -28,8 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifie
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils}
-import org.apache.spark.sql.execution.datasources.{CreateTableUsing, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.CreateTableUsing
 import org.apache.spark.sql.types.StructType
 
 
@@ -352,68 +350,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     sparkSession.sharedState.cacheManager.lookupCachedData(qName).nonEmpty
   }
 
-  /**
-   * Refresh the inferred schema stored in the external catalog for data source tables.
-   */
-  private def refreshInferredSchema(tableIdent: TableIdentifier): Unit = {
-    val table = sessionCatalog.getTableMetadataOption(tableIdent)
-    table.foreach { tableDesc =>
-      if (DDLUtils.isDatasourceTable(tableDesc) && DDLUtils.isSchemaInferred(tableDesc)) {
-        val partitionColumns = DDLUtils.getPartitionColumnsFromTableProperties(tableDesc)
-        val bucketSpec = DDLUtils.getBucketSpecFromTableProperties(tableDesc)
-        val dataSource =
-          DataSource(
-            sparkSession,
-            userSpecifiedSchema = None,
-            partitionColumns = partitionColumns,
-            bucketSpec = bucketSpec,
-            className = tableDesc.properties(CreateDataSourceTableUtils.DATASOURCE_PROVIDER),
-            options = tableDesc.storage.serdeProperties)
-          .resolveRelation().asInstanceOf[HadoopFsRelation]
-
-        val schemaProperties = new mutable.HashMap[String, String]
-        CreateDataSourceTableUtils.saveSchema(
-          sparkSession, dataSource.schema, dataSource.partitionSchema.fieldNames, schemaProperties)
-
-        def isPropertyForInferredSchema(key: String): Boolean = {
-          key match {
-            case CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTS => true
-            case CreateDataSourceTableUtils.DATASOURCE_SCHEMA_NUMPARTCOLS => true
-            case _
-              if key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PART_PREFIX) ||
-                key.startsWith(CreateDataSourceTableUtils.DATASOURCE_SCHEMA_PARTCOL_PREFIX)
-              => true
-            case _ => false
-          }
-        }
-
-        // Keep the properties that are not for schema or partition columns
-        val tablePropertiesWithoutSchema = tableDesc.properties.filterKeys { k =>
-          !isPropertyForInferredSchema(k)
-        }
-
-        val newTable = tableDesc.copy(properties = tablePropertiesWithoutSchema ++ schemaProperties)
-
-        // Alter the schema-related table properties that are stored in external catalog.
-        sessionCatalog.alterTable(newTable)
-      }
-    }
-  }
-
   /**
    * Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
-   * is refreshed.
+   * is refreshed. For data source tables, the schema will not be inferred and refreshed.
    *
    * @group cachemgmt
    * @since 2.0.0
    */
   override def refreshTable(tableName: String): Unit = {
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    // Refresh the schema in external catalog, if it is a data source table whose schema is inferred
-    // at runtime. For user-specified schema, we do not infer and update the schema.
-    // TODO: Support column-related ALTER TABLE DDL commands, and then users can update
-    // the user-specified schema.
-    refreshInferredSchema(tableIdent)
     // Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
     // Non-temp tables: refresh the metadata cache.
     sessionCatalog.refreshTable(tableIdent)
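With refreshInferredSchema removed, REFRESH TABLE no longer picks up a changed file layout for data source tables whose schema was inferred at creation time. A hedged sketch of how a user could force re-inference under the new behavior, by recreating the table (the table name and path are hypothetical):

  // Re-create the data source table so its schema is inferred again from the current files.
  spark.sql("DROP TABLE IF EXISTS json_tab")
  spark.sql("CREATE TABLE json_tab USING json OPTIONS (path '/data/json_tab')")
  spark.catalog.refreshTable("json_tab")   // still useful for invalidating cached metadata/data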

sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

Lines changed: 3 additions & 17 deletions
@@ -336,9 +336,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       val newDF = sparkContext.parallelize(1 to 10).map(i => (i, i.toString))
         .toDF("newCol1", "newCol2")
       newDF.write.format("json").partitionBy("newCol1").mode(SaveMode.Overwrite).save(path)
-      val newSchema = StructType(
-        StructField("newCol2", StringType, nullable = true) ::
-        StructField("newCol1", IntegerType, nullable = true) :: Nil)
 
       // No change on the schema
       val tableMetadataBeforeRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
@@ -349,27 +346,16 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataBeforeRefresh)
       assert(partColsBeforeRefresh == partitionCols)
 
-      // Refresh
+      // Refresh does not affect the schema
       spark.catalog.refreshTable(tabName)
 
       val tableMetadataAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
       val tableSchemaAfterRefresh =
         DDLUtils.getSchemaFromTableProperties(tableMetadataAfterRefresh)
-      assert(tableSchemaAfterRefresh == Option(newSchema))
+      assert(tableSchemaAfterRefresh == Option(schema))
       val partColsAfterRefresh =
         DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataAfterRefresh)
-      assert(partColsAfterRefresh == Seq("newCol1"))
-
-      // Refresh after no change
-      spark.catalog.refreshTable(tabName)
-
-      val tableMetadataNoChangeAfterRefresh = catalog.getTableMetadata(TableIdentifier(tabName))
-      val tableSchemaNoChangeAfterRefresh =
-        DDLUtils.getSchemaFromTableProperties(tableMetadataNoChangeAfterRefresh)
-      assert(tableSchemaNoChangeAfterRefresh == Option(newSchema))
-      val partColsNoChangeAfterRefresh =
-        DDLUtils.getPartitionColumnsFromTableProperties(tableMetadataNoChangeAfterRefresh)
-      assert(partColsNoChangeAfterRefresh == Seq("newCol1"))
+      assert(partColsAfterRefresh == partitionCols)
     }
   }
 }

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 2 additions & 5 deletions
@@ -191,10 +191,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
         sql("REFRESH TABLE jsonTable")
 
-        // Check that the refresh worked
+        // After refresh, schema is not changed.
         checkAnswer(
           sql("SELECT * FROM jsonTable"),
-          Row("a1", "b1", "c1"))
+          Row("a1", "b1"))
       }
     }
   }
@@ -704,7 +704,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sparkSession = spark,
         tableIdent = TableIdentifier("wide_schema"),
         schema = schema,
-        isSchemaInferred = false,
         partitionColumns = Array.empty[String],
         bucketSpec = None,
         provider = "json",
@@ -990,7 +989,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
        sparkSession = spark,
        tableIdent = TableIdentifier("not_skip_hive_metadata"),
        schema = schema,
-       isSchemaInferred = false,
        partitionColumns = Array.empty[String],
        bucketSpec = None,
        provider = "parquet",
@@ -1006,7 +1004,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
        sparkSession = spark,
        tableIdent = TableIdentifier("skip_hive_metadata"),
        schema = schema,
-       isSchemaInferred = false,
        partitionColumns = Array.empty[String],
        bucketSpec = None,
        provider = "parquet",
