Skip to content

Commit c910db4

Browse files
authored
fix(SubqueryAlias): use maybe_project_redundant_column (#17478)
* fix(SubqueryAlias): use maybe_project_redundant_column Fixes #17405 * chore: format * ci: retry * chore(SubqueryAlias): restructure duplicate detection and add tests * docs: add examples and context to the reproducer
1 parent 13208e6 commit c910db4

File tree

4 files changed

+196
-197
lines changed

4 files changed

+196
-197
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 62 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,7 @@ use arrow::compute::SortOptions;
6262
use arrow::datatypes::{Schema, SchemaRef};
6363
use datafusion_catalog::ScanArgs;
6464
use datafusion_common::display::ToStringifiedPlan;
65-
use datafusion_common::tree_node::{
66-
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
67-
};
65+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
6866
use datafusion_common::TableReference;
6967
use datafusion_common::{
7068
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
@@ -85,7 +83,7 @@ use datafusion_expr::{
8583
WindowFrameBound, WriteOp,
8684
};
8785
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
88-
use datafusion_physical_expr::expressions::{Column, Literal};
86+
use datafusion_physical_expr::expressions::Literal;
8987
use datafusion_physical_expr::{
9088
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
9189
};
@@ -2181,11 +2179,7 @@ impl DefaultPhysicalPlanner {
21812179
let physical_expr =
21822180
self.create_physical_expr(e, input_logical_schema, session_state);
21832181

2184-
// Check for possible column name mismatches
2185-
let final_physical_expr =
2186-
maybe_fix_physical_column_name(physical_expr, &input_physical_schema);
2187-
2188-
tuple_err((final_physical_expr, physical_name))
2182+
tuple_err((physical_expr, physical_name))
21892183
})
21902184
.collect::<Result<Vec<_>>>()?;
21912185

@@ -2291,47 +2285,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
22912285
}
22922286
}
22932287

2294-
// Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
2295-
// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
2296-
//
2297-
// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
2298-
// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
2299-
fn maybe_fix_physical_column_name(
2300-
expr: Result<Arc<dyn PhysicalExpr>>,
2301-
input_physical_schema: &SchemaRef,
2302-
) -> Result<Arc<dyn PhysicalExpr>> {
2303-
let Ok(expr) = expr else { return expr };
2304-
expr.transform_down(|node| {
2305-
if let Some(column) = node.as_any().downcast_ref::<Column>() {
2306-
let idx = column.index();
2307-
let physical_field = input_physical_schema.field(idx);
2308-
let expr_col_name = column.name();
2309-
let physical_name = physical_field.name();
2310-
2311-
if expr_col_name != physical_name {
2312-
// handle edge cases where the physical_name contains ':'.
2313-
let colon_count = physical_name.matches(':').count();
2314-
let mut splits = expr_col_name.match_indices(':');
2315-
let split_pos = splits.nth(colon_count);
2316-
2317-
if let Some((i, _)) = split_pos {
2318-
let base_name = &expr_col_name[..i];
2319-
if base_name == physical_name {
2320-
let updated_column = Column::new(physical_name, idx);
2321-
return Ok(Transformed::yes(Arc::new(updated_column)));
2322-
}
2323-
}
2324-
}
2325-
2326-
// If names already match or fix is not possible, just leave it as it is
2327-
Ok(Transformed::no(node))
2328-
} else {
2329-
Ok(Transformed::no(node))
2330-
}
2331-
})
2332-
.data()
2333-
}
2334-
23352288
struct OptimizationInvariantChecker<'a> {
23362289
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
23372290
}
@@ -2435,12 +2388,10 @@ mod tests {
24352388
};
24362389
use datafusion_execution::runtime_env::RuntimeEnv;
24372390
use datafusion_execution::TaskContext;
2438-
use datafusion_expr::{
2439-
col, lit, LogicalPlanBuilder, Operator, UserDefinedLogicalNodeCore,
2440-
};
2391+
use datafusion_expr::builder::subquery_alias;
2392+
use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
24412393
use datafusion_functions_aggregate::count::count_all;
24422394
use datafusion_functions_aggregate::expr_fn::sum;
2443-
use datafusion_physical_expr::expressions::{BinaryExpr, IsNotNullExpr};
24442395
use datafusion_physical_expr::EquivalenceProperties;
24452396
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
24462397

@@ -3001,71 +2952,6 @@ mod tests {
30012952
}
30022953
}
30032954

3004-
#[tokio::test]
3005-
async fn test_maybe_fix_colon_in_physical_name() {
3006-
// The physical schema has a field name with a colon
3007-
let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]);
3008-
let schema_ref: SchemaRef = Arc::new(schema);
3009-
3010-
// What might happen after deduplication
3011-
let logical_col_name = "metric:avg:1";
3012-
let expr_with_suffix =
3013-
Arc::new(Column::new(logical_col_name, 0)) as Arc<dyn PhysicalExpr>;
3014-
let expr_result = Ok(expr_with_suffix);
3015-
3016-
// Call function under test
3017-
let fixed_expr =
3018-
maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap();
3019-
3020-
// Downcast back to Column so we can check the name
3021-
let col = fixed_expr
3022-
.as_any()
3023-
.downcast_ref::<Column>()
3024-
.expect("Column");
3025-
3026-
assert_eq!(col.name(), "metric:avg");
3027-
}
3028-
3029-
#[tokio::test]
3030-
async fn test_maybe_fix_nested_column_name_with_colon() {
3031-
let schema = Schema::new(vec![Field::new("column", DataType::Int32, false)]);
3032-
let schema_ref: SchemaRef = Arc::new(schema);
3033-
3034-
// Construct the nested expr
3035-
let col_expr = Arc::new(Column::new("column:1", 0)) as Arc<dyn PhysicalExpr>;
3036-
let is_not_null_expr = Arc::new(IsNotNullExpr::new(col_expr.clone()));
3037-
3038-
// Create a binary expression and put the column inside
3039-
let binary_expr = Arc::new(BinaryExpr::new(
3040-
is_not_null_expr.clone(),
3041-
Operator::Or,
3042-
is_not_null_expr.clone(),
3043-
)) as Arc<dyn PhysicalExpr>;
3044-
3045-
let fixed_expr =
3046-
maybe_fix_physical_column_name(Ok(binary_expr), &schema_ref).unwrap();
3047-
3048-
let bin = fixed_expr
3049-
.as_any()
3050-
.downcast_ref::<BinaryExpr>()
3051-
.expect("Expected BinaryExpr");
3052-
3053-
// Check that both sides were renamed
3054-
for expr in &[bin.left(), bin.right()] {
3055-
let is_not_null = expr
3056-
.as_any()
3057-
.downcast_ref::<IsNotNullExpr>()
3058-
.expect("Expected IsNotNull");
3059-
3060-
let col = is_not_null
3061-
.arg()
3062-
.as_any()
3063-
.downcast_ref::<Column>()
3064-
.expect("Expected Column");
3065-
3066-
assert_eq!(col.name(), "column");
3067-
}
3068-
}
30692955
struct ErrorExtensionPlanner {}
30702956

30712957
#[async_trait]
@@ -3562,4 +3448,61 @@ digraph {
35623448

35633449
Ok(())
35643450
}
3451+
3452+
// Reproducer for DataFusion issue #17405:
3453+
//
3454+
// The following SQL is semantically invalid. Notably, the `SELECT left_table.a, right_table.a`
3455+
// clause is missing from the explicit logical plan:
3456+
//
3457+
// SELECT a FROM (
3458+
// -- SELECT left_table.a, right_table.a
3459+
// FROM left_table
3460+
// FULL JOIN right_table ON left_table.a = right_table.a
3461+
// ) AS alias
3462+
// GROUP BY a;
3463+
//
3464+
// As a result, the columns within the `alias` subquery are not properly distinguished, which
3465+
// leads to a bug in logical and physical planning.
3466+
//
3467+
// The fix is to implicitly insert a Projection node to represent the missing SELECT clause to
3468+
// ensure each field is correctly aliased to a unique name when the SubqueryAlias node is added.
3469+
#[tokio::test]
3470+
async fn subquery_alias_confusing_the_optimizer() -> Result<()> {
3471+
let state = make_session_state();
3472+
3473+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3474+
let schema = Arc::new(schema);
3475+
3476+
let table = MemTable::try_new(schema.clone(), vec![vec![]])?;
3477+
let table = Arc::new(table);
3478+
3479+
let source = DefaultTableSource::new(table);
3480+
let source = Arc::new(source);
3481+
3482+
let left = LogicalPlanBuilder::scan("left", source.clone(), None)?;
3483+
let right = LogicalPlanBuilder::scan("right", source, None)?.build()?;
3484+
3485+
let join_keys = (
3486+
vec![datafusion_common::Column::new(Some("left"), "a")],
3487+
vec![datafusion_common::Column::new(Some("right"), "a")],
3488+
);
3489+
3490+
let join = left.join(right, JoinType::Full, join_keys, None)?.build()?;
3491+
3492+
let alias = subquery_alias(join, "alias")?;
3493+
3494+
let planner = DefaultPhysicalPlanner::default();
3495+
3496+
let logical_plan = LogicalPlanBuilder::new(alias)
3497+
.aggregate(vec![col("a:1")], Vec::<Expr>::new())?
3498+
.build()?;
3499+
let _physical_plan = planner.create_physical_plan(&logical_plan, &state).await?;
3500+
3501+
let optimized_logical_plan = state.optimize(&logical_plan)?;
3502+
let _optimized_physical_plan = planner
3503+
.create_physical_plan(&optimized_logical_plan, &state)
3504+
.await?;
3505+
3506+
Ok(())
3507+
}
35653508
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 71 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! This module provides a builder for creating LogicalPlans
1919
2020
use std::any::Any;
21+
use std::borrow::Cow;
2122
use std::cmp::Ordering;
2223
use std::collections::{HashMap, HashSet};
2324
use std::iter::once;
@@ -1517,37 +1518,49 @@ impl ValuesFields {
15171518
}
15181519
}
15191520

1520-
// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1521-
//
1522-
// Some field names might already come to this function with the count (number of times it appeared)
1523-
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1524-
// if these three fields passed to this function: "col:1", "col" and "col", the function
1525-
// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1526-
// that's why we need the `seen` set, so the fields are always unique.
1527-
//
1528-
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1529-
let mut name_map = HashMap::new();
1530-
let mut seen: HashSet<String> = HashSet::new();
1521+
/// Returns aliases to make field names unique.
1522+
///
1523+
/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
1524+
/// `Some(alias)` means rename to the alias to ensure uniqueness.
1525+
///
1526+
/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
1527+
/// to maintain unique column names.
1528+
///
1529+
/// # Example
1530+
/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers)
1531+
/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
1532+
pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1533+
// Some field names might already come to this function with the count (number of times it appeared)
1534+
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1535+
// if these three fields are passed to this function: "col:1", "col" and "col", the function
1536+
// would rename them to -> col:1, col, col:1, causing a later error when building the DFSchema.
1537+
// That's why we need the `seen` set, so the fields are always unique.
1538+
1539+
// Tracks a mapping between a field name and the number of appearances of that field.
1540+
let mut name_map = HashMap::<&str, usize>::new();
1541+
// Tracks all the fields and aliases that were previously seen.
1542+
let mut seen = HashSet::<Cow<String>>::new();
15311543

15321544
fields
1533-
.into_iter()
1545+
.iter()
15341546
.map(|field| {
1535-
let base_name = field.name();
1536-
let count = name_map.entry(base_name.clone()).or_insert(0);
1537-
let mut new_name = base_name.clone();
1547+
let original_name = field.name();
1548+
let mut name = Cow::Borrowed(original_name);
1549+
1550+
let count = name_map.entry(original_name).or_insert(0);
15381551

1539-
// Loop until we find a name that hasn't been used
1540-
while seen.contains(&new_name) {
1552+
// Loop until we find a name that hasn't been used.
1553+
while seen.contains(&name) {
15411554
*count += 1;
1542-
new_name = format!("{base_name}:{count}");
1555+
name = Cow::Owned(format!("{original_name}:{count}"));
15431556
}
15441557

1545-
seen.insert(new_name.clone());
1558+
seen.insert(name.clone());
15461559

1547-
let mut modified_field =
1548-
Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1549-
modified_field.set_metadata(field.metadata().clone());
1550-
modified_field
1560+
match name {
1561+
Cow::Borrowed(_) => None,
1562+
Cow::Owned(alias) => Some(alias),
1563+
}
15511564
})
15521565
.collect()
15531566
}
@@ -2675,34 +2688,6 @@ mod tests {
26752688
Ok(())
26762689
}
26772690

2678-
#[test]
2679-
fn test_change_redundant_column() -> Result<()> {
2680-
let t1_field_1 = Field::new("a", DataType::Int32, false);
2681-
let t2_field_1 = Field::new("a", DataType::Int32, false);
2682-
let t2_field_3 = Field::new("a", DataType::Int32, false);
2683-
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2684-
let t1_field_2 = Field::new("b", DataType::Int32, false);
2685-
let t2_field_2 = Field::new("b", DataType::Int32, false);
2686-
2687-
let field_vec = vec![
2688-
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2689-
];
2690-
let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2691-
2692-
assert_eq!(
2693-
remove_redundant,
2694-
vec![
2695-
Field::new("a", DataType::Int32, false),
2696-
Field::new("a:1", DataType::Int32, false),
2697-
Field::new("b", DataType::Int32, false),
2698-
Field::new("b:1", DataType::Int32, false),
2699-
Field::new("a:2", DataType::Int32, false),
2700-
Field::new("a:1:1", DataType::Int32, false),
2701-
]
2702-
);
2703-
Ok(())
2704-
}
2705-
27062691
#[test]
27072692
fn plan_builder_from_logical_plan() -> Result<()> {
27082693
let plan =
@@ -2787,4 +2772,39 @@ mod tests {
27872772

27882773
Ok(())
27892774
}
2775+
2776+
#[test]
2777+
fn test_unique_field_aliases() {
2778+
let t1_field_1 = Field::new("a", DataType::Int32, false);
2779+
let t2_field_1 = Field::new("a", DataType::Int32, false);
2780+
let t2_field_3 = Field::new("a", DataType::Int32, false);
2781+
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2782+
let t1_field_2 = Field::new("b", DataType::Int32, false);
2783+
let t2_field_2 = Field::new("b", DataType::Int32, false);
2784+
2785+
let fields = vec![
2786+
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2787+
];
2788+
let fields = Fields::from(fields);
2789+
2790+
let remove_redundant = unique_field_aliases(&fields);
2791+
2792+
// Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1]
2793+
// First occurrence of each field name keeps original name (None), duplicates get
2794+
// incremental suffixes (:1, :2, etc.).
2795+
// Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later
2796+
// conflicts with the last column which is _actually_ called `a:1` so we need to rename it
2797+
// as well to `a:1:1`.
2798+
assert_eq!(
2799+
remove_redundant,
2800+
vec![
2801+
None,
2802+
Some("a:1".to_string()),
2803+
None,
2804+
Some("b:1".to_string()),
2805+
Some("a:2".to_string()),
2806+
Some("a:1:1".to_string()),
2807+
]
2808+
);
2809+
}
27902810
}

0 commit comments

Comments
 (0)