diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index f77af76d2bf3..49a701d902d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -357,6 +357,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
     val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
+    val partitions = normalizedParCols.map(_.toArray).getOrElse(Array.empty[String])
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
@@ -370,7 +371,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         CreateTableUsingAsSelect(
           tableIdent,
           source,
-          partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+          partitions,
           getBucketSpec,
           mode,
           extraOptions.toMap,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 42ec210baa2d..2dbdf87a1a20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -311,8 +311,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
    * Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan.
-   *
-   * TODO add bucketing and partitioning.
    */
   override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
@@ -876,8 +874,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a table, returning either a [[CreateTableCommand]] or a
-   * [[CreateHiveTableAsSelectLogicalPlan]].
+   * Create a table, returning a [[CreateTableCommand]], a [[CreateHiveTableAsSelectLogicalPlan]]
+   * or a [[CreateTableUsingAsSelect]].
    *
    * This is not used to create datasource tables, which is handled through
    * "CREATE TABLE ... USING ...".
@@ -918,23 +916,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
 
-    // Ensuring whether no duplicate name is used in table definition
-    val colNames = cols.map(_.name)
-    if (colNames.length != colNames.distinct.length) {
-      val duplicateColumns = colNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }
-      operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
-        duplicateColumns.mkString("[", ",", "]"), ctx)
-    }
-
-    // For Hive tables, partition columns must not be part of the schema
-    val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
-    if (badPartCols.nonEmpty) {
-      operationNotAllowed(s"Partition columns may not be specified in the schema: " +
-        badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
-    }
-
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
     val schema = cols ++ partitionCols
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 15b9d14bd73f..82a9152ecf63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrd
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CreateTableCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 
@@ -145,7 +146,7 @@ private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[Log
   }
 
   /**
-   * A rule to do various checks before inserting into or writing to a data source table.
+   * A rule to do various checks before inserting into or writing to a table.
    */
   private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
     extends (LogicalPlan => Unit) {
@@ -206,7 +207,44 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           // The relation in l is not an InsertableRelation.
          failAnalysis(s"$l does not allow insertion.")
 
+      case c: CreateTableCommand =>
+        // If caseSensitiveAnalysis is false, convert the names to lower cases
+        val allColNamesInSchema =
+          c.table.schema.map(col => convertToCaseSensitiveAnalysisAware(col.name))
+        val partitionColumnNames =
+          c.table.partitionColumnNames.map(convertToCaseSensitiveAnalysisAware)
+
+        // Duplicates are not allowed in partitionBy
+        // Todo: when bucketBy and sortBy are supported, we also need to ban the duplication.
+        checkDuplicates(partitionColumnNames, "Partition")
+
+        val colNames = allColNamesInSchema.diff(partitionColumnNames)
+        // Ensuring whether no duplicate name is used in table definition
+        checkDuplicates(colNames, s"table definition of ${c.table.identifier}")
+
+        // For non-data-source tables, partition columns must not be part of the schema
+        val badPartCols = partitionColumnNames.toSet.intersect(colNames.toSet)
+        if (badPartCols.nonEmpty) {
+          failAnalysis(s"Operation not allowed: Partition columns may not be specified in the " +
+            "schema: " + badPartCols.map("`" + _ + "`").mkString(","))
+        }
+
+      case c: CreateTableUsing =>
+        // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns.
+        checkDuplicates(c.partitionColumns, "Partition")
+        c.bucketSpec.foreach { b =>
+          checkDuplicates(b.bucketColumnNames, "Bucketing")
+          checkDuplicates(b.sortColumnNames, "Sorting")
+        }
+
       case c: CreateTableUsingAsSelect =>
+        // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns.
+        checkDuplicates(c.partitionColumns, "Partition")
+        c.bucketSpec.foreach { b =>
+          checkDuplicates(b.bucketColumnNames, "Bucketing")
+          checkDuplicates(b.sortColumnNames, "Sorting")
+        }
+
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
         if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableIdent)) {
@@ -248,4 +286,19 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
       case _ => // OK
     }
   }
+
+  private def convertToCaseSensitiveAnalysisAware(name: String): String = {
+    if (conf.caseSensitiveAnalysis) name else name.toLowerCase
+  }
+
+  private def checkDuplicates(columnNames: Seq[String], columnType: String): Unit = {
+    val duplicateColumns =
+      columnNames.groupBy(convertToCaseSensitiveAnalysisAware).collect {
+        case (x, ys) if ys.length > 1 => s"`$x`"
+      }
+    if (duplicateColumns.nonEmpty) {
+      throw new AnalysisException(
+        s"Found duplicate column(s) in $columnType: ${duplicateColumns.mkString(", ")}")
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index e1a7b9b0048b..322e86f1a288 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -334,20 +334,6 @@ class DDLCommandSuite extends PlanTest {
     assert(ct.table.storage.locationUri == Some("/something/anything"))
   }
 
-  test("create table - column repeated in partitioning columns") {
-    val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)"
-    val e = intercept[ParseException] { parser.parsePlan(query) }
-    assert(e.getMessage.contains(
-      "Operation not allowed: Partition columns may not be specified in the schema: [\"key\"]"))
-  }
-
-  test("create table - duplicate column names in the table definition") {
-    val query = "CREATE TABLE default.tab1 (key INT, key STRING)"
-    val e = intercept[ParseException] { parser.parsePlan(query) }
-    assert(e.getMessage.contains("Operation not allowed: Duplicated column names found in " +
-      "table definition of `default`.`tab1`: [\"key\"]"))
-  }
-
   test("create table using - with partitioned by") {
     val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
       "USING parquet PARTITIONED BY (a)"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSemanticAnalysisSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSemanticAnalysisSuite.scala
new file mode 100644
index 000000000000..c3b9889c60aa
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSemanticAnalysisSuite.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class DDLSemanticAnalysisSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
+
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      spark.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("create table - duplicate column names in the table definition") {
+    val query = "CREATE TABLE default.tab1 (key INT, key STRING)"
+    val e = intercept[AnalysisException] { sql(query) }
+    assert(e.getMessage.contains(
+      "Found duplicate column(s) in table definition of `default`.`tab1`: `key`"))
+  }
+
+  test("create table - column repeated in partitioning columns") {
+    val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)"
+    val e = intercept[AnalysisException] { sql(query) }
+    assert(e.getMessage.contains(
+      "Operation not allowed: Partition columns may not be specified in the schema: `key`"))
+  }
+
+  test("duplicate column names in bucketBy") {
+    import testImplicits._
+    val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+    withTable("tab123") {
+      val e = intercept[AnalysisException] {
+        df.write.format("json").bucketBy(8, "j", "j").saveAsTable("tab123")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s) in Bucketing: `j`"))
+    }
+  }
+
+  test("duplicate column names in sortBy") {
+    import testImplicits._
+    val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+    withTable("tab123") {
+      val e = intercept[AnalysisException] {
+        df.write.format("json").bucketBy(8, "j", "k").sortBy("k", "k").saveAsTable("tab123")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s) in Sorting: `k`"))
+    }
+  }
+
+  test("duplicate column names in partitionBy") {
+    import testImplicits._
+    val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+    withTable("tab123") {
+      val e = intercept[AnalysisException] {
+        df.write.format("json").partitionBy("i", "i").saveAsTable("tab123")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s) in Partition: `i`"))
+    }
+  }
+
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 7dae473f47cb..4831f7d3cffe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -443,6 +443,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       case p: LogicalPlan if p.resolved => p
 
       case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) =>
+        // Ensuring whether no duplicate name is used in table definition
+        checkDuplicates(child.output.map(_.name), s"table definition of ${table.identifier}")
+
         val desc = if (table.storage.serde.isEmpty) {
           // add default serde
           table.withNewStorage(
@@ -458,6 +461,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           child,
           allowExisting)
     }
+
+    private def checkDuplicates(columnNames: Seq[String], columnType: String): Unit = {
+      val duplicateColumns = columnNames.groupBy(_.toLowerCase).collect {
+        case (x, ys) if ys.length > 1 => s"`$x`"
+      }
+      if (duplicateColumns.nonEmpty) {
+        throw new AnalysisException(
+          s"Found duplicate column(s) in $columnType: ${duplicateColumns.mkString(", ")}")
+      }
+    }
   }
 }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSemanticAnalysisSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSemanticAnalysisSuite.scala
new file mode 100644
index 000000000000..c585d3b8524c
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSemanticAnalysisSuite.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.execution.command.CreateTableCommand
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class HiveDDLSemanticAnalysisSuite
+  extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
+
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      spark.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("duplicate columns in bucketBy, sortBy and partitionBy in CTAS") {
+    withTable("t") {
+      var e = intercept[AnalysisException] {
+        sql(
+          """
+            |CREATE TABLE t USING PARQUET
+            |OPTIONS (PATH '/path/to/file')
+            |CLUSTERED BY (a, a) SORTED BY (b) INTO 2 BUCKETS
+            |AS SELECT 1 AS a, 2 AS b
+          """.stripMargin)
+      }
+      assert(e.getMessage.contains("Found duplicate column(s) in Bucketing: `a`"))
+
+      e = intercept[AnalysisException] {
+        sql(
+          """
+            |CREATE TABLE t USING PARQUET
+            |OPTIONS (PATH '/path/to/file')
+            |CLUSTERED BY (a) SORTED BY (b, b) INTO 2 BUCKETS
+            |AS SELECT 1 AS a, 2 AS b
+          """.stripMargin)
+      }
+      assert(e.getMessage.contains("Found duplicate column(s) in Sorting: `b`"))
+
+      e = intercept[AnalysisException] {
+        sql(
+          """
+            |CREATE TABLE t USING PARQUET
+            |OPTIONS (PATH '/path/to/file')
+            |PARTITIONED BY (ds, ds, hr, hr)
+            |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
+            |AS SELECT 1 AS a, 2 AS b
+          """.stripMargin)
+      }
+      assert(e.getMessage.contains("Found duplicate column(s) in Partition: `hr`, `ds`"))
+    }
+  }
+
+  test("duplicate columns in bucketBy, sortBy and partitionBy in Create Data Source Tables") {
+    withTable("t") {
+      var e = intercept[AnalysisException] {
+        sql(
+          """
+            |CREATE TABLE t (a String, b String) USING PARQUET
+            |OPTIONS (PATH '/path/to/file')
+            |CLUSTERED BY (a, a) SORTED BY (b) INTO 2 BUCKETS
+          """.stripMargin)
+      }
+      assert(e.getMessage.contains("Found duplicate column(s) in Bucketing: `a`"))
+
+      e = intercept[AnalysisException] {
+        sql(
+          """
+            |CREATE TABLE t (a String, b String) USING PARQUET
+            |OPTIONS (PATH '/path/to/file')
+            |CLUSTERED BY (a) SORTED BY (b, b) INTO 2 BUCKETS
+          """.stripMargin)
+      }
+      assert(e.getMessage.contains("Found duplicate column(s) in Sorting: `b`"))
+
+      e = intercept[AnalysisException] {
+        sql(
+          """
+            |CREATE TABLE t (a String, b String) USING PARQUET
+            |OPTIONS (PATH '/path/to/file')
+            |PARTITIONED BY (ds, ds, hr, hr)
+            |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
+          """.stripMargin)
+      }
+      assert(e.getMessage.contains("Found duplicate column(s) in Partition: `hr`, `ds`"))
+    }
+  }
+
+  test("duplicate columns in partitionBy in Create Cataloged Table") {
+    withTable("t") {
+      val errorQuery = "CREATE TABLE boxes (b INT, c INT) PARTITIONED BY (a INT, a INT)"
+      val correctQuery = "CREATE TABLE boxes (b INT, c INT) PARTITIONED BY (a INT)"
+      val e = intercept[AnalysisException] { sql(errorQuery) }
+      assert(e.getMessage.contains("Found duplicate column(s) in Partition: `a`"))
+      assert(sql(correctQuery).queryExecution.analyzed.isInstanceOf[CreateTableCommand])
+    }
+  }
+
+  test("duplicate columns in partitionBy in Create Cataloged Table As Select") {
+    import spark.implicits._
+    withTable("t", "t1") {
+      spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
+      val errorQuery = "CREATE TABLE t SELECT a, b as a from t1"
+      val correctQuery = "CREATE TABLE t SELECT a, b from t1"
+      val e = intercept[AnalysisException] { sql(errorQuery) }
+      assert(e.getMessage.contains("Found duplicate column(s) in table definition of `t`: `a`"))
+      assert(
+        sql(correctQuery).queryExecution.analyzed.isInstanceOf[CreateHiveTableAsSelectCommand])
+    }
+  }
+
+}