@@ -49,6 +49,7 @@ use datafusion_physical_plan::{
4949 aggregates:: { AggregateExec , AggregateMode , PhysicalGroupBy } ,
5050 coalesce_batches:: CoalesceBatchesExec ,
5151 coalesce_partitions:: CoalescePartitionsExec ,
52+ collect,
5253 filter:: FilterExec ,
5354 repartition:: RepartitionExec ,
5455 sorts:: sort:: SortExec ,
@@ -1095,34 +1096,45 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
10951096 let cb =
10961097 Arc :: new ( CoalesceBatchesExec :: new ( hash_join, 8192 ) ) as Arc < dyn ExecutionPlan > ;
10971098 // Top-level CoalescePartitionsExec
1098- let plan = Arc :: new ( CoalescePartitionsExec :: new ( cb) ) as Arc < dyn ExecutionPlan > ;
1099+ let cp = Arc :: new ( CoalescePartitionsExec :: new ( cb) ) as Arc < dyn ExecutionPlan > ;
1100+ // Add a sort for deterministic output
1101+ let plan = Arc :: new ( SortExec :: new (
1102+ LexOrdering :: new ( vec ! [ PhysicalSortExpr :: new(
1103+ col( "a" , & probe_side_schema) . unwrap( ) ,
1104+ SortOptions :: new( true , false ) , // descending = true, nulls_first = false (DESC NULLS LAST)
1105+ ) ] )
1106+ . unwrap ( ) ,
1107+ cp,
1108+ ) ) as Arc < dyn ExecutionPlan > ;
10991109
11001110 // expect the predicate to be pushed down into the probe side DataSource
11011111 insta:: assert_snapshot!(
11021112 OptimizationTest :: new( Arc :: clone( & plan) , FilterPushdown :: new_post_optimization( ) , true ) ,
11031113 @r"
11041114 OptimizationTest:
11051115 input:
1106- - CoalescePartitionsExec
1107- - CoalesceBatchesExec: target_batch_size=8192
1108- - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1109- - CoalesceBatchesExec: target_batch_size=8192
1110- - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1111- - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1112- - CoalesceBatchesExec: target_batch_size=8192
1113- - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1114- - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
1116+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1117+ - CoalescePartitionsExec
1118+ - CoalesceBatchesExec: target_batch_size=8192
1119+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1120+ - CoalesceBatchesExec: target_batch_size=8192
1121+ - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1122+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1123+ - CoalesceBatchesExec: target_batch_size=8192
1124+ - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1125+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
11151126 output:
11161127 Ok:
1117- - CoalescePartitionsExec
1118- - CoalesceBatchesExec: target_batch_size=8192
1119- - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1120- - CoalesceBatchesExec: target_batch_size=8192
1121- - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1122- - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1123- - CoalesceBatchesExec: target_batch_size=8192
1124- - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1125- - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
1128+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1129+ - CoalescePartitionsExec
1130+ - CoalesceBatchesExec: target_batch_size=8192
1131+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1132+ - CoalesceBatchesExec: target_batch_size=8192
1133+ - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1134+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1135+ - CoalesceBatchesExec: target_batch_size=8192
1136+ - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1137+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
11261138 "
11271139 ) ;
11281140
@@ -1141,27 +1153,58 @@ async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
11411153 ) ;
11421154 let state = session_ctx. state ( ) ;
11431155 let task_ctx = state. task_ctx ( ) ;
1144- let mut stream = plan. execute ( 0 , Arc :: clone ( & task_ctx) ) . unwrap ( ) ;
1145- // Iterate one batch
1146- if let Some ( batch_result) = stream. next ( ) . await {
1147- batch_result. unwrap ( ) ;
1148- }
1156+ let batches = collect ( Arc :: clone ( & plan) , Arc :: clone ( & task_ctx) )
1157+ . await
1158+ . unwrap ( ) ;
11491159
11501160 // Now check what our filter looks like
1161+ #[ cfg( not( feature = "force_hash_collisions" ) ) ]
11511162 insta:: assert_snapshot!(
11521163 format!( "{}" , format_plan_for_test( & plan) ) ,
11531164 @r"
1154- - CoalescePartitionsExec
1155- - CoalesceBatchesExec: target_batch_size=8192
1156- - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1157- - CoalesceBatchesExec: target_batch_size=8192
1158- - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1159- - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1160- - CoalesceBatchesExec: target_batch_size=8192
1161- - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1162- - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
1165+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1166+ - CoalescePartitionsExec
1167+ - CoalesceBatchesExec: target_batch_size=8192
1168+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1169+ - CoalesceBatchesExec: target_batch_size=8192
1170+ - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1171+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1172+ - CoalesceBatchesExec: target_batch_size=8192
1173+ - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1174+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
11631175 "
11641176 ) ;
1177+
1178+ #[ cfg( feature = "force_hash_collisions" ) ]
1179+ insta:: assert_snapshot!(
1180+ format!( "{}" , format_plan_for_test( & plan) ) ,
1181+ @r"
1182+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
1183+ - CoalescePartitionsExec
1184+ - CoalesceBatchesExec: target_batch_size=8192
1185+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
1186+ - CoalesceBatchesExec: target_batch_size=8192
1187+ - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1188+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
1189+ - CoalesceBatchesExec: target_batch_size=8192
1190+ - RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
1191+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
1192+ "
1193+ ) ;
1194+
1195+ let result = format ! ( "{}" , pretty_format_batches( & batches) . unwrap( ) ) ;
1196+
1197+ insta:: assert_snapshot!(
1198+ result,
1199+ @r"
1200+ +----+----+-----+----+----+-----+
1201+ | a | b | c | a | b | e |
1202+ +----+----+-----+----+----+-----+
1203+ | ab | bb | 2.0 | ab | bb | 2.0 |
1204+ | aa | ba | 1.0 | aa | ba | 1.0 |
1205+ +----+----+-----+----+----+-----+
1206+ " ,
1207+ ) ;
11651208}
11661209
11671210#[ tokio:: test]
0 commit comments