diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala index 38fa5cb585ee..55af9112691b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala @@ -23,6 +23,7 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers import scala.util.parsing.combinator.PackratParsers import scala.util.parsing.input.CharArrayReader.EofCh +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical._ private[sql] abstract class AbstractSparkSQLParser diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 52b284b757df..5886ce715540 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -21,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} @@ -41,6 +41,10 @@ trait Catalog { throw new UnsupportedOperationException } + def runNativeCommand(sql: String): Seq[Row] = { + throw new UnsupportedOperationException + } + /** * Returns tuples of (tableName, isTemporary) for all tables in the given database. * isTemporary is a Boolean value indicates if a table is a temporary or not. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala similarity index 89% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala index a0a56d728cde..263ff5f32b25 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/CatalystQl.scala @@ -14,11 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.sql.catalyst +package org.apache.spark.sql.catalyst.parser import java.sql.Date import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.Count @@ -33,14 +34,8 @@ import org.apache.spark.util.random.RandomSampler /** * This class translates SQL to Catalyst [[LogicalPlan]]s or [[Expression]]s. 
*/ -private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends ParserInterface { - object Token { - def unapply(node: ASTNode): Some[(String, List[ASTNode])] = { - CurrentOrigin.setPosition(node.line, node.positionInLine) - node.pattern - } - } - +private[sql] class CatalystQl(conf: ParserConf = SimpleParserConf()) extends ParserInterface { + import ParserSupport._ /** * The safeParse method allows a user to focus on the parsing/AST transformation logic. This * method will take care of possible errors during the parsing process. @@ -90,92 +85,13 @@ private[sql] class CatalystQl(val conf: ParserConf = SimpleParserConf()) extends } } - protected def getClauses( - clauseNames: Seq[String], - nodeList: Seq[ASTNode]): Seq[Option[ASTNode]] = { - var remainingNodes = nodeList - val clauses = clauseNames.map { clauseName => - val (matches, nonMatches) = remainingNodes.partition(_.text.toUpperCase == clauseName) - remainingNodes = nonMatches ++ (if (matches.nonEmpty) matches.tail else Nil) - matches.headOption - } - - if (remainingNodes.nonEmpty) { - sys.error( - s"""Unhandled clauses: ${remainingNodes.map(_.treeString).mkString("\n")}. - |You are likely trying to use an unsupported Hive feature."""".stripMargin) - } - clauses - } - - protected def getClause(clauseName: String, nodeList: Seq[ASTNode]): ASTNode = - getClauseOption(clauseName, nodeList).getOrElse(sys.error( - s"Expected clause $clauseName missing from ${nodeList.map(_.treeString).mkString("\n")}")) - - protected def getClauseOption(clauseName: String, nodeList: Seq[ASTNode]): Option[ASTNode] = { - nodeList.filter { case ast: ASTNode => ast.text == clauseName } match { - case Seq(oneMatch) => Some(oneMatch) - case Seq() => None - case _ => sys.error(s"Found multiple instances of clause $clauseName") - } - } - - protected def nodeToAttribute(node: ASTNode): Attribute = node match { + def nodeToAttribute(node: ASTNode): Attribute = node match { case Token("TOK_TABCOL", Token(colName, Nil) :: dataType :: Nil) => AttributeReference(colName, nodeToDataType(dataType), nullable = true)() case _ => noParseRule("Attribute", node) } - protected def nodeToDataType(node: ASTNode): DataType = node match { - case Token("TOK_DECIMAL", precision :: scale :: Nil) => - DecimalType(precision.text.toInt, scale.text.toInt) - case Token("TOK_DECIMAL", precision :: Nil) => - DecimalType(precision.text.toInt, 0) - case Token("TOK_DECIMAL", Nil) => DecimalType.USER_DEFAULT - case Token("TOK_BIGINT", Nil) => LongType - case Token("TOK_INT", Nil) => IntegerType - case Token("TOK_TINYINT", Nil) => ByteType - case Token("TOK_SMALLINT", Nil) => ShortType - case Token("TOK_BOOLEAN", Nil) => BooleanType - case Token("TOK_STRING", Nil) => StringType - case Token("TOK_VARCHAR", Token(_, Nil) :: Nil) => StringType - case Token("TOK_CHAR", Token(_, Nil) :: Nil) => StringType - case Token("TOK_FLOAT", Nil) => FloatType - case Token("TOK_DOUBLE", Nil) => DoubleType - case Token("TOK_DATE", Nil) => DateType - case Token("TOK_TIMESTAMP", Nil) => TimestampType - case Token("TOK_BINARY", Nil) => BinaryType - case Token("TOK_LIST", elementType :: Nil) => ArrayType(nodeToDataType(elementType)) - case Token("TOK_STRUCT", Token("TOK_TABCOLLIST", fields) :: Nil) => - StructType(fields.map(nodeToStructField)) - case Token("TOK_MAP", keyType :: valueType :: Nil) => - MapType(nodeToDataType(keyType), nodeToDataType(valueType)) - case _ => - noParseRule("DataType", node) - } - - protected def nodeToStructField(node: ASTNode): StructField = node match { - case 
Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: Nil) => - StructField(cleanIdentifier(fieldName), nodeToDataType(dataType), nullable = true) - case Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: comment :: Nil) => - val meta = new MetadataBuilder().putString("comment", unquoteString(comment.text)).build() - StructField(cleanIdentifier(fieldName), nodeToDataType(dataType), nullable = true, meta) - case _ => - noParseRule("StructField", node) - } - - protected def extractTableIdent(tableNameParts: ASTNode): TableIdentifier = { - tableNameParts.children.map { - case Token(part, Nil) => cleanIdentifier(part) - } match { - case Seq(tableOnly) => TableIdentifier(tableOnly) - case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName)) - case other => sys.error("Hive only supports tables names like 'tableName' " + - s"or 'databaseName.tableName', found '$other'") - } - } - /** * SELECT MAX(value) FROM src GROUP BY k1, k2, k3 GROUPING SETS((k1, k2), (k2)) * is equivalent to @@ -623,22 +539,6 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C noParseRule("Select", node) } - protected val escapedIdentifier = "`(.+)`".r - protected val doubleQuotedString = "\"([^\"]+)\"".r - protected val singleQuotedString = "'([^']+)'".r - - protected def unquoteString(str: String) = str match { - case singleQuotedString(s) => s - case doubleQuotedString(s) => s - case other => other - } - - /** Strips backticks from ident if present */ - protected def cleanIdentifier(ident: String): String = ident match { - case escapedIdentifier(i) => i - case plainIdent => plainIdent - } - /* Case insensitive matches */ val COUNT = "(?i)COUNT".r val SUM = "(?i)SUM".r @@ -1004,7 +904,4 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C } protected def nodeToGenerator(node: ASTNode): Generator = noParseRule("Generator", node) - - protected def noParseRule(msg: String, node: ASTNode): Nothing = throw new NotImplementedError( - s"[$msg]: No parse rules for ASTNode type: ${node.tokenType}, tree:\n${node.treeString}") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserInterface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala similarity index 93% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserInterface.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala index 24ec452c4d2e..7f35d650b957 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserInterface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala @@ -15,8 +15,9 @@ * limitations under the License. */ -package org.apache.spark.sql.catalyst +package org.apache.spark.sql.catalyst.parser +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserSupport.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserSupport.scala new file mode 100644 index 000000000000..5b69e858b72f --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserSupport.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.parser + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.types._ + +object ParserSupport { + protected val escapedIdentifier = "`(.+)`".r + protected val doubleQuotedString = "\"([^\"]+)\"".r + protected val singleQuotedString = "'([^']+)'".r + + private[sql] def unquoteString(str: String): String = str match { + case singleQuotedString(s) => s + case doubleQuotedString(s) => s + case other => other + } + + /** Strips backticks from ident if present */ + private[sql] def cleanIdentifier(ident: String): String = ident match { + case escapedIdentifier(i) => i + case plainIdent => plainIdent + } + + private[sql] def cleanAndUnquoteString(str: String): String = { + cleanIdentifier(unquoteString(str)) + } + + object Token { + def unapply(node: ASTNode): Some[(String, List[ASTNode])] = { + CurrentOrigin.setPosition(node.line, node.positionInLine) + node.pattern + } + } + + def getClauses( + clauseNames: Seq[String], + nodeList: Seq[ASTNode]): Seq[Option[ASTNode]] = { + var remainingNodes = nodeList + val clauses = clauseNames.map { clauseName => + val (matches, nonMatches) = remainingNodes.partition(_.text.toUpperCase == clauseName) + remainingNodes = nonMatches ++ (if (matches.nonEmpty) matches.tail else Nil) + matches.headOption + } + + if (remainingNodes.nonEmpty) { + sys.error( + s"""Unhandled clauses: ${remainingNodes.map(_.treeString).mkString("\n")}. 
+ |You are likely trying to use an unsupported Hive feature."""".stripMargin) + } + clauses + } + + def getClause(clauseName: String, nodeList: Seq[ASTNode]): ASTNode = + getClauseOption(clauseName, nodeList).getOrElse(sys.error( + s"Expected clause $clauseName missing from ${nodeList.map(_.treeString).mkString("\n")}")) + + def getClauseOption(clauseName: String, nodeList: Seq[ASTNode]): Option[ASTNode] = { + nodeList.filter { case ast: ASTNode => ast.text == clauseName } match { + case Seq(oneMatch) => Some(oneMatch) + case Seq() => None + case _ => sys.error(s"Found multiple instances of clause $clauseName") + } + } + + def extractTableIdent(tableNameParts: ASTNode): TableIdentifier = { + tableNameParts.children.map { + case Token(part, Nil) => cleanIdentifier(part) + } match { + case Seq(tableOnly) => TableIdentifier(tableOnly) + case Seq(databaseName, table) => TableIdentifier(table, Some(databaseName)) + case other => sys.error("Hive only supports tables names like 'tableName' " + + s"or 'databaseName.tableName', found '$other'") + } + } + + def nodeToDataType(node: ASTNode): DataType = node match { + case Token("TOK_DECIMAL", precision :: scale :: Nil) => + DecimalType(precision.text.toInt, scale.text.toInt) + case Token("TOK_DECIMAL", precision :: Nil) => + DecimalType(precision.text.toInt, 0) + case Token("TOK_DECIMAL", Nil) => DecimalType.USER_DEFAULT + case Token("TOK_BIGINT", Nil) => LongType + case Token("TOK_INT", Nil) => IntegerType + case Token("TOK_TINYINT", Nil) => ByteType + case Token("TOK_SMALLINT", Nil) => ShortType + case Token("TOK_BOOLEAN", Nil) => BooleanType + case Token("TOK_STRING", Nil) => StringType + case Token("TOK_VARCHAR", Token(_, Nil) :: Nil) => StringType + case Token("TOK_CHAR", Token(_, Nil) :: Nil) => StringType + case Token("TOK_FLOAT", Nil) => FloatType + case Token("TOK_DOUBLE", Nil) => DoubleType + case Token("TOK_DATE", Nil) => DateType + case Token("TOK_TIMESTAMP", Nil) => TimestampType + case Token("TOK_BINARY", Nil) => BinaryType + case Token("TOK_LIST", elementType :: Nil) => ArrayType(nodeToDataType(elementType)) + case Token("TOK_STRUCT", Token("TOK_TABCOLLIST", fields) :: Nil) => + StructType(fields.map(nodeToStructField)) + case Token("TOK_MAP", keyType :: valueType :: Nil) => + MapType(nodeToDataType(keyType), nodeToDataType(valueType)) + case _ => + noParseRule("DataType", node) + } + + def nodeToStructField(node: ASTNode): StructField = node match { + case Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: Nil) => + StructField(cleanIdentifier(fieldName), nodeToDataType(dataType), nullable = true) + case Token("TOK_TABCOL", Token(fieldName, Nil) :: dataType :: comment :: Nil) => + val meta = new MetadataBuilder().putString("comment", unquoteString(comment.text)).build() + StructField(cleanIdentifier(fieldName), nodeToDataType(dataType), nullable = true, meta) + case _ => + noParseRule("StructField", node) + } + + def noParseRule(msg: String, node: ASTNode): Nothing = throw new NotImplementedError( + s"[$msg]: No parse rules for ASTNode type: ${node.tokenType}, tree:\n${node.treeString}") +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala index 53a8d6e53e38..4d500273b6c9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst 
import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.parser.CatalystQl import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.unsafe.types.CalendarInterval diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 5f5b7f4c19cf..7d6fd198e2b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -37,7 +37,8 @@ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression -import org.apache.spark.sql.execution.{ExplainCommand, FileRelation, LogicalRDD, Queryable, QueryExecution, SQLExecution} +import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, Queryable, QueryExecution, SQLExecution} +import org.apache.spark.sql.execution.commands.ExplainCommand import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index cb4a6397b261..645d851fcf9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -35,9 +35,11 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.commands._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab} import org.apache.spark.sql.internal.{SessionState, SQLConf} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala index 4174e27e9c8b..089e1baab950 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkQl.scala @@ -16,18 +16,38 @@ */ package org.apache.spark.sql.execution +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.sql.{AnalysisException, SaveMode} -import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.parser.{ASTNode, ParserConf, SimpleParserConf} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending} +import org.apache.spark.sql.catalyst.parser.{ASTNode, CatalystQl, ParserConf, ParserSupport, SimpleParserConf} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} import org.apache.spark.sql.catalyst.plans.logical +import 
org.apache.spark.sql.execution.commands._
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.types.StructType
 
 private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends CatalystQl(conf) {
+  import ParserSupport._
+
   /** Check if a command should not be explained. */
-  protected def isNoExplainCommand(command: String): Boolean = "TOK_DESCTABLE" == command
+  protected def isNoExplainCommand(command: String): Boolean =
+    "TOK_DESCTABLE" == command || "TOK_ALTERTABLE" == command
+
+  protected def extractProps(
+      node: ASTNode,
+      firstLevelNodeStr: String,
+      secondLevelNodeStr: String): Seq[(String, String)] = node match {
+    case Token(`firstLevelNodeStr`, options) =>
+      options.map {
+        case Token(`secondLevelNodeStr`, keysAndValue) =>
+          val key = keysAndValue.init.map(x => unquoteString(x.text)).mkString(".")
+          val value = unquoteString(keysAndValue.last.text)
+          (key, value)
+      }
+  }
 
   protected override def nodeToPlan(node: ASTNode): LogicalPlan = {
     node match {
@@ -62,6 +82,56 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
         val tableIdent = extractTableIdent(nameParts)
         RefreshTable(tableIdent)
 
+      case Token("TOK_CREATEDATABASE", Token(databaseName, Nil) :: createDatabaseArgs) =>
+        val Seq(
+          allowExisting,
+          dbLocation,
+          databaseComment,
+          dbprops) = getClauses(Seq(
+          "TOK_IFNOTEXISTS",
+          "TOK_DATABASELOCATION",
+          "TOK_DATABASECOMMENT",
+          "TOK_DATABASEPROPERTIES"), createDatabaseArgs)
+
+        val location = dbLocation.map {
+          case Token("TOK_DATABASELOCATION", Token(loc, Nil) :: Nil) => unquoteString(loc)
+        }
+        val comment = databaseComment.map {
+          case Token("TOK_DATABASECOMMENT", Token(comment, Nil) :: Nil) => unquoteString(comment)
+        }
+        val props: Map[String, String] = dbprops.toSeq.flatMap {
+          case Token("TOK_DATABASEPROPERTIES", propList) =>
+            propList.flatMap(extractProps(_, "TOK_DBPROPLIST", "TOK_TABLEPROPERTY"))
+        }.toMap
+
+        CreateDataBase(databaseName, allowExisting.isDefined, location, comment, props)(node.source)
+
+      case Token("TOK_CREATEFUNCTION", func :: as :: createFuncArgs) =>
+        val funcName = func.map(x => unquoteString(x.text)).mkString(".")
+        val asName = unquoteString(as.text)
+        val Seq(
+          rList,
+          temp) = getClauses(Seq(
+          "TOK_RESOURCE_LIST",
+          "TOK_TEMPORARY"), createFuncArgs)
+
+        val resourcesMap: Map[String, String] = rList.toSeq.flatMap {
+          case Token("TOK_RESOURCE_LIST", resources) =>
+            resources.map {
+              case Token("TOK_RESOURCE_URI", rType :: Token(rPath, Nil) :: Nil) =>
+                val resourceType = rType match {
+                  case Token("TOK_JAR", Nil) => "jar"
+                  case Token("TOK_FILE", Nil) => "file"
+                  case Token("TOK_ARCHIVE", Nil) => "archive"
+                }
+                (resourceType, unquoteString(rPath))
+            }
+        }.toMap
+        CreateFunction(funcName, asName, resourcesMap, temp.isDefined)(node.source)
+
+      case Token("TOK_ALTERTABLE", alterTableArgs) =>
+        AlterTableCommandParser.parse(node)
+
       case Token("TOK_CREATETABLEUSING", createTableArgs) =>
         val Seq(
           temp,
@@ -88,16 +158,8 @@ private[sql] class SparkQl(conf: ParserConf = SimpleParserConf()) extends Cataly
           case Token(name, Nil) => name
         }.mkString(".")
 
-        val options: Map[String, String] = tableOpts.toSeq.flatMap {
-          case Token("TOK_TABLEOPTIONS", options) =>
-            options.map {
-              case Token("TOK_TABLEOPTION", keysAndValue) =>
-                val key = keysAndValue.init.map(_.text).mkString(".")
-                val value = unquoteString(keysAndValue.last.text)
-                (key, value)
-            }
-        }.toMap
-
+        val options: Map[String, String] =
+          tableOpts.toSeq.flatMap(extractProps(_, "TOK_TABLEOPTIONS", "TOK_TABLEOPTION")).toMap
+
         val
asClause = tableAs.map(nodeToPlan(_)) if (temp.isDefined && allowExisting.isDefined) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index dd8c96d5fa1d..a6dd24b3811b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution -import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand} import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation} +import org.apache.spark.sql.execution.commands.{DescribeCommand => RunnableDescribeCommand, _} import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.internal.SQLConf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 557464574182..132634e0eac8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -73,349 +73,3 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan override def argString: String = cmd.toString } - - -case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging { - - private def keyValueOutput: Seq[Attribute] = { - val schema = StructType( - StructField("key", StringType, false) :: - StructField("value", StringType, false) :: Nil) - schema.toAttributes - } - - private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match { - // Configures the deprecated "mapred.reduce.tasks" property. - case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + - s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") - if (value.toInt < 1) { - val msg = - s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " + - "determining the number of reducers is not supported." - throw new IllegalArgumentException(msg) - } else { - sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value) - Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value)) - } - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " + - s"External sort will continue to be used.") - Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.USE_SQL_AGGREGATE2, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} is deprecated and " + - s"will be ignored. 
${SQLConf.Deprecated.USE_SQL_AGGREGATE2} will " + - s"continue to be true.") - Seq(Row(SQLConf.Deprecated.USE_SQL_AGGREGATE2, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " + - s"will be ignored. Tungsten will continue to be used.") - Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " + - s"will be ignored. Codegen will continue to be used.") - Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " + - s"will be ignored. Unsafe mode will continue to be used.") - Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true")) - } - (keyValueOutput, runFunc) - - case Some((SQLConf.Deprecated.SORTMERGE_JOIN, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " + - s"will be ignored. Sort merge join will continue to be used.") - Seq(Row(SQLConf.Deprecated.SORTMERGE_JOIN, "true")) - } - (keyValueOutput, runFunc) - - // Configures a single property. - case Some((key, Some(value))) => - val runFunc = (sqlContext: SQLContext) => { - sqlContext.setConf(key, value) - Seq(Row(key, value)) - } - (keyValueOutput, runFunc) - - // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.) - // Queries all key-value pairs that are set in the SQLConf of the sqlContext. - case None => - val runFunc = (sqlContext: SQLContext) => { - sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq - } - (keyValueOutput, runFunc) - - // Queries all properties along with their default values and docs that are defined in the - // SQLConf of the sqlContext. - case Some(("-v", None)) => - val runFunc = (sqlContext: SQLContext) => { - sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) => - Row(key, defaultValue, doc) - } - } - val schema = StructType( - StructField("key", StringType, false) :: - StructField("default", StringType, false) :: - StructField("meaning", StringType, false) :: Nil) - (schema.toAttributes, runFunc) - - // Queries the deprecated "mapred.reduce.tasks" property. - case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) => - val runFunc = (sqlContext: SQLContext) => { - logWarning( - s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + - s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") - Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString)) - } - (keyValueOutput, runFunc) - - // Queries a single property. - case Some((key, None)) => - val runFunc = (sqlContext: SQLContext) => { - val value = - try sqlContext.getConf(key) catch { - case _: NoSuchElementException => "" - } - Seq(Row(key, value)) - } - (keyValueOutput, runFunc) - } - - override val output: Seq[Attribute] = _output - - override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext) - -} - -/** - * An explain command for users to see how a command will be executed. 
- * - * Note that this command takes in a logical plan, runs the optimizer on the logical plan - * (but do NOT actually execute it). - */ -case class ExplainCommand( - logicalPlan: LogicalPlan, - override val output: Seq[Attribute] = - Seq(AttributeReference("plan", StringType, nullable = true)()), - extended: Boolean = false) - extends RunnableCommand { - - // Run through the optimizer to generate the physical plan. - override def run(sqlContext: SQLContext): Seq[Row] = try { - // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. - val queryExecution = sqlContext.executePlan(logicalPlan) - val outputString = if (extended) queryExecution.toString else queryExecution.simpleString - - outputString.split("\n").map(Row(_)) - } catch { case cause: TreeNodeException[_] => - ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) - } -} - - -case class CacheTableCommand( - tableName: String, - plan: Option[LogicalPlan], - isLazy: Boolean) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - plan.foreach { logicalPlan => - sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName) - } - sqlContext.cacheTable(tableName) - - if (!isLazy) { - // Performs eager caching - sqlContext.table(tableName).count() - } - - Seq.empty[Row] - } - - override def output: Seq[Attribute] = Seq.empty -} - - -case class UncacheTableCommand(tableName: String) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.table(tableName).unpersist(blocking = false) - Seq.empty[Row] - } - - override def output: Seq[Attribute] = Seq.empty -} - -/** - * Clear all cached data from the in-memory cache. - */ -case object ClearCacheCommand extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.clearCache() - Seq.empty[Row] - } - - override def output: Seq[Attribute] = Seq.empty -} - - -case class DescribeCommand( - child: SparkPlan, - override val output: Seq[Attribute], - isExtended: Boolean) - extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - child.schema.fields.map { field => - val cmtKey = "comment" - val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else "" - Row(field.name, field.dataType.simpleString, comment) - } - } -} - -/** - * A command for users to get tables in the given database. - * If a databaseName is not given, the current database will be used. - * The syntax of using this command in SQL is: - * {{{ - * SHOW TABLES [IN databaseName] - * }}} - */ -case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand { - - // The result of SHOW TABLES has two columns, tableName and isTemporary. - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("tableName", StringType, false) :: - StructField("isTemporary", BooleanType, false) :: Nil) - - schema.toAttributes - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - // Since we need to return a Seq of rows, we will call getTables directly - // instead of calling tables in sqlContext. - val rows = sqlContext.catalog.getTables(databaseName).map { - case (tableName, isTemporary) => Row(tableName, isTemporary) - } - - rows - } -} - -/** - * A command for users to list all of the registered functions. 
- * The syntax of using this command in SQL is: - * {{{ - * SHOW FUNCTIONS - * }}} - * TODO currently we are simply ignore the db - */ -case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand { - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("function", StringType, nullable = false) :: Nil) - - schema.toAttributes - } - - override def run(sqlContext: SQLContext): Seq[Row] = pattern match { - case Some(p) => - try { - val regex = java.util.regex.Pattern.compile(p) - sqlContext.functionRegistry.listFunction().filter(regex.matcher(_).matches()).map(Row(_)) - } catch { - // probably will failed in the regex that user provided, then returns empty row. - case _: Throwable => Seq.empty[Row] - } - case None => - sqlContext.functionRegistry.listFunction().map(Row(_)) - } -} - -/** - * A command for users to get the usage of a registered function. - * The syntax of using this command in SQL is - * {{{ - * DESCRIBE FUNCTION [EXTENDED] upper; - * }}} - */ -case class DescribeFunction( - functionName: String, - isExtended: Boolean) extends RunnableCommand { - - override val output: Seq[Attribute] = { - val schema = StructType( - StructField("function_desc", StringType, nullable = false) :: Nil) - - schema.toAttributes - } - - private def replaceFunctionName(usage: String, functionName: String): String = { - if (usage == null) { - "To be added." - } else { - usage.replaceAll("_FUNC_", functionName) - } - } - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.functionRegistry.lookupFunction(functionName) match { - case Some(info) => - val result = - Row(s"Function: ${info.getName}") :: - Row(s"Class: ${info.getClassName}") :: - Row(s"Usage: ${replaceFunctionName(info.getUsage(), info.getName)}") :: Nil - - if (isExtended) { - result :+ Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, info.getName)}") - } else { - result - } - - case None => Seq(Row(s"Function: $functionName not found.")) - } - } -} - -case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { - - override def run(sqlContext: SQLContext): Seq[Row] = { - sqlContext.catalog.setCurrentDatabase(databaseName) - Seq.empty[Row] - } - - override val output: Seq[Attribute] = Seq.empty -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands/commands.scala new file mode 100644 index 000000000000..d34808c428c0 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands/commands.scala @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.commands + +import java.util.NoSuchElementException + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.datasources.BucketSpec +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging { + + private def keyValueOutput: Seq[Attribute] = { + val schema = StructType( + StructField("key", StringType, false) :: + StructField("value", StringType, false) :: Nil) + schema.toAttributes + } + + private val deprecatedProperties: Map[String, SQLContext => Seq[Row]] = + Seq((SQLConf.Deprecated.EXTERNAL_SORT, (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and will be ignored. " + + s"External sort will continue to be used.") + Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true")) + }), + (SQLConf.Deprecated.USE_SQL_AGGREGATE2, (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} is deprecated and " + + s"will be ignored. ${SQLConf.Deprecated.USE_SQL_AGGREGATE2} will " + + s"continue to be true.") + Seq(Row(SQLConf.Deprecated.USE_SQL_AGGREGATE2, "true")) + }), + (SQLConf.Deprecated.TUNGSTEN_ENABLED, (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " + + s"will be ignored. Tungsten will continue to be used.") + Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true")) + }), + (SQLConf.Deprecated.CODEGEN_ENABLED, (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " + + s"will be ignored. Codegen will continue to be used.") + Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true")) + }), + (SQLConf.Deprecated.UNSAFE_ENABLED, (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " + + s"will be ignored. Unsafe mode will continue to be used.") + Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true")) + }), + (SQLConf.Deprecated.SORTMERGE_JOIN, (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.SORTMERGE_JOIN} is deprecated and " + + s"will be ignored. Sort merge join will continue to be used.") + Seq(Row(SQLConf.Deprecated.SORTMERGE_JOIN, "true")) + }) + ).toMap + + private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match { + case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") + if (value.toInt < 1) { + val msg = + s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " + + "determining the number of reducers is not supported." 
+ throw new IllegalArgumentException(msg) + } else { + sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value) + Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value)) + } + } + (keyValueOutput, runFunc) + + case Some((key, Some(value))) if deprecatedProperties.contains(key) => + (keyValueOutput, deprecatedProperties(key)) + + // Configures a single property. + case Some((key, Some(value))) => + val runFunc = (sqlContext: SQLContext) => { + sqlContext.setConf(key, value) + Seq(Row(key, value)) + } + (keyValueOutput, runFunc) + + // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.) + // Queries all key-value pairs that are set in the SQLConf of the sqlContext. + case None => + val runFunc = (sqlContext: SQLContext) => { + sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq + } + (keyValueOutput, runFunc) + + // Queries all properties along with their default values and docs that are defined in the + // SQLConf of the sqlContext. + case Some(("-v", None)) => + val runFunc = (sqlContext: SQLContext) => { + sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) => + Row(key, defaultValue, doc) + } + } + val schema = StructType( + StructField("key", StringType, false) :: + StructField("default", StringType, false) :: + StructField("meaning", StringType, false) :: Nil) + (schema.toAttributes, runFunc) + + // Queries the deprecated "mapred.reduce.tasks" property. + case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) => + val runFunc = (sqlContext: SQLContext) => { + logWarning( + s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + + s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.") + Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString)) + } + (keyValueOutput, runFunc) + + // Queries a single property. + case Some((key, None)) => + val runFunc = (sqlContext: SQLContext) => { + val value = + try sqlContext.getConf(key) catch { + case _: NoSuchElementException => "" + } + Seq(Row(key, value)) + } + (keyValueOutput, runFunc) + } + + override val output: Seq[Attribute] = _output + + override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext) + +} + +/** + * An explain command for users to see how a command will be executed. + * + * Note that this command takes in a logical plan, runs the optimizer on the logical plan + * (but do NOT actually execute it). + */ +case class ExplainCommand( + logicalPlan: LogicalPlan, + override val output: Seq[Attribute] = + Seq(AttributeReference("plan", StringType, nullable = true)()), + extended: Boolean = false) + extends RunnableCommand { + + // Run through the optimizer to generate the physical plan. + override def run(sqlContext: SQLContext): Seq[Row] = try { + // TODO in Hive, the "extended" ExplainCommand prints the AST as well, and detailed properties. 
+ val queryExecution = sqlContext.executePlan(logicalPlan) + val outputString = if (extended) queryExecution.toString else queryExecution.simpleString + + outputString.split("\n").map(Row(_)) + } catch { case cause: TreeNodeException[_] => + ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) + } +} + + +case class CacheTableCommand( + tableName: String, + plan: Option[LogicalPlan], + isLazy: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + plan.foreach { logicalPlan => + sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName) + } + sqlContext.cacheTable(tableName) + + if (!isLazy) { + // Performs eager caching + sqlContext.table(tableName).count() + } + + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + + +case class UncacheTableCommand(tableName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.table(tableName).unpersist(blocking = false) + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + +/** + * Clear all cached data from the in-memory cache. + */ +case object ClearCacheCommand extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.clearCache() + Seq.empty[Row] + } + + override def output: Seq[Attribute] = Seq.empty +} + + +case class DescribeCommand( + child: SparkPlan, + override val output: Seq[Attribute], + isExtended: Boolean) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + child.schema.fields.map { field => + val cmtKey = "comment" + val comment = if (field.metadata.contains(cmtKey)) field.metadata.getString(cmtKey) else "" + Row(field.name, field.dataType.simpleString, comment) + } + } +} + +/** + * A command for users to get tables in the given database. + * If a databaseName is not given, the current database will be used. + * The syntax of using this command in SQL is: + * {{{ + * SHOW TABLES [IN databaseName] + * }}} + */ +case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand { + + // The result of SHOW TABLES has two columns, tableName and isTemporary. + override val output: Seq[Attribute] = { + val schema = StructType( + StructField("tableName", StringType, false) :: + StructField("isTemporary", BooleanType, false) :: Nil) + + schema.toAttributes + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + // Since we need to return a Seq of rows, we will call getTables directly + // instead of calling tables in sqlContext. + val rows = sqlContext.catalog.getTables(databaseName).map { + case (tableName, isTemporary) => Row(tableName, isTemporary) + } + + rows + } +} + +/** + * A command for users to list all of the registered functions. 
+ * The syntax of using this command in SQL is: + * {{{ + * SHOW FUNCTIONS + * }}} + * TODO currently we are simply ignore the db + */ +case class ShowFunctions(db: Option[String], pattern: Option[String]) extends RunnableCommand { + override val output: Seq[Attribute] = { + val schema = StructType( + StructField("function", StringType, nullable = false) :: Nil) + + schema.toAttributes + } + + override def run(sqlContext: SQLContext): Seq[Row] = pattern match { + case Some(p) => + try { + val regex = java.util.regex.Pattern.compile(p) + sqlContext.functionRegistry.listFunction().filter(regex.matcher(_).matches()).map(Row(_)) + } catch { + // probably will failed in the regex that user provided, then returns empty row. + case _: Throwable => Seq.empty[Row] + } + case None => + sqlContext.functionRegistry.listFunction().map(Row(_)) + } +} + +/** + * A command for users to get the usage of a registered function. + * The syntax of using this command in SQL is + * {{{ + * DESCRIBE FUNCTION [EXTENDED] upper; + * }}} + */ +case class DescribeFunction( + functionName: String, + isExtended: Boolean) extends RunnableCommand { + + override val output: Seq[Attribute] = { + val schema = StructType( + StructField("function_desc", StringType, nullable = false) :: Nil) + + schema.toAttributes + } + + private def replaceFunctionName(usage: String, functionName: String): String = { + if (usage == null) { + "To be added." + } else { + usage.replaceAll("_FUNC_", functionName) + } + } + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.functionRegistry.lookupFunction(functionName) match { + case Some(info) => + val result = + Row(s"Function: ${info.getName}") :: + Row(s"Class: ${info.getClassName}") :: + Row(s"Usage: ${replaceFunctionName(info.getUsage(), info.getName)}") :: Nil + + if (isExtended) { + result :+ Row(s"Extended Usage:\n${replaceFunctionName(info.getExtended, info.getName)}") + } else { + result + } + + case None => Seq(Row(s"Function: $functionName not found.")) + } + } +} + +case class SetDatabaseCommand(databaseName: String) extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.catalog.setCurrentDatabase(databaseName) + Seq.empty[Row] + } + + override val output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands/ddl.scala new file mode 100644 index 000000000000..382036d4b8aa --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands/ddl.scala @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.commands + +import java.util.NoSuchElementException + +import org.apache.spark.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.errors.TreeNodeException +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.datasources.BucketSpec +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +abstract class NativeDDLCommands(val sql: String) extends RunnableCommand { + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.catalog.runNativeCommand(sql) + } + + override val output: Seq[Attribute] = + Seq(AttributeReference("result", StringType, nullable = false)()) +} + +case class CreateDataBase( + databaseName: String, + allowExisting: Boolean, + path: Option[String], + comment: Option[String], + props: Map[String, String])(sql: String) extends NativeDDLCommands(sql) with Logging + +case class CreateFunction( + functionName: String, + asName: String, + resourcesMap: Map[String, String], + isTemp: Boolean)(sql: String) extends NativeDDLCommands(sql) with Logging + +case class AlterTableRename( + tableName: TableIdentifier, + renameTableName: TableIdentifier)(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableSetProperties( + tableName: TableIdentifier, + setProperties: Map[String, Option[String]])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableDropProperties( + tableName: TableIdentifier, + dropProperties: Map[String, Option[String]], + allowExisting: Boolean)(sql: String) extends NativeDDLCommands(sql) with Logging + +case class AlterTableSerDeProperties( + tableName: TableIdentifier, + serdeClassName: Option[String], + serdeProperties: Option[Map[String, Option[String]]], + partition: Option[Map[String, Option[String]]])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableStoreProperties( + tableName: TableIdentifier, + buckets: Option[BucketSpec], + noClustered: Boolean, + noSorted: Boolean)(sql: String) extends NativeDDLCommands(sql) with Logging + +case class AlterTableSkewed( + tableName: TableIdentifier, + skewedCols: Seq[String], + skewedValues: Seq[Seq[String]], + storedAsDirs: Boolean, + notSkewed: Boolean, + notStoredAsDirs: Boolean)(sql: String) extends NativeDDLCommands(sql) with Logging + +case class AlterTableSkewedLocation( + tableName: TableIdentifier, + skewedMap: Map[Seq[String], String])(sql: String) extends NativeDDLCommands(sql) with Logging + +case class AlterTableAddPartition( + tableName: TableIdentifier, + partitionsAndLocs: Seq[(Map[String, Option[String]], Option[String])], + allowExisting: Boolean)(sql: String) extends NativeDDLCommands(sql) with Logging + +case class AlterTableRenamePartition( + tableName: TableIdentifier, + oldPartition: Map[String, Option[String]], + newPartition: Map[String, Option[String]])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableExchangePartition( + tableName: TableIdentifier, + fromTableName: TableIdentifier, + partition: Map[String, Option[String]])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case 
class AlterTableDropPartition( + tableName: TableIdentifier, + partitions: Seq[Seq[(String, String, String)]], + allowExisting: Boolean, + purge: Boolean, + replication: Option[(String, Boolean)])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableArchivePartition( + tableName: TableIdentifier, + partition: Map[String, Option[String]])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableUnarchivePartition( + tableName: TableIdentifier, + partition: Map[String, Option[String]])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableSetFileFormat( + tableName: TableIdentifier, + partition: Option[Map[String, Option[String]]], + fileFormat: Option[Seq[String]], + genericFormat: Option[String])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableSetLocation( + tableName: TableIdentifier, + partition: Option[Map[String, Option[String]]], + location: String)(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableTouch( + tableName: TableIdentifier, + partition: Option[Map[String, Option[String]]])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableCompact( + tableName: TableIdentifier, + partition: Option[Map[String, Option[String]]], + compactType: String)(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableMerge( + tableName: TableIdentifier, + partition: Option[Map[String, Option[String]]])(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableChangeCol( + tableName: TableIdentifier, + partition: Option[Map[String, Option[String]]], + oldColName: String, + newColName: String, + dataType: DataType, + comment: Option[String], + afterPos: Boolean, + afterPosCol: Option[String], + restrict: Boolean, + cascade: Boolean)(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableAddCol( + tableName: TableIdentifier, + partition: Option[Map[String, Option[String]]], + columns: StructType, + restrict: Boolean, + cascade: Boolean)(sql: String) extends NativeDDLCommands(sql) + with Logging + +case class AlterTableReplaceCol( + tableName: TableIdentifier, + partition: Option[Map[String, Option[String]]], + columns: StructType, + restrict: Boolean, + cascade: Boolean)(sql: String) extends NativeDDLCommands(sql) + with Logging + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands/parsers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands/parsers.scala new file mode 100644 index 000000000000..3aab2e164003 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands/parsers.scala @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.commands + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.{AnalysisException, SaveMode} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending} +import org.apache.spark.sql.catalyst.parser.{ASTNode, CatalystQl, ParserConf, ParserSupport, SimpleParserConf} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.execution.commands._ +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.types.StructType + +object AlterTableCommandParser { + import ParserSupport._ + + def parsePartitionSpec(node: ASTNode): Option[Map[String, Option[String]]] = { + node match { + case Token("TOK_PARTSPEC", partitions) => + val spec = partitions.map { + case Token("TOK_PARTVAL", ident :: constant :: Nil) => + (cleanAndUnquoteString(ident.text), Some(cleanAndUnquoteString(constant.text))) + case Token("TOK_PARTVAL", ident :: Nil) => + (cleanAndUnquoteString(ident.text), None) + }.toMap + Some(spec) + case _ => None + } + } + + def extractTableProps(node: ASTNode): Map[String, Option[String]] = node match { + case Token("TOK_TABLEPROPERTIES", propsList) => + propsList.flatMap { + case Token("TOK_TABLEPROPLIST", props) => + props.map { + case Token("TOK_TABLEPROPERTY", key :: Token("TOK_NULL", Nil) :: Nil) => + val k = cleanAndUnquoteString(key.text) + (k, None) + case Token("TOK_TABLEPROPERTY", key :: value :: Nil) => + val k = cleanAndUnquoteString(key.text) + val v = cleanAndUnquoteString(value.text) + (k, Some(v)) + } + }.toMap + } + + def parse(v1: ASTNode): LogicalPlan = v1.children match { + case (tabName @ Token("TOK_TABNAME", _)) :: restNodes => + val tableIdent: TableIdentifier = extractTableIdent(tabName) + val partitionSpec = getClauseOption("TOK_PARTSPEC", v1.children) + val partition = partitionSpec.flatMap(parsePartitionSpec) + matchAlterTableCommands(v1, restNodes, tableIdent, partition) + case _ => + throw new NotImplementedError(v1.text) + } + + def matchAlterTableCommands( + node: ASTNode, + nodes: Seq[ASTNode], + tableIdent: TableIdentifier, + partition: Option[Map[String, Option[String]]]): LogicalPlan = nodes match { + case rename @ Token("TOK_ALTERTABLE_RENAME", renameArgs) :: _ => + val renamedTable = getClause("TOK_TABNAME", renameArgs) + val renamedTableIdent: TableIdentifier = extractTableIdent(renamedTable) + AlterTableRename(tableIdent, renamedTableIdent)(node.source) + + case Token("TOK_ALTERTABLE_PROPERTIES", args) :: _ => + val setTableProperties = extractTableProps(args.head) + AlterTableSetProperties( + tableIdent, + setTableProperties)(node.source) + + case Token("TOK_ALTERTABLE_DROPPROPERTIES", args) :: _ => + val dropTableProperties = extractTableProps(args.head) + val allowExisting = getClauseOption("TOK_IFEXISTS", args) + AlterTableDropProperties( + tableIdent, + dropTableProperties, allowExisting.isDefined)(node.source) + + case Token("TOK_ALTERTABLE_SERIALIZER", Token(serdeClassName, Nil) :: serdeArgs) :: _ => + // When SET SERDE serde_classname WITH SERDEPROPERTIES, this is None + val serdeProperties: Option[Map[String, Option[String]]] = + serdeArgs.headOption.map(extractTableProps) + + AlterTableSerDeProperties( + 
+  def matchAlterTableCommands(
+      node: ASTNode,
+      nodes: Seq[ASTNode],
+      tableIdent: TableIdentifier,
+      partition: Option[Map[String, Option[String]]]): LogicalPlan = nodes match {
+    case rename @ Token("TOK_ALTERTABLE_RENAME", renameArgs) :: _ =>
+      val renamedTable = getClause("TOK_TABNAME", renameArgs)
+      val renamedTableIdent: TableIdentifier = extractTableIdent(renamedTable)
+      AlterTableRename(tableIdent, renamedTableIdent)(node.source)
+
+    case Token("TOK_ALTERTABLE_PROPERTIES", args) :: _ =>
+      val setTableProperties = extractTableProps(args.head)
+      AlterTableSetProperties(
+        tableIdent,
+        setTableProperties)(node.source)
+
+    case Token("TOK_ALTERTABLE_DROPPROPERTIES", args) :: _ =>
+      val dropTableProperties = extractTableProps(args.head)
+      val allowExisting = getClauseOption("TOK_IFEXISTS", args)
+      AlterTableDropProperties(
+        tableIdent,
+        dropTableProperties, allowExisting.isDefined)(node.source)
+
+    case Token("TOK_ALTERTABLE_SERIALIZER", Token(serdeClassName, Nil) :: serdeArgs) :: _ =>
+      // For SET SERDE serde_classname without WITH SERDEPROPERTIES, this is None
+      val serdeProperties: Option[Map[String, Option[String]]] =
+        serdeArgs.headOption.map(extractTableProps)
+
+      AlterTableSerDeProperties(
+        tableIdent,
+        Some(cleanAndUnquoteString(serdeClassName)),
+        serdeProperties,
+        partition)(node.source)
+
+    case Token("TOK_ALTERTABLE_SERDEPROPERTIES", args) :: _ =>
+      val serdeProperties: Map[String, Option[String]] = extractTableProps(args.head)
+
+      AlterTableSerDeProperties(
+        tableIdent,
+        None,
+        Some(serdeProperties),
+        partition)(node.source)
+
+    // ALTER TABLE table_name CLUSTERED BY (col, ...) [SORTED BY (col [ASC|DESC], ...)] INTO n BUCKETS
+    case Token("TOK_ALTERTABLE_CLUSTER_SORT", clusterAndSortByArgs :: Nil) :: _ =>
+      val (buckets, noClustered, noSorted) = clusterAndSortByArgs match {
+        case Token("TOK_ALTERTABLE_BUCKETS", bucketArgsHead :: bucketArgs) =>
+          val bucketCols = bucketArgsHead.children.map(_.text)
+
+          val (sortCols, sortDirections, numBuckets) = {
+            if (bucketArgs.head.text == "TOK_TABCOLNAME") {
+              val (cols, directions) = bucketArgs.head.children.map {
+                case Token("TOK_TABSORTCOLNAMEASC", Token(colName, Nil) :: Nil) =>
+                  (colName, Ascending)
+                case Token("TOK_TABSORTCOLNAMEDESC", Token(colName, Nil) :: Nil) =>
+                  (colName, Descending)
+              }.unzip
+              (cols, directions, bucketArgs.last.text.toInt)
+            } else {
+              (Nil, Nil, bucketArgs.head.text.toInt)
+            }
+          }
+
+          (Some(BucketSpec(numBuckets, bucketCols, sortCols, sortDirections)),
+            false, false)
+        case Token("TOK_NOT_CLUSTERED", Nil) =>
+          (None, true, false)
+        case Token("TOK_NOT_SORTED", Nil) =>
+          (None, false, true)
+      }
+
+      AlterTableStoreProperties(
+        tableIdent,
+        buckets,
+        noClustered,
+        noSorted)(node.source)
+
+    // ALTER TABLE table_name INTO n BUCKETS
+    case Token("TOK_ALTERTABLE_BUCKETS", Token(bucketNum, Nil) :: Nil) :: _ =>
+      val num = bucketNum.toInt
+      val buckets = Some(BucketSpec(num, Nil, Nil, Nil))
+      AlterTableStoreProperties(
+        tableIdent,
+        buckets,
+        false,
+        false)(node.source)
+
+    case Token("TOK_ALTERTABLE_SKEWED", Nil) :: _ =>
+      // ALTER TABLE table_name NOT SKEWED
+      AlterTableSkewed(tableIdent, Nil, Nil, false, true, false)(node.source)
+
+    case Token("TOK_ALTERTABLE_SKEWED", Token("TOK_STOREDASDIRS", Nil) :: Nil) :: _ =>
+      // ALTER TABLE table_name NOT STORED AS DIRECTORIES
+      AlterTableSkewed(tableIdent, Nil, Nil, false, false, true)(node.source)
+
+    // ALTER TABLE table_name SKEWED BY (col, ...) ON (...) [STORED AS DIRECTORIES]
+    case (tableSkewed @ Token("TOK_ALTERTABLE_SKEWED", _)) :: _ =>
+      val skewedArgs = tableSkewed match {
+        case Token("TOK_ALTERTABLE_SKEWED", skewedArgs :: Nil) =>
+          skewedArgs match {
+            case Token("TOK_TABLESKEWED", skewedCols :: skewedValues :: stored) =>
+              val cols = skewedCols.children.map(n => cleanAndUnquoteString(n.text))
+              val values = skewedValues match {
+                case Token("TOK_TABCOLVALUE", values) =>
+                  Seq(values.map(n => cleanAndUnquoteString(n.text)))
+                case Token("TOK_TABCOLVALUE_PAIR", pairs) =>
+                  pairs.map {
+                    case Token("TOK_TABCOLVALUES", values :: Nil) =>
+                      values match {
+                        case Token("TOK_TABCOLVALUE", vals) =>
+                          vals.map(n => cleanAndUnquoteString(n.text))
+                      }
+                  }
+              }
+
+              val storedAsDirs = stored match {
+                case Token("TOK_STOREDASDIRS", Nil) :: Nil => true
+                case _ => false
+              }
+
+              (cols, values, storedAsDirs)
+          }
+      }
+
+      AlterTableSkewed(
+        tableIdent,
+        skewedArgs._1, /* cols */
+        skewedArgs._2, /* values */
+        skewedArgs._3, /* storedAsDirs */
+        false, false)(node.source)
+
+    // ALTER TABLE table_name SET SKEWED LOCATION ((value, ...)='loc', ...)
+    case Token("TOK_ALTERTABLE_SKEWED_LOCATION",
+      Token("TOK_SKEWED_LOCATIONS",
+        Token("TOK_SKEWED_LOCATION_LIST", locationMaps) :: Nil) :: Nil) :: _ =>
+      val skewedMaps = locationMaps.map {
+        case Token("TOK_SKEWED_LOCATION_MAP", key :: value :: Nil) =>
+          val k = key match {
+            case Token(const, Nil) => Seq(cleanAndUnquoteString(const))
+            case Token("TOK_TABCOLVALUES", values :: Nil) =>
+              values match {
+                case Token("TOK_TABCOLVALUE", vals) =>
+                  vals.map(n => cleanAndUnquoteString(n.text))
+              }
+          }
+          (k,
cleanAndUnquoteString(value.text)) + }.toMap + AlterTableSkewedLocation(tableIdent, skewedMaps)(node.source) + + case Token("TOK_ALTERTABLE_ADDPARTS", addPartsArgs) :: _ => + val (allowExisting, parts) = addPartsArgs match { + case Token("TOK_IFNOTEXISTS", Nil) :: others => (true, others) + case _ => (false, addPartsArgs) + } + + val partitions: ArrayBuffer[(Map[String, Option[String]], Option[String])] = + new ArrayBuffer() + var currentPart: Map[String, Option[String]] = null + parts.map { + case t @ Token("TOK_PARTSPEC", partArgs) => + if (currentPart != null) { + partitions += ((currentPart, None)) + } + currentPart = parsePartitionSpec(t).get + case Token("TOK_PARTITIONLOCATION", loc :: Nil) => + val location = unquoteString(loc.text) + if (currentPart != null) { + partitions += ((currentPart, Some(location))) + currentPart = null + } else { + // We should not reach here + throw new AnalysisException("Partition location must follow a partition spec.") + } + } + + if (currentPart != null) { + partitions += ((currentPart, None)) + } + AlterTableAddPartition(tableIdent, partitions, allowExisting)(node.source) + + case Token("TOK_ALTERTABLE_RENAMEPART", partArg :: Nil) :: _ => + val Some(newPartition) = parsePartitionSpec(partArg) + AlterTableRenamePartition(tableIdent, partition.get, newPartition)(node.source) + + case Token("TOK_ALTERTABLE_EXCHANGEPARTITION", + (p @ Token("TOK_PARTSPEC", _)) :: (t @ Token("TOK_TABNAME", _)) :: Nil) :: _ => + val Some(partition) = parsePartitionSpec(p) + val fromTableIdent = extractTableIdent(t) + AlterTableExchangePartition(tableIdent, fromTableIdent, partition)(node.source) + + case Token("TOK_ALTERTABLE_DROPPARTS", args) :: _ => + val parts = args.collect { + case Token("TOK_PARTSPEC", partitions) => + partitions.map { + case Token("TOK_PARTVAL", ident :: op :: constant :: Nil) => + (cleanAndUnquoteString(ident.text), + op.text, cleanAndUnquoteString(constant.text)) + } + } + + val allowExisting = getClauseOption("TOK_IFEXISTS", args).isDefined + + val purge = getClauseOption("PURGE", args) + + val replication = getClauseOption("TOK_REPLICATION", args).map { + case Token("TOK_REPLICATION", replId :: metadata) => + (cleanAndUnquoteString(replId.text), metadata.nonEmpty) + } + + AlterTableDropPartition( + tableIdent, + parts, + allowExisting, + purge.isDefined, + replication)(node.source) + + case Token("TOK_ALTERTABLE_ARCHIVE", partArg :: Nil) :: _ => + val Some(partition) = parsePartitionSpec(partArg) + AlterTableArchivePartition(tableIdent, partition)(node.source) + + case Token("TOK_ALTERTABLE_UNARCHIVE", partArg :: Nil) :: _ => + val Some(partition) = parsePartitionSpec(partArg) + AlterTableUnarchivePartition(tableIdent, partition)(node.source) + + case Token("TOK_ALTERTABLE_FILEFORMAT", args) :: _ => + val Seq(fileFormat, genericFormat) = + getClauses(Seq("TOK_TABLEFILEFORMAT", "TOK_FILEFORMAT_GENERIC"), + args) + val fFormat = fileFormat.map(_.children.map(n => cleanAndUnquoteString(n.text))) + val gFormat = genericFormat.map(f => cleanAndUnquoteString(f.children(0).text)) + AlterTableSetFileFormat(tableIdent, partition, fFormat, gFormat)(node.source) + + case Token("TOK_ALTERTABLE_LOCATION", Token(loc, Nil) :: Nil) :: _ => + AlterTableSetLocation(tableIdent, partition, cleanAndUnquoteString(loc))(node.source) + + case Token("TOK_ALTERTABLE_TOUCH", args) :: _ => + val part = getClauseOption("TOK_PARTSPEC", args).flatMap(parsePartitionSpec) + AlterTableTouch(tableIdent, part)(node.source) + + case Token("TOK_ALTERTABLE_COMPACT", Token(compactType, 
Nil) :: Nil) :: _ => + AlterTableCompact(tableIdent, partition, cleanAndUnquoteString(compactType))(node.source) + + case Token("TOK_ALTERTABLE_MERGEFILES", _) :: _ => + AlterTableMerge(tableIdent, partition)(node.source) + + case Token("TOK_ALTERTABLE_RENAMECOL", args) :: _ => + val oldName = args(0).text + val newName = args(1).text + val dataType = nodeToDataType(args(2)) + val afterPos = + getClauseOption("TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION", args) + val afterPosCol = afterPos.map { ap => + ap.children match { + case Token(col, Nil) :: Nil => col + case _ => null + } + } + + val restrict = getClauseOption("TOK_RESTRICT", args) + val cascade = getClauseOption("TOK_CASCADE", args) + + val comment = if (args.size > 3) { + args(3) match { + case Token(commentStr, Nil) + if commentStr != "TOK_ALTERTABLE_CHANGECOL_AFTER_POSITION" && + commentStr != "TOK_RESTRICT" && commentStr != "TOK_CASCADE" => + Some(cleanAndUnquoteString(commentStr)) + case _ => + None + } + } else { + None + } + + AlterTableChangeCol( + tableIdent, + partition, + oldName, + newName, + dataType, + comment, + afterPos.isDefined, + afterPosCol, + restrict.isDefined, + cascade.isDefined)(node.source) + + case Token("TOK_ALTERTABLE_ADDCOLS", args) :: _ => + val tableCols = getClause("TOK_TABCOLLIST", args) + val columns = tableCols match { + case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField)) + } + + val restrict = getClauseOption("TOK_RESTRICT", args) + val cascade = getClauseOption("TOK_CASCADE", args) + + AlterTableAddCol( + tableIdent, + partition, + columns, + restrict.isDefined, + cascade.isDefined)(node.source) + + case Token("TOK_ALTERTABLE_REPLACECOLS", args) :: _ => + val tableCols = getClause("TOK_TABCOLLIST", args) + val columns = tableCols match { + case Token("TOK_TABCOLLIST", fields) => StructType(fields.map(nodeToStructField)) + } + + val restrict = getClauseOption("TOK_RESTRICT", args) + val cascade = getClauseOption("TOK_CASCADE", args) + + AlterTableReplaceCol( + tableIdent, + partition, + columns, + restrict.isDefined, + cascade.isDefined)(node.source) + } +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala index 3e0d484b74cf..d73647c8efaf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.expressions.SortDirection import org.apache.spark.sql.sources.{HadoopFsRelation, HadoopFsRelationProvider, OutputWriter, OutputWriterFactory} import org.apache.spark.sql.types.StructType @@ -35,7 +36,8 @@ import org.apache.spark.sql.types.StructType private[sql] case class BucketSpec( numBuckets: Int, bucketColumnNames: Seq[String], - sortColumnNames: Seq[String]) + sortColumnNames: Seq[String], + sortDirections: Seq[SortDirection] = Nil) private[sql] trait BucketedHadoopFsRelationProvider extends HadoopFsRelationProvider { final override def createRelation( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index b9873d38a664..86412c34895a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -22,11 +22,12 @@ import scala.reflect.runtime.universe.{typeTag, TypeTag} import scala.util.Try import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.catalyst.{CatalystQl, ScalaReflection} +import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.parser.CatalystQl import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.types._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index f93a405f77fc..f5f36544a702 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.internal import org.apache.spark.sql.{ContinuousQueryManager, SQLContext, UDFRegistration} -import org.apache.spark.sql.catalyst.ParserInterface import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog} import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.{PreInsertCastAndRename, ResolveDataSource} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/DDLCommandSuite.scala new file mode 100644 index 000000000000..91e175a4a862 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/DDLCommandSuite.scala @@ -0,0 +1,652 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.execution.commands._ +import org.apache.spark.sql.execution.datasources.BucketSpec +import org.apache.spark.sql.types._ + +class DDLCommandSuite extends PlanTest { + val parser = new SparkQl() + + test("create database") { + val sql = + """ + |CREATE DATABASE IF NOT EXISTS database_name + |COMMENT 'database_comment' LOCATION '/home/user/db' + |WITH DBPROPERTIES ('a'='a', 'b'='b', 'c'='c') + """.stripMargin + + val parsed = parser.parsePlan(sql) + + val expected = CreateDataBase( + "database_name", + true, + Some("/home/user/db"), + Some("database_comment"), + Map("a" -> "a", "b" -> "b", "c" -> "c"))(sql) + + comparePlans(parsed, expected) + } + + test("create function") { + val sql = + """ + |CREATE TEMPORARY FUNCTION helloworld as + |'com.matthewrathbone.example.SimpleUDFExample' USING JAR '/path/to/jar', + |FILE 'path/to/file' + """.stripMargin + + val parsed = parser.parsePlan(sql) + + val expected = CreateFunction( + "helloworld", + "com.matthewrathbone.example.SimpleUDFExample", + Map("jar" -> "/path/to/jar", "file" -> "path/to/file"), + true)(sql) + + comparePlans(parsed, expected) + } + + test("alter table: rename table") { + val sql = "ALTER TABLE table_name RENAME TO new_table_name" + val parsed = parser.parsePlan(sql) + val expected = AlterTableRename( + TableIdentifier("table_name", None), + TableIdentifier("new_table_name", None))(sql) + comparePlans(parsed, expected) + } + + test("alter table: alter table properties") { + val sql1 = "ALTER TABLE table_name SET TBLPROPERTIES ('test' = 'test', " + + "'comment' = 'new_comment')" + val sql2 = "ALTER TABLE table_name UNSET TBLPROPERTIES ('comment', 'test')" + val sql3 = "ALTER TABLE table_name UNSET TBLPROPERTIES IF EXISTS ('comment', 'test')" + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + val parsed3 = parser.parsePlan(sql3) + + val expected1 = AlterTableSetProperties( + TableIdentifier("table_name", None), + Map("test" -> Some("test"), "comment" -> Some("new_comment")))(sql1) + + val expected2 = AlterTableDropProperties( + TableIdentifier("table_name", None), + Map("comment" -> None, "test" -> None), + false)(sql2) + + val expected3 = AlterTableDropProperties( + TableIdentifier("table_name", None), + Map("comment" -> None, "test" -> None), + true)(sql3) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + } + + test("alter table: SerDe properties") { + val sql1 = "ALTER TABLE table_name SET SERDE 'org.apache.class'" + val sql2 = + """ + |ALTER TABLE table_name SET SERDE 'org.apache.class' + |WITH SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',') + """.stripMargin + + val sql3 = + """ + |ALTER TABLE table_name SET SERDEPROPERTIES ('columns'='foo,bar', + |'field.delim' = ',') + """.stripMargin + + val sql4 = + """ + |ALTER TABLE table_name PARTITION (test, dt='2008-08-08', + |country='us') SET SERDE 'org.apache.class' WITH SERDEPROPERTIES ('columns'='foo,bar', + |'field.delim' = ',') + """.stripMargin + + val sql5 = + """ + |ALTER TABLE table_name PARTITION (test, dt='2008-08-08', + |country='us') SET SERDEPROPERTIES ('columns'='foo,bar', 'field.delim' = ',') + """.stripMargin + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + val parsed3 = 
parser.parsePlan(sql3) + val parsed4 = parser.parsePlan(sql4) + val parsed5 = parser.parsePlan(sql5) + + val expected1 = AlterTableSerDeProperties( + TableIdentifier("table_name", None), + Some("org.apache.class"), + None, + None)(sql1) + + val expected2 = AlterTableSerDeProperties( + TableIdentifier("table_name", None), + Some("org.apache.class"), + Some(Map("columns" -> Some("foo,bar"), "field.delim" -> Some(","))), + None)(sql2) + + val expected3 = AlterTableSerDeProperties( + TableIdentifier("table_name", None), + None, + Some(Map("columns" -> Some("foo,bar"), "field.delim" -> Some(","))), + None)(sql3) + + val expected4 = AlterTableSerDeProperties( + TableIdentifier("table_name", None), + Some("org.apache.class"), + Some(Map("columns" -> Some("foo,bar"), "field.delim" -> Some(","))), + Some(Map("test" -> None, "dt" -> Some("2008-08-08"), "country" -> Some("us"))))(sql4) + + val expected5 = AlterTableSerDeProperties( + TableIdentifier("table_name", None), + None, + Some(Map("columns" -> Some("foo,bar"), "field.delim" -> Some(","))), + Some(Map("test" -> None, "dt" -> Some("2008-08-08"), "country" -> Some("us"))))(sql5) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + comparePlans(parsed4, expected4) + comparePlans(parsed5, expected5) + } + + test("alter table: storage properties") { + val sql1 = "ALTER TABLE table_name CLUSTERED BY (dt, country) INTO 10 BUCKETS" + + val sql2 = "ALTER TABLE table_name CLUSTERED BY (dt, country) SORTED BY " + + "(dt, country DESC) INTO 10 BUCKETS" + + val sql3 = "ALTER TABLE table_name INTO 20 BUCKETS" + val sql4 = "ALTER TABLE table_name NOT CLUSTERED" + val sql5 = "ALTER TABLE table_name NOT SORTED" + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + val parsed3 = parser.parsePlan(sql3) + val parsed4 = parser.parsePlan(sql4) + val parsed5 = parser.parsePlan(sql5) + + val expected1 = AlterTableStoreProperties( + TableIdentifier("table_name", None), + Some(BucketSpec(10, List("dt", "country"), List(), List())), + false, + false)(sql1) + + val expected2 = AlterTableStoreProperties( + TableIdentifier("table_name", None), + Some(BucketSpec(10, List("dt", "country"), List("dt", "country"), + List(Ascending, Descending))), + false, + false)(sql2) + + val expected3 = AlterTableStoreProperties( + TableIdentifier("table_name", None), + Some(BucketSpec(20, List(), List(), List())), + false, + false)(sql3) + + val expected4 = AlterTableStoreProperties( + TableIdentifier("table_name", None), + None, + true, + false)(sql4) + + val expected5 = AlterTableStoreProperties( + TableIdentifier("table_name", None), + None, + false, + true)(sql5) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + comparePlans(parsed4, expected4) + comparePlans(parsed5, expected5) + } + + test("alter table: skewed") { + val sql1 = + """ + |ALTER TABLE table_name SKEWED BY (dt, country) ON + |(('2008-08-08', 'us'), ('2009-09-09', 'uk')) STORED AS DIRECTORIES + """.stripMargin + + val sql2 = + """ + |ALTER TABLE table_name SKEWED BY (dt, country) ON + |('2008-08-08', 'us') STORED AS DIRECTORIES + """.stripMargin + + val sql3 = + """ + |ALTER TABLE table_name SKEWED BY (dt, country) ON + |(('2008-08-08', 'us'), ('2009-09-09', 'uk')) + """.stripMargin + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + val parsed3 = parser.parsePlan(sql3) + + val expected1 = AlterTableSkewed( + TableIdentifier("table_name", None), + 
Seq("dt", "country"), + Seq(List("2008-08-08", "us"), List("2009-09-09", "uk")), + true, + false, + false)(sql1) + + val expected2 = AlterTableSkewed( + TableIdentifier("table_name", None), + Seq("dt", "country"), + Seq(List("2008-08-08", "us")), + true, + false, + false)(sql2) + + val expected3 = AlterTableSkewed( + TableIdentifier("table_name", None), + Seq("dt", "country"), + Seq(List("2008-08-08", "us"), List("2009-09-09", "uk")), + false, + false, + false)(sql3) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + } + + test("alter table: skewed location") { + val sql1 = + """ + |ALTER TABLE table_name SET SKEWED LOCATION + |('123'='location1', 'test'='location2') + """.stripMargin + + val sql2 = + """ + |ALTER TABLE table_name SET SKEWED LOCATION + |(('2008-08-08', 'us')='location1', 'test'='location2') + """.stripMargin + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + + val expected1 = AlterTableSkewedLocation( + TableIdentifier("table_name", None), + Map(List("123") -> "location1", List("test") -> "location2"))(sql1) + + val expected2 = AlterTableSkewedLocation( + TableIdentifier("table_name", None), + Map(List("2008-08-08", "us") -> "location1", List("test") -> "location2"))(sql2) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + } + + test("alter table: add partition") { + val sql = + """ + |ALTER TABLE table_name ADD IF NOT EXISTS PARTITION + |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION + |(dt='2009-09-09', country='uk') + """.stripMargin + + val parsed = parser.parsePlan(sql) + + val expected = AlterTableAddPartition( + TableIdentifier("table_name", None), + Seq( + (Map("dt" -> Some("2008-08-08"), "country" -> Some("us")), Some("location1")), + (Map("dt" -> Some("2009-09-09"), "country" -> Some("uk")), None)), + true)(sql) + + comparePlans(parsed, expected) + } + + test("alter table: rename partition") { + val sql = + """ + |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') + |RENAME TO PARTITION (dt='2008-09-09', country='uk') + """.stripMargin + + val parsed = parser.parsePlan(sql) + + val expected = AlterTableRenamePartition( + TableIdentifier("table_name", None), + Map("dt" -> Some("2008-08-08"), "country" -> Some("us")), + Map("dt" -> Some("2008-09-09"), "country" -> Some("uk")))(sql) + + comparePlans(parsed, expected) + } + + test("alter table: exchange partition") { + val sql = + """ + |ALTER TABLE table_name_1 EXCHANGE PARTITION + |(dt='2008-08-08', country='us') WITH TABLE table_name_2 + """.stripMargin + + val parsed = parser.parsePlan(sql) + + val expected = AlterTableExchangePartition( + TableIdentifier("table_name_1", None), + TableIdentifier("table_name_2", None), + Map("dt" -> Some("2008-08-08"), "country" -> Some("us")))(sql) + + comparePlans(parsed, expected) + } + + test("alter table: drop partitions") { + val sql1 = + """ + |ALTER TABLE table_name DROP IF EXISTS PARTITION + |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk') + """.stripMargin + + val sql2 = + """ + |ALTER TABLE table_name DROP IF EXISTS PARTITION + |(dt='2008-08-08', country='us'), PARTITION (dt='2009-09-09', country='uk') + |PURGE FOR METADATA REPLICATION ('test') + """.stripMargin + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + + val expected1 = AlterTableDropPartition( + TableIdentifier("table_name", None), + Seq( + List(("dt", "=", "2008-08-08"), ("country", "=", "us")), + List(("dt", 
"=", "2009-09-09"), ("country", "=", "uk"))), + true, + false, + None)(sql1) + + val expected2 = AlterTableDropPartition( + TableIdentifier("table_name", None), + Seq( + List(("dt", "=", "2008-08-08"), ("country", "=", "us")), + List(("dt", "=", "2009-09-09"), ("country", "=", "uk"))), + true, + true, + Some(("test", true)))(sql2) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + } + + test("alter table: archive partition") { + val sql = "ALTER TABLE table_name ARCHIVE PARTITION (dt='2008-08-08', country='us')" + val parsed = parser.parsePlan(sql) + + val expected = AlterTableArchivePartition( + TableIdentifier("table_name", None), + Map("dt" -> Some("2008-08-08"), "country" -> Some("us")))(sql) + + comparePlans(parsed, expected) + } + + test("alter table: unarchive partition") { + val sql = "ALTER TABLE table_name UNARCHIVE PARTITION (dt='2008-08-08', country='us')" + val parsed = parser.parsePlan(sql) + + val expected = AlterTableUnarchivePartition( + TableIdentifier("table_name", None), + Map("dt" -> Some("2008-08-08"), "country" -> Some("us")))(sql) + + comparePlans(parsed, expected) + } + + test("alter table: set file format") { + val sql1 = + """ + |ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' + |OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test' + """.stripMargin + + val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " + + "OUTPUTFORMAT 'test' SERDE 'test'" + + val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " + + "SET FILEFORMAT PARQUET" + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + val parsed3 = parser.parsePlan(sql3) + + val expected1 = AlterTableSetFileFormat( + TableIdentifier("table_name", None), + None, + Some(List("test", "test", "test", "test", "test")), + None)(sql1) + + val expected2 = AlterTableSetFileFormat( + TableIdentifier("table_name", None), + None, + Some(List("test", "test", "test")), + None)(sql2) + + val expected3 = AlterTableSetFileFormat( + TableIdentifier("table_name", None), + Some(Map("dt" -> Some("2008-08-08"), "country" -> Some("us"))), + None, + Some("PARQUET"))(sql3) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + } + + test("alter table: set location") { + val sql1 = "ALTER TABLE table_name SET LOCATION 'new location'" + val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " + + "SET LOCATION 'new location'" + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + + val expected1 = AlterTableSetLocation( + TableIdentifier("table_name", None), + None, + "new location")(sql1) + + val expected2 = AlterTableSetLocation( + TableIdentifier("table_name", None), + Some(Map("dt" -> Some("2008-08-08"), "country" -> Some("us"))), + "new location")(sql2) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + } + + test("alter table: touch") { + val sql1 = "ALTER TABLE table_name TOUCH" + val sql2 = "ALTER TABLE table_name TOUCH PARTITION (dt='2008-08-08', country='us')" + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + + val expected1 = AlterTableTouch( + TableIdentifier("table_name", None), + None)(sql1) + + val expected2 = AlterTableTouch( + TableIdentifier("table_name", None), + Some(Map("dt" -> Some("2008-08-08"), "country" -> Some("us"))))(sql2) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + } + + test("alter table: compact") { + val 
sql1 = "ALTER TABLE table_name COMPACT 'compaction_type'" + val sql2 = + """ + |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') + |COMPACT 'MAJOR' + """.stripMargin + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + + val expected1 = AlterTableCompact( + TableIdentifier("table_name", None), + None, + "compaction_type")(sql1) + + val expected2 = AlterTableCompact( + TableIdentifier("table_name", None), + Some(Map("dt" -> Some("2008-08-08"), "country" -> Some("us"))), + "MAJOR")(sql2) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + } + + test("alter table: concatenate") { + val sql1 = "ALTER TABLE table_name CONCATENATE" + val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE" + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + + val expected1 = AlterTableMerge( + TableIdentifier("table_name", None), + None)(sql1) + + val expected2 = AlterTableMerge( + TableIdentifier("table_name", None), + Some(Map("dt" -> Some("2008-08-08"), "country" -> Some("us"))))(sql2) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + } + + test("alter table: change column name/type/position/comment") { + val sql1 = "ALTER TABLE table_name CHANGE col_old_name col_new_name INT" + + val sql2 = + """ + |ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT + |COMMENT 'col_comment' FIRST CASCADE + """.stripMargin + + val sql3 = + """ + |ALTER TABLE table_name CHANGE COLUMN col_old_name col_new_name INT + |COMMENT 'col_comment' AFTER column_name RESTRICT + """.stripMargin + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + val parsed3 = parser.parsePlan(sql3) + + val expected1 = AlterTableChangeCol( + TableIdentifier("table_name", None), + None, + "col_old_name", + "col_new_name", + IntegerType, + None, + false, + None, + false, + false)(sql1) + + val expected2 = AlterTableChangeCol( + TableIdentifier("table_name", None), + None, + "col_old_name", + "col_new_name", + IntegerType, + Some("col_comment"), + false, + None, + false, + true)(sql2) + + val expected3 = AlterTableChangeCol( + TableIdentifier("table_name", None), + None, + "col_old_name", + "col_new_name", + IntegerType, + Some("col_comment"), + true, + Some("column_name"), + true, + false)(sql3) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + comparePlans(parsed3, expected3) + } + + test("alter table: add/replace columns") { + val sql1 = + """ + |ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') + |ADD COLUMNS (new_col1 INT COMMENT 'test_comment', new_col2 LONG + |COMMENT 'test_comment2') CASCADE + """.stripMargin + + val sql2 = + """ + |ALTER TABLE table_name REPLACE COLUMNS (new_col1 INT + |COMMENT 'test_comment', new_col2 LONG COMMENT 'test_comment2') RESTRICT + """.stripMargin + + val parsed1 = parser.parsePlan(sql1) + val parsed2 = parser.parsePlan(sql2) + + val meta1 = new MetadataBuilder().putString("comment", "test_comment").build() + val meta2 = new MetadataBuilder().putString("comment", "test_comment2").build() + + val expected1 = AlterTableAddCol( + TableIdentifier("table_name", None), + Some(Map("dt" -> Some("2008-08-08"), "country" -> Some("us"))), + StructType(Seq( + StructField("new_col1", IntegerType, true, meta1), + StructField("new_col2", LongType, true, meta2))), + false, + true)(sql1) + + val expected2 = AlterTableReplaceCol( + TableIdentifier("table_name", None), + None, + StructType(Seq( + 
StructField("new_col1", IntegerType, true, meta1), + StructField("new_col2", LongType, true, meta2))), + true, + false)(sql2) + + comparePlans(parsed1, expected1) + comparePlans(parsed2, expected2) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index f8a9a95c873a..8491d5bc27ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -24,7 +24,7 @@ import java.util.{Calendar, GregorianCalendar, Properties} import org.h2.jdbc.JdbcSQLException import org.scalatest.{BeforeAndAfter, PrivateMethodTester} -import org.apache.spark.sql.execution.ExplainCommand +import org.apache.spark.sql.execution.commands.ExplainCommand import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.execution.PhysicalRDD diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 458d4f2c3c95..8b113cb9c21c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -34,7 +34,7 @@ import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.Logging import org.apache.spark.sql.{DataFrame, Row => SparkRow} -import org.apache.spark.sql.execution.SetCommand +import org.apache.spark.sql.execution.commands.SetCommand import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index a9295d31c07b..51bab763c91b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -44,8 +44,10 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.commands._ import org.apache.spark.sql.execution.ui.SQLListener import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 3788736fd13c..7aef8f4edc2c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable, _} import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.Logging -import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext} +import org.apache.spark.sql.{AnalysisException, Row, 
SaveMode, SQLContext} import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} import org.apache.spark.sql.catalyst.catalog._ @@ -249,7 +249,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte } if (userSpecifiedSchema.isDefined && bucketSpec.isDefined) { - val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames) = bucketSpec.get + val BucketSpec(numBuckets, bucketColumnNames, sortColumnNames, _) = bucketSpec.get tableProperties.put("spark.sql.sources.schema.numBuckets", numBuckets.toString) tableProperties.put("spark.sql.sources.schema.numBucketCols", @@ -718,6 +718,10 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte override def setCurrentDatabase(databaseName: String): Unit = { client.setCurrentDatabase(databaseName) } + + override def runNativeCommand(sql: String): Seq[Row] = { + hive.runSqlHive(sql).map(Row(_)) + } } /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 58010513538f..a1e0b9813501 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -81,34 +81,21 @@ private[hive] case class CreateViewAsSelect( /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging { + import ParserSupport._ + protected val nativeCommands = Seq( "TOK_ALTERDATABASE_OWNER", "TOK_ALTERDATABASE_PROPERTIES", "TOK_ALTERINDEX_PROPERTIES", "TOK_ALTERINDEX_REBUILD", - "TOK_ALTERTABLE", - "TOK_ALTERTABLE_ADDCOLS", - "TOK_ALTERTABLE_ADDPARTS", "TOK_ALTERTABLE_ALTERPARTS", - "TOK_ALTERTABLE_ARCHIVE", - "TOK_ALTERTABLE_CLUSTER_SORT", - "TOK_ALTERTABLE_DROPPARTS", "TOK_ALTERTABLE_PARTITION", - "TOK_ALTERTABLE_PROPERTIES", - "TOK_ALTERTABLE_RENAME", - "TOK_ALTERTABLE_RENAMECOL", - "TOK_ALTERTABLE_REPLACECOLS", - "TOK_ALTERTABLE_SKEWED", - "TOK_ALTERTABLE_TOUCH", - "TOK_ALTERTABLE_UNARCHIVE", "TOK_ALTERVIEW_ADDPARTS", "TOK_ALTERVIEW_AS", "TOK_ALTERVIEW_DROPPARTS", "TOK_ALTERVIEW_PROPERTIES", "TOK_ALTERVIEW_RENAME", - "TOK_CREATEDATABASE", - "TOK_CREATEFUNCTION", "TOK_CREATEINDEX", "TOK_CREATEMACRO", "TOK_CREATEROLE", @@ -162,7 +149,8 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging protected val noExplainCommands = Seq( "TOK_DESCTABLE", "TOK_SHOWTABLES", - "TOK_TRUNCATETABLE" // truncate table" is a NativeCommand, does not need to explain. + "TOK_TRUNCATETABLE", // truncate table" is a NativeCommand, does not need to explain. 
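+    // ALTER TABLE is now parsed into Spark DDL commands (see AlterTableCommandParser),
+    // so it likewise does not need to be explained.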
+ "TOK_ALTERTABLE" ) ++ nativeCommands /** diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 09f54be04d0c..5acb9bf4e1b9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.hive import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.ParserInterface import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry, OverrideCatalog} import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.execution.{python, SparkPlanner} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{SessionState, SQLConf} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 3687dd6f5a7a..95adaa3859b7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -22,7 +22,8 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.commands.{DescribeCommand => RunnableDescribeCommand, _} import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand} import org.apache.spark.sql.hive.execution._ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index a7eca46d1980..f8a44b1506ed 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.ExpressionInfo import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.CacheTableCommand +import org.apache.spark.sql.execution.commands.CacheTableCommand import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.HiveNativeCommand diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index af4c44e578c8..87969897c760 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -27,7 +27,7 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.execution.{ExplainCommand, SetCommand} +import org.apache.spark.sql.execution.commands.{ExplainCommand, SetCommand} import 
org.apache.spark.sql.execution.datasources.DescribeCommand import org.apache.spark.sql.hive.{InsertIntoHiveTable => LogicalInsertIntoHiveTable, SQLBuilder} import org.apache.spark.sql.hive.test.TestHive diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 9a52276fcdc6..d6166358607e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -78,7 +78,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet // This test verifies parts of the plan. Disable whole stage codegen. withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k") - val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec + val BucketSpec(numBuckets, bucketColumnNames, _, _) = bucketSpec // Limit: bucket pruning only works when the bucket column has one and only one column assert(bucketColumnNames.length == 1) val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head)