@@ -48,6 +48,7 @@ use datafusion_physical_optimizer::{
 use datafusion_physical_plan::{
     aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
     coalesce_batches::CoalesceBatchesExec,
+    coalesce_partitions::CoalescePartitionsExec,
     filter::FilterExec,
     repartition::RepartitionExec,
     sorts::sort::SortExec,
@@ -267,7 +268,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
         format_plan_for_test(&plan),
         @r"
     - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
-    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab]
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
     -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
     "
@@ -890,7 +891,7 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
             None,
             &JoinType::Inner,
             None,
-            PartitionMode::Partitioned,
+            PartitionMode::CollectLeft,
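+            // CollectLeft builds one hash table from the build side and shares it
+            // across all probe partitions, so a single dynamic filter covers the probe scan.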
             datafusion_common::NullEquality::NullEqualsNothing,
         )
         .unwrap(),
@@ -902,12 +903,12 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
         @r"
     OptimizationTest:
       input:
-        - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+        - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
         -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
         -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
       output:
         Ok:
-          - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+          - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
           -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
           -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
     ",
@@ -936,13 +937,233 @@ async fn test_hashjoin_dynamic_filter_pushdown() {
     insta::assert_snapshot!(
         format!("{}", format_plan_for_test(&plan)),
         @r"
-    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb]
+    - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
     -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
     -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
     "
     );
 }
 
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_partitioned() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Rough plan we're trying to recreate:
+    // COPY (select i as k from generate_series(1, 10000000) as t(i))
+    // TO 'test_files/scratch/push_down_filter/t1.parquet'
+    // STORED AS PARQUET;
+    // COPY (select i as k, i as v from generate_series(1, 10000000) as t(i))
+    // TO 'test_files/scratch/push_down_filter/t2.parquet'
+    // STORED AS PARQUET;
+    // create external table t1 stored as parquet location 'test_files/scratch/push_down_filter/t1.parquet';
+    // create external table t2 stored as parquet location 'test_files/scratch/push_down_filter/t2.parquet';
+    // explain
+    // select *
+    // from t1
+    // join t2 on t1.k = t2.k;
+    // +---------------+------------------------------------------------------------+
+    // | plan_type     | plan                                                       |
+    // +---------------+------------------------------------------------------------+
+    // | physical_plan | ┌───────────────────────────┐                              |
+    // |               | │    CoalesceBatchesExec    │                              |
+    // |               | │    --------------------   │                              |
+    // |               | │     target_batch_size:    │                              |
+    // |               | │            8192           │                              |
+    // |               | └─────────────┬─────────────┘                              |
+    // |               | ┌─────────────┴─────────────┐                              |
+    // |               | │        HashJoinExec       │                              |
+    // |               | │    --------------------   ├──────────────┐               |
+    // |               | │        on: (k = k)        │              │               |
+    // |               | └─────────────┬─────────────┘              │               |
+    // |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
+    // |               | │    CoalesceBatchesExec    ││    CoalesceBatchesExec    │ |
+    // |               | │    --------------------   ││    --------------------   │ |
+    // |               | │     target_batch_size:    ││     target_batch_size:    │ |
+    // |               | │            8192           ││            8192           │ |
+    // |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
+    // |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
+    // |               | │      RepartitionExec      ││      RepartitionExec      │ |
+    // |               | │    --------------------   ││    --------------------   │ |
+    // |               | │ partition_count(in->out): ││ partition_count(in->out): │ |
+    // |               | │          12 -> 12         ││          12 -> 12         │ |
+    // |               | │                           ││                           │ |
+    // |               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
+    // |               | │      Hash([k@0], 12)      ││      Hash([k@0], 12)      │ |
+    // |               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
+    // |               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
+    // |               | │       DataSourceExec      ││       DataSourceExec      │ |
+    // |               | │    --------------------   ││    --------------------   │ |
+    // |               | │         files: 12         ││         files: 12         │ |
+    // |               | │      format: parquet      ││      format: parquet      │ |
+    // |               | │                           ││      predicate: true      │ |
+    // |               | └───────────────────────────┘└───────────────────────────┘ |
+    // |               |                                                            |
+    // +---------------+------------------------------------------------------------+
+
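+    // The test below hand-builds a scaled-down version of this plan: on each side,
+    // DataSource -> RepartitionExec(Hash) -> CoalesceBatchesExec, feeding a
+    // Partitioned hash join under CoalesceBatchesExec and CoalescePartitionsExec.
+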
+    // Create build side with limited values
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("e", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create RepartitionExec nodes for both sides with hash partitioning on join keys
+    let partition_count = 12;
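+    // (12 matches the Hash([k@0], 12) partitioning in the EXPLAIN plan above.)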
+
+    // Build side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
+    let build_hash_exprs = vec![
+        col("a", &build_side_schema).unwrap(),
+        col("b", &build_side_schema).unwrap(),
+    ];
+    let build_repartition = Arc::new(
+        RepartitionExec::try_new(
+            build_scan,
+            Partitioning::Hash(build_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let build_coalesce = Arc::new(CoalesceBatchesExec::new(build_repartition, 8192));
+
+    // Probe side: DataSource -> RepartitionExec (Hash) -> CoalesceBatchesExec
+    let probe_hash_exprs = vec![
+        col("a", &probe_side_schema).unwrap(),
+        col("b", &probe_side_schema).unwrap(),
+    ];
+    let probe_repartition = Arc::new(
+        RepartitionExec::try_new(
+            probe_scan,
+            Partitioning::Hash(probe_hash_exprs, partition_count),
+        )
+        .unwrap(),
+    );
+    let probe_coalesce = Arc::new(CoalesceBatchesExec::new(probe_repartition, 8192));
+
+    // Create HashJoinExec with partitioned inputs
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let hash_join = Arc::new(
+        HashJoinExec::try_new(
+            build_coalesce,
+            probe_coalesce,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
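+    // In Partitioned mode each output partition builds its own hash table, so each
+    // partition reports its own min/max bounds to the dynamic filter; the final
+    // snapshot below shows the resulting OR of per-partition ranges.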
+
+    // Top-level CoalesceBatchesExec
+    let cb =
+        Arc::new(CoalesceBatchesExec::new(hash_join, 8192)) as Arc<dyn ExecutionPlan>;
+    // Top-level CoalescePartitionsExec
+    let plan = Arc::new(CoalescePartitionsExec::new(cb)) as Arc<dyn ExecutionPlan>;
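+    // CoalescePartitionsExec merges the 12 join partitions into a single output
+    // partition, so executing partition 0 below drives every join partition.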
+
+    // Expect the predicate to be pushed down into the probe side DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - CoalescePartitionsExec
+        -   CoalesceBatchesExec: target_batch_size=8192
+        -     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+        -       CoalesceBatchesExec: target_batch_size=8192
+        -         RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+        -           DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -       CoalesceBatchesExec: target_batch_size=8192
+        -         RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+        -           DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - CoalescePartitionsExec
+          -   CoalesceBatchesExec: target_batch_size=8192
+          -     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+          -       CoalesceBatchesExec: target_batch_size=8192
+          -         RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+          -           DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+          -       CoalesceBatchesExec: target_batch_size=8192
+          -         RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+          -           DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
+    // Actually apply the optimization to the plan and execute to see the filter in action
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Iterate one batch
+    if let Some(batch_result) = stream.next().await {
+        batch_result.unwrap();
+    }
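+    // Pulling one batch forces the join to consume the entire build side first,
+    // which populates the dynamic filter bounds asserted below.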
+
+    // Now check what our filter looks like
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - CoalescePartitionsExec
+    -   CoalesceBatchesExec: target_batch_size=8192
+    -     HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+    -       CoalesceBatchesExec: target_batch_size=8192
+    -         RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+    -           DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    -       CoalesceBatchesExec: target_batch_size=8192
+    -         RepartitionExec: partitioning=Hash([a@0, b@1], 12), input_partitions=1
+    -           DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= ab AND a@0 <= ab AND b@1 >= bb AND b@1 <= bb OR a@0 >= aa AND a@0 <= aa AND b@1 >= ba AND b@1 <= ba ]
+    "
+    );
+}
+
 #[tokio::test]
 async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     use datafusion_common::JoinType;
@@ -1082,9 +1303,9 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
     insta::assert_snapshot!(
         format!("{}", format_plan_for_test(&plan)),
         @r"
-    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab]
+    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
     -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
-    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce]
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
     -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ]
     -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ]
     "