From 7c2b6fb5825d56f1d449c5659bb60cab758eebd3 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 13 Jul 2017 20:28:59 -0700 Subject: [PATCH 1/2] Lazily create FS within kerberized UGI to avoid token acquiring failure Change-Id: I8b35ac9c946ae4396aa26ca4ba3107f4b37ac3d6 --- .../deploy/security/HadoopDelegationTokenManager.scala | 2 +- .../deploy/security/HadoopFSDelegationTokenProvider.scala | 6 +++--- .../deploy/security/HadoopDelegationTokenManagerSuite.scala | 4 ++-- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- .../yarn/security/YARNHadoopDelegationTokenManager.scala | 2 +- .../security/YARNHadoopDelegationTokenManagerSuite.scala | 2 +- 8 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 89b6f52ba4bc..65258621d221 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging private[spark] class HadoopDelegationTokenManager( sparkConf: SparkConf, hadoopConf: Configuration, - fileSystems: Set[FileSystem]) + fileSystems: () => Set[FileSystem]) extends Logging { private val deprecatedProviderEnabledConfigs = List( diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 13157f33e2bf..8dc44f7833f8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala 
@@ -29,7 +29,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti import org.apache.spark.SparkException import org.apache.spark.internal.Logging -private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem]) +private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[FileSystem]) extends HadoopDelegationTokenProvider with Logging { // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens. @@ -45,11 +45,11 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSyste val newCreds = fetchDelegationTokens( getTokenRenewer(hadoopConf), - fileSystems) + fileSystems()) // Get the token renewal interval if it is not set. It will only be called once. if (tokenRenewalInterval == null) { - tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems) + tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems()) } // Get the time of next renewal. diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 335f3449cb78..7b6f291647a1 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -110,7 +110,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { creds.getAllTokens.size should be (0) } - private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): Set[FileSystem] = { - Set(FileSystem.get(hadoopConf)) + private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): () => Set[FileSystem] = { + () => Set(FileSystem.get(hadoopConf)) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ce290c399d9f..883c8e164641 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -270,7 +270,7 @@ private[spark] class ApplicationMaster( val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, yarnConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf)) + () => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf)) val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 3a7adb7724ce..711eed0b6336 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -125,7 +125,7 @@ private[spark] class Client( private val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + () => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) def reportLauncherState(state: SparkAppHandle.State): Unit = { launcherBackend.setState(state) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index a687f67c5b69..8ecbcb8a59d3 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -98,7 +98,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { val credentialManager = new 
YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + () => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager) credentialUpdater.start() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala index bbd17c8fc127..fbba16b4fb50 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala @@ -38,7 +38,7 @@ import org.apache.spark.util.Utils private[yarn] class YARNHadoopDelegationTokenManager( sparkConf: SparkConf, hadoopConf: Configuration, - fileSystems: Set[FileSystem]) extends Logging { + fileSystems: () => Set[FileSystem]) extends Logging { private val delegationTokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala index 2b226eff5ce1..98b0c963c9db 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala @@ -48,7 +48,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) 
+ () => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) credentialManager.credentialProviders.get("yarn-test") should not be (None) } From 95988c112905018d20c6d78a2ab688164735ede6 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 17 Jul 2017 15:32:53 -0700 Subject: [PATCH 2/2] Change the code to address the review comments Change-Id: I256f2fd9fe666fe66606d5c2fa7dd8cf36346d7b --- .../security/HadoopDelegationTokenManager.scala | 2 +- .../security/HadoopFSDelegationTokenProvider.scala | 7 ++++--- .../security/HadoopDelegationTokenManagerSuite.scala | 12 ++++++------ .../apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- .../security/YARNHadoopDelegationTokenManager.scala | 2 +- .../YARNHadoopDelegationTokenManagerSuite.scala | 2 +- 8 files changed, 16 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 65258621d221..01cbfe1ee6ae 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging private[spark] class HadoopDelegationTokenManager( sparkConf: SparkConf, hadoopConf: Configuration, - fileSystems: () => Set[FileSystem]) + fileSystems: Configuration => Set[FileSystem]) extends Logging { private val deprecatedProviderEnabledConfigs = List( diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 8dc44f7833f8..f0ac7f501ceb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti import org.apache.spark.SparkException import org.apache.spark.internal.Logging -private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[FileSystem]) +private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem]) extends HadoopDelegationTokenProvider with Logging { // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens. @@ -43,13 +43,14 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[Fil hadoopConf: Configuration, creds: Credentials): Option[Long] = { + val fsToGetTokens = fileSystems(hadoopConf) val newCreds = fetchDelegationTokens( getTokenRenewer(hadoopConf), - fileSystems()) + fsToGetTokens) // Get the token renewal interval if it is not set. It will only be called once. if (tokenRenewalInterval == null) { - tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems()) + tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens) } // Get the time of next renewal. 
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 7b6f291647a1..5b05521e48f8 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -40,7 +40,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None) delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None) @@ -53,7 +53,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should not be (None) delegationTokenManager.getServiceDelegationTokenProvider("hbase") should not be (None) @@ -66,7 +66,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) delegationTokenManager.getServiceDelegationTokenProvider("hadoopfs") should be (None) delegationTokenManager.getServiceDelegationTokenProvider("hive") should be (None) @@ -77,7 +77,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { delegationTokenManager = new HadoopDelegationTokenManager( sparkConf, hadoopConf, - hadoopFSsToAccess(hadoopConf)) + hadoopFSsToAccess) val creds = new Credentials() // Tokens cannot be obtained from HDFS, Hive, HBase in unit tests. 
@@ -110,7 +110,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers { creds.getAllTokens.size should be (0) } - private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): () => Set[FileSystem] = { - () => Set(FileSystem.get(hadoopConf)) + private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): Set[FileSystem] = { + Set(FileSystem.get(hadoopConf)) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 883c8e164641..6ff210adcbc5 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -270,7 +270,7 @@ private[spark] class ApplicationMaster( val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, yarnConf, - () => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 711eed0b6336..d408ca90a5d1 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -125,7 +125,7 @@ private[spark] class Client( private val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - () => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) def reportLauncherState(state: SparkAppHandle.State): Unit = { launcherBackend.setState(state) diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 8ecbcb8a59d3..4fef4394bb3f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -98,7 +98,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - () => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager) credentialUpdater.start() } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala index fbba16b4fb50..163cfb4eb862 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala @@ -38,7 +38,7 @@ import org.apache.spark.util.Utils private[yarn] class YARNHadoopDelegationTokenManager( sparkConf: SparkConf, hadoopConf: Configuration, - fileSystems: () => Set[FileSystem]) extends Logging { + fileSystems: Configuration => Set[FileSystem]) extends Logging { private val delegationTokenManager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala index 98b0c963c9db..c918998bde07 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala @@ -48,7 +48,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - () => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, hadoopConf)) + conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) credentialManager.credentialProviders.get("yarn-test") should not be (None) }