diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index bd1a4b640c7..ae32b04f0f1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -297,6 +297,7 @@ private[kyuubi] class EngineRef( * * @param discoveryClient the zookeeper client to get or create engine instance * @param extraEngineLog the launch engine operation log, used to inject engine log into it + * @return engine host and port */ def getOrCreate( discoveryClient: DiscoveryClient, @@ -312,19 +313,27 @@ private[kyuubi] class EngineRef( * * @param discoveryClient the zookeeper client to get or create engine instance * @param hostPort the existing engine host and port + * @return deregister result and message */ - def deregister(discoveryClient: DiscoveryClient, hostPort: (String, Int)): Unit = + def deregister(discoveryClient: DiscoveryClient, hostPort: (String, Int)): (Boolean, String) = tryWithLock(discoveryClient) { // refer the DiscoveryClient::getServerHost implementation discoveryClient.getServiceNodesInfo(engineSpace, Some(1), silent = true) match { case Seq(sn) => if ((sn.host, sn.port) == hostPort) { - info(s"Deleting engine node:$sn") + val msg = s"Deleting engine node:$sn" + info(msg) discoveryClient.delete(s"$engineSpace/${sn.nodeName}") + (true, msg) } else { - warn(s"Engine node:$sn is not matched with host&port[$hostPort]") + val msg = s"Engine node:$sn is not matched with host&port[$hostPort]" + warn(msg) + (false, msg) } - case _ => warn(s"No engine node found in $engineSpace") + case _ => + val msg = s"No engine node found in $engineSpace" + warn(msg) + (false, msg) } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala index af927960d3e..4341c541584 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/EngineRefTests.scala @@ -22,6 +22,7 @@ import java.util.concurrent.Executors import scala.collection.JavaConverters._ +import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar.convertIntToGrainOfTime import org.apache.kyuubi.{KYUUBI_VERSION, Utils} @@ -356,10 +357,12 @@ trait EngineRefTests extends KyuubiFunSuite { DiscoveryClientProvider.withDiscoveryClient(conf) { client => val hp = engine.getOrCreate(client) assert(client.getServerHost(engine.engineSpace) == Option(hp)) - engine.deregister(client, ("non_existing_host", 0)) + assert(!engine.deregister(client, ("non_existing_host", 0))._1) assert(client.getServerHost(engine.engineSpace) == Option(hp)) - engine.deregister(client, hp) - assert(client.getServerHost(engine.engineSpace).isEmpty) + assert(engine.deregister(client, hp)._1) + eventually(Timeout(10.seconds)) { + assert(client.getServerHost(engine.engineSpace).isEmpty) + } } } }