diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index 51500f229a6..046aaf8f918 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -462,6 +462,7 @@ Key | Default | Meaning | Type | Since kyuubi.session.check.interval|PT5M|The check interval for session timeout.|duration|1.0.0 kyuubi.session.conf.advisor|<undefined>|A config advisor plugin for Kyuubi Server. This plugin can provide some custom configs for different user or session configs and overwrite the session configs before open a new session. This config value should be a class which is a child of 'org.apache.kyuubi.plugin.SessionConfAdvisor' which has zero-arg constructor.|string|1.5.0 kyuubi.session.conf.ignore.list||A comma separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.|seq|1.2.0 +kyuubi.session.conf.profile|<undefined>|Specify a profile to load session-level configurations from `$KYUUBI_CONF_DIR/kyuubi-session-.conf`. This configuration will be ignored if the file does not exist. This configuration only has effect when `kyuubi.session.conf.advisor` is set as `org.apache.kyuubi.session.FileSessionConfAdvisor`.|string|1.7.0 kyuubi.session.conf.restrict.list||A comma separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.|seq|1.2.0 kyuubi.session.engine.alive.probe.enabled|false|Whether to enable the engine alive probe, it true, we will create a companion thrift client that sends simple request to check whether the engine is keep alive.|boolean|1.6.0 kyuubi.session.engine.alive.probe.interval|PT10S|The interval for engine alive probe.|duration|1.6.0 diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index 19b84d7d6cc..33a4e116e95 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -50,12 +50,16 @@ object Utils extends Logging { } def getDefaultPropertiesFile(env: Map[String, String] = sys.env): Option[File] = { + getPropertiesFile(KYUUBI_CONF_FILE_NAME, env) + } + + def getPropertiesFile(fileName: String, env: Map[String, String] = sys.env): Option[File] = { env.get(KYUUBI_CONF_DIR) .orElse(env.get(KYUUBI_HOME).map(_ + File.separator + "conf")) - .map(d => new File(d + File.separator + KYUUBI_CONF_FILE_NAME)) + .map(d => new File(d + File.separator + fileName)) .filter(_.exists()) .orElse { - Option(Utils.getContextOrKyuubiClassLoader.getResource(KYUUBI_CONF_FILE_NAME)).map { url => + Option(Utils.getContextOrKyuubiClassLoader.getResource(fileName)).map { url => new File(url.getFile) }.filter(_.exists()) } diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index f0d1534c133..eba958ad30c 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -993,6 +993,17 @@ object KyuubiConf { .stringConf .createOptional + val SESSION_CONF_PROFILE: OptionalConfigEntry[String] = + buildConf("kyuubi.session.conf.profile") + .doc("Specify a profile to load session-level configurations from " + + "`$KYUUBI_CONF_DIR/kyuubi-session-.conf`. " + + "This configuration will be ignored if the file does not exist. " + + "This configuration only has effect when `kyuubi.session.conf.advisor` " + + "is set as `org.apache.kyuubi.session.FileSessionConfAdvisor`.") + .version("1.7.0") + .stringConf + .createOptional + val ENGINE_SPARK_MAX_LIFETIME: ConfigEntry[Long] = buildConf("kyuubi.session.engine.spark.max.lifetime") .doc("Max lifetime for spark engine, the engine will self-terminate when it reaches the" + diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index d30397039bb..b8146c4d2b6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -19,7 +19,6 @@ package org.apache.kyuubi.engine.flink import java.io.{File, FilenameFilter} import java.nio.file.{Files, Paths} -import java.util import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -71,7 +70,7 @@ class FlinkProcessBuilder( } buffer += "-cp" - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new java.util.LinkedHashSet[String] // flink engine runtime jar mainResource.foreach(classpathEntries.add) // flink sql client jar diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/FileSessionConfAdvisor.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/FileSessionConfAdvisor.scala new file mode 100644 index 00000000000..a006753afc4 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/FileSessionConfAdvisor.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.session + +import java.util.{Map => JMap} +import java.util.Collections +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ + +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} + +import org.apache.kyuubi.{Logging, Utils} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.plugin.SessionConfAdvisor +import org.apache.kyuubi.session.FileSessionConfAdvisor.sessionConfCache + +class FileSessionConfAdvisor extends SessionConfAdvisor { + override def getConfOverlay( + user: String, + sessionConf: JMap[String, String]): JMap[String, String] = { + val profile: String = sessionConf.get(KyuubiConf.SESSION_CONF_PROFILE.key) + profile match { + case null => Collections.emptyMap() + case _ => + sessionConfCache.get(profile) + } + } +} + +object FileSessionConfAdvisor extends Logging { + private val sessionConfCache: LoadingCache[String, JMap[String, String]] = + CacheBuilder.newBuilder() + .expireAfterWrite(10, TimeUnit.MINUTES) + .build(new CacheLoader[String, JMap[String, String]] { + override def load(profile: String): JMap[String, String] = { + val propsFile = Utils.getPropertiesFile(s"kyuubi-session-$profile.conf") + propsFile match { + case None => + error("File not found: $KYUUBI_CONF_DIR/" + s"kyuubi-session-$profile.conf") + Collections.emptyMap() + case Some(_) => + Utils.getPropertiesFromFile(propsFile).asJava + } + } + }) +} diff --git a/kyuubi-server/src/test/resources/kyuubi-session-cluster-a.conf b/kyuubi-server/src/test/resources/kyuubi-session-cluster-a.conf new file mode 100644 index 00000000000..8bd8abc7800 --- /dev/null +++ b/kyuubi-server/src/test/resources/kyuubi-session-cluster-a.conf @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +kyuubi.ha.namespace kyuubi-ns-a +kyuubi.engine.type SPARK_SQL +kyuubi.engine.pool.balance.policy POLLING + +kyuubi.engineEnv.SPARK_HOME /opt/spark30 +kyuubi.engineEnv.HADOOP_CONF_DIR /opt/hadoop_conf_dir diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 6c6ad7070fe..7ee38d4ef99 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -18,7 +18,6 @@ package org.apache.kyuubi.engine.flink import java.io.File -import java.util import scala.collection.JavaConverters._ import scala.collection.immutable.ListMap @@ -61,7 +60,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { } private def constructClasspathStr(builder: FlinkProcessBuilder) = { - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new java.util.LinkedHashSet[String] builder.mainResource.foreach(classpathEntries.add) val flinkHome = builder.flinkHome diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/plugin/PluginLoaderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/plugin/PluginLoaderSuite.scala index 7006ba1cbf4..6edac374a03 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/plugin/PluginLoaderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/plugin/PluginLoaderSuite.scala @@ -17,8 +17,11 @@ package org.apache.kyuubi.plugin +import scala.collection.JavaConverters._ + import org.apache.kyuubi.{KyuubiException, KyuubiFunSuite} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.session.FileSessionConfAdvisor class PluginLoaderSuite extends KyuubiFunSuite { @@ -37,6 +40,30 @@ class PluginLoaderSuite extends KyuubiFunSuite { PluginLoader.loadSessionConfAdvisor(conf) }.getMessage assert(msg2.startsWith("Error while instantiating 'non.exists'")) + + } + + test("test FileSessionConfAdvisor") { + val conf = new KyuubiConf(false) + conf.set(KyuubiConf.SESSION_CONF_ADVISOR, classOf[FileSessionConfAdvisor].getName) + val advisor = PluginLoader.loadSessionConfAdvisor(conf) + val emptyConfig = advisor.getConfOverlay("chris", conf.getAll.asJava) + assert(emptyConfig.isEmpty) + + conf.set(KyuubiConf.SESSION_CONF_PROFILE, "non.exists") + val nonexistsConfig = advisor.getConfOverlay("chris", conf.getAll.asJava) + assert(nonexistsConfig.isEmpty) + + conf.set(KyuubiConf.SESSION_CONF_PROFILE, "cluster-a") + val clusteraConf = advisor.getConfOverlay("chris", conf.getAll.asJava) + assert(clusteraConf.get("kyuubi.ha.namespace") == "kyuubi-ns-a") + assert(clusteraConf.get("kyuubi.zk.ha.namespace") == null) + assert(clusteraConf.size() == 5) + + val clusteraConfFromCache = advisor.getConfOverlay("chris", conf.getAll.asJava) + assert(clusteraConfFromCache.get("kyuubi.ha.namespace") == "kyuubi-ns-a") + assert(clusteraConfFromCache.get("kyuubi.zk.ha.namespace") == null) + assert(clusteraConfFromCache.size() == 5) } }