Merged

34 commits
4198db9
feat: wrong version which did weird stuff
discord9 Dec 19, 2025
3bf503a
feat: gather filter support alias
discord9 Dec 25, 2025
f4f9b19
feat: add support for detecting unknown columns in filter pushdown & …
discord9 Dec 25, 2025
df19fc2
feat: update projection alias handling and enhance PhysicalColumnRewr…
discord9 Dec 25, 2025
85a3c12
feat: update deeply nested expression helper function and enhance tes…
discord9 Dec 25, 2025
c60bf05
chore: clippy
discord9 Dec 25, 2025
5c51005
typo
discord9 Dec 25, 2025
31df516
feat: update test assertions for filter pushdown to reflect expected …
discord9 Dec 25, 2025
ae6a63c
c
discord9 Dec 25, 2025
085e2ba
c
discord9 Dec 25, 2025
e7e3a7a
clippy
discord9 Dec 26, 2025
8b3990f
test: update sqllogic test result
discord9 Dec 26, 2025
1dbd0a4
test: more complex dyn filter
discord9 Dec 26, 2025
392ecb9
c
discord9 Dec 26, 2025
49945bf
refactor: rename function have_unknown_columns to has_unknown_columns…
discord9 Jan 4, 2026
f42c2a3
test: topk with projection
discord9 Jan 5, 2026
f6489f1
test: slt test for projection dyn filter
discord9 Jan 5, 2026
4680cfa
chore
discord9 Jan 5, 2026
747f2b9
test: ignore time
discord9 Jan 5, 2026
98afab7
chore: fmt
discord9 Jan 5, 2026
fa2ac40
test: more slt test
discord9 Jan 5, 2026
5175fd0
test: fix
discord9 Jan 7, 2026
17adf3f
test: more ignore
discord9 Jan 7, 2026
64adaae
test: more ignore&proper sql
discord9 Jan 7, 2026
94b762b
feat: unmap column not pushdown
discord9 Jan 9, 2026
3fb15e4
clippy
discord9 Jan 9, 2026
2ed6fe0
chore
discord9 Jan 9, 2026
5a43014
test: add pushdown assert
discord9 Jan 9, 2026
1839cd1
refactor: ref column map
discord9 Jan 9, 2026
d3a9259
refactor: per review
discord9 Jan 20, 2026
cbb0ab5
clippy
discord9 Jan 20, 2026
a2cebc3
what
discord9 Jan 20, 2026
6e8588b
test: slt update
discord9 Jan 20, 2026
1cb3fbb
refactor: rename per review
discord9 Jan 22, 2026
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -175,6 +175,7 @@ env_logger = { workspace = true }
glob = { workspace = true }
insta = { workspace = true }
paste = { workspace = true }
pretty_assertions = "1.0"
Contributor:

This appears to already be used elsewhere (this is not a net new dependency), so I think it is ok to add.

rand = { workspace = true, features = ["small_rng"] }
rand_distr = "0.5"
recursive = { workspace = true }
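For reference on the pretty_assertions dependency discussed above: pretty_assertions::assert_eq! is a drop-in replacement for the standard assert_eq! macro that prints a line-by-line colored diff on failure, which is what the new tests below rely on when comparing multi-line plan snapshots. A minimal, self-contained sketch (a hypothetical test, not part of this PR):

#[test]
fn plan_snapshot_diff() {
    // Same signature as std::assert_eq!; on mismatch it prints a colored,
    // line-by-line diff instead of dumping both values wholesale, which is
    // far easier to read for multi-line strings such as formatted plans.
    let expected = "- SortExec\n  - ProjectionExec";
    let actual = "- SortExec\n  - ProjectionExec";
    pretty_assertions::assert_eq!(actual, expected);
}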
229 changes: 229 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -59,6 +59,7 @@ use datafusion_physical_plan::{
coalesce_partitions::CoalescePartitionsExec,
collect,
filter::{FilterExec, FilterExecBuilder},
projection::ProjectionExec,
repartition::RepartitionExec,
sorts::sort::SortExec,
};
@@ -1826,6 +1827,234 @@ fn schema() -> SchemaRef {
Arc::clone(&TEST_SCHEMA)
}

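/// Inputs and expected plan snapshots for one projection + TopK dynamic-filter
/// pushdown scenario: the input schema and batches, the projection placed
/// between scan and sort, the TopK sort expression (in terms of the
/// projection's output schema), and the plan snapshots expected before
/// execution and after each polled batch.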
struct ProjectionDynFilterTestCase {
schema: SchemaRef,
batches: Vec<RecordBatch>,
projection: Vec<(Arc<dyn PhysicalExpr>, String)>,
sort_expr: PhysicalSortExpr,
expected_plans: Vec<String>,
}

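/// Builds a DataSourceExec -> ProjectionExec -> SortExec(TopK) plan, runs the
/// post-optimization FilterPushdown pass, asserts the initial plan snapshot,
/// then executes the plan and re-asserts the snapshot after each polled batch
/// as the dynamic filter tightens.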
async fn run_projection_dyn_filter_case(case: ProjectionDynFilterTestCase) {
let ProjectionDynFilterTestCase {
schema,
batches,
projection,
sort_expr,
expected_plans,
} = case;

let scan = TestScanBuilder::new(Arc::clone(&schema))
.with_support(true)
.with_batches(batches)
.build();

let projection_exec = Arc::new(ProjectionExec::try_new(projection, scan).unwrap());

let sort = Arc::new(
SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), projection_exec)
.with_fetch(Some(2)),
) as Arc<dyn ExecutionPlan>;

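// Both static parquet filter pushdown and dynamic filter pushdown must be
// enabled for the TopK filter to be pushed down to the scan.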
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;

let optimized_plan = FilterPushdown::new_post_optimization()
.optimize(Arc::clone(&sort), &config)
.unwrap();

pretty_assertions::assert_eq!(
format_plan_for_test(&optimized_plan).trim(),
expected_plans[0].trim()
);

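// Execute with a small batch size so the TopK heap fills and the dynamic
// filter is updated after the first polled batch.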
let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let mut stream = optimized_plan.execute(0, Arc::clone(&task_ctx)).unwrap();
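// Each polled batch lets the TopK operator tighten its dynamic filter;
// re-check the plan snapshot after every poll.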
for (idx, expected_plan) in expected_plans.iter().enumerate().skip(1) {
stream.next().await.unwrap().unwrap();
let formatted_plan = format_plan_for_test(&optimized_plan);
pretty_assertions::assert_eq!(
formatted_plan.trim(),
expected_plan.trim(),
"Mismatch at iteration {}",
idx
);
}
}

#[tokio::test]
async fn test_topk_with_projection_transformation_on_dyn_filter() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let simple_abc = vec![
record_batch!(
("a", Int32, [1, 2, 3]),
("b", Utf8, ["x", "y", "z"]),
("c", Float64, [1.0, 2.0, 3.0])
)
.unwrap(),
];

// Case 1: Reordering [b, a]
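// Output column a sits at index 1 after the reorder, so the sort filter
// a@1 < 2 must be rewritten to a@0 < 2 below the projection.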
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("b", &schema).unwrap(), "b".to_string()),
(col("a", &schema).unwrap(), "a".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 1)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false], filter=[a@1 IS NULL OR a@1 < 2]
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string()]
})
.await;

// Case 2: Pruning [a]
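// b and c are projected away; a keeps index 0 on both sides of the
// projection, so the filter maps through unchanged.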
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![(col("a", &schema).unwrap(), "a".to_string())],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 3: Identity [a, b]
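// Identity projection: the filter passes through with indices intact.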
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("a", &schema).unwrap(), "a".to_string()),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 4: Expressions [a + 1, b]
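// The sort column is a computed expression, so the filter on a_plus_1@0
// must be expanded to the underlying expression a@0 + 1 at the source.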
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(
Arc::new(BinaryExpr::new(
col("a", &schema).unwrap(),
Operator::Plus,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
)),
"a_plus_1".to_string(),
),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a_plus_1", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false], filter=[a_plus_1@0 IS NULL OR a_plus_1@0 < 3]
- ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
],
})
.await;

// Case 5: [a as b, b as a] (swapped columns)
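// Output b is input a and vice versa; the rewrite must follow the
// projection expressions, not the column names: b@0 < 2 becomes a@0 < 2.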
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("a", &schema).unwrap(), "b".to_string()),
(col("b", &schema).unwrap(), "a".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("b", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as b, b@1 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false], filter=[b@0 IS NULL OR b@0 < 2]
- ProjectionExec: expr=[a@0 as b, b@1 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 6: Shadowing expression [a + 1 as a, b]
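// Output a is a + 1, shadowing the input column a; the filter on output
// a@0 must expand to a@0 + 1 at the source, not pass through as a bare
// column reference.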
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(
Arc::new(BinaryExpr::new(
col("a", &schema).unwrap(),
Operator::Plus,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
)),
"a".to_string(),
),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 3]
- ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
],
})
.await;
}

/// Returns a predicate that is a binary expression col = lit
fn col_lit_predicate(
column_name: &str,