From ce02a6f7653b79b1d11d90f735c9450f75aa22ba Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Fri, 5 Jul 2024 14:10:28 +0200 Subject: [PATCH 1/4] cleanup project internals --- datafusion/substrait/src/logical_plan/consumer.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 03692399e1b3..3131b82a058e 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -409,17 +409,13 @@ pub async fn from_substrait_rel( from_substrait_rex(ctx, e, input.clone().schema(), extensions) .await?; // if the expression is WindowFunction, wrap in a Window relation - // before returning and do not add to list of this Projection's expression list - // otherwise, add expression to the Projection's expression list match &*x { Expr::WindowFunction(_) => { - input = input.window(vec![x.as_ref().clone()])?; - exprs.push(x.as_ref().clone()); - } - _ => { - exprs.push(x.as_ref().clone()); + input = input.window(vec![x.as_ref().clone()])? } + _ => {} } + exprs.push(x.as_ref().clone()); } input.project(exprs)?.build() } else { From 7d27d57ce085e0d8cd0d605808bfb447ae1e2afd Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Fri, 5 Jul 2024 14:31:56 +0200 Subject: [PATCH 2/4] alias intermediate duplicate columns --- .../substrait/src/logical_plan/consumer.rs | 22 +++++++++++++++++-- .../tests/cases/roundtrip_logical_plan.rs | 16 ++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 3131b82a058e..48de49477625 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -59,7 +59,7 @@ use datafusion::{ prelude::{Column, SessionContext}, scalar::ScalarValue, }; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; use substrait::proto::exchange_rel::ExchangeKind; @@ -403,6 +403,7 @@ pub async fn from_substrait_rel( let mut input = LogicalPlanBuilder::from( from_substrait_rel(ctx, input, extensions).await?, ); + let mut names: HashSet = HashSet::new(); let mut exprs: Vec = vec![]; for e in &p.expressions { let x = @@ -411,11 +412,28 @@ pub async fn from_substrait_rel( // if the expression is WindowFunction, wrap in a Window relation match &*x { Expr::WindowFunction(_) => { + // Adding the same expression here and in the project below + // works because the project's builder uses columnize_expr(..) + // to transform it into a column reference input = input.window(vec![x.as_ref().clone()])? } _ => {} } - exprs.push(x.as_ref().clone()); + // Ensure the expression has a unique display name, so that project's + // validate_unique_names doesn't fail + let name = x.display_name()?; + let mut new_name = name.clone(); + let mut i = 0; + while names.contains(&new_name) { + new_name = format!("{}__temp__{}", name, i); + i += 1; + } + names.insert(new_name.clone()); + if new_name != name { + exprs.push(x.as_ref().clone().alias(new_name.clone())); + } else { + exprs.push(x.as_ref().clone()); + } } input.project(exprs)?.build() } else { diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 52cfa50683a0..d79966f92d53 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -751,6 +751,22 @@ async fn roundtrip_values_duplicate_column_join() -> Result<()> { .await } +#[tokio::test] +async fn duplicate_column() -> Result<()> { + // Substrait does not keep column names (aliases) in the plan, rather it operates on column indices + // only. DataFusion however, is strict about not having duplicate column names appear in the plan. + // This test confirms that we generate aliases for columns in the plan which would otherwise have + // colliding names. + assert_expected_plan( + "SELECT a + 1 as sum_a, a + 1 as sum_a_2 FROM data", + "Projection: data.a + Int64(1) AS sum_a, data.a + Int64(1) AS data.a + Int64(1)_0 AS sum_a_2\ + \n Projection: data.a + Int64(1)\ + \n TableScan: data projection=[a]", + true, + ) + .await +} + /// Construct a plan that cast columns. Only those SQL types are supported for now. #[tokio::test] async fn new_test_grammar() -> Result<()> { From dff6a720e4ea6753c28670fd86923ea07a2bc37a Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Mon, 8 Jul 2024 10:58:29 +0200 Subject: [PATCH 3/4] fix test --- datafusion/substrait/tests/cases/roundtrip_logical_plan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index d79966f92d53..2893b1a31a26 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -759,7 +759,7 @@ async fn duplicate_column() -> Result<()> { // colliding names. assert_expected_plan( "SELECT a + 1 as sum_a, a + 1 as sum_a_2 FROM data", - "Projection: data.a + Int64(1) AS sum_a, data.a + Int64(1) AS data.a + Int64(1)_0 AS sum_a_2\ + "Projection: data.a + Int64(1) AS sum_a, data.a + Int64(1) AS data.a + Int64(1)__temp__0 AS sum_a_2\ \n Projection: data.a + Int64(1)\ \n TableScan: data projection=[a]", true, From 522b31bb3d927a9dd7ae2a70a5dd5f1b563eaaf2 Mon Sep 17 00:00:00 2001 From: Arttu Voutilainen Date: Mon, 8 Jul 2024 11:12:29 +0200 Subject: [PATCH 4/4] fix clippy --- datafusion/substrait/src/logical_plan/consumer.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 48de49477625..70a0692b6a1d 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -410,14 +410,11 @@ pub async fn from_substrait_rel( from_substrait_rex(ctx, e, input.clone().schema(), extensions) .await?; // if the expression is WindowFunction, wrap in a Window relation - match &*x { - Expr::WindowFunction(_) => { - // Adding the same expression here and in the project below - // works because the project's builder uses columnize_expr(..) - // to transform it into a column reference - input = input.window(vec![x.as_ref().clone()])? - } - _ => {} + if let Expr::WindowFunction(_) = x.as_ref() { + // Adding the same expression here and in the project below + // works because the project's builder uses columnize_expr(..) + // to transform it into a column reference + input = input.window(vec![x.as_ref().clone()])? } // Ensure the expression has a unique display name, so that project's // validate_unique_names doesn't fail