Skip to content

Commit 3675fae

Browse files
committed
Separate tests for parse modes
1 parent 4c46f4b commit 3675fae

File tree

1 file changed

+25
-18
lines changed
  • sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json

1 file changed

+25
-18
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -963,20 +963,37 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
963963
)
964964
}
965965

966+
test("SPARK-13764 Parse modes in JSON data source") {
967+
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
968+
// `FAILFAST` mode should throw an exception for corrupt records.
969+
val exception = intercept[SparkException] {
970+
sqlContext.read.option("mode", "FAILFAST").json(corruptRecords).collect()
971+
}
972+
assert(exception.getMessage.contains("Malformed line in FAILFAST mode: {"))
973+
974+
// `DROPMALFORMED` mode should skip corrupt records.
975+
// `PERMISSIVE` mode is covered by the "Corrupt records" test.
976+
val jsonDF = sqlContext.read.option("mode", "DROPMALFORMED").json(corruptRecords)
977+
val schema = StructType(
978+
StructField("_unparsed", StringType, true) ::
979+
StructField("a", StringType, true) ::
980+
StructField("b", StringType, true) ::
981+
StructField("c", StringType, true) :: Nil)
982+
assert(schema === jsonDF.schema)
983+
984+
checkAnswer(
985+
jsonDF,
986+
Row(null, "str_a_4", "str_b_4", "str_c_4") :: Nil
987+
)
988+
}
989+
}
990+
966991
test("Corrupt records") {
967992
// Test if we can query corrupt records.
968993
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
969994
withTempTable("jsonTable") {
970995
val jsonDF = sqlContext.read.json(corruptRecords)
971996
jsonDF.registerTempTable("jsonTable")
972-
val jsonDFWithDropMalformed =
973-
sqlContext.read.option("mode", "DROPMALFORMED").json(corruptRecords)
974-
jsonDFWithDropMalformed.registerTempTable("jsonTableWithDropMalformed")
975-
val exception = intercept[SparkException]{
976-
sqlContext.read.option("mode", "FAILFAST").json(corruptRecords).collect()
977-
}
978-
assert(exception.getMessage.contains("Malformed line in FAILFAST mode: {"))
979-
980997
val schema = StructType(
981998
StructField("_unparsed", StringType, true) ::
982999
StructField("a", StringType, true) ::
@@ -999,16 +1016,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
9991016
Row(null, null, null, "]") :: Nil
10001017
)
10011018

1002-
// Check if corrupt records are dropped.
1003-
checkAnswer(
1004-
sql(
1005-
"""
1006-
|SELECT a, b, c, _unparsed
1007-
|FROM jsonTableWithDropMalformed
1008-
""".stripMargin),
1009-
Row("str_a_4", "str_b_4", "str_c_4", null) :: Nil
1010-
)
1011-
10121019
checkAnswer(
10131020
sql(
10141021
"""

0 commit comments

Comments
 (0)