Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
ResolveSortReferences ::
NewRelationInstances ::
ImplicitGenerate ::
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
UnresolvedHavingClauseAttributes ::
Expand Down Expand Up @@ -151,7 +150,34 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
*/
object ResolveReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case q: LogicalPlan if q.childrenResolved =>
case p: LogicalPlan if !p.childrenResolved => p

// If the projection list contains Stars, expand it.
case p@Project(projectList, child) if containsStar(projectList) =>
Project(
projectList.flatMap {
case s: Star => s.expand(child.output, resolver)
case o => o :: Nil
},
child)
case t: ScriptTransformation if containsStar(t.input) =>
t.copy(
input = t.input.flatMap {
case s: Star => s.expand(t.child.output, resolver)
case o => o :: Nil
}
)

// If the aggregate function argument contains Stars, expand it.
case a: Aggregate if containsStar(a.aggregateExpressions) =>
a.copy(
aggregateExpressions = a.aggregateExpressions.flatMap {
case s: Star => s.expand(a.child.output, resolver)
case o => o :: Nil
}
)

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString}")
q transformExpressions {
case u @ UnresolvedAttribute(name) =>
Expand All @@ -161,6 +187,12 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
result
}
}

/**
* Returns true if `exprs` contains a [[Star]].
*/
protected def containsStar(exprs: Seq[Expression]): Boolean =
exprs.collect { case _: Star => true}.nonEmpty
}

/**
Expand Down Expand Up @@ -275,45 +307,6 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
Generate(g, join = false, outer = false, None, child)
}
}

/**
* Expands any references to [[Star]] (*) in project operators.
*/
object StarExpansion extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// Wait until children are resolved
case p: LogicalPlan if !p.childrenResolved => p
// If the projection list contains Stars, expand it.
case p @ Project(projectList, child) if containsStar(projectList) =>
Project(
projectList.flatMap {
case s: Star => s.expand(child.output, resolver)
case o => o :: Nil
},
child)
case t: ScriptTransformation if containsStar(t.input) =>
t.copy(
input = t.input.flatMap {
case s: Star => s.expand(t.child.output, resolver)
case o => o :: Nil
}
)
// If the aggregate function argument contains Stars, expand it.
case a: Aggregate if containsStar(a.aggregateExpressions) =>
a.copy(
aggregateExpressions = a.aggregateExpressions.flatMap {
case s: Star => s.expand(a.child.output, resolver)
case o => o :: Nil
}
)
}

/**
* Returns true if `exprs` contains a [[Star]].
*/
protected def containsStar(exprs: Seq[Expression]): Boolean =
exprs.collect { case _: Star => true }.nonEmpty
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package org.apache.spark.sql.catalyst.analysis

import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

class AnalysisSuite extends FunSuite with BeforeAndAfter {
val caseSensitiveCatalog = new SimpleCatalog(true)
val caseInsensitiveCatalog = new SimpleCatalog(false)
Expand All @@ -46,6 +48,14 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter {
caseInsensitiveCatalog.registerTable(None, "TaBlE", testRelation)
}

test("union project *") {
val plan = (1 to 100)
.map(_ => testRelation)
.fold[LogicalPlan](testRelation)((a,b) => a.select(Star(None)).select('a).unionAll(b.select(Star(None))))

assert(caseInsensitiveAnalyze(plan).resolved)
}

test("analyze project") {
assert(
caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("a")), testRelation)) ===
Expand Down