Commit 3789ac8

Deprecates SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC
1 parent 4586f21 commit 3789ac8

5 files changed, 232 insertions(+), 148 deletions(-)


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 5 additions & 6 deletions
@@ -290,9 +290,9 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "Enables Parquet filter push-down optimization when set to true.")
 
-  val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
-    key = "spark.sql.parquet.followParquetFormatSpec",
-    defaultValue = Some(false),
+  val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
+    key = "spark.sql.parquet.writeLegacyFormat",
+    defaultValue = Some(true),
     doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
       "Spark SQL schema and vice versa.",
     isPublic = false)
@@ -304,8 +304,7 @@ private[spark] object SQLConf {
     "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
     "of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
     "option must be set in Hadoop Configuration. 2. This option overrides " +
-    "\"spark.sql.sources.outputCommitterClass\"."
-  )
+    "\"spark.sql.sources.outputCommitterClass\".")
 
   val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
     defaultValue = Some(false),
@@ -497,7 +496,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
 
-  private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)
+  private[spark] def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
 
   private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
 
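Since the commit both renames the key and flips its polarity (the new flag defaults to true, keeping legacy output on by default), opting into the standard parquet-format layout now means turning the flag off. A minimal usage sketch, assuming a Spark 1.5-era SQLContext bound to `sqlContext`; setConf and getConf are the public string-keyed SQLContext API, and they still work here even though the flag is declared isPublic = false:

  // Hypothetical usage sketch: disable legacy mode so that newly written
  // Parquet files follow the standard layout from the parquet-format spec.
  // Under the old key, the same intent read as a double negative:
  // "spark.sql.parquet.followParquetFormatSpec = true".
  sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")

  val writeLegacy = sqlContext.getConf("spark.sql.parquet.writeLegacyFormat")
  println(writeLegacy)  // prints "false"
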
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala

Lines changed: 31 additions & 34 deletions
@@ -41,34 +41,31 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
  * @constructor
  * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
  *        [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL
- *        [[StructType]].
+ *        [[StructType]]. This argument only affects Parquet read path.
  * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
  *        [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL
  *        [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
  *        has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS`
- *        described in Parquet format spec.
- * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structure when
- *        converting Spark SQL [[StructType]] to Parquet [[MessageType]]. For Spark 1.4.x and
- *        prior versions, Spark SQL only supports decimals with a max precision of 18 digits, and
- *        uses non-standard LIST and MAP structure. Note that the current Parquet format spec is
- *        backwards-compatible with these settings. If this argument is set to `false`, we fallback
- *        to old style non-standard behaviors.
+ *        described in Parquet format spec. This argument only affects Parquet read path.
+ * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
+ *        and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
+ *        When set to false, use standard format defined in parquet-format spec. This argument only
+ *        affects Parquet write path.
  */
 private[parquet] class CatalystSchemaConverter(
-    assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
-) {
+    private val assumeBinaryIsString: Boolean,
+    private val assumeInt96IsTimestamp: Boolean,
+    private val writeLegacyParquetFormat: Boolean) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
-    followParquetFormatSpec = conf.followParquetFormatSpec)
+    writeLegacyParquetFormat = conf.writeLegacyParquetFormat)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
-    followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
+    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean)
 
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -371,15 +368,15 @@ private[parquet] class CatalystSchemaConverter(
       case BinaryType =>
         Types.primitive(BINARY, repetition).named(field.name)
 
-      // =====================================
-      // Decimals (for Spark version <= 1.4.x)
-      // =====================================
+      // ======================
+      // Decimals (legacy mode)
+      // ======================
 
       // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
       // always store decimals in fixed-length byte arrays. To keep compatibility with these older
       // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
       // by `DECIMAL`.
-      case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec =>
+      case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
         Types
           .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
           .as(DECIMAL)
@@ -388,13 +385,13 @@
           .length(CatalystSchemaConverter.minBytesForPrecision(precision))
           .named(field.name)
 
-      // =====================================
-      // Decimals (follow Parquet format spec)
-      // =====================================
+      // ========================
+      // Decimals (standard mode)
+      // ========================
 
       // Uses INT32 for 1 <= precision <= 9
       case DecimalType.Fixed(precision, scale)
-          if precision <= MAX_PRECISION_FOR_INT32 && followParquetFormatSpec =>
+          if precision <= MAX_PRECISION_FOR_INT32 && !writeLegacyParquetFormat =>
         Types
           .primitive(INT32, repetition)
           .as(DECIMAL)
@@ -404,7 +401,7 @@
 
       // Uses INT64 for 1 <= precision <= 18
       case DecimalType.Fixed(precision, scale)
-          if precision <= MAX_PRECISION_FOR_INT64 && followParquetFormatSpec =>
+          if precision <= MAX_PRECISION_FOR_INT64 && !writeLegacyParquetFormat =>
         Types
           .primitive(INT64, repetition)
           .as(DECIMAL)
@@ -413,7 +410,7 @@
           .named(field.name)
 
       // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
-      case DecimalType.Fixed(precision, scale) if followParquetFormatSpec =>
+      case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
         Types
           .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
           .as(DECIMAL)
@@ -422,15 +419,15 @@
           .length(CatalystSchemaConverter.minBytesForPrecision(precision))
           .named(field.name)
 
-      // ===================================================
-      // ArrayType and MapType (for Spark versions <= 1.4.x)
-      // ===================================================
+      // ===================================
+      // ArrayType and MapType (legacy mode)
+      // ===================================
 
       // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level
       // `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro
       // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element
       // field name "array" is borrowed from parquet-avro.
-      case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec =>
+      case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat =>
         // <list-repetition> group <name> (LIST) {
         //   optional group bag {
         //     repeated <element-type> array;
@@ -448,7 +445,7 @@
       // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
       // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is
       // covered by the backwards-compatibility rules implemented in `isElementType()`.
-      case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec =>
+      case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat =>
         // <list-repetition> group <name> (LIST) {
         //   repeated <element-type> element;
         // }
@@ -460,7 +457,7 @@
 
       // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
       // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`.
-      case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec =>
+      case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat =>
         // <map-repetition> group <name> (MAP) {
         //   repeated group map (MAP_KEY_VALUE) {
         //     required <key-type> key;
@@ -473,11 +470,11 @@
           convertField(StructField("key", keyType, nullable = false)),
           convertField(StructField("value", valueType, valueContainsNull)))
 
-      // ==================================================
-      // ArrayType and MapType (follow Parquet format spec)
-      // ==================================================
+      // =====================================
+      // ArrayType and MapType (standard mode)
+      // =====================================
 
-      case ArrayType(elementType, containsNull) if followParquetFormatSpec =>
+      case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
         // <list-repetition> group <name> (LIST) {
         //   repeated group list {
         //     <element-repetition> <element-type> element;
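
To make the flipped guards concrete, here is a self-contained sketch in plain Scala (not Spark code; the bounds are transcribed from the comments above, "Uses INT32 for 1 <= precision <= 9" and "Uses INT64 for 1 <= precision <= 18") of how the converter picks a physical type for decimals under the renamed flag:

  // Mirrors the DecimalType.Fixed cases above: `writeLegacyParquetFormat`
  // replaces `!followParquetFormatSpec`, so legacy mode is the positive guard.
  object ParquetDecimalSketch {
    val MaxPrecisionForInt32 = 9   // stands in for MAX_PRECISION_FOR_INT32
    val MaxPrecisionForInt64 = 18  // stands in for MAX_PRECISION_FOR_INT64

    def decimalPhysicalType(precision: Int, writeLegacyParquetFormat: Boolean): String =
      if (writeLegacyParquetFormat) {
        "FIXED_LEN_BYTE_ARRAY"  // legacy mode: fixed-length bytes for every precision
      } else if (precision <= MaxPrecisionForInt32) {
        "INT32"                 // standard mode, 1 <= precision <= 9
      } else if (precision <= MaxPrecisionForInt64) {
        "INT64"                 // standard mode, 1 <= precision <= 18
      } else {
        "FIXED_LEN_BYTE_ARRAY"  // standard mode falls back for larger precisions
      }

    def main(args: Array[String]): Unit = {
      println(decimalPhysicalType(10, writeLegacyParquetFormat = true))   // FIXED_LEN_BYTE_ARRAY
      println(decimalPhysicalType(10, writeLegacyParquetFormat = false))  // INT64
    }
  }

The same flag selects the LIST and MAP layouts: legacy mode keeps the 3-level bag/array structure written by Spark 1.4.x, while standard mode emits the spec's repeated group list { element } structure shown in the last hunk.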

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 7 additions & 7 deletions
@@ -283,7 +283,7 @@ private[sql] class ParquetRelation(
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+    val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
 
     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
@@ -301,7 +301,7 @@
       parquetFilterPushDown,
       assumeBinaryIsString,
       assumeInt96IsTimestamp,
-      followParquetFormatSpec) _
+      writeLegacyParquetFormat) _
 
     // Create the function to set input paths at the driver side.
     val setInputPaths =
@@ -526,7 +526,7 @@ private[sql] object ParquetRelation extends Logging {
       parquetFilterPushDown: Boolean,
       assumeBinaryIsString: Boolean,
       assumeInt96IsTimestamp: Boolean,
-      followParquetFormatSpec: Boolean)(job: Job): Unit = {
+      writeLegacyParquetFormat: Boolean)(job: Job): Unit = {
     val conf = job.getConfiguration
     conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
 
@@ -556,7 +556,7 @@
     // Sets flags for Parquet schema conversion
     conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
-    conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
+    conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat)
 
     overrideMinSplitSize(parquetBlockSize, conf)
   }
@@ -581,7 +581,7 @@
     val converter = new CatalystSchemaConverter(
       sqlContext.conf.isParquetBinaryAsString,
       sqlContext.conf.isParquetBinaryAsString,
-      sqlContext.conf.followParquetFormatSpec)
+      sqlContext.conf.writeLegacyParquetFormat)
 
     converter.convert(schema)
   }
@@ -715,7 +715,7 @@
       filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+    val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
     val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
 
     // !! HACK ALERT !!
@@ -755,7 +755,7 @@
       new CatalystSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-        followParquetFormatSpec = followParquetFormatSpec)
+        writeLegacyParquetFormat = writeLegacyParquetFormat)
 
     footers.map { footer =>
       ParquetRelation.readSchemaFromFooter(footer, converter)
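
The initializeDriverSideJobFunc changes above show how the renamed flag reaches executors: it is serialized into the Hadoop job configuration on the driver and read back by CatalystSchemaConverter's Configuration-based constructor. A minimal sketch of that round trip using only Hadoop's Configuration API (the key is the one introduced by this commit; the surrounding object is illustrative):

  import org.apache.hadoop.conf.Configuration

  object ConfRoundTripSketch {
    def main(args: Array[String]): Unit = {
      val conf = new Configuration()

      // Driver side, as in the hunk above:
      // conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat)
      conf.setBoolean("spark.sql.parquet.writeLegacyFormat", true)

      // Executor side: the converter does conf.get(key).toBoolean, which fails
      // if the key is absent; getBoolean with a default is the defensive variant.
      val writeLegacy = conf.getBoolean("spark.sql.parquet.writeLegacyFormat", false)
      println(s"writeLegacyFormat = $writeLegacy")
    }
  }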
