Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/deployment/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ Key | Default | Meaning | Type | Since
kyuubi.session.check.interval|PT5M|The check interval for session timeout.|duration|1.0.0
kyuubi.session.conf.advisor|<undefined>|A config advisor plugin for Kyuubi Server. This plugin can provide some custom configs for different user or session configs and overwrite the session configs before open a new session. This config value should be a class which is a child of 'org.apache.kyuubi.plugin.SessionConfAdvisor' which has zero-arg constructor.|string|1.5.0
kyuubi.session.conf.ignore.list||A comma separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.|seq|1.2.0
kyuubi.session.conf.profile|&lt;undefined&gt;|Specify a profile to load session-level configurations from `$KYUUBI_CONF_DIR/kyuubi-session-<profile>.conf`. This configuration will be ignored if the file does not exist. This configuration only has effect when `kyuubi.session.conf.advisor` is set as `org.apache.kyuubi.session.FileSessionConfAdvisor`.|string|1.7.0
kyuubi.session.conf.restrict.list||A comma separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.|seq|1.2.0
kyuubi.session.engine.alive.probe.enabled|false|Whether to enable the engine alive probe, it true, we will create a companion thrift client that sends simple request to check whether the engine is keep alive.|boolean|1.6.0
kyuubi.session.engine.alive.probe.interval|PT10S|The interval for engine alive probe.|duration|1.6.0
Expand Down
8 changes: 6 additions & 2 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,16 @@ object Utils extends Logging {
}

def getDefaultPropertiesFile(env: Map[String, String] = sys.env): Option[File] = {
getPropertiesFile(KYUUBI_CONF_FILE_NAME, env)
}

def getPropertiesFile(fileName: String, env: Map[String, String] = sys.env): Option[File] = {
env.get(KYUUBI_CONF_DIR)
.orElse(env.get(KYUUBI_HOME).map(_ + File.separator + "conf"))
.map(d => new File(d + File.separator + KYUUBI_CONF_FILE_NAME))
.map(d => new File(d + File.separator + fileName))
.filter(_.exists())
.orElse {
Option(Utils.getContextOrKyuubiClassLoader.getResource(KYUUBI_CONF_FILE_NAME)).map { url =>
Option(Utils.getContextOrKyuubiClassLoader.getResource(fileName)).map { url =>
new File(url.getFile)
}.filter(_.exists())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,17 @@ object KyuubiConf {
.stringConf
.createOptional

val SESSION_CONF_PROFILE: OptionalConfigEntry[String] =
buildConf("kyuubi.session.conf.profile")
.doc("Specify a profile to load session-level configurations from " +
"`$KYUUBI_CONF_DIR/kyuubi-session-<profile>.conf`. " +
"This configuration will be ignored if the file does not exist. " +
"This configuration only has effect when `kyuubi.session.conf.advisor` " +
"is set as `org.apache.kyuubi.session.FileSessionConfAdvisor`.")
.version("1.7.0")
.stringConf
.createOptional

val ENGINE_SPARK_MAX_LIFETIME: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.spark.max.lifetime")
.doc("Max lifetime for spark engine, the engine will self-terminate when it reaches the" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.flink

import java.io.{File, FilenameFilter}
import java.nio.file.{Files, Paths}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -71,7 +70,7 @@ class FlinkProcessBuilder(
}

buffer += "-cp"
val classpathEntries = new util.LinkedHashSet[String]
val classpathEntries = new java.util.LinkedHashSet[String]
// flink engine runtime jar
mainResource.foreach(classpathEntries.add)
// flink sql client jar
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.session

import java.util.{Map => JMap}
import java.util.Collections
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.plugin.SessionConfAdvisor
import org.apache.kyuubi.session.FileSessionConfAdvisor.sessionConfCache

class FileSessionConfAdvisor extends SessionConfAdvisor {
override def getConfOverlay(
user: String,
sessionConf: JMap[String, String]): JMap[String, String] = {
val profile: String = sessionConf.get(KyuubiConf.SESSION_CONF_PROFILE.key)
profile match {
case null => Collections.emptyMap()
case _ =>
sessionConfCache.get(profile)
}
}
}

object FileSessionConfAdvisor extends Logging {
private val sessionConfCache: LoadingCache[String, JMap[String, String]] =
CacheBuilder.newBuilder()
.expireAfterWrite(10, TimeUnit.MINUTES)
.build(new CacheLoader[String, JMap[String, String]] {
override def load(profile: String): JMap[String, String] = {
val propsFile = Utils.getPropertiesFile(s"kyuubi-session-$profile.conf")
propsFile match {
case None =>
error("File not found: $KYUUBI_CONF_DIR/" + s"kyuubi-session-$profile.conf")
Collections.emptyMap()
case Some(_) =>
Utils.getPropertiesFromFile(propsFile).asJava
}
}
})
}
23 changes: 23 additions & 0 deletions kyuubi-server/src/test/resources/kyuubi-session-cluster-a.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

kyuubi.ha.namespace kyuubi-ns-a
kyuubi.engine.type SPARK_SQL
kyuubi.engine.pool.balance.policy POLLING

kyuubi.engineEnv.SPARK_HOME /opt/spark30
kyuubi.engineEnv.HADOOP_CONF_DIR /opt/hadoop_conf_dir
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.kyuubi.engine.flink

import java.io.File
import java.util

import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap
Expand Down Expand Up @@ -61,7 +60,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
}

private def constructClasspathStr(builder: FlinkProcessBuilder) = {
val classpathEntries = new util.LinkedHashSet[String]
val classpathEntries = new java.util.LinkedHashSet[String]
builder.mainResource.foreach(classpathEntries.add)

val flinkHome = builder.flinkHome
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

package org.apache.kyuubi.plugin

import scala.collection.JavaConverters._

import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.session.FileSessionConfAdvisor

class PluginLoaderSuite extends KyuubiFunSuite {

Expand All @@ -37,6 +40,30 @@ class PluginLoaderSuite extends KyuubiFunSuite {
PluginLoader.loadSessionConfAdvisor(conf)
}.getMessage
assert(msg2.startsWith("Error while instantiating 'non.exists'"))

}

test("test FileSessionConfAdvisor") {
val conf = new KyuubiConf(false)
conf.set(KyuubiConf.SESSION_CONF_ADVISOR, classOf[FileSessionConfAdvisor].getName)
val advisor = PluginLoader.loadSessionConfAdvisor(conf)
val emptyConfig = advisor.getConfOverlay("chris", conf.getAll.asJava)
assert(emptyConfig.isEmpty)

conf.set(KyuubiConf.SESSION_CONF_PROFILE, "non.exists")
val nonexistsConfig = advisor.getConfOverlay("chris", conf.getAll.asJava)
assert(nonexistsConfig.isEmpty)

conf.set(KyuubiConf.SESSION_CONF_PROFILE, "cluster-a")
val clusteraConf = advisor.getConfOverlay("chris", conf.getAll.asJava)
assert(clusteraConf.get("kyuubi.ha.namespace") == "kyuubi-ns-a")
assert(clusteraConf.get("kyuubi.zk.ha.namespace") == null)
assert(clusteraConf.size() == 5)

val clusteraConfFromCache = advisor.getConfOverlay("chris", conf.getAll.asJava)
assert(clusteraConfFromCache.get("kyuubi.ha.namespace") == "kyuubi-ns-a")
assert(clusteraConfFromCache.get("kyuubi.zk.ha.namespace") == null)
assert(clusteraConfFromCache.size() == 5)
}
}

Expand Down