Skip to content

Commit bd49372

Browse files
committed
fix: Prevent duplicate expressions in DynamicPhysicalExpr
1 parent bfc5067 commit bd49372

File tree

4 files changed

+215
-10
lines changed

4 files changed

+215
-10
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1219,6 +1219,184 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
12191219
);
12201220
}
12211221

1222+
#[tokio::test]
1223+
async fn test_hashjoin_dynamic_filter_pushdown_collect_left() {
1224+
use datafusion_common::JoinType;
1225+
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
1226+
1227+
let build_batches = vec![record_batch!(
1228+
("a", Utf8, ["aa", "ab"]),
1229+
("b", Utf8, ["ba", "bb"]),
1230+
("c", Float64, [1.0, 2.0]) // Extra column not used in join
1231+
)
1232+
.unwrap()];
1233+
let build_side_schema = Arc::new(Schema::new(vec![
1234+
Field::new("a", DataType::Utf8, false),
1235+
Field::new("b", DataType::Utf8, false),
1236+
Field::new("c", DataType::Float64, false),
1237+
]));
1238+
let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
1239+
.with_support(true)
1240+
.with_batches(build_batches)
1241+
.build();
1242+
1243+
// Create probe side with more values
1244+
let probe_batches = vec![record_batch!(
1245+
("a", Utf8, ["aa", "ab", "ac", "ad"]),
1246+
("b", Utf8, ["ba", "bb", "bc", "bd"]),
1247+
("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
1248+
)
1249+
.unwrap()];
1250+
let probe_side_schema = Arc::new(Schema::new(vec![
1251+
Field::new("a", DataType::Utf8, false),
1252+
Field::new("b", DataType::Utf8, false),
1253+
Field::new("e", DataType::Float64, false),
1254+
]));
1255+
let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
1256+
.with_support(true)
1257+
.with_batches(probe_batches)
1258+
.build();
1259+
1260+
// Create RepartitionExec nodes for both sides with hash partitioning on join keys
1261+
let partition_count = 12;
1262+
1263+
// Probe side: DataSource -> RepartitionExec(Hash) -> CoalesceBatchesExec
1264+
let probe_hash_exprs = vec![
1265+
col("a", &probe_side_schema).unwrap(),
1266+
col("b", &probe_side_schema).unwrap(),
1267+
];
1268+
let probe_repartition = Arc::new(
1269+
RepartitionExec::try_new(
1270+
Arc::clone(&probe_scan),
1271+
Partitioning::Hash(probe_hash_exprs, partition_count), // create multi partitions on probSide
1272+
)
1273+
.unwrap(),
1274+
);
1275+
let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));
1276+
1277+
// Create HashJoinExec with partitioned inputs
1278+
let on = vec![
1279+
(
1280+
col("a", &build_side_schema).unwrap(),
1281+
col("a", &probe_side_schema).unwrap(),
1282+
),
1283+
(
1284+
col("b", &build_side_schema).unwrap(),
1285+
col("b", &probe_side_schema).unwrap(),
1286+
),
1287+
];
1288+
let hash_join = Arc::new(
1289+
HashJoinExec::try_new(
1290+
build_scan,
1291+
probe_coalesce,
1292+
on,
1293+
None,
1294+
&JoinType::Inner,
1295+
None,
1296+
PartitionMode::CollectLeft,
1297+
datafusion_common::NullEquality::NullEqualsNothing,
1298+
)
1299+
.unwrap(),
1300+
);
1301+
1302+
// Top-level CoalesceBatchesExec
1303+
let cb =
1304+
Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
1305+
// Top-level CoalescePartitionsExec
1306+
let cp = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;
1307+
// Add a sort for determistic output
1308+
let plan = Arc::new(SortExec::new(
1309+
LexOrdering::new(vec![PhysicalSortExpr::new(
1310+
col("a", &probe_side_schema).unwrap(),
1311+
SortOptions::new(true, false), // descending, nulls_first
1312+
)])
1313+
.unwrap(),
1314+
cp,
1315+
)) as Arc<dyn ExecutionPlan>;
1316+
1317+
// expect the predicate to be pushed down into the probe side DataSource
1318+
insta::assert_snapshot!(
1319+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
1320+
@r"
1321+
OptimizationTest:
1322+
input:
1323+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1324+
- CoalescePartitionsExec
1325+
- CoalesceBatchesExec: target_batch_size=8192
1326+
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1327+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1328+
- CoalesceBatchesExec: target_batch_size=8192
1329+
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1330+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
1331+
output:
1332+
Ok:
1333+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1334+
- CoalescePartitionsExec
1335+
- CoalesceBatchesExec: target_batch_size=8192
1336+
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1337+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1338+
- CoalesceBatchesExec: target_batch_size=8192
1339+
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1340+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
1341+
"
1342+
);
1343+
1344+
// Actually apply the optimization to the plan and execute to see the filter in action
1345+
let mut config = ConfigOptions::default();
1346+
config.execution.parquet.pushdown_filters = true;
1347+
config.optimizer.enable_dynamic_filter_pushdown = true;
1348+
let plan = FilterPushdown::new_post_optimization()
1349+
.optimize(plan, &config)
1350+
.unwrap();
1351+
let config = SessionConfig::new().with_batch_size(10);
1352+
let session_ctx = SessionContext::new_with_config(config);
1353+
session_ctx.register_object_store(
1354+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
1355+
Arc::new(InMemory::new()),
1356+
);
1357+
let state = session_ctx.state();
1358+
let task_ctx = state.task_ctx();
1359+
let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
1360+
.await
1361+
.unwrap();
1362+
1363+
// Now check what our filter looks like
1364+
insta::assert_snapshot!(
1365+
format!("{}", format_plan_for_test(&plan)),
1366+
@r"
1367+
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1368+
- CoalescePartitionsExec
1369+
- CoalesceBatchesExec: target_batch_size=8192
1370+
- HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1371+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1372+
- CoalesceBatchesExec: target_batch_size=8192
1373+
- RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1374+
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
1375+
"
1376+
);
1377+
1378+
let result = format!("{}", pretty_format_batches(&batches).unwrap());
1379+
1380+
let probe_scan_metrics = probe_scan.metrics().unwrap();
1381+
1382+
// The probe side had 4 rows, but after applying the dynamic filter only 2 rows should remain.
1383+
// The number of output rows from the probe side scan should stay consistent across executions.
1384+
// Issue: https://github.com/apache/datafusion/issues/17451
1385+
assert_eq!(probe_scan_metrics.output_rows().unwrap(), 2);
1386+
1387+
insta::assert_snapshot!(
1388+
result,
1389+
@r"
1390+
+----+----+-----+----+----+-----+
1391+
| a | b | c | a | b | e |
1392+
+----+----+-----+----+----+-----+
1393+
| ab | bb | 2.0 | ab | bb | 2.0 |
1394+
| aa | ba | 1.0 | aa | ba | 1.0 |
1395+
+----+----+-----+----+----+-----+
1396+
",
1397+
);
1398+
}
1399+
12221400
#[tokio::test]
12231401
async fn test_nested_hashjoin_dynamic_filter_pushdown() {
12241402
use datafusion_common::JoinType;

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,7 @@ impl ExecutionPlan for HashJoinExec {
10201020
vec![],
10211021
self.right.output_ordering().is_some(),
10221022
bounds_accumulator,
1023+
self.mode,
10231024
)))
10241025
}
10251026

datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,10 @@ impl SharedBoundsAccumulator {
128128
/// ## Partition Mode Execution Patterns
129129
///
130130
/// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut`
131-
/// across all output partitions. Each output partition calls `collect_build_side` to access
132-
/// the shared build data. Expected calls = number of output partitions.
131+
/// across all output partitions. Each output partition calls `collect_build_side` to access the shared build data.
132+
/// Although this results in multiple invocations, the `report_partition_bounds` function contains deduplication logic to handle them safely.
133+
/// Expected calls = number of output partitions.
134+
///
133135
///
134136
/// - **Partitioned**: Each partition independently builds its own hash table by calling
135137
/// `collect_build_side` once. Expected calls = number of build partitions.
@@ -260,22 +262,34 @@ impl SharedBoundsAccumulator {
260262
/// consider making the resulting future shared so the ready result can be reused.
261263
///
262264
/// # Arguments
263-
/// * `partition` - The partition identifier reporting its bounds
265+
/// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds
264266
/// * `partition_bounds` - The bounds computed by this partition (if any)
265267
///
266268
/// # Returns
267269
/// * `Result<()>` - Ok if successful, Err if filter update failed
268270
pub(crate) async fn report_partition_bounds(
269271
&self,
270-
partition: usize,
272+
left_side_partition_id: usize,
271273
partition_bounds: Option<Vec<ColumnBounds>>,
272274
) -> Result<()> {
273275
// Store bounds in the accumulator - this runs once per partition
274276
if let Some(bounds) = partition_bounds {
275-
self.inner
276-
.lock()
277-
.bounds
278-
.push(PartitionBounds::new(partition, bounds));
277+
let mut guard = self.inner.lock();
278+
279+
let should_push = if let Some(last_bound) = guard.bounds.last() {
280+
// In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0).
281+
// Since this function can be called multiple times for that same partition, we must deduplicate
282+
// by checking against the last recorded bound.
283+
last_bound.partition != left_side_partition_id
284+
} else {
285+
true
286+
};
287+
288+
if should_push {
289+
guard
290+
.bounds
291+
.push(PartitionBounds::new(left_side_partition_id, bounds));
292+
}
279293
}
280294

281295
if self.barrier.wait().await.is_leader() {

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use crate::joins::hash_join::shared_bounds::SharedBoundsAccumulator;
2828
use crate::joins::utils::{
2929
equal_rows_arr, get_final_indices_from_shared_bitmap, OnceFut,
3030
};
31+
use crate::joins::PartitionMode;
3132
use crate::{
3233
handle_state,
3334
hash_utils::create_hashes,
@@ -210,6 +211,9 @@ pub(super) struct HashJoinStream {
210211
/// Optional future to signal when bounds have been reported by all partitions
211212
/// and the dynamic filter has been updated
212213
bounds_waiter: Option<OnceFut<()>>,
214+
215+
/// Partitioning mode to use
216+
mode: PartitionMode,
213217
}
214218

215219
impl RecordBatchStream for HashJoinStream {
@@ -312,6 +316,7 @@ impl HashJoinStream {
312316
hashes_buffer: Vec<u64>,
313317
right_side_ordered: bool,
314318
bounds_accumulator: Option<Arc<SharedBoundsAccumulator>>,
319+
mode: PartitionMode,
315320
) -> Self {
316321
Self {
317322
partition,
@@ -331,6 +336,7 @@ impl HashJoinStream {
331336
right_side_ordered,
332337
bounds_accumulator,
333338
bounds_waiter: None,
339+
mode,
334340
}
335341
}
336342

@@ -406,11 +412,17 @@ impl HashJoinStream {
406412
// Report bounds to the accumulator which will handle synchronization and filter updates
407413
if let Some(ref bounds_accumulator) = self.bounds_accumulator {
408414
let bounds_accumulator = Arc::clone(bounds_accumulator);
409-
let partition = self.partition;
415+
416+
let left_side_partition_id = match self.mode {
417+
PartitionMode::Partitioned => self.partition,
418+
PartitionMode::CollectLeft => 0,
419+
PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
420+
};
421+
410422
let left_data_bounds = left_data.bounds.clone();
411423
self.bounds_waiter = Some(OnceFut::new(async move {
412424
bounds_accumulator
413-
.report_partition_bounds(partition, left_data_bounds)
425+
.report_partition_bounds(left_side_partition_id, left_data_bounds)
414426
.await
415427
}));
416428
self.state = HashJoinStreamState::WaitPartitionBoundsReport;

0 commit comments

Comments
 (0)