diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 89b6f52ba4bc..01cbfe1ee6ae 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging private[spark] class HadoopDelegationTokenManager( sparkConf: SparkConf, hadoopConf: Configuration, - fileSystems: Set[FileSystem]) + fileSystems: Configuration => Set[FileSystem]) extends Logging { private val deprecatedProviderEnabledConfigs = List( diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 13157f33e2bf..f0ac7f501ceb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti import org.apache.spark.SparkException import org.apache.spark.internal.Logging -private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem]) +private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem]) extends HadoopDelegationTokenProvider with Logging { // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens. @@ -43,13 +43,14 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSyste hadoopConf: Configuration, creds: Credentials): Option[Long] = { + val fsToGetTokens = fileSystems(hadoopConf) val newCreds = fetchDelegationTokens( getTokenRenewer(hadoopConf), - fileSystems) + fsToGetTokens) // Get the token renewal interval if it is not set. It will only be called once. if (tokenRenewalInterval == null) { - tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems) + tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens) } // Get the time of next renewal. diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 335f3449cb78..5b05521e48f8 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -40,7 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None) delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None) @@ -53,7 +53,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None) delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None) @@ -66,7 +66,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should be (None) delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None) @@ -77,7 +77,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) val creds = new Credentials() // Tokens cannot be obtained from HDFS, Hive, HBase in unit tests. diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ce290c399d9f..6ff210adcbc5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -270,7 +270,7 @@ private[spark] class ApplicationMaster( val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, yarnConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 3a7adb7724ce..d408ca90a5d1 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -125,7 +125,7 @@ private[spark] class Client( private val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) def reportLauncherState(state: SparkAppHandle.State): Unit = { launcherBackend.setState(state) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index a687f67c5b69..4fef4394bb3f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -98,7 +98,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager) credentialUpdater.start() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala index bbd17c8fc127..163cfb4eb862 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala @@ -38,7 +38,7 @@ import org.apache.spark.util.Utils private[yarn] class YARNHadoopDelegationTokenManager( sparkConf: SparkConf, hadoopConf: Configuration, - fileSystems: Set[FileSystem]) extends Logging { + fileSystems: Configuration => Set[FileSystem]) extends Logging { private val delegationTokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala index 2b226eff5ce1..c918998bde07 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala @@ -48,7 +48,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) credentialManager.credentialProviders.get("yarn-test") should not be (None) }