diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 486818056977..ce290c399d9f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -90,6 +90,23 @@ private[spark] class ApplicationMaster( @volatile private var reporterThread: Thread = _ @volatile private var allocator: YarnAllocator = _ + private val userClassLoader = { + val classpath = Client.getUserClasspath(sparkConf) + val urls = classpath.map { entry => + new URL("file:" + new File(entry.getPath()).getAbsolutePath()) + } + + if (isClusterMode) { + if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) { + new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader) + } else { + new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) + } + } else { + new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) + } + } + // Lock for controlling the allocator (heartbeat) thread. private val allocatorLock = new Object() @@ -242,16 +259,27 @@ private[spark] class ApplicationMaster( // If the credentials file config is present, we must periodically renew tokens. So create // a new AMDelegationTokenRenewer - if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) { - // If a principal and keytab have been set, use that to create new credentials for executors - // periodically - val credentialManager = new YARNHadoopDelegationTokenManager( - sparkConf, - yarnConf, - YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf)) - - val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager) - credentialRenewer.scheduleLoginFromKeytab() + if (sparkConf.contains(CREDENTIALS_FILE_PATH)) { + // Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the + // classloader so that main jar and secondary jars could be used by AMCredentialRenewer. + val credentialRenewerThread = new Thread { + setName("AMCredentialRenewerStarter") + setContextClassLoader(userClassLoader) + + override def run(): Unit = { + val credentialManager = new YARNHadoopDelegationTokenManager( + sparkConf, + yarnConf, + YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf)) + + val credentialRenewer = + new AMCredentialRenewer(sparkConf, yarnConf, credentialManager) + credentialRenewer.scheduleLoginFromKeytab() + } + } + + credentialRenewerThread.start() + credentialRenewerThread.join() } if (isClusterMode) { @@ -609,17 +637,6 @@ private[spark] class ApplicationMaster( private def startUserApplication(): Thread = { logInfo("Starting the user application in a separate Thread") - val classpath = Client.getUserClasspath(sparkConf) - val urls = classpath.map { entry => - new URL("file:" + new File(entry.getPath()).getAbsolutePath()) - } - val userClassLoader = - if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) { - new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader) - } else { - new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) - } - var userArgs = args.userArgs if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) { // When running pyspark, the app is run using PythonRunner. The second argument is the list