Skip to content

Commit ceea461

Browse files
fix(SubqueryAlias): use maybe_project_redundant_column (apache#17478) (#51)
* fix(SubqueryAlias): use maybe_project_redundant_column Fixes apache#17405 * chore: format * ci: retry * chore(SubqueryAlias): restructore duplicate detection and add tests * docs: add examples and context to the reproducer (cherry picked from commit c910db4) Co-authored-by: Filippo Rossi <12383260+notfilippo@users.noreply.github.com>
1 parent 5506e69 commit ceea461

File tree

4 files changed

+196
-197
lines changed

4 files changed

+196
-197
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 62 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ use arrow::array::{builder::StringBuilder, RecordBatch};
6161
use arrow::compute::SortOptions;
6262
use arrow::datatypes::{Schema, SchemaRef};
6363
use datafusion_common::display::ToStringifiedPlan;
64-
use datafusion_common::tree_node::{
65-
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
66-
};
64+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
6765
use datafusion_common::{
6866
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
6967
ScalarValue,
@@ -83,7 +81,7 @@ use datafusion_expr::{
8381
WindowFrameBound, WriteOp,
8482
};
8583
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
86-
use datafusion_physical_expr::expressions::{Column, Literal};
84+
use datafusion_physical_expr::expressions::Literal;
8785
use datafusion_physical_expr::{
8886
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
8987
};
@@ -2177,11 +2175,7 @@ impl DefaultPhysicalPlanner {
21772175
let physical_expr =
21782176
self.create_physical_expr(e, input_logical_schema, session_state);
21792177

2180-
// Check for possible column name mismatches
2181-
let final_physical_expr =
2182-
maybe_fix_physical_column_name(physical_expr, &input_physical_schema);
2183-
2184-
tuple_err((final_physical_expr, physical_name))
2178+
tuple_err((physical_expr, physical_name))
21852179
})
21862180
.collect::<Result<Vec<_>>>()?;
21872181

@@ -2287,47 +2281,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
22872281
}
22882282
}
22892283

2290-
// Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
2291-
// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
2292-
//
2293-
// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
2294-
// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
2295-
fn maybe_fix_physical_column_name(
2296-
expr: Result<Arc<dyn PhysicalExpr>>,
2297-
input_physical_schema: &SchemaRef,
2298-
) -> Result<Arc<dyn PhysicalExpr>> {
2299-
let Ok(expr) = expr else { return expr };
2300-
expr.transform_down(|node| {
2301-
if let Some(column) = node.as_any().downcast_ref::<Column>() {
2302-
let idx = column.index();
2303-
let physical_field = input_physical_schema.field(idx);
2304-
let expr_col_name = column.name();
2305-
let physical_name = physical_field.name();
2306-
2307-
if expr_col_name != physical_name {
2308-
// handle edge cases where the physical_name contains ':'.
2309-
let colon_count = physical_name.matches(':').count();
2310-
let mut splits = expr_col_name.match_indices(':');
2311-
let split_pos = splits.nth(colon_count);
2312-
2313-
if let Some((i, _)) = split_pos {
2314-
let base_name = &expr_col_name[..i];
2315-
if base_name == physical_name {
2316-
let updated_column = Column::new(physical_name, idx);
2317-
return Ok(Transformed::yes(Arc::new(updated_column)));
2318-
}
2319-
}
2320-
}
2321-
2322-
// If names already match or fix is not possible, just leave it as it is
2323-
Ok(Transformed::no(node))
2324-
} else {
2325-
Ok(Transformed::no(node))
2326-
}
2327-
})
2328-
.data()
2329-
}
2330-
23312284
struct OptimizationInvariantChecker<'a> {
23322285
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
23332286
}
@@ -2431,12 +2384,10 @@ mod tests {
24312384
};
24322385
use datafusion_execution::runtime_env::RuntimeEnv;
24332386
use datafusion_execution::TaskContext;
2434-
use datafusion_expr::{
2435-
col, lit, LogicalPlanBuilder, Operator, UserDefinedLogicalNodeCore,
2436-
};
2387+
use datafusion_expr::builder::subquery_alias;
2388+
use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
24372389
use datafusion_functions_aggregate::count::count_all;
24382390
use datafusion_functions_aggregate::expr_fn::sum;
2439-
use datafusion_physical_expr::expressions::{BinaryExpr, IsNotNullExpr};
24402391
use datafusion_physical_expr::EquivalenceProperties;
24412392
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
24422393

@@ -2997,71 +2948,6 @@ mod tests {
29972948
}
29982949
}
29992950

3000-
#[tokio::test]
3001-
async fn test_maybe_fix_colon_in_physical_name() {
3002-
// The physical schema has a field name with a colon
3003-
let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]);
3004-
let schema_ref: SchemaRef = Arc::new(schema);
3005-
3006-
// What might happen after deduplication
3007-
let logical_col_name = "metric:avg:1";
3008-
let expr_with_suffix =
3009-
Arc::new(Column::new(logical_col_name, 0)) as Arc<dyn PhysicalExpr>;
3010-
let expr_result = Ok(expr_with_suffix);
3011-
3012-
// Call function under test
3013-
let fixed_expr =
3014-
maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap();
3015-
3016-
// Downcast back to Column so we can check the name
3017-
let col = fixed_expr
3018-
.as_any()
3019-
.downcast_ref::<Column>()
3020-
.expect("Column");
3021-
3022-
assert_eq!(col.name(), "metric:avg");
3023-
}
3024-
3025-
#[tokio::test]
3026-
async fn test_maybe_fix_nested_column_name_with_colon() {
3027-
let schema = Schema::new(vec![Field::new("column", DataType::Int32, false)]);
3028-
let schema_ref: SchemaRef = Arc::new(schema);
3029-
3030-
// Construct the nested expr
3031-
let col_expr = Arc::new(Column::new("column:1", 0)) as Arc<dyn PhysicalExpr>;
3032-
let is_not_null_expr = Arc::new(IsNotNullExpr::new(col_expr.clone()));
3033-
3034-
// Create a binary expression and put the column inside
3035-
let binary_expr = Arc::new(BinaryExpr::new(
3036-
is_not_null_expr.clone(),
3037-
Operator::Or,
3038-
is_not_null_expr.clone(),
3039-
)) as Arc<dyn PhysicalExpr>;
3040-
3041-
let fixed_expr =
3042-
maybe_fix_physical_column_name(Ok(binary_expr), &schema_ref).unwrap();
3043-
3044-
let bin = fixed_expr
3045-
.as_any()
3046-
.downcast_ref::<BinaryExpr>()
3047-
.expect("Expected BinaryExpr");
3048-
3049-
// Check that both sides where renamed
3050-
for expr in &[bin.left(), bin.right()] {
3051-
let is_not_null = expr
3052-
.as_any()
3053-
.downcast_ref::<IsNotNullExpr>()
3054-
.expect("Expected IsNotNull");
3055-
3056-
let col = is_not_null
3057-
.arg()
3058-
.as_any()
3059-
.downcast_ref::<Column>()
3060-
.expect("Expected Column");
3061-
3062-
assert_eq!(col.name(), "column");
3063-
}
3064-
}
30652951
struct ErrorExtensionPlanner {}
30662952

30672953
#[async_trait]
@@ -3558,4 +3444,61 @@ digraph {
35583444

35593445
Ok(())
35603446
}
3447+
3448+
// Reproducer for DataFusion issue #17405:
3449+
//
3450+
// The following SQL is semantically invalid. Notably, the `SELECT left_table.a, right_table.a`
3451+
// clause is missing from the explicit logical plan:
3452+
//
3453+
// SELECT a FROM (
3454+
// -- SELECT left_table.a, right_table.a
3455+
// FROM left_table
3456+
// FULL JOIN right_table ON left_table.a = right_table.a
3457+
// ) AS alias
3458+
// GROUP BY a;
3459+
//
3460+
// As a result, the variables within `alias` subquery are not properly distinguished, which
3461+
// leads to a bug for logical and physical planning.
3462+
//
3463+
// The fix is to implicitly insert a Projection node to represent the missing SELECT clause to
3464+
// ensure each field is correctly aliased to a unique name when the SubqueryAlias node is added.
3465+
#[tokio::test]
3466+
async fn subquery_alias_confusing_the_optimizer() -> Result<()> {
3467+
let state = make_session_state();
3468+
3469+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3470+
let schema = Arc::new(schema);
3471+
3472+
let table = MemTable::try_new(schema.clone(), vec![vec![]])?;
3473+
let table = Arc::new(table);
3474+
3475+
let source = DefaultTableSource::new(table);
3476+
let source = Arc::new(source);
3477+
3478+
let left = LogicalPlanBuilder::scan("left", source.clone(), None)?;
3479+
let right = LogicalPlanBuilder::scan("right", source, None)?.build()?;
3480+
3481+
let join_keys = (
3482+
vec![datafusion_common::Column::new(Some("left"), "a")],
3483+
vec![datafusion_common::Column::new(Some("right"), "a")],
3484+
);
3485+
3486+
let join = left.join(right, JoinType::Full, join_keys, None)?.build()?;
3487+
3488+
let alias = subquery_alias(join, "alias")?;
3489+
3490+
let planner = DefaultPhysicalPlanner::default();
3491+
3492+
let logical_plan = LogicalPlanBuilder::new(alias)
3493+
.aggregate(vec![col("a:1")], Vec::<Expr>::new())?
3494+
.build()?;
3495+
let _physical_plan = planner.create_physical_plan(&logical_plan, &state).await?;
3496+
3497+
let optimized_logical_plan = state.optimize(&logical_plan)?;
3498+
let _optimized_physical_plan = planner
3499+
.create_physical_plan(&optimized_logical_plan, &state)
3500+
.await?;
3501+
3502+
Ok(())
3503+
}
35613504
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 71 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! This module provides a builder for creating LogicalPlans
1919
2020
use std::any::Any;
21+
use std::borrow::Cow;
2122
use std::cmp::Ordering;
2223
use std::collections::{HashMap, HashSet};
2324
use std::iter::once;
@@ -1517,37 +1518,49 @@ impl ValuesFields {
15171518
}
15181519
}
15191520

1520-
// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1521-
//
1522-
// Some field names might already come to this function with the count (number of times it appeared)
1523-
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1524-
// if these three fields passed to this function: "col:1", "col" and "col", the function
1525-
// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1526-
// that's why we need the `seen` set, so the fields are always unique.
1527-
//
1528-
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1529-
let mut name_map = HashMap::new();
1530-
let mut seen: HashSet<String> = HashSet::new();
1521+
/// Returns aliases to make field names unique.
1522+
///
1523+
/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
1524+
/// `Some(alias)` means rename to the alias to ensure uniqueness.
1525+
///
1526+
/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
1527+
/// to maintain unique column names.
1528+
///
1529+
/// # Example
1530+
/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers)
1531+
/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
1532+
pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1533+
// Some field names might already come to this function with the count (number of times it appeared)
1534+
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1535+
// if these three fields passed to this function: "col:1", "col" and "col", the function
1536+
// would rename them to -> col:1, col, col:1 causing a posterior error when building the DFSchema.
1537+
// That's why we need the `seen` set, so the fields are always unique.
1538+
1539+
// Tracks a mapping between a field name and the number of appearances of that field.
1540+
let mut name_map = HashMap::<&str, usize>::new();
1541+
// Tracks all the fields and aliases that were previously seen.
1542+
let mut seen = HashSet::<Cow<String>>::new();
15311543

15321544
fields
1533-
.into_iter()
1545+
.iter()
15341546
.map(|field| {
1535-
let base_name = field.name();
1536-
let count = name_map.entry(base_name.clone()).or_insert(0);
1537-
let mut new_name = base_name.clone();
1547+
let original_name = field.name();
1548+
let mut name = Cow::Borrowed(original_name);
1549+
1550+
let count = name_map.entry(original_name).or_insert(0);
15381551

1539-
// Loop until we find a name that hasn't been used
1540-
while seen.contains(&new_name) {
1552+
// Loop until we find a name that hasn't been used.
1553+
while seen.contains(&name) {
15411554
*count += 1;
1542-
new_name = format!("{base_name}:{count}");
1555+
name = Cow::Owned(format!("{original_name}:{count}"));
15431556
}
15441557

1545-
seen.insert(new_name.clone());
1558+
seen.insert(name.clone());
15461559

1547-
let mut modified_field =
1548-
Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1549-
modified_field.set_metadata(field.metadata().clone());
1550-
modified_field
1560+
match name {
1561+
Cow::Borrowed(_) => None,
1562+
Cow::Owned(alias) => Some(alias),
1563+
}
15511564
})
15521565
.collect()
15531566
}
@@ -2675,34 +2688,6 @@ mod tests {
26752688
Ok(())
26762689
}
26772690

2678-
#[test]
2679-
fn test_change_redundant_column() -> Result<()> {
2680-
let t1_field_1 = Field::new("a", DataType::Int32, false);
2681-
let t2_field_1 = Field::new("a", DataType::Int32, false);
2682-
let t2_field_3 = Field::new("a", DataType::Int32, false);
2683-
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2684-
let t1_field_2 = Field::new("b", DataType::Int32, false);
2685-
let t2_field_2 = Field::new("b", DataType::Int32, false);
2686-
2687-
let field_vec = vec![
2688-
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2689-
];
2690-
let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2691-
2692-
assert_eq!(
2693-
remove_redundant,
2694-
vec![
2695-
Field::new("a", DataType::Int32, false),
2696-
Field::new("a:1", DataType::Int32, false),
2697-
Field::new("b", DataType::Int32, false),
2698-
Field::new("b:1", DataType::Int32, false),
2699-
Field::new("a:2", DataType::Int32, false),
2700-
Field::new("a:1:1", DataType::Int32, false),
2701-
]
2702-
);
2703-
Ok(())
2704-
}
2705-
27062691
#[test]
27072692
fn plan_builder_from_logical_plan() -> Result<()> {
27082693
let plan =
@@ -2787,4 +2772,39 @@ mod tests {
27872772

27882773
Ok(())
27892774
}
2775+
2776+
#[test]
2777+
fn test_unique_field_aliases() {
2778+
let t1_field_1 = Field::new("a", DataType::Int32, false);
2779+
let t2_field_1 = Field::new("a", DataType::Int32, false);
2780+
let t2_field_3 = Field::new("a", DataType::Int32, false);
2781+
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2782+
let t1_field_2 = Field::new("b", DataType::Int32, false);
2783+
let t2_field_2 = Field::new("b", DataType::Int32, false);
2784+
2785+
let fields = vec![
2786+
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2787+
];
2788+
let fields = Fields::from(fields);
2789+
2790+
let remove_redundant = unique_field_aliases(&fields);
2791+
2792+
// Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1]
2793+
// First occurrence of each field name keeps original name (None), duplicates get
2794+
// incremental suffixes (:1, :2, etc.).
2795+
// Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later
2796+
// conflicts with the last column which is _actually_ called `a:1` so we need to rename it
2797+
// as well to `a:1:1`.
2798+
assert_eq!(
2799+
remove_redundant,
2800+
vec![
2801+
None,
2802+
Some("a:1".to_string()),
2803+
None,
2804+
Some("b:1".to_string()),
2805+
Some("a:2".to_string()),
2806+
Some("a:1:1".to_string()),
2807+
]
2808+
);
2809+
}
27902810
}

0 commit comments

Comments
 (0)