From 8102a1ed90b3840488d037affc5dd1c9001b1295 Mon Sep 17 00:00:00 2001
From: panbingkun
Date: Wed, 25 May 2022 21:02:30 +0800
Subject: [PATCH] [SPARK-38700][SQL][3.3] Use error classes in the execution errors of save mode

---
 .../main/resources/error/error-classes.json   | 11 ++++
 .../scala/org/apache/spark/ErrorInfo.scala    | 33 +++++++++-
 .../sql/errors/QueryExecutionErrors.scala     |  9 ++-
 .../InsertIntoHadoopFsRelationCommand.scala   |  2 +-
 .../sql/errors/QueryErrorsSuiteBase.scala     | 65 +++++++++++++++++++
 .../errors/QueryExecutionErrorsSuite.scala    | 34 ++++++++--
 6 files changed, 145 insertions(+), 9 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/errors/QueryErrorsSuiteBase.scala

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 855d3c5cd6e0e..409b633dce387 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -187,6 +187,17 @@
   "UNSUPPORTED_OPERATION" : {
     "message" : [ "The operation is not supported: %s" ]
   },
+  "UNSUPPORTED_SAVE_MODE" : {
+    "message" : [ "The save mode <saveMode> is not supported for: " ],
+    "subClass" : {
+      "EXISTENT_PATH" : {
+        "message" : [ "an existent path." ]
+      },
+      "NON_EXISTENT_PATH" : {
+        "message" : [ "a not existent path." ]
+      }
+    }
+  },
   "WRITING_JOB_ABORTED" : {
     "message" : [ "Writing job aborted" ],
     "sqlState" : "40000"
diff --git a/core/src/main/scala/org/apache/spark/ErrorInfo.scala b/core/src/main/scala/org/apache/spark/ErrorInfo.scala
index 0917085c01b23..0447572bb1c29 100644
--- a/core/src/main/scala/org/apache/spark/ErrorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/ErrorInfo.scala
@@ -28,14 +28,30 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
 import org.apache.spark.util.Utils
 
+/**
+ * Information associated with an error subclass.
+ *
+ * @param message C-style message format compatible with printf.
+ *                The error message is constructed by concatenating the lines with newlines.
+ */
+private[spark] case class ErrorSubInfo(message: Seq[String]) {
+  // For compatibility with multi-line error messages
+  @JsonIgnore
+  val messageFormat: String = message.mkString("\n")
+}
+
 /**
  * Information associated with an error class.
  *
  * @param sqlState SQLSTATE associated with this class.
+ * @param subClass A map of error subclasses keyed by subclass name.
  * @param message C-style message format compatible with printf.
  *                The error message is constructed by concatenating the lines with newlines.
  */
-private[spark] case class ErrorInfo(message: Seq[String], sqlState: Option[String]) {
+private[spark] case class ErrorInfo(
+    message: Seq[String],
+    subClass: Option[Map[String, ErrorSubInfo]],
+    sqlState: Option[String]) {
   // For compatibility with multi-line error messages
   @JsonIgnore
   val messageFormat: String = message.mkString("\n")
@@ -58,7 +74,20 @@ private[spark] object SparkThrowableHelper {
   def getMessage(errorClass: String, messageParameters: Array[String]): String = {
     val errorInfo = errorClassToInfoMap.getOrElse(errorClass,
       throw new IllegalArgumentException(s"Cannot find error class '$errorClass'"))
-    String.format(errorInfo.messageFormat, messageParameters: _*)
+    if (errorInfo.subClass.isDefined) {
+      val subClass = errorInfo.subClass.get
+      val subErrorClass = messageParameters.head
+      val errorSubInfo = subClass.getOrElse(subErrorClass,
+        throw new IllegalArgumentException(s"Cannot find sub error class '$subErrorClass'"))
+      val subMessageParameters = messageParameters.tail
+      "[" + errorClass + "." + subErrorClass + "] " + String.format((errorInfo.messageFormat +
+        errorSubInfo.messageFormat).replaceAll("<[a-zA-Z0-9_-]+>", "%s"),
+        subMessageParameters: _*)
+    } else {
+      "[" + errorClass + "] " + String.format(
+        errorInfo.messageFormat.replaceAll("<[a-zA-Z0-9_-]+>", "%s"),
+        messageParameters: _*)
+    }
   }
 
   def getSqlState(errorClass: String): String = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 3a89147c4b53b..ac66df772f27f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -605,8 +605,13 @@ object QueryExecutionErrors extends QueryErrorsBase {
       """.stripMargin)
   }
 
-  def unsupportedSaveModeError(saveMode: String, pathExists: Boolean): Throwable = {
-    new IllegalStateException(s"unsupported save mode $saveMode ($pathExists)")
+  def saveModeUnsupportedError(saveMode: Any, pathExists: Boolean): Throwable = {
+    pathExists match {
+      case true => new SparkIllegalArgumentException(errorClass = "UNSUPPORTED_SAVE_MODE",
+        messageParameters = Array("EXISTENT_PATH", toSQLValue(saveMode, StringType)))
+      case _ => new SparkIllegalArgumentException(errorClass = "UNSUPPORTED_SAVE_MODE",
+        messageParameters = Array("NON_EXISTENT_PATH", toSQLValue(saveMode, StringType)))
+    }
   }
 
   def cannotClearOutputDirectoryError(staticPrefixPath: Path): Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 74be483cd7c37..d773d4bd271b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -133,7 +133,7 @@ case class InsertIntoHadoopFsRelationCommand(
       case (SaveMode.Ignore, exists) =>
         !exists
       case (s, exists) =>
-        throw QueryExecutionErrors.unsupportedSaveModeError(s.toString, exists)
+        throw QueryExecutionErrors.saveModeUnsupportedError(s, exists)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryErrorsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryErrorsSuiteBase.scala
new file mode 100644
index 0000000000000..eb7871d5559e5
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryErrorsSuiteBase.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.errors
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.test.SharedSparkSession
+
+trait QueryErrorsSuiteBase extends SharedSparkSession {
+  def checkErrorClass(
+      exception: Exception with SparkThrowable,
+      errorClass: String,
+      errorSubClass: Option[String] = None,
+      msg: String,
+      sqlState: Option[String] = None,
+      matchMsg: Boolean = false): Unit = {
+    assert(exception.getErrorClass === errorClass)
+    sqlState.foreach(state => assert(exception.getSqlState === state))
+    val fullErrorClass = if (errorSubClass.isDefined) {
+      errorClass + "." + errorSubClass.get
+    } else {
+      errorClass
+    }
+    if (matchMsg) {
+      assert(exception.getMessage.matches(s"""\\[$fullErrorClass\\] """ + msg))
+    } else {
+      assert(exception.getMessage === s"""[$fullErrorClass] """ + msg)
+    }
+  }
+
+  def validateParsingError(
+      sqlText: String,
+      errorClass: String,
+      errorSubClass: Option[String] = None,
+      sqlState: String,
+      message: String): Unit = {
+    val e = intercept[ParseException] {
+      sql(sqlText)
+    }
+
+    val fullErrorClass = if (errorSubClass.isDefined) {
+      errorClass + "." + errorSubClass.get
+    } else {
+      errorClass
+    }
+    assert(e.getErrorClass === errorClass)
+    assert(e.getSqlState === sqlState)
+    assert(e.getMessage === s"""\n[$fullErrorClass] """ + message)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index a7625e17b4ae6..e9de46957a366 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -17,19 +17,19 @@
 
 package org.apache.spark.sql.errors
 
-import org.apache.spark.{SparkArithmeticException, SparkException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
-import org.apache.spark.sql.{DataFrame, QueryTest}
+import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode}
 import org.apache.spark.sql.execution.datasources.orc.OrcTest
 import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
 import org.apache.spark.sql.functions.{lit, lower, struct, sum}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.EXCEPTION
-import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{StructType, TimestampType}
 import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.util.Utils
 
 class QueryExecutionErrorsSuite extends QueryTest
-  with ParquetTest with OrcTest with SharedSparkSession {
+  with ParquetTest with OrcTest with QueryErrorsSuiteBase {
 
   import testImplicits._
 
@@ -278,4 +278,30 @@ class QueryExecutionErrorsSuite extends QueryTest
     assert(e.getMessage ===
       "Datetime operation overflow: add 1000000 YEAR to TIMESTAMP '2022-03-09 01:02:03'.")
   }
+
+  test("UNSUPPORTED_SAVE_MODE: unsupported null saveMode whether the path exists or not") {
+    withTempPath { path =>
+      val e1 = intercept[SparkIllegalArgumentException] {
+        val saveMode: SaveMode = null
+        Seq(1, 2).toDS().write.mode(saveMode).parquet(path.getAbsolutePath)
+      }
+      checkErrorClass(
+        exception = e1,
+        errorClass = "UNSUPPORTED_SAVE_MODE",
+        errorSubClass = Some("NON_EXISTENT_PATH"),
+        msg = "The save mode NULL is not supported for: a not existent path.")
+
+      Utils.createDirectory(path)
+
+      val e2 = intercept[SparkIllegalArgumentException] {
+        val saveMode: SaveMode = null
+        Seq(1, 2).toDS().write.mode(saveMode).parquet(path.getAbsolutePath)
+      }
+      checkErrorClass(
+        exception = e2,
+        errorClass = "UNSUPPORTED_SAVE_MODE",
+        errorSubClass = Some("EXISTENT_PATH"),
+        msg = "The save mode NULL is not supported for: an existent path.")
+    }
+  }
 }
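
For readers tracing the new code path: below is a minimal, self-contained Scala sketch of the subclass-aware message assembly that SparkThrowableHelper.getMessage performs after this change. The object name ErrorMessageDemo and the inlined map are hypothetical stand-ins for the ErrorInfo/ErrorSubInfo data that Spark actually loads from error-classes.json; only the formatting logic mirrors the patch.

// Stand-alone sketch of the subclass-aware formatting added to
// SparkThrowableHelper.getMessage. All names here are illustrative; the real
// implementation deserializes error-classes.json into ErrorInfo/ErrorSubInfo.
object ErrorMessageDemo {
  final case class SubInfo(messageFormat: String)
  final case class Info(messageFormat: String, subClass: Option[Map[String, SubInfo]])

  // Hypothetical in-memory stand-in for the UNSUPPORTED_SAVE_MODE entry.
  private val errorClassToInfoMap: Map[String, Info] = Map(
    "UNSUPPORTED_SAVE_MODE" -> Info(
      "The save mode <saveMode> is not supported for: ",
      Some(Map(
        "EXISTENT_PATH" -> SubInfo("an existent path."),
        "NON_EXISTENT_PATH" -> SubInfo("a not existent path.")))))

  def getMessage(errorClass: String, messageParameters: Array[String]): String = {
    val info = errorClassToInfoMap.getOrElse(errorClass,
      throw new IllegalArgumentException(s"Cannot find error class '$errorClass'"))
    info.subClass match {
      case Some(subs) =>
        // With subclasses, the first parameter names the subclass; the rest
        // fill the <placeholder> tokens, which are rewritten to printf-style %s.
        val subErrorClass = messageParameters.head
        val sub = subs.getOrElse(subErrorClass,
          throw new IllegalArgumentException(s"Cannot find sub error class '$subErrorClass'"))
        val format = (info.messageFormat + sub.messageFormat)
          .replaceAll("<[a-zA-Z0-9_-]+>", "%s")
        s"[$errorClass.$subErrorClass] " + String.format(format, messageParameters.tail: _*)
      case None =>
        s"[$errorClass] " + String.format(
          info.messageFormat.replaceAll("<[a-zA-Z0-9_-]+>", "%s"), messageParameters: _*)
    }
  }

  def main(args: Array[String]): Unit = {
    // Prints:
    // [UNSUPPORTED_SAVE_MODE.NON_EXISTENT_PATH] The save mode NULL is not supported for: a not existent path.
    println(getMessage("UNSUPPORTED_SAVE_MODE", Array("NON_EXISTENT_PATH", "NULL")))
  }
}

This is why saveModeUnsupportedError passes Array("EXISTENT_PATH", toSQLValue(saveMode, StringType)): the subclass key travels as the first message parameter, and checkErrorClass in QueryErrorsSuiteBase then asserts against the "[CLASS.SUBCLASS] ..." prefix produced by this assembly.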