- spark.yarn.security.credentials.${service}.enabled | true | Controls whether to obtain credentials for services when security is enabled.
+ spark.security.credentials.${service}.enabled | true | Controls whether to obtain credentials for services when security is enabled.
@@ -489,11 +489,11 @@ token for the cluster's default Hadoop filesystem, and potentially for HBase and
An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares
the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
-and `spark.yarn.security.credentials.hbase.enabled` is not set to `false`.
+and `spark.security.credentials.hbase.enabled` is not set to `false`.
Similarly, a Hive token will be obtained if Hive is on the classpath, its configuration
includes a URI of the metadata store in `hive.metastore.uris`, and
-`spark.yarn.security.credentials.hive.enabled` is not set to `false`.
+`spark.security.credentials.hive.enabled` is not set to `false`.
If an application needs to interact with other secure Hadoop filesystems, then
the tokens needed to access these clusters must be explicitly requested at
@@ -507,7 +507,7 @@ Spark supports integrating with other security-aware services through Java Servi
`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`
should be available to Spark by listing their names in the corresponding file in the jar's
`META-INF/services` directory. These plug-ins can be disabled by setting
-`spark.yarn.security.credentials.{service}.enabled` to `false`, where `{service}` is the name of
+`spark.security.credentials.{service}.enabled` to `false`, where `{service}` is the name of
the credential provider.
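For illustration, a minimal provider might look like the sketch below. The package, class, and service names are hypothetical; the overridden methods match the `ServiceCredentialProvider` trait defined in this module, and the no-op bodies stand in for real token-fetching logic.

```scala
package com.example.security  // hypothetical package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

// A no-op provider for a hypothetical service named "my-service". It can be
// disabled with spark.security.credentials.my-service.enabled=false.
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  override def serviceName: String = "my-service"

  // The trait's default is based on whether Hadoop security is enabled;
  // override only if the service has its own notion of "secure".
  override def credentialsRequired(hadoopConf: Configuration): Boolean = true

  // Add this service's delegation tokens to `creds` and return the time of
  // the next renewal, or None if the tokens cannot be renewed.
  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = None
}
```

To register it, the jar would list `com.example.security.MyServiceCredentialProvider` in
`META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`.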
## Configuring the External Shuffle Service
@@ -571,8 +571,8 @@ the Spark configuration must be set to disable token collection for the services
The Spark configuration must include the lines:
```
-spark.yarn.security.credentials.hive.enabled false
-spark.yarn.security.credentials.hbase.enabled false
+spark.security.credentials.hive.enabled false
+spark.security.credentials.hbase.enabled false
```
The configuration option `spark.yarn.access.hadoopFileSystems` must be unset.
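As a sketch, the same two settings can be applied programmatically through `SparkConf`, equivalent to the configuration-file entries above:

```scala
import org.apache.spark.SparkConf

// Disable Hive and HBase delegation token collection; the token for the
// cluster's default Hadoop filesystem is still obtained.
val conf = new SparkConf()
  .set("spark.security.credentials.hive.enabled", "false")
  .set("spark.security.credentials.hbase.enabled", "false")
```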
diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml
index a1b641c8eeb8..5ab9758c5ee9 100644
--- a/resource-managers/yarn/pom.xml
+++ b/resource-managers/yarn/pom.xml
@@ -167,29 +167,27 @@
<version>${jersey-1.version}</version>
</dependency>
-
+
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-metastore</artifactId>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
deleted file mode 100644
index f5a807ecac9d..000000000000
--- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ /dev/null
@@ -1,3 +0,0 @@
-org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider
-org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
-org.apache.spark.deploy.yarn.security.HiveCredentialProvider
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 864c834d110f..31f4567a1766 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -38,7 +38,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager}
+import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, YARNHadoopDelegationTokenManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc._
@@ -247,8 +247,12 @@ private[spark] class ApplicationMaster(
if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
// If a principal and keytab have been set, use that to create new credentials for executors
// periodically
- credentialRenewer =
- new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer()
+ val credentialManager = new YARNHadoopDelegationTokenManager(
+ sparkConf,
+ yarnConf,
+ YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
+
+ val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
credentialRenewer.scheduleLoginFromKeytab()
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 424bbca12319..8a07c630d5c0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager
+import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
@@ -121,7 +121,10 @@ private[spark] class Client(
private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
.getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
- private val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+ private val credentialManager = new YARNHadoopDelegationTokenManager(
+ sparkConf,
+ hadoopConf,
+ YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
def reportLauncherState(state: SparkAppHandle.State): Unit = {
launcherBackend.setState(state)
@@ -368,7 +371,7 @@ private[spark] class Client(
val fs = destDir.getFileSystem(hadoopConf)
// Merge credentials obtained from registered providers
- val nearestTimeOfNextRenewal = credentialManager.obtainCredentials(hadoopConf, credentials)
+ val nearestTimeOfNextRenewal = credentialManager.obtainDelegationTokens(hadoopConf, credentials)
if (credentials != null) {
// Add credentials to current user's UGI, so that following operations don't need to use the
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 93578855122c..d4d03654266f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -24,8 +24,9 @@ import java.util.regex.Pattern
import scala.collection.mutable.{HashMap, ListBuffer}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.{JobConf, Master}
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.ApplicationConstants
@@ -35,11 +36,14 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater}
+import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.deploy.yarn.security.CredentialUpdater
+import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.util.Utils
+
/**
* Contains util methods to interact with Hadoop from spark.
*/
@@ -87,8 +91,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = {
- credentialUpdater =
- new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater()
+ val hadoopConf = newConfiguration(sparkConf)
+ val credentialManager = new YARNHadoopDelegationTokenManager(
+ sparkConf,
+ hadoopConf,
+ YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+ credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager)
credentialUpdater.start()
}
@@ -103,6 +111,21 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
ConverterUtils.toContainerId(containerIdString)
}
+
+ /** The filesystems for which YARN should fetch delegation tokens. */
+ private[spark] def hadoopFSsToAccess(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration): Set[FileSystem] = {
+ val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS)
+ .map(new Path(_).getFileSystem(hadoopConf))
+ .toSet
+
+ val stagingFS = sparkConf.get(STAGING_DIR)
+ .map(new Path(_).getFileSystem(hadoopConf))
+ .getOrElse(FileSystem.get(hadoopConf))
+
+ filesystemsToAccess + stagingFS
+ }
}
object YarnSparkHadoopUtil {
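A usage sketch of the new helper follows. It assumes `FILESYSTEMS_TO_ACCESS` corresponds to `spark.yarn.access.hadoopFileSystems` (as the docs changes above suggest) and that the caller lives inside the `org.apache.spark` namespace, since `hadoopFSsToAccess` is `private[spark]`; the NameNode URI is made up.

```scala
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

// Filesystems listed in spark.yarn.access.hadoopFileSystems are resolved to
// FileSystem instances; the staging directory's filesystem (or the default
// filesystem when no staging dir is configured) is always included.
val sparkConf = new SparkConf()
  .set("spark.yarn.access.hadoopFileSystems", "hdfs://backup-nn:8020")
val hadoopConf = new Configuration()

val filesystems = YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)
```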
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
index 7e76f402db24..68a2e9e70a78 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala
@@ -54,7 +54,7 @@ import org.apache.spark.util.ThreadUtils
private[yarn] class AMCredentialRenewer(
sparkConf: SparkConf,
hadoopConf: Configuration,
- credentialManager: ConfigurableCredentialManager) extends Logging {
+ credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
private var lastCredentialsFileSuffix = 0
@@ -174,7 +174,9 @@ private[yarn] class AMCredentialRenewer(
keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
// Get a copy of the credentials
override def run(): Void = {
- nearestNextRenewalTime = credentialManager.obtainCredentials(freshHadoopConf, tempCreds)
+ nearestNextRenewalTime = credentialManager.obtainDelegationTokens(
+ freshHadoopConf,
+ tempCreds)
null
}
})
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
deleted file mode 100644
index 4f4be52a0d69..000000000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import java.util.ServiceLoader
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.Credentials
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * A ConfigurableCredentialManager to manage all the registered credential providers and offer
- * APIs for other modules to obtain credentials as well as renewal time. By default
- * [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
- * be loaded in if not explicitly disabled, any plugged-in credential provider wants to be
- * managed by ConfigurableCredentialManager needs to implement [[ServiceCredentialProvider]]
- * interface and put into resources/META-INF/services to be loaded by ServiceLoader.
- *
- * Also each credential provider is controlled by
- * spark.yarn.security.credentials.{service}.enabled, it will not be loaded in if set to false.
- * For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by
- * the configuration spark.yarn.security.credentials.hive.enabled.
- */
-private[yarn] final class ConfigurableCredentialManager(
- sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
- private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
- private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
-
- // Maintain all the registered credential providers
- private val credentialProviders = {
- val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
- Utils.getContextOrSparkClassLoader).asScala
-
- // Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false.
- providers.filter { p =>
- sparkConf.getOption(providerEnabledConfig.format(p.serviceName))
- .orElse {
- sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c =>
- logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " +
- s"using ${providerEnabledConfig.format(p.serviceName)} instead")
- c
- }
- }.map(_.toBoolean).getOrElse(true)
- }.map { p => (p.serviceName, p) }.toMap
- }
-
- /**
- * Get credential provider for the specified service.
- */
- def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
- credentialProviders.get(service)
- }
-
- /**
- * Obtain credentials from all the registered providers.
- * @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable,
- * otherwise the nearest renewal time of any credentials will be returned.
- */
- def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Long = {
- credentialProviders.values.flatMap { provider =>
- if (provider.credentialsRequired(hadoopConf)) {
- provider.obtainCredentials(hadoopConf, sparkConf, creds)
- } else {
- logDebug(s"Service ${provider.serviceName} does not require a token." +
- s" Check your configuration to see if security is disabled or not.")
- None
- }
- }.foldLeft(Long.MaxValue)(math.min)
- }
-
- /**
- * Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this
- * instance when it is not used. AM will use it to renew credentials periodically.
- */
- def credentialRenewer(): AMCredentialRenewer = {
- new AMCredentialRenewer(sparkConf, hadoopConf, this)
- }
-
- /**
- * Create an [[CredentialUpdater]] instance, caller should be resposible to stop this intance
- * when it is not used. Executors and driver (client mode) will use it to update credentials.
- * periodically.
- */
- def credentialUpdater(): CredentialUpdater = {
- new CredentialUpdater(sparkConf, hadoopConf, this)
- }
-}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
index 41b7b5d60b03..fe173dffc22a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
private[spark] class CredentialUpdater(
sparkConf: SparkConf,
hadoopConf: Configuration,
- credentialManager: ConfigurableCredentialManager) extends Logging {
+ credentialManager: YARNHadoopDelegationTokenManager) extends Logging {
@volatile private var lastCredentialsFileSuffix = 0
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
deleted file mode 100644
index f65c886db944..000000000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import scala.collection.JavaConverters._
-import scala.util.Try
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
-
-import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-
-private[security] class HadoopFSCredentialProvider
- extends ServiceCredentialProvider with Logging {
- // Token renewal interval, this value will be set in the first call,
- // if None means no token renewer specified or no token can be renewed,
- // so cannot get token renewal interval.
- private var tokenRenewalInterval: Option[Long] = null
-
- override val serviceName: String = "hadoopfs"
-
- override def obtainCredentials(
- hadoopConf: Configuration,
- sparkConf: SparkConf,
- creds: Credentials): Option[Long] = {
- // NameNode to access, used to get tokens from different FileSystems
- val tmpCreds = new Credentials()
- val tokenRenewer = getTokenRenewer(hadoopConf)
- hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst =>
- val dstFs = dst.getFileSystem(hadoopConf)
- logInfo("getting token for: " + dst)
- dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
- }
-
- // Get the token renewal interval if it is not set. It will only be called once.
- if (tokenRenewalInterval == null) {
- tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
- }
-
- // Get the time of next renewal.
- val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
- val nextRenewalDates = tmpCreds.getAllTokens.asScala
- .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
- .map { t =>
- val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
- identifier.getIssueDate + interval
- }
- if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
- }
-
- creds.addAll(tmpCreds)
- nextRenewalDate
- }
-
- private def getTokenRenewalInterval(
- hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = {
- // We cannot use the tokens generated with renewer yarn. Trying to renew
- // those will fail with an access control issue. So create new tokens with the logged in
- // user as renewer.
- sparkConf.get(PRINCIPAL).flatMap { renewer =>
- val creds = new Credentials()
- hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst =>
- val dstFs = dst.getFileSystem(hadoopConf)
- dstFs.addDelegationTokens(renewer, creds)
- }
-
- val renewIntervals = creds.getAllTokens.asScala.filter {
- _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
- }.flatMap { token =>
- Try {
- val newExpiration = token.renew(hadoopConf)
- val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
- val interval = newExpiration - identifier.getIssueDate
- logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
- interval
- }.toOption
- }
- if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
- }
- }
-
- private def getTokenRenewer(conf: Configuration): String = {
- val delegTokenRenewer = Master.getMasterPrincipal(conf)
- logDebug("delegation token renewer is: " + delegTokenRenewer)
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- val errorMessage = "Can't get Master Kerberos principal for use as renewer"
- logError(errorMessage)
- throw new SparkException(errorMessage)
- }
-
- delegTokenRenewer
- }
-
- private def hadoopFSsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
- sparkConf.get(FILESYSTEMS_TO_ACCESS).map(new Path(_)).toSet +
- sparkConf.get(STAGING_DIR).map(new Path(_))
- .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory)
- }
-}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
index 4e3fcce8dbb1..cc24ac4d9bcf 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala
@@ -35,7 +35,7 @@ trait ServiceCredentialProvider {
def serviceName: String
/**
- * To decide whether credential is required for this service. By default it based on whether
+ * Returns true if credentials are required by this service. By default, it is based on whether
* Hadoop security is enabled.
*/
def credentialsRequired(hadoopConf: Configuration): Boolean = {
@@ -44,6 +44,7 @@ trait ServiceCredentialProvider {
/**
* Obtain credentials for this service and get the time of the next renewal.
+ *
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param sparkConf Spark configuration.
* @param creds Credentials to add tokens and security keys to.
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
new file mode 100644
index 000000000000..bbd17c8fc127
--- /dev/null
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * This class loads delegation token providers registered under the YARN-specific
+ * [[ServiceCredentialProvider]] interface, as well as the builtin providers defined
+ * in [[HadoopDelegationTokenManager]].
+ */
+private[yarn] class YARNHadoopDelegationTokenManager(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration,
+ fileSystems: Set[FileSystem]) extends Logging {
+
+ private val delegationTokenManager =
+ new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems)
+
+ // public for testing
+ val credentialProviders = getCredentialProviders
+
+ /**
+ * Writes delegation tokens to creds. Delegation tokens are fetched from all registered
+ * providers.
+ *
+ * @return Time after which the fetched delegation tokens should be renewed.
+ */
+ def obtainDelegationTokens(hadoopConf: Configuration, creds: Credentials): Long = {
+ val superInterval = delegationTokenManager.obtainDelegationTokens(hadoopConf, creds)
+
+ credentialProviders.values.flatMap { provider =>
+ if (provider.credentialsRequired(hadoopConf)) {
+ provider.obtainCredentials(hadoopConf, sparkConf, creds)
+ } else {
+ logDebug(s"Service ${provider.serviceName} does not require a token." +
+ s" Check your configuration to see if security is disabled or not.")
+ None
+ }
+ }.foldLeft(superInterval)(math.min)
+ }
+
+ private def getCredentialProviders: Map[String, ServiceCredentialProvider] = {
+ val providers = loadCredentialProviders
+
+ providers
+ .filter { p => delegationTokenManager.isServiceEnabled(p.serviceName) }
+ .map { p => (p.serviceName, p) }
+ .toMap
+ }
+
+ private def loadCredentialProviders: List[ServiceCredentialProvider] = {
+ ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
+ .asScala
+ .toList
+ }
+}
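For reference, a construction-and-use sketch mirroring the `Client` and `ApplicationMaster` call sites earlier in this diff (the surrounding `sparkConf`/`hadoopConf` setup is assumed):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager

val sparkConf = new SparkConf()
val hadoopConf = new Configuration()

val credentialManager = new YARNHadoopDelegationTokenManager(
  sparkConf,
  hadoopConf,
  YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))

// Fetches tokens from the builtin HadoopDelegationTokenManager providers plus
// any ServiceLoader-registered ServiceCredentialProviders, writes them into
// creds, and returns the earliest time any fetched token should be renewed.
val creds = new Credentials()
val nextRenewal: Long = credentialManager.obtainDelegationTokens(hadoopConf, creds)
```

Note that `YARNHadoopDelegationTokenManager` is `private[yarn]`, so this only compiles from within the `org.apache.spark.deploy.yarn` package.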
diff --git a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
index d0ef5efa36e8..f31c23269313 100644
--- a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+++ b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -1 +1 @@
-org.apache.spark.deploy.yarn.security.TestCredentialProvider
+org.apache.spark.deploy.yarn.security.YARNTestCredentialProvider
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
deleted file mode 100644
index b0067aa4517c..000000000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.Credentials
-import org.apache.hadoop.security.token.Token
-import org.scalatest.{BeforeAndAfter, Matchers}
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-
-class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
- private var credentialManager: ConfigurableCredentialManager = null
- private var sparkConf: SparkConf = null
- private var hadoopConf: Configuration = null
-
- override def beforeAll(): Unit = {
- super.beforeAll()
-
- sparkConf = new SparkConf()
- hadoopConf = new Configuration()
- System.setProperty("SPARK_YARN_MODE", "true")
- }
-
- override def afterAll(): Unit = {
- System.clearProperty("SPARK_YARN_MODE")
-
- super.afterAll()
- }
-
- test("Correctly load default credential providers") {
- credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
-
- credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
- credentialManager.getServiceCredentialProvider("hbase") should not be (None)
- credentialManager.getServiceCredentialProvider("hive") should not be (None)
- }
-
- test("disable hive credential provider") {
- sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
- credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
-
- credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
- credentialManager.getServiceCredentialProvider("hbase") should not be (None)
- credentialManager.getServiceCredentialProvider("hive") should be (None)
- }
-
- test("using deprecated configurations") {
- sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
- sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
- credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
-
- credentialManager.getServiceCredentialProvider("hadoopfs") should be (None)
- credentialManager.getServiceCredentialProvider("hive") should be (None)
- credentialManager.getServiceCredentialProvider("test") should not be (None)
- credentialManager.getServiceCredentialProvider("hbase") should not be (None)
- }
-
- test("verify obtaining credentials from provider") {
- credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
- val creds = new Credentials()
-
- // Tokens can only be obtained from TestTokenProvider, for hdfs, hbase and hive tokens cannot
- // be obtained.
- credentialManager.obtainCredentials(hadoopConf, creds)
- val tokens = creds.getAllTokens
- tokens.size() should be (1)
- tokens.iterator().next().getService should be (new Text("test"))
- }
-
- test("verify getting credential renewal info") {
- credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
- val creds = new Credentials()
-
- val testCredentialProvider = credentialManager.getServiceCredentialProvider("test").get
- .asInstanceOf[TestCredentialProvider]
- // Only TestTokenProvider can get the time of next token renewal
- val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
- nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal)
- }
-
- test("obtain tokens For HiveMetastore") {
- val hadoopConf = new Configuration()
- hadoopConf.set("hive.metastore.kerberos.principal", "bob")
- // thrift picks up on port 0 and bails out, without trying to talk to endpoint
- hadoopConf.set("hive.metastore.uris", "http://localhost:0")
-
- val hiveCredentialProvider = new HiveCredentialProvider()
- val credentials = new Credentials()
- hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials)
-
- credentials.getAllTokens.size() should be (0)
- }
-
- test("Obtain tokens For HBase") {
- val hadoopConf = new Configuration()
- hadoopConf.set("hbase.security.authentication", "kerberos")
-
- val hbaseTokenProvider = new HBaseCredentialProvider()
- val creds = new Credentials()
- hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds)
-
- creds.getAllTokens.size should be (0)
- }
-}
-
-class TestCredentialProvider extends ServiceCredentialProvider {
- val tokenRenewalInterval = 86400 * 1000L
- var timeOfNextTokenRenewal = 0L
-
- override def serviceName: String = "test"
-
- override def credentialsRequired(conf: Configuration): Boolean = true
-
- override def obtainCredentials(
- hadoopConf: Configuration,
- sparkConf: SparkConf,
- creds: Credentials): Option[Long] = {
- if (creds == null) {
- // Guard out other unit test failures.
- return None
- }
-
- val emptyToken = new Token()
- emptyToken.setService(new Text("test"))
- creds.addToken(emptyToken.getService, emptyToken)
-
- val currTime = System.currentTimeMillis()
- timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval
-
- Some(timeOfNextTokenRenewal)
- }
-}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
deleted file mode 100644
index f50ee193c258..000000000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn.security
-
-import org.apache.hadoop.conf.Configuration
-import org.scalatest.{Matchers, PrivateMethodTester}
-
-import org.apache.spark.{SparkException, SparkFunSuite}
-
-class HadoopFSCredentialProviderSuite
- extends SparkFunSuite
- with PrivateMethodTester
- with Matchers {
- private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)
-
- private def getTokenRenewer(
- fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): String = {
- fsCredentialProvider invokePrivate _getTokenRenewer(conf)
- }
-
- private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null
-
- override def beforeAll() {
- super.beforeAll()
-
- if (hadoopFsCredentialProvider == null) {
- hadoopFsCredentialProvider = new HadoopFSCredentialProvider()
- }
- }
-
- override def afterAll() {
- if (hadoopFsCredentialProvider != null) {
- hadoopFsCredentialProvider = null
- }
-
- super.afterAll()
- }
-
- test("check token renewer") {
- val hadoopConf = new Configuration()
- hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
- hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
- val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
- renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
- }
-
- test("check token renewer default") {
- val hadoopConf = new Configuration()
- val caught =
- intercept[SparkException] {
- getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
- }
- assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
- }
-}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
new file mode 100644
index 000000000000..2b226eff5ce1
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.Credentials
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
+
+class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
+ private var credentialManager: YARNHadoopDelegationTokenManager = null
+ private var sparkConf: SparkConf = null
+ private var hadoopConf: Configuration = null
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ System.setProperty("SPARK_YARN_MODE", "true")
+
+ sparkConf = new SparkConf()
+ hadoopConf = new Configuration()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+
+ System.clearProperty("SPARK_YARN_MODE")
+ }
+
+ test("Correctly loads credential providers") {
+ credentialManager = new YARNHadoopDelegationTokenManager(
+ sparkConf,
+ hadoopConf,
+ YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf))
+
+ credentialManager.credentialProviders.get("yarn-test") should not be (None)
+ }
+}
+
+class YARNTestCredentialProvider extends ServiceCredentialProvider {
+ override def serviceName: String = "yarn-test"
+
+ override def credentialsRequired(conf: Configuration): Boolean = true
+
+ override def obtainCredentials(
+ hadoopConf: Configuration,
+ sparkConf: SparkConf,
+ creds: Credentials): Option[Long] = None
+}