HBaseDelegationTokenProvider.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -34,6 +35,7 @@ private[security] class HBaseDelegationTokenProvider
 
   override def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
     try {
       val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
HadoopDelegationTokenManager.scala
@@ -116,7 +116,7 @@ private[spark] class HadoopDelegationTokenManager(
       creds: Credentials): Long = {
     delegationTokenProviders.values.flatMap { provider =>
       if (provider.delegationTokensRequired(hadoopConf)) {
-        provider.obtainDelegationTokens(hadoopConf, creds)
+        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
       } else {
         logDebug(s"Service ${provider.serviceName} does not require a token." +
           s" Check your configuration to see if security is disabled or not.")
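Each provider returns an Option[Long] carrying the next renewal time for the tokens it fetched, so the flatMap above silently drops providers that had nothing to renew. Below is a minimal sketch of how those per-provider results collapse to the soonest deadline; the fold is illustrative only, since the method's actual tail falls outside the visible hunk:

// Illustrative values only: None means the provider fetched no renewable tokens.
val perProvider: Seq[Option[Long]] = Seq(Some(1500L), None, Some(900L))

// Flattening drops the Nones, mirroring what the flatMap above does...
val renewalTimes: Seq[Long] = perProvider.flatten

// ...and the soonest deadline wins (Long.MaxValue when nothing needs renewal).
val nextRenewal: Long = renewalTimes.foldLeft(Long.MaxValue)(math.min)  // 900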
HadoopDelegationTokenProvider.scala
@@ -20,6 +20,8 @@ package org.apache.spark.deploy.security
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
 
+import org.apache.spark.SparkConf
+
 /**
  * Hadoop delegation token provider.
  */
@@ -46,5 +48,6 @@ private[spark] trait HadoopDelegationTokenProvider {
    */
   def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long]
 }
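The trait now threads SparkConf through to every provider. Here is a minimal sketch of an implementation against the new three-argument contract; the class name and no-op behavior are hypothetical, and since the trait is private[spark], a real implementation would have to live under an org.apache.spark package:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

private[spark] class NoopDelegationTokenProvider extends HadoopDelegationTokenProvider {

  override def serviceName: String = "noop"

  // Returning false makes HadoopDelegationTokenManager skip this provider.
  override def delegationTokensRequired(hadoopConf: Configuration): Boolean = false

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf, // the parameter this change introduces
      creds: Credentials): Option[Long] = {
    // A real provider would add its tokens to `creds` and return the next
    // renewal time; None signals that nothing needs renewing.
    None
  }
}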
HadoopFSDelegationTokenProvider.scala
@@ -26,8 +26,9 @@ import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 
 private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem])
   extends HadoopDelegationTokenProvider with Logging {
@@ -41,21 +42,20 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
 
   override def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
 
     val fsToGetTokens = fileSystems(hadoopConf)
-    val newCreds = fetchDelegationTokens(
-      getTokenRenewer(hadoopConf),
-      fsToGetTokens)
+    val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds)
 
     // Get the token renewal interval if it is not set. It will only be called once.
     if (tokenRenewalInterval == null) {
-      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens)
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
     }
 
     // Get the time of next renewal.
     val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
-      val nextRenewalDates = newCreds.getAllTokens.asScala
+      val nextRenewalDates = fetchCreds.getAllTokens.asScala
         .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
         .map { token =>
           val identifier = token
@@ -66,7 +66,6 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
       if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
     }
 
-    creds.addAll(newCreds)
     nextRenewalDate
   }
 
@@ -89,9 +88,8 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
 
   private def fetchDelegationTokens(
       renewer: String,
-      filesystems: Set[FileSystem]): Credentials = {
-
-    val creds = new Credentials()
+      filesystems: Set[FileSystem],
+      creds: Credentials): Credentials = {
 
     filesystems.foreach { fs =>
       logInfo("getting token for: " + fs)
@@ -103,25 +101,27 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
 
   private def getTokenRenewalInterval(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       filesystems: Set[FileSystem]): Option[Long] = {
     // We cannot use the tokens generated with renewer yarn. Trying to renew
     // those will fail with an access control issue. So create new tokens with the logged in
     // user as renewer.
-    val creds = fetchDelegationTokens(
-      UserGroupInformation.getCurrentUser.getUserName,
-      filesystems)
-
-    val renewIntervals = creds.getAllTokens.asScala.filter {
-      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
-    }.flatMap { token =>
-      Try {
-        val newExpiration = token.renew(hadoopConf)
-        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-        val interval = newExpiration - identifier.getIssueDate
-        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
-        interval
-      }.toOption
+    sparkConf.get(PRINCIPAL).flatMap { renewer =>
+      val creds = new Credentials()
+      fetchDelegationTokens(renewer, filesystems, creds)
+
+      val renewIntervals = creds.getAllTokens.asScala.filter {
+        _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+      }.flatMap { token =>
+        Try {
+          val newExpiration = token.renew(hadoopConf)
+          val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+          val interval = newExpiration - identifier.getIssueDate
+          logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
          interval
+        }.toOption
+      }
+      if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
     }
-    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
   }
 }
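Two behavioral points are worth noting in this file. First, fetchDelegationTokens now writes into the caller's Credentials instead of building a private copy, which removes the creds.addAll(newCreds) merge step that the old obtainDelegationTokens needed. Second, getTokenRenewalInterval now uses the configured principal as the token renewer rather than the current user, and because PRINCIPAL is an optional config entry, sparkConf.get(PRINCIPAL) yields an Option[String] and the interval computation short-circuits to None when no principal is set. A small sketch of that Option threading, using a made-up principal and the public string key (the PRINCIPAL entry itself is private[spark]):

import org.apache.spark.SparkConf

val conf = new SparkConf()
// With no principal configured, getTokenRenewalInterval has nothing to
// flatMap over and returns None.
assert(conf.getOption("spark.yarn.principal").isEmpty)

// "alice@EXAMPLE.COM" is a hypothetical Kerberos principal for illustration.
conf.set("spark.yarn.principal", "alice@EXAMPLE.COM")
assert(conf.getOption("spark.yarn.principal").contains("alice@EXAMPLE.COM"))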
HiveDelegationTokenProvider.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.Token
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
@@ -61,6 +62,7 @@ private[security] class HiveDelegationTokenProvider
 
   override def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
     try {
       val conf = hiveConf(hadoopConf)
HadoopDelegationTokenManagerSuite.scala
@@ -94,7 +94,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
 
     val hiveCredentialProvider = new HiveDelegationTokenProvider()
     val credentials = new Credentials()
-    hiveCredentialProvider.obtainDelegationTokens(hadoopConf, credentials)
+    hiveCredentialProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
 
     credentials.getAllTokens.size() should be (0)
   }
@@ -105,7 +105,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
 
     val hbaseTokenProvider = new HBaseDelegationTokenProvider()
     val creds = new Credentials()
-    hbaseTokenProvider.obtainDelegationTokens(hadoopConf, creds)
+    hbaseTokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
 
     creds.getAllTokens.size should be (0)
   }