Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.Utils
import org.apache.hive.service.cli._
Expand Down Expand Up @@ -281,12 +282,16 @@ private[hive] class SparkExecuteStatementOperation(
} else {
logError(s"Error executing query with $statementId, currentState $currentState, ", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
if (e.isInstanceOf[HiveSQLException]) {
throw e.asInstanceOf[HiveSQLException]
} else {
throw new HiveSQLException("Error running query: " + e.toString, e)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error running query: " + root.toString, root)
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.GetCatalogsOperation
Expand Down Expand Up @@ -68,11 +69,20 @@ private[hive] class SparkGetCatalogsOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NonFatal?

Copy link
Contributor

@juliuszsompolski juliuszsompolski Oct 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I think we may want to catch a Throwable.
E.g. InterruptedExpression is not catched by NonFatal, and we want to inform the HiveThriftServer2.listener about the error after an interrupt - this definitely can happen in SparkExecuteStatementOperation that is async and can be cancelled. After a ThreadDeath of OutOfMemoryError I think we also want to inform the HiveThriftServer2.listener to not get the query hanging in the UI, as I think the server would continue to go on (I think it won't bring the whole JVM down?).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, we should list up InterruptedExpression here, too? IIUC the reason why we mainly use NonFatal in this case is not to catch NonLocalReturnControl. But, yea, this is not my area, so I think @wangyum could suggest more about this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for Throwable.

Extractor of non-fatal Throwables. Will not match fatal errors like VirtualMachineError.
(for example, OutOfMemoryError and StackOverflowError, subclasses of VirtualMachineError), ThreadDeath, LinkageError, InterruptedException, ControlThrowable.

https://github.com/scala/scala/blob/v2.12.10/src/library/scala/util/control/NonFatal.scala#L17-L19

logError(s"Error executing get catalogs operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting catalogs: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.regex.Pattern

import scala.collection.JavaConverters.seqAsJavaListConverter

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObject}
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject.HivePrivilegeObjectType
import org.apache.hive.service.cli._
Expand Down Expand Up @@ -129,11 +130,20 @@ private[hive] class SparkGetColumnsOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get columns operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting columns: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.UUID

import scala.collection.JavaConverters.seqAsJavaListConverter

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils}
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetFunctionsOperation
Expand Down Expand Up @@ -104,11 +105,20 @@ private[hive] class SparkGetFunctionsOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get functions operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting functions: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.thriftserver
import java.util.UUID
import java.util.regex.Pattern

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetSchemasOperation
Expand Down Expand Up @@ -87,11 +88,20 @@ private[hive] class SparkGetSchemasOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get schemas operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting schemas: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetTableTypesOperation
Expand Down Expand Up @@ -74,11 +75,20 @@ private[hive] class SparkGetTableTypesOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get table types operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting table types: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.regex.Pattern

import scala.collection.JavaConverters._

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
import org.apache.hive.service.cli._
Expand All @@ -30,7 +31,6 @@ import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.util.{Utils => SparkUtils}
Expand Down Expand Up @@ -119,11 +119,20 @@ private[hive] class SparkGetTablesOperation(
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get tables operation with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting tables: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.GetTypeInfoOperation
Expand Down Expand Up @@ -92,11 +93,20 @@ private[hive] class SparkGetTypeInfoOperation(
})
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
logError(s"Error executing get type info with $statementId", e)
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.listener.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.listener.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting type info: " + root.toString, root)
}
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
Expand Down