diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index ed2ff139bcc3..3c346be4c45d 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -35,7 +35,9 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
 
-  - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
+  - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
+
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
 
 ## Upgrading From Spark SQL 2.3 to 2.4
 
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1d2dd4d80893..7b10512a4329 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -211,7 +211,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
@@ -424,7 +424,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
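To make the migration-guide entry and the PERMISSIVE-mode docstring change above concrete, here is a minimal sketch of the new behavior, assuming a local SparkSession on a build that includes this patch; the column names and the sample record are invented for illustration and are not taken from the patch itself.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Hypothetical demo record: field "a" parses, field "b" does not,
// so the whole record counts as malformed in PERMISSIVE mode.
val spark = SparkSession.builder().master("local[*]").appName("partial-result-demo").getOrCreate()
import spark.implicits._

val schema = new StructType().add("a", IntegerType).add("b", IntegerType)

val parsed = Seq("""{"a": 1, "b": "not a number"}""").toDF("json")
  .select(from_json($"json", schema).as("parsed"))

// Spark 2.4 and earlier (PERMISSIVE): parsed = [null, null]
// With this change:                   parsed = [1, null]
parsed.show(truncate = false)

spark.stop()
```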
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index d92b0d5677e2..fc23b9d99c34 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -441,7 +441,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
@@ -648,7 +648,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
                      set, it uses the default value, ``PERMISSIVE``.
 
                 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-                  into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+                  into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
                   fields to ``null``. To keep corrupt records, an user can set a string type \
                   field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
                   schema does not have the field, it drops corrupt records during parsing. \
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 2357595906b1..7e3bd4df51bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -22,6 +22,7 @@ import java.nio.charset.MalformedInputException
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
+import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
 
@@ -29,7 +30,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -347,17 +347,28 @@ class JacksonParser(
       schema: StructType,
       fieldConverters: Array[ValueConverter]): InternalRow = {
     val row = new GenericInternalRow(schema.length)
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
-          row.update(index, fieldConverters(index).apply(parser))
-
+          try {
+            row.update(index, fieldConverters(index).apply(parser))
+          } catch {
+            case NonFatal(e) =>
+              badRecordException = badRecordException.orElse(Some(e))
+              parser.skipChildren()
+          }
         case None =>
           parser.skipChildren()
       }
     }
 
-    row
+    if (badRecordException.isEmpty) {
+      row
+    } else {
+      throw PartialResultException(row, badRecordException.get)
+    }
   }
 
   /**
@@ -428,6 +439,11 @@ class JacksonParser(
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
         throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+      case PartialResultException(row, cause) =>
+        throw BadRecordException(
+          record = () => recordLiteral(record),
+          partialResult = () => Some(row),
+          cause)
     }
   }
 }
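The `convertObject` change above follows a general pattern: record the first field-level failure, keep whatever fields did parse, and surface the failure wrapped around the partially filled row. A simplified, self-contained sketch of that pattern (not Spark's internal API; all names below are made up) might look like this:

```scala
import scala.util.control.NonFatal

// Illustrative stand-in for Spark's PartialResultException: carries the
// partially parsed row together with the first error that was hit.
final case class PartialResult(partial: Array[Any], cause: Throwable) extends Exception(cause)

object PartialParsing {
  // Parse every field; keep the values that succeeded and report the first failure.
  def parseRow(fields: Seq[String], parseField: String => Any): Array[Any] = {
    val row = new Array[Any](fields.length)
    var firstError: Option[Throwable] = None
    fields.zipWithIndex.foreach { case (raw, i) =>
      try row(i) = parseField(raw)
      catch {
        case NonFatal(e) =>
          firstError = firstError.orElse(Some(e)) // remember only the first failure
          row(i) = null                           // leave the bad field as null, keep going
      }
    }
    firstError match {
      case None    => row
      case Some(e) => throw PartialResult(row, e) // the partial row travels with the error
    }
  }

  def main(args: Array[String]): Unit = {
    try parseRow(Seq("1", "oops", "3"), _.toInt)
    catch {
      case PartialResult(partial, cause) =>
        // prints: partial = [1, null, 3], cause = java.lang.NumberFormatException: For input string: "oops"
        println(s"partial = ${partial.mkString("[", ", ", "]")}, cause = $cause")
    }
  }
}
```

In the patch itself, `PartialResultException` is caught in `JacksonParser.parse` and rewrapped into `BadRecordException` with a `partialResult`, which is how the partially parsed row reaches PERMISSIVE-mode output.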
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index 985f0dc1cd60..d719a33929fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -20,6 +20,16 @@ package org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.unsafe.types.UTF8String
 
+/**
+ * Exception thrown when the underlying parser returns a partial result of parsing.
+ * @param partialResult the partial result of parsing a bad record.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialResultException(
+    partialResult: InternalRow,
+    cause: Throwable)
+  extends Exception(cause)
+
 /**
  * Exception thrown when the underlying parser meet a bad record and can't parse it.
  * @param record a function to return the record that cause the parser to fail
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 661fe98d8c90..9751528654ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -362,7 +362,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * during parsing.
   *
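The `DataFrameReader` and streaming reader docs above describe the same behavior for the file-based readers. A rough end-to-end sketch, again assuming a build with this patch; the dataset contents and the corrupt-record column name are illustrative only:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("corrupt-column-demo").getOrCreate()
import spark.implicits._

// Keep the raw malformed text by declaring the corrupt-record column in the schema.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", IntegerType)
  .add("_corrupt_record", StringType)

val input = Seq(
  """{"a": 1, "b": 2}""",          // fully valid record
  """{"a": 1, "b": "broken"}"""    // "b" cannot be converted to an integer
).toDS()

val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(input)

// Spark 2.4 and earlier: second row -> a = null, b = null, _corrupt_record = raw text
// With this change:      second row -> a = 1,    b = null, _corrupt_record = raw text
df.show(truncate = false)

spark.stop()
```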