@@ -220,19 +220,27 @@ private[yarn] class ExecutorRunnable(
     val env = new HashMap[String, String]()
     Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))

-    sparkConf.getExecutorEnv.foreach { case (key, value) =>
-      // This assumes each executor environment variable set here is a path
-      // This is kept for backward compatibility and consistency with hadoop
-      YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
-    }
-
     // lookup appropriate http scheme for container log urls
     val yarnHttpPolicy = conf.get(
       YarnConfiguration.YARN_HTTP_POLICY_KEY,
       YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
     )
     val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"

+    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
+      .foreach { case (k, v) => env(k) = v }
+
+    sparkConf.getExecutorEnv.foreach { case (key, value) =>
+      if (key == Environment.CLASSPATH.name()) {
+        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
+        // This is kept for backward compatibility and consistency with hadoop
+        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
+      } else {
+        // For other env variables, simply overwrite the value.
+        env(key) = value
+      }
+    }
Contributor:
@jerryshao I think there is a potential issue with this change: it allows users to (incorrectly) specify SPARK_LOG_URL_STDERR and SPARK_LOG_URL_STDOUT, which should be generated by the driver (see the "// Add log urls" section above this code snippet).

Note that this is an existing bug in the same area: if those variables had been present in the driver env, they would have overridden the generated values. It would be good to fix this issue as well as part of this change.

The solution would be to move the '// Add log urls' block below this current block, as sketched below.
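
A minimal sketch of what this comment is suggesting (restating the proposal in code; not part of the diff above): apply spark.executorEnv.* first, then generate the log URLs, so the generated SPARK_LOG_URL_STDERR / SPARK_LOG_URL_STDOUT are written last and cannot be shadowed by user-supplied values.

    sparkConf.getExecutorEnv.foreach { case (key, value) =>
      if (key == Environment.CLASSPATH.name()) {
        // CLASSPATH is still treated as a path and appended.
        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
      } else {
        env(key) = value
      }
    }

    // The "// Add log urls" block moves below the executor-env block, so the
    // driver-generated values always take precedence for SPARK_LOG_URL_*.
    container.foreach { c =>
      sys.env.get("SPARK_USER").foreach { user =>
        // ... existing log-url construction, unchanged and elided here ...
      }
    }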

Contributor (Author):
Sure, I will fix it, thanks for the reminder.


+
     // Add log urls
     container.foreach { c =>
       sys.env.get("SPARK_USER").foreach { user =>
@@ -245,8 +253,6 @@
       }
     }

-    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
-      .foreach { case (k, v) => env(k) = v }
     env
   }
 }
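
Reading the reordered method top to bottom, the net effect of this hunk is: populate the classpath, copy the SPARK_-prefixed variables from the launching JVM's environment, then apply spark.executorEnv.*, which now overwrites rather than path-appends everything except CLASSPATH. A small, self-contained sketch of that overwrite-vs-append split (illustrative only; the variable names and values below are made up, not from this PR):

import scala.collection.mutable.HashMap
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

object ExecutorEnvOrderingSketch {
  def main(args: Array[String]): Unit = {
    val cps = java.io.File.pathSeparator  // stand-in for the cluster's classpath separator
    val env = new HashMap[String, String]()

    // What the earlier steps could have put there (made-up values):
    env(Environment.CLASSPATH.name()) = "<populated-classpath>"
    env("SPARK_CUSTOM_SETTING") = "from-launcher-environment"  // via the SPARK_* copy

    // spark.executorEnv.* entries, applied last in the new ordering:
    val executorEnv = Seq(
      Environment.CLASSPATH.name() -> "/opt/extra/jars/*",
      "SPARK_CUSTOM_SETTING" -> "from-spark-executorEnv")
    executorEnv.foreach { case (key, value) =>
      if (key == Environment.CLASSPATH.name()) {
        env(key) = env(key) + cps + value  // still appended, like addPathToEnvironment
      } else {
        env(key) = value                   // plain overwrite: the executor setting wins
      }
    }

    println(env(Environment.CLASSPATH.name()))  // "<populated-classpath>" + cps + "/opt/extra/jars/*"
    println(env("SPARK_CUSTOM_SETTING"))        // from-spark-executorEnv
  }
}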
@@ -212,6 +212,14 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     finalState should be (SparkAppHandle.State.FAILED)
   }

+  test("executor env overwrite AM env in client mode") {
+    testExecutorEnv(true)
+  }
+
+  test("executor env overwrite AM env in cluster mode") {
+    testExecutorEnv(false)
+  }
+
   private def testBasicYarnApp(clientMode: Boolean, conf: Map[String, String] = Map()): Unit = {
     val result = File.createTempFile("result", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
@@ -292,6 +300,17 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, executorResult, "OVERRIDDEN")
   }

+  private def testExecutorEnv(clientMode: Boolean): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(clientMode, mainClassName(ExecutorEnvTestApp.getClass),
+      appArgs = Seq(result.getAbsolutePath),
+      extraConf = Map(
+        "spark.yarn.appMasterEnv.TEST_ENV" -> "am_val",
+        "spark.executorEnv.TEST_ENV" -> "executor_val"
+      )
+    )
+    checkResult(finalState, result, "true")
+  }
 }

 private[spark] class SaveExecutorInfo extends SparkListener {
@@ -508,3 +527,20 @@ private object SparkContextTimeoutApp {
   }

 }
+
+private object ExecutorEnvTestApp {
+
+  def main(args: Array[String]): Unit = {
+    val status = args(0)
+    val sparkConf = new SparkConf()
+    val sc = new SparkContext(sparkConf)
+    val executorEnvs = sc.parallelize(Seq(1)).flatMap { _ => sys.env }.collect().toMap
+    val result = sparkConf.getExecutorEnv.forall { case (k, v) =>
+      executorEnvs.get(k).contains(v)
+    }
+
+    Files.write(result.toString, new File(status), StandardCharsets.UTF_8)
+    sc.stop()
+  }
+
+}
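
For reference, the user-facing configuration these tests exercise is just the two conf keys above; a hedged restatement outside the test harness (TEST_ENV and its values are the test's own choices, nothing special to Spark):

import org.apache.spark.SparkConf

// Mirrors the extraConf used in testExecutorEnv above.
val conf = new SparkConf()
  .set("spark.yarn.appMasterEnv.TEST_ENV", "am_val")    // environment for the YARN application master
  .set("spark.executorEnv.TEST_ENV", "executor_val")    // environment for executors

// ExecutorEnvTestApp then asserts that every spark.executorEnv.* entry shows up
// verbatim in the executors' sys.env, i.e. executors see TEST_ENV=executor_val.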