diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1cc3b61b8345..45a294192ded 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3775,6 +3775,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
+    .internal()
+    .doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
+    .version("3.2.3")
+    .booleanConf
+    .createWithDefault(false)
+
   val PARQUET_INT96_REBASE_MODE_IN_WRITE =
     buildConf("spark.sql.parquet.int96RebaseModeInWrite")
       .internal()
@@ -4944,6 +4951,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
 
+  def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
+
   def parquetInferTimestampNTZEnabled: Boolean = getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)
 
   def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index b14f329b4133..330296b64c5f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -150,6 +150,7 @@ protected void initialize(String path, List<String> columns) throws IOException
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
     config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
 
     this.file = new Path(path);
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -201,6 +202,7 @@ protected void initialize(
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
     config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
     this.parquetColumn = new ParquetToSparkSchemaConverter(config)
       .convertParquetColumn(requestedSchema, Option.empty());
     this.sparkSchema = (StructType) parquetColumn.sparkType();
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index afa00aa6f373..5789f252c3be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -153,6 +153,10 @@ class ParquetFileFormat
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
       sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
@@ -357,7 +361,8 @@ object ParquetFileFormat extends Logging {
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
-      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled,
+      nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -454,13 +459,15 @@ object ParquetFileFormat extends Logging {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
     val inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled
+    val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
 
     val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
       val converter = new ParquetToSparkSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-        inferTimestampNTZ = inferTimestampNTZ)
+        inferTimestampNTZ = inferTimestampNTZ,
+        nanosAsLong = nanosAsLong)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index f6b02579d31f..9c9e7ce729c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -49,24 +49,28 @@ import org.apache.spark.sql.types._
  * @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
  *                      schema with Parquet schema.
  * @param inferTimestampNTZ Whether TimestampNTZType type is enabled.
+ * @param nanosAsLong Whether timestamps with nanos are converted to long.
  */
 class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
-    inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+    inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get,
+    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
-    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled)
+    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
+    nanosAsLong = conf.legacyParquetNanosAsLong)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
-    inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+    inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
+    nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
 
   /**
    * Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter.
@@ -271,6 +275,11 @@ class ParquetToSparkSchemaConverter(
           } else {
             TimestampNTZType
           }
+        // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
+        // timezone awareness to address behaviour regression introduced by SPARK-34661
+        case timestamp: TimestampLogicalTypeAnnotation
+          if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
+          LongType
         case _ => illegalType()
       }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index de4eda1acfd4..a6d13d072f48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -461,6 +461,10 @@ object ParquetUtils extends Logging {
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       sqlConf.parquetFieldIdWriteEnabled.toString)
 
+    conf.set(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sqlConf.legacyParquetNanosAsLong.toString)
+
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index 7495893a9110..feca878498dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -91,6 +91,9 @@ case class ParquetScan(
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
       sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
 
diff --git a/sql/core/src/test/resources/test-data/timestamp-nanos.parquet b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
new file mode 100644
index 000000000000..962aa909b823
Binary files /dev/null and b/sql/core/src/test/resources/test-data/timestamp-nanos.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 468e31d1879e..5589c61be7ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -29,6 +29,7 @@ import org.apache.parquet.schema.Type._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
 import org.apache.spark.sql.test.SharedSparkSession
@@ -45,7 +46,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     testSchema(
       testName,
       StructType.fromAttributes(ScalaReflection.attributesFor[T]),
@@ -53,7 +55,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString,
       int96AsTimestamp,
       writeLegacyParquetFormat,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def testParquetToCatalyst(
@@ -65,12 +68,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       caseSensitive: Boolean = false,
       inferTimestampNTZ: Boolean = true,
       sparkReadSchema: Option[StructType] = None,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       caseSensitive = caseSensitive,
-      inferTimestampNTZ = inferTimestampNTZ)
+      inferTimestampNTZ = inferTimestampNTZ,
+      nanosAsLong = nanosAsLong)
 
     test(s"sql <= parquet: $testName") {
       val actualParquetColumn = converter.convertParquetColumn(
@@ -119,7 +124,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
         SQLConf.ParquetOutputTimestampType.INT96,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
 
     testCatalystToParquet(
       testName,
@@ -134,7 +140,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       parquetSchema,
       binaryAsString,
       int96AsTimestamp,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
@@ -149,7 +156,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       val expectedDesc = expected.descriptor.get
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
-      assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+
+      actualDesc.getPrimitiveType.getLogicalTypeAnnotation match {
+        case timestamp: LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+          if timestamp.getUnit == LogicalTypeAnnotation.TimeUnit.NANOS =>
+          assert(actual.sparkType == expected.sparkType)
+        case _ =>
+          assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+      }
     }
 
     assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
@@ -197,6 +211,32 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
 }
 
 class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+  testSchemaInference[Tuple1[Long]](
+    "timestamp nanos",
+    """
+      |message root {
+      |  required int64 _1 (TIMESTAMP(NANOS,true));
+      |}
+    """.stripMargin,
+    binaryAsString = false,
+    int96AsTimestamp = true,
+    writeLegacyParquetFormat = true,
+    expectedParquetColumn = Some(
+      ParquetColumn(
+        sparkType = StructType.fromAttributes(
+          ScalaReflection.attributesFor[Tuple1[Long]]),
+        descriptor = None,
+        repetitionLevel = 0,
+        definitionLevel = 0,
+        required = false,
+        path = Seq(),
+        children = Seq(
+          primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
+            0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
+        ))),
+    nanosAsLong = true
+  )
+
   testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
     "basic types",
     """
@@ -1027,6 +1067,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
+    val tsAttribute = "birthday"
+    withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
+      val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+      val data = spark.read.parquet(testDataPath).select(tsAttribute)
+      assert(data.schema.fields.head.dataType == LongType)
+      assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+    }
+  }
+
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
+    val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+    val e = intercept[org.apache.spark.SparkException] {
+      spark.read.parquet(testDataPath).collect()
+    }
+    assert(e.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))."))
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================
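Reviewer note (not part of the patch): the sketch below shows how the new spark.sql.legacy.parquet.nanosAsLong flag introduced above is intended to be used from application code. The session setup, file path (/tmp/events-nanos.parquet) and column name (event_time) are hypothetical; the expected behaviour mirrors the two SPARK-40819 tests added to ParquetSchemaSuite.

// Hypothetical usage sketch for spark.sql.legacy.parquet.nanosAsLong (assumes a build
// containing this patch; the path and column name are made up for illustration).
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.LongType

object NanosAsLongExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("nanos-as-long-example")
      .master("local[*]")
      // Opt in to the legacy behaviour restored by this patch: INT64 TIMESTAMP(NANOS,true)
      // columns are read back as plain long values instead of failing schema conversion.
      .config("spark.sql.legacy.parquet.nanosAsLong", "true")
      .getOrCreate()

    val df = spark.read.parquet("/tmp/events-nanos.parquet")

    // With the flag enabled, the nanosecond-precision column surfaces as LongType
    // (the raw nanosecond count written by the producer), as the new tests assert.
    assert(df.schema("event_time").dataType == LongType)

    df.show(5, truncate = false)
    spark.stop()
  }
}

With the flag left at its default of false, the same read keeps failing with "Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))", which is the existing behaviour covered by the second new test.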