diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 1617c88e9fe31..0e5c2e58ec1e9 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -4305,6 +4305,140 @@
       "Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting to -1 or increase the spark driver memory by setting to a higher value"
     ]
   },
+  "_LEGACY_ERROR_TEMP_2251" : {
+    "message" : [
+      "<execName> does not support the execute() code path."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2252" : {
+    "message" : [
+      "Cannot merge <className> with <otherClass>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2253" : {
+    "message" : [
+      "Data source <sourceName> does not support continuous processing."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2254" : {
+    "message" : [
+      "Data read failed"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2255" : {
+    "message" : [
+      "Epoch marker generation failed"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2256" : {
+    "message" : [
+      "Foreach writer has been aborted due to a task failure"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2258" : {
+    "message" : [
+      "Error reading delta file <fileToRead> of <clazz>: key size cannot be <keySize>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2259" : {
+    "message" : [
+      "Error reading snapshot file <fileToRead> of <clazz>: <message>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2260" : {
+    "message" : [
+      "Cannot purge as it might break internal state."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2261" : {
+    "message" : [
+      "Clean up source files is not supported when reading from the output directory of FileStreamSink."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2262" : {
+    "message" : [
+      "latestOffset(Offset, ReadLimit) should be called instead of this method"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2263" : {
+    "message" : [
+      "Error: we detected a possible problem with the location of your checkpoint and you",
+      "likely need to move it before restarting this query.",
+      "",
+      "Earlier version of Spark incorrectly escaped paths when writing out checkpoints for",
+      "structured streaming. While this was corrected in Spark 3.0, it appears that your",
+      "query was started using an earlier version that incorrectly handled the checkpoint",
+      "path.",
+      "",
+      "Correct Checkpoint Directory: <checkpointPath>",
+      "Incorrect Checkpoint Directory: <legacyCheckpointDir>",
+      "",
+      "Please move the data from the incorrect directory to the correct one, delete the",
+      "incorrect directory, and then restart this query. If you believe you are receiving",
+      "this message in error, you can disable it with the SQL conf",
+      "<StreamingCheckpointEscapedPathCheckEnabled>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2264" : {
+    "message" : [
+      "Subprocess exited with status <exitCode>. Error: <stderrBuffer>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2265" : {
+    "message" : [
+      "<nodeName> without serde does not support <dt> as output data type"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2266" : {
+    "message" : [
+      "Invalid `startIndex` provided for generating iterator over the array. Total elements: <numRows>, requested `startIndex`: <startIndex>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2267" : {
+    "message" : [
+      "The backing <className> has been modified since the creation of this Iterator"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2268" : {
+    "message" : [
+      "<nodeName> does not implement doExecuteBroadcast"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2269" : {
+    "message" : [
+      "<globalTempDB> is a system preserved database, please rename your existing database to resolve the name conflict, or set a different value for <globalTempDatabase>, and launch your Spark application again."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2270" : {
+    "message" : [
+      "comment on table is not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2271" : {
+    "message" : [
+      "UpdateColumnNullability is not supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2272" : {
+    "message" : [
+      "Rename column is only supported for MySQL version 8.0 and above."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2273" : {
+    "message" : [
+      "<message>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2274" : {
+    "message" : [
+      "Nested field <colName> is not supported."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2275" : {
+    "message" : [
+      "Dataset transformations and actions can only be invoked by the driver, not inside of other Dataset transformations; for example, dataset1.map(x => dataset2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the dataset1.map transformation. For more information, see SPARK-28702."
+    ]
+  },
   "_LEGACY_ERROR_TEMP_2276" : {
     "message" : [
       "Hive table with ANSI intervals is not supported"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index c49833322202c..8c6c6c86e361e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.errors
 import java.io.{FileNotFoundException, IOException}
 import java.lang.reflect.InvocationTargetException
 import java.net.{URISyntaxException, URL}
-import java.sql.{SQLFeatureNotSupportedException}
 import java.time.{DateTimeException, LocalDate}
 import java.time.temporal.ChronoField
 import java.util.ConcurrentModificationException
@@ -48,7 +47,6 @@ import org.apache.spark.sql.catalyst.util.{sideBySide, BadRecordException, DateT
 import org.apache.spark.sql.connector.catalog.{CatalogNotFoundException, Identifier, Table, TableProvider}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
 import org.apache.spark.sql.streaming.OutputMode
@@ -2359,30 +2357,49 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       cause = oe).initCause(oe.getCause)
   }

-  def executeCodePathUnsupportedError(execName: String): Throwable = {
-    new UnsupportedOperationException(s"$execName does not support the execute() code path.")
+  def executeCodePathUnsupportedError(execName: String): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2251",
+      messageParameters = Map(
+        "execName" -> execName))
   }

-  def cannotMergeClassWithOtherClassError(className: String, otherClass: String): Throwable = {
-    new UnsupportedOperationException(
-      s"Cannot merge $className with $otherClass")
+  def cannotMergeClassWithOtherClassError(
+      className: String, otherClass: String): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2252",
+      messageParameters = Map(
+        "className" -> className,
+        "otherClass" -> otherClass))
   }

-  def continuousProcessingUnsupportedByDataSourceError(sourceName: String): Throwable = {
-    new UnsupportedOperationException(
-      s"Data source $sourceName does not support continuous processing.")
+  def continuousProcessingUnsupportedByDataSourceError(
+      sourceName: String): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2253",
+      messageParameters = Map(
+        "sourceName" -> sourceName))
   }

   def failedToReadDataError(failureReason: Throwable): Throwable = {
-    new SparkException("Data read failed", failureReason)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2254",
+      messageParameters = Map.empty,
+      cause = failureReason)
   }

   def failedToGenerateEpochMarkerError(failureReason: Throwable): Throwable = {
-    new SparkException("Epoch marker generation failed", failureReason)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2255",
+      messageParameters = Map.empty,
+      cause = failureReason)
   }

   def foreachWriterAbortedDueToTaskFailureError(): Throwable = {
-    new SparkException("Foreach writer has been aborted due to a task failure")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2256",
+      messageParameters = Map.empty,
+      cause = null)
   }

   def incorrectRampUpRate(rowsPerSecond: Long,
@@ -2410,76 +2427,99 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
   }

   def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
-    new IOException(
-      s"Error reading delta file $fileToRead of $clazz: key size cannot be $keySize")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2258",
+      messageParameters = Map(
+        "fileToRead" -> fileToRead.toString(),
+        "clazz" -> clazz,
+        "keySize" -> keySize.toString()),
+      cause = null)
   }

   def failedToReadSnapshotFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
-    new IOException(s"Error reading snapshot file $fileToRead of $clazz: $message")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2259",
+      messageParameters = Map(
+        "fileToRead" -> fileToRead.toString(),
+        "clazz" -> clazz,
+        "message" -> message),
+      cause = null)
   }

-  def cannotPurgeAsBreakInternalStateError(): Throwable = {
-    new UnsupportedOperationException("Cannot purge as it might break internal state.")
+  def cannotPurgeAsBreakInternalStateError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2260",
+      messageParameters = Map.empty)
   }

-  def cleanUpSourceFilesUnsupportedError(): Throwable = {
-    new UnsupportedOperationException("Clean up source files is not supported when" +
-      " reading from the output directory of FileStreamSink.")
+  def cleanUpSourceFilesUnsupportedError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2261",
+      messageParameters = Map.empty)
   }

-  def latestOffsetNotCalledError(): Throwable = {
-    new UnsupportedOperationException(
-      "latestOffset(Offset, ReadLimit) should be called instead of this method")
+  def latestOffsetNotCalledError(): SparkUnsupportedOperationException = {
+    new SparkUnsupportedOperationException(
+      errorClass = "_LEGACY_ERROR_TEMP_2262",
+      messageParameters = Map.empty)
   }

   def legacyCheckpointDirectoryExistsError(
       checkpointPath: Path, legacyCheckpointDir: String): Throwable = {
     new SparkException(
-      s"""
-         |Error: we detected a possible problem with the location of your checkpoint and you
-         |likely need to move it before restarting this query.
-         |
-         |Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
-         |structured streaming. While this was corrected in Spark 3.0, it appears that your
-         |query was started using an earlier version that incorrectly handled the checkpoint
-         |path.
-         |
-         |Correct Checkpoint Directory: $checkpointPath
-         |Incorrect Checkpoint Directory: $legacyCheckpointDir
-         |
-         |Please move the data from the incorrect directory to the correct one, delete the
-         |incorrect directory, and then restart this query. If you believe you are receiving
-         |this message in error, you can disable it with the SQL conf
-         |${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}.
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_2263",
+      messageParameters = Map(
+        "checkpointPath" -> checkpointPath.toString(),
+        "legacyCheckpointDir" -> legacyCheckpointDir,
+        "StreamingCheckpointEscapedPathCheckEnabled"
+          -> SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key),
+      cause = null)
   }

   def subprocessExitedError(
       exitCode: Int, stderrBuffer: CircularBuffer, cause: Throwable): Throwable = {
-    new SparkException(s"Subprocess exited with status $exitCode. " +
-      s"Error: ${stderrBuffer.toString}", cause)
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2264",
+      messageParameters = Map(
+        "exitCode" -> exitCode.toString(),
+        "stderrBuffer" -> stderrBuffer.toString()),
+      cause = cause)
   }

   def outputDataTypeUnsupportedByNodeWithoutSerdeError(
       nodeName: String, dt: DataType): Throwable = {
-    new SparkException(s"$nodeName without serde does not support " +
-      s"${dt.getClass.getSimpleName} as output data type")
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2265",
+      messageParameters = Map(
+        "nodeName" -> nodeName,
+        "dt" -> dt.getClass.getSimpleName),
+      cause = null)
   }

-  def invalidStartIndexError(numRows: Int, startIndex: Int): Throwable = {
-    new ArrayIndexOutOfBoundsException(
-      "Invalid `startIndex` provided for generating iterator over the array. " +
" + - s"Total elements: $numRows, requested `startIndex`: $startIndex") + def invalidStartIndexError(numRows: Int, startIndex: Int): SparkArrayIndexOutOfBoundsException = { + new SparkArrayIndexOutOfBoundsException( + errorClass = "_LEGACY_ERROR_TEMP_2266", + messageParameters = Map( + "numRows" -> numRows.toString(), + "startIndex" -> startIndex.toString()), + context = Array.empty, + summary = "") } def concurrentModificationOnExternalAppendOnlyUnsafeRowArrayError( - className: String): Throwable = { - new ConcurrentModificationException( - s"The backing $className has been modified since the creation of this Iterator") + className: String): SparkConcurrentModificationException = { + new SparkConcurrentModificationException( + errorClass = "_LEGACY_ERROR_TEMP_2267", + messageParameters = Map( + "className" -> className)) } - def doExecuteBroadcastNotImplementedError(nodeName: String): Throwable = { - new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast") + def doExecuteBroadcastNotImplementedError( + nodeName: String): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2268", + messageParameters = Map( + "nodeName" -> nodeName)) } def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = { @@ -2492,45 +2532,53 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { def databaseNameConflictWithSystemPreservedDatabaseError(globalTempDB: String): Throwable = { new SparkException( - s""" - |$globalTempDB is a system preserved database, please rename your existing database - |to resolve the name conflict, or set a different value for - |${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again. - """.stripMargin.split("\n").mkString(" ")) + errorClass = "_LEGACY_ERROR_TEMP_2269", + messageParameters = Map( + "globalTempDB" -> globalTempDB, + "globalTempDatabase" -> GLOBAL_TEMP_DATABASE.key), + cause = null) } - def commentOnTableUnsupportedError(): Throwable = { - new SQLFeatureNotSupportedException("comment on table is not supported") + def commentOnTableUnsupportedError(): SparkSQLFeatureNotSupportedException = { + new SparkSQLFeatureNotSupportedException( + errorClass = "_LEGACY_ERROR_TEMP_2270", + messageParameters = Map.empty) } - def unsupportedUpdateColumnNullabilityError(): Throwable = { - new SQLFeatureNotSupportedException("UpdateColumnNullability is not supported") + def unsupportedUpdateColumnNullabilityError(): SparkSQLFeatureNotSupportedException = { + new SparkSQLFeatureNotSupportedException( + errorClass = "_LEGACY_ERROR_TEMP_2271", + messageParameters = Map.empty) } - def renameColumnUnsupportedForOlderMySQLError(): Throwable = { - new SQLFeatureNotSupportedException( - "Rename column is only supported for MySQL version 8.0 and above.") + def renameColumnUnsupportedForOlderMySQLError(): SparkSQLFeatureNotSupportedException = { + new SparkSQLFeatureNotSupportedException( + errorClass = "_LEGACY_ERROR_TEMP_2272", + messageParameters = Map.empty) } - def failedToExecuteQueryError(e: Throwable): QueryExecutionException = { + def failedToExecuteQueryError(e: Throwable): SparkException = { val message = "Hit an error when executing a query" + (if (e.getMessage == null) "" else s": ${e.getMessage}") - new QueryExecutionException(message, e) + new SparkException( + errorClass = "_LEGACY_ERROR_TEMP_2273", + messageParameters = Map( + "message" -> message), + cause = e) } - def nestedFieldUnsupportedError(colName: String): Throwable = { - new 
UnsupportedOperationException(s"Nested field $colName is not supported.") + def nestedFieldUnsupportedError(colName: String): SparkUnsupportedOperationException = { + new SparkUnsupportedOperationException( + errorClass = "_LEGACY_ERROR_TEMP_2274", + messageParameters = Map( + "colName" -> colName)) } def transformationsAndActionsNotInvokedByDriverError(): Throwable = { new SparkException( - """ - |Dataset transformations and actions can only be invoked by the driver, not inside of - |other Dataset transformations; for example, dataset1.map(x => dataset2.values.count() - |* x) is invalid because the values transformation and count action cannot be - |performed inside of the dataset1.map transformation. For more information, - |see SPARK-28702. - """.stripMargin.split("\n").mkString(" ")) + errorClass = "_LEGACY_ERROR_TEMP_2275", + messageParameters = Map.empty, + cause = null) } def repeatedPivotsUnsupportedError(): Throwable = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index 4f37d2d353653..aebeb08775f00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import org.apache.spark.SparkRuntimeException +import org.apache.spark.{SparkRuntimeException} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index b81b0f775a5ea..9b1e5a9e16e8b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.util.Progressable import org.scalatest.PrivateMethodTester import org.scalatest.time.SpanSugar._ +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.util._ @@ -2055,7 +2056,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { AddFilesToFileStreamSinkLog(fileSystem, srcPath, sinkLog, 0) { path => path.getName.startsWith("keep1") }, - ExpectFailure[UnsupportedOperationException]( + ExpectFailure[SparkUnsupportedOperationException]( t => assert(t.getMessage.startsWith("Clean up source files is not supported")), isFatalError = false) ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 01efd9857f608..dd6acc983b7c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark._ import org.apache.spark.sql.{functions, Dataset, QueryTest, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import 
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
-import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, LeafRunnableCommand}
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
@@ -359,7 +359,7 @@ class DataFrameCallbackSuite extends QueryTest
       Dataset.ofRows(spark, ErrorTestCommand("foo")).collect()
     }
     sparkContext.listenerBus.waitUntilEmpty()
-    assert(e != null && e.isInstanceOf[QueryExecutionException]
+    assert(e != null && e.isInstanceOf[SparkException]
       && e.getCause.isInstanceOf[Error] && e.getCause.getMessage == "foo")
     spark.listenerManager.unregister(listener)
   }
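
Reviewer note (not part of the patch): a minimal sketch of how test code could now assert on the error class instead of matching the raw message text. It assumes the caller can reach the private[sql] QueryExecutionErrors object (i.e. it lives under org.apache.spark.sql, as the suites above do) and relies on the SparkThrowable.getErrorClass accessor; the "MyExec" argument is purely illustrative.

    import org.apache.spark.sql.errors.QueryExecutionErrors

    // The factory now returns a SparkUnsupportedOperationException carrying an error class,
    // so assertions no longer have to depend solely on the rendered message string.
    val e = QueryExecutionErrors.executeCodePathUnsupportedError("MyExec")
    assert(e.getErrorClass == "_LEGACY_ERROR_TEMP_2251")
    assert(e.getMessage.contains("does not support the execute() code path"))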