Skip to content

Commit

Permalink
Fix None Projections in Projection Pushdown (#9005)
Browse files Browse the repository at this point in the history
* Fix none projections

* Update select.slt
  • Loading branch information
berkaysynnada authored Jan 26, 2024
1 parent 7005e2e commit ec6abec
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
26 changes: 18 additions & 8 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,12 @@ fn try_swapping_with_csv(
// This process can be moved into CsvExec, but it would be an overlap of their responsibility.
all_alias_free_columns(projection.expr()).then(|| {
let mut file_scan = csv.base_config().clone();
let new_projections =
new_projections_for_columns(projection, &file_scan.projection);
let new_projections = new_projections_for_columns(
projection,
&file_scan
.projection
.unwrap_or((0..csv.schema().fields().len()).collect()),
);
file_scan.projection = Some(new_projections);

Arc::new(CsvExec::new(
Expand All @@ -188,8 +192,11 @@ fn try_swapping_with_memory(
// This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
all_alias_free_columns(projection.expr())
.then(|| {
let new_projections =
new_projections_for_columns(projection, memory.projection());
let all_projections = (0..memory.schema().fields().len()).collect();
let new_projections = new_projections_for_columns(
projection,
memory.projection().as_ref().unwrap_or(&all_projections),
);

MemoryExec::try_new(
memory.partitions(),
Expand All @@ -216,8 +223,11 @@ fn try_swapping_with_streaming_table(
.projection()
.as_ref()
.map(|i| i.as_ref().to_vec());
let new_projections =
new_projections_for_columns(projection, &streaming_table_projections);
let new_projections = new_projections_for_columns(
projection,
&streaming_table_projections
.unwrap_or((0..streaming_table.schema().fields().len()).collect()),
);

let mut lex_orderings = vec![];
for lex_ordering in streaming_table.projected_output_ordering().into_iter() {
Expand Down Expand Up @@ -833,15 +843,15 @@ fn all_alias_free_columns(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> bool {
/// ensure that all expressions are `Column` expressions without aliases.
fn new_projections_for_columns(
projection: &ProjectionExec,
source: &Option<Vec<usize>>,
source: &[usize],
) -> Vec<usize> {
projection
.expr()
.iter()
.filter_map(|(expr, _)| {
expr.as_any()
.downcast_ref::<Column>()
.and_then(|expr| source.as_ref().map(|proj| proj[expr.index()]))
.map(|expr| source[expr.index()])
})
.collect()
}
Expand Down
14 changes: 14 additions & 0 deletions datafusion/sqllogictest/test_files/select.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1569,3 +1569,17 @@ query I
select count(1) from v;
----
1

# run below query without logical optimizations
statement ok
set datafusion.optimizer.max_passes=0;

statement ok
CREATE TABLE t(a int, b int);

query I
select a from t;
----

statement ok
set datafusion.optimizer.max_passes=3;

0 comments on commit ec6abec

Please sign in to comment.