From 27c734ed9555931ec9970e9cecedfabbf8ea8a93 Mon Sep 17 00:00:00 2001 From: Joao Amaral <7281460+joaopamaral@users.noreply.github.com> Date: Fri, 22 Nov 2024 22:10:57 -0800 Subject: [PATCH] [KYUUBI #6722] Fix AppState when Engine connection is terminated MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— This issue was noticed a few times when the batch `state` was `set` to `ERROR`, but the `appState` kept the non-terminal state forever (e.g. `RUNNING`), even if the application was finished (in this case Yarn Application). ```json { "id": "********", "user": "****", "batchType": "SPARK", "name": "*********", "appStartTime": 0, "appId": "********", "appUrl": "********", "appState": "RUNNING", "appDiagnostic": "", "kyuubiInstance": "*********", "state": "ERROR", "createTime": 1725343207318, "endTime": 1725343300986, "batchInfo": {} } ``` It seems that this happens when there is some intermittent failure during the monitoring step and the batch ends with ERROR, leaving the application metadata without an update. This can lead to some misinterpretation that the application is still running. We need to set this to `UNKNOWN` state to avoid errors. ## Describe Your Solution ๐Ÿ”ง This is a simple fix that only checks if the batch state is `ERROR` and the appState is not in a terminal state and changes the `appState` to `UNKNOWN`, in these cases (during the batch metadata update). ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: If there is some error between the Kyuubi and the Application request (e.g. YARN client), the batch is finished with `ERROR` state and the application keeps the last know state (e.g. RUNNING). #### Behavior With This Pull Request :tada: If there is some error between the Kyuubi and the Application request (e.g. YARN client), the batch is finished with `ERROR `state and the application has a non-terminal state, it is forced to `UNKNOWN` state. #### Related Unit Tests I've tried to implement a unit test to replicate this behavior but I didn't make it. We need to force an exception in the Engine Request (e.g. `YarnClient.getApplication`) but we need to wait for the application to be in the RUNNING state before raising this exception, or maybe block the connection between kyuubi and the engine. --- # Checklist ๐Ÿ“ - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6722 from joaopamaral/fix/app-state-on-batch-error. Closes #6722 8409eacac [Wang, Fei] fix da8c356a7 [Joao Amaral] format fix 73b77b3f7 [Joao Amaral] use isTerminated 64f96a256 [Joao Amaral] Remove test 1eb80ef73 [Joao Amaral] Remove test 13498fa6b [Joao Amaral] Remove test 60ce55ef3 [Joao Amaral] add todo 3a3ba162b [Joao Amaral] Fix 215ac665f [Joao Amaral] Fix AppState when Engine connection is terminated Lead-authored-by: Joao Amaral <7281460+joaopamaral@users.noreply.github.com> Co-authored-by: Wang, Fei Signed-off-by: Wang, Fei --- .../apache/kyuubi/operation/BatchJobSubmission.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala index 671d8ba6509..129fbc8d9aa 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala @@ -154,13 +154,23 @@ class BatchJobSubmission( engineId = appInfo.id, engineName = appInfo.name, engineUrl = appInfo.url.orNull, - engineState = appInfo.state.toString, + engineState = getAppState(state, appInfo.state).toString, engineError = appInfo.error, endTime = endTime) session.sessionManager.updateMetadata(metadataToUpdate) } } + private def getAppState( + opState: OperationState, + appState: ApplicationState.ApplicationState): ApplicationState.ApplicationState = { + if (opState == OperationState.ERROR && !ApplicationState.isTerminated(appState)) { + ApplicationState.UNKNOWN + } else { + appState + } + } + override def getOperationLog: Option[OperationLog] = Option(_operationLog) // we can not set to other state if it is canceled