From 215ac665fc0b7ec52359024fe8b58480d3f76b27 Mon Sep 17 00:00:00 2001
From: Joao Amaral <7281460+joaopamaral@users.noreply.github.com>
Date: Mon, 7 Oct 2024 19:59:13 -0300
Subject: [PATCH] Fix AppState when Engine connection is terminated

---
 .../org/apache/kyuubi/operation/BatchJobSubmission.scala | 18 +++++++++++++++++-
 .../scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala |  5 ++++-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
index 671d8ba6509..39098a58698 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala
@@ -154,13 +154,29 @@ class BatchJobSubmission(
         engineId = appInfo.id,
         engineName = appInfo.name,
         engineUrl = appInfo.url.orNull,
-        engineState = appInfo.state.toString,
+        engineState = getAppState(state, appInfo.state).toString,
         engineError = appInfo.error,
         endTime = endTime)
       session.sessionManager.updateMetadata(metadataToUpdate)
     }
   }
 
+  /**
+   * Resolve the engine state to persist in the batch metadata.
+   *
+   * When the operation has ended in ERROR but the application is not reported as FAILED,
+   * the application state is recorded as UNKNOWN; otherwise it is kept unchanged.
+   */
+  private def getAppState(
+      operState: OperationState,
+      appState: ApplicationState): ApplicationState = {
+    if (operState == OperationState.ERROR && appState != ApplicationState.FAILED) {
+      ApplicationState.UNKNOWN
+    } else {
+      appState
+    }
+  }
+
   override def getOperationLog: Option[OperationLog] = Option(_operationLog)
 
   // we can not set to other state if it is canceled
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 8d3f7b17d8b..2e1840a8fc6 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -193,13 +193,14 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
   }
 
   test("fast fail the kyuubi connection on engine terminated") {
+    val batchId = UUID.randomUUID().toString
     withSessionConf(Map.empty)(Map(
       "spark.master" -> "yarn",
       "spark.submit.deployMode" -> "cluster",
       "spark.sql.defaultCatalog=spark_catalog" -> "spark_catalog",
       "spark.sql.catalog.spark_catalog.type" -> "invalid_type",
       ENGINE_INIT_TIMEOUT.key -> "PT10M",
-      KYUUBI_BATCH_ID_KEY -> UUID.randomUUID().toString))(Map.empty) {
+      KYUUBI_BATCH_ID_KEY -> batchId))(Map.empty) {
       val startTime = System.currentTimeMillis()
       val exception = intercept[Exception] {
         withJdbcStatement() { _ => }
@@ -207,6 +208,8 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
       val elapsedTime = System.currentTimeMillis() - startTime
       assert(elapsedTime < 60 * 1000)
       assert(exception.getMessage contains "Could not open client transport with JDBC Uri")
+      assert(sessionManager.getBatchMetadata(batchId).map(_.state).contains("ERROR"))
+      assert(sessionManager.getBatchMetadata(batchId).map(_.engineState).contains("UNKNOWN"))
     }
   }
 }