Closed
Changes from all commits
40 commits
ce63a9b
[Mesosphere SPARK-126] Move YarnSparkHadoopUtil token helpers into th…
Feb 10, 2016
75d849a
[Mesosphere SPARK-126] Add Mesos Kerberos support
Feb 10, 2016
35002f2
Par down kerberos support
Apr 17, 2017
13981c8
cleanup
Apr 17, 2017
af4a3e4
style
Apr 17, 2017
5cc66dc
Add MesosSecurityManager
Apr 18, 2017
a47c9c0
info logs
Apr 18, 2017
c8ec049
style
Apr 18, 2017
954eeff
Re-add org.apache.spark.deploy.yarn.security.ServiceCredentialProvide…
Apr 18, 2017
2d76928
move YARNHadoopFSCredentialProviderSuite
Apr 18, 2017
d8a968d
Move hive test deps to the core module
Apr 19, 2017
b8093c8
remove test scope
Apr 19, 2017
25d5088
remove test scope
Apr 19, 2017
4c387eb
Removed MesosSecurityManager, added RPC call, removed META-INF Servic…
Apr 20, 2017
e32afee
add InterfaceStability annotation to ServiceCredentialProvider
Apr 20, 2017
be69f5a
Add HadoopAccessManager
Apr 21, 2017
55616da
Remove mesos code
Apr 21, 2017
240df31
re-add mistakenly removed files
Apr 21, 2017
810c6b2
test ConfigurableCredentialManager.obtainUserTokens
Apr 21, 2017
ad4e33b
add tests
Apr 21, 2017
e15f1ab
rat-excludes
Apr 21, 2017
a546aab
fix RAT
Apr 21, 2017
d6d21d1
style
Apr 21, 2017
092aac7
Remove unneeded import
Apr 24, 2017
38adaae
Make ServiceCredentialProvider private
May 18, 2017
92ac3f0
Addressed style comments
May 18, 2017
cd58b6c
review comments
May 22, 2017
bf758e6
style
May 23, 2017
e820b09
Remove YARNHadoopAccessManagerSuite.scala
May 23, 2017
7f4ca86
Move thrifts deps back to yarn/pom.xml
May 31, 2017
cda3538
dependency testing
Jun 2, 2017
376dba0
Fix dependency issues, and address style comments
Jun 2, 2017
0ffe8f0
Fix scalastyle
Jun 2, 2017
7796e14
Add other deps to provided scope
Jun 2, 2017
1479c60
Replicate deps in yarn to fix transitivity issue
Jun 5, 2017
4d57f7b
update comments
Jun 6, 2017
7e2f90d
style
Jun 8, 2017
563b80a
Don't throw an exception when Hive classes are not loaded
Jun 8, 2017
c684d88
rename
Jun 12, 2017
c4149dd
fix docs
Jun 15, 2017
28 changes: 28 additions & 0 deletions core/pom.xml
@@ -357,6 +357,34 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>

<!--
The following dependencies are used by HiveDelegationTokenProvider, but they are exercised only when
Hive is enabled in the user's Hadoop configuration. To keep spark-core from depending on Hive, these
dependencies are declared in the "provided" scope rather than the "compile" scope, and
NoClassDefFoundError is handled when the user has not explicitly compiled with the Hive module.
-->
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-metastore</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<scope>provided</scope>
</dependency>

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
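The guard this comment describes shows up in the provider code roughly as follows. A minimal sketch, assuming a hypothetical helper named hiveConfOrNone: because the Hive dependency is "provided", the code compiles against HiveConf directly, and a missing runtime classpath entry surfaces as NoClassDefFoundError.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hive.conf.HiveConf

    // Hypothetical helper: compile against HiveConf (provided scope), but treat its
    // absence at runtime as "Hive disabled" instead of failing the application.
    def hiveConfOrNone(hadoopConf: Configuration): Option[HiveConf] = {
      try {
        Some(new HiveConf(hadoopConf, classOf[HiveConf]))
      } catch {
        case _: NoClassDefFoundError =>
          None // Hive classes are not on the runtime classpath; supply no Hive tokens.
      }
    }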
core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import scala.reflect.runtime.universe
import scala.util.control.NonFatal
@@ -24,17 +24,16 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {
private[security] class HBaseDelegationTokenProvider
extends HadoopDelegationTokenProvider with Logging {

override def serviceName: String = "hbase"

override def obtainCredentials(
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
try {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
@@ -55,7 +54,7 @@ private[security] class HBaseCredentialProvider extends ServiceCredentialProvide
None
}

override def credentialsRequired(hadoopConf: Configuration): Boolean = {
override def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
hbaseConf(hadoopConf).get("hbase.security.authentication") == "kerberos"
}

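The hbaseConf helper referenced in delegationTokensRequired is elided by the diff. Given the imports kept above (scala.reflect.runtime.universe and Utils), the reflective pattern it relies on looks roughly like this; a sketch, not the exact elided code:

    import scala.reflect.runtime.universe
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.util.Utils

    // Load HBaseConfiguration through the context classloader and invoke its static
    // create(Configuration) method, so spark-core never links against HBase.
    def hbaseConf(conf: Configuration): Configuration = {
      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
      val confCreate = mirror.classLoader
        .loadClass("org.apache.hadoop.hbase.HBaseConfiguration")
        .getMethod("create", classOf[Configuration])
      confCreate.invoke(null, conf).asInstanceOf[Configuration]
    }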
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

/**
* Manages all the registered HadoopDelegationTokenProviders and offers APIs for other modules to
* obtain delegation tokens and their renewal time. By default, [[HadoopFSDelegationTokenProvider]],
* [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] are loaded unless
* explicitly disabled.
*
* Each HadoopDelegationTokenProvider is controlled by
* spark.security.credentials.{service}.enabled, and is not loaded if that config is set to
* false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
* enabled or disabled via spark.security.credentials.hive.enabled.
*
* @param sparkConf Spark configuration
* @param hadoopConf Hadoop configuration
* @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
*/
private[spark] class HadoopDelegationTokenManager(
sparkConf: SparkConf,
hadoopConf: Configuration,
fileSystems: Set[FileSystem])
extends Logging {

private val deprecatedProviderEnabledConfigs = List(
"spark.yarn.security.tokens.%s.enabled",
"spark.yarn.security.credentials.%s.enabled")
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

// Maintain all the registered delegation token providers
private val delegationTokenProviders = getDelegationTokenProviders
logDebug(s"Using the following delegation token providers: " +
s"${delegationTokenProviders.keys.mkString(", ")}.")

private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
new HiveDelegationTokenProvider,
new HBaseDelegationTokenProvider)

// Filter out providers for which spark.security.credentials.{service}.enabled is false.
providers
.filter { p => isServiceEnabled(p.serviceName) }
.map { p => (p.serviceName, p) }
.toMap
}

def isServiceEnabled(serviceName: String): Boolean = {
val key = providerEnabledConfig.format(serviceName)

deprecatedProviderEnabledConfigs.foreach { pattern =>
val deprecatedKey = pattern.format(serviceName)
if (sparkConf.contains(deprecatedKey)) {
logWarning(s"${deprecatedKey} is deprecated. Please use ${key} instead.")
}
}

val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
sparkConf
.getOption(pattern.format(serviceName))
.map(_.toBoolean)
.getOrElse(true)
}

sparkConf
.getOption(key)
.map(_.toBoolean)
.getOrElse(isEnabledDeprecated)
}

/**
* Get delegation token provider for the specified service.
*/
def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
delegationTokenProviders.get(service)
}

/**
* Writes delegation tokens to creds. Delegation tokens are fetched from all registered
* providers.
*
* @return Time after which the fetched delegation tokens should be renewed.
*/
def obtainDelegationTokens(
hadoopConf: Configuration,
creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
if (provider.delegationTokensRequired(hadoopConf)) {
provider.obtainDelegationTokens(hadoopConf, creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
None
}
}.foldLeft(Long.MaxValue)(math.min)
}
}
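A short usage sketch of the manager as added here; the call site is hypothetical (in this PR the manager is driven from the YARN client and AM code paths), and a Kerberos-enabled Hadoop configuration is assumed:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.security.Credentials
    import org.apache.spark.SparkConf

    val sparkConf = new SparkConf()
    val hadoopConf = new Configuration()

    // Fetch tokens for the default filesystem plus any enabled Hive/HBase providers.
    val manager = new HadoopDelegationTokenManager(
      sparkConf, hadoopConf, Set(FileSystem.get(hadoopConf)))

    val creds = new Credentials()
    val nextRenewal = manager.obtainDelegationTokens(hadoopConf, creds)
    // creds now holds the fetched tokens; nextRenewal is Long.MaxValue when none of
    // the fetched tokens are renewable.

Note how isServiceEnabled gives the new spark.security.credentials.{service}.enabled key precedence over the two deprecated spark.yarn.security.* patterns: the deprecated keys are consulted only when the new key is unset, and an explicit false in any of them disables the provider.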
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

/**
* Hadoop delegation token provider.
*/
private[spark] trait HadoopDelegationTokenProvider {
Member
Not sure whether it is ok to mark it private? The deprecated one (trait ServiceCredentialProvider) is not private. cc @vanzin @mridulm

Author (@mgummelt, May 22, 2017)
Now that this is private, I've reverted ServiceCredentialProvider to be non-deprecated.

Contributor
You need to update the PR description, since it still mentions deprecating the YARN interface.

Author
fixed

/**
* Name of the service to provide delegation tokens. This name should be unique. Spark will
* internally use this name to differentiate delegation token providers.
*/
def serviceName: String

/**
* Returns true if delegation tokens are required for this service. By default, it is based on
* whether Hadoop security is enabled.
*/
def delegationTokensRequired(hadoopConf: Configuration): Boolean

/**
* Obtain delegation tokens for this service and get the time of the next renewal.
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param creds Credentials to add tokens and security keys to.
* @return The time of the next renewal if the obtained tokens are renewable and can be
* renewed, otherwise None.
*/
def obtainDelegationTokens(
hadoopConf: Configuration,
creds: Credentials): Option[Long]
}
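For illustration, a hypothetical provider implementing this trait. The service name and class are invented, and note that in this PR HadoopDelegationTokenManager hard-codes its provider list, so this shows the contract rather than a pluggable extension point:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    private[spark] class ExampleDelegationTokenProvider extends HadoopDelegationTokenProvider {

      // Unique name; the manager matches it against spark.security.credentials.example.enabled.
      override def serviceName: String = "example"

      // Follow the default convention: fetch tokens only when Hadoop security is enabled.
      override def delegationTokensRequired(hadoopConf: Configuration): Boolean =
        UserGroupInformation.isSecurityEnabled

      override def obtainDelegationTokens(
          hadoopConf: Configuration,
          creds: Credentials): Option[Long] = {
        // A real provider would contact its service, add tokens to creds, and return
        // the next renewal time; this sketch fetches nothing and is never renewed.
        None
      }
    }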
core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import scala.collection.JavaConverters._
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging

private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem])
extends HadoopDelegationTokenProvider with Logging {

// This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
// If None, no token renewer is specified or no token can be renewed,
// so we cannot get the token renewal interval.
private var tokenRenewalInterval: Option[Long] = null

override val serviceName: String = "hadoopfs"

override def obtainDelegationTokens(
hadoopConf: Configuration,
creds: Credentials): Option[Long] = {

val newCreds = fetchDelegationTokens(
getTokenRenewer(hadoopConf),
fileSystems)

// Compute the token renewal interval if it is not yet set; this happens only on the first call.
if (tokenRenewalInterval == null) {
tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems)
}

// Get the time of next renewal.
val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
val nextRenewalDates = newCreds.getAllTokens.asScala
.filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
.map { token =>
val identifier = token
.decodeIdentifier()
.asInstanceOf[AbstractDelegationTokenIdentifier]
identifier.getIssueDate + interval
}
if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
}

creds.addAll(newCreds)
nextRenewalDate
}

override def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
UserGroupInformation.isSecurityEnabled
}

private def getTokenRenewer(hadoopConf: Configuration): String = {
val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
logDebug("Delegation token renewer is: " + tokenRenewer)

if (tokenRenewer == null || tokenRenewer.length() == 0) {
val errorMessage = "Can't get Master Kerberos principal for use as renewer."
logError(errorMessage)
throw new SparkException(errorMessage)
}

tokenRenewer
}

private def fetchDelegationTokens(
renewer: String,
filesystems: Set[FileSystem]): Credentials = {

val creds = new Credentials()

filesystems.foreach { fs =>
logInfo("getting token for: " + fs)
fs.addDelegationTokens(renewer, creds)
}

creds
}

private def getTokenRenewalInterval(
hadoopConf: Configuration,
filesystems: Set[FileSystem]): Option[Long] = {
// Tokens fetched with the YARN RM as renewer cannot be renewed here; attempting to renew
// them fails with an access control error. So fetch fresh tokens with the logged-in user
// as renewer just to measure the renewal interval.
val creds = fetchDelegationTokens(
UserGroupInformation.getCurrentUser.getUserName,
filesystems)

val renewIntervals = creds.getAllTokens.asScala.filter {
_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
}.flatMap { token =>
Try {
val newExpiration = token.renew(hadoopConf)
val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
val interval = newExpiration - identifier.getIssueDate
logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
interval
}.toOption
}
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}
}
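To make the renewal arithmetic concrete, a small worked sketch with invented timestamps: getTokenRenewalInterval measures interval = newExpiration - issueDate per renewable token, and obtainDelegationTokens later schedules the next renewal at issueDate + interval, taking the minimum across tokens.

    // Invented numbers illustrating the two computations above.
    val issueDate = 1000L                      // identifier.getIssueDate
    val newExpiration = 86401000L              // token.renew(hadoopConf)
    val interval = newExpiration - issueDate   // 86400000L, a 24-hour renewal interval

    val issueDates = Seq(1000L, 5000L)         // one entry per renewable token
    val nextRenewalDate = issueDates.map(_ + interval).min // 86401000L, earliest wins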