Skip to content

Commit eb9935a

Browse files
LantaoJin authored and wangyum committed
[SPARK-29283][SQL] Error message is hidden when query from JDBC, especially enabled adaptive execution
### What changes were proposed in this pull request? When adaptive execution is enabled, Spark users who connect from JDBC always get an adaptive execution error, whatever the underlying root cause is. It's very confusing. We have to check the driver log to find out why. ```shell 0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v; SELECT * FROM testData join testData2 ON key = v; Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0) 0: jdbc:hive2://localhost:10000> ``` For example, a job queried from JDBC failed due to an HDFS missing block. Users still get the error message `Adaptive execution failed due to stage materialization failures`. The easiest way to reproduce is changing the code of `AdaptiveSparkPlanExec`, to let it throw out an exception when it faces `StageSuccess`. ```scala case class AdaptiveSparkPlanExec( events.drainTo(rem) (Seq(nextMsg) ++ rem.asScala).foreach { case StageSuccess(stage, res) => // stage.resultOption = Some(res) val ex = new SparkException("Wrapper Exception", new IllegalArgumentException("Root cause is IllegalArgumentException for Test")) errors.append( new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex)) case StageFailure(stage, ex) => errors.append( new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex)) ``` ### Why are the changes needed? To make the error message more user-friendly and more useful for queries from JDBC. ### Does this PR introduce any user-facing change? No. ### How was this patch tested?
Manually test query: ```shell 0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string); CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string); +---------+--+ | Result | +---------+--+ +---------+--+ No rows selected (0.225 seconds) 0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int); CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int); +---------+--+ | Result | +---------+--+ +---------+--+ No rows selected (0.043 seconds) ``` Before: ```shell 0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v; SELECT * FROM testData join testData2 ON key = v; Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0) 0: jdbc:hive2://localhost:10000> ``` After: ```shell 0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v; SELECT * FROM testData join testData2 ON key = v; Error: Error running query: java.lang.IllegalArgumentException: Root cause is IllegalArgumentException for Test (state=,code=0) 0: jdbc:hive2://localhost:10000> ``` Closes #25960 from LantaoJin/SPARK-29283. Authored-by: lajin <lajin@ebay.com> Signed-off-by: Yuming Wang <wgyumg@gmail.com> (cherry picked from commit fda4070) Signed-off-by: Yuming Wang <wgyumg@gmail.com>
1 parent 0333d82 commit eb9935a

File tree

8 files changed

+109
-35
lines changed

8 files changed

+109
-35
lines changed

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
2626
import scala.collection.mutable.ArrayBuffer
2727
import scala.util.control.NonFatal
2828

29+
import org.apache.commons.lang3.exception.ExceptionUtils
2930
import org.apache.hadoop.hive.metastore.api.FieldSchema
3031
import org.apache.hadoop.hive.shims.Utils
3132
import org.apache.hive.service.cli._
@@ -312,12 +313,16 @@ private[hive] class SparkExecuteStatementOperation(
312313
} else {
313314
logError(s"Error executing query with $statementId, currentState $currentState, ", e)
314315
setState(OperationState.ERROR)
315-
HiveThriftServer2.listener.onStatementError(
316-
statementId, e.getMessage, SparkUtils.exceptionString(e))
317-
if (e.isInstanceOf[HiveSQLException]) {
318-
throw e.asInstanceOf[HiveSQLException]
319-
} else {
320-
throw new HiveSQLException("Error running query: " + e.toString, e)
316+
e match {
317+
case hiveException: HiveSQLException =>
318+
HiveThriftServer2.listener.onStatementError(
319+
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
320+
throw hiveException
321+
case _ =>
322+
val root = ExceptionUtils.getRootCause(e)
323+
HiveThriftServer2.listener.onStatementError(
324+
statementId, root.getMessage, SparkUtils.exceptionString(root))
325+
throw new HiveSQLException("Error running query: " + root.toString, root)
321326
}
322327
}
323328
} finally {

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
1919

2020
import java.util.UUID
2121

22+
import org.apache.commons.lang3.exception.ExceptionUtils
2223
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
2324
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
2425
import org.apache.hive.service.cli.operation.GetCatalogsOperation
@@ -68,11 +69,20 @@ private[hive] class SparkGetCatalogsOperation(
6869
}
6970
setState(OperationState.FINISHED)
7071
} catch {
71-
case e: HiveSQLException =>
72+
case e: Throwable =>
73+
logError(s"Error executing get catalogs operation with $statementId", e)
7274
setState(OperationState.ERROR)
73-
HiveThriftServer2.listener.onStatementError(
74-
statementId, e.getMessage, SparkUtils.exceptionString(e))
75-
throw e
75+
e match {
76+
case hiveException: HiveSQLException =>
77+
HiveThriftServer2.listener.onStatementError(
78+
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
79+
throw hiveException
80+
case _ =>
81+
val root = ExceptionUtils.getRootCause(e)
82+
HiveThriftServer2.listener.onStatementError(
83+
statementId, root.getMessage, SparkUtils.exceptionString(root))
84+
throw new HiveSQLException("Error getting catalogs: " + root.toString, root)
85+
}
7686
}
7787
HiveThriftServer2.listener.onStatementFinish(statementId)
7888
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util.regex.Pattern
2222

2323
import scala.collection.JavaConverters.seqAsJavaListConverter
2424

25+
import org.apache.commons.lang3.exception.ExceptionUtils
2526
import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObject}
2627
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType
2728
import org.apache.hive.service.cli._
@@ -129,11 +130,20 @@ private[hive] class SparkGetColumnsOperation(
129130
}
130131
setState(OperationState.FINISHED)
131132
} catch {
132-
case e: HiveSQLException =>
133+
case e: Throwable =>
134+
logError(s"Error executing get columns operation with $statementId", e)
133135
setState(OperationState.ERROR)
134-
HiveThriftServer2.listener.onStatementError(
135-
statementId, e.getMessage, SparkUtils.exceptionString(e))
136-
throw e
136+
e match {
137+
case hiveException: HiveSQLException =>
138+
HiveThriftServer2.listener.onStatementError(
139+
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
140+
throw hiveException
141+
case _ =>
142+
val root = ExceptionUtils.getRootCause(e)
143+
HiveThriftServer2.listener.onStatementError(
144+
statementId, root.getMessage, SparkUtils.exceptionString(root))
145+
throw new HiveSQLException("Error getting columns: " + root.toString, root)
146+
}
137147
}
138148
HiveThriftServer2.listener.onStatementFinish(statementId)
139149
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util.UUID
2222

2323
import scala.collection.JavaConverters.seqAsJavaListConverter
2424

25+
import org.apache.commons.lang3.exception.ExceptionUtils
2526
import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils}
2627
import org.apache.hive.service.cli._
2728
import org.apache.hive.service.cli.operation.GetFunctionsOperation
@@ -104,11 +105,20 @@ private[hive] class SparkGetFunctionsOperation(
104105
}
105106
setState(OperationState.FINISHED)
106107
} catch {
107-
case e: HiveSQLException =>
108+
case e: Throwable =>
109+
logError(s"Error executing get functions operation with $statementId", e)
108110
setState(OperationState.ERROR)
109-
HiveThriftServer2.listener.onStatementError(
110-
statementId, e.getMessage, SparkUtils.exceptionString(e))
111-
throw e
111+
e match {
112+
case hiveException: HiveSQLException =>
113+
HiveThriftServer2.listener.onStatementError(
114+
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
115+
throw hiveException
116+
case _ =>
117+
val root = ExceptionUtils.getRootCause(e)
118+
HiveThriftServer2.listener.onStatementError(
119+
statementId, root.getMessage, SparkUtils.exceptionString(root))
120+
throw new HiveSQLException("Error getting functions: " + root.toString, root)
121+
}
112122
}
113123
HiveThriftServer2.listener.onStatementFinish(statementId)
114124
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.thriftserver
2020
import java.util.UUID
2121
import java.util.regex.Pattern
2222

23+
import org.apache.commons.lang3.exception.ExceptionUtils
2324
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
2425
import org.apache.hive.service.cli._
2526
import org.apache.hive.service.cli.operation.GetSchemasOperation
@@ -87,11 +88,20 @@ private[hive] class SparkGetSchemasOperation(
8788
}
8889
setState(OperationState.FINISHED)
8990
} catch {
90-
case e: HiveSQLException =>
91+
case e: Throwable =>
92+
logError(s"Error executing get schemas operation with $statementId", e)
9193
setState(OperationState.ERROR)
92-
HiveThriftServer2.listener.onStatementError(
93-
statementId, e.getMessage, SparkUtils.exceptionString(e))
94-
throw e
94+
e match {
95+
case hiveException: HiveSQLException =>
96+
HiveThriftServer2.listener.onStatementError(
97+
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
98+
throw hiveException
99+
case _ =>
100+
val root = ExceptionUtils.getRootCause(e)
101+
HiveThriftServer2.listener.onStatementError(
102+
statementId, root.getMessage, SparkUtils.exceptionString(root))
103+
throw new HiveSQLException("Error getting schemas: " + root.toString, root)
104+
}
95105
}
96106
HiveThriftServer2.listener.onStatementFinish(statementId)
97107
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
1919

2020
import java.util.UUID
2121

22+
import org.apache.commons.lang3.exception.ExceptionUtils
2223
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
2324
import org.apache.hive.service.cli._
2425
import org.apache.hive.service.cli.operation.GetTableTypesOperation
@@ -74,11 +75,20 @@ private[hive] class SparkGetTableTypesOperation(
7475
}
7576
setState(OperationState.FINISHED)
7677
} catch {
77-
case e: HiveSQLException =>
78+
case e: Throwable =>
79+
logError(s"Error executing get table types operation with $statementId", e)
7880
setState(OperationState.ERROR)
79-
HiveThriftServer2.listener.onStatementError(
80-
statementId, e.getMessage, SparkUtils.exceptionString(e))
81-
throw e
81+
e match {
82+
case hiveException: HiveSQLException =>
83+
HiveThriftServer2.listener.onStatementError(
84+
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
85+
throw hiveException
86+
case _ =>
87+
val root = ExceptionUtils.getRootCause(e)
88+
HiveThriftServer2.listener.onStatementError(
89+
statementId, root.getMessage, SparkUtils.exceptionString(root))
90+
throw new HiveSQLException("Error getting table types: " + root.toString, root)
91+
}
8292
}
8393
HiveThriftServer2.listener.onStatementFinish(statementId)
8494
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import java.util.regex.Pattern
2222

2323
import scala.collection.JavaConverters._
2424

25+
import org.apache.commons.lang3.exception.ExceptionUtils
2526
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
2627
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
2728
import org.apache.hive.service.cli._
@@ -30,7 +31,6 @@ import org.apache.hive.service.cli.session.HiveSession
3031

3132
import org.apache.spark.internal.Logging
3233
import org.apache.spark.sql.SQLContext
33-
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
3434
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
3535
import org.apache.spark.sql.hive.HiveUtils
3636
import org.apache.spark.util.{Utils => SparkUtils}
@@ -119,11 +119,20 @@ private[hive] class SparkGetTablesOperation(
119119
}
120120
setState(OperationState.FINISHED)
121121
} catch {
122-
case e: HiveSQLException =>
122+
case e: Throwable =>
123+
logError(s"Error executing get tables operation with $statementId", e)
123124
setState(OperationState.ERROR)
124-
HiveThriftServer2.listener.onStatementError(
125-
statementId, e.getMessage, SparkUtils.exceptionString(e))
126-
throw e
125+
e match {
126+
case hiveException: HiveSQLException =>
127+
HiveThriftServer2.listener.onStatementError(
128+
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
129+
throw hiveException
130+
case _ =>
131+
val root = ExceptionUtils.getRootCause(e)
132+
HiveThriftServer2.listener.onStatementError(
133+
statementId, root.getMessage, SparkUtils.exceptionString(root))
134+
throw new HiveSQLException("Error getting tables: " + root.toString, root)
135+
}
127136
}
128137
HiveThriftServer2.listener.onStatementFinish(statementId)
129138
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
1919

2020
import java.util.UUID
2121

22+
import org.apache.commons.lang3.exception.ExceptionUtils
2223
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
2324
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
2425
import org.apache.hive.service.cli.operation.GetTypeInfoOperation
@@ -92,11 +93,20 @@ private[hive] class SparkGetTypeInfoOperation(
9293
})
9394
setState(OperationState.FINISHED)
9495
} catch {
95-
case e: HiveSQLException =>
96+
case e: Throwable =>
97+
logError(s"Error executing get type info with $statementId", e)
9698
setState(OperationState.ERROR)
97-
HiveThriftServer2.listener.onStatementError(
98-
statementId, e.getMessage, SparkUtils.exceptionString(e))
99-
throw e
99+
e match {
100+
case hiveException: HiveSQLException =>
101+
HiveThriftServer2.listener.onStatementError(
102+
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
103+
throw hiveException
104+
case _ =>
105+
val root = ExceptionUtils.getRootCause(e)
106+
HiveThriftServer2.listener.onStatementError(
107+
statementId, root.getMessage, SparkUtils.exceptionString(root))
108+
throw new HiveSQLException("Error getting type info: " + root.toString, root)
109+
}
100110
}
101111
HiveThriftServer2.listener.onStatementFinish(statementId)
102112
}

0 commit comments

Comments (0)