From 6cc3dc2ef4d2ffbff7ffc400e723b97b462e1bab Mon Sep 17 00:00:00 2001
From: Vladimir Golubev
Date: Thu, 9 May 2024 15:35:28 +0800
Subject: [PATCH] [SPARK-48169][SPARK-48143][SQL] Revert BadRecordException optimizations

### What changes were proposed in this pull request?

Revert the BadRecordException optimizations for UnivocityParser, StaxXmlParser and JacksonParser.

### Why are the changes needed?

To reduce the blast radius; this optimization will be reimplemented differently. I recently made two PRs:
- https://github.com/apache/spark/pull/46438
- https://github.com/apache/spark/pull/46400

which introduced optimizations to speed up the control flow between UnivocityParser, StaxXmlParser and JacksonParser. However, these changes are unstable and may break calling code that relies on, for example, the type of the exception cause. There may also be Spark plugins/extensions that use this exception for user-facing errors.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46478 from vladimirg-db/vladimirg-db/revert-SPARK-48169-SPARK-48143.

Authored-by: Vladimir Golubev
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/csv/UnivocityParser.scala    |  8 +++----
 .../sql/catalyst/json/JacksonParser.scala     | 13 ++++++-----
 .../catalyst/util/BadRecordException.scala    | 13 +++--------
 .../sql/catalyst/util/FailureSafeParser.scala |  2 +-
 .../sql/catalyst/xml/StaxXmlParser.scala      | 23 +++++++++----------
 5 files changed, 26 insertions(+), 33 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 8d06789a75126..a5158d8a22c6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -316,17 +316,17 @@ class UnivocityParser(
       throw BadRecordException(
         () => getCurrentInput,
         () => Array.empty,
-        () => QueryExecutionErrors.malformedCSVRecordError(""))
+        QueryExecutionErrors.malformedCSVRecordError(""))
     }
 
     val currentInput = getCurrentInput
 
-    var badRecordException: Option[() => Throwable] = if (tokens.length != parsedSchema.length) {
+    var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it as a malformed record.
       // However, we still have chance to parse some of the tokens. It continues to parses the
       // tokens normally and sets null when `ArrayIndexOutOfBoundsException` occurs for missing
       // tokens.
-      Some(() => QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
+      Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
     } else None
     // When the length of the returned tokens is identical to the length of the parsed schema,
     // we just need to:
@@ -348,7 +348,7 @@ class UnivocityParser(
         } catch {
           case e: SparkUpgradeException => throw e
           case NonFatal(e) =>
-            badRecordException = badRecordException.orElse(Some(() => e))
+            badRecordException = badRecordException.orElse(Some(e))
             // Use the corresponding DEFAULT value associated with the column, if any.
             row.update(i, ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 848c20ee36bef..5e75ff6f6e1a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -613,7 +613,7 @@ class JacksonParser(
         // JSON parser currently doesn't support partial results for corrupted records.
         // For such records, all fields other than the field configured by
         // `columnNameOfCorruptRecord` are set to `null`.
-        throw BadRecordException(() => recordLiteral(record), cause = () => e)
+        throw BadRecordException(() => recordLiteral(record), () => Array.empty, e)
       case e: CharConversionException if options.encoding.isEmpty =>
         val msg =
           """JSON parser cannot handle a character in its input.
@@ -621,17 +621,18 @@ class JacksonParser(
             |""".stripMargin + e.getMessage
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
-        throw BadRecordException(() => recordLiteral(record), cause = () => wrappedCharException)
+        throw BadRecordException(() => recordLiteral(record), () => Array.empty,
+          wrappedCharException)
       case PartialResultException(row, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(row),
-          cause = () => convertCauseForPartialResult(cause))
+          convertCauseForPartialResult(cause))
       case PartialResultArrayException(rows, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => rows,
-          cause = () => cause)
+          cause)
       // These exceptions should never be thrown outside of JacksonParser.
       // They are used for the control flow in the parser. We add them here for completeness
       // since they also indicate a bad record.
@@ -639,12 +640,12 @@ class JacksonParser(
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(InternalRow(arrayData)),
-          cause = () => convertCauseForPartialResult(cause))
+          convertCauseForPartialResult(cause))
       case PartialMapDataResultException(mapData, cause) =>
         throw BadRecordException(
           record = () => recordLiteral(record),
           partialResults = () => Array(InternalRow(mapData)),
-          cause = () => convertCauseForPartialResult(cause))
+          convertCauseForPartialResult(cause))
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
index c4fcdf40360af..65a56c1064e45 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala
@@ -67,23 +67,16 @@ case class PartialResultArrayException(
   extends Exception(cause)
 
 /**
- * Exception thrown when the underlying parser meets a bad record and can't parse it. Used for
- * control flow between wrapper and underlying parser without overhead of creating a full exception.
+ * Exception thrown when the underlying parser meet a bad record and can't parse it.
  * @param record a function to return the record that cause the parser to fail
  * @param partialResults a function that returns an row array, which is the partial results of
  *                       parsing this bad record.
- * @param cause a function to return the actual exception about why the record is bad and can't be
- *              parsed.
+ * @param cause the actual exception about why the record is bad and can't be parsed.
  */
 case class BadRecordException(
     @transient record: () => UTF8String,
     @transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
-    @transient cause: () => Throwable)
-  extends Exception() {
-
-  override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
-  override def fillInStackTrace(): Throwable = this
-}
+    cause: Throwable) extends Exception(cause)
 
 /**
  * Exception thrown when the underlying parser parses a JSON array as a struct.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
index b005563aa824f..10cd159c769b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -70,7 +70,7 @@ class FailureSafeParser[IN](
         case DropMalformedMode =>
           Iterator.empty
         case FailFastMode =>
-          e.cause() match {
+          e.getCause match {
             case _: JsonArraysAsStructsException =>
               // SPARK-42298 we recreate the exception here to make sure the error message
               // have the record content.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
index 2b237ab5db643..ab671e56a21e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlParser.scala
@@ -148,27 +148,26 @@ class StaxXmlParser(
       // XML parser currently doesn't support partial results for corrupted records.
       // For such records, all fields other than the field configured by
       // `columnNameOfCorruptRecord` are set to `null`.
-      throw BadRecordException(() => xmlRecord, cause = () => e)
+      throw BadRecordException(() => xmlRecord, () => Array.empty, e)
     case e: CharConversionException if options.charset.isEmpty =>
-      throw BadRecordException(() => xmlRecord, cause = () => {
-        val msg =
-          """XML parser cannot handle a character in its input.
-            |Specifying encoding as an input option explicitly might help to resolve the issue.
-            |""".stripMargin + e.getMessage
-        val wrappedCharException = new CharConversionException(msg)
-        wrappedCharException.initCause(e)
-        wrappedCharException
-      })
+      val msg =
+        """XML parser cannot handle a character in its input.
+          |Specifying encoding as an input option explicitly might help to resolve the issue.
+          |""".stripMargin + e.getMessage
+      val wrappedCharException = new CharConversionException(msg)
+      wrappedCharException.initCause(e)
+      throw BadRecordException(() => xmlRecord, () => Array.empty,
+        wrappedCharException)
     case PartialResultException(row, cause) =>
       throw BadRecordException(
        record = () => xmlRecord,
        partialResults = () => Array(row),
-        () => cause)
+        cause)
     case PartialResultArrayException(rows, cause) =>
       throw BadRecordException(
        record = () => xmlRecord,
        partialResults = () => rows,
-        () => cause)
+        cause)
   }
 }
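
For readers skimming the diff, the substance of the revert is the shape of `BadRecordException` and how callers reach its cause. Below is a minimal, self-contained sketch, not the Spark sources (types are simplified to plain `String`, the `partialResults` field is omitted, and the class names are made up for contrast), showing the lazy shape being removed next to the eager shape being restored:

```scala
object BadRecordExceptionShapes {

  // The optimized shape this patch reverts: the cause is a thunk and stack
  // trace filling is suppressed, which makes throwing cheap for control flow,
  // but the standard `getCause` accessor returns null, so generic callers
  // that match on the cause type silently stop working.
  case class LazyBadRecordException(
      record: () => String,
      cause: () => Throwable)
    extends Exception() {
    override def getStackTrace(): Array[StackTraceElement] =
      Array.empty[StackTraceElement]
    override def fillInStackTrace(): Throwable = this
  }

  // The restored shape: the cause flows through the `Exception(cause)`
  // constructor, so `getCause` behaves the way calling code expects.
  case class EagerBadRecordException(
      record: () => String,
      cause: Throwable)
    extends Exception(cause)

  def main(args: Array[String]): Unit = {
    val boom = new IllegalArgumentException("malformed record")

    val lazyEx = LazyBadRecordException(() => "bad,csv,row", () => boom)
    val eagerEx = EagerBadRecordException(() => "bad,csv,row", boom)

    assert(lazyEx.getCause == null)   // cause hidden from the JVM accessor
    assert(lazyEx.cause() eq boom)    // reachable only via the custom thunk
    assert(eagerEx.getCause eq boom)  // reachable where callers look for it
  }
}
```

This is also why `FailureSafeParser` switches from `e.cause()` back to `e.getCause` in this patch: with the eager shape, the cause travels through the standard JVM exception chain that downstream code, and potentially third-party Spark plugins, already inspect.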