diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 522ccbc94c..b7b66c94ff 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -157,6 +157,17 @@ object CometConf extends ShimCometConf { .booleanConf .createWithDefault(false) + val COMET_PARQUET_SCHEMA_VALIDATION_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.parquet.schemaValidation.enabled") + .category(CATEGORY_PARQUET) + .doc( + "Whether to enable Spark-compatible schema validation when reading Parquet files " + + "with native_datafusion scan. When enabled, type coercions and column resolutions " + + "that Spark's vectorized reader would reject will also be rejected by Comet, " + + "throwing SparkException with compatible error messages.") + .booleanConf + .createWithDefault(true) + val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] = conf("spark.comet.parquet.respectFilterPushdown") .category(CATEGORY_PARQUET) diff --git a/dev/diffs/3.5.8.diff b/dev/diffs/3.5.8.diff index 72c41e4f82..234a15c436 100644 --- a/dev/diffs/3.5.8.diff +++ b/dev/diffs/3.5.8.diff @@ -923,7 +923,7 @@ index c26757c9cff..d55775f09d7 100644 protected val baseResourcePath = { // use the same way as `SQLQueryTestSuite` to get the resource path diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala -index 3cf2bfd17ab..49728c35c42 100644 +index 3cf2bfd17ab..78e679a2870 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1521,7 +1521,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark @@ -936,7 +936,17 @@ index 3cf2bfd17ab..49728c35c42 100644 AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") { sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect() } -@@ -4459,7 +4460,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark +@@ -3127,7 +3128,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark + } + } + +- test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") { ++ test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly", ++ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery => + withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> +@@ -4459,7 +4461,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("SPARK-39166: Query context of binary arithmetic should be serialized to executors" + @@ -946,7 +956,7 @@ index 3cf2bfd17ab..49728c35c42 100644 withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { -@@ -4480,7 +4482,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark +@@ -4480,7 +4483,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } test("SPARK-39175: Query context of Cast should be serialized to executors" + @@ -956,7 +966,7 @@ index 3cf2bfd17ab..49728c35c42 100644 withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", SQLConf.ANSI_ENABLED.key -> "true") { withTable("t") { -@@ -4497,14 
+4500,19 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark +@@ -4497,14 +4501,19 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark val msg = intercept[SparkException] { sql(query).collect() }.getMessage @@ -2212,30 +2222,10 @@ index 8e88049f51e..49f2001dc6b 100644 case _ => throw new AnalysisException("Can not match ParquetTable in the query.") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -index 8ed9ef1630e..f312174b182 100644 +index 8ed9ef1630e..eed2a6f5ad5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -@@ -1064,7 +1064,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: read binary as timestamp should throw schema incompatible error") { -+ test("SPARK-35640: read binary as timestamp should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - val data = (1 to 4).map(i => Tuple1(i.toString)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.TimestampType))) - -@@ -1075,7 +1076,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession - } - } - -- test("SPARK-35640: int as long should throw schema incompatible error") { -+ test("SPARK-35640: int as long should throw schema incompatible error", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - val data = (1 to 4).map(i => Tuple1(i)) - val readSchema = StructType(Seq(StructField("_1", DataTypes.LongType))) - -@@ -1345,7 +1347,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession +@@ -1345,7 +1345,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } @@ -2246,7 +2236,7 @@ index 8ed9ef1630e..f312174b182 100644 checkAnswer( // "fruit" column in this file is encoded using DELTA_LENGTH_BYTE_ARRAY. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala -index f6472ba3d9d..ce39ebb52e6 100644 +index f6472ba3d9d..6ea893cdbe6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -185,7 +185,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS @@ -2299,17 +2289,7 @@ index f6472ba3d9d..ce39ebb52e6 100644 checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) -@@ -1133,7 +1138,8 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS - } - } - -- test("row group skipping doesn't overflow when reading into larger type") { -+ test("row group skipping doesn't overflow when reading into larger type", -+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) { - withTempPath { path => - Seq(0).toDF("a").write.parquet(path.toString) - // The vectorized and non-vectorized readers will produce different exceptions, we don't need -@@ -1148,7 +1154,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS +@@ -1148,7 +1153,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .where(s"a < ${Long.MaxValue}") .collect() } diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 44ff20a44f..ca4c639785 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1053,6 +1053,8 @@ impl PhysicalPlanner { default_values, scan.session_timezone.as_str(), scan.case_sensitive, + scan.schema_validation_enabled, + scan.schema_evolution_enabled, self.session_ctx(), scan.encryption_enabled, )?; diff --git a/native/core/src/execution/spark_config.rs b/native/core/src/execution/spark_config.rs index 60ebb2ff8b..b273ec12eb 100644 --- a/native/core/src/execution/spark_config.rs +++ b/native/core/src/execution/spark_config.rs @@ -21,6 +21,9 @@ pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled"; pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled"; pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled"; pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize"; +#[allow(dead_code)] +pub(crate) const COMET_PARQUET_SCHEMA_VALIDATION_ENABLED: &str = + "spark.comet.parquet.schemaValidation.enabled"; pub(crate) trait SparkConfig { fn get_bool(&self, name: &str) -> bool; diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs index c8a480e97a..40cb9aee6f 100644 --- a/native/core/src/parquet/mod.rs +++ b/native/core/src/parquet/mod.rs @@ -773,6 +773,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat None, session_timezone.as_str(), case_sensitive != JNI_FALSE, + false, // schema_validation_enabled - validation is done on the Java side + false, // schema_evolution_enabled session_ctx, encryption_enabled, )?; diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index ec18d227f5..4249801294 100644 --- 
a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -68,12 +68,16 @@ pub(crate) fn init_datasource_exec( default_values: Option<HashMap<usize, ScalarValue>>, session_timezone: &str, case_sensitive: bool, + schema_validation_enabled: bool, + schema_evolution_enabled: bool, session_ctx: &Arc<SessionContext>, encryption_enabled: bool, ) -> Result<Arc<DataSourceExec>, ExecutionError> { let (table_parquet_options, spark_parquet_options) = get_options( session_timezone, case_sensitive, + schema_validation_enabled, + schema_evolution_enabled, &object_store_url, encryption_enabled, ); @@ -142,6 +146,8 @@ pub(crate) fn init_datasource_exec( fn get_options( session_timezone: &str, case_sensitive: bool, + schema_validation_enabled: bool, + schema_evolution_enabled: bool, object_store_url: &ObjectStoreUrl, encryption_enabled: bool, ) -> (TableParquetOptions, SparkParquetOptions) { @@ -153,6 +159,8 @@ fn get_options( SparkParquetOptions::new(EvalMode::Legacy, session_timezone, false); spark_parquet_options.allow_cast_unsigned_ints = true; spark_parquet_options.case_sensitive = case_sensitive; + spark_parquet_options.schema_validation_enabled = schema_validation_enabled; + spark_parquet_options.schema_evolution_enabled = schema_evolution_enabled; if encryption_enabled { table_parquet_options.crypto.configure_factory( diff --git a/native/core/src/parquet/parquet_support.rs b/native/core/src/parquet/parquet_support.rs index e7ff5630f1..809a86c042 100644 --- a/native/core/src/parquet/parquet_support.rs +++ b/native/core/src/parquet/parquet_support.rs @@ -76,6 +76,10 @@ pub struct SparkParquetOptions { pub use_legacy_date_timestamp_or_ntz: bool, // Whether schema field names are case sensitive pub case_sensitive: bool, + /// Whether to validate schema compatibility (type coercions) in a Spark-compatible way + pub schema_validation_enabled: bool, + /// Whether schema evolution (type widening) is enabled + pub schema_evolution_enabled: bool, } impl SparkParquetOptions { @@ -88,6 +92,8 @@ impl SparkParquetOptions { use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, + schema_validation_enabled: true, + schema_evolution_enabled: false, } } @@ -100,6 +106,8 @@ impl SparkParquetOptions { use_decimal_128: false, use_legacy_date_timestamp_or_ntz: false, case_sensitive: false, + schema_validation_enabled: true, + schema_evolution_enabled: false, } } } diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index 1e0d30c835..39560becaa 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -19,7 +19,7 @@ use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions}; use arrow::array::{RecordBatch, RecordBatchOptions}; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion::common::ColumnStatistics; use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; use datafusion::physical_plan::ColumnarValue; @@ -138,6 +138,78 @@ impl SchemaAdapter for SparkSchemaAdapter { } } + if self.parquet_options.schema_validation_enabled { + // Case-insensitive duplicate field detection + if !self.parquet_options.case_sensitive { + for required_field in self.required_schema.fields().iter() { + let required_name_lower = required_field.name().to_lowercase(); + let matching_names: Vec<&str> = file_schema + .fields + .iter() + .filter(|f| f.name().to_lowercase() == required_name_lower) + .map(|f| f.name().as_str()) + 
.collect(); + if matching_names.len() > 1 { + return Err(datafusion::error::DataFusionError::External( + format!( + "Found duplicate field(s) \"{}\": [{}] in case-insensitive mode", + required_field.name(), + matching_names.join(", ") + ) + .into(), + )); + } + } + } + + // Type coercion validation + for (table_idx, file_idx_opt) in field_mappings.iter().enumerate() { + if let Some(proj_idx) = file_idx_opt { + let file_field_idx = projection[*proj_idx]; + let file_type = file_schema.field(file_field_idx).data_type(); + let required_type = self.required_schema.field(table_idx).data_type(); + if file_type != required_type + && !is_spark_compatible_parquet_coercion( + file_type, + required_type, + self.parquet_options.schema_evolution_enabled, + ) + { + let col_name = self.required_schema.field(table_idx).name(); + let required_spark_name = arrow_type_to_spark_name(required_type); + let file_spark_name = arrow_type_to_parquet_physical_name(file_type); + + // Special error for reading TimestampLTZ as TimestampNTZ + // to match Spark's error message format + if matches!( + (file_type, required_type), + ( + DataType::Timestamp(_, Some(_)), + DataType::Timestamp(_, None) + ) + ) { + return Err(datafusion::error::DataFusionError::External( + format!( + "Unable to create Parquet converter for data type \"{}\"", + required_spark_name + ) + .into(), + )); + } + + return Err(datafusion::error::DataFusionError::External( + format!( + "Parquet column cannot be converted in file. \ Column: [{}], Expected: {}, Found: {}", + col_name, required_spark_name, file_spark_name + ) + .into(), + )); + } + } + } + } + Ok(( Arc::new(SchemaMapping { required_schema: Arc::<Schema>::clone(&self.required_schema), @@ -265,6 +337,127 @@ impl SchemaMapper for SchemaMapping { } } +/// Check if a type coercion from a Parquet file type to a required Spark type is allowed. +/// Returns true if the coercion is compatible, false if Spark's vectorized reader would reject it. +/// +/// This function rejects specific type conversions that Spark's vectorized reader would reject. +/// Conversions not explicitly rejected are allowed (Comet's parquet_convert_array or arrow-rs +/// cast handles them). +/// +/// When `schema_evolution_enabled` is true, integer and float widening conversions are allowed +/// (e.g., Int32 → Int64, Float32 → Float64). +fn is_spark_compatible_parquet_coercion( + file_type: &DataType, + required_type: &DataType, + schema_evolution_enabled: bool, +) -> bool { + use DataType::*; + match (file_type, required_type) { + // Same type is always OK + (a, b) if a == b => true, + + // Spark rejects reading TimestampLTZ as TimestampNTZ + (Timestamp(_, Some(_)), Timestamp(_, None)) => false, + + // Allow Timestamp(_, None) → Timestamp(_, Some(_)) because arrow-rs represents + // INT96 timestamps (always LTZ in Spark) as Timestamp(Microsecond, None) after + // coercion. The downstream parquet_convert_array handles this by reattaching the + // session timezone. + (Timestamp(_, None), Timestamp(_, Some(_))) => true, + + // Spark rejects integer type widening in the vectorized reader + // (INT32 → LongType, INT32 → DoubleType, etc.) + // When schema evolution is enabled, these widenings are allowed. 
+ (Int8 | Int16 | Int32, Int64) => schema_evolution_enabled, + (Int8 | Int16 | Int32 | Int64, Float32 | Float64) => schema_evolution_enabled, + (Float32, Float64) => schema_evolution_enabled, + + // Spark rejects reading string/binary columns as timestamp or other numeric types + (Utf8 | LargeUtf8 | Binary | LargeBinary, Timestamp(_, _)) => false, + (Utf8 | LargeUtf8 | Binary | LargeBinary, Int8 | Int16 | Int32 | Int64) => false, + + // Reject cross-category conversions between non-matching structural types + // e.g., scalar types to list/struct/map types + (_, List(_) | LargeList(_) | Struct(_) | Map(_, _)) + if !matches!(file_type, List(_) | LargeList(_) | Struct(_) | Map(_, _)) => + { + false + } + + // For struct types, recursively check field conversions + (Struct(from_fields), Struct(to_fields)) => { + for to_field in to_fields.iter() { + if let Some(from_field) = from_fields + .iter() + .find(|f| f.name().to_lowercase() == to_field.name().to_lowercase()) + { + if from_field.data_type() != to_field.data_type() + && !is_spark_compatible_parquet_coercion( + from_field.data_type(), + to_field.data_type(), + schema_evolution_enabled, + ) + { + return false; + } + } + } + true + } + + // For list types, check element type conversions + (List(from_inner), List(to_inner)) => { + from_inner.data_type() == to_inner.data_type() + || is_spark_compatible_parquet_coercion( + from_inner.data_type(), + to_inner.data_type(), + schema_evolution_enabled, + ) + } + + // Everything else is allowed (handled by parquet_convert_array or arrow-rs cast) + _ => true, + } +} + +/// Convert an Arrow DataType to a Spark-style display name for error messages +fn arrow_type_to_spark_name(dt: &DataType) -> String { + use DataType::*; + match dt { + Boolean => "boolean".to_string(), + Int8 => "tinyint".to_string(), + Int16 => "smallint".to_string(), + Int32 => "int".to_string(), + Int64 => "bigint".to_string(), + Float32 => "float".to_string(), + Float64 => "double".to_string(), + Utf8 | LargeUtf8 => "string".to_string(), + Binary | LargeBinary => "binary".to_string(), + Date32 => "date".to_string(), + Timestamp(_, Some(_)) => "timestamp".to_string(), + Timestamp(_, None) => "timestamp_ntz".to_string(), + Decimal128(p, s) => format!("decimal({},{})", p, s), + _ => format!("{}", dt), + } +} + +/// Convert an Arrow DataType to a Parquet physical type name for error messages +fn arrow_type_to_parquet_physical_name(dt: &DataType) -> String { + use DataType::*; + match dt { + Boolean => "BOOLEAN".to_string(), + Int8 | Int16 | Int32 | UInt8 | UInt16 => "INT32".to_string(), + Int64 | UInt32 | UInt64 => "INT64".to_string(), + Float32 => "FLOAT".to_string(), + Float64 => "DOUBLE".to_string(), + Utf8 | LargeUtf8 | Binary | LargeBinary => "BINARY".to_string(), + Date32 => "INT32".to_string(), + Timestamp(_, _) => "INT64".to_string(), + Decimal128(_, _) => "FIXED_LEN_BYTE_ARRAY".to_string(), + _ => format!("{}", dt), + } +} + #[cfg(test)] mod test { use crate::parquet::parquet_support::SparkParquetOptions; diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 73c087cf36..a11c9b7f16 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -109,6 +109,8 @@ message NativeScan { // the map. 
map<string, string> object_store_options = 13; bool encryption_enabled = 14; + bool schema_validation_enabled = 15; + bool schema_evolution_enabled = 16; } message CsvScan { diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 3156eb3873..758fd00c8c 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -157,12 +157,16 @@ class CometExecIterator( // threw the exception, so we log the exception with taskAttemptId here logError(s"Native execution for task $taskAttemptId failed", e) + // Strip "External error: " prefix that DataFusion adds + val msg = e.getMessage + .replaceFirst("^External error: ", "") + .replaceFirst("^External: ", "") val fileNotFoundPattern: Regex = - ("""^External: Object at location (.+?) not found: No such file or directory """ + + ("""^Object at location (.+?) not found: No such file or directory """ + """\(os error \d+\)$""").r val parquetError: Regex = """^Parquet error: (?:.*)$""".r - e.getMessage match { + msg match { case fileNotFoundPattern(filePath) => // See org.apache.spark.sql.errors.QueryExecutionErrors.readCurrentFileNotFoundError throw new SparkException( @@ -177,6 +181,11 @@ class CometExecIterator( errorClass = "_LEGACY_ERROR_TEMP_2254", messageParameters = Map("message" -> e.getMessage), cause = new SparkException("File is not a Parquet file.", e)) + case m + if m.contains("Parquet column cannot be converted") || + m.contains("Found duplicate field(s)") || + m.contains("Unable to create Parquet converter") => + throw new SparkException(msg, e) case _ => throw e } diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala index b7909b67cb..035e757240 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala @@ -176,6 +176,10 @@ object CometNativeScan extends CometOperatorSerde[CometScanExec] with Logging { nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava) nativeScanBuilder.setSessionTimezone(scan.conf.getConfString("spark.sql.session.timeZone")) nativeScanBuilder.setCaseSensitive(scan.conf.getConf[Boolean](SQLConf.CASE_SENSITIVE)) + nativeScanBuilder.setSchemaValidationEnabled( + CometConf.COMET_PARQUET_SCHEMA_VALIDATION_ENABLED.get(scan.conf)) + nativeScanBuilder.setSchemaEvolutionEnabled( + CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.get(scan.conf)) // Collect S3/cloud storage configurations val hadoopConf = scan.relation.sparkSession.sessionState diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala index 928e66b29b..75444215fc 100644 --- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala +++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala @@ -1251,9 +1251,7 @@ abstract class ParquetReadSuite extends CometTestBase { Seq(StructField("_1", LongType, false), StructField("_2", DoubleType, false))) withParquetDataFrame(data, schema = Some(readSchema)) { df => - // TODO: validate with Spark 3.x and 'usingDataFusionParquetExec=true' - if (enableSchemaEvolution || CometConf.COMET_NATIVE_SCAN_IMPL - .get(conf) == CometConf.SCAN_NATIVE_DATAFUSION) { + if (enableSchemaEvolution) { checkAnswer(df, data.map(Row.fromTuple)) } 
else { assertThrows[SparkException](df.collect())
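
Illustrative usage (a sketch, not part of the patch): with spark.comet.parquet.schemaValidation.enabled left at its default of true and the native_datafusion scan selected, reading an INT32 Parquet column through a wider bigint read schema is expected to fail with a SparkException carrying the Spark-compatible "Parquet column cannot be converted" message that the patched CometExecIterator rethrows. The scan-impl config key and value, the local path, the object name, and the exact message text below are assumptions, not verified output.

import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession

object SchemaValidationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]")
      // Assumed settings: Comet on the classpath and the native_datafusion scan enabled.
      .config("spark.comet.enabled", "true")
      .config("spark.comet.scan.impl", "native_datafusion")
      .config("spark.comet.parquet.schemaValidation.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/comet_int32_table" // placeholder location
    Seq(1, 2, 3).toDF("a").write.mode("overwrite").parquet(path)

    try {
      // INT32 read as bigint: Spark's vectorized reader rejects this, and with
      // schema validation enabled the native_datafusion scan should reject it too.
      spark.read.schema("a LONG").parquet(path).collect()
    } catch {
      case e: SparkException =>
        // Expected to contain something like:
        // "Parquet column cannot be converted in file. Column: [a], Expected: bigint, Found: INT32"
        println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}

Setting spark.comet.parquet.schemaValidation.enabled=false should restore the previous permissive behavior, where the native scan widens the column instead of failing.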