
Conversation

@maropu
Member

@maropu maropu commented Mar 29, 2018

What changes were proposed in this pull request?

This PR adds a new JSON option, dropFieldIfAllNull, to ignore columns of all-null values or empty arrays/structs during JSON schema inference.

How was this patch tested?

Added tests in JsonSuite.
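For illustration, a minimal usage sketch of the option this PR proposes (spark is an assumed SparkSession and the path is a placeholder; this is not code from the PR itself):

  // With the option off (the default), all-null columns fall back to StringType
  // during inference; with it on, they are dropped from the inferred schema.
  val df = spark.read
    .option("dropFieldIfAllNull", true)
    .json("/path/to/input.json")
  df.printSchema()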

maropu (Member, Author):

@mengxr Does this test match the intention you described in the JIRA? (I just want to confirm this before I brush up the code.)

mengxr (Contributor):

We need more test coverage. I have a similar internal implementation that tests the following cases (ignore the actual test, just look at the example records):

  test("null") {
    assert(removeNullRecursively("null") === "null")
  }

  test("empty string") {
    assert(removeNullRecursively("\"\"") === "\"\"")
  }

  test("empty object") {
    assert(removeNullRecursively("{}") === "null")
  }

  test("object with all null values") {
    val json = """{"a":null,"b":null, "c":null}"""
    assert(removeNullRecursively(json) === "null")
  }

  test("object with some null fields") {
    val json = """{"a":null,"b":"c","d":null,"e":"f"}"""
    val expected = """{"b":"c","e":"f"}"""
    assert(removeNullRecursively(json) === expected)
  }

  test("object with some nested null values") {
    val json = """{"a":{},"b":{"c":null},"d":{"c":"e"},"f":{"c":null,"g":"h"}}"""
    val expected = """{"d":{"c":"e"},"f":{"g":"h"}}"""
    assert(removeNullRecursively(json) === expected)
  }

  test("array with all null elements") {
    val json = """[null,null,{},{"a":null}]"""
    val expected = "null"
    assert(removeNullRecursively(json) === expected)
  }

  test("array with some null elements") {
    // TODO: is it an issue if we convert an empty object to null in an array?
    val json = """[null,"a",null,{},"b"]"""
    val expected = """[null,"a",null,null,"b"]"""
    assert(removeNullRecursively(json) === expected)
  }

@SparkQA

SparkQA commented Mar 29, 2018

Test build #88682 has finished for PR 20929 at commit 876da84.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

cc @MaxGekk

maropu (Member, Author):

I just added this odd type for the prototype, so we need to think about it more here. I don't have a better idea right now, so I'd welcome any suggestions.

mengxr (Contributor):

Is it necessary to introduce a new DataType? Would it be the same if we used NullType? With the flag on, at the end of schema inference, NullType, ArrayType(NullType), etc. should be dropped instead of falling back to StringType. Basically, during schema inference, we keep the type that reveals more detail, for example:

(NullType, ArrayType(NullType)) => ArrayType(NullType)
(ArrayType(NullType), ArrayType(StructType(Field("a", NullType)))) => ArrayType(StructType(Field("a", NullType)))
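A hypothetical Scala sketch of that merge preference (preferDetailed is an illustrative name, not from the PR; real inference would fall back to compatibleType for the cases elided here):

  import org.apache.spark.sql.types._

  // Keep the type that reveals more structure when merging two inferred types.
  def preferDetailed(t1: DataType, t2: DataType): DataType = (t1, t2) match {
    case (NullType, other) => other
    case (other, NullType) => other
    case (ArrayType(e1, n1), ArrayType(e2, n2)) =>
      ArrayType(preferDetailed(e1, e2), n1 || n2)
    case _ => t1  // placeholder: defer to the usual compatibleType merge
  }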

At the end, we implement a util method that determines whether a field is all-null and drops it if so. It should be done recursively. I have an internal implementation with similar logic, but it operates on the JSON record itself. You might want to apply it to data types instead.

  // json4s imports needed to make the snippet below compile (json4s ships with Spark):
  import org.json4s._
  import org.json4s.jackson.JsonMethods.{compact, parse, render}

  /**
   * Removes null fields recursively from the input JSON record.
   * An array is null if all its elements are null.
   * An object is null if all its values are null.
   */
  def removeNullRecursively(jsonStr: String): String = {
    val json = parse(jsonStr)
    val cleaned = doRemoveNullRecursively(json)
    compact(render(cleaned)) // should handle null correctly
  }

  private def doRemoveNullRecursively(value: JValue): JValue = {
    value match {
      case null =>
        null

      case JNull =>
        null

      case JArray(values) =>
        val cleaned = values.map(doRemoveNullRecursively)
        if (cleaned.exists(_ != null)) {
          JArray(cleaned)
        } else {
          null
        }

      case JObject(pairs) =>
        val cleaned = pairs.flatMap { case (k, v) =>
          val cv = doRemoveNullRecursively(v)
          if (cv != null) {
            Some((k, cv))
          } else {
            None
          }
        }
        if (cleaned.nonEmpty) {
          JObject(cleaned)
        } else {
          null
        }

      // all other types are non-null
      case _ =>
        value
    }
  }

maropu (Member, Author):

In the first attempt, I used the new type instead of NullType because some sinks (e.g., FileStreamSink) could not handle NullType:

// parquet
java.lang.RuntimeException: Unsupported data type NullType.
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.org$apache$spark$sql$execution$datasources$parquet$ParquetWriteSupport$$makeWriter(ParquetWriteSupport.scala:206)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport$$anonfun$init$2.apply(ParquetWriteSupport.scala:93)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
// orc
java.lang.IllegalArgumentException: Can't parse category at 'struct<c0:bigint,c1:null^,c2:array<null>>'
        at org.apache.orc.TypeDescription.parseCategory(TypeDescription.java:223)
        at org.apache.orc.TypeDescription.parseType(TypeDescription.java:332)
        at org.apache.orc.TypeDescription.parseStruct(TypeDescription.java:327)
        at org.apache.orc.TypeDescription.parseType(TypeDescription.java:385)
        at org.apache.orc.TypeDescription.fromString(TypeDescription.java:406)

// csv
java.lang.UnsupportedOperationException: CSV data source does not support null data type.
        at org.apache.spark.sql.execution.datasources.csv.CSVUtils$.org$apache$spark$sql$execution$datasources$csv$CSVUtils$$verifyType$1(CSVUtils.scala:130)
        at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:134)
        at org.apache.spark.sql.execution.datasources.csv.CSVUtils$$anonfun$verifySchema$1.apply(CSVUtils.scala:134)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)

So, in the previous fix, I tried adding a PlaceholderType that inherits from StringType; all the sinks could handle that type correctly, but it was too tricky.

Regarding the suggestion that NullType, ArrayType(NullType), etc. should be dropped: does that mean we need to handle an inferred schema as follows? e.g.,

Inferred schema: "StructType<IntegerType, NullType, ArrayType(NullType)>" -> Schema used in FileStreamSource: "StructType<IntegerType>"

Is this right?
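To make the question concrete, a hypothetical sketch of such pruning (isNullLike and pruneNullFields are illustrative names, not from the PR):

  import org.apache.spark.sql.types._

  // A type is "null-like" when it is NullType, possibly nested inside arrays.
  def isNullLike(dt: DataType): Boolean = dt match {
    case NullType => true
    case ArrayType(et, _) => isNullLike(et)
    case _ => false
  }

  // Drop null-like fields from an inferred schema before a source consumes it,
  // e.g. StructType<IntegerType, NullType, ArrayType(NullType)> => StructType<IntegerType>.
  def pruneNullFields(schema: StructType): StructType =
    StructType(schema.fields.filterNot(f => isNullLike(f.dataType)))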

maropu (Member, Author):

I think removeNullRecursively could be a little time-consuming (it parses the JSON twice), so why can't we do the same thing directly in JacksonParser? e.g.,
https://github.com/apache/spark/pull/20929/files#diff-635e02b2d1ce4ad1675b0350ccac0c10R334

maropu (Member, Author):

In the latest fix, I used NullType for unresolved types, and FileStreamSource resolves those types when we see non-null values. As suggested in SPARK-12436, I think we might need to fall back to StringType just before the sinks, e.g.,
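(maropu's attached example is not reproduced here; a hedged sketch of such a fallback, with fallbackNullToString as an illustrative name, might look like this:)

  import org.apache.spark.sql.types._

  // Recursively replace any remaining NullType with StringType so that sinks
  // which reject NullType (Parquet, ORC, CSV above) can accept the schema.
  def fallbackNullToString(dt: DataType): DataType = dt match {
    case NullType => StringType
    case ArrayType(et, containsNull) => ArrayType(fallbackNullToString(et), containsNull)
    case StructType(fields) =>
      StructType(fields.map(f => f.copy(dataType = fallbackNullToString(f.dataType))))
    case other => other
  }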

@mengxr mengxr (Contributor) left a comment:

Sorry for the late review! I just saw it... My main feedback is that this feature shouldn't require adding new types to Spark SQL, nor a new Spark SQL conf; it should live in the JSON data source. I also recommended a way to implement it in the inline comments.

mengxr (Contributor):

  • I was expecting a configuration for the JSON data source instead of a Spark SQL conf. Say, how would we implement this feature if the data source lived outside Spark?
  • Since Spark SQL cannot control the behavior of external data source implementations, the flag won't work unless external data sources recognize it. This is another reason to put it under the JSON data source (see the sketch below).
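A short sketch of the practical difference (the SQL conf shown is hypothetical and was never added):

  // Data source option: scoped to this read, and an external JSON-like data
  // source can choose to honor the same option key.
  spark.read.option("dropFieldIfAllNull", true).json("/path/to/input.json")

  // Global Spark SQL conf (the rejected approach): an external data source
  // would never see it unless it explicitly looked the key up.
  // spark.conf.set("spark.sql.json.dropFieldIfAllNull", "true")  // hypothetical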

maropu (Member, Author):

ok, I dropped the conf in SQLConf.

mengxr (Contributor):

ignoreNullFields is not precise. For example:

{"a": 1}
{"a": null}

The null value is not ignored. I would suggest dropFieldIfAllNull.

maropu (Member, Author):

ok, I used dropFieldIfAllNull.


@maropu
Member Author

maropu commented Apr 12, 2018

@mengxr Thanks for the review! OK, please give me more time to reconsider the design based on the comments.

@mengxr
Contributor

mengxr commented May 1, 2018

@maropu Any updates?

@maropu
Member Author

maropu commented May 2, 2018

oh, sorry, I'll update in a few days.

maropu added 3 commits May 7, 2018 15:45
@SparkQA

SparkQA commented May 7, 2018

Test build #90324 has finished for PR 20929 at commit 53b686d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented May 7, 2018

retest this please

@SparkQA

SparkQA commented May 8, 2018

Test build #90345 has finished for PR 20929 at commit 53b686d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mengxr
Contributor

mengxr commented May 15, 2018

@maropu Thanks for the update! I saw some test failures, though I'm not sure whether they are relevant. Looking at the changes, I feel we don't need to handle the streaming scenarios in the same PR. We can just add a flag to the JSON data source and leave it off by default, so it won't break any existing streaming use cases. In this PR, we should focus on properly identifying "null" types, which include NullType, ArrayType(NullType), ArrayType(ArrayType(NullType)), etc.

Could you make another update? Thanks!

@maropu
Member Author

maropu commented May 16, 2018

retest this please

@maropu
Member Author

maropu commented May 16, 2018

OK, in this PR I'll focus on adding a new flag to do so. Just a moment for the update. Thanks!

@SparkQA

SparkQA commented May 16, 2018

Test build #90664 has finished for PR 20929 at commit 53b686d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu maropu changed the title [SPARK-23772][SQL][WIP] Provide an option to ignore column of all null values or empty array during JSON schema inference [SPARK-23772][SQL] Provide an option to ignore column of all null values or empty array during JSON schema inference May 22, 2018
@SparkQA

SparkQA commented May 22, 2018

Test build #90978 has finished for PR 20929 at commit 411bd9f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 22, 2018

Test build #90979 has finished for PR 20929 at commit 6c4592d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented May 23, 2018

@mengxr Could you check? Thanks!

@MaxGekk MaxGekk (Member) left a comment:

Could you add a description of the option to readwriter.py and streaming.py?

* that should be used for parsing.</li>
* <li>`samplingRatio` (default is 1.0): defines fraction of input JSON objects used
* for schema inferring.</li>
* <li>`dropFieldIfAllNull` (default `false`): whether to ignore column of all null values or
MaxGekk (Member):

How about DataStreamReader? I guess the description should be added to it too.

maropu (Member, Author):

ok

"""{"a":null, "b":[null, null], "c":null, "d":[[], [null]], "e":{}}""",
"""{"a":null, "b":[null], "c":[], "d": [null, []], "e":{}}""",
"""{"a":null, "b":[], "c":[], "d": null, "e":null}""")
.toDS().write.mode("overwrite").text(path)
MaxGekk (Member):

Do you need the overwrite mode?

maropu (Member, Author):

ok

val expectedSchema = new StructType()
.add("a", NullType).add("b", NullType).add("c", NullType).add("d", NullType)
.add("e", NullType)
assert(df.schema === expectedSchema)
MaxGekk (Member):

It seems DefaultEquality is used here, which applies ==. Is there any reason to use === instead of just ==?

Member:

No, there's no explicit preference between them; preferences differ even among committers. It's fine to use either.

Seq(
"""{"a":null, "b":[null, null], "c":null, "d":[[], [null]], "e":{}}""",
"""{"a":null, "b":[null], "c":[], "d": [null, []], "e":{}}""",
"""{"a":null, "b":[], "c":[], "d": null, "e":null}""")
MaxGekk (Member):

Could you add a test where dropFieldIfAllNull is set to true but not all values in a column are null?

maropu (Member, Author):

ok
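For illustration, a sketch of what such a test could check (hypothetical, not the test that was actually added; assumes a SparkSession named spark):

  import spark.implicits._
  import org.apache.spark.sql.types._

  // Columns that are only partially null must keep their inferred types.
  val df = spark.read
    .option("dropFieldIfAllNull", true)
    .json(Seq("""{"a":null,"b":1}""", """{"a":"x","b":null}""").toDS())
  // Neither column is all-null, so both survive inference.
  assert(df.schema === new StructType().add("a", StringType).add("b", LongType))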

@SparkQA

SparkQA commented May 28, 2018

Test build #91226 has finished for PR 20929 at commit 907cf38.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mengxr
Contributor

mengxr commented Jun 6, 2018

@maropu Thanks for updating this PR! It would be easier to maintain the logic in one place. I think it should be feasible to do everything inside canonicalizeType without modifying JsonParser or other methods in JsonInferSchema. The following code outlines my logic, though I didn't test it ...:

  /**
   * Canonicalize data types and remove StructTypes with no fields.
   * @return Some(canonicalizedType) or None if nothing is left.
   */
  private def canonicalizeType(tpe: DataType, options: JSONOptions): Option[DataType] = tpe match {
    case at @ ArrayType(elementType, _) =>
      canonicalizeType(elementType, options).map(t => at.copy(elementType = t))

    case StructType(fields) =>
      val canonicalizedFields = fields.flatMap { f =>
        canonicalizeType(f.dataType, options).map(t => f.copy(dataType = t))
      }
      // per SPARK-8093: empty structs should be deleted
      if (canonicalizedFields.isEmpty) {
        None
      } else {
        Some(StructType(canonicalizedFields))
      }

    case NullType =>
      // With the flag on, drop all-null fields; otherwise keep the old
      // StringType fallback.
      if (options.dropFieldIfAllNull) {
        None
      } else {
        Some(StringType)
      }

    case other => Some(other)
  }

In the test, we should also include scenarios with nested "null" fields like [[], null, [[]]].
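A minimal end-to-end sketch of that intended behavior (a sketch under the assumption the option works as outlined above, not the suite's actual test; assumes a SparkSession named spark):

  import spark.implicits._
  import org.apache.spark.sql.types._

  val input = Seq(
    """{"a":null, "b":[[], null, [[]]], "c":1}""",
    """{"a":null, "b":null, "c":2}""").toDS()
  val df = spark.read.option("dropFieldIfAllNull", true).json(input)
  // "a" is all-null and "b" only ever holds nested empty/null arrays, so both
  // canonicalize to None and are dropped, leaving only "c".
  assert(df.schema === new StructType().add("c", LongType))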

@maropu
Member Author

maropu commented Jun 7, 2018

yea, thanks for the comments! I'll try to fix based on the comments.

@mengxr
Contributor

mengxr commented Jun 12, 2018

@maropu Any updates?

@maropu
Member Author

maropu commented Jun 12, 2018

sorry, I'll update tonight

@SparkQA

SparkQA commented Jun 14, 2018

Test build #91803 has finished for PR 20929 at commit 58054ef.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Jun 14, 2018

@mengxr ok, could you check?

@SparkQA

SparkQA commented Jun 14, 2018

Test build #91805 has finished for PR 20929 at commit 22e0d9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mengxr
Contributor

mengxr commented Jun 17, 2018

@maropu I sent you a PR at maropu#1. Could you take a look and see if there are things I missed? I updated the tests, so the null fields are dropped in the expected schema.

@maropu
Member Author

maropu commented Jun 18, 2018

ok, I'll check

@maropu
Member Author

maropu commented Jun 18, 2018

I checked that the JSON tests pass locally, so I merged the fix into this PR. Sorry to bother you, but could you check again?

@maropu
Member Author

maropu commented Jun 18, 2018

@HyukjinKwon I think this is your area, so could you double-check this? Thanks!

@SparkQA

SparkQA commented Jun 18, 2018

Test build #92016 has finished for PR 20929 at commit 4544433.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Jun 18, 2018

retest this please

@SparkQA

SparkQA commented Jun 18, 2018

Test build #92019 has finished for PR 20929 at commit 4544433.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member Author

maropu commented Jun 18, 2018

retest this please

@mengxr
Contributor

mengxr commented Jun 18, 2018

LGTM pending Jenkins

@SparkQA

SparkQA commented Jun 18, 2018

Test build #92026 has finished for PR 20929 at commit 4544433.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon HyukjinKwon (Member) left a comment:

LGTM too

@HyukjinKwon
Member

Merged to master.

@asfgit asfgit closed this in e219e69 Jun 18, 2018