diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index b59dcf158d87c..05a7b1e1310c4 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import scala.collection.JavaConverters._ +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest @@ -112,7 +113,16 @@ private[spark] class YarnRMClient extends Logging { val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf) val hosts = proxies.asScala.map(_.split(":").head) val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase } - Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) + val params = + Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) + + // Handles RM HA urls + val rmIds = conf.getStringCollection(YarnConfiguration.RM_HA_IDS).asScala + if (rmIds != null && rmIds.nonEmpty) { + params + ("RM_HA_URLS" -> rmIds.map(getUrlByRmId(conf, _)).mkString(",")) + } else { + params + } } /** Returns the maximum number of attempts to register the AM. */ @@ -126,4 +136,21 @@ private[spark] class YarnRMClient extends Logging { } } + private def getUrlByRmId(conf: Configuration, rmId: String): String = { + val addressPropertyPrefix = if (YarnConfiguration.useHttps(conf)) { + YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS + } else { + YarnConfiguration.RM_WEBAPP_ADDRESS + } + + val addressWithRmId = if (rmId == null || rmId.isEmpty) { + addressPropertyPrefix + } else if (rmId.startsWith(".")) { + throw new IllegalStateException(s"rmId $rmId should not already have '.' prepended.") + } else { + s"$addressPropertyPrefix.$rmId" + } + + conf.get(addressWithRmId) + } }