diff --git a/core/pom.xml b/core/pom.xml
index 36d93212ba9f..49b1a54e3259 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -408,6 +408,19 @@
       <scope>provided</scope>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 1169b2878e99..126a6ab80136 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -274,7 +274,8 @@ private[spark] class HadoopDelegationTokenManager(
new HadoopFSDelegationTokenProvider(
() => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
- safeCreateProvider(new HBaseDelegationTokenProvider)
+ safeCreateProvider(new HBaseDelegationTokenProvider) ++
+ safeCreateProvider(new KafkaDelegationTokenProvider)
// Filter out providers for which spark.security.credentials.{service}.enabled is false.
providers
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
new file mode 100644
index 000000000000..45995be630cc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaDelegationTokenProvider.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.language.existentials
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[security] class KafkaDelegationTokenProvider
+ extends HadoopDelegationTokenProvider with Logging {
+
+ override def serviceName: String = "kafka"
+
+ override def obtainDelegationTokens(
+ hadoopConf: Configuration,
+ sparkConf: SparkConf,
+ creds: Credentials): Option[Long] = {
+ try {
+ logDebug("Attempting to fetch Kafka security token.")
+ val (token, nextRenewalDate) = KafkaTokenUtil.obtainToken(sparkConf)
+ creds.addToken(token.getService, token)
+ return Some(nextRenewalDate)
+ } catch {
+ case NonFatal(e) =>
+ logInfo(s"Failed to get token from service $serviceName", e)
+ }
+ None
+ }
+
+ override def delegationTokensRequired(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration): Boolean = {
+ val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
+ sparkConf.contains(Kafka.BOOTSTRAP_SERVERS) &&
+ (protocol == SASL_SSL.name ||
+ protocol == SSL.name ||
+ protocol == SASL_PLAINTEXT.name)
+ }
+}
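
A minimal sketch (not part of the patch) of a configuration under which the provider above reports that delegation tokens are required; the broker addresses and service name are placeholders:

```scala
import org.apache.spark.SparkConf

// Hypothetical settings: bootstrap servers plus one of SASL_SSL / SSL / SASL_PLAINTEXT is
// exactly what delegationTokensRequired() checks for.
val conf = new SparkConf()
  .set("spark.kafka.bootstrap.servers", "broker1:9092,broker2:9092") // placeholder brokers
  .set("spark.kafka.security.protocol", "SASL_SSL")                  // the default, shown explicitly
  .set("spark.kafka.sasl.kerberos.service.name", "kafka")
```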
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
new file mode 100644
index 000000000000..c890cee59ffe
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.{ util => ju }
+import java.text.SimpleDateFormat
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
+import org.apache.kafka.common.security.token.delegation.DelegationToken
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[spark] object KafkaTokenUtil extends Logging {
+ val TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN")
+ val TOKEN_SERVICE = new Text("kafka.server.delegation.token")
+
+ private[spark] class KafkaDelegationTokenIdentifier extends AbstractDelegationTokenIdentifier {
+ override def getKind: Text = TOKEN_KIND
+ }
+
+ private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
+ val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))
+ val createDelegationTokenOptions = new CreateDelegationTokenOptions()
+ val createResult = adminClient.createDelegationToken(createDelegationTokenOptions)
+ val token = createResult.delegationToken().get()
+ printToken(token)
+
+ (new Token[KafkaDelegationTokenIdentifier](
+ token.tokenInfo.tokenId.getBytes,
+ token.hmacAsBase64String.getBytes,
+ TOKEN_KIND,
+ TOKEN_SERVICE
+ ), token.tokenInfo.expiryTimestamp)
+ }
+
+ private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = {
+ val adminClientProperties = new ju.Properties
+
+ val bootstrapServers = sparkConf.get(Kafka.BOOTSTRAP_SERVERS)
+ require(bootstrapServers.nonEmpty, s"Tried to obtain kafka delegation token but bootstrap " +
+ "servers not configured.")
+ adminClientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers.get)
+
+ val protocol = sparkConf.get(Kafka.SECURITY_PROTOCOL)
+ adminClientProperties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol)
+ protocol match {
+ case SASL_SSL.name =>
+ setTrustStoreProperties(sparkConf, adminClientProperties)
+
+ case SSL.name =>
+ setTrustStoreProperties(sparkConf, adminClientProperties)
+ setKeyStoreProperties(sparkConf, adminClientProperties)
+ logWarning("Obtaining kafka delegation token with SSL protocol. Please " +
+ "configure 2-way authentication on the broker side.")
+
+ case SASL_PLAINTEXT.name =>
+ logWarning("Obtaining kafka delegation token through plain communication channel. Please " +
+ "consider the security impact.")
+ }
+
+    // There are several ways to log in, applied in the following order:
+    // - JVM global security configuration provided -> log in with the JVM global security
+    //   configuration, which can be set up e.g. with 'java.security.auth.login.config'.
+    //   No additional parameter is needed for this.
+    // - Keytab provided -> log in with the Kerberos module and the keytab, using Kafka's
+    //   dynamic JAAS configuration.
+    // - Keytab not provided -> log in with the Kerberos module and the ticket cache, using
+    //   Kafka's dynamic JAAS configuration.
+    // The Kafka client cannot reuse the Subject of a JVM that has already logged in to the
+    // KDC (see KAFKA-7677).
+ if (isGlobalJaasConfigurationProvided) {
+ logDebug("JVM global security configuration detected, using it for login.")
+ } else {
+ adminClientProperties.put(SaslConfigs.SASL_MECHANISM, SaslConfigs.GSSAPI_MECHANISM)
+ if (sparkConf.contains(KEYTAB)) {
+ logDebug("Keytab detected, using it for login.")
+ val jaasParams = getKeytabJaasParams(sparkConf)
+ adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ } else {
+ logDebug("Using ticket cache for login.")
+ val jaasParams = getTicketCacheJaasParams(sparkConf)
+ adminClientProperties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ }
+ }
+
+ adminClientProperties
+ }
+
+ def isGlobalJaasConfigurationProvided: Boolean = {
+ try {
+ JaasContext.loadClientContext(ju.Collections.emptyMap[String, Object]())
+ true
+ } catch {
+ case NonFatal(_) => false
+ }
+ }
+
+ private def setTrustStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
+ sparkConf.get(Kafka.TRUSTSTORE_LOCATION).foreach { truststoreLocation =>
+ properties.put("ssl.truststore.location", truststoreLocation)
+ }
+ sparkConf.get(Kafka.TRUSTSTORE_PASSWORD).foreach { truststorePassword =>
+ properties.put("ssl.truststore.password", truststorePassword)
+ }
+ }
+
+ private def setKeyStoreProperties(sparkConf: SparkConf, properties: ju.Properties): Unit = {
+ sparkConf.get(Kafka.KEYSTORE_LOCATION).foreach { keystoreLocation =>
+ properties.put("ssl.keystore.location", keystoreLocation)
+ }
+ sparkConf.get(Kafka.KEYSTORE_PASSWORD).foreach { keystorePassword =>
+ properties.put("ssl.keystore.password", keystorePassword)
+ }
+ sparkConf.get(Kafka.KEY_PASSWORD).foreach { keyPassword =>
+ properties.put("ssl.key.password", keyPassword)
+ }
+ }
+
+ private[security] def getKeytabJaasParams(sparkConf: SparkConf): String = {
+ val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
+ require(serviceName.nonEmpty, "Kerberos service name must be defined")
+
+ val params =
+ s"""
+ |${getKrb5LoginModuleName} required
+ | useKeyTab=true
+ | serviceName="${serviceName.get}"
+ | keyTab="${sparkConf.get(KEYTAB).get}"
+ | principal="${sparkConf.get(PRINCIPAL).get}";
+ """.stripMargin.replace("\n", "")
+ logDebug(s"Krb keytab JAAS params: $params")
+ params
+ }
+
+ def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
+ val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
+ require(serviceName.nonEmpty, "Kerberos service name must be defined")
+
+ val params =
+ s"""
+ |${getKrb5LoginModuleName} required
+ | useTicketCache=true
+ | serviceName="${serviceName.get}";
+ """.stripMargin.replace("\n", "")
+ logDebug(s"Krb ticket cache JAAS params: $params")
+ params
+ }
+
+ /**
+   * The Krb5LoginModule package name varies across JVMs.
+ * Please see Hadoop UserGroupInformation for further details.
+ */
+ private def getKrb5LoginModuleName(): String = {
+ if (System.getProperty("java.vendor").contains("IBM")) {
+ "com.ibm.security.auth.module.Krb5LoginModule"
+ } else {
+ "com.sun.security.auth.module.Krb5LoginModule"
+ }
+ }
+
+ private def printToken(token: DelegationToken): Unit = {
+ if (log.isDebugEnabled) {
+ val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
+ logDebug("%-15s %-30s %-15s %-25s %-15s %-15s %-15s".format(
+ "TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"))
+ val tokenInfo = token.tokenInfo
+ logDebug("%-15s [hidden] %-15s %-25s %-15s %-15s %-15s".format(
+ tokenInfo.tokenId,
+ tokenInfo.owner,
+ tokenInfo.renewersAsString,
+ dateFormat.format(tokenInfo.issueTimestamp),
+ dateFormat.format(tokenInfo.expiryTimestamp),
+ dateFormat.format(tokenInfo.maxTimestamp)))
+ }
+ }
+}
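
A minimal sketch of how the utility is driven, mirroring the test suite further down; the keytab path and principal are placeholders, and the call assumes code living in org.apache.spark.deploy.security since createAdminClientProperties is package-private:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

// Placeholder values; with a keytab (and no JVM-global JAAS configuration) the resulting
// properties carry a dynamic Krb5LoginModule entry with useKeyTab=true, otherwise the
// ticket cache variant with useTicketCache=true is rendered.
val conf = new SparkConf()
  .set(Kafka.BOOTSTRAP_SERVERS, "broker1:9092")
  .set(Kafka.KERBEROS_SERVICE_NAME, "kafka")
  .set(KEYTAB, "/path/to/user.keytab")
  .set(PRINCIPAL, "user@EXAMPLE.COM")

val adminProps = KafkaTokenUtil.createAdminClientProperties(conf)
```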
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
new file mode 100644
index 000000000000..85d74c27142a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Kafka.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+private[spark] object Kafka {
+
+ val BOOTSTRAP_SERVERS =
+ ConfigBuilder("spark.kafka.bootstrap.servers")
+ .doc("A list of coma separated host/port pairs to use for establishing the initial " +
+ "connection to the Kafka cluster. For further details please see kafka documentation. " +
+ "Only used to obtain delegation token.")
+ .stringConf
+ .createOptional
+
+ val SECURITY_PROTOCOL =
+ ConfigBuilder("spark.kafka.security.protocol")
+ .doc("Protocol used to communicate with brokers. For further details please see kafka " +
+ "documentation. Only used to obtain delegation token.")
+ .stringConf
+ .createWithDefault("SASL_SSL")
+
+ val KERBEROS_SERVICE_NAME =
+ ConfigBuilder("spark.kafka.sasl.kerberos.service.name")
+ .doc("The Kerberos principal name that Kafka runs as. This can be defined either in " +
+ "Kafka's JAAS config or in Kafka's config. For further details please see kafka " +
+ "documentation. Only used to obtain delegation token.")
+ .stringConf
+ .createOptional
+
+ val TRUSTSTORE_LOCATION =
+ ConfigBuilder("spark.kafka.ssl.truststore.location")
+ .doc("The location of the trust store file. For further details please see kafka " +
+ "documentation. Only used to obtain delegation token.")
+ .stringConf
+ .createOptional
+
+ val TRUSTSTORE_PASSWORD =
+ ConfigBuilder("spark.kafka.ssl.truststore.password")
+ .doc("The store password for the trust store file. This is optional for client and only " +
+ "needed if ssl.truststore.location is configured. For further details please see kafka " +
+ "documentation. Only used to obtain delegation token.")
+ .stringConf
+ .createOptional
+
+ val KEYSTORE_LOCATION =
+ ConfigBuilder("spark.kafka.ssl.keystore.location")
+ .doc("The location of the key store file. This is optional for client and can be used for " +
+ "two-way authentication for client. For further details please see kafka documentation. " +
+ "Only used to obtain delegation token.")
+ .stringConf
+ .createOptional
+
+ val KEYSTORE_PASSWORD =
+ ConfigBuilder("spark.kafka.ssl.keystore.password")
+ .doc("The store password for the key store file. This is optional for client and only " +
+ "needed if ssl.keystore.location is configured. For further details please see kafka " +
+ "documentation. Only used to obtain delegation token.")
+ .stringConf
+ .createOptional
+
+ val KEY_PASSWORD =
+ ConfigBuilder("spark.kafka.ssl.key.password")
+ .doc("The password of the private key in the key store file. This is optional for client. " +
+ "For further details please see kafka documentation. Only used to obtain delegation token.")
+ .stringConf
+ .createOptional
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index e0e630e3be63..def9e626a2df 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -19,8 +19,6 @@ package org.apache.spark.deploy.security
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.security.Credentials
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.util.Utils
@@ -33,6 +31,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(manager.isProviderLoaded("hive"))
+ assert(manager.isProviderLoaded("kafka"))
}
test("disable hive credential provider") {
@@ -41,6 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
+ assert(manager.isProviderLoaded("kafka"))
}
test("using deprecated configurations") {
@@ -51,6 +51,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
assert(!manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
+ assert(manager.isProviderLoaded("kafka"))
}
test("SPARK-23209: obtain tokens when Hive classes are not available") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
new file mode 100644
index 000000000000..682bebde916f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.{ util => ju }
+import javax.security.auth.login.{AppConfigurationEntry, Configuration}
+
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+
+class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+ private val bootStrapServers = "127.0.0.1:0"
+ private val trustStoreLocation = "/path/to/trustStore"
+ private val trustStorePassword = "trustStoreSecret"
+ private val keyStoreLocation = "/path/to/keyStore"
+ private val keyStorePassword = "keyStoreSecret"
+ private val keyPassword = "keySecret"
+ private val keytab = "/path/to/keytab"
+ private val kerberosServiceName = "kafka"
+ private val principal = "user@domain.com"
+
+ private var sparkConf: SparkConf = null
+
+ private class KafkaJaasConfiguration extends Configuration {
+ val entry =
+ new AppConfigurationEntry(
+ "DummyModule",
+ AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+ ju.Collections.emptyMap[String, Object]()
+ )
+
+ override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
+ if (name.equals("KafkaClient")) {
+ Array(entry)
+ } else {
+ null
+ }
+ }
+ }
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ sparkConf = new SparkConf()
+ }
+
+ override def afterEach(): Unit = {
+ try {
+ resetGlobalConfig()
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ private def setGlobalKafkaClientConfig(): Unit = {
+ Configuration.setConfiguration(new KafkaJaasConfiguration)
+ }
+
+ private def resetGlobalConfig(): Unit = {
+ Configuration.setConfiguration(null)
+ }
+
+ test("createAdminClientProperties without bootstrap servers should throw exception") {
+ val thrown = intercept[IllegalArgumentException] {
+ KafkaTokenUtil.createAdminClientProperties(sparkConf)
+ }
+ assert(thrown.getMessage contains
+ "Tried to obtain kafka delegation token but bootstrap servers not configured.")
+ }
+
+ test("createAdminClientProperties with SASL_PLAINTEXT protocol should not include " +
+ "keystore and truststore config") {
+ sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+ sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_PLAINTEXT.name)
+ sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation)
+    sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword)
+ sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
+ sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
+ sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
+ sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+ val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+ assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+ === bootStrapServers)
+ assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+ === SASL_PLAINTEXT.name)
+ assert(!adminClientProperties.containsKey("ssl.truststore.location"))
+ assert(!adminClientProperties.containsKey("ssl.truststore.password"))
+ assert(!adminClientProperties.containsKey("ssl.keystore.location"))
+ assert(!adminClientProperties.containsKey("ssl.keystore.password"))
+ assert(!adminClientProperties.containsKey("ssl.key.password"))
+ }
+
+ test("createAdminClientProperties with SASL_SSL protocol should include truststore config") {
+ sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+ sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+ sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation)
+ sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword)
+ sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
+ sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
+ sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
+ sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+ val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+ assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+ === bootStrapServers)
+ assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+ === SASL_SSL.name)
+ assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation)
+ assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword)
+ assert(!adminClientProperties.containsKey("ssl.keystore.location"))
+ assert(!adminClientProperties.containsKey("ssl.keystore.password"))
+ assert(!adminClientProperties.containsKey("ssl.key.password"))
+ }
+
+ test("createAdminClientProperties with SSL protocol should include keystore and truststore " +
+ "config") {
+ sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+ sparkConf.set(Kafka.SECURITY_PROTOCOL, SSL.name)
+ sparkConf.set(Kafka.TRUSTSTORE_LOCATION, trustStoreLocation)
+ sparkConf.set(Kafka.TRUSTSTORE_PASSWORD, trustStorePassword)
+ sparkConf.set(Kafka.KEYSTORE_LOCATION, keyStoreLocation)
+ sparkConf.set(Kafka.KEYSTORE_PASSWORD, keyStorePassword)
+ sparkConf.set(Kafka.KEY_PASSWORD, keyPassword)
+ sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+ val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+ assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+ === bootStrapServers)
+ assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+ === SSL.name)
+ assert(adminClientProperties.get("ssl.truststore.location") === trustStoreLocation)
+ assert(adminClientProperties.get("ssl.truststore.password") === trustStorePassword)
+ assert(adminClientProperties.get("ssl.keystore.location") === keyStoreLocation)
+ assert(adminClientProperties.get("ssl.keystore.password") === keyStorePassword)
+ assert(adminClientProperties.get("ssl.key.password") === keyPassword)
+ }
+
+ test("createAdminClientProperties with global config should not set dynamic jaas config") {
+ sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+ sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+ setGlobalKafkaClientConfig()
+
+ val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+ assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+ === bootStrapServers)
+ assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+ === SASL_SSL.name)
+ assert(!adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
+ assert(!adminClientProperties.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
+ }
+
+ test("createAdminClientProperties with keytab should set keytab dynamic jaas config") {
+ sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+ sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+ sparkConf.set(KEYTAB, keytab)
+ sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+ sparkConf.set(PRINCIPAL, principal)
+
+ val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+ assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+ === bootStrapServers)
+ assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+ === SASL_SSL.name)
+ assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
+ val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
+ assert(saslJaasConfig.contains("Krb5LoginModule required"))
+ assert(saslJaasConfig.contains("useKeyTab=true"))
+ }
+
+ test("createAdminClientProperties without keytab should set ticket cache dynamic jaas config") {
+ sparkConf.set(Kafka.BOOTSTRAP_SERVERS, bootStrapServers)
+ sparkConf.set(Kafka.SECURITY_PROTOCOL, SASL_SSL.name)
+ sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+ val adminClientProperties = KafkaTokenUtil.createAdminClientProperties(sparkConf)
+
+ assert(adminClientProperties.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
+ === bootStrapServers)
+ assert(adminClientProperties.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)
+ === SASL_SSL.name)
+ assert(adminClientProperties.containsKey(SaslConfigs.SASL_MECHANISM))
+ val saslJaasConfig = adminClientProperties.getProperty(SaslConfigs.SASL_JAAS_CONFIG)
+ assert(saslJaasConfig.contains("Krb5LoginModule required"))
+ assert(saslJaasConfig.contains("useTicketCache=true"))
+ }
+
+ test("isGlobalJaasConfigurationProvided without global config should return false") {
+ assert(!KafkaTokenUtil.isGlobalJaasConfigurationProvided)
+ }
+
+ test("isGlobalJaasConfigurationProvided with global config should return false") {
+ setGlobalKafkaClientConfig()
+
+ assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided)
+ }
+
+ test("getKeytabJaasParams with keytab no service should throw exception") {
+ sparkConf.set(KEYTAB, keytab)
+
+ val thrown = intercept[IllegalArgumentException] {
+ KafkaTokenUtil.getKeytabJaasParams(sparkConf)
+ }
+
+ assert(thrown.getMessage contains "Kerberos service name must be defined")
+ }
+
+ test("getTicketCacheJaasParams without service should throw exception") {
+ val thrown = intercept[IllegalArgumentException] {
+ KafkaTokenUtil.getTicketCacheJaasParams(sparkConf)
+ }
+
+ assert(thrown.getMessage contains "Kerberos service name must be defined")
+ }
+}
diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index 1af407167597..de8731c4b774 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -29,8 +29,6 @@
   <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
-
-    <kafka.version>2.1.0</kafka.version>
   <packaging>jar</packaging>
   <name>Kafka 0.10+ Source for Structured Streaming</name>
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
new file mode 100644
index 000000000000..74d5ef9c05f1
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.token.{Token, TokenIdentifier}
+import org.apache.kafka.common.security.scram.ScramLoginModule
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+
+private[kafka010] object KafkaSecurityHelper extends Logging {
+ def isTokenAvailable(): Boolean = {
+ UserGroupInformation.getCurrentUser().getCredentials.getToken(
+ KafkaTokenUtil.TOKEN_SERVICE) != null
+ }
+
+ def getTokenJaasParams(sparkConf: SparkConf): String = {
+ val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
+ KafkaTokenUtil.TOKEN_SERVICE)
+ val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
+ require(serviceName.isDefined, "Kerberos service name must be defined")
+ val username = new String(token.getIdentifier)
+ val password = new String(token.getPassword)
+
+ val loginModuleName = classOf[ScramLoginModule].getName
+ val params =
+ s"""
+ |$loginModuleName required
+ | tokenauth=true
+ | serviceName="${serviceName.get}"
+ | username="$username"
+ | password="$password";
+ """.stripMargin.replace("\n", "")
+ logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
+
+ params
+ }
+}
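
For orientation, a sketch of the JAAS entry getTokenJaasParams renders; the username and password placeholders stand in for the delegation token's identifier and HMAC:

```scala
// Illustrative only; the real values are read from the token on the current UGI.
val exampleJaasEntry =
  """org.apache.kafka.common.security.scram.ScramLoginModule required
    | tokenauth=true
    | serviceName="kafka"
    | username="<token identifier>"
    | password="<token HMAC>";""".stripMargin.replace("\n", "")
```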
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index f770f0c2a04c..0ac330435e5c 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -18,16 +18,19 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
-import java.util.{Locale, Optional, UUID}
+import java.util.{Locale, UUID}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
+import org.apache.spark.SparkEnv
+import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
@@ -80,12 +83,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
- val specifiedKafkaParams =
- parameters
- .keySet
- .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
- .map { k => k.drop(6).toString -> parameters(k) }
- .toMap
+ val specifiedKafkaParams = convertToSpecifiedParams(parameters)
val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
@@ -122,12 +120,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
- val specifiedKafkaParams =
- parameters
- .keySet
- .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
- .map { k => k.drop(6).toString -> parameters(k) }
- .toMap
+ val specifiedKafkaParams = convertToSpecifiedParams(parameters)
val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
@@ -198,12 +191,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
parameters: Map[String, String]): BaseRelation = {
validateBatchOptions(parameters)
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
- val specifiedKafkaParams =
- parameters
- .keySet
- .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
- .map { k => k.drop(6).toString -> parameters(k) }
- .toMap
+ val specifiedKafkaParams = convertToSpecifiedParams(parameters)
val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit)
@@ -230,8 +218,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
outputMode: OutputMode): Sink = {
val defaultTopic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
- new KafkaSink(sqlContext,
- new ju.HashMap[String, Object](specifiedKafkaParams.asJava), defaultTopic)
+ new KafkaSink(sqlContext, specifiedKafkaParams, defaultTopic)
}
override def createRelation(
@@ -248,8 +235,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
val topic = parameters.get(TOPIC_OPTION_KEY).map(_.trim)
val specifiedKafkaParams = kafkaParamsForProducer(parameters)
- KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution,
- new ju.HashMap[String, Object](specifiedKafkaParams.asJava), topic)
+ KafkaWriter.write(outerSQLContext.sparkSession, data.queryExecution, specifiedKafkaParams,
+ topic)
/* This method is suppose to return a relation that reads the data that was written.
* We cannot support this for Kafka. Therefore, in order to make things consistent,
@@ -274,13 +261,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
options: DataSourceOptions): StreamingWriteSupport = {
import scala.collection.JavaConverters._
- val spark = SparkSession.getActiveSession.get
val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
// We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
- KafkaWriter.validateQuery(
- schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)
+ KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic)
new KafkaStreamingWriteSupport(topic, producerParams, schema)
}
@@ -481,6 +466,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
+ private val serClassName = classOf[ByteArraySerializer].getName
private val deserClassName = classOf[ByteArrayDeserializer].getName
def getKafkaOffsetRangeLimit(
@@ -515,6 +501,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+ .setTokenJaasConfigIfNeeded()
.build()
def kafkaParamsForExecutors(
@@ -536,6 +523,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
// If buffer config is not set, set it to reasonable value to work around
// buffer issues (see KAFKA-3135)
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+ .setTokenJaasConfigIfNeeded()
.build()
/**
@@ -568,11 +556,32 @@ private[kafka010] object KafkaSourceProvider extends Logging {
this
}
+ def setTokenJaasConfigIfNeeded(): ConfigUpdater = {
+      // There are several ways to log in, applied in the following order:
+      // - JVM global security configuration provided -> log in with the JVM global security
+      //   configuration, which can be set up e.g. with 'java.security.auth.login.config'.
+      //   No additional parameter is needed for this.
+      // - Token provided -> log in with the SCRAM module, using Kafka's dynamic JAAS
+      //   configuration.
+ if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
+ logDebug("JVM global security configuration detected, using it for login.")
+ } else if (KafkaSecurityHelper.isTokenAvailable()) {
+ logDebug("Delegation token detected, using it for login.")
+ val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
+ val mechanism = kafkaParams
+ .getOrElse(SaslConfigs.SASL_MECHANISM, SaslConfigs.DEFAULT_SASL_MECHANISM)
+ require(mechanism.startsWith("SCRAM"),
+ "Delegation token works only with SCRAM mechanism.")
+ set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+ }
+ this
+ }
+
def build(): ju.Map[String, Object] = map
}
private[kafka010] def kafkaParamsForProducer(
- parameters: Map[String, String]): Map[String, String] = {
+ parameters: Map[String, String]): ju.Map[String, Object] = {
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
throw new IllegalArgumentException(
@@ -580,17 +589,26 @@ private[kafka010] object KafkaSourceProvider extends Logging {
+ "are serialized with ByteArraySerializer.")
}
- if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
- {
+ if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) {
throw new IllegalArgumentException(
s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
+ "value are serialized with ByteArraySerializer.")
}
+
+ val specifiedKafkaParams = convertToSpecifiedParams(parameters)
+
+ ConfigUpdater("executor", specifiedKafkaParams)
+ .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
+ .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
+ .setTokenJaasConfigIfNeeded()
+ .build()
+ }
+
+ private def convertToSpecifiedParams(parameters: Map[String, String]): Map[String, String] = {
parameters
.keySet
.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
- .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
+ .toMap
}
}
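
End to end, once the spark.kafka.* settings and a keytab (or TGT) are supplied at submit time, a streaming query no longer needs an explicit kafka.sasl.jaas.config option. A hedged sketch, with placeholder broker/topic names and an active SparkSession named `spark` assumed:

```scala
// The SCRAM mechanism is mandatory for the token path (see the require() above).
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
  .option("subscribe", "events")
  .load()
```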
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
index 927c56d9ce82..0d831c388460 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.kafka010
-import scala.collection.JavaConverters._
+import java.{util => ju}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -41,10 +41,12 @@ case object KafkaWriterCommitMessage extends WriterCommitMessage
* @param schema The schema of the input data.
*/
class KafkaStreamingWriteSupport(
- topic: Option[String], producerParams: Map[String, String], schema: StructType)
+ topic: Option[String],
+ producerParams: ju.Map[String, Object],
+ schema: StructType)
extends StreamingWriteSupport {
- validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
+ validateQuery(schema.toAttributes, producerParams, topic)
override def createStreamingWriterFactory(): KafkaStreamWriterFactory =
KafkaStreamWriterFactory(topic, producerParams, schema)
@@ -62,7 +64,9 @@ class KafkaStreamingWriteSupport(
* @param schema The schema of the input data.
*/
case class KafkaStreamWriterFactory(
- topic: Option[String], producerParams: Map[String, String], schema: StructType)
+ topic: Option[String],
+ producerParams: ju.Map[String, Object],
+ schema: StructType)
extends StreamingDataWriterFactory {
override def createWriter(
@@ -83,12 +87,12 @@ case class KafkaStreamWriterFactory(
* @param inputSchema The attributes in the input data.
*/
class KafkaStreamDataWriter(
- targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
+ targetTopic: Option[String],
+ producerParams: ju.Map[String, Object],
+ inputSchema: Seq[Attribute])
extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
- import scala.collection.JavaConverters._
- private lazy val producer = CachedKafkaProducer.getOrCreate(
- new java.util.HashMap[String, Object](producerParams.asJava))
+ private lazy val producer = CachedKafkaProducer.getOrCreate(producerParams)
def write(row: InternalRow): Unit = {
checkForErrors()
@@ -112,7 +116,7 @@ class KafkaStreamDataWriter(
if (producer != null) {
producer.flush()
checkForErrors()
- CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
+ CachedKafkaProducer.close(producerParams)
}
}
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index 3f6fcf6b2e52..b21037b1340c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -409,7 +409,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
*/
val topic = newTopic()
testUtils.createTopic(topic, 1)
- val options = new java.util.HashMap[String, String]
+ val options = new java.util.HashMap[String, Object]
options.put("bootstrap.servers", testUtils.brokerAddress)
options.put("buffer.memory", "16384") // min buffer size
options.put("block.on.buffer.full", "true")
@@ -417,7 +417,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
val inputSchema = Seq(AttributeReference("value", BinaryType)())
val data = new Array[Byte](15000) // large value
- val writeTask = new KafkaStreamDataWriter(Some(topic), options.asScala.toMap, inputSchema)
+ val writeTask = new KafkaStreamDataWriter(Some(topic), options, inputSchema)
try {
val fieldTypes: Array[DataType] = Array(BinaryType)
val converter = UnsafeProjection.create(fieldTypes)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
new file mode 100644
index 000000000000..772fe4614bad
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config._
+
+class KafkaSecurityHelperSuite extends SparkFunSuite with BeforeAndAfterEach {
+ private val keytab = "/path/to/keytab"
+ private val kerberosServiceName = "kafka"
+ private val principal = "user@domain.com"
+ private val tokenId = "tokenId" + UUID.randomUUID().toString
+ private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+ private var sparkConf: SparkConf = null
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ sparkConf = new SparkConf()
+ }
+
+ override def afterEach(): Unit = {
+ try {
+      resetUGI()
+ } finally {
+ super.afterEach()
+ }
+ }
+
+ private def addTokenToUGI(): Unit = {
+ val token = new Token[KafkaDelegationTokenIdentifier](
+ tokenId.getBytes,
+ tokenPassword.getBytes,
+ KafkaTokenUtil.TOKEN_KIND,
+ KafkaTokenUtil.TOKEN_SERVICE
+ )
+ val creds = new Credentials()
+ creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+ UserGroupInformation.getCurrentUser.addCredentials(creds)
+ }
+
+  private def resetUGI(): Unit = {
+ UserGroupInformation.setLoginUser(null)
+ }
+
+ test("isTokenAvailable without token should return false") {
+ assert(!KafkaSecurityHelper.isTokenAvailable())
+ }
+
+ test("isTokenAvailable with token should return true") {
+ addTokenToUGI()
+
+ assert(KafkaSecurityHelper.isTokenAvailable())
+ }
+
+ test("getTokenJaasParams with token no service should throw exception") {
+ addTokenToUGI()
+
+ val thrown = intercept[IllegalArgumentException] {
+ KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+ }
+
+ assert(thrown.getMessage contains "Kerberos service name must be defined")
+ }
+
+ test("getTokenJaasParams with token should return scram module") {
+ addTokenToUGI()
+ sparkConf.set(Kafka.KERBEROS_SERVICE_NAME, kerberosServiceName)
+
+ val jaasParams = KafkaSecurityHelper.getTokenJaasParams(sparkConf)
+
+ assert(jaasParams.contains("ScramLoginModule required"))
+ assert(jaasParams.contains("tokenauth=true"))
+ assert(jaasParams.contains(tokenId))
+ assert(jaasParams.contains(tokenPassword))
+ }
+}
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index ea18b7e03591..333572e99b1c 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -28,8 +28,6 @@
   <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
     <sbt.project.name>streaming-kafka-0-10</sbt.project.name>
-
-    <kafka.version>2.1.0</kafka.version>
   <packaging>jar</packaging>
   <name>Spark Integration for Kafka 0.10</name>
diff --git a/pom.xml b/pom.xml
index 3ca2f739ce0e..dfc3c540dc18 100644
--- a/pom.xml
+++ b/pom.xml
@@ -128,6 +128,8 @@
     <hive.version>1.2.1.spark2</hive.version>
     <hive.version.short>1.2.1</hive.version.short>
+
+    <kafka.version>2.1.0</kafka.version>
     <derby.version>10.12.1.1</derby.version>
     <parquet.version>1.10.0</parquet.version>
     <orc.version>1.5.3</orc.version>