diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index c08ac5c549c..cbb28f00e02 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -147,6 +147,13 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { } } + /** + * Retrieve user defaults configs in key-value pairs from [[KyuubiConf]] with key prefix "___" + */ + def getAllUserDefaults: Map[String, String] = { + getAll.filter { case (k, _) => k.startsWith(USER_DEFAULTS_CONF_QUOTE) } + } + /** Copy this object */ override def clone: KyuubiConf = { val cloned = KyuubiConf(false) @@ -159,11 +166,12 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging { def getUserDefaults(user: String): KyuubiConf = { val cloned = KyuubiConf(false) - for (e <- settings.entrySet().asScala if !e.getKey.startsWith("___")) { + for (e <- settings.entrySet().asScala if !e.getKey.startsWith(USER_DEFAULTS_CONF_QUOTE)) { cloned.set(e.getKey, e.getValue) } - for ((k, v) <- getAllWithPrefix(s"___${user}___", "")) { + for ((k, v) <- + getAllWithPrefix(s"$USER_DEFAULTS_CONF_QUOTE${user}$USER_DEFAULTS_CONF_QUOTE", "")) { cloned.set(k, v) } serverOnlyConfEntries.foreach(cloned.unset) @@ -198,6 +206,7 @@ object KyuubiConf { final val KYUUBI_HOME = "KYUUBI_HOME" final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv" final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf" + final val USER_DEFAULTS_CONF_QUOTE = "___" private[this] val kyuubiConfEntriesUpdateLock = new Object diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/refresh/RefreshConfigCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/refresh/RefreshConfigCommand.scala index 80d673327bd..b658c0e45e6 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/refresh/RefreshConfigCommand.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/refresh/RefreshConfigCommand.scala @@ -21,6 +21,7 @@ import org.apache.kyuubi.KyuubiException import org.apache.kyuubi.client.AdminRestApi import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient import org.apache.kyuubi.ctl.cmd.AdminCtlCommand +import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, USER_DEFAULTS_CONF} import org.apache.kyuubi.ctl.opt.CliConfig import org.apache.kyuubi.ctl.util.{Tabulator, Validator} @@ -33,7 +34,8 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String] withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient => val adminRestApi = new AdminRestApi(kyuubiRestClient) normalizedCliConfig.adminConfigOpts.configType match { - case "hadoopConf" => adminRestApi.refreshHadoopConf() + case HADOOP_CONF => adminRestApi.refreshHadoopConf() + case USER_DEFAULTS_CONF => adminRestApi.refreshUserDefaultsConf() case configType => throw new KyuubiException(s"Invalid config type:$configType") } } @@ -43,3 +45,7 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String] info(Tabulator.format("", Array("Response"), Array(Array(resp)))) } } +object RefreshConfigCommandConfigType { + final val HADOOP_CONF = "hadoopConf" + final val USER_DEFAULTS_CONF = "userDefaultsConf" +} diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala index 524f2954e0e..59ad7f5fc4c 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala @@ -20,6 +20,7 @@ package org.apache.kyuubi.ctl.opt import scopt.{OParser, OParserBuilder} import org.apache.kyuubi.KYUUBI_VERSION +import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType._ object AdminCommandLine extends CommonCommandLine { @@ -100,6 +101,7 @@ object AdminCommandLine extends CommonCommandLine { arg[String]("") .optional() .action((v, c) => c.copy(adminConfigOpts = c.adminConfigOpts.copy(configType = v))) - .text("The valid config type can be one of the following: hadoopConf.")) + .text("The valid config type can be one of the following: " + + s"$HADOOP_CONF, $USER_DEFAULTS_CONF.")) } } diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala index 03b606d3418..afb946e9285 100644 --- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala +++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.ctl import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite} import org.apache.kyuubi.ctl.cli.AdminControlCliArguments +import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType._ import org.apache.kyuubi.ctl.opt.{ControlAction, ControlObject} class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit { @@ -64,6 +65,15 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi assert(opArgs.cliConfig.resource === ControlObject.CONFIG) assert(opArgs.cliConfig.adminConfigOpts.configType === "hadoopConf") + args = Array( + "refresh", + "config", + "userDefaultsConf") + val opArgs2 = new AdminControlCliArguments(args) + assert(opArgs2.cliConfig.action === ControlAction.REFRESH) + assert(opArgs2.cliConfig.resource === ControlObject.CONFIG) + assert(opArgs2.cliConfig.adminConfigOpts.configType === "userDefaultsConf") + args = Array( "refresh", "config", @@ -137,7 +147,7 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi | Refresh the resource. |Command: refresh config [] | Refresh the config with specified type. - | The valid config type can be one of the following: hadoopConf. + | The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF. | | -h, --help Show help message and exit.""".stripMargin // scalastyle:on diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java index 7d7c341ab5c..da9782df5b1 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java @@ -39,6 +39,11 @@ public String refreshHadoopConf() { return this.getClient().post(path, null, client.getAuthHeader()); } + public String refreshUserDefaultsConf() { + String path = String.format("%s/%s", API_BASE_PATH, "refresh/user_defaults_conf"); + return this.getClient().post(path, null, client.getAuthHeader()); + } + public String deleteEngine( String engineType, String shareLevel, String subdomain, String hs2ProxyUser) { Map params = new HashMap<>(); diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala index 09f4d5bff82..731ad5df629 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala @@ -19,6 +19,7 @@ package org.apache.kyuubi.server import scala.util.Properties +import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.UserGroupInformation @@ -105,6 +106,28 @@ object KyuubiServer extends Logging { val _hadoopConf = KyuubiHadoopUtils.newHadoopConf(new KyuubiConf().loadFileDefaults()) hadoopConf = _hadoopConf } + + private[kyuubi] def refreshUserDefaultsConf(): Unit = kyuubiServer.conf.synchronized { + val existedUserDefaults = kyuubiServer.conf.getAllUserDefaults + val refreshedUserDefaults = KyuubiConf().loadFileDefaults().getAllUserDefaults + var (unsetCount, updatedCount, addedCount) = (0, 0, 0) + for ((k, _) <- existedUserDefaults if !refreshedUserDefaults.contains(k)) { + kyuubiServer.conf.unset(k) + unsetCount = unsetCount + 1 + } + for ((k, v) <- refreshedUserDefaults) { + if (existedUserDefaults.contains(k)) { + if (!StringUtils.equals(existedUserDefaults.get(k).orNull, v)) { + updatedCount = updatedCount + 1 + } + } else { + addedCount = addedCount + 1 + } + kyuubiServer.conf.set(k, v) + } + info(s"Refreshed user defaults configs with changes of " + + s"unset: $unsetCount, updated: $updatedCount, added: $addedCount") + } } class KyuubiServer(name: String) extends Serverable(name) { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index 16653be32f4..a92992e66f4 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -64,6 +64,26 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { Response.ok(s"Refresh the hadoop conf for ${fe.connectionUrl} successfully.").build() } + @ApiResponse( + responseCode = "200", + content = Array(new Content( + mediaType = MediaType.APPLICATION_JSON)), + description = "refresh the user defaults configs") + @POST + @Path("refresh/user_defaults_conf") + def refreshUserDefaultsConf(): Response = { + val userName = fe.getSessionUser(Map.empty[String, String]) + val ipAddress = fe.getIpAddress + info(s"Receive refresh user defaults conf request from $userName/$ipAddress") + if (!userName.equals(administrator)) { + throw new NotAllowedException( + s"$userName is not allowed to refresh the user defaults conf") + } + info(s"Reloading user defaults conf") + KyuubiServer.refreshUserDefaultsConf() + Response.ok(s"Refresh the user defaults conf successfully.").build() + } + @ApiResponse( responseCode = "200", content = Array(new Content( diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index 7a31b8c24b2..bcbdad2cebe 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -66,6 +66,24 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(200 == response.getStatus) } + test("refresh user defaults config of the kyuubi server") { + var response = webTarget.path("api/v1/admin/refresh/user_defaults_conf") + .request() + .post(null) + assert(405 == response.getStatus) + + val adminUser = Utils.currentUser + val encodeAuthorization = new String( + Base64.getEncoder.encode( + s"$adminUser:".getBytes()), + "UTF-8") + response = webTarget.path("api/v1/admin/refresh/user_defaults_conf") + .request() + .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") + .post(null) + assert(200 == response.getStatus) + } + test("delete engine - user share level") { val id = UUID.randomUUID().toString conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)