Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
}
}

/**
* Retrieve user defaults configs in key-value pairs from [[KyuubiConf]] with key prefix "___"
*/
def getAllUserDefaults: Map[String, String] = {
getAll.filter { case (k, _) => k.startsWith(USER_DEFAULTS_CONF_QUOTE) }
}

/** Copy this object */
override def clone: KyuubiConf = {
val cloned = KyuubiConf(false)
Expand All @@ -159,11 +166,12 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
def getUserDefaults(user: String): KyuubiConf = {
val cloned = KyuubiConf(false)

for (e <- settings.entrySet().asScala if !e.getKey.startsWith("___")) {
for (e <- settings.entrySet().asScala if !e.getKey.startsWith(USER_DEFAULTS_CONF_QUOTE)) {
cloned.set(e.getKey, e.getValue)
}

for ((k, v) <- getAllWithPrefix(s"___${user}___", "")) {
for ((k, v) <-
getAllWithPrefix(s"$USER_DEFAULTS_CONF_QUOTE${user}$USER_DEFAULTS_CONF_QUOTE", "")) {
cloned.set(k, v)
}
serverOnlyConfEntries.foreach(cloned.unset)
Expand Down Expand Up @@ -198,6 +206,7 @@ object KyuubiConf {
final val KYUUBI_HOME = "KYUUBI_HOME"
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
final val USER_DEFAULTS_CONF_QUOTE = "___"

private[this] val kyuubiConfEntriesUpdateLock = new Object

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.client.AdminRestApi
import org.apache.kyuubi.ctl.RestClientFactory.withKyuubiRestClient
import org.apache.kyuubi.ctl.cmd.AdminCtlCommand
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType.{HADOOP_CONF, USER_DEFAULTS_CONF}
import org.apache.kyuubi.ctl.opt.CliConfig
import org.apache.kyuubi.ctl.util.{Tabulator, Validator}

Expand All @@ -33,7 +34,8 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String]
withKyuubiRestClient(normalizedCliConfig, null, conf) { kyuubiRestClient =>
val adminRestApi = new AdminRestApi(kyuubiRestClient)
normalizedCliConfig.adminConfigOpts.configType match {
case "hadoopConf" => adminRestApi.refreshHadoopConf()
case HADOOP_CONF => adminRestApi.refreshHadoopConf()
case USER_DEFAULTS_CONF => adminRestApi.refreshUserDefaultsConf()
case configType => throw new KyuubiException(s"Invalid config type:$configType")
}
}
Expand All @@ -43,3 +45,7 @@ class RefreshConfigCommand(cliConfig: CliConfig) extends AdminCtlCommand[String]
info(Tabulator.format("", Array("Response"), Array(Array(resp))))
}
}
object RefreshConfigCommandConfigType {
final val HADOOP_CONF = "hadoopConf"
final val USER_DEFAULTS_CONF = "userDefaultsConf"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.kyuubi.ctl.opt
import scopt.{OParser, OParserBuilder}

import org.apache.kyuubi.KYUUBI_VERSION
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType._

object AdminCommandLine extends CommonCommandLine {

Expand Down Expand Up @@ -100,6 +101,7 @@ object AdminCommandLine extends CommonCommandLine {
arg[String]("<configType>")
.optional()
.action((v, c) => c.copy(adminConfigOpts = c.adminConfigOpts.copy(configType = v)))
.text("The valid config type can be one of the following: hadoopConf."))
.text("The valid config type can be one of the following: " +
s"$HADOOP_CONF, $USER_DEFAULTS_CONF."))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.kyuubi.ctl

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite}
import org.apache.kyuubi.ctl.cli.AdminControlCliArguments
import org.apache.kyuubi.ctl.cmd.refresh.RefreshConfigCommandConfigType._
import org.apache.kyuubi.ctl.opt.{ControlAction, ControlObject}

class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
Expand Down Expand Up @@ -64,6 +65,15 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
assert(opArgs.cliConfig.resource === ControlObject.CONFIG)
assert(opArgs.cliConfig.adminConfigOpts.configType === "hadoopConf")

args = Array(
"refresh",
"config",
"userDefaultsConf")
val opArgs2 = new AdminControlCliArguments(args)
assert(opArgs2.cliConfig.action === ControlAction.REFRESH)
assert(opArgs2.cliConfig.resource === ControlObject.CONFIG)
assert(opArgs2.cliConfig.adminConfigOpts.configType === "userDefaultsConf")

args = Array(
"refresh",
"config",
Expand Down Expand Up @@ -137,7 +147,7 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi
| Refresh the resource.
|Command: refresh config [<configType>]
| Refresh the config with specified type.
| <configType> The valid config type can be one of the following: hadoopConf.
| <configType> The valid config type can be one of the following: $HADOOP_CONF, $USER_DEFAULTS_CONF.
|
| -h, --help Show help message and exit.""".stripMargin
// scalastyle:on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public String refreshHadoopConf() {
return this.getClient().post(path, null, client.getAuthHeader());
}

public String refreshUserDefaultsConf() {
String path = String.format("%s/%s", API_BASE_PATH, "refresh/user_defaults_conf");
return this.getClient().post(path, null, client.getAuthHeader());
}

public String deleteEngine(
String engineType, String shareLevel, String subdomain, String hs2ProxyUser) {
Map<String, Object> params = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.kyuubi.server

import scala.util.Properties

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

Expand Down Expand Up @@ -105,6 +106,28 @@ object KyuubiServer extends Logging {
val _hadoopConf = KyuubiHadoopUtils.newHadoopConf(new KyuubiConf().loadFileDefaults())
hadoopConf = _hadoopConf
}

private[kyuubi] def refreshUserDefaultsConf(): Unit = kyuubiServer.conf.synchronized {
val existedUserDefaults = kyuubiServer.conf.getAllUserDefaults
val refreshedUserDefaults = KyuubiConf().loadFileDefaults().getAllUserDefaults
var (unsetCount, updatedCount, addedCount) = (0, 0, 0)
for ((k, _) <- existedUserDefaults if !refreshedUserDefaults.contains(k)) {
kyuubiServer.conf.unset(k)
unsetCount = unsetCount + 1
}
for ((k, v) <- refreshedUserDefaults) {
if (existedUserDefaults.contains(k)) {
if (!StringUtils.equals(existedUserDefaults.get(k).orNull, v)) {
updatedCount = updatedCount + 1
}
} else {
addedCount = addedCount + 1
}
kyuubiServer.conf.set(k, v)
}
info(s"Refreshed user defaults configs with changes of " +
s"unset: $unsetCount, updated: $updatedCount, added: $addedCount")
}
}

class KyuubiServer(name: String) extends Serverable(name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
Response.ok(s"Refresh the hadoop conf for ${fe.connectionUrl} successfully.").build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON)),
description = "refresh the user defaults configs")
@POST
@Path("refresh/user_defaults_conf")
def refreshUserDefaultsConf(): Response = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we lock this method ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refreshUserDefaultsConf of KyuubiServer is synchronized on kyuubiServer.conf to prevent concurrent racing.

val userName = fe.getSessionUser(Map.empty[String, String])
val ipAddress = fe.getIpAddress
info(s"Receive refresh user defaults conf request from $userName/$ipAddress")
if (!userName.equals(administrator)) {
throw new NotAllowedException(
s"$userName is not allowed to refresh the user defaults conf")
}
info(s"Reloading user defaults conf")
KyuubiServer.refreshUserDefaultsConf()
Response.ok(s"Refresh the user defaults conf successfully.").build()
}

@ApiResponse(
responseCode = "200",
content = Array(new Content(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(200 == response.getStatus)
}

test("refresh user defaults config of the kyuubi server") {
var response = webTarget.path("api/v1/admin/refresh/user_defaults_conf")
.request()
.post(null)
assert(405 == response.getStatus)

val adminUser = Utils.currentUser
val encodeAuthorization = new String(
Base64.getEncoder.encode(
s"$adminUser:".getBytes()),
"UTF-8")
response = webTarget.path("api/v1/admin/refresh/user_defaults_conf")
.request()
.header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization")
.post(null)
assert(200 == response.getStatus)
}

test("delete engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
Expand Down