@@ -666,21 +666,6 @@ private[spark] class Client(
private def createConfArchive(): File = {
val hadoopConfFiles = new HashMap[String, File]()

// Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that
// the executors will use the latest configurations instead of the default values. This is
// required when user changes log4j.properties directly to set the log configurations. If
// configuration file is provided through --files then executors will be taking configurations
// from --files instead of $SPARK_CONF_DIR/log4j.properties.

// Also uploading metrics.properties to distributed cache if exists in classpath.
// If user specify this file using --files then executors will use the one
// from --files instead.
for { prop <- Seq("log4j.properties", "metrics.properties")
url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
if url.getProtocol == "file" } {
hadoopConfFiles(prop) = new File(url.getPath)
}

Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
sys.env.get(envKey).foreach { path =>
val dir = new File(path)
@@ -705,14 +690,43 @@ private[spark] class Client(

try {
confStream.setLevel(0)

// Upload $SPARK_CONF_DIR/log4j.properties to the distributed cache so that executors use
// the latest configuration instead of the default values. This is required when the user
// changes log4j.properties directly to set the log configuration. If the configuration
// file is provided through --files, executors will take their configuration from --files
// instead of $SPARK_CONF_DIR/log4j.properties.

// Also upload metrics.properties to the distributed cache if it exists in the classpath.
// If the user specifies this file through --files, executors will use that copy instead.
for { prop <- Seq("log4j.properties", "metrics.properties")
url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
if url.getProtocol == "file" } {
val file = new File(url.getPath())
confStream.putNextEntry(new ZipEntry(file.getName()))
Files.copy(file, confStream)
confStream.closeEntry()
}

// Save the Hadoop config files under a separate directory in the archive. This directory
// is appended to the classpath so that the cluster-provided configuration takes precedence.
confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/"))
confStream.closeEntry()
hadoopConfFiles.foreach { case (name, file) =>
if (file.canRead()) {
confStream.putNextEntry(new ZipEntry(name))
confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name"))
Files.copy(file, confStream)
confStream.closeEntry()
}
}

// Save the YARN configuration into a separate file that will be overlaid on top of the
// cluster's Hadoop conf.
confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE))
yarnConf.writeXml(confStream)
confStream.closeEntry()

// Save Spark configuration to a file in the archive.
val props = new Properties()
sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
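
To make the resulting archive layout concrete, here is a minimal sketch of how one could list the entries of an archive built by createConfArchive(). listConfArchive is a hypothetical helper, not part of this change, and the expected entries are inferred from the code above.

import java.util.zip.ZipFile
import scala.collection.JavaConverters._

// Hypothetical helper (not in this patch): print the entries of a conf archive
// built by createConfArchive(). With this change the listing should contain
// log4j.properties / metrics.properties at the top level, the gateway's Hadoop
// config files under __hadoop_conf__/, plus the generated
// __spark_hadoop_conf__.xml and __spark_conf__.properties.
def listConfArchive(archivePath: String): Unit = {
  val zip = new ZipFile(archivePath)
  try {
    zip.entries().asScala.foreach(entry => println(entry.getName))
  } finally {
    zip.close()
  }
}
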
@@ -1177,12 +1191,19 @@ private object Client extends Logging {
// Subdirectory where the user's Spark and Hadoop config files will be placed.
val LOCALIZED_CONF_DIR = "__spark_conf__"

// Subdirectory in the conf directory containing Hadoop config files.
val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"

// File containing the conf archive in the AM. See prepareLocalResources().
val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"

// Name of the file in the conf archive containing Spark configuration.
val SPARK_CONF_FILE = "__spark_conf__.properties"

// Name of the file containing the gateway's Hadoop configuration, to be overlaid on top
// of the cluster's Hadoop config.
val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml"

// Subdirectory where the user's python files (not archives) will be placed.
val LOCALIZED_PYTHON_DIR = "__pyfiles__"

@@ -1288,6 +1309,12 @@ private object Client extends Logging {
sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
addClasspathEntry(getClusterPath(sparkConf, cp), env)
}

// Add the localized Hadoop config at the end of the classpath, in case it contains other
// files (such as configuration files for different services) that are not part of the
// YARN cluster's config.
addClasspathEntry(
buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, LOCALIZED_HADOOP_CONF_DIR), env)
}
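
For reference, the entry appended above resolves to a path under the container's working directory. The following sketch only illustrates the string that is built, assuming buildPath joins its arguments with "/" and that Environment.PWD.$$() yields YARN's cross-platform {{PWD}} marker.

// Illustrative only: the classpath entry added above, before YARN expands it.
val hadoopConfEntry = Seq("{{PWD}}", "__spark_conf__", "__hadoop_conf__").mkString("/")
// hadoopConfEntry == "{{PWD}}/__spark_conf__/__hadoop_conf__"
// Because it sits at the end of the classpath, files here only take effect when
// the same resource is not already provided by the cluster's own config dirs.
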

@@ -61,8 +61,11 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {

// Return an appropriate subclass of Configuration. Creating a config initializes some
// Hadoop subsystems. Always create a new config; don't reuse yarnConf.
override def newConfiguration(conf: SparkConf): Configuration =
new YarnConfiguration(super.newConfiguration(conf))
override def newConfiguration(conf: SparkConf): Configuration = {
val hadoopConf = new YarnConfiguration(super.newConfiguration(conf))
hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE)
hadoopConf
}
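
The override added here relies on Hadoop's standard resource-overlay rule: when a key is defined in several resources, the resource added last wins, unless an earlier definition is marked final. A minimal standalone sketch of that rule, with hypothetical file paths:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Minimal sketch (not Spark code): the overlay rule behind addResource().
val conf = new Configuration(false)
conf.addResource(new Path("/tmp/cluster-site.xml"))    // hypothetical cluster config
conf.addResource(new Path("/tmp/gateway-overlay.xml")) // hypothetical overlay; wins on conflicts
// A key present in both files now reports the value from gateway-overlay.xml,
// which is how __spark_hadoop_conf__.xml overlays the cluster's Hadoop config.
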

// Add any user credentials to the job conf which are necessary for running on a secure Hadoop
// cluster