
Commit 55bfff4

awdavidson authored and parthchandra committed
[SPARK-40819][SQL][3.2] Timestamp nanos behaviour regression
As per HyukjinKwon's request on apache#38312, this backports the fix to 3.2.

### What changes were proposed in this pull request?

Handle `TimeUnit.NANOS` for parquet `Timestamps`, addressing a regression in behaviour since 3.2.

### Why are the changes needed?

Since version 3.2, reading parquet files that contain attributes with type `TIMESTAMP(NANOS,true)` is not possible, as `ParquetSchemaConverter` returns:

```
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
```

https://issues.apache.org/jira/browse/SPARK-34661 introduced a change that matches on the `LogicalTypeAnnotation`, which only covers the Timestamp cases `TimeUnit.MILLIS` and `TimeUnit.MICROS`, so `TimeUnit.NANOS` falls through to `illegalType()`. Prior to 3.2, the matching used the `originalType`, which for `TIMESTAMP(NANOS,true)` returns `null` and therefore resulted in a `LongType`. The proposed change is to also consider `TimeUnit.NANOS` and return `LongType`, making the behaviour the same as before.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test covering this scenario. Internally deployed to read parquet files that contain `TIMESTAMP(NANOS,true)`.

Closes apache#39905 from awdavidson/ts-nanos-fix-3.2.

Authored-by: alfreddavidson <alfie.davidson9@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
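To make the regression concrete, here is a minimal repro sketch (not part of the patch; the file path, object name, and session setup are illustrative assumptions). On a 3.2.x build without this fix, schema conversion fails for any file carrying a nanos-annotated INT64 column:

```scala
import org.apache.spark.sql.SparkSession

object TimestampNanosRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-40819 repro")
      .getOrCreate()

    // Hypothetical path to a file written by a non-Spark writer (e.g. parquet-mr
    // or Arrow) whose schema contains: required int64 ts (TIMESTAMP(NANOS,true))
    val path = "/tmp/timestamp-nanos.parquet"

    // Without this patch, Spark 3.2.x throws:
    //   org.apache.spark.sql.AnalysisException:
    //     Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
    spark.read.parquet(path).printSchema()
  }
}
```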
1 parent ed12efa commit 55bfff4

7 files changed: +82 -11 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions

```diff
@@ -3228,6 +3228,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
+    .internal()
+    .doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
+    .version("3.2.3")
+    .booleanConf
+    .createWithDefault(false)
+
   val PARQUET_INT96_REBASE_MODE_IN_WRITE =
     buildConf("spark.sql.parquet.int96RebaseModeInWrite")
       .internal()
@@ -4241,6 +4248,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxConcurrentOutputFileWriters: Int = getConf(SQLConf.MAX_CONCURRENT_OUTPUT_FILE_WRITERS)
 
+  def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
```
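With the conf defined above, opting in to the legacy behaviour is a one-line change. A minimal sketch of the two equivalent routes (the session setup and file path are illustrative, not from the patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Programmatic route: set the session conf before reading.
spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")

// Equivalent SQL route:
spark.sql("SET spark.sql.legacy.parquet.nanosAsLong=true")

// With the flag on, TIMESTAMP(NANOS,true) columns are read as raw longs
// (nanoseconds since the epoch) instead of failing schema conversion.
val df = spark.read.parquet("/tmp/timestamp-nanos.parquet") // hypothetical path
df.printSchema() // the nanos column is reported as: long
```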

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 1 addition & 0 deletions

```diff
@@ -161,6 +161,7 @@ protected void initialize(String path, List<String> columns) throws IOException
     Configuration config = new Configuration();
     config.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
 
     Path file = new Path(path);
     long length = file.getFileSystem(config).getFileStatus(file).getLen();
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 12 additions & 2 deletions

```diff
@@ -120,6 +120,10 @@ class ParquetFileFormat
       SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
       sparkSession.sessionState.conf.parquetOutputTimestampType.toString)
 
+    conf.set(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong.toString)
+
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
 
@@ -244,6 +248,9 @@ class ParquetFileFormat
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -437,7 +444,8 @@ object ParquetFileFormat extends Logging {
 
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+      sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
+      nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -533,12 +541,14 @@
       sparkSession: SparkSession): Option[StructType] = {
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
+    val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
 
     val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
       val converter = new ParquetToSparkSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
-        assumeInt96IsTimestamp = assumeInt96IsTimestamp)
+        assumeInt96IsTimestamp = assumeInt96IsTimestamp,
+        nanosAsLong = nanosAsLong)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala

Lines changed: 12 additions & 3 deletions

```diff
@@ -43,18 +43,22 @@ import org.apache.spark.sql.types._
  *        [[StringType]] fields.
  * @param assumeInt96IsTimestamp Whether unannotated INT96 fields should be assumed to be Spark SQL
  *        [[TimestampType]] fields.
+ * @param nanosAsLong Whether timestamps with nanos are converted to long.
  */
 class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get) {
+    assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
-    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp)
+    assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
+    nanosAsLong = conf.legacyParquetNanosAsLong)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
-    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean)
+    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+    nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
 
 
   /**
@@ -243,6 +247,11 @@ class ParquetToSparkSchemaConverter(
         TimestampType
       case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.MILLIS =>
         TimestampType
+      // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
+      // timezone awareness to address behaviour regression introduced by SPARK-34661
+      case timestamp: TimestampLogicalTypeAnnotation
+          if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
+        LongType
       case _ => illegalType()
     }
```
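The heart of the fix is the extra `case` in the `INT64` branch above. For illustration, here is a self-contained, simplified sketch of that dispatch logic (a hypothetical standalone function, not the converter's actual structure, which handles many more types) using the parquet-mr `LogicalTypeAnnotation` API:

```scala
import org.apache.parquet.schema.LogicalTypeAnnotation
import org.apache.parquet.schema.LogicalTypeAnnotation.{TimeUnit, TimestampLogicalTypeAnnotation}
import org.apache.spark.sql.types.{DataType, LongType, TimestampType}

// Simplified sketch of the INT64 timestamp dispatch: MILLIS and MICROS map to
// TimestampType; NANOS maps to LongType only when the legacy flag is on,
// otherwise the annotation is rejected, reproducing the pre-fix
// "Illegal Parquet type" failure.
def timestampToCatalyst(annotation: LogicalTypeAnnotation, nanosAsLong: Boolean): DataType =
  annotation match {
    case ts: TimestampLogicalTypeAnnotation if ts.getUnit == TimeUnit.MICROS => TimestampType
    case ts: TimestampLogicalTypeAnnotation if ts.getUnit == TimeUnit.MILLIS => TimestampType
    case ts: TimestampLogicalTypeAnnotation if ts.getUnit == TimeUnit.NANOS && nanosAsLong =>
      LongType
    case other =>
      throw new IllegalArgumentException(s"Illegal Parquet type: INT64 ($other)")
  }
```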

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala

Lines changed: 4 additions & 0 deletions

```diff
@@ -76,6 +76,10 @@ case class ParquetScan(
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
     val sqlConf = sparkSession.sessionState.conf
```
sql/core/src/test/resources/test-data/timestamp-nanos.parquet

784 Bytes (new binary file; contents not shown)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala

Lines changed: 44 additions & 6 deletions

```diff
@@ -28,6 +28,7 @@
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -42,25 +43,29 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       messageType: String,
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
-      writeLegacyParquetFormat: Boolean): Unit = {
+      writeLegacyParquetFormat: Boolean,
+      nanosAsLong: Boolean = false): Unit = {
     testSchema(
       testName,
       StructType.fromAttributes(ScalaReflection.attributesFor[T]),
       messageType,
       binaryAsString,
       int96AsTimestamp,
-      writeLegacyParquetFormat)
+      writeLegacyParquetFormat,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def testParquetToCatalyst(
       testName: String,
       sqlSchema: StructType,
       parquetSchema: String,
       binaryAsString: Boolean,
-      int96AsTimestamp: Boolean): Unit = {
+      int96AsTimestamp: Boolean,
+      nanosAsLong: Boolean = false): Unit = {
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
-      assumeInt96IsTimestamp = int96AsTimestamp)
+      assumeInt96IsTimestamp = int96AsTimestamp,
+      nanosAsLong = nanosAsLong)
 
     test(s"sql <= parquet: $testName") {
       val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
@@ -101,7 +106,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-        SQLConf.ParquetOutputTimestampType.INT96): Unit = {
+        SQLConf.ParquetOutputTimestampType.INT96,
+      nanosAsLong: Boolean = false): Unit = {
 
     testCatalystToParquet(
       testName,
@@ -115,11 +121,25 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       sqlSchema,
       parquetSchema,
       binaryAsString,
-      int96AsTimestamp)
+      int96AsTimestamp,
+      nanosAsLong = nanosAsLong)
   }
 }
 
 class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+  testSchemaInference[Tuple1[Long]](
+    "timestamp nanos",
+    """
+      |message root {
+      |  required int64 _1 (TIMESTAMP(NANOS,true));
+      |}
+    """.stripMargin,
+    binaryAsString = false,
+    int96AsTimestamp = true,
+    writeLegacyParquetFormat = true,
+    nanosAsLong = true
+  )
+
   testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
     "basic types",
     """
@@ -458,6 +478,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
+    val tsAttribute = "birthday"
+    withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
+      val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+      val data = spark.read.parquet(testDataPath).select(tsAttribute)
+      assert(data.schema.fields.head.dataType == LongType)
+      assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+    }
+  }
+
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
+    val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+    val e = intercept[SparkException] {
+      spark.read.parquet(testDataPath).collect()
+    }
+    assert(e.getCause.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))"))
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================
```
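Since the flag surfaces nanos timestamps as raw longs, downstream code that wants a real timestamp must convert explicitly. A hedged sketch of one way to do that (the column name `birthday` is taken from the test above; the path and session setup are illustrative; note that Spark's `TimestampType` stores microseconds, so precision below a microsecond is lost):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, timestamp_seconds}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Illustrative: read a nanos-annotated file with the legacy flag on, then
// derive a TimestampType column from the raw nanoseconds-since-epoch long.
spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")
val df = spark.read.parquet("/tmp/timestamp-nanos.parquet") // hypothetical path

val withTs = df.withColumn(
  // timestamp_seconds accepts fractional seconds, so dividing the long by 1e9
  // converts nanos to seconds; sub-microsecond detail is truncated.
  "birthday_ts", timestamp_seconds(col("birthday") / 1e9))

withTs.printSchema()
```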
