@@ -62,9 +62,7 @@ use arrow::compute::SortOptions;
6262use arrow:: datatypes:: { Schema , SchemaRef } ;
6363use datafusion_catalog:: ScanArgs ;
6464use datafusion_common:: display:: ToStringifiedPlan ;
65- use datafusion_common:: tree_node:: {
66- Transformed , TransformedResult , TreeNode , TreeNodeRecursion , TreeNodeVisitor ,
67- } ;
65+ use datafusion_common:: tree_node:: { TreeNode , TreeNodeRecursion , TreeNodeVisitor } ;
6866use datafusion_common:: TableReference ;
6967use datafusion_common:: {
7068 exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema ,
@@ -85,7 +83,7 @@ use datafusion_expr::{
8583 WindowFrameBound , WriteOp ,
8684} ;
8785use datafusion_physical_expr:: aggregate:: { AggregateExprBuilder , AggregateFunctionExpr } ;
88- use datafusion_physical_expr:: expressions:: { Column , Literal } ;
86+ use datafusion_physical_expr:: expressions:: Literal ;
8987use datafusion_physical_expr:: {
9088 create_physical_sort_exprs, LexOrdering , PhysicalSortExpr ,
9189} ;
@@ -2181,11 +2179,7 @@ impl DefaultPhysicalPlanner {
21812179 let physical_expr =
21822180 self . create_physical_expr ( e, input_logical_schema, session_state) ;
21832181
2184- // Check for possible column name mismatches
2185- let final_physical_expr =
2186- maybe_fix_physical_column_name ( physical_expr, & input_physical_schema) ;
2187-
2188- tuple_err ( ( final_physical_expr, physical_name) )
2182+ tuple_err ( ( physical_expr, physical_name) )
21892183 } )
21902184 . collect :: < Result < Vec < _ > > > ( ) ?;
21912185
@@ -2291,47 +2285,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
22912285 }
22922286}
22932287
2294- // Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
2295- // Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
2296- //
2297- // This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
2298- // to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
2299- fn maybe_fix_physical_column_name (
2300- expr : Result < Arc < dyn PhysicalExpr > > ,
2301- input_physical_schema : & SchemaRef ,
2302- ) -> Result < Arc < dyn PhysicalExpr > > {
2303- let Ok ( expr) = expr else { return expr } ;
2304- expr. transform_down ( |node| {
2305- if let Some ( column) = node. as_any ( ) . downcast_ref :: < Column > ( ) {
2306- let idx = column. index ( ) ;
2307- let physical_field = input_physical_schema. field ( idx) ;
2308- let expr_col_name = column. name ( ) ;
2309- let physical_name = physical_field. name ( ) ;
2310-
2311- if expr_col_name != physical_name {
2312- // handle edge cases where the physical_name contains ':'.
2313- let colon_count = physical_name. matches ( ':' ) . count ( ) ;
2314- let mut splits = expr_col_name. match_indices ( ':' ) ;
2315- let split_pos = splits. nth ( colon_count) ;
2316-
2317- if let Some ( ( i, _) ) = split_pos {
2318- let base_name = & expr_col_name[ ..i] ;
2319- if base_name == physical_name {
2320- let updated_column = Column :: new ( physical_name, idx) ;
2321- return Ok ( Transformed :: yes ( Arc :: new ( updated_column) ) ) ;
2322- }
2323- }
2324- }
2325-
2326- // If names already match or fix is not possible, just leave it as it is
2327- Ok ( Transformed :: no ( node) )
2328- } else {
2329- Ok ( Transformed :: no ( node) )
2330- }
2331- } )
2332- . data ( )
2333- }
2334-
23352288struct OptimizationInvariantChecker < ' a > {
23362289 rule : & ' a Arc < dyn PhysicalOptimizerRule + Send + Sync > ,
23372290}
@@ -2435,12 +2388,10 @@ mod tests {
24352388 } ;
24362389 use datafusion_execution:: runtime_env:: RuntimeEnv ;
24372390 use datafusion_execution:: TaskContext ;
2438- use datafusion_expr:: {
2439- col, lit, LogicalPlanBuilder , Operator , UserDefinedLogicalNodeCore ,
2440- } ;
2391+ use datafusion_expr:: builder:: subquery_alias;
2392+ use datafusion_expr:: { col, lit, LogicalPlanBuilder , UserDefinedLogicalNodeCore } ;
24412393 use datafusion_functions_aggregate:: count:: count_all;
24422394 use datafusion_functions_aggregate:: expr_fn:: sum;
2443- use datafusion_physical_expr:: expressions:: { BinaryExpr , IsNotNullExpr } ;
24442395 use datafusion_physical_expr:: EquivalenceProperties ;
24452396 use datafusion_physical_plan:: execution_plan:: { Boundedness , EmissionType } ;
24462397
@@ -3001,71 +2952,6 @@ mod tests {
30012952 }
30022953 }
30032954
3004- #[ tokio:: test]
3005- async fn test_maybe_fix_colon_in_physical_name ( ) {
3006- // The physical schema has a field name with a colon
3007- let schema = Schema :: new ( vec ! [ Field :: new( "metric:avg" , DataType :: Int32 , false ) ] ) ;
3008- let schema_ref: SchemaRef = Arc :: new ( schema) ;
3009-
3010- // What might happen after deduplication
3011- let logical_col_name = "metric:avg:1" ;
3012- let expr_with_suffix =
3013- Arc :: new ( Column :: new ( logical_col_name, 0 ) ) as Arc < dyn PhysicalExpr > ;
3014- let expr_result = Ok ( expr_with_suffix) ;
3015-
3016- // Call function under test
3017- let fixed_expr =
3018- maybe_fix_physical_column_name ( expr_result, & schema_ref) . unwrap ( ) ;
3019-
3020- // Downcast back to Column so we can check the name
3021- let col = fixed_expr
3022- . as_any ( )
3023- . downcast_ref :: < Column > ( )
3024- . expect ( "Column" ) ;
3025-
3026- assert_eq ! ( col. name( ) , "metric:avg" ) ;
3027- }
3028-
3029- #[ tokio:: test]
3030- async fn test_maybe_fix_nested_column_name_with_colon ( ) {
3031- let schema = Schema :: new ( vec ! [ Field :: new( "column" , DataType :: Int32 , false ) ] ) ;
3032- let schema_ref: SchemaRef = Arc :: new ( schema) ;
3033-
3034- // Construct the nested expr
3035- let col_expr = Arc :: new ( Column :: new ( "column:1" , 0 ) ) as Arc < dyn PhysicalExpr > ;
3036- let is_not_null_expr = Arc :: new ( IsNotNullExpr :: new ( col_expr. clone ( ) ) ) ;
3037-
3038- // Create a binary expression and put the column inside
3039- let binary_expr = Arc :: new ( BinaryExpr :: new (
3040- is_not_null_expr. clone ( ) ,
3041- Operator :: Or ,
3042- is_not_null_expr. clone ( ) ,
3043- ) ) as Arc < dyn PhysicalExpr > ;
3044-
3045- let fixed_expr =
3046- maybe_fix_physical_column_name ( Ok ( binary_expr) , & schema_ref) . unwrap ( ) ;
3047-
3048- let bin = fixed_expr
3049- . as_any ( )
3050- . downcast_ref :: < BinaryExpr > ( )
3051- . expect ( "Expected BinaryExpr" ) ;
3052-
3053- // Check that both sides where renamed
3054- for expr in & [ bin. left ( ) , bin. right ( ) ] {
3055- let is_not_null = expr
3056- . as_any ( )
3057- . downcast_ref :: < IsNotNullExpr > ( )
3058- . expect ( "Expected IsNotNull" ) ;
3059-
3060- let col = is_not_null
3061- . arg ( )
3062- . as_any ( )
3063- . downcast_ref :: < Column > ( )
3064- . expect ( "Expected Column" ) ;
3065-
3066- assert_eq ! ( col. name( ) , "column" ) ;
3067- }
3068- }
30692955 struct ErrorExtensionPlanner { }
30702956
30712957 #[ async_trait]
@@ -3562,4 +3448,61 @@ digraph {
35623448
35633449 Ok ( ( ) )
35643450 }
3451+
3452+ // Reproducer for DataFusion issue #17405:
3453+ //
3454+ // The following SQL is semantically invalid. Notably, the `SELECT left_table.a, right_table.a`
3455+ // clause is missing from the explicit logical plan:
3456+ //
3457+ // SELECT a FROM (
3458+ // -- SELECT left_table.a, right_table.a
3459+ // FROM left_table
3460+ // FULL JOIN right_table ON left_table.a = right_table.a
3461+ // ) AS alias
3462+ // GROUP BY a;
3463+ //
3464+ // As a result, the variables within `alias` subquery are not properly distinguished, which
3465+ // leads to a bug for logical and physical planning.
3466+ //
3467+ // The fix is to implicitly insert a Projection node to represent the missing SELECT clause to
3468+ // ensure each field is correctly aliased to a unique name when the SubqueryAlias node is added.
3469+ #[ tokio:: test]
3470+ async fn subquery_alias_confusing_the_optimizer ( ) -> Result < ( ) > {
3471+ let state = make_session_state ( ) ;
3472+
3473+ let schema = Schema :: new ( vec ! [ Field :: new( "a" , DataType :: Int32 , false ) ] ) ;
3474+ let schema = Arc :: new ( schema) ;
3475+
3476+ let table = MemTable :: try_new ( schema. clone ( ) , vec ! [ vec![ ] ] ) ?;
3477+ let table = Arc :: new ( table) ;
3478+
3479+ let source = DefaultTableSource :: new ( table) ;
3480+ let source = Arc :: new ( source) ;
3481+
3482+ let left = LogicalPlanBuilder :: scan ( "left" , source. clone ( ) , None ) ?;
3483+ let right = LogicalPlanBuilder :: scan ( "right" , source, None ) ?. build ( ) ?;
3484+
3485+ let join_keys = (
3486+ vec ! [ datafusion_common:: Column :: new( Some ( "left" ) , "a" ) ] ,
3487+ vec ! [ datafusion_common:: Column :: new( Some ( "right" ) , "a" ) ] ,
3488+ ) ;
3489+
3490+ let join = left. join ( right, JoinType :: Full , join_keys, None ) ?. build ( ) ?;
3491+
3492+ let alias = subquery_alias ( join, "alias" ) ?;
3493+
3494+ let planner = DefaultPhysicalPlanner :: default ( ) ;
3495+
3496+ let logical_plan = LogicalPlanBuilder :: new ( alias)
3497+ . aggregate ( vec ! [ col( "a:1" ) ] , Vec :: < Expr > :: new ( ) ) ?
3498+ . build ( ) ?;
3499+ let _physical_plan = planner. create_physical_plan ( & logical_plan, & state) . await ?;
3500+
3501+ let optimized_logical_plan = state. optimize ( & logical_plan) ?;
3502+ let _optimized_physical_plan = planner
3503+ . create_physical_plan ( & optimized_logical_plan, & state)
3504+ . await ?;
3505+
3506+ Ok ( ( ) )
3507+ }
35653508}
0 commit comments