Skip to content

Commit

Permalink
Fix error running data fusion queries - Physical input schema should …
Browse files Browse the repository at this point in the history
…be the same as the one converted from logical input schema (#664)

* xx

* fix comment

* fix comment

* fix comment
  • Loading branch information
FANNG1 authored Nov 28, 2024
1 parent ce04058 commit 6a714ed
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
6 changes: 5 additions & 1 deletion crates/integrations/datafusion/src/physical_plan/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ impl IcebergTableScan {
projection: Option<&Vec<usize>>,
filters: &[Expr],
) -> Self {
let plan_properties = Self::compute_properties(schema.clone());
let output_schema = match projection {
None => schema.clone(),
Some(projection) => Arc::new(schema.project(projection).unwrap()),
};
let plan_properties = Self::compute_properties(output_schema.clone());
let projection = get_column_names(schema.clone(), projection);
let predicates = convert_filters_to_predicate(filters);

Expand Down
14 changes: 14 additions & 0 deletions crates/integrations/datafusion/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,18 @@ mod tests {
let has_column = df_schema.has_column(&Column::from_name("z"));
assert!(has_column);
}

#[tokio::test]
async fn test_physical_input_schema_consistent_with_logical_input_schema() {
let table = get_test_table_from_metadata_file().await;
let table_provider = IcebergTableProvider::try_new_from_table(table.clone())
.await
.unwrap();
let ctx = SessionContext::new();
ctx.register_table("mytable", Arc::new(table_provider))
.unwrap();
let df = ctx.sql("SELECT count(*) FROM mytable").await.unwrap();
let physical_plan = df.create_physical_plan().await;
assert!(physical_plan.is_ok())
}
}

0 comments on commit 6a714ed

Please sign in to comment.