2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1694,7 +1694,7 @@ test_that("column functions", {

# check for unparseable
df <- as.DataFrame(list(list("a" = "")))
- expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)
+ expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)

# check if array type in string is correctly supported.
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
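For reference, the same expectation from the Scala side: after this change, `from_json` applied to an empty string is expected to evaluate to a single NULL rather than a row of NULL fields. The sketch below is illustrative only (not part of the diff) and assumes a local SparkSession; the column name is made up.

```scala
// Illustrative sketch (not part of the diff): from_json("") should yield NULL
// under the restored behavior, mirroring the R test above.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object FromJsonEmptyInputSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("from_json-empty").getOrCreate()
    import spark.implicits._

    val schema = StructType(StructField("a", IntegerType) :: Nil)
    val df = Seq("").toDF("json")

    // Expected: a single NULL value, not Row(null).
    val result = df.select(from_json($"json", schema)).head.get(0)
    println(result) // null

    spark.stop()
  }
}
```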
2 changes: 0 additions & 2 deletions docs/sql-migration-guide-upgrade.md
@@ -17,8 +17,6 @@ displayTitle: Spark SQL Upgrading Guide

- Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.

- - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings and JSON datasource skips the same independently of its mode if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.

- The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.

- In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful.
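The retained note above describes the two `from_json` parse modes. A hedged sketch of the difference follows; the column and schema names are made up for illustration, and only the `from_json` API usage is taken from Spark.

```scala
// Sketch of the PERMISSIVE vs. FAILFAST modes described in the retained migration note.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object FromJsonModesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("from_json-modes").getOrCreate()
    import spark.implicits._

    val schema = StructType(StructField("a", IntegerType) :: Nil)
    val malformed = Seq("""{"a" 1}""").toDF("json") // missing ':' makes this unparseable

    // PERMISSIVE (the default): the malformed record becomes a row of nulls, i.e. Row(null).
    malformed.select(from_json($"json", schema, Map("mode" -> "PERMISSIVE"))).show(false)

    // FAILFAST: the same record is expected to raise an exception when the query runs.
    try {
      malformed.select(from_json($"json", schema, Map("mode" -> "FAILFAST"))).collect()
    } catch {
      case e: Exception => println(s"FAILFAST raised: ${e.getClass.getSimpleName}")
    }

    spark.stop()
  }
}
```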
@@ -548,23 +548,15 @@ case class JsonToStructs(
s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
}

- @transient
- private lazy val castRow = nullableSchema match {
- case _: StructType => (row: InternalRow) => row
- case _: ArrayType => (row: InternalRow) => row.getArray(0)
- case _: MapType => (row: InternalRow) => row.getMap(0)
- }
-
- // This converts parsed rows to the desired output by the given schema.
- private def convertRow(rows: Iterator[InternalRow]) = {
- if (rows.hasNext) {
- val result = rows.next()
- // JSON's parser produces one record only.
- assert(!rows.hasNext)
- castRow(result)
- } else {
- throw new IllegalArgumentException("Expected one row from JSON parser.")
- }
+ @transient
+ lazy val converter = nullableSchema match {
+ case _: StructType =>
+ (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
+ case _: ArrayType =>
+ (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
+ case _: MapType =>
+ (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
+ }

val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
@@ -600,7 +592,7 @@ case class JsonToStructs(
copy(timeZoneId = Option(timeZoneId))

override def nullSafeEval(json: Any): Any = {
- convertRow(parser.parse(json.asInstanceOf[UTF8String]))
+ converter(parser.parse(json.asInstanceOf[UTF8String]))
}

override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
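The new `converter` folds the old `castRow`/`convertRow` pair into one function and, crucially, maps an empty iterator (whitespace-only input now produces no rows; see the parser change below) to `null` instead of throwing. A stripped-down, self-contained sketch of that pattern, with plain Scala types standing in for Catalyst's `InternalRow`/`ArrayData`/`MapData`:

```scala
// Self-contained sketch of the converter pattern above. The shape is the point:
// choose the extraction function once per target type, and map "no rows" to null
// rather than failing.
object ConverterPatternSketch {
  type Row = Seq[Any]

  sealed trait Target
  case object AsStruct extends Target
  case object AsArray extends Target
  case object AsMap extends Target

  def converter(target: Target): Iterator[Row] => Any = target match {
    case AsStruct => rows => if (rows.hasNext) rows.next() else null
    case AsArray  => rows => if (rows.hasNext) rows.next().head else null // value at ordinal 0
    case AsMap    => rows => if (rows.hasNext) rows.next().head else null // value at ordinal 0
  }

  def main(args: Array[String]): Unit = {
    val toStruct = converter(AsStruct)
    println(toStruct(Iterator(Seq(1, "x")))) // List(1, x)
    println(toStruct(Iterator.empty))        // null -- whitespace-only JSON input lands here
  }
}
```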
@@ -399,7 +399,7 @@ class JacksonParser(
// a null first token is equivalent to testing for input.trim.isEmpty
// but it works on any token stream and not just strings
parser.nextToken() match {
- case null => throw new RuntimeException("Not found any JSON token")
+ case null => Nil
case _ => rootConverter.apply(parser) match {
case null => throw new RuntimeException("Root converter returned null")
case rows => rows
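The hunk above leans on Jackson's streaming API: whitespace-only input has no first token, so `nextToken()` returns `null`, and the parser now yields `Nil` (no records) instead of throwing. A small sketch of that Jackson behavior, assuming jackson-core on the classpath (as Spark already has):

```scala
// Sketch of the Jackson behavior the change above relies on: no first token means
// nextToken() returns null; after this change that maps to Nil (no records).
import com.fasterxml.jackson.core.JsonFactory

object FirstTokenSketch {
  def main(args: Array[String]): Unit = {
    val factory = new JsonFactory()

    val blank = factory.createParser(" ")
    println(blank.nextToken()) // null -> treated as "no records" instead of throwing

    val obj = factory.createParser("""{"a": 1}""")
    println(obj.nextToken())   // START_OBJECT -> handed to rootConverter as before
  }
}
```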
@@ -548,7 +548,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
- InternalRow(null)
+ null
)
}

10 changes: 10 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -240,6 +240,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
Seq(Row("1"), Row("2")))
}

test("SPARK-11226 Skip empty line in json file") {
spark.read
.json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
.createOrReplaceTempView("d")

checkAnswer(
sql("select count(1) from d"),
Seq(Row(3)))
}

test("SPARK-8828 sum should return null if all input values are null") {
checkAnswer(
sql("select sum(a), avg(a) from allNulls"),
@@ -1126,7 +1126,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
- Row(null, null, null),
Row("str_a_4", "str_b_4", "str_c_4"),
Row(null, null, null))
)
@@ -1148,7 +1147,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.select($"a", $"b", $"c", $"_unparsed"),
Row(null, null, null, "{") ::
Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1163,7 +1161,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
Row("{") ::
Row("") ::
Row("""{"a":1, b:2}""") ::
Row("""{"a":{, b:3}""") ::
Row("]") :: Nil
@@ -1185,7 +1182,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.selectExpr("a", "b", "c", "_malformed"),
Row(null, null, null, "{") ::
Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1727,7 +1723,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val path = dir.getCanonicalPath
primitiveFieldAndType
.toDF("value")
- .repartition(1)
.write
.option("compression", "GzIp")
.text(path)
@@ -2428,7 +2423,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}

checkCount(2)
- countForMalformedJSON(1, Seq(""))
+ countForMalformedJSON(0, Seq(""))
}

test("SPARK-25040: empty strings should be disallowed") {
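`countForMalformedJSON` is a helper defined elsewhere in `JsonSuite`; the updated call asserts that an empty input string no longer counts as a malformed record. A hedged, standalone sketch of an equivalent check follows; it is an assumed reconstruction, not the helper's actual implementation.

```scala
// Hedged sketch of what the updated assertion expresses: an empty input line is skipped
// by the JSON datasource, so no corrupt/null row remains to be counted as malformed.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object MalformedCountSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("malformed-count").getOrCreate()
    import spark.implicits._

    val schema = StructType(StructField("a", StringType) :: Nil)
    val df = spark.read.schema(schema).json(Seq("").toDS())

    println(df.filter($"a".isNull).count()) // expected: 0 after this change (was 1 before)

    spark.stop()
  }
}
```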