Skip to content

Commit c77d96f

Browse files
notfilippohareshkh
authored andcommitted
fix(SubqueryAlias): use maybe_project_redundant_column (apache#17478)
* fix(SubqueryAlias): use maybe_project_redundant_column Fixes apache#17405 * chore: format * ci: retry * chore(SubqueryAlias): restructore duplicate detection and add tests * docs: add examples and context to the reproducer
1 parent 226cb02 commit c77d96f

File tree

4 files changed

+197
-198
lines changed

4 files changed

+197
-198
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 63 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,8 @@ use arrow::array::{builder::StringBuilder, RecordBatch};
6161
use arrow::compute::SortOptions;
6262
use arrow::datatypes::{Schema, SchemaRef};
6363
use datafusion_common::display::ToStringifiedPlan;
64-
use datafusion_common::tree_node::{
65-
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
66-
};
64+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
65+
use datafusion_common::TableReference;
6766
use datafusion_common::{
6867
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
6968
ScalarValue,
@@ -83,7 +82,7 @@ use datafusion_expr::{
8382
WindowFrameBound, WriteOp,
8483
};
8584
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
86-
use datafusion_physical_expr::expressions::{Column, Literal};
85+
use datafusion_physical_expr::expressions::Literal;
8786
use datafusion_physical_expr::{
8887
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
8988
};
@@ -93,7 +92,6 @@ use datafusion_physical_plan::execution_plan::InvariantLevel;
9392
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
9493
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
9594
use datafusion_physical_plan::unnest::ListUnnest;
96-
use datafusion_sql::TableReference;
9795
use sqlparser::ast::NullTreatment;
9896

9997
use async_trait::async_trait;
@@ -2185,11 +2183,7 @@ impl DefaultPhysicalPlanner {
21852183
let physical_expr =
21862184
self.create_physical_expr(e, input_logical_schema, session_state);
21872185

2188-
// Check for possible column name mismatches
2189-
let final_physical_expr =
2190-
maybe_fix_physical_column_name(physical_expr, &input_physical_schema);
2191-
2192-
tuple_err((final_physical_expr, physical_name))
2186+
tuple_err((physical_expr, physical_name))
21932187
})
21942188
.collect::<Result<Vec<_>>>()?;
21952189

@@ -2295,47 +2289,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
22952289
}
22962290
}
22972291

2298-
// Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
2299-
// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
2300-
//
2301-
// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
2302-
// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
2303-
fn maybe_fix_physical_column_name(
2304-
expr: Result<Arc<dyn PhysicalExpr>>,
2305-
input_physical_schema: &SchemaRef,
2306-
) -> Result<Arc<dyn PhysicalExpr>> {
2307-
let Ok(expr) = expr else { return expr };
2308-
expr.transform_down(|node| {
2309-
if let Some(column) = node.as_any().downcast_ref::<Column>() {
2310-
let idx = column.index();
2311-
let physical_field = input_physical_schema.field(idx);
2312-
let expr_col_name = column.name();
2313-
let physical_name = physical_field.name();
2314-
2315-
if expr_col_name != physical_name {
2316-
// handle edge cases where the physical_name contains ':'.
2317-
let colon_count = physical_name.matches(':').count();
2318-
let mut splits = expr_col_name.match_indices(':');
2319-
let split_pos = splits.nth(colon_count);
2320-
2321-
if let Some((i, _)) = split_pos {
2322-
let base_name = &expr_col_name[..i];
2323-
if base_name == physical_name {
2324-
let updated_column = Column::new(physical_name, idx);
2325-
return Ok(Transformed::yes(Arc::new(updated_column)));
2326-
}
2327-
}
2328-
}
2329-
2330-
// If names already match or fix is not possible, just leave it as it is
2331-
Ok(Transformed::no(node))
2332-
} else {
2333-
Ok(Transformed::no(node))
2334-
}
2335-
})
2336-
.data()
2337-
}
2338-
23392292
struct OptimizationInvariantChecker<'a> {
23402293
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
23412294
}
@@ -2439,12 +2392,10 @@ mod tests {
24392392
};
24402393
use datafusion_execution::runtime_env::RuntimeEnv;
24412394
use datafusion_execution::TaskContext;
2442-
use datafusion_expr::{
2443-
col, lit, LogicalPlanBuilder, Operator, UserDefinedLogicalNodeCore,
2444-
};
2395+
use datafusion_expr::builder::subquery_alias;
2396+
use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
24452397
use datafusion_functions_aggregate::count::count_all;
24462398
use datafusion_functions_aggregate::expr_fn::sum;
2447-
use datafusion_physical_expr::expressions::{BinaryExpr, IsNotNullExpr};
24482399
use datafusion_physical_expr::EquivalenceProperties;
24492400
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
24502401

@@ -3005,71 +2956,6 @@ mod tests {
30052956
}
30062957
}
30072958

3008-
#[tokio::test]
3009-
async fn test_maybe_fix_colon_in_physical_name() {
3010-
// The physical schema has a field name with a colon
3011-
let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]);
3012-
let schema_ref: SchemaRef = Arc::new(schema);
3013-
3014-
// What might happen after deduplication
3015-
let logical_col_name = "metric:avg:1";
3016-
let expr_with_suffix =
3017-
Arc::new(Column::new(logical_col_name, 0)) as Arc<dyn PhysicalExpr>;
3018-
let expr_result = Ok(expr_with_suffix);
3019-
3020-
// Call function under test
3021-
let fixed_expr =
3022-
maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap();
3023-
3024-
// Downcast back to Column so we can check the name
3025-
let col = fixed_expr
3026-
.as_any()
3027-
.downcast_ref::<Column>()
3028-
.expect("Column");
3029-
3030-
assert_eq!(col.name(), "metric:avg");
3031-
}
3032-
3033-
#[tokio::test]
3034-
async fn test_maybe_fix_nested_column_name_with_colon() {
3035-
let schema = Schema::new(vec![Field::new("column", DataType::Int32, false)]);
3036-
let schema_ref: SchemaRef = Arc::new(schema);
3037-
3038-
// Construct the nested expr
3039-
let col_expr = Arc::new(Column::new("column:1", 0)) as Arc<dyn PhysicalExpr>;
3040-
let is_not_null_expr = Arc::new(IsNotNullExpr::new(col_expr.clone()));
3041-
3042-
// Create a binary expression and put the column inside
3043-
let binary_expr = Arc::new(BinaryExpr::new(
3044-
is_not_null_expr.clone(),
3045-
Operator::Or,
3046-
is_not_null_expr.clone(),
3047-
)) as Arc<dyn PhysicalExpr>;
3048-
3049-
let fixed_expr =
3050-
maybe_fix_physical_column_name(Ok(binary_expr), &schema_ref).unwrap();
3051-
3052-
let bin = fixed_expr
3053-
.as_any()
3054-
.downcast_ref::<BinaryExpr>()
3055-
.expect("Expected BinaryExpr");
3056-
3057-
// Check that both sides where renamed
3058-
for expr in &[bin.left(), bin.right()] {
3059-
let is_not_null = expr
3060-
.as_any()
3061-
.downcast_ref::<IsNotNullExpr>()
3062-
.expect("Expected IsNotNull");
3063-
3064-
let col = is_not_null
3065-
.arg()
3066-
.as_any()
3067-
.downcast_ref::<Column>()
3068-
.expect("Expected Column");
3069-
3070-
assert_eq!(col.name(), "column");
3071-
}
3072-
}
30732959
struct ErrorExtensionPlanner {}
30742960

30752961
#[async_trait]
@@ -3566,4 +3452,61 @@ digraph {
35663452

35673453
Ok(())
35683454
}
3455+
3456+
// Reproducer for DataFusion issue #17405:
3457+
//
3458+
// The following SQL is semantically invalid. Notably, the `SELECT left_table.a, right_table.a`
3459+
// clause is missing from the explicit logical plan:
3460+
//
3461+
// SELECT a FROM (
3462+
// -- SELECT left_table.a, right_table.a
3463+
// FROM left_table
3464+
// FULL JOIN right_table ON left_table.a = right_table.a
3465+
// ) AS alias
3466+
// GROUP BY a;
3467+
//
3468+
// As a result, the variables within `alias` subquery are not properly distinguished, which
3469+
// leads to a bug for logical and physical planning.
3470+
//
3471+
// The fix is to implicitly insert a Projection node to represent the missing SELECT clause to
3472+
// ensure each field is correctly aliased to a unique name when the SubqueryAlias node is added.
3473+
#[tokio::test]
3474+
async fn subquery_alias_confusing_the_optimizer() -> Result<()> {
3475+
let state = make_session_state();
3476+
3477+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3478+
let schema = Arc::new(schema);
3479+
3480+
let table = MemTable::try_new(schema.clone(), vec![vec![]])?;
3481+
let table = Arc::new(table);
3482+
3483+
let source = DefaultTableSource::new(table);
3484+
let source = Arc::new(source);
3485+
3486+
let left = LogicalPlanBuilder::scan("left", source.clone(), None)?;
3487+
let right = LogicalPlanBuilder::scan("right", source, None)?.build()?;
3488+
3489+
let join_keys = (
3490+
vec![datafusion_common::Column::new(Some("left"), "a")],
3491+
vec![datafusion_common::Column::new(Some("right"), "a")],
3492+
);
3493+
3494+
let join = left.join(right, JoinType::Full, join_keys, None)?.build()?;
3495+
3496+
let alias = subquery_alias(join, "alias")?;
3497+
3498+
let planner = DefaultPhysicalPlanner::default();
3499+
3500+
let logical_plan = LogicalPlanBuilder::new(alias)
3501+
.aggregate(vec![col("a:1")], Vec::<Expr>::new())?
3502+
.build()?;
3503+
let _physical_plan = planner.create_physical_plan(&logical_plan, &state).await?;
3504+
3505+
let optimized_logical_plan = state.optimize(&logical_plan)?;
3506+
let _optimized_physical_plan = planner
3507+
.create_physical_plan(&optimized_logical_plan, &state)
3508+
.await?;
3509+
3510+
Ok(())
3511+
}
35693512
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 71 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! This module provides a builder for creating LogicalPlans
1919
2020
use std::any::Any;
21+
use std::borrow::Cow;
2122
use std::cmp::Ordering;
2223
use std::collections::{HashMap, HashSet};
2324
use std::iter::once;
@@ -1517,37 +1518,49 @@ impl ValuesFields {
15171518
}
15181519
}
15191520

1520-
// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1521-
//
1522-
// Some field names might already come to this function with the count (number of times it appeared)
1523-
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1524-
// if these three fields passed to this function: "col:1", "col" and "col", the function
1525-
// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1526-
// that's why we need the `seen` set, so the fields are always unique.
1527-
//
1528-
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1529-
let mut name_map = HashMap::new();
1530-
let mut seen: HashSet<String> = HashSet::new();
1521+
/// Returns aliases to make field names unique.
1522+
///
1523+
/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
1524+
/// `Some(alias)` means rename to the alias to ensure uniqueness.
1525+
///
1526+
/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
1527+
/// to maintain unique column names.
1528+
///
1529+
/// # Example
1530+
/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers)
1531+
/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
1532+
pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1533+
// Some field names might already come to this function with the count (number of times it appeared)
1534+
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1535+
// if these three fields passed to this function: "col:1", "col" and "col", the function
1536+
// would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema.
1537+
// That's why we need the `seen` set, so the fields are always unique.
1538+
1539+
// Tracks a mapping between a field name and the number of appearances of that field.
1540+
let mut name_map = HashMap::<&str, usize>::new();
1541+
// Tracks all the fields and aliases that were previously seen.
1542+
let mut seen = HashSet::<Cow<String>>::new();
15311543

15321544
fields
1533-
.into_iter()
1545+
.iter()
15341546
.map(|field| {
1535-
let base_name = field.name();
1536-
let count = name_map.entry(base_name.clone()).or_insert(0);
1537-
let mut new_name = base_name.clone();
1547+
let original_name = field.name();
1548+
let mut name = Cow::Borrowed(original_name);
1549+
1550+
let count = name_map.entry(original_name).or_insert(0);
15381551

1539-
// Loop until we find a name that hasn't been used
1540-
while seen.contains(&new_name) {
1552+
// Loop until we find a name that hasn't been used.
1553+
while seen.contains(&name) {
15411554
*count += 1;
1542-
new_name = format!("{base_name}:{count}");
1555+
name = Cow::Owned(format!("{original_name}:{count}"));
15431556
}
15441557

1545-
seen.insert(new_name.clone());
1558+
seen.insert(name.clone());
15461559

1547-
let mut modified_field =
1548-
Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1549-
modified_field.set_metadata(field.metadata().clone());
1550-
modified_field
1560+
match name {
1561+
Cow::Borrowed(_) => None,
1562+
Cow::Owned(alias) => Some(alias),
1563+
}
15511564
})
15521565
.collect()
15531566
}
@@ -2675,34 +2688,6 @@ mod tests {
26752688
Ok(())
26762689
}
26772690

2678-
#[test]
2679-
fn test_change_redundant_column() -> Result<()> {
2680-
let t1_field_1 = Field::new("a", DataType::Int32, false);
2681-
let t2_field_1 = Field::new("a", DataType::Int32, false);
2682-
let t2_field_3 = Field::new("a", DataType::Int32, false);
2683-
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2684-
let t1_field_2 = Field::new("b", DataType::Int32, false);
2685-
let t2_field_2 = Field::new("b", DataType::Int32, false);
2686-
2687-
let field_vec = vec![
2688-
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2689-
];
2690-
let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2691-
2692-
assert_eq!(
2693-
remove_redundant,
2694-
vec![
2695-
Field::new("a", DataType::Int32, false),
2696-
Field::new("a:1", DataType::Int32, false),
2697-
Field::new("b", DataType::Int32, false),
2698-
Field::new("b:1", DataType::Int32, false),
2699-
Field::new("a:2", DataType::Int32, false),
2700-
Field::new("a:1:1", DataType::Int32, false),
2701-
]
2702-
);
2703-
Ok(())
2704-
}
2705-
27062691
#[test]
27072692
fn plan_builder_from_logical_plan() -> Result<()> {
27082693
let plan =
@@ -2787,4 +2772,39 @@ mod tests {
27872772

27882773
Ok(())
27892774
}
2775+
2776+
#[test]
2777+
fn test_unique_field_aliases() {
2778+
let t1_field_1 = Field::new("a", DataType::Int32, false);
2779+
let t2_field_1 = Field::new("a", DataType::Int32, false);
2780+
let t2_field_3 = Field::new("a", DataType::Int32, false);
2781+
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2782+
let t1_field_2 = Field::new("b", DataType::Int32, false);
2783+
let t2_field_2 = Field::new("b", DataType::Int32, false);
2784+
2785+
let fields = vec![
2786+
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2787+
];
2788+
let fields = Fields::from(fields);
2789+
2790+
let remove_redundant = unique_field_aliases(&fields);
2791+
2792+
// Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1]
2793+
// First occurrence of each field name keeps original name (None), duplicates get
2794+
// incremental suffixes (:1, :2, etc.).
2795+
// Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later
2796+
// conflicts with the last column which is _actually_ called `a:1` so we need to rename it
2797+
// as well to `a:1:1`.
2798+
assert_eq!(
2799+
remove_redundant,
2800+
vec![
2801+
None,
2802+
Some("a:1".to_string()),
2803+
None,
2804+
Some("b:1".to_string()),
2805+
Some("a:2".to_string()),
2806+
Some("a:1:1".to_string()),
2807+
]
2808+
);
2809+
}
27902810
}

0 commit comments

Comments
 (0)