[SPARK-26716][SQL] FileFormat: the supported types of read/write should be consistent #23639
Changes from 3 commits
@@ -367,69 +367,43 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }

   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    withTempDir { dir =>
-      val tempDir = new File(dir, "files").getCanonicalPath
-
-      Seq("orc").foreach { format =>
-        // write path
-        var msg = intercept[AnalysisException] {
-          sql("select null").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          spark.udf.register("testType", () => new NullData())
-          sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        // read path
-        // We expect the types below should be passed for backward-compatibility
-
-        // Null type
-        var schema = StructType(StructField("a", NullType, true) :: Nil)
-        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-        spark.read.schema(schema).format(format).load(tempDir).collect()
-
-        // UDT having null data
-        schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
-        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-        spark.read.schema(schema).format(format).load(tempDir).collect()
-      }
-
-      Seq("parquet", "csv").foreach { format =>
-        // write path
-        var msg = intercept[AnalysisException] {
-          sql("select null").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          spark.udf.register("testType", () => new NullData())
-          sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        // read path
-        msg = intercept[AnalysisException] {
-          val schema = StructType(StructField("a", NullType, true) :: Nil)
-          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-          spark.read.schema(schema).format(format).load(tempDir).collect()
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
-          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-          spark.read.schema(schema).format(format).load(tempDir).collect()
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-      }
-    }
+    // TODO(SPARK-26716): support data type validating in V2 data source, and test V2 as well.
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc") {
Review comments on this line:

> I understand the intention, but DSv2 ORC should have this test coverage.

> Currently, there is no such validation in V2. I promise I will implement it in PR #23601 (or maybe a separate one) soon. Is that OK with you?

> If we revisit this in the 3.0.0 timeframe, it sounds okay. Then, can we remove this line in this PR?

> AFAIK the corresponding check is not implemented in the ORC v2 source yet; if we don't disable v2 here, we will see runtime errors. Shall we leave a TODO here saying this check should be done in the ORC v2 source as well?

> +1 for adding
+      withTempDir { dir =>
+        val tempDir = new File(dir, "files").getCanonicalPath
+
+        Seq("parquet", "csv", "orc").foreach { format =>
+          // write path
+          var msg = intercept[AnalysisException] {
+            sql("select null").write.format(format).mode("overwrite").save(tempDir)
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          msg = intercept[AnalysisException] {
+            spark.udf.register("testType", () => new NullData())
+            sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          // read path
+          msg = intercept[AnalysisException] {
+            val schema = StructType(StructField("a", NullType, true) :: Nil)
+            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+            spark.read.schema(schema).format(format).load(tempDir).collect()
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          msg = intercept[AnalysisException] {
+            val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
+            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+            spark.read.schema(schema).format(format).load(tempDir).collect()
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+        }
+      }
+    }
   }
> Ur? @gengliangwang. We need to create a new JIRA issue ID here instead of this PR's. If we use this JIRA ID, it will be closed as `Resolved` and nobody is going to take a look at it later.
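The review thread above hinges on the kind of schema validation the V2 ORC source was missing at the time: a recursive "is this data type supported?" check run before read/write, so that unsupported types (like `NullType` here) fail with an `AnalysisException` instead of a runtime error. The following is a minimal standalone sketch of that idea; the `DataType` ADT below is a hypothetical stand-in for `org.apache.spark.sql.types`, not Spark's real classes, and `TypeSupport` is an illustrative name, not Spark's API.

```scala
// Stand-in type hierarchy (hypothetical; mirrors the shape of Spark's
// org.apache.spark.sql.types for illustration only).
sealed trait DataType
case object NullType extends DataType
case object IntegerType extends DataType
case object StringType extends DataType
case class ArrayType(elementType: DataType) extends DataType
case class StructField(name: String, dataType: DataType)
case class StructType(fields: Seq[StructField]) extends DataType

object TypeSupport {
  // True if every leaf type is supported. NullType is rejected, mirroring
  // the "does not support null data type" error this test expects.
  def supportDataType(dt: DataType): Boolean = dt match {
    case NullType           => false
    case ArrayType(et)      => supportDataType(et)
    case StructType(fields) => fields.forall(f => supportDataType(f.dataType))
    case _                  => true
  }

  // Validate a whole schema up front, failing with the same message shape
  // the test asserts on, instead of surfacing a runtime error later.
  def verifySchema(format: String, schema: StructType): Unit = {
    schema.fields.foreach { f =>
      if (!supportDataType(f.dataType)) {
        throw new IllegalArgumentException(
          s"$format data source does not support ${f.dataType} data type.")
      }
    }
  }
}
```

For example, `TypeSupport.verifySchema("orc", StructType(Seq(StructField("a", NullType))))` fails fast with a descriptive message. In Spark's V1 path the analogous hook is, as far as I can tell, `FileFormat.supportDataType`; the thread's point is that DSv2 had no equivalent yet, which is why the test pins the ORC reader to V1 via `spark.sql.sources.useV1SourceList`.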