
Commit 66d9d0e

Andrew Or authored and yhuai committed
[SPARK-13139][SQL] Parse Hive DDL commands ourselves
## What changes were proposed in this pull request?

This patch is ported over from viirya's changes in #11048. Currently, for most DDLs we just pass the query text directly to Hive. Instead, we should parse these commands ourselves and, in the future (not part of this patch), use the `HiveCatalog` to process these DDLs. This is a pretext to merging `SQLContext` and `HiveContext`.

Note: as of this patch we still pass the query text to Hive. The difference is that we now parse the commands ourselves, so in the future we can just use our own catalog.

## How was this patch tested?

Jenkins, plus the new `DDLCommandSuite`, which comprises about 40% of the changes here.

Author: Andrew Or <andrew@databricks.com>

Closes #11573 from andrewor14/parser-plus-plus.
1 parent 42afd72 commit 66d9d0e
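
To make the intent concrete: a DDL like the one below used to be forwarded to Hive as raw text, whereas it is now matched by Spark's own parser first. Here is a minimal sketch in the spirit of the new `DDLCommandSuite`, assuming the `parsePlan` entry point `SparkQl` inherits from `CatalystQl`; since `SparkQl` is `private[sql]`, a real caller would live under `org.apache.spark.sql.execution`, as the test suite does. The object name is made up for illustration.

```scala
package org.apache.spark.sql.execution

// Illustrative only: the SQL text and the CreateDatabase node it maps to
// come from the SparkQl changes shown below.
object CreateDatabaseParseSketch {
  def main(args: Array[String]): Unit = {
    val parser = new SparkQl()

    // Matched by the new TOK_CREATEDATABASE case and turned into a
    // CreateDatabase command node on the Spark side.
    val plan = parser.parsePlan(
      """CREATE DATABASE IF NOT EXISTS database_name
        |COMMENT 'database_comment' LOCATION '/home/user/db'
        |WITH DBPROPERTIES ('a'='a', 'b'='b')
      """.stripMargin)

    println(plan)
  }
}
```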

File tree

8 files changed (+1318, -36 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala

Lines changed: 10 additions & 1 deletion

@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.parser
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.types._
@@ -29,6 +30,7 @@ import org.apache.spark.sql.types._
 object ParserUtils {
 
   object Token {
+    // Match on (text, children)
     def unapply(node: ASTNode): Some[(String, List[ASTNode])] = {
       CurrentOrigin.setPosition(node.line, node.positionInLine)
       node.pattern
@@ -160,7 +162,14 @@ object ParserUtils
   }
 
   /**
-   * Throw an exception because we cannot parse the given node.
+   * Throw an exception because we cannot parse the given node for some unexpected reason.
+   */
+  def parseFailed(msg: String, node: ASTNode): Nothing = {
+    throw new AnalysisException(s"$msg: '${node.source}")
+  }
+
+  /**
+   * Throw an exception because there are no rules to parse the node.
    */
   def noParseRule(msg: String, node: ASTNode): Nothing = {
     throw new NotImplementedError(
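
The two helpers split error reporting by cause: `noParseRule` signals that there is no rule at all for a token, while the new `parseFailed` signals that a known rule matched but the node had an unexpected shape. A hypothetical rule sketch contrasting the two (`nodeToRefresh` and its token layout are made up for illustration):

```scala
import org.apache.spark.sql.catalyst.parser.{ASTNode, ParserUtils}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical parsing rule, only to show where each error helper fires.
def nodeToRefresh(node: ASTNode): LogicalPlan = node match {
  case ParserUtils.Token("TOK_REFRESHTABLE", nameParts :: Nil) =>
    ??? // build the plan from nameParts (omitted in this sketch)
  case ParserUtils.Token("TOK_REFRESHTABLE", _) =>
    // Known rule, unexpected shape: report which command was malformed.
    ParserUtils.parseFailed("Invalid REFRESH TABLE command", node)
  case _ =>
    // No rule for this token at all.
    ParserUtils.noParseRule("REFRESH TABLE", node)
}
```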

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 9 additions & 0 deletions

@@ -784,6 +784,15 @@ class SQLContext private[sql](
     Dataset.newDataFrame(this, parseSql(sqlText))
   }
 
+  /**
+   * Executes a SQL query without parsing it, but instead passing it directly to an underlying
+   * system to process. This is currently only used for Hive DDLs and will be removed as soon
+   * as Spark can parse all supported Hive DDLs itself.
+   */
+  private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
+    throw new UnsupportedOperationException
+  }
+
   /**
    * Returns the specified table as a [[DataFrame]].
    *
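
The base implementation is deliberately a stub: `SQLContext` itself refuses to run native commands, and a Hive-aware context is expected to override the hook. A minimal sketch of what such an override could look like, assuming a helper that hands the raw text to Hive and returns its output lines; `hiveClientRun` and the class name are stand-ins, not real Spark API, and the actual HiveContext changes are in one of the files not shown on this page.

```scala
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SQLContext}

// Hypothetical Hive-aware context: ship the text to Hive unparsed and wrap
// each line of output in a Row.
abstract class HiveNativeSqlSketch(sc: SparkContext) extends SQLContext(sc) {

  /** Stand-in for a real Hive client call that returns output lines. */
  protected def hiveClientRun(sqlText: String): Seq[String]

  protected[sql] override def runNativeSql(sqlText: String): Seq[Row] = {
    hiveClientRun(sqlText).map(line => Row(line))
  }
}
```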

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala

Lines changed: 106 additions & 19 deletions

@@ -29,7 +29,26 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
   import ParserUtils._
 
   /** Check if a command should not be explained. */
-  protected def isNoExplainCommand(command: String): Boolean = "TOK_DESCTABLE" == command
+  protected def isNoExplainCommand(command: String): Boolean = {
+    "TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command
+  }
+
+  /**
+   * For each node, extract properties in the form of a list ['key1', 'key2', 'key3', 'value']
+   * into a pair (key1.key2.key3, value).
+   */
+  private def extractProps(
+      props: Seq[ASTNode],
+      expectedNodeText: String): Seq[(String, String)] = {
+    props.map {
+      case Token(x, keysAndValue) if x == expectedNodeText =>
+        val key = keysAndValue.init.map { x => unquoteString(x.text) }.mkString(".")
+        val value = unquoteString(keysAndValue.last.text)
+        (key, value)
+      case p =>
+        parseFailed(s"Expected property '$expectedNodeText' in command", p)
+    }
+  }
 
   protected override def nodeToPlan(node: ASTNode): LogicalPlan = {
     node match {
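
The flattening convention described in the `extractProps` doc comment is easiest to see on plain values: every token except the last is joined with dots to form the key, and the last token is the value. A standalone illustration using ordinary strings in place of `ASTNode` children:

```scala
// One TOK_TABLEPROPERTY / TOK_TABLEOPTION child list, already unquoted:
// ['key1', 'key2', 'key3', 'value']  ==>  ("key1.key2.key3", "value")
val keysAndValue = Seq("mapred", "reduce", "tasks", "32")
val key = keysAndValue.init.mkString(".")  // "mapred.reduce.tasks"
val value = keysAndValue.last              // "32"
println((key, value))                      // (mapred.reduce.tasks,32)
```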
@@ -64,10 +83,86 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
         val tableIdent = extractTableIdent(nameParts)
         RefreshTable(tableIdent)
 
+      // CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
+      // [LOCATION path] [WITH DBPROPERTIES (key1=val1, key2=val2, ...)];
+      case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: args) =>
+        val Seq(ifNotExists, dbLocation, databaseComment, dbprops) = getClauses(Seq(
+          "TOK_IFNOTEXISTS",
+          "TOK_DATABASELOCATION",
+          "TOK_DATABASECOMMENT",
+          "TOK_DATABASEPROPERTIES"), args)
+        val location = dbLocation.map {
+          case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc)
+          case _ => parseFailed("Invalid CREATE DATABASE command", node)
+        }
+        val comment = databaseComment.map {
+          case Token("TOK_DATABASECOMMENT", Token(com, Nil) :: Nil) => unquoteString(com)
+          case _ => parseFailed("Invalid CREATE DATABASE command", node)
+        }
+        val props = dbprops.toSeq.flatMap {
+          case Token("TOK_DATABASEPROPERTIES", Token("TOK_DBPROPLIST", propList) :: Nil) =>
+            extractProps(propList, "TOK_TABLEPROPERTY")
+          case _ => parseFailed("Invalid CREATE DATABASE command", node)
+        }.toMap
+        CreateDatabase(databaseName, ifNotExists.isDefined, location, comment, props)(node.source)
+
+      // CREATE [TEMPORARY] FUNCTION [db_name.]function_name AS class_name
+      // [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
+      case Token("TOK_CREATEFUNCTION", args) =>
+        // Example format:
+        //
+        //   TOK_CREATEFUNCTION
+        //   :- db_name
+        //   :- func_name
+        //   :- alias
+        //   +- TOK_RESOURCE_LIST
+        //      :- TOK_RESOURCE_URI
+        //      :  :- TOK_JAR
+        //      :  +- '/path/to/jar'
+        //      +- TOK_RESOURCE_URI
+        //         :- TOK_FILE
+        //         +- 'path/to/file'
+        val (funcNameArgs, otherArgs) = args.partition {
+          case Token("TOK_RESOURCE_LIST", _) => false
+          case Token("TOK_TEMPORARY", _) => false
+          case Token(_, Nil) => true
+          case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+        }
+        // If database name is specified, there are 3 tokens, otherwise 2.
+        val (funcName, alias) = funcNameArgs match {
+          case Token(dbName, Nil) :: Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
+            (unquoteString(dbName) + "." + unquoteString(fname), unquoteString(aname))
+          case Token(fname, Nil) :: Token(aname, Nil) :: Nil =>
+            (unquoteString(fname), unquoteString(aname))
+          case _ =>
+            parseFailed("Invalid CREATE FUNCTION command", node)
+        }
+        // Extract other keywords, if they exist
+        val Seq(rList, temp) = getClauses(Seq("TOK_RESOURCE_LIST", "TOK_TEMPORARY"), otherArgs)
+        val resourcesMap = rList.toSeq.flatMap {
+          case Token("TOK_RESOURCE_LIST", resources) =>
+            resources.map {
+              case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) =>
+                val resourceType = rType match {
+                  case Token("TOK_JAR", Nil) => "jar"
+                  case Token("TOK_FILE", Nil) => "file"
+                  case Token("TOK_ARCHIVE", Nil) => "archive"
+                  case Token(f, _) => parseFailed(s"Unexpected resource format '$f'", node)
+                }
+                (resourceType, unquoteString(rPath))
+              case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+            }
+          case _ => parseFailed("Invalid CREATE FUNCTION command", node)
+        }.toMap
+        CreateFunction(funcName, alias, resourcesMap, temp.isDefined)(node.source)
+
+      case Token("TOK_ALTERTABLE", alterTableArgs) =>
+        AlterTableCommandParser.parse(node)
+
       case Token("TOK_CREATETABLEUSING", createTableArgs) =>
         val Seq(
           temp,
-          allowExisting,
+          ifNotExists,
           Some(tabName),
           tableCols,
           Some(Token("TOK_TABLEPROVIDER", providerNameParts)),
@@ -79,30 +174,22 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
           "TOK_TABLEPROVIDER",
           "TOK_TABLEOPTIONS",
           "TOK_QUERY"), createTableArgs)
-
         val tableIdent: TableIdentifier = extractTableIdent(tabName)
-
         val columns = tableCols.map {
           case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField))
+          case _ => parseFailed("Invalid CREATE TABLE command", node)
         }
-
         val provider = providerNameParts.map {
           case Token(name, Nil) => name
+          case _ => parseFailed("Invalid CREATE TABLE command", node)
         }.mkString(".")
-
-        val options: Map[String, String] = tableOpts.toSeq.flatMap {
-          case Token("TOK_TABLEOPTIONS", options) =>
-            options.map {
-              case Token("TOK_TABLEOPTION", keysAndValue) =>
-                val key = keysAndValue.init.map(_.text).mkString(".")
-                val value = unquoteString(keysAndValue.last.text)
-                (key, value)
-            }
+        val options = tableOpts.toSeq.flatMap {
+          case Token("TOK_TABLEOPTIONS", opts) => extractProps(opts, "TOK_TABLEOPTION")
+          case _ => parseFailed("Invalid CREATE TABLE command", node)
         }.toMap
+        val asClause = tableAs.map(nodeToPlan)
 
-        val asClause = tableAs.map(nodeToPlan(_))
-
-        if (temp.isDefined && allowExisting.isDefined) {
+        if (temp.isDefined && ifNotExists.isDefined) {
           throw new AnalysisException(
             "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
         }
@@ -113,7 +200,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
             "a CREATE TABLE AS SELECT statement does not allow column definitions.")
         }
 
-        val mode = if (allowExisting.isDefined) {
+        val mode = if (ifNotExists.isDefined) {
           SaveMode.Ignore
         } else if (temp.isDefined) {
          SaveMode.Overwrite
@@ -136,7 +223,7 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
             provider,
             temp.isDefined,
             options,
-            allowExisting.isDefined,
+            ifNotExists.isDefined,
             managedIfNoPath = false)
         }
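
Taken together, the new cases mean that statements like the ones below are now recognized by `SparkQl` and turned into Spark-side command nodes (`CreateFunction`, and the ALTER TABLE commands delegated to `AlterTableCommandParser`) before any text reaches Hive. A minimal sketch, again assuming the inherited `parsePlan` entry point and a caller inside `org.apache.spark.sql.execution` because `SparkQl` is `private[sql]`; the object name and SQL strings are illustrative only.

```scala
package org.apache.spark.sql.execution

object DdlParseSketch {
  def main(args: Array[String]): Unit = {
    val parser = new SparkQl()

    // Matched by the new TOK_CREATEFUNCTION case; the USING clause becomes a
    // resource map along the lines of Map("jar" -> "/path/to/jar").
    val createFunc = parser.parsePlan(
      """CREATE TEMPORARY FUNCTION helloworld AS 'com.example.SimpleUDF'
        |USING JAR '/path/to/jar'
      """.stripMargin)

    // Matched by the new TOK_ALTERTABLE case and handed to AlterTableCommandParser.
    val alterTable = parser.parsePlan(
      "ALTER TABLE table_name SET TBLPROPERTIES ('comment' = 'new_comment')")

    println(createFunc)
    println(alterTable)
  }
}
```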
