Skip to content

Commit 7dbc726

Browse files
committed
refactor the code
Change-Id: Iea5e1eb1f61bcd1ad063ac2c9958950858879756
1 parent 189cc77 commit 7dbc726

File tree

1 file changed

+25
-35
lines changed

1 file changed

+25
-35
lines changed

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,31 @@ private[spark] class ApplicationMaster(
257257
// doAs in order for the credentials to be passed on to the executor containers.
258258
val securityMgr = new SecurityManager(sparkConf)
259259

260+
// If the credentials file config is present, we must periodically renew tokens. So create
261+
// a new AMDelegationTokenRenewer
262+
if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
263+
// Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
264+
// classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
265+
val credentialRenewerThread = new Thread {
266+
setName("AMCredentialRenewerStarter")
267+
setContextClassLoader(userClassLoader)
268+
269+
override def run(): Unit = {
270+
val credentialManager = new YARNHadoopDelegationTokenManager(
271+
sparkConf,
272+
yarnConf,
273+
YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
274+
275+
val credentialRenewer =
276+
new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
277+
credentialRenewer.scheduleLoginFromKeytab()
278+
}
279+
}
280+
281+
credentialRenewerThread.start()
282+
credentialRenewerThread.join()
283+
}
284+
260285
if (isClusterMode) {
261286
runDriver(securityMgr)
262287
} else {
@@ -441,24 +466,6 @@ private[spark] class ApplicationMaster(
441466
registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"),
442467
securityMgr)
443468

444-
// If the credentials file config is present, we must periodically renew tokens. So create
445-
// a new AMDelegationTokenRenewer
446-
if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
447-
// Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
448-
// classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
449-
val credentialRenewerThread = new Thread {
450-
setName("AMCredentialRenewerStarter")
451-
setContextClassLoader(userClassLoader)
452-
453-
override def run(): Unit = {
454-
startAMCredentialRenewer()
455-
}
456-
}
457-
458-
credentialRenewerThread.start()
459-
credentialRenewerThread.join()
460-
}
461-
462469
// In client mode the actor will stop the reporter thread.
463470
reporterThread.join()
464471
}
@@ -645,11 +652,6 @@ private[spark] class ApplicationMaster(
645652
val userThread = new Thread {
646653
override def run() {
647654
try {
648-
// If the credentials file config is present, we must periodically renew tokens. So create
649-
// a new AMDelegationTokenRenewer
650-
if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
651-
startAMCredentialRenewer()
652-
}
653655
mainMethod.invoke(null, userArgs.toArray)
654656
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
655657
logDebug("Done running users class")
@@ -689,18 +691,6 @@ private[spark] class ApplicationMaster(
689691
allocatorLock.notifyAll()
690692
}
691693

692-
private def startAMCredentialRenewer(): Unit = {
693-
// If a principal and keytab have been set, use that to create new credentials for executors
694-
// periodically
695-
val credentialManager = new YARNHadoopDelegationTokenManager(
696-
sparkConf,
697-
yarnConf,
698-
YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))
699-
700-
val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
701-
credentialRenewer.scheduleLoginFromKeytab()
702-
}
703-
704694
/**
705695
* An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
706696
*/

0 commit comments

Comments
 (0)