
Conversation

@aws-awinstan

What changes were proposed in this pull request?

This PR adds support for reading Parquet FIXED_LEN_BYTE_ARRAY columns as Binary when no OriginalType is specified. parquet-avro writes the Avro fixed type as a Parquet FIXED_LEN_BYTE_ARRAY. Currently, when trying to load Parquet files with a column of this type, Spark SQL throws an exception similar to the following:

Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: FIXED_LEN_BYTE_ARRAY;
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.illegalType$1(ParquetSchemaConverter.scala:108)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertPrimitiveField(ParquetSchemaConverter.scala:177)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:90)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$2.apply(ParquetSchemaConverter.scala:72)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter$$anonfun$2.apply(ParquetSchemaConverter.scala:66)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.org$apache$spark$sql$execution$datasources$parquet$ParquetToSparkSchemaConverter$$convert(ParquetSchemaConverter.scala:66)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convert(ParquetSchemaConverter.scala:63)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readSchemaFromFooter$2.apply(ParquetFileFormat.scala:642)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readSchemaFromFooter$2.apply(ParquetFileFormat.scala:642)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readSchemaFromFooter(ParquetFileFormat.scala:642)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:599)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$9.apply(ParquetFileFormat.scala:581)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

After this change, Spark SQL is able to load such Parquet files correctly. There was a PR to fix this 3 years ago (#1737); however, it was ultimately rejected because it went down the path of adding a new SQL type specifically for FIXED_LEN_BYTE_ARRAY, which the maintainers considered too intrusive a change. This PR simply defaults to Binary when no OriginalType is specified. A few updates to the VectorizedColumnReader were also required to support reading FIXED_LEN_BYTE_ARRAY values into a Binary column.
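
For illustration, the schema-converter fallback amounts to something like the sketch below. This is hand-written, not the exact diff; the helper name and the error branch are placeholders for the surrounding converter code.

import org.apache.parquet.schema.OriginalType
import org.apache.spark.sql.types.{BinaryType, DataType}

// Sketch only: a FIXED_LEN_BYTE_ARRAY column with no OriginalType annotation
// is mapped to Spark's BinaryType instead of raising "Illegal Parquet type".
def fixedLenByteArrayToCatalyst(originalType: OriginalType): DataType = originalType match {
  case null => BinaryType // no annotation: treat the fixed-width bytes as plain binary
  case other => throw new IllegalArgumentException(
    s"OriginalType $other (e.g. DECIMAL) is handled by the existing converter code and omitted from this sketch")
}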

Note: all the changes to the gen-java/* files were generated by avro-tools-1.8.1; the changes, which are mostly documentation updates, appear to come from changes in the template avro-tools uses.

How was this patch tested?

I added a fixed attribute to the AvroPrimitives and AvroOptionalPrimitives record types, which are used by the ParquetAvroCompatibilitySuite. These values were populated by taking the same value as the other types ("val_$i"), padding it to 8 bytes (the chosen fixed length), and storing it as the fixed type. I verified that before my fix the "required primitives" and "optional primitives" tests failed with the same exception we're seeing in our clusters. After my change, the tests succeed with the expected results.
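
Roughly, the padded value can be built as in the sketch below (illustrative only; the actual helper added to the suite may differ in detail):

import java.nio.charset.StandardCharsets

// Sketch: zero-pad the UTF-8 bytes of "val_$i" into an 8-byte buffer,
// matching the fixed(8) Avro field added to the test records.
def generateFixedLengthByteArray(i: Int): Array[Byte] = {
  val fixedLengthByteArray = Array.fill[Byte](8)(0)
  val component = s"val_$i".getBytes(StandardCharsets.UTF_8)
  component.copyToArray(fixedLengthByteArray) // copies at most 8 bytes, leaving the remainder zero
  fixedLengthByteArray
}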

* Default constructor. Note that this does not initialize fields
* to their default values from the schema. If that is desired then
* one should use <code>newBuilder()</code>.
@HyukjinKwon
Member

@aws-awinstan Can we remove the unrelated changes? They make the actual changes hard to follow.

@aws-awinstan
Author

@HyukjinKwon Done! I debated whether to include these changes when submitting the PR, but decided to keep the newly generated files since they had some minor improvements (e.g. to the JavaDocs). Ideally these Avro files would be generated during the build process rather than checked in, but that's a separate issue.

@aws-awinstan
Author

Just pinging on this for a review. Let me know if there are any questions or concerns.

@HyukjinKwon
Member

ok to test

@HyukjinKwon
Member

cc @liancheng and @marmbrus too.

@SparkQA

SparkQA commented Mar 22, 2018

Test build #88501 has finished for PR 20826 at commit 27ac6af.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Mar 22, 2018

Test build #88513 has finished for PR 20826 at commit 27ac6af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@aws-awinstan
Author

@HyukjinKwon @liancheng @marmbrus - Any comments on this PR? Can we get this merged?

@praetp

praetp commented Apr 24, 2018

Hope we can see this in the next release of Spark...

@aws-awinstan
Author

Pinging on this once again. Is there anything more you'd like to see before this is merged?

@aws-awinstan
Author

Pinging on this once again. It's been almost a year since this PR was opened.

@praetp

praetp commented Feb 21, 2019

Can this be merged in ?

@HyukjinKwon
Member

cc @rdblue for review since it's a Parquet one.

@ozars

ozars commented Aug 12, 2019

Pinging this. Could you please have a look at this PR?

@rdblue
Contributor

rdblue commented Aug 12, 2019

+1

This looks good to me.

@viirya
Member

viirya commented Aug 12, 2019

I am not sure about what the earlier comment in #1737 (comment) said. It seems to raise a concern about using BinaryType for fixed_len_byte_array. What do you think about that argument?

}

logParquetSchema(path)

Member

Could we revert this newline?

column.putNull(rowId + i);
}
}
} else if (column.dataType() == DataTypes.BinaryType) {
@viirya
Member

viirya commented Aug 12, 2019

Looks like this change doesn't have an associated test. Should we add one?


logParquetSchema(path)

checkAnswer(spark.read.parquet(path), (0 until 10).map { i =>
Member

I think this only tests with spark.sql.parquet.enableVectorizedReader set to true (the default); should we test the non-vectorized reader too?
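
A minimal sketch of covering both paths, assuming the suite mixes in Spark's SQLTestUtils (so withSQLConf and checkAnswer are in scope) and that path and the expected rows come from the surrounding test:

import org.apache.spark.sql.internal.SQLConf

// Run the same read-back check with the vectorized Parquet reader enabled and disabled.
Seq("true", "false").foreach { vectorized =>
  withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
    checkAnswer(spark.read.parquet(path), expectedRows) // expectedRows: placeholder for the expected Rows
  }
}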

@ozars

ozars commented Aug 12, 2019

@rdblue @viirya Thanks for checking it out.

I am not sure about what the earlier comment in #1737 (comment) said. It seems to raise a concern about using BinaryType for fixed_len_byte_array. What do you think about that argument?

I bisected for convertFromAttributes and found that the code referred to by that message (or at least some portion of it) was refactored in 02149ff in 2015. I'm not very familiar with the internal machinery of Spark. Could someone familiar with the codebase confirm whether this is still a concern?

cc @liancheng

@rdblue
Contributor

rdblue commented Aug 12, 2019

I think it is fine to use BinaryType to read fixed_len_byte_array. That's the best mapping, even though the type is technically wider because it includes shorter and longer binary sequences. This only allows reading that data, not writing values with a fixed length. All writes would use BinaryType regardless.

@viirya
Member

viirya commented Aug 13, 2019

Thanks for clarifying. Although that comment raised a concern about a reading problem, it looks like convertFromAttributes was for writing. This PR only reads fixed_len_byte_array as Spark's BinaryType. I think it should be fine. cc @cloud-fan too

@cloud-fan
Contributor

The idea is fine. Spark reads Hive varchar as string type, although it's not an accurate mapping.

@HyukjinKwon
Member

ok to test

@HyukjinKwon HyukjinKwon changed the title [Spark-2489][SQL] Unsupported parquet datatype optional fixed_len_byte_array [SPARK-2489][SQL] Support Parquet's optional fixed_len_byte_array Aug 14, 2019

private def generateFixedLengthByteArray(i : Int): Array[Byte] = {
val fixedLengthByteArray = Array[Byte](0, 0, 0, 0, 0, 0, 0, 0)
val fixedLengthByteArrayComponent = s"val_$i".getBytes(StandardCharsets.UTF_8)
Member

nit: s seems not needed

@HyukjinKwon
Member

Yeah, let's have a Parquet-dedicated test (I guess), as already pointed out.

@HyukjinKwon
Member

HyukjinKwon left a comment

I am okay with supporting this; I don't particularly mind.

But I would like to note:

case UINT_8 => typeNotSupported()
case UINT_16 => typeNotSupported()
case UINT_32 => typeNotSupported()

We don't support unsigned types, although wider types like long could contain them (see #9646 - wow, it's 4 years ago). cc @liancheng

@SparkQA

SparkQA commented Aug 14, 2019

Test build #109071 has finished for PR 20826 at commit 27ac6af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Sep 17, 2019

Test build #110691 has finished for PR 20826 at commit 27ac6af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

column.putNull(rowId + i);
}
}
} else if (column.dataType() == DataTypes.BinaryType) {
Contributor

We can merge it into the previous if:

else if (DecimalType.isByteArrayDecimalType(column.dataType()) || column.dataType() == DataTypes.BinaryType)

@cloud-fan
Contributor

The added tests only test parquet-avro compatibility. We should have a test for parquet.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jan 13, 2020
@cloud-fan cloud-fan removed the Stale label Jan 13, 2020
@cloud-fan
Contributor

Is anyone interested in this PR and willing to take it over?

@jonbelanger-ns

jonbelanger-ns commented Jan 17, 2020

If it helps, I have a fairly complex Parquet file with a few nested fields as FIXED_LEN_BYTE_ARRAY, so this bug is a show-stopper for Spark on this dataset.

I tried to fix this by cloning the repo with the PR (https://github.com/aws-awinstan/spark.git) to my local machine and compiling it.

I did the same for the master Spark repo, which worked fine with a few of the columns (to test without parsing the FIXED_LEN_BYTE_ARRAY columns).

However, the aws-awinstan repo fails on the same test columns:

[Stage 0:> (0 + 1) / 1]20/01/17 12:37:13 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.42.107, executor 0): java.io.StreamCorruptedException: invalid stream header: 0000000F
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:866)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:358)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:126)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:113)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I'm using the following in my client environment, with HDFS and Spark remote in a VM, running standalone with a single worker.

$ pip freeze | grep spark
pyspark==2.4.4
spark==0.2.1

I'm surprised this bug has been allowed to languish for as long as it has. It's not possible for us to change how the upstream data is serialized, so we either need this feature or have to move on...

Edit: further troubleshooting showed that it was the toPandas() call that was failing.

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@nicolaslrveiga

Can we reopen this issue or proceed with the work in #35902?

