Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,23 @@ private[spark] class ApplicationMaster(
@volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _

private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}

if (isClusterMode) {
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}
}

// Lock for controlling the allocator (heartbeat) thread.
private val allocatorLock = new Object()

Expand Down Expand Up @@ -242,16 +259,27 @@ private[spark] class ApplicationMaster(

// If the credentials file config is present, we must periodically renew tokens. So create
// a new AMDelegationTokenRenewer
if (sparkConf.contains(CREDENTIALS_FILE_PATH.key)) {
// If a principal and keytab have been set, use that to create new credentials for executors
// periodically
val credentialManager = new YARNHadoopDelegationTokenManager(
sparkConf,
yarnConf,
YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))

val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
credentialRenewer.scheduleLoginFromKeytab()
if (sparkConf.contains(CREDENTIALS_FILE_PATH)) {
// Start a short-lived thread for AMCredentialRenewer, the only purpose is to set the
// classloader so that main jar and secondary jars could be used by AMCredentialRenewer.
val credentialRenewerThread = new Thread {
setName("AMCredentialRenewerStarter")
setContextClassLoader(userClassLoader)

override def run(): Unit = {
val credentialManager = new YARNHadoopDelegationTokenManager(
sparkConf,
yarnConf,
YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, yarnConf))

val credentialRenewer =
new AMCredentialRenewer(sparkConf, yarnConf, credentialManager)
credentialRenewer.scheduleLoginFromKeytab()
}
}

credentialRenewerThread.start()
credentialRenewerThread.join()
}

if (isClusterMode) {
Expand Down Expand Up @@ -609,17 +637,6 @@ private[spark] class ApplicationMaster(
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")

val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}
val userClassLoader =
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
} else {
new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader)
}

var userArgs = args.userArgs
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
// When running pyspark, the app is run using PythonRunner. The second argument is the list
Expand Down