diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index ab8d8d96a9b0..623610732dad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -66,7 +66,8 @@ private[spark] class HadoopDelegationTokenManager(
   private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
     val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
       safeCreateProvider(new HiveDelegationTokenProvider) ++
-      safeCreateProvider(new HBaseDelegationTokenProvider)
+      safeCreateProvider(new HBaseDelegationTokenProvider) ++
+      safeCreateProvider(new KafkaDelegationTokenProvider)
 
     // Filter out providers for which spark.security.credentials.{service}.enabled is false.
     providers
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
new file mode 100644
index 000000000000..b70794fbf95b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.nio.charset.StandardCharsets
+import java.util.Properties
+
+import scala.reflect.runtime._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+/**
+ * Delegation token provider for Kafka.
+ *
+ * The Kafka admin client is driven through reflection, so its classes are only needed at
+ * runtime; if they cannot be loaded, token acquisition fails softly with a warning.
+ */
+class KafkaDelegationTokenProvider
+  extends HadoopDelegationTokenProvider with Logging {
+
+  import KafkaDelegationTokenProvider._
+
+  override def serviceName: String = "kafka"
+
+  /**
+   * Requests a delegation token from the cluster named by `KAFKA_BOOTSTRAP_SERVERS` and adds
+   * it to `creds`.
+   *
+   * @return the expiry timestamp reported by Kafka, or None when no bootstrap servers are
+   *         configured or the token could not be obtained (the failure is logged).
+   */
+  override def obtainDelegationTokens(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).flatMap { bootstrapServers =>
+      try {
+        val (token, expiryTimestamp) =
+          fetchToken(adminClientProperties(sparkConf, bootstrapServers))
+        creds.addToken(TOKEN_SERVICE, token)
+        // Log only the expiry: the token itself is a secret and must not reach the logs.
+        logInfo(s"Obtained token from service $serviceName, expires at $expiryTimestamp")
+        Some(expiryTimestamp)
+      } catch {
+        case NonFatal(e) =>
+          logWarning(s"Failed to get token from service $serviceName", e)
+          None
+      }
+    }
+  }
+
+  /** Tokens are requested whenever Kafka bootstrap servers are configured. */
+  override def delegationTokensRequired(
+      sparkConf: SparkConf,
+      kafkaConf: Configuration): Boolean = {
+    // The entry is optional: comparing the returned Option with null is always true.
+    sparkConf.get(KAFKA_BOOTSTRAP_SERVERS).isDefined
+  }
+
+  /**
+   * Creates an admin client, requests a delegation token and repackages it as a Hadoop token.
+   * The client is closed whether or not the request succeeds.
+   *
+   * @return the Hadoop token together with its expiry timestamp.
+   */
+  private def fetchToken(clientProps: Properties): (Token[TokenIdentifier], Long) = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val adminClientClass = loader.loadClass(ADMIN_PACKAGE + "AdminClient")
+    val adminClient = adminClientClass
+      .getMethod("create", classOf[Properties])
+      .invoke(null, clientProps)
+    try {
+      val result = adminClientClass.getMethod("createDelegationToken").invoke(adminClient)
+      val future = call(loader, ADMIN_PACKAGE + "CreateDelegationTokenResult",
+        "delegationToken", result)
+      val kafkaToken = call(loader, "org.apache.kafka.common.KafkaFuture", "get", future)
+      val tokenInfo = call(loader, TOKEN_PACKAGE + "DelegationToken", "tokenInfo", kafkaToken)
+      val hmac = call(loader, TOKEN_PACKAGE + "DelegationToken", "hmacAsBase64String",
+        kafkaToken).asInstanceOf[String]
+      val tokenId = call(loader, TOKEN_PACKAGE + "TokenInformation", "tokenId", tokenInfo)
+        .asInstanceOf[String]
+      val expiryTimestamp = call(loader, TOKEN_PACKAGE + "TokenInformation",
+        "expiryTimestamp", tokenInfo).asInstanceOf[Long]
+
+      val hadoopToken = new Token[TokenIdentifier](
+        tokenId.getBytes(StandardCharsets.UTF_8),
+        hmac.getBytes(StandardCharsets.UTF_8),
+        TOKEN_KIND,
+        TOKEN_SERVICE)
+      (hadoopToken, expiryTimestamp)
+    } finally {
+      adminClientClass.getMethod("close").invoke(adminClient)
+    }
+  }
+
+  /**
+   * Invokes the public no-argument method `methodName`, looked up on the named public type
+   * rather than on the receiver's runtime class, which may not be accessible.
+   */
+  private def call(
+      loader: ClassLoader,
+      className: String,
+      methodName: String,
+      target: AnyRef): AnyRef = {
+    loader.loadClass(className).getMethod(methodName).invoke(target)
+  }
+
+  /**
+   * Builds the admin client configuration. A keytab login is used when both keytab and
+   * principal are configured; otherwise the Kerberos ticket cache is used.
+   */
+  private def adminClientProperties(
+      sparkConf: SparkConf,
+      bootstrapServers: String): Properties = {
+    val loginOptions = (sparkConf.get(KEYTAB), sparkConf.get(PRINCIPAL)) match {
+      case (Some(keytab), Some(principal)) =>
+        Seq("useKeyTab=true", "storeKey=true",
+          s"keyTab=${quoted(keytab)}", s"principal=${quoted(principal)}")
+      case _ =>
+        Seq("useTicketCache=true")
+    }
+    val jaasEntries = (Seq("com.sun.security.auth.module.Krb5LoginModule required") ++
+      loginOptions) :+ s"serviceName=${quoted(serviceName)}"
+
+    val props = new Properties
+    props.setProperty("bootstrap.servers", bootstrapServers)
+    // NOTE(review): SASL_PLAINTEXT does not encrypt the connection; confirm this is intended.
+    props.setProperty("security.protocol", "SASL_PLAINTEXT")
+    props.setProperty("sasl.jaas.config", jaasEntries.mkString(" ") + ";")
+    props
+  }
+
+  /** Wraps `value` in double quotes for use as a JAAS option value. */
+  private def quoted(value: String): String = "\"" + value + "\""
+}
+
+private object KafkaDelegationTokenProvider {
+  val ADMIN_PACKAGE = "org.apache.kafka.clients.admin."
+  val TOKEN_PACKAGE = "org.apache.kafka.common.security.token.delegation."
+
+  // Kind and alias under which the Kafka token is stored in the Hadoop credentials.
+  val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+  val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 9891b6a2196d..81019b5d8821 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -160,6 +160,12 @@ package object config {
     .doc("Name of the Kerberos principal.")
     .stringConf.createOptional
 
+  // NOTE(review): unlike the neighbouring "spark.executor.instances", this key has no
+  // "spark." prefix; confirm it can be supplied through the intended configuration path.
+  private[spark] val KAFKA_BOOTSTRAP_SERVERS = ConfigBuilder("bootstrap.servers")
+    .doc("Name of the Kafka bootstrap servers.")
+    .stringConf.createOptional
+
   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .intConf
     .createOptional