Skip to content

Commit 32ec9ba

Browse files
committed
Added the skipInputWithoutTokens flag to JacksonParser
1 parent 240a479 commit 32ec9ba

File tree

5 files changed

+24
-10
lines changed

5 files changed

+24
-10
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,11 @@ case class JsonToStructs(
583583
(StructType(StructField("value", other) :: Nil), other)
584584
}
585585

586-
val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
586+
val rawParser = new JacksonParser(
587+
actualSchema,
588+
parsedOptions,
589+
allowArrayAsStructs = false,
590+
skipInputWithoutTokens = false)
587591
val createParser = CreateJacksonParser.utf8String _
588592

589593
new FailureSafeParser[UTF8String](

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ import org.apache.spark.util.Utils
4040
class JacksonParser(
4141
schema: DataType,
4242
val options: JSONOptions,
43-
allowArrayAsStructs: Boolean) extends Logging {
43+
allowArrayAsStructs: Boolean,
44+
skipInputWithoutTokens: Boolean) extends Logging {
4445

4546
import JacksonUtils._
4647
import com.fasterxml.jackson.core.JsonToken._
@@ -418,6 +419,7 @@ class JacksonParser(
418419
// a null first token is equivalent to testing for input.trim.isEmpty
419420
// but it works on any token stream and not just strings
420421
parser.nextToken() match {
422+
case null if skipInputWithoutTokens => Nil
421423
case null => throw new RuntimeException("Not found any JSON token")
422424
case _ => rootConverter.apply(parser) match {
423425
case null => throw new RuntimeException("Root converter returned null")

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
455455

456456
val createParser = CreateJacksonParser.string _
457457
val parsed = jsonDataset.rdd.mapPartitions { iter =>
458-
val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
458+
val rawParser = new JacksonParser(
459+
actualSchema,
460+
parsedOptions,
461+
allowArrayAsStructs = true,
462+
skipInputWithoutTokens = true)
459463
val parser = new FailureSafeParser[String](
460464
input => rawParser.parse(input, createParser, UTF8String.fromString),
461465
parsedOptions.parseMode,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
125125
}
126126

127127
(file: PartitionedFile) => {
128-
val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
128+
val parser = new JacksonParser(
129+
actualSchema,
130+
parsedOptions,
131+
allowArrayAsStructs = true,
132+
skipInputWithoutTokens = true)
129133
JsonDataSource(parsedOptions).readFile(
130134
broadcastedHadoopConf.value.value,
131135
file,

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
6666

6767
val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
6868
val dummySchema = StructType(Seq.empty)
69-
val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)
69+
val parser = new JacksonParser(
70+
dummySchema,
71+
dummyOption,
72+
allowArrayAsStructs = true,
73+
skipInputWithoutTokens = true)
7074

7175
Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
7276
jsonParser.nextToken()
@@ -1114,7 +1118,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
11141118
Row(null, null, null),
11151119
Row(null, null, null),
11161120
Row(null, null, null),
1117-
Row(null, null, null),
11181121
Row("str_a_4", "str_b_4", "str_c_4"),
11191122
Row(null, null, null))
11201123
)
@@ -1136,7 +1139,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
11361139
checkAnswer(
11371140
jsonDF.select($"a", $"b", $"c", $"_unparsed"),
11381141
Row(null, null, null, "{") ::
1139-
Row(null, null, null, "") ::
11401142
Row(null, null, null, """{"a":1, b:2}""") ::
11411143
Row(null, null, null, """{"a":{, b:3}""") ::
11421144
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1151,7 +1153,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
11511153
checkAnswer(
11521154
jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
11531155
Row("{") ::
1154-
Row("") ::
11551156
Row("""{"a":1, b:2}""") ::
11561157
Row("""{"a":{, b:3}""") ::
11571158
Row("]") :: Nil
@@ -1173,7 +1174,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
11731174
checkAnswer(
11741175
jsonDF.selectExpr("a", "b", "c", "_malformed"),
11751176
Row(null, null, null, "{") ::
1176-
Row(null, null, null, "") ::
11771177
Row(null, null, null, """{"a":1, b:2}""") ::
11781178
Row(null, null, null, """{"a":{, b:3}""") ::
11791179
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -2517,7 +2517,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
25172517
}
25182518

25192519
checkCount(2)
2520-
countForMalformedJSON(1, Seq(""))
2520+
countForMalformedJSON(0, Seq(""))
25212521
}
25222522

25232523
test("SPARK-25040: empty strings should be disallowed") {

0 commit comments

Comments
 (0)