Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: When consuming Substrait, temporarily rename clashing duplicate columns #11329

Merged
merged 4 commits into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use datafusion::{
prelude::{Column, SessionContext},
scalar::ScalarValue,
};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use substrait::proto::exchange_rel::ExchangeKind;
Expand Down Expand Up @@ -403,22 +403,33 @@ pub async fn from_substrait_rel(
let mut input = LogicalPlanBuilder::from(
from_substrait_rel(ctx, input, extensions).await?,
);
let mut names: HashSet<String> = HashSet::new();
let mut exprs: Vec<Expr> = vec![];
for e in &p.expressions {
let x =
from_substrait_rex(ctx, e, input.clone().schema(), extensions)
.await?;
// if the expression is WindowFunction, wrap in a Window relation
// before returning and do not add to list of this Projection's expression list
// otherwise, add expression to the Projection's expression list
match &*x {
Expr::WindowFunction(_) => {
input = input.window(vec![x.as_ref().clone()])?;
exprs.push(x.as_ref().clone());
}
_ => {
exprs.push(x.as_ref().clone());
}
if let Expr::WindowFunction(_) = x.as_ref() {
// Adding the same expression here and in the project below
// works because the project's builder uses columnize_expr(..)
// to transform it into a column reference
input = input.window(vec![x.as_ref().clone()])?
}
// Ensure the expression has a unique display name, so that project's
// validate_unique_names doesn't fail
let name = x.display_name()?;
let mut new_name = name.clone();
let mut i = 0;
while names.contains(&new_name) {
new_name = format!("{}__temp__{}", name, i);
i += 1;
}
names.insert(new_name.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could probably save a clone by putting the names.insert call below the exprs.push calls

Copy link
Contributor

@alamb alamb Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made this change in #11337 as I had the code open anyways

if new_name != name {
exprs.push(x.as_ref().clone().alias(new_name.clone()));
} else {
exprs.push(x.as_ref().clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was confused at first why this was as_ref().clone() was needed

When I investigated, it seems that it is due to the fact that from_substrait_rex returns an Arc<Expr> rather than an Expr which is somewhat confusing to me

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out I can remove them: #11337

}
}
input.project(exprs)?.build()
Expand Down
16 changes: 16 additions & 0 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,22 @@ async fn roundtrip_values_duplicate_column_join() -> Result<()> {
.await
}

#[tokio::test]
async fn duplicate_column() -> Result<()> {
// Substrait does not keep column names (aliases) in the plan, rather it operates on column indices
// only. DataFusion however, is strict about not having duplicate column names appear in the plan.
// This test confirms that we generate aliases for columns in the plan which would otherwise have
// colliding names.
assert_expected_plan(
"SELECT a + 1 as sum_a, a + 1 as sum_a_2 FROM data",
"Projection: data.a + Int64(1) AS sum_a, data.a + Int64(1) AS data.a + Int64(1)__temp__0 AS sum_a_2\
\n Projection: data.a + Int64(1)\
\n TableScan: data projection=[a]",
true,
)
.await
}

/// Construct a plan that cast columns. Only those SQL types are supported for now.
#[tokio::test]
async fn new_test_grammar() -> Result<()> {
Expand Down