python/pyspark/sql/functions.py (27 additions, 0 deletions)

@@ -2189,11 +2189,16 @@ def from_json(col, schema, options={}):
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=[Row(a=1)])]
>>> schema = schema_of_json(lit('''{"a": 0}'''))
Review thread:
Member: nit: '''{"a": 0}''' -> '{"a": 0}'
Member: feel free to fix other examples above too
Member Author: Do you mean for other functions too?
Member: Nope, I mean the examples here in this function.

>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=Row(a=1))]
"""

sc = SparkContext._active_spark_context
if isinstance(schema, DataType):
schema = schema.json()
elif isinstance(schema, Column):
schema = _to_java_column(schema)
jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
return Column(jc)

@@ -2235,6 +2240,28 @@ def to_json(col, options={}):
return Column(jc)


@ignore_unicode_prefix
@since(2.4)
def schema_of_json(col):
"""
Parses a column containing a JSON string and infers its schema in DDL format.

:param col: string column in json format

>>> from pyspark.sql.types import *
>>> data = [(1, '{"a": 1}')]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(schema_of_json(df.value).alias("json")).collect()
[Row(json=u'struct<a:bigint>')]
>>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
[Row(json=u'struct<a:bigint>')]
"""

sc = SparkContext._active_spark_context
jc = sc._jvm.functions.schema_of_json(_to_java_column(col))
return Column(jc)


@since(1.5)
def size(col):
"""
FunctionRegistry.scala

@@ -505,6 +505,7 @@ object FunctionRegistry {
// json
expression[StructsToJson]("to_json"),
expression[JsonToStructs]("from_json"),
expression[SchemaOfJson]("schema_of_json"),

// cast
expression[Cast]("cast"),
jsonExpressions.scala

@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.expressions

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, CharArrayWriter, InputStreamReader, StringWriter}
import java.io._

import scala.util.parsing.combinator.RegexParsers

Expand All @@ -28,7 +28,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData, MapData}
import org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -525,17 +526,19 @@ case class JsonToStructs(
override def nullable: Boolean = true

// Used in `FunctionRegistry`
def this(child: Expression, schema: Expression) =
def this(child: Expression, schema: Expression, options: Map[String, String]) =
this(
schema = JsonExprUtils.validateSchemaLiteral(schema),
options = Map.empty[String, String],
schema = JsonExprUtils.evalSchemaExpr(schema),
options = options,
child = child,
timeZoneId = None,
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))

def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])

def this(child: Expression, schema: Expression, options: Expression) =
this(
schema = JsonExprUtils.validateSchemaLiteral(schema),
schema = JsonExprUtils.evalSchemaExpr(schema),
options = JsonExprUtils.convertToMapData(options),
child = child,
timeZoneId = None,
@@ -744,11 +747,44 @@ case class StructsToJson(
override def inputTypes: Seq[AbstractDataType] = TypeCollection(ArrayType, StructType) :: Nil
}

/**
* A function that infers the schema of a JSON string.
*/
@ExpressionDescription(
usage = "_FUNC_(json) - Returns the schema of a JSON string in DDL format.",
examples = """
Examples:
> SELECT _FUNC_('[{"col":0}]');
array<struct<col:int>>
""",
since = "2.4.0")
case class SchemaOfJson(child: Expression)
extends UnaryExpression with String2StringExpression with CodegenFallback {

private val jsonOptions = new JSONOptions(Map.empty, "UTC")
private val jsonFactory = new JsonFactory()
Review thread:
Member: Seems jsonOptions.setJacksonOptions(factory) is missed.
Member Author: Thank you for pointing this out. I really didn't know that I have to call the method.
jsonOptions.setJacksonOptions(jsonFactory)

override def convert(v: UTF8String): UTF8String = {
val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser =>
parser.nextToken()
inferField(parser, jsonOptions)
}

UTF8String.fromString(dt.catalogString)
}
}

object JsonExprUtils {

def validateSchemaLiteral(exp: Expression): DataType = exp match {
def evalSchemaExpr(exp: Expression): DataType = exp match {
case Literal(s, StringType) => DataType.fromDDL(s.toString)
case e => throw new AnalysisException(s"Expected a string literal instead of $e")
case e @ SchemaOfJson(_: Literal) =>
val ddlSchema = e.eval().asInstanceOf[UTF8String]
DataType.fromDDL(ddlSchema.toString)
case e => throw new AnalysisException(
"Schema should be specified in DDL format as a string literal" +
s" or output of the schema_of_json function instead of ${e.sql}")
}

def convertToMapData(exp: Expression): Map[String, String] = exp match {
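The generalization from `validateSchemaLiteral` to `evalSchemaExpr` defines the contract for `from_json`'s schema argument: either a DDL-formatted string literal or a `schema_of_json` call over a literal, with anything else rejected at analysis time. A minimal sketch of the three branches, assuming a `SparkSession` named `spark` with these changes applied:

```scala
// Hedged sketch, not part of the diff. Assumes a SparkSession `spark`.

// Branch 1: DDL string literal, handled by `case Literal(s, StringType)`.
spark.sql("""SELECT from_json('{"a":1}', 'a INT')""").show()

// Branch 2: schema_of_json over a literal; the expression is evaluated
// eagerly and its DDL output is fed to DataType.fromDDL.
spark.sql("""SELECT from_json('{"a":1}', schema_of_json('{"a":0}'))""").show()

// Branch 3: any other expression throws AnalysisException:
// "Schema should be specified in DDL format as a string literal
//  or output of the schema_of_json function instead of 1"
// spark.sql("""SELECT from_json('{"a":1}', 1)""").show()
```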
JsonInferSchema.scala

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.json
package org.apache.spark.sql.catalyst.json

import java.util.Comparator

@@ -25,7 +25,6 @@ import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -103,7 +102,7 @@ private[sql] object JsonInferSchema {
/**
* Infer the type of a json document from the parser's token stream
*/
private def inferField(parser: JsonParser, configOptions: JSONOptions): DataType = {
def inferField(parser: JsonParser, configOptions: JSONOptions): DataType = {
import com.fasterxml.jackson.core.JsonToken._
parser.getCurrentToken match {
case null | VALUE_NULL => NullType
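Moving `JsonInferSchema` into `org.apache.spark.sql.catalyst.json` and widening `inferField` from `private` to public is what lets `SchemaOfJson.convert` reuse the datasource's inference logic on a single value. A rough sketch of that call path, mirroring `convert` above and assuming the internal catalyst APIs exactly as they appear in this diff:

```scala
import com.fasterxml.jackson.core.JsonFactory

import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

// Same setup as SchemaOfJson, including the setJacksonOptions call
// that the review thread above flagged as missing at first.
val jsonOptions = new JSONOptions(Map.empty[String, String], "UTC")
val jsonFactory = new JsonFactory()
jsonOptions.setJacksonOptions(jsonFactory)

// Infer the type of one JSON document and render it as a DDL string.
val json = UTF8String.fromString("""{"col": 0}""")
val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
  parser.nextToken()
  inferField(parser, jsonOptions)
}
println(dt.catalogString)  // struct<col:bigint>
```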
JsonExpressionsSuite.scala

@@ -706,4 +706,11 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
assert(schemaToCompare == schema)
}
}

test("SPARK-24709: infer schema of json strings") {
checkEvaluation(SchemaOfJson(Literal.create("""{"col":0}""")), "struct<col:bigint>")
checkEvaluation(
SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": "b"}}""")),
"struct<col0:array<string>,col1:struct<col2:string>>")
}
}
JsonDataSource.scala

@@ -33,7 +33,7 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
import org.apache.spark.rdd.{BinaryFileRDD, RDD}
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
sql/core/src/main/scala/org/apache/spark/sql/functions.scala (42 additions, 0 deletions)

@@ -3381,6 +3381,48 @@ object functions {
from_json(e, dataType, options)
}

/**
* (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
* keys, a `StructType`, or an `ArrayType` of `StructType`s with the specified schema.
* Returns `null` in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
*
* @group collection_funcs
* @since 2.4.0
*/
def from_json(e: Column, schema: Column): Column = {
from_json(e, schema, Map.empty[String, String].asJava)
}

/**
* (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
* keys, a `StructType`, or an `ArrayType` of `StructType`s with the specified schema.
* Returns `null` in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
* @param options options to control how the json is parsed. Accepts the same options as the
*                json data source.
*
* @group collection_funcs
* @since 2.4.0
*/
def from_json(e: Column, schema: Column, options: java.util.Map[String, String]): Column = {
Review thread:
Member: Let me leave my last comment, #21686 (comment) in case it's missed.
Member Author: we call the method from python: https://github.com/apache/spark/pull/21686/files#diff-f5295f69bfbdbf6e161aed54057ea36dR2202
Do you really want to revert changes for python?
Member: Ah, I see. I am fine then. Thanks.

withExpr(new JsonToStructs(e.expr, schema.expr, options.asScala.toMap))
}

/**
* Parses a column containing a JSON string and infers its schema.
*
* @param e a string column containing JSON data.
*
* @group collection_funcs
* @since 2.4.0
*/
def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))

/**
* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
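Together, the new `from_json(Column, Column)` overloads and `schema_of_json` let callers infer a schema from one representative document and reuse it for parsing, without hand-writing DDL. A hedged usage sketch (the session setup is illustrative; the function calls are the ones added in this file):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, lit, schema_of_json}

// Illustrative session setup; any session built on these changes works.
val spark = SparkSession.builder().master("local[*]").appName("schema_of_json-demo").getOrCreate()
import spark.implicits._

val df = Seq("""{"c1": [1, 2, 3]}""").toDS()  // single string column named "value"

// Infer the schema from a sample document once...
val schema = schema_of_json(lit("""{"c1": [0]}"""))

// ...then parse the real column with the new from_json(Column, Column) overload.
val parsed = df.select(from_json($"value", schema).as("parsed"))
parsed.printSchema()
// root
//  |-- parsed: struct (nullable = true)
//  |    |-- c1: array (nullable = true)
//  |    |    |-- element: long (containsNull = true)
```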
json-functions.sql

@@ -35,3 +35,7 @@ DROP VIEW IF EXISTS jsonTable;
-- from_json - complex types
select from_json('{"a":1, "b":2}', 'map<string, int>');
select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>');

-- infer schema of json literal
select schema_of_json('{"c1":0, "c2":[1]}');
select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'));
json-functions.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 28
-- Number of queries: 30


-- !query 0
@@ -183,7 +183,7 @@ select from_json('{"a":1}', 1)
struct<>
-- !query 17 output
org.apache.spark.sql.AnalysisException
Expected a string literal instead of 1;; line 1 pos 7
Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of 1;; line 1 pos 7


-- !query 18
@@ -274,3 +274,19 @@ select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>')
struct<jsontostructs({"a":1, "b":"2"}):struct<a:int,b:string>>
-- !query 27 output
{"a":1,"b":"2"}


-- !query 28
select schema_of_json('{"c1":0, "c2":[1]}')
-- !query 28 schema
struct<schemaofjson({"c1":0, "c2":[1]}):string>
-- !query 28 output
struct<c1:bigint,c2:array<bigint>>


-- !query 29
select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'))
-- !query 29 schema
struct<jsontostructs({"c1":[1, 2, 3]}):struct<c1:array<bigint>>>
-- !query 29 output
{"c1":[1,2,3]}
JsonFunctionsSuite.scala

@@ -17,7 +17,7 @@

package org.apache.spark.sql

import org.apache.spark.sql.functions.{from_json, lit, map, struct, to_json}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

@@ -311,7 +311,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
val errMsg1 = intercept[AnalysisException] {
df3.selectExpr("from_json(value, 1)")
}
assert(errMsg1.getMessage.startsWith("Expected a string literal instead of"))
assert(errMsg1.getMessage.startsWith("Schema should be specified in DDL format as a string"))
val errMsg2 = intercept[AnalysisException] {
df3.selectExpr("""from_json(value, 'time InvalidType')""")
}
@@ -392,4 +392,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(Seq("""{"{"f": 1}": "a"}""").toDS().select(from_json($"value", schema)),
Row(null))
}

test("SPARK-24709: infers schemas of json strings and pass them to from_json") {
val in = Seq("""{"a": [1, 2, 3]}""").toDS()
val out = in.select(from_json('value, schema_of_json(lit("""{"a": [1]}"""))) as "parsed")
val expected = StructType(StructField(
"parsed",
StructType(StructField(
"a",
ArrayType(LongType, true), true) :: Nil),
true) :: Nil)

assert(out.schema == expected)
}
}
JsonSuite.scala

@@ -31,11 +31,11 @@ import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions}
import org.apache.spark.sql.catalyst.json.JsonInferSchema.compatibleType
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.ExternalRDD
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._