@@ -3775,6 +3775,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val LEGACY_PARQUET_NANOS_AS_LONG = buildConf("spark.sql.legacy.parquet.nanosAsLong")
.internal()
.doc("When true, the Parquet's nanos precision timestamps are converted to SQL long values.")
.version("3.2.3")
Member:
@awdavidson I realised that we already released Spark 3.2.3.

Can you make a PR to fix this to 3.2.4?

Contributor Author (@awdavidson):
Sure

.booleanConf
.createWithDefault(false)

val PARQUET_INT96_REBASE_MODE_IN_WRITE =
buildConf("spark.sql.parquet.int96RebaseModeInWrite")
.internal()
@@ -4944,6 +4951,8 @@ class SQLConf extends Serializable with Logging {

def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)

def legacyParquetNanosAsLong: Boolean = getConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG)

def parquetInferTimestampNTZEnabled: Boolean = getConf(PARQUET_INFER_TIMESTAMP_NTZ_ENABLED)

def useV1Command: Boolean = getConf(SQLConf.LEGACY_USE_V1_COMMAND)
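For orientation, and not part of the diff itself: a minimal sketch of how an end user would opt into this legacy flag when reading a Parquet file whose INT64 column is annotated TIMESTAMP(NANOS,true). The session setup and file path are illustrative assumptions, not taken from the PR.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Opt into the legacy behaviour added here: nanos-precision Parquet timestamps
// are surfaced as LongType instead of failing schema conversion.
spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")

// Hypothetical file containing a TIMESTAMP(NANOS,true) column.
val df = spark.read.parquet("/tmp/events.parquet")
df.printSchema()  // the nanos column appears as LongType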
@@ -150,6 +150,7 @@ protected void initialize(String path, List<String> columns) throws IOException
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);

this.file = new Path(path);
long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
@@ -201,6 +202,7 @@ protected void initialize(
config.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), false);
config.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
config.setBoolean(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED().key(), false);
config.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG().key(), false);
this.parquetColumn = new ParquetToSparkSchemaConverter(config)
.convertParquetColumn(requestedSchema, Option.empty());
this.sparkSchema = (StructType) parquetColumn.sparkType();
@@ -153,6 +153,10 @@ class ParquetFileFormat
hadoopConf.setBoolean(
SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
hadoopConf.setBoolean(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sparkSession.sessionState.conf.legacyParquetNanosAsLong)


val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
@@ -357,7 +361,8 @@ object ParquetFileFormat extends Logging {
val converter = new ParquetToSparkSchemaConverter(
sparkSession.sessionState.conf.isParquetBinaryAsString,
sparkSession.sessionState.conf.isParquetINT96AsTimestamp,
inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled,
nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong)

val seen = mutable.HashSet[String]()
val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
@@ -454,13 +459,15 @@ object ParquetFileFormat extends Logging {
val assumeBinaryIsString = sparkSession.sessionState.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sparkSession.sessionState.conf.isParquetINT96AsTimestamp
val inferTimestampNTZ = sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled
val nanosAsLong = sparkSession.sessionState.conf.legacyParquetNanosAsLong

val reader = (files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean) => {
// Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = assumeBinaryIsString,
assumeInt96IsTimestamp = assumeInt96IsTimestamp,
inferTimestampNTZ = inferTimestampNTZ)
inferTimestampNTZ = inferTimestampNTZ,
nanosAsLong = nanosAsLong)

readParquetFootersInParallel(conf, files, ignoreCorruptFiles)
.map(ParquetFileFormat.readSchemaFromFooter(_, converter))
@@ -49,24 +49,28 @@ import org.apache.spark.sql.types._
* @param caseSensitive Whether use case sensitive analysis when comparing Spark catalyst read
* schema with Parquet schema.
* @param inferTimestampNTZ Whether TimestampNTZType type is enabled.
* @param nanosAsLong Whether timestamps with nanos are converted to long.
*/
class ParquetToSparkSchemaConverter(
assumeBinaryIsString: Boolean = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
assumeInt96IsTimestamp: Boolean = SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
caseSensitive: Boolean = SQLConf.CASE_SENSITIVE.defaultValue.get,
inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get) {
inferTimestampNTZ: Boolean = SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.defaultValue.get,
nanosAsLong: Boolean = SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.defaultValue.get) {

def this(conf: SQLConf) = this(
assumeBinaryIsString = conf.isParquetBinaryAsString,
assumeInt96IsTimestamp = conf.isParquetINT96AsTimestamp,
caseSensitive = conf.caseSensitiveAnalysis,
inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled)
inferTimestampNTZ = conf.parquetInferTimestampNTZEnabled,
nanosAsLong = conf.legacyParquetNanosAsLong)

def this(conf: Configuration) = this(
assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
caseSensitive = conf.get(SQLConf.CASE_SENSITIVE.key).toBoolean,
inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean)
inferTimestampNTZ = conf.get(SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
nanosAsLong = conf.get(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key).toBoolean)

/**
* Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter.
@@ -271,6 +275,11 @@ class ParquetToSparkSchemaConverter(
} else {
TimestampNTZType
}
// SPARK-40819: NANOS are not supported as a Timestamp, convert to LongType without
// timezone awareness to address behaviour regression introduced by SPARK-34661
Contributor:
Can we do a truncation and still read it as timestamp type?

Contributor Author (@awdavidson):
I do not think this is a good idea, as precision would be lost, and that precision is extremely important for high-frequency time series.

I haven't verified it, but end users/developers would still be able to .cast(Timestamp), which I believe would truncate the timestamp; allowing developers to make that decision makes more sense than forcing the loss of precision.

Contributor (@EnricoMi, Oct 24, 2022):
Yes, the mere purpose of this exercise is to get access to the nano precision.
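To make the trade-off above concrete, a hedged sketch of how a developer could opt into truncation themselves once the column is read back as a long of nanoseconds since the epoch; the column names are hypothetical and this is not taken from the PR.

import org.apache.spark.sql.functions.col

// Dividing by 1e9 turns nanoseconds into fractional seconds; casting that to
// timestamp keeps microsecond precision and drops the sub-microsecond part.
val withTs = df.withColumn(
  "event_time",
  (col("event_time_nanos") / 1000000000L).cast("timestamp"))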

case timestamp: TimestampLogicalTypeAnnotation
if timestamp.getUnit == TimeUnit.NANOS && nanosAsLong =>
LongType
case _ => illegalType()
}

@@ -461,6 +461,10 @@ object ParquetUtils extends Logging {
SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
sqlConf.parquetFieldIdWriteEnabled.toString)

conf.set(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sqlConf.legacyParquetNanosAsLong.toString)

// Sets compression scheme
conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)

@@ -91,6 +91,9 @@ case class ParquetScan(
hadoopConf.setBoolean(
SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
hadoopConf.setBoolean(
SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
sparkSession.sessionState.conf.legacyParquetNanosAsLong)

val broadcastedConf = sparkSession.sparkContext.broadcast(
new SerializableConfiguration(hadoopConf))
Binary file not shown (the new Parquet test data file referenced by the tests below).
@@ -29,6 +29,7 @@ import org.apache.parquet.schema.Type._
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType._
import org.apache.spark.sql.test.SharedSparkSession
@@ -45,15 +46,17 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
binaryAsString: Boolean,
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
expectedParquetColumn: Option[ParquetColumn] = None,
nanosAsLong: Boolean = false): Unit = {
testSchema(
testName,
StructType.fromAttributes(ScalaReflection.attributesFor[T]),
messageType,
binaryAsString,
int96AsTimestamp,
writeLegacyParquetFormat,
expectedParquetColumn = expectedParquetColumn)
expectedParquetColumn = expectedParquetColumn,
nanosAsLong = nanosAsLong)
}

protected def testParquetToCatalyst(
@@ -65,12 +68,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
caseSensitive: Boolean = false,
inferTimestampNTZ: Boolean = true,
sparkReadSchema: Option[StructType] = None,
expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
expectedParquetColumn: Option[ParquetColumn] = None,
nanosAsLong: Boolean = false): Unit = {
val converter = new ParquetToSparkSchemaConverter(
assumeBinaryIsString = binaryAsString,
assumeInt96IsTimestamp = int96AsTimestamp,
caseSensitive = caseSensitive,
inferTimestampNTZ = inferTimestampNTZ)
inferTimestampNTZ = inferTimestampNTZ,
nanosAsLong = nanosAsLong)

test(s"sql <= parquet: $testName") {
val actualParquetColumn = converter.convertParquetColumn(
@@ -119,7 +124,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
writeLegacyParquetFormat: Boolean,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96,
expectedParquetColumn: Option[ParquetColumn] = None): Unit = {
expectedParquetColumn: Option[ParquetColumn] = None,
nanosAsLong: Boolean = false): Unit = {

testCatalystToParquet(
testName,
@@ -134,7 +140,8 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
parquetSchema,
binaryAsString,
int96AsTimestamp,
expectedParquetColumn = expectedParquetColumn)
expectedParquetColumn = expectedParquetColumn,
nanosAsLong = nanosAsLong)
}

protected def compareParquetColumn(actual: ParquetColumn, expected: ParquetColumn): Unit = {
@@ -149,7 +156,14 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
val expectedDesc = expected.descriptor.get
assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
assert(actualDesc.getMaxRepetitionLevel == expectedDesc.getMaxRepetitionLevel)
assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)

actualDesc.getPrimitiveType.getLogicalTypeAnnotation match {
case timestamp: LogicalTypeAnnotation.TimestampLogicalTypeAnnotation
if timestamp.getUnit == LogicalTypeAnnotation.TimeUnit.NANOS =>
assert(actual.sparkType == expected.sparkType)
case _ =>
assert(actualDesc.getPrimitiveType === expectedDesc.getPrimitiveType)
}
}

assert(actual.repetitionLevel == expected.repetitionLevel, "repetition level mismatch: " +
@@ -197,6 +211,32 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
}

class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
testSchemaInference[Tuple1[Long]](
Contributor:
Could this case pass before Spark 3.2? My impression is that Parquet 1.10.1, used by Spark 3.1, does not support the nanos type, does it?

Contributor:
Then is this really a regression?

Contributor Author (@awdavidson, Oct 25, 2022):
So I've been looking further into it; it's because the message is different between 1.10.1 and 1.12.3, meaning the test would need to be different.

In 1.10.1 the message is

message schema {
  required int64 attribute;
}

whereas with 1.12.3 the message is the same as in the unit test

message schema {
  required int64 attribute (TIMESTAMP(NANOS,true));
}

So in Spark 3.1.0 you end up hitting this block, which returns a LongType: https://github.com/apache/spark/blob/branch-3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L146

whereas since 3.2 you hit https://github.com/apache/spark/blob/branch-3.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala#L174 because a case for TimeUnit.NANOS is not covered.

Contributor:
@cloud-fan moving Parquet from 1.10.1 to 1.12.3 introduced this regression where Spark 3.1 returned LongType and Spark 3.2 fails on illegal type.

"timestamp nanos",
"""
|message root {
| required int64 _1 (TIMESTAMP(NANOS,true));
|}
""".stripMargin,
binaryAsString = false,
int96AsTimestamp = true,
writeLegacyParquetFormat = true,
expectedParquetColumn = Some(
ParquetColumn(
sparkType = StructType.fromAttributes(
ScalaReflection.attributesFor[Tuple1[Long]]),
descriptor = None,
repetitionLevel = 0,
definitionLevel = 0,
required = false,
path = Seq(),
children = Seq(
primitiveParquetColumn(LongType, PrimitiveTypeName.INT64, Repetition.REQUIRED,
0, 0, Seq("_1"), logicalTypeAnnotation = Some(LogicalTypeAnnotation.intType(64, false)))
))),
nanosAsLong = true
)
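Following up on the 1.10.1 vs 1.12.3 discussion above, a rough sketch of reproducing the behaviour difference directly with the schema converter. It assumes access to the internal ParquetToSparkSchemaConverter (for example from a test in the same package) and its convert(MessageType) method; treat it as an illustration rather than a supported API.

import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter

// The schema Parquet 1.12.3 writes for a nanos-precision timestamp column.
val nanosSchema = MessageTypeParser.parseMessageType(
  """message schema {
    |  required int64 attribute (TIMESTAMP(NANOS,true));
    |}""".stripMargin)

// With nanosAsLong = true the column converts to LongType; with the default
// (false) the converter fails with the "Illegal Parquet type" error tested below.
val converter = new ParquetToSparkSchemaConverter(nanosAsLong = true)
println(converter.convert(nanosSchema))  // StructType with attribute: LongType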

testSchemaInference[(Boolean, Int, Long, Float, Double, Array[Byte])](
"basic types",
"""
@@ -1027,6 +1067,24 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
}
}

test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with nanosAsLong=true)") {
val tsAttribute = "birthday"
withSQLConf(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key -> "true") {
val testDataPath = testFile("test-data/timestamp-nanos.parquet")
val data = spark.read.parquet(testDataPath).select(tsAttribute)
assert(data.schema.fields.head.dataType == LongType)
assert(data.orderBy(desc(tsAttribute)).take(1).head.getAs[Long](0) == 1668537129123534758L)
}
}

test("SPARK-40819: parquet file with TIMESTAMP(NANOS, true) (with default nanosAsLong=false)") {
val testDataPath = testFile("test-data/timestamp-nanos.parquet")
val e = intercept[org.apache.spark.SparkException] {
spark.read.parquet(testDataPath).collect()
}
assert(e.getMessage.contains("Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))."))
}

// =======================================================
// Tests for converting Parquet LIST to Catalyst ArrayType
// =======================================================