28 changes: 28 additions & 0 deletions core/pom.xml
@@ -357,6 +357,34 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>

<!--
Testing Hive reflection needs hive on the test classpath only.
It doesn't need the spark hive modules, so the -Phive flag is not checked.
-->
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
Author (@mgummelt), Apr 19, 2017:

I have a problem here that I could use some help on.

hive-exec has a dependency on pentaho that doesn't exist in maven central anymore. This is why we have an entry for it in spark-parent's dependencyManagement section that excludes pentaho via calcite-core.

But the scope in spark-parent's dependencyManagement entry for hive-exec is compile, so if I make it test here (which is what it was in YARN), it no longer matches that entry and thus doesn't inherit the excludes. That shouldn't be a problem, because test-scoped dependencies aren't transitive, so we should never try to fetch the missing pentaho dependency. But the sbt build doesn't seem to honor that, so if I add the test scope, I get this error:

[warn] 	Note: Unresolved dependencies path:
[warn] 		org.pentaho:pentaho-aggdesigner-algorithm:5.1.5-jhyde
[warn] 		  +- org.apache.calcite:calcite-core:1.2.0-incubating ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.spark-project.hive:hive-exec:1.2.1.spark2 ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.apache.spark:spark-core_2.11:2.2.0-SNAPSHOT ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.apache.spark:spark-catalyst_2.11:2.2.0-SNAPSHOT ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.apache.spark:spark-sql_2.11:2.2.0-SNAPSHOT ((com.typesafe.sbt.pom.MavenHelper) MavenHelper.scala#L76)
[warn] 		  +- org.apache.spark:spark-hive_2.11:2.2.0-SNAPSHOT

What I can't understand is how this wasn't a problem when these dependencies were in YARN. For some reason sbt dependencyTree on master shows that calcite-core is excluded from the hive-exec dependency tree, but the same command on this PR tries to resolve calcite-core.

Let me know if you have any ideas here. My best idea right now is to just try to duplicate the excludes in spark-parent.
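Since the failure shows up in sbt's resolution, the exclusion being considered would look roughly like the following build.sbt-style sketch (coordinates and test scope taken from the log above; this is illustrative only, since Spark's sbt build actually derives its dependencies from the poms):

```scala
// Exclude calcite-core from hive-exec so sbt never tries to resolve the
// pentaho artifact that has disappeared from Maven Central.
libraryDependencies += ("org.spark-project.hive" % "hive-exec" % "1.2.1.spark2" % Test)
  .exclude("org.apache.calcite", "calcite-core")
```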

</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-metastore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
18 changes: 15 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -30,6 +30,7 @@ import scala.util.Properties
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
Contributor:

Hmm... you're adding a YARN dependency in core, which should be able to build without YARN... but you're not actually adding a new dependency to the POM, so I guess the dependency is already there indirectly.

Author (@mgummelt):

Yea it looks like this should require hadoop-yarn-api. I'll add this dependency to core unless you object, in which case I suppose I could just use the raw string instead of YarnConfiguration.RM_PRINCIPAL, but that seems hacky.

FYI, we originally talked about placing the code below into the Mesos scheduler, but that seems to be too late. The SparkContext constructor creates a copy of the configuration, so we need to set the required YARN property before SparkContext is created, which means before user code gets run, which probably means somewhere in SparkSubmit.
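A tiny sketch of the ordering constraint described here (the principal names are made up; this is not code from the PR):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// SparkContext clones the SparkConf it is given, so anything set on the original
// conf afterwards is never seen by the running application. The YARN principal
// therefore has to be set before SparkContext is constructed, i.e. in SparkSubmit.
val conf = new SparkConf().setAppName("demo").setMaster("local[*]")
conf.set("spark.hadoop.yarn.resourcemanager.principal", "alice") // visible to the app
val sc = new SparkContext(conf)                                  // conf is copied here
conf.set("spark.hadoop.yarn.resourcemanager.principal", "bob")   // too late: sc keeps "alice"
```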

Contributor:

I don't think you need the explicit dependency (otherwise this would not be compiling). This is probably being brought in by hadoop-client already.

Author (@mgummelt):

Better to explicitly declare a dependency rather than rely on transitivity, right?

import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor._
@@ -45,6 +46,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBibl
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util._

@@ -63,7 +65,7 @@ private[deploy] object SparkSubmitAction extends Enumeration {
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*/
object SparkSubmit extends CommandLineUtils {
object SparkSubmit extends CommandLineUtils with Logging {

// Cluster managers
private val YARN = 1
@@ -564,12 +566,22 @@ object SparkSubmit extends CommandLineUtils {
// properties and then loaded by SparkConf
sysProps.put("spark.yarn.keytab", args.keytab)
sysProps.put("spark.yarn.principal", args.principal)

UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
}


// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos mode, we
// must trick it into thinking we're YARN.
if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
val shortUserName = UserGroupInformation.getCurrentUser.getShortUserName
val key = s"spark.hadoop.${YarnConfiguration.RM_PRINCIPAL}"
Contributor:

Does it work in the user impersonation scenario? Here shortUserName is a real user, while HadoopRDD may execute as a proxy user.

Author (@mgummelt):

Is it? It looks like getCurrentUser returns the subject from the current AccessControlContext, which is set by the doAs when --proxy-user is set.

Regardless, the renewer specified here actually has no effect, since we aren't renewing yet. Once I add renewal, I'll need to revisit this to make sure it's consistent with the renewal we're going to do in MesosSecurity. I'm only setting this now to avoid an Exception that gets thrown in the hadoop library if the master principal is not configured. See the JIRA in the above code comment for details.
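A small sketch of the --proxy-user behavior being discussed (user names are made up; not PR code):

```scala
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

// Inside doAs, getCurrentUser resolves to the proxy user taken from the current
// AccessControlContext, not the Kerberos login user.
val realUser = UserGroupInformation.getCurrentUser
val proxyUser = UserGroupInformation.createProxyUser("bob", realUser)
proxyUser.doAs(new PrivilegedExceptionAction[Unit] {
  override def run(): Unit = {
    println(UserGroupInformation.getCurrentUser.getShortUserName) // prints "bob"
  }
})
```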

Contributor:

The renewer for delegation tokens should be the user that created them (if the service is not doing it for the user, like in the YARN case); only the logged in user can create tokens, so this has to be the current user (not the proxy user, which happens later).

The tokens should be created in the proxy user's name (so user = "proxy" renewer = "real user"), when you care to support that.

Once I add renewal

So do you mean that currently this only works until the delegation tokens need renewal (which is 1 day by default - a lot shorter than the max life time)?

Author (@mgummelt):

So do you mean that currently this only works until the delegation tokens need renewal (which is 1 day by default - a lot shorter than the max life time)?

Yes
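For reference, the renewer convention described above as a hedged standalone sketch (not part of this change): delegation tokens are requested with the logged-in keytab user named as renewer, and in an impersonation setup the tokens themselves would be issued under the proxy user's name.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

val hadoopConf = new Configuration()
val creds = new Credentials()

// The renewer is the real logged-in user, i.e. the user that created the tokens.
val renewer = UserGroupInformation.getLoginUser.getShortUserName
FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds)
```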


logDebug(s"Setting ${key} to ${shortUserName}.")
sysProps.put(key, shortUserName)
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
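As background for the spark.hadoop. prefix used in the key above: Spark copies such entries into the Hadoop Configuration it builds for the application. A simplified sketch of that mechanism (the helper name is made up; the real logic lives in SparkHadoopUtil):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

// Every spark.hadoop.foo=bar entry becomes foo=bar in the Hadoop Configuration,
// which is how yarn.resourcemanager.principal set in SparkSubmit reaches the
// token-fetching code used by HadoopRDD.
def copySparkHadoopConfigs(sparkConf: SparkConf, hadoopConf: Configuration): Unit = {
  sparkConf.getAll.foreach { case (key, value) =>
    if (key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
    }
  }
}
```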
@@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
@@ -37,33 +37,59 @@ import org.apache.spark.util.Utils
* interface and put into resources/META-INF/services to be loaded by ServiceLoader.
*
* Also each credential provider is controlled by
* spark.yarn.security.credentials.{service}.enabled, it will not be loaded in if set to false.
* spark.security.credentials.{service}.enabled, it will not be loaded in if set to false.
* For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by
* the configuration spark.yarn.security.credentials.hive.enabled.
* the configuration spark.security.credentials.hive.enabled.
*/
private[yarn] final class ConfigurableCredentialManager(
sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
private val deprecatedProviderEnabledConfig = "spark.yarn.security.tokens.%s.enabled"
private val providerEnabledConfig = "spark.yarn.security.credentials.%s.enabled"
private[spark] class ConfigurableCredentialManager(
sparkConf: SparkConf,
hadoopConf: Configuration)
extends Logging {

private val deprecatedProviderEnabledConfigs = List(
"spark.yarn.security.tokens.%s.enabled",
"spark.yarn.security.credentials.%s.enabled")
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

// Maintain all the registered credential providers
private val credentialProviders = {
val providers = ServiceLoader.load(classOf[ServiceCredentialProvider],
Utils.getContextOrSparkClassLoader).asScala
private val credentialProviders = getCredentialProviders()
logDebug(s"Using the following credential providers: ${credentialProviders.keys.mkString(", ")}.")

private def getCredentialProviders(): Map[String, ServiceCredentialProvider] = {
val providers = loadCredentialProviders

// Filter out credentials in which spark.yarn.security.credentials.{service}.enabled is false.
// Filter out credentials in which spark.security.credentials.{service}.enabled is false.
providers.filter { p =>
sparkConf.getOption(providerEnabledConfig.format(p.serviceName))
.orElse {
sparkConf.getOption(deprecatedProviderEnabledConfig.format(p.serviceName)).map { c =>
logWarning(s"${deprecatedProviderEnabledConfig.format(p.serviceName)} is deprecated, " +
s"using ${providerEnabledConfig.format(p.serviceName)} instead")
c
}
}.map(_.toBoolean).getOrElse(true)

val key = providerEnabledConfig.format(p.serviceName)

deprecatedProviderEnabledConfigs.foreach { pattern =>
val deprecatedKey = pattern.format(p.serviceName)
if (sparkConf.contains(deprecatedKey)) {
logWarning(s"${deprecatedKey} is deprecated, using ${key} instead")
}
}

val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
sparkConf
.getOption(pattern.format(p.serviceName))
.map(_.toBoolean)
.getOrElse(true)
}

sparkConf
.getOption(key)
.map(_.toBoolean)
.getOrElse(isEnabledDeprecated)

}.map { p => (p.serviceName, p) }.toMap
}

protected def loadCredentialProviders: List[ServiceCredentialProvider] = {
ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
.asScala.toList
}

/**
* Get credential provider for the specified service.
*/
@@ -72,7 +98,9 @@ private[yarn] final class ConfigurableCredentialManager(
}

/**
* Obtain credentials from all the registered providers.
* Writes delegation tokens to creds. Delegation tokens are fetched from all registered
* providers.
*
* @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable,
* otherwise the nearest renewal time of any credentials will be returned.
*/
@@ -89,19 +117,16 @@
}

/**
* Create an [[AMCredentialRenewer]] instance, caller should be responsible to stop this
* instance when it is not used. AM will use it to renew credentials periodically.
* Returns a copy of the current user's credentials, augmented with new delegation tokens.
*/
def credentialRenewer(): AMCredentialRenewer = {
new AMCredentialRenewer(sparkConf, hadoopConf, this)
}
def obtainUserCredentials: Credentials = {
val userCreds = UserGroupInformation.getCurrentUser.getCredentials
val numTokensBefore = userCreds.numberOfTokens
obtainCredentials(hadoopConf, userCreds)

/**
* Create an [[CredentialUpdater]] instance, caller should be resposible to stop this intance
* when it is not used. Executors and driver (client mode) will use it to update credentials.
* periodically.
*/
def credentialUpdater(): CredentialUpdater = {
new CredentialUpdater(sparkConf, hadoopConf, this)
logDebug(s"Fetched ${userCreds.numberOfTokens - numTokensBefore} delegation token(s).")

userCreds
}

}
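As a usage note for the renamed keys: a third-party provider implements ServiceCredentialProvider, is registered under resources/META-INF/services as described in the scaladoc above, and can then be switched off with spark.security.credentials.<service>.enabled=false (the old spark.yarn.security.* keys still work but log a deprecation warning). A rough sketch of such a provider, assuming the trait moved to org.apache.spark.deploy.security with this change and only serviceName and obtainCredentials need overriding (the class and service name here are hypothetical):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.ServiceCredentialProvider

class MyServiceCredentialProvider extends ServiceCredentialProvider {
  // Gated by spark.security.credentials.myservice.enabled
  override val serviceName: String = "myservice"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Delegation tokens for "myservice" would be added to creds here.
    None // nothing to renew
  }
}
```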
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import org.apache.hadoop.security.Credentials

/** Serializes Hadoop Credentials to and from byte arrays so tokens can be shipped between processes. */
class CredentialsSerializer {
/** Writes the tokens in `creds` to a byte array using Hadoop's token storage format. */
def serializeTokens(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
creds.writeTokenStorageToStream(dataStream)
byteStream.toByteArray
}

/** Reads tokens previously written by serializeTokens back into a Credentials object. */
def deserializeTokens(tokenBytes: Array[Byte]): Credentials = {
val tokensBuf = new ByteArrayInputStream(tokenBytes)

val creds = new Credentials()
creds.readTokenStorageStream(new DataInputStream(tokensBuf))
creds
}
}
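A quick round-trip sketch for the serializer above (assumes the current user's credentials are already populated; not test code from the PR):

```scala
import org.apache.hadoop.security.UserGroupInformation

val serializer = new CredentialsSerializer
val creds = UserGroupInformation.getCurrentUser.getCredentials

// Tokens survive a serialize/deserialize round trip.
val bytes = serializer.serializeTokens(creds)
val restored = serializer.deserializeTokens(bytes)
assert(restored.numberOfTokens == creds.numberOfTokens)
```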
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import scala.reflect.runtime.universe
import scala.util.control.NonFatal
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

private[deploy] class HadoopFSCredentialProvider
extends ServiceCredentialProvider with Logging {
// Token renewal interval. This is assigned on the first call to obtainCredentials;
// None means no token renewer was specified or no token could be renewed,
// so the renewal interval could not be determined.
private var tokenRenewalInterval: Option[Long] = null

override val serviceName: String = "hadoopfs"

override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
// NameNode to access, used to get tokens from different FileSystems
val tmpCreds = new Credentials()
val tokenRenewer = getTokenRenewer(hadoopConf)
hadoopFSsToAccess(hadoopConf, sparkConf).foreach { dst =>
val dstFs = dst.getFileSystem(hadoopConf)
logInfo("getting token for: " + dst)
dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
}

// Get the token renewal interval if it is not set. It will only be called once.
if (tokenRenewalInterval == null) {
tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf)
}

// Get the time of next renewal.
val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
val nextRenewalDates = tmpCreds.getAllTokens.asScala
.filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
.map { t =>
val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
identifier.getIssueDate + interval
}
if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
}

creds.addAll(tmpCreds)
nextRenewalDate
}

protected def getTokenRenewalInterval(
hadoopConf: Configuration,
sparkConf: SparkConf): Option[Long] = None

protected def getTokenRenewer(hadoopConf: Configuration): String = {
UserGroupInformation.getCurrentUser.getShortUserName
}

protected def hadoopFSsToAccess(hadoopConf: Configuration, sparkConf: SparkConf): Set[Path] = {
Set(FileSystem.get(hadoopConf).getHomeDirectory)
}
}
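A direct-invocation sketch of the provider above (normally it is discovered via META-INF/services and driven by ConfigurableCredentialManager; the standalone call is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.spark.SparkConf

val provider = new HadoopFSCredentialProvider
val creds = new Credentials()

// Adds delegation tokens for the default filesystem to creds and returns the
// next renewal time, if one could be determined.
val nextRenewal: Option[Long] = provider.obtainCredentials(new Configuration(), new SparkConf(), creds)
```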
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction