Skip to content

Commit dfba228

Browse files
feniljain, adriangb, and claude
authored
feat: allow pushdown of dynamic filters having partition cols (#18172)
## Which issue does this PR close? - Closes #18171 ## Rationale for this change Included in the issue ## Are these changes tested? While I have tested this on local with a local TPCDS-like dataset, I would appreciate if someone provides me a good way to add tests for the same 😅 --------- Co-authored-by: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Co-authored-by: Claude <noreply@anthropic.com>
1 parent e432d55 commit dfba228

File tree

16 files changed

+150
-55
lines changed

16 files changed

+150
-55
lines changed

datafusion-examples/examples/csv_json_opener.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ use datafusion::{
3131
test_util::aggr_test_schema,
3232
};
3333

34-
use datafusion::datasource::physical_plan::FileScanConfigBuilder;
34+
use datafusion::datasource::{
35+
physical_plan::FileScanConfigBuilder, table_schema::TableSchema,
36+
};
3537
use futures::StreamExt;
3638
use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore};
3739

@@ -67,7 +69,7 @@ async fn csv_opener() -> Result<()> {
6769

6870
let config = CsvSource::new(true, b',', b'"')
6971
.with_comment(Some(b'#'))
70-
.with_schema(schema)
72+
.with_schema(TableSchema::from_file_schema(schema))
7173
.with_batch_size(8192)
7274
.with_projection(&scan_config);
7375

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ pub use datafusion_catalog::view;
4545
pub use datafusion_datasource::schema_adapter;
4646
pub use datafusion_datasource::sink;
4747
pub use datafusion_datasource::source;
48+
pub use datafusion_datasource::table_schema;
4849
pub use datafusion_execution::object_store;
4950
pub use datafusion_physical_expr::create_ordering;
5051

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ mod tests {
5454
use datafusion_datasource::source::DataSourceExec;
5555

5656
use datafusion_datasource::file::FileSource;
57-
use datafusion_datasource::{FileRange, PartitionedFile};
57+
use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
5858
use datafusion_datasource_parquet::source::ParquetSource;
5959
use datafusion_datasource_parquet::{
6060
DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat,
@@ -186,7 +186,7 @@ mod tests {
186186
source = source.with_bloom_filter_on_read(false);
187187
}
188188

189-
source.with_schema(Arc::clone(&table_schema))
189+
source.with_schema(TableSchema::new(Arc::clone(&table_schema), vec![]))
190190
}
191191

192192
fn build_parquet_exec(

datafusion/core/src/test_util/parquet.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::prelude::{Expr, SessionConfig, SessionContext};
4040
use datafusion_datasource::file::FileSource;
4141
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
4242
use datafusion_datasource::source::DataSourceExec;
43+
use datafusion_datasource::TableSchema;
4344
use object_store::path::Path;
4445
use object_store::ObjectMeta;
4546
use parquet::arrow::ArrowWriter;
@@ -186,7 +187,7 @@ impl TestParquetFile {
186187
ParquetSource::new(parquet_options)
187188
.with_predicate(Arc::clone(&physical_filter_expr)),
188189
)
189-
.with_schema(Arc::clone(&self.schema));
190+
.with_schema(TableSchema::from_file_schema(Arc::clone(&self.schema)));
190191
let config = scan_config_builder.with_source(source).build();
191192
let parquet_exec = DataSourceExec::from_data_source(config);
192193

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use datafusion_datasource::{
2424
file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
2525
file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
2626
schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile,
27+
TableSchema,
2728
};
2829
use datafusion_physical_expr_common::physical_expr::fmt_sql;
2930
use datafusion_physical_optimizer::PhysicalOptimizerRule;
@@ -156,9 +157,13 @@ impl FileSource for TestSource {
156157
})
157158
}
158159

159-
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
160+
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
161+
assert!(
162+
schema.table_partition_cols().is_empty(),
163+
"TestSource does not support partition columns"
164+
);
160165
Arc::new(TestSource {
161-
schema: Some(schema),
166+
schema: Some(schema.file_schema().clone()),
162167
..self.clone()
163168
})
164169
}

datafusion/datasource-arrow/src/source.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ use std::sync::Arc;
2020

2121
use datafusion_datasource::as_file_source;
2222
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
23+
use datafusion_datasource::TableSchema;
2324

2425
use arrow::buffer::Buffer;
25-
use arrow::datatypes::SchemaRef;
2626
use arrow_ipc::reader::FileDecoder;
2727
use datafusion_common::error::Result;
2828
use datafusion_common::{exec_datafusion_err, Statistics};
@@ -73,7 +73,7 @@ impl FileSource for ArrowSource {
7373
Arc::new(Self { ..self.clone() })
7474
}
7575

76-
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
76+
fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
7777
Arc::new(Self { ..self.clone() })
7878
}
7979
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {

datafusion/datasource-avro/src/source.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use datafusion_datasource::file::FileSource;
2929
use datafusion_datasource::file_scan_config::FileScanConfig;
3030
use datafusion_datasource::file_stream::FileOpener;
3131
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
32+
use datafusion_datasource::TableSchema;
3233
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3334
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3435

@@ -84,11 +85,13 @@ impl FileSource for AvroSource {
8485
Arc::new(conf)
8586
}
8687

87-
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
88+
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
8889
let mut conf = self.clone();
89-
conf.schema = Some(schema);
90+
// TableSchema may have partition columns, but AvroSource does not use partition columns or values atm
91+
conf.schema = Some(Arc::clone(schema.file_schema()));
9092
Arc::new(conf)
9193
}
94+
9295
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
9396
let mut conf = self.clone();
9497
conf.projected_statistics = Some(statistics);

datafusion/datasource-csv/src/source.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
2929
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
3030
use datafusion_datasource::{
3131
as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
32-
RangeCalculation,
32+
RangeCalculation, TableSchema,
3333
};
3434

3535
use arrow::csv;
@@ -258,9 +258,9 @@ impl FileSource for CsvSource {
258258
Arc::new(conf)
259259
}
260260

261-
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
261+
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
262262
let mut conf = self.clone();
263-
conf.file_schema = Some(schema);
263+
conf.file_schema = Some(Arc::clone(schema.file_schema()));
264264
Arc::new(conf)
265265
}
266266

datafusion/datasource-json/src/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
3232
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3333
use datafusion_datasource::{
3434
as_file_source, calculate_range, ListingTableUrl, PartitionedFile, RangeCalculation,
35+
TableSchema,
3536
};
3637
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3738

@@ -122,7 +123,7 @@ impl FileSource for JsonSource {
122123
Arc::new(conf)
123124
}
124125

125-
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
126+
fn with_schema(&self, _schema: TableSchema) -> Arc<dyn FileSource> {
126127
Arc::new(Self { ..self.clone() })
127128
}
128129
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {

datafusion/datasource-parquet/src/source.rs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ use datafusion_datasource::schema_adapter::{
3535
DefaultSchemaAdapterFactory, SchemaAdapterFactory,
3636
};
3737

38-
use arrow::datatypes::{SchemaRef, TimeUnit};
38+
use arrow::datatypes::TimeUnit;
3939
use datafusion_common::config::TableParquetOptions;
4040
use datafusion_common::{DataFusionError, Statistics};
4141
use datafusion_datasource::file::FileSource;
4242
use datafusion_datasource::file_scan_config::FileScanConfig;
43+
use datafusion_datasource::TableSchema;
4344
use datafusion_physical_expr::conjunction;
4445
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
4546
use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -274,7 +275,7 @@ pub struct ParquetSource {
274275
/// The schema of the file.
275276
/// In particular, this is the schema of the table without partition columns,
276277
/// *not* the physical schema of the file.
277-
pub(crate) file_schema: Option<SchemaRef>,
278+
pub(crate) table_schema: Option<TableSchema>,
278279
/// Optional predicate for row filtering during parquet scan
279280
pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
280281
/// Optional user defined parquet file reader factory
@@ -599,9 +600,9 @@ impl FileSource for ParquetSource {
599600
Arc::new(conf)
600601
}
601602

602-
fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
603+
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
603604
Arc::new(Self {
604-
file_schema: Some(schema),
605+
table_schema: Some(schema),
605606
..self.clone()
606607
})
607608
}
@@ -659,9 +660,10 @@ impl FileSource for ParquetSource {
659660
// the actual predicates are built in reference to the physical schema of
660661
// each file, which we do not have at this point and hence cannot use.
661662
// Instead we use the logical schema of the file (the table schema without partition columns).
662-
if let (Some(file_schema), Some(predicate)) =
663-
(&self.file_schema, &self.predicate)
664-
{
663+
if let (Some(file_schema), Some(predicate)) = (
664+
&self.table_schema.as_ref().map(|ts| ts.file_schema()),
665+
&self.predicate,
666+
) {
665667
let predicate_creation_errors = Count::new();
666668
if let (Some(pruning_predicate), _) = build_pruning_predicates(
667669
Some(predicate),
@@ -698,7 +700,12 @@ impl FileSource for ParquetSource {
698700
filters: Vec<Arc<dyn PhysicalExpr>>,
699701
config: &ConfigOptions,
700702
) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
701-
let Some(file_schema) = self.file_schema.clone() else {
703+
let Some(table_schema) = self
704+
.table_schema
705+
.as_ref()
706+
.map(|ts| ts.table_schema())
707+
.cloned()
708+
else {
702709
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
703710
vec![PushedDown::No; filters.len()],
704711
));
@@ -718,7 +725,7 @@ impl FileSource for ParquetSource {
718725
let filters: Vec<PushedDownPredicate> = filters
719726
.into_iter()
720727
.map(|filter| {
721-
if can_expr_be_pushed_down_with_schemas(&filter, &file_schema) {
728+
if can_expr_be_pushed_down_with_schemas(&filter, &table_schema) {
722729
PushedDownPredicate::supported(filter)
723730
} else {
724731
PushedDownPredicate::unsupported(filter)

0 commit comments

Comments (0)