Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

277 changes: 269 additions & 8 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow::{
};
use arrow_schema::SortOptions;
use datafusion::{
assert_batches_eq,
assert_batches_eq, assert_batches_sorted_eq,
logical_expr::Operator,
physical_plan::{
expressions::{BinaryExpr, Column, Literal},
Expand Down Expand Up @@ -60,6 +60,8 @@ use futures::StreamExt;
use object_store::{memory::InMemory, ObjectStore};
use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder};

use crate::physical_optimizer::filter_pushdown::util::SlowPartitionNode;

mod util;

#[test]
Expand Down Expand Up @@ -1199,13 +1201,6 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {

let result = format!("{}", pretty_format_batches(&batches).unwrap());

let probe_scan_metrics = probe_scan.metrics().unwrap();

// The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
// The number of output rows from the probe side scan should stay consistent across executions.
// Issue: https://github.com/apache/datafusion/issues/17451
assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);

insta::assert_snapshot!(
result,
@r"
Expand All @@ -1219,6 +1214,272 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}

/// Test demonstrating progressive dynamic filter evolution in partitioned hash joins
///
/// This test validates that instead of waiting for all
/// build-side partitions to complete before applying any filters, we apply partial filters
/// immediately as each partition completes.
/// To be able to evaluate partial filters we need to know which partition each probe row belongs to,
/// so we push down a hash function to the probe side that computes the same hash as repartitioning will later on.
///
/// ## Test Scenario Setup
///
/// - **Build side**: Values [1, 2, 3, 4] distributed across 3 hash partitions
/// - **Probe side**: Values [2, 3] that need to be filtered
/// - **Partition 1 is artificially slowed**: Simulates real-world partition skew
///
/// ## Progressive Filter Evolution Demonstration
///
/// The test shows how the dynamic filter evolves through three distinct phases:
///
/// ### Phase 1: Initial State (All Partitions Building)
/// ```sql
/// -- No filter applied yet
/// predicate=DynamicFilterPhysicalExpr [ true ]
/// ```
/// → All probe-side data passes through unfiltered
///
/// ### Phase 2: Progressive Filtering (Some Partitions Complete)
/// ```sql
/// -- Hash-based progressive filter after partition 0 completes
/// predicate=DynamicFilterPhysicalExpr [
/// CASE repartition_hash(id@0) % 3
/// WHEN 0 THEN id@0 >= 3 AND id@0 <= 3 -- Only partition 0 bounds known
/// ELSE true -- Pass through partitions 1,2 data
/// END
/// ]
/// ```
/// → Filters probe data for partition 0, passes through everything else safely
///
/// ### Phase 3: Final Optimization (All Partitions Complete)
/// ```sql
/// -- Optimized bounds-only filter
/// predicate=DynamicFilterPhysicalExpr [
/// id@0 >= 3 AND id@0 <= 3 OR -- Partition 0 bounds
/// id@0 >= 2 AND id@0 <= 2 OR -- Partition 1 bounds
/// id@0 >= 1 AND id@0 <= 4 -- Partition 2 bounds
/// ]
/// ```
/// → Bounds filter with no hash computation overhead
///
/// ## Correctness Validation
///
/// The test verifies:
/// 1. **No False Negatives**: All valid join results [2,3] are preserved throughout
/// 2. **Progressive Improvement**: Filter selectivity increases as partitions complete
/// 3. **Final Optimization**: Hash-based expressions are removed when all partitions finish
/// 4. **Partition Isolation**: Each partition's filter only affects its own hash bucket
///
/// ## Real-World Impact
///
/// This optimization addresses common production scenarios where:
/// - Some partitions finish much faster than others (data skew)
/// - Waiting for large build sides before starting the probe sides increases latency
#[tokio::test]
#[cfg(not(feature = "force_hash_collisions"))] // this test relies on hash partitioning to separate rows
async fn test_hashjoin_progressive_filter_reporting() {
use datafusion_common::JoinType;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

use crate::physical_optimizer::filter_pushdown::util::TestRepartitionHash;

// Create build side with limited values
let build_batches_1 = vec![record_batch!(("id", UInt64, [1, 2, 3, 4])).unwrap()];
let build_side_schema =
Arc::new(Schema::new(vec![Field::new("id", DataType::UInt64, false)]));
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
.with_support(true)
// Add two batches -> creates 2 partitions
.with_batches(build_batches_1)
.build();

// Create probe side with more values
let probe_batches_1 = vec![record_batch!(("id", UInt64, [2, 3])).unwrap()];
let probe_side_schema =
Arc::new(Schema::new(vec![Field::new("id", DataType::UInt64, false)]));
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
.with_support(true)
.with_batches(probe_batches_1)
.build();

// Create RepartitionExec nodes for both sides with hash partitioning on join keys
let partition_count = 3;

// Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
let build_hash_exprs = vec![
col("id", &build_side_schema).unwrap(),
col("id", &build_side_schema).unwrap(),
];
let build_side = Arc::new(
RepartitionExec::try_new(
build_scan,
Partitioning::Hash(build_hash_exprs, partition_count),
)
.unwrap()
.with_hash_function(ScalarUDF::new_from_impl(TestRepartitionHash::new())),
);
let build_side = Arc::new(SlowPartitionNode::new(build_side, vec![1]));

// Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
let probe_hash_exprs = vec![
col("id", &probe_side_schema).unwrap(),
col("id", &probe_side_schema).unwrap(),
];
let probe_side = Arc::new(
RepartitionExec::try_new(
Arc::clone(&probe_scan),
Partitioning::Hash(probe_hash_exprs, partition_count),
)
.unwrap()
.with_hash_function(ScalarUDF::new_from_impl(TestRepartitionHash::new())),
);

// Create HashJoinExec with partitioned inputs
let on = vec![(
col("id", &build_side_schema).unwrap(),
col("id", &probe_side_schema).unwrap(),
)];
let plan = Arc::new(
HashJoinExec::try_new(
Arc::clone(&build_side) as Arc<dyn ExecutionPlan>,
probe_side,
on,
None,
&JoinType::Inner,
None,
PartitionMode::Partitioned,
datafusion_common::NullEquality::NullEqualsNothing,
)
.unwrap(),
) as Arc<dyn ExecutionPlan>;

// Verify the initial optimization - should show DynamicFilterPhysicalExpr is set up
// but not yet populated with any bounds (shows as "true" initially)
insta::assert_snapshot!(
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
@r"
OptimizationTest:
input:
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)]
- SlowPartitionNode
- RepartitionExec: partitioning=Hash([id@0, id@0], 3), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([id@0, id@0], 3), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true
output:
Ok:
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)]
- SlowPartitionNode
- RepartitionExec: partitioning=Hash([id@0, id@0], 3), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([id@0, id@0], 3), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
"
);

// Actually apply the optimization to the plan and execute to see the filter in action
let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;
let plan = FilterPushdown::new_post_optimization()
.optimize(plan, &config)
.unwrap();
let config = SessionConfig::new().with_batch_size(10);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let mut batches = Vec::new();

// Execute partition 0 directly to test progressive behavior
// This partition should complete while partition 1 is blocked
let mut stream_0 = plan.execute(0, Arc::clone(&task_ctx)).unwrap();

// Pull batches from partition 0 (should work even while partition 1 is blocked)
while let Some(batch_result) = stream_0.next().await {
let batch = batch_result.unwrap();
if batch.num_rows() > 0 {
batches.push(batch);
}
}

// CRITICAL VALIDATION: This snapshot shows the progressive filter in action!
// After partition 0 completes (but partition 1 is still blocked), we see:
// - CASE repartition_hash(id@0) % 3 WHEN 0 THEN id@0 >= 3 AND id@0 <= 3 ELSE true END
// This means:
// - For rows that hash to partition 0: Apply bounds check (id >= 3 AND id <= 3)
// - For rows that hash to partitions 1,2: Pass everything through (ELSE true)
// This is the core of progressive filtering - partial filtering without false negatives!
#[cfg(not(feature = "force_hash_collisions"))]
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)]
- SlowPartitionNode
- RepartitionExec: partitioning=Hash([id@0, id@0], 3), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([id@0, id@0], 3), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ CASE repartition_hash(id@0) % 3 WHEN 0 THEN id@0 >= 3 AND id@0 <= 3 ELSE true END ]
"
);

#[rustfmt::skip]
let expected = [
"+----+----+",
"| id | id |",
"+----+----+",
"| 3 | 3 |",
"+----+----+",
];

assert_batches_sorted_eq!(expected, &batches);

// Wake the slow build side partition 0 to allow it to complete
build_side.unblock();

// Pull remaining batches
let mut stream_1 = plan.execute(1, Arc::clone(&task_ctx)).unwrap();
while let Some(batch) = stream_1.next().await {
batches.push(batch.unwrap());
}
let mut stream_2 = plan.execute(2, Arc::clone(&task_ctx)).unwrap();
while let Some(batch) = stream_2.next().await {
batches.push(batch.unwrap());
}

// FINAL OPTIMIZATION VALIDATION: All partitions complete - filter is now optimized!
// The hash-based CASE expression has been replaced with a simple OR of bounds:
// - id@0 >= 3 AND id@0 <= 3 OR id@0 >= 2 AND id@0 <= 2 OR id@0 >= 1 AND id@0 <= 4
// This is much more efficient - no hash computation needed, just bounds checks.
// Each OR clause represents one partition's bounds: [3,3], [2,2], [1,4]
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(id@0, id@0)]
- SlowPartitionNode
- RepartitionExec: partitioning=Hash([id@0, id@0], 3), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true
- RepartitionExec: partitioning=Hash([id@0, id@0], 3), input_partitions=1
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[id], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ id@0 >= 3 AND id@0 <= 3 OR id@0 >= 2 AND id@0 <= 2 OR id@0 >= 1 AND id@0 <= 4 ]
"
);

// Look at the final results
#[rustfmt::skip]
let expected = [
"+----+----+",
"| id | id |",
"+----+----+",
"| 2 | 2 |",
"| 3 | 3 |",
"+----+----+",
];
assert_batches_sorted_eq!(expected, &batches);
}

#[tokio::test]
async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
use datafusion_common::JoinType;
Expand Down
Loading
Loading