Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
Expand Down Expand Up @@ -112,7 +113,16 @@ private[spark] class YarnRMClient extends Logging {
val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
val hosts = proxies.asScala.map(_.split(":").head)
val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase }
Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
val params =
Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))

// Handles RM HA urls
val rmIds = conf.getStringCollection(YarnConfiguration.RM_HA_IDS).asScala
if (rmIds != null && rmIds.nonEmpty) {
params + ("RM_HA_URLS" -> rmIds.map(getUrlByRmId(conf, _)).mkString(","))
} else {
params
}
}

/** Returns the maximum number of attempts to register the AM. */
Expand All @@ -126,4 +136,21 @@ private[spark] class YarnRMClient extends Logging {
}
}

/**
 * Resolves the web app address configured for a single ResourceManager.
 *
 * @param conf the YARN configuration to read the address from.
 * @param rmId the HA id of the ResourceManager (e.g. "rm1"); may be null or
 *             empty, in which case the non-suffixed property name is used.
 * @return the configured web app address for the given RM id.
 */
private def getUrlByRmId(conf: Configuration, rmId: String): String = {
  // Pick the HTTPS or HTTP web app address property family, matching the
  // cluster's configured HTTP policy.
  val base = if (YarnConfiguration.useHttps(conf)) {
    YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS
  } else {
    YarnConfiguration.RM_WEBAPP_ADDRESS
  }

  // A null/empty id means no HA suffix; otherwise the id is appended after a
  // dot (e.g. "yarn.resourcemanager.webapp.address.rm1"). The id must not
  // carry its own leading dot.
  val propertyName = if (rmId == null || rmId.isEmpty) {
    base
  } else if (rmId.startsWith(".")) {
    throw new IllegalStateException(s"rmId $rmId should not already have '.' prepended.")
  } else {
    s"$base.$rmId"
  }

  conf.get(propertyName)
}
}