
Commit 871aa7c

Evaluate projections at the file source level
1 parent bea4b68 · commit 871aa7c

File tree (4 files changed, +130 / -62 lines):

  datafusion/datasource/src/file.rs
  datafusion/datasource/src/file_scan_config.rs
  datafusion/datasource/src/memory.rs
  datafusion/datasource/src/source.rs


datafusion/datasource/src/file.rs

Lines changed: 55 additions & 0 deletions
```diff
@@ -29,9 +29,14 @@ use crate::schema_adapter::SchemaAdapterFactory;
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{not_impl_err, Result, Statistics};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
 use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::projection::{
+    all_alias_free_columns, new_projections_for_columns,
+};
 use datafusion_physical_plan::DisplayFormatType;
 
 use object_store::ObjectStore;
@@ -129,6 +134,47 @@ pub trait FileSource: Send + Sync {
         ))
     }
 
+    fn try_pushdown_projections(
+        &self,
+        projection_exprs: &ProjectionExprs,
+        file_schema: &SchemaRef,
+        current_projection: Option<&[usize]>,
+    ) -> Result<ProjectionPushdownResult> {
+        let projection_slice: Vec<_> = projection_exprs.iter().cloned().collect();
+
+        // check if there are any partition columns in projection (columns beyond file schema)
+        let partitioned_columns_in_proj = projection_slice.iter().any(|proj_expr| {
+            proj_expr
+                .expr
+                .as_any()
+                .downcast_ref::<Column>()
+                .map(|expr| expr.index() >= file_schema.fields().len())
+                .unwrap_or(false)
+        });
+
+        // if there are any non-column or alias-carrier expressions, projection should not be removed
+        let no_aliases = all_alias_free_columns(&projection_slice);
+
+        if !no_aliases || partitioned_columns_in_proj {
+            return Ok(ProjectionPushdownResult::None);
+        }
+
+        let all_projections: Vec<usize> = (0..file_schema.fields().len()).collect();
+        let source_projection = current_projection.unwrap_or(&all_projections);
+
+        let new_projection_indices =
+            new_projections_for_columns(&projection_slice, source_projection);
+
+        // return a partial projection with the new projection indices
+        // if `new_file_source` is None, it means the file source doesn't change,
+        // rather the new projection is updated in `FileScanConfig`
+        Ok(ProjectionPushdownResult::Partial {
+            new_file_source: None,
+            remaining_projections: None,
+            new_projection_indices: Some(new_projection_indices),
+        })
+    }
+
     /// Set optional schema adapter factory.
     ///
     /// [`SchemaAdapterFactory`] allows user to specify how fields from the
@@ -155,3 +201,12 @@ pub trait FileSource: Send + Sync {
         None
     }
 }
+
+pub enum ProjectionPushdownResult {
+    None,
+    Partial {
+        new_file_source: Option<Arc<dyn FileSource>>,
+        remaining_projections: Option<ProjectionExprs>,
+        new_projection_indices: Option<Vec<usize>>,
+    },
+}
```
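The heart of the default implementation is the index remapping done by `new_projections_for_columns`: the incoming projection refers to columns of the source's current output, so each index has to be translated back to a file-schema position, unless the projection touches partition columns or renames anything. Below is a minimal standalone sketch of that decision and remapping, using bare `usize` indices instead of DataFusion's expression types; `PushdownResult` and `try_pushdown` here are illustrative names, not the crate's API.

```rust
// Standalone model of the pushdown decision. Assumed/simplified: columns are
// plain `usize` indices (no PhysicalExpr), partition columns are the indices
// past the source's current output, and the enum collapses the real
// ProjectionPushdownResult to the two cases exercised here.
#[derive(Debug, PartialEq)]
enum PushdownResult {
    /// The projection cannot be absorbed by the file source.
    None,
    /// The projection was absorbed; scan exactly these file-schema columns.
    Partial(Vec<usize>),
}

fn try_pushdown(
    projection: &[usize],
    file_schema_len: usize,
    current_projection: Option<&[usize]>,
) -> PushdownResult {
    let all: Vec<usize> = (0..file_schema_len).collect();
    let source_projection: &[usize] = current_projection.unwrap_or(all.as_slice());

    // Indices past the current output are partition columns in this model;
    // they cannot be pushed into the file scan (cf. `partitioned_columns_in_proj`).
    if projection.iter().any(|&i| i >= source_projection.len()) {
        return PushdownResult::None;
    }

    // Remap output-relative indices back to file-schema positions
    // (the role played by `new_projections_for_columns`).
    let remapped: Vec<usize> = projection.iter().map(|&i| source_projection[i]).collect();
    PushdownResult::Partial(remapped)
}

fn main() {
    // File schema [a, b, c, d]; the source already scans [b, d] (indices 1 and 3).
    // Asking for output column 1 (`d`) therefore maps back to file column 3.
    assert_eq!(
        try_pushdown(&[1], 4, Some(&[1, 3])),
        PushdownResult::Partial(vec![3])
    );
    // Output column 2 would be a partition column appended after [b, d]: no pushdown.
    assert_eq!(try_pushdown(&[2], 4, Some(&[1, 3])), PushdownResult::None);
    println!("remapping sketch OK");
}
```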

datafusion/datasource/src/file_scan_config.rs

Lines changed: 38 additions & 39 deletions
```diff
@@ -18,6 +18,7 @@
 //! [`FileScanConfig`] to configure scanning of possibly partitioned
 //! file sources.
 
+use crate::file::ProjectionPushdownResult;
 use crate::file_groups::FileGroup;
 #[allow(unused_imports)]
 use crate::schema_adapter::SchemaAdapterFactory;
@@ -44,16 +45,14 @@ use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
 };
 use datafusion_expr::Operator;
-use datafusion_physical_expr::expressions::{BinaryExpr, Column};
+use datafusion_physical_expr::expressions::BinaryExpr;
 use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr::{split_conjunction, EquivalenceProperties, Partitioning};
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::projection::{
-    all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
-};
+use datafusion_physical_plan::projection::ProjectionExpr;
 use datafusion_physical_plan::{
     display::{display_orderings, ProjectSchemaDisplay},
     filter_pushdown::FilterPushdownPropagation,
```
```diff
@@ -679,42 +678,42 @@ impl DataSource for FileScanConfig {
     fn try_swapping_with_projection(
         &self,
         projection: &[ProjectionExpr],
-    ) -> Result<Option<Arc<dyn DataSource>>> {
-        // This process can be moved into CsvExec, but it would be an overlap of their responsibility.
-
-        // Must be all column references, with no table partition columns (which can not be projected)
-        let partitioned_columns_in_proj = projection.iter().any(|proj_expr| {
-            proj_expr
-                .expr
-                .as_any()
-                .downcast_ref::<Column>()
-                .map(|expr| expr.index() >= self.file_schema().fields().len())
-                .unwrap_or(false)
-        });
+    ) -> Result<crate::source::ProjectionPushdownResult> {
+        let new_projection_exprs = ProjectionExprs::from(projection);
+
+        // get current projection indices if they exist
+        let current_projection = self
+            .projection_exprs
+            .as_ref()
+            .map(|p| p.ordered_column_indices());
+
+        // pass the new projections to the file source, along with the current projection;
+        // the file source will merge them if possible
+        let res = self.file_source().try_pushdown_projections(
+            &new_projection_exprs,
+            self.file_schema(),
+            current_projection.as_deref(),
+        )?;
 
-        // If there is any non-column or alias-carrier expression, Projection should not be removed.
-        let no_aliases = all_alias_free_columns(projection);
-
-        Ok((no_aliases && !partitioned_columns_in_proj).then(|| {
-            let file_scan = self.clone();
-            let source = Arc::clone(&file_scan.file_source);
-            let new_projections = new_projections_for_columns(
-                projection,
-                &file_scan
-                    .projection_exprs
-                    .as_ref()
-                    .map(|p| p.ordered_column_indices())
-                    .unwrap_or_else(|| (0..self.file_schema().fields().len()).collect()),
-            );
+        match res {
+            ProjectionPushdownResult::None => Ok(None),
+            ProjectionPushdownResult::Partial {
+                new_file_source,
+                remaining_projections,
+                new_projection_indices,
+            } => {
+                let mut builder = FileScanConfigBuilder::from(self.clone());
+
+                if let Some(new_source) = new_file_source {
+                    builder = builder.with_source(new_source);
+                }
+
+                builder = builder.with_projection_indices(new_projection_indices);
 
-            Arc::new(
-                FileScanConfigBuilder::from(file_scan)
-                    // Assign projected statistics to source
-                    .with_projection_indices(Some(new_projections))
-                    .with_source(source)
-                    .build(),
-            ) as _
-        }))
+                let new_config = Arc::new(builder.build()) as Arc<dyn DataSource>;
+                Ok(Some((new_config, remaining_projections)))
+            }
+        }
     }
 
     fn try_pushdown_filters(
```
```diff
@@ -2300,7 +2299,7 @@ mod tests {
 
         // Simulate projection being updated. Since the filter has already been pushed down,
         // the new projection won't include the filtered column.
-        let data_source = config
+        let (data_source, _remaining_projections) = config
             .try_swapping_with_projection(&[ProjectionExpr::new(
                 col("c3", &file_schema).unwrap(),
                 "c3".to_string(),
```

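On the `FileScanConfig` side the pushdown result only drives a selective rebuild: the builder keeps everything from the existing config and swaps in just the source and/or projection indices that the file source reported back. A rough self-contained sketch of that control flow is below; `ScanConfig`, `ScanConfigBuilder`, and the `String` source are toy stand-ins for `FileScanConfig`, `FileScanConfigBuilder`, and `Arc<dyn FileSource>`, not the crate's API.

```rust
// Toy rebuild of a scan config from a pushdown result.
#[derive(Clone, Debug)]
struct ScanConfig {
    source: String,
    projection_indices: Option<Vec<usize>>,
}

struct ScanConfigBuilder(ScanConfig);

impl ScanConfigBuilder {
    fn from(config: ScanConfig) -> Self {
        Self(config)
    }
    fn with_source(mut self, source: String) -> Self {
        self.0.source = source;
        self
    }
    fn with_projection_indices(mut self, indices: Option<Vec<usize>>) -> Self {
        self.0.projection_indices = indices;
        self
    }
    fn build(self) -> ScanConfig {
        self.0
    }
}

enum PushdownResult {
    None,
    Partial {
        new_file_source: Option<String>,
        new_projection_indices: Option<Vec<usize>>,
    },
}

// Mirrors the `match res { ... }` arm: keep the old config, replace only what
// the source reported back, and return None when nothing could be pushed down.
fn apply_pushdown(config: &ScanConfig, res: PushdownResult) -> Option<ScanConfig> {
    match res {
        PushdownResult::None => None,
        PushdownResult::Partial {
            new_file_source,
            new_projection_indices,
        } => {
            let mut builder = ScanConfigBuilder::from(config.clone());
            if let Some(source) = new_file_source {
                builder = builder.with_source(source);
            }
            builder = builder.with_projection_indices(new_projection_indices);
            Some(builder.build())
        }
    }
}

fn main() {
    let config = ScanConfig {
        source: "parquet".into(),
        projection_indices: None,
    };
    assert!(apply_pushdown(&config, PushdownResult::None).is_none());
    let rebuilt = apply_pushdown(
        &config,
        PushdownResult::Partial {
            new_file_source: None,
            new_projection_indices: Some(vec![2, 0]),
        },
    );
    println!("{rebuilt:?}");
}
```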
datafusion/datasource/src/memory.rs

Lines changed: 22 additions & 20 deletions
```diff
@@ -35,9 +35,7 @@ use datafusion_physical_expr::equivalence::project_orderings;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 use datafusion_physical_plan::memory::MemoryStream;
-use datafusion_physical_plan::projection::{
-    all_alias_free_columns, new_projections_for_columns, ProjectionExpr,
-};
+use datafusion_physical_plan::projection::ProjectionExpr;
 use datafusion_physical_plan::{
     common, ColumnarValue, DisplayAs, DisplayFormatType, Partitioning, PhysicalExpr,
     SendableRecordBatchStream, Statistics,
@@ -228,25 +226,29 @@ impl DataSource for MemorySourceConfig {
     fn try_swapping_with_projection(
         &self,
         projection: &[ProjectionExpr],
-    ) -> Result<Option<Arc<dyn DataSource>>> {
+    ) -> Result<crate::source::ProjectionPushdownResult> {
+        use datafusion_physical_plan::projection::{
+            all_alias_free_columns, new_projections_for_columns,
+        };
+
         // If there is any non-column or alias-carrier expression, Projection should not be removed.
-        // This process can be moved into MemoryExec, but it would be an overlap of their responsibility.
-        all_alias_free_columns(projection)
-            .then(|| {
-                let all_projections = (0..self.schema.fields().len()).collect();
-                let new_projections = new_projections_for_columns(
-                    projection,
-                    self.projection().as_ref().unwrap_or(&all_projections),
-                );
+        if !all_alias_free_columns(projection) {
+            return Ok(None);
+        }
 
-                MemorySourceConfig::try_new(
-                    self.partitions(),
-                    self.original_schema(),
-                    Some(new_projections),
-                )
-                .map(|s| Arc::new(s) as Arc<dyn DataSource>)
-            })
-            .transpose()
+        let all_projections: Vec<usize> = (0..self.schema.fields().len()).collect();
+        let new_projections = new_projections_for_columns(
+            projection,
+            self.projection().as_ref().unwrap_or(&all_projections),
+        );
+
+        let new_source = MemorySourceConfig::try_new(
+            self.partitions(),
+            self.original_schema(),
+            Some(new_projections),
+        )?;
+
+        Ok(Some((Arc::new(new_source) as Arc<dyn DataSource>, None)))
     }
 }
 
```
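`MemorySourceConfig` keeps the original guard: the projection is only absorbed when every output expression is a bare column reference that is not renamed, which is what `all_alias_free_columns` checks before any index remapping happens. The snippet below is a minimal illustration of that predicate with stripped-down toy types (`Expr`, `ProjExpr`), assuming the check means "plain column whose output name equals the column name"; it is not DataFusion's implementation.

```rust
// Minimal model of the alias-free check on a projection list.
enum Expr {
    Column(String),
    Other, // computed expression, literal, ...
}

struct ProjExpr {
    expr: Expr,
    alias: String,
}

fn all_alias_free_columns(projection: &[ProjExpr]) -> bool {
    projection.iter().all(|p| match &p.expr {
        Expr::Column(name) => *name == p.alias,
        Expr::Other => false,
    })
}

fn main() {
    let plain = vec![
        ProjExpr { expr: Expr::Column("c1".into()), alias: "c1".into() },
        ProjExpr { expr: Expr::Column("c3".into()), alias: "c3".into() },
    ];
    let renamed = vec![ProjExpr { expr: Expr::Column("c1".into()), alias: "x".into() }];
    let computed = vec![ProjExpr { expr: Expr::Other, alias: "c1_plus_1".into() }];

    assert!(all_alias_free_columns(&plain)); // pure column selection: absorbable
    assert!(!all_alias_free_columns(&renamed)); // renaming: keep the projection node
    assert!(!all_alias_free_columns(&computed)); // computed expr: keep the projection node
    println!("alias-free check sketch OK");
}
```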
datafusion/datasource/src/source.rs

Lines changed: 15 additions & 3 deletions
```diff
@@ -22,6 +22,7 @@ use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
+use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_plan::execution_plan::{
     Boundedness, EmissionType, SchedulingType,
 };
@@ -175,7 +176,7 @@ pub trait DataSource: Send + Sync + Debug {
     fn try_swapping_with_projection(
         &self,
         _projection: &[ProjectionExpr],
-    ) -> Result<Option<Arc<dyn DataSource>>>;
+    ) -> Result<ProjectionPushdownResult>;
     /// Try to push down filters into this DataSource.
     /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
     ///
@@ -191,6 +192,9 @@ pub trait DataSource: Send + Sync + Debug {
     }
 }
 
+pub type ProjectionPushdownResult =
+    Option<(Arc<dyn DataSource>, Option<ProjectionExprs>)>;
+
 /// [`ExecutionPlan`] that reads one or more files
 ///
 /// `DataSourceExec` implements common functionality such as applying
@@ -321,8 +325,16 @@ impl ExecutionPlan for DataSourceExec {
             .data_source
             .try_swapping_with_projection(projection.expr())?
         {
-            Some(new_data_source) => {
-                Ok(Some(Arc::new(DataSourceExec::new(new_data_source))))
+            Some((new_data_source, remaining_projections)) => {
+                let new_exec = Arc::new(DataSourceExec::new(new_data_source));
+                if let Some(remaining_projections) = remaining_projections {
+                    let new_projection_exec =
+                        ProjectionExec::try_new(remaining_projections, new_exec)?;
+
+                    return Ok(Some(Arc::new(new_projection_exec)));
+                }
+
+                Ok(Some(new_exec))
             }
             None => Ok(None),
         }
```