Skip to content

Conversation

@LiaCastaneda
Copy link
Contributor

@LiaCastaneda LiaCastaneda commented Jun 19, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

There have been some occasions where substrait queries fail during logical planning because a node's resulting schema contains duplicate field names. This is not allowed by either Arrow or DataFusion (always verified on check_names upon schema creation). This occurred most commonly when constructing the schema of joins, in inner joins where both the left and right inputs have a field with the same name, resulting in duplicate fields in the output schema.

This was previously addressed in the Substrait consumer path in` Fix duplicate unqualified Field name (schema error) on join queries and there is an existing function that handles this kind of situations on joins rels: requalify_sides_if_needed

We are now encountering the same error again, but this time during physical planning while mutating the Logical plan, rather than in logical planning phase itself.

Specifically it arises on this line when the join key is an non Column expression, since it has to create a Projection with a Column expr on top (by calling wrap_projection_for_join_if_necessary) to ensure correct execution iiuc. The Join logical node is then updated to use this new column. The issue occurs when the new logical node is built using Join::try_new_with_project_input as the function that constructs the join schema checks that names are unique as well.

What changes are included in this PR?

My proposed workaround is to qualify the sides of the join on try_new_with_project_input by calling the same function the substrait consumer uses: requalify_sides_if_needed, but I'm open to suggestions 🙇‍♀️

Are these changes tested?

Yes, I added a substrait reproducer & a substrait consumer test to verify planning works.

Are there any user-facing changes?

No.

@github-actions github-actions bot added logical-expr Logical plan and expressions core Core DataFusion crate substrait Changes to the substrait crate common Related to common crate labels Jun 19, 2025
@LiaCastaneda LiaCastaneda changed the title Fix duplicates on Join creation during physcial planning Fix duplicates on logical Join creation during physical planning Jun 19, 2025
@LiaCastaneda LiaCastaneda changed the title Fix duplicates on logical Join creation during physical planning Fix duplicate field name error in Join::try_new_with_project_input during physical planning Jun 20, 2025
@LiaCastaneda LiaCastaneda force-pushed the lia/fix-duplicate-unqualified-from-physcial-planning-error branch 2 times, most recently from 97caf0f to fb9d758 Compare June 20, 2025 11:00
/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
/// aliases, neither for columns nor for tables. DataFusion requires columns to be uniquely identifiable, in some
/// places (see e.g. DFSchema::check_names).
pub fn requalify_sides_if_needed(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this helper function to the logical plan builder module since now its not used only by the substrait consumer but also by plan.rs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extending the doc comment with something like:

The function returns:
- The requalified or original left logical plan 
- The requalified or original right logical plan
- If a requalification was needed or not

@LiaCastaneda LiaCastaneda force-pushed the lia/fix-duplicate-unqualified-from-physcial-planning-error branch from fb9d758 to 6244060 Compare June 20, 2025 12:09
Comment on lines +975 to +987
// Re-qualify the join schema only if the inputs were previously requalified in
// `try_new_with_project_input`. This ensures that when building the Projection
// it can correctly resolve field nullability and data types
// by disambiguating fields from the left and right sides of the join.
let qualified_join_schema = if requalified {
Arc::new(qualify_join_schema_sides(
join_schema,
original_left,
original_right,
)?)
} else {
Arc::clone(join_schema)
};
Copy link
Contributor Author

@LiaCastaneda LiaCastaneda Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rationale for qualifying the schema is that when building the logical Projection after, it will build the fields out of the expression names in exprlist_to_fields so it will look in new_join.schema() and try to match each expr to a field in the schema, if the expr::Column does not have a qualifier and there are multiple candidates Fields that could correspond to this expr::Column , we will get an ambiguity error, qualifying the schema allows us to prevent this.

@LiaCastaneda LiaCastaneda marked this pull request as ready for review June 26, 2025 08:06
@LiaCastaneda LiaCastaneda marked this pull request as draft June 26, 2025 08:23
@LiaCastaneda LiaCastaneda marked this pull request as ready for review July 2, 2025 10:50
Copy link
Contributor Author

@LiaCastaneda LiaCastaneda Jul 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the huge substrait plan, here is the SQL version:

WITH index_sizes
     AS (SELECT index_name      AS index,
                Upper(host)     AS host_expr,
                Max(size_bytes) AS idx_size
         FROM   index_metrics
         WHERE  index_name IN ( 'aaa' )
         GROUP  BY index_name,
                   Upper(host)),
     db_sizes
     AS (SELECT Upper(host)      AS host_expr,-- scalar func
                Max(total_bytes) AS db_size
         FROM   db_metrics
         WHERE  Upper(host) IN (SELECT host_expr
                                FROM   index_sizes) -- this comes as inner join in the substrait plan
         GROUP  BY Upper(host))
SELECT ix.index,
       ds.host_expr                         AS host,
       ix.idx_size,
       ds.db_size,
       ( ix.idx_size / ds.db_size ) * 100.0 AS pct_of_db
FROM   index_sizes ix
       JOIN db_sizes ds
         ON ix.host_expr = ds.host_expr; 

@alamb
Copy link
Contributor

alamb commented Jul 2, 2025

@gabotechs do you have time to review this PR?

Copy link
Contributor

@gabotechs gabotechs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, left some non-blocking comments. These kind of bugs are very tricky and the best solutions I can think of (like yours) still feel a bit patchy.

It would be nice if someone more familiar with the planning code takes a look to see if there's any better way of solving this, but from were I stand I cannot think of something better.

/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
/// aliases, neither for columns nor for tables. DataFusion requires columns to be uniquely identifiable, in some
/// places (see e.g. DFSchema::check_names).
pub fn requalify_sides_if_needed(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extending the doc comment with something like:

The function returns:
- The requalified or original left logical plan 
- The requalified or original right logical plan
- If a requalification was needed or not

Comment on lines +1568 to +1574
Some(TableReference::Bare {
table: Arc::from("left"),
})
} else {
Some(TableReference::Bare {
table: Arc::from("right"),
})
Copy link
Contributor

@gabotechs gabotechs Jul 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 having the hardcoded "left" and "right" strings here introduces a "stringly-typed dependency" with requalify_sides_if_needed, where the "left" and "right" qualifiers are also hardcoded. Not a very big deal though, unless you have a better idea this might be fine

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @LiaCastaneda and @gabotechs

I am sorry I didn't quite follow if this fix is specific to substrait or if it also fixes some issue that could be hit with a SQL as well? Specifically, is there any SQL query that would fail prior to this fix but pass after it?

@LiaCastaneda
Copy link
Contributor Author

LiaCastaneda commented Jul 7, 2025

I am sorry I didn't quite follow if this fix is specific to substrait or if it also fixes some issue that could be hit with a SQL as well? Specifically, is there any SQL query that would fail prior to this fix but pass after it?

I would say It’s specific to Substrait given that I’ve only observed this error in Substrait queries, and only under a very narrow set of conditions: the join key is an expression (different to Column), the join type is INNER, and there are identical column names on both sides of the join. For example the following join is specified as an inner join in the substrait plan & then a projection with the left columns is applied on top:

SELECT Upper(host)      AS host_expr,
                Max(total_bytes) AS db_size
         FROM   db_metrics
         WHERE  Upper(host) IN (SELECT host_expr
                                FROM   index_sizes)

I haven’t been able to reproduce the same error through any other execution path. In the regular path, this query is likely rewritten as a semi-join, so the issue doesn’t arise.

@alamb alamb merged commit ec15558 into apache:main Jul 7, 2025
27 checks passed
@alamb
Copy link
Contributor

alamb commented Jul 7, 2025

THanks again @LiaCastaneda and @gabotechs

LiaCastaneda added a commit to DataDog/datafusion that referenced this pull request Jul 7, 2025
…ring physical planning (apache#16454)

* Fix duplicates on Join creation during physcial planning

* Add Substrait reproducer

* Better error message & more doc

* Handle case for right/left/full joins as well
LiaCastaneda added a commit to DataDog/datafusion that referenced this pull request Jul 7, 2025
…ring physical planning (apache#16454)

* Fix duplicates on Join creation during physcial planning

* Add Substrait reproducer

* Better error message & more doc

* Handle case for right/left/full joins as well
LiaCastaneda added a commit to DataDog/datafusion that referenced this pull request Jul 7, 2025
…ring physical planning (apache#16454) (#33)

* Fix duplicates on Join creation during physcial planning

* Add Substrait reproducer

* Better error message & more doc

* Handle case for right/left/full joins as well
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

common Related to common crate core Core DataFusion crate logical-expr Logical plan and expressions substrait Changes to the substrait crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants