Skip to content

Commit

Permalink
[KYUUBI #2686][1.5] Fix lock bug if engine initialization timeout
Browse files Browse the repository at this point in the history
backport #2687 for branch-1.5

### _Why are the changes needed?_

closes #2686

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2705 from ulysses-you/KYUUBI-2686-1.5.

Closes #2686

8db5ee3 [ulysses-you] fix
a8b17db [ulysses-you] engine type
f76bfb7 [ulysses-you] Fix lock bug if engine initialization timeout

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
ulysses-you committed May 19, 2022
1 parent 5106738 commit 7e9511a
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,4 +113,8 @@ object MetricsSystem {
tracing(_.updateTimer(name, System.nanoTime() - startTime, TimeUnit.NANOSECONDS))
}
}

/**
 * Looks up the counter called `name` in the registry of the current metrics system and
 * returns its count.
 *
 * NOTE(review): if the registry is a Dropwizard `MetricRegistry`, `counter(name)` presumably
 * registers the counter when it does not exist yet -- confirm before relying on this as a
 * pure read.
 *
 * @param name the name of the counter to read
 * @return the counter's current count, or `None` when `maybeSystem` is empty
 */
def counterValue(name: String): Option[Long] = {
  for (system <- maybeSystem) yield system.registry.counter(name).getCount
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ private[kyuubi] class EngineRef(

private val clientPoolName: String = conf.get(ENGINE_POOL_NAME)

// Fraction of the timeout that is added on top of it when waiting for the lock.
// Multiple kyuubi instances may have slightly different timeouts, so waiting a little
// longer tolerates that small gap. The factor is 0.5 when Utils.isTesting, otherwise 0.1.
private val LOCK_TIMEOUT_SPAN_FACTOR = if (Utils.isTesting) 0.5 else 0.1

@VisibleForTesting
private[kyuubi] val subdomain: String = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
case Some(_subdomain) => _subdomain
Expand Down Expand Up @@ -153,9 +157,36 @@ private[kyuubi] class EngineRef(
try {
lock = new InterProcessSemaphoreMutex(zkClient, lockPath)
// Acquire a lease. If no leases are available, this method blocks until either the
// maximum number of leases is increased or another client/process closes a lease
lock.acquire(timeout, TimeUnit.MILLISECONDS)
// maximum number of leases is increased or another client/process closes a lease.
//
// Here, we should throw exception if timeout during acquiring lock.
// Let's say we have three clients with same request lock to two kyuubi server instances.
//
// client A ---> kyuubi X -- first acquired \
// client B ---> kyuubi X -- second acquired -- zookeeper
// client C ---> kyuubi Y -- third acquired /
//
// The first client A acquires the lock, then B and C are blocked until A releases it.
// Depending on the state of the engine created by A:
// - SUCCESS
// B acquires the lock, then gets the engine ref and releases the lock.
// C acquires the lock, then gets the engine ref and releases the lock.
// - FAILED or TIMEOUT
// B acquires the lock, then tries to create the engine again if it has not timed out.
// C should time out and throw an exception back to the client. This fast failure
// avoids clients waiting too long under concurrency.

// Return false means we are timeout
val acquired = lock.acquire(
timeout + (LOCK_TIMEOUT_SPAN_FACTOR * timeout).toLong,
TimeUnit.MILLISECONDS)
if (!acquired) {
throw KyuubiSQLException(s"Timeout to lock on path [$lockPath] after " +
s"$timeout ms. There would be some problem that other session may " +
s"create engine timeout.")
}
} catch {
case e: KyuubiSQLException => throw e
case e: Exception => throw KyuubiSQLException(s"Lock failed on path [$lockPath]", e)
}
f
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.engine

import java.util.UUID
import java.util.concurrent.Executors

import org.apache.curator.utils.ZKPaths
import org.apache.hadoop.security.UserGroupInformation
Expand All @@ -28,6 +29,8 @@ import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperClientProvider
import org.apache.kyuubi.metrics.MetricsConstants.ENGINE_TOTAL
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.util.NamedThreadFactory
import org.apache.kyuubi.zookeeper.{EmbeddedZookeeper, ZookeeperConf}

Expand All @@ -37,6 +40,7 @@ class EngineRefSuite extends KyuubiFunSuite {
private val zkServer = new EmbeddedZookeeper
private val conf = KyuubiConf()
private val user = Utils.currentUser
private val metricsSystem = new MetricsSystem

override def beforeAll(): Unit = {
val zkData = Utils.createTempDir()
Expand All @@ -45,17 +49,22 @@ class EngineRefSuite extends KyuubiFunSuite {
.set("spark.sql.catalogImplementation", "in-memory")
zkServer.initialize(conf)
zkServer.start()
metricsSystem.initialize(conf)
metricsSystem.start()
super.beforeAll()
}

// Tears down the suite-wide fixtures: the metrics system first, then the embedded
// ZooKeeper server, then the parent hook. The nested try/finally guarantees that a
// failure while stopping one component does not prevent the remaining ones from being
// stopped; the first exception still propagates to the caller.
override def afterAll(): Unit = {
  try {
    metricsSystem.stop()
  } finally {
    try {
      zkServer.stop()
    } finally {
      super.afterAll()
    }
  }
}

// Clears the sub-domain (both spellings of the entry) and the engine-pool size/name
// settings from the suite's shared `conf` before every test, then calls the parent hook.
// Presumably this keeps values set by one test from leaking into the next.
override def beforeEach(): Unit = {
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN)
conf.unset(KyuubiConf.ENGINE_SHARE_LEVEL_SUB_DOMAIN)
conf.unset(KyuubiConf.ENGINE_POOL_SIZE)
conf.unset(KyuubiConf.ENGINE_POOL_NAME)
super.beforeEach()
}

Expand Down Expand Up @@ -226,4 +235,46 @@ class EngineRefSuite extends KyuubiFunSuite {
assert(port2 == port1, "engine shared")
}
}

// Fires three concurrent getOrCreate requests for the same user and engine id while the
// engine initialization timeout is 3000 ms, records when each request finishes, and then
// checks both the spread of the finish times and how many engines were counted.
test("three same lock request with initialization timeout") {
  val id = UUID.randomUUID().toString
  conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
  conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
  conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
  conf.set(KyuubiConf.ENGINE_INIT_TIMEOUT, 3000L)
  conf.set(HighAvailabilityConf.HA_ZK_NAMESPACE, "engine_test2")
  conf.set(HighAvailabilityConf.HA_ZK_QUORUM, zkServer.getConnectString)

  // Baseline of the engine counter, so the final check only sees engines from this test.
  val enginesBefore = MetricsSystem.counterValue(ENGINE_TOTAL).getOrElse(0L)
  val startedAt = System.currentTimeMillis()
  // One slot per request; each slot is filled with the time at which that request ended.
  val finishedAt = new Array[Long](3)
  val pool = Executors.newFixedThreadPool(3)
  try {
    for (i <- 0 until 3) {
      // Every request works on its own copy of the configuration.
      val requestConf = conf.clone
      pool.execute(new Runnable {
        override def run(): Unit = {
          ZooKeeperClientProvider.withZkClient(requestConf) { zkClient =>
            try {
              new EngineRef(requestConf, user, id).getOrCreate(zkClient)
            } catch {
              // Failures are expected here; they are printed and otherwise ignored.
              case e: Exception => e.printStackTrace()
            } finally {
              finishedAt(i) = System.currentTimeMillis()
            }
          }
        }
      })
    }

    eventually(timeout(20.seconds), interval(200.milliseconds)) {
      // All three requests must have finished (successfully or not).
      assert(finishedAt.forall(_ > startedAt))
      // ENGINE_INIT_TIMEOUT is 3000ms, so the first and last finish are almost that far apart.
      val spread = finishedAt.max - finishedAt.min
      assert(spread > 2800)
    }

    // Only two engines should be submitted; the last request should time out and fail.
    assert(MetricsSystem.counterValue(ENGINE_TOTAL).get - enginesBefore == 2)
  } finally {
    pool.shutdown()
  }
}
}

0 comments on commit 7e9511a

Please sign in to comment.