Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 35 additions & 17 deletions native/core/src/parquet/parquet_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,40 @@ pub(crate) fn init_datasource_exec(
encryption_enabled,
);

// dbg!(&required_schema, &data_schema);

// Determine the schema to use for ParquetSource
// Use data_schema only if both data_schema and projection_vector are set
let base_schema = match (&data_schema, &projection_vector) {
(Some(schema), Some(_)) => Arc::clone(schema),
_ => Arc::clone(&required_schema),
// Determine the schema and projection to use for ParquetSource.
// When data_schema is provided, use it as the base schema so DataFusion knows the full
// file schema. Compute a projection vector to select only the required columns.
let (base_schema, projection) = match (&data_schema, &projection_vector) {
(Some(schema), Some(proj)) => (Arc::clone(schema), Some(proj.clone())),
(Some(schema), None) => {
// Compute projection: map required_schema field names to data_schema indices.
// This is needed for schema pruning when the data_schema has more columns than
// the required_schema.
let projection: Vec<usize> = required_schema
.fields()
.iter()
.filter_map(|req_field| {
schema.fields().iter().position(|data_field| {
if case_sensitive {
data_field.name() == req_field.name()
} else {
data_field.name().to_lowercase() == req_field.name().to_lowercase()
}
})
})
.collect();
// Only use data_schema + projection when all required fields were found by name.
// When some fields can't be matched (e.g., Parquet field ID mapping where names
// differ between required and data schemas), fall back to using required_schema
// directly with no projection.
if projection.len() == required_schema.fields().len() {
(Arc::clone(schema), Some(projection))
} else {
(Arc::clone(&required_schema), None)
}
}
_ => (Arc::clone(&required_schema), None),
};
//let base_schema = required_schema;
// dbg!(&base_schema);
// dbg!(&data_schema);
// dbg!(&data_filters);
let partition_fields: Vec<_> = partition_schema
.iter()
.flat_map(|s| s.fields().iter())
Expand All @@ -100,13 +122,9 @@ pub(crate) fn init_datasource_exec(
let table_schema =
TableSchema::from_file_schema(base_schema).with_table_partition_cols(partition_fields);

// dbg!(&table_schema);

let mut parquet_source =
ParquetSource::new(table_schema).with_table_parquet_options(table_parquet_options);

// dbg!(&parquet_source);

// Create a conjunctive form of the vector because ParquetExecBuilder takes
// a single expression
if let Some(data_filters) = data_filters {
Expand Down Expand Up @@ -146,9 +164,9 @@ pub(crate) fn init_datasource_exec(
.with_file_groups(file_groups)
.with_expr_adapter(Some(expr_adapter_factory));

if let Some(projection_vector) = projection_vector {
if let Some(projection) = projection {
file_scan_config_builder =
file_scan_config_builder.with_projection_indices(Some(projection_vector))?;
file_scan_config_builder.with_projection_indices(Some(projection))?;
}

let file_scan_config = file_scan_config_builder.build();
Expand Down
15 changes: 14 additions & 1 deletion native/core/src/parquet/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,22 @@ impl SparkPhysicalExprAdapter {
expr.transform(|e| {
if let Some(column) = e.as_any().downcast_ref::<Column>() {
let col_idx = column.index();
let col_name = column.name();

let logical_field = self.logical_file_schema.fields().get(col_idx);
let physical_field = self.physical_file_schema.fields().get(col_idx);
// Look up physical field by name instead of index for correctness
// when logical and physical schemas have different column orderings
let physical_field = if self.parquet_options.case_sensitive {
self.physical_file_schema
.fields()
.iter()
.find(|f| f.name() == col_name)
} else {
self.physical_file_schema
.fields()
.iter()
.find(|f| f.name().to_lowercase() == col_name.to_lowercase())
};

if let (Some(logical_field), Some(physical_field)) = (logical_field, physical_field)
{
Expand Down
Loading