Description
Recently, while trying to upgrade to Spark 3.4.0 [https://github.com/apache/hudi/actions/runs/4985991589/jobs/8926379795?pr=8682] and running the test
{code:java}
org.apache.hudi.TestAvroSchemaResolutionSupport#testArrayOfMapsChangeValueType{code}
we saw the following exception:
{code:java}
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.UnsafeRow cannot be cast to org.apache.spark.sql.vectorized.ColumnarBatch
  at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:600)
  at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:589)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92){code}
Currently in our {{Spark32PlusHoodieParquetFileFormat}} we do this:
{code:java}
val enableVectorizedReader: Boolean =
  sqlConf.parquetVectorizedReaderEnabled &&
    resultSchema.forall(_.dataType.isInstanceOf[AtomicType]){code}
The issue, I think, is that our enableVectorizedReader check only tests for atomic types, while {{supportBatch}} is taken directly from Spark's ParquetFileFormat supportBatch, which is no longer an atomic-type-only check: since Spark 3.3.0 the vectorized reader supports additional (nested) types [https://github.com/apache/spark/pull/33695/files#diff-1ce36caa3af5c079a8a0190c624ac4c9e95dcb91d42fc433b820023f73ed68ed].
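To make the divergence concrete, here is a small illustrative check (a sketch, not code from the failing test; the array-of-maps schema below is made up to mimic the shape that test exercises, and it relies on the {{ParquetUtils.isBatchReadSupportedForSchema}} API added by that Spark PR):
{code:java}
// Illustrative sketch: on Spark 3.3+/3.4 the Spark-side check can accept nested
// types, while Hudi's atomic-type-only gate (quoted above) rejects them, so
// supportBatch and enableVectorizedReader can give different answers.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[1]").appName("batch-support-check").getOrCreate()

// A schema with a nested array-of-maps column, similar in shape to the failing test's data.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("arr_of_maps", ArrayType(MapType(StringType, LongType)))
))

// Depending on spark.sql.parquet.enableNestedColumnVectorizedReader this prints true
// (it does with Spark 3.4 defaults, as far as I can tell), whereas
// schema.forall(_.dataType.isInstanceOf[AtomicType]) is false for the same schema.
// When the two disagree, the scan declares columnar output but the reader emits
// rows, which is exactly the ClassCastException above.
println(ParquetUtils.isBatchReadSupportedForSchema(spark.sessionState.conf, schema))
{code}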
We also can't directly reuse the logic of Spark's ParquetFileFormat, i.e. calling
ParquetUtils.isBatchReadSupportedForSchema(conf, schema)
since I think Hudi only supports a limited set of types in
{code:java}
SparkInternalSchemaConverter#convertColumnVectorType{code}
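For illustration, if Hudi wanted its own schema-aware gate instead of the blanket atomic-type check, it might look roughly like the sketch below. Note that {{isSupportedByHudiColumnVector}} is a hypothetical helper invented here (it does not exist in Hudi), and its whitelist is only a placeholder; the real type set would have to mirror whatever {{SparkInternalSchemaConverter#convertColumnVectorType}} actually handles:
{code:java}
// Hypothetical sketch only: isSupportedByHudiColumnVector is invented for
// illustration and its whitelist is a placeholder, not Hudi's real type set.
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

def isSupportedByHudiColumnVector(dt: DataType): Boolean = dt match {
  // Placeholder whitelist; the real one would mirror
  // SparkInternalSchemaConverter#convertColumnVectorType.
  case BooleanType | ByteType | ShortType | IntegerType | LongType |
       FloatType | DoubleType | StringType | BinaryType |
       DateType | TimestampType => true
  case _: DecimalType => true
  case _ => false
}

// Gate batch reads on both the Spark config and the Hudi-convertible types.
def supportsHudiBatchRead(sqlConf: SQLConf, schema: StructType): Boolean =
  sqlConf.parquetVectorizedReaderEnabled &&
    schema.fields.forall(f => isSupportedByHudiColumnVector(f.dataType))
{code}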
I think we would need to port that Spark PR's logic into Hudi for real long-term vectorized reader support.
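Until that porting happens, one conservative stopgap (again only a sketch, not necessarily the fix that will land) is to override {{supportBatch}} in {{Spark32PlusHoodieParquetFileFormat}} with the same atomic-type-only gate used for {{enableVectorizedReader}}, so the planner never expects a ColumnarBatch that the reader will not produce:
{code:java}
// Sketch of a conservative interim fix inside Spark32PlusHoodieParquetFileFormat
// (my assumption, not the committed solution): answer supportBatch with the
// same atomic-type-only check used for enableVectorizedReader, so the two
// can never disagree for nested schemas.
override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
  val sqlConf = sparkSession.sessionState.conf
  sqlConf.parquetVectorizedReaderEnabled &&
    schema.forall(_.dataType.isInstanceOf[AtomicType])
}
{code}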
JIRA info
- Link: https://issues.apache.org/jira/browse/HUDI-6262
- Type: Task