diff --git a/core/pom.xml b/core/pom.xml index 24ce36deeb16..d96841008ae3 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -357,6 +357,34 @@ org.apache.commons commons-crypto + + + + ${hive.group} + hive-exec + + + ${hive.group} + hive-metastore + + + org.apache.thrift + libthrift + test + + + org.apache.thrift + libfb303 + test + + + + org.apache.hadoop + hadoop-yarn-api + target/scala-${scala.binary.version}/classes diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 77005aa9040b..e6e59fdf127e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -30,6 +30,7 @@ import scala.util.Properties import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.ivy.Ivy import org.apache.ivy.core.LogOptions import org.apache.ivy.core.module.descriptor._ @@ -45,6 +46,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl import org.apache.spark._ import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.rest._ +import org.apache.spark.internal.Logging import org.apache.spark.launcher.SparkLauncher import org.apache.spark.util._ @@ -63,7 +65,7 @@ private[deploy] object SparkSubmitAction extends Enumeration { * This program handles setting up the classpath with relevant Spark dependencies and provides * a layer over the different cluster managers and deploy modes that Spark supports. */ -object SparkSubmit extends CommandLineUtils { +object SparkSubmit extends CommandLineUtils with Logging { // Cluster managers private val YARN = 1 @@ -564,12 +566,22 @@ object SparkSubmit extends CommandLineUtils { // properties and then loaded by SparkConf sysProps.put("spark.yarn.keytab", args.keytab) sysProps.put("spark.yarn.principal", args.principal) - - UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) } } } + + // [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with + // renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we + // must trick it into thinking we're YARN. 
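For context on the workaround described in the comment above: the Hadoop library that HadoopRDD calls into resolves the delegation-token renewer from `yarn.resourcemanager.principal`. A hedged sketch of that lookup — simplified, not the actual Hadoop internals and not part of this patch — shows why injecting the property in the hunk below is sufficient:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Roughly the lookup the injected spark.hadoop.* property satisfies: the
// renewer comes from yarn.resourcemanager.principal, which the hunk below
// now sets to the submitting user when running on Mesos.
def fetchTokensLikeHadoopRDD(conf: Configuration, path: Path, creds: Credentials): Unit = {
  val renewer = conf.get(YarnConfiguration.RM_PRINCIPAL)
  path.getFileSystem(conf).addDelegationTokens(renewer, creds)
}
```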
+    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
+      val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
+      val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
+
+      logDebug(s"Setting ${key} to ${shortUserName}.")
+      sysProps.put(key, shortUserName)
+    }
+
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
       childMainClass = "org.apache.spark.deploy.yarn.Client"
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala
similarity index 54%
rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
rename to core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala
index 4f4be52a0d69..2d62fb0339de 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/ConfigurableCredentialManager.scala
@@ -15,14 +15,14 @@
  * limitations under the License.
  */

-package org.apache.spark.deploy.yarn.security
+package org.apache.spark.deploy.security

 import java.util.ServiceLoader

 import scala.collection.JavaConverters._

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}

 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
@@ -37,33 +37,59 @@ import org.apache.spark.util.Utils
  * interface and put into resources/META-INF/services to be loaded by ServiceLoader.
  *
  * Also each credential provider is controlled by
- * spark.yarn.security.credentials.{service}.enabled, it will not be loaded in if set to false.
+ * spark.security.credentials.{service}.enabled; a provider will not be loaded if that is set
+ * to false.
  * For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by
- * the configuration spark.yarn.security.credentials.hive.enabled.
+ * the configuration spark.security.credentials.hive.enabled.
  */
-private[yarn] final class ConfigurableCredentialManager(
-    sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
-  private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
-  private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
+private[spark] class ConfigurableCredentialManager(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration)
+  extends Logging {
+
+  private val deprecatedProviderEnabledConfigs = List(
+    "spark.yarn.security.tokens.%s.enabled",
+    "spark.yarn.security.credentials.%s.enabled")
+  private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

   // Maintain all the registered credential providers
-  private val credentialProviders = {
-    val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
-      Utils.getContextOrSparkClassLoader).asScala
+  private val credentialProviders = getCredentialProviders()
+  logDebug(s"Using the following credential providers: ${credentialProviders.keys.mkString(", ")}.")
+
+  private def getCredentialProviders(): Map[String, ServiceCredentialProvider] = {
+    val providers = loadCredentialProviders

-    // Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false.
+    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
     providers.filter { p =>
-      sparkConf.getOption(providerEnabledConfig.format(p.serviceName))
-        .orElse {
-          sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c =>
-            logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " +
-              s"using ${providerEnabledConfig.format(p.serviceName)} instead")
-            c
-          }
-        }.map(_.toBoolean).getOrElse(true)
+
+      val key = providerEnabledConfig.format(p.serviceName)
+
+      deprecatedProviderEnabledConfigs.foreach { pattern =>
+        val deprecatedKey = pattern.format(p.serviceName)
+        if (sparkConf.contains(deprecatedKey)) {
+          logWarning(s"${deprecatedKey} is deprecated, using ${key} instead")
+        }
+      }
+
+      val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
+        sparkConf
+          .getOption(pattern.format(p.serviceName))
+          .map(_.toBoolean)
+          .getOrElse(true)
+      }
+
+      sparkConf
+        .getOption(key)
+        .map(_.toBoolean)
+        .getOrElse(isEnabledDeprecated)
     }.map { p => (p.serviceName, p) }.toMap
   }

+  protected def loadCredentialProviders: List[ServiceCredentialProvider] = {
+    ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
+      .asScala.toList
+  }
+
   /**
    * Get credential provider for the specified service.
    */
@@ -72,7 +98,9 @@ private[yarn] final class ConfigurableCredentialManager(
   }

   /**
-   * Obtain credentials from all the registered providers.
+   * Writes delegation tokens to creds. Delegation tokens are fetched from all registered
+   * providers.
+   *
    * @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable,
    *         otherwise the nearest renewal time of any credentials will be returned.
    */
@@ -89,19 +117,16 @@ private[yarn] final class ConfigurableCredentialManager(
   }

   /**
-   * Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this
-   * instance when it is not used. AM will use it to renew credentials periodically.
+   * Returns a copy of the current user's credentials, augmented with new delegation tokens.
    */
-  def credentialRenewer(): AMCredentialRenewer = {
-    new AMCredentialRenewer(sparkConf, hadoopConf, this)
-  }
+  def obtainUserCredentials: Credentials = {
+    val userCreds = UserGroupInformation.getCurrentUser.getCredentials
+    val numTokensBefore = userCreds.numberOfTokens
+    obtainCredentials(hadoopConf, userCreds)

-  /**
-   * Create an [[CredentialUpdater]] instance, caller should be resposible to stop this intance
-   * when it is not used. Executors and driver (client mode) will use it to update credentials.
-   * periodically.
-   */
-  def credentialUpdater(): CredentialUpdater = {
-    new CredentialUpdater(sparkConf, hadoopConf, this)
+    logDebug(s"Fetched ${userCreds.numberOfTokens - numTokensBefore} delegation token(s).")
+
+    userCreds
   }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala b/core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala
new file mode 100644
index 000000000000..e74a4820b5ee
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/CredentialsSerializer.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+
+import org.apache.hadoop.security.Credentials
+
+class CredentialsSerializer {
+  def serializeTokens(creds: Credentials): Array[Byte] = {
+    val byteStream = new ByteArrayOutputStream
+    val dataStream = new DataOutputStream(byteStream)
+    creds.writeTokenStorageToStream(dataStream)
+    byteStream.toByteArray
+  }
+
+  def deserializeTokens(tokenBytes: Array[Byte]): Credentials = {
+    val tokensBuf = new ByteArrayInputStream(tokenBytes)
+
+    val creds = new Credentials()
+    creds.readTokenStorageStream(new DataInputStream(tokensBuf))
+    creds
+  }
+}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala
similarity index 98%
rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
rename to core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala
index 5adeb8e605ff..29b0b9e9210d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HBaseCredentialProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseCredentialProvider.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */

-package org.apache.spark.deploy.yarn.security
+package org.apache.spark.deploy.security

 import scala.reflect.runtime.universe
 import scala.util.control.NonFatal
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala
new file mode 100644
index 000000000000..489554a925c5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSCredentialProvider.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+
+private[deploy] class HadoopFSCredentialProvider
+    extends ServiceCredentialProvider with Logging {
+  // Token renewal interval. This is set on the first call to obtainCredentials();
+  // None means that no token renewer was specified or that the tokens cannot be renewed,
+  // so no renewal interval is available.
+  private var tokenRenewalInterval: Option[Long] = null
+
+  override val serviceName: String = "hadoopfs"
+
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    // Fetch delegation tokens for every Hadoop filesystem the application will access.
+    val tmpCreds = new Credentials()
+    val tokenRenewer = getTokenRenewer(hadoopConf)
+    hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst =>
+      val dstFs = dst.getFileSystem(hadoopConf)
+      logInfo("Getting token for: " + dst)
+      dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
+    }
+
+    // Compute the token renewal interval if it has not been set. This happens only once.
+    if (tokenRenewalInterval == null) {
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
+    }
+
+    // Get the time of next renewal.
+    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
+      val nextRenewalDates = tmpCreds.getAllTokens.asScala
+        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
+        .map { t =>
+          val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+          identifier.getIssueDate + interval
+        }
+      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
+    }
+
+    creds.addAll(tmpCreds)
+    nextRenewalDate
+  }
+
+  protected def getTokenRenewalInterval(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf): Option[Long] = None
+
+  protected def getTokenRenewer(hadoopConf: Configuration): String = {
+    UserGroupInformation.getCurrentUser.getShortUserName
+  }
+
+  protected def hadoopFSsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
+    Set(FileSystem.get(hadoopConf).getHomeDirectory)
+  }
+}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveCredentialProvider.scala
similarity index 99%
rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
rename to core/src/main/scala/org/apache/spark/deploy/security/HiveCredentialProvider.scala
index 16d8fc32bb42..d8d4cccbd085 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HiveCredentialProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveCredentialProvider.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
 */

-package org.apache.spark.deploy.yarn.security
+package org.apache.spark.deploy.security

 import java.lang.reflect.UndeclaredThrowableException
 import java.security.PrivilegedExceptionAction
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala
new file mode 100644
index 000000000000..9016c256be64
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/security/ServiceCredentialProvider.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.spark.SparkConf
+import org.apache.spark.annotation.InterfaceStability
+
+/**
+ * A credential provider for a service. Users must implement this trait if they need to access
+ * a secure service from Spark.
+ */
+@InterfaceStability.Unstable
+trait ServiceCredentialProvider {
+
+  /**
+   * Name of the service to provide credentials for. This name should be unique; Spark uses it
+   * internally to differentiate credential providers.
+   */
+  def serviceName: String
+
+  /**
+   * Decides whether credentials are required for this service. By default this is based on
+   * whether Hadoop security is enabled.
+   */
+  def credentialsRequired(hadoopConf: Configuration): Boolean = {
+    UserGroupInformation.isSecurityEnabled
+  }
+
+  /**
+   * Obtain credentials for this service and get the time of the next renewal.
+   * @param hadoopConf Configuration of the current Hadoop Compatible system.
+   * @param sparkConf Spark configuration.
+   * @param creds Credentials to add tokens and security keys to.
+   * @return If the credentials are renewable and can be renewed, the time of the next renewal;
+   *         otherwise None.
+   */
+  def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long]
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b2b26ee107c0..3fc27dbd1352 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,9 +26,12 @@ import scala.collection.mutable
 import scala.util.{Failure, Success}
 import scala.util.control.NonFatal

+import org.apache.hadoop.security.UserGroupInformation
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.CredentialsSerializer
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
@@ -174,6 +177,15 @@ private[spark] class CoarseGrainedExecutorBackend(

 private[spark] object CoarseGrainedExecutorBackend extends Logging {

+  private def addDelegationTokens(tokenBytes: Array[Byte], driverConf: SparkConf): Unit = {
+    val creds = new CredentialsSerializer().deserializeTokens(tokenBytes)
+
+    logInfo(s"Adding ${creds.numberOfTokens()} tokens and ${creds.numberOfSecretKeys()} secret " +
+      "keys to the current user's credentials.")
+
+    UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
   private def run(
       driverUrl: String,
       executorId: String,
@@ -220,6 +232,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         SparkHadoopUtil.get.startCredentialUpdater(driverConf)
       }

+      cfg.ugiTokens.foreach(addDelegationTokens(_, driverConf))
+
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6b49bd699a13..02dd8dd24869 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler.cluster

 import java.nio.ByteBuffer

+import org.apache.hadoop.security.Credentials
+
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.ExecutorLossReason
@@ -32,7 +34,8 @@ private[spark] object CoarseGrainedClusterMessages {

   case class SparkAppConfig(
       sparkProperties: Seq[(String, String)],
-      ioEncryptionKey: Option[Array[Byte]])
+      ioEncryptionKey: Option[Array[Byte]],
+      ugiTokens: Option[Array[Byte]])
     extends CoarseGrainedClusterMessage

   case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4eedaaea6119..9c4c46de177f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -23,9 +23,11 @@ import javax.annotation.concurrent.GuardedBy

 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
-import scala.concurrent.duration.Duration
+
+import org.apache.hadoop.security.UserGroupInformation

 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
+import org.apache.spark.deploy.security.{ConfigurableCredentialManager, CredentialsSerializer}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -42,7 +44,10 @@ import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
 * (spark.deploy.*).
 */
 private[spark]
-class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
+class CoarseGrainedSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    val rpcEnv: RpcEnv,
+    credentialManager: Option[ConfigurableCredentialManager])
   extends ExecutorAllocationClient with SchedulerBackend with Logging {

 // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@@ -92,6 +97,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0

+  // Hadoop delegation tokens to send to executors.
+  private val userTokens = if (UserGroupInformation.isSecurityEnabled) {
+    credentialManager.map { manager =>
+      new CredentialsSerializer().serializeTokens(manager.obtainUserCredentials)
+    }
+  } else {
+    None
+  }
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {

@@ -216,8 +230,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         context.reply(true)

       case RetrieveSparkAppConfig =>
-        val reply = SparkAppConfig(sparkProperties,
-          SparkEnv.get.securityManager.getIOEncryptionKey())
+        val reply = SparkAppConfig(
+          sparkProperties,
+          SparkEnv.get.securityManager.getIOEncryptionKey(),
+          userTokens
+        )
         context.reply(reply)
     }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 0529fe9eed4d..96db24ef5af1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -38,7 +38,7 @@ private[spark] class StandaloneSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext,
     masters: Array[String])
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv, None)
   with StandaloneAppClientListener
   with Logging {

diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider
new file mode 100644
index 000000000000..2676a0ad589f
--- /dev/null
+++ b/core/src/test/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider
@@ -0,0 +1 @@
+org.apache.spark.deploy.security.TestCredentialProvider
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 88916488c0de..14d0575a13cc 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -268,7 +268,7 @@
 private class FakeSchedulerBackend(
     scheduler:
TaskSchedulerImpl, rpcEnv: RpcEnv, clusterManagerEndpoint: RpcEndpointRef) - extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv, None) { protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = { clusterManagerEndpoint.ask[Boolean]( diff --git a/core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala new file mode 100644 index 000000000000..0838aadb1021 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/security/ConfigurableCredentialManagerSuite.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.Token +import org.scalatest.{BeforeAndAfter, Matchers} + +import org.apache.spark.{SparkConf, SparkFunSuite} + +class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { + private var credentialManager: ConfigurableCredentialManager = null + private var sparkConf: SparkConf = null + private var hadoopConf: Configuration = null + + override def beforeAll(): Unit = { + super.beforeAll() + + sparkConf = new SparkConf() + hadoopConf = new Configuration() + } + + test("Correctly load default credential providers") { + credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) + + credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None) + credentialManager.getServiceCredentialProvider("hbase") should not be (None) + credentialManager.getServiceCredentialProvider("hive") should not be (None) + } + + test("disable hive credential provider") { + sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false") + credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) + + credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None) + credentialManager.getServiceCredentialProvider("hbase") should not be (None) + credentialManager.getServiceCredentialProvider("hive") should be (None) + } + + test("using deprecated configurations") { + sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false") + sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false") + credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) + + credentialManager.getServiceCredentialProvider("hadoopfs") should be (None) + credentialManager.getServiceCredentialProvider("hive") should be (None) + credentialManager.getServiceCredentialProvider("test") should not be 
(None)
+    credentialManager.getServiceCredentialProvider("hbase") should not be (None)
+  }
+
+  test("verify obtaining credentials from provider") {
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+    val creds = new Credentials()
+
+    // Tokens can only be obtained from TestCredentialProvider; HDFS, HBase and Hive tokens
+    // cannot be obtained.
+    credentialManager.obtainCredentials(hadoopConf, creds)
+    val tokens = creds.getAllTokens
+    tokens.size() should be (1)
+    tokens.iterator().next().getService should be (new Text("test"))
+  }
+
+  test("verify getting credential renewal info") {
+    credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
+    val creds = new Credentials()
+
+    val testCredentialProvider = credentialManager.getServiceCredentialProvider("test").get
+      .asInstanceOf[TestCredentialProvider]
+    // Only TestCredentialProvider can supply the time of the next token renewal.
+    val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds)
+    nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal)
+  }
+
+  test("obtain tokens For HiveMetastore") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.kerberos.principal", "bob")
+    // Thrift picks up on port 0 and bails out without trying to talk to the endpoint.
+    hadoopConf.set("hive.metastore.uris", "http://localhost:0")
+
+    val hiveCredentialProvider = new HiveCredentialProvider()
+    val credentials = new Credentials()
+    hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials)
+
+    credentials.getAllTokens.size() should be (0)
+  }
+
+  test("Obtain tokens For HBase") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hbase.security.authentication", "kerberos")
+
+    val hbaseTokenProvider = new HBaseCredentialProvider()
+    val creds = new Credentials()
+    hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds)
+
+    creds.getAllTokens.size should be (0)
+  }
+}
+
+class TestCredentialProvider extends ServiceCredentialProvider {
+  val tokenRenewalInterval = 86400 * 1000L
+  var timeOfNextTokenRenewal = 0L
+
+  override def serviceName: String = "test"
+
+  override def credentialsRequired(conf: Configuration): Boolean = true
+
+  override def obtainCredentials(
+      hadoopConf: Configuration,
+      sparkConf: SparkConf,
+      creds: Credentials): Option[Long] = {
+    if (creds == null) {
+      // Guard against other unit test failures.
+      return None
+    }
+
+    val emptyToken = new Token()
+    emptyToken.setService(new Text("test"))
+    creds.addToken(emptyToken.getService, emptyToken)
+
+    val currTime = System.currentTimeMillis()
+    timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval
+
+    Some(timeOfNextTokenRenewal)
+  }
+}
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 2355d40d1e6f..4ae5b0b08324 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -102,7 +102,7 @@ spark-deps-.*
 org.apache.spark.scheduler.ExternalClusterManager
 .*\.sql
 .Rbuildignore
-org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
+org.apache.spark.deploy.security.ServiceCredentialProvider
 spark-warehouse
 structured-streaming/*
 kafka-source-initial-offset-version-2.1.0.bin
diff --git a/docs/configuration.md b/docs/configuration.md
index 2687f542b8bd..0f8a4254b796 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -685,7 +685,7 @@ Apart from these, the following properties are also available, and may be useful
 spark.ui.retainedJobs
 1000
-    How many jobs the Spark UI and status APIs remember before garbage collecting.
+    How many jobs the Spark UI and status APIs remember before garbage collecting. This is a target maximum, and fewer elements may be retained in some circumstances.
@@ -693,7 +693,7 @@ Apart from these, the following properties are also available, and may be useful
 spark.ui.retainedStages
 1000
-    How many stages the Spark UI and status APIs remember before garbage collecting.
+    How many stages the Spark UI and status APIs remember before garbage collecting. This is a target maximum, and fewer elements may be retained in some circumstances.
@@ -701,7 +701,7 @@ Apart from these, the following properties are also available, and may be useful
 spark.ui.retainedTasks
 100000
-    How many tasks the Spark UI and status APIs remember before garbage collecting.
+    How many tasks the Spark UI and status APIs remember before garbage collecting. This is a target maximum, and fewer elements may be retained in some circumstances.
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index e9ddaa76a797..07415952fec5 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -426,7 +426,7 @@ To use a custom metrics.properties for the application master and executors, upd
-  spark.yarn.security.credentials.${service}.enabled
+  spark.security.credentials.${service}.enabled
   true
     Controls whether to obtain credentials for services when security is enabled.
@@ -489,11 +489,11 @@ token for the cluster's default Hadoop filesystem, and potentially for HBase and
 An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares
 the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
-and `spark.yarn.security.credentials.hbase.enabled` is not set to `false`.
+and `spark.security.credentials.hbase.enabled` is not set to `false`.

 Similarly, a Hive token will be obtained if Hive is on the classpath, its configuration
 includes a URI of the metadata store in `hive.metastore.uris`, and
-`spark.yarn.security.credentials.hive.enabled` is not set to `false`.
+`spark.security.credentials.hive.enabled` is not set to `false`.

 If an application needs to interact with other secure Hadoop filesystems, then
 the tokens needed to access these clusters must be explicitly requested at
@@ -504,10 +504,10 @@ spark.yarn.access.hadoopFileSystems hdfs://ireland.example.org:8020/,webhdfs://f
 ```
 Spark supports integrating with other security-aware services through the Java Services mechanism (see
-`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider`
+`java.util.ServiceLoader`). To do that, implementations of `org.apache.spark.deploy.security.ServiceCredentialProvider`
 should be available to Spark by listing their names in the corresponding file in the jar's
 `META-INF/services` directory. These plug-ins can be disabled by setting
-`spark.yarn.security.credentials.{service}.enabled` to `false`, where `{service}` is the name of
+`spark.security.credentials.{service}.enabled` to `false`, where `{service}` is the name of
 the credential provider.
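For illustration, a custom provider along the lines of the `TestCredentialProvider` in Spark's test suite might look like the following sketch (the package, class name, and `example` service name are hypothetical):

```scala
package com.example.security // hypothetical package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.Token

import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.ServiceCredentialProvider

class ExampleCredentialProvider extends ServiceCredentialProvider {
  // Matched against spark.security.credentials.example.enabled.
  override def serviceName: String = "example"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // A real provider would fetch a delegation token from its service here;
    // an empty token with a service name stands in for the illustration.
    val token = new Token()
    token.setService(new Text("example"))
    creds.addToken(token.getService, token)
    None // the token is not renewable
  }
}
```

It would then be registered by listing `com.example.security.ExampleCredentialProvider` in the jar's `META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider` file.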
## Configuring the External Shuffle Service @@ -571,8 +571,8 @@ the Spark configuration must be set to disable token collection for the services The Spark configuration must include the lines: ``` -spark.yarn.security.credentials.hive.enabled false -spark.yarn.security.credentials.hbase.enabled false +spark.security.credentials.hive.enabled false +spark.security.credentials.hbase.enabled false ``` The configuration option `spark.yarn.access.hadoopFileSystems` must be unset. diff --git a/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider new file mode 100644 index 000000000000..9ffeb4d50029 --- /dev/null +++ b/resource-managers/mesos/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider @@ -0,0 +1,3 @@ +org.apache.spark.deploy.security.HadoopFSCredentialProvider +org.apache.spark.deploy.security.HBaseCredentialProvider +org.apache.spark.deploy.security.HiveCredentialProvider diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index 19e253394f1b..55ef6ea38281 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -56,4 +56,8 @@ package object config { .stringConf .createOptional + private[spark] val USER_CREDENTIALS = ConfigBuilder("spark.mesos.kerberos.userCredentials") + .doc("Base64 encoding of UGI credentials.") + .stringConf + .createOptional } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 2a36ec4fa811..b4da4285bdf9 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -20,15 +20,20 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File import java.util.{Collections, List => JList} import java.util.concurrent.locks.ReentrantLock +import javax.xml.bind.DatatypeConverter import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.Future -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.mesos.Protos.{Credentials => _, TaskInfo => MesosTaskInfo, _} import org.apache.mesos.SchedulerDriver import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.mesos.config +import org.apache.spark.deploy.security.ConfigurableCredentialManager import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.RpcEndpointAddress @@ -51,7 +56,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc: SparkContext, master: String, securityManager: SecurityManager) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) + extends 
CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
+  extends CoarseGrainedSchedulerBackend(
+    scheduler,
+    sc.env.rpcEnv,
+    Option(new ConfigurableCredentialManager(
+      sc.conf,
+      SparkHadoopUtil.get.newConfiguration(sc.conf))))
   with org.apache.mesos.Scheduler
   with MesosSchedulerUtils {

@@ -159,6 +169,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

   override def start() {
     super.start()
+
     val driver = createSchedulerDriver(
       master,
       MesosCoarseGrainedSchedulerBackend.this,
@@ -238,6 +249,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     command.build()
   }

+
   protected def driverURL: String = {
     if (conf.contains("spark.testing")) {
       "driverURL"
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 9d81025a3016..514491d76222 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -442,7 +442,7 @@ trait MesosSchedulerUtils extends Logging {

   /**
    * The values of the non-zero ports to be used by the executor process.
-   * 
+   *
    * @param conf the spark config to use
    * @return the non-zero values of the ports
    */
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index c040f05d93b3..1ba05a1aa140 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -33,10 +33,11 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter

 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.deploy.mesos.config
 import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RetrieveSparkAppConfig, SparkAppConfig}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.mesos.Utils._
@@ -59,7 +60,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

   implicit override val patienceConfig = PatienceConfig(timeout = Duration(0, TimeUnit.SECONDS))

   test("mesos supports killing and limiting executors") {
-    setBackend()
+    init()
     sparkConf.set("spark.driver.host", "driverHost")
     sparkConf.set("spark.driver.port", "1234")

@@ -88,7 +89,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("mesos supports killing and relaunching tasks with executors") {
-    setBackend()
+    init()

     // launches a task on a valid offer
     val minMem = backend.executorMemory(sc) + 1024
@@ -110,7 +111,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

   test("mesos supports spark.executor.cores") {
     val executorCores = 4
-    setBackend(Map("spark.executor.cores" -> executorCores.toString))
+    init(Map("spark.executor.cores" ->
executorCores.toString)) val executorMemory = backend.executorMemory(sc) val offers = List(Resources(executorMemory * 2, executorCores + 1)) @@ -124,7 +125,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } test("mesos supports unset spark.executor.cores") { - setBackend() + init() val executorMemory = backend.executorMemory(sc) val offerCores = 10 @@ -139,7 +140,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos does not acquire more than spark.cores.max") { val maxCores = 10 - setBackend(Map("spark.cores.max" -> maxCores.toString)) + init(Map("spark.cores.max" -> maxCores.toString)) val executorMemory = backend.executorMemory(sc) offerResources(List(Resources(executorMemory, maxCores + 1))) @@ -152,7 +153,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } test("mesos does not acquire gpus if not specified") { - setBackend() + init() val executorMemory = backend.executorMemory(sc) offerResources(List(Resources(executorMemory, 1, 1))) @@ -167,7 +168,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos does not acquire more than spark.mesos.gpus.max") { val maxGpus = 5 - setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString)) + init(Map("spark.mesos.gpus.max" -> maxGpus.toString)) val executorMemory = backend.executorMemory(sc) offerResources(List(Resources(executorMemory, 1, maxGpus + 1))) @@ -181,14 +182,14 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos declines offers that violate attribute constraints") { - setBackend(Map("spark.mesos.constraints" -> "x:true")) + init(Map("spark.mesos.constraints" -> "x:true")) offerResources(List(Resources(backend.executorMemory(sc), 4))) verifyDeclinedOffer(driver, createOfferId("o1"), true) } test("mesos declines offers with a filter when reached spark.cores.max") { val maxCores = 3 - setBackend(Map("spark.cores.max" -> maxCores.toString)) + init(Map("spark.cores.max" -> maxCores.toString)) val executorMemory = backend.executorMemory(sc) offerResources(List( @@ -202,7 +203,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos assigns tasks round-robin on offers") { val executorCores = 4 val maxCores = executorCores * 2 - setBackend(Map("spark.executor.cores" -> executorCores.toString, + init(Map("spark.executor.cores" -> executorCores.toString, "spark.cores.max" -> maxCores.toString)) val executorMemory = backend.executorMemory(sc) @@ -216,7 +217,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos creates multiple executors on a single slave") { val executorCores = 4 - setBackend(Map("spark.executor.cores" -> executorCores.toString)) + init(Map("spark.executor.cores" -> executorCores.toString)) // offer with room for two executors val executorMemory = backend.executorMemory(sc) @@ -228,7 +229,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } test("mesos doesn't register twice with the same shuffle service") { - setBackend(Map("spark.shuffle.service.enabled" -> "true")) + init(Map("spark.shuffle.service.enabled" -> "true")) val (mem, cpu) = (backend.executorMemory(sc), 4) val offer1 = createOffer("o1", "s1", mem, cpu) @@ -249,7 +250,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } test("Port offer decline when there is no appropriate range") { - setBackend(Map(BLOCK_MANAGER_PORT.key -> "30100")) + init(Map(BLOCK_MANAGER_PORT.key -> "30100")) val offeredPorts = (31100L, 31200L) val (mem, 
cpu) = (backend.executorMemory(sc), 4)
@@ -259,7 +260,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("Port offer accepted when ephemeral ports are used") {
-    setBackend()
+    init()
     val offeredPorts = (31100L, 31200L)
     val (mem, cpu) = (backend.executorMemory(sc), 4)

@@ -270,7 +271,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

   test("Port offer accepted with user defined port numbers") {
     val port = 30100
-    setBackend(Map(BLOCK_MANAGER_PORT.key -> s"$port"))
+    init(Map(BLOCK_MANAGER_PORT.key -> s"$port"))
     val offeredPorts = (30000L, 31000L)
     val (mem, cpu) = (backend.executorMemory(sc), 4)

@@ -289,7 +290,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("mesos kills an executor when told") {
-    setBackend()
+    init()

     val (mem, cpu) = (backend.executorMemory(sc), 4)

@@ -302,7 +303,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("weburi is set in created scheduler driver") {
-    initializeSparkConf()
+    sparkConf = getSparkConf()
     sc = new SparkContext(sparkConf)
     val taskScheduler = mock[TaskSchedulerImpl]
@@ -336,7 +337,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("honors unset spark.mesos.containerizer") {
-    setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
+    init(Map("spark.mesos.executor.docker.image" -> "test"))

     val (mem, cpu) = (backend.executorMemory(sc), 4)

@@ -348,7 +349,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("honors spark.mesos.containerizer=\"mesos\"") {
-    setBackend(Map(
+    init(Map(
       "spark.mesos.executor.docker.image" -> "test",
       "spark.mesos.containerizer" -> "mesos"))

@@ -362,7 +363,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("docker settings are reflected in created tasks") {
-    setBackend(Map(
+    init(Map(
       "spark.mesos.executor.docker.image" -> "some_image",
       "spark.mesos.executor.docker.forcePullImage" -> "true",
       "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
@@ -400,7 +401,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("force-pull-image option is disabled by default") {
-    setBackend(Map(
+    init(Map(
       "spark.mesos.executor.docker.image" -> "some_image"
     ))

@@ -423,7 +424,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

   test("mesos supports spark.executor.uri") {
     val url = "spark.spark.spark.com"
-    setBackend(Map(
+    init(Map(
       "spark.executor.uri" -> url
     ), null)

@@ -438,7 +439,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

   test("mesos supports setting fetcher cache") {
     val url = "spark.spark.spark.com"
-    setBackend(Map(
+    init(Map(
       "spark.mesos.fetcherCache.enable" -> "true",
       "spark.executor.uri" -> url
     ), null)
@@ -452,7 +453,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

   test("mesos supports disabling fetcher cache") {
     val url = "spark.spark.spark.com"
-    setBackend(Map(
+    init(Map(
       "spark.mesos.fetcherCache.enable" -> "false",
       "spark.executor.uri" -> url
     ), null)
@@ -465,7 +466,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("mesos sets task name to spark.app.name") {
-    setBackend()
+    init()

     val offers = List(Resources(backend.executorMemory(sc), 1))
     offerResources(offers)
@@ -477,7 +478,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

   test("mesos sets configurable labels on tasks") {
     val taskLabelsString = "mesos:test,label:test"
-    setBackend(Map(
+    init(Map(
"spark.mesos.task.labels" -> taskLabelsString )) @@ -500,7 +501,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("mesos ignored invalid labels and sets configurable labels on tasks") { val taskLabelsString = "mesos:test,label:test,incorrect:label:here" - setBackend(Map( + init(Map( "spark.mesos.task.labels" -> taskLabelsString )) @@ -522,7 +523,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } test("mesos supports spark.mesos.network.name") { - setBackend(Map( + init(Map( "spark.mesos.network.name" -> "test-network-name" )) @@ -539,7 +540,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite test("supports spark.scheduler.minRegisteredResourcesRatio") { val expectedCores = 1 - setBackend(Map( + init(Map( "spark.cores.max" -> expectedCores.toString, "spark.scheduler.minRegisteredResourcesRatio" -> "1.0")) @@ -552,6 +553,17 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(backend.isReady) } + test("start() sets spark.mesos.kerberos.userCredentials") { + init() + + assert(backend + .driverEndpoint + .askSync[SparkAppConfig](RetrieveSparkAppConfig) + .sparkProperties + .toMap + .contains(config.USER_CREDENTIALS.key)) + } + private case class Resources(mem: Int, cpus: Int, gpus: Int = 0) private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = { @@ -587,26 +599,37 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite .build } - private def createSchedulerBackend( - taskScheduler: TaskSchedulerImpl, - driver: SchedulerDriver, - shuffleClient: MesosExternalShuffleClient) = { + private def init( + properties: Map[String, String] = null, + home: String = "/path"): Unit = { + + sparkConf = getSparkConf(properties, home) + sc = new SparkContext(sparkConf) + + driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + + taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + + externalShuffleClient = mock[MesosExternalShuffleClient] + val securityManager = mock[SecurityManager] - val backend = new MesosCoarseGrainedSchedulerBackend( - taskScheduler, sc, "master", securityManager) { + backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = driver - - override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = driver + + override protected def getShuffleClient(): MesosExternalShuffleClient = externalShuffleClient // override to avoid race condition with the driver thread on `mesosDriver` override def startScheduler(newDriver: SchedulerDriver): Unit = {} @@ -617,13 +640,12 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } backend.start() backend.registered(driver, Utils.TEST_FRAMEWORK_ID, Utils.TEST_MASTER_INFO) - backend } - private def initializeSparkConf( + private def getSparkConf( sparkConfVars: Map[String, 
String] = null, - home: String = "/path"): Unit = { - sparkConf = (new SparkConf) + home: String = "/path"): SparkConf = { + val sparkConf = (new SparkConf) .setMaster("local[*]") .setAppName("test-mesos-dynamic-alloc") .set("spark.mesos.driver.webui.url", "http://webui") @@ -635,20 +657,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite if (sparkConfVars != null) { sparkConf.setAll(sparkConfVars) } - } - - private def setBackend(sparkConfVars: Map[String, String] = null, home: String = "/path") { - initializeSparkConf(sparkConfVars, home) - sc = new SparkContext(sparkConf) - - driver = mock[SchedulerDriver] - when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) - - taskScheduler = mock[TaskSchedulerImpl] - when(taskScheduler.sc).thenReturn(sc) - - externalShuffleClient = mock[MesosExternalShuffleClient] - backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient) + sparkConf } } diff --git a/resource-managers/yarn/pom.xml b/resource-managers/yarn/pom.xml index a1b641c8eeb8..6d2af5e02b43 100644 --- a/resource-managers/yarn/pom.xml +++ b/resource-managers/yarn/pom.xml @@ -167,30 +167,7 @@ ${jersey-1.version} - - - ${hive.group} - hive-exec - test - - - ${hive.group} - hive-metastore - test - - - org.apache.thrift - libthrift - test - - - org.apache.thrift - libfb303 - test - + diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider new file mode 100644 index 000000000000..f36407942a08 --- /dev/null +++ b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider @@ -0,0 +1,3 @@ +org.apache.spark.deploy.yarn.security.YARNHadoopFSCredentialProvider +org.apache.spark.deploy.security.HBaseCredentialProvider +org.apache.spark.deploy.security.HiveCredentialProvider diff --git a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider b/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider deleted file mode 100644 index f5a807ecac9d..000000000000 --- a/resource-managers/yarn/src/main/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +++ /dev/null @@ -1,3 +0,0 @@ -org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider -org.apache.spark.deploy.yarn.security.HBaseCredentialProvider -org.apache.spark.deploy.yarn.security.HiveCredentialProvider diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 864c834d110f..a2248d91d8b8 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -38,7 +38,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, ConfigurableCredentialManager} +import org.apache.spark.deploy.yarn.security.{AMCredentialRenewer, YARNConfigurableCredentialManager} import org.apache.spark.internal.Logging import 
org.apache.spark.internal.config._ import org.apache.spark.rpc._ @@ -247,8 +247,8 @@ private[spark] class ApplicationMaster( if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) { // If a principal and keytab have been set, use that to create new credentials for executors // periodically - credentialRenewer = - new ConfigurableCredentialManager(sparkConf, yarnConf).credentialRenewer() + val credentialManager = new YARNConfigurableCredentialManager(sparkConf, yarnConf) + val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager) credentialRenewer.scheduleLoginFromKeytab() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 424bbca12319..537a43c355cd 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.util.Records import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.ConfigurableCredentialManager +import org.apache.spark.deploy.yarn.security.YARNConfigurableCredentialManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} @@ -121,7 +121,7 @@ private[spark] class Client( private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) } .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory()) - private val credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) + private val credentialManager = new YARNConfigurableCredentialManager(sparkConf, hadoopConf) def reportLauncherState(state: SparkAppHandle.State): Unit = { launcherBackend.setState(state) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 93578855122c..da4f5edad001 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -35,7 +35,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.security.{ConfigurableCredentialManager, CredentialUpdater} +import org.apache.spark.deploy.yarn.security.CredentialUpdater +import org.apache.spark.deploy.yarn.security.YARNConfigurableCredentialManager import org.apache.spark.internal.config._ import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.util.Utils @@ -87,8 +88,9 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = { - credentialUpdater = - new ConfigurableCredentialManager(sparkConf, newConfiguration(sparkConf)).credentialUpdater() + val hadoopConf = newConfiguration(sparkConf) + val credentialManager = new YARNConfigurableCredentialManager(sparkConf, hadoopConf) + credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager) credentialUpdater.start() } diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala index 7e76f402db24..e81d072c5ff7 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala @@ -54,7 +54,7 @@ import org.apache.spark.util.ThreadUtils private[yarn] class AMCredentialRenewer( sparkConf: SparkConf, hadoopConf: Configuration, - credentialManager: ConfigurableCredentialManager) extends Logging { + credentialManager: YARNConfigurableCredentialManager) extends Logging { private var lastCredentialsFileSuffix = 0 diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala index 41b7b5d60b03..11c046052c2b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/CredentialUpdater.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class CredentialUpdater( sparkConf: SparkConf, hadoopConf: Configuration, - credentialManager: ConfigurableCredentialManager) extends Logging { + credentialManager: YARNConfigurableCredentialManager) extends Logging { @volatile private var lastCredentialsFileSuffix = 0 diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala index 4e3fcce8dbb1..ce33f834272c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/ServiceCredentialProvider.scala @@ -17,41 +17,6 @@ package org.apache.spark.deploy.yarn.security -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.security.{Credentials, UserGroupInformation} - -import org.apache.spark.SparkConf - -/** - * A credential provider for a service. User must implement this if they need to access a - * secure service from Spark. - */ -trait ServiceCredentialProvider { - - /** - * Name of the service to provide credentials. This name should unique, Spark internally will - * use this name to differentiate credential provider. - */ - def serviceName: String - - /** - * To decide whether credential is required for this service. By default it based on whether - * Hadoop security is enabled. - */ - def credentialsRequired(hadoopConf: Configuration): Boolean = { - UserGroupInformation.isSecurityEnabled - } - - /** - * Obtain credentials for this service and get the time of the next renewal. - * @param hadoopConf Configuration of current Hadoop Compatible system. - * @param sparkConf Spark configuration. - * @param creds Credentials to add tokens and security keys to. - * @return If this Credential is renewable and can be renewed, return the time of the next - * renewal, otherwise None should be returned. 
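Note: the Option[Long] contract described in the scaladoc above is what the renewal machinery keys its schedule on. A minimal sketch of how per-provider renewal times are typically folded into one schedule point — an illustration of the contract only, not the manager's actual code:

    // Combine each provider's optional next-renewal time by taking the
    // earliest one; providers returning None (non-renewable tokens) simply
    // drop out of the computation.
    def earliestRenewal(renewalTimes: Seq[Option[Long]]): Option[Long] = {
      val times = renewalTimes.flatten
      if (times.isEmpty) None else Some(times.min)
    }

    // e.g. earliestRenewal(Seq(Some(200L), None, Some(100L))) == Some(100L)
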
- */ - def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] -} +@deprecated("Use org.apache.spark.deploy.security.ServiceCredentialProvider", "2.3.0") +trait ServiceCredentialProvider + extends org.apache.spark.deploy.security.ServiceCredentialProvider diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNConfigurableCredentialManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNConfigurableCredentialManager.scala new file mode 100644 index 000000000000..9c5836f5205a --- /dev/null +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNConfigurableCredentialManager.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn.security + +import java.util.ServiceLoader + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.security.ConfigurableCredentialManager +import org.apache.spark.util.Utils + +/** + * This class exists for backwards compatibility. It loads services registered under the + * deprecated [[org.apache.spark.deploy.yarn.security.ServiceCredentialProvider]]. 
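For illustration, a hypothetical third-party provider written against the relocated core trait could look as follows. The member signatures are assumed to carry over unchanged from the deleted YARN trait above (the deprecated alias extends the core trait without modification), and MyServiceCredentialProvider / "myservice" are invented names. Such a class would be registered by listing its fully qualified name in META-INF/services/org.apache.spark.deploy.security.ServiceCredentialProvider:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.security.ServiceCredentialProvider

    // Hypothetical provider, sketched under the assumption that the core
    // trait keeps the same members as the deleted YARN trait above.
    class MyServiceCredentialProvider extends ServiceCredentialProvider {

      override def serviceName: String = "myservice"

      // Mirror the old default: only fetch tokens when Hadoop security is on.
      override def credentialsRequired(hadoopConf: Configuration): Boolean =
        UserGroupInformation.isSecurityEnabled

      override def obtainCredentials(
          hadoopConf: Configuration,
          sparkConf: SparkConf,
          creds: Credentials): Option[Long] = {
        // Add this service's tokens to creds; return the time of the next
        // renewal in milliseconds, or None if the tokens are not renewable.
        None
      }
    }

Classes that still extend the old org.apache.spark.deploy.yarn.security.ServiceCredentialProvider keep compiling — with a deprecation warning — because that trait is now an empty alias for the core one.
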
+ */ +private[yarn] class YARNConfigurableCredentialManager( + sparkConf: SparkConf, + hadoopConf: Configuration) + extends ConfigurableCredentialManager(sparkConf, hadoopConf) { + + override def loadCredentialProviders: + List[org.apache.spark.deploy.security.ServiceCredentialProvider] = { + val superProviders = super.loadCredentialProviders + val yarnProviders = ServiceLoader.load( + classOf[org.apache.spark.deploy.yarn.security.ServiceCredentialProvider], + Utils.getContextOrSparkClassLoader) + .asScala + .toList + + superProviders ++ yarnProviders + } + +} diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopFSCredentialProvider.scala similarity index 58% rename from resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala rename to resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopFSCredentialProvider.scala index f65c886db944..76e308372c8d 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopFSCredentialProvider.scala @@ -21,60 +21,39 @@ import scala.collection.JavaConverters._ import scala.util.Try import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path import org.apache.hadoop.mapred.Master import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.security.HadoopFSCredentialProvider import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -private[security] class HadoopFSCredentialProvider - extends ServiceCredentialProvider with Logging { - // Token renewal interval, this value will be set in the first call, - // if None means no token renewer specified or no token can be renewed, - // so cannot get token renewal interval. - private var tokenRenewalInterval: Option[Long] = null +class YARNHadoopFSCredentialProvider extends HadoopFSCredentialProvider { - override val serviceName: String = "hadoopfs" - - override def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] = { - // NameNode to access, used to get tokens from different FileSystems - val tmpCreds = new Credentials() - val tokenRenewer = getTokenRenewer(hadoopConf) - hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst => - val dstFs = dst.getFileSystem(hadoopConf) - logInfo("getting token for: " + dst) - dstFs.addDelegationTokens(tokenRenewer, tmpCreds) - } - - // Get the token renewal interval if it is not set. It will only be called once. 
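A toy model of the two-phase discovery performed by YARNConfigurableCredentialManager above, assuming the merged list is eventually keyed by serviceName into a Map (as getServiceCredentialProvider in the tests below suggests). Because the YARN-registered providers are appended after the core ones, a duplicate service name would resolve to the YARN entry:

    // Toy sketch of the provider merge; not the actual manager code.
    object ProviderMergeSketch {
      trait CredProvider { def serviceName: String }

      // Core providers first, deprecated YARN-registered providers appended;
      // Scala's toMap keeps the last occurrence of a duplicate key.
      def merge(
          core: List[CredProvider],
          yarn: List[CredProvider]): Map[String, CredProvider] =
        (core ++ yarn).map(p => p.serviceName -> p).toMap

      def main(args: Array[String]): Unit = {
        val coreFs = new CredProvider { val serviceName = "hadoopfs" }
        val yarnFs = new CredProvider { val serviceName = "hadoopfs" }
        // The appended (YARN) provider wins the name collision.
        assert(merge(List(coreFs), List(yarnFs))("hadoopfs") eq yarnFs)
      }
    }
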
- if (tokenRenewalInterval == null) { - tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf) + override def getTokenRenewer(conf: Configuration): String = { + val delegTokenRenewer = Master.getMasterPrincipal(conf) + logDebug("delegation token renewer is: " + delegTokenRenewer) + if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { + val errorMessage = "Can't get Master Kerberos principal for use as renewer" + logError(errorMessage) + throw new SparkException(errorMessage) } - // Get the time of next renewal. - val nextRenewalDate = tokenRenewalInterval.flatMap { interval => - val nextRenewalDates = tmpCreds.getAllTokens.asScala - .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]) - .map { t => - val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] - identifier.getIssueDate + interval - } - if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min) - } + delegTokenRenewer + } - creds.addAll(tmpCreds) - nextRenewalDate + override def hadoopFSsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = { + sparkConf.get(FILESYSTEMS_TO_ACCESS).map(new Path(_)).toSet + + sparkConf.get(STAGING_DIR).map(new Path(_)) + .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory) } - private def getTokenRenewalInterval( - hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = { + override def getTokenRenewalInterval( + hadoopConf: Configuration, sparkConf: SparkConf): Option[Long] = { // We cannot use the tokens generated with renewer yarn. Trying to renew // those will fail with an access control issue. So create new tokens with the logged in // user as renewer. @@ -100,21 +79,4 @@ private[security] class HadoopFSCredentialProvider } } - private def getTokenRenewer(conf: Configuration): String = { - val delegTokenRenewer = Master.getMasterPrincipal(conf) - logDebug("delegation token renewer is: " + delegTokenRenewer) - if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { - val errorMessage = "Can't get Master Kerberos principal for use as renewer" - logError(errorMessage) - throw new SparkException(errorMessage) - } - - delegTokenRenewer - } - - private def hadoopFSsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = { - sparkConf.get(FILESYSTEMS_TO_ACCESS).map(new Path(_)).toSet + - sparkConf.get(STAGING_DIR).map(new Path(_)) - .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory) - } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index cbc6e60e839c..12158d0f2d86 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -38,7 +38,7 @@ import org.apache.spark.util.{RpcUtils, ThreadUtils} private[spark] abstract class YarnSchedulerBackend( scheduler: TaskSchedulerImpl, sc: SparkContext) - extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv, None) { override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { diff --git a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider 
b/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider deleted file mode 100644 index d0ef5efa36e8..000000000000 --- a/resource-managers/yarn/src/test/resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider +++ /dev/null @@ -1 +0,0 @@ -org.apache.spark.deploy.yarn.security.TestCredentialProvider diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala index b0067aa4517c..f46dd7d2e0f5 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/ConfigurableCredentialManagerSuite.scala @@ -18,16 +18,13 @@ package org.apache.spark.deploy.yarn.security import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.Text -import org.apache.hadoop.security.Credentials -import org.apache.hadoop.security.token.Token import org.scalatest.{BeforeAndAfter, Matchers} import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.deploy.security.ConfigurableCredentialManager class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter { - private var credentialManager: ConfigurableCredentialManager = null + private var credentialManager: YARNConfigurableCredentialManager = null private var sparkConf: SparkConf = null private var hadoopConf: Configuration = null @@ -45,106 +42,13 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit super.afterAll() } - test("Correctly load default credential providers") { - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) + test("Correctly load YARNHadoopFSCredentialProvider") { + credentialManager = new YARNConfigurableCredentialManager(sparkConf, hadoopConf) - credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None) - credentialManager.getServiceCredentialProvider("hbase") should not be (None) - credentialManager.getServiceCredentialProvider("hive") should not be (None) + assert(credentialManager + .getServiceCredentialProvider("hadoopfs") + .get + .isInstanceOf[YARNHadoopFSCredentialProvider]) } - test("disable hive credential provider") { - sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false") - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - - credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None) - credentialManager.getServiceCredentialProvider("hbase") should not be (None) - credentialManager.getServiceCredentialProvider("hive") should be (None) - } - - test("using deprecated configurations") { - sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false") - sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false") - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - - credentialManager.getServiceCredentialProvider("hadoopfs") should be (None) - credentialManager.getServiceCredentialProvider("hive") should be (None) - credentialManager.getServiceCredentialProvider("test") should not be (None) - credentialManager.getServiceCredentialProvider("hbase") should not be (None) - } - - test("verify obtaining credentials from 
provider") { - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - val creds = new Credentials() - - // Tokens can only be obtained from TestTokenProvider, for hdfs, hbase and hive tokens cannot - // be obtained. - credentialManager.obtainCredentials(hadoopConf, creds) - val tokens = creds.getAllTokens - tokens.size() should be (1) - tokens.iterator().next().getService should be (new Text("test")) - } - - test("verify getting credential renewal info") { - credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf) - val creds = new Credentials() - - val testCredentialProvider = credentialManager.getServiceCredentialProvider("test").get - .asInstanceOf[TestCredentialProvider] - // Only TestTokenProvider can get the time of next token renewal - val nextRenewal = credentialManager.obtainCredentials(hadoopConf, creds) - nextRenewal should be (testCredentialProvider.timeOfNextTokenRenewal) - } - - test("obtain tokens For HiveMetastore") { - val hadoopConf = new Configuration() - hadoopConf.set("hive.metastore.kerberos.principal", "bob") - // thrift picks up on port 0 and bails out, without trying to talk to endpoint - hadoopConf.set("hive.metastore.uris", "http://localhost:0") - - val hiveCredentialProvider = new HiveCredentialProvider() - val credentials = new Credentials() - hiveCredentialProvider.obtainCredentials(hadoopConf, sparkConf, credentials) - - credentials.getAllTokens.size() should be (0) - } - - test("Obtain tokens For HBase") { - val hadoopConf = new Configuration() - hadoopConf.set("hbase.security.authentication", "kerberos") - - val hbaseTokenProvider = new HBaseCredentialProvider() - val creds = new Credentials() - hbaseTokenProvider.obtainCredentials(hadoopConf, sparkConf, creds) - - creds.getAllTokens.size should be (0) - } -} - -class TestCredentialProvider extends ServiceCredentialProvider { - val tokenRenewalInterval = 86400 * 1000L - var timeOfNextTokenRenewal = 0L - - override def serviceName: String = "test" - - override def credentialsRequired(conf: Configuration): Boolean = true - - override def obtainCredentials( - hadoopConf: Configuration, - sparkConf: SparkConf, - creds: Credentials): Option[Long] = { - if (creds == null) { - // Guard out other unit test failures. 
- return None - } - - val emptyToken = new Token() - emptyToken.setService(new Text("test")) - creds.addToken(emptyToken.getService, emptyToken) - - val currTime = System.currentTimeMillis() - timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval - - Some(timeOfNextTokenRenewal) - } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopFSCredentialProviderSuite.scala similarity index 85% rename from resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala rename to resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopFSCredentialProviderSuite.scala index f50ee193c258..31c8b847a2ba 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProviderSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopFSCredentialProviderSuite.scala @@ -21,25 +21,26 @@ import org.apache.hadoop.conf.Configuration import org.scalatest.{Matchers, PrivateMethodTester} import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.deploy.yarn.security.YARNHadoopFSCredentialProvider -class HadoopFSCredentialProviderSuite +class YARNHadoopFSCredentialProviderSuite extends SparkFunSuite with PrivateMethodTester with Matchers { private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer) private def getTokenRenewer( - fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): String = { + fsCredentialProvider: YARNHadoopFSCredentialProvider, conf: Configuration): String = { fsCredentialProvider invokePrivate _getTokenRenewer(conf) } - private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null + private var hadoopFsCredentialProvider: YARNHadoopFSCredentialProvider = null override def beforeAll() { super.beforeAll() if (hadoopFsCredentialProvider == null) { - hadoopFsCredentialProvider = new HadoopFSCredentialProvider() + hadoopFsCredentialProvider = new YARNHadoopFSCredentialProvider() } }
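The deleted TestCredentialProvider above computed its next renewal by rounding the current time up to the next interval boundary. A worked example of that arithmetic, extracted into a standalone helper for clarity (names are illustrative):

    object RenewalTimeSketch {
      // (currTime - currTime % interval) aligns "now" down to the start of
      // the current interval; adding one interval gives the next boundary.
      def nextRenewal(currTime: Long, interval: Long): Long =
        (currTime - currTime % interval) + interval

      def main(args: Array[String]): Unit = {
        val day = 86400 * 1000L
        assert(nextRenewal(day + day / 2, day) == 2 * day) // 1.5 days -> 2 days
        assert(nextRenewal(2 * day, day) == 3 * day)       // exact boundary -> next one
      }
    }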