Skip to content

Commit

Permalink
[SPARK-48169][SPARK-48143][SQL] Revert BadRecordException optimizations
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Revert BadRecordException optimizations for UnivocityParser, StaxXmlParser and JacksonParser

### Why are the changes needed?
To reduce the blast radius - this optimization will be reimplemented differently. I recently authored two PRs:
- apache#46438
- apache#46400

which introduced optimizations to speed up control flow between UnivocityParser, StaxXmlParser and JacksonParser. However, these changes are quite unstable and may break any calling code that relies on, for example, the exception cause type. Also, there may be some Spark plugins/extensions using that exception for user-facing errors.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#46478 from vladimirg-db/vladimirg-db/revert-SPARK-48169-SPARK-48143.

Authored-by: Vladimir Golubev <vladimir.golubev@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
vladimirg-db authored and cloud-fan committed May 9, 2024
1 parent 85a6e35 commit 6cc3dc2
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,17 +316,17 @@ class UnivocityParser(
throw BadRecordException(
() => getCurrentInput,
() => Array.empty,
() => QueryExecutionErrors.malformedCSVRecordError(""))
QueryExecutionErrors.malformedCSVRecordError(""))
}

val currentInput = getCurrentInput

var badRecordException: Option[() => Throwable] = if (tokens.length != parsedSchema.length) {
var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) {
// If the number of tokens doesn't match the schema, we should treat it as a malformed record.
// However, we still have chance to parse some of the tokens. It continues to parses the
// tokens normally and sets null when `ArrayIndexOutOfBoundsException` occurs for missing
// tokens.
Some(() => QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
} else None
// When the length of the returned tokens is identical to the length of the parsed schema,
// we just need to:
Expand All @@ -348,7 +348,7 @@ class UnivocityParser(
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(() => e))
badRecordException = badRecordException.orElse(Some(e))
// Use the corresponding DEFAULT value associated with the column, if any.
row.update(i, ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,38 +613,39 @@ class JacksonParser(
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), cause = () => e)
throw BadRecordException(() => recordLiteral(record), () => Array.empty, e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => recordLiteral(record), cause = () => wrappedCharException)
throw BadRecordException(() => recordLiteral(record), () => Array.empty,
wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(row),
cause = () => convertCauseForPartialResult(cause))
convertCauseForPartialResult(cause))
case PartialResultArrayException(rows, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => rows,
cause = () => cause)
cause)
// These exceptions should never be thrown outside of JacksonParser.
// They are used for the control flow in the parser. We add them here for completeness
// since they also indicate a bad record.
case PartialArrayDataResultException(arrayData, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(InternalRow(arrayData)),
cause = () => convertCauseForPartialResult(cause))
convertCauseForPartialResult(cause))
case PartialMapDataResultException(mapData, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
partialResults = () => Array(InternalRow(mapData)),
cause = () => convertCauseForPartialResult(cause))
convertCauseForPartialResult(cause))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,16 @@ case class PartialResultArrayException(
extends Exception(cause)

/**
* Exception thrown when the underlying parser meets a bad record and can't parse it. Used for
* control flow between wrapper and underlying parser without overhead of creating a full exception.
* Exception thrown when the underlying parser meet a bad record and can't parse it.
* @param record a function to return the record that cause the parser to fail
* @param partialResults a function that returns an row array, which is the partial results of
* parsing this bad record.
* @param cause a function to return the actual exception about why the record is bad and can't be
* parsed.
* @param cause the actual exception about why the record is bad and can't be parsed.
*/
case class BadRecordException(
@transient record: () => UTF8String,
@transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
@transient cause: () => Throwable)
extends Exception() {

override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
override def fillInStackTrace(): Throwable = this
}
cause: Throwable) extends Exception(cause)

/**
* Exception thrown when the underlying parser parses a JSON array as a struct.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class FailureSafeParser[IN](
case DropMalformedMode =>
Iterator.empty
case FailFastMode =>
e.cause() match {
e.getCause match {
case _: JsonArraysAsStructsException =>
// SPARK-42298 we recreate the exception here to make sure the error message
// have the record content.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,27 +148,26 @@ class StaxXmlParser(
// XML parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => xmlRecord, cause = () => e)
throw BadRecordException(() => xmlRecord, () => Array.empty, e)
case e: CharConversionException if options.charset.isEmpty =>
throw BadRecordException(() => xmlRecord, cause = () => {
val msg =
"""XML parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
wrappedCharException
})
val msg =
"""XML parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => xmlRecord, () => Array.empty,
wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => xmlRecord,
partialResults = () => Array(row),
() => cause)
cause)
case PartialResultArrayException(rows, cause) =>
throw BadRecordException(
record = () => xmlRecord,
partialResults = () => rows,
() => cause)
cause)
}
}

Expand Down

0 comments on commit 6cc3dc2

Please sign in to comment.