diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index a109d847e52..5db3b63d741 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -71,6 +71,8 @@ private[kyuubi] class EngineRef( private val clientPoolName: String = conf.get(ENGINE_POOL_NAME) + private var builder: ProcBuilder = _ + @VisibleForTesting private[kyuubi] val subdomain: String = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match { case Some(_subdomain) => _subdomain @@ -162,7 +164,7 @@ private[kyuubi] class EngineRef( conf.set(HA_ZK_ENGINE_REF_ID, engineRefId) val started = System.currentTimeMillis() conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started)) - val builder = engineType match { + builder = engineType match { case SPARK_SQL => conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName) new SparkProcessBuilder(appUser, conf, extraEngineLog) @@ -230,4 +232,17 @@ private[kyuubi] class EngineRef( create(discoveryClient, extraEngineLog) } } + + def close(): Unit = { + if (shareLevel == CONNECTION && builder != null) { + try { + val clusterManager = builder.clusterManager() + builder.close(true) + engineManager.killApplication(clusterManager, engineRefId) + } catch { + case e: Exception => + warn(s"Error closing engine builder, engineRefId: $engineRefId", e) + } + } + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index 7b7ff744db4..4bc59af9538 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -237,12 +237,12 @@ trait ProcBuilder { process } - def close(): Unit = synchronized { + def close(destroyProcess: Boolean = !waitCompletion): Unit = synchronized { if (logCaptureThread != null) { logCaptureThread.interrupt() logCaptureThread = null } - if (!waitCompletion && process != null) { + if (destroyProcess && process != null) { info("Destroy the process, since waitCompletion is false.") process.destroyForcibly() process = null diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index 08f5d9499e7..c0c5b2705bb 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -160,6 +160,7 @@ class KyuubiSessionImpl( try { if (_client != null) _client.closeSession() } finally { + if (engine != null) engine.close() sessionEvent.endTime = System.currentTimeMillis() EventBus.post(sessionEvent) MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))