
Commit 49d49fd

fix: Remove duplicate expressions in dynamicFilter when PartitionMode is `CollectLeft` (#17551)
1 parent c61ac33 commit 49d49fd

File tree

4 files changed (+214 additions, -10 deletions)


datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 177 additions & 0 deletions
@@ -1219,6 +1219,183 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
     );
 }
 
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("e", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create RepartitionExec nodes for both sides with hash partitioning on join keys
+    let partition_count = 12;
+
+    // Probe side: DataSource -> RepartitionExec(Hash) -> CoalesceBatchesExec
+    let probe_hash_exprs = vec![
+        col("a", &probe_side_schema).unwrap(),
+        col("b", &probe_side_schema).unwrap(),
+    ];
+    let probe_repartition = Arc::new(
+        RepartitionExec::try_new(
+            Arc::clone(&probe_scan),
+            Partitioning::Hash(probe_hash_exprs, partition_count), // create multiple partitions on the probe side
+        )
+        .unwrap(),
+    );
+    let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));
+
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            probe_coalesce,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    // Top-level CoalesceBatchesExec
+    let cb =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
+    // Top-level CoalescePartitionsExec
+    let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;
+    // Add a sort for deterministic output
+    let plan = Arc::new(SortExec::new(
+        LexOrdering::new(vec![PhysicalSortExpr::new(
+            col("a", &probe_side_schema).unwrap(),
+            SortOptions::new(true, false), // descending, nulls_first
+        )])
+        .unwrap(),
+        cp,
+    )) as Arc<dyn ExecutionPlan>;
+
+    // expect the predicate to be pushed down into the probe side DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+        - CoalescePartitionsExec
+        - CoalesceBatchesExec: target_batch_size=8192
+        - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+        - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        - CoalesceBatchesExec: target_batch_size=8192
+        - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+        - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+          - CoalescePartitionsExec
+          - CoalesceBatchesExec: target_batch_size=8192
+          - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+          - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+          - CoalesceBatchesExec: target_batch_size=8192
+          - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+          - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
+    // Actually apply the optimization to the plan and execute to see the filter in action
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // Now check what our filter looks like
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+    - CoalescePartitionsExec
+    - CoalesceBatchesExec: target_batch_size=8192
+    - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    - CoalesceBatchesExec: target_batch_size=8192
+    - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+    - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+    "
+    );
+
+    let result = format!("{}", pretty_format_batches(&batches).unwrap());
+
+    let probe_scan_metrics = probe_scan.metrics().unwrap();
+
+    // The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
+    // The number of output rows from the probe side scan should stay consistent across executions.
+    // Issue: https://github.com/apache/datafusion/issues/17451
+    assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);
+
+    insta::assert_snapshot!(
+        result,
+        @r"
+    +----+----+-----+----+----+-----+
+    | a  | b  | c   | a  | b  | e   |
+    +----+----+-----+----+----+-----+
+    | ab | bb | 2.0 | ab | bb | 2.0 |
+    | aa | ba | 1.0 | aa | ba | 1.0 |
+    +----+----+-----+----+----+-----+
+    ",
+    );
+}
+
 #[tokio::test]
 async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     use datafusion_common::JoinType;
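The scenario this test pins down: the probe side is hash-repartitioned into 12 partitions while the build side is collected once (CollectLeft), so every output stream reports bounds for the same build data. Before this fix each report was accumulated separately, which is the "duplicate expressions" the commit title refers to. A rough standalone sketch of that effect, using plain string formatting rather than DataFusion's expression API (the ColBounds struct and filter_from_reports helper are hypothetical, for illustration only):

/// Min/max bounds for one join-key column, as simple strings for illustration.
struct ColBounds {
    col: &'static str,
    min: &'static str,
    max: &'static str,
}

/// Build a conjunctive filter from every reported set of bounds.
fn filter_from_reports(reports: &[Vec<ColBounds>]) -> String {
    reports
        .iter()
        .flat_map(|cols| cols.iter())
        .map(|b| format!("{} >= {} AND {} <= {}", b.col, b.min, b.col, b.max))
        .collect::<Vec<_>>()
        .join(" AND ")
}

fn main() {
    let bounds = || {
        vec![
            ColBounds { col: "a@0", min: "aa", max: "ab" },
            ColBounds { col: "b@1", min: "ba", max: "bb" },
        ]
    };
    // Without deduplication, all 12 CollectLeft output streams contribute the
    // same bounds and the predicate repeats the same conjuncts 12 times.
    let duplicated = filter_from_reports(&(0..12).map(|_| bounds()).collect::<Vec<_>>());
    // With deduplication, a single report yields the compact predicate seen in
    // the snapshot: a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb.
    let deduplicated = filter_from_reports(&[bounds()]);
    assert!(duplicated.len() > deduplicated.len());
    println!("{deduplicated}");
}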

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 1 addition & 0 deletions
@@ -1020,6 +1020,7 @@ impl ExecutionPlan for HashJoinExec {
             vec![],
             self.right.output_ordering().is_some(),
             bounds_accumulator,
+            self.mode,
         )))
     }
 

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 22 additions & 8 deletions
@@ -128,8 +128,10 @@ impl SharedBoundsAccumulator {
     /// ## Partition Mode Execution Patterns
     ///
     /// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut`
-    ///   across all output partitions. Each output partition calls `collect_build_side` to access
-    ///   the shared build data. Expected calls = number of output partitions.
+    ///   across all output partitions. Each output partition calls `collect_build_side` to access the shared build data.
+    ///   Although this results in multiple invocations, the `report_partition_bounds` function contains deduplication logic to handle them safely.
+    ///   Expected calls = number of output partitions.
+    ///
     ///
     /// - **Partitioned**: Each partition independently builds its own hash table by calling
     ///   `collect_build_side` once. Expected calls = number of build partitions.
@@ -260,22 +262,34 @@
     /// consider making the resulting future shared so the ready result can be reused.
     ///
     /// # Arguments
-    /// * `partition` - The partition identifier reporting its bounds
+    /// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds
     /// * `partition_bounds` - The bounds computed by this partition (if any)
     ///
     /// # Returns
     /// * `Result<()>` - Ok if successful, Err if filter update failed
     pub(crate) async fn report_partition_bounds(
         &self,
-        partition: usize,
+        left_side_partition_id: usize,
         partition_bounds: Option<Vec<ColumnBounds>>,
     ) -> Result<()> {
         // Store bounds in the accumulator - this runs once per partition
         if let Some(bounds) = partition_bounds {
-            self.inner
-                .lock()
-                .bounds
-                .push(PartitionBounds::new(partition, bounds));
+            let mut guard = self.inner.lock();
+
+            let should_push = if let Some(last_bound) = guard.bounds.last() {
+                // In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0).
+                // Since this function can be called multiple times for that same partition, we must deduplicate
+                // by checking against the last recorded bound.
+                last_bound.partition != left_side_partition_id
+            } else {
+                true
+            };
+
+            if should_push {
+                guard
+                    .bounds
+                    .push(PartitionBounds::new(left_side_partition_id, bounds));
+            }
         }
 
         if self.barrier.wait().await.is_leader() {
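For illustration, a minimal self-contained sketch of the deduplication idea above, using simplified stand-in types rather than DataFusion's SharedBoundsAccumulator and ColumnBounds: consecutive reports that carry the same left-side partition id are recorded only once, so the repeated CollectLeft reports from every output stream collapse into a single bounds entry.

use std::sync::Mutex;

/// Placeholder for the per-column min/max values tracked per partition.
type ColumnBounds = (i64, i64);

struct PartitionBounds {
    partition: usize,
    #[allow(dead_code)]
    bounds: Vec<ColumnBounds>,
}

#[derive(Default)]
struct BoundsAccumulator {
    bounds: Mutex<Vec<PartitionBounds>>,
}

impl BoundsAccumulator {
    fn report_partition_bounds(&self, left_side_partition_id: usize, bounds: Vec<ColumnBounds>) {
        let mut guard = self.bounds.lock().unwrap();
        // In CollectLeft mode every output stream reports the same shared build
        // side (partition id 0), so only the first report is recorded.
        let should_push = guard
            .last()
            .map(|last| last.partition != left_side_partition_id)
            .unwrap_or(true);
        if should_push {
            guard.push(PartitionBounds {
                partition: left_side_partition_id,
                bounds,
            });
        }
    }
}

fn main() {
    let acc = BoundsAccumulator::default();
    // Twelve output partitions all report bounds for the same CollectLeft build side...
    for _ in 0..12 {
        acc.report_partition_bounds(0, vec![(1, 2), (10, 20)]);
    }
    // ...but only one set of bounds is kept, so the dynamic filter is built from a
    // single min/max range per join key instead of twelve duplicates.
    assert_eq!(acc.bounds.lock().unwrap().len(), 1);
}

Checking only the last recorded entry is enough here because all CollectLeft streams report partition 0, while Partitioned streams report distinct ids and are therefore never suppressed.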

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 14 additions & 2 deletions
@@ -28,6 +28,7 @@ use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
 use crate::joins::utils::{
     equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut,
 };
+use crate::joins::PartitionMode;
 use crate::{
     handle_state,
     hash_utils::create_hashes,
@@ -210,6 +211,9 @@
     /// Optional future to signal when bounds have been reported by all partitions
     /// and the dynamic filter has been updated
     bounds_waiter: Option<OnceFut<()>>,
+
+    /// Partitioning mode to use
+    mode: PartitionMode,
 }
 
 impl RecordBatchStream for HashJoinStream {
@@ -312,6 +316,7 @@
         hashes_buffer: Vec<u64>,
         right_side_ordered: bool,
         bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
+        mode: PartitionMode,
     ) -> Self {
         Self {
             partition,
@@ -331,6 +336,7 @@
             right_side_ordered,
             bounds_accumulator,
             bounds_waiter: None,
+            mode,
         }
     }
 
@@ -406,11 +412,17 @@
         // Report bounds to the accumulator which will handle synchronization and filter updates
         if let Some(ref bounds_accumulator) = self.bounds_accumulator {
             let bounds_accumulator = Arc::clone(bounds_accumulator);
-            let partition = self.partition;
+
+            let left_side_partition_id = match self.mode {
+                PartitionMode::Partitioned => self.partition,
+                PartitionMode::CollectLeft => 0,
+                PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
+            };
+
             let left_data_bounds = left_data.bounds.clone();
             self.bounds_waiter = Some(OnceFut::new(async move {
                 bounds_accumulator
-                    .report_partition_bounds(partition, left_data_bounds)
+                    .report_partition_bounds(left_side_partition_id, left_data_bounds)
                     .await
             }));
             self.state = HashJoinStreamState::WaitPartitionBoundsReport;
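As a quick illustration of the mapping introduced above (a sketch using a local enum rather than DataFusion's PartitionMode, and omitting the Auto variant that the real code treats as unreachable): Partitioned streams report their own partition id, while every CollectLeft stream reports the shared build partition 0, which is the id the accumulator's deduplication check keys on.

#[derive(Clone, Copy)]
enum PartitionMode {
    Partitioned,
    CollectLeft,
}

/// Resolve which left-side partition a stream should report bounds for.
fn left_side_partition_id(mode: PartitionMode, output_partition: usize) -> usize {
    match mode {
        // Each partition built its own hash table, so it reports itself.
        PartitionMode::Partitioned => output_partition,
        // The build side was collected once and is shared, so every output
        // stream reports the same id, letting duplicates be detected.
        PartitionMode::CollectLeft => 0,
    }
}

fn main() {
    assert_eq!(left_side_partition_id(PartitionMode::Partitioned, 7), 7);
    // All 12 CollectLeft output streams map to the same build-side id.
    assert!((0..12).all(|p| left_side_partition_id(PartitionMode::CollectLeft, p) == 0));
}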
