@@ -160,7 +160,123 @@ fn test_pushdown_into_scan_with_config_options() {
160160}
161161
162162#[ tokio:: test]
163- async fn test_hashjoin_parent_filter_pushdown ( ) {
163+ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk ( ) {
164+ use datafusion_common:: JoinType ;
165+ use datafusion_physical_plan:: joins:: { HashJoinExec , PartitionMode } ;
166+
167+ // Create build side with limited values: one batch of two rows, join key `a` holds `aa` and `ab`
168+ let build_batches = vec ! [ record_batch!(
169+ ( "a" , Utf8 , [ "aa" , "ab" ] ) ,
170+ ( "b" , Utf8 , [ "ba" , "bb" ] ) ,
171+ ( "c" , Float64 , [ 1.0 , 2.0 ] )
172+ )
173+ . unwrap( ) ] ;
174+ let build_side_schema = Arc :: new ( Schema :: new ( vec ! [
175+ Field :: new( "a" , DataType :: Utf8 , false ) ,
176+ Field :: new( "b" , DataType :: Utf8 , false ) ,
177+ Field :: new( "c" , DataType :: Float64 , false ) ,
178+ ] ) ) ;
179+ let build_scan = TestScanBuilder :: new ( Arc :: clone ( & build_side_schema) )
180+ . with_support ( true )
181+ . with_batches ( build_batches)
182+ . build ( ) ;
183+
184+ // Create probe side with more values: one batch of four rows, of which only `aa` and `ab` in `d` also appear in build column `a`
185+ let probe_batches = vec ! [ record_batch!(
186+ ( "d" , Utf8 , [ "aa" , "ab" , "ac" , "ad" ] ) ,
187+ ( "e" , Utf8 , [ "ba" , "bb" , "bc" , "bd" ] ) ,
188+ ( "f" , Float64 , [ 1.0 , 2.0 , 3.0 , 4.0 ] )
189+ )
190+ . unwrap( ) ] ;
191+ let probe_side_schema = Arc :: new ( Schema :: new ( vec ! [
192+ Field :: new( "d" , DataType :: Utf8 , false ) ,
193+ Field :: new( "e" , DataType :: Utf8 , false ) ,
194+ Field :: new( "f" , DataType :: Float64 , false ) ,
195+ ] ) ) ;
196+ let probe_scan = TestScanBuilder :: new ( Arc :: clone ( & probe_side_schema) )
197+ . with_support ( true )
198+ . with_batches ( probe_batches)
199+ . build ( ) ;
200+
201+ // Create HashJoinExec: an inner join on build column `a` = probe column `d`, in Partitioned mode
202+ let on = vec ! [ (
203+ col( "a" , & build_side_schema) . unwrap( ) ,
204+ col( "d" , & probe_side_schema) . unwrap( ) ,
205+ ) ] ;
206+ let join = Arc :: new (
207+ HashJoinExec :: try_new (
208+ build_scan,
209+ probe_scan,
210+ on,
211+ None ,
212+ & JoinType :: Inner ,
213+ None ,
214+ PartitionMode :: Partitioned ,
215+ datafusion_common:: NullEquality :: NullEqualsNothing ,
216+ )
217+ . unwrap ( ) ,
218+ ) ;
219+
220+ let join_schema = join. schema ( ) ;
221+
222+ // Finally let's add a SortExec with fetch = 2 (rendered as TopK in the snapshots below) on the outside to test pushdown of dynamic filters
223+ let sort_expr =
224+ PhysicalSortExpr :: new ( col ( "e" , & join_schema) . unwrap ( ) , SortOptions :: default ( ) ) ;
225+ let plan = Arc :: new (
226+ SortExec :: new ( LexOrdering :: new ( vec ! [ sort_expr] ) . unwrap ( ) , join)
227+ . with_fetch ( Some ( 2 ) ) ,
228+ ) as Arc < dyn ExecutionPlan > ;
229+
230+ let mut config = ConfigOptions :: default ( ) ;
231+ config. optimizer . enable_dynamic_filter_pushdown = true ;
232+ config. execution . parquet . pushdown_filters = true ;
233+
234+ // Apply the FilterPushdown optimizer rule
235+ let plan = FilterPushdown :: new_post_optimization ( )
236+ . optimize ( Arc :: clone ( & plan) , & config)
237+ . unwrap ( ) ;
238+
239+ // Before execution: the probe-side scan carries a placeholder `DynamicFilterPhysicalExpr [ true ]` and the build-side scan shows `predicate=true`
240+ insta:: assert_snapshot!(
241+ format_plan_for_test( & plan) ,
242+ @r"
243+ - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
244+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
245+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
246+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
247+ "
248+ ) ;
249+
250+ // Put some data through the plan to check that the filter is updated to reflect the TopK state
251+ let session_ctx = SessionContext :: new_with_config ( SessionConfig :: new ( ) ) ;
252+ session_ctx. register_object_store (
253+ ObjectStoreUrl :: parse ( "test://" ) . unwrap ( ) . as_ref ( ) ,
254+ Arc :: new ( InMemory :: new ( ) ) ,
255+ ) ;
256+ let state = session_ctx. state ( ) ;
257+ let task_ctx = state. task_ctx ( ) ;
258+ let mut stream = plan. execute ( 0 , Arc :: clone ( & task_ctx) ) . unwrap ( ) ;
259+ // Iterate one batch; the test panics here if the stream is empty or yields an error
260+ stream. next ( ) . await . unwrap ( ) . unwrap ( ) ;
261+
262+ // After one batch: the SortExec now displays a filter on `e`, the same condition appears inside the probe-side dynamic predicate (as `e@1`), and the build-side predicate is still `true`
263+ insta:: assert_snapshot!(
264+ format_plan_for_test( & plan) ,
265+ @r"
266+ - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
267+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
268+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=true
269+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
270+ "
271+ ) ;
272+ }
273+
274+ // Test both static and dynamic filter pushdown in HashJoinExec.
275+ // Note that static filter pushdown is rare: it should have already happened in the logical optimizer phase.
276+ // However users may manually construct plans that could result in a FilterExec -> HashJoinExec -> Scan setup.
277+ // Dynamic filters arise in cases such as nested inner joins or TopK -> HashJoinExec -> Scan setups.
278+ #[ tokio:: test]
279+ async fn test_static_filter_pushdown_through_hash_join ( ) {
164280 use datafusion_common:: JoinType ;
165281 use datafusion_physical_plan:: joins:: { HashJoinExec , PartitionMode } ;
166282
@@ -283,7 +399,8 @@ async fn test_hashjoin_parent_filter_pushdown() {
283399
284400 let join_schema = join. schema ( ) ;
285401 let filter = col_lit_predicate ( "a" , "aa" , & join_schema) ;
286- let plan = Arc :: new ( FilterExec :: try_new ( filter, join) . unwrap ( ) ) ;
402+ let plan =
403+ Arc :: new ( FilterExec :: try_new ( filter, join) . unwrap ( ) ) as Arc < dyn ExecutionPlan > ;
287404
288405 // Test that filters are NOT pushed down for left join
289406 insta:: assert_snapshot!(
0 commit comments