From 83082ffb9dae922374b2d2f6051cce4bf24cb1de Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 17 Jun 2016 22:13:36 -0700 Subject: [PATCH 1/6] fix --- .../apache/spark/sql/DataFrameWriter.scala | 32 +++++++++++++++---- .../sql/test/DataFrameReaderWriterSuite.scala | 24 ++++++++++++++ 2 files changed, 49 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index e6fc9749c726..ad4a9c6b3039 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -271,16 +271,33 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ifNotExists = false)).toRdd } - private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols => - cols.map(normalize(_, "Partition")) + /** Duplicates are not allowed in partitionBy/bucketBy/sortBy columns. */ + private def checkDuplicates(columnNames: Seq[String], columnType: String): Unit = { + if (columnNames.length != columnNames.distinct.length) { + val duplicateColumns = columnNames.groupBy(identity).collect { + case (x, ys) if ys.length > 1 => "`" + x + "`" + }.mkString(", ") + throw new AnalysisException( + s"Duplicate column(s): $duplicateColumns found in $columnType columns") + } + } + + private def normalizedParCols: Option[Seq[String]] = { + val partitionByCols = partitioningColumns.map { cols => cols.map(normalize(_, "Partition")) } + partitionByCols.foreach(checkDuplicates(_, "Partition")) + partitionByCols } - private def normalizedBucketColNames: Option[Seq[String]] = bucketColumnNames.map { cols => - cols.map(normalize(_, "Bucketing")) + private def normalizedBucketColNames: Option[Seq[String]] = { + val bucketByCols = bucketColumnNames.map { cols => cols.map(normalize(_, "Bucketing")) } + bucketByCols.foreach(checkDuplicates(_, "Bucketing")) + bucketByCols } - private def normalizedSortColNames: Option[Seq[String]] = sortColumnNames.map { cols => - cols.map(normalize(_, "Sorting")) + private def normalizedSortColNames: Option[Seq[String]] = { + val sortByCols = sortColumnNames.map { cols => cols.map(normalize(_, "Sorting")) } + sortByCols.foreach(checkDuplicates(_, "Sorting")) + sortByCols } private def getBucketSpec: Option[BucketSpec] = { @@ -369,6 +386,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def saveAsTable(tableIdent: TableIdentifier): Unit = { val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent) + val partitions = normalizedParCols.map(_.toArray).getOrElse(Array.empty[String]) (tableExists, mode) match { case (true, SaveMode.Ignore) => @@ -382,7 +400,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { CreateTableUsingAsSelect( tableIdent, source, - partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]), + partitions, getBucketSpec, mode, extraOptions.toMap, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 98e57b38044f..e3f3256572d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -80,6 +80,7 @@ class DefaultSource class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext { + import testImplicits._ private def newMetadataDir = 
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath @@ -97,6 +98,29 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext { } } + test("duplicate columns in bucketBy") { + val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") + val e = intercept[AnalysisException] { + df.write.format("json").bucketBy(8, "j", "j").saveAsTable("tab") + }.getMessage + assert(e.contains("Duplicate column(s): `j` found in Bucketing columns")) + } + + test("duplicate columns in sortBy") { + val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") + val e = intercept[AnalysisException] { + df.write.format("json").bucketBy(8, "j", "k").sortBy("k", "k").saveAsTable("tab") + }.getMessage + assert(e.contains("Duplicate column(s): `k` found in Sorting columns")) + } + + test("duplicate columns in partitionBy") { + val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") + val e = intercept[AnalysisException] { + df.write.format("json").partitionBy("i", "i").saveAsTable("tab") + }.getMessage + assert(e.contains("Duplicate column(s): `i` found in Partition columns")) + } test("resolve default source") { spark.read From 785d625b38ee2b4c6d1e2fb1bd6387e387772106 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 21 Jun 2016 21:31:17 -0700 Subject: [PATCH 2/6] address comments --- .../apache/spark/sql/DataFrameWriter.scala | 29 ++------ .../sql/execution/datasources/rules.scala | 26 +++++++ .../sql/test/DataFrameReaderWriterSuite.scala | 30 +++++---- .../sql/hive/execution/HiveDDLSuite.scala | 67 +++++++++++++++++++ 4 files changed, 117 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 92eedae618b6..86c2a5b5f65b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -259,33 +259,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ifNotExists = false)).toRdd } - /** Duplicates are not allowed in partitionBy/bucketBy/sortBy columns. 
*/ - private def checkDuplicates(columnNames: Seq[String], columnType: String): Unit = { - if (columnNames.length != columnNames.distinct.length) { - val duplicateColumns = columnNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => "`" + x + "`" - }.mkString(", ") - throw new AnalysisException( - s"Duplicate column(s): $duplicateColumns found in $columnType columns") - } - } - - private def normalizedParCols: Option[Seq[String]] = { - val partitionByCols = partitioningColumns.map { cols => cols.map(normalize(_, "Partition")) } - partitionByCols.foreach(checkDuplicates(_, "Partition")) - partitionByCols + private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols => + cols.map(normalize(_, "Partition")) } - private def normalizedBucketColNames: Option[Seq[String]] = { - val bucketByCols = bucketColumnNames.map { cols => cols.map(normalize(_, "Bucketing")) } - bucketByCols.foreach(checkDuplicates(_, "Bucketing")) - bucketByCols + private def normalizedBucketColNames: Option[Seq[String]] = bucketColumnNames.map { cols => + cols.map(normalize(_, "Bucketing")) } - private def normalizedSortColNames: Option[Seq[String]] = { - val sortByCols = sortColumnNames.map { cols => cols.map(normalize(_, "Sorting")) } - sortByCols.foreach(checkDuplicates(_, "Sorting")) - sortByCols + private def normalizedSortColNames: Option[Seq[String]] = sortColumnNames.map { cols => + cols.map(normalize(_, "Sorting")) } private def getBucketSpec: Option[BucketSpec] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 15b9d14bd73f..91c0c061fe74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -206,7 +206,22 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") + case c: CreateTableUsing => + // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns. + checkDuplicates(c.partitionColumns, "Partition") + c.bucketSpec.foreach(b => { + checkDuplicates(b.bucketColumnNames, "Bucketing") + checkDuplicates(b.sortColumnNames, "Sorting") + }) + case c: CreateTableUsingAsSelect => + // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns. + checkDuplicates(c.partitionColumns, "Partition") + c.bucketSpec.foreach(b => { + checkDuplicates(b.bucketColumnNames, "Bucketing") + checkDuplicates(b.sortColumnNames, "Sorting") + }) + // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. 
if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableIdent)) { @@ -248,4 +263,15 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) case _ => // OK } } + + private def checkDuplicates(columnNames: Seq[String], columnType: String): Unit = { + val duplicateColumns = columnNames.groupBy { name => + if (conf.caseSensitiveAnalysis) name else name.toLowerCase }.collect { + case (x, ys) if ys.length > 1 => s"`$x`" + } + if (duplicateColumns.nonEmpty) { + throw new AnalysisException( + s"Found duplicate column(s) in $columnType: ${duplicateColumns.mkString(", ")}") + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 24c34819872c..8a685a0bcd97 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -112,28 +112,34 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be test("duplicate columns in bucketBy") { import testImplicits._ val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") - val e = intercept[AnalysisException] { - df.write.format("json").bucketBy(8, "j", "j").saveAsTable("tab") - }.getMessage - assert(e.contains("Duplicate column(s): `j` found in Bucketing columns")) + withTable("tab123") { + val e = intercept[AnalysisException] { + df.write.format("json").bucketBy(8, "j", "j").saveAsTable("tab123") + }.getMessage + assert(e.contains("Found duplicate column(s) in Bucketing: `j`")) + } } test("duplicate columns in sortBy") { import testImplicits._ val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") - val e = intercept[AnalysisException] { - df.write.format("json").bucketBy(8, "j", "k").sortBy("k", "k").saveAsTable("tab") - }.getMessage - assert(e.contains("Duplicate column(s): `k` found in Sorting columns")) + withTable("tab123") { + val e = intercept[AnalysisException] { + df.write.format("json").bucketBy(8, "j", "k").sortBy("k", "k").saveAsTable("tab123") + }.getMessage + assert(e.contains("Found duplicate column(s) in Sorting: `k`")) + } } test("duplicate columns in partitionBy") { import testImplicits._ val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") - val e = intercept[AnalysisException] { - df.write.format("json").partitionBy("i", "i").saveAsTable("tab") - }.getMessage - assert(e.contains("Duplicate column(s): `i` found in Partition columns")) + withTable("tab123") { + val e = intercept[AnalysisException] { + df.write.format("json").partitionBy("i", "i").saveAsTable("tab123") + }.getMessage + assert(e.contains("Found duplicate column(s) in Partition: `i`")) + } } test("resolve default source") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index b2f01fcc8328..0b69a27b1a4d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -140,6 +140,73 @@ class HiveDDLSuite } } + test("duplicate columns in bucketBy, sortBy and partitionBy in CTAS") { + withTable("t") { + var e = intercept[AnalysisException] { + sql( + """CREATE TABLE t USING PARQUET + |OPTIONS (PATH '/path/to/file') + |CLUSTERED BY (a, a) SORTED BY (b) INTO 2 
BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Bucketing: `a`")) + + e = intercept[AnalysisException] { + sql( + """CREATE TABLE t USING PARQUET + |OPTIONS (PATH '/path/to/file') + |CLUSTERED BY (a) SORTED BY (b, b) INTO 2 BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Sorting: `b`")) + + e = intercept[AnalysisException] { + sql( + """CREATE TABLE t USING PARQUET + |OPTIONS (PATH '/path/to/file') + |PARTITIONED BY (ds, ds, hr, hr) + |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Partition: `hr`, `ds`")) + } + } + + test("duplicate columns in bucketBy, sortBy and partitionBy in Create Data Source Tables") { + withTable("t") { + var e = intercept[AnalysisException] { + sql( + """CREATE TABLE t (a String, b String) USING PARQUET + |OPTIONS (PATH '/path/to/file') + |CLUSTERED BY (a, a) SORTED BY (b) INTO 2 BUCKETS + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Bucketing: `a`")) + + e = intercept[AnalysisException] { + sql( + """CREATE TABLE t (a String, b String) USING PARQUET + |OPTIONS (PATH '/path/to/file') + |CLUSTERED BY (a) SORTED BY (b, b) INTO 2 BUCKETS + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Sorting: `b`")) + + e = intercept[AnalysisException] { + sql( + """CREATE TABLE t (a String, b String) USING PARQUET + |OPTIONS (PATH '/path/to/file') + |PARTITIONED BY (ds, ds, hr, hr) + |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Partition: `hr`, `ds`")) + } + } + test("add/drop partitions - external table") { val catalog = spark.sessionState.catalog withTempDir { tmpDir => From 24edb5f60f3caf79c6e860905041cd09301589db Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 22 Jun 2016 23:23:18 -0700 Subject: [PATCH 3/6] address comments --- .../org/apache/spark/sql/execution/SparkSqlParser.scala | 2 -- .../apache/spark/sql/execution/datasources/rules.scala | 6 ++++++ .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 9 +++++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 2ae8380644ac..e6375ad578a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -311,8 +311,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { /** * Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan. - * - * TODO add bucketing and partitioning. 
*/ override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) { val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 91c0c061fe74..fd29962b66b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrd import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.command.CreateTableCommand import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} @@ -206,6 +207,11 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) // The relation in l is not an InsertableRelation. failAnalysis(s"$l does not allow insertion.") + case c: CreateTableCommand => + // Duplicates are not allowed in partitionBy + // Todo: when bucketBy and sortBy are supported, we also need to ban the duplication. + checkDuplicates(c.table.partitionColumnNames, "Partition") + case c: CreateTableUsing => // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns. checkDuplicates(c.partitionColumns, "Partition") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 0b69a27b1a4d..9a04d0068fd8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -207,6 +207,15 @@ class HiveDDLSuite } } + test("duplicate columns in partitionBy in CREATE TABLE") { + withTable("t") { + val e = intercept[AnalysisException] { + sql("CREATE TABLE boxes (b INT, c INT) PARTITIONED BY (a INT, a INT)") + } + assert(e.getMessage.contains("Found duplicate column(s) in Partition: `a`")) + } + } + test("add/drop partitions - external table") { val catalog = spark.sessionState.catalog withTempDir { tmpDir => From 69d7de6d5d78210da1a26ab25fb4ff131ef5461c Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 23 Jun 2016 20:00:41 -0700 Subject: [PATCH 4/6] revert --- .../sql/test/DataFrameReaderWriterSuite.scala | 33 +------------------ 1 file changed, 1 insertion(+), 32 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 8a685a0bcd97..3fa3864bc969 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -85,6 +85,7 @@ class DefaultSource class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter { + private val userSchema = new StructType().add("s", StringType) private val textSchema = new StructType().add("value", StringType) private val data = Seq("1", "2", "3") @@ -109,38 +110,6 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } - test("duplicate columns in bucketBy") { - import 
testImplicits._ - val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") - withTable("tab123") { - val e = intercept[AnalysisException] { - df.write.format("json").bucketBy(8, "j", "j").saveAsTable("tab123") - }.getMessage - assert(e.contains("Found duplicate column(s) in Bucketing: `j`")) - } - } - - test("duplicate columns in sortBy") { - import testImplicits._ - val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") - withTable("tab123") { - val e = intercept[AnalysisException] { - df.write.format("json").bucketBy(8, "j", "k").sortBy("k", "k").saveAsTable("tab123") - }.getMessage - assert(e.contains("Found duplicate column(s) in Sorting: `k`")) - } - } - - test("duplicate columns in partitionBy") { - import testImplicits._ - val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") - withTable("tab123") { - val e = intercept[AnalysisException] { - df.write.format("json").partitionBy("i", "i").saveAsTable("tab123") - }.getMessage - assert(e.contains("Found duplicate column(s) in Partition: `i`")) - } - } test("resolve default source") { spark.read From c0e7e0c54fce43e9750401d22950b581cad0a048 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 23 Jun 2016 23:03:38 -0700 Subject: [PATCH 5/6] address comments --- .../spark/sql/execution/SparkSqlParser.scala | 21 +-- .../sql/execution/datasources/rules.scala | 16 ++- .../execution/command/DDLCommandSuite.scala | 14 -- .../command/DDLSemanticAnalysisSuite.scala | 83 +++++++++++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 13 ++ .../HiveDDLSemanticAnalysisSuite.scala | 135 ++++++++++++++++++ .../sql/hive/execution/HiveDDLSuite.scala | 76 ---------- 7 files changed, 247 insertions(+), 111 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSemanticAnalysisSuite.scala create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSemanticAnalysisSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index e6375ad578a4..34e4b4bdc660 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -865,8 +865,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a table, returning either a [[CreateTableCommand]] or a - * [[CreateHiveTableAsSelectLogicalPlan]]. + * Create a table, returning either a [[CreateTableCommand]], a + * [[CreateHiveTableAsSelectLogicalPlan]] or a [[CreateTableUsingAsSelect]]. * * This is not used to create datasource tables, which is handled through * "CREATE TABLE ... USING ...". 
@@ -907,23 +907,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty) val selectQuery = Option(ctx.query).map(plan) - // Ensuring whether no duplicate name is used in table definition - val colNames = cols.map(_.name) - if (colNames.length != colNames.distinct.length) { - val duplicateColumns = colNames.groupBy(identity).collect { - case (x, ys) if ys.length > 1 => "\"" + x + "\"" - } - throw operationNotAllowed(s"Duplicated column names found in table definition of $name: " + - duplicateColumns.mkString("[", ",", "]"), ctx) - } - - // For Hive tables, partition columns must not be part of the schema - val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet) - if (badPartCols.nonEmpty) { - throw operationNotAllowed(s"Partition columns may not be specified in the schema: " + - badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx) - } - // Note: Hive requires partition columns to be distinct from the schema, so we need // to include the partition columns here explicitly val schema = cols ++ partitionCols diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index fd29962b66b9..cfe6fd55586a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -146,7 +146,7 @@ private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[Log } /** - * A rule to do various checks before inserting into or writing to a data source table. + * A rule to do various checks before inserting into or writing to a table. */ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) extends (LogicalPlan => Unit) { @@ -208,9 +208,21 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) failAnalysis(s"$l does not allow insertion.") case c: CreateTableCommand => + val allColNamesInSchema = c.table.schema.map(_.name) + val colNames = allColNamesInSchema.diff(c.table.partitionColumnNames) + val partitionColumnNames = c.table.partitionColumnNames // Duplicates are not allowed in partitionBy // Todo: when bucketBy and sortBy are supported, we also need to ban the duplication. - checkDuplicates(c.table.partitionColumnNames, "Partition") + checkDuplicates(partitionColumnNames, "Partition") + // Ensuring whether no duplicate name is used in table definition + checkDuplicates(colNames, s"table definition of ${c.table.identifier}") + // For non-data-source tables, partition columns must not be part of the schema + val badPartCols = partitionColumnNames.toSet.intersect(colNames.toSet) + if (badPartCols.nonEmpty) { + failAnalysis(s"Operation not allowed: Partition columns may not be specified in the " + + "schema: " + badPartCols.map("`" + _ + "`").mkString(",")) + } + case c: CreateTableUsing => // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns. 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 5bee28b4462c..aec7e99d9d37 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -334,20 +334,6 @@ class DDLCommandSuite extends PlanTest { assert(ct.table.storage.locationUri == Some("/something/anything")) } - test("create table - column repeated in partitioning columns") { - val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)" - val e = intercept[ParseException] { parser.parsePlan(query) } - assert(e.getMessage.contains( - "Operation not allowed: Partition columns may not be specified in the schema: [\"key\"]")) - } - - test("create table - duplicate column names in the table definition") { - val query = "CREATE TABLE default.tab1 (key INT, key STRING)" - val e = intercept[ParseException] { parser.parsePlan(query) } - assert(e.getMessage.contains("Operation not allowed: Duplicated column names found in " + - "table definition of `default`.`tab1`: [\"key\"]")) - } - test("create table using - with partitioned by") { val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet PARTITIONED BY (a)" val expected = CreateTableUsing( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSemanticAnalysisSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSemanticAnalysisSuite.scala new file mode 100644 index 000000000000..c3b9889c60aa --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSemanticAnalysisSuite.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.test.SharedSQLContext + +class DDLSemanticAnalysisSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { + + override def afterEach(): Unit = { + try { + // drop all databases, tables and functions after each test + spark.sessionState.catalog.reset() + } finally { + super.afterEach() + } + } + + test("create table - duplicate column names in the table definition") { + val query = "CREATE TABLE default.tab1 (key INT, key STRING)" + val e = intercept[AnalysisException] { sql(query) } + assert(e.getMessage.contains( + "Found duplicate column(s) in table definition of `default`.`tab1`: `key`")) + } + + test("create table - column repeated in partitioning columns") { + val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)" + val e = intercept[AnalysisException] { sql(query) } + assert(e.getMessage.contains( + "Operation not allowed: Partition columns may not be specified in the schema: `key`")) + } + + test("duplicate column names in bucketBy") { + import testImplicits._ + val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") + withTable("tab123") { + val e = intercept[AnalysisException] { + df.write.format("json").bucketBy(8, "j", "j").saveAsTable("tab123") + }.getMessage + assert(e.contains("Found duplicate column(s) in Bucketing: `j`")) + } + } + + test("duplicate column names in sortBy") { + import testImplicits._ + val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") + withTable("tab123") { + val e = intercept[AnalysisException] { + df.write.format("json").bucketBy(8, "j", "k").sortBy("k", "k").saveAsTable("tab123") + }.getMessage + assert(e.contains("Found duplicate column(s) in Sorting: `k`")) + } + } + + test("duplicate column names in partitionBy") { + import testImplicits._ + val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k") + withTable("tab123") { + val e = intercept[AnalysisException] { + df.write.format("json").partitionBy("i", "i").saveAsTable("tab123") + }.getMessage + assert(e.contains("Found duplicate column(s) in Partition: `i`")) + } + } + +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 2e0b5d59b578..054dfc66c978 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -441,6 +441,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log case p: LogicalPlan if p.resolved => p case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) => + // Ensuring whether no duplicate name is used in table definition + checkDuplicates(child.output.map(_.name), s"table definition of ${table.identifier}") + val desc = if (table.storage.serde.isEmpty) { // add default serde table.withNewStorage( @@ -456,6 +459,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log child, allowExisting) } + + private def checkDuplicates(columnNames: Seq[String], columnType: String): Unit = { + val duplicateColumns = columnNames.groupBy(_.toLowerCase).collect { + case (x, ys) if ys.length > 1 => s"`$x`" + } + if (duplicateColumns.nonEmpty) { + throw new AnalysisException( + 
s"Found duplicate column(s) in $columnType: ${duplicateColumns.mkString(", ")}") + } + } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSemanticAnalysisSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSemanticAnalysisSuite.scala new file mode 100644 index 000000000000..c585d3b8524c --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSemanticAnalysisSuite.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.sql.execution.command.CreateTableCommand +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils + +class HiveDDLSemanticAnalysisSuite + extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { + + override def afterEach(): Unit = { + try { + // drop all databases, tables and functions after each test + spark.sessionState.catalog.reset() + } finally { + super.afterEach() + } + } + + test("duplicate columns in bucketBy, sortBy and partitionBy in CTAS") { + withTable("t") { + var e = intercept[AnalysisException] { + sql( + """ + |CREATE TABLE t USING PARQUET + |OPTIONS (PATH '/path/to/file') + |CLUSTERED BY (a, a) SORTED BY (b) INTO 2 BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Bucketing: `a`")) + + e = intercept[AnalysisException] { + sql( + """ + |CREATE TABLE t USING PARQUET + |OPTIONS (PATH '/path/to/file') + |CLUSTERED BY (a) SORTED BY (b, b) INTO 2 BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Sorting: `b`")) + + e = intercept[AnalysisException] { + sql( + """ + |CREATE TABLE t USING PARQUET + |OPTIONS (PATH '/path/to/file') + |PARTITIONED BY (ds, ds, hr, hr) + |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS + |AS SELECT 1 AS a, 2 AS b + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Partition: `hr`, `ds`")) + } + } + + test("duplicate columns in bucketBy, sortBy and partitionBy in Create Data Source Tables") { + withTable("t") { + var e = intercept[AnalysisException] { + sql( + """ + |CREATE TABLE t (a String, b String) USING PARQUET + |OPTIONS (PATH '/path/to/file') + |CLUSTERED BY (a, a) SORTED BY (b) INTO 2 BUCKETS + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Bucketing: `a`")) + + e = intercept[AnalysisException] { + sql( + """ + |CREATE TABLE t (a String, b String) USING PARQUET + |OPTIONS (PATH '/path/to/file') + |CLUSTERED BY (a) SORTED BY 
(b, b) INTO 2 BUCKETS + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Sorting: `b`")) + + e = intercept[AnalysisException] { + sql( + """ + |CREATE TABLE t (a String, b String) USING PARQUET + |OPTIONS (PATH '/path/to/file') + |PARTITIONED BY (ds, ds, hr, hr) + |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS + """.stripMargin) + } + assert(e.getMessage.contains("Found duplicate column(s) in Partition: `hr`, `ds`")) + } + } + + test("duplicate columns in partitionBy in Create Cataloged Table") { + withTable("t") { + val errorQuery = "CREATE TABLE boxes (b INT, c INT) PARTITIONED BY (a INT, a INT)" + val correctQuery = "CREATE TABLE boxes (b INT, c INT) PARTITIONED BY (a INT)" + val e = intercept[AnalysisException] { sql(errorQuery) } + assert(e.getMessage.contains("Found duplicate column(s) in Partition: `a`")) + assert(sql(correctQuery).queryExecution.analyzed.isInstanceOf[CreateTableCommand]) + } + } + + test("duplicate columns in partitionBy in Create Cataloged Table As Select") { + import spark.implicits._ + withTable("t", "t1") { + spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1") + val errorQuery = "CREATE TABLE t SELECT a, b as a from t1" + val correctQuery = "CREATE TABLE t SELECT a, b from t1" + val e = intercept[AnalysisException] { sql(errorQuery) } + assert(e.getMessage.contains("Found duplicate column(s) in table definition of `t`: `a`")) + assert( + sql(correctQuery).queryExecution.analyzed.isInstanceOf[CreateHiveTableAsSelectCommand]) + } + } + +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 9a04d0068fd8..b2f01fcc8328 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -140,82 +140,6 @@ class HiveDDLSuite } } - test("duplicate columns in bucketBy, sortBy and partitionBy in CTAS") { - withTable("t") { - var e = intercept[AnalysisException] { - sql( - """CREATE TABLE t USING PARQUET - |OPTIONS (PATH '/path/to/file') - |CLUSTERED BY (a, a) SORTED BY (b) INTO 2 BUCKETS - |AS SELECT 1 AS a, 2 AS b - """.stripMargin) - } - assert(e.getMessage.contains("Found duplicate column(s) in Bucketing: `a`")) - - e = intercept[AnalysisException] { - sql( - """CREATE TABLE t USING PARQUET - |OPTIONS (PATH '/path/to/file') - |CLUSTERED BY (a) SORTED BY (b, b) INTO 2 BUCKETS - |AS SELECT 1 AS a, 2 AS b - """.stripMargin) - } - assert(e.getMessage.contains("Found duplicate column(s) in Sorting: `b`")) - - e = intercept[AnalysisException] { - sql( - """CREATE TABLE t USING PARQUET - |OPTIONS (PATH '/path/to/file') - |PARTITIONED BY (ds, ds, hr, hr) - |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS - |AS SELECT 1 AS a, 2 AS b - """.stripMargin) - } - assert(e.getMessage.contains("Found duplicate column(s) in Partition: `hr`, `ds`")) - } - } - - test("duplicate columns in bucketBy, sortBy and partitionBy in Create Data Source Tables") { - withTable("t") { - var e = intercept[AnalysisException] { - sql( - """CREATE TABLE t (a String, b String) USING PARQUET - |OPTIONS (PATH '/path/to/file') - |CLUSTERED BY (a, a) SORTED BY (b) INTO 2 BUCKETS - """.stripMargin) - } - assert(e.getMessage.contains("Found duplicate column(s) in Bucketing: `a`")) - - e = intercept[AnalysisException] { - sql( - """CREATE TABLE t (a String, b String) USING PARQUET - |OPTIONS (PATH '/path/to/file') - |CLUSTERED BY 
(a) SORTED BY (b, b) INTO 2 BUCKETS - """.stripMargin) - } - assert(e.getMessage.contains("Found duplicate column(s) in Sorting: `b`")) - - e = intercept[AnalysisException] { - sql( - """CREATE TABLE t (a String, b String) USING PARQUET - |OPTIONS (PATH '/path/to/file') - |PARTITIONED BY (ds, ds, hr, hr) - |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS - """.stripMargin) - } - assert(e.getMessage.contains("Found duplicate column(s) in Partition: `hr`, `ds`")) - } - } - - test("duplicate columns in partitionBy in CREATE TABLE") { - withTable("t") { - val e = intercept[AnalysisException] { - sql("CREATE TABLE boxes (b INT, c INT) PARTITIONED BY (a INT, a INT)") - } - assert(e.getMessage.contains("Found duplicate column(s) in Partition: `a`")) - } - } - test("add/drop partitions - external table") { val catalog = spark.sessionState.catalog withTempDir { tmpDir => From 08b5374e827f6680b4e4a00ed700ef689dce22ff Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 5 Jul 2016 11:19:23 -0700 Subject: [PATCH 6/6] address comments. --- .../spark/sql/execution/SparkSqlParser.scala | 4 +-- .../sql/execution/datasources/rules.scala | 33 ++++++++++++------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 5495e22fbd44..2dbdf87a1a20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -874,8 +874,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } /** - * Create a table, returning either a [[CreateTableCommand]], a - * [[CreateHiveTableAsSelectLogicalPlan]] or a [[CreateTableUsingAsSelect]]. + * Create a table, returning a [[CreateTableCommand]], a [[CreateHiveTableAsSelectLogicalPlan]] + * or a [[CreateTableUsingAsSelect]]. * * This is not used to create datasource tables, which is handled through * "CREATE TABLE ... USING ...". diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index cfe6fd55586a..82a9152ecf63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -208,14 +208,20 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) failAnalysis(s"$l does not allow insertion.") case c: CreateTableCommand => - val allColNamesInSchema = c.table.schema.map(_.name) - val colNames = allColNamesInSchema.diff(c.table.partitionColumnNames) - val partitionColumnNames = c.table.partitionColumnNames + // If caseSensitiveAnalysis is false, convert the names to lower cases + val allColNamesInSchema = + c.table.schema.map(col => convertToCaseSensitiveAnalysisAware(col.name)) + val partitionColumnNames = + c.table.partitionColumnNames.map(convertToCaseSensitiveAnalysisAware) + // Duplicates are not allowed in partitionBy // Todo: when bucketBy and sortBy are supported, we also need to ban the duplication. 
checkDuplicates(partitionColumnNames, "Partition") + + val colNames = allColNamesInSchema.diff(partitionColumnNames) // Ensuring whether no duplicate name is used in table definition checkDuplicates(colNames, s"table definition of ${c.table.identifier}") + // For non-data-source tables, partition columns must not be part of the schema val badPartCols = partitionColumnNames.toSet.intersect(colNames.toSet) if (badPartCols.nonEmpty) { @@ -223,22 +229,21 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) "schema: " + badPartCols.map("`" + _ + "`").mkString(",")) } - case c: CreateTableUsing => // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns. checkDuplicates(c.partitionColumns, "Partition") - c.bucketSpec.foreach(b => { + c.bucketSpec.foreach { b => checkDuplicates(b.bucketColumnNames, "Bucketing") checkDuplicates(b.sortColumnNames, "Sorting") - }) + } case c: CreateTableUsingAsSelect => // Duplicates are not allowed in partitionBy/bucketBy/sortBy columns. checkDuplicates(c.partitionColumns, "Partition") - c.bucketSpec.foreach(b => { + c.bucketSpec.foreach { b => checkDuplicates(b.bucketColumnNames, "Bucketing") checkDuplicates(b.sortColumnNames, "Sorting") - }) + } // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. @@ -282,11 +287,15 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) } } + private def convertToCaseSensitiveAnalysisAware(name: String): String = { + if (conf.caseSensitiveAnalysis) name else name.toLowerCase + } + private def checkDuplicates(columnNames: Seq[String], columnType: String): Unit = { - val duplicateColumns = columnNames.groupBy { name => - if (conf.caseSensitiveAnalysis) name else name.toLowerCase }.collect { - case (x, ys) if ys.length > 1 => s"`$x`" - } + val duplicateColumns = + columnNames.groupBy(convertToCaseSensitiveAnalysisAware).collect { + case (x, ys) if ys.length > 1 => s"`$x`" + } if (duplicateColumns.nonEmpty) { throw new AnalysisException( s"Found duplicate column(s) in $columnType: ${duplicateColumns.mkString(", ")}")
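
For reference, a minimal sketch of how the duplicate-column checks introduced by this series surface to users. This is an illustrative example, not part of any patch above: the object name DuplicateColumnCheckDemo, the table names tab_dup_demo and boxes, and the local SparkSession settings are assumptions for the demo (Hive support is only needed for the Hive-syntax CREATE TABLE statement).

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object DuplicateColumnCheckDemo {
      def main(args: Array[String]): Unit = {
        // Local session; enableHiveSupport() is only required for the Hive-syntax DDL below.
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("duplicate-column-check-demo")
          .enableHiveSupport()
          .getOrCreate()
        import spark.implicits._

        val df = (0 until 5).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")

        // DataFrameWriter path: a repeated partitionBy/bucketBy/sortBy column is
        // rejected during analysis, before anything is written.
        try {
          df.write.format("json").partitionBy("i", "i").saveAsTable("tab_dup_demo")
        } catch {
          case e: AnalysisException =>
            // Expected (as of the final patch): "Found duplicate column(s) in Partition: `i`"
            println(e.getMessage)
        }

        // SQL DDL path: the analysis-time check in PreWriteCheck catches the repeated
        // partition column in CREATE TABLE as well.
        try {
          spark.sql("CREATE TABLE boxes (b INT, c INT) PARTITIONED BY (a INT, a INT)")
        } catch {
          case e: AnalysisException =>
            // Expected: "Found duplicate column(s) in Partition: `a`"
            println(e.getMessage)
        }

        spark.stop()
      }
    }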