
Commit 3c09d95

dhruvarya-db and tdas authored
[Spark] Make update catalog schema truncation threshold configurable (#2911)
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Currently, during the schema sync to the catalog, the whole schema gets truncated if any of its fields is longer than 4000 characters. This PR makes that threshold configurable through the config `DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD`.

## How was this patch tested?

Created variations of existing test cases that validate that setting the config to a larger value skips the truncation.

## Does this PR introduce _any_ user-facing changes?

No

Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
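For illustration only (not part of this commit's diff): a minimal sketch of raising the new threshold from a Spark session. It assumes the usual `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` prepends to `catalog.update.longFieldTruncationThreshold`, and `8000` is an arbitrary example value.

```scala
// Sketch: raise the per-field truncation threshold before a schema sync to the catalog.
// Key assumes the standard DeltaSQLConf prefix; 8000 is an arbitrary example value.
spark.conf.set("spark.databricks.delta.catalog.update.longFieldTruncationThreshold", "8000")

// Fields whose catalogString is at most 8000 characters will now be synced to the catalog
// instead of the whole schema being replaced by an empty one with delta.catalogUpdateError set.
```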
1 parent 5ace827 commit 3c09d95

File tree

4 files changed: +151 −89 lines changed

spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala

+5 −6
@@ -605,12 +605,11 @@ case class CreateDeltaTableCommand(
     if (conf.getConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED)) {
       // In the case we're creating a Delta table on an existing path and adopting the schema
       val schema = if (table.schema.isEmpty) snapshot.schema else table.schema
-      val truncatedSchema = UpdateCatalog.truncateSchemaIfNecessary(schema)
-      val additionalProperties = if (truncatedSchema.isEmpty) {
-        Map(UpdateCatalog.ERROR_KEY -> UpdateCatalog.LONG_SCHEMA_ERROR)
-      } else {
-        Map.empty
-      }
+      val truncationThreshold = spark.sessionState.conf.getConf(
+        DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD)
+      val (truncatedSchema, additionalProperties) = UpdateCatalog.truncateSchemaIfNecessary(
+        snapshot.schema,
+        truncationThreshold)
 
       table.copy(
         schema = truncatedSchema,

spark/src/main/scala/org/apache/spark/sql/delta/hooks/UpdateCatalog.scala

+33 −28
@@ -198,11 +198,14 @@ case class UpdateCatalog(table: CatalogTable) extends UpdateCatalogBase {
 
 
   override protected def schemaHasChanged(snapshot: Snapshot, spark: SparkSession): Boolean = {
-    // We need to check whether the schema in the catalog matches the current schema. If a
-    // field in the schema is very long, we cannot store the schema in the catalog, therefore
-    // here we have to compare what's in the catalog with what we actually can store in the
-    // catalog
-    val schemaChanged = UpdateCatalog.truncateSchemaIfNecessary(snapshot.schema) != table.schema
+    // We need to check whether the schema in the catalog matches the current schema.
+    // Depending on the schema validation policy, the schema might need to be truncated.
+    // Therefore, we should use what we want to store in the catalog for comparison.
+    val truncationThreshold = spark.sessionState.conf.getConf(
+      DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD)
+    val schemaChanged = table.schema != UpdateCatalog.truncateSchemaIfNecessary(
+      snapshot.schema,
+      truncationThreshold)._1
     // The table may have been dropped as we're just about to update the information. There is
     // unfortunately no great way to avoid a race condition, but we do one last check here as
     // updates may have been queued for some time.
@@ -261,11 +264,11 @@ object UpdateCatalog {
   // This is the encoding of the database for the Hive MetaStore
   private val latin1 = Charset.forName("ISO-8859-1")
 
-  // Maximum number of characters that a catalog can store.
-  val MAX_CATALOG_TYPE_DDL_LENGTH = 4000
   val ERROR_KEY = "delta.catalogUpdateError"
   val LONG_SCHEMA_ERROR: String = "The schema contains a very long nested field and cannot be " +
     "stored in the catalog."
+  val NON_LATIN_CHARS_ERROR: String = "The schema contains non-latin encoding characters and " +
+    "cannot be stored in the catalog."
   val HIVE_METASTORE_NAME = "hive_metastore"
 
   private def getOrCreateExecutionContext(conf: SQLConf): ExecutionContext = synchronized {
@@ -313,12 +316,11 @@
       catalog.qualifyIdentifier(TableIdentifier(table.identifier.table, Some(table.database)))
     val db = qualifiedIdentifier.database.get
     val tblName = qualifiedIdentifier.table
-    val schema = truncateSchemaIfNecessary(snapshot.schema)
-    val additionalProperties = if (schema.isEmpty) {
-      Map(ERROR_KEY -> LONG_SCHEMA_ERROR)
-    } else {
-      Map.empty
-    }
+    val truncationThreshold = spark.sessionState.conf.getConf(
+      DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD)
+    val (schema, additionalProperties) = truncateSchemaIfNecessary(
+      snapshot.schema,
+      truncationThreshold)
 
     // We call the lower level API so that we can actually drop columns. We also assume that
     // all columns are data columns so that we don't have to deal with partition columns
@@ -346,25 +348,28 @@
   }
 
   /**
-   * If a field in the schema has a very long string representation, then the schema will be
+   * If the schema contains non-latin encoding characters, the schema can become garbled.
+   * We need to truncate the schema in that case.
+   * Also, if any of the fields is longer than `truncationThreshold`, then the schema will be
    * truncated to an empty schema to avoid corruption.
-   * Also, if the schema contains non-latin encoding characters, the schema will be garbled. In
-   * this case we also truncate the schema.
+   *
+   * @return a tuple of the truncated schema and a map of error messages if any.
+   *         The error message is only set if the schema is truncated. Truncation
+   *         can happen if the schema is too long or if it contains non-latin characters.
    */
-  def truncateSchemaIfNecessary(schema: StructType): StructType = {
+  def truncateSchemaIfNecessary(
+      schema: StructType,
+      truncationThreshold: Long): (StructType, Map[String, String]) = {
     // Encoders are not threadsafe
     val encoder = latin1.newEncoder()
-    def isColumnValid(f: StructField): Boolean = {
-      val typeString = f.dataType.catalogString
-      encoder.canEncode(f.name) &&
-        typeString.length <= MAX_CATALOG_TYPE_DDL_LENGTH &&
-        encoder.canEncode(typeString)
-    }
-
-    if (schema.exists(f => !isColumnValid(f))) {
-      new StructType()
-    } else {
-      schema
+    schema.foreach { f =>
+      if (f.dataType.catalogString.length > truncationThreshold) {
+        return (new StructType(), Map(UpdateCatalog.ERROR_KEY -> LONG_SCHEMA_ERROR))
+      }
+      if (!encoder.canEncode(f.name) || !encoder.canEncode(f.dataType.catalogString)) {
+        return (new StructType(), Map(UpdateCatalog.ERROR_KEY -> NON_LATIN_CHARS_ERROR))
+      }
     }
+    (schema, Map.empty)
   }
 }
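For reference, a minimal sketch (not part of the commit) of how the reworked helper behaves, mirroring the wide-struct schema used by the test suite and assuming the `truncateSchemaIfNecessary(schema, truncationThreshold)` signature shown in the diff above.

```scala
import org.apache.spark.sql.delta.hooks.UpdateCatalog
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A schema whose nested struct field has a catalogString far longer than 4000 characters.
val wide = new StructType()
  .add("i", StringType)
  .add("struct", StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType))))

// At the default threshold the schema is dropped and the long-schema error property is set.
val (truncated, props) = UpdateCatalog.truncateSchemaIfNecessary(wide, truncationThreshold = 4000L)
assert(truncated.isEmpty && props(UpdateCatalog.ERROR_KEY) == UpdateCatalog.LONG_SCHEMA_ERROR)

// With a threshold above the longest field's DDL length, the schema is kept as-is.
val (kept, noError) = UpdateCatalog.truncateSchemaIfNecessary(wide, truncationThreshold = 99999L)
assert(kept == wide && noError.isEmpty)
```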

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

+9 −0
@@ -538,6 +538,15 @@ trait DeltaSQLConfBase {
       .checkValue(_ > 0, "threadPoolSize must be positive")
       .createWithDefault(20)
 
+  val DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD =
+    buildConf("catalog.update.longFieldTruncationThreshold")
+      .internal()
+      .doc(
+        "When syncing table schema to the catalog, Delta will truncate the whole schema " +
+        "if any field is longer than this threshold.")
+      .longConf
+      .createWithDefault(4000)
+
   val DELTA_LIST_FROM_COMMIT_STORE_THREAD_POOL_SIZE =
     buildStaticConf("commitStore.getCommits.threadPoolSize")
       .internal()

spark/src/test/scala/org/apache/spark/sql/delta/DeltaUpdateCatalogSuite.scala

+104 −55
@@ -22,7 +22,6 @@ import scala.util.control.NonFatal
 
 import com.databricks.spark.util.Log4jUsageLogger
 import org.apache.spark.sql.delta.hooks.UpdateCatalog
-import org.apache.spark.sql.delta.hooks.UpdateCatalog.MAX_CATALOG_TYPE_DDL_LENGTH
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaHiveTest
 import com.fasterxml.jackson.core.JsonParseException
@@ -186,6 +185,9 @@ class DeltaUpdateCatalogSuite
     }
   }
 
+  val MAX_CATALOG_TYPE_DDL_LENGTH: Long =
+    DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.defaultValue.get
+
 
   test("convert to delta with partitioning change") {
     withTable(tbl) {
@@ -326,8 +328,6 @@ class DeltaUpdateCatalogSuite
   }
 
 
-  import UpdateCatalog.MAX_CATALOG_TYPE_DDL_LENGTH
-
   test("Very long schemas can be stored in the catalog") {
     withTable(tbl) {
       val schema = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
@@ -340,47 +340,108 @@ class DeltaUpdateCatalogSuite
     }
   }
 
-  test("Schemas that contain very long fields cannot be stored in the catalog") {
-    withTable(tbl) {
-      val schema = new StructType()
-        .add("i", StringType)
-        .add("struct", StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType))))
-      require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
-        s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
-        s"characters for this test")
 
-      sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
-      verifySchemaInCatalog()
+  for (truncationThreshold <- Seq(99999, MAX_CATALOG_TYPE_DDL_LENGTH, 4020))
+  test(s"Schemas that contain very long fields cannot be stored in the catalog " +
+      " when longer than the truncation threshold " +
+      s" [DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD = $truncationThreshold]") {
+    withSQLConf(
+        DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.key ->
+          truncationThreshold.toString) {
+      withTable(tbl) {
+        val schema = new StructType()
+          .add("i", StringType)
+          .add("struct", StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType))))
+        require(
+          schema.toDDL.length >= 4020,
+          s"The length of the schema should be over 4020 " +
+          s"characters for this test")
+
+        sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
+        if (truncationThreshold > 4020) {
+          verifyTableMetadata(expectedSchema = schema)
+        } else {
+          verifySchemaInCatalog()
+        }
+      }
     }
   }
 
-  test("Schemas that contain very long fields cannot be stored in the catalog - array") {
-    withTable(tbl) {
-      val struct = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
-      val schema = new StructType()
-        .add("i", StringType)
-        .add("array", ArrayType(struct))
-      require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
-        s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
-        s"characters for this test")
-
-      sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
-      verifySchemaInCatalog()
+  for (truncationThreshold <- Seq(99999, MAX_CATALOG_TYPE_DDL_LENGTH))
+  test(s"Schemas that contain very long fields cannot be stored in the catalog - array" +
+      " when longer than the truncation threshold " +
+      s" [DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD = $truncationThreshold]") {
+    withSQLConf(
+        DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.key ->
+          truncationThreshold.toString) {
+      withTable(tbl) {
+        val struct = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
+        val schema = new StructType()
+          .add("i", StringType)
+          .add("array", ArrayType(struct))
+        require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
+          s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
+          s"characters for this test")
+
+        sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
+        if (truncationThreshold == 99999) {
+          verifyTableMetadata(expectedSchema = schema)
+        } else {
+          verifySchemaInCatalog()
+        }
+      }
     }
   }
 
-  test("Schemas that contain very long fields cannot be stored in the catalog - map") {
-    withTable(tbl) {
-      val struct = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
-      val schema = new StructType()
-        .add("i", StringType)
-        .add("map", MapType(StringType, struct))
-      require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
-        s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
-        s"characters for this test")
+  for (truncationThreshold <- Seq(99999, MAX_CATALOG_TYPE_DDL_LENGTH))
+  test(s"Schemas that contain very long fields cannot be stored in the catalog - map" +
+      " when longer than the truncation threshold " +
+      s" [DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD = $truncationThreshold]") {
+    withSQLConf(
+        DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.key ->
+          truncationThreshold.toString) {
+      withTable(tbl) {
+        val struct = StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType)))
+        val schema = new StructType()
+          .add("i", StringType)
+          .add("map", MapType(StringType, struct))
+        require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
+          s"The length of the schema should be over $MAX_CATALOG_TYPE_DDL_LENGTH " +
+          s"characters for this test")
+
+        sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
+        if (truncationThreshold == 99999) {
+          verifyTableMetadata(expectedSchema = schema)
+        } else {
+          verifySchemaInCatalog()
+        }
+      }
+    }
+  }
 
-      sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
-      verifySchemaInCatalog()
+  for (truncationThreshold <- Seq(99999, MAX_CATALOG_TYPE_DDL_LENGTH))
+  test(s"Very long nested fields cannot be stored in the catalog - partitioned" +
+      " when longer than the truncation threshold " +
+      s" [DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD = $truncationThreshold]") {
+    withSQLConf(
+        DeltaSQLConf.DELTA_UPDATE_CATALOG_LONG_FIELD_TRUNCATION_THRESHOLD.key ->
+          truncationThreshold.toString) {
+      withTable(tbl) {
+        val schema = new StructType()
+          .add("i", StringType)
+          .add("part", StringType)
+          .add("struct", StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType))))
+        require(
+          schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
+          "The length of the schema should be over 4000 characters for this test")
+
+        sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta PARTITIONED BY (part)")
+        if (truncationThreshold == 99999) {
+          verifyTableMetadata(expectedSchema = schema)
+        } else {
+          verifySchemaInCatalog()
+        }
+      }
     }
   }
 
@@ -396,34 +457,21 @@ class DeltaUpdateCatalogSuite
     }
   }
 
-  test("Very long nested fields cannot be stored in the catalog - partitioned") {
-    withTable(tbl) {
-      val schema = new StructType()
-        .add("i", StringType)
-        .add("part", StringType)
-        .add("struct", StructType(Seq.tabulate(1000)(i => StructField(s"col$i", StringType))))
-      require(schema.toDDL.length >= MAX_CATALOG_TYPE_DDL_LENGTH,
-        "The length of the schema should be over 4000 characters for this test")
-
-      sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta PARTITIONED BY (part)")
-      verifySchemaInCatalog()
-    }
-  }
 
   // scalastyle:off nonascii
   test("Schema containing non-latin characters cannot be stored - top-level") {
     withTable(tbl) {
       val schema = new StructType().add("今天", "string")
       sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
-      verifySchemaInCatalog()
+      verifySchemaInCatalog(expectedErrorMessage = UpdateCatalog.NON_LATIN_CHARS_ERROR)
     }
   }
 
   test("Schema containing non-latin characters cannot be stored - struct") {
     withTable(tbl) {
       val schema = new StructType().add("struct", new StructType().add("今天", "string"))
       sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
-      verifySchemaInCatalog()
+      verifySchemaInCatalog(expectedErrorMessage = UpdateCatalog.NON_LATIN_CHARS_ERROR)
     }
   }
 
@@ -434,7 +482,7 @@ class DeltaUpdateCatalogSuite
         .add("array", ArrayType(new StructType().add("今天", "string")))
 
       sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
-      verifySchemaInCatalog()
+      verifySchemaInCatalog(expectedErrorMessage = UpdateCatalog.NON_LATIN_CHARS_ERROR)
     }
   }
 
@@ -445,7 +493,7 @@ class DeltaUpdateCatalogSuite
        .add("map", MapType(StringType, new StructType().add("今天", "string")))
 
      sql(s"CREATE TABLE $tbl (${schema.toDDL}) USING delta")
-     verifySchemaInCatalog()
+     verifySchemaInCatalog(expectedErrorMessage = UpdateCatalog.NON_LATIN_CHARS_ERROR)
    }
  }
  // scalastyle:on nonascii
@@ -456,7 +504,8 @@ class DeltaUpdateCatalogSuite
   */
  private def verifySchemaInCatalog(
      table: String = tbl,
-      catalogPartitionCols: Seq[String] = Nil): Unit = {
+      catalogPartitionCols: Seq[String] = Nil,
+      expectedErrorMessage: String = UpdateCatalog.LONG_SCHEMA_ERROR): Unit = {
    val cat = spark.sessionState.catalog.externalCatalog.getTable("default", table)
    assert(cat.schema.isEmpty, s"Schema wasn't empty")
    assert(cat.partitionColumnNames === catalogPartitionCols)
@@ -465,7 +514,7 @@ class DeltaUpdateCatalogSuite
      s"Properties didn't match for table: $table. Expected: ${getBaseProperties(snapshot)}, " +
      s"Got: ${cat.properties}")
    }
-    assert(cat.properties(UpdateCatalog.ERROR_KEY) === UpdateCatalog.LONG_SCHEMA_ERROR)
+    assert(cat.properties(UpdateCatalog.ERROR_KEY) === expectedErrorMessage)
 
    // Make sure table is readable
    checkAnswer(spark.table(table), Nil)
