Skip to content

Commit 5226650

Browse files
hareshkh, notfilippo, and alamb
authored
[branch-50] fix(SubqueryAlias): use maybe_project_redundant_column (apache#17478) (apache#18130)
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
-->

- Related to apache#17405
- Related to apache#18072

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.
-->

See apache#17478

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
-->

See apache#17478

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example, are they covered by existing tests)?
-->

## Are there any user-facing changes?

<!--
If there are user-facing changes then we may require documentation to be updated before approving the PR.
-->

<!--
If there are any breaking changes to public APIs, please add the `api change` label.
-->

Co-authored-by: Filippo Rossi <12383260+notfilippo@users.noreply.github.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent d554f1c commit 5226650

File tree

4 files changed

+197
-198
lines changed

4 files changed

+197
-198
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 63 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,8 @@ use arrow::array::{builder::StringBuilder, RecordBatch};
6161
use arrow::compute::SortOptions;
6262
use arrow::datatypes::{Schema, SchemaRef};
6363
use datafusion_common::display::ToStringifiedPlan;
64-
use datafusion_common::tree_node::{
65-
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
66-
};
64+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
65+
use datafusion_common::TableReference;
6766
use datafusion_common::{
6867
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
6968
ScalarValue,
@@ -83,7 +82,7 @@ use datafusion_expr::{
8382
WindowFrameBound, WriteOp,
8483
};
8584
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
86-
use datafusion_physical_expr::expressions::{Column, Literal};
85+
use datafusion_physical_expr::expressions::Literal;
8786
use datafusion_physical_expr::{
8887
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
8988
};
@@ -93,7 +92,6 @@ use datafusion_physical_plan::execution_plan::InvariantLevel;
9392
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
9493
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
9594
use datafusion_physical_plan::unnest::ListUnnest;
96-
use datafusion_sql::TableReference;
9795
use sqlparser::ast::NullTreatment;
9896

9997
use async_trait::async_trait;
@@ -2185,11 +2183,7 @@ impl DefaultPhysicalPlanner {
21852183
let physical_expr =
21862184
self.create_physical_expr(e, input_logical_schema, session_state);
21872185

2188-
// Check for possible column name mismatches
2189-
let final_physical_expr =
2190-
maybe_fix_physical_column_name(physical_expr, &input_physical_schema);
2191-
2192-
tuple_err((final_physical_expr, physical_name))
2186+
tuple_err((physical_expr, physical_name))
21932187
})
21942188
.collect::<Result<Vec<_>>>()?;
21952189

@@ -2295,47 +2289,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
22952289
}
22962290
}
22972291

2298-
// Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
2299-
// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
2300-
//
2301-
// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
2302-
// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
2303-
fn maybe_fix_physical_column_name(
2304-
expr: Result<Arc<dyn PhysicalExpr>>,
2305-
input_physical_schema: &SchemaRef,
2306-
) -> Result<Arc<dyn PhysicalExpr>> {
2307-
let Ok(expr) = expr else { return expr };
2308-
expr.transform_down(|node| {
2309-
if let Some(column) = node.as_any().downcast_ref::<Column>() {
2310-
let idx = column.index();
2311-
let physical_field = input_physical_schema.field(idx);
2312-
let expr_col_name = column.name();
2313-
let physical_name = physical_field.name();
2314-
2315-
if expr_col_name != physical_name {
2316-
// handle edge cases where the physical_name contains ':'.
2317-
let colon_count = physical_name.matches(':').count();
2318-
let mut splits = expr_col_name.match_indices(':');
2319-
let split_pos = splits.nth(colon_count);
2320-
2321-
if let Some((i, _)) = split_pos {
2322-
let base_name = &expr_col_name[..i];
2323-
if base_name == physical_name {
2324-
let updated_column = Column::new(physical_name, idx);
2325-
return Ok(Transformed::yes(Arc::new(updated_column)));
2326-
}
2327-
}
2328-
}
2329-
2330-
// If names already match or fix is not possible, just leave it as it is
2331-
Ok(Transformed::no(node))
2332-
} else {
2333-
Ok(Transformed::no(node))
2334-
}
2335-
})
2336-
.data()
2337-
}
2338-
23392292
struct OptimizationInvariantChecker<'a> {
23402293
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
23412294
}
@@ -2439,12 +2392,10 @@ mod tests {
24392392
};
24402393
use datafusion_execution::runtime_env::RuntimeEnv;
24412394
use datafusion_execution::TaskContext;
2442-
use datafusion_expr::{
2443-
col, lit, LogicalPlanBuilder, Operator, UserDefinedLogicalNodeCore,
2444-
};
2395+
use datafusion_expr::builder::subquery_alias;
2396+
use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
24452397
use datafusion_functions_aggregate::count::count_all;
24462398
use datafusion_functions_aggregate::expr_fn::sum;
2447-
use datafusion_physical_expr::expressions::{BinaryExpr, IsNotNullExpr};
24482399
use datafusion_physical_expr::EquivalenceProperties;
24492400
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
24502401

@@ -3005,71 +2956,6 @@ mod tests {
30052956
}
30062957
}
30072958

3008-
#[tokio::test]
3009-
async fn test_maybe_fix_colon_in_physical_name() {
3010-
// The physical schema has a field name with a colon
3011-
let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]);
3012-
let schema_ref: SchemaRef = Arc::new(schema);
3013-
3014-
// What might happen after deduplication
3015-
let logical_col_name = "metric:avg:1";
3016-
let expr_with_suffix =
3017-
Arc::new(Column::new(logical_col_name, 0)) as Arc<dyn PhysicalExpr>;
3018-
let expr_result = Ok(expr_with_suffix);
3019-
3020-
// Call function under test
3021-
let fixed_expr =
3022-
maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap();
3023-
3024-
// Downcast back to Column so we can check the name
3025-
let col = fixed_expr
3026-
.as_any()
3027-
.downcast_ref::<Column>()
3028-
.expect("Column");
3029-
3030-
assert_eq!(col.name(), "metric:avg");
3031-
}
3032-
3033-
#[tokio::test]
3034-
async fn test_maybe_fix_nested_column_name_with_colon() {
3035-
let schema = Schema::new(vec![Field::new("column", DataType::Int32, false)]);
3036-
let schema_ref: SchemaRef = Arc::new(schema);
3037-
3038-
// Construct the nested expr
3039-
let col_expr = Arc::new(Column::new("column:1", 0)) as Arc<dyn PhysicalExpr>;
3040-
let is_not_null_expr = Arc::new(IsNotNullExpr::new(col_expr.clone()));
3041-
3042-
// Create a binary expression and put the column inside
3043-
let binary_expr = Arc::new(BinaryExpr::new(
3044-
is_not_null_expr.clone(),
3045-
Operator::Or,
3046-
is_not_null_expr.clone(),
3047-
)) as Arc<dyn PhysicalExpr>;
3048-
3049-
let fixed_expr =
3050-
maybe_fix_physical_column_name(Ok(binary_expr), &schema_ref).unwrap();
3051-
3052-
let bin = fixed_expr
3053-
.as_any()
3054-
.downcast_ref::<BinaryExpr>()
3055-
.expect("Expected BinaryExpr");
3056-
3057-
// Check that both sides were renamed
3058-
for expr in &[bin.left(), bin.right()] {
3059-
let is_not_null = expr
3060-
.as_any()
3061-
.downcast_ref::<IsNotNullExpr>()
3062-
.expect("Expected IsNotNull");
3063-
3064-
let col = is_not_null
3065-
.arg()
3066-
.as_any()
3067-
.downcast_ref::<Column>()
3068-
.expect("Expected Column");
3069-
3070-
assert_eq!(col.name(), "column");
3071-
}
3072-
}
30732959
struct ErrorExtensionPlanner {}
30742960

30752961
#[async_trait]
@@ -3566,4 +3452,61 @@ digraph {
35663452

35673453
Ok(())
35683454
}
3455+
3456+
// Reproducer for DataFusion issue #17405:
3457+
//
3458+
// The following SQL is semantically invalid. Notably, the `SELECT left_table.a, right_table.a`
3459+
// clause is missing from the explicit logical plan:
3460+
//
3461+
// SELECT a FROM (
3462+
// -- SELECT left_table.a, right_table.a
3463+
// FROM left_table
3464+
// FULL JOIN right_table ON left_table.a = right_table.a
3465+
// ) AS alias
3466+
// GROUP BY a;
3467+
//
3468+
// As a result, the columns within the `alias` subquery are not properly distinguished, which
3469+
// leads to a bug in logical and physical planning.
3470+
//
3471+
// The fix is to implicitly insert a Projection node to represent the missing SELECT clause to
3472+
// ensure each field is correctly aliased to a unique name when the SubqueryAlias node is added.
3473+
#[tokio::test]
3474+
async fn subquery_alias_confusing_the_optimizer() -> Result<()> {
3475+
let state = make_session_state();
3476+
3477+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3478+
let schema = Arc::new(schema);
3479+
3480+
let table = MemTable::try_new(schema.clone(), vec![vec![]])?;
3481+
let table = Arc::new(table);
3482+
3483+
let source = DefaultTableSource::new(table);
3484+
let source = Arc::new(source);
3485+
3486+
let left = LogicalPlanBuilder::scan("left", source.clone(), None)?;
3487+
let right = LogicalPlanBuilder::scan("right", source, None)?.build()?;
3488+
3489+
let join_keys = (
3490+
vec![datafusion_common::Column::new(Some("left"), "a")],
3491+
vec![datafusion_common::Column::new(Some("right"), "a")],
3492+
);
3493+
3494+
let join = left.join(right, JoinType::Full, join_keys, None)?.build()?;
3495+
3496+
let alias = subquery_alias(join, "alias")?;
3497+
3498+
let planner = DefaultPhysicalPlanner::default();
3499+
3500+
let logical_plan = LogicalPlanBuilder::new(alias)
3501+
.aggregate(vec![col("a:1")], Vec::<Expr>::new())?
3502+
.build()?;
3503+
let _physical_plan = planner.create_physical_plan(&logical_plan, &state).await?;
3504+
3505+
let optimized_logical_plan = state.optimize(&logical_plan)?;
3506+
let _optimized_physical_plan = planner
3507+
.create_physical_plan(&optimized_logical_plan, &state)
3508+
.await?;
3509+
3510+
Ok(())
3511+
}
35693512
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 71 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! This module provides a builder for creating LogicalPlans
1919
2020
use std::any::Any;
21+
use std::borrow::Cow;
2122
use std::cmp::Ordering;
2223
use std::collections::{HashMap, HashSet};
2324
use std::iter::once;
@@ -1517,37 +1518,49 @@ impl ValuesFields {
15171518
}
15181519
}
15191520

1520-
// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1521-
//
1522-
// Some field names might already come to this function with the count (number of times it appeared)
1523-
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1524-
// if these three fields passed to this function: "col:1", "col" and "col", the function
1525-
// would rename them to -> col:1, col, col:1 causing a posteriror error when building the DFSchema.
1526-
// that's why we need the `seen` set, so the fields are always unique.
1527-
//
1528-
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1529-
let mut name_map = HashMap::new();
1530-
let mut seen: HashSet<String> = HashSet::new();
1521+
/// Returns aliases to make field names unique.
1522+
///
1523+
/// Returns a vector of optional aliases, one per input field. `None` means keep the original name,
1524+
/// `Some(alias)` means rename to the alias to ensure uniqueness.
1525+
///
1526+
/// Used when creating [`SubqueryAlias`] or similar operations that strip table qualifiers but need
1527+
/// to maintain unique column names.
1528+
///
1529+
/// # Example
1530+
/// Input fields: `[a, a, b, b, a, a:1]` ([`DFSchema`] valid when duplicate fields have different qualifiers)
1531+
/// Returns: `[None, Some("a:1"), None, Some("b:1"), Some("a:2"), Some("a:1:1")]`
1532+
pub fn unique_field_aliases(fields: &Fields) -> Vec<Option<String>> {
1533+
// Some field names might already come to this function with the count (number of times it appeared)
1534+
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1535+
// if these three fields are passed to this function: "col:1", "col" and "col", the function
1536+
// would rename them to -> col:1, col, col:1, causing a later error when building the DFSchema.
1537+
// That's why we need the `seen` set, so the fields are always unique.
1538+
1539+
// Tracks a mapping between a field name and the number of appearances of that field.
1540+
let mut name_map = HashMap::<&str, usize>::new();
1541+
// Tracks all the fields and aliases that were previously seen.
1542+
let mut seen = HashSet::<Cow<String>>::new();
15311543

15321544
fields
1533-
.into_iter()
1545+
.iter()
15341546
.map(|field| {
1535-
let base_name = field.name();
1536-
let count = name_map.entry(base_name.clone()).or_insert(0);
1537-
let mut new_name = base_name.clone();
1547+
let original_name = field.name();
1548+
let mut name = Cow::Borrowed(original_name);
1549+
1550+
let count = name_map.entry(original_name).or_insert(0);
15381551

1539-
// Loop until we find a name that hasn't been used
1540-
while seen.contains(&new_name) {
1552+
// Loop until we find a name that hasn't been used.
1553+
while seen.contains(&name) {
15411554
*count += 1;
1542-
new_name = format!("{base_name}:{count}");
1555+
name = Cow::Owned(format!("{original_name}:{count}"));
15431556
}
15441557

1545-
seen.insert(new_name.clone());
1558+
seen.insert(name.clone());
15461559

1547-
let mut modified_field =
1548-
Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1549-
modified_field.set_metadata(field.metadata().clone());
1550-
modified_field
1560+
match name {
1561+
Cow::Borrowed(_) => None,
1562+
Cow::Owned(alias) => Some(alias),
1563+
}
15511564
})
15521565
.collect()
15531566
}
@@ -2675,34 +2688,6 @@ mod tests {
26752688
Ok(())
26762689
}
26772690

2678-
#[test]
2679-
fn test_change_redundant_column() -> Result<()> {
2680-
let t1_field_1 = Field::new("a", DataType::Int32, false);
2681-
let t2_field_1 = Field::new("a", DataType::Int32, false);
2682-
let t2_field_3 = Field::new("a", DataType::Int32, false);
2683-
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2684-
let t1_field_2 = Field::new("b", DataType::Int32, false);
2685-
let t2_field_2 = Field::new("b", DataType::Int32, false);
2686-
2687-
let field_vec = vec![
2688-
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2689-
];
2690-
let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2691-
2692-
assert_eq!(
2693-
remove_redundant,
2694-
vec![
2695-
Field::new("a", DataType::Int32, false),
2696-
Field::new("a:1", DataType::Int32, false),
2697-
Field::new("b", DataType::Int32, false),
2698-
Field::new("b:1", DataType::Int32, false),
2699-
Field::new("a:2", DataType::Int32, false),
2700-
Field::new("a:1:1", DataType::Int32, false),
2701-
]
2702-
);
2703-
Ok(())
2704-
}
2705-
27062691
#[test]
27072692
fn plan_builder_from_logical_plan() -> Result<()> {
27082693
let plan =
@@ -2787,4 +2772,39 @@ mod tests {
27872772

27882773
Ok(())
27892774
}
2775+
2776+
#[test]
2777+
fn test_unique_field_aliases() {
2778+
let t1_field_1 = Field::new("a", DataType::Int32, false);
2779+
let t2_field_1 = Field::new("a", DataType::Int32, false);
2780+
let t2_field_3 = Field::new("a", DataType::Int32, false);
2781+
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2782+
let t1_field_2 = Field::new("b", DataType::Int32, false);
2783+
let t2_field_2 = Field::new("b", DataType::Int32, false);
2784+
2785+
let fields = vec![
2786+
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2787+
];
2788+
let fields = Fields::from(fields);
2789+
2790+
let remove_redundant = unique_field_aliases(&fields);
2791+
2792+
// Input [a, a, b, b, a, a:1] becomes [None, a:1, None, b:1, a:2, a:1:1]
2793+
// First occurrence of each field name keeps original name (None), duplicates get
2794+
// incremental suffixes (:1, :2, etc.).
2795+
// Crucially in this case the 2nd occurrence of `a` gets rewritten to `a:1` which later
2796+
// conflicts with the last column which is _actually_ called `a:1` so we need to rename it
2797+
// as well to `a:1:1`.
2798+
assert_eq!(
2799+
remove_redundant,
2800+
vec![
2801+
None,
2802+
Some("a:1".to_string()),
2803+
None,
2804+
Some("b:1".to_string()),
2805+
Some("a:2".to_string()),
2806+
Some("a:1:1".to_string()),
2807+
]
2808+
);
2809+
}
27902810
}

0 commit comments

Comments
 (0)