Skip to content

Commit cc81ed0

Browse files
vladimirg-db authored and MaxGekk committed
[SPARK-50325][SQL] Factor out alias resolution to be reused in the single-pass Analyzer
### What changes were proposed in this pull request?
Factor out alias resolution code to the `AliasResolution` object.

### Why are the changes needed?
Some Analyzer code will be used in both fixed-point and single-pass Analyzers. Also, Analyzer.scala is 4K+ lines long, so it makes sense to gradually split it.

Context: https://issues.apache.org/jira/browse/SPARK-49834

### Does this PR introduce _any_ user-facing change?
No. It's a pure refactoring.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #48857 from vladimirg-db/vladimirg-db/refactor-alias-resolution.

Authored-by: Vladimir Golubev <vladimir.golubev@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
1 parent cf90271 commit cc81ed0

File tree

2 files changed

+109
-54
lines changed

2 files changed

+109
-54
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.sql.catalyst.analysis.MultiAlias
21+
import org.apache.spark.sql.catalyst.expressions.{
22+
Alias,
23+
Attribute,
24+
Cast,
25+
Expression,
26+
ExtractValue,
27+
Generator,
28+
GeneratorOuter,
29+
Literal,
30+
NamedExpression
31+
}
32+
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ALIAS
33+
import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS}
34+
import org.apache.spark.sql.types.MetadataBuilder
35+
36+
/**
 * Shared helpers for replacing [[UnresolvedAlias]] nodes with concrete named expressions.
 */
object AliasResolution {

  /**
   * Reports whether at least one expression in `exprs` has an [[UnresolvedAlias]] node in it.
   *
   * @param exprs the named expressions to inspect
   * @return `true` if the `exists` check of any expression finds an [[UnresolvedAlias]]
   */
  def hasUnresolvedAlias(exprs: Seq[NamedExpression]): Boolean =
    exprs.exists { expr =>
      expr.exists {
        case _: UnresolvedAlias => true
        case _ => false
      }
    }

  /**
   * Rewrites every expression in `exprs`, substituting each [[UnresolvedAlias]] node with the
   * outcome of [[resolve]]. Subtrees that do not contain the `UNRESOLVED_ALIAS` pattern are
   * pruned from the traversal.
   *
   * @param exprs the named expressions to rewrite
   * @return the rewritten expressions, cast back to `Seq[NamedExpression]`
   */
  def assignAliases(exprs: Seq[NamedExpression]): Seq[NamedExpression] = {
    val rewritten = exprs.map { expr =>
      expr.transformUpWithPruning(_.containsPattern(UNRESOLVED_ALIAS)) {
        case unresolved: UnresolvedAlias => resolve(unresolved)
      }
    }
    // The transform is typed as returning plain expressions, hence the cast back.
    rewritten.asInstanceOf[Seq[NamedExpression]]
  }

  /**
   * Picks the replacement for a single [[UnresolvedAlias]]. The cases are tried in order:
   *
   *  - a child that is already a [[NamedExpression]] is returned as is;
   *  - a [[GeneratorOuter]] around a resolved [[Generator]] becomes a [[MultiAlias]] with no
   *    names;
   *  - any other child that is not resolved yet leaves `u` untouched;
   *  - a [[Generator]] becomes a [[MultiAlias]] with no names;
   *  - a [[Cast]] of a [[NamedExpression]] is aliased with the name of the cast's child;
   *  - an [[ExtractValue]] accepted by `extractOnly` is aliased with its pretty SQL string;
   *  - if `u` carries an alias-generating function, that function supplies the name;
   *  - a [[Literal]] is aliased with its pretty SQL string;
   *  - anything else is aliased with its pretty SQL string and tagged with the
   *    `AUTO_GENERATED_ALIAS` metadata key set to "true".
   *
   * @param u the unresolved alias to replace
   * @return the replacement expression, or `u` itself when its child is not resolved yet
   */
  def resolve(u: UnresolvedAlias): Expression = {
    val UnresolvedAlias(target, aliasNamer) = u
    target match {
      case named: NamedExpression =>
        named
      case outer @ GeneratorOuter(gen: Generator) if gen.resolved =>
        MultiAlias(outer, Nil)
      case pending if !pending.resolved =>
        u
      case gen: Generator =>
        MultiAlias(gen, Nil)
      case cast @ Cast(inner: NamedExpression, _, _, _) =>
        Alias(cast, inner.name)()
      case extract: ExtractValue if extractOnly(extract) =>
        Alias(extract, toPrettySQL(extract))()
      case other if aliasNamer.isDefined =>
        val makeName = aliasNamer.get
        Alias(target, makeName(other))()
      case lit: Literal =>
        Alias(lit, toPrettySQL(lit))()
      case other =>
        val autoAliasMeta = new MetadataBuilder().putString(AUTO_GENERATED_ALIAS, "true").build()
        Alias(other, toPrettySQL(other))(explicitMetadata = Some(autoAliasMeta))
    }
  }

  /**
   * Checks that `e` is built solely from [[ExtractValue]] nodes whose leaves are [[Literal]]s
   * or [[Attribute]]s.
   */
  private def extractOnly(e: Expression): Boolean = e match {
    case _: ExtractValue => e.children.forall(extractOnly)
    case _: Literal | _: Attribute => true
    case _ => false
  }
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 34 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.trees.AlwaysProcess
4343
import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
4444
import org.apache.spark.sql.catalyst.trees.TreePattern._
4545
import org.apache.spark.sql.catalyst.types.DataTypeUtils
46-
import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS, CharVarcharUtils}
46+
import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
4747
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
4848
import org.apache.spark.sql.connector.catalog.{View => _, _}
4949
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -444,62 +444,42 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
444444
* Replaces [[UnresolvedAlias]]s with concrete aliases.
445445
*/
446446
object ResolveAliases extends Rule[LogicalPlan] {
447-
private def assignAliases(exprs: Seq[NamedExpression]) = {
448-
exprs.map(_.transformUpWithPruning(_.containsPattern(UNRESOLVED_ALIAS)) {
449-
case u: UnresolvedAlias => resolve(u)
450-
}
451-
).asInstanceOf[Seq[NamedExpression]]
452-
}
453-
454-
private[analysis] def resolve(u: UnresolvedAlias): Expression = {
455-
val UnresolvedAlias(child, optGenAliasFunc) = u
456-
child match {
457-
case ne: NamedExpression => ne
458-
case go @ GeneratorOuter(g: Generator) if g.resolved => MultiAlias(go, Nil)
459-
case e if !e.resolved => u
460-
case g: Generator => MultiAlias(g, Nil)
461-
case c @ Cast(ne: NamedExpression, _, _, _) => Alias(c, ne.name)()
462-
case e: ExtractValue if extractOnly(e) => Alias(e, toPrettySQL(e))()
463-
case e if optGenAliasFunc.isDefined =>
464-
Alias(child, optGenAliasFunc.get.apply(e))()
465-
case l: Literal => Alias(l, toPrettySQL(l))()
466-
case e =>
467-
val metaForAutoGeneratedAlias = new MetadataBuilder()
468-
.putString(AUTO_GENERATED_ALIAS, "true")
469-
.build()
470-
Alias(e, toPrettySQL(e))(explicitMetadata = Some(metaForAutoGeneratedAlias))
471-
}
472-
}
473-
474-
private def extractOnly(e: Expression): Boolean = e match {
475-
case _: ExtractValue => e.children.forall(extractOnly)
476-
case _: Literal => true
477-
case _: Attribute => true
478-
case _ => false
479-
}
480-
481-
private def hasUnresolvedAlias(exprs: Seq[NamedExpression]) =
482-
exprs.exists(_.exists(_.isInstanceOf[UnresolvedAlias]))
483-
484-
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
485-
_.containsPattern(UNRESOLVED_ALIAS), ruleId) {
486-
case Aggregate(groups, aggs, child, _) if child.resolved && hasUnresolvedAlias(aggs) =>
487-
Aggregate(groups, assignAliases(aggs), child)
488-
489-
case Pivot(groupByOpt, pivotColumn, pivotValues, aggregates, child)
490-
if child.resolved && groupByOpt.isDefined && hasUnresolvedAlias(groupByOpt.get) =>
491-
Pivot(Some(assignAliases(groupByOpt.get)), pivotColumn, pivotValues, aggregates, child)
447+
def apply(plan: LogicalPlan): LogicalPlan =
448+
plan.resolveOperatorsUpWithPruning(_.containsPattern(UNRESOLVED_ALIAS), ruleId) {
449+
case Aggregate(groups, aggs, child, _)
450+
if child.resolved && AliasResolution.hasUnresolvedAlias(aggs) =>
451+
Aggregate(groups, AliasResolution.assignAliases(aggs), child)
452+
453+
case Pivot(groupByOpt, pivotColumn, pivotValues, aggregates, child)
454+
if child.resolved &&
455+
groupByOpt.isDefined &&
456+
AliasResolution.hasUnresolvedAlias(groupByOpt.get) =>
457+
Pivot(
458+
Some(AliasResolution.assignAliases(groupByOpt.get)),
459+
pivotColumn,
460+
pivotValues,
461+
aggregates,
462+
child
463+
)
492464

493-
case up: Unpivot if up.child.resolved &&
494-
(up.ids.exists(hasUnresolvedAlias) || up.values.exists(_.exists(hasUnresolvedAlias))) =>
495-
up.copy(ids = up.ids.map(assignAliases), values = up.values.map(_.map(assignAliases)))
465+
case up: Unpivot
466+
if up.child.resolved &&
467+
(up.ids.exists(AliasResolution.hasUnresolvedAlias) || up.values.exists(
468+
_.exists(AliasResolution.hasUnresolvedAlias)
469+
)) =>
470+
up.copy(
471+
ids = up.ids.map(AliasResolution.assignAliases),
472+
values = up.values.map(_.map(AliasResolution.assignAliases))
473+
)
496474

497-
case Project(projectList, child) if child.resolved && hasUnresolvedAlias(projectList) =>
498-
Project(assignAliases(projectList), child)
475+
case Project(projectList, child)
476+
if child.resolved && AliasResolution.hasUnresolvedAlias(projectList) =>
477+
Project(AliasResolution.assignAliases(projectList), child)
499478

500-
case c: CollectMetrics if c.child.resolved && hasUnresolvedAlias(c.metrics) =>
501-
c.copy(metrics = assignAliases(c.metrics))
502-
}
479+
case c: CollectMetrics
480+
if c.child.resolved && AliasResolution.hasUnresolvedAlias(c.metrics) =>
481+
c.copy(metrics = AliasResolution.assignAliases(c.metrics))
482+
}
503483
}
504484

505485
object ResolveGroupingAnalytics extends Rule[LogicalPlan] {

0 commit comments

Comments
 (0)