diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e5131e636dc0..f56794dfe4b0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -666,21 +666,6 @@ private[spark] class Client(
   private def createConfArchive(): File = {
     val hadoopConfFiles = new HashMap[String, File]()
 
-    // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that
-    // the executors will use the latest configurations instead of the default values. This is
-    // required when user changes log4j.properties directly to set the log configurations. If
-    // configuration file is provided through --files then executors will be taking configurations
-    // from --files instead of $SPARK_CONF_DIR/log4j.properties.
-
-    // Also uploading metrics.properties to distributed cache if exists in classpath.
-    // If user specify this file using --files then executors will use the one
-    // from --files instead.
-    for { prop <- Seq("log4j.properties", "metrics.properties")
-          url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
-          if url.getProtocol == "file" } {
-      hadoopConfFiles(prop) = new File(url.getPath)
-    }
-
     Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
       sys.env.get(envKey).foreach { path =>
         val dir = new File(path)
@@ -705,14 +690,43 @@ private[spark] class Client(
 
     try {
       confStream.setLevel(0)
+
+      // Upload the $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure
+      // that the executors will use the latest configuration instead of the default values. This
+      // is required when the user changes log4j.properties directly to set the log configuration.
+      // If the configuration file is provided through --files, executors will take their
+      // configuration from --files instead of $SPARK_CONF_DIR/log4j.properties.
+
+      // Also upload metrics.properties to the distributed cache if it exists in the classpath.
+      // If the user specifies this file using --files, executors will use the one
+      // from --files instead.
+      for { prop <- Seq("log4j.properties", "metrics.properties")
+            url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
+            if url.getProtocol == "file" } {
+        val file = new File(url.getPath())
+        confStream.putNextEntry(new ZipEntry(file.getName()))
+        Files.copy(file, confStream)
+        confStream.closeEntry()
+      }
+
+      // Save the Hadoop config files under a separate directory in the archive. This directory
+      // is appended to the classpath so that the cluster-provided configuration takes precedence.
+      confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/"))
+      confStream.closeEntry()
       hadoopConfFiles.foreach { case (name, file) =>
         if (file.canRead()) {
-          confStream.putNextEntry(new ZipEntry(name))
+          confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name"))
           Files.copy(file, confStream)
           confStream.closeEntry()
         }
       }
 
+      // Save the YARN configuration into a separate file that will be overlaid on top of the
+      // cluster's Hadoop conf.
+      confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE))
+      yarnConf.writeXml(confStream)
+      confStream.closeEntry()
+
+      // Save Spark configuration to a file in the archive.
       val props = new Properties()
       sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
@@ -1177,12 +1191,19 @@ private object Client extends Logging {
   // Subdirectory where the user's Spark and Hadoop config files will be placed.
   val LOCALIZED_CONF_DIR = "__spark_conf__"
 
+  // Subdirectory in the conf directory containing Hadoop config files.
+  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
+
   // File containing the conf archive in the AM. See prepareLocalResources().
   val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip"
 
   // Name of the file in the conf archive containing Spark configuration.
   val SPARK_CONF_FILE = "__spark_conf__.properties"
 
+  // Name of the file containing the gateway's Hadoop configuration, to be overlaid on top of
+  // the cluster's Hadoop config.
+  val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml"
+
   // Subdirectory where the user's python files (not archives) will be placed.
   val LOCALIZED_PYTHON_DIR = "__pyfiles__"
 
@@ -1288,6 +1309,12 @@ private object Client extends Logging {
     sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
       addClasspathEntry(getClusterPath(sparkConf, cp), env)
     }
+
+    // Add the localized Hadoop config at the end of the classpath, in case it contains other
+    // files (such as configuration files for different services) that are not part of the
+    // YARN cluster's config.
+    addClasspathEntry(
+      buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, LOCALIZED_HADOOP_CONF_DIR), env)
   }
 
   /**
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 4522071bd92e..a687f67c5b69 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -61,8 +61,11 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
   // Return an appropriate (subclass) of Configuration. Creating a config initializes some Hadoop
   // subsystems. Always create a new config, don't reuse yarnConf.
-  override def newConfiguration(conf: SparkConf): Configuration =
-    new YarnConfiguration(super.newConfiguration(conf))
+  override def newConfiguration(conf: SparkConf): Configuration = {
+    val hadoopConf = new YarnConfiguration(super.newConfiguration(conf))
+    hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE)
+    hadoopConf
+  }
 
   // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
   // cluster
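
For orientation, the conf archive written by createConfArchive() after this change
lays out roughly as sketched below. The entry names come from the constants added
in the patch; core-site.xml and yarn-site.xml stand in for whatever the gateway's
HADOOP_CONF_DIR and YARN_CONF_DIR actually contain.

  __spark_conf__.zip
    log4j.properties              (if present in $SPARK_CONF_DIR)
    metrics.properties            (if present on the classpath)
    __hadoop_conf__/              (LOCALIZED_HADOOP_CONF_DIR)
      core-site.xml               (example: gateway HADOOP_CONF_DIR contents)
      yarn-site.xml               (example: gateway YARN_CONF_DIR contents)
    __spark_hadoop_conf__.xml     (SPARK_HADOOP_CONF_FILE, the gateway's Hadoop/YARN config)
    __spark_conf__.properties     (SPARK_CONF_FILE, the Spark configuration)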
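
On the cluster side, newConfiguration() relies on Hadoop's resource-overlay rule:
for a given key, a resource added later overrides one added earlier (unless the
earlier value is marked final). The sketch below illustrates that rule under
stated assumptions: HadoopConfOverlayDemo, demo.key and the demo-*.xml file names
are invented for the example, and explicit Paths replace the classpath lookup
that addResource(Client.SPARK_HADOOP_CONF_FILE) performs on the AM/executors.

import java.io.{File, FileOutputStream}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object HadoopConfOverlayDemo {

  // Write a one-entry Hadoop-style XML config, the same way the patch uses
  // yarnConf.writeXml() to produce __spark_hadoop_conf__.xml in the archive.
  private def writeConf(name: String, value: String): File = {
    val conf = new Configuration(false)
    conf.set("demo.key", value)
    val file = new File(name)
    val out = new FileOutputStream(file)
    try conf.writeXml(out) finally out.close()
    file
  }

  def main(args: Array[String]): Unit = {
    val clusterXml = writeConf("demo-cluster.xml", "cluster-value")
    val gatewayXml = writeConf("demo-gateway.xml", "gateway-value")

    // The gateway file is added last, mirroring how newConfiguration() adds
    // SPARK_HADOOP_CONF_FILE on top of the YarnConfiguration defaults, so its
    // value wins for any key defined in both resources.
    val conf = new Configuration(false)
    conf.addResource(new Path(clusterXml.toURI))
    conf.addResource(new Path(gatewayXml.toURI))
    println(conf.get("demo.key")) // prints "gateway-value"
  }
}

Run with the Hadoop client jars on the classpath, this prints "gateway-value",
which is why the gateway's settings end up overlaid on the cluster's Hadoop
config rather than the other way around.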