Add a max cache time to clean SparkSessions that may have token expiry issue fix #186
yaooqinn committed May 14, 2019
1 parent a131ce0 commit a4a1c69
Showing 9 changed files with 108 additions and 276 deletions.
5 changes: 2 additions & 3 deletions docs/configurations.md
@@ -85,10 +85,9 @@ spark.kyuubi.<br />backend.session.wait.other.times | 60 | How many times to che
spark.kyuubi.<br />backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation.
spark.kyuubi.<br />backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext.
spark.kyuubi.<br />backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout.
spark.kyuubi.<br />backend.session.idle.timeout|30min|SparkSession timeout.
spark.kyuubi.<br />backend.session.idle.timeout|30min|How long the SparkSession instance will be cached after user logout. Using a cached SparkSession can significantly cut the startup time for SparkContext, which makes sense for short-lived queries. The timeout is calculated from when all sessions of the user are disconnected.
spark.kyuubi.<br />backend.session.max.cache.time|5d|Max cache time for a SparkSession instance, counted from the creation of its original copy. When `spark.kyuubi.backend.session.idle.timeout` is never reached because the user runs queries continuously, this stops the cached SparkSession, which may otherwise hit token expiry issues in kerberized clusters. Within [t, t * 1.25], we try to stop the SparkSession gracefully once it has no connections. After that interval, we force it to stop.
spark.kyuubi.<br />backend.session.local.dir|KYUUBI_HOME/<br />local|Default value to set `spark.local.dir`. For YARN mode, this only affect the Kyuubi server side settings according to the rule of Spark treating `spark.local.dir`.
spark.kyuubi.<br />backend.session.long.cache|${UserGroupInformation.<br />isSecurityEnabled}|Whether to update the tokens of Spark's executor to support long caching SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is loadable. This is used towards kerberized hadoop clusters in case of `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time limit or SparkSession never idles.
spark.kyuubi.<br />backend.session.token.update.class|org.apache.spark.<br />scheduler.cluster.<br />CoarseGrainedClusterMessages$<br />UpdateDelegationTokens|`CoarseGrainedClusterMessages` for token update message from the driver of Spark to executors, it is loadable only by higher version Spark release(2.3 and later)
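
To make the `[t, t * 1.25]` interval described for `spark.kyuubi.backend.session.max.cache.time` concrete, the following is a minimal sketch in plain Scala with no Spark or Kyuubi dependency. The object `MaxCacheWindow` and its `window` method are hypothetical names used only for illustration; they are not part of this commit.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper, not part of Kyuubi: computes the interval described for
// spark.kyuubi.backend.session.max.cache.time.
object MaxCacheWindow {
  // Returns (start, end) in milliseconds of the interval [t, t * 1.25].
  def window(maxCacheTimeMs: Long): (Long, Long) =
    (maxCacheTimeMs, (maxCacheTimeMs * 1.25).toLong)

  def main(args: Array[String]): Unit = {
    val t = TimeUnit.DAYS.toMillis(5) // the documented default of 5d
    val (start, end) = window(t)
    println(s"graceful stop window: [$start ms, $end ms]")
  }
}
```

With the default of 5 days, the interval runs from 5 days to 6.25 days, measured in milliseconds.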

#### Operation

34 changes: 15 additions & 19 deletions kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
@@ -280,10 +280,24 @@ object KyuubiConf {

val BACKEND_SESSION_IDLE_TIMEOUT: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.idle.timeout")
.doc("SparkSession timeout")
.doc("How long the SparkSession instance will be cached after user logout. Using a cached" +
" SparkSession can significantly cut the startup time for SparkContext, which makes" +
" sense for short-lived queries. The timeout is calculated from when all sessions of" +
" the user are disconnected.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.MINUTES.toMillis(30L))

val BACKEND_SESSION_MAX_CACHE_TIME: ConfigEntry[Long] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.max.cache.time")
.doc("Max cache time for a SparkSession instance, counted from the creation of its" +
" original copy. When `spark.kyuubi.backend.session.idle.timeout` is never reached" +
" because the user runs queries continuously, this stops the cached SparkSession, which" +
" may otherwise hit token expiry issues in kerberized clusters. Within [t, t * 1.25], we" +
" try to stop the SparkSession gracefully once it has no connections. After that" +
" interval, we force it to stop.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(TimeUnit.DAYS.toMillis(5))

val BACKEND_SESSION_LOCAL_DIR: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.local.dir")
.doc("Default value to set `spark.local.dir`, for YARN mode, this only affect the Kyuubi" +
@@ -293,24 +307,6 @@ object KyuubiConf {
s"${sys.env.getOrElse("KYUUBI_HOME", System.getProperty("java.io.tmpdir"))}"
+ File.separator + "local")

val BACKEND_SESSION_LONG_CACHE: ConfigEntry[Boolean] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.long.cache")
.doc("Whether to update the tokens of Spark's executor to support long caching" +
" SparkSessions iff this is true && `spark.kyuubi.backend.token.update.class` is" +
" loadable. This is used towards kerberized hadoop clusters in case of" +
" `spark.kyuubi.backend.session.idle.timeout` is set longer than token expiration time" +
" limit or SparkSession never idles.")
.booleanConf
.createWithDefault(UserGroupInformation.isSecurityEnabled)

val BACKEND_SESSION_TOKEN_UPDATE_CLASS: ConfigEntry[String] =
KyuubiConfigBuilder("spark.kyuubi.backend.session.token.update.class")
.doc("`CoarseGrainedClusterMessages` for token update message from the driver of Spark to" +
" executors, it is loadable only by higher version Spark release(2.3 and later)")
.stringConf
.createWithDefault(
"org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$UpdateDelegationTokens")

/////////////////////////////////////////////////////////////////////////////////////////////////
// Authentication //
/////////////////////////////////////////////////////////////////////////////////////////////////

This file was deleted.

@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.ql.session.OperationLog
import org.apache.hive.service.cli.thrift.TProtocolVersion
import org.apache.spark.KyuubiConf._
import org.apache.spark.KyuubiSparkUtil
import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSQLUtils}
import org.apache.spark.sql.catalyst.catalog.{FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -376,10 +375,6 @@ class KyuubiOperation(session: KyuubiSession, statement: String) extends Logging
_.onStatementParsed(statementId, result.queryExecution.toString())
}

if (conf.get(BACKEND_SESSION_LONG_CACHE).toBoolean &&
KyuubiSparkUtil.classIsLoadable(conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS))) {
KyuubiSparkExecutorUtils.populateTokens(sparkSession.sparkContext, session.ugi)
}
debug(result.queryExecution.toString())
iter = if (incrementalCollect) {
val numParts = result.rdd.getNumPartitions
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package yaooqinn.kyuubi.spark

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.sql.SparkSession

/**
* A recorder for how many client sessions have been cloned from the original [[SparkSession]],
* which helps the [[SparkSessionCacheManager]] cache and recycle [[SparkSession]] instances.
*
* @param spark the original [[SparkSession]] instance
* @param times number of times the original [[SparkSession]] instance has been cloned, starting from 1
*/
case class SparkSessionCache private (spark: SparkSession, times: AtomicInteger)

object SparkSessionCache {
def apply(spark: SparkSession): SparkSessionCache =
new SparkSessionCache(spark, new AtomicInteger(1))
}
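
The counting idea behind `SparkSessionCache` can be tried without Spark on the classpath. The sketch below is a hypothetical, generic stand-in (`RefCounted` is not a name from this commit) that pairs any resource with an `AtomicInteger` starting at 1, in the same way the companion `apply` above does.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for SparkSessionCache, generic over the cached resource
// so that it runs without Spark on the classpath.
final case class RefCounted[A](resource: A, times: AtomicInteger)

object RefCounted {
  // A freshly cached resource starts with one client, like `new AtomicInteger(1)`.
  def apply[A](resource: A): RefCounted[A] =
    new RefCounted(resource, new AtomicInteger(1))
}

object RefCountedDemo {
  def main(args: Array[String]): Unit = {
    val cache = RefCounted("session-for-some-user")
    cache.times.incrementAndGet() // a second client reuses the resource
    cache.times.decrementAndGet() // one client disconnects
    cache.times.decrementAndGet() // the last client disconnects
    println(s"no clients left: ${cache.times.get <= 0}")
  }
}
```

A count of zero or less is the condition the cache manager checks before treating a session as having no active connections.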
@@ -20,21 +20,21 @@ package yaooqinn.kyuubi.spark
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._

import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.security.Credentials
import org.apache.spark.{KyuubiSparkUtil, SparkConf}
import org.apache.spark.KyuubiConf._
import org.apache.spark.scheduler.cluster.KyuubiSparkExecutorUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import yaooqinn.kyuubi.Logging
import yaooqinn.kyuubi.service.AbstractService
import yaooqinn.kyuubi.ui.KyuubiServerMonitor

/**
* Manager for cached [[SparkSession]]s.
*/
class SparkSessionCacheManager private(name: String) extends AbstractService(name) with Logging {

def this() = this(classOf[SparkSessionCacheManager].getSimpleName)
@@ -44,62 +44,78 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat(getClass.getSimpleName + "-%d").build())

private val userToSession = new ConcurrentHashMap[String, (SparkSession, AtomicInteger)]
private val userToSession = new ConcurrentHashMap[String, SparkSessionCache]
private val userFirstCreate = new ConcurrentHashMap[String, Long]
private val userLatestLogout = new ConcurrentHashMap[String, Long]
private var idleTimeout: Long = _

private val userToCredentials = new ConcurrentHashMap[String, Credentials]
private var needPopulateToken: Boolean = _
private var maxCacheTime: Long = _

private val sessionCleaner = new Runnable {
override def run(): Unit = {
userToSession.asScala.foreach {
case (user, (session, _)) if session.sparkContext.isStopped =>
case (user, ssc) if ssc.spark.sparkContext.isStopped =>
warn(s"SparkSession for $user might already be stopped outside Kyuubi," +
s" cleaning it..")
removeSparkSession(user)
case (user, (session, times)) if times.get() > 0 || !userLatestLogout.containsKey(user) =>
debug(s"There are $times active connection(s) bound to the SparkSession instance" +
s" of $user ")
if (needPopulateToken) {
val credentials = userToCredentials.getOrDefault(user, new Credentials)
KyuubiSparkExecutorUtils.populateTokens(session.sparkContext, credentials)
}
case (user, (session, _))
if userLatestLogout.get(user) + idleTimeout <= System.currentTimeMillis() =>
case (user, ssc) if ssc.times.get > 0 || !userLatestLogout.containsKey(user) =>
debug(s"There are ${ssc.times.get} active connection(s) bound to the SparkSession" +
s" instance of $user")
case (user, _) if now - userLatestLogout.get(user) >= idleTimeout =>
info(s"Stopping idle SparkSession for user [$user].")
removeSparkSession(user)
session.stop()
System.setProperty("SPARK_YARN_MODE", "true")
case (user, (session, _)) =>
if (needPopulateToken) {
val credentials = userToCredentials.getOrDefault(user, new Credentials)
KyuubiSparkExecutorUtils.populateTokens(session.sparkContext, credentials)
}
removeAndStopSparkSession(user)
case (user, _) if isSessionCleanable(user) =>
info(s"Stopping expired SparkSession for user [$user].")
removeAndStopSparkSession(user)
case _ =>
}
}
}
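
The order of the `case` clauses in `sessionCleaner` matters, because the first matching clause wins. The sketch below is a hypothetical, Spark-free model of the checks as the added lines appear to read after this change; `CleanerDecision`, `Entry`, and `decide` are illustrative names, and the `cleanable` flag stands in for the result of `isSessionCleanable`.

```scala
// Hypothetical, Spark-free model of the checks made for each cached entry.
object CleanerDecision {
  sealed trait Action
  case object RemoveOnly extends Action  // context already stopped outside Kyuubi
  case object Keep extends Action        // still in use, or nothing to do yet
  case object StopIdle extends Action    // idle timeout reached after logout
  case object StopExpired extends Action // max cache time reached

  final case class Entry(
      stopped: Boolean,           // models sparkContext.isStopped
      connections: Int,           // models times.get
      lastLogoutMs: Option[Long], // models userLatestLogout, if present
      cleanable: Boolean)         // models the result of isSessionCleanable

  // Mirrors the clause order: the first condition that holds decides the action.
  def decide(e: Entry, nowMs: Long, idleTimeoutMs: Long): Action =
    if (e.stopped) RemoveOnly
    else if (e.connections > 0 || e.lastLogoutMs.isEmpty) Keep
    else if (nowMs - e.lastLogoutMs.get >= idleTimeoutMs) StopIdle
    else if (e.cleanable) StopExpired
    else Keep
}
```

In this ordering, an entry with at least one active connection is matched by the second clause before the max-cache-time check is consulted.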

private def removeSparkSession(user: String): Unit = {
private def removeSparkSession(user: String): SparkSessionCache = {
Option(userLatestLogout.remove(user)) match {
case Some(t) => info(s"User [$user] last time logout at " +
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(t)))
case _ =>
}
userToSession.remove(user)
KyuubiServerMonitor.detachUITab(user)
userToSession.remove(user)
}

private def removeAndStopSparkSession(user: String): Unit = {
val cache = removeSparkSession(user)
cache.spark.stop()
System.setProperty("SPARK_YARN_MODE", "true")
}

/**
* If the time since creation is within [maxCacheTime, maxCacheTime * 1.25], we stop this
* SparkSession only when all connections are disconnected.
* If the time since creation exceeds maxCacheTime * 1.25, we stop this SparkSession regardless
* of whether it still has connections or running jobs.
*/
private def isSessionCleanable(user: String): Boolean = {
val ct = userFirstCreate.get(user)
(now - ct >= maxCacheTime && {
val cache = userToSession.get(user)
cache != null && cache.times.get <= 0
}) || (now - ct > maxCacheTime * 1.25)
}
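
The condition in `isSessionCleanable` can be restated as a pure function, which makes the boundaries easy to check. This is a sketch under the assumption that the creation time and connection count are passed in explicitly; `CleanablePredicate` and `isCleanable` are hypothetical names, and the null check on the cache entry is left out.

```scala
// Hypothetical restatement of the isSessionCleanable condition as a pure function.
object CleanablePredicate {
  def isCleanable(
      nowMs: Long,
      createdMs: Long,
      maxCacheTimeMs: Long,
      connections: Int): Boolean = {
    val age = nowMs - createdMs
    // Graceful branch: old enough and no connections left.
    // Forced branch: strictly older than 1.25 times the max cache time.
    (age >= maxCacheTimeMs && connections <= 0) || (age > maxCacheTimeMs * 1.25)
  }
}
```

Note that the lower bound is inclusive (`>=`) while the forced bound is exclusive (`>`), so an entry whose age is exactly `maxCacheTime * 1.25` and which still has connections is not yet cleanable.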

private def now: Long = System.currentTimeMillis()

def set(user: String, sparkSession: SparkSession): Unit = {
userToSession.put(user, (sparkSession, new AtomicInteger(1)))
val sessionCache = SparkSessionCache(sparkSession)
userToSession.put(user, sessionCache)
userFirstCreate.put(user, now)
}

def getAndIncrease(user: String): Option[SparkSession] = {
Option(userToSession.get(user)) match {
case Some((ss, times)) if !ss.sparkContext.isStopped =>
val currentTime = times.incrementAndGet()
case Some(ssc) if !ssc.spark.sparkContext.isStopped =>
val currentTime = ssc.times.incrementAndGet()
info(s"SparkSession for [$user] is reused for $currentTime time(s) after + 1")
Some(ss)
Some(ssc.spark)
case _ =>
info(s"SparkSession for [$user] isn't cached, will create a new one.")
None
@@ -108,31 +124,26 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam

def decrease(user: String): Unit = {
Option(userToSession.get(user)) match {
case Some((ss, times)) if !ss.sparkContext.isStopped =>
userLatestLogout.put(user, System.currentTimeMillis())
val currentTime = times.decrementAndGet()
case Some(ssc) if !ssc.spark.sparkContext.isStopped =>
userLatestLogout.put(user, now)
val currentTime = ssc.times.decrementAndGet()
info(s"SparkSession for [$user] is reused for $currentTime time(s) after - 1")
case _ =>
warn(s"SparkSession for [$user] was not found in the cache.")
}
}

def setupCredentials(user: String, creds: Credentials): Unit = {
userToCredentials.put(user, creds)
}

override def init(conf: SparkConf): Unit = {
idleTimeout = math.max(conf.getTimeAsMs(BACKEND_SESSION_IDLE_TIMEOUT.key), 60 * 1000)
needPopulateToken = conf.get(BACKEND_SESSION_LONG_CACHE).toBoolean &&
KyuubiSparkUtil.classIsLoadable(conf.get(BACKEND_SESSION_TOKEN_UPDATE_CLASS))
idleTimeout = math.max(conf.getTimeAsMs(BACKEND_SESSION_IDLE_TIMEOUT), 60 * 1000)
maxCacheTime = math.max(conf.getTimeAsMs(BACKEND_SESSION_MAX_CACHE_TIME), 0)
super.init(conf)
}

/**
* Periodically close idle SparkSessions every `spark.kyuubi.backend.session.check.interval` (default 5min)
*/
override def start(): Unit = {
val interval = math.max(conf.getTimeAsSeconds(BACKEND_SESSION_CHECK_INTERVAL.key), 1)
val interval = math.max(conf.getTimeAsSeconds(BACKEND_SESSION_CHECK_INTERVAL), 1)
info(s"Scheduling SparkSession cache cleaning every $interval seconds")
cacheManager.scheduleAtFixedRate(sessionCleaner, interval, interval, TimeUnit.SECONDS)
super.start()
@@ -141,7 +152,7 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
override def stop(): Unit = {
info("Stopping SparkSession Cache Manager")
cacheManager.shutdown()
userToSession.asScala.values.foreach { kv => kv._1.stop() }
userToSession.asScala.values.foreach(_.spark.stop())
userToSession.clear()
super.stop()
}
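
`init` and `start` clamp the configured values with `math.max` before the cleaner is scheduled at a fixed rate. The following sketch shows that pattern using only the JDK. `CleanerScheduling` and its methods are hypothetical names, and the single-threaded scheduled executor is an assumption, since the line that creates `cacheManager` is not visible in this diff.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical sketch of the clamping and scheduling pattern, JDK only.
object CleanerScheduling {
  // The idle timeout is never allowed below one minute.
  def clampIdleTimeoutMs(configuredMs: Long): Long = math.max(configuredMs, 60 * 1000L)
  // The max cache time is never allowed to be negative.
  def clampMaxCacheTimeMs(configuredMs: Long): Long = math.max(configuredMs, 0L)
  // The check interval is never allowed below one second.
  def clampIntervalSeconds(configured: Long): Long = math.max(configured, 1L)

  // Runs `task` repeatedly on a daemon thread; the executor type is an assumption.
  def schedule(task: Runnable, intervalSeconds: Long): ScheduledExecutorService = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "cache-cleaner")
        t.setDaemon(true) // do not keep the JVM alive just for cleaning
        t
      }
    }
    val pool = Executors.newSingleThreadScheduledExecutor(factory)
    val interval = clampIntervalSeconds(intervalSeconds)
    pool.scheduleAtFixedRate(task, interval, interval, TimeUnit.SECONDS)
    pool
  }
}
```

The caller is expected to shut the returned executor down, as `stop()` does with `cacheManager.shutdown()`.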
@@ -207,8 +207,6 @@ class SparkSessionWithUGI(
def init(sessionConf: Map[String, String]): Unit = {
getOrCreate(sessionConf)

cache.setupCredentials(userName, user.getCredentials)

try {
initialDatabase.foreach { db =>
KyuubiHadoopUtil.doAs(user) {