diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 5978f88d6a46..51d5861eebef 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -376,7 +376,11 @@ object MimaExcludes {
 
     // [SPARK-28199][SS] Remove deprecated ProcessingTime
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"),
-    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$")
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$"),
+
+    // [SPARK-28556][SQL] QueryExecutionListener should also notify Error
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure")
   )
 
   // Exclude rules for 2.4.x
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index ca66337846a0..6046805ae95d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -85,7 +85,7 @@ object SQLExecution {
     }.getOrElse(callSite.shortForm)
 
     withSQLConfPropagated(sparkSession) {
-      var ex: Option[Exception] = None
+      var ex: Option[Throwable] = None
       val startTime = System.nanoTime()
       try {
         sc.listenerBus.post(SparkListenerSQLExecutionStart(
@@ -99,7 +99,7 @@ object SQLExecution {
           time = System.currentTimeMillis()))
         body
       } catch {
-        case e: Exception =>
+        case e: Throwable =>
           ex = Some(e)
           throw e
       } finally {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 67d1f27271b2..81cbc7f54c7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -60,7 +60,7 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
   @JsonIgnore private[sql] var qe: QueryExecution = null
 
   // The exception object that caused this execution to fail. None if the execution doesn't fail.
-  @JsonIgnore private[sql] var executionFailure: Option[Exception] = None
+  @JsonIgnore private[sql] var executionFailure: Option[Throwable] = None
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 77ae047705de..2da8469a0041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -58,12 +58,12 @@ trait QueryExecutionListener {
    * @param funcName the name of the action that triggered this query.
    * @param qe the QueryExecution object that carries detail information like logical plan,
    *           physical plan, etc.
-   * @param exception the exception that failed this query.
+   * @param error the error that failed this query.
    *
    * @note This can be invoked by multiple different threads.
    */
   @DeveloperApi
-  def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
+  def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
index 6fe58b780eae..1d461a03fd1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
@@ -135,7 +135,7 @@ class SessionStateSuite extends SparkFunSuite {
   test("fork new session and inherit listener manager") {
     class CommandCollector extends QueryExecutionListener {
       val commands: ArrayBuffer[String] = ArrayBuffer.empty[String]
-      override def onFailure(funcName: String, qe: QueryExecution, ex: Exception) : Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable) : Unit = {}
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         commands += funcName
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala
index d2a6358ee822..fd6bc9662bfa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala
@@ -28,7 +28,7 @@ class TestQueryExecutionListener extends QueryExecutionListener {
     OnSuccessCall.isOnSuccessCalled.set(true)
   }
 
-  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
+  override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 058c5ba7e50b..0f47759b292f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -338,7 +338,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
     withTempPath { path =>
       var numTotalCachedHit = 0
       val listener = new QueryExecutionListener {
-        override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
+        override def onFailure(f: String, qe: QueryExecution, e: Throwable): Unit = {}
 
         override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
           qe.withCachedData match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
index f79cecc76397..493aee6c1a9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala
@@ -180,13 +180,13 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSQLContext {
       withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> format,
         SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> format) {
         val commands = ArrayBuffer.empty[(String, LogicalPlan)]
-        val exceptions = ArrayBuffer.empty[(String, Exception)]
+        val errors = ArrayBuffer.empty[(String, Throwable)]
         val listener = new QueryExecutionListener {
           override def onFailure(
               funcName: String,
               qe: QueryExecution,
-              exception: Exception): Unit = {
-            exceptions += funcName -> exception
+              error: Throwable): Unit = {
+            errors += funcName -> error
           }
 
           override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 126e23e6e592..15a000b45a7b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -267,7 +267,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
         plan = qe.analyzed
       }
 
-      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
     }
     spark.listenerManager.register(listener)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index c347caef39a6..a6f7f2250b58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -36,7 +36,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
     val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)]
     val listener = new QueryExecutionListener {
       // Only test successful case here, so no need to implement `onFailure`
-      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         metrics += ((funcName, qe, duration))
@@ -63,10 +63,10 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
   }
 
   testQuietly("execute callback functions when a DataFrame action failed") {
-    val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)]
+    val metrics = ArrayBuffer.empty[(String, QueryExecution, Throwable)]
     val listener = new QueryExecutionListener {
-      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
-        metrics += ((funcName, qe, exception))
+      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
+        metrics += ((funcName, qe, error))
       }
 
       // Only test failed case here, so no need to implement `onSuccess`
@@ -92,7 +92,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
     val metrics = ArrayBuffer.empty[Long]
     val listener = new QueryExecutionListener {
       // Only test successful case here, so no need to implement `onFailure`
-      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         val metric = qe.executedPlan match {
@@ -132,7 +132,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
     val metrics = ArrayBuffer.empty[Long]
     val listener = new QueryExecutionListener {
       // Only test successful case here, so no need to implement `onFailure`
-      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         metrics += qe.executedPlan.longMetric("dataSize").value
@@ -172,10 +172,10 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
 
   test("execute callback functions for DataFrameWriter") {
     val commands = ArrayBuffer.empty[(String, LogicalPlan)]
-    val exceptions = ArrayBuffer.empty[(String, Exception)]
+    val errors = ArrayBuffer.empty[(String, Throwable)]
     val listener = new QueryExecutionListener {
-      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
-        exceptions += funcName -> exception
+      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
+        errors += funcName -> error
       }
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
@@ -221,9 +221,9 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
         spark.range(10).select($"id", $"id").write.insertInto("tab")
       }
       sparkContext.listenerBus.waitUntilEmpty(1000)
-      assert(exceptions.length == 1)
-      assert(exceptions.head._1 == "insertInto")
-      assert(exceptions.head._2 == e)
+      assert(errors.length == 1)
+      assert(errors.head._1 == "insertInto")
+      assert(errors.head._2 == e)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
index da414f4311e5..79819e765541 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
@@ -57,7 +57,7 @@ private class CountingQueryExecutionListener extends QueryExecutionListener {
     CALLBACK_COUNT.incrementAndGet()
   }
 
-  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+  override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
     CALLBACK_COUNT.incrementAndGet()
   }
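
For implementors, the user-visible effect of this patch is the widened `onFailure` parameter: overrides must now accept a `Throwable`, which means listeners are also notified when a query fails with a fatal error (e.g. an `OutOfMemoryError`) that the previous `case e: Exception` handler in `SQLExecution` would have let slip past. Below is a minimal sketch of a listener written against the new signature; the class name `LoggingQueryExecutionListener` and the println bodies are illustrative only, not part of this patch. The nanosecond interpretation of `duration` is inferred from the `System.nanoTime()` bookkeeping visible in the SQLExecution hunk above.

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Illustrative listener against the post-SPARK-28556 trait.
class LoggingQueryExecutionListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
    // duration is elapsed time in nanoseconds (see System.nanoTime() in SQLExecution)
    println(s"action '$funcName' succeeded in ${duration / 1000000} ms")
  }

  // Now receives Throwable, so fatal errors reach the listener as well.
  override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
    println(s"action '$funcName' failed: $error")
  }
}

Registration is unchanged, as exercised in DataFrameReaderWriterSuite above:

spark.listenerManager.register(new LoggingQueryExecutionListener)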