Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ statement
| createTableHeader ('(' colTypeList ')')? tableProvider
(OPTIONS tablePropertyList)? #createTableUsing
| createTableHeader tableProvider
(OPTIONS tablePropertyList)? AS? query #createTableUsing
(OPTIONS tablePropertyList)?
(PARTITIONED BY partitionColumnNames=identifierList)?
bucketSpec? AS? query #createTableUsing
| createTableHeader ('(' columns=colTypeList ')')?
(COMMENT STRING)?
(PARTITIONED BY '(' partitionColumns=colTypeList ')')?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
val options = Option(ctx.tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)
val provider = ctx.tableProvider.qualifiedName.getText
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

if (ctx.query != null) {
// Get the backing query.
Expand All @@ -302,9 +303,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
} else {
SaveMode.ErrorIfExists
}
CreateTableUsingAsSelect(table, provider, temp, Array.empty, None, mode, options, query)

val partitionColumnNames =
Option(ctx.partitionColumnNames)
.map(visitIdentifierList(_).toArray)
.getOrElse(Array.empty[String])

CreateTableUsingAsSelect(
table, provider, temp, partitionColumnNames, bucketSpec, mode, options, query)
} else {
val struct = Option(ctx.colTypeList).map(createStructType)
val struct = Option(ctx.colTypeList()).map(createStructType)
Copy link
Contributor

@yhuai yhuai Apr 27, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the command is not a CTAS statement, it seems we should throw an exception if the user specifies any of the PARTITIONED BY, SORTED BY, or CLUSTERED BY clauses?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that is not closely related to this PR: I always find the keyword CLUSTERED BY very confusing, because there is also a CLUSTER BY keyword (which is DISTRIBUTE BY + SORT BY). But we do not need to change it right now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am going to add the check for this else branch and add some tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, sorry. PARTITIONED BY and CLUSTERED BY are both associated with the CREATE TABLE USING AS SELECT rule. So, for CREATE TABLE USING, if PARTITIONED BY or CLUSTERED BY is provided, we already throw an exception.

CreateTableUsing(table, struct, provider, temp, options, ifNotExists, managedIfNoPath = false)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -940,4 +940,97 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.schema.forall { c => DataTypeParser.parse(c.dataType) == ArrayType(StringType) })
}
}

test("CTAS: persisted partitioned data source table") {
  // Runs a CTAS that partitions a Parquet table by `a`, then checks the table properties
  // stored in the external catalog and the rows read back from the table.
  withTempDir { tempDir =>
    withTable("t") {
      val location = tempDir.getCanonicalPath
      val ctas =
        s"""CREATE TABLE t USING PARQUET
           |OPTIONS (PATH '$location')
           |PARTITIONED BY (a)
           |AS SELECT 1 AS a, 2 AS b
           |""".stripMargin

      sql(ctas)

      val props = sharedState.externalCatalog.getTable("default", "t").properties

      // One partition column is expected to be recorded ...
      assert(props("spark.sql.sources.schema.numPartCols").toInt === 1)

      // ... and none of the bucketing/sorting properties may be present.
      Seq(
        "spark.sql.sources.schema.numBuckets",
        "spark.sql.sources.schema.numBucketCols",
        "spark.sql.sources.schema.numSortCols"
      ).foreach { key =>
        assert(!props.contains(key))
      }

      // The expected row lists b (2) before a (1): the partition column comes last.
      checkAnswer(table("t"), Row(2, 1))
    }
  }
}

test("CTAS: persisted bucketed data source table") {
  // Covers two bucketed CTAS variants against the same temporary directory:
  //   1. CLUSTERED BY with SORTED BY  -> bucket and sort properties are both expected.
  //   2. CLUSTERED BY without SORTED BY -> bucket properties only, no sort property.
  // In both variants no partition-column property may be recorded.
  withTempDir { tempDir =>
    // Variant 1: bucketed and sorted.
    withTable("t") {
      val location = tempDir.getCanonicalPath
      val ctas =
        s"""CREATE TABLE t USING PARQUET
           |OPTIONS (PATH '$location')
           |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
           |AS SELECT 1 AS a, 2 AS b
           |""".stripMargin

      sql(ctas)

      val props = sharedState.externalCatalog.getTable("default", "t").properties
      assert(!props.contains("spark.sql.sources.schema.numPartCols"))
      assert(props("spark.sql.sources.schema.numBuckets").toInt === 2)
      assert(props("spark.sql.sources.schema.numBucketCols").toInt === 1)
      assert(props("spark.sql.sources.schema.numSortCols").toInt === 1)

      // No partitioning here, so the columns come back in SELECT order (a, b).
      checkAnswer(table("t"), Row(1, 2))
    }

    // Variant 2: bucketed only.
    withTable("t") {
      val location = tempDir.getCanonicalPath
      val ctas =
        s"""CREATE TABLE t USING PARQUET
           |OPTIONS (PATH '$location')
           |CLUSTERED BY (a) INTO 2 BUCKETS
           |AS SELECT 1 AS a, 2 AS b
           |""".stripMargin

      sql(ctas)

      val props = sharedState.externalCatalog.getTable("default", "t").properties
      assert(!props.contains("spark.sql.sources.schema.numPartCols"))
      assert(props("spark.sql.sources.schema.numBuckets").toInt === 2)
      assert(props("spark.sql.sources.schema.numBucketCols").toInt === 1)
      // Without a SORTED BY clause the sort-column property must be absent.
      assert(!props.contains("spark.sql.sources.schema.numSortCols"))

      checkAnswer(table("t"), Row(1, 2))
    }
  }
}

test("CTAS: persisted partitioned bucketed data source table") {
  // Runs a CTAS that combines PARTITIONED BY with CLUSTERED BY ... SORTED BY and checks
  // that the partition, bucket and sort properties are all stored in the external catalog.
  withTempDir { tempDir =>
    withTable("t") {
      val location = tempDir.getCanonicalPath
      val ctas =
        s"""CREATE TABLE t USING PARQUET
           |OPTIONS (PATH '$location')
           |PARTITIONED BY (a)
           |CLUSTERED BY (b) SORTED BY (c) INTO 2 BUCKETS
           |AS SELECT 1 AS a, 2 AS b, 3 AS c
           |""".stripMargin

      sql(ctas)

      val props = sharedState.externalCatalog.getTable("default", "t").properties

      // Property key -> expected numeric value, checked in this order.
      Seq(
        "spark.sql.sources.schema.numPartCols" -> 1,
        "spark.sql.sources.schema.numBuckets" -> 2,
        "spark.sql.sources.schema.numBucketCols" -> 1,
        "spark.sql.sources.schema.numSortCols" -> 1
      ).foreach { case (key, expected) =>
        assert(props(key).toInt === expected)
      }

      // The expected row is (b, c, a) = (2, 3, 1): the partition column `a` comes last.
      checkAnswer(table("t"), Row(2, 3, 1))
    }
  }
}
}