
Secure HDFS Support #514

Open. Wants to merge 10 commits into base: branch-2.2-kubernetes.
@@ -575,7 +575,7 @@ object SparkSubmit extends CommandLineUtils {
}

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL) {
if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
55 changes: 55 additions & 0 deletions docs/running-on-kubernetes.md
@@ -743,6 +743,61 @@ from the other deployment modes. See the [configuration page](configuration.html
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.enabled</code></td>
<td>false</td>
<td>
Specify whether your job requires Kerberos authentication to access HDFS. By default, secure
HDFS access is assumed not to be required.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.keytab</code></td>
<td>(none)</td>
<td>
When <code>spark.kubernetes.kerberos.enabled</code> is true, this specifies the location of the Kerberos
keytab to be used to access secure HDFS. This is optional: you may instead log in by running
<code>kinit</code> before running spark-submit, and the submission client will resolve credentials from
your local TGT cache.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.principal</code></td>
<td>(none)</td>
<td>
When <code>spark.kubernetes.kerberos.enabled</code> is true, this specifies the Kerberos principal to be
used to access secure HDFS. This is optional: you may instead log in by running <code>kinit</code>
before running spark-submit, and the submission client will resolve credentials from your local
TGT cache.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.renewer.principal</code></td>
<td>(none)</td>
<td>
When <code>spark.kubernetes.kerberos.enabled</code> is true, this specifies the principal to be used for
renewing delegation tokens. This is optional: the renewer defaults to the job user's principal.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
<td>(none)</td>
<td>
When <code>spark.kubernetes.kerberos.enabled</code> is true, this specifies the name of the secret where
your existing delegation token data is stored. You must also specify the item key
<code>spark.kubernetes.kerberos.tokensecret.itemkey</code> under which the data is stored in the secret.
This is optional: supply it only if you want to use a pre-existing secret; otherwise a new secret will be
created automatically.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td>
<td>spark.kubernetes.kerberos.dt.label</td>
<td>
When <code>spark.kubernetes.kerberos.enabled</code> is true, this specifies the data item key within
the pre-specified secret under which your existing delegation token data is stored. It defaults to
<code>spark.kubernetes.kerberos.dt.label</code> if not set, but you should always set it explicitly when
providing a pre-existing secret that contains the delegation token data.
</td>
</tr>
<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
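As a rough sketch of how the Kerberos options documented above might be combined on a SparkConf
(all values below are illustrative assumptions, not shipped defaults):

// Sketch only: keytab path, principal, and secret names are placeholders.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.kerberos.enabled", "true")
  .set("spark.kubernetes.kerberos.keytab", "/etc/security/keytabs/jobuser.keytab")
  .set("spark.kubernetes.kerberos.principal", "jobuser@EXAMPLE.COM")
  // Optional: reuse a pre-existing delegation token secret instead of having one created.
  .set("spark.kubernetes.kerberos.tokensecret.name", "existing-hdfs-dt-secret")
  .set("spark.kubernetes.kerberos.tokensecret.itemkey", "spark.kubernetes.kerberos.dt.label")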
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging

/**
* This is separated out from the HadoopConf steps API because this component can be reused to
* set up the Hadoop Configuration for executors as well.
*/
private[spark] trait HadoopConfBootstrap {
/**
* Bootstraps a main container with the ConfigMaps containing Hadoop config files
* mounted as volumes and an ENV variable pointing to the mounted file.
*/
def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer
}

private[spark] class HadoopConfBootstrapImpl(
hadoopConfConfigMapName: String,
hadoopConfigFiles: Seq[File],
hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging {

override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
logInfo("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files")
val keyPaths = hadoopConfigFiles.map(file =>
new KeyToPathBuilder()
.withKey(file.toPath.getFileName.toString)
.withPath(file.toPath.getFileName.toString)
.build()).toList
Review comment: Any reason for the .toList here?

val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editSpec()
.addNewVolume()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(hadoopConfConfigMapName)
.withItems(keyPaths.asJava)
.endConfigMap()
.endVolume()
.endSpec()
.build()
val mainContainerWithMountedHadoopConf = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewVolumeMount()
.withName(HADOOP_FILE_VOLUME)
.withMountPath(HADOOP_CONF_DIR_PATH)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_CONF_DIR)
.withValue(HADOOP_CONF_DIR_PATH)
.endEnv()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(hadoopUGI.getShortName)
.endEnv()
.build()
originalPodWithMainContainer.copy(
pod = hadoopSupportedPod,
mainContainer = mainContainerWithMountedHadoopConf)
}
}
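A minimal usage sketch for the bootstrap above, assuming HADOOP_CONF_DIR is set; the ConfigMap name
and the driver pod/container built here are illustrative assumptions:

// Sketch only: pod, container, and ConfigMap names are placeholders.
import java.io.File
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

val driverPod = new PodBuilder().withNewMetadata().withName("spark-driver").endMetadata().build()
val driverContainer = new ContainerBuilder().withName("spark-kubernetes-driver").build()
val hadoopConfDir = new File(sys.env("HADOOP_CONF_DIR"))
val hadoopConfFiles: Seq[File] = hadoopConfDir.listFiles().filter(_.isFile).toSeq
val hadoopBootstrap = new HadoopConfBootstrapImpl(
  hadoopConfConfigMapName = "spark-hadoop-conf-map",
  hadoopConfigFiles = hadoopConfFiles,
  hadoopUGI = new HadoopUGIUtil)
val bootstrappedDriver = hadoopBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(pod = driverPod, mainContainer = driverContainer))
// bootstrappedDriver.pod now carries the ConfigMap volume, and its main container mounts it
// at HADOOP_CONF_DIR_PATH with HADOOP_CONF_DIR pointing there.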
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier


// This class exists primarily so that its methods can be mocked in tests
private[spark] class HadoopUGIUtil {
def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser

def getShortName: String = getCurrentUser.getShortUserName

def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)

def dfsAddDelegationToken(hadoopConf: Configuration, renewer: String, creds: Credentials)
: Iterable[Token[_ <: TokenIdentifier]] =
FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds)

def getCurrentTime: Long = System.currentTimeMillis()

// Functions that should be in Core with Rebase to 2.3
@deprecated("Moved to core in 2.2", "2.2")
Review comment: Think we mean 2.3 in these comments.

def getTokenRenewalInterval(
renewedTokens: Iterable[Token[_ <: TokenIdentifier]],
hadoopConf: Configuration): Option[Long] = {
val renewIntervals = renewedTokens.filter {
_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]}
Review comment: Indent as follows:

val renewIntervals = renewedTokens.filter {
    _.decodeIdentifier()....
  }.flatMap { token =>
    Try {
      // logic
    }.toOption
  }

.flatMap { token =>
Try {
val newExpiration = token.renew(hadoopConf)
val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
val interval = newExpiration - identifier.getIssueDate
interval
}.toOption}
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
Review comment: renewIntervals.map instead of checking on isEmpty and returning None.

Review comment: Actually since renewIntervals is a Seq, use reduceLeftOption with math.min.

}

@deprecated("Moved to core in 2.2", "2.2")
def serialize(creds: Credentials): Array[Byte] = {
val byteStream = new ByteArrayOutputStream
val dataStream = new DataOutputStream(byteStream)
creds.writeTokenStorageToStream(dataStream)
byteStream.toByteArray
}

@deprecated("Moved to core in 2.2", "2.2")
def deserialize(tokenBytes: Array[Byte]): Credentials = {
val creds = new Credentials()
creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes)))
creds
}
}
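A short sketch of how these helpers might be combined to fetch and serialize HDFS delegation tokens;
the Hadoop configuration and renewer principal below are assumptions, and token renewal requires a
live, kerberized HDFS:

// Sketch only: hadoopConf and the renewer principal are placeholders.
val ugiUtil = new HadoopUGIUtil
val hadoopConf = new Configuration()
val creds = ugiUtil.getCurrentUser.getCredentials
val tokens = ugiUtil.dfsAddDelegationToken(hadoopConf, "renewer@EXAMPLE.COM", creds)
val renewalInterval = ugiUtil.getTokenRenewalInterval(tokens, hadoopConf)
val tokenBytes = ugiUtil.serialize(creds)          // bytes to store as the secret item
val restoredCreds = ugiUtil.deserialize(tokenBytes)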
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging


/**
* This is separated out from the HadoopConf steps API because this component can be reused to
* mount the DT secret for executors as well.
*/
private[spark] trait KerberosTokenConfBootstrap {
// Bootstraps a main container with the Secret mounted as volumes and an ENV variable
// pointing to the mounted file containing the DT for Secure HDFS interaction
def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer
}

private[spark] class KerberosTokenConfBootstrapImpl(
secretName: String,
secretItemKey: String,
userName: String) extends KerberosTokenConfBootstrap with Logging {


override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
logInfo("Mounting HDFS DT from Secret for Secure HDFS")
val dtMountedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editOrNewSpec()
.addNewVolume()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withNewSecret()
.withSecretName(secretName)
.endSecret()
.endVolume()
.endSpec()
.build()
val mainContainerWithMountedKerberos = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewVolumeMount()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey")
.endEnv()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(userName)
.endEnv()
.build()
originalPodWithMainContainer.copy(
pod = dtMountedPod,
mainContainer = mainContainerWithMountedKerberos)
}
}
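A usage sketch for the secret-mounting bootstrap above; the secret name, item key, user name, and the
pod/container built here are illustrative assumptions:

// Sketch only: all names below are placeholders.
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

val driverPod = new PodBuilder().withNewMetadata().withName("spark-driver").endMetadata().build()
val driverContainer = new ContainerBuilder().withName("spark-kubernetes-driver").build()
val kerberosBootstrap = new KerberosTokenConfBootstrapImpl(
  secretName = "hdfs-delegation-token-secret",
  secretItemKey = "spark.kubernetes.kerberos.dt.label",
  userName = "jobuser")
val withMountedToken = kerberosBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(pod = driverPod, mainContainer = driverContainer))
// The resulting container sets HADOOP_TOKEN_FILE_LOCATION to the mounted secret item and
// SPARK_USER to "jobuser".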
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{Container, Pod}

/**
* The purpose of this case class is to package the driver pod together with its main container,
* so that bootstrap steps can modify both as a unit instead of each component separately.
*/
private[spark] case class PodWithMainContainer(
pod: Pod,
mainContainer: Container)