From ebbcb7f887557e09eacc56dbd7bb88da445b8fa2 Mon Sep 17 00:00:00 2001
From: Sanket Chintapalli
Date: Fri, 1 Sep 2017 15:32:20 -0500
Subject: [PATCH 1/7] Credentials not being passed to gather the tokens

---
 .../HadoopFSDelegationTokenProvider.scala          | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index f0ac7f501ceb..406384b1b7b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -46,11 +46,12 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
     val fsToGetTokens = fileSystems(hadoopConf)
     val newCreds = fetchDelegationTokens(
       getTokenRenewer(hadoopConf),
-      fsToGetTokens)
+      fsToGetTokens,
+      creds)

     // Get the token renewal interval if it is not set. It will only be called once.
     if (tokenRenewalInterval == null) {
-      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens)
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens, creds)
     }

     // Get the time of next renewal.
@@ -89,9 +90,9 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration

   private def fetchDelegationTokens(
       renewer: String,
-      filesystems: Set[FileSystem]): Credentials = {
+      filesystems: Set[FileSystem],
+      creds :Credentials): Credentials = {

-    val creds = new Credentials()
     filesystems.foreach { fs =>
       logInfo("getting token for: " + fs)
@@ -103,15 +104,17 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration

   private def getTokenRenewalInterval(
       hadoopConf: Configuration,
-      filesystems: Set[FileSystem]): Option[Long] = {
+      filesystems: Set[FileSystem],
+      creds:Credentials): Option[Long] = {
     // We cannot use the tokens generated with renewer yarn. Trying to renew
     // those will fail with an access control issue. So create new tokens with the logged in
     // user as renewer.
-    val creds = fetchDelegationTokens(
+    val newCreds = fetchDelegationTokens(
       UserGroupInformation.getCurrentUser.getUserName,
-      filesystems)
+      filesystems,
+      creds)

-    val renewIntervals = creds.getAllTokens.asScala.filter {
+    val renewIntervals = newCreds.getAllTokens.asScala.filter {
       _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
     }.flatMap { token =>
       Try {

From 0cfca504e3ee30c1cb62ae5976a7784292418f45 Mon Sep 17 00:00:00 2001
From: Sanket Chintapalli
Date: Tue, 5 Sep 2017 14:43:50 -0500
Subject: [PATCH 2/7] Rename newCreds to fetchCreds, a more appropriate name

---
 .../security/HadoopFSDelegationTokenProvider.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 406384b1b7b8..773e546b3c7f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -44,7 +44,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
       creds: Credentials): Option[Long] = {

     val fsToGetTokens = fileSystems(hadoopConf)
-    val newCreds = fetchDelegationTokens(
+    val fetchCreds = fetchDelegationTokens(
       getTokenRenewer(hadoopConf),
       fsToGetTokens,
       creds)
@@ -56,7 +56,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration

     // Get the time of next renewal.
     val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
-      val nextRenewalDates = newCreds.getAllTokens.asScala
+      val nextRenewalDates = fetchCreds.getAllTokens.asScala
         .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
         .map { token =>
           val identifier = token
@@ -67,7 +67,6 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
       if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
     }

-    creds.addAll(newCreds)
     nextRenewalDate
   }

@@ -109,12 +108,12 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
     // We cannot use the tokens generated with renewer yarn. Trying to renew
     // those will fail with an access control issue. So create new tokens with the logged in
     // user as renewer.
-    val newCreds = fetchDelegationTokens(
+    val fetchCreds = fetchDelegationTokens(
       UserGroupInformation.getCurrentUser.getUserName,
       filesystems,
       creds)

-    val renewIntervals = newCreds.getAllTokens.asScala.filter {
+    val renewIntervals = fetchCreds.getAllTokens.asScala.filter {
       _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
     }.flatMap { token =>
       Try {

From 542497294ee28a80795937644ed7dec72cddbe32 Mon Sep 17 00:00:00 2001
From: Sanket Chintapalli
Date: Tue, 5 Sep 2017 15:03:38 -0500
Subject: [PATCH 3/7] Fix Scala style tests

---
 .../deploy/security/HadoopFSDelegationTokenProvider.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 773e546b3c7f..c5203628a68d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -90,8 +90,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
   private def fetchDelegationTokens(
       renewer: String,
       filesystems: Set[FileSystem],
-      creds :Credentials): Credentials = {
-
+      creds: Credentials): Credentials = {
     filesystems.foreach { fs =>
       logInfo("getting token for: " + fs)

From d72c08f72d02b2288e09566f191bfe310d6cfbc7 Mon Sep 17 00:00:00 2001
From: Sanket Chintapalli
Date: Tue, 5 Sep 2017 16:28:22 -0500
Subject: [PATCH 4/7] Fix Scala style issues

---
 .../spark/deploy/security/HadoopFSDelegationTokenProvider.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index c5203628a68d..d6f0d49a7e15 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -103,7 +103,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
   private def getTokenRenewalInterval(
       hadoopConf: Configuration,
       filesystems: Set[FileSystem],
-      creds:Credentials): Option[Long] = {
+      creds: Credentials): Option[Long] = {
     // We cannot use the tokens generated with renewer yarn. Trying to renew
     // those will fail with an access control issue. So create new tokens with the logged in
     // user as renewer.
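Net effect of patches 1-4: fetchDelegationTokens fills a caller-supplied Credentials
object instead of allocating its own, so the tokens it gathers actually reach the
caller. The standalone Scala sketch below illustrates that calling pattern; it is not
part of the patch series, the object and method names are hypothetical, and the use of
FileSystem.addDelegationTokens is an assumption about the provider's internals (the
hunks above only show the logInfo line).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

// Hypothetical standalone helper mirroring the post-patch shape of
// fetchDelegationTokens: the caller allocates one Credentials object and
// every fetch accumulates tokens into it.
object TokenGatherSketch {
  def fetchDelegationTokens(
      renewer: String,
      filesystems: Set[FileSystem],
      creds: Credentials): Credentials = {
    filesystems.foreach { fs =>
      // Assumption: FileSystem.addDelegationTokens writes the fetched
      // tokens directly into `creds`.
      fs.addDelegationTokens(renewer, creds)
    }
    creds
  }

  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val creds = new Credentials() // owned by the caller, reused across fetches
    fetchDelegationTokens("yarn", Set(FileSystem.get(hadoopConf)), creds)
    println(s"gathered ${creds.numberOfTokens()} token(s)")
  }
}

With this shape, the creds.addAll(newCreds) merge in obtainDelegationTokens becomes
redundant, which is exactly what PATCH 2/7 removes.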
From 075de5da67f394be9c79387f15bf6cfb8848f986 Mon Sep 17 00:00:00 2001
From: Sanket Chintapalli
Date: Wed, 6 Sep 2017 13:33:47 -0500
Subject: [PATCH 5/7] Add back the PRINCIPAL check (spark.yarn.principal) for
 renewing tokens

---
 .../HBaseDelegationTokenProvider.scala        |  3 ++
 .../HadoopDelegationTokenManager.scala        |  2 +-
 .../HadoopDelegationTokenProvider.scala       |  2 +
 .../HadoopFSDelegationTokenProvider.scala     | 47 +++++++++----------
 .../HiveDelegationTokenProvider.scala         |  3 ++
 .../HadoopDelegationTokenManagerSuite.scala   |  4 +-
 6 files changed, 34 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index 35621daf9c0d..fa0ad622e7f7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -17,6 +17,8 @@

 package org.apache.spark.deploy.security

+import org.apache.spark.SparkConf
+
 import scala.reflect.runtime.universe
 import scala.util.control.NonFatal

@@ -34,6 +36,7 @@ private[security] class HBaseDelegationTokenProvider

   override def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
     try {
       val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index c317c4fe3d82..c134b7ebe38f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -116,7 +116,7 @@ private[spark] class HadoopDelegationTokenManager(
       creds: Credentials): Long = {
     delegationTokenProviders.values.flatMap { provider =>
       if (provider.delegationTokensRequired(hadoopConf)) {
-        provider.obtainDelegationTokens(hadoopConf, creds)
+        provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
       } else {
         logDebug(s"Service ${provider.serviceName} does not require a token." +
           s" Check your configuration to see if security is disabled or not.")
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
index f162e7e58c53..c72182619cf1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.security

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
+import org.apache.spark.SparkConf

 /**
  * Hadoop delegation token provider.
@@ -46,5 +47,6 @@ private[spark] trait HadoopDelegationTokenProvider {
    */
   def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long]
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index d6f0d49a7e15..300773c58b18 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -26,8 +26,9 @@ import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._

 private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration => Set[FileSystem])
   extends HadoopDelegationTokenProvider with Logging {
@@ -41,17 +42,15 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration

   override def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {

     val fsToGetTokens = fileSystems(hadoopConf)
-    val fetchCreds = fetchDelegationTokens(
-      getTokenRenewer(hadoopConf),
-      fsToGetTokens,
-      creds)
+    val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds)

     // Get the token renewal interval if it is not set. It will only be called once.
     if (tokenRenewalInterval == null) {
-      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fsToGetTokens, creds)
+      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
     }

     // Get the time of next renewal.
@@ -102,27 +101,27 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration

   private def getTokenRenewalInterval(
       hadoopConf: Configuration,
-      filesystems: Set[FileSystem],
-      creds: Credentials): Option[Long] = {
+      sparkConf: SparkConf,
+      filesystems: Set[FileSystem]): Option[Long] = {
     // We cannot use the tokens generated with renewer yarn. Trying to renew
     // those will fail with an access control issue. So create new tokens with the logged in
     // user as renewer.
-    val fetchCreds = fetchDelegationTokens(
-      UserGroupInformation.getCurrentUser.getUserName,
-      filesystems,
-      creds)
-
-    val renewIntervals = fetchCreds.getAllTokens.asScala.filter {
-      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
-    }.flatMap { token =>
-      Try {
-        val newExpiration = token.renew(hadoopConf)
-        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-        val interval = newExpiration - identifier.getIssueDate
-        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
-        interval
-      }.toOption
+    sparkConf.get(PRINCIPAL).flatMap { renewer =>
+      val creds = new Credentials()
+      fetchDelegationTokens(renewer, filesystems, creds)
+
+      val renewIntervals = creds.getAllTokens.asScala.filter {
+        _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
+      }.flatMap { token =>
+        Try {
+          val newExpiration = token.renew(hadoopConf)
+          val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
+          val interval = newExpiration - identifier.getIssueDate
+          logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
+          interval
+        }.toOption
+      }
+      if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
     }
-    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
index 53b9f898c6e7..44550f51b428 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
@@ -20,6 +20,8 @@ package org.apache.spark.deploy.security
 import java.lang.reflect.UndeclaredThrowableException
 import java.security.PrivilegedExceptionAction

+import org.apache.spark.SparkConf
+
 import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
@@ -61,6 +63,7 @@ private[security] class HiveDelegationTokenProvider

   override def obtainDelegationTokens(
       hadoopConf: Configuration,
+      sparkConf: SparkConf,
       creds: Credentials): Option[Long] = {
     try {
       val conf = hiveConf(hadoopConf)
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 5b05521e48f8..eeffc36070b4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -94,7 +94,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     val hiveCredentialProvider = new HiveDelegationTokenProvider()
     val credentials = new Credentials()
-    hiveCredentialProvider.obtainDelegationTokens(hadoopConf, credentials)
+    hiveCredentialProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)

     credentials.getAllTokens.size() should be (0)
   }
@@ -105,7 +105,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
     val hbaseTokenProvider = new HBaseDelegationTokenProvider()
     val creds = new Credentials()
-    hbaseTokenProvider.obtainDelegationTokens(hadoopConf, creds)
+    hbaseTokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, creds)

     creds.getAllTokens.size should be (0)
   }
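PATCH 5/7 is the substantive change: the renewal-interval probe now runs only when a
Kerberos principal is configured, using sparkConf.get(PRINCIPAL) (spark.yarn.principal)
as the renewer instead of the current user. A condensed, self-contained sketch of that
probe follows; `fetchTokens` stands in for the provider's private fetchDelegationTokens
and `principal` for sparkConf.get(PRINCIPAL), so the signature here is illustrative
rather than the provider's actual API.

import scala.collection.JavaConverters._
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

object RenewalIntervalSketch {
  // Sketch of the probe: fetch throwaway tokens with the principal as renewer,
  // renew each once, and take newExpiration - issueDate as the interval.
  // With no principal configured, the probe is skipped and None is returned.
  def probeRenewalInterval(
      hadoopConf: Configuration,
      principal: Option[String],
      fetchTokens: (String, Credentials) => Unit): Option[Long] = {
    principal.flatMap { renewer =>
      // Fresh Credentials: the probe tokens are measured, not handed back.
      val creds = new Credentials()
      fetchTokens(renewer, creds)
      val intervals = creds.getAllTokens.asScala
        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
        .flatMap { token =>
          Try {
            val newExpiration = token.renew(hadoopConf)
            val id = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
            newExpiration - id.getIssueDate
          }.toOption
        }
      if (intervals.isEmpty) None else Some(intervals.min)
    }
  }
}

Design note: the probe deliberately fetches its tokens into a fresh Credentials
object, so the measurement tokens are never mixed into the caller's credentials.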
From 1184a735124d6119f515257a75b0721efc3adff9 Mon Sep 17 00:00:00 2001
From: Sanket Chintapalli
Date: Wed, 6 Sep 2017 14:42:08 -0500
Subject: [PATCH 6/7] Move the SparkConf import into the Spark import group
 (Scala style)

---
 .../spark/deploy/security/HBaseDelegationTokenProvider.scala | 3 +--
 .../spark/deploy/security/HiveDelegationTokenProvider.scala  | 3 +--
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index fa0ad622e7f7..78b0e6b2cbf3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -17,8 +17,6 @@

 package org.apache.spark.deploy.security

-import org.apache.spark.SparkConf
-
 import scala.reflect.runtime.universe
 import scala.util.control.NonFatal

@@ -26,6 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}

+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
index 44550f51b428..b31cc595ed83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala
@@ -20,8 +20,6 @@ package org.apache.spark.deploy.security
 import java.lang.reflect.UndeclaredThrowableException
 import java.security.PrivilegedExceptionAction

-import org.apache.spark.SparkConf
-
 import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
@@ -32,6 +30,7 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.Token

+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils

From 98f0ff2a655c398e5b502ce2b340dfac88b385e9 Mon Sep 17 00:00:00 2001
From: Sanket Chintapalli
Date: Wed, 6 Sep 2017 15:33:16 -0500
Subject: [PATCH 7/7] Fix Scala style checks again

---
 .../spark/deploy/security/HadoopDelegationTokenProvider.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
index c72182619cf1..1ba245e84af4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.security

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.Credentials
+
 import org.apache.spark.SparkConf

 /**
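After all seven patches, every provider exposes the same three-argument
obtainDelegationTokens(hadoopConf, sparkConf, creds). A minimal sketch of the
resulting call shape, modeled on the suite changes in PATCH 5/7; the object name,
the principal value, and the FileSystem closure are illustrative, not from the
patches.

package org.apache.spark.deploy.security // the provider is private[deploy]

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

object ObtainTokensSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val sparkConf = new SparkConf()
      // Illustrative principal; without one, getTokenRenewalInterval yields None.
      .set("spark.yarn.principal", "user@EXAMPLE.COM")

    // The provider is built from a Configuration => Set[FileSystem] closure,
    // matching its constructor in the hunks above.
    val provider = new HadoopFSDelegationTokenProvider(conf => Set(FileSystem.get(conf)))

    // Tokens accumulate in the caller-owned Credentials; the return value is
    // the next renewal time, if one could be computed. On an insecure cluster
    // this typically gathers no tokens and returns None.
    val creds = new Credentials()
    val nextRenewal = provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
    println(s"tokens: ${creds.getAllTokens.size()}, next renewal: $nextRenewal")
  }
}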