[SPARK-16041][SQL] Disallow Duplicate Columns in partitionBy, bucketBy and sortBy #13756
Changes from all commits: 83082ff, 83828fe, 785d625, ae15ea9, 24edb5f, 69d7de6, c0e7e0c, 53417f1, 6bd359c, 08b5374
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrd
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CreateTableCommand
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
@@ -145,7 +146,7 @@ private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[Log
 }

 /**
- * A rule to do various checks before inserting into or writing to a data source table.
+ * A rule to do various checks before inserting into or writing to a table.
  */
 private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
   extends (LogicalPlan => Unit) {
@@ -206,7 +207,44 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           // The relation in l is not an InsertableRelation.
           failAnalysis(s"$l does not allow insertion.")

+      case c: CreateTableCommand =>
+        // If caseSensitiveAnalysis is false, convert the names to lower case.
+        val allColNamesInSchema =
+          c.table.schema.map(col => convertToCaseSensitiveAnalysisAware(col.name))
+        val partitionColumnNames =
+          c.table.partitionColumnNames.map(convertToCaseSensitiveAnalysisAware)
+
+        // Duplicates are not allowed in partitionBy.
+        // TODO: when bucketBy and sortBy are supported, we also need to ban duplicates there.
+        checkDuplicates(partitionColumnNames, "Partition")
+
+        val colNames = allColNamesInSchema.diff(partitionColumnNames)
+        // Ensure that no duplicate names are used in the table definition.
+        checkDuplicates(colNames, s"table definition of ${c.table.identifier}")
+
+        // For non-data-source tables, partition columns must not be part of the schema.
+        val badPartCols = partitionColumnNames.toSet.intersect(colNames.toSet)
+        if (badPartCols.nonEmpty) {
+          failAnalysis(s"Operation not allowed: Partition columns may not be specified in the " +
+            "schema: " + badPartCols.map("`" + _ + "`").mkString(","))
+        }
+
+      case c: CreateTableUsing =>
> Contributor: how about

> Author (Member): : ) True. Let me add them now.

> Author (Member): Only found one case: First, Second, Let me know if anything is still missing. Thanks!
+        // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns.
+        checkDuplicates(c.partitionColumns, "Partition")
+        c.bucketSpec.foreach { b =>
+          checkDuplicates(b.bucketColumnNames, "Bucketing")
+          checkDuplicates(b.sortColumnNames, "Sorting")
+        }
+
       case c: CreateTableUsingAsSelect =>
+        // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns.
+        checkDuplicates(c.partitionColumns, "Partition")
+        c.bucketSpec.foreach { b =>
+          checkDuplicates(b.bucketColumnNames, "Bucketing")
+          checkDuplicates(b.sortColumnNames, "Sorting")
+        }
+
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
         if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableIdent)) {
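A subtlety in the CreateTableCommand branch above: Seq.diff performs multiset difference, removing only one occurrence per partition column, so a partition column that is also repeated in the schema survives into colNames and is then caught by the intersect. A quick REPL-style illustration of that Seq behavior (the column names are illustrative):

```scala
// Seq.diff removes one occurrence per matching element (multiset difference).
val schemaCols = Seq("key", "value", "key", "hr") // partition cols appended to the schema
val partCols   = Seq("key", "hr")

val colNames    = schemaCols.diff(partCols)                // Seq("value", "key")
val badPartCols = partCols.toSet.intersect(colNames.toSet)
println(badPartCols)                                       // Set(key) -> analysis fails
```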
@@ -248,4 +286,19 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
       case _ => // OK
     }
   }
+
+  private def convertToCaseSensitiveAnalysisAware(name: String): String = {
+    if (conf.caseSensitiveAnalysis) name else name.toLowerCase
+  }
+
+  private def checkDuplicates(columnNames: Seq[String], columnType: String): Unit = {
+    val duplicateColumns =
+      columnNames.groupBy(convertToCaseSensitiveAnalysisAware).collect {
+        case (x, ys) if ys.length > 1 => s"`$x`"
+      }
+    if (duplicateColumns.nonEmpty) {
+      throw new AnalysisException(
+        s"Found duplicate column(s) in $columnType: ${duplicateColumns.mkString(", ")}")
+    }
+  }
 }
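The two private helpers above carry the entire mechanism: normalize each name according to spark.sql.caseSensitive, group by the normalized form, and report any group with more than one member. A minimal standalone sketch of the same logic (the object name and the explicit caseSensitive flag are illustrative, not part of the patch):

```scala
object DuplicateCheckSketch {
  // Mirror of checkDuplicates above: group names by their normalized
  // form and report any name that occurs more than once.
  def findDuplicates(columnNames: Seq[String], caseSensitive: Boolean): Seq[String] = {
    def normalize(name: String): String = if (caseSensitive) name else name.toLowerCase
    columnNames.groupBy(normalize).collect {
      case (name, occurrences) if occurrences.length > 1 => s"`$name`"
    }.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Case-insensitive analysis (the default) flags "key"/"KEY" as duplicates.
    println(findDuplicates(Seq("key", "KEY", "value"), caseSensitive = false)) // List(`key`)
    // Case-sensitive analysis treats them as distinct columns.
    println(findDuplicates(Seq("key", "KEY", "value"), caseSensitive = true)) // List()
  }
}
```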
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class DDLSemanticAnalysisSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
+
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      spark.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  test("create table - duplicate column names in the table definition") {
+    val query = "CREATE TABLE default.tab1 (key INT, key STRING)"
+    val e = intercept[AnalysisException] { sql(query) }
+    assert(e.getMessage.contains(
+      "Found duplicate column(s) in table definition of `default`.`tab1`: `key`"))
+  }
+
+  test("create table - column repeated in partitioning columns") {
+    val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)"
+    val e = intercept[AnalysisException] { sql(query) }
+    assert(e.getMessage.contains(
+      "Operation not allowed: Partition columns may not be specified in the schema: `key`"))
+  }
+
+  test("duplicate column names in bucketBy") {
+    import testImplicits._
+    val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+    withTable("tab123") {
+      val e = intercept[AnalysisException] {
+        df.write.format("json").bucketBy(8, "j", "j").saveAsTable("tab123")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s) in Bucketing: `j`"))
+    }
+  }
+
+  test("duplicate column names in sortBy") {
+    import testImplicits._
+    val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+    withTable("tab123") {
+      val e = intercept[AnalysisException] {
+        df.write.format("json").bucketBy(8, "j", "k").sortBy("k", "k").saveAsTable("tab123")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s) in Sorting: `k`"))
+    }
+  }
+
+  test("duplicate column names in partitionBy") {
+    import testImplicits._
+    val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+    withTable("tab123") {
+      val e = intercept[AnalysisException] {
+        df.write.format("json").partitionBy("i", "i").saveAsTable("tab123")
+      }.getMessage
+      assert(e.contains("Found duplicate column(s) in Partition: `i`"))
+    }
+  }
+}
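Beyond the test harness, the same checks surface directly through the DataFrameWriter API. A hedged sketch of a user-facing session (the session setup and table name are illustrative; the expected message matches the partitionBy test above):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object DuplicateColumnDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("duplicate-column-demo")
      .getOrCreate()
    import spark.implicits._

    val df = (0 until 5).map(i => (i % 5, i.toString)).toDF("i", "j")
    try {
      // Repeating a column in partitionBy now fails analysis instead of
      // silently producing a malformed table layout.
      df.write.format("json").partitionBy("i", "i").saveAsTable("demo_tab")
    } catch {
      case e: AnalysisException =>
        // Expected: Found duplicate column(s) in Partition: `i`
        println(e.getMessage)
    }
    spark.stop()
  }
}
```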
@@ -443,6 +443,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       case p: LogicalPlan if p.resolved => p

       case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) =>
+        // Ensure that no duplicate names are used in the table definition.
+        checkDuplicates(child.output.map(_.name), s"table definition of ${table.identifier}")
+
> Author (Member): Actually, in Hive, there is a stage called

> Contributor: sounds like a good idea, do you have more information about the

> Author (Member): Actually, my previous comment is not accurate. In Hive, they just split what our
>
> To answer your last question, let me post the error messages generated by the semantic analyzer. The range of error codes from 10000 to 19999 is used by the semantic analyzer:

> Author (Member): @rxin @cloud-fan @liancheng @yhuai Do you think we can open an umbrella JIRA for the whole community to track whether the same/similar error messages should be issued by Spark SQL? That could help us find all the potential holes and improve the code quality.
         val desc = if (table.storage.serde.isEmpty) {
           // add default serde
           table.withNewStorage(

@@ -458,6 +461,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           child,
           allowExisting)
       }
+
+    private def checkDuplicates(columnNames: Seq[String], columnType: String): Unit = {
+      val duplicateColumns = columnNames.groupBy(_.toLowerCase).collect {
+        case (x, ys) if ys.length > 1 => s"`$x`"
+      }
+      if (duplicateColumns.nonEmpty) {
+        throw new AnalysisException(
+          s"Found duplicate column(s) in $columnType: ${duplicateColumns.mkString(", ")}")
+      }
+    }
   }
 }
> Reviewer: Again, this line is using a case-sensitive comparison.
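Finally, for the Hive CTAS path guarded in this file, a hedged sketch of the kind of statement the new check rejects (the view and table names are illustrative, enableHiveSupport assumes a Hive-enabled build, and the exact identifier rendering in the message may differ):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object CtasDuplicateDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    Seq((1, "a"), (2, "b")).toDF("id", "v").createOrReplaceTempView("src")
    try {
      // The SELECT emits two output columns that lower-case to `id`,
      // so the CTAS table definition would contain duplicates.
      spark.sql("CREATE TABLE dup_tab AS SELECT id, id AS ID FROM src")
    } catch {
      case e: AnalysisException =>
        // Expected (per this patch): Found duplicate column(s) in
        // table definition of `dup_tab`: `id`
        println(e.getMessage)
    }
    spark.stop()
  }
}
```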