diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
index 4291e3fb65..e4e047453f 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala
@@ -165,8 +165,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
       COMET_NATIVE_SCAN_IMPL.get() match {
         case SCAN_AUTO =>
-          // TODO add support for native_datafusion in the future
-          nativeIcebergCompatScan(session, scanExec, r, hadoopConf)
+          nativeDataFusionScan(session, scanExec, r, hadoopConf)
+            .orElse(nativeIcebergCompatScan(session, scanExec, r, hadoopConf))
             .getOrElse(scanExec)
         case SCAN_NATIVE_DATAFUSION =>
           nativeDataFusionScan(session, scanExec, r, hadoopConf).getOrElse(scanExec)
diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
index b7909b67cb..7c8d4cbf95 100644
--- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala
@@ -39,6 +39,7 @@ import org.apache.comet.serde.{CometOperatorSerde, Compatible, OperatorOuterClas
 import org.apache.comet.serde.ExprOuterClass.Expr
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
+import org.apache.comet.shims.ShimFileFormat
 
 /**
  * Validation and serde logic for `native_datafusion` scans.
@@ -77,6 +78,22 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging {
       withInfo(scanExec, "Full native scan disabled because ignoreMissingFiles enabled")
     }
 
+    if (scanExec.fileConstantMetadataColumns.nonEmpty) {
+      withInfo(scanExec, "Native DataFusion scan does not support metadata columns")
+    }
+
+    if (CometParquetUtils.readFieldId(SQLConf.get)) {
+      withInfo(scanExec, "Native DataFusion scan does not support Parquet field ID based reads")
+    }
+
+    if (scanExec.bucketedScan) {
+      withInfo(scanExec, "Native DataFusion scan does not support bucketed scans")
+    }
+
+    if (ShimFileFormat.findRowIndexColumnIndexInSchema(scanExec.requiredSchema) >= 0) {
+      withInfo(scanExec, "Native DataFusion scan does not support row index generation")
+    }
+
     // the scan is supported if no fallback reasons were added to the node
     !hasExplainInfo(scanExec)
   }
diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
index 8a68df3820..26bb810b76 100644
--- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala
@@ -64,7 +64,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   private val timestampPattern = "0123456789/:T" + whitespaceChars
 
   lazy val usingParquetExecWithIncompatTypes: Boolean =
-    usingDataSourceExecWithIncompatTypes(conf)
+    hasUnsignedSmallIntSafetyCheck(conf)
 
   test("all valid cast combinations covered") {
     val names = testNames
@@ -1087,7 +1087,7 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
          |USING parquet
        """.stripMargin)
     sql("INSERT INTO TABLE tab1 SELECT named_struct('col1','1','col2','2')")
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(
         "SELECT CAST(s AS struct) AS new_struct FROM tab1")
     } else {
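The two production changes above work together: the CometScanRule hunk lets SCAN_AUTO probe the native_datafusion scan first and fall back to native_iceberg_compat, keeping the unmodified scanExec when both decline, while the CometNativeScan hunk adds four eligibility checks (constant metadata columns, Parquet field-id reads, bucketed scans, row-index generation) that each record a fallback reason via withInfo, which is what makes the DataFusion probe decline. The sketch below is illustrative only and is not Comet source; the probe functions and their return values are placeholders standing in for the Option-returning nativeDataFusionScan and nativeIcebergCompatScan helpers shown in the diff.

// Illustrative sketch only; placeholder probes, not Comet APIs.
object ScanAutoSelectionSketch extends App {
  // Each probe returns Some(plan) when its scan implementation can handle the scan,
  // or None when it must decline (e.g. an eligibility check recorded a fallback reason).
  def tryNativeDataFusionScan(): Option[String] = None
  def tryNativeIcebergCompatScan(): Option[String] = Some("native_iceberg_compat scan")

  val originalScan = "unmodified Spark scan"

  // The SCAN_AUTO branch chains the probes the same way the patch does:
  // native_datafusion first, then native_iceberg_compat, else keep the original plan.
  val chosen = tryNativeDataFusionScan()
    .orElse(tryNativeIcebergCompatScan())
    .getOrElse(originalScan)

  println(chosen) // prints "native_iceberg_compat scan" with the placeholder probes above
}

Because Option.orElse takes its alternative by name, the iceberg-compat probe is only evaluated when the DataFusion probe returns None, so the fallback chain adds no work on the happy path.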
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index fe5ea77a89..1bab18a1a1 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1509,7 +1509,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("round") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "test.parquet")
@@ -1573,7 +1573,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
 
   test("hex") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     Seq(true, false).foreach { dictionaryEnabled =>
       withTempDir { dir =>
         val path = new Path(dir.toURI.toString, "hex.parquet")
@@ -2607,7 +2607,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("get_struct_field with DataFusion ParquetExec - read entire struct") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
       withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
@@ -2644,7 +2644,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
   }
 
   test("read array[int] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
index 19812f38ce..191ebd908a 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzAggregateSuite.scala
@@ -29,7 +29,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT count(distinct $col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
        assert(1 == collectNativeScans(cometPlan).length)
      }
@@ -45,7 +45,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT count(distinct $col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -57,7 +57,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filterNot(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
        assert(1 == collectNativeScans(cometPlan).length)
      }
@@ -73,7 +73,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.schema.fields.filter(f => isComplexType(f.dataType)).map(_.name)) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -87,7 +87,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns) {
       val sql = s"SELECT c1, c2, c3, count(distinct $col, c4, c5) FROM t1 group by c1, c2, c3"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -99,7 +99,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns) {
       val sql = s"SELECT $col, count(*) FROM t1 GROUP BY $col ORDER BY $col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -112,7 +112,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
     for (col <- df.columns.drop(1)) {
       val sql = s"SELECT $groupCol, count($col) FROM t1 GROUP BY $groupCol ORDER BY $groupCol"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -126,7 +126,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
       val sql = s"SELECT $groupCol, count(${otherCol.mkString(", ")}) FROM t1 " +
         s"GROUP BY $groupCol ORDER BY $groupCol"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -138,7 +138,7 @@ class CometFuzzAggregateSuite extends CometFuzzTestBase {
       // cannot run fully native due to HashAggregate
       val sql = s"SELECT min($col), max($col) FROM t1"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
index 833314a5c6..02d13c841d 100644
--- a/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometFuzzTestSuite.scala
@@ -37,7 +37,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1"
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -59,7 +59,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     df.createOrReplaceTempView("t1")
     val sql = "SELECT * FROM t1 LIMIT 500"
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       checkSparkAnswerAndOperator(sql)
     } else {
       checkSparkAnswer(sql)
@@ -112,7 +112,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
         s"alter table t2 add column col2 $defaultValueType default $defaultValueString")
       // Verify that our default value matches Spark's answer
       val sql = "select col2 from t2"
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         checkSparkAnswerAndOperator(sql)
       } else {
         checkSparkAnswer(sql)
@@ -139,7 +139,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       val sql = s"SELECT $col FROM t1 ORDER BY $col"
       // cannot run fully natively due to range partitioning and sort
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(1 == collectNativeScans(cometPlan).length)
       }
     }
@@ -152,7 +152,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val sql = s"SELECT $allCols FROM t1 ORDER BY $allCols"
     // cannot run fully natively due to range partitioning and sort
     val (_, cometPlan) = checkSparkAnswer(sql)
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       assert(1 == collectNativeScans(cometPlan).length)
     }
   }
@@ -207,7 +207,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
     val df = spark.read.parquet(filename)
     val df2 = df.repartition(8, df.col("c0")).sort("c1")
     df2.collect()
-    if (usingDataSourceExec) {
+    if (!usingLegacyNativeCometScan) {
       val cometShuffles = collectCometShuffleExchanges(df2.queryExecution.executedPlan)
       val expectedNumCometShuffles = CometConf.COMET_NATIVE_SCAN_IMPL.get() match {
         case CometConf.SCAN_NATIVE_COMET =>
@@ -233,7 +233,7 @@ class CometFuzzTestSuite extends CometFuzzTestBase {
       // cannot run fully native due to HashAggregate
       val sql = s"SELECT count(*) FROM t1 JOIN t2 ON t1.$col = t2.$col"
       val (_, cometPlan) = checkSparkAnswer(sql)
-      if (usingDataSourceExec) {
+      if (!usingLegacyNativeCometScan) {
         assert(2 == collectNativeScans(cometPlan).length)
       }
     }
diff --git a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
index 9276a20348..ee77bb80f5 100644
--- a/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometMapExpressionSuite.scala
@@ -31,7 +31,7 @@ import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOpti
 class CometMapExpressionSuite extends CometTestBase {
 
   test("read map[int, int] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
@@ -63,7 +63,7 @@ class CometMapExpressionSuite extends CometTestBase {
 
   // repro for https://github.com/apache/datafusion-comet/issues/1754
   test("read map[struct, struct] from parquet") {
-    assume(usingDataSourceExec(conf))
+    assume(!usingLegacyNativeCometScan(conf))
     withTempPath { dir =>
       // create input file with Comet disabled
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
index 70479f0e34..ed204ef776 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometColumnarShuffleSuite.scala
@@ -618,7 +618,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
       // TODO: revisit this when we have resolution of https://github.com/apache/arrow-rs/issues/7040
       // and https://github.com/apache/arrow-rs/issues/7097
       val fieldsToTest =
-        if (usingDataSourceExec(conf)) {
+        if (!usingLegacyNativeCometScan(conf)) {
           Seq(
             $"_1",
             $"_4",
diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
index d47b4e0c1a..6111b9c0d4 100644
--- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala
@@ -199,7 +199,7 @@ class CometJoinSuite extends CometTestBase {
   test("HashJoin struct key") {
     // https://github.com/apache/datafusion-comet/issues/1441
-    assume(!usingDataSourceExec)
+    assume(usingLegacyNativeCometScan)
     withSQLConf(
       "spark.sql.join.forceApplyShuffledHashJoin" -> "true",
       SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index a05bb7c390..69d67d4066 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -92,7 +92,7 @@ abstract class ParquetReadSuite extends CometTestBase {
     // for native iceberg compat, CometScanExec supports some types that native_comet does not.
     // note that native_datafusion does not use CometScanExec so we need not include that in
     // the check
-    val isDataFusionScan = usingDataSourceExec(conf)
+    val isDataFusionScan = !usingLegacyNativeCometScan(conf)
     Seq(
       NullType -> false,
       BooleanType -> true,
@@ -143,7 +143,7 @@ abstract class ParquetReadSuite extends CometTestBase {
     // Arrays support for iceberg compat native and for Parquet V1
     val cometScanExecSupported =
-      if (usingDataSourceExec(conf) && this.isInstanceOf[ParquetReadV1Suite])
+      if (!usingLegacyNativeCometScan(conf) && this.isInstanceOf[ParquetReadV1Suite])
         Seq(true, true, true)
       else Seq(true, false, false)
@@ -185,7 +185,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         i.toDouble,
         DateTimeUtils.toJavaDate(i))
     }
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data)
     }
     checkParquetFile(data)
@@ -207,7 +207,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         i.toDouble,
         DateTimeUtils.toJavaDate(i))
     }
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data)
     }
     checkParquetFile(data)
@@ -228,7 +228,7 @@ abstract class ParquetReadSuite extends CometTestBase {
         DateTimeUtils.toJavaDate(i))
     }
     val filter = (row: Row) => row.getBoolean(0)
-    if (!usingDataSourceExecWithIncompatTypes(conf)) {
+    if (!hasUnsignedSmallIntSafetyCheck(conf)) {
       checkParquetScan(data, filter)
     }
     checkParquetFile(data, filter)
@@ -1249,8 +1249,7 @@ abstract class ParquetReadSuite extends CometTestBase {
       withParquetDataFrame(data, schema = Some(readSchema)) { df =>
         // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true'
-        if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL
-            .get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) {
+        if (enableSchemaEvolution || !usingLegacyNativeCometScan(conf)) {
          checkAnswer(df, data.map(Row.fromTuple))
         } else {
           assertThrows[SparkException](df.collect())
@@ -1515,7 +1514,7 @@ abstract class ParquetReadSuite extends CometTestBase {
   test("row group skipping doesn't overflow when reading into larger type") {
     // Spark 4.0 no longer fails for widening types SPARK-40876
     // https://github.com/apache/spark/commit/3361f25dc0ff6e5233903c26ee105711b79ba967
-    assume(!isSpark40Plus && !usingDataSourceExec(conf))
+    assume(!isSpark40Plus && usingLegacyNativeCometScan(conf))
     withTempPath { path =>
       Seq(0).toDF("a").write.parquet(path.toString)
       // Reading integer 'a' as a long isn't supported. Check that an exception is raised instead
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 8a2f8af5c2..e612d72cc4 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -591,7 +591,7 @@ abstract class CometTestBase
   }
 
   def getPrimitiveTypesParquetSchema: String = {
-    if (usingDataSourceExecWithIncompatTypes(conf)) {
+    if (hasUnsignedSmallIntSafetyCheck(conf)) {
       // Comet complex type reader has different behavior for uint_8, uint_16 types.
       // The issue stems from undefined behavior in the parquet spec and is tracked
       // here: https://github.com/apache/parquet-java/issues/3142
@@ -1268,14 +1268,13 @@ abstract class CometTestBase
     writer.close()
   }
 
-  def usingDataSourceExec: Boolean = usingDataSourceExec(SQLConf.get)
+  def usingLegacyNativeCometScan: Boolean = usingLegacyNativeCometScan(SQLConf.get)
 
-  def usingDataSourceExec(conf: SQLConf): Boolean =
-    Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION).contains(
-      CometConf.COMET_NATIVE_SCAN_IMPL.get(conf))
+  def usingLegacyNativeCometScan(conf: SQLConf): Boolean =
+    CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_COMET
 
-  def usingDataSourceExecWithIncompatTypes(conf: SQLConf): Boolean = {
-    usingDataSourceExec(conf) &&
+  def hasUnsignedSmallIntSafetyCheck(conf: SQLConf): Boolean = {
+    !usingLegacyNativeCometScan(conf) &&
       CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get(conf)
   }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
index bdb4a9d4b1..131423ddeb 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/ParquetDatetimeRebaseSuite.scala
@@ -52,7 +52,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
         // Parquet file written by 2.4.5 should throw exception for both Spark and Comet
         // For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
         if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5") &&
-          !usingDataSourceExec(conf)) {
+          usingLegacyNativeCometScan(conf)) {
           intercept[SparkException](df.collect())
         } else {
           checkSparkNoRebaseAnswer(df)
@@ -63,7 +63,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
 
   test("reading ancient timestamps before 1582") {
-    assume(!usingDataSourceExec(conf))
+    assume(usingLegacyNativeCometScan(conf))
     Seq(true, false).foreach { exceptionOnRebase =>
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
@@ -78,7 +78,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
         // Parquet file written by 2.4.5 should throw exception for both Spark and Comet
         // For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
         if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5")
-          && !usingDataSourceExec(conf)) {
+          && usingLegacyNativeCometScan(conf)) {
           intercept[SparkException](df.collect())
         } else {
           checkSparkNoRebaseAnswer(df)
@@ -90,7 +90,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
 
   test("reading ancient int96 timestamps before 1582") {
-    assume(!usingDataSourceExec(conf))
+    assume(usingLegacyNativeCometScan(conf))
     Seq(true, false).foreach { exceptionOnRebase =>
       withSQLConf(
         CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_COMET,
@@ -105,7 +105,7 @@ abstract class ParquetDatetimeRebaseSuite extends CometTestBase {
         // Parquet file written by 2.4.5 should throw exception for both Spark and Comet
        // For Spark 4.0+, Parquet file written by 2.4.5 should not throw exception
         if ((exceptionOnRebase || sparkVersion == "2_4_5") && (!isSpark40Plus || sparkVersion != "2_4_5")
-          && !usingDataSourceExec(conf)) {
+          && usingLegacyNativeCometScan(conf)) {
           intercept[SparkException](df.collect())
         } else {
           checkSparkNoRebaseAnswer(df)
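One behavioral detail of the CometTestBase rename is worth noting: the old usingDataSourceExec helper only matched native_iceberg_compat and native_datafusion, while the new usingLegacyNativeCometScan is true only for native_comet, so its negation also covers SCAN_AUTO. That lines up with the CometScanRule change above, where auto can now resolve to a DataFusion-based scan. The snippet below is an illustrative comparison only, using plain strings as stand-ins for the CometConf.SCAN_* constants; it is not part of the patch.

// Illustrative only: plain strings stand in for the CometConf.SCAN_* values.
object ScanPredicateComparison extends App {
  val impls = Seq("native_comet", "native_iceberg_compat", "native_datafusion", "auto")
  for (impl <- impls) {
    // old helper: usingDataSourceExec(conf)
    val oldPredicate = Seq("native_iceberg_compat", "native_datafusion").contains(impl)
    // new helper, negated: !usingLegacyNativeCometScan(conf)
    val newPredicate = impl != "native_comet"
    println(s"$impl: old=$oldPredicate, new=$newPredicate")
  }
  // The two predicates agree except for "auto", which the new helper treats as non-legacy.
}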