diff --git a/native/core/src/parquet/parquet_exec.rs b/native/core/src/parquet/parquet_exec.rs index 79c7e06c63..f4cc7bf9fe 100644 --- a/native/core/src/parquet/parquet_exec.rs +++ b/native/core/src/parquet/parquet_exec.rs @@ -80,18 +80,40 @@ pub(crate) fn init_datasource_exec( encryption_enabled, ); - // dbg!(&required_schema, &data_schema); - - // Determine the schema to use for ParquetSource - // // Use data_schema only if both data_schema and data_filters are set - let base_schema = match (&data_schema, &projection_vector) { - (Some(schema), Some(_)) => Arc::clone(schema), - _ => Arc::clone(&required_schema), + // Determine the schema and projection to use for ParquetSource. + // When data_schema is provided, use it as the base schema so DataFusion knows the full + // file schema. Compute a projection vector to select only the required columns. + let (base_schema, projection) = match (&data_schema, &projection_vector) { + (Some(schema), Some(proj)) => (Arc::clone(schema), Some(proj.clone())), + (Some(schema), None) => { + // Compute projection: map required_schema field names to data_schema indices. + // This is needed for schema pruning when the data_schema has more columns than + // the required_schema. + let projection: Vec = required_schema + .fields() + .iter() + .filter_map(|req_field| { + schema.fields().iter().position(|data_field| { + if case_sensitive { + data_field.name() == req_field.name() + } else { + data_field.name().to_lowercase() == req_field.name().to_lowercase() + } + }) + }) + .collect(); + // Only use data_schema + projection when all required fields were found by name. + // When some fields can't be matched (e.g., Parquet field ID mapping where names + // differ between required and data schemas), fall back to using required_schema + // directly with no projection. + if projection.len() == required_schema.fields().len() { + (Arc::clone(schema), Some(projection)) + } else { + (Arc::clone(&required_schema), None) + } + } + _ => (Arc::clone(&required_schema), None), }; - //let base_schema = required_schema; - // dbg!(&base_schema); - // dbg!(&data_schema); - // dbg!(&data_filters); let partition_fields: Vec<_> = partition_schema .iter() .flat_map(|s| s.fields().iter()) @@ -100,13 +122,9 @@ pub(crate) fn init_datasource_exec( let table_schema = TableSchema::from_file_schema(base_schema).with_table_partition_cols(partition_fields); - // dbg!(&table_schema); - let mut parquet_source = ParquetSource::new(table_schema).with_table_parquet_options(table_parquet_options); - // dbg!(&parquet_source); - // Create a conjunctive form of the vector because ParquetExecBuilder takes // a single expression if let Some(data_filters) = data_filters { @@ -146,9 +164,9 @@ pub(crate) fn init_datasource_exec( .with_file_groups(file_groups) .with_expr_adapter(Some(expr_adapter_factory)); - if let Some(projection_vector) = projection_vector { + if let Some(projection) = projection { file_scan_config_builder = - file_scan_config_builder.with_projection_indices(Some(projection_vector))?; + file_scan_config_builder.with_projection_indices(Some(projection))?; } let file_scan_config = file_scan_config_builder.build(); diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs index f19ec39fca..5e8b395b5b 100644 --- a/native/core/src/parquet/schema_adapter.rs +++ b/native/core/src/parquet/schema_adapter.rs @@ -262,9 +262,22 @@ impl SparkPhysicalExprAdapter { expr.transform(|e| { if let Some(column) = e.as_any().downcast_ref::() { let col_idx = column.index(); + let col_name = column.name(); let logical_field = self.logical_file_schema.fields().get(col_idx); - let physical_field = self.physical_file_schema.fields().get(col_idx); + // Look up physical field by name instead of index for correctness + // when logical and physical schemas have different column orderings + let physical_field = if self.parquet_options.case_sensitive { + self.physical_file_schema + .fields() + .iter() + .find(|f| f.name() == col_name) + } else { + self.physical_file_schema + .fields() + .iter() + .find(|f| f.name().to_lowercase() == col_name.to_lowercase()) + }; if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field) {