Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case w: Window =>
windowToSQL(w)

case g: Generate =>
generateToSQL(g)

case Limit(limitExpr, child) =>
s"${toSQL(child)} LIMIT ${limitExpr.sql}"

Expand Down Expand Up @@ -221,6 +224,30 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}

private def generateToSQL(g: Generate): String = {
val columnAliases = g.generatorOutput.map(_.sql).mkString(",")

val childSQL = if (g.child == OneRowRelation) {
// This only happens when we put UDTF in project list and there is no FROM clause. Because we
// always generate LATERAL VIEW for `Generate`, here we use a trick to put a dummy sub-query
// after FROM clause, so that we can generate a valid LATERAL VIEW SQL string.
s"(SELECT 1) ${SQLBuilder.newSubqueryName}"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put a concrete example at here?

}
else {
toSQL(g.child)
}

build(
childSQL,
"LATERAL VIEW",
if (g.outer) "OUTER" else "",
g.generator.sql,
SQLBuilder.newSubqueryName,
"AS",
columnAliases
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This build call has many arguments. It will be good to put an example at here.

}

private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
output1.size == output2.size &&
output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
Expand Down Expand Up @@ -333,6 +360,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
// operators on top of a table relation, merge the sample information into `SQLTable` of
// that table relation, as we can only convert table sample to standard SQL string.
ResolveSQLTable,
// Re-order operators to let them generate legal SQL string, e.g. we should push down
// `Generate` through `Filter`. We should enrich this rule in the future.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain what you mean on "We should enrich this rule in the future."?

ReOrderOperators,
// Insert sub queries on top of operators that need to appear after FROM clause.
AddSubquery
)
Expand Down Expand Up @@ -368,6 +398,14 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
}
}

object ReOrderOperators extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case g @ Generate(_, _, _, _, _, f: Filter) =>
val newGenerate = g.copy(child = f.child)
f.copy(child = newGenerate)
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add an example.


object AddSubquery extends Rule[LogicalPlan] {
override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
// This branch handles aggregate functions within HAVING clauses. For example:
Expand Down Expand Up @@ -408,6 +446,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case _: LocalLimit => plan
case _: GlobalLimit => plan
case _: SQLTable => plan
case _: Generate => plan
case OneRowRelation => plan
case _ => addSubquery(plan)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive

import scala.util.control.NonFatal

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SQLTestUtils

Expand All @@ -45,12 +46,25 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
.select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.write
.saveAsTable("parquet_t2")

def createArray(id: Column): Column = {
when(id % 3 === 0, lit(null)).otherwise(array('id, 'id + 1))
}

sqlContext
.range(10)
.select(
createArray('id).as("arr"),
array(array('id), createArray('id)).as("arr2"), 'id)
.write
.saveAsTable("parquet_t3")
}

override protected def afterAll(): Unit = {
sql("DROP TABLE IF EXISTS parquet_t0")
sql("DROP TABLE IF EXISTS parquet_t1")
sql("DROP TABLE IF EXISTS parquet_t2")
sql("DROP TABLE IF EXISTS parquet_t3")
sql("DROP TABLE IF EXISTS t0")
}

Expand Down Expand Up @@ -568,4 +582,84 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|HAVING MAX(a.KEY) > 0
""".stripMargin)
}

test("generator in project list without FROM clause") {
checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3))")
checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) AS val")
}

test("generator in project list with non-referenced table") {
checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) FROM t0")
checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) AS val FROM t0")
}

test("generator in project list with referenced table") {
checkHiveQl("SELECT EXPLODE(arr) FROM parquet_t3")
checkHiveQl("SELECT EXPLODE(arr) AS val FROM parquet_t3")
}

test("generator in project list with non-UDTF expressions") {
checkHiveQl("SELECT EXPLODE(arr), id FROM parquet_t3")
checkHiveQl("SELECT EXPLODE(arr) AS val, id as a FROM parquet_t3")
}

test("generator in lateral view") {
checkHiveQl("SELECT val, id FROM parquet_t3 LATERAL VIEW EXPLODE(arr) exp AS val")
checkHiveQl("SELECT val, id FROM parquet_t3 LATERAL VIEW OUTER EXPLODE(arr) exp AS val")
}

test("generator in lateral view with ambiguous names") {
checkHiveQl(
"""
|SELECT exp.id, parquet_t3.id
|FROM parquet_t3
|LATERAL VIEW EXPLODE(arr) exp AS id
""".stripMargin)
checkHiveQl(
"""
|SELECT exp.id, parquet_t3.id
|FROM parquet_t3
|LATERAL VIEW OUTER EXPLODE(arr) exp AS id
""".stripMargin)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a test using json_tuple?


test("nested generator in lateral view") {
checkHiveQl(
"""
|SELECT val, id
|FROM parquet_t3
|LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array
|LATERAL VIEW EXPLODE(nested_array) exp1 AS val
""".stripMargin)

checkHiveQl(
"""
|SELECT val, id
|FROM parquet_t3
|LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array
|LATERAL VIEW OUTER EXPLODE(nested_array) exp1 AS val
""".stripMargin)
}

test("generate with other operators") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this test for the rule of ReOrderOperators?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this query does not generate deterministic results (generate with other operators).

checkHiveQl(
"""
|SELECT EXPLODE(arr) AS val, id
|FROM parquet_t3
|WHERE id > 2
|ORDER BY val
|LIMIT 5
""".stripMargin)

checkHiveQl(
"""
|SELECT val, id
|FROM parquet_t3
|LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array
|LATERAL VIEW EXPLODE(nested_array) exp1 AS val
|WHERE val > 2
|ORDER BY val
|LIMIT 5
""".stripMargin)
}
}