
Commit 01cd688

liancheng authored and yhuai committed
[SPARK-10400] [SQL] Renames SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC
We introduced the SQL option `spark.sql.parquet.followParquetFormatSpec` while implementing the Parquet backwards-compatibility rules in SPARK-6777. It indicates whether to write Parquet files using the legacy format adopted by Spark 1.4 and prior versions, or the standard format defined in the parquet-format spec. The option defaults to `false` and is marked as non-public (`isPublic = false`) because we haven't finished refactoring the Parquet write path.

The problem is that the option name is somewhat confusing: it's not intuitive why we wouldn't follow the spec. It would be better to rename it to `spark.sql.parquet.writeLegacyFormat` and invert its default value, since the two option names have opposite meanings. Although this option is private in 1.5, we'll make it public in 1.6 after refactoring the Parquet write path, so that users can decide whether to write Parquet files in the standard format or the legacy format.

Author: Cheng Lian <lian@databricks.com>

Closes #8566 from liancheng/spark-10400/deprecate-follow-parquet-format-spec.
1 parent 02026a8 commit 01cd688
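For context, here is a minimal sketch (not part of this patch) of how the renamed option could be toggled once it becomes public, using the Spark 1.5-era `SQLContext` API. Only the key `spark.sql.parquet.writeLegacyFormat` is taken from the diff below; the app name and output path are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Minimal sketch, assuming a local Spark 1.5-era setup; the option is still
// non-public in this commit, so treat this as illustrative only.
val sc = new SparkContext(new SparkConf().setAppName("parquet-format-demo").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// New default: true, i.e. keep writing the legacy Spark 1.4 Parquet layout.
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "true")

// Opt in to the standard layout defined by the parquet-format spec.
sqlContext.setConf("spark.sql.parquet.writeLegacyFormat", "false")

// Any subsequent Parquet write picks up the flag.
sqlContext.range(10).write.parquet("/tmp/parquet-format-demo")
```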

File tree: 6 files changed (+231, -148 lines)

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 5 additions & 6 deletions
@@ -290,9 +290,9 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "Enables Parquet filter push-down optimization when set to true.")

-  val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
-    key = "spark.sql.parquet.followParquetFormatSpec",
-    defaultValue = Some(false),
+  val PARQUET_WRITE_LEGACY_FORMAT = booleanConf(
+    key = "spark.sql.parquet.writeLegacyFormat",
+    defaultValue = Some(true),
     doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
       "Spark SQL schema and vice versa.",
     isPublic = false)
@@ -304,8 +304,7 @@ private[spark] object SQLConf {
       "subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass " +
       "of org.apache.parquet.hadoop.ParquetOutputCommitter. NOTE: 1. Instead of SQLConf, this " +
       "option must be set in Hadoop Configuration. 2. This option overrides " +
-      "\"spark.sql.sources.outputCommitterClass\"."
-  )
+      "\"spark.sql.sources.outputCommitterClass\".")

   val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
     defaultValue = Some(false),
@@ -491,7 +490,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

   private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)

-  private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)
+  private[spark] def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)

   private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

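As a quick illustration of the inverted default (not part of the patch): the two option names have opposite meanings, so the old default `followParquetFormatSpec = false` and the new default `writeLegacyFormat = true` select the same legacy-write behaviour.

```scala
// Sanity sketch only: mapping the old flag onto the new one.
def writeLegacyFromOldFlag(followParquetFormatSpec: Boolean): Boolean =
  !followParquetFormatSpec

assert(writeLegacyFromOldFlag(false))   // old default (false) -> new default (true): legacy format
assert(!writeLegacyFromOldFlag(true))   // old "follow spec"   -> writeLegacyFormat = false
```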

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala

Lines changed: 1 addition & 1 deletion
@@ -263,7 +263,7 @@ private[parquet] object CatalystReadSupport {
   private def clipParquetGroupFields(
       parquetRecord: GroupType, structType: StructType): Seq[Type] = {
     val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
-    val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+    val toParquet = new CatalystSchemaConverter(writeLegacyParquetFormat = false)
     structType.map { f =>
       parquetFieldMap
         .get(f.name)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala

Lines changed: 29 additions & 32 deletions
@@ -41,34 +41,31 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
  * @constructor
  * @param assumeBinaryIsString Whether unannotated BINARY fields should be assumed to be Spark SQL
  *        [[StringType]] fields when converting Parquet a [[MessageType]] to Spark SQL
- *        [[StructType]].
+ *        [[StructType]]. This argument only affects Parquet read path.
  * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
  *        [[TimestampType]] fields when converting Parquet a [[MessageType]] to Spark SQL
  *        [[StructType]]. Note that Spark SQL [[TimestampType]] is similar to Hive timestamp, which
  *        has optional nanosecond precision, but different from `TIME_MILLS` and `TIMESTAMP_MILLIS`
- *        described in Parquet format spec.
- * @param followParquetFormatSpec Whether to generate standard DECIMAL, LIST, and MAP structure when
- *        converting Spark SQL [[StructType]] to Parquet [[MessageType]]. For Spark 1.4.x and
- *        prior versions, Spark SQL only supports decimals with a max precision of 18 digits, and
- *        uses non-standard LIST and MAP structure. Note that the current Parquet format spec is
- *        backwards-compatible with these settings. If this argument is set to `false`, we fallback
- *        to old style non-standard behaviors.
+ *        described in Parquet format spec. This argument only affects Parquet read path.
+ * @param writeLegacyParquetFormat Whether to use legacy Parquet format compatible with Spark 1.4
+ *        and prior versions when converting a Catalyst [[StructType]] to a Parquet [[MessageType]].
+ *        When set to false, use standard format defined in parquet-format spec. This argument only
+ *        affects Parquet write path.
  */
 private[parquet] class CatalystSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    followParquetFormatSpec: Boolean = SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
-) {
+    writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get) {

   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
-    followParquetFormatSpec = conf.followParquetFormatSpec)
+    writeLegacyParquetFormat = conf.writeLegacyParquetFormat)

   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
-    followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
+    writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean)

   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].
@@ -371,15 +368,15 @@ private[parquet] class CatalystSchemaConverter(
       case BinaryType =>
         Types.primitive(BINARY, repetition).named(field.name)

-      // =====================================
-      // Decimals (for Spark version <= 1.4.x)
-      // =====================================
+      // ======================
+      // Decimals (legacy mode)
+      // ======================

       // Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
       // always store decimals in fixed-length byte arrays. To keep compatibility with these older
       // versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
       // by `DECIMAL`.
-      case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec =>
+      case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat =>
         Types
           .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
           .as(DECIMAL)
@@ -388,13 +385,13 @@ private[parquet] class CatalystSchemaConverter(
           .length(CatalystSchemaConverter.minBytesForPrecision(precision))
           .named(field.name)

-      // =====================================
-      // Decimals (follow Parquet format spec)
-      // =====================================
+      // ========================
+      // Decimals (standard mode)
+      // ========================

       // Uses INT32 for 1 <= precision <= 9
       case DecimalType.Fixed(precision, scale)
-        if precision <= MAX_PRECISION_FOR_INT32 && followParquetFormatSpec =>
+        if precision <= MAX_PRECISION_FOR_INT32 && !writeLegacyParquetFormat =>
         Types
           .primitive(INT32, repetition)
           .as(DECIMAL)
@@ -404,7 +401,7 @@ private[parquet] class CatalystSchemaConverter(

       // Uses INT64 for 1 <= precision <= 18
       case DecimalType.Fixed(precision, scale)
-        if precision <= MAX_PRECISION_FOR_INT64 && followParquetFormatSpec =>
+        if precision <= MAX_PRECISION_FOR_INT64 && !writeLegacyParquetFormat =>
         Types
           .primitive(INT64, repetition)
           .as(DECIMAL)
@@ -413,7 +410,7 @@ private[parquet] class CatalystSchemaConverter(
           .named(field.name)

       // Uses FIXED_LEN_BYTE_ARRAY for all other precisions
-      case DecimalType.Fixed(precision, scale) if followParquetFormatSpec =>
+      case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat =>
         Types
           .primitive(FIXED_LEN_BYTE_ARRAY, repetition)
           .as(DECIMAL)
@@ -422,15 +419,15 @@ private[parquet] class CatalystSchemaConverter(
           .length(CatalystSchemaConverter.minBytesForPrecision(precision))
           .named(field.name)

-      // ===================================================
-      // ArrayType and MapType (for Spark versions <= 1.4.x)
-      // ===================================================
+      // ===================================
+      // ArrayType and MapType (legacy mode)
+      // ===================================

       // Spark 1.4.x and prior versions convert `ArrayType` with nullable elements into a 3-level
       // `LIST` structure. This behavior is somewhat a hybrid of parquet-hive and parquet-avro
       // (1.6.0rc3): the 3-level structure is similar to parquet-hive while the 3rd level element
       // field name "array" is borrowed from parquet-avro.
-      case ArrayType(elementType, nullable @ true) if !followParquetFormatSpec =>
+      case ArrayType(elementType, nullable @ true) if writeLegacyParquetFormat =>
         // <list-repetition> group <name> (LIST) {
         //   optional group bag {
         //     repeated <element-type> array;
@@ -448,7 +445,7 @@ private[parquet] class CatalystSchemaConverter(
       // Spark 1.4.x and prior versions convert ArrayType with non-nullable elements into a 2-level
       // LIST structure. This behavior mimics parquet-avro (1.6.0rc3). Note that this case is
       // covered by the backwards-compatibility rules implemented in `isElementType()`.
-      case ArrayType(elementType, nullable @ false) if !followParquetFormatSpec =>
+      case ArrayType(elementType, nullable @ false) if writeLegacyParquetFormat =>
         // <list-repetition> group <name> (LIST) {
         //   repeated <element-type> element;
         // }
@@ -460,7 +457,7 @@ private[parquet] class CatalystSchemaConverter(

       // Spark 1.4.x and prior versions convert MapType into a 3-level group annotated by
       // MAP_KEY_VALUE. This is covered by `convertGroupField(field: GroupType): DataType`.
-      case MapType(keyType, valueType, valueContainsNull) if !followParquetFormatSpec =>
+      case MapType(keyType, valueType, valueContainsNull) if writeLegacyParquetFormat =>
         // <map-repetition> group <name> (MAP) {
         //   repeated group map (MAP_KEY_VALUE) {
         //     required <key-type> key;
@@ -473,11 +470,11 @@ private[parquet] class CatalystSchemaConverter(
           convertField(StructField("key", keyType, nullable = false)),
           convertField(StructField("value", valueType, valueContainsNull)))

-      // ==================================================
-      // ArrayType and MapType (follow Parquet format spec)
-      // ==================================================
+      // =====================================
+      // ArrayType and MapType (standard mode)
+      // =====================================

-      case ArrayType(elementType, containsNull) if followParquetFormatSpec =>
+      case ArrayType(elementType, containsNull) if !writeLegacyParquetFormat =>
         // <list-repetition> group <name> (LIST) {
         //   repeated group list {
         //     <element-repetition> <element-type> element;
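To make the decimal branches above easier to follow, here is a rough sketch of the physical-type choice after the rename. The 9- and 18-digit thresholds mirror `MAX_PRECISION_FOR_INT32` and `MAX_PRECISION_FOR_INT64` referenced in the converter, but the function itself is illustrative, not the actual CatalystSchemaConverter code.

```scala
// Illustrative sketch of how writeLegacyParquetFormat steers decimal encoding.
def decimalPhysicalType(precision: Int, writeLegacyParquetFormat: Boolean): String =
  if (writeLegacyParquetFormat) {
    // Legacy mode: always FIXED_LEN_BYTE_ARRAY, as Spark 1.4.x and earlier did.
    "FIXED_LEN_BYTE_ARRAY"
  } else if (precision <= 9) {
    "INT32"                 // standard mode, precision 1..9
  } else if (precision <= 18) {
    "INT64"                 // standard mode, precision 10..18
  } else {
    "FIXED_LEN_BYTE_ARRAY"  // standard mode, everything larger
  }

// Example: a DECIMAL(10, 2) column.
println(decimalPhysicalType(10, writeLegacyParquetFormat = true))   // FIXED_LEN_BYTE_ARRAY
println(decimalPhysicalType(10, writeLegacyParquetFormat = false))  // INT64
```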

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 7 additions & 7 deletions
@@ -287,7 +287,7 @@ private[sql] class ParquetRelation(
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+    val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat

     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
@@ -305,7 +305,7 @@ private[sql] class ParquetRelation(
       parquetFilterPushDown,
       assumeBinaryIsString,
       assumeInt96IsTimestamp,
-      followParquetFormatSpec) _
+      writeLegacyParquetFormat) _

     // Create the function to set input paths at the driver side.
     val setInputPaths =
@@ -531,7 +531,7 @@ private[sql] object ParquetRelation extends Logging {
       parquetFilterPushDown: Boolean,
       assumeBinaryIsString: Boolean,
       assumeInt96IsTimestamp: Boolean,
-      followParquetFormatSpec: Boolean)(job: Job): Unit = {
+      writeLegacyParquetFormat: Boolean)(job: Job): Unit = {
     val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
     conf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)

@@ -561,7 +561,7 @@ private[sql] object ParquetRelation extends Logging {
     // Sets flags for Parquet schema conversion
     conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, assumeBinaryIsString)
     conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, assumeInt96IsTimestamp)
-    conf.setBoolean(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, followParquetFormatSpec)
+    conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, writeLegacyParquetFormat)

     overrideMinSplitSize(parquetBlockSize, conf)
   }
@@ -586,7 +586,7 @@ private[sql] object ParquetRelation extends Logging {
     val converter = new CatalystSchemaConverter(
       sqlContext.conf.isParquetBinaryAsString,
       sqlContext.conf.isParquetBinaryAsString,
-      sqlContext.conf.followParquetFormatSpec)
+      sqlContext.conf.writeLegacyParquetFormat)

     converter.convert(schema)
   }
@@ -720,7 +720,7 @@ private[sql] object ParquetRelation extends Logging {
       filesToTouch: Seq[FileStatus], sqlContext: SQLContext): Option[StructType] = {
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
-    val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
+    val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
     val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)

     // !! HACK ALERT !!
@@ -760,7 +760,7 @@ private[sql] object ParquetRelation extends Logging {
       new CatalystSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-        followParquetFormatSpec = followParquetFormatSpec)
+        writeLegacyParquetFormat = writeLegacyParquetFormat)

     footers.map { footer =>
       ParquetRelation.readSchemaFromFooter(footer, converter)
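As a compressed sketch of the propagation path above (only `spark.sql.parquet.writeLegacyFormat` is taken from this diff; the other two key names and the surrounding job-setup code are assumptions and heavily simplified), ParquetRelation copies the session-level flags into the Hadoop Configuration so executors see the same settings:

```scala
import org.apache.hadoop.conf.Configuration

// Simplified sketch: push session-level Parquet flags into the Hadoop
// Configuration handed to Parquet read/write tasks.
def propagateParquetFlags(
    conf: Configuration,
    assumeBinaryIsString: Boolean,
    assumeInt96IsTimestamp: Boolean,
    writeLegacyParquetFormat: Boolean): Unit = {
  conf.setBoolean("spark.sql.parquet.binaryAsString", assumeBinaryIsString)
  conf.setBoolean("spark.sql.parquet.int96AsTimestamp", assumeInt96IsTimestamp)
  conf.setBoolean("spark.sql.parquet.writeLegacyFormat", writeLegacyParquetFormat)
}
```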
