Skip to content

Commit

Permalink
[KYUUBI #5423] Support chaining SessionConfAdvisors
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

close #5423

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

NO

Closes #5485 from lsm1/branch-5423.

Closes #5423

0bd3b02 [senmiaoliu] mention in the migration guide
dddafeb [senmiaoliu] support multi session conf advisor

Authored-by: senmiaoliu <senmiaoliu@trip.com>
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
  • Loading branch information
lsm1 authored and ulysses-you committed Oct 30, 2023
1 parent 67b61df commit c24e984
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 29 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
|------------------------------------------------------|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
| kyuubi.session.check.interval | PT5M | The check interval for session timeout. | duration | 1.0.0 |
| kyuubi.session.close.on.disconnect | true | Session will be closed when client disconnects from kyuubi gateway. Set this to false to have session outlive its parent connection. | boolean | 1.8.0 |
| kyuubi.session.conf.advisor | &lt;undefined&gt; | A config advisor plugin for Kyuubi Server. This plugin can provide some custom configs for different users or session configs and overwrite the session configs before opening a new session. This config value should be a subclass of `org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg constructor. | string | 1.5.0 |
| kyuubi.session.conf.advisor | &lt;undefined&gt; | A config advisor plugin for Kyuubi Server. This plugin can provide a list of custom configs for different users or session configs and overwrite the session configs before opening a new session. This config value should be a subclass of `org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg constructor. | seq | 1.5.0 |
| kyuubi.session.conf.file.reload.interval | PT10M | When `FileSessionConfAdvisor` is used, this configuration defines the expired time of `$KYUUBI_CONF_DIR/kyuubi-session-<profile>.conf` in the cache. After exceeding this value, the file will be reloaded. | duration | 1.7.0 |
| kyuubi.session.conf.ignore.list || A comma-separated list of ignored keys. If the client connection contains any of them, the key and the corresponding value will be removed silently during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax. | set | 1.2.0 |
| kyuubi.session.conf.profile | &lt;undefined&gt; | Specify a profile to load session-level configurations from `$KYUUBI_CONF_DIR/kyuubi-session-<profile>.conf`. This configuration will be ignored if the file does not exist. This configuration only takes effect when `kyuubi.session.conf.advisor` is set as `org.apache.kyuubi.session.FileSessionConfAdvisor`. | string | 1.7.0 |
Expand Down
4 changes: 4 additions & 0 deletions docs/deployment/migration-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

# Kyuubi Migration Guide

## Upgrading from Kyuubi 1.8 to 1.9

* Since Kyuubi 1.9.0, `kyuubi.session.conf.advisor` can be set as a sequence, Kyuubi supported chaining SessionConfAdvisors.

## Upgrading from Kyuubi 1.7 to 1.8

* Since Kyuubi 1.8, SQLite is added and becomes the default database type of Kyuubi metastore, as Derby has been deprecated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2468,14 +2468,15 @@ object KyuubiConf {
.checkValues(OperationLanguages)
.createWithDefault(OperationLanguages.SQL.toString)

val SESSION_CONF_ADVISOR: OptionalConfigEntry[String] =
val SESSION_CONF_ADVISOR: OptionalConfigEntry[Seq[String]] =
buildConf("kyuubi.session.conf.advisor")
.doc("A config advisor plugin for Kyuubi Server. This plugin can provide some custom " +
.doc("A config advisor plugin for Kyuubi Server. This plugin can provide a list of custom " +
"configs for different users or session configs and overwrite the session configs before " +
"opening a new session. This config value should be a subclass of " +
"`org.apache.kyuubi.plugin.SessionConfAdvisor` which has a zero-arg constructor.")
.version("1.5.0")
.stringConf
.toSequence()
.createOptional

val GROUP_PROVIDER: ConfigEntry[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,22 @@ import org.apache.kyuubi.util.reflect.DynConstructors

private[kyuubi] object PluginLoader {

def loadSessionConfAdvisor(conf: KyuubiConf): SessionConfAdvisor = {
def loadSessionConfAdvisor(conf: KyuubiConf): Seq[SessionConfAdvisor] = {
val advisorClass = conf.get(KyuubiConf.SESSION_CONF_ADVISOR)
if (advisorClass.isEmpty) {
return new DefaultSessionConfAdvisor()
return new DefaultSessionConfAdvisor() :: Nil
}

try {
DynConstructors.builder.impl(advisorClass.get).buildChecked[SessionConfAdvisor].newInstance()
} catch {
case _: ClassCastException =>
throw new KyuubiException(
s"Class ${advisorClass.get} is not a child of '${classOf[SessionConfAdvisor].getName}'.")
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '${advisorClass.get}': ", e)
advisorClass.get.map { advisorClassName =>
try {
DynConstructors.builder.impl(advisorClassName)
.buildChecked[SessionConfAdvisor].newInstance()
} catch {
case _: ClassCastException =>
throw new KyuubiException(
s"Class $advisorClassName is not a child of '${classOf[SessionConfAdvisor].getName}'.")
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '$advisorClassName': ", e)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ class KyuubiBatchSession(
sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf)

val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(
user,
normalizedConf.asJava)
normalizedConf.asJava).asScala).reduce(_ ++ _)
if (confOverlay != null) {
val overlayConf = new KyuubiConf(false)
confOverlay.asScala.foreach { case (k, v) => overlayConf.set(k, v) }
confOverlay.foreach { case (k, v) => overlayConf.set(k, v) }
normalizedConf ++ overlayConf.getBatchConf(batchType)
} else {
warn(s"the server plugin return null value for user: $user, ignore it")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ class KyuubiSessionImpl(
override val sessionType: SessionType = SessionType.INTERACTIVE

private[kyuubi] val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
val confOverlay = sessionManager.sessionConfAdvisor.map(_.getConfOverlay(
user,
normalizedConf.asJava)
normalizedConf.asJava).asScala).reduce(_ ++ _)
if (confOverlay != null) {
normalizedConf ++ confOverlay.asScala
normalizedConf ++ confOverlay
} else {
warn(s"the server plugin return null value for user: $user, ignore it")
normalizedConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
if (conf.isRESTEnabled) Some(new MetadataManager()) else None

// lazy is required for plugins since the conf is null when this class initialization
lazy val sessionConfAdvisor: SessionConfAdvisor = PluginLoader.loadSessionConfAdvisor(conf)
lazy val sessionConfAdvisor: Seq[SessionConfAdvisor] = PluginLoader.loadSessionConfAdvisor(conf)
lazy val groupProvider: GroupProvider = PluginLoader.loadGroupProvider(conf)

private var limiter: Option[SessionLimiter] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ class PluginLoaderSuite extends KyuubiFunSuite {

test("SessionConfAdvisor - wrong class") {
val conf = new KyuubiConf(false)
assert(PluginLoader.loadSessionConfAdvisor(conf).isInstanceOf[DefaultSessionConfAdvisor])
assert(PluginLoader.loadSessionConfAdvisor(conf).head.isInstanceOf[DefaultSessionConfAdvisor])

conf.set(KyuubiConf.SESSION_CONF_ADVISOR, classOf[InvalidSessionConfAdvisor].getName)
conf.set(KyuubiConf.SESSION_CONF_ADVISOR, Seq(classOf[InvalidSessionConfAdvisor].getName))
val msg1 = intercept[KyuubiException] {
PluginLoader.loadSessionConfAdvisor(conf)
}.getMessage
assert(msg1.contains(s"is not a child of '${classOf[SessionConfAdvisor].getName}'"))

conf.set(KyuubiConf.SESSION_CONF_ADVISOR, "non.exists")
conf.set(KyuubiConf.SESSION_CONF_ADVISOR, Seq("non.exists"))
val msg2 = intercept[IllegalArgumentException] {
PluginLoader.loadSessionConfAdvisor(conf)
}.getMessage
Expand All @@ -44,27 +44,46 @@ class PluginLoaderSuite extends KyuubiFunSuite {

test("FileSessionConfAdvisor") {
val conf = new KyuubiConf(false)
conf.set(KyuubiConf.SESSION_CONF_ADVISOR, classOf[FileSessionConfAdvisor].getName)
conf.set(KyuubiConf.SESSION_CONF_ADVISOR, Seq(classOf[FileSessionConfAdvisor].getName))
val advisor = PluginLoader.loadSessionConfAdvisor(conf)
val emptyConfig = advisor.getConfOverlay("chris", conf.getAll.asJava)
val emptyConfig =
advisor.map(_.getConfOverlay("chris", conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
assert(emptyConfig.isEmpty)

conf.set(KyuubiConf.SESSION_CONF_PROFILE, "non.exists")
val nonExistsConfig = advisor.getConfOverlay("chris", conf.getAll.asJava)
val nonExistsConfig =
advisor.map(_.getConfOverlay("chris", conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
assert(nonExistsConfig.isEmpty)

conf.set(KyuubiConf.SESSION_CONF_PROFILE, "cluster-a")
val clusterAConf = advisor.getConfOverlay("chris", conf.getAll.asJava)
val clusterAConf =
advisor.map(_.getConfOverlay("chris", conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
assert(clusterAConf.get("kyuubi.ha.namespace") == "kyuubi-ns-a")
assert(clusterAConf.get("kyuubi.zk.ha.namespace") == null)
assert(clusterAConf.size() == 5)

val clusterAConfFromCache = advisor.getConfOverlay("chris", conf.getAll.asJava)
val clusterAConfFromCache =
advisor.map(_.getConfOverlay("chris", conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
assert(clusterAConfFromCache.get("kyuubi.ha.namespace") == "kyuubi-ns-a")
assert(clusterAConfFromCache.get("kyuubi.zk.ha.namespace") == null)
assert(clusterAConfFromCache.size() == 5)
}

test("SessionConfAdvisor - multi class") {
val conf = new KyuubiConf(false)
conf.set(
KyuubiConf.SESSION_CONF_ADVISOR,
Seq(classOf[FileSessionConfAdvisor].getName, classOf[TestSessionConfAdvisor].getName))
val advisor = PluginLoader.loadSessionConfAdvisor(conf)
conf.set(KyuubiConf.SESSION_CONF_PROFILE, "cluster-a")
val clusterAConf =
advisor.map(_.getConfOverlay("chris", conf.getAll.asJava).asScala).reduce(_ ++ _).asJava
assert(clusterAConf.get("kyuubi.ha.namespace") == "kyuubi-ns-a")
assert(clusterAConf.get("kyuubi.zk.ha.namespace") == null)
assert(clusterAConf.get("spark.k3") == "v3")
assert(clusterAConf.size() == 7)
}

test("GroupProvider - wrong class") {
val conf = new KyuubiConf(false)
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")
Expand Down Expand Up @@ -99,3 +118,11 @@ class PluginLoaderSuite extends KyuubiFunSuite {

class InvalidSessionConfAdvisor
class InvalidGroupProvider

class TestSessionConfAdvisor extends SessionConfAdvisor {
override def getConfOverlay(
user: String,
sessionConf: java.util.Map[String, String]): java.util.Map[String, String] = {
Map("spark.k3" -> "v3", "spark.k4" -> "v4").asJava
}
}

0 comments on commit c24e984

Please sign in to comment.