-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix(SubqueryAlias): use maybe_project_redundant_column #17478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
97205ee
b0d4552
c0e51ec
cc90153
251a206
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -62,9 +62,7 @@ use arrow::compute::SortOptions; | |||||||||||||||||||||||||||||||||||
| use arrow::datatypes::{Schema, SchemaRef}; | ||||||||||||||||||||||||||||||||||||
| use datafusion_catalog::ScanArgs; | ||||||||||||||||||||||||||||||||||||
| use datafusion_common::display::ToStringifiedPlan; | ||||||||||||||||||||||||||||||||||||
| use datafusion_common::tree_node::{ | ||||||||||||||||||||||||||||||||||||
| Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor, | ||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||
| use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; | ||||||||||||||||||||||||||||||||||||
| use datafusion_common::TableReference; | ||||||||||||||||||||||||||||||||||||
| use datafusion_common::{ | ||||||||||||||||||||||||||||||||||||
| exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, | ||||||||||||||||||||||||||||||||||||
|
|
@@ -85,7 +83,7 @@ use datafusion_expr::{ | |||||||||||||||||||||||||||||||||||
| WindowFrameBound, WriteOp, | ||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||
| use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; | ||||||||||||||||||||||||||||||||||||
| use datafusion_physical_expr::expressions::{Column, Literal}; | ||||||||||||||||||||||||||||||||||||
| use datafusion_physical_expr::expressions::Literal; | ||||||||||||||||||||||||||||||||||||
| use datafusion_physical_expr::{ | ||||||||||||||||||||||||||||||||||||
| create_physical_sort_exprs, LexOrdering, PhysicalSortExpr, | ||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||
|
|
@@ -2181,11 +2179,7 @@ impl DefaultPhysicalPlanner { | |||||||||||||||||||||||||||||||||||
| let physical_expr = | ||||||||||||||||||||||||||||||||||||
| self.create_physical_expr(e, input_logical_schema, session_state); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Check for possible column name mismatches | ||||||||||||||||||||||||||||||||||||
| let final_physical_expr = | ||||||||||||||||||||||||||||||||||||
| maybe_fix_physical_column_name(physical_expr, &input_physical_schema); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| tuple_err((final_physical_expr, physical_name)) | ||||||||||||||||||||||||||||||||||||
| tuple_err((physical_expr, physical_name)) | ||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||
| .collect::<Result<Vec<_>>>()?; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
|
@@ -2291,47 +2285,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> { | |||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Handle the case where the name of a physical column expression does not match the corresponding physical input fields names. | ||||||||||||||||||||||||||||||||||||
| // Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names. | ||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||
| // This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'), | ||||||||||||||||||||||||||||||||||||
| // to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`. | ||||||||||||||||||||||||||||||||||||
| fn maybe_fix_physical_column_name( | ||||||||||||||||||||||||||||||||||||
| expr: Result<Arc<dyn PhysicalExpr>>, | ||||||||||||||||||||||||||||||||||||
| input_physical_schema: &SchemaRef, | ||||||||||||||||||||||||||||||||||||
| ) -> Result<Arc<dyn PhysicalExpr>> { | ||||||||||||||||||||||||||||||||||||
| let Ok(expr) = expr else { return expr }; | ||||||||||||||||||||||||||||||||||||
| expr.transform_down(|node| { | ||||||||||||||||||||||||||||||||||||
| if let Some(column) = node.as_any().downcast_ref::<Column>() { | ||||||||||||||||||||||||||||||||||||
| let idx = column.index(); | ||||||||||||||||||||||||||||||||||||
| let physical_field = input_physical_schema.field(idx); | ||||||||||||||||||||||||||||||||||||
| let expr_col_name = column.name(); | ||||||||||||||||||||||||||||||||||||
| let physical_name = physical_field.name(); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| if expr_col_name != physical_name { | ||||||||||||||||||||||||||||||||||||
| // handle edge cases where the physical_name contains ':'. | ||||||||||||||||||||||||||||||||||||
| let colon_count = physical_name.matches(':').count(); | ||||||||||||||||||||||||||||||||||||
| let mut splits = expr_col_name.match_indices(':'); | ||||||||||||||||||||||||||||||||||||
| let split_pos = splits.nth(colon_count); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| if let Some((i, _)) = split_pos { | ||||||||||||||||||||||||||||||||||||
| let base_name = &expr_col_name[..i]; | ||||||||||||||||||||||||||||||||||||
| if base_name == physical_name { | ||||||||||||||||||||||||||||||||||||
| let updated_column = Column::new(physical_name, idx); | ||||||||||||||||||||||||||||||||||||
| return Ok(Transformed::yes(Arc::new(updated_column))); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // If names already match or fix is not possible, just leave it as it is | ||||||||||||||||||||||||||||||||||||
| Ok(Transformed::no(node)) | ||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||
| Ok(Transformed::no(node)) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||||||||
| .data() | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| struct OptimizationInvariantChecker<'a> { | ||||||||||||||||||||||||||||||||||||
| rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>, | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
@@ -2435,12 +2388,10 @@ mod tests { | |||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||
| use datafusion_execution::runtime_env::RuntimeEnv; | ||||||||||||||||||||||||||||||||||||
| use datafusion_execution::TaskContext; | ||||||||||||||||||||||||||||||||||||
| use datafusion_expr::{ | ||||||||||||||||||||||||||||||||||||
| col, lit, LogicalPlanBuilder, Operator, UserDefinedLogicalNodeCore, | ||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||
| use datafusion_expr::builder::subquery_alias; | ||||||||||||||||||||||||||||||||||||
| use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore}; | ||||||||||||||||||||||||||||||||||||
| use datafusion_functions_aggregate::count::count_all; | ||||||||||||||||||||||||||||||||||||
| use datafusion_functions_aggregate::expr_fn::sum; | ||||||||||||||||||||||||||||||||||||
| use datafusion_physical_expr::expressions::{BinaryExpr, IsNotNullExpr}; | ||||||||||||||||||||||||||||||||||||
| use datafusion_physical_expr::EquivalenceProperties; | ||||||||||||||||||||||||||||||||||||
| use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
|
|
@@ -3001,71 +2952,6 @@ mod tests { | |||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| #[tokio::test] | ||||||||||||||||||||||||||||||||||||
| async fn test_maybe_fix_colon_in_physical_name() { | ||||||||||||||||||||||||||||||||||||
| // The physical schema has a field name with a colon | ||||||||||||||||||||||||||||||||||||
| let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]); | ||||||||||||||||||||||||||||||||||||
| let schema_ref: SchemaRef = Arc::new(schema); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // What might happen after deduplication | ||||||||||||||||||||||||||||||||||||
| let logical_col_name = "metric:avg:1"; | ||||||||||||||||||||||||||||||||||||
| let expr_with_suffix = | ||||||||||||||||||||||||||||||||||||
| Arc::new(Column::new(logical_col_name, 0)) as Arc<dyn PhysicalExpr>; | ||||||||||||||||||||||||||||||||||||
| let expr_result = Ok(expr_with_suffix); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Call function under test | ||||||||||||||||||||||||||||||||||||
| let fixed_expr = | ||||||||||||||||||||||||||||||||||||
| maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap(); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Downcast back to Column so we can check the name | ||||||||||||||||||||||||||||||||||||
| let col = fixed_expr | ||||||||||||||||||||||||||||||||||||
| .as_any() | ||||||||||||||||||||||||||||||||||||
| .downcast_ref::<Column>() | ||||||||||||||||||||||||||||||||||||
| .expect("Column"); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| assert_eq!(col.name(), "metric:avg"); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| #[tokio::test] | ||||||||||||||||||||||||||||||||||||
| async fn test_maybe_fix_nested_column_name_with_colon() { | ||||||||||||||||||||||||||||||||||||
| let schema = Schema::new(vec![Field::new("column", DataType::Int32, false)]); | ||||||||||||||||||||||||||||||||||||
| let schema_ref: SchemaRef = Arc::new(schema); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Construct the nested expr | ||||||||||||||||||||||||||||||||||||
| let col_expr = Arc::new(Column::new("column:1", 0)) as Arc<dyn PhysicalExpr>; | ||||||||||||||||||||||||||||||||||||
| let is_not_null_expr = Arc::new(IsNotNullExpr::new(col_expr.clone())); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Create a binary expression and put the column inside | ||||||||||||||||||||||||||||||||||||
| let binary_expr = Arc::new(BinaryExpr::new( | ||||||||||||||||||||||||||||||||||||
| is_not_null_expr.clone(), | ||||||||||||||||||||||||||||||||||||
| Operator::Or, | ||||||||||||||||||||||||||||||||||||
| is_not_null_expr.clone(), | ||||||||||||||||||||||||||||||||||||
| )) as Arc<dyn PhysicalExpr>; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let fixed_expr = | ||||||||||||||||||||||||||||||||||||
| maybe_fix_physical_column_name(Ok(binary_expr), &schema_ref).unwrap(); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let bin = fixed_expr | ||||||||||||||||||||||||||||||||||||
| .as_any() | ||||||||||||||||||||||||||||||||||||
| .downcast_ref::<BinaryExpr>() | ||||||||||||||||||||||||||||||||||||
| .expect("Expected BinaryExpr"); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Check that both sides where renamed | ||||||||||||||||||||||||||||||||||||
| for expr in &[bin.left(), bin.right()] { | ||||||||||||||||||||||||||||||||||||
| let is_not_null = expr | ||||||||||||||||||||||||||||||||||||
| .as_any() | ||||||||||||||||||||||||||||||||||||
| .downcast_ref::<IsNotNullExpr>() | ||||||||||||||||||||||||||||||||||||
| .expect("Expected IsNotNull"); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let col = is_not_null | ||||||||||||||||||||||||||||||||||||
| .arg() | ||||||||||||||||||||||||||||||||||||
| .as_any() | ||||||||||||||||||||||||||||||||||||
| .downcast_ref::<Column>() | ||||||||||||||||||||||||||||||||||||
| .expect("Expected Column"); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| assert_eq!(col.name(), "column"); | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| struct ErrorExtensionPlanner {} | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| #[async_trait] | ||||||||||||||||||||||||||||||||||||
|
|
@@ -3562,4 +3448,61 @@ digraph { | |||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| // Reproducer for DataFusion issue #17405: | ||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||
| // The following SQL is semantically invalid. Notably, the `SELECT left_table.a, right_table.a` | ||||||||||||||||||||||||||||||||||||
| // clause is missing from the explicit logical plan: | ||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||
| // SELECT a FROM ( | ||||||||||||||||||||||||||||||||||||
| // -- SELECT left_table.a, right_table.a | ||||||||||||||||||||||||||||||||||||
| // FROM left_table | ||||||||||||||||||||||||||||||||||||
| // FULL JOIN right_table ON left_table.a = right_table.a | ||||||||||||||||||||||||||||||||||||
| // ) AS alias | ||||||||||||||||||||||||||||||||||||
| // GROUP BY a; | ||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||
| // As a result, the variables within `alias` subquery are not properly distinguished, which | ||||||||||||||||||||||||||||||||||||
| // leads to a bug for logical and physical planning. | ||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||
| // The fix is to implicitly insert a Projection node to represent the missing SELECT clause to | ||||||||||||||||||||||||||||||||||||
| // ensure each field is correctly aliased to a unique name when the SubqueryAlias node is added. | ||||||||||||||||||||||||||||||||||||
| #[tokio::test] | ||||||||||||||||||||||||||||||||||||
| async fn subquery_alias_confusing_the_optimizer() -> Result<()> { | ||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||||
| let state = make_session_state(); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); | ||||||||||||||||||||||||||||||||||||
| let schema = Arc::new(schema); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let table = MemTable::try_new(schema.clone(), vec![vec![]])?; | ||||||||||||||||||||||||||||||||||||
| let table = Arc::new(table); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let source = DefaultTableSource::new(table); | ||||||||||||||||||||||||||||||||||||
| let source = Arc::new(source); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let left = LogicalPlanBuilder::scan("left", source.clone(), None)?; | ||||||||||||||||||||||||||||||||||||
| let right = LogicalPlanBuilder::scan("right", source, None)?.build()?; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let join_keys = ( | ||||||||||||||||||||||||||||||||||||
| vec![datafusion_common::Column::new(Some("left"), "a")], | ||||||||||||||||||||||||||||||||||||
| vec![datafusion_common::Column::new(Some("right"), "a")], | ||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let join = left.join(right, JoinType::Full, join_keys, None)?.build()?; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let alias = subquery_alias(join, "alias")?; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let planner = DefaultPhysicalPlanner::default(); | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let logical_plan = LogicalPlanBuilder::new(alias) | ||||||||||||||||||||||||||||||||||||
| .aggregate(vec![col("a:1")], Vec::<Expr>::new())? | ||||||||||||||||||||||||||||||||||||
| .build()?; | ||||||||||||||||||||||||||||||||||||
| let _physical_plan = planner.create_physical_plan(&logical_plan, &state).await?; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| let optimized_logical_plan = state.optimize(&logical_plan)?; | ||||||||||||||||||||||||||||||||||||
| let _optimized_physical_plan = planner | ||||||||||||||||||||||||||||||||||||
| .create_physical_plan(&optimized_logical_plan, &state) | ||||||||||||||||||||||||||||||||||||
| .await?; | ||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| //! This module provides a builder for creating LogicalPlans | ||
|
|
||
| use std::any::Any; | ||
| use std::borrow::Cow; | ||
| use std::cmp::Ordering; | ||
| use std::collections::{HashMap, HashSet}; | ||
| use std::iter::once; | ||
|
|
@@ -1517,37 +1518,49 @@ impl ValuesFields { | |
| } | ||
| } | ||
|
|
||
| // `name_map` tracks a mapping between a field name and the number of appearances of that field. | ||
| // | ||
| // Some field names might already come to this function with the count (number of times it appeared) | ||
| // as a suffix e.g. id:1, so there's still a chance of name collisions, for example, | ||
| // if these three fields passed to this function: "col:1", "col" and "col", the function | ||
| // would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema. | ||
| // that's why we need the `seen` set, so the fields are always unique. | ||
| // | ||
| pub fn change_redundant_column(fields: &Fields) -> Vec<Field> { | ||
| let mut name_map = HashMap::new(); | ||
| let mut seen: HashSet<String> = HashSet::new(); | ||
| /// Returns aliases to make field names unique. | ||
| /// | ||
| /// Returns a vector of optional aliases, one per input field. `None` means keep the original name, | ||
| /// `Some(alias)` means rename to the alias to ensure uniqueness. | ||
| /// | ||
| /// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need | ||
| /// to maintain unique column names. | ||
| /// | ||
| /// # Example | ||
| /// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers) | ||
| /// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]` | ||
| pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> { | ||
| // Some field names might already come to this function with the count (number of times it appeared) | ||
| // as a suffix e.g. id:1, so there's still a chance of name collisions, for example, | ||
| // if these three fields passed to this function: "col:1", "col" and "col", the function | ||
| // would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema. | ||
| // That's why we need the `seen` set, so the fields are always unique. | ||
|
|
||
| // Tracks a mapping between a field name and the number of appearances of that field. | ||
| let mut name_map = HashMap::<&str, usize>::new(); | ||
| // Tracks all the fields and aliases that were previously seen. | ||
| let mut seen = HashSet::<Cow<String>>::new(); | ||
|
|
||
| fields | ||
| .into_iter() | ||
| .iter() | ||
| .map(|field| { | ||
| let base_name = field.name(); | ||
| let count = name_map.entry(base_name.clone()).or_insert(0); | ||
| let mut new_name = base_name.clone(); | ||
| let original_name = field.name(); | ||
| let mut name = Cow::Borrowed(original_name); | ||
|
|
||
| let count = name_map.entry(original_name).or_insert(0); | ||
|
|
||
| // Loop until we find a name that hasn't been used | ||
| while seen.contains(&new_name) { | ||
| // Loop until we find a name that hasn't been used. | ||
| while seen.contains(&name) { | ||
| *count += 1; | ||
| new_name = format!("{base_name}:{count}"); | ||
| name = Cow::Owned(format!("{original_name}:{count}")); | ||
| } | ||
|
|
||
| seen.insert(new_name.clone()); | ||
| seen.insert(name.clone()); | ||
|
|
||
| let mut modified_field = | ||
| Field::new(&new_name, field.data_type().clone(), field.is_nullable()); | ||
| modified_field.set_metadata(field.metadata().clone()); | ||
| modified_field | ||
| match name { | ||
| Cow::Borrowed(_) => None, | ||
| Cow::Owned(alias) => Some(alias), | ||
| } | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
@@ -2675,34 +2688,6 @@ mod tests { | |
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_change_redundant_column() -> Result<()> { | ||
notfilippo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| let t1_field_1 = Field::new("a", DataType::Int32, false); | ||
| let t2_field_1 = Field::new("a", DataType::Int32, false); | ||
| let t2_field_3 = Field::new("a", DataType::Int32, false); | ||
| let t2_field_4 = Field::new("a:1", DataType::Int32, false); | ||
| let t1_field_2 = Field::new("b", DataType::Int32, false); | ||
| let t2_field_2 = Field::new("b", DataType::Int32, false); | ||
|
|
||
| let field_vec = vec![ | ||
| t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4, | ||
| ]; | ||
| let remove_redundant = change_redundant_column(&Fields::from(field_vec)); | ||
|
|
||
| assert_eq!( | ||
| remove_redundant, | ||
| vec![ | ||
| Field::new("a", DataType::Int32, false), | ||
| Field::new("a:1", DataType::Int32, false), | ||
| Field::new("b", DataType::Int32, false), | ||
| Field::new("b:1", DataType::Int32, false), | ||
| Field::new("a:2", DataType::Int32, false), | ||
| Field::new("a:1:1", DataType::Int32, false), | ||
| ] | ||
| ); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn plan_builder_from_logical_plan() -> Result<()> { | ||
| let plan = | ||
|
|
@@ -2787,4 +2772,39 @@ mod tests { | |
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_unique_field_aliases() { | ||
| let t1_field_1 = Field::new("a", DataType::Int32, false); | ||
| let t2_field_1 = Field::new("a", DataType::Int32, false); | ||
| let t2_field_3 = Field::new("a", DataType::Int32, false); | ||
| let t2_field_4 = Field::new("a:1", DataType::Int32, false); | ||
| let t1_field_2 = Field::new("b", DataType::Int32, false); | ||
| let t2_field_2 = Field::new("b", DataType::Int32, false); | ||
|
|
||
| let fields = vec![ | ||
| t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4, | ||
| ]; | ||
| let fields = Fields::from(fields); | ||
|
|
||
| let remove_redundant = unique_field_aliases(&fields); | ||
|
|
||
| // Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1] | ||
| // First occurrence of each field name keeps original name (None), duplicates get | ||
| // incremental suffixes (:1, :2, etc.). | ||
| // Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later | ||
| // conflicts with the last column which is _actually_ called `a:1` so we need to rename it | ||
| // as well to `a:1:1`. | ||
| assert_eq!( | ||
| remove_redundant, | ||
| vec![ | ||
| None, | ||
| Some("a:1".to_string()), | ||
| None, | ||
| Some("b:1".to_string()), | ||
| Some("a:2".to_string()), | ||
| Some("a:1:1".to_string()), | ||
| ] | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| ); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For other reviewers, this is a reproducer of the bug. I have run this test before the fix and it fails. Great job to reproduce this with logical and physical plan @notfilippo.
I suggest you add the below comment so we know what the bug is