Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser #46400

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -316,17 +316,17 @@ class UnivocityParser(
throw BadRecordException(
() => getCurrentInput,
() => Array.empty,
QueryExecutionErrors.malformedCSVRecordError(""))
() => QueryExecutionErrors.malformedCSVRecordError(""))
}

val currentInput = getCurrentInput

var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) {
var badRecordException: Option[() => Throwable] = if (tokens.length != parsedSchema.length) {
// If the number of tokens doesn't match the schema, we should treat it as a malformed record.
// However, we still have a chance to parse some of the tokens. It continues to parse the
// tokens normally and sets null when `ArrayIndexOutOfBoundsException` occurs for missing
// tokens.
Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
Some(() => QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
} else None
// When the length of the returned tokens is identical to the length of the parsed schema,
// we just need to:
Expand All @@ -348,7 +348,7 @@ class UnivocityParser(
} catch {
case e: SparkUpgradeException => throw e
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
badRecordException = badRecordException.orElse(Some(() => e))
// Use the corresponding DEFAULT value associated with the column, if any.
row.update(i, ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
}
Expand All @@ -359,7 +359,7 @@ class UnivocityParser(
} else {
if (badRecordException.isDefined) {
throw BadRecordException(
() => currentInput, () => Array(requiredRow.get), badRecordException.get)
() => currentInput, () => Array[InternalRow](requiredRow.get), badRecordException.get)
} else {
requiredRow
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,16 +613,15 @@ class JacksonParser(
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), () => Array.empty, e)
throw BadRecordException(() => recordLiteral(record), cause = e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => recordLiteral(record), () => Array.empty,
wrappedCharException)
throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => recordLiteral(record),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,31 @@ case class PartialResultArrayException(
extends Exception(cause)

/**
* Exception thrown when the underlying parser meet a bad record and can't parse it.
* Exception thrown when the underlying parser meets a bad record and can't parse it. Used for
* control flow between wrapper and underlying parser without overhead of creating a full exception.
* @param record a function to return the record that causes the parser to fail
* @param partialResults a function that returns a row array, which is the partial results of
* parsing this bad record.
* @param cause the actual exception about why the record is bad and can't be parsed.
* @param cause a function to return the actual exception about why the record is bad and can't be
* parsed.
*/
case class BadRecordException(
@transient record: () => UTF8String,
@transient partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
cause: Throwable) extends Exception(cause)
@transient partialResults: () => Array[InternalRow],
@transient cause: () => Throwable)
extends Exception() {

override def getStackTrace(): Array[StackTraceElement] = new Array[StackTraceElement](0)
override def fillInStackTrace(): Throwable = this
}

/**
* Companion factory for [[BadRecordException]] that accepts an already-constructed `Throwable`.
*
* The class constructor takes `cause` as a `() => Throwable`; this overload wraps the supplied
* `cause` in a function (`() => cause`) before delegating to the constructor.
*
* @param record a function to return the record that causes the parser to fail
* @param partialResults a function that returns the partial results of parsing the bad record;
*                       defaults to a function returning an empty array
* @param cause the exception describing why the record could not be parsed
* @return a new [[BadRecordException]] whose `cause` function returns the given `cause`
*/
object BadRecordException {
def apply(
record: () => UTF8String,
partialResults: () => Array[InternalRow] = () => Array.empty[InternalRow],
cause: Throwable): BadRecordException =
new BadRecordException(record, partialResults, () => cause)
}

/**
* Exception thrown when the underlying parser parses a JSON array as a struct.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class FailureSafeParser[IN](
case DropMalformedMode =>
Iterator.empty
case FailFastMode =>
e.getCause match {
e.cause() match {
case _: JsonArraysAsStructsException =>
// SPARK-42298 we recreate the exception here to make sure the error message
// has the record content.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,15 @@ class StaxXmlParser(
// XML parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => xmlRecord, () => Array.empty, e)
throw BadRecordException(() => xmlRecord, cause = e)
case e: CharConversionException if options.charset.isEmpty =>
val msg =
"""XML parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => xmlRecord, () => Array.empty,
wrappedCharException)
throw BadRecordException(() => xmlRecord, cause = wrappedCharException)
case PartialResultException(row, cause) =>
throw BadRecordException(
record = () => xmlRecord,
Expand Down