From c4458d46fa9a2859f9ef5111ce0b53234c19d7b1 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Thu, 16 Jun 2016 22:22:38 -0700
Subject: [PATCH 1/2] [SPARK-16006][SQL] Attempting to write empty DataFrame
 with no fields throws non-intuitive exception

---
 .../execution/datasources/PartitioningUtils.scala   |  9 +++++++--
 .../spark/sql/test/DataFrameReaderWriterSuite.scala | 12 +++++++++---
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 388df7002dc36..17743c38bd4fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -339,6 +339,9 @@ private[sql] object PartitioningUtils {
   private val upCastingOrder: Seq[DataType] =
     Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
 
+  /**
+   * Validate partition columns for writing executions.
+   */
   def validatePartitionColumn(
       schema: StructType,
       partitionColumns: Seq[String],
@@ -351,8 +354,10 @@ private[sql] object PartitioningUtils {
       }
     }
 
-    if (partitionColumns.size == schema.fields.size) {
-      throw new AnalysisException(s"Cannot use all columns for partition columns")
+    if (schema.fields.isEmpty) {
+      throw new AnalysisException("Cannot write dataset with no fields")
+    } else if (partitionColumns.size == schema.fields.length) {
+      throw new AnalysisException("Cannot use all columns for partition columns")
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 3fa3864bc9690..c72881faf74a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -214,12 +214,18 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
   test("prevent all column partitioning") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
-      intercept[AnalysisException] {
+      var e = intercept[AnalysisException] {
+        spark.emptyDataFrame.write.format("text").mode("overwrite").save(path)
+      }
+      assert(e.getMessage.contains("Cannot write dataset with no fields"))
+      e = intercept[AnalysisException] {
         spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
       }
-      intercept[AnalysisException] {
-        spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
+      assert(e.getMessage.contains("Cannot use all columns for partition columns"))
+      e = intercept[AnalysisException] {
+        spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
       }
+      assert(e.getMessage.contains("Cannot use all columns for partition columns"))
     }
   }
 
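For reference, the checks introduced by [PATCH 1/2] can be exercised from a spark-shell session. The sketch below is illustrative only and is not part of the patch; it assumes an active SparkSession bound to `spark`, and the scratch path is hypothetical:

    // Minimal sketch of the validation added above (not part of the patch).
    // Assumes spark-shell, i.e. an active SparkSession named `spark`.
    import org.apache.spark.sql.AnalysisException

    val path = "/tmp/spark-16006-demo"  // hypothetical scratch location

    // A zero-field DataFrame is rejected by the new schema.fields.isEmpty branch.
    try {
      spark.emptyDataFrame.write.format("text").mode("overwrite").save(path)
    } catch {
      case e: AnalysisException => println(e.getMessage)
      // prints: Cannot write dataset with no fields
    }

    // Partitioning by every column is still rejected, as before.
    try {
      spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
    } catch {
      case e: AnalysisException => println(e.getMessage)
      // prints: Cannot use all columns for partition columns
    }
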
From 7d38003ef51e4e18239e97df4d14fba4732385c5 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 17 Jun 2016 04:23:40 -0700
Subject: [PATCH 2/2] Allow emptyDataFrame.write.

---
 .../sql/execution/datasources/PartitioningUtils.scala |  9 ++-------
 .../spark/sql/test/DataFrameReaderWriterSuite.scala   | 11 +++--------
 2 files changed, 5 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 17743c38bd4fd..c3561099d6842 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -339,9 +339,6 @@ private[sql] object PartitioningUtils {
   private val upCastingOrder: Seq[DataType] =
     Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
 
-  /**
-   * Validate partition columns for writing executions.
-   */
   def validatePartitionColumn(
       schema: StructType,
       partitionColumns: Seq[String],
@@ -354,10 +351,8 @@ private[sql] object PartitioningUtils {
       }
     }
 
-    if (schema.fields.isEmpty) {
-      throw new AnalysisException("Cannot write dataset with no fields")
-    } else if (partitionColumns.size == schema.fields.length) {
-      throw new AnalysisException("Cannot use all columns for partition columns")
+    if (partitionColumns.nonEmpty && partitionColumns.size == schema.fields.length) {
+      throw new AnalysisException(s"Cannot use all columns for partition columns")
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index c72881faf74a9..a4041d7ebe67d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -214,18 +214,13 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
   test("prevent all column partitioning") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
-      var e = intercept[AnalysisException] {
-        spark.emptyDataFrame.write.format("text").mode("overwrite").save(path)
-      }
-      assert(e.getMessage.contains("Cannot write dataset with no fields"))
-      e = intercept[AnalysisException] {
+      intercept[AnalysisException] {
         spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
       }
-      assert(e.getMessage.contains("Cannot use all columns for partition columns"))
-      e = intercept[AnalysisException] {
+      intercept[AnalysisException] {
         spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
       }
-      assert(e.getMessage.contains("Cannot use all columns for partition columns"))
+      spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)
     }
   }
 
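With [PATCH 2/2] applied, the empty-schema guard is dropped: `validatePartitionColumn` only rejects a write when every field is used as a partition column, so a zero-field DataFrame writes without error. A minimal sketch of the resulting behavior, under the same spark-shell assumptions as above:

    // Minimal sketch of the behavior after PATCH 2/2 (not part of the patch).
    // Assumes an active SparkSession named `spark`.
    import org.apache.spark.sql.AnalysisException

    val path = "/tmp/spark-16006-demo"  // hypothetical scratch location

    // Now succeeds: partitionColumns.nonEmpty guards the all-columns check,
    // so an empty schema no longer throws in validatePartitionColumn.
    spark.emptyDataFrame.write.format("parquet").mode("overwrite").save(path)

    // Still fails: every column of the dataset used as a partition column.
    try {
      spark.range(10).write.format("csv").mode("overwrite").partitionBy("id").save(path)
    } catch {
      case e: AnalysisException => println(e.getMessage)
      // prints: Cannot use all columns for partition columns
    }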