16 changes: 13 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.ui
 
-import java.net.{InetSocketAddress, URL}
+import java.net.{InetSocketAddress, URL, URLDecoder}
 import javax.servlet.DispatcherType
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

@@ -147,15 +147,25 @@ private[spark] object JettyUtils extends Logging {
     val holder : FilterHolder = new FilterHolder()
     holder.setClassName(filter)
     // Get any parameters for each filter
-    val paramName = "spark." + filter + ".params"
-    val params = conf.get(paramName, "").split(',').map(_.trim()).toSet
+    var paramName = "spark." + filter + ".params"
+    var params = conf.get(paramName, "").split(',').map(_.trim()).toSet
     params.foreach {
       case param : String =>
         if (!param.isEmpty) {
           val parts = param.split("=")
           if (parts.length == 2) holder.setInitParameter(parts(0), parts(1))
         }
     }
+    paramName = "spark." + filter + ".encodedparams"
+    params = conf.get(paramName, "").split(',').map(_.trim()).toSet
+    params.foreach {
+      case param : String =>
+        if (!param.isEmpty) {
+          val parts = param.split("=")
+          if (parts.length == 2) holder.setInitParameter(parts(0),
+            URLDecoder.decode(parts(1), "UTF-8"))
+        }
+    }
     val enumDispatcher = java.util.EnumSet.of(DispatcherType.ASYNC, DispatcherType.ERROR,
       DispatcherType.FORWARD, DispatcherType.INCLUDE, DispatcherType.REQUEST)
     handlers.foreach { case(handler) => handler.addFilter(holder, "/*", enumDispatcher) }
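The existing spark.<filter>.params string is split on ',' and each piece on '=', so a value that itself contains commas (such as a list of proxy URI bases) would be split apart. The new spark.<filter>.encodedparams variant lets the producer URL-encode each value and has JettyUtils decode it only after the split. A minimal stand-alone sketch of that round trip, using made-up ResourceManager addresses and application id:

import java.net.{URLDecoder, URLEncoder}

object EncodedParamRoundTrip {
  def main(args: Array[String]): Unit = {
    // A value containing commas that the plain "params" parsing would split apart.
    val uriBases = "http://rm1:8088/proxy/app_1,http://rm2:8088/proxy/app_1"

    // Producer side (what the ApplicationMaster sets): encode the whole value.
    val param = "PROXY_URI_BASES=" + URLEncoder.encode(uriBases, "UTF-8")

    // Consumer side (what the new "encodedparams" branch does): split, then decode.
    val parts = param.split("=")
    if (parts.length == 2) {
      println(parts(0) + " -> " + URLDecoder.decode(parts(1), "UTF-8"))
    }
  }
}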
org/apache/spark/deploy/yarn/YarnRMClientImpl.scala (yarn/alpha)
@@ -75,7 +75,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient
   }
 
   override def getProxyHostAndPort(conf: YarnConfiguration) =
-    YarnConfiguration.getProxyHostAndPort(conf)
+    List(YarnConfiguration.getProxyHostAndPort(conf))
 
   override def getMaxRegAttempts(conf: YarnConfiguration) =
     conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn

 import java.io.IOException
 import java.net.Socket
+import java.net.URLEncoder
 import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConversions._
@@ -32,6 +33,7 @@ import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -324,17 +326,24 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   /** Add the Yarn IP filter that is required for properly securing the UI. */
   private def addAmIpFilter() = {
     val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
-    val proxy = client.getProxyHostAndPort(yarnConf)
-    val parts = proxy.split(":")
-    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
-    val uriBase = "http://" + proxy + proxyBase
-    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
-
+    val proxies = client.getProxyHostsAndPorts(yarnConf)
+    var sbProxies = new StringBuilder
+    var sbUrlBases = new StringBuilder
+    for (proxy <- proxies) {
+      sbProxies ++= proxy.split(":")(0)
+      sbProxies += ','
+      sbUrlBases ++= WebAppUtils.getHttpSchemePrefix(yarnConf)
+      sbUrlBases ++= proxy
+      sbUrlBases ++= System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
+      sbUrlBases += ','
+    }
+    var params = "PROXY_HOSTS=" + URLEncoder.encode(sbProxies.toString(), "UTF-8") + ","
+    params ++= "PROXY_URI_BASES=" + URLEncoder.encode(sbUrlBases.toString(), "UTF-8")
     if (isDriver) {
       System.setProperty("spark.ui.filters", amFilter)
-      System.setProperty(s"spark.$amFilter.params", params)
+      System.setProperty(s"spark.$amFilter.encodedparams", params)
     } else {
-      actor ! AddWebUIFilter(amFilter, params, proxyBase)
+      actor ! AddWebUIFilter(amFilter, params, sbUrlBases.toString())
    }
   }

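With ResourceManager HA there can be more than one proxy, so the loop above builds one comma-separated host list and one comma-separated URI-base list, then URL-encodes each list as a whole so the embedded commas survive the spark.<filter>.encodedparams parsing in JettyUtils. A rough stand-alone sketch of the same construction; the host names, application id, and scheme prefix here are made up, while the real code obtains them from getProxyHostsAndPorts, WebAppUtils.getHttpSchemePrefix, and the APPLICATION_WEB_PROXY_BASE_ENV environment variable:

import java.net.URLEncoder

object AmIpFilterParamsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical HA pair of ResourceManagers and a per-application proxy base.
    val proxies = Seq("rm1.example.com:8088", "rm2.example.com:8088")
    val proxyBase = "/proxy/application_0001"
    val schemePrefix = "http://"

    val sbProxies = new StringBuilder
    val sbUrlBases = new StringBuilder
    for (proxy <- proxies) {
      sbProxies ++= proxy.split(":")(0)   // host only
      sbProxies += ','
      sbUrlBases ++= schemePrefix
      sbUrlBases ++= proxy                // scheme + host:port
      sbUrlBases ++= proxyBase
      sbUrlBases += ','
    }

    var params = "PROXY_HOSTS=" + URLEncoder.encode(sbProxies.toString(), "UTF-8") + ","
    params ++= "PROXY_URI_BASES=" + URLEncoder.encode(sbUrlBases.toString(), "UTF-8")

    // One string holding two key=value pairs; only the ',' between the pairs
    // is left unencoded, so JettyUtils can still split the pairs apart.
    println(params)
  }
}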
org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -59,7 +59,7 @@ trait YarnRMClient {
   def getAttemptId(): ApplicationAttemptId
 
   /** Returns the RM's proxy host and port. */
-  def getProxyHostAndPort(conf: YarnConfiguration): String
+  def getProxyHostsAndPorts(conf: YarnConfiguration): scala.collection.mutable.Buffer[String]
 
   /** Returns the maximum number of attempts to register the AM. */
   def getMaxRegAttempts(conf: YarnConfiguration): Int
org/apache/spark/deploy/yarn/YarnRMClientImpl.scala (yarn/stable)
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.yarn
 
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
 
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -68,7 +69,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient
     appAttemptId
   }
 
-  override def getProxyHostAndPort(conf: YarnConfiguration) = WebAppUtils.getProxyHostAndPort(conf)
+  override def getProxyHostsAndPorts(conf: YarnConfiguration) =
+    WebAppUtils.getProxyHostsAndPortsForAmFilter(conf)
 
   override def getMaxRegAttempts(conf: YarnConfiguration) =
     conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
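On the stable YARN API, WebAppUtils.getProxyHostsAndPortsForAmFilter returns a java.util.List<String> with one host:port entry per ResourceManager; the scala.collection.JavaConversions._ import added above is what lets that value satisfy the trait's new mutable.Buffer[String] return type without an explicit conversion. A small sketch of that bridge, using made-up addresses in place of the YARN call:

import scala.collection.JavaConversions._
import scala.collection.mutable.Buffer

object JavaListToBufferSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for WebAppUtils.getProxyHostsAndPortsForAmFilter(conf).
    val fromYarn: java.util.List[String] =
      java.util.Arrays.asList("rm1.example.com:8088", "rm2.example.com:8088")

    // JavaConversions wraps the Java list as a scala.collection.mutable.Buffer,
    // the type that YarnRMClient.getProxyHostsAndPorts now declares.
    val proxies: Buffer[String] = fromYarn

    proxies.foreach(println)
  }
}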