@@ -185,8 +185,7 @@ case class Alias(child: Expression, name: String)(
override def sql: String = {
val qualifiersString =
if (qualifiers.isEmpty) "" else qualifiers.map(quoteIdentifier).mkString("", ".", ".")
- val aliasName = if (isGenerated) s"$name#${exprId.id}" else s"$name"
- s"${child.sql} AS $qualifiersString${quoteIdentifier(aliasName)}"
+ s"${child.sql} AS $qualifiersString${quoteIdentifier(name)}"
Contributor: Is isGenerated still needed? (If not, we do not need to remove it in this PR.) Also, what is the reason for this change?

Author: isGenerated is still needed to avoid resolving column names against these generated internal attributes. However, it should not affect the SQL output anymore, as this PR needs to control the format of attribute names and alias names.
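Editor's sketch of the behavior change (hypothetical alias "x" with exprId 7 that is marked as generated; not part of the diff):

    // Before: a generated alias embedded its exprId in the emitted SQL:
    //   <child SQL> AS `x#7`
    // After: the exprId suffix is dropped; SQLBuilder now controls all naming itself:
    //   <child SQL> AS `x`
    // The AttributeReference change below is the same fix for attribute references.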

}
}

@@ -302,8 +301,7 @@ case class AttributeReference(
override def sql: String = {
val qualifiersString =
if (qualifiers.isEmpty) "" else qualifiers.map(quoteIdentifier).mkString("", ".", ".")
- val attrRefName = if (isGenerated) s"$name#${exprId.id}" else s"$name"
- s"$qualifiersString${quoteIdentifier(attrRefName)}"
+ s"$qualifiersString${quoteIdentifier(name)}"
}
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala (127 additions, 99 deletions)

@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical._
@@ -54,8 +55,26 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Logging {

def toSQL: String = {
val canonicalizedPlan = Canonicalizer.execute(logicalPlan)
+ val outputNames = logicalPlan.output.map(_.name)
+ val qualifiers = logicalPlan.output.flatMap(_.qualifiers).distinct
+
+ // Keep the qualifier information by using it as the sub-query name, if there is only one
+ // qualifier present.
+ val finalName = if (qualifiers.length == 1) {
+ qualifiers.head
+ } else {
+ SQLBuilder.newSubqueryName
+ }
+
+ // The Canonicalizer removes all naming information, so we add it back here with an extra
+ // Project that aliases the outputs.
+ val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map {
+ case (attr, name) => Alias(attr.withQualifiers(Nil), name)()
+ }
+ val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan))
Member: The top Project in the finalPlan still has the name-ambiguity issue. I think it could still be a problem when we use it in the native view support. #10733

Member: Never mind, this should not be an issue. Name ambiguity should not exist when creating a view.

Author: Yeah, actually we should add this check to our CreateView operator. I checked PostgreSQL: it doesn't allow ambiguous column names when creating a view. Do you want to submit a PR?

Member: Sure, I can do it tomorrow.

Member: I just realized Hive will issue an error when there are duplicate names in CREATE VIEW, so I guess we do not need to redo it in Spark? BTW, the top Project could still have duplicate names when creating a view if users specify the schema. For example:

CREATE VIEW testView(id3, id4) AS SELECT * FROM jt1 JOIN jt2 ON jt1.id1 == jt2.id1

Author: That's true, and we can add one more renaming Project before passing the plan to SQLBuilder when creating a view.
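Editor's sketch of what the new wrapper roughly produces (hypothetical table db.t with a single column id, exprId 1; not part of the diff):

    -- input query: SELECT id FROM db.t
    -- every attribute is renamed to gen_attr_<exprId> during canonicalization;
    -- the top-level Project restores the user-visible name `id`, and the single
    -- qualifier `t` is reused as the outer sub-query name:
    SELECT `gen_attr_1` AS `id` FROM (
      SELECT `gen_attr_1` FROM (
        SELECT `id` AS `gen_attr_1` FROM `db`.`t`
      ) AS gen_subquery_0
    ) AS t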


try {
- val replaced = canonicalizedPlan.transformAllExpressions {
+ val replaced = finalPlan.transformAllExpressions {
case e: SubqueryExpression =>
SubqueryHolder(new SQLBuilder(e.query, sqlContext).toSQL)
case e: NonSQLExpression =>
@@ -109,23 +128,6 @@
case Limit(limitExpr, child) =>
s"${toSQL(child)} LIMIT ${limitExpr.sql}"

- case p: Sample if p.isTableSample =>
- val fraction = math.min(100, math.max(0, (p.upperBound - p.lowerBound) * 100))
- p.child match {
- case m: MetastoreRelation =>
- val aliasName = m.alias.getOrElse("")
- build(
- s"`${m.databaseName}`.`${m.tableName}`",
- "TABLESAMPLE(" + fraction + " PERCENT)",
- aliasName)
- case s: SubqueryAlias =>
- val aliasName = if (s.child.isInstanceOf[SubqueryAlias]) s.alias else ""
- val plan = if (s.child.isInstanceOf[SubqueryAlias]) s.child else s
- build(toSQL(plan), "TABLESAMPLE(" + fraction + " PERCENT)", aliasName)
- case _ =>
- build(toSQL(p.child), "TABLESAMPLE(" + fraction + " PERCENT)")
- }

case Filter(condition, child) =>
val whereOrHaving = child match {
case _: Aggregate => "HAVING"
@@ -147,18 +149,7 @@
case p: Except =>
build("(" + toSQL(p.left), ") EXCEPT (", toSQL(p.right) + ")")

- case p: SubqueryAlias =>
- p.child match {
- // Persisted data source relation
- case LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
- s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
- // Parentheses is not used for persisted data source relations
- // e.g., select x.c1 from (t1) as x inner join (t1) as y on x.c1 = y.c1
- case SubqueryAlias(_, _: LogicalRelation | _: MetastoreRelation) =>
- build(toSQL(p.child), "AS", p.alias)
- case _ =>
- build("(" + toSQL(p.child) + ")", "AS", p.alias)
- }
+ case p: SubqueryAlias => build("(" + toSQL(p.child) + ")", "AS", p.alias)

case p: Join =>
build(
@@ -168,11 +159,12 @@
toSQL(p.right),
p.condition.map(" ON " + _.sql).getOrElse(""))

- case p: MetastoreRelation =>
- build(
- s"${quoteIdentifier(p.databaseName)}.${quoteIdentifier(p.tableName)}",
- p.alias.map(a => s" AS ${quoteIdentifier(a)}").getOrElse("")
- )
+ case SQLTable(database, table, _, sample) =>
+ val qualifiedName = s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
+ sample.map { case (lowerBound, upperBound) =>
Member: After this change, you can remove the Sample case.

+ val fraction = math.min(100, math.max(0, (upperBound - lowerBound) * 100))
+ qualifiedName + " TABLESAMPLE(" + fraction + " PERCENT)"
+ }.getOrElse(qualifiedName)
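Editor's sketch of the emitted fragment (hypothetical table and sample bounds; not part of the diff):

    -- for sample = Some((0.0, 0.2)):
    --   fraction = min(100, max(0, (0.2 - 0.0) * 100)) = 20.0
    -- emitted SQL: `db`.`t` TABLESAMPLE(20.0 PERCENT)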

case Sort(orders, _, RepartitionByExpression(partitionExprs, child, _))
if orders.map(_.child) == partitionExprs =>
@@ -274,8 +266,8 @@
val groupingSetSQL = "GROUPING SETS(" +
groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")"

- val aggExprs = agg.aggregateExpressions.map { case expr =>
- expr.transformDown {
+ val aggExprs = agg.aggregateExpressions.map { case aggExpr =>
+ val originalAggExpr = aggExpr.transformDown {
// grouping_id() is converted to VirtualColumn.groupingIdName by Analyzer. Revert it back.
case ar: AttributeReference if ar == gid => GroupingID(Nil)
case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
@@ -286,6 +278,15 @@
val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
groupByExprs.lift(idx).map(Grouping).getOrElse(a)
}

+ originalAggExpr match {
+ // Ancestor operators may reference the output of this grouping set, and we use exprId to
+ // generate a unique name for each attribute, so we should make sure the transformed
+ // aggregate expression won't change the output, i.e. exprId and alias name should remain
+ // the same.
+ case ne: NamedExpression if ne.exprId == aggExpr.exprId => ne
+ case e => Alias(e, normalizedName(aggExpr))(exprId = aggExpr.exprId)
+ }
}
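Editor's sketch of why the exprId is preserved (hypothetical exprIds; not part of the diff):

    // Suppose aggExpr is `max(gen_attr_1) AS gen_attr_5` with exprId 5, and the
    // transform above rewrites its child. Ancestor operators reference this output
    // as gen_attr_5, so the rewritten expression is re-aliased under the same id:
    //   Alias(rewritten, "gen_attr_5")(exprId = ExprId(5))
    // Keeping exprId 5 keeps the generated name stable across the rewrite.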

build(
@@ -308,6 +309,8 @@
)
}

+ private def normalizedName(n: NamedExpression): String = "gen_attr_" + n.exprId.id

object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
Batch("Collapse Project", FixedPoint(100),
@@ -316,31 +319,55 @@
// `Aggregate`s.
CollapseProject),
Batch("Recover Scoping Info", Once,
- // Used to handle other auxiliary `Project`s added by analyzer (e.g.
- // `ResolveAggregateFunctions` rule)
- AddSubquery,
- // Previous rule will add extra sub-queries, this rule is used to re-propagate and update
- // the qualifiers bottom up, e.g.:
- //
- // Sort
- //   ordering = t1.a
- // Project
- //   projectList = [t1.a, t1.b]
- // Subquery gen_subquery
- //   child ...
- //
- // will be transformed to:
- //
- // Sort
- //   ordering = gen_subquery.a
- // Project
- //   projectList = [gen_subquery.a, gen_subquery.b]
- // Subquery gen_subquery
- //   child ...
- UpdateQualifiers
+ // Remove all sub-queries, as we will insert new ones when necessary.
+ EliminateSubqueryAliases,
+ // A logical plan is allowed to have same-name outputs with different qualifiers (e.g. the
+ // `Join` operator). However, this kind of plan can't be put under a sub-query, as we will
+ // erase and assign a new qualifier to all outputs, making it impossible to distinguish
+ // same-name outputs. This rule renames all attributes to guarantee that different
+ // attributes (with different exprIds) always have different names. It also removes all
+ // qualifiers, as attributes now have unique names and we no longer need qualifiers to
+ // resolve ambiguity.
Contributor: Where do we reassign the needed column names?

Author: At the beginning of the toSQL method.

+ NormalizedAttribute,
+ // Finds the table relations and wraps them with `SQLTable`s. If there are any `Sample`
+ // operators on top of a table relation, merge the sample information into the `SQLTable`
+ // of that relation, as only table samples can be converted to a standard SQL string.
+ ResolveSQLTable,
+ // Insert sub-queries on top of operators that need to appear after the FROM clause.
+ AddSubquery
)
)

+ object NormalizedAttribute extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
+ case a: AttributeReference =>
+ AttributeReference(normalizedName(a), a.dataType)(exprId = a.exprId, qualifiers = Nil)
+ case a: Alias =>
+ Alias(a.child, normalizedName(a))(exprId = a.exprId, qualifiers = Nil)
+ }
+ }
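Editor's sketch of the renaming (hypothetical attribute a with exprId 3 and qualifier t; not part of the diff):

    // AttributeReference("a", IntegerType)(exprId = ExprId(3), qualifiers = Seq("t"))
    //   becomes
    // AttributeReference("gen_attr_3", IntegerType)(exprId = ExprId(3), qualifiers = Nil)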

+ object ResolveSQLTable extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
+ case Sample(lowerBound, upperBound, _, _, ExtractSQLTable(table)) =>
+ aliasColumns(table.withSample(lowerBound, upperBound))
+ case ExtractSQLTable(table) =>
+ aliasColumns(table)
+ }
+
+ /**
+ * Aliases the table columns to the generated attribute names, as we use exprId to generate
+ * a unique name for each attribute when normalizing attributes, and we can't reference
+ * table columns by their real names.
+ */
+ private def aliasColumns(table: SQLTable): LogicalPlan = {
+ val aliasedOutput = table.output.map { attr =>
+ Alias(attr, normalizedName(attr))(exprId = attr.exprId)
+ }
+ addSubquery(Project(aliasedOutput, table))
+ }
+ }
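Editor's sketch of the rewrite this rule performs (hypothetical table t with one column a, exprId 1; not part of the diff):

    // Sample(0.0, 0.2, ..., MetastoreRelation default, t)
    //
    // becomes:
    //
    // SubqueryAlias gen_subquery_0
    // +- Project [a#1 AS gen_attr_1#1]
    //    +- SQLTable default, t, sample = Some((0.0, 0.2))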

object AddSubquery extends Rule[LogicalPlan] {
override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
// This branch handles aggregate functions within HAVING clauses. For example:
Expand All @@ -354,55 +381,56 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
// +- Filter ...
// +- Aggregate ...
// +- MetastoreRelation default, src, None
- case plan @ Project(_, Filter(_, _: Aggregate)) => wrapChildWithSubquery(plan)
+ case p @ Project(_, f @ Filter(_, _: Aggregate)) => p.copy(child = addSubquery(f))

- case w @ Window(_, _, _, Filter(_, _: Aggregate)) => wrapChildWithSubquery(w)
+ case w @ Window(_, _, _, f @ Filter(_, _: Aggregate)) => w.copy(child = addSubquery(f))

- case plan @ Project(_,
- _: SubqueryAlias
- | _: Filter
- | _: Join
- | _: MetastoreRelation
- | OneRowRelation
- | _: LocalLimit
- | _: GlobalLimit
- | _: Sample
- ) => plan
-
- case plan: Project => wrapChildWithSubquery(plan)
+ case p: Project => p.copy(child = addSubqueryIfNeeded(p.child))

// We will generate "SELECT ... FROM ..." for the Window operator, so its child operator
// should be able to be put in the FROM clause, or we wrap it with a subquery.
- case w @ Window(_, _, _,
- _: SubqueryAlias
- | _: Filter
- | _: Join
- | _: MetastoreRelation
- | OneRowRelation
- | _: LocalLimit
- | _: GlobalLimit
- | _: Sample
- ) => w
-
- case w: Window => wrapChildWithSubquery(w)
- }
+ case w: Window => w.copy(child = addSubqueryIfNeeded(w.child))

- private def wrapChildWithSubquery(plan: UnaryNode): LogicalPlan = {
- val newChild = SubqueryAlias(SQLBuilder.newSubqueryName, plan.child)
- plan.withNewChildren(Seq(newChild))
+ case j: Join => j.copy(
+ left = addSubqueryIfNeeded(j.left),
+ right = addSubqueryIfNeeded(j.right))
}
}

- object UpdateQualifiers extends Rule[LogicalPlan] {
- override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
- case plan =>
- val inputAttributes = plan.children.flatMap(_.output)
- plan transformExpressions {
- case a: AttributeReference if !plan.producedAttributes.contains(a) =>
- val qualifier = inputAttributes.find(_ semanticEquals a).map(_.qualifiers)
- a.withQualifiers(qualifier.getOrElse(Nil))
- }
- }
+ private def addSubquery(plan: LogicalPlan): SubqueryAlias = {
+ SubqueryAlias(SQLBuilder.newSubqueryName, plan)
+ }

+ private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match {
+ case _: SubqueryAlias => plan
+ case _: Filter => plan
+ case _: Join => plan
+ case _: LocalLimit => plan
+ case _: GlobalLimit => plan
+ case _: SQLTable => plan
+ case OneRowRelation => plan
Contributor: Why do we not need to add a subquery for these kinds of nodes?

Author: As the comment says, we don't need to add a sub-query if the operator can be put after FROM. So obviously SubqueryAlias, Join, SQLTable, and OneRowRelation don't need an extra sub-query. Currently we only support converting logical plans that were parsed from SQL strings back to SQL strings; this implies Filter and Limit will always appear on top of a table relation and will generate SQL like `tbl WHERE ... LIMIT ...`, which can be put after FROM.

Anyway, this logic is just copied from the original code.
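Editor's sketch of that rule of thumb (hypothetical SQL fragments; not part of the diff):

    -- a Filter child already prints as something legal after FROM, e.g.
    --   `db`.`t` WHERE `gen_attr_1` > 0
    -- so it is left alone, while a child that prints as a bare SELECT (e.g. a
    -- Project or Aggregate) falls through to the default case and is wrapped:
    --   (SELECT ...) AS gen_subquery_0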

+ case _ => addSubquery(plan)
}
}

+ case class SQLTable(
+ database: String,
+ table: String,
+ output: Seq[Attribute],
+ sample: Option[(Double, Double)] = None) extends LeafNode {
+ def withSample(lowerBound: Double, upperBound: Double): SQLTable =
+ this.copy(sample = Some(lowerBound -> upperBound))
+ }
+
+ object ExtractSQLTable {
+ def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
+ case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
+ Some(SQLTable(database, table, l.output.map(_.withQualifiers(Nil))))
+
+ case m: MetastoreRelation =>
+ Some(SQLTable(m.databaseName, m.tableName, m.output.map(_.withQualifiers(Nil))))
+
+ case _ => None
+ }
+ }
}
@@ -550,4 +550,22 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|WINDOW w AS (PARTITION BY key % 5 ORDER BY key)
""".stripMargin)
}

test("window with join") {
checkHiveQl(
"""
|SELECT x.key, MAX(y.key) OVER (PARTITION BY x.key % 5 ORDER BY x.key)
|FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key
""".stripMargin)
}

test("join 2 tables and aggregate function in having clause") {
checkHiveQl(
"""
|SELECT COUNT(a.value), b.KEY, a.KEY
|FROM parquet_t1 a, parquet_t1 b
|GROUP BY a.KEY, b.KEY
|HAVING MAX(a.KEY) > 0
""".stripMargin)
}
}