Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6003] Allow disabling user impersonation on launching engine #6003

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 96 additions & 95 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
test("Check if spark.kubernetes.executor.podNamePrefix is invalid") {
Seq("_123", "spark_exec", "spark@", "a" * 238).foreach { invalid =>
conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, invalid)
val builder = new SparkProcessBuilder("test", conf)
val builder = new SparkProcessBuilder("test", true, conf)
val e = intercept[KyuubiException](builder.validateConf)
assert(e.getMessage === s"'$invalid' in spark.kubernetes.executor.podNamePrefix is" +
s" invalid. must conform https://kubernetes.io/docs/concepts/overview/" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2087,20 +2087,33 @@ object KyuubiConf {
.version("1.5.0")
.fallbackConf(ENGINE_CONNECTION_URL_USE_HOSTNAME)

val ENGINE_DO_AS_ENABLED: ConfigEntry[Boolean] =
buildConf("kyuubi.engine.doAs.enabled")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @yaooqinn @zhouyifan279 WDYT of this configuration name? My other candidate is `kyuubi.engine.impersonation.enabled`.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for kyuubi.engine.doAs.enabled.

.doc("Whether to enable user impersonation on launching engine. When enabled, " +
"for engines which supports user impersonation, e.g. SPARK, depends on the " +
s"`kyuubi.engine.share.level`, different users will be used to launch the engine. " +
"Otherwise, Kyuubi Server's user will always be used to launch the engine.")
.version("1.9.0")
.booleanConf
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add this to 1.9.0 instead of a bugfix version

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay

.createWithDefault(true)

val ENGINE_SHARE_LEVEL: ConfigEntry[String] = buildConf("kyuubi.engine.share.level")
.doc("Engines will be shared in different levels, available configs are: <ul>" +
" <li>CONNECTION: engine will not be shared but only used by the current client" +
" connection</li>" +
" <li>USER: engine will be shared by all sessions created by a unique username," +
s" see also ${ENGINE_SHARE_LEVEL_SUBDOMAIN.key}</li>" +
" <li>CONNECTION: the engine will not be shared but only used by the current client" +
" connection, and the engine will be launched by session user.</li>" +
" <li>USER: the engine will be shared by all sessions created by a unique username," +
s" and the engine will be launched by session user.</li>" +
" <li>GROUP: the engine will be shared by all sessions created" +
" by all users belong to the same primary group name." +
" The engine will be launched by the group name as the effective" +
" The engine will be launched by the primary group name as the effective" +
" username, so here the group name is in value of special user who is able to visit the" +
" computing resources/data of the team. It follows the" +
" [Hadoop GroupsMapping](https://reurl.cc/xE61Y5) to map user to a primary group. If the" +
" primary group is not found, it fallback to the USER level." +
" <li>SERVER: the App will be shared by Kyuubi servers</li></ul>")
" <li>SERVER: the engine will be shared by Kyuubi servers, and the engine will be launched" +
" by Server's user.</li>" +
" </ul>" +
s" See also `${ENGINE_SHARE_LEVEL_SUBDOMAIN.key}` and `${ENGINE_DO_AS_ENABLED.key}`.")
.version("1.2.0")
.fallbackConf(LEGACY_ENGINE_SHARE_LEVEL)

Expand All @@ -2115,8 +2128,8 @@ object KyuubiConf {
" all the capacity of the Trino.</li>" +
" <li>HIVE_SQL: specify this engine type will launch a Hive engine which can provide" +
" all the capacity of the Hive Server2.</li>" +
" <li>JDBC: specify this engine type will launch a JDBC engine which can forward " +
" queries to the database system through the certain JDBC driver, " +
" <li>JDBC: specify this engine type will launch a JDBC engine which can forward" +
" queries to the database system through the certain JDBC driver," +
" for now, it supports Doris, MySQL, Phoenix, PostgreSQL and StarRocks.</li>" +
" <li>CHAT: specify this engine type will launch a Chat engine.</li>" +
"</ul>")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ import org.apache.kyuubi.server.KyuubiServer
* The description and functionality of an engine at server side
*
* @param conf Engine configuration
* @param user Caller of the engine
* @param sessionUser Caller of the engine
* @param engineRefId Id of the corresponding session in which the engine is created
*/
private[kyuubi] class EngineRef(
conf: KyuubiConf,
user: String,
sessionUser: String,
doAsEnabled: Boolean,
groupProvider: GroupProvider,
engineRefId: String,
engineManager: KyuubiApplicationManager,
Expand Down Expand Up @@ -88,15 +89,18 @@ private[kyuubi] class EngineRef(

private var builder: ProcBuilder = _

private[kyuubi] def getEngineRefId(): String = engineRefId
private[kyuubi] def getEngineRefId: String = engineRefId

// Launcher of the engine
private[kyuubi] val appUser: String = shareLevel match {
// user for routing session to the engine
private[kyuubi] val routingUser: String = shareLevel match {
case SERVER => Utils.currentUser
case GROUP => groupProvider.primaryGroup(user, conf.getAll.asJava)
case _ => user
case GROUP => groupProvider.primaryGroup(sessionUser, conf.getAll.asJava)
case _ => sessionUser
}

// user for launching engine
private[kyuubi] val appUser: String = if (doAsEnabled) routingUser else Utils.currentUser

@VisibleForTesting
private[kyuubi] val subdomain: String = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
case subdomain if clientPoolSize > 0 && (subdomain.isEmpty || enginePoolIgnoreSubdomain) =>
Expand All @@ -110,7 +114,7 @@ private[kyuubi] class EngineRef(
val snPath =
DiscoveryPaths.makePath(
s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_seqNum",
appUser,
routingUser,
clientPoolName)
DiscoveryClientProvider.withDiscoveryClient(conf) { client =>
client.getAndIncrement(snPath)
Expand All @@ -128,7 +132,7 @@ private[kyuubi] class EngineRef(
*/
@VisibleForTesting
private[kyuubi] val defaultEngineName: String = {
val commonNamePrefix = s"kyuubi_${shareLevel}_${engineType}_${appUser}"
val commonNamePrefix = s"kyuubi_${shareLevel}_${engineType}_${routingUser}"
shareLevel match {
case CONNECTION => s"${commonNamePrefix}_$engineRefId"
case _ => s"${commonNamePrefix}_${subdomain}_$engineRefId"
Expand All @@ -151,8 +155,8 @@ private[kyuubi] class EngineRef(
private[kyuubi] lazy val engineSpace: String = {
val commonParent = s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType"
shareLevel match {
case CONNECTION => DiscoveryPaths.makePath(commonParent, appUser, engineRefId)
case _ => DiscoveryPaths.makePath(commonParent, appUser, subdomain)
case CONNECTION => DiscoveryPaths.makePath(commonParent, routingUser, engineRefId)
case _ => DiscoveryPaths.makePath(commonParent, routingUser, subdomain)
}
}

Expand All @@ -167,7 +171,7 @@ private[kyuubi] class EngineRef(
val lockPath =
DiscoveryPaths.makePath(
s"${serverSpace}_${KYUUBI_VERSION}_${shareLevel}_${engineType}_lock",
appUser,
routingUser,
subdomain)
discoveryClient.tryWithLock(
lockPath,
Expand All @@ -188,19 +192,25 @@ private[kyuubi] class EngineRef(
builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
new SparkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new SparkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case FLINK_SQL =>
conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
new FlinkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new FlinkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case TRINO =>
new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new TrinoProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case HIVE_SQL =>
conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME, defaultEngineName)
HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog, defaultEngineName)
HiveProcessBuilder(
appUser,
doAsEnabled,
conf,
engineRefId,
extraEngineLog,
defaultEngineName)
case JDBC =>
new JdbcProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new JdbcProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case CHAT =>
new ChatProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new ChatProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
}

MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
Expand All @@ -222,7 +232,7 @@ private[kyuubi] class EngineRef(
while (engineRef.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
exitValue = Some(process.exitValue())
if (exitValue != Some(0)) {
if (!exitValue.contains(0)) {
val error = builder.getError
MetricsSystem.tracing { ms =>
ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
Expand All @@ -246,7 +256,7 @@ private[kyuubi] class EngineRef(

// even the submit process succeeds, the application might meet failure when initializing,
// check the engine application state from engine manager and fast fail on engine terminate
if (engineRef.isEmpty && exitValue == Some(0)) {
if (engineRef.isEmpty && exitValue.contains(0)) {
Option(engineManager).foreach { engineMgr =>
if (lastApplicationInfo.isDefined) {
TimeUnit.SECONDS.sleep(1)
Expand Down Expand Up @@ -341,7 +351,7 @@ private[kyuubi] class EngineRef(
discoveryClient: DiscoveryClient,
hostPort: (String, Int)): Option[ServiceNodeInfo] = {
val serviceNodes = discoveryClient.getServiceNodesInfo(engineSpace)
serviceNodes.filter { sn => (sn.host, sn.port) == hostPort }.headOption
serviceNodes.find { sn => (sn.host, sn.port) == hostPort }
}

def close(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ trait ProcBuilder {

protected def proxyUser: String

protected def doAsEnabled: Boolean

protected val commands: Iterable[String]

def conf: KyuubiConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._

class ChatProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._
*/
class FlinkProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

val flinkHome: String = getEngineHome(shortName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._

class HiveProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

protected val hiveHome: String = getEngineHome(shortName)
Expand Down Expand Up @@ -121,16 +122,17 @@ object HiveProcessBuilder extends Logging {

def apply(
appUser: String,
doAsEnabled: Boolean,
conf: KyuubiConf,
engineRefId: String,
extraEngineLog: Option[OperationLog],
defaultEngineName: String): HiveProcessBuilder = {
DeployMode.withName(conf.get(ENGINE_HIVE_DEPLOY_MODE)) match {
case LOCAL => new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case LOCAL => new HiveProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case YARN =>
warn(s"Hive on YARN model is experimental.")
conf.setIfMissing(ENGINE_DEPLOY_YARN_MODE_APP_NAME, Some(defaultEngineName))
new HiveYarnModeProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
new HiveYarnModeProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
case other => throw new KyuubiException(s"Unsupported deploy mode: $other")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ import org.apache.kyuubi.util.command.CommandLineUtils.{confKeyValue, confKeyVal
*/
class HiveYarnModeProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
override val engineRefId: String,
override val extraEngineLog: Option[OperationLog] = None)
extends HiveProcessBuilder(proxyUser, conf, engineRefId, extraEngineLog) with Logging {
extends HiveProcessBuilder(proxyUser, doAsEnabled, conf, engineRefId, extraEngineLog)
with Logging {

override protected def mainClass: String =
"org.apache.kyuubi.engine.hive.deploy.HiveYarnModeSubmitter"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ import org.apache.kyuubi.util.command.CommandLineUtils._

class JdbcProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class SparkBatchProcessBuilder(
batchConf: Map[String, String],
batchArgs: Seq[String],
override val extraEngineLog: Option[OperationLog])
extends SparkProcessBuilder(proxyUser, conf, batchId, extraEngineLog) {
// TODO respect doAsEnabled
extends SparkProcessBuilder(proxyUser, true, conf, batchId, extraEngineLog) {
import SparkProcessBuilder._

override protected lazy val commands: Iterable[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,23 @@ import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManage
import org.apache.kyuubi.engine.KubernetesApplicationOperation.{KUBERNETES_SERVICE_HOST, KUBERNETES_SERVICE_PORT}
import org.apache.kyuubi.engine.ProcBuilder.KYUUBI_ENGINE_LOG_PATH_KEY
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE
import org.apache.kyuubi.ha.client.AuthTypes
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.{KubernetesUtils, Validator}
import org.apache.kyuubi.util.command.CommandLineUtils._

class SparkProcessBuilder(
override val proxyUser: String,
override val doAsEnabled: Boolean,
override val conf: KyuubiConf,
val engineRefId: String,
val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {

@VisibleForTesting
def this(proxyUser: String, conf: KyuubiConf) {
this(proxyUser, conf, "")
def this(proxyUser: String, doAsEnabled: Boolean, conf: KyuubiConf) {
this(proxyUser, doAsEnabled, conf, "")
}

import SparkProcessBuilder._
Expand Down Expand Up @@ -135,14 +137,12 @@ class SparkProcessBuilder(
var allConf = conf.getAll

// if enable sasl kerberos authentication for zookeeper, need to upload the server keytab file
if (AuthTypes.withName(conf.get(HighAvailabilityConf.HA_ZK_ENGINE_AUTH_TYPE))
== AuthTypes.KERBEROS) {
if (AuthTypes.withName(conf.get(HA_ZK_ENGINE_AUTH_TYPE)) == AuthTypes.KERBEROS) {
allConf = allConf ++ zkAuthKeytabFileConf(allConf)
}
// pass spark engine log path to spark conf
(allConf ++ engineLogPathConf ++ extraYarnConf(allConf) ++ appendPodNameConf(allConf)).foreach {
case (k, v) =>
buffer ++= confKeyValue(convertConfigKey(k), v)
case (k, v) => buffer ++= confKeyValue(convertConfigKey(k), v)
}

setupKerberos(buffer)
Expand All @@ -157,10 +157,12 @@ class SparkProcessBuilder(
protected def setupKerberos(buffer: mutable.Buffer[String]): Unit = {
// if the keytab is specified, PROXY_USER is not supported
tryKeytab() match {
case None =>
case None if doAsEnabled =>
setSparkUserName(proxyUser, buffer)
buffer += PROXY_USER
buffer += proxyUser
case None => // doAs disabled
setSparkUserName(Utils.currentUser, buffer)
case Some(name) =>
setSparkUserName(name, buffer)
}
Expand All @@ -175,11 +177,16 @@ class SparkProcessBuilder(
try {
val ugi = UserGroupInformation
.loginUserFromKeytabAndReturnUGI(principal.get, keytab.get)
if (ugi.getShortUserName != proxyUser) {
if (doAsEnabled && ugi.getShortUserName != proxyUser) {
warn(s"The session proxy user: $proxyUser is not same with " +
s"spark principal: ${ugi.getShortUserName}, so we can't support use keytab. " +
s"Fallback to use proxy user.")
None
} else if (!doAsEnabled && ugi.getShortUserName != Utils.currentUser) {
warn(s"The server's user: ${Utils.currentUser} is not same with " +
wForget marked this conversation as resolved.
Show resolved Hide resolved
s"spark principal: ${ugi.getShortUserName}, skip to use keytab. " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be "skip using keytab".

"Fallback to use server's user.")
None
} else {
Some(ugi.getShortUserName)
}
Expand Down
Loading
Loading