diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 7946068b9452e..3861ba817330b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -1259,7 +1259,7 @@ object JdbcUtils extends Logging with SQLConfHelper { } def classifyException[T]( - errorClass: String, + condition: String, messageParameters: Map[String, String], dialect: JdbcDialect, description: String, @@ -1269,7 +1269,7 @@ object JdbcUtils extends Logging with SQLConfHelper { } catch { case e: SparkThrowable with Throwable => throw e case e: Throwable => - throw dialect.classifyException(e, errorClass, messageParameters, description, isRuntime) + throw dialect.classifyException(e, condition, messageParameters, description, isRuntime) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala index 20283cc124596..cf9aa4f8682bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala @@ -64,7 +64,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt properties: util.Map[String, String]): Unit = { JdbcUtils.withConnection(jdbcOptions) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.CREATE_INDEX", + condition = "FAILED_JDBC.CREATE_INDEX", messageParameters = Map( "url" -> jdbcOptions.getRedactUrl(), "indexName" -> toSQLId(indexName), @@ -87,7 +87,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt override def dropIndex(indexName: String): Unit = { JdbcUtils.withConnection(jdbcOptions) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.DROP_INDEX", + condition = "FAILED_JDBC.DROP_INDEX", messageParameters = Map( "url" -> jdbcOptions.getRedactUrl(), "indexName" -> toSQLId(indexName), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala index 99e9abe965183..715112e352963 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala @@ -68,7 +68,7 @@ class JDBCTableCatalog extends TableCatalog JdbcUtils.withConnection(options) { conn => val schemaPattern = if (namespace.length == 1) namespace.head else null val rs = JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.GET_TABLES", + condition = "FAILED_JDBC.GET_TABLES", messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(namespace.toSeq)), @@ -89,7 +89,7 @@ class JDBCTableCatalog extends TableCatalog val writeOptions = new JdbcOptionsInWrite( options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident))) JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.TABLE_EXISTS", + condition = "FAILED_JDBC.TABLE_EXISTS", messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), @@ -116,7 +116,7 @@ class JDBCTableCatalog extends TableCatalog checkNamespace(oldIdent.namespace()) JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.RENAME_TABLE", + condition = "FAILED_JDBC.RENAME_TABLE", messageParameters = Map( "url" -> options.getRedactUrl(), "oldName" -> toSQLId(oldIdent), @@ -134,7 +134,7 @@ class JDBCTableCatalog extends TableCatalog val optionsWithTableName = new JDBCOptions( options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident))) JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.LOAD_TABLE", + condition = "FAILED_JDBC.LOAD_TABLE", messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), @@ -191,7 +191,7 @@ class JDBCTableCatalog extends TableCatalog val schema = CatalogV2Util.v2ColumnsToStructType(columns) JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.CREATE_TABLE", + condition = "FAILED_JDBC.CREATE_TABLE", messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), @@ -209,7 +209,7 @@ class JDBCTableCatalog extends TableCatalog checkNamespace(ident.namespace()) JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.ALTER_TABLE", + condition = "FAILED_JDBC.ALTER_TABLE", messageParameters = Map( "url" -> options.getRedactUrl(), "tableName" -> toSQLId(ident)), @@ -226,7 +226,7 @@ class JDBCTableCatalog extends TableCatalog case Array(db) => JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.NAMESPACE_EXISTS", + condition = "FAILED_JDBC.NAMESPACE_EXISTS", messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(namespace.toSeq)), @@ -242,7 +242,7 @@ class JDBCTableCatalog extends TableCatalog override def listNamespaces(): Array[Array[String]] = { JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.LIST_NAMESPACES", + condition = "FAILED_JDBC.LIST_NAMESPACES", messageParameters = Map("url" -> options.getRedactUrl()), dialect, description = s"Failed list namespaces", @@ -295,7 +295,7 @@ class JDBCTableCatalog extends TableCatalog } JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.CREATE_NAMESPACE", + condition = "FAILED_JDBC.CREATE_NAMESPACE", messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), @@ -321,7 +321,7 @@ class JDBCTableCatalog extends TableCatalog if (set.property() == SupportsNamespaces.PROP_COMMENT) { JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.CREATE_NAMESPACE_COMMENT", + condition = "FAILED_JDBC.CREATE_NAMESPACE_COMMENT", messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), @@ -339,7 +339,7 @@ class JDBCTableCatalog extends TableCatalog if (unset.property() == SupportsNamespaces.PROP_COMMENT) { JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.REMOVE_NAMESPACE_COMMENT", + condition = "FAILED_JDBC.REMOVE_NAMESPACE_COMMENT", messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), @@ -368,7 +368,7 @@ class JDBCTableCatalog extends TableCatalog case Array(db) if namespaceExists(namespace) => JdbcUtils.withConnection(options) { conn => JdbcUtils.classifyException( - errorClass = "FAILED_JDBC.DROP_NAMESPACE", + condition = "FAILED_JDBC.DROP_NAMESPACE", messageParameters = Map( "url" -> options.getRedactUrl(), "namespace" -> toSQLId(db)), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 2f54f1f62fde1..3f91592eb27a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -154,7 +154,7 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe } override def classifyException( e: Throwable, - errorClass: String, + condition: String, messageParameters: Map[String, String], description: String, isRuntime: Boolean): Throwable with SparkThrowable = { @@ -167,13 +167,13 @@ private case class DB2Dialect() extends JdbcDialect with SQLConfHelper with NoLe namespace = messageParameters.get("namespace").toArray, details = sqlException.getMessage, cause = Some(e)) - case "42710" if errorClass == "FAILED_JDBC.RENAME_TABLE" => + case "42710" if condition == "FAILED_JDBC.RENAME_TABLE" => val newTable = messageParameters("newName") throw QueryCompilationErrors.tableAlreadyExistsError(newTable) case _ => - super.classifyException(e, errorClass, messageParameters, description, isRuntime) + super.classifyException(e, condition, messageParameters, description, isRuntime) } - case _ => super.classifyException(e, errorClass, messageParameters, description, isRuntime) + case _ => super.classifyException(e, condition, messageParameters, description, isRuntime) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala index 798ecb5b36ff2..0d9dc88451cca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala @@ -197,7 +197,7 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError { override def classifyException( e: Throwable, - errorClass: String, + condition: String, messageParameters: Map[String, String], description: String, isRuntime: Boolean): Throwable with SparkThrowable = { @@ -230,13 +230,13 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError { throw new NoSuchNamespaceException(errorClass = "SCHEMA_NOT_FOUND", messageParameters = Map("schemaName" -> quotedName)) // INDEX_ALREADY_EXISTS_1 - case 42111 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + case 42111 if condition == "FAILED_JDBC.CREATE_INDEX" => val indexName = messageParameters("indexName") val tableName = messageParameters("tableName") throw new IndexAlreadyExistsException( indexName = indexName, tableName = tableName, cause = Some(e)) // INDEX_NOT_FOUND_1 - case 42112 if errorClass == "FAILED_JDBC.DROP_INDEX" => + case 42112 if condition == "FAILED_JDBC.DROP_INDEX" => val indexName = messageParameters("indexName") val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) @@ -244,7 +244,7 @@ private[sql] case class H2Dialect() extends JdbcDialect with NoLegacyJDBCError { } case _ => // do nothing } - super.classifyException(e, errorClass, messageParameters, description, isRuntime) + super.classifyException(e, condition, messageParameters, description, isRuntime) } override def compileExpression(expr: Expression): Option[String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 3bf1390cb664d..d641ca1d32b7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -738,7 +738,7 @@ abstract class JdbcDialect extends Serializable with Logging { /** * Gets a dialect exception, classifies it and wraps it by `AnalysisException`. * @param e The dialect specific exception. - * @param errorClass The error class assigned in the case of an unclassified `e` + * @param condition The error condition assigned in the case of an unclassified `e` * @param messageParameters The message parameters of `errorClass` * @param description The error description * @param isRuntime Whether the exception is a runtime exception or not. @@ -746,7 +746,7 @@ abstract class JdbcDialect extends Serializable with Logging { */ def classifyException( e: Throwable, - errorClass: String, + condition: String, messageParameters: Map[String, String], description: String, isRuntime: Boolean): Throwable with SparkThrowable = { @@ -759,7 +759,7 @@ abstract class JdbcDialect extends Serializable with Logging { * @param e The dialect specific exception. * @return `AnalysisException` or its sub-class. */ - @deprecated("Please override the classifyException method with an error class", "4.0.0") + @deprecated("Please override the classifyException method with an error condition", "4.0.0") def classifyException(message: String, e: Throwable): AnalysisException = { new AnalysisException( errorClass = "FAILED_JDBC.UNCLASSIFIED", @@ -850,18 +850,18 @@ trait NoLegacyJDBCError extends JdbcDialect { override def classifyException( e: Throwable, - errorClass: String, + condition: String, messageParameters: Map[String, String], description: String, isRuntime: Boolean): Throwable with SparkThrowable = { if (isRuntime) { new SparkRuntimeException( - errorClass = errorClass, + errorClass = condition, messageParameters = messageParameters, cause = e) } else { new AnalysisException( - errorClass = errorClass, + errorClass = condition, messageParameters = messageParameters, cause = Some(e)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 7d476d43e5c7a..fd0dfcb5b0667 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -205,7 +205,7 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr override def classifyException( e: Throwable, - errorClass: String, + condition: String, messageParameters: Map[String, String], description: String, isRuntime: Boolean): Throwable with SparkThrowable = { @@ -217,13 +217,13 @@ private case class MsSqlServerDialect() extends JdbcDialect with NoLegacyJDBCErr namespace = messageParameters.get("namespace").toArray, details = sqlException.getMessage, cause = Some(e)) - case 15335 if errorClass == "FAILED_JDBC.RENAME_TABLE" => + case 15335 if condition == "FAILED_JDBC.RENAME_TABLE" => val newTable = messageParameters("newName") throw QueryCompilationErrors.tableAlreadyExistsError(newTable) case _ => - super.classifyException(e, errorClass, messageParameters, description, isRuntime) + super.classifyException(e, condition, messageParameters, description, isRuntime) } - case _ => super.classifyException(e, errorClass, messageParameters, description, isRuntime) + case _ => super.classifyException(e, condition, messageParameters, description, isRuntime) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index dd0118d875998..5cf73b4e82306 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -350,7 +350,7 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No override def classifyException( e: Throwable, - errorClass: String, + condition: String, messageParameters: Map[String, String], description: String, isRuntime: Boolean): Throwable with SparkThrowable = { @@ -358,22 +358,22 @@ private case class MySQLDialect() extends JdbcDialect with SQLConfHelper with No case sqlException: SQLException => sqlException.getErrorCode match { // ER_DUP_KEYNAME - case 1050 if errorClass == "FAILED_JDBC.RENAME_TABLE" => + case 1050 if condition == "FAILED_JDBC.RENAME_TABLE" => val newTable = messageParameters("newName") throw QueryCompilationErrors.tableAlreadyExistsError(newTable) - case 1061 if errorClass == "FAILED_JDBC.CREATE_INDEX" => + case 1061 if condition == "FAILED_JDBC.CREATE_INDEX" => val indexName = messageParameters("indexName") val tableName = messageParameters("tableName") throw new IndexAlreadyExistsException(indexName, tableName, cause = Some(e)) - case 1091 if errorClass == "FAILED_JDBC.DROP_INDEX" => + case 1091 if condition == "FAILED_JDBC.DROP_INDEX" => val indexName = messageParameters("indexName") val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) case _ => - super.classifyException(e, errorClass, messageParameters, description, isRuntime) + super.classifyException(e, condition, messageParameters, description, isRuntime) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters, description, isRuntime) + case _ => super.classifyException(e, condition, messageParameters, description, isRuntime) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala index a73a34c646356..9e1f73e7fcb67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/OracleDialect.scala @@ -233,20 +233,20 @@ private case class OracleDialect() extends JdbcDialect with SQLConfHelper with N override def classifyException( e: Throwable, - errorClass: String, + condition: String, messageParameters: Map[String, String], description: String, isRuntime: Boolean): Throwable with SparkThrowable = { e match { case sqlException: SQLException => sqlException.getErrorCode match { - case 955 if errorClass == "FAILED_JDBC.RENAME_TABLE" => + case 955 if condition == "FAILED_JDBC.RENAME_TABLE" => val newTable = messageParameters("newName") throw QueryCompilationErrors.tableAlreadyExistsError(newTable) case _ => - super.classifyException(e, errorClass, messageParameters, description, isRuntime) + super.classifyException(e, condition, messageParameters, description, isRuntime) } - case _ => super.classifyException(e, errorClass, messageParameters, description, isRuntime) + case _ => super.classifyException(e, condition, messageParameters, description, isRuntime) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index 8341063e09890..8e6c9c532f4c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -259,7 +259,7 @@ private case class PostgresDialect() override def classifyException( e: Throwable, - errorClass: String, + condition: String, messageParameters: Map[String, String], description: String, isRuntime: Boolean): Throwable with SparkThrowable = { @@ -268,12 +268,12 @@ private case class PostgresDialect() sqlException.getSQLState match { // https://www.postgresql.org/docs/14/errcodes-appendix.html case "42P07" => - if (errorClass == "FAILED_JDBC.CREATE_INDEX") { + if (condition == "FAILED_JDBC.CREATE_INDEX") { throw new IndexAlreadyExistsException( indexName = messageParameters("indexName"), tableName = messageParameters("tableName"), cause = Some(e)) - } else if (errorClass == "FAILED_JDBC.RENAME_TABLE") { + } else if (condition == "FAILED_JDBC.RENAME_TABLE") { val newTable = messageParameters("newName") throw QueryCompilationErrors.tableAlreadyExistsError(newTable) } else { @@ -281,10 +281,10 @@ private case class PostgresDialect() if (tblRegexp.nonEmpty) { throw QueryCompilationErrors.tableAlreadyExistsError(tblRegexp.get.group(1)) } else { - super.classifyException(e, errorClass, messageParameters, description, isRuntime) + super.classifyException(e, condition, messageParameters, description, isRuntime) } } - case "42704" if errorClass == "FAILED_JDBC.DROP_INDEX" => + case "42704" if condition == "FAILED_JDBC.DROP_INDEX" => val indexName = messageParameters("indexName") val tableName = messageParameters("tableName") throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) @@ -294,10 +294,10 @@ private case class PostgresDialect() details = sqlException.getMessage, cause = Some(e)) case _ => - super.classifyException(e, errorClass, messageParameters, description, isRuntime) + super.classifyException(e, condition, messageParameters, description, isRuntime) } case unsupported: UnsupportedOperationException => throw unsupported - case _ => super.classifyException(e, errorClass, messageParameters, description, isRuntime) + case _ => super.classifyException(e, condition, messageParameters, description, isRuntime) } }