Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions python/pyspark/sql/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2365,30 +2365,32 @@ def to_json(col, options={}):

@ignore_unicode_prefix
@since(2.4)
def schema_of_json(col, options={}):
def schema_of_json(json, options={}):
"""
Parses a column containing a JSON string and infers its schema in DDL format.
Parses a JSON string and infers its schema in DDL format.

:param col: string column in json format
:param json: a JSON string or a string literal containing a JSON string.
:param options: options to control parsing. accepts the same options as the JSON datasource

.. versionchanged:: 3.0
It accepts the `options` parameter to control schema inference.

>>> from pyspark.sql.types import *
>>> data = [(1, '{"a": 1}')]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(schema_of_json(df.value).alias("json")).collect()
[Row(json=u'struct<a:bigint>')]
>>> df = spark.range(1)
>>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
[Row(json=u'struct<a:bigint>')]
>>> schema = schema_of_json(lit('{a: 1}'), {'allowUnquotedFieldNames':'true'})
>>> schema = schema_of_json('{a: 1}', {'allowUnquotedFieldNames':'true'})
>>> df.select(schema.alias("json")).collect()
[Row(json=u'struct<a:bigint>')]
"""
if isinstance(json, basestring):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After more thought, maybe we should not add new features to 2.4? We can accept strings directly in 3.0.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Member Author

@HyukjinKwon HyukjinKwon Oct 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, Wenchen, after thinking about it a bit more, I think that would make things a bit complicated. I think it's okay to go ahead. It's kind of a mistake that we had to fix in 2.4.

col = _create_column_from_literal(json)
elif isinstance(json, Column):
col = _to_java_column(json)
else:
raise TypeError("schema argument should be a column or string")

sc = SparkContext._active_spark_context
jc = sc._jvm.functions.schema_of_json(_to_java_column(col), options)
jc = sc._jvm.functions.schema_of_json(col, options)
return Column(jc)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,14 +752,18 @@ case class StructsToJson(
case class SchemaOfJson(
child: Expression,
options: Map[String, String])
extends UnaryExpression with String2StringExpression with CodegenFallback {
extends UnaryExpression with CodegenFallback {

def this(child: Expression) = this(child, Map.empty[String, String])

def this(child: Expression, options: Expression) = this(
child = child,
options = ExprUtils.convertToMapData(options))

override def dataType: DataType = StringType

override def nullable: Boolean = false

@transient
private lazy val jsonOptions = new JSONOptions(options, "UTC")

Expand All @@ -770,8 +774,17 @@ case class SchemaOfJson(
factory
}

override def convert(v: UTF8String): UTF8String = {
val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser =>
@transient
private lazy val json = child.eval().asInstanceOf[UTF8String]

override def checkInputDataTypes(): TypeCheckResult = child match {
case Literal(s, StringType) if s != null => super.checkInputDataTypes()
case _ => TypeCheckResult.TypeCheckFailure(
s"The input json should be a string literal and not null; however, got ${child.sql}.")
}

override def eval(v: InternalRow): Any = {
val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser =>
parser.nextToken()
inferField(parser, jsonOptions)
}
Expand All @@ -786,7 +799,7 @@ object JsonExprUtils {
def evalSchemaExpr(exp: Expression): DataType = exp match {
case Literal(s, StringType) => DataType.fromDDL(s.toString)
case e @ SchemaOfJson(_: Literal, _) =>
val ddlSchema = e.eval().asInstanceOf[UTF8String]
val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String]
DataType.fromDDL(ddlSchema.toString)
case e => throw new AnalysisException(
"Schema should be specified in DDL format as a string literal" +
Expand Down
24 changes: 17 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3626,28 +3626,38 @@ object functions {
}

/**
* Parses a column containing a JSON string and infers its schema.
* Parses a JSON string and infers its schema in DDL format.
*
* @param e a string column containing JSON data.
* @param json a JSON string.
*
* @group collection_funcs
* @since 2.4.0
*/
def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))
def schema_of_json(json: String): Column = schema_of_json(lit(json))

/**
* Parses a column containing a JSON string and infers its schema using options.
* Parses a JSON string and infers its schema in DDL format.
*
* @param e a string column containing JSON data.
* @param json a string literal containing a JSON string.
*
* @group collection_funcs
* @since 2.4.0
*/
def schema_of_json(json: Column): Column = withExpr(new SchemaOfJson(json.expr))

/**
* Parses a JSON string and infers its schema in DDL format using options.
*
* @param json a string literal containing a JSON string.
* @param options options to control how the json is parsed. accepts the same options as the
* json data source. See [[DataFrameReader#json]].
* @return a column with string literal containing schema in DDL format.
*
* @group collection_funcs
* @since 3.0.0
*/
def schema_of_json(e: Column, options: java.util.Map[String, String]): Column = {
withExpr(SchemaOfJson(e.expr, options.asScala.toMap))
def schema_of_json(json: Column, options: java.util.Map[String, String]): Column = {
withExpr(SchemaOfJson(json.expr, options.asScala.toMap))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ select to_json(array(array(1, 2, 3), array(4)));
-- infer schema of json literal using options
select schema_of_json('{"c1":1}', map('primitivesAsString', 'true'));
select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 'true', 'prefersDecimal', 'true'));

select schema_of_json(null);
CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 1, "b": 2}', 'a');
SELECT schema_of_json(jsonField) FROM jsonTable;
-- Clean up
DROP VIEW IF EXISTS jsonTable;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 38
-- Number of queries: 42


-- !query 0
Expand Down Expand Up @@ -318,3 +318,37 @@ select schema_of_json('{"c1":01, "c2":0.1}', map('allowNumericLeadingZeros', 'tr
struct<schema_of_json({"c1":01, "c2":0.1}):string>
-- !query 37 output
struct<c1:bigint,c2:decimal(1,1)>


-- !query 38
select schema_of_json(null)
-- !query 38 schema
struct<>
-- !query 38 output
org.apache.spark.sql.AnalysisException
cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input json should be a string literal and not null; however, got NULL.; line 1 pos 7


-- !query 39
CREATE TEMPORARY VIEW jsonTable(jsonField, a) AS SELECT * FROM VALUES ('{"a": 1, "b": 2}', 'a')
-- !query 39 schema
struct<>
-- !query 39 output



-- !query 40
SELECT schema_of_json(jsonField) FROM jsonTable
-- !query 40 schema
struct<>
-- !query 40 output
org.apache.spark.sql.AnalysisException
cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input json should be a string literal and not null; however, got jsontable.`jsonField`.; line 1 pos 7


-- !query 41
DROP VIEW IF EXISTS jsonTable
-- !query 41 schema
struct<>
-- !query 41 output

Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {

test("SPARK-24709: infers schemas of json strings and pass them to from_json") {
val in = Seq("""{"a": [1, 2, 3]}""").toDS()
val out = in.select(from_json('value, schema_of_json(lit("""{"a": [1]}"""))) as "parsed")
val out = in.select(from_json('value, schema_of_json("""{"a": [1]}""")) as "parsed")
val expected = StructType(StructField(
"parsed",
StructType(StructField(
Expand Down