
Commit 4e90dad

awdavidson and attilapiros authored and committed
[SPARK-40819][SQL] Timestamp nanos behaviour regression
### What changes were proposed in this pull request?

Handle `TimeUnit.NANOS` for Parquet `Timestamps`, addressing a behaviour regression since 3.2.

### Why are the changes needed?

Since version 3.2, reading Parquet files that contain attributes with type `TIMESTAMP(NANOS,true)` is not possible, as `ParquetSchemaConverter` returns:

```
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))
```

https://issues.apache.org/jira/browse/SPARK-34661 introduced a change matching on the `LogicalTypeAnnotation`, which only covers the `TimeUnit.MILLIS` and `TimeUnit.MICROS` timestamp cases, so `TimeUnit.NANOS` falls through to `illegalType()`. Prior to 3.2, the matching used the `originalType`, which for `TIMESTAMP(NANOS,true)` returns `null` and therefore resulted in a `LongType`. The proposed change is to also consider `TimeUnit.NANOS` and return `LongType`, making the behaviour the same as before.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test covering this scenario. Also deployed internally to read Parquet files that contain `TIMESTAMP(NANOS,true)`.

Closes apache#38312 from awdavidson/ts-nanos-fix.

Lead-authored-by: alfreddavidson <alfie.davidson9@gmail.com>
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Co-authored-by: awdavidson <54780428+awdavidson@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit ceccda0)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent 770b1cc commit 4e90dad
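
As a usage note (not part of the original commit message): the restored behaviour is opt-in through the `spark.sql.legacy.parquet.nanosAsLong` flag added by this commit. A minimal sketch, assuming a Parquet file at a hypothetical path whose column was written as `TIMESTAMP(NANOS,true)`:

```scala
// Sketch only: the config key and the LongType mapping come from this commit;
// the file path and its contents are hypothetical.
spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")

val df = spark.read.parquet("/tmp/timestamp-nanos.parquet")
df.printSchema()
// With the flag enabled, the TIMESTAMP(NANOS,true) column is read back as a plain
// long (the raw INT64 nanosecond value). With the default (false), the read still
// fails with "Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))".
```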

File tree

8 files changed: +104 −12 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -3791,6 +3791,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
+    .internal()
+    .doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
+    .version("3.2.3")
+    .booleanConf
+    .createWithDefault(false)
+
   val PARQUET_INT96_REBASE_MODE_IN_WRITE =
     buildConf("spark.sql.parquet.int96RebaseModeInWrite")
       .internal()
@@ -4978,6 +4985,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
 
+  def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)
+
   def parquetInferTimestampNTZEnabled: Boolean = getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)
 
   def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
```

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java

Lines changed: 2 additions & 0 deletions
```diff
@@ -150,6 +150,7 @@ protected void initialize(String path, List<String> columns) throws IOException
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
     config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
 
     this.file = new Path(path);
     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -201,6 +202,7 @@ protected void initialize(
     config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
     config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
     config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
+    config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
     this.parquetColumn = new ParquetToSparkSchemaConverter(config)
       .convertParquetColumn(requestedSchema, Option.empty());
     this.sparkSchema = (StructType) parquetColumn.sparkType();
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 9 additions & 2 deletions
```diff
@@ -153,6 +153,10 @@ class ParquetFileFormat
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
       sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+
 
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -357,7 +361,8 @@ object ParquetFileFormat extends Logging {
     val converter = new ParquetToSparkSchemaConverter(
       sparkSession.sessionState.conf.isParquetBinaryAsString,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
-      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+      inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled,
+      nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val seen = mutable.HashSet[String]()
     val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -454,13 +459,15 @@
     val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
     val inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled
+    val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong
 
     val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
       // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
       val converter = new ParquetToSparkSchemaConverter(
         assumeBinaryIsString = assumeBinaryIsString,
         assumeInt96IsTimestamp = assumeInt96IsTimestamp,
-        inferTimestampNTZ = inferTimestampNTZ)
+        inferTimestampNTZ = inferTimestampNTZ,
+        nanosAsLong = nanosAsLong)
 
       readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
         .map(ParquetFileFormat.readSchemaFromFooter(_, converter))
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala

Lines changed: 12 additions & 3 deletions
```diff
@@ -49,24 +49,28 @@ import org.apache.spark.sql.types._
  * @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
  *                      schema with Parquet schema.
  * @param inferTimestampNTZ Whether TimestampNTZType type is enabled.
+ * @param nanosAsLong Whether timestamps with nanos are converted to long.
  */
 class ParquetToSparkSchemaConverter(
     assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
     assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
     caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
-    inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
+    inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get,
+    nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
     assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
     caseSensitive = conf.caseSensitiveAnalysis,
-    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled)
+    inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
+    nanosAsLong = conf.legacyParquetNanosAsLong)
 
   def this(conf: Configuration) = this(
     assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
     assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
     caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
-    inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
+    inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
+    nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)
 
   /**
    * Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter.
@@ -271,6 +275,11 @@ class ParquetToSparkSchemaConverter(
         } else {
           TimestampNTZType
         }
+      // SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
+      // timezone awareness to address behaviour regression introduced by SPARK-34661
+      case timestamp: TimestampLogicalTypeAnnotation
+        if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
+        LongType
       case _ => illegalType()
     }
```
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -461,6 +461,10 @@ object ParquetUtils extends Logging {
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       sqlConf.parquetFieldIdWriteEnabled.toString)
 
+    conf.set(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sqlConf.legacyParquetNanosAsLong.toString)
+
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -91,6 +91,9 @@ case class ParquetScan(
     hadoopConf.setBoolean(
       SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
       sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+    hadoopConf.setBoolean(
+      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
 
     val broadcastedConf = sparkSession.sparkContext.broadcast(
       new SerializableConfiguration(hadoopConf))
```
test-data/timestamp-nanos.parquet (new binary test fixture referenced by ParquetSchemaSuite)

784 Bytes — Binary file not shown.

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala

Lines changed: 65 additions & 7 deletions
```diff
@@ -29,6 +29,7 @@ import org.apache.parquet.schema.Type._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
+import org.apache.spark.sql.functions.desc
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
 import org.apache.spark.sql.test.SharedSparkSession
@@ -45,15 +46,17 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       binaryAsString: Boolean,
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     testSchema(
       testName,
       StructType.fromAttributes(ScalaReflection.attributesFor[T]),
       messageType,
       binaryAsString,
       int96AsTimestamp,
       writeLegacyParquetFormat,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def testParquetToCatalyst(
@@ -65,12 +68,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       caseSensitive: Boolean = false,
       inferTimestampNTZ: Boolean = true,
       sparkReadSchema: Option[StructType] = None,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
     val converter = new ParquetToSparkSchemaConverter(
       assumeBinaryIsString = binaryAsString,
       assumeInt96IsTimestamp = int96AsTimestamp,
       caseSensitive = caseSensitive,
-      inferTimestampNTZ = inferTimestampNTZ)
+      inferTimestampNTZ = inferTimestampNTZ,
+      nanosAsLong = nanosAsLong)
 
     test(s"sql <= parquet: $testName") {
       val actualParquetColumn = converter.convertParquetColumn(
@@ -119,7 +124,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
         SQLConf.ParquetOutputTimestampType.INT96,
-      expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
+      expectedParquetColumn: Option[ParquetColumn] = None,
+      nanosAsLong: Boolean = false): Unit = {
 
     testCatalystToParquet(
       testName,
@@ -134,7 +140,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       parquetSchema,
       binaryAsString,
       int96AsTimestamp,
-      expectedParquetColumn = expectedParquetColumn)
+      expectedParquetColumn = expectedParquetColumn,
+      nanosAsLong = nanosAsLong)
   }
 
   protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
@@ -149,7 +156,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       val expectedDesc = expected.descriptor.get
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
       assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
-      assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+
+      actualDesc.getPrimitiveType.getLogicalTypeAnnotation match {
+        case timestamp: LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
+          if timestamp.getUnit == LogicalTypeAnnotation.TimeUnit.NANOS =>
+          assert(actual.sparkType == expected.sparkType)
+        case _ =>
+          assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
+      }
     }
 
     assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
@@ -197,6 +211,32 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
 }
 
 class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+  testSchemaInference[Tuple1[Long]](
+    "timestamp nanos",
+    """
+      |message root {
+      |  required int64 _1 (TIMESTAMP(NANOS,true));
+      |}
+    """.stripMargin,
+    binaryAsString = false,
+    int96AsTimestamp = true,
+    writeLegacyParquetFormat = true,
+    expectedParquetColumn = Some(
+      ParquetColumn(
+        sparkType = StructType.fromAttributes(
+          ScalaReflection.attributesFor[Tuple1[Long]]),
+        descriptor = None,
+        repetitionLevel = 0,
+        definitionLevel = 0,
+        required = false,
+        path = Seq(),
+        children = Seq(
+          primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
+            0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
+        ))),
+    nanosAsLong = true
+  )
+
   testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
     "basic types",
     """
@@ -1027,6 +1067,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }
   }
 
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
+    val tsAttribute = "birthday"
+    withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
+      val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+      val data = spark.read.parquet(testDataPath).select(tsAttribute)
+      assert(data.schema.fields.head.dataType == LongType)
+      assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
+    }
+  }
+
+  test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
+    val testDataPath = testFile("test-data/timestamp-nanos.parquet")
+    val e = intercept[org.apache.spark.SparkException] {
+      spark.read.parquet(testDataPath).collect()
+    }
+    assert(e.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))."))
+  }
+
   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================
```

0 commit comments