Skip to content

Commit cc90153

Browse files
committed
chore(SubqueryAlias): restructure duplicate detection and add tests
1 parent c0e51ec commit cc90153

File tree

2 files changed

+77
-40
lines changed

2 files changed

+77
-40
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,30 +1518,31 @@ impl ValuesFields {
15181518
}
15191519
}
15201520

1521-
pub fn maybe_project_redundant_column(
1522-
input: Arc<LogicalPlan>,
1523-
) -> Result<Arc<LogicalPlan>> {
1524-
// tracks a mapping between a field name and the number of appearances of that field.
1525-
let mut name_map = HashMap::<&str, usize>::new();
1526-
// tracks all the fields and aliases that were previously seen.
1527-
let mut seen = HashSet::<Cow<String>>::new();
1528-
1521+
// Return a list of aliases so that if applied to `fields` it would result in a unique name for each
1522+
// column, regardless of qualification. The returned vector length is equal to the number of fields
1523+
// given as input, and each entry optionally contains the alias that needs to be assigned to the column in the
1524+
// same position in order to maintain the uniqueness property.
1525+
pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
15291526
// Some field names might already come to this function with the count (number of times it appeared)
15301527
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
15311528
// if these three fields passed to this function: "col:1", "col" and "col", the function
1532-
// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1529+
// would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema.
15331530
// That's why we need the `seen` set, so the fields are always unique.
15341531

1535-
let aliases = input
1536-
.schema()
1532+
// Tracks a mapping between a field name and the number of appearances of that field.
1533+
let mut name_map = HashMap::<&str, usize>::new();
1534+
// Tracks all the fields and aliases that were previously seen.
1535+
let mut seen = HashSet::<Cow<String>>::new();
1536+
1537+
fields
15371538
.iter()
1538-
.map(|(_, field)| {
1539+
.map(|field| {
15391540
let original_name = field.name();
15401541
let mut name = Cow::Borrowed(original_name);
15411542

15421543
let count = name_map.entry(original_name).or_insert(0);
15431544

1544-
// Loop until we find a name that hasn't been used
1545+
// Loop until we find a name that hasn't been used.
15451546
while seen.contains(&name) {
15461547
*count += 1;
15471548
name = Cow::Owned(format!("{original_name}:{count}"));
@@ -1554,30 +1555,7 @@ pub fn maybe_project_redundant_column(
15541555
Cow::Owned(alias) => Some(alias),
15551556
}
15561557
})
1557-
.collect::<Vec<_>>();
1558-
1559-
// Check if there is at least an alias
1560-
let is_projection_needed = aliases.iter().any(Option::is_some);
1561-
1562-
if is_projection_needed {
1563-
let projection_expressions = aliases
1564-
.iter()
1565-
.zip(input.schema().iter())
1566-
.map(|(alias, (qualifier, field))| {
1567-
let column = Expr::Column(Column::new(qualifier.cloned(), field.name()));
1568-
match alias {
1569-
None => column,
1570-
Some(alias) => {
1571-
Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
1572-
}
1573-
}
1574-
})
1575-
.collect();
1576-
let projection = Projection::try_new(projection_expressions, input)?;
1577-
Ok(Arc::new(LogicalPlan::Projection(projection)))
1578-
} else {
1579-
Ok(input)
1580-
}
1558+
.collect()
15811559
}
15821560

15831561
fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
@@ -2787,4 +2765,33 @@ mod tests {
27872765

27882766
Ok(())
27892767
}
2768+
2769+
#[test]
2770+
fn test_unique_field_aliases() {
2771+
let t1_field_1 = Field::new("a", DataType::Int32, false);
2772+
let t2_field_1 = Field::new("a", DataType::Int32, false);
2773+
let t2_field_3 = Field::new("a", DataType::Int32, false);
2774+
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2775+
let t1_field_2 = Field::new("b", DataType::Int32, false);
2776+
let t2_field_2 = Field::new("b", DataType::Int32, false);
2777+
2778+
let fields = vec![
2779+
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2780+
];
2781+
let fields = Fields::from(fields);
2782+
2783+
let remove_redundant = unique_field_aliases(&fields);
2784+
2785+
assert_eq!(
2786+
remove_redundant,
2787+
vec![
2788+
None,
2789+
Some("a:1".to_string()),
2790+
None,
2791+
Some("b:1".to_string()),
2792+
Some("a:2".to_string()),
2793+
Some("a:1:1".to_string()),
2794+
]
2795+
);
2796+
}
27902797
}

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@ use super::invariants::{
2929
InvariantLevel,
3030
};
3131
use super::DdlStatement;
32-
use crate::builder::{maybe_project_redundant_column, unnest_with_options};
32+
use crate::builder::{unique_field_aliases, unnest_with_options};
3333
use crate::expr::{
34-
intersect_metadata_for_union, Placeholder, Sort as SortExpr, WindowFunction,
34+
intersect_metadata_for_union, Alias, Placeholder, Sort as SortExpr, WindowFunction,
3535
WindowFunctionParams,
3636
};
3737
use crate::expr_rewriter::{
@@ -2239,8 +2239,38 @@ impl SubqueryAlias {
22392239
alias: impl Into<TableReference>,
22402240
) -> Result<Self> {
22412241
let alias = alias.into();
2242-
let plan = maybe_project_redundant_column(plan)?;
22432242

2243+
// Since SubqueryAlias will replace all field qualification for the output schema of `plan`,
2244+
// no two fields may share the same column name, as this would lead to ambiguity when referencing
2245+
// columns in parent logical nodes.
2246+
2247+
// Compute unique aliases, if any, for each column of the input's schema.
2248+
let aliases = unique_field_aliases(plan.schema().fields());
2249+
let is_projection_needed = aliases.iter().any(Option::is_some);
2250+
2251+
// Insert a projection node, if needed, to make sure aliases are applied.
2252+
let plan = if is_projection_needed {
2253+
let projection_expressions = aliases
2254+
.iter()
2255+
.zip(plan.schema().iter())
2256+
.map(|(alias, (qualifier, field))| {
2257+
let column =
2258+
Expr::Column(Column::new(qualifier.cloned(), field.name()));
2259+
match alias {
2260+
None => column,
2261+
Some(alias) => {
2262+
Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
2263+
}
2264+
}
2265+
})
2266+
.collect();
2267+
let projection = Projection::try_new(projection_expressions, plan)?;
2268+
Arc::new(LogicalPlan::Projection(projection))
2269+
} else {
2270+
plan
2271+
};
2272+
2273+
// Requalify fields with the new `alias`.
22442274
let fields = plan.schema().fields().clone();
22452275
let meta_data = plan.schema().metadata().clone();
22462276
let func_dependencies = plan.schema().functional_dependencies().clone();

0 commit comments

Comments
 (0)