Conversation

@dongjoon-hyun (Member) commented Jun 3, 2016

What changes were proposed in this pull request?

When saving datasets to storage, partitionBy provides an easy way to construct the directory structure. However, if a user chooses all columns as partition columns, exceptions occur.

  • ORC with all-column partitioning: AnalysisException on a future read due to schema inference failure.

    scala> spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")
    
    scala> spark.read.format("orc").load("/tmp/data").collect()
    org.apache.spark.sql.AnalysisException: Unable to infer schema for ORC at /tmp/data. It must be specified manually;
  • Parquet with all-column partitioning: InvalidSchemaException on write execution due to a Parquet limitation.

    scala> spark.range(100).write.format("parquet").mode("overwrite").partitionBy("id").save("/tmp/data")
    [Stage 0:>                                                          (0 + 8) / 8]16/06/02 16:51:17
    ERROR Utils: Aborting task
    org.apache.parquet.schema.InvalidSchemaException: A group type can not be empty. Parquet does not support empty group without leaves. Empty group: spark_schema
    ... (lots of error messages)

Although some formats, such as JSON, support all-column partitioning without any problem, creating many empty directories does not seem like a good idea.
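
For illustration, this is the kind of JSON write described above (a sketch; the path is illustrative). Before this change it completed, creating only id=0/, id=1/, ... partition directories; with this change it raises AnalysisException like the other formats:

    scala> spark.range(10).write.format("json").mode("overwrite").partitionBy("id").save("/tmp/json")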

This PR prevents saving with all-column partitioning by consistently raising AnalysisException before executing the save operation.
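
A minimal sketch of such a guard, assuming it lives inside Spark's sql package (AnalysisException is not publicly constructible elsewhere); the method name validatePartitionColumns and the error message appear later in this thread, while the signature and body here are assumptions:

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.types.StructType

    // Hypothetical guard: if every field of the schema is a partition column,
    // the data files would have an empty schema, so fail analysis up front.
    def validatePartitionColumns(schema: StructType, partitionColumns: Seq[String]): Unit = {
      if (schema.fieldNames.length == partitionColumns.length) {
        throw new AnalysisException("Cannot use all columns for partition columns")
      }
    }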

How was this patch tested?

Newly added PartitioningUtilsSuite.
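
A hedged sketch of the kind of check such a suite could exercise (the suite's actual contents are not shown in this thread; this calls the hypothetical helper sketched above and assumes a ScalaTest-style test body):

    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.types.{LongType, StructType}

    test("Cannot use all columns for partition columns") {
      // A one-column schema partitioned by that same column leaves no data columns.
      val schema = new StructType().add("id", LongType)
      intercept[AnalysisException] {
        validatePartitionColumns(schema, Seq("id"))
      }
    }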

@SparkQA commented Jun 3, 2016

Test build #59902 has finished for PR 13486 at commit bb97467.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 4, 2016

Test build #59977 has finished for PR 13486 at commit 3fd3512.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyang1992 commented:

One little concern: if it is added here, should the method name be changed? After all, it will do more than validate data types after the change.

@dongjoon-hyun (Member Author) replied:

Thank you for the attention, @wangyang1992. Good point!
Maybe validatePartitionColumnDataTypes -> validatePartitionColumnDataTypesAndCount?

@wangyang1992 replied:

Yeah, I think it's better.

@dongjoon-hyun (Member Author) replied:

Then, let's change it. :)
Since PartitioningUtils is private[sql], it is safe to change.
I'll update this PR. Thank you for your review and idea!

@SparkQA commented Jun 4, 2016

Test build #59986 has finished for PR 13486 at commit 9c5f13d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 4, 2016

Test build #59987 has finished for PR 13486 at commit 6a9006d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 6, 2016

Test build #60024 has finished for PR 13486 at commit 8c68b79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author) commented:

Hi, @marmbrus.
Could you review this PR?

    Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)

-   def validatePartitionColumnDataTypes(
+   def validatePartitionColumnDataTypesAndCount(
@marmbrus (Contributor) commented:

how about just validatePartitionColumns?

@dongjoon-hyun (Member Author) replied:

Thank you for the review, @marmbrus.
That sounds better. I'll update it.

@SparkQA commented Jun 8, 2016

Test build #60193 has finished for PR 13486 at commit 13bd3ac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 9, 2016

Test build #60203 has finished for PR 13486 at commit 46d4208.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member Author) commented:

Hi, @marmbrus.
The PR is now updated according to your advice and has passed Jenkins again.

@dongjoon-hyun (Member Author) commented:

Hi, @marmbrus.
Could you review this PR again?

@marmbrus (Contributor) commented:

Merging to master and 2.0

asfgit pushed a commit that referenced this pull request Jun 10, 2016
Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13486 from dongjoon-hyun/SPARK-15743.

(cherry picked from commit 2413fce)
Signed-off-by: Michael Armbrust <michael@databricks.com>
@asfgit asfgit closed this in 2413fce Jun 10, 2016
@dongjoon-hyun (Member Author) commented:

Thank you, @marmbrus!

@tdas (Contributor) commented Jun 17, 2016

@dongjoon-hyun There is a corner case that this check does not handle. When there are zero columns, the error is very non-intuitive.

See https://issues.apache.org/jira/browse/SPARK-16006

Can you handle this case in another PR? Mention me in the PR if you make one.
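
A plausible repro of the zero-column corner case, inferred from the SPARK-16006 title rather than from this thread (the path is illustrative):

    // Assumption: a DataFrame with no columns at all hits the same code path and,
    // before SPARK-16006 was addressed, failed with a confusing error message.
    scala> spark.emptyDataFrame.write.format("parquet").mode("overwrite").save("/tmp/empty")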

@dongjoon-hyun (Member Author) commented:

Oh, I see. I will fix it tonight.
Thank you, @tdas!

      spark.range(10).write.format("parquet").mode("overwrite").partitionBy("id").save(path)
    }
    intercept[AnalysisException] {
      spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
A reviewer (Member) commented:

This test case is wrong, right? The exception message will be something like "The ORC data source must be used with Hive support enabled". To test this, we need to move it to another suite.

Will fix it in my ongoing PR. My suggestion is to always verify the error message, if possible.

@dongjoon-hyun (Member Author) replied:

Yep. That will be fixed in #13730.

@dongjoon-hyun (Member Author) replied:

Oh, I mean there is already an ongoing PR.
And thank you for the advice!

The reviewer (Member) replied:

I am fine if you want to fix it. You can put it in OrcSuite, then run the following command to verify whether it passes:

build/sbt -Phive "hive/test-only org.apache.spark.sql.hive.orc.OrcSourceSuite"

  test("prevent all column partitioning") {
    withTempDir { dir =>
      val path = dir.getCanonicalPath
      val e = intercept[AnalysisException] {
        spark.range(10).write.format("orc").mode("overwrite").partitionBy("id").save(path)
      }.getMessage
      assert(e.contains("Cannot use all columns for partition columns"))
    }
  }

@dongjoon-hyun (Member Author) replied:

Ah, I see what you mean. I'm going to exclude the ORC cases in #13730.
So you can do that for OrcSuite; I'd really appreciate it.

The reviewer (Member) replied:

I am adding more edge cases in an ongoing PR to improve the test coverage of DataFrameReader and DataFrameWriter, and will include this one too. Thanks!

@dongjoon-hyun dongjoon-hyun deleted the SPARK-15743 branch July 20, 2016 07:38