From c59ea5eaffc9889074226cf96a0e704672cdb290 Mon Sep 17 00:00:00 2001 From: Mingjie Tang Date: Tue, 25 Sep 2018 14:20:30 -0700 Subject: [PATCH 1/2] [RMP-11860][SPARK-25501] Kafka Delegation Token Support for Spark --- .../HadoopDelegationTokenManager.scala | 3 +- .../KafkaDelegationTokenProvider.scala | 83 +++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index ab8d8d96a9b0..623610732dad 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager( private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = { val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++ safeCreateProvider(new HiveDelegationTokenProvider) ++ - safeCreateProvider(new HBaseDelegationTokenProvider) + safeCreateProvider(new HBaseDelegationTokenProvider) ++ + safeCreateProvider(new KafkaDelegationTokenProvider) // Filter out providers for which spark.security.credentials.{service}.enabled is false. providers diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala new file mode 100644 index 000000000000..ed70fce37a0c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import java.util.Properties + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +import scala.reflect.runtime._ +import scala.util.control.NonFatal + + +class KafkaDelegationTokenProvider + extends HadoopDelegationTokenProvider with Logging { + + override def serviceName: String = "kafka" + + override def obtainDelegationTokens( + kafkaConfig: Configuration, + sparkConf: SparkConf, + creds: Credentials): Option[Long] = { + + try { + val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) + + val clientClass = mirror.classLoader. 
+ loadClass("org.apache.kafka.clients.admin.AdminClient") + val createMethod = clientClass.getMethod("create", classOf[java.util.Properties]) + val clientInstance = createMethod.invoke(null, kafkaConfig) + + val obtainToken = mirror.classLoader. + loadClass("org.apache.kafka.clients.admin.AdminClient"). + getMethod("createDelegationToken", clientClass.getClass) + val tokenInstance = obtainToken.invoke(clientInstance) + + val createDelegationTokenResult = mirror.classLoader. + loadClass("org.apache.kafka.clients.admin.CreateDelegationTokenResult") + + val getTokenMethod = createDelegationTokenResult.getMethod("delegationToken", createDelegationTokenResult.getClass) + val token = getTokenMethod.invoke(tokenInstance) + .asInstanceOf[Token[_ <: TokenIdentifier]] + + logInfo(s"Get token from Kafka: ${token.toString}") + creds.addToken(token.getService, token) + + } catch { + case NonFatal(e) => + logDebug(s"Failed to get token from service $serviceName", e) + } + + None + + } + + override def delegationTokensRequired( + sparkConf: SparkConf, + kafkaConf: Configuration): Boolean = { + + kafkaConf.get("com.sun.security.auth.module.Krb5LoginModule") == "kerberos" + + } + +} From 7202ff968fa9a330e112a4958e38fd7f36e53341 Mon Sep 17 00:00:00 2001 From: Mingjie Tang Date: Tue, 25 Sep 2018 17:31:35 -0700 Subject: [PATCH 2/2] update with kafka config --- .../KafkaDelegationTokenProvider.scala | 32 +++++++++++++++++-- .../spark/internal/config/package.scala | 4 +++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala index ed70fce37a0c..b70794fbf95b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala @@ -22,8 +22,10 @@ import java.util.Properties import org.apache.hadoop.conf.Configuration import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.{Token, TokenIdentifier} + import org.apache.spark.SparkConf import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.util.Utils import scala.reflect.runtime._ @@ -36,7 +38,7 @@ class KafkaDelegationTokenProvider override def serviceName: String = "kafka" override def obtainDelegationTokens( - kafkaConfig: Configuration, + hadoopConf: Configuration, sparkConf: SparkConf, creds: Credentials): Option[Long] = { @@ -46,7 +48,7 @@ class KafkaDelegationTokenProvider val clientClass = mirror.classLoader. loadClass("org.apache.kafka.clients.admin.AdminClient") val createMethod = clientClass.getMethod("create", classOf[java.util.Properties]) - val clientInstance = createMethod.invoke(null, kafkaConfig) + val clientInstance = createMethod.invoke(null, kafakaConf(sparkConf)) val obtainToken = mirror.classLoader. loadClass("org.apache.kafka.clients.admin.AdminClient"). 
@@ -76,8 +78,32 @@ class KafkaDelegationTokenProvider
       sparkConf: SparkConf,
       kafkaConf: Configuration): Boolean = {
 
-    kafkaConf.get("com.sun.security.auth.module.Krb5LoginModule") == "kerberos"
+    sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined
+
+  }
 
+  private def kafkaConf(sparkConf: SparkConf): Properties = {
+    val principal = sparkConf.get(PRINCIPAL).orNull
+    val keytab = sparkConf.get(KEYTAB).orNull
+    val protocol = "SASL_PLAINTEXT"
+    val bootstrapServer = sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).orNull
+
+    val jaasConfig =
+      s"""
+        |com.sun.security.auth.module.Krb5LoginModule required
+        | useKeyTab=true
+        | storeKey=true
+        | serviceName="kafka"
+        | keyTab="$keytab"
+        | principal="$principal";
+      """.stripMargin.replace("\n", "")
+
+    val adminClientProperties = new Properties
+    adminClientProperties.put("bootstrap.servers", bootstrapServer)
+    adminClientProperties.put("security.protocol", protocol)
+    adminClientProperties.put("sasl.jaas.config", jaasConfig)
+
+    adminClientProperties
   }
 
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 9891b6a2196d..81019b5d8821 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -160,6 +160,10 @@ package object config {
     .doc("Name of the Kerberos principal.")
     .stringConf.createOptional
 
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS = ConfigBuilder("spark.kafka.bootstrap.servers")
+    .doc("Comma-separated list of Kafka bootstrap servers (host:port pairs).")
+    .stringConf.createOptional
+
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .intConf
     .createOptional
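
Reviewer note (not part of the patch): the reflective calls above resolve to the Kafka 2.0+ AdminClient API, and the value returned by CreateDelegationTokenResult.delegationToken() is a KafkaFuture[DelegationToken] rather than a Hadoop Token, so it still needs to be unwrapped and converted before it can go into the Credentials. The sketch below shows one way that conversion could look if kafka-clients were available at compile time; the token kind "KAFKA_DELEGATION_TOKEN" and the service name "kafka.server.delegation.token" are illustrative assumptions, not values defined by this patch.

import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.kafka.clients.admin.AdminClient

object KafkaTokenSketch {
  // Obtains a Kafka delegation token and stores it in the given Hadoop Credentials.
  def obtainToken(adminClientProps: Properties, creds: Credentials): Unit = {
    val admin = AdminClient.create(adminClientProps)
    try {
      // createDelegationToken() is asynchronous; block on the returned KafkaFuture.
      val kafkaToken = admin.createDelegationToken().delegationToken().get()

      // Wrap Kafka's DelegationToken (token id + HMAC) in a Hadoop Token so it can be
      // shipped alongside the HDFS/Hive/HBase tokens. Kind and service are assumed names.
      val hadoopToken = new Token[TokenIdentifier](
        kafkaToken.tokenInfo().tokenId().getBytes(StandardCharsets.UTF_8),
        kafkaToken.hmacAsBase64String().getBytes(StandardCharsets.UTF_8),
        new Text("KAFKA_DELEGATION_TOKEN"),
        new Text("kafka.server.delegation.token"))

      creds.addToken(hadoopToken.getService, hadoopToken)
    } finally {
      admin.close()
    }
  }
}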