Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,10 @@ class JacksonParser(
skipRow = structFilters.skipRow(row, index)
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) if isRoot =>
badRecordException = badRecordException.orElse(Some(e))
case NonFatal(e) =>
if (isRoot) {
badRecordException = badRecordException.orElse(Some(e))
}
parser.skipChildren()
}
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,9 +741,10 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
val df1 = Seq("""{"c2": [19], "c1": 123456}""").toDF("c0")
checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null)))
val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))),
Row(Row(Row(123456, null))))
val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Seq(Row(123456, null))))
val df4 = Seq("""{"c2": [19]}""").toDF("c0")
checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
}
Expand Down Expand Up @@ -840,4 +841,29 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
}
}
}

test("SPARK-35094: Spark from_json(JsonToStruct) function return wrong value " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Peng-Lei this test passes without your fix. Can you show the reproducible codes with before/after results?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HyukjinKwon I am so sorry, It did pass without my fix. My fix is simply skipping the parsing when an parsing exception occurs.

"in permissive mode in case best effort") {
val s1 = StructField("name", StringType, nullable = true)
val s2_1 =
StructField(
"badNestedField",
StructType(
Seq(StructField("SomethingWhichNotInJsonMessage", IntegerType, nullable = true))))
val s2 =
StructField("nestedField", StructType(Seq(s2_1, s1)))
val customSchema = StructType(Seq(s1, s2))

val jsonStringToTest =
"""{"name":"v1","nestedField":{"badNestedField":"14","name":"v2"}}"""
val df = List(jsonStringToTest)
.toDF("json")
.select(from_json($"json", customSchema).as("toBeFlatten"))
.select("toBeFlatten.*")

assert(
df.select("name").as[String].first() == "v1",
"wrong value in root schema, parser take value from column with same name " +
"but in another nested elvel")
}
}