Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
case Distinct(p: Project) =>
projectToSQL(p, isDistinct = true)

case p @ Project(_, g: Generate) =>
generateToSQL(p)

case g: Generate =>
generateToSQL(g)

case p: Project =>
projectToSQL(p, isDistinct = false)

Expand Down Expand Up @@ -308,6 +314,65 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
)
}

/**
* This function handles the SQL generation for generators.
* sample plan :
* +- Project [mycol2#192]
* +- Generate explode(myCol#191), true, false, Some(mytable2), [mycol2#192]
* +- Generate explode(array(array(1, 2, 3))), true, false, Some(mytable), [mycol#191]
* +- MetastoreRelation default, src, None
*/
private def generateToSQL(plan: Generate): String = {
val columnAliases = plan.generatorOutput.map(_.sql).mkString(",")
val generatorAlias = if (plan.qualifier.isEmpty) "" else plan.qualifier.get
val outerClause = if (plan.outer) "OUTER" else ""
build(
if (plan.child == OneRowRelation) {
s"(SELECT 1) ${SQLBuilder.newSubqueryName}"
}
else {
toSQL(plan.child)
},
"LATERAL VIEW",
outerClause,
plan.generator.sql,
quoteIdentifier(generatorAlias),
"AS",
columnAliases
)
}

private def generateToSQL(plan: Project): String = {
val generate = plan.child.asInstanceOf[Generate]
assert(generate.join == true || plan.projectList.size == 1)
// Generators that appear in projection list will be expressed as LATERAL VIEW.
// A qualifier is needed for a LATERAL VIEw.
val generatorAlias: String = generate.qualifier.getOrElse(SQLBuilder.newGeneratorName)

// Qualify the attributes in projection list.
val newProjList = plan.projectList.map {
case a if generate.generatorOutput.exists(_.semanticEquals(a)) =>
a.toAttribute.withQualifiers(Seq(generatorAlias))
case o => o
}

// If Generate is missing the qualifier (its in projection list) , add one here.
val planToProcess =
if (generate.qualifier.isEmpty) {
generate.copy(qualifier = Some(generatorAlias))
}
else {
generate
}

build(
"SELECT",
newProjList.map(a => a.sql).mkString(","),
"FROM",
toSQL(planToProcess)
)
}

object Canonicalizer extends RuleExecutor[LogicalPlan] {
override protected def batches: Seq[Batch] = Seq(
Batch("Collapse Project", FixedPoint(100),
Expand Down Expand Up @@ -360,6 +425,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi

case plan @ Project(_,
_: SubqueryAlias
| _: Generate
| _: Filter
| _: Join
| _: MetastoreRelation
Expand Down Expand Up @@ -411,4 +477,8 @@ object SQLBuilder {
private val nextSubqueryId = new AtomicLong(0)

private def newSubqueryName: String = s"gen_subquery_${nextSubqueryId.getAndIncrement()}"

private val nextGeneratorId = new AtomicLong(0)

private def newGeneratorName: String = s"gen_generator_${nextGeneratorId.getAndIncrement()}"
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,18 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
sql("DROP TABLE IF EXISTS parquet_t0")
sql("DROP TABLE IF EXISTS parquet_t1")
sql("DROP TABLE IF EXISTS parquet_t2")
sql("DROP TABLE IF EXISTS parquet_t3")
sql("DROP TABLE IF EXISTS t0")
sql("DROP TABLE IF EXISTS t1")

val tuples: Seq[(String, String)] =
("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
("3", """{"f1": "value13", "f4": "value44", "f3": "value33", "f2": 2, "f5": 5.01}""") ::
("4", null) ::
("5", """{"f1": "", "f5": null}""") ::
("6", "[invalid JSON string]") ::
Nil

sqlContext.range(10).write.saveAsTable("parquet_t0")
sql("CREATE TABLE t0 AS SELECT * FROM parquet_t0")
Expand All @@ -45,13 +56,18 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
.select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
.write
.saveAsTable("parquet_t2")

tuples.toDF("key", "jstring").write.saveAsTable("parquet_t3")
sql("CREATE TABLE t1 as select key, array(value) as value from parquet_t1 limit 20")
}

override protected def afterAll(): Unit = {
sql("DROP TABLE IF EXISTS parquet_t0")
sql("DROP TABLE IF EXISTS parquet_t1")
sql("DROP TABLE IF EXISTS parquet_t2")
sql("DROP TABLE IF EXISTS parquet_t3")
sql("DROP TABLE IF EXISTS t0")
sql("DROP TABLE IF EXISTS t1")
}

private def checkHiveQl(hiveQl: String): Unit = {
Expand Down Expand Up @@ -550,4 +566,107 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
|WINDOW w AS (PARTITION BY key % 5 ORDER BY key)
""".stripMargin)
}

test("SQL generator for explode in projection list") {
// Basic Explode
checkHiveQl("SELECT explode(array(1,2,3)) FROM src")

// Explode with Alias
checkHiveQl("SELECT explode(array(1,2,3)) as value FROM src")

// Explode without FROM
checkHiveQl("select explode(array(1,2,3)) AS gencol")

// non-generated columns in projection list
checkHiveQl("SELECT key as c1, explode(array(1,2,3)) as c2, value as c3 FROM t1")
}

test("SQL generation for json_tuple as generator") {
checkHiveQl("SELECT key, json_tuple(jstring, 'f1', 'f2', 'f3', 'f4', 'f5') FROM parquet_t3")
}

test("Lateral view with join") {
checkHiveQl(
"""
|SELECT gencol, explode(array(1,2,3)), x1.key
|FROM (t1 LATERAL VIEW OUTER explode(value) gentab as gencol), t1 as x1
""".stripMargin
)
}

test("SQL generation for lateral views") {
// Filter and OUTER clause
checkHiveQl(
"""
|SELECT key, value
|FROM t1
|LATERAL VIEW OUTER explode(value) gentab as gencol
|WHERE key = 1
""".stripMargin
)

// single lateral view
checkHiveQl(
"""
|SELECT *
|FROM t1
|LATERAL VIEW explode(array(1,2,3)) gentab AS gencol
|SORT BY key ASC, gencol ASC LIMIT 1
""".stripMargin
)

// multiple lateral views
checkHiveQl(
"""
|SELECT gentab2.*
|FROM t1
|LATERAL VIEW explode(array(array(1,2,3))) gentab1 AS gencol1
|LATERAL VIEW explode(gentab1.gencol1) gentab2 AS gencol2 LIMIT 3
""".stripMargin
)

// No generated column aliases
checkHiveQl(
"""SELECT gentab.*
|FROM t1
|LATERAL VIEW explode(map('key1', 100, 'key2', 200)) gentab limit 2
""".stripMargin
)
}

test("SQL generation for lateral views in subquery") {
// Subquries in FROM clause using Generate
checkHiveQl(
"""
|SELECT subq.gencol
|FROM
|(SELECT * from t1 LATERAL VIEW explode(value) gentab AS gencol) subq
""".stripMargin)

checkHiveQl(
"""
|SELECT subq.key
|FROM
|(SELECT key, value from t1 LATERAL VIEW explode(value) gentab AS gencol) subq
""".stripMargin
)
}

test("SQL generation for UDTF") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add more UDTF cases, as what @cloud-fan suggested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}")

// The function source code can be found at:
// https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF
sql(
"""
|CREATE TEMPORARY FUNCTION udtf_count2
|AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
""".stripMargin)

checkHiveQl("SELECT key, gencol FROM t1 LATERAL VIEW udtf_count2(value) gentab AS gencol")

checkHiveQl("SELECT udtf_count2(c1) FROM (SELECT 1 AS c1 FROM t1 LIMIT 3) g1")

sql("DROP TEMPORARY FUNCTION udtf_count2")
}
}