From 96b8f6a760ec243740ead7915a4452deb48a38e8 Mon Sep 17 00:00:00 2001
From: alfreddavidson
Date: Wed, 19 Oct 2022 10:59:01 +0100
Subject: [PATCH 01/12] SPARK-40819: address timestamp nanos

---
 .../parquet/ParquetSchemaConverter.scala      |  2 ++
 .../parquet/ParquetSchemaSuite.scala          | 34 ++++++++++++++++++-
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index f6b02579d31f..9407b9dd40e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -271,6 +271,8 @@ class ParquetToSparkSchemaConverter(
         } else {
           TimestampNTZType
         }
+      case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.NANOS =>
+        LongType
       case _ => illegalType()
     }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 468e31d1879e..d159a8c0cda5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -149,7 +149,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       val expectedDesc = expected.descriptor.get
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
-      assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+
+      actualDesc.getPrimitiveType.getLogicalTypeAnnotation match {
+        case timestamp: LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+          if timestamp.getUnit == LogicalTypeAnnotation.TimeUnit.NANOS =>
+          assert(actual.sparkType == expected.sparkType)
+        case _ =>
+          assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+      }
     }
 
     assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
@@ -197,6 +204,31 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
 }
 
 class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+  testSchemaInference[Tuple1[Long]](
+    "timestamp nanos",
+    """
+      |message root {
+      |  required int64 _1 (TIMESTAMP(NANOS,true));
+      |}
+    """.stripMargin,
+    binaryAsString = false,
+    int96AsTimestamp = true,
+    writeLegacyParquetFormat = true,
+    expectedParquetColumn = Some(
+      ParquetColumn(
+        sparkType = StructType.fromAttributes(
+          ScalaReflection.attributesFor[Tuple1[Long]]),
+        descriptor = None,
+        repetitionLevel = 0,
+        definitionLevel = 0,
+        required = false,
+        path = Seq(),
+        children = Seq(
+          primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
+            0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
+        )))
+  )
+
   testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
     "basic types",
     """
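To make the effect of PATCH 01 concrete, here is a minimal sketch (not part of the series) of what the converter now produces for a nanos-annotated column. It assumes the public `convert(MessageType)` entry point of `ParquetToSparkSchemaConverter` and parquet-mr's `MessageTypeParser`; the field name `ts` is illustrative:

    import org.apache.parquet.schema.MessageTypeParser
    import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter

    val parquetSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  required int64 ts (TIMESTAMP(NANOS,true));
        |}""".stripMargin)

    // Before the patch this schema fell through to illegalType(); with the patch
    // the nanos column surfaces as a plain long (nanoseconds since the epoch).
    val sparkSchema = new ParquetToSparkSchemaConverter().convert(parquetSchema)
    // sparkSchema: StructType = StructType(StructField(ts, LongType, false))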
From ccdb1cc180d5cb01035c604dee20a61ad36a0076 Mon Sep 17 00:00:00 2001
From: alfreddavidson
Date: Thu, 20 Oct 2022 08:41:56 +0100
Subject: [PATCH 02/12] add comment

---
 .../execution/datasources/parquet/ParquetSchemaConverter.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 9407b9dd40e8..4f16202aca84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -271,6 +271,8 @@ class ParquetToSparkSchemaConverter(
         } else {
           TimestampNTZType
         }
+      // SPARK-40819: NANOS are not support as a Timestamp, convert to LongType without
+      // timezone awareness to address behaviour regression introduced by SPARK-34661
       case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.NANOS =>
         LongType
       case _ => illegalType()

From d81bbc78b71848ae2c78493afc704dc8387fbe4c Mon Sep 17 00:00:00 2001
From: alfreddavidson
Date: Thu, 20 Oct 2022 09:49:10 +0000
Subject: [PATCH 03/12] add comment

---
 .../execution/datasources/parquet/ParquetSchemaConverter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 4f16202aca84..2cb6bafebbfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -271,7 +271,7 @@ class ParquetToSparkSchemaConverter(
         } else {
           TimestampNTZType
         }
-      // SPARK-40819: NANOS are not support as a Timestamp, convert to LongType without
+      // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
       // timezone awareness to address behaviour regression introduced by SPARK-34661
       case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.NANOS =>
         LongType
From efdf7915facb2299311a0f14fd92b37c4779d5b0 Mon Sep 17 00:00:00 2001
From: alfreddavidson
Date: Thu, 17 Nov 2022 15:30:14 +0000
Subject: [PATCH 04/12] Another timestamp nanos test case

---
 .../resources/test-data/timestamp-nanos.parquet    | Bin 0 -> 799 bytes
 .../datasources/parquet/ParquetSchemaSuite.scala   |   8 ++++++++
 2 files changed, 8 insertions(+)
 create mode 100644 sql/core/src/test/resources/test-data/timestamp-nanos.parquet

diff --git a/sql/core/src/test/resources/test-data/timestamp-nanos.parquet b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
new file mode 100644
index 0000000000000000000000000000000000000000..12912f203964f26a95eee86914ede8f64c3f8738
GIT binary patch
literal 799
zcmYjQJ&)5s5S_C(js=k*;&$;$E>aK^5G3bIVj^TAE;I-Q=|o8pW3hpIAp{Q0mU@r+}?7fP5*$0Jb
zmX2;y&fn1PdZFkZ8JCA?I!&%O8V{p%IP>elWZW1(^7dx_

From: alfreddavidson
Date: Thu, 17 Nov 2022 16:21:15 +0000
Subject: [PATCH 05/12] Use testFile

---
 .../execution/datasources/parquet/ParquetSchemaSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 07beefc60de6..d72c32cf6f57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1060,8 +1060,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
   }
 
   test("SPARK-40819 - ability to read parquet file with TIMESTAMP(NANOS, true)") {
-    val testDataPath = getClass.getResource("/test-data/timestamp-nanos.parquet")
-    val data = spark.read.parquet(testDataPath.toString).select("birthday")
+    val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+    val data = spark.read.parquet(testDataPath).select("birthday")
 
     assert(data.schema.fields.head.dataType == LongType)
     assert(data.take(1).head.getAs[Long](0) == 1668537129000000000L)
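PATCH 04 checks the fixture `timestamp-nanos.parquet` in as an opaque binary blob (truncated above), and PATCHES 05-06 only adjust how the test locates it. For reference, a file with the same shape could plausibly be generated with parquet-mr's example API; this is a sketch under that assumption, not how the committed fixture was actually produced, and the output path is illustrative:

    import org.apache.hadoop.fs.Path
    import org.apache.parquet.example.data.simple.SimpleGroupFactory
    import org.apache.parquet.hadoop.example.ExampleParquetWriter
    import org.apache.parquet.schema.MessageTypeParser

    val schema = MessageTypeParser.parseMessageType(
      "message spark_schema { required int64 birthday (TIMESTAMP(NANOS,true)); }")

    val writer = ExampleParquetWriter.builder(new Path("/tmp/timestamp-nanos.parquet"))
      .withType(schema)
      .build()
    // 1668537129000000000L is the whole-second instant the original test asserts on.
    writer.write(new SimpleGroupFactory(schema).newGroup().append("birthday", 1668537129000000000L))
    writer.close()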
From 896694eb9e905ac24558649a18010c58744236b9 Mon Sep 17 00:00:00 2001
From: alfreddavidson
Date: Thu, 17 Nov 2022 16:24:19 +0000
Subject: [PATCH 06/12] fix test name

---
 .../sql/execution/datasources/parquet/ParquetSchemaSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index d72c32cf6f57..6adf0e99dd9f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1059,7 +1059,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
-  test("SPARK-40819 - ability to read parquet file with TIMESTAMP(NANOS, true)") {
+  test("SPARK-40819: ability to read parquet file with TIMESTAMP(NANOS, true)") {
     val testDataPath = testFile("test-data/timestamp-nanos.parquet")
     val data = spark.read.parquet(testDataPath).select("birthday")

From d610c6cda9c252f0317ebff45c4cae685ec1234d Mon Sep 17 00:00:00 2001
From: alfreddavidson
Date: Thu, 17 Nov 2022 16:48:23 +0000
Subject: [PATCH 07/12] update test data to end with non-zero

---
 .../resources/test-data/timestamp-nanos.parquet    | Bin 799 -> 784 bytes
 .../parquet/ParquetSchemaSuite.scala               |   6 ++++--
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/resources/test-data/timestamp-nanos.parquet b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
index 12912f203964f26a95eee86914ede8f64c3f8738..962aa909b8231aa244620f581198fdde14e29017 100644
GIT binary patch
delta 255
zcmbQwHi2zIjtPUP4G>Cs7?|0aqGoJadsSVW8A>k;|GFE*
pSaY4A0{U$!2uGU_(hTB8lz~|WJ$&q!k?H$MZ^v;iP|x!rOAl$NSZRRO}@ftSl`A1
6#vI0YQ~^;jZsFF1th@%G$1Lns3apLu~L*lj7=;{EDI=W12Gn4n*;~UE-?lTuw|&C
43aWHXETTb-7GeXMN~{|3p3DWu^mieY-$`pN`wKZLW0dVadID1C&vdy28IB~AVUBx
Cvqp0O

delta 260
zcmbQhHlJ-mjwOqz01&?1^7kl@jVO~S3j+f~)Ql}_ud0iSGJvJN?z#7XOGcDUlm&=6
7#Ns0uJJn=Hb(Lii7}sEF7pAdf*UO-7VQ(v*#1
@_9zX`UxyR@eXEDGX}LDCK*u{NmCX!4xj-^nMEZTDT$S$3}S3zC1NE&S+#47G7ytN
?m@7`7&O54p^7p{>HyuyBFe-7bUBD(5L?0`Dkip%8R!PFLrh|9Y8+yRCYLfz=J>_P
Kz!2aVWC#E_c|SM+

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 6adf0e99dd9f..380aee521e23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.schema.Type._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
 import org.apache.spark.sql.test.SharedSparkSession
@@ -1061,10 +1062,11 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
 
   test("SPARK-40819: ability to read parquet file with TIMESTAMP(NANOS, true)") {
     val testDataPath = testFile("test-data/timestamp-nanos.parquet")
-    val data = spark.read.parquet(testDataPath).select("birthday")
+    val tsAttribute = "birthday"
+    val data = spark.read.parquet(testDataPath).select(tsAttribute)
 
     assert(data.schema.fields.head.dataType == LongType)
-    assert(data.take(1).head.getAs[Long](0) == 1668537129000000000L)
+    assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
   }
 
   // =======================================================
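The new expected value in PATCH 07 is easiest to sanity-check by splitting the long into whole seconds and a nanosecond remainder. The decomposition below is plain arithmetic plus `java.time`, not code from the patch; it also shows why `orderBy(desc(tsAttribute))` is sound here: the raw longs are nanoseconds since the epoch, so ordering by the long is chronological ordering.

    import java.time.Instant

    val nanos = 1668537129123534758L
    // 1668537129 whole seconds plus a 123534758 ns tail, i.e. the
    // "end with non-zero" property this commit's regenerated fixture exercises.
    val instant = Instant.ofEpochSecond(nanos / 1000000000L, nanos % 1000000000L)
    // instant == 2022-11-15T18:32:09.123534758Z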
From dd476dc4b3631121b3af7dc27cc3cbb8cb838716 Mon Sep 17 00:00:00 2001
From: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Date: Fri, 25 Nov 2022 04:16:42 -0800
Subject: [PATCH 08/12] [SPARK-40819][SQL] add spark.sql.legacy.parquet.nanosAsLong configuration (#2)

* add spark.sql.legacy.parquet.nanosAsLong

* small fix

* add extra test: nanosAsLong=false
---
 .../apache/spark/sql/internal/SQLConf.scala        |  9 +++++
 .../SpecificParquetRecordReaderBase.java           |  2 +
 .../parquet/ParquetFileFormat.scala                |  7 ++++
 .../parquet/ParquetSchemaConverter.scala           |  7 +++-
 .../datasources/parquet/ParquetUtils.scala         |  4 ++
 .../datasources/v2/parquet/ParquetScan.scala       |  3 ++
 .../parquet/ParquetSchemaSuite.scala               | 38 +++++++++++++------
 7 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1cc3b61b8345..0052911d2039 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3775,6 +3775,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
+    .internal()
+    .doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
+    .version("3.2.3")
+    .booleanConf
+    .createWithDefault(false)
+
   val PARQUET_INT96_REBASE_MODE_IN_WRITE =
     buildConf("spark.sql.parquet.int96RebaseModeInWrite")
       .internal()
@@ -4946,6 +4953,8 @@ class SQLConf extends Serializable with Logging {
 
   def parquetInferTimestampNTZEnabled: Boolean = getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)
 
+  def parquetTimestampNTZEnabled: Boolean = getConf(PARQUET_TIMESTAMP_NTZ_ENABLED)
+
   def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
 
   def histogramNumericPropagateInputType: Boolean =

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index b14f329b4133..330296b64c5f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -150,6 +150,7 @@ protected void initialize(String path, List<String> columns) throws IOException
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
     config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
     this.file = new Path(path);
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -201,6 +202,7 @@ protected void initialize(
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
     config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
     this.parquetColumn = new ParquetToSparkSchemaConverter(config)
       .convertParquetColumn(requestedSchema, Option.empty());
     this.sparkSchema = (StructType) parquetColumn.sparkType();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index afa00aa6f373..49fc1fc4705f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -153,6 +153,10 @@ class ParquetFileFormat
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
       sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -358,6 +362,7 @@ object ParquetFileFormat extends Logging {
       sparkSession.sessionState.conf.isParquetBinaryAsString,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
       inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+      nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -454,6 +459,7 @@ object ParquetFileFormat extends Logging {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
     val inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled
+    val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
 
     val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
@@ -461,6 +467,7 @@ object ParquetFileFormat extends Logging {
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
         inferTimestampNTZ = inferTimestampNTZ)
+        nanosAsLong = nanosAsLong)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 2cb6bafebbfd..ed0de1dcfae0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -49,24 +49,28 @@ import org.apache.spark.sql.types._
  * @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
  *        schema with Parquet schema.
  * @param inferTimestampNTZ Whether TimestampNTZType type is enabled.
+ * @param nanosAsLong Whether timestamps with nanos are converted to long.
  */
 class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
     inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
     inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled)
+    nanosAsLong = conf.legacyParquetNanosAsLong)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
     inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+    nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
 
 /**
  * Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter.
@@ -273,7 +277,8 @@ class ParquetToSparkSchemaConverter(
       // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
       // timezone awareness to address behaviour regression introduced by SPARK-34661
-      case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.NANOS =>
+      case timestamp: TimestampLogicalTypeAnnotation
+        if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
         LongType
       case _ => illegalType()
     }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index de4eda1acfd4..a6d13d072f48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -461,6 +461,10 @@ object ParquetUtils extends Logging {
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       sqlConf.parquetFieldIdWriteEnabled.toString)
 
+    conf.set(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sqlConf.legacyParquetNanosAsLong.toString)
+
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 7495893a9110..feca878498dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -91,6 +91,9 @@ case class ParquetScan(
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
       sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 380aee521e23..a3092a4c0b20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -46,7 +46,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     testSchema(
       testName,
       StructType.fromAttributes(ScalaReflection.attributesFor[T]),
@@ -54,7 +55,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString,
       int96AsTimestamp,
       writeLegacyParquetFormat,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def testParquetToCatalyst(
@@ -66,12 +68,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       caseSensitive: Boolean = false,
       inferTimestampNTZ: Boolean = true,
       sparkReadSchema: Option[StructType] = None,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       caseSensitive = caseSensitive,
       inferTimestampNTZ = inferTimestampNTZ)
+      nanosAsLong = nanosAsLong)
 
     test(s"sql <= parquet: $testName") {
       val actualParquetColumn = converter.convertParquetColumn(
@@ -120,7 +124,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
         SQLConf.ParquetOutputTimestampType.INT96,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
 
     testCatalystToParquet(
       testName,
@@ -135,7 +140,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       parquetSchema,
       binaryAsString,
       int96AsTimestamp,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
@@ -227,7 +233,8 @@ class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
         children = Seq(
           primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
             0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
-        )))
+        ))),
+    nanosAsLong = true
   )
 
   testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
@@ -1060,13 +1067,22 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
-  test("SPARK-40819: ability to read parquet file with TIMESTAMP(NANOS, true)") {
-    val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
     val tsAttribute = "birthday"
-    val data = spark.read.parquet(testDataPath).select(tsAttribute)
+    withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
+      val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+      val data = spark.read.parquet(testDataPath).select(tsAttribute)
+      assert(data.schema.fields.head.dataType == LongType)
+      assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+    }
+  }
 
-    assert(data.schema.fields.head.dataType == LongType)
-    assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
+    val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+    val e = intercept[org.apache.spark.SparkException] {
+      spark.read.parquet(testDataPath).collect()
+    }
+    assert(e.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))."))
   }
 
   // =======================================================
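With PATCH 08 the long mapping is gated behind an opt-in legacy flag rather than being unconditional. A sketch of the two behaviours from user code follows (`spark` is an assumed active SparkSession and the path is illustrative):

    // Default (nanosAsLong=false): reading the file fails with
    // "Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))."
    // Opting in restores the earlier (pre-SPARK-34661) long mapping:
    spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")
    val df = spark.read.parquet("/path/to/timestamp-nanos.parquet")
    df.printSchema()  // birthday: long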
From 14d13e6b2f475bc5406008c05be559c32942df1f Mon Sep 17 00:00:00 2001
From: alfreddavidson
Date: Fri, 3 Feb 2023 10:44:14 +0000
Subject: [PATCH 09/12] Remove conf element

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0052911d2039..eb0d0659f5dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4953,8 +4953,6 @@ class SQLConf extends Serializable with Logging {
 
   def parquetInferTimestampNTZEnabled: Boolean = getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)
 
-  def parquetTimestampNTZEnabled: Boolean = getConf(PARQUET_TIMESTAMP_NTZ_ENABLED)
-
   def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
 
   def histogramNumericPropagateInputType: Boolean =
From 2713d75c78008fce2ec1b30e2d0c17f87547697a Mon Sep 17 00:00:00 2001
From: alfreddavidson
Date: Fri, 3 Feb 2023 12:21:01 +0000
Subject: [PATCH 10/12] Formatting

---
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala  | 2 +-
 .../datasources/parquet/ParquetSchemaConverter.scala           | 2 +-
 .../execution/datasources/parquet/ParquetSchemaSuite.scala     | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 49fc1fc4705f..a044c165b507 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -466,7 +466,7 @@ object ParquetFileFormat extends Logging {
       val converter = new ParquetToSparkSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-        inferTimestampNTZ = inferTimestampNTZ)
+        inferTimestampNTZ = inferTimestampNTZ, nanosAsLong = nanosAsLong)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index ed0de1dcfae0..67be4870f101 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -55,7 +55,7 @@ class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
-    inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+    inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get,
     nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
 
   def this(conf: SQLConf) = this(

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index a3092a4c0b20..de4d7e751d54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -74,7 +74,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       caseSensitive = caseSensitive,
-      inferTimestampNTZ = inferTimestampNTZ)
+      inferTimestampNTZ = inferTimestampNTZ, nanosAsLong = nanosAsLong)
 
     test(s"sql <= parquet: $testName") {
@@ -1036,7 +1036,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       val errMsg = e.getCause.getMessage
       assert(errMsg.startsWith("Parquet column cannot be converted in file"))
       val file = errMsg.substring("Parquet column cannot be converted in file ".length,
-        errMsg.indexOf(". "))
+        errMsg.indexOf(". "))inferTimestampNTZ
       val col = spark.read.parquet(file).schema.fields.filter(_.name == "a")
       assert(col.length == 1)
       if (col(0).dataType == StringType) {

From 9f0cc5d6e63048fa0746d032cf2b18c31be4a5d9 Mon Sep 17 00:00:00 2001
From: alfreddavidson
Date: Fri, 3 Feb 2023 12:36:11 +0000
Subject: [PATCH 11/12] formatting

---
 .../sql/execution/datasources/parquet/ParquetFileFormat.scala  | 2 +-
 .../datasources/parquet/ParquetSchemaConverter.scala           | 4 ++--
 .../execution/datasources/parquet/ParquetSchemaSuite.scala     | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index a044c165b507..5789f252c3be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -361,7 +361,7 @@ object ParquetFileFormat extends Logging {
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
-      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled,
       nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val seen = mutable.HashSet[String]()

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 67be4870f101..9c9e7ce729c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -62,14 +62,14 @@ class ParquetToSparkSchemaConverter(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
-    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled)
+    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
     nanosAsLong = conf.legacyParquetNanosAsLong)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
-    inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+    inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
     nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index de4d7e751d54..5589c61be7ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -1036,7 +1036,7 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       val errMsg = e.getCause.getMessage
      assert(errMsg.startsWith("Parquet column cannot be converted in file"))
       val file = errMsg.substring("Parquet column cannot be converted in file ".length,
-        errMsg.indexOf(". "))inferTimestampNTZ
+        errMsg.indexOf(". "))
       val col = spark.read.parquet(file).schema.fields.filter(_.name == "a")
       assert(col.length == 1)
       if (col(0).dataType == StringType) {

From 83391ab81af8490628d8af74571e4ab8853cc51d Mon Sep 17 00:00:00 2001
From: awdavidson <54780428+awdavidson@users.noreply.github.com>
Date: Fri, 3 Feb 2023 12:54:02 +0000
Subject: [PATCH 12/12] Add legacyParquetNanosAsLong

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index eb0d0659f5dc..45a294192ded 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4951,6 +4951,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
 
+  def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
+
   def parquetInferTimestampNTZEnabled: Boolean = getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)
 
   def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)