[SPARK-5342][YARN] Allow long running Spark apps to run on secure YARN/HDFS #4688
Status: Closed. harishreedharan wants to merge 42 commits into apache:master from harishreedharan:kerberos-longrunning.
Commits (42, all by harishreedharan):
- 77914dd WIP: Add kerberos principal and keytab to YARN client.
- ccba5bc WIP: More changes wrt kerberos
- 2b0d745 [SPARK-5342][YARN] Allow long running Spark apps to run on secure YAR…
- f8fe694 Handle None if keytab-login is not scheduled.
- bcfc374 Fix Hadoop-1 build by adding no-op methods in SparkHadoopUtil, with i…
- d282d7a Fix ClientSuite to set YARN mode, so that the correct class is used i…
- 41efde0 Merge branch 'master' into kerberos-longrunning
- fb27f46 Make sure principal and keytab are set before CoarseGrainedSchedulerB…
- 8c6928a Fix issue caused by direct creation of Actor object.
- d79b2b9 Make sure correct credentials are passed to FileSystem#addDelegationT…
- 0985b4e Write tokens to HDFS and read them back when required, rather than se…
- b4cb917 Send keytab to AM via DistributedCache rather than directly via HDFS
- 5c11c3e Move tests to YarnSparkHadoopUtil to fix compile issues.
- f6954da Got rid of Akka communication to renew, instead the executors check a…
- f0f54cb Be more defensive when updating the credentials file.
- af6d5f0 Cleaning up files where changes weren't required.
- 2debcea Change the file structure for credentials files. I will push a follow…
- f4fd711 Fix SparkConf usage.
- 9ef5f1b Added explanation of how the credentials refresh works, some other mi…
- 55522e3 Fix failure caused by Preconditions ambiguity.
- 0de27ee Merge branch 'master' into kerberos-longrunning
- 42813b4 Remove utils.sh, which was re-added due to merge with master.
- fa233bd Adding logging, fixing minor formatting and ordering issues.
- 62c45ce Relogin from keytab periodically.
- 61b2b27 Account for AM restarts by making sure lastSuffix is read from the fi…
- 2f9975c Ensure new tokens are written out immediately on AM restart. Also, pi…
- f74303c Move the new logic into specialized classes. Add cleanup for old cred…
- bcd11f9 Refactor AM and Executor token update code into separate classes, als…
- 7f1bc58 Minor fixes, cleanup.
- 0e9507e Merge branch 'master' into kerberos-longrunning
- e800c8b Restore original RegisteredExecutor message, and send new tokens via …
- 8a4f268 Added docs in the security guide. Changed some code to ensure that th…
- 7b19643 Merge branch 'master' into kerberos-longrunning
- bc083e3 Overload RegisteredExecutor to send tokens. Minor doc updates.
- ebb36f5 Merge branch 'master' into kerberos-longrunning
- 42eead4 Remove RPC part. Refactor and move methods around, use renewal interv…
- f041dd3 Merge branch 'master' into kerberos-longrunning
- 072659e Fix build failure caused by thread factory getting moved to ThreadUtils.
- 6963bbc Schedule renewal in AM before starting user class. Else, a restarted …
- 09fe224 Use token.renew to get token's renewal interval rather than using hdf…
- 611923a Make sure the namenodes are listed correctly for creating tokens.
- 36eb8a9 Change the renewal interval config param. Fix a bunch of comments.
core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala (new file, 105 additions, 0 deletions)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.{ThreadUtils, Utils}

import scala.util.control.NonFatal

private[spark] class ExecutorDelegationTokenUpdater(
    sparkConf: SparkConf,
    hadoopConf: Configuration) extends Logging {

  @volatile private var lastCredentialsFileSuffix = 0

  private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")

  private val delegationTokenRenewer =
    Executors.newSingleThreadScheduledExecutor(
      ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))

  // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
  private val executorUpdaterRunnable =
    new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
    }

  def updateCredentialsIfRequired(): Unit = {
    try {
      val credentialsFilePath = new Path(credentialsFile)
      val remoteFs = FileSystem.get(hadoopConf)
      SparkHadoopUtil.get.listFilesSorted(
        remoteFs, credentialsFilePath.getParent,
        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
        .lastOption.foreach { credentialsStatus =>
          val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
          if (suffix > lastCredentialsFileSuffix) {
            logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
            val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
            lastCredentialsFileSuffix = suffix
            UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
            logInfo("Tokens updated from credentials file.")
          } else {
            // Check every hour to see if new credentials arrived.
            logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
              "tokens yet, will check again in an hour.")
            delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
            return
          }
        }
      val timeFromNowToRenewal =
        SparkHadoopUtil.get.getTimeFromNowToRenewal(
          sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
      if (timeFromNowToRenewal <= 0) {
        executorUpdaterRunnable.run()
      } else {
        logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
        delegationTokenRenewer.schedule(
          executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
      }
    } catch {
      // Since the file may get deleted while we are reading it, catch the Exception and come
      // back in an hour to try again.
      case NonFatal(e) =>
        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
        delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
    }
  }

  private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
    val stream = remoteFs.open(tokenPath)
    try {
      val newCredentials = new Credentials()
      newCredentials.readTokenStorageStream(stream)
      newCredentials
    } finally {
      stream.close()
    }
  }

  def stop(): Unit = {
    delegationTokenRenewer.shutdown()
  }
}
```
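The updater above keys its behavior on two things: the numeric suffix of the newest credentials file, and a wake-up time computed as a fraction (0.8 above) of the token renewal interval. A minimal standalone Scala sketch of that logic, with hypothetical file names and helper names (this is not Spark code, just an illustration of the suffix comparison and the 80% scheduling rule):

```scala
// Hypothetical sketch, not Spark code: shows how a "newest file wins by
// numeric suffix" selection and an 80%-of-expiry refresh schedule work.
object TokenRefreshSketch {
  // Files are assumed to be named like "credentials-<suffix>"; a larger
  // suffix means a newer file, so the maximum suffix is the one to read.
  def latestSuffix(fileNames: Seq[String]): Option[Int] =
    fileNames.flatMap { name =>
      name.split("-").lastOption.flatMap(s => scala.util.Try(s.toInt).toOption)
    }.sorted.lastOption

  // Schedule the next refresh at 80% of the time remaining until expiry,
  // mirroring the 0.8 fraction passed to getTimeFromNowToRenewal above.
  // A non-positive result means "refresh immediately".
  def millisUntilRefresh(nowMs: Long, expiryMs: Long): Long =
    math.max(0L, ((expiryMs - nowMs) * 0.8).toLong)

  def main(args: Array[String]): Unit = {
    println(latestSuffix(Seq("credentials-1", "credentials-3", "credentials-2"))) // Some(3)
    println(millisUntilRefresh(0L, 1000L)) // 800
  }
}
```

The suffix comparison is what makes the executor-side polling idempotent: re-reading an already-seen file is skipped because its suffix is not greater than `lastCredentialsFileSuffix`.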
@@ -32,6 +32,8 @@ SSL must be configured on each node and configured for each component involved in communication

### YARN mode
The key-store can be prepared on the client side and then distributed to and used by the executors as part of the application. This is possible because the user can deploy files before the application is started in YARN, using the `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encrypting these files in transit lies with YARN and has nothing to do with Spark.

For long-running apps, such as Spark Streaming apps, to be able to write to HDFS, a principal and keytab can be passed to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in is copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely, if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login is periodically renewed using this principal and keytab, and the delegation tokens required for HDFS are regenerated periodically, so the application can continue writing to HDFS.
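The `--principal`/`--keytab` flow the docs describe can be exercised with a `spark-submit` invocation along these lines (a sketch; the principal, keytab path, application class, and jar name are all placeholder values, not taken from this PR):

```shell
# Hypothetical invocation: only --principal and --keytab come from this
# PR's docs; everything else is a placeholder for illustration.
spark-submit \
  --master yarn-cluster \
  --principal user@EXAMPLE.COM \
  --keytab /etc/security/keytabs/user.keytab \
  --class com.example.StreamingJob \
  streaming-job.jar
```

With these flags set, the application can outlive the initial delegation tokens, since new tokens are generated from the keytab for as long as the job runs.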
Contributor comment: remove the reference to the renew-interval if you remove use of it.
### Standalone mode
The user needs to provide key-stores and configuration options for the master and workers. These are set by attaching appropriate Java system properties to the `SPARK_MASTER_OPTS` and `SPARK_WORKER_OPTS` environment variables, or simply to `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker that spawned them, by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by the user on the client side are not used by the executors.
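In standalone mode, the system properties described above might be attached like this in `conf/spark-env.sh` (a hedged sketch: the `spark.ssl.*` property names follow the Spark security guide, while all paths and passwords are placeholders):

```shell
# Hypothetical spark-env.sh fragment; keystore paths and passwords are
# placeholder values. Each -D entry becomes a Java system property on the
# daemon's JVM.
SSL_OPTS="-Dspark.ssl.enabled=true \
  -Dspark.ssl.keyStore=/path/to/keystore.jks \
  -Dspark.ssl.keyStorePassword=changeit \
  -Dspark.ssl.trustStore=/path/to/truststore.jks \
  -Dspark.ssl.trustStorePassword=changeit"
export SPARK_MASTER_OPTS="$SSL_OPTS"
export SPARK_WORKER_OPTS="$SSL_OPTS"
```

To have executors inherit the worker's settings instead of the client-side ones, the application would additionally set `spark.ssl.useNodeLocalConf=true`, as the paragraph above describes.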
Reviewer comment: Could you also state what the sort order is in the comment?