Skip to content

Conversation

@LantaoJin
Copy link
Contributor

@LantaoJin LantaoJin commented Sep 29, 2019

What changes were proposed in this pull request?

When adaptive execution is enabled, the Spark users who connected from JDBC always get adaptive execution error whatever the under root cause is. It's very confused. We have to check the driver log to find out why.

0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000> 

For example, a job queried from JDBC failed due to HDFS missing block. User still get the error message Adaptive execution failed due to stage materialization failures.

The easiest way to reproduce is changing the code of AdaptiveSparkPlanExec, to let it throws out an exception when it faces StageSuccess.

  case class AdaptiveSparkPlanExec(
      events.drainTo(rem)
         (Seq(nextMsg) ++ rem.asScala).foreach {
           case StageSuccess(stage, res) =>
//            stage.resultOption = Some(res)
            val ex = new SparkException("Wrapper Exception",
              new IllegalArgumentException("Root cause is IllegalArgumentException for Test"))
            errors.append(
              new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
           case StageFailure(stage, ex) =>
             errors.append(
               new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))

Why are the changes needed?

To make the error message more user-friend and more useful for query from JDBC.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manually test query:

0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.225 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.043 seconds)

Before:

0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000> 

After:

0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: java.lang.IllegalArgumentException: Root cause is IllegalArgumentException for Test (state=,code=0)
0: jdbc:hive2://localhost:10000> 

@SparkQA
Copy link

SparkQA commented Sep 29, 2019

Test build #111555 has finished for PR 25960 at commit ffed088.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Copy link
Contributor Author

Retest this please.

@SparkQA
Copy link

SparkQA commented Sep 29, 2019

Test build #111558 has finished for PR 25960 at commit ffed088.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

} else {
throw new HiveSQLException("Error running query: " + e.toString, e)
throw new HiveSQLException("Error running query: " +
SparkUtils.findFirstCause(e).toString, e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SparkQA
Copy link

SparkQA commented Sep 30, 2019

Test build #111603 has finished for PR 25960 at commit 7d55615.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@juliuszsompolski
Copy link
Contributor

For consistency, should we do that in all Spark*Operation?
I.e. replace the current

      case e: HiveSQLException =>
        setState(OperationState.ERROR)
        HiveThriftServer2.listener.onStatementError(
          statementId, e.getMessage, SparkUtils.exceptionString(e))
        throw e

with

  case e: Throwable =>
    logError(s"Error executing operation with $statementId, currentState $currentState, ", e)
    setState(OperationState.ERROR)
    HiveThriftServer2.listener.onStatementError(
      statementId, e.getMessage, SparkUtils.exceptionString(e))
    if (e.isInstanceOf[HiveSQLException]) {
      throw e.asInstanceOf[HiveSQLException]
    } else {
      val root = ExceptionUtils.getRootCause(e)
      throw new HiveSQLException("Error running query: " +
        (if (root == null) e.toString else root.toString), e)
    }

in all of them?

@LantaoJin
Copy link
Contributor Author

@juliuszsompolski fixed.

@LantaoJin LantaoJin requested a review from wangyum October 2, 2019 02:19
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
case e: Throwable =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NonFatal?

Copy link
Contributor

@juliuszsompolski juliuszsompolski Oct 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. I think we may want to catch a Throwable.
E.g. InterruptedExpression is not catched by NonFatal, and we want to inform the HiveThriftServer2.listener about the error after an interrupt - this definitely can happen in SparkExecuteStatementOperation that is async and can be cancelled. After a ThreadDeath of OutOfMemoryError I think we also want to inform the HiveThriftServer2.listener to not get the query hanging in the UI, as I think the server would continue to go on (I think it won't bring the whole JVM down?).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, we should list up InterruptedExpression here, too? IIUC the reason why we mainly use NonFatal in this case is not to catch NonLocalReturnControl. But, yea, this is not my area, so I think @wangyum could suggest more about this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for Throwable.

Extractor of non-fatal Throwables. Will not match fatal errors like VirtualMachineError.
(for example, OutOfMemoryError and StackOverflowError, subclasses of VirtualMachineError), ThreadDeath, LinkageError, InterruptedException, ControlThrowable.

https://github.com/scala/scala/blob/v2.12.10/src/library/scala/util/control/NonFatal.scala#L17-L19

@SparkQA
Copy link

SparkQA commented Oct 2, 2019

Test build #111664 has finished for PR 25960 at commit 2acb51a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

throw e.asInstanceOf[HiveSQLException]
} else {
throw new HiveSQLException("Error running query: " + e.toString, e)
val root = ExceptionUtils.getRootCause(e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we change it to?

setState(OperationState.ERROR)
e match {
  case hiveException: HiveSQLException =>
    logError(s"Error executing query with $statementId, currentState $currentState, ", e)
    HiveThriftServer2.listener.onStatementError(
      statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
    throw hiveException
  case _ =>
    val rootCause = Option(ExceptionUtils.getRootCause(e)).getOrElse(e)
    logError(
      s"Error executing query with $statementId, currentState $currentState, ", rootCause)
    HiveThriftServer2.listener.onStatementError(
      statementId, rootCause.getMessage, SparkUtils.exceptionString(rootCause))
    throw new HiveSQLException("Error running query: " + rootCause.toString, rootCause)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val rootCause = Option(ExceptionUtils.getRootCause(e)).getOrElse(e)

Return null only if the input e is null. Do we still add this option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Besides the null checker, I've changed the code to above style. @wangyum

@LantaoJin
Copy link
Contributor Author

Retest this please.

2 similar comments
@LantaoJin
Copy link
Contributor Author

Retest this please.

@LantaoJin
Copy link
Contributor Author

Retest this please.

@SparkQA
Copy link

SparkQA commented Oct 12, 2019

Test build #111949 has finished for PR 25960 at commit 4aa2dd2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Copy link
Contributor Author

LantaoJin commented Oct 12, 2019

The UT could passed after #26028 merged.

@wangyum
Copy link
Member

wangyum commented Oct 13, 2019

retest this please

@SparkQA
Copy link

SparkQA commented Oct 13, 2019

Test build #111991 has finished for PR 25960 at commit 4aa2dd2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@LantaoJin
Copy link
Contributor Author

Retest this please.

@SparkQA
Copy link

SparkQA commented Oct 14, 2019

Test build #112002 has finished for PR 25960 at commit 4aa2dd2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@wangyum wangyum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@srowen srowen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's hard to refactor this error handling vs copying it? seems OK.

@juliuszsompolski
Copy link
Contributor

LGTM.
@srowen yeah, some common logic could be refactored out into a mixin trait (because direct inheritence goes from different Hive Operation implementations), but I think it's a bigger refactor change than the scope of this PR.

@wangyum wangyum closed this in fda4070 Oct 17, 2019
wangyum pushed a commit that referenced this pull request Oct 17, 2019
…cially enabled adaptive execution

### What changes were proposed in this pull request?
When adaptive execution is enabled, the Spark users who connected from JDBC always get adaptive execution error whatever the under root cause is. It's very confused. We have to check the driver log to find out why.
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000>
```

For example, a job queried from JDBC failed due to HDFS missing block. User still get the error message `Adaptive execution failed due to stage materialization failures`.

The easiest way to reproduce is changing the code of `AdaptiveSparkPlanExec`, to let it throws out  an exception when it faces `StageSuccess`.
```scala
  case class AdaptiveSparkPlanExec(
      events.drainTo(rem)
         (Seq(nextMsg) ++ rem.asScala).foreach {
           case StageSuccess(stage, res) =>
//            stage.resultOption = Some(res)
            val ex = new SparkException("Wrapper Exception",
              new IllegalArgumentException("Root cause is IllegalArgumentException for Test"))
            errors.append(
              new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
           case StageFailure(stage, ex) =>
             errors.append(
               new SparkException(s"Failed to materialize query stage: ${stage.treeString}", ex))
```

### Why are the changes needed?
To make the error message more user-friend and more useful for query from JDBC.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Manually test query:
```shell
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
CREATE TEMPORARY VIEW testData (key, value) AS SELECT explode(array(1, 2, 3, 4)), cast(substring(rand(), 3, 4) as string);
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.225 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
CREATE TEMPORARY VIEW testData2 (k, v) AS SELECT explode(array(1, 1, 2, 2)), cast(substring(rand(), 3, 4) as int);
+---------+--+
| Result  |
+---------+--+
+---------+--+
No rows selected (0.043 seconds)
```
Before:
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures. (state=,code=0)
0: jdbc:hive2://localhost:10000>
```
After:
```shell
0: jdbc:hive2://localhost:10000> SELECT * FROM testData join testData2 ON key = v;
SELECT * FROM testData join testData2 ON key = v;
Error: Error running query: java.lang.IllegalArgumentException: Root cause is IllegalArgumentException for Test (state=,code=0)
0: jdbc:hive2://localhost:10000>
```

Closes #25960 from LantaoJin/SPARK-29283.

Authored-by: lajin <lajin@ebay.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
(cherry picked from commit fda4070)
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
@wangyum
Copy link
Member

wangyum commented Oct 17, 2019

Merged to master and branch-3.0-preview.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants