diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index eaecf284b51f..f5833734103a 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2241,7 +2241,7 @@ def json_tuple(col, *fields):
 def from_json(col, schema, options={}):
     """
     Parses a column containing a JSON string into a :class:`MapType` with :class:`StringType`
-    as keys type, :class:`StructType` or :class:`ArrayType` of :class:`StructType`\\s with
+    as keys type, :class:`StructType` or :class:`ArrayType` with
     the specified schema. Returns `null`, in the case of an unparseable string.

     :param col: string column in json format
@@ -2269,6 +2269,11 @@ def from_json(col, schema, options={}):
     >>> schema = schema_of_json(lit('''{"a": 0}'''))
     >>> df.select(from_json(df.value, schema).alias("json")).collect()
     [Row(json=Row(a=1))]
+    >>> data = [(1, '''[1, 2, 3]''')]
+    >>> schema = ArrayType(IntegerType())
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json=[1, 2, 3])]
     """

     sc = SparkContext._active_spark_context
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index abe88754f3a1..ca99100b6d64 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -495,7 +495,7 @@ case class JsonTuple(children: Seq[Expression])
 }

 /**
- * Converts an json input string to a [[StructType]] or [[ArrayType]] of [[StructType]]s
+ * Converts a JSON input string to a [[StructType]], [[ArrayType]] or [[MapType]]
  * with the specified schema.
  */
 // scalastyle:off line.size.limit
@@ -544,17 +544,10 @@ case class JsonToStructs(
       timeZoneId = None)

   override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
-    case _: StructType | ArrayType(_: StructType, _) | _: MapType =>
+    case _: StructType | _: ArrayType | _: MapType =>
       super.checkInputDataTypes()
     case _ => TypeCheckResult.TypeCheckFailure(
-      s"Input schema ${nullableSchema.catalogString} must be a struct or an array of structs.")
-  }
-
-  @transient
-  lazy val rowSchema = nullableSchema match {
-    case st: StructType => st
-    case ArrayType(st: StructType, _) => st
-    case mt: MapType => mt
+      s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
   }

   // This converts parsed rows to the desired output by the given schema.
@@ -562,8 +555,8 @@ case class JsonToStructs(
   lazy val converter = nullableSchema match {
     case _: StructType =>
       (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
-    case ArrayType(_: StructType, _) =>
-      (rows: Seq[InternalRow]) => new GenericArrayData(rows)
+    case _: ArrayType =>
+      (rows: Seq[InternalRow]) => rows.head.getArray(0)
     case _: MapType =>
       (rows: Seq[InternalRow]) => rows.head.getMap(0)
   }
@@ -571,7 +564,7 @@ case class JsonToStructs(
   @transient
   lazy val parser =
     new JacksonParser(
-      rowSchema,
+      nullableSchema,
       new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))

   override def dataType: DataType = nullableSchema
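The relaxed check above means from_json now accepts any ArrayType, not only arrays of structs. A minimal spark-shell sketch of the resulting behavior (an active SparkSession named `spark` and the `spark.implicits._` import are assumed; this snippet is illustrative and not part of the patch):

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{ArrayType, IntegerType}
    import spark.implicits._

    // Previously rejected by checkInputDataTypes with
    // "must be a struct or an array of structs"; now it parses directly.
    val df = Seq("[1, 2, 3]").toDF("value")
    df.select(from_json($"value", ArrayType(IntegerType)).as("json")).collect()
    // expected: Array(Row(Seq(1, 2, 3))) -- a single row holding the int array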
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 4d409caddd33..6feea500b2aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -61,6 +61,7 @@ class JacksonParser(
     dt match {
       case st: StructType => makeStructRootConverter(st)
       case mt: MapType => makeMapRootConverter(mt)
+      case at: ArrayType => makeArrayRootConverter(at)
     }
   }

@@ -101,6 +102,35 @@ class JacksonParser(
     }
   }

+  private def makeArrayRootConverter(at: ArrayType): JsonParser => Seq[InternalRow] = {
+    val elemConverter = makeConverter(at.elementType)
+    (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, at) {
+      case START_ARRAY => Seq(InternalRow(convertArray(parser, elemConverter)))
+      case START_OBJECT if at.elementType.isInstanceOf[StructType] =>
+        // This handles the case when the input JSON is an object but the specified
+        // schema is an array of structs. In that case, the input JSON is treated as
+        // an array with a single element of the struct type.
+        // This behavior was introduced by the changes for SPARK-19595.
+        //
+        // For example, with the schema ArrayType(new StructType().add("i", IntegerType))
+        // and the JSON input below:
+        //
+        // [{"i": 1}, {"i": 2}]
+        // [{"i": 3}]
+        // {"i": 4}
+        //
+        // the last row is treated as an array with one element, and the conversion yields:
+        //
+        // Seq(Row(1), Row(2))
+        // Seq(Row(3))
+        // Seq(Row(4))
+        //
+        val st = at.elementType.asInstanceOf[StructType]
+        val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
+        Seq(InternalRow(new GenericArrayData(Seq(convertObject(parser, st, fieldConverters)))))
+    }
+  }
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 310e428b6981..5a6ed5964a75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3339,7 +3339,7 @@ object functions {

   /**
    * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3371,7 +3371,7 @@ object functions {

   /**
    * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3400,7 +3400,7 @@ object functions {

   /**
    * Parses a column containing a JSON string into a `MapType` with `StringType` as keys type,
-   * `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3414,7 +3414,7 @@ object functions {

   /**
    * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
@@ -3431,7 +3431,7 @@ object functions {

   /**
    * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
-   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * as keys type, `StructType` or `ArrayType` with the specified schema.
    * Returns `null`, in the case of an unparseable string.
    *
    * @param e a string column containing JSON data.
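The START_OBJECT branch in makeArrayRootConverter preserves the SPARK-19595 behavior described in its comment: a bare JSON object read with an array-of-structs schema is wrapped into a one-element array instead of failing. A sketch of that behavior through the public API (spark-shell session assumed; illustrative only, not part of the patch):

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}
    import spark.implicits._

    val schema = ArrayType(new StructType().add("i", IntegerType))
    val df = Seq("""[{"i": 1}, {"i": 2}]""", """{"i": 4}""").toDF("value")
    df.select(from_json($"value", schema).as("json")).collect()
    // expected: Array(Row(Seq(Row(1), Row(2))), Row(Seq(Row(4))))
    // -- the bare object row becomes a single-element array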
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 79fdd5895e69..0cf370c13e8c 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -39,3 +39,15 @@ select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>');
 -- infer schema of json literal
 select schema_of_json('{"c1":0, "c2":[1]}');
 select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'));
+
+-- from_json - array type
+select from_json('[1, 2, 3]', 'array<int>');
+select from_json('[1, "2", 3]', 'array<int>');
+select from_json('[1, 2, null]', 'array<int>');
+
+select from_json('[{"a": 1}, {"a":2}]', 'array<struct<a:int>>');
+select from_json('{"a": 1}', 'array<struct<a:int>>');
+select from_json('[null, {"a":2}]', 'array<struct<a:int>>');
+
+select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>');
+select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index 827931d74138..b44883b07066 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 30
+-- Number of queries: 38


 -- !query 0
@@ -290,3 +290,67 @@ select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'))
 struct<jsontostructs({"c1":[1, 2, 3]}):struct<c1:array<bigint>>>
 -- !query 29 output
 {"c1":[1,2,3]}
+
+
+-- !query 30
+select from_json('[1, 2, 3]', 'array<int>')
+-- !query 30 schema
+struct<jsontostructs([1, 2, 3]):array<int>>
+-- !query 30 output
+[1,2,3]
+
+
+-- !query 31
+select from_json('[1, "2", 3]', 'array<int>')
+-- !query 31 schema
+struct<jsontostructs([1, "2", 3]):array<int>>
+-- !query 31 output
+NULL
+
+
+-- !query 32
+select from_json('[1, 2, null]', 'array<int>')
+-- !query 32 schema
+struct<jsontostructs([1, 2, null]):array<int>>
+-- !query 32 output
+[1,2,null]
+
+
+-- !query 33
+select from_json('[{"a": 1}, {"a":2}]', 'array<struct<a:int>>')
+-- !query 33 schema
+struct<jsontostructs([{"a": 1}, {"a":2}]):array<struct<a:int>>>
+-- !query 33 output
+[{"a":1},{"a":2}]
+
+
+-- !query 34
+select from_json('{"a": 1}', 'array<struct<a:int>>')
+-- !query 34 schema
+struct<jsontostructs({"a": 1}):array<struct<a:int>>>
+-- !query 34 output
+[{"a":1}]
+
+
+-- !query 35
+select from_json('[null, {"a":2}]', 'array<struct<a:int>>')
+-- !query 35 schema
+struct<jsontostructs([null, {"a":2}]):array<struct<a:int>>>
+-- !query 35 output
+[null,{"a":2}]
+
+
+-- !query 36
+select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>')
+-- !query 36 schema
+struct<jsontostructs([{"a": 1}, {"b":2}]):array<map<string,int>>>
+-- !query 36 output
+[{"a":1},{"b":2}]
+
+
+-- !query 37
+select from_json('[{"a": 1}, 2]', 'array<map<string,int>>')
+-- !query 37 schema
+struct<jsontostructs([{"a": 1}, 2]):array<map<string,int>>>
+-- !query 37 output
+NULL
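The NULL outputs above follow from the FAILFAST mode that JsonToStructs hard-codes into its JacksonParser options: a value that does not conform to the element type ("2" where an int is expected, or a bare 2 where a map is expected) nulls out the whole result. The new golden cases can be reproduced ad hoc in a spark-shell session before regenerating the file (illustrative only):

    // Mirrors the new cases in json-functions.sql.
    spark.sql("select from_json('[1, 2, 3]', 'array<int>')").show()    // expected: [1, 2, 3]
    spark.sql("select from_json('[1, \"2\", 3]', 'array<int>')").show() // expected: null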
{"a":2}]', 'array>') +-- !query 33 schema +struct>> +-- !query 33 output +[{"a":1},{"a":2}] + + +-- !query 34 +select from_json('{"a": 1}', 'array>') +-- !query 34 schema +struct>> +-- !query 34 output +[{"a":1}] + + +-- !query 35 +select from_json('[null, {"a":2}]', 'array>') +-- !query 35 schema +struct>> +-- !query 35 output +[null,{"a":2}] + + +-- !query 36 +select from_json('[{"a": 1}, {"b":2}]', 'array>') +-- !query 36 schema +struct>> +-- !query 36 output +[{"a":1},{"b":2}] + + +-- !query 37 +select from_json('[{"a": 1}, 2]', 'array>') +-- !query 37 schema +struct>> +-- !query 37 output +NULL diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index d3b2701f2558..f321ab86e9b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -133,15 +133,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { Row(null) :: Nil) } - test("from_json invalid schema") { + test("from_json - json doesn't conform to the array type") { val df = Seq("""{"a" 1}""").toDS() val schema = ArrayType(StringType) - val message = intercept[AnalysisException] { - df.select(from_json($"value", schema)) - }.getMessage - assert(message.contains( - "Input schema array must be a struct or an array of structs.")) + checkAnswer(df.select(from_json($"value", schema)), Seq(Row(null))) } test("from_json array support") { @@ -405,4 +401,72 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { assert(out.schema == expected) } + + test("from_json - array of primitive types") { + val df = Seq("[1, 2, 3]").toDF("a") + val schema = new ArrayType(IntegerType, false) + + checkAnswer(df.select(from_json($"a", schema)), Seq(Row(Array(1, 2, 3)))) + } + + test("from_json - array of primitive types - malformed row") { + val df = Seq("[1, 2 3]").toDF("a") + val schema = new ArrayType(IntegerType, false) + + checkAnswer(df.select(from_json($"a", schema)), Seq(Row(null))) + } + + test("from_json - array of arrays") { + val jsonDF = Seq("[[1], [2, 3], [4, 5, 6]]").toDF("a") + val schema = new ArrayType(ArrayType(IntegerType, false), false) + jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable") + + checkAnswer( + sql("select json[0][0], json[1][1], json[2][2] from jsonTable"), + Seq(Row(1, 3, 6))) + } + + test("from_json - array of arrays - malformed row") { + val jsonDF = Seq("[[1], [2, 3], 4, 5, 6]]").toDF("a") + val schema = new ArrayType(ArrayType(IntegerType, false), false) + jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable") + + checkAnswer(sql("select json[0] from jsonTable"), Seq(Row(null))) + } + + test("from_json - array of structs") { + val jsonDF = Seq("""[{"a":1}, {"a":2}, {"a":3}]""").toDF("a") + val schema = new ArrayType(new StructType().add("a", IntegerType), false) + jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable") + + checkAnswer( + sql("select json[0], json[1], json[2] from jsonTable"), + Seq(Row(Row(1), Row(2), Row(3)))) + } + + test("from_json - array of structs - malformed row") { + val jsonDF = Seq("""[{"a":1}, {"a:2}, {"a":3}]""").toDF("a") + val schema = new ArrayType(new StructType().add("a", IntegerType), false) + jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable") + + checkAnswer(sql("select json[0], json[1]from jsonTable"), Seq(Row(null, 
null))) + } + + test("from_json - array of maps") { + val jsonDF = Seq("""[{"a":1}, {"b":2}]""").toDF("a") + val schema = new ArrayType(MapType(StringType, IntegerType, false), false) + jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable") + + checkAnswer( + sql("""select json[0], json[1] from jsonTable"""), + Seq(Row(Map("a" -> 1), Map("b" -> 2)))) + } + + test("from_json - array of maps - malformed row") { + val jsonDF = Seq("""[{"a":1} "b":2}]""").toDF("a") + val schema = new ArrayType(MapType(StringType, IntegerType, false), false) + jsonDF.select(from_json($"a", schema) as "json").createOrReplaceTempView("jsonTable") + + checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null))) + } }