Skip to content

Commit 1fe11b8

Browse files
committed
fix(SubqueryAlias): use maybe_project_redundant_column
Fixes #17405
1 parent b084aa4 commit 1fe11b8

File tree

3 files changed

+104
-179
lines changed

3 files changed

+104
-179
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 42 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ use arrow::array::{builder::StringBuilder, RecordBatch};
6161
use arrow::compute::SortOptions;
6262
use arrow::datatypes::{Schema, SchemaRef};
6363
use datafusion_common::display::ToStringifiedPlan;
64-
use datafusion_common::tree_node::{
65-
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
66-
};
64+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
6765
use datafusion_common::{
6866
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
6967
ScalarValue,
@@ -83,7 +81,7 @@ use datafusion_expr::{
8381
WindowFrameBound, WriteOp,
8482
};
8583
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
86-
use datafusion_physical_expr::expressions::{Column, Literal};
84+
use datafusion_physical_expr::expressions::Literal;
8785
use datafusion_physical_expr::{
8886
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
8987
};
@@ -2177,11 +2175,7 @@ impl DefaultPhysicalPlanner {
21772175
let physical_expr =
21782176
self.create_physical_expr(e, input_logical_schema, session_state);
21792177

2180-
// Check for possible column name mismatches
2181-
let final_physical_expr =
2182-
maybe_fix_physical_column_name(physical_expr, &input_physical_schema);
2183-
2184-
tuple_err((final_physical_expr, physical_name))
2178+
tuple_err((physical_expr, physical_name))
21852179
})
21862180
.collect::<Result<Vec<_>>>()?;
21872181

@@ -2287,47 +2281,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
22872281
}
22882282
}
22892283

2290-
// Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
2291-
// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
2292-
//
2293-
// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
2294-
// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
2295-
fn maybe_fix_physical_column_name(
2296-
expr: Result<Arc<dyn PhysicalExpr>>,
2297-
input_physical_schema: &SchemaRef,
2298-
) -> Result<Arc<dyn PhysicalExpr>> {
2299-
let Ok(expr) = expr else { return expr };
2300-
expr.transform_down(|node| {
2301-
if let Some(column) = node.as_any().downcast_ref::<Column>() {
2302-
let idx = column.index();
2303-
let physical_field = input_physical_schema.field(idx);
2304-
let expr_col_name = column.name();
2305-
let physical_name = physical_field.name();
2306-
2307-
if expr_col_name != physical_name {
2308-
// handle edge cases where the physical_name contains ':'.
2309-
let colon_count = physical_name.matches(':').count();
2310-
let mut splits = expr_col_name.match_indices(':');
2311-
let split_pos = splits.nth(colon_count);
2312-
2313-
if let Some((i, _)) = split_pos {
2314-
let base_name = &expr_col_name[..i];
2315-
if base_name == physical_name {
2316-
let updated_column = Column::new(physical_name, idx);
2317-
return Ok(Transformed::yes(Arc::new(updated_column)));
2318-
}
2319-
}
2320-
}
2321-
2322-
// If names already match or fix is not possible, just leave it as it is
2323-
Ok(Transformed::no(node))
2324-
} else {
2325-
Ok(Transformed::no(node))
2326-
}
2327-
})
2328-
.data()
2329-
}
2330-
23312284
struct OptimizationInvariantChecker<'a> {
23322285
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
23332286
}
@@ -2431,12 +2384,10 @@ mod tests {
24312384
};
24322385
use datafusion_execution::runtime_env::RuntimeEnv;
24332386
use datafusion_execution::TaskContext;
2434-
use datafusion_expr::{
2435-
col, lit, LogicalPlanBuilder, Operator, UserDefinedLogicalNodeCore,
2436-
};
2387+
use datafusion_expr::builder::subquery_alias;
2388+
use datafusion_expr::{col, lit, LogicalPlanBuilder, UserDefinedLogicalNodeCore};
24372389
use datafusion_functions_aggregate::count::count_all;
24382390
use datafusion_functions_aggregate::expr_fn::sum;
2439-
use datafusion_physical_expr::expressions::{BinaryExpr, IsNotNullExpr};
24402391
use datafusion_physical_expr::EquivalenceProperties;
24412392
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
24422393

@@ -2997,71 +2948,6 @@ mod tests {
29972948
}
29982949
}
29992950

3000-
#[tokio::test]
3001-
async fn test_maybe_fix_colon_in_physical_name() {
3002-
// The physical schema has a field name with a colon
3003-
let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]);
3004-
let schema_ref: SchemaRef = Arc::new(schema);
3005-
3006-
// What might happen after deduplication
3007-
let logical_col_name = "metric:avg:1";
3008-
let expr_with_suffix =
3009-
Arc::new(Column::new(logical_col_name, 0)) as Arc<dyn PhysicalExpr>;
3010-
let expr_result = Ok(expr_with_suffix);
3011-
3012-
// Call function under test
3013-
let fixed_expr =
3014-
maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap();
3015-
3016-
// Downcast back to Column so we can check the name
3017-
let col = fixed_expr
3018-
.as_any()
3019-
.downcast_ref::<Column>()
3020-
.expect("Column");
3021-
3022-
assert_eq!(col.name(), "metric:avg");
3023-
}
3024-
3025-
#[tokio::test]
3026-
async fn test_maybe_fix_nested_column_name_with_colon() {
3027-
let schema = Schema::new(vec![Field::new("column", DataType::Int32, false)]);
3028-
let schema_ref: SchemaRef = Arc::new(schema);
3029-
3030-
// Construct the nested expr
3031-
let col_expr = Arc::new(Column::new("column:1", 0)) as Arc<dyn PhysicalExpr>;
3032-
let is_not_null_expr = Arc::new(IsNotNullExpr::new(col_expr.clone()));
3033-
3034-
// Create a binary expression and put the column inside
3035-
let binary_expr = Arc::new(BinaryExpr::new(
3036-
is_not_null_expr.clone(),
3037-
Operator::Or,
3038-
is_not_null_expr.clone(),
3039-
)) as Arc<dyn PhysicalExpr>;
3040-
3041-
let fixed_expr =
3042-
maybe_fix_physical_column_name(Ok(binary_expr), &schema_ref).unwrap();
3043-
3044-
let bin = fixed_expr
3045-
.as_any()
3046-
.downcast_ref::<BinaryExpr>()
3047-
.expect("Expected BinaryExpr");
3048-
3049-
// Check that both sides were renamed
3050-
for expr in &[bin.left(), bin.right()] {
3051-
let is_not_null = expr
3052-
.as_any()
3053-
.downcast_ref::<IsNotNullExpr>()
3054-
.expect("Expected IsNotNull");
3055-
3056-
let col = is_not_null
3057-
.arg()
3058-
.as_any()
3059-
.downcast_ref::<Column>()
3060-
.expect("Expected Column");
3061-
3062-
assert_eq!(col.name(), "column");
3063-
}
3064-
}
30652951
struct ErrorExtensionPlanner {}
30662952

30672953
#[async_trait]
@@ -3558,4 +3444,41 @@ digraph {
35583444

35593445
Ok(())
35603446
}
3447+
3448+
#[tokio::test]
3449+
async fn subquery_alias_confusing_the_optimizer() -> Result<()> {
3450+
let state = make_session_state();
3451+
3452+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3453+
let schema = Arc::new(schema);
3454+
3455+
let table = MemTable::try_new(schema.clone(), vec![vec![]])?;
3456+
let table = Arc::new(table);
3457+
3458+
let source = DefaultTableSource::new(table);
3459+
let source = Arc::new(source);
3460+
3461+
let left = LogicalPlanBuilder::scan("left", source.clone(), None)?;
3462+
let right = LogicalPlanBuilder::scan("right", source, None)?.build()?;
3463+
3464+
let join_keys = (
3465+
vec![datafusion_common::Column::new(Some("left"), "a")],
3466+
vec![datafusion_common::Column::new(Some("right"), "a")],
3467+
);
3468+
3469+
let join = left.join(right, JoinType::Full, join_keys, None)?.build()?;
3470+
3471+
let alias = subquery_alias(join, "alias")?;
3472+
3473+
let logical_plan = LogicalPlanBuilder::new(alias)
3474+
.aggregate(vec![col("a:1")], Vec::<Expr>::new())?
3475+
.build()?;
3476+
3477+
let optimized_logical_plan = state.optimize(&logical_plan)?;
3478+
3479+
let planner = DefaultPhysicalPlanner::default();
3480+
let physical_plan = planner.create_physical_plan(&logical_plan, &state).await?;
3481+
3482+
Ok(())
3483+
}
35613484
}

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 53 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! This module provides a builder for creating LogicalPlans
1919
2020
use std::any::Any;
21+
use std::borrow::Cow;
2122
use std::cmp::Ordering;
2223
use std::collections::{HashMap, HashSet};
2324
use std::iter::once;
@@ -1517,39 +1518,66 @@ impl ValuesFields {
15171518
}
15181519
}
15191520

1520-
// `name_map` tracks a mapping between a field name and the number of appearances of that field.
1521-
//
1522-
// Some field names might already come to this function with the count (number of times it appeared)
1523-
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1524-
// if these three fields passed to this function: "col:1", "col" and "col", the function
1525-
// would rename them to -> col:1, col, col:1 causing a subsequent error when building the DFSchema.
1526-
// that's why we need the `seen` set, so the fields are always unique.
1527-
//
1528-
pub fn change_redundant_column(fields: &Fields) -> Vec<Field> {
1529-
let mut name_map = HashMap::new();
1530-
let mut seen: HashSet<String> = HashSet::new();
1521+
pub fn maybe_project_redundant_column(
1522+
input: Arc<LogicalPlan>,
1523+
) -> Result<Arc<LogicalPlan>> {
1524+
// tracks a mapping between a field name and the number of appearances of that field.
1525+
let mut name_map = HashMap::<&str, usize>::new();
1526+
// tracks all the fields and aliases that were previously seen.
1527+
let mut seen = HashSet::<Cow<String>>::new();
1528+
1529+
// Some field names might already come to this function with the count (number of times it appeared)
1530+
// as a suffix e.g. id:1, so there's still a chance of name collisions, for example,
1531+
// if these three fields passed to this function: "col:1", "col" and "col", the function
1532+
// would rename them to -> col:1, col, col:1 causing a subsequent error when building the DFSchema.
1533+
// That's why we need the `seen` set, so the fields are always unique.
1534+
1535+
let aliases = input
1536+
.schema()
1537+
.iter()
1538+
.map(|(_, field)| {
1539+
let original_name = field.name();
1540+
let mut name = Cow::Borrowed(original_name);
15311541

1532-
fields
1533-
.into_iter()
1534-
.map(|field| {
1535-
let base_name = field.name();
1536-
let count = name_map.entry(base_name.clone()).or_insert(0);
1537-
let mut new_name = base_name.clone();
1542+
let count = name_map.entry(original_name).or_insert(0);
15381543

15391544
// Loop until we find a name that hasn't been used
1540-
while seen.contains(&new_name) {
1545+
while seen.contains(&name) {
15411546
*count += 1;
1542-
new_name = format!("{base_name}:{count}");
1547+
name = Cow::Owned(format!("{original_name}:{count}"));
15431548
}
15441549

1545-
seen.insert(new_name.clone());
1550+
seen.insert(name.clone());
15461551

1547-
let mut modified_field =
1548-
Field::new(&new_name, field.data_type().clone(), field.is_nullable());
1549-
modified_field.set_metadata(field.metadata().clone());
1550-
modified_field
1552+
match name {
1553+
Cow::Borrowed(_) => None,
1554+
Cow::Owned(alias) => Some(alias),
1555+
}
15511556
})
1552-
.collect()
1557+
.collect::<Vec<_>>();
1558+
1559+
// Check if there is at least an alias
1560+
let is_projection_needed = aliases.iter().any(Option::is_some);
1561+
1562+
if is_projection_needed {
1563+
let projection_expressions = aliases
1564+
.iter()
1565+
.zip(input.schema().iter())
1566+
.map(|(alias, (qualifier, field))| {
1567+
let column = Expr::Column(Column::new(qualifier.cloned(), field.name()));
1568+
match alias {
1569+
None => column,
1570+
Some(alias) => {
1571+
Expr::Alias(Alias::new(column, qualifier.cloned(), alias))
1572+
}
1573+
}
1574+
})
1575+
.collect();
1576+
let projection = Projection::try_new(projection_expressions, input)?;
1577+
Ok(Arc::new(LogicalPlan::Projection(projection)))
1578+
} else {
1579+
Ok(input)
1580+
}
15531581
}
15541582

15551583
fn mark_field(schema: &DFSchema) -> (Option<TableReference>, Arc<Field>) {
@@ -2675,34 +2703,6 @@ mod tests {
26752703
Ok(())
26762704
}
26772705

2678-
#[test]
2679-
fn test_change_redundant_column() -> Result<()> {
2680-
let t1_field_1 = Field::new("a", DataType::Int32, false);
2681-
let t2_field_1 = Field::new("a", DataType::Int32, false);
2682-
let t2_field_3 = Field::new("a", DataType::Int32, false);
2683-
let t2_field_4 = Field::new("a:1", DataType::Int32, false);
2684-
let t1_field_2 = Field::new("b", DataType::Int32, false);
2685-
let t2_field_2 = Field::new("b", DataType::Int32, false);
2686-
2687-
let field_vec = vec![
2688-
t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3, t2_field_4,
2689-
];
2690-
let remove_redundant = change_redundant_column(&Fields::from(field_vec));
2691-
2692-
assert_eq!(
2693-
remove_redundant,
2694-
vec![
2695-
Field::new("a", DataType::Int32, false),
2696-
Field::new("a:1", DataType::Int32, false),
2697-
Field::new("b", DataType::Int32, false),
2698-
Field::new("b:1", DataType::Int32, false),
2699-
Field::new("a:2", DataType::Int32, false),
2700-
Field::new("a:1:1", DataType::Int32, false),
2701-
]
2702-
);
2703-
Ok(())
2704-
}
2705-
27062706
#[test]
27072707
fn plan_builder_from_logical_plan() -> Result<()> {
27082708
let plan =

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use super::invariants::{
3030
InvariantLevel,
3131
};
3232
use super::DdlStatement;
33-
use crate::builder::{change_redundant_column, unnest_with_options};
33+
use crate::builder::{maybe_project_redundant_column, unnest_with_options};
3434
use crate::expr::{
3535
intersect_metadata_for_union, Placeholder, Sort as SortExpr, WindowFunction,
3636
WindowFunctionParams,
@@ -2223,13 +2223,15 @@ impl SubqueryAlias {
22232223
alias: impl Into<TableReference>,
22242224
) -> Result<Self> {
22252225
let alias = alias.into();
2226-
let fields = change_redundant_column(plan.schema().fields());
2227-
let meta_data = plan.schema().as_ref().metadata().clone();
2228-
let schema: Schema =
2229-
DFSchema::from_unqualified_fields(fields.into(), meta_data)?.into();
2230-
// Since schema is the same, other than qualifier, we can use existing
2231-
// functional dependencies:
2226+
let plan = maybe_project_redundant_column(plan)?;
2227+
2228+
let fields = plan.schema().fields().clone();
2229+
let meta_data = plan.schema().metadata().clone();
22322230
let func_dependencies = plan.schema().functional_dependencies().clone();
2231+
2232+
let schema = DFSchema::from_unqualified_fields(fields, meta_data)?;
2233+
let schema = Schema::from(schema);
2234+
22332235
let schema = DFSchemaRef::new(
22342236
DFSchema::try_from_qualified_schema(alias.clone(), &schema)?
22352237
.with_functional_dependencies(func_dependencies)?,

0 commit comments

Comments
 (0)