diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 1a04753966a2..90c03cf93563 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -48,6 +48,7 @@ use datafusion_physical_optimizer::{ use datafusion_physical_plan::{ aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}, coalesce_batches::CoalesceBatchesExec, + coalesce_partitions::CoalescePartitionsExec, filter::FilterExec, repartition::RepartitionExec, sorts::sort::SortExec, @@ -267,7 +268,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { format_plan_for_test(&plan), @r" - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb] - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ] " @@ -890,7 +891,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() { None, &JoinType::Inner, None, - PartitionMode::Partitioned, + PartitionMode::CollectLeft, datafusion_common::NullEquality::NullEqualsNothing, ) .unwrap(), @@ -902,12 +903,12 @@ async fn test_hashjoin_dynamic_filter_pushdown() { @r" OptimizationTest: input: - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true output: Ok: - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] ", @@ -936,13 +937,233 @@ async fn test_hashjoin_dynamic_filter_pushdown() { insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @r" - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb] + - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] " ); } +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown_partitioned() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Rouugh plan we're trying to recreate: + // COPY 
(select i as k from generate_series(1, 10000000) as t(i)) + // TO 'test_files/scratch/push_down_filter/t1.parquet' + // STORED AS PARQUET; + // COPY (select i as k, i as v from generate_series(1, 10000000) as t(i)) + // TO 'test_files/scratch/push_down_filter/t2.parquet' + // STORED AS PARQUET; + // create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet'; + // create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet'; + // explain + // select * + // from t1 + // join t2 on t1.k = t2.k; + // +---------------+------------------------------------------------------------+ + // | plan_type | plan | + // +---------------+------------------------------------------------------------+ + // | physical_plan | ┌───────────────────────────┐ | + // | | │ CoalesceBatchesExec │ | + // | | │ -------------------- │ | + // | | │ target_batch_size: │ | + // | | │ 8192 │ | + // | | └─────────────┬─────────────┘ | + // | | ┌─────────────┴─────────────┐ | + // | | │ HashJoinExec │ | + // | | │ -------------------- ├──────────────┐ | + // | | │ on: (k = k) │ │ | + // | | └─────────────┬─────────────┘ │ | + // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | + // | | │ CoalesceBatchesExec ││ CoalesceBatchesExec │ | + // | | │ -------------------- ││ -------------------- │ | + // | | │ target_batch_size: ││ target_batch_size: │ | + // | | │ 8192 ││ 8192 │ | + // | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | + // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | + // | | │ RepartitionExec ││ RepartitionExec │ | + // | | │ -------------------- ││ -------------------- │ | + // | | │ partition_count(in->out): ││ partition_count(in->out): │ | + // | | │ 12 -> 12 ││ 12 -> 12 │ | + // | | │ ││ │ | + // | | │ partitioning_scheme: ││ partitioning_scheme: │ | + // | | │ Hash([k@0], 12) ││ Hash([k@0], 12) │ | + // | | └─────────────┬─────────────┘└─────────────┬─────────────┘ | + // | | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | + // | | │ DataSourceExec ││ DataSourceExec │ | + // | | │ -------------------- ││ -------------------- │ | + // | | │ files: 12 ││ files: 12 │ | + // | | │ format: parquet ││ format: parquet │ | + // | | │ ││ predicate: true │ | + // | | └───────────────────────────┘└───────────────────────────┘ | + // | | | + // +---------------+------------------------------------------------------------+ + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) // Extra column not used in join + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("e", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + 
.with_support(true) + .with_batches(probe_batches) + .build(); + + // Create RepartitionExec nodes for both sides with hash partitioning on join keys + let partition_count = 12; + + // Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + let build_hash_exprs = vec![ + col("a", &build_side_schema).unwrap(), + col("b", &build_side_schema).unwrap(), + ]; + let build_repartition = Arc::new( + RepartitionExec::try_new( + build_scan, + Partitioning::Hash(build_hash_exprs, partition_count), + ) + .unwrap(), + ); + let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192)); + + // Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec + let probe_hash_exprs = vec![ + col("a", &probe_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ]; + let probe_repartition = Arc::new( + RepartitionExec::try_new( + probe_scan, + Partitioning::Hash(probe_hash_exprs, partition_count), + ) + .unwrap(), + ); + let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192)); + + // Create HashJoinExec with partitioned inputs + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", &probe_side_schema).unwrap(), + ), + ]; + let hash_join = Arc::new( + HashJoinExec::try_new( + build_coalesce, + probe_coalesce, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Top-level CoalesceBatchesExec + let cb = + Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc; + // Top-level CoalesceParititionsExec + let plan = Arc::new(CoalescePartitionsExec::new(cb)) as Arc; + + // expect the predicate to be pushed down into the probe side DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + output: + Ok: + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + " + ); + + // Actually apply the optimization to the plan and execute to see the filter in action + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + 
config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + let config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + if let Some(batch_result) = stream.next().await { + batch_result.unwrap(); + } + + // Now check what our filter looks like + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - CoalescePartitionsExec + - CoalesceBatchesExec: target_batch_size=8192 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - CoalesceBatchesExec: target_batch_size=8192 + - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1 + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ] + " + ); +} + #[tokio::test] async fn test_nested_hashjoin_dynamic_filter_pushdown() { use datafusion_common::JoinType; @@ -1082,9 +1303,9 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() { insta::assert_snapshot!( format!("{}", format_plan_for_test(&plan)), @r" - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true - - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ] " diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 65b9a54f9ae6..e7494113775e 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -20,7 +20,7 @@ use std::fmt; use std::mem::size_of; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use std::task::Poll; use std::{any::Any, vec}; @@ -98,6 +98,262 @@ use parking_lot::Mutex; const HASH_JOIN_SEED: RandomState = RandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); +/// Represents the minimum and maximum values for a specific column. +/// Used in dynamic filter pushdown to establish value boundaries. 
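+/// For example, a build-side join key column containing the values `[3, 1, 7]` is
+/// summarized as `min = 1, max = 7`.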
+#[derive(Debug, Clone, PartialEq)] +struct ColumnBounds { + /// The minimum value observed for this column + min: ScalarValue, + /// The maximum value observed for this column + max: ScalarValue, +} + +impl ColumnBounds { + fn new(min: ScalarValue, max: ScalarValue) -> Self { + Self { min, max } + } +} + +/// Represents the bounds for all join key columns from a single partition. +/// This contains the min/max values computed from one partition's build-side data. +#[derive(Debug, Clone)] +struct PartitionBounds { + /// Min/max bounds for each join key column in this partition. + /// Index corresponds to the join key expression index. + column_bounds: Vec, +} + +impl PartitionBounds { + fn new(column_bounds: Vec) -> Self { + Self { column_bounds } + } + + fn len(&self) -> usize { + self.column_bounds.len() + } + + fn get_column_bounds(&self, index: usize) -> Option<&ColumnBounds> { + self.column_bounds.get(index) + } +} + +/// Coordinates dynamic filter bounds collection across multiple partitions +/// +/// This structure ensures that dynamic filters are built with complete information from all +/// relevant partitions before being applied to probe-side scans. Incomplete filters would +/// incorrectly eliminate valid join results. +/// +/// ## Synchronization Strategy +/// +/// 1. Each partition computes bounds from its build-side data +/// 2. Bounds are stored in the shared HashMap (indexed by partition_id) +/// 3. A counter tracks how many partitions have reported their bounds +/// 4. When the last partition reports (completed == total), bounds are merged and filter is updated +/// +/// ## Partition Counting +/// +/// The `total_partitions` count represents how many times `collect_build_side` will be called: +/// - **CollectLeft**: Number of output partitions (each accesses shared build data) +/// - **Partitioned**: Number of input partitions (each builds independently) +/// +/// ## Thread Safety +/// +/// All fields use a single mutex to ensure correct coordination between concurrent +/// partition executions. +struct SharedBoundsAccumulator { + /// Shared state protected by a single mutex to avoid ordering concerns + inner: Mutex, + /// Total number of partitions. + /// Need to know this so that we can update the dynamic filter once we are done + /// building *all* of the hash tables. + total_partitions: usize, + /// Dynamic filter for pushdown to probe side + dynamic_filter: Arc, + /// Right side join expressions needed for creating filter bounds + on_right: Vec, +} + +/// State protected by SharedBoundsAccumulator's mutex +struct SharedBoundsState { + /// Bounds from completed partitions. + /// Each element represents the column bounds computed by one partition. + bounds: Vec, + /// Number of partitions that have reported completion. + completed_partitions: usize, +} + +impl SharedBoundsAccumulator { + /// Creates a new SharedBoundsAccumulator configured for the given partition mode + /// + /// This method calculates how many times `collect_build_side` will be called based on the + /// partition mode's execution pattern. This count is critical for determining when we have + /// complete information from all partitions to build the dynamic filter. + /// + /// ## Partition Mode Execution Patterns + /// + /// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut` + /// across all output partitions. Each output partition calls `collect_build_side` to access + /// the shared build data. Expected calls = number of output partitions. 
+ /// + /// - **Partitioned**: Each partition independently builds its own hash table by calling + /// `collect_build_side` once. Expected calls = number of build partitions. + /// + /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as safe default since + /// the actual mode will be determined and a new bounds_accumulator created before execution. + /// + /// ## Why This Matters + /// + /// We cannot build a partial filter from some partitions - it would incorrectly eliminate + /// valid join results. We must wait until we have complete bounds information from ALL + /// relevant partitions before updating the dynamic filter. + fn new_from_partition_mode( + partition_mode: PartitionMode, + left_child: &dyn ExecutionPlan, + right_child: &dyn ExecutionPlan, + dynamic_filter: Arc, + on_right: Vec, + ) -> Self { + // Troubleshooting: If partition counts are incorrect, verify this logic matches + // the actual execution pattern in collect_build_side() + let expected_calls = match partition_mode { + // Each output partition accesses shared build data + PartitionMode::CollectLeft => { + right_child.output_partitioning().partition_count() + } + // Each partition builds its own data + PartitionMode::Partitioned => { + left_child.output_partitioning().partition_count() + } + // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two) + PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"), + }; + Self { + inner: Mutex::new(SharedBoundsState { + bounds: Vec::with_capacity(expected_calls), + completed_partitions: 0, + }), + total_partitions: expected_calls, + dynamic_filter, + on_right, + } + } + + /// Create a filter expression from individual partition bounds using OR logic. + /// + /// This creates a filter where each partition's bounds form a conjunction (AND) + /// of column range predicates, and all partitions are combined with OR. 
+ /// + /// For example, with 2 partitions and 2 columns: + /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1) + /// OR + /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1)) + fn create_filter_from_partition_bounds( + &self, + bounds: &[PartitionBounds], + ) -> Result> { + if bounds.is_empty() { + return Ok(lit(true)); + } + + // Create a predicate for each partition + let mut partition_predicates = Vec::with_capacity(bounds.len()); + + for partition_bounds in bounds.iter() { + // Create range predicates for each join key in this partition + let mut column_predicates = Vec::with_capacity(partition_bounds.len()); + + for (col_idx, right_expr) in self.on_right.iter().enumerate() { + if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) { + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(column_bounds.min.clone()), + )) as Arc; + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(column_bounds.max.clone()), + )) as Arc; + let range_expr = + Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) + as Arc; + column_predicates.push(range_expr); + } + } + + // Combine all column predicates for this partition with AND + if !column_predicates.is_empty() { + let partition_predicate = column_predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new(acc, Operator::And, pred)) + as Arc + }) + .unwrap(); + partition_predicates.push(partition_predicate); + } + } + + // Combine all partition predicates with OR + let combined_predicate = partition_predicates + .into_iter() + .reduce(|acc, pred| { + Arc::new(BinaryExpr::new(acc, Operator::Or, pred)) + as Arc + }) + .unwrap_or_else(|| lit(true)); + + Ok(combined_predicate) + } + + /// Report bounds from a completed partition and update dynamic filter if all partitions are done + /// + /// This method coordinates the dynamic filter updates across all partitions. It stores the + /// bounds from the current partition, increments the completion counter, and when all + /// partitions have reported, creates an OR'd filter from individual partition bounds. 
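+    ///
+    /// For example (illustrative values): if two partitions report bounds of `[1, 5]`
+    /// and `[100, 200]` on a single join key `k`, the filter pushed to the probe side
+    /// becomes `(k >= 1 AND k <= 5) OR (k >= 100 AND k <= 200)`.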
+ /// + /// # Arguments + /// * `partition_bounds` - The bounds computed by this partition (if any) + /// + /// # Returns + /// * `Result<()>` - Ok if successful, Err if filter update failed + fn report_partition_bounds( + &self, + partition_bounds: Option>, + ) -> Result<()> { + let mut inner = self.inner.lock(); + + // Store bounds in the accumulator - this runs once per partition + if let Some(bounds) = partition_bounds { + // Only push actual bounds if they exist + inner.bounds.push(PartitionBounds::new(bounds)); + } + + // Increment the completion counter + // Even empty partitions must report to ensure proper termination + inner.completed_partitions += 1; + let completed = inner.completed_partitions; + let total_partitions = self.total_partitions; + + // Critical synchronization point: Only update the filter when ALL partitions are complete + // Troubleshooting: If you see "completed > total_partitions", check partition + // count calculation in new_from_partition_mode() - it may not match actual execution calls + if completed == total_partitions && !inner.bounds.is_empty() { + let filter_expr = self.create_filter_from_partition_bounds(&inner.bounds)?; + self.dynamic_filter.update(filter_expr)?; + } + + Ok(()) + } +} + +impl fmt::Debug for SharedBoundsAccumulator { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SharedBoundsAccumulator") + } +} + /// HashTable and input data for the left (build side) of a join struct JoinLeftData { /// The hash table with indices into `batch` @@ -116,6 +372,8 @@ struct JoinLeftData { /// This could hide potential out-of-memory issues, especially when upstream operators increase their memory consumption. /// The MemoryReservation ensures proper tracking of memory resources throughout the join operation's lifecycle. _reservation: MemoryReservation, + /// Bounds computed from the build side for dynamic filter pushdown + bounds: Option>, } impl JoinLeftData { @@ -127,6 +385,7 @@ impl JoinLeftData { visited_indices_bitmap: SharedBitmapBuilder, probe_threads_counter: AtomicUsize, reservation: MemoryReservation, + bounds: Option>, ) -> Self { Self { hash_map, @@ -135,6 +394,7 @@ impl JoinLeftData { visited_indices_bitmap, probe_threads_counter, _reservation: reservation, + bounds, } } @@ -365,7 +625,12 @@ pub struct HashJoinExec { /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, /// Dynamic filter for pushing down to the probe side + /// Set when dynamic filter pushdown is detected in handle_child_pushdown_result dynamic_filter: Option>, + /// Shared bounds accumulator for coordinating dynamic filter updates across partitions + /// Only created when dynamic filter pushdown is enabled. 
+ /// Lazily initialized at execution time to use actual runtime partition counts + bounds_accumulator: Option>>, } impl fmt::Debug for HashJoinExec { @@ -434,6 +699,9 @@ impl HashJoinExec { projection.as_ref(), )?; + // Initialize both dynamic filter and bounds accumulator to None + // They will be set later if dynamic filtering is enabled + Ok(HashJoinExec { left, right, @@ -450,6 +718,7 @@ impl HashJoinExec { null_equality, cache, dynamic_filter: None, + bounds_accumulator: None, }) } @@ -706,24 +975,10 @@ impl DisplayAs for HashJoinExec { .map(|(c1, c2)| format!("({c1}, {c2})")) .collect::>() .join(", "); - let dynamic_filter_display = match self.dynamic_filter.as_ref() { - Some(dynamic_filter) => match dynamic_filter.current() { - Ok(current) if current != lit(true) => { - format!(", filter=[{current}]") - } - _ => "".to_string(), - }, - None => "".to_string(), - }; write!( f, - "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}", - self.mode, - self.join_type, - on, - display_filter, - display_projections, - dynamic_filter_display + "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}", + self.mode, self.join_type, on, display_filter, display_projections, ) } DisplayFormatType::TreeRender => { @@ -813,25 +1068,45 @@ impl ExecutionPlan for HashJoinExec { vec![&self.left, &self.right] } + /// Creates a new HashJoinExec with different children while preserving configuration. + /// + /// This method is called during query optimization when the optimizer creates new + /// plan nodes. Importantly, it creates a fresh bounds_accumulator via `try_new` + /// rather than cloning the existing one because partitioning may have changed. fn with_new_children( self: Arc, children: Vec>, ) -> Result> { - let new_join = HashJoinExec::try_new( - Arc::clone(&children[0]), - Arc::clone(&children[1]), - self.on.clone(), - self.filter.clone(), - &self.join_type, - self.projection.clone(), - self.mode, - self.null_equality, - )?; - Ok(Arc::new(new_join)) + Ok(Arc::new(HashJoinExec { + left: Arc::clone(&children[0]), + right: Arc::clone(&children[1]), + on: self.on.clone(), + filter: self.filter.clone(), + join_type: self.join_type, + join_schema: Arc::clone(&self.join_schema), + left_fut: Arc::clone(&self.left_fut), + random_state: self.random_state.clone(), + mode: self.mode, + metrics: ExecutionPlanMetricsSet::new(), + projection: self.projection.clone(), + column_indices: self.column_indices.clone(), + null_equality: self.null_equality, + cache: Self::compute_properties( + &children[0], + &children[1], + Arc::clone(&self.join_schema), + self.join_type, + &self.on, + self.mode, + self.projection.as_ref(), + )?, + // Keep the dynamic filter, bounds accumulator will be reset + dynamic_filter: self.dynamic_filter.clone(), + bounds_accumulator: None, + })) } fn reset_state(self: Arc) -> Result> { - // Reset the left_fut to allow re-execution Ok(Arc::new(HashJoinExec { left: Arc::clone(&self.left), right: Arc::clone(&self.right), @@ -839,6 +1114,7 @@ impl ExecutionPlan for HashJoinExec { filter: self.filter.clone(), join_type: self.join_type, join_schema: Arc::clone(&self.join_schema), + // Reset the left_fut to allow re-execution left_fut: Arc::new(OnceAsync::default()), random_state: self.random_state.clone(), mode: self.mode, @@ -847,7 +1123,9 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), + // Reset dynamic filter and bounds accumulator to initial state dynamic_filter: None, + bounds_accumulator: 
None, })) } @@ -861,11 +1139,6 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| Arc::clone(&on.0)) .collect::>(); - let on_right = self - .on - .iter() - .map(|on| Arc::clone(&on.1)) - .collect::>(); let left_partitions = self.left.output_partitioning().partition_count(); let right_partitions = self.right.output_partitioning().partition_count(); @@ -884,11 +1157,7 @@ impl ExecutionPlan for HashJoinExec { ); } - let enable_dynamic_filter_pushdown = context - .session_config() - .options() - .optimizer - .enable_dynamic_filter_pushdown; + let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some(); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { @@ -906,10 +1175,7 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), - enable_dynamic_filter_pushdown - .then_some(self.dynamic_filter.clone()) - .flatten(), - on_right.clone(), + enable_dynamic_filter_pushdown, )) })?, PartitionMode::Partitioned => { @@ -927,10 +1193,7 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), 1, - enable_dynamic_filter_pushdown - .then_some(self.dynamic_filter.clone()) - .flatten(), - on_right.clone(), + enable_dynamic_filter_pushdown, )) } PartitionMode::Auto => { @@ -943,6 +1206,34 @@ impl ExecutionPlan for HashJoinExec { let batch_size = context.session_config().batch_size(); + // Initialize bounds_accumulator lazily with runtime partition counts (only if enabled) + let bounds_accumulator = if enable_dynamic_filter_pushdown + && self.dynamic_filter.is_some() + { + if let Some(ref bounds_accumulator_oncelock) = self.bounds_accumulator { + let dynamic_filter = Arc::clone(self.dynamic_filter.as_ref().unwrap()); + let on_right = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + + Some(Arc::clone(bounds_accumulator_oncelock.get_or_init(|| { + Arc::new(SharedBoundsAccumulator::new_from_partition_mode( + self.mode, + self.left.as_ref(), + self.right.as_ref(), + dynamic_filter, + on_right, + )) + }))) + } else { + None + } + } else { + None + }; + // we have the batches and the hash map with their keys. We can how create a stream // over the right that uses this information to issue new batches. 
let right_stream = self.right.execute(partition, context)?; @@ -956,6 +1247,12 @@ impl ExecutionPlan for HashJoinExec { None => self.column_indices.clone(), }; + let on_right = self + .on + .iter() + .map(|(_, right_expr)| Arc::clone(right_expr)) + .collect::>(); + Ok(Box::pin(HashJoinStream { schema: self.schema(), on_right, @@ -971,6 +1268,7 @@ impl ExecutionPlan for HashJoinExec { batch_size, hashes_buffer: vec![], right_side_ordered: self.right.output_ordering().is_some(), + bounds_accumulator, })) } @@ -1129,6 +1427,7 @@ impl ExecutionPlan for HashJoinExec { null_equality: self.null_equality, cache: self.cache.clone(), dynamic_filter: Some(dynamic_filter), + bounds_accumulator: Some(OnceLock::new()), }); result = result.with_updated_node(new_node as Arc); } @@ -1138,13 +1437,13 @@ impl ExecutionPlan for HashJoinExec { } /// Compute min/max bounds for each column in the given arrays -fn compute_bounds(arrays: &[ArrayRef]) -> Result> { +fn compute_bounds(arrays: &[ArrayRef]) -> Result> { arrays .iter() .map(|array| { if array.is_empty() { // Return NULL values for empty arrays - return Ok(( + return Ok(ColumnBounds::new( ScalarValue::try_from(array.data_type())?, ScalarValue::try_from(array.data_type())?, )); @@ -1154,14 +1453,40 @@ fn compute_bounds(arrays: &[ArrayRef]) -> Result let min_val = min_batch(array)?; let max_val = max_batch(array)?; - Ok((min_val, max_val)) + Ok(ColumnBounds::new(min_val, max_val)) }) .collect() } -/// Reads the left (build) side of the input, buffering it in memory, to build a -/// hash table (`LeftJoinData`) #[expect(clippy::too_many_arguments)] +/// Collects all batches from the left (build) side stream and creates a hash map for joining. +/// +/// This function is responsible for: +/// 1. Consuming the entire left stream and collecting all batches into memory +/// 2. Building a hash map from the join key columns for efficient probe operations +/// 3. Computing bounds for dynamic filter pushdown (if enabled) +/// 4. Preparing visited indices bitmap for certain join types +/// +/// # Parameters +/// * `random_state` - Random state for consistent hashing across partitions +/// * `left_stream` - Stream of record batches from the build side +/// * `on_left` - Physical expressions for the left side join keys +/// * `metrics` - Metrics collector for tracking memory usage and row counts +/// * `reservation` - Memory reservation tracker for the hash table and data +/// * `with_visited_indices_bitmap` - Whether to track visited indices (for outer joins) +/// * `probe_threads_count` - Number of threads that will probe this hash table +/// * `should_compute_bounds` - Whether to compute min/max bounds for dynamic filtering +/// +/// # Dynamic Filter Coordination +/// When `should_compute_bounds` is true, this function computes the min/max bounds +/// for each join key column but does NOT update the dynamic filter. Instead, the +/// bounds are stored in the returned `JoinLeftData` and later coordinated by +/// `SharedBoundsAccumulator` to ensure all partitions contribute their bounds +/// before updating the filter exactly once. +/// +/// # Returns +/// `JoinLeftData` containing the hash map, consolidated batch, join key values, +/// visited indices bitmap, and computed bounds (if requested). 
async fn collect_left_input( random_state: RandomState, left_stream: SendableRecordBatchStream, @@ -1170,8 +1495,7 @@ async fn collect_left_input( reservation: MemoryReservation, with_visited_indices_bitmap: bool, probe_threads_count: usize, - dynamic_filter: Option>, - on_right: Vec, + should_compute_bounds: bool, ) -> Result { let schema = left_stream.schema(); @@ -1261,6 +1585,13 @@ async fn collect_left_input( }) .collect::>>()?; + // Compute bounds for dynamic filter if enabled + let bounds = if should_compute_bounds && num_rows > 0 { + Some(compute_bounds(&left_values)?) + } else { + None + }; + let data = JoinLeftData::new( hashmap, single_batch, @@ -1268,49 +1599,9 @@ async fn collect_left_input( Mutex::new(visited_indices_bitmap), AtomicUsize::new(probe_threads_count), reservation, + bounds, ); - // Update dynamic filter with min/max bounds if provided - if let Some(dynamic_filter) = dynamic_filter { - if num_rows > 0 { - let bounds = compute_bounds(&left_values)?; - - // Create range predicates for each join key - let mut predicates = Vec::with_capacity(bounds.len()); - for ((min_val, max_val), right_expr) in bounds.iter().zip(on_right.iter()) { - // Create predicate: col >= min AND col <= max - let min_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::GtEq, - lit(min_val.clone()), - )) as Arc; - - let max_expr = Arc::new(BinaryExpr::new( - Arc::clone(right_expr), - Operator::LtEq, - lit(max_val.clone()), - )) as Arc; - - let range_expr = - Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) - as Arc; - - predicates.push(range_expr); - } - - // Combine all predicates with AND - let combined_predicate = predicates - .into_iter() - .reduce(|acc, pred| { - Arc::new(BinaryExpr::new(acc, Operator::And, pred)) - as Arc - }) - .unwrap_or_else(|| lit(true)); - - dynamic_filter.update(combined_predicate)?; - } - } - Ok(data) } @@ -1506,6 +1797,8 @@ struct HashJoinStream { hashes_buffer: Vec, /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, + /// Shared bounds accumulator for coordinating dynamic filter updates (optional) + bounds_accumulator: Option>, } impl RecordBatchStream for HashJoinStream { @@ -1695,6 +1988,14 @@ impl HashJoinStream { .get_shared(cx))?; build_timer.done(); + // Handle dynamic filter bounds accumulation + // + // Dynamic filter coordination between partitions: + // Report bounds to the accumulator which will handle synchronization and filter updates + if let Some(ref bounds_accumulator) = self.bounds_accumulator { + bounds_accumulator.report_partition_bounds(left_data.bounds.clone())?; + } + self.state = HashJoinStreamState::FetchProbeBatch; self.build_side = BuildSide::Ready(BuildSideReadyState { left_data }); diff --git a/datafusion/sqllogictest/test_files/push_down_filter.slt b/datafusion/sqllogictest/test_files/push_down_filter.slt index c999aa71fe5b..a7e3338df367 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter.slt @@ -320,3 +320,78 @@ drop table large_table; statement ok drop table t; + +# Regression test for https://github.com/apache/datafusion/issues/17188 +query I +COPY (select i as k from generate_series(1, 10000000) as t(i)) +TO 'test_files/scratch/push_down_filter/t1.parquet' +STORED AS PARQUET; +---- +10000000 + +query I +COPY (select i as k, i as v from generate_series(1, 10000000) as t(i)) +TO 'test_files/scratch/push_down_filter/t2.parquet' +STORED AS PARQUET; +---- +10000000 + +statement ok 
+create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet'; + +statement ok +create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet'; + +# The failure before https://github.com/apache/datafusion/pull/17197 was non-deterministic and random +# So we'll run the same query a couple of times just to have more certainty it's fixed +# Sorry about the spam in this slt test... + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000 + +query III rowsort +select * +from t1 +join t2 on t1.k = t2.k +where v = 1 or v = 10000000 +order by t1.k, t2.v; +---- +1 1 1 +10000000 10000000 10000000
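
The synchronization scheme implemented by `SharedBoundsAccumulator` in the hash_join.rs changes
above can be illustrated with a small, self-contained sketch. The `Bounds` and `Accumulator`
types below are hypothetical stand-ins, and a plain string takes the place of the
`DynamicFilterPhysicalExpr` update; this shows the coordination pattern under those assumptions,
not DataFusion's actual API:

use std::sync::Mutex;

#[derive(Debug)]
struct Bounds {
    min: i64,
    max: i64,
}

struct Accumulator {
    // (bounds reported so far, number of partitions that have reported)
    inner: Mutex<(Vec<Bounds>, usize)>,
    total_partitions: usize,
}

impl Accumulator {
    // Each partition calls this exactly once, even if it produced no rows.
    // Only the call that completes the set builds the OR'd filter, so the probe
    // side never observes a partial predicate that could drop valid join rows.
    fn report(&self, bounds: Option<Bounds>) -> Option<String> {
        let mut inner = self.inner.lock().unwrap();
        if let Some(b) = bounds {
            inner.0.push(b);
        }
        inner.1 += 1;
        if inner.1 == self.total_partitions && !inner.0.is_empty() {
            let parts: Vec<String> = inner
                .0
                .iter()
                .map(|b| format!("(k >= {} AND k <= {})", b.min, b.max))
                .collect();
            return Some(parts.join(" OR "));
        }
        None
    }
}

fn main() {
    let acc = Accumulator {
        inner: Mutex::new((Vec::new(), 0)),
        total_partitions: 2,
    };
    // First partition reports: nothing is published yet.
    assert!(acc.report(Some(Bounds { min: 1, max: 5 })).is_none());
    // Last partition reports: the combined OR-of-ranges filter is produced.
    let filter = acc.report(Some(Bounds { min: 100, max: 200 })).unwrap();
    assert_eq!(filter, "(k >= 1 AND k <= 5) OR (k >= 100 AND k <= 200)");
}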