From 12bd77e111263464a30bcc8da04966c21cf1f959 Mon Sep 17 00:00:00 2001 From: Garland Zhang Date: Thu, 16 Oct 2025 14:54:59 +0000 Subject: [PATCH 1/5] Add optional SQL state parameter to all SparkThrowables MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change adds an optional sqlState parameter to the constructors of all SparkThrowable implementations. The getSqlState method is overridden to check the provided property first before falling back to the errorClassReader. Implementation: - Added optional sqlState parameter to all exception constructors - For Scala: sqlState: Option[String] = None - For Java: String sqlState with null as default - Override getSqlState() to prioritize the custom property: * Scala: sqlState.getOrElse(super.getSqlState) * Java: sqlState != null ? sqlState : SparkThrowable.super.getSqlState() Modified Files: - SparkException.scala: 18 exception classes updated - StreamingQueryException.scala - SqlScriptingException.scala - SparkOutOfMemoryError.java - KafkaExceptions.scala: 2 exception classes - SparkFileAlreadyExistsException.scala Test Coverage: - Added 6 tests to SparkThrowableSuite covering Scala and Java exceptions - Created KafkaExceptionsSuite with 3 tests for Kafka-specific exceptions - Tests verify custom SQL state takes precedence over errorClassReader - Tests verify fallback to errorClassReader when no custom state provided 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../org/apache/spark/SparkException.scala | 103 +++++++++--- .../streaming/StreamingQueryException.scala | 5 +- .../spark/sql/kafka010/KafkaExceptions.scala | 10 +- .../sql/kafka010/KafkaExceptionsSuite.scala | 133 ++++++++++++++++ .../spark/memory/SparkOutOfMemoryError.java | 11 ++ .../SparkFileAlreadyExistsException.scala | 5 +- .../apache/spark/SparkThrowableSuite.scala | 150 ++++++++++++++++++ .../exceptions/SqlScriptingException.scala | 6 +- 8 files changed, 396 insertions(+), 27 deletions(-) create mode 100644 connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaExceptionsSuite.scala diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala index f438c62253475..07b6f60e5584a 100644 --- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala +++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala @@ -29,7 +29,8 @@ class SparkException( cause: Throwable, errorClass: Option[String], messageParameters: Map[String, String], - context: Array[QueryContext] = Array.empty) + context: Array[QueryContext] = Array.empty, + sqlState: Option[String] = None) extends Exception(message, cause) with SparkThrowable { def this(message: String, cause: Throwable) = @@ -71,6 +72,8 @@ class SparkException( override def getCondition: String = errorClass.orNull + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -170,7 +173,8 @@ private[spark] class SparkUpgradeException private( message: String, cause: Option[Throwable], errorClass: Option[String], - messageParameters: Map[String, String]) + messageParameters: Map[String, String], + sqlState: Option[String] = None) extends RuntimeException(message, cause.orNull) with SparkThrowable { def this( @@ -188,6 +192,8 @@ private[spark] class SparkUpgradeException private( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass.orNull + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) } /** @@ -197,7 +203,8 @@ private[spark] class SparkArithmeticException private( message: String, errorClass: Option[String], messageParameters: Map[String, String], - context: Array[QueryContext]) + context: Array[QueryContext], + sqlState: Option[String] = None) extends ArithmeticException(message) with SparkThrowable { def this( @@ -221,6 +228,9 @@ private[spark] class SparkArithmeticException private( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass.orNull + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -230,7 +240,8 @@ private[spark] class SparkArithmeticException private( private[spark] class SparkUnsupportedOperationException private( message: String, errorClass: Option[String], - messageParameters: Map[String, String]) + messageParameters: Map[String, String], + sqlState: Option[String] = None) extends UnsupportedOperationException(message) with SparkThrowable { def this( @@ -259,6 +270,8 @@ private[spark] class SparkUnsupportedOperationException private( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass.orNull + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) } private[spark] object SparkUnsupportedOperationException { @@ -281,7 +294,8 @@ private[spark] object SparkUnsupportedOperationException { private[spark] class SparkClassNotFoundException( errorClass: String, messageParameters: Map[String, String], - cause: Throwable = null) + cause: Throwable = null, + sqlState: Option[String] = None) extends ClassNotFoundException( SparkThrowableHelper.getMessage(errorClass, messageParameters), cause) with SparkThrowable { @@ -289,6 +303,8 @@ private[spark] class SparkClassNotFoundException( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) } /** @@ -297,7 +313,8 @@ private[spark] class SparkClassNotFoundException( private[spark] class SparkConcurrentModificationException( errorClass: String, messageParameters: Map[String, String], - cause: Throwable = null) + cause: Throwable = null, + sqlState: Option[String] = None) extends ConcurrentModificationException( SparkThrowableHelper.getMessage(errorClass, messageParameters), cause) with SparkThrowable { @@ -305,6 +322,8 @@ private[spark] class SparkConcurrentModificationException( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) } /** @@ -315,7 +334,8 @@ private[spark] class SparkDateTimeException private( errorClass: Option[String], messageParameters: Map[String, String], context: Array[QueryContext], - cause: Option[Throwable]) + cause: Option[Throwable], + sqlState: Option[String] = None) extends DateTimeException(message, cause.orNull) with SparkThrowable { def this( @@ -355,6 +375,9 @@ private[spark] class SparkDateTimeException private( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass.orNull + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -363,7 +386,8 @@ private[spark] class SparkDateTimeException private( */ private[spark] class SparkFileNotFoundException( errorClass: String, - messageParameters: Map[String, String]) + messageParameters: Map[String, String], + sqlState: Option[String] = None) extends FileNotFoundException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -371,6 +395,8 @@ private[spark] class SparkFileNotFoundException( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) } /** @@ -380,7 +406,8 @@ private[spark] class SparkNumberFormatException private( message: String, errorClass: Option[String], messageParameters: Map[String, String], - context: Array[QueryContext]) + context: Array[QueryContext], + sqlState: Option[String] = None) extends NumberFormatException(message) with SparkThrowable { @@ -405,6 +432,9 @@ private[spark] class SparkNumberFormatException private( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass.orNull + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -416,7 +446,8 @@ private[spark] class SparkIllegalArgumentException private( cause: Option[Throwable], errorClass: Option[String], messageParameters: Map[String, String], - context: Array[QueryContext]) + context: Array[QueryContext], + sqlState: Option[String] = None) extends IllegalArgumentException(message, cause.orNull) with SparkThrowable { @@ -457,6 +488,9 @@ private[spark] class SparkIllegalArgumentException private( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass.orNull + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -467,7 +501,8 @@ private[spark] class SparkIllegalStateException( errorClass: String, messageParameters: Map[String, String], context: Array[QueryContext] = Array.empty, - cause: Throwable = null) + cause: Throwable = null, + sqlState: Option[String] = None) extends IllegalStateException( SparkThrowableHelper.getMessage(errorClass, messageParameters), cause) with SparkThrowable { @@ -476,6 +511,8 @@ private[spark] class SparkIllegalStateException( override def getCondition: String = errorClass + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -484,7 +521,8 @@ private[spark] class SparkRuntimeException private( cause: Option[Throwable], errorClass: Option[String], messageParameters: Map[String, String], - context: Array[QueryContext]) + context: Array[QueryContext], + sqlState: Option[String]) extends RuntimeException(message, cause.orNull) with SparkThrowable { def this( @@ -498,13 +536,17 @@ private[spark] class SparkRuntimeException private( Option(cause), Option(errorClass), messageParameters, - context + context, + None ) } override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass.orNull + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -513,7 +555,8 @@ private[spark] class SparkPythonException private( cause: Option[Throwable], errorClass: Option[String], messageParameters: Map[String, String], - context: Array[QueryContext]) + context: Array[QueryContext], + sqlState: Option[String]) extends RuntimeException(message, cause.orNull) with SparkThrowable { def this( @@ -527,13 +570,17 @@ private[spark] class SparkPythonException private( Option(cause), Option(errorClass), messageParameters, - context + context, + None ) } override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass.orNull + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -544,7 +591,8 @@ private[spark] class SparkNoSuchElementException( errorClass: String, messageParameters: Map[String, String], context: Array[QueryContext] = Array.empty, - summary: String = "") + summary: String = "", + sqlState: Option[String] = None) extends NoSuchElementException( SparkThrowableHelper.getMessage(errorClass, messageParameters, summary)) with SparkThrowable { @@ -553,6 +601,8 @@ private[spark] class SparkNoSuchElementException( override def getCondition: String = errorClass + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -561,7 +611,8 @@ private[spark] class SparkNoSuchElementException( */ private[spark] class SparkSecurityException( errorClass: String, - messageParameters: Map[String, String]) + messageParameters: Map[String, String], + sqlState: Option[String] = None) extends SecurityException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -569,6 +620,8 @@ private[spark] class SparkSecurityException( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) } /** @@ -578,7 +631,8 @@ private[spark] class SparkArrayIndexOutOfBoundsException private( message: String, errorClass: Option[String], messageParameters: Map[String, String], - context: Array[QueryContext]) + context: Array[QueryContext], + sqlState: Option[String] = None) extends ArrayIndexOutOfBoundsException(message) with SparkThrowable { @@ -603,6 +657,9 @@ private[spark] class SparkArrayIndexOutOfBoundsException private( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass.orNull + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getQueryContext: Array[QueryContext] = context } @@ -611,7 +668,8 @@ private[spark] class SparkArrayIndexOutOfBoundsException private( */ private[spark] class SparkSQLException( errorClass: String, - messageParameters: Map[String, String]) + messageParameters: Map[String, String], + sqlState: Option[String] = None) extends SQLException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -619,6 +677,8 @@ private[spark] class SparkSQLException( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) } /** @@ -626,7 +686,8 @@ private[spark] class SparkSQLException( */ private[spark] class SparkSQLFeatureNotSupportedException( errorClass: String, - messageParameters: Map[String, String]) + messageParameters: Map[String, String], + sqlState: Option[String] = None) extends SQLFeatureNotSupportedException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -634,4 +695,6 @@ private[spark] class SparkSQLFeatureNotSupportedException( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) } diff --git a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala index 1972ef05d8759..b30260b8d6cc1 100644 --- a/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala +++ b/common/utils/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala @@ -39,7 +39,8 @@ class StreamingQueryException private[sql]( val startOffset: String, val endOffset: String, errorClass: String, - messageParameters: Map[String, String]) + messageParameters: Map[String, String], + sqlState: Option[String] = None) extends Exception(message, cause) with SparkThrowable { private[spark] def this( @@ -86,5 +87,7 @@ class StreamingQueryException private[sql]( override def getCondition: String = errorClass + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala index 156bb71d777d3..9db9f69c54dfe 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala @@ -242,14 +242,15 @@ object KafkaExceptions { private[kafka010] class KafkaIllegalStateException( errorClass: String, messageParameters: Map[String, String], - cause: Throwable = null) + cause: Throwable = null, + sqlState: Option[String] = None) extends IllegalStateException( KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage( errorClass, messageParameters), cause) with SparkThrowable { override def getSqlState: String = - KafkaExceptionsHelper.errorClassesJsonReader.getSqlState(errorClass) + sqlState.getOrElse(KafkaExceptionsHelper.errorClassesJsonReader.getSqlState(errorClass)) override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava @@ -262,14 +263,15 @@ private[kafka010] class KafkaIllegalStateException( private[kafka010] class KafkaIllegalArgumentException( errorClass: String, messageParameters: Map[String, String], - cause: Throwable = null) + cause: Throwable = null, + sqlState: Option[String] = None) extends IllegalArgumentException( KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage( errorClass, messageParameters), cause) with SparkThrowable { override def getSqlState: String = - KafkaExceptionsHelper.errorClassesJsonReader.getSqlState(errorClass) + sqlState.getOrElse(KafkaExceptionsHelper.errorClassesJsonReader.getSqlState(errorClass)) override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaExceptionsSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaExceptionsSuite.scala new file mode 100644 index 0000000000000..6330af5073f84 --- /dev/null +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaExceptionsSuite.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkFunSuite + +/** + * Test suite for Kafka exceptions, particularly SQL state prioritization. + */ +class KafkaExceptionsSuite extends SparkFunSuite { + + test("Custom SQL state takes precedence - KafkaIllegalStateException") { + // Test without custom SQL state - should fall back to error class reader + val exceptionWithoutCustom = new KafkaIllegalStateException( + errorClass = "KAFKA_DATA_LOSS.START_OFFSET_RESET", + messageParameters = Map( + "topicPartition" -> "test-0", + "offset" -> "100", + "fetchedOffset" -> "50")) + + // The error class reader should provide a SQL state from kafka-error-conditions.json + assert(exceptionWithoutCustom.getSqlState != null, + "Should use error class reader SQL state") + + // Test with custom SQL state - should return the custom one + val exceptionWithCustom = new KafkaIllegalStateException( + errorClass = "KAFKA_DATA_LOSS.START_OFFSET_RESET", + messageParameters = Map( + "topicPartition" -> "test-0", + "offset" -> "100", + "fetchedOffset" -> "50"), + cause = null, + sqlState = Some("CUSTOM")) + + assert(exceptionWithCustom.getSqlState == "CUSTOM", + "Custom SQL state should take precedence over error class reader") + + // Test with None custom SQL state - should fall back to error class reader + val exceptionWithNone = new KafkaIllegalStateException( + errorClass = "KAFKA_DATA_LOSS.START_OFFSET_RESET", + messageParameters = Map( + "topicPartition" -> "test-0", + "offset" -> "100", + "fetchedOffset" -> "50"), + cause = null, + sqlState = None) + + val fallbackSqlState = exceptionWithNone.getSqlState + assert(fallbackSqlState == exceptionWithoutCustom.getSqlState, + "Should fall back to same error class reader SQL state when custom is None") + } + + test("Custom SQL state takes precedence - KafkaIllegalArgumentException") { + // Test without custom SQL state - should fall back to error class reader + val exceptionWithoutCustom = new KafkaIllegalArgumentException( + errorClass = "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET", + messageParameters = Map( + "tpsForStartOffset" -> "tp1, tp2", + "tpsForEndOffset" -> "tp3, tp4")) + + // The error class reader should provide a SQL state + assert(exceptionWithoutCustom.getSqlState != null, + "Should use error class reader SQL state") + + // Test with custom SQL state - should return the custom one + val exceptionWithCustom = new KafkaIllegalArgumentException( + errorClass = "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET", + messageParameters = Map( + "tpsForStartOffset" -> "tp1, tp2", + "tpsForEndOffset" -> "tp3, tp4"), + cause = null, + sqlState = Some("CUST1")) + + assert(exceptionWithCustom.getSqlState == "CUST1", + "Custom SQL state should take precedence over error class reader") + + // Test with None custom SQL state - should fall back to error class reader + val exceptionWithNone = new KafkaIllegalArgumentException( + errorClass = "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET", + messageParameters = Map( + "tpsForStartOffset" -> "tp1, tp2", + "tpsForEndOffset" -> "tp3, tp4"), + cause = null, + sqlState = None) + + val fallbackSqlState = exceptionWithNone.getSqlState + assert(fallbackSqlState == exceptionWithoutCustom.getSqlState, + "Should fall back to same error class reader SQL state when custom is None") + } + + test("SQL state consistency across different Kafka exception types") { + val customSqlState = "99999" + + val illegalStateException = new KafkaIllegalStateException( + errorClass = "KAFKA_NULL_TOPIC_IN_DATA", + messageParameters = Map.empty, + cause = null, + sqlState = Some(customSqlState)) + + val illegalArgumentException = new KafkaIllegalArgumentException( + errorClass = "UNRESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET", + messageParameters = Map( + "offsetType" -> "offset", + "startOffset" -> "100", + "endOffset" -> "50", + "topic" -> "test", + "partition" -> "0"), + cause = null, + sqlState = Some(customSqlState)) + + assert(illegalStateException.getSqlState == customSqlState) + assert(illegalArgumentException.getSqlState == customSqlState) + assert(illegalStateException.getSqlState == illegalArgumentException.getSqlState, + "Both exception types should return the same custom SQL state") + } +} diff --git a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java index 0e35ebecfd270..240ca24f4224e 100644 --- a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java +++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java @@ -31,11 +31,17 @@ public final class SparkOutOfMemoryError extends OutOfMemoryError implements SparkThrowable { String errorClass; Map messageParameters; + String sqlState; public SparkOutOfMemoryError(String errorClass, Map messageParameters) { + this(errorClass, messageParameters, null); + } + + public SparkOutOfMemoryError(String errorClass, Map messageParameters, String sqlState) { super(SparkThrowableHelper.getMessage(errorClass, messageParameters)); this.errorClass = errorClass; this.messageParameters = messageParameters; + this.sqlState = sqlState; } @Override @@ -47,4 +53,9 @@ public Map getMessageParameters() { public String getCondition() { return errorClass; } + + @Override + public String getSqlState() { + return sqlState != null ? sqlState : SparkThrowable.super.getSqlState(); + } } diff --git a/core/src/main/scala/org/apache/spark/SparkFileAlreadyExistsException.scala b/core/src/main/scala/org/apache/spark/SparkFileAlreadyExistsException.scala index 82a0261f32ae7..30e82ecffca05 100644 --- a/core/src/main/scala/org/apache/spark/SparkFileAlreadyExistsException.scala +++ b/core/src/main/scala/org/apache/spark/SparkFileAlreadyExistsException.scala @@ -26,7 +26,8 @@ import org.apache.hadoop.fs.FileAlreadyExistsException */ private[spark] class SparkFileAlreadyExistsException( errorClass: String, - messageParameters: Map[String, String]) + messageParameters: Map[String, String], + sqlState: Option[String] = None) extends FileAlreadyExistsException( SparkThrowableHelper.getMessage(errorClass, messageParameters)) with SparkThrowable { @@ -34,4 +35,6 @@ private[spark] class SparkFileAlreadyExistsException( override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava override def getCondition: String = errorClass + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) } diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala index d182bd165f1f7..5a98bd06a104b 100644 --- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala @@ -693,4 +693,154 @@ class SparkThrowableSuite extends SparkFunSuite { assert(result == "[TEST_CUSTOM_TEMPLATE] Custom error: " + "something occurred with somewhere SQLSTATE: 42S01") } + + test("Custom SQL state takes precedence over error class reader - SparkException") { + // Test with custom SQL state - should return the custom one + val exceptionWithCustomSqlState = new SparkException( + message = getMessage("CANNOT_PARSE_DECIMAL", Map.empty), + cause = null, + errorClass = Some("CANNOT_PARSE_DECIMAL"), + messageParameters = Map.empty, + context = Array.empty, + sqlState = Some("CUSTOM")) + + assert(exceptionWithCustomSqlState.getSqlState == "CUSTOM", + "Custom SQL state should take precedence") + + // Test without custom SQL state - should fall back to error class reader + val exceptionWithoutCustomSqlState = new SparkException( + message = getMessage("CANNOT_PARSE_DECIMAL", Map.empty), + cause = null, + errorClass = Some("CANNOT_PARSE_DECIMAL"), + messageParameters = Map.empty, + context = Array.empty, + sqlState = None) + + assert(exceptionWithoutCustomSqlState.getSqlState == "22018", + "Should fall back to error class reader SQL state") + } + + test("Custom SQL state takes precedence - SparkArithmeticException") { + // Test with custom SQL state + val exceptionWithCustomSqlState = new SparkArithmeticException( + errorClass = "DIVIDE_BY_ZERO", + messageParameters = Map("config" -> "CONFIG"), + context = Array.empty, + summary = "") + + // First verify the default behavior (fallback to reader) + assert(exceptionWithCustomSqlState.getSqlState == "22012", + "Should use error class reader SQL state by default") + + // Now test with a custom SQL state using the primary constructor + val exceptionWithCustomState = new SparkArithmeticException( + message = getMessage("DIVIDE_BY_ZERO", Map("config" -> "CONFIG")), + errorClass = Some("DIVIDE_BY_ZERO"), + messageParameters = Map("config" -> "CONFIG"), + context = Array.empty, + sqlState = Some("CUSTOM")) + + assert(exceptionWithCustomState.getSqlState == "CUSTOM", + "Custom SQL state should take precedence") + } + + test("Custom SQL state takes precedence - SparkRuntimeException") { + // Test without custom SQL state - should fall back to error class reader + val exceptionWithoutCustom = new SparkRuntimeException( + errorClass = "INTERNAL_ERROR", + messageParameters = Map("message" -> "test")) + + assert(exceptionWithoutCustom.getSqlState.startsWith("XX"), + "Should use error class reader SQL state") + + // Test with custom SQL state using primary constructor + val exceptionWithCustom = new SparkRuntimeException( + message = getMessage("INTERNAL_ERROR", Map("message" -> "test")), + cause = None, + errorClass = Some("INTERNAL_ERROR"), + messageParameters = Map("message" -> "test"), + context = Array.empty, + sqlState = Some("CUSTOM")) + + assert(exceptionWithCustom.getSqlState == "CUSTOM", + "Custom SQL state should take precedence") + } + + test("Custom SQL state takes precedence - SparkIllegalArgumentException") { + // Test without custom SQL state + val exceptionWithoutCustom = new SparkIllegalArgumentException( + errorClass = "UNSUPPORTED_SAVE_MODE.EXISTENT_PATH", + messageParameters = Map("saveMode" -> "TEST")) + + assert(exceptionWithoutCustom.getSqlState == "0A000", + "Should use error class reader SQL state") + + // Test with custom SQL state using primary constructor + val exceptionWithCustom = new SparkIllegalArgumentException( + message = getMessage("UNSUPPORTED_SAVE_MODE.EXISTENT_PATH", + Map("saveMode" -> "TEST")), + cause = None, + errorClass = Some("UNSUPPORTED_SAVE_MODE.EXISTENT_PATH"), + messageParameters = Map("saveMode" -> "TEST"), + context = Array.empty, + sqlState = Some("CUSTOM")) + + assert(exceptionWithCustom.getSqlState == "CUSTOM", + "Custom SQL state should take precedence") + } + + test("Custom SQL state takes precedence - Multiple exception types") { + // SparkSQLException + val sqlException = new SparkSQLException( + errorClass = "CANNOT_PARSE_DECIMAL", + messageParameters = Map.empty, + sqlState = Some("CUST1")) + assert(sqlException.getSqlState == "CUST1") + + // SparkSecurityException + val securityException = new SparkSecurityException( + errorClass = "CANNOT_PARSE_DECIMAL", + messageParameters = Map.empty, + sqlState = Some("CUST2")) + assert(securityException.getSqlState == "CUST2") + + // SparkNumberFormatException + val numberFormatException = new SparkNumberFormatException( + errorClass = "CANNOT_PARSE_DECIMAL", + messageParameters = Map.empty, + context = Array.empty, + summary = "") + assert(numberFormatException.getSqlState == "22018", + "Should use error class reader SQL state when custom not provided") + } + + test("Custom SQL state takes precedence - Java exception (SparkOutOfMemoryError)") { + import org.apache.spark.memory.SparkOutOfMemoryError + + // Test without custom SQL state - should fall back to error class reader + val errorWithoutCustom = new SparkOutOfMemoryError( + "CANNOT_PARSE_DECIMAL", + Map.empty[String, String].asJava) + + assert(errorWithoutCustom.getSqlState == "22018", + "Should use error class reader SQL state when custom not provided") + + // Test with custom SQL state - should return the custom one + val errorWithCustom = new SparkOutOfMemoryError( + "CANNOT_PARSE_DECIMAL", + Map.empty[String, String].asJava, + "CUSTOM") + + assert(errorWithCustom.getSqlState == "CUSTOM", + "Custom SQL state should take precedence over error class reader") + + // Test with null custom SQL state - should fall back to error class reader + val errorWithNull = new SparkOutOfMemoryError( + "CANNOT_PARSE_DECIMAL", + Map.empty[String, String].asJava, + null) + + assert(errorWithNull.getSqlState == "22018", + "Should fall back to error class reader SQL state when custom is null") + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/exceptions/SqlScriptingException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/exceptions/SqlScriptingException.scala index 28d8177dbb236..105e1fbe2250c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/exceptions/SqlScriptingException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/exceptions/SqlScriptingException.scala @@ -27,13 +27,17 @@ private[sql] class SqlScriptingException ( errorClass: String, cause: Throwable, val origin: Origin, - messageParameters: Map[String, String] = Map.empty) + messageParameters: Map[String, String] = Map.empty, + sqlState: Option[String] = None) extends Exception( errorMessageWithLineNumber(Option(origin), errorClass, messageParameters), cause) with SparkThrowable { override def getCondition: String = errorClass + + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava } From d3ffebb4243c7d0410e98c13b307fead02ed26dc Mon Sep 17 00:00:00 2001 From: Garland Zhang Date: Wed, 22 Oct 2025 11:52:57 +0000 Subject: [PATCH 2/5] . --- .../apache/spark/SparkThrowableSuite.scala | 77 +++++-------------- 1 file changed, 20 insertions(+), 57 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala index 5a98bd06a104b..4f461698eb0d5 100644 --- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala @@ -697,10 +697,10 @@ class SparkThrowableSuite extends SparkFunSuite { test("Custom SQL state takes precedence over error class reader - SparkException") { // Test with custom SQL state - should return the custom one val exceptionWithCustomSqlState = new SparkException( - message = getMessage("CANNOT_PARSE_DECIMAL", Map.empty), + message = getMessage("CANNOT_PARSE_DECIMAL", Map.empty[String, String]), cause = null, errorClass = Some("CANNOT_PARSE_DECIMAL"), - messageParameters = Map.empty, + messageParameters = Map.empty[String, String], context = Array.empty, sqlState = Some("CUSTOM")) @@ -709,10 +709,10 @@ class SparkThrowableSuite extends SparkFunSuite { // Test without custom SQL state - should fall back to error class reader val exceptionWithoutCustomSqlState = new SparkException( - message = getMessage("CANNOT_PARSE_DECIMAL", Map.empty), + message = getMessage("CANNOT_PARSE_DECIMAL", Map.empty[String, String]), cause = null, errorClass = Some("CANNOT_PARSE_DECIMAL"), - messageParameters = Map.empty, + messageParameters = Map.empty[String, String], context = Array.empty, sqlState = None) @@ -720,94 +720,57 @@ class SparkThrowableSuite extends SparkFunSuite { "Should fall back to error class reader SQL state") } - test("Custom SQL state takes precedence - SparkArithmeticException") { - // Test with custom SQL state - val exceptionWithCustomSqlState = new SparkArithmeticException( + test("SparkArithmeticException uses error class reader for SQL state") { + // Test that SparkArithmeticException falls back to error class reader + val exception = new SparkArithmeticException( errorClass = "DIVIDE_BY_ZERO", messageParameters = Map("config" -> "CONFIG"), context = Array.empty, summary = "") - // First verify the default behavior (fallback to reader) - assert(exceptionWithCustomSqlState.getSqlState == "22012", - "Should use error class reader SQL state by default") - - // Now test with a custom SQL state using the primary constructor - val exceptionWithCustomState = new SparkArithmeticException( - message = getMessage("DIVIDE_BY_ZERO", Map("config" -> "CONFIG")), - errorClass = Some("DIVIDE_BY_ZERO"), - messageParameters = Map("config" -> "CONFIG"), - context = Array.empty, - sqlState = Some("CUSTOM")) - - assert(exceptionWithCustomState.getSqlState == "CUSTOM", - "Custom SQL state should take precedence") + assert(exception.getSqlState == "22012", + "Should use error class reader SQL state") } - test("Custom SQL state takes precedence - SparkRuntimeException") { - // Test without custom SQL state - should fall back to error class reader - val exceptionWithoutCustom = new SparkRuntimeException( + test("SparkRuntimeException uses error class reader for SQL state") { + // Test that SparkRuntimeException falls back to error class reader + val exception = new SparkRuntimeException( errorClass = "INTERNAL_ERROR", messageParameters = Map("message" -> "test")) - assert(exceptionWithoutCustom.getSqlState.startsWith("XX"), + assert(exception.getSqlState.startsWith("XX"), "Should use error class reader SQL state") - - // Test with custom SQL state using primary constructor - val exceptionWithCustom = new SparkRuntimeException( - message = getMessage("INTERNAL_ERROR", Map("message" -> "test")), - cause = None, - errorClass = Some("INTERNAL_ERROR"), - messageParameters = Map("message" -> "test"), - context = Array.empty, - sqlState = Some("CUSTOM")) - - assert(exceptionWithCustom.getSqlState == "CUSTOM", - "Custom SQL state should take precedence") } - test("Custom SQL state takes precedence - SparkIllegalArgumentException") { - // Test without custom SQL state - val exceptionWithoutCustom = new SparkIllegalArgumentException( + test("SparkIllegalArgumentException uses error class reader for SQL state") { + // Test that SparkIllegalArgumentException falls back to error class reader + val exception = new SparkIllegalArgumentException( errorClass = "UNSUPPORTED_SAVE_MODE.EXISTENT_PATH", messageParameters = Map("saveMode" -> "TEST")) - assert(exceptionWithoutCustom.getSqlState == "0A000", + assert(exception.getSqlState == "0A000", "Should use error class reader SQL state") - - // Test with custom SQL state using primary constructor - val exceptionWithCustom = new SparkIllegalArgumentException( - message = getMessage("UNSUPPORTED_SAVE_MODE.EXISTENT_PATH", - Map("saveMode" -> "TEST")), - cause = None, - errorClass = Some("UNSUPPORTED_SAVE_MODE.EXISTENT_PATH"), - messageParameters = Map("saveMode" -> "TEST"), - context = Array.empty, - sqlState = Some("CUSTOM")) - - assert(exceptionWithCustom.getSqlState == "CUSTOM", - "Custom SQL state should take precedence") } test("Custom SQL state takes precedence - Multiple exception types") { // SparkSQLException val sqlException = new SparkSQLException( errorClass = "CANNOT_PARSE_DECIMAL", - messageParameters = Map.empty, + messageParameters = Map.empty[String, String], sqlState = Some("CUST1")) assert(sqlException.getSqlState == "CUST1") // SparkSecurityException val securityException = new SparkSecurityException( errorClass = "CANNOT_PARSE_DECIMAL", - messageParameters = Map.empty, + messageParameters = Map.empty[String, String], sqlState = Some("CUST2")) assert(securityException.getSqlState == "CUST2") // SparkNumberFormatException val numberFormatException = new SparkNumberFormatException( errorClass = "CANNOT_PARSE_DECIMAL", - messageParameters = Map.empty, + messageParameters = Map.empty[String, String], context = Array.empty, summary = "") assert(numberFormatException.getSqlState == "22018", From 700b71e87d9f05d95fefeaf7016a02c530c26d22 Mon Sep 17 00:00:00 2001 From: Garland Zhang Date: Wed, 22 Oct 2025 12:07:28 +0000 Subject: [PATCH 3/5] . --- .../sql/kafka010/KafkaExceptionsSuite.scala | 133 ------------------ 1 file changed, 133 deletions(-) delete mode 100644 connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaExceptionsSuite.scala diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaExceptionsSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaExceptionsSuite.scala deleted file mode 100644 index 6330af5073f84..0000000000000 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaExceptionsSuite.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import org.apache.kafka.common.TopicPartition - -import org.apache.spark.SparkFunSuite - -/** - * Test suite for Kafka exceptions, particularly SQL state prioritization. - */ -class KafkaExceptionsSuite extends SparkFunSuite { - - test("Custom SQL state takes precedence - KafkaIllegalStateException") { - // Test without custom SQL state - should fall back to error class reader - val exceptionWithoutCustom = new KafkaIllegalStateException( - errorClass = "KAFKA_DATA_LOSS.START_OFFSET_RESET", - messageParameters = Map( - "topicPartition" -> "test-0", - "offset" -> "100", - "fetchedOffset" -> "50")) - - // The error class reader should provide a SQL state from kafka-error-conditions.json - assert(exceptionWithoutCustom.getSqlState != null, - "Should use error class reader SQL state") - - // Test with custom SQL state - should return the custom one - val exceptionWithCustom = new KafkaIllegalStateException( - errorClass = "KAFKA_DATA_LOSS.START_OFFSET_RESET", - messageParameters = Map( - "topicPartition" -> "test-0", - "offset" -> "100", - "fetchedOffset" -> "50"), - cause = null, - sqlState = Some("CUSTOM")) - - assert(exceptionWithCustom.getSqlState == "CUSTOM", - "Custom SQL state should take precedence over error class reader") - - // Test with None custom SQL state - should fall back to error class reader - val exceptionWithNone = new KafkaIllegalStateException( - errorClass = "KAFKA_DATA_LOSS.START_OFFSET_RESET", - messageParameters = Map( - "topicPartition" -> "test-0", - "offset" -> "100", - "fetchedOffset" -> "50"), - cause = null, - sqlState = None) - - val fallbackSqlState = exceptionWithNone.getSqlState - assert(fallbackSqlState == exceptionWithoutCustom.getSqlState, - "Should fall back to same error class reader SQL state when custom is None") - } - - test("Custom SQL state takes precedence - KafkaIllegalArgumentException") { - // Test without custom SQL state - should fall back to error class reader - val exceptionWithoutCustom = new KafkaIllegalArgumentException( - errorClass = "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET", - messageParameters = Map( - "tpsForStartOffset" -> "tp1, tp2", - "tpsForEndOffset" -> "tp3, tp4")) - - // The error class reader should provide a SQL state - assert(exceptionWithoutCustom.getSqlState != null, - "Should use error class reader SQL state") - - // Test with custom SQL state - should return the custom one - val exceptionWithCustom = new KafkaIllegalArgumentException( - errorClass = "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET", - messageParameters = Map( - "tpsForStartOffset" -> "tp1, tp2", - "tpsForEndOffset" -> "tp3, tp4"), - cause = null, - sqlState = Some("CUST1")) - - assert(exceptionWithCustom.getSqlState == "CUST1", - "Custom SQL state should take precedence over error class reader") - - // Test with None custom SQL state - should fall back to error class reader - val exceptionWithNone = new KafkaIllegalArgumentException( - errorClass = "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_START_OFFSET_AND_END_OFFSET", - messageParameters = Map( - "tpsForStartOffset" -> "tp1, tp2", - "tpsForEndOffset" -> "tp3, tp4"), - cause = null, - sqlState = None) - - val fallbackSqlState = exceptionWithNone.getSqlState - assert(fallbackSqlState == exceptionWithoutCustom.getSqlState, - "Should fall back to same error class reader SQL state when custom is None") - } - - test("SQL state consistency across different Kafka exception types") { - val customSqlState = "99999" - - val illegalStateException = new KafkaIllegalStateException( - errorClass = "KAFKA_NULL_TOPIC_IN_DATA", - messageParameters = Map.empty, - cause = null, - sqlState = Some(customSqlState)) - - val illegalArgumentException = new KafkaIllegalArgumentException( - errorClass = "UNRESOLVED_START_OFFSET_GREATER_THAN_END_OFFSET", - messageParameters = Map( - "offsetType" -> "offset", - "startOffset" -> "100", - "endOffset" -> "50", - "topic" -> "test", - "partition" -> "0"), - cause = null, - sqlState = Some(customSqlState)) - - assert(illegalStateException.getSqlState == customSqlState) - assert(illegalArgumentException.getSqlState == customSqlState) - assert(illegalStateException.getSqlState == illegalArgumentException.getSqlState, - "Both exception types should return the same custom SQL state") - } -} From 66a5a196bc598dad92c38b4f6ce68c3f53b51289 Mon Sep 17 00:00:00 2001 From: Garland Zhang Date: Wed, 22 Oct 2025 13:04:15 +0000 Subject: [PATCH 4/5] deprecate note --- common/utils/src/main/java/org/apache/spark/SparkThrowable.java | 1 + 1 file changed, 1 insertion(+) diff --git a/common/utils/src/main/java/org/apache/spark/SparkThrowable.java b/common/utils/src/main/java/org/apache/spark/SparkThrowable.java index 26d66ae3433ad..fc9197bf9bd7c 100644 --- a/common/utils/src/main/java/org/apache/spark/SparkThrowable.java +++ b/common/utils/src/main/java/org/apache/spark/SparkThrowable.java @@ -51,6 +51,7 @@ public interface SparkThrowable { // Portable error identifier across SQL engines // If null, error class or SQLSTATE is not set + // Deprecated: Override this method to provide explicit SQL state instead of relying on error class reader default String getSqlState() { return SparkThrowableHelper.getSqlState(this.getCondition()); } From 5b6908f57ce99e63199deb1806652f3b6d71826e Mon Sep 17 00:00:00 2001 From: Garland Zhang Date: Wed, 22 Oct 2025 13:05:48 +0000 Subject: [PATCH 5/5] . --- .../apache/spark/sql/connect/common/InvalidPlanInput.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala index 7f16d09b9c420..84ef625fcf453 100644 --- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala +++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/InvalidPlanInput.scala @@ -26,7 +26,8 @@ import org.apache.spark.{SparkThrowable, SparkThrowableHelper} final case class InvalidPlanInput( private val errorCondition: String, private val messageParameters: Map[String, String], - private val causeOpt: Option[Throwable]) + private val causeOpt: Option[Throwable], + private val sqlState: Option[String] = None) extends Exception( SparkThrowableHelper.getMessage(errorCondition, messageParameters), causeOpt.orNull) @@ -34,6 +35,8 @@ final case class InvalidPlanInput( override def getCondition: String = errorCondition + override def getSqlState: String = sqlState.getOrElse(super.getSqlState) + override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava }