Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,7 @@ createFileFormat
;

fileFormat
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)?
(INPUTDRIVER inDriver=STRING OUTPUTDRIVER outDriver=STRING)? #tableFileFormat
: INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE serdeCls=STRING)? #tableFileFormat
| identifier #genericFileFormat
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell I deleted the INPUTDRIVER and OUTPUTDRIVER here because Hive doesn't support it. Why was this added in the first place? Is there any supporting documentation for this somewhere?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewor14 I wanted to make sure we supported the same grammar as Hive and I used their grammars as a basis. So this is defined in the following two locations:

The main idea was that I could throw better errors. But if it is not supported by Hive itself then please remove it!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove it. I have never seen this before and it is not documented anywhere.

;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,30 @@ case class CatalogTable(
tableType: CatalogTableType,
storage: CatalogStorageFormat,
schema: Seq[CatalogColumn],
partitionColumns: Seq[CatalogColumn] = Seq.empty,
sortColumns: Seq[CatalogColumn] = Seq.empty,
numBuckets: Int = 0,
partitionColumnNames: Seq[String] = Seq.empty,
sortColumnNames: Seq[String] = Seq.empty,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we maintain the ordering of the sort columns here? For the clause SORTED BY (col_name [ASC|DESC], ...), within CLUSTERED BY clause.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we do not support SORTED BY and CLUSTERED BY for now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also that has nothing to do with this patch

bucketColumnNames: Seq[String] = Seq.empty,
numBuckets: Int = -1,
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
properties: Map[String, String] = Map.empty,
viewOriginalText: Option[String] = None,
viewText: Option[String] = None) {
viewText: Option[String] = None,
comment: Option[String] = None) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe renaming it to description (be consistent with CatalogDatabase)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this "comment" mean the comment for the table, as defined in the syntax create table t1 (c1 int) COMMENT 'abc'? If it is, shall it be put into properties, it seems Hive puts this table comment into TBLPROPERTIES. so it is consistent when creating hive metastore table from CatalogTable object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually prefer to keep it as comment because it's COMMENT in the query and "comment" in Hive's table properties

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


// Verify that the provided columns are part of the schema
private val colNames = schema.map(_.name).toSet
private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
}
requireSubsetOfSchema(partitionColumnNames, "partition")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the following example from Hive's doc

CREATE TABLE page_view(viewTime INT, userid BIGINT,
     page_url STRING, referrer_url STRING,
     ip STRING COMMENT 'IP Address of the User')
 COMMENT 'This is the page view table'
 PARTITIONED BY(dt STRING, country STRING)
 ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\001'
STORED AS SEQUENCEFILE;

Will this CREATE TABLE statement trigger an error at here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no because we put the partition columns in the schema when we parse it. There's a comment in HiveSqlParser that describes this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

requireSubsetOfSchema(sortColumnNames, "sort")
requireSubsetOfSchema(bucketColumnNames, "bucket")

/** Columns this table is partitioned by. */
def partitionColumns: Seq[CatalogColumn] =
schema.filter { c => partitionColumnNames.contains(c.name) }

/** Return the database this table was specified to belong to, assuming it exists. */
def database: String = identifier.database.getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,12 @@ abstract class CatalogTestUtils {
identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL_TABLE,
storage = storageFormat,
schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")),
partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string")))
schema = Seq(
CatalogColumn("col1", "int"),
CatalogColumn("col2", "string"),
CatalogColumn("a", "int"),
CatalogColumn("b", "string")),
partitionColumnNames = Seq("a", "b"))
}

def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ class SparkSqlAstBuilder extends AstBuilder {
}
}

/** Type to keep track of a table header. */
/**
* Type to keep track of a table header: (identifier, isTemporary, ifNotExists, isExternal).
*/
type TableHeader = (TableIdentifier, Boolean, Boolean, Boolean)

/**
Expand Down Expand Up @@ -616,10 +618,7 @@ class SparkSqlAstBuilder extends AstBuilder {
case s: GenericFileFormatContext =>
(Seq.empty[String], Option(s.identifier.getText))
case s: TableFileFormatContext =>
val elements = Seq(s.inFmt, s.outFmt) ++
Option(s.serdeCls).toSeq ++
Option(s.inDriver).toSeq ++
Option(s.outDriver).toSeq
val elements = Seq(s.inFmt, s.outFmt) ++ Option(s.serdeCls).toSeq
(elements.map(string), None)
}
AlterTableSetFileFormat(
Expand Down Expand Up @@ -773,22 +772,6 @@ class SparkSqlAstBuilder extends AstBuilder {
.map(_.identifier.getText))
}

/**
* Create a skew specification. This contains three components:
* - The Skewed Columns
* - Values for which are skewed. The size of each entry must match the number of skewed columns.
* - A store in directory flag.
*/
override def visitSkewSpec(
ctx: SkewSpecContext): (Seq[String], Seq[Seq[String]], Boolean) = withOrigin(ctx) {
val skewedValues = if (ctx.constantList != null) {
Seq(visitConstantList(ctx.constantList))
} else {
visitNestedConstantList(ctx.nestedConstantList)
}
(visitIdentifierList(ctx.identifierList), skewedValues, ctx.DIRECTORIES != null)
}

/**
* Convert a nested constants list into a sequence of string sequences.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,29 +224,6 @@ case class DropTable(
}
}

/**
* A command that renames a table/view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table1 RENAME TO table2;
* ALTER VIEW view1 RENAME TO view2;
* }}}
*/
case class AlterTableRename(
oldName: TableIdentifier,
newName: TableIdentifier)
extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
catalog.invalidateTable(oldName)
catalog.renameTable(oldName, newName)
Seq.empty[Row]
}

}

/**
* A command that sets table/view properties.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable


// TODO: move the rest of the table commands from ddl.scala to this file

/**
* A command to create a table.
*
* Note: This is currently used only for creating Hive tables.
* This is not intended for temporary tables.
*
* The syntax of using this command in SQL is:
* {{{
* CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
* [(col1 data_type [COMMENT col_comment], ...)]
* [COMMENT table_comment]
* [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
* [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO num_buckets BUCKETS]
* [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewor14 I just did a quick check with our InsertIntoHiveTable command. Seems this command does not really understand how to handle a table having specifications on CLUSTERED BY, SORTED BY or SKEWED BY. How about we just throw exceptions when a define provide these specs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

* [STORED AS DIRECTORIES]
* [ROW FORMAT row_format]
* [STORED AS file_format | STORED BY storage_handler_class [WITH SERDEPROPERTIES (...)]]
* [LOCATION path]
* [TBLPROPERTIES (property_name=property_value, ...)]
* [AS select_statement];
* }}}
*/
case class CreateTable(table: CatalogTable, ifNotExists: Boolean) extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
sqlContext.sessionState.catalog.createTable(table, ifNotExists)
Seq.empty[Row]
}

}


/**
* A command that renames a table/view.
*
* The syntax of this command is:
* {{{
* ALTER TABLE table1 RENAME TO table2;
* ALTER VIEW view1 RENAME TO view2;
* }}}
*/
case class AlterTableRename(
oldName: TableIdentifier,
newName: TableIdentifier)
extends RunnableCommand {

override def run(sqlContext: SQLContext): Seq[Row] = {
val catalog = sqlContext.sessionState.catalog
catalog.invalidateTable(oldName)
catalog.renameTable(oldName, newName)
Seq.empty[Row]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -440,37 +440,25 @@ class DDLCommandSuite extends PlanTest {
}

test("alter table: set file format") {
val sql1 =
"""
|ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test'
|OUTPUTFORMAT 'test' SERDE 'test' INPUTDRIVER 'test' OUTPUTDRIVER 'test'
""".stripMargin
val sql2 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
val sql1 = "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
"OUTPUTFORMAT 'test' SERDE 'test'"
val sql3 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
val sql2 = "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
"SET FILEFORMAT PARQUET"
val parsed1 = parser.parsePlan(sql1)
val parsed2 = parser.parsePlan(sql2)
val parsed3 = parser.parsePlan(sql3)
val tableIdent = TableIdentifier("table_name", None)
val expected1 = AlterTableSetFileFormat(
tableIdent,
None,
List("test", "test", "test", "test", "test"),
List("test", "test", "test"),
None)(sql1)
val expected2 = AlterTableSetFileFormat(
tableIdent,
None,
List("test", "test", "test"),
None)(sql2)
val expected3 = AlterTableSetFileFormat(
tableIdent,
Some(Map("dt" -> "2008-08-08", "country" -> "us")),
Seq(),
Some("PARQUET"))(sql3)
Some("PARQUET"))(sql2)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
comparePlans(parsed3, expected3)
}

test("alter table: set location") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}

// TODO: ADD a testcase for Drop Database in Restric when we can create tables in SQLContext

test("show tables") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this TODO comment is duplicated in line 197

withTempTable("show1a", "show2b") {
sql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {

runCliWithin(3.minute)(
"CREATE TABLE hive_test(key INT, val STRING);"
-> "OK",
-> "",
"SHOW TABLES;"
-> "hive_test",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE hive_test;"
Expand All @@ -187,7 +187,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"USE hive_test_db;"
-> "",
"CREATE TABLE hive_test(key INT, val STRING);"
-> "OK",
-> "",
"SHOW TABLES;"
-> "hive_test"
)
Expand All @@ -210,9 +210,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
"""CREATE TABLE t1(key string, val string)
|ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
""".stripMargin
-> "OK",
-> "",
"CREATE TABLE sourceTable (key INT, val STRING);"
-> "OK",
-> "",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
-> "OK",
"INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
Expand Down
Loading