@@ -89,14 +89,15 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
8989use datafusion_physical_plan:: execution_plan:: InvariantLevel ;
9090use datafusion_physical_plan:: placeholder_row:: PlaceholderRowExec ;
9191use datafusion_physical_plan:: unnest:: ListUnnest ;
92+ use datafusion_sql:: TableReference ;
93+ use sqlparser:: ast:: NullTreatment ;
9294
9395use crate :: schema_equivalence:: schema_satisfied_by;
9496use async_trait:: async_trait;
9597use datafusion_datasource:: file_groups:: FileGroup ;
9698use futures:: { StreamExt , TryStreamExt } ;
9799use itertools:: { multiunzip, Itertools } ;
98100use log:: { debug, trace} ;
99- use sqlparser:: ast:: NullTreatment ;
100101use tokio:: sync:: Mutex ;
101102
102103/// Physical query planner that converts a `LogicalPlan` to an
@@ -890,8 +891,8 @@ impl DefaultPhysicalPlanner {
890891
891892 // 2 Children
892893 LogicalPlan :: Join ( Join {
893- left,
894- right,
894+ left : original_left ,
895+ right : original_right ,
895896 on : keys,
896897 filter,
897898 join_type,
@@ -916,23 +917,25 @@ impl DefaultPhysicalPlanner {
916917 let ( left, left_col_keys, left_projected) =
917918 wrap_projection_for_join_if_necessary (
918919 & left_keys,
919- left . as_ref ( ) . clone ( ) ,
920+ original_left . as_ref ( ) . clone ( ) ,
920921 ) ?;
921922 let ( right, right_col_keys, right_projected) =
922923 wrap_projection_for_join_if_necessary (
923924 & right_keys,
924- right . as_ref ( ) . clone ( ) ,
925+ original_right . as_ref ( ) . clone ( ) ,
925926 ) ?;
926927 let column_on = ( left_col_keys, right_col_keys) ;
927928
928929 let left = Arc :: new ( left) ;
929930 let right = Arc :: new ( right) ;
930- let new_join = LogicalPlan :: Join ( Join :: try_new_with_project_input (
931+ let ( new_join, requalified ) = Join :: try_new_with_project_input (
931932 node,
932933 Arc :: clone ( & left) ,
933934 Arc :: clone ( & right) ,
934935 column_on,
935- ) ?) ;
936+ ) ?;
937+
938+ let new_join = LogicalPlan :: Join ( new_join) ;
936939
937940 // If inputs were projected then create ExecutionPlan for these new
938941 // LogicalPlan nodes.
@@ -965,8 +968,24 @@ impl DefaultPhysicalPlanner {
965968
966969 // Remove temporary projected columns
967970 if left_projected || right_projected {
968- let final_join_result =
969- join_schema. iter ( ) . map ( Expr :: from) . collect :: < Vec < _ > > ( ) ;
971+ // This qualification is only valid if sides have been previously requalified when building the new join in
972+ // try_new_with_project_input, the purpose is to be able to determine later when building the Projection, the
973+ // nullability and datatypes by looking into both sides of the schema, avoiding ambiguity errors.
974+ let qualified_join_schema =
975+ if * join_type == JoinType :: Inner && requalified {
976+ Arc :: new ( qualify_join_schem_sides (
977+ join_schema,
978+ original_left,
979+ original_right,
980+ ) ?)
981+ } else {
982+ Arc :: clone ( join_schema)
983+ } ;
984+
985+ let final_join_result = qualified_join_schema
986+ . iter ( )
987+ . map ( Expr :: from)
988+ . collect :: < Vec < _ > > ( ) ;
970989 let projection = LogicalPlan :: Projection ( Projection :: try_new (
971990 final_join_result,
972991 Arc :: new ( new_join) ,
@@ -1463,6 +1482,60 @@ fn get_null_physical_expr_pair(
14631482 Ok ( ( Arc :: new ( null_value) , physical_name) )
14641483}
14651484
1485+ /// Qualify the schema sides without mutating the original schema, this function should only be used in case sides
1486+ /// have been requalified before in try_new_with_project_input. The purpose is to avoid ambiguity errors when checking
1487+ /// for nullability and datatypes when converting expression to fields.
1488+ fn qualify_join_schem_sides (
1489+ join_schema : & DFSchema ,
1490+ left : & LogicalPlan ,
1491+ right : & LogicalPlan ,
1492+ ) -> Result < DFSchema > {
1493+ let left_fields = left. schema ( ) . fields ( ) ;
1494+ let right_fields = right. schema ( ) . fields ( ) ;
1495+ let join_fields = join_schema. fields ( ) ;
1496+
1497+ // Validate lengths
1498+ if join_fields. len ( ) != left_fields. len ( ) + right_fields. len ( ) {
1499+ return internal_err ! (
1500+ "Join schema field count must match left and right field count."
1501+ ) ;
1502+ }
1503+
1504+ // Validate field names match
1505+ for ( i, field) in join_fields. iter ( ) . enumerate ( ) {
1506+ let expected_field = if i < left_fields. len ( ) {
1507+ & left_fields[ i]
1508+ } else {
1509+ & right_fields[ i - left_fields. len ( ) ]
1510+ } ;
1511+
1512+ if field. name ( ) != expected_field. name ( ) {
1513+ return internal_err ! (
1514+ "Left and right names must match the join schema names"
1515+ ) ;
1516+ }
1517+ }
1518+
1519+ // qualify sides
1520+ let qualifiers = join_fields
1521+ . iter ( )
1522+ . enumerate ( )
1523+ . map ( |( i, _) | {
1524+ if i < left_fields. len ( ) {
1525+ Some ( TableReference :: Bare {
1526+ table : Arc :: from ( "left" ) ,
1527+ } )
1528+ } else {
1529+ Some ( TableReference :: Bare {
1530+ table : Arc :: from ( "right" ) ,
1531+ } )
1532+ }
1533+ } )
1534+ . collect ( ) ;
1535+
1536+ join_schema. with_field_specific_qualified_schema ( qualifiers)
1537+ }
1538+
14661539fn get_physical_expr_pair (
14671540 expr : & Expr ,
14681541 input_dfschema : & DFSchema ,
0 commit comments