From e8256e5278da8a8b4aaefb7638a2683e2102ab4a Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 16 Aug 2014 08:20:44 +0800 Subject: [PATCH 01/21] support https in spark web ui --- .../apache/spark/deploy/DeployMessage.scala | 2 +- .../apache/spark/deploy/master/Master.scala | 7 +-- .../spark/deploy/master/WorkerInfo.scala | 6 +-- .../apache/spark/deploy/worker/Worker.scala | 4 +- .../spark/deploy/worker/WorkerArguments.scala | 3 ++ .../org/apache/spark/ui/JettyUtils.scala | 49 ++++++++++++++++++- .../scala/org/apache/spark/ui/SparkUI.scala | 4 +- .../scala/org/apache/spark/ui/WebUI.scala | 6 ++- .../spark/deploy/JsonProtocolSuite.scala | 2 +- 9 files changed, 68 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index a7368f9f3dfb..352ad2a7980e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -39,7 +39,7 @@ private[deploy] object DeployMessages { port: Int, cores: Int, memory: Int, - webUiPort: Int, + workerWebUiUrl: String, publicAddress: String) extends DeployMessage { Utils.checkHost(host, "Required hostname") diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index cfa2c028a807..7d5df1d3ea75 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -122,7 +122,8 @@ private[spark] class Master( // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.bind() - masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort + val masterWebUiUrlPrefix = conf.get("spark.http.policy") + "://" + masterWebUiUrl = masterWebUiUrlPrefix + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) @@ -190,7 +191,7 @@ private[spark] class Master( System.exit(0) } - case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => + case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) @@ -200,7 +201,7 @@ private[spark] class Master( sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - sender, workerUiPort, publicAddress) + sender, workerWebUiUrl, publicAddress) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) sender ! 
RegisteredWorker(masterUrl, masterWebUiUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index c5fa9cf7d7c2..0335894bae51 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -30,7 +30,7 @@ private[spark] class WorkerInfo( val cores: Int, val memory: Int, val actor: ActorRef, - val webUiPort: Int, + val webUiAddress: String, val publicAddress: String) extends Serializable { @@ -99,10 +99,6 @@ private[spark] class WorkerInfo( coresUsed -= driver.desc.cores } - def webUiAddress : String = { - "http://" + this.publicAddress + ":" + this.webUiPort - } - def setState(state: WorkerState.Value) = { this.state = state } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 80fde7e4b262..40753ba51963 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -78,6 +78,7 @@ private[spark] class Worker( var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) + var workerWebUiUrl: String = _ @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -138,6 +139,7 @@ private[spark] class Worker( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() + workerWebUiUrl = conf.get("spark.http.policy") + "://" + publicAddress + ":" + webUi.boundPort registerWithMaster() metricsSystem.registerSource(workerSource) @@ -163,7 +165,7 @@ private[spark] class Worker( for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) - actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + actor ! 
RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl, publicAddress) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 1e295aaa48c3..f49bdd35c3f3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -53,6 +53,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { if (System.getenv("SPARK_WORKER_DIR") != null) { workDir = System.getenv("SPARK_WORKER_DIR") } + if (conf.contains("worker.ui.port")) { + webUiPort = conf.get("worker.ui.port").toInt + } parse(args.toList) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 6b4689291097..2950a01fc294 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -26,7 +26,7 @@ import scala.language.implicitConversions import scala.util.{Failure, Success, Try} import scala.xml.Node -import org.eclipse.jetty.server.Server +import org.eclipse.jetty.server.{Connector, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool @@ -35,6 +35,8 @@ import org.json4s.jackson.JsonMethods.{pretty, render} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.Utils +import org.eclipse.jetty.server.nio.SelectChannelConnector +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector /** * Utilities for launching a web server using Jetty's HTTP Server class @@ -183,7 +185,8 @@ private[spark] object JettyUtils extends Logging { // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { - val server = new Server(new InetSocketAddress(hostName, currentPort)) + val server = new Server + server.addConnector(getConnector(currentPort, conf)) val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) @@ -207,6 +210,48 @@ private[spark] object JettyUtils extends Logging { private def attachPrefix(basePath: String, relativePath: String): String = { if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") } + + private def getConnector(port: Int, conf: SparkConf): Connector = { + val https = getHttpPolicy(conf) + if (https) { + buildSslSelectChannelConnector(port, conf) + } else { + conf.set("spark.http.policy", "http") + val connector = new SelectChannelConnector + connector.setPort(port) + connector + } + } + + private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = + { + val connector = new SslSelectChannelConnector + connector.setPort(port) + + val context = connector.getSslContextFactory + val needAuth = conf.getBoolean("spark.client.https.need-auth", false) + context.setNeedClientAuth(needAuth) + context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword")) + if (conf.contains("spark.ssl.server.keystore.location")) { + context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) + context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password")) + context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) + } + if (needAuth && conf.contains("spark.ssl.server.truststore.location")) { + 
context.setTrustStore(conf.get("spark.ssl.server.truststore.location")) + context.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) + context.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks")) + } + connector + } + + def getHttpPolicy(conf: SparkConf): Boolean = { + if (conf.contains("spark.http.policy") && conf.get("spark.http.policy").equals("https")) { + true + } else { + false + } + } } private[spark] case class ServerInfo( diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 6c788a37dc70..548f305db06b 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -97,7 +97,9 @@ private[spark] class SparkUI( */ private[spark] def appUIHostPort = publicHostName + ":" + boundPort - private[spark] def appUIAddress = s"http://$appUIHostPort" + private def appUiAddressPrefix = conf.get("spark.http.policy") + + private[spark] def appUIAddress = s"$appUiAddressPrefix://$appUIHostPort" } private[spark] object SparkUI { diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 5f52f9508800..d498661ccdaf 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -99,7 +99,11 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) + if (conf.get("spark.http.policy").equals("https")) { + logInfo("Started %s at https://%s:%d".format(className, publicHostName, boundPort)) + } else { + logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) + } } catch { case e: Exception => logError("Failed to bind %s".format(className), e) diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index 31aa7ec837f4..545be8eac8af 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -111,7 +111,7 @@ class JsonProtocolSuite extends FunSuite { createDriverDesc(), new Date()) def createWorkerInfo(): WorkerInfo = { - val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, 80, "publicAddress") + val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://host:8080", "publicAddress") workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis workerInfo } From 8f7cc967e6fd1b9d0b3768543afe06211e2178fa Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 16 Aug 2014 10:27:15 +0800 Subject: [PATCH 02/21] add unit test --- .../org/apache/spark/ui/JettyUtils.scala | 12 +++--- core/src/test/resources/spark.keystore | Bin 0 -> 1383 bytes .../spark/deploy/JsonProtocolSuite.scala | 2 +- .../scala/org/apache/spark/ui/UISuite.scala | 40 ++++++++++++++++++ 4 files changed, 47 insertions(+), 7 deletions(-) create mode 100644 core/src/test/resources/spark.keystore diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 2950a01fc294..f72f46b10610 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -230,13 +230,13 @@ private[spark] object JettyUtils extends Logging {
 
     val context = connector.getSslContextFactory
     val needAuth = conf.getBoolean("spark.client.https.need-auth", false)
+
     context.setNeedClientAuth(needAuth)
-    context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword"))
-    if (conf.contains("spark.ssl.server.keystore.location")) {
-      context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location"))
-      context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password"))
-      context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks"))
-    }
+    context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456"))
+    context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location"))
+    context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456"))
+    context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks"))
+
     if (needAuth && conf.contains("spark.ssl.server.truststore.location")) {
       context.setTrustStore(conf.get("spark.ssl.server.truststore.location"))
       context.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password"))
diff --git a/core/src/test/resources/spark.keystore b/core/src/test/resources/spark.keystore
new file mode 100644
index 0000000000000000000000000000000000000000..f30716b57b30298e78e85011d7595bb42d6a85d7
GIT binary patch
literal 1383
[base85-encoded binary payload for core/src/test/resources/spark.keystore omitted]

literal 0
HcmV?d00001

diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 545be8eac8af..f6c7c19c4101 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -111,7 +111,7 @@ class JsonProtocolSuite extends FunSuite {
     createDriverDesc(), new Date())
 
   def createWorkerInfo(): WorkerInfo = {
-    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://host:8080", "publicAddress")
+    val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80", "publicAddress")
     workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
     workerInfo
   }
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 038746d2eda4..597d3dd72d16 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -115,6 +115,30 @@ class UISuite extends FunSuite {
assert(boundPort1 != boundPort2) } + test("jetty with https selects different port under contention") { + val startPort = 4040 + val server = new Server(startPort) + + Try { server.start() } match { + case Success(s) => + case Failure(e) => + // Either case server port is busy hence setup for test complete + } + val sparkConf = new SparkConf() + .set("spark.http.policy", "https") + .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + val serverInfo1 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) + val serverInfo2 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) + // Allow some wiggle room in case ports on the machine are under contention + val boundPort1 = serverInfo1.boundPort + val boundPort2 = serverInfo2.boundPort + assert(boundPort1 != startPort) + assert(boundPort2 != startPort) + assert(boundPort1 != boundPort2) + } + test("jetty binds to port 0 correctly") { val serverInfo = JettyUtils.startJettyServer( "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf) @@ -128,6 +152,22 @@ class UISuite extends FunSuite { } } + test("jetty with https binds to port 0 correctly") { + val sparkConf = new SparkConf() + .set("spark.http.policy", "https") + .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + val serverInfo = JettyUtils.startJettyServer( + "0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf) + val server = serverInfo.server + val boundPort = serverInfo.boundPort + assert(server.getState === "STARTED") + assert(boundPort != 0) + Try { new ServerSocket(boundPort) } match { + case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) + case Failure(e) => + } + } + test("verify appUIAddress contains the scheme") { withSpark(new SparkContext("local", "test")) { sc => val uiAddress = sc.ui.appUIAddress From c90d84e0c70a88742feb453c6f1517163878c003 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 21 Sep 2014 22:44:55 +0800 Subject: [PATCH 03/21] fix according to comments --- .../org/apache/spark/deploy/DeployMessage.scala | 3 +-- .../org/apache/spark/deploy/master/Master.scala | 10 +++++++--- .../apache/spark/deploy/master/WorkerInfo.scala | 3 +-- .../org/apache/spark/deploy/worker/Worker.scala | 11 ++++++++--- .../spark/deploy/worker/WorkerArguments.scala | 3 --- .../scala/org/apache/spark/ui/JettyUtils.scala | 14 +++----------- .../main/scala/org/apache/spark/ui/SparkUI.scala | 2 -- .../src/main/scala/org/apache/spark/ui/WebUI.scala | 12 +++++++----- .../test/scala/org/apache/spark/ui/UISuite.scala | 6 ++++-- 9 files changed, 31 insertions(+), 33 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 352ad2a7980e..964ede01922b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -39,8 +39,7 @@ private[deploy] object DeployMessages { port: Int, cores: Int, memory: Int, - workerWebUiUrl: String, - publicAddress: String) + workerWebUiUrl: String) extends DeployMessage { Utils.checkHost(host, "Required hostname") assert (port > 0) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7d5df1d3ea75..a7e928e5908b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -122,7 +122,11 @@ private[spark] class Master( // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.bind() - val masterWebUiUrlPrefix = conf.get("spark.http.policy") + "://" + val masterWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) { + "https://" + } else{ + "http://" + } masterWebUiUrl = masterWebUiUrlPrefix + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) @@ -191,7 +195,7 @@ private[spark] class Master( System.exit(0) } - case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl, publicAddress) => + case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiUrl) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( workerHost, workerPort, cores, Utils.megabytesToString(memory))) @@ -201,7 +205,7 @@ private[spark] class Master( sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, - sender, workerWebUiUrl, publicAddress) + sender, workerWebUiUrl) if (registerWorker(worker)) { persistenceEngine.addWorker(worker) sender ! RegisteredWorker(masterUrl, masterWebUiUrl) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 0335894bae51..f775d0d783e0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -30,8 +30,7 @@ private[spark] class WorkerInfo( val cores: Int, val memory: Int, val actor: ActorRef, - val webUiAddress: String, - val publicAddress: String) + val webUiAddress: String) extends Serializable { Utils.checkHost(host, "Expected hostname") diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 40753ba51963..616b2d01b64c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -78,7 +78,7 @@ private[spark] class Worker( var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) - var workerWebUiUrl: String = _ + var workerWebUiUrl: String = "" @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -139,7 +139,12 @@ private[spark] class Worker( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() - workerWebUiUrl = conf.get("spark.http.policy") + "://" + publicAddress + ":" + webUi.boundPort + val workerWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) { + "https://" + } else{ + "http://" + } + workerWebUiUrl = workerWebUiUrlPrefix + publicAddress + ":" + webUi.boundPort registerWithMaster() metricsSystem.registerSource(workerSource) @@ -165,7 +170,7 @@ private[spark] class Worker( for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) - actor ! 
RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl, publicAddress) + actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index f49bdd35c3f3..1e295aaa48c3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -53,9 +53,6 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) { if (System.getenv("SPARK_WORKER_DIR") != null) { workDir = System.getenv("SPARK_WORKER_DIR") } - if (conf.contains("worker.ui.port")) { - webUiPort = conf.get("worker.ui.port").toInt - } parse(args.toList) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index f72f46b10610..9e4bad4a54a8 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -30,13 +30,13 @@ import org.eclipse.jetty.server.{Connector, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool +import org.eclipse.jetty.server.nio.SelectChannelConnector +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.util.Utils -import org.eclipse.jetty.server.nio.SelectChannelConnector -import org.eclipse.jetty.server.ssl.SslSelectChannelConnector /** * Utilities for launching a web server using Jetty's HTTP Server class @@ -212,11 +212,10 @@ private[spark] object JettyUtils extends Logging { } private def getConnector(port: Int, conf: SparkConf): Connector = { - val https = getHttpPolicy(conf) + val https = conf.get("spark.ui.https.enabled", "false").toBoolean if (https) { buildSslSelectChannelConnector(port, conf) } else { - conf.set("spark.http.policy", "http") val connector = new SelectChannelConnector connector.setPort(port) connector @@ -245,13 +244,6 @@ private[spark] object JettyUtils extends Logging { connector } - def getHttpPolicy(conf: SparkConf): Boolean = { - if (conf.contains("spark.http.policy") && conf.get("spark.http.policy").equals("https")) { - true - } else { - false - } - } } private[spark] case class ServerInfo( diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 548f305db06b..c450160e5064 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -97,8 +97,6 @@ private[spark] class SparkUI( */ private[spark] def appUIHostPort = publicHostName + ":" + boundPort - private def appUiAddressPrefix = conf.get("spark.http.policy") - private[spark] def appUIAddress = s"$appUiAddressPrefix://$appUIHostPort" } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index d498661ccdaf..2e98c6ebd1c5 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -91,6 +91,12 @@ private[spark] abstract class WebUI( } } + def appUiAddressPrefix = if(conf.get("spark.ui.https.enabled", "false").toBoolean) { + "https" + } else { + "http" + } + 
/** Initialize all components of the server. */ def initialize() @@ -99,11 +105,7 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - if (conf.get("spark.http.policy").equals("https")) { - logInfo("Started %s at https://%s:%d".format(className, publicHostName, boundPort)) - } else { - logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) - } + logInfo(s"Started %s at $appUiAddressPrefix://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => logError("Failed to bind %s".format(className), e) diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 597d3dd72d16..13990628b98f 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -124,8 +124,10 @@ class UISuite extends FunSuite { case Failure(e) => // Either case server port is busy hence setup for test complete } + //keytool -export -keystore core/src/test/resources/spark.keystore -alias spark -file /home/wf/code/spark2/core/src/test/resources/spark.cer -storepass 123456 + val sparkConf = new SparkConf() - .set("spark.http.policy", "https") + .set("spark.ui.https.enabled", "true") .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") val serverInfo1 = JettyUtils.startJettyServer( "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) @@ -154,7 +156,7 @@ class UISuite extends FunSuite { test("jetty with https binds to port 0 correctly") { val sparkConf = new SparkConf() - .set("spark.http.policy", "https") + .set("spark.ui.https.enabled", "true") .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") val serverInfo = JettyUtils.startJettyServer( "0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf) From de8d1bd5cc3c840230d6d5dd4a4866bf0f0f7575 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 21 Sep 2014 23:00:55 +0800 Subject: [PATCH 04/21] fix scalastyle --- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 2e98c6ebd1c5..b16c3e346d2e 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -105,7 +105,8 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - logInfo(s"Started %s at $appUiAddressPrefix://%s:%d".format(className, publicHostName, boundPort)) + logInfo(s"Started %s at $appUiAddressPrefix://%s:%d".format( + className, publicHostName, boundPort)) } catch { case e: Exception => logError("Failed to bind %s".format(className), e) From 35074fd465dee6c3dae8e0a43f2787aed50f4537 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 21 Sep 2014 23:27:32 +0800 Subject: [PATCH 05/21] fix workerinfo in JsonProtocolSuite --- .../test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 2 +- core/src/test/scala/org/apache/spark/ui/UISuite.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index f6c7c19c4101..fe7e0cb98941 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -111,7 +111,7 @@ class JsonProtocolSuite extends FunSuite { createDriverDesc(), new Date()) def createWorkerInfo(): WorkerInfo = { - val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80", "publicAddress") + val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80") workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis workerInfo } diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 13990628b98f..cedb593cc801 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -124,7 +124,6 @@ class UISuite extends FunSuite { case Failure(e) => // Either case server port is busy hence setup for test complete } - //keytool -export -keystore core/src/test/resources/spark.keystore -alias spark -file /home/wf/code/spark2/core/src/test/resources/spark.cer -storepass 123456 val sparkConf = new SparkConf() .set("spark.ui.https.enabled", "true") From 9591c9c5b90e0d1d65b4a48b08b53f6a3ed3e49d Mon Sep 17 00:00:00 2001 From: scwf Date: Mon, 22 Sep 2014 00:27:37 +0800 Subject: [PATCH 06/21] import org.eclipse.jetty.server.Server to fix test error --- core/src/test/scala/org/apache/spark/ui/UISuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index fffc29ec5034..ac503e4dc756 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -24,6 +24,7 @@ import scala.io.Source import scala.util.{Failure, Success, Try} import org.eclipse.jetty.servlet.ServletContextHandler +import org.eclipse.jetty.server.Server import org.scalatest.FunSuite import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ From 333334adcefca2f52eeef58884b62eed87e65d15 Mon Sep 17 00:00:00 2001 From: scwf Date: Thu, 25 Sep 2014 08:04:35 +0800 Subject: [PATCH 07/21] fix comments --- .../apache/spark/deploy/worker/Worker.scala | 2 +- .../org/apache/spark/ui/JettyUtils.scala | 51 ++++++++++++++----- .../scala/org/apache/spark/ui/WebUI.scala | 2 +- 3 files changed, 39 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 98165277911a..f073799a95d7 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -138,7 +138,7 @@ private[spark] class Worker( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() - val workerWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) { + val workerWebUiUrlPrefix = if ( conf.get("spark.ui.https.enabled", "false").toBoolean) { "https://" } else{ "http://" diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 9e4bad4a54a8..574f9f165528 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ 
b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -21,6 +21,8 @@ import java.net.{InetSocketAddress, URL} import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} +import org.eclipse.jetty.util.ssl.SslContextFactory + import scala.annotation.tailrec import scala.language.implicitConversions import scala.util.{Failure, Success, Try} @@ -186,7 +188,9 @@ private[spark] object JettyUtils extends Logging { // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { val server = new Server - server.addConnector(getConnector(currentPort, conf)) + val connector = getConnector(currentPort, conf) + connector.setHost(hostName) + server.addConnector(connector) val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) @@ -222,28 +226,47 @@ private[spark] object JettyUtils extends Logging { } } - private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = - { - val connector = new SslSelectChannelConnector - connector.setPort(port) + private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = { - val context = connector.getSslContextFactory + val ctxFactory = new SslContextFactory() val needAuth = conf.getBoolean("spark.client.https.need-auth", false) - context.setNeedClientAuth(needAuth) - context.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456")) - context.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) - context.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456")) - context.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) + conf.getAll + .filter { case (k, v) => k.startsWith("spark.ui.ssl.") } + .foreach { case (k, v) => setSslContextFactoryProps(k,v,ctxFactory) } + + ctxFactory.setNeedClientAuth(needAuth) + ctxFactory.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456")) + ctxFactory.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) + ctxFactory.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456")) + ctxFactory.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) if (needAuth && conf.contains("spark.ssl.server.truststore.location")) { - context.setTrustStore(conf.get("spark.ssl.server.truststore.location")) - context.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) - context.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks")) + ctxFactory.setTrustStore(conf.get("spark.ssl.server.truststore.location")) + ctxFactory.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) + ctxFactory.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks")) } + + val connector = new SslSelectChannelConnector(ctxFactory) + connector.setPort(port) connector } + private def setSslContextFactoryProps( + key: String, value: String, ctxFactory:SslContextFactory) = { + key match { + case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value) + case "spark.ui.ssl.server.keystore.location" => ctxFactory.setKeyStorePath(value) + case "spark.ui.ssl.server.keystore.password" => ctxFactory.setKeyStorePassword(value) + case "spark.ui.ssl.server.keystore.type" => ctxFactory.setKeyStoreType(value) + case "spark.ui.ssl.server.truststore.location" => ctxFactory.setTrustStore(value) + case "spark.ui.ssl.server.truststore.password" => 
ctxFactory.setTrustStorePassword(value) + case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value) + } + ctxFactory + + } + } private[spark] case class ServerInfo( diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 8584ac054d50..bd49559afff7 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -92,7 +92,7 @@ private[spark] abstract class WebUI( } } - def appUiAddressPrefix = if(conf.get("spark.ui.https.enabled", "false").toBoolean) { + def appUiAddressPrefix = if (conf.get("spark.ui.https.enabled", "false").toBoolean) { "https" } else { "http" From 64d7dc0eb846b7aba6ab42e4157b8f65bfb3f133 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 1 Oct 2014 17:22:22 +0800 Subject: [PATCH 08/21] add redirect from http to https --- .../apache/spark/deploy/master/Master.scala | 7 +- .../apache/spark/deploy/worker/Worker.scala | 7 +- .../org/apache/spark/ui/JettyUtils.scala | 120 ++++++++++++------ .../scala/org/apache/spark/ui/SparkUI.scala | 2 +- .../scala/org/apache/spark/ui/WebUI.scala | 8 +- .../scala/org/apache/spark/util/Utils.scala | 13 +- .../scala/org/apache/spark/ui/UISuite.scala | 29 +++-- 7 files changed, 110 insertions(+), 76 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 00d0216ebd94..f66caa6355d8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -123,12 +123,7 @@ private[spark] class Master( // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.bind() - val masterWebUiUrlPrefix = if( conf.get("spark.ui.https.enabled", "false").toBoolean) { - "https://" - } else{ - "http://" - } - masterWebUiUrl = masterWebUiUrlPrefix + masterPublicAddress + ":" + webUi.boundPort + masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f073799a95d7..d590c262e9a3 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -138,12 +138,7 @@ private[spark] class Worker( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() - val workerWebUiUrlPrefix = if ( conf.get("spark.ui.https.enabled", "false").toBoolean) { - "https://" - } else{ - "http://" - } - workerWebUiUrl = workerWebUiUrlPrefix + publicAddress + ":" + webUi.boundPort + workerWebUiUrl = "http://" + publicAddress + ":" + webUi.boundPort registerWithMaster() metricsSystem.registerSource(workerSource) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 574f9f165528..b60eae337cb9 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,23 +17,24 @@ package org.apache.spark.ui -import java.net.{InetSocketAddress, 
URL} +import java.net.URL import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} +import org.eclipse.jetty.http.HttpStatus import org.eclipse.jetty.util.ssl.SslContextFactory -import scala.annotation.tailrec +import scala.collection.mutable.StringBuilder import scala.language.implicitConversions -import scala.util.{Failure, Success, Try} import scala.xml.Node -import org.eclipse.jetty.server.{Connector, Server} +import org.eclipse.jetty.server.{Request, Connector, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool import org.eclipse.jetty.server.nio.SelectChannelConnector import org.eclipse.jetty.server.ssl.SslSelectChannelConnector + import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} @@ -175,22 +176,38 @@ private[spark] object JettyUtils extends Logging { * found. Return the jetty Server object, the chosen port, and a mutable collection of handlers. */ def startJettyServer( - hostName: String, - port: Int, - handlers: Seq[ServletContextHandler], - conf: SparkConf, - serverName: String = ""): ServerInfo = { + hostName: String, + port: Int, + handlers: Seq[ServletContextHandler], + conf: SparkConf, + serverName: String = ""): ServerInfo = { val collection = new ContextHandlerCollection - collection.setHandlers(handlers.toArray) addFilters(handlers, conf) // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { val server = new Server - val connector = getConnector(currentPort, conf) - connector.setHost(hostName) - server.addConnector(connector) + // Create a connector on port currentPort to listen for HTTP requests + val httpConnector = new SelectChannelConnector() + httpConnector.setPort(currentPort) + httpConnector.setHost(hostName) + + if (conf.get("spark.ui.https.enabled", "false").toBoolean) { + val securePort = (currentPort + 8000) % 65536 + val schema = "https" + // Create a connector on port currentPort+1 to listen for HTTPS requests + val connector = buildSslSelectChannelConnector(securePort, conf) + connector.setHost(hostName) + server.setConnectors(Seq(httpConnector,connector).toArray) + + // redirect the HTTP requests to HTTPS port + val newHandlers = Seq(createRedirectHttpsHandler(securePort, schema)) ++ handlers + collection.setHandlers(newHandlers.toArray) + } else { + server.addConnector(httpConnector) + collection.setHandlers(handlers.toArray) + } val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) @@ -210,43 +227,69 @@ private[spark] object JettyUtils extends Logging { ServerInfo(server, boundPort, collection) } - /** Attach a prefix to the given path, but avoid returning an empty path */ - private def attachPrefix(basePath: String, relativePath: String): String = { - if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") + private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = { + val builder = newURIBuilder(scheme, server, port) + builder.append(path) + if (query != null && query.length > 0) builder.append('?').append(query) + builder.toString + } + + private def newURIBuilder(scheme: String, server: String, port: Int) = { + val builder = new StringBuilder + appendSchemeHostPort(builder, scheme, server, port) + builder } - private def getConnector(port: Int, conf: SparkConf): Connector = { - val https = 
conf.get("spark.ui.https.enabled", "false").toBoolean - if (https) { - buildSslSelectChannelConnector(port, conf) + private def appendSchemeHostPort(url: StringBuilder, scheme: String, server: String, port: Int) { + if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { + url.append(scheme).append("://").append('[').append(server).append(']') } else { - val connector = new SelectChannelConnector - connector.setPort(port) - connector + url.append(scheme).append("://").append(server) + } + if (port > 0) { + url.append(':').append(port) } } + def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = { + val redirectHandler: ContextHandler = new ContextHandler + redirectHandler.setContextPath("/") + redirectHandler.setHandler(new AbstractHandler { + @Override def handle( + target: String, + baseRequest: Request, + request: HttpServletRequest, + response: HttpServletResponse): Unit = { + if (baseRequest.isSecure) { + return + } + if (securePort > 0) { + val url = newURI(schema, baseRequest.getServerName, securePort, + baseRequest.getRequestURI, baseRequest.getQueryString) + response.setContentLength(0) + response.sendRedirect(url) + } + else { + response.sendError(HttpStatus.FORBIDDEN_403, "!Secure") + } + baseRequest.setHandled(true) + } + }) + redirectHandler + } + + /** Attach a prefix to the given path, but avoid returning an empty path */ + private def attachPrefix(basePath: String, relativePath: String): String = { + if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") + } + private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = { val ctxFactory = new SslContextFactory() - val needAuth = conf.getBoolean("spark.client.https.need-auth", false) - conf.getAll .filter { case (k, v) => k.startsWith("spark.ui.ssl.") } .foreach { case (k, v) => setSslContextFactoryProps(k,v,ctxFactory) } - ctxFactory.setNeedClientAuth(needAuth) - ctxFactory.setKeyManagerPassword(conf.get("spark.ssl.server.keystore.keypassword", "123456")) - ctxFactory.setKeyStorePath(conf.get("spark.ssl.server.keystore.location")) - ctxFactory.setKeyStorePassword(conf.get("spark.ssl.server.keystore.password", "123456")) - ctxFactory.setKeyStoreType(conf.get("spark.ssl.server.keystore.type", "jks")) - - if (needAuth && conf.contains("spark.ssl.server.truststore.location")) { - ctxFactory.setTrustStore(conf.get("spark.ssl.server.truststore.location")) - ctxFactory.setTrustStorePassword(conf.get("spark.ssl.server.truststore.password")) - ctxFactory.setTrustStoreType(conf.get("spark.ssl.server.truststore.type", "jks")) - } - val connector = new SslSelectChannelConnector(ctxFactory) connector.setPort(port) connector @@ -255,6 +298,7 @@ private[spark] object JettyUtils extends Logging { private def setSslContextFactoryProps( key: String, value: String, ctxFactory:SslContextFactory) = { key match { + case "spark.ui.ssl.client.https.needAuth" => ctxFactory.setNeedClientAuth(value.toBoolean) case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value) case "spark.ui.ssl.server.keystore.location" => ctxFactory.setKeyStorePath(value) case "spark.ui.ssl.server.keystore.password" => ctxFactory.setKeyStorePassword(value) @@ -263,8 +307,8 @@ private[spark] object JettyUtils extends Logging { case "spark.ui.ssl.server.truststore.password" => ctxFactory.setTrustStorePassword(value) case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value) } - ctxFactory + ctxFactory } } diff --git 
a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 283629ea1254..cccd59d122a9 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -99,7 +99,7 @@ private[spark] class SparkUI( */ private[spark] def appUIHostPort = publicHostName + ":" + boundPort - private[spark] def appUIAddress = s"$appUiAddressPrefix://$appUIHostPort" + private[spark] def appUIAddress = s"http://$appUIHostPort" } private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index bd49559afff7..1ddaf947741f 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -92,12 +92,6 @@ private[spark] abstract class WebUI( } } - def appUiAddressPrefix = if (conf.get("spark.ui.https.enabled", "false").toBoolean) { - "https" - } else { - "http" - } - /** Initialize all components of the server. */ def initialize() @@ -106,7 +100,7 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - logInfo(s"Started %s at $appUiAddressPrefix://%s:%d".format( + logInfo(s"Started %s at http://%s:%d".format( className, publicHostName, boundPort)) } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ed063844323a..4c054f996d16 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -24,6 +24,7 @@ import java.util.{Properties, Locale, Random, UUID} import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor} import org.apache.log4j.PropertyConfigurator +import org.eclipse.jetty.util.MultiException import scala.collection.JavaConversions._ import scala.collection.Map @@ -1415,9 +1416,10 @@ private[spark] object Utils extends Logging { for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536 + println(s"start $serviceName at tryport: $tryPort") try { val (service, port) = startService(tryPort) - logInfo(s"Successfully started service$serviceString on port $port.") + println(s"Successfully started service$serviceString on port $port.") return (service, port) } catch { case e: Exception if isBindCollision(e) => @@ -1429,7 +1431,7 @@ private[spark] object Utils extends Logging { exception.setStackTrace(e.getStackTrace) throw exception } - logWarning(s"Service$serviceString could not bind on port $tryPort. " + + println(s"Service$serviceString could not bind on port $tryPort. 
" + s"Attempting port ${tryPort + 1}.") } } @@ -1447,8 +1449,11 @@ private[spark] object Utils extends Logging { return true } isBindCollision(e.getCause) - case e: Exception => isBindCollision(e.getCause) - case _ => false + case e: MultiException => e.getThrowables.exists(isBindCollision) + case e: Exception => + isBindCollision(e.getCause) + case _ => + return false } } diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index ac503e4dc756..66d1237605a5 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -24,7 +24,6 @@ import scala.io.Source import scala.util.{Failure, Success, Try} import org.eclipse.jetty.servlet.ServletContextHandler -import org.eclipse.jetty.server.Server import org.scalatest.FunSuite import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -126,28 +125,27 @@ class UISuite extends FunSuite { } test("jetty with https selects different port under contention") { - val startPort = 4040 - val server = new Server(startPort) - - Try { server.start() } match { - case Success(s) => - case Failure(e) => - // Either case server port is busy hence setup for test complete - } + val server = new ServerSocket(0) + val startPort = server.getLocalPort val sparkConf = new SparkConf() .set("spark.ui.https.enabled", "true") - .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + .set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + .set("spark.ui.ssl.server.keystore.password", "123456") + .set("spark.ui.ssl.server.keystore.keypassword", "123456") val serverInfo1 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) + "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server1") val serverInfo2 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf) + "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server2") // Allow some wiggle room in case ports on the machine are under contention val boundPort1 = serverInfo1.boundPort val boundPort2 = serverInfo2.boundPort assert(boundPort1 != startPort) assert(boundPort2 != startPort) assert(boundPort1 != boundPort2) + serverInfo1.server.stop() + serverInfo2.server.stop() + server.close() } test("jetty binds to port 0 correctly") { @@ -165,8 +163,10 @@ class UISuite extends FunSuite { test("jetty with https binds to port 0 correctly") { val sparkConf = new SparkConf() - .set("spark.ui.https.enabled", "true") - .set("spark.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + .set("spark.ui.https.enabled", "false") + .set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore") + .set("spark.ui.ssl.server.keystore.password", "123456") + .set("spark.ui.ssl.server.keystore.keypassword", "123456") val serverInfo = JettyUtils.startJettyServer( "0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf) val server = serverInfo.server @@ -177,6 +177,7 @@ class UISuite extends FunSuite { case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) case Failure(e) => } + serverInfo.server.stop() } test("verify appUIAddress contains the scheme") { From 89bf98644477050b149891c6cceb344316970652 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 1 Oct 2014 17:25:30 +0800 Subject: [PATCH 09/21] revert debug code --- 
core/src/main/scala/org/apache/spark/util/Utils.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 4c054f996d16..fa14a839b170 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1416,10 +1416,10 @@ private[spark] object Utils extends Logging { for (offset <- 0 to maxRetries) { // Do not increment port if startPort is 0, which is treated as a special port val tryPort = if (startPort == 0) startPort else (startPort + offset) % 65536 - println(s"start $serviceName at tryport: $tryPort") + logInfo(s"start $serviceName at tryport: $tryPort") try { val (service, port) = startService(tryPort) - println(s"Successfully started service$serviceString on port $port.") + logInfo(s"Successfully started service$serviceString on port $port.") return (service, port) } catch { case e: Exception if isBindCollision(e) => @@ -1431,7 +1431,7 @@ private[spark] object Utils extends Logging { exception.setStackTrace(e.getStackTrace) throw exception } - println(s"Service$serviceString could not bind on port $tryPort. " + + logInfo(s"Service$serviceString could not bind on port $tryPort. " + s"Attempting port ${tryPort + 1}.") } } From 677b746d4e1ef56cebfe0ec06b1e7ba7c5cd1206 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 1 Oct 2014 21:09:01 +0800 Subject: [PATCH 10/21] add https/ssl to docs --- .../org/apache/spark/ui/JettyUtils.scala | 10 ++-- .../scala/org/apache/spark/ui/WebUI.scala | 3 +- .../scala/org/apache/spark/util/Utils.scala | 2 +- docs/security.md | 56 ++++++++++++++++++- 4 files changed, 62 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index b60eae337cb9..295ccd366988 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -176,11 +176,11 @@ private[spark] object JettyUtils extends Logging { * found. Return the jetty Server object, the chosen port, and a mutable collection of handlers. 
*/ def startJettyServer( - hostName: String, - port: Int, - handlers: Seq[ServletContextHandler], - conf: SparkConf, - serverName: String = ""): ServerInfo = { + hostName: String, + port: Int, + handlers: Seq[ServletContextHandler], + conf: SparkConf, + serverName: String = ""): ServerInfo = { val collection = new ContextHandlerCollection addFilters(handlers, conf) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 1ddaf947741f..5d88ca403a67 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -100,8 +100,7 @@ private[spark] abstract class WebUI( assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) - logInfo(s"Started %s at http://%s:%d".format( - className, publicHostName, boundPort)) + logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => logError("Failed to bind %s".format(className), e) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ec38119f554e..dd496d0d2a3e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1455,7 +1455,7 @@ private[spark] object Utils extends Logging { exception.setStackTrace(e.getStackTrace) throw exception } - logInfo(s"Service$serviceString could not bind on port $tryPort. " + + logWarning(s"Service$serviceString could not bind on port $tryPort. " + s"Attempting port ${tryPort + 1}.") } } diff --git a/docs/security.md b/docs/security.md index ec0523184d66..64fd65e6a95b 100644 --- a/docs/security.md +++ b/docs/security.md @@ -11,12 +11,66 @@ Spark currently supports authentication via a shared secret. Authentication can ## Web UI -The Spark UI can also be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. +The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting and by using [Jetty https/SSL](http://www.eclipse.org/jetty/documentation/current/configuring-ssl.html) via the `spark.ui.https.enabled` setting. + +### Authentication + +A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. 
On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters.

Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task. This is controlled by the configs `spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces.

Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. This is controlled by the config `spark.admin.acls`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications.

+### Encryption
+
+Spark uses SSL (Secure Sockets Layer) to establish an encrypted link between the UI server and the browser client. The config `spark.ui.https.enabled` turns encryption on; the other SSL-related configs are as follows:
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr><td>spark.ui.ssl.server.keystore.keypassword</td><td>(none)</td><td>The password for the specific key within the key store.</td></tr>
+<tr><td>spark.ui.ssl.server.keystore.location</td><td>(none)</td><td>The file or URL of the SSL Key store.</td></tr>
+<tr><td>spark.ui.ssl.server.keystore.password</td><td>(none)</td><td>The password for the key store.</td></tr>
+<tr><td>spark.ui.ssl.server.keystore.type</td><td>JKS</td><td>The type of the key store (default "JKS").</td></tr>
+<tr><td>spark.ui.ssl.client.https.needAuth</td><td>(none)</td>
+  <td>
+    Set true if SSL needs client authentication.
+  </td>
+</tr>
+<tr><td>spark.ui.ssl.server.truststore.location</td><td>(none)</td><td>The file name or URL of the trust store location.</td></tr>
+<tr><td>spark.ui.ssl.server.truststore.password</td><td>(none)</td><td>The password for the trust store.</td></tr>
+<tr><td>spark.ui.ssl.server.truststore.type</td><td>JKS</td><td>The type of the trust store (default "JKS").</td></tr>
+</table>
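For illustration, a minimal sketch of how the settings above fit together in application code (a hypothetical setup, not taken from the patch; the keystore location and both passwords are placeholders):

```scala
import org.apache.spark.SparkConf

// Sketch: enable HTTPS for the web UI via the properties documented above.
// The keystore location and both passwords are placeholder values.
val conf = new SparkConf()
  .set("spark.ui.https.enabled", "true")
  .set("spark.ui.ssl.server.keystore.location", "/path/to/spark.keystore")
  .set("spark.ui.ssl.server.keystore.password", "storePassword")
  .set("spark.ui.ssl.server.keystore.keypassword", "keyPassword")
  .set("spark.ui.ssl.server.keystore.type", "JKS")
```

With `spark.ui.https.enabled` set, the UI server opens an additional HTTPS connector and redirects plain HTTP requests to it.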

 ## Event Logging

 If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.

From a4ce9239e8ec912e0ba7d9a2a132de8b97cc8f71 Mon Sep 17 00:00:00 2001
From: scwf
Date: Wed, 1 Oct 2014 22:25:00 +0800
Subject: [PATCH 11/21] fix docs

---
 docs/security.md | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/docs/security.md b/docs/security.md
index 64fd65e6a95b..0e1c8422df55 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -50,9 +50,7 @@ Spark use SSL(Secure Sockets Layer) to establish an encrypted link between UI se
 <tr><td>spark.ui.ssl.client.https.needAuth</td><td>(none)</td>
-  <td>
-    Set true if SSL needs client authentication.
-  </td>
+  <td>Set true if SSL needs client authentication.</td>
 </tr>
 <tr><td>spark.ui.ssl.server.truststore.location</td><td>(none)</td><td>The file name or URL of the trust store location.</td></tr>

From 6c31dc71a932083e158ef9c6737f33ebe9de1348 Mon Sep 17 00:00:00 2001
From: scwf
Date: Wed, 1 Oct 2014 23:09:02 +0800
Subject: [PATCH 12/21] fix code format

---
 .../org/apache/spark/ui/JettyUtils.scala | 24 ++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 295ccd366988..58559c94ff15 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -194,9 +194,13 @@ private[spark] object JettyUtils extends Logging {
     httpConnector.setHost(hostName)

     if (conf.get("spark.ui.https.enabled", "false").toBoolean) {
-      val securePort = (currentPort + 8000) % 65536
+      val securePort = if (currentPort + 1 > 65536) {
+        currentPort - 1
+      } else {
+        currentPort + 1
+      }
       val schema = "https"
-      // Create a connector on port currentPort+1 to listen for HTTPS requests
+      // Create a connector on port securePort to listen for HTTPS requests
       val connector = buildSslSelectChannelConnector(securePort, conf)
       connector.setHost(hostName)
       server.setConnectors(Seq(httpConnector,connector).toArray)
@@ -251,15 +255,15 @@ private[spark] object JettyUtils extends Logging {
     }
   }

-  def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = {
+  private def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = {
     val redirectHandler: ContextHandler = new ContextHandler
     redirectHandler.setContextPath("/")
     redirectHandler.setHandler(new AbstractHandler {
       @Override def handle(
-        target: String,
-        baseRequest: Request,
-        request: HttpServletRequest,
-        response: HttpServletResponse): Unit = {
+          target: String,
+          baseRequest: Request,
+          request: HttpServletRequest,
+          response: HttpServletResponse): Unit = {
         if (baseRequest.isSecure) {
           return
         }
         if (securePort > 0) {
           val url = newURI(schema, baseRequest.getServerName, securePort,
             baseRequest.getRequestURI, baseRequest.getQueryString)
           response.setContentLength(0)
           response.sendRedirect(url)
-        }
-        else {
+        }else {
           response.sendError(HttpStatus.FORBIDDEN_403, "!Secure")
         }
baseRequest.setHandled(true) @@ -284,11 +287,10 @@ private[spark] object JettyUtils extends Logging { } private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = { - val ctxFactory = new SslContextFactory() conf.getAll .filter { case (k, v) => k.startsWith("spark.ui.ssl.") } - .foreach { case (k, v) => setSslContextFactoryProps(k,v,ctxFactory) } + .foreach { case (k, v) => setSslContextFactoryProps(k, v, ctxFactory) } val connector = new SslSelectChannelConnector(ctxFactory) connector.setPort(port) From 7a898fbbb566172ee33f462357bb4fb02cced237 Mon Sep 17 00:00:00 2001 From: scwf Date: Thu, 2 Oct 2014 00:08:41 +0800 Subject: [PATCH 13/21] fix securePort --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 58559c94ff15..b306e2b0e15a 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -194,10 +194,12 @@ private[spark] object JettyUtils extends Logging { httpConnector.setHost(hostName) if (conf.get("spark.ui.https.enabled", "false").toBoolean) { - val securePort = if (currentPort + 1 > 65536) { - currentPort - 1 + // do not use 1 - 1024 ports for securePort + val tmpPort = (currentPort + 1) % 65536 + val securePort = if ( tmpPort <= 1024) { + tmpPort + 1024 } else { - currentPort + 1 + tmpPort } val schema = "https" // Create a connector on port securePort to listen for HTTPS requests From e5c87cb596ab3ae16ae52458c8e1edf6d1d3647a Mon Sep 17 00:00:00 2001 From: scwf Date: Fri, 3 Oct 2014 20:19:57 +0800 Subject: [PATCH 14/21] fix comments by JoshRosen --- core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 9 ++------- docs/security.md | 5 +++++ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index b306e2b0e15a..35e4ef044ccd 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -194,13 +194,8 @@ private[spark] object JettyUtils extends Logging { httpConnector.setHost(hostName) if (conf.get("spark.ui.https.enabled", "false").toBoolean) { - // do not use 1 - 1024 ports for securePort - val tmpPort = (currentPort + 1) % 65536 - val securePort = if ( tmpPort <= 1024) { - tmpPort + 1024 - } else { - tmpPort - } + // / If the new port wraps around, do not try a privilege port + val securePort = (currentPort + 1 - 1024) % (65536 - 1024) + 1024 val schema = "https" // Create a connector on port securePort to listen for HTTPS requests val connector = buildSslSelectChannelConnector(securePort, conf) diff --git a/docs/security.md b/docs/security.md index 0e1c8422df55..83198d4ec049 100644 --- a/docs/security.md +++ b/docs/security.md @@ -27,6 +27,11 @@ Spark use SSL(Secure Sockets Layer) to establish an encrypted link between UI se + + + + + From a48c6fc585c7fdb9c859b00f9b7d7d46836706b8 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 5 Oct 2014 08:22:17 +0800 Subject: [PATCH 15/21] address JoshRosen's comments --- .../org/apache/spark/ui/JettyUtils.scala | 36 ++++++------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 51a422f067d6..194b6a943ffd 
100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -232,27 +232,17 @@ private[spark] object JettyUtils extends Logging { } private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = { - val builder = newURIBuilder(scheme, server, port) - builder.append(path) - if (query != null && query.length > 0) builder.append('?').append(query) - builder.toString - } - - private def newURIBuilder(scheme: String, server: String, port: Int) = { val builder = new StringBuilder - appendSchemeHostPort(builder, scheme, server, port) - builder - } - private def appendSchemeHostPort(url: StringBuilder, scheme: String, server: String, port: Int) { if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { - url.append(scheme).append("://").append('[').append(server).append(']') + builder.append(scheme).append("://").append('[').append(server).append(']') } else { - url.append(scheme).append("://").append(server) - } - if (port > 0) { - url.append(':').append(port) + builder.append(scheme).append("://").append(server) } + builder.append(':').append(port) + builder.append(path) + if (query != null && query.length > 0) builder.append('?').append(query) + builder.toString } private def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = { @@ -267,14 +257,10 @@ private[spark] object JettyUtils extends Logging { if (baseRequest.isSecure) { return } - if (securePort > 0) { - val url = newURI(schema, baseRequest.getServerName, securePort, - baseRequest.getRequestURI, baseRequest.getQueryString) - response.setContentLength(0) - response.sendRedirect(url) - }else { - response.sendError(HttpStatus.FORBIDDEN_403, "!Secure") - } + val url = newURI(schema, baseRequest.getServerName, securePort, + baseRequest.getRequestURI, baseRequest.getQueryString) + response.setContentLength(0) + response.sendRedirect(url) baseRequest.setHandled(true) } }) @@ -298,7 +284,7 @@ private[spark] object JettyUtils extends Logging { } private def setSslContextFactoryProps( - key: String, value: String, ctxFactory:SslContextFactory) = { + key: String, value: String, ctxFactory: SslContextFactory) = { key match { case "spark.ui.ssl.client.https.needAuth" => ctxFactory.setNeedClientAuth(value.toBoolean) case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value) From 2dadb2f206a81d5c45c898833dd9561f53bdaf0e Mon Sep 17 00:00:00 2001 From: scwf Date: Tue, 7 Oct 2014 10:22:16 +0800 Subject: [PATCH 16/21] address vanzin's comments --- .../org/apache/spark/ui/JettyUtils.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 194b6a943ffd..9598779f9e99 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -199,14 +199,14 @@ private[spark] object JettyUtils extends Logging { if (conf.get("spark.ui.https.enabled", "false").toBoolean) { // / If the new port wraps around, do not try a privilege port val securePort = (currentPort + 1 - 1024) % (65536 - 1024) + 1024 - val schema = "https" + val scheme = "https" // Create a connector on port securePort to listen for HTTPS requests val connector = buildSslSelectChannelConnector(securePort, conf) connector.setHost(hostName) server.setConnectors(Seq(httpConnector,connector).toArray) // redirect the HTTP 
requests to HTTPS port - val newHandlers = Seq(createRedirectHttpsHandler(securePort, schema)) ++ handlers + val newHandlers = Seq(createRedirectHttpsHandler(securePort, scheme)) ++ handlers collection.setHandlers(newHandlers.toArray) } else { server.addConnector(httpConnector) @@ -231,7 +231,8 @@ private[spark] object JettyUtils extends Logging { ServerInfo(server, boundPort, collection) } - private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = { + // to generate a new url string scheme://server:port+path + private def newURL(scheme: String, server: String, port: Int, path: String, query: String) = { val builder = new StringBuilder if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { @@ -245,22 +246,23 @@ private[spark] object JettyUtils extends Logging { builder.toString } - private def createRedirectHttpsHandler(securePort: Int, schema: String): ContextHandler = { + private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = { val redirectHandler: ContextHandler = new ContextHandler redirectHandler.setContextPath("/") redirectHandler.setHandler(new AbstractHandler { - @Override def handle( - target: String, - baseRequest: Request, - request: HttpServletRequest, - response: HttpServletResponse): Unit = { + override def handle( + target: String, + baseRequest: Request, + request: HttpServletRequest, + response: HttpServletResponse): Unit = { if (baseRequest.isSecure) { return } - val url = newURI(schema, baseRequest.getServerName, securePort, + val httpsURL = newURL(scheme, baseRequest.getServerName, securePort, baseRequest.getRequestURI, baseRequest.getQueryString) response.setContentLength(0) - response.sendRedirect(url) + response.encodeRedirectURL(httpsURL) + response.sendRedirect(httpsURL) baseRequest.setHandled(true) } }) @@ -295,8 +297,6 @@ private[spark] object JettyUtils extends Logging { case "spark.ui.ssl.server.truststore.password" => ctxFactory.setTrustStorePassword(value) case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value) } - - ctxFactory } } From 3b01d3a66d21854b5e053de89f3d301fee58b315 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Fri, 10 Oct 2014 16:03:27 +0800 Subject: [PATCH 17/21] add reference to method newURI --- .../main/scala/org/apache/spark/ui/JettyUtils.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 9598779f9e99..371002abe4c5 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -231,8 +231,10 @@ private[spark] object JettyUtils extends Logging { ServerInfo(server, boundPort, collection) } - // to generate a new url string scheme://server:port+path - private def newURL(scheme: String, server: String, port: Int, path: String, query: String) = { + // Create a new URI from the arguments, handling IPv6 host encoding and default ports. 
Based on: + // https://github.com/eclipse/jetty.project/blob/master/jetty-util/src/main/java/org/eclipse/ + // jetty/util/URIUtil.java#L726-L733 + private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = { val builder = new StringBuilder if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { @@ -258,11 +260,11 @@ private[spark] object JettyUtils extends Logging { if (baseRequest.isSecure) { return } - val httpsURL = newURL(scheme, baseRequest.getServerName, securePort, + val httpsURI = newURI(scheme, baseRequest.getServerName, securePort, baseRequest.getRequestURI, baseRequest.getQueryString) response.setContentLength(0) - response.encodeRedirectURL(httpsURL) - response.sendRedirect(httpsURL) + response.encodeRedirectURL(httpsURI) + response.sendRedirect(httpsURI) baseRequest.setHandled(true) } }) From 9deebf326914152fd038451db8755837c5c68fbb Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Sat, 25 Apr 2015 18:08:08 +0800 Subject: [PATCH 18/21] rewrite using SSLOptions --- .../scala/org/apache/spark/SSLOptions.scala | 18 +++++- .../apache/spark/deploy/worker/Worker.scala | 4 +- .../org/apache/spark/ui/JettyUtils.scala | 62 ++++++------------- docs/configuration.md | 1 + docs/security.md | 44 ++++++++----- 5 files changed, 68 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 2cdc167f85af..1cc9238b00a5 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -45,8 +45,11 @@ private[spark] case class SSLOptions( keyStore: Option[File] = None, keyStorePassword: Option[String] = None, keyPassword: Option[String] = None, + keyStoreType: Option[String] = None, + needAuth: Boolean = false, trustStore: Option[File] = None, trustStorePassword: Option[String] = None, + trustStoreType: Option[String] = None, protocol: Option[String] = None, enabledAlgorithms: Set[String] = Set.empty) { @@ -63,7 +66,9 @@ private[spark] case class SSLOptions( trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) keyPassword.foreach(sslContextFactory.setKeyManagerPassword) protocol.foreach(sslContextFactory.setProtocol) - sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*) + if (enabledAlgorithms.nonEmpty) { + sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*) + } Some(sslContextFactory) } else { @@ -149,12 +154,20 @@ private[spark] object SSLOptions extends Logging { val keyPassword = conf.getOption(s"$ns.keyPassword") .orElse(defaults.flatMap(_.keyPassword)) + val keyStoreType = conf.getOption(s"$ns.keyStoreType") + .orElse(defaults.flatMap(_.keyStoreType)) + + val needAuth = conf.getBoolean(s"$ns.needAuth", defaultValue = defaults.exists(_.needAuth)) + val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_)) .orElse(defaults.flatMap(_.trustStore)) val trustStorePassword = conf.getOption(s"$ns.trustStorePassword") .orElse(defaults.flatMap(_.trustStorePassword)) + val trustStoreType = conf.getOption(s"$ns.trustStoreType") + .orElse(defaults.flatMap(_.trustStoreType)) + val protocol = conf.getOption(s"$ns.protocol") .orElse(defaults.flatMap(_.protocol)) @@ -168,8 +181,11 @@ private[spark] object SSLOptions extends Logging { keyStore, keyStorePassword, keyPassword, + keyStoreType, + needAuth, trustStore, trustStorePassword, + trustStoreType, protocol, enabledAlgorithms) } diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 894eb5baa7de..dc4dcef4ba46 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -199,7 +199,7 @@ private[worker] class Worker( for (masterAkkaUrl <- masterAkkaUrls) { logInfo("Connecting to master " + masterAkkaUrl + "...") val actor = context.actorSelection(masterAkkaUrl) - actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + actor ! RegisterWorker(workerId, host, port, cores, memory, workerWebUiUrl) } } @@ -238,7 +238,7 @@ private[worker] class Worker( */ if (master != null) { master ! RegisterWorker( - workerId, host, port, cores, memory, webUi.boundPort, publicAddress) + workerId, host, port, cores, memory, workerWebUiUrl) } else { // We are retrying the initial registration tryRegisterAllMasters() diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 139d665543e4..e6f8b644c80a 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -21,23 +21,21 @@ import java.net.URL import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} -import scala.collection.mutable.StringBuilder +import scala.collection.mutable.{ArrayBuffer, StringBuilder} import scala.language.implicitConversions import scala.xml.Node -import org.eclipse.jetty.server.{Request, Connector, Server} +import org.eclipse.jetty.server.{Connector, Request, Server} import org.eclipse.jetty.server.handler._ import org.eclipse.jetty.servlet._ import org.eclipse.jetty.util.thread.QueuedThreadPool import org.eclipse.jetty.server.nio.SelectChannelConnector import org.eclipse.jetty.server.ssl.SslSelectChannelConnector -import org.eclipse.jetty.http.HttpStatus -import org.eclipse.jetty.util.ssl.SslContextFactory import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} -import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.{SSLOptions, Logging, SecurityManager, SparkConf} import org.apache.spark.util.Utils /** @@ -217,27 +215,31 @@ private[spark] object JettyUtils extends Logging { // Bind to the given port, or throw a java.net.BindException if the port is occupied def connect(currentPort: Int): (Server, Int) = { val server = new Server + val connectors = new ArrayBuffer[Connector] // Create a connector on port currentPort to listen for HTTP requests val httpConnector = new SelectChannelConnector() httpConnector.setPort(currentPort) - httpConnector.setHost(hostName) + connectors += httpConnector - if (conf.get("spark.ui.https.enabled", "false").toBoolean) { - // / If the new port wraps around, do not try a privilege port - val securePort = (currentPort + 1 - 1024) % (65536 - 1024) + 1024 + val sslContextFactory = + SSLOptions.parse(conf, "spark.ui.https").createJettySslContextFactory() + sslContextFactory.foreach { factory => + // If the new port wraps around, do not try a privilege port + val securePort = (currentPort + 400 - 1024) % (65536 - 1024) + 1024 val scheme = "https" // Create a connector on port securePort to listen for HTTPS requests - val connector = buildSslSelectChannelConnector(securePort, conf) - connector.setHost(hostName) - server.setConnectors(Seq(httpConnector,connector).toArray) 
+ val connector = new SslSelectChannelConnector(factory) + connector.setPort(securePort) + connectors += connector // redirect the HTTP requests to HTTPS port - val newHandlers = Seq(createRedirectHttpsHandler(securePort, scheme)) ++ handlers - collection.setHandlers(newHandlers.toArray) - } else { - server.addConnector(httpConnector) - collection.setHandlers(handlers.toArray) + collection.addHandler(createRedirectHttpsHandler(securePort, scheme)) } + + handlers.foreach(collection.addHandler) + connectors.foreach(_.setHost(hostName)) + server.setConnectors(connectors.toArray) + val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) @@ -301,32 +303,6 @@ private[spark] object JettyUtils extends Logging { private def attachPrefix(basePath: String, relativePath: String): String = { if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") } - - private def buildSslSelectChannelConnector(port: Int, conf: SparkConf): Connector = { - val ctxFactory = new SslContextFactory() - conf.getAll - .filter { case (k, v) => k.startsWith("spark.ui.ssl.") } - .foreach { case (k, v) => setSslContextFactoryProps(k, v, ctxFactory) } - - val connector = new SslSelectChannelConnector(ctxFactory) - connector.setPort(port) - connector - } - - private def setSslContextFactoryProps( - key: String, value: String, ctxFactory: SslContextFactory) = { - key match { - case "spark.ui.ssl.client.https.needAuth" => ctxFactory.setNeedClientAuth(value.toBoolean) - case "spark.ui.ssl.server.keystore.keypassword" => ctxFactory.setKeyManagerPassword(value) - case "spark.ui.ssl.server.keystore.location" => ctxFactory.setKeyStorePath(value) - case "spark.ui.ssl.server.keystore.password" => ctxFactory.setKeyStorePassword(value) - case "spark.ui.ssl.server.keystore.type" => ctxFactory.setKeyStoreType(value) - case "spark.ui.ssl.server.truststore.location" => ctxFactory.setTrustStore(value) - case "spark.ui.ssl.server.truststore.password" => ctxFactory.setTrustStorePassword(value) - case "spark.ui.ssl.server.truststore.type" => ctxFactory.setTrustStoreType(value) - } - } - } private[spark] case class ServerInfo( diff --git a/docs/configuration.md b/docs/configuration.md index d587b91124cb..c3c7e2eee1fd 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1340,6 +1340,7 @@ Apart from these, the following properties are also available, and may be useful The reference list of protocols one can find on this page. + Note: If not set, it will use the default cipher suites of JVM. diff --git a/docs/security.md b/docs/security.md index e748187c53e9..43b246032bad 100644 --- a/docs/security.md +++ b/docs/security.md @@ -33,44 +33,58 @@ Spark use SSL(Secure Sockets Layer) to establish an encrypted link between UI se - - - + + + - - - + + + - + - + - + - + - + - + - - - + + + + + + + +
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
-<tr>
-  <td>spark.ui.https.enabled</td>
-  <td>false</td>
-  <td>Whether to enable https in web ui.</td>
-</tr>
+<tr><td>spark.ui.https.enabled</td><td>false</td><td>Whether to enable https in web ui.</td></tr>
-<tr><td>spark.ui.ssl.server.keystore.keypassword</td><td>(none)</td><td>The password for the specific key within the key store.</td></tr>
+<tr><td>spark.ui.https.keyStore</td><td>(none)</td><td>The file or URL of the SSL Key store.</td></tr>
-<tr><td>spark.ui.ssl.server.keystore.location</td><td>(none)</td><td>The file or URL of the SSL Key store.</td></tr>
+<tr><td>spark.ui.https.keyStorePassword</td><td>(none)</td><td>The password for the key store.</td></tr>
-<tr><td>spark.ui.ssl.server.keystore.password</td><td>(none)</td><td>The password for the key store.</td></tr>
+<tr><td>spark.ui.https.keyPassword</td><td>(none)</td><td>The password for the specific key within the key store.</td></tr>
-<tr><td>spark.ui.ssl.server.keystore.type</td><td>JKS</td><td>The type of the key store (default "JKS").</td></tr>
+<tr><td>spark.ui.https.keyStoreType</td><td>JKS</td><td>The type of the key store (default "JKS").</td></tr>
-<tr><td>spark.ui.ssl.client.https.needAuth</td><td>(none)</td>
-  <td>Set true if SSL needs client authentication.</td>
-</tr>
+<tr><td>spark.ui.https.needAuth</td><td>(none)</td><td>Set true if SSL needs client authentication.</td></tr>
-<tr><td>spark.ui.ssl.server.truststore.location</td><td>(none)</td><td>The file name or URL of the trust store location.</td></tr>
+<tr><td>spark.ui.https.trustStore</td><td>(none)</td><td>The file name or URL of the trust store location.</td></tr>
-<tr><td>spark.ui.ssl.server.truststore.password</td><td>(none)</td><td>The password for the trust store.</td></tr>
+<tr><td>spark.ui.https.trustStorePassword</td><td>(none)</td><td>The password for the trust store.</td></tr>
-<tr><td>spark.ui.ssl.server.truststore.type</td><td>JKS</td><td>The type of the trust store (default "JKS").</td></tr>
+<tr><td>spark.ui.https.protocol</td><td>None</td>
+  <td>
+    A protocol name. The protocol must be supported by JVM. The reference list of protocols
+    one can find on this page.
+  </td>
+</tr>
+<tr><td>spark.ui.https.enabledAlgorithms</td><td>Empty</td>
+  <td>
+    A comma separated list of ciphers. The specified ciphers must be supported by JVM.
+    The reference list of protocols one can find on this page.
+    Note: If not set, it will use the default cipher suites of JVM.
+  </td>
+</tr>
 </table>
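To make the new wiring concrete, a sketch of the path `startJettyServer` takes after this patch (a hypothetical, self-contained usage, not code from the patch; `SSLOptions` is `private[spark]`, so this only compiles inside Spark's own source tree, and the keystore values mirror the ones used in `UISuite`):

```scala
import org.apache.spark.{SSLOptions, SparkConf}

// Sketch: spark.ui.https.* properties are parsed into an SSLOptions, which
// builds the Jetty SslContextFactory only when spark.ui.https.enabled is true.
val conf = new SparkConf()
  .set("spark.ui.https.enabled", "true")
  .set("spark.ui.https.keyStore", "./src/test/resources/spark.keystore")
  .set("spark.ui.https.keyStorePassword", "123456")
  .set("spark.ui.https.keyPassword", "123456")

val sslOptions = SSLOptions.parse(conf, "spark.ui.https")
// Some(factory) when HTTPS is enabled, None otherwise.
val sslContextFactory = sslOptions.createJettySslContextFactory()
```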
From 7def14e7e87dbcf4da0d37416e3b464f3eb10e72 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Sat, 25 Apr 2015 18:30:37 +0800 Subject: [PATCH 19/21] fix uisuites --- .../src/test/scala/org/apache/spark/ui/UISuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 65d0a2139bcf..36676d3d8b4a 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -94,9 +94,9 @@ class UISuite extends FunSuite { val sparkConf = new SparkConf() .set("spark.ui.https.enabled", "true") - .set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore") - .set("spark.ui.ssl.server.keystore.password", "123456") - .set("spark.ui.ssl.server.keystore.keypassword", "123456") + .set("spark.ui.https.keyStore", "./src/test/resources/spark.keystore") + .set("spark.ui.https.keyStorePassword", "123456") + .set("spark.ui.https.keyPassword", "123456") val serverInfo1 = JettyUtils.startJettyServer( "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server1") val serverInfo2 = JettyUtils.startJettyServer( @@ -128,9 +128,9 @@ class UISuite extends FunSuite { test("jetty with https binds to port 0 correctly") { val sparkConf = new SparkConf() .set("spark.ui.https.enabled", "false") - .set("spark.ui.ssl.server.keystore.location", "./src/test/resources/spark.keystore") - .set("spark.ui.ssl.server.keystore.password", "123456") - .set("spark.ui.ssl.server.keystore.keypassword", "123456") + .set("spark.ui.https.keyStore", "./src/test/resources/spark.keystore") + .set("spark.ui.https.keyStorePassword", "123456") + .set("spark.ui.https.keyPassword", "123456") val serverInfo = JettyUtils.startJettyServer( "0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf) val server = serverInfo.server From 18982b40786112b670c659b982f129abf092dac8 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Mon, 4 May 2015 15:30:51 +0800 Subject: [PATCH 20/21] use spark.ssl.ui.* instead, update docs, fixes in Suite and other minor improvements --- .../scala/org/apache/spark/SSLOptions.scala | 3 + .../org/apache/spark/SecurityManager.scala | 2 + .../org/apache/spark/ui/JettyUtils.scala | 37 ++-- .../scala/org/apache/spark/ui/WebUI.scala | 2 +- .../scala/org/apache/spark/ui/UISuite.scala | 165 +++++++++++------- docs/configuration.md | 21 +++ docs/security.md | 133 ++++++-------- 7 files changed, 198 insertions(+), 165 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 1cc9238b00a5..6ed2f5a49bfd 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -124,9 +124,12 @@ private[spark] object SSLOptions extends Logging { * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory * $ - `[ns].keyStorePassword` - a password to the key-store file * $ - `[ns].keyPassword` - a password to the private key + * $ - `[ns].keyStoreType` - the type of the key-store + * $ - `[ns].needAuth` - whether SSL needs client authentication * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current * directory * $ - `[ns].trustStorePassword` - a password to the trust-store file + * $ - `[ns].trustStoreType` - the type of trust-store * $ - `[ns].protocol` - a protocol name supported by a particular Java 
version * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers * diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 3653f724ba19..bdea91c06cc7 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -241,9 +241,11 @@ private[spark] class SecurityManager(sparkConf: SparkConf) // configuration at a specified namespace. The namespace *must* start with spark.ssl. val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions)) val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions)) + val webUISSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.ui", Some(defaultSSLOptions)) logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions") logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions") + logDebug(s"SSLConfiguration for Akka: $webUISSLOptions") val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) { val trustStoreManagers = diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index e6f8b644c80a..b9d1fe8f818b 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui -import java.net.URL +import java.net.{URI, URL} import javax.servlet.DispatcherType import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse} @@ -205,6 +205,7 @@ private[spark] object JettyUtils extends Logging { def startJettyServer( hostName: String, port: Int, + securityManager: SecurityManager, handlers: Seq[ServletContextHandler], conf: SparkConf, serverName: String = ""): ServerInfo = { @@ -221,8 +222,7 @@ private[spark] object JettyUtils extends Logging { httpConnector.setPort(currentPort) connectors += httpConnector - val sslContextFactory = - SSLOptions.parse(conf, "spark.ui.https").createJettySslContextFactory() + val sslContextFactory = securityManager.webUISSLOptions.createJettySslContextFactory() sslContextFactory.foreach { factory => // If the new port wraps around, do not try a privilege port val securePort = (currentPort + 400 - 1024) % (65536 - 1024) + 1024 @@ -259,23 +259,6 @@ private[spark] object JettyUtils extends Logging { ServerInfo(server, boundPort, collection) } - // Create a new URI from the arguments, handling IPv6 host encoding and default ports. 
Based on: - // https://github.com/eclipse/jetty.project/blob/master/jetty-util/src/main/java/org/eclipse/ - // jetty/util/URIUtil.java#L726-L733 - private def newURI(scheme: String, server: String, port: Int, path: String, query: String) = { - val builder = new StringBuilder - - if (server.indexOf(':') >= 0 && server.charAt(0) != '[') { - builder.append(scheme).append("://").append('[').append(server).append(']') - } else { - builder.append(scheme).append("://").append(server) - } - builder.append(':').append(port) - builder.append(path) - if (query != null && query.length > 0) builder.append('?').append(query) - builder.toString - } - private def createRedirectHttpsHandler(securePort: Int, scheme: String): ContextHandler = { val redirectHandler: ContextHandler = new ContextHandler redirectHandler.setContextPath("/") @@ -288,7 +271,7 @@ private[spark] object JettyUtils extends Logging { if (baseRequest.isSecure) { return } - val httpsURI = newURI(scheme, baseRequest.getServerName, securePort, + val httpsURI = createRedirectURI(scheme, baseRequest.getServerName, securePort, baseRequest.getRequestURI, baseRequest.getQueryString) response.setContentLength(0) response.encodeRedirectURL(httpsURI) @@ -299,6 +282,18 @@ private[spark] object JettyUtils extends Logging { redirectHandler } + // Create a new URI from the arguments, handling IPv6 host encoding and default ports. + private def createRedirectURI( + scheme: String, server: String, port: Int, path: String, query: String) = { + val redirectServer = if (server.contains(":") && !server.startsWith("[")) { + s"[${server}]" + } else { + server + } + val authority = s"$redirectServer:$port" + new URI(scheme, authority, path, query, null).toString + } + /** Attach a prefix to the given path, but avoid returning an empty path */ private def attachPrefix(basePath: String, relativePath: String): String = { if (basePath == "") relativePath else (basePath + relativePath).stripSuffix("/") diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index f9860d1a5ce7..99a3ba7a4f7e 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -117,7 +117,7 @@ private[spark] abstract class WebUI( def bind() { assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) + serverInfo = Some(startJettyServer("0.0.0.0", port, securityManager, handlers, conf, name)) logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 36676d3d8b4a..78980c22865f 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -17,18 +17,18 @@ package org.apache.spark.ui -import java.net.ServerSocket +import java.net.{BindException, ServerSocket} import scala.io.Source -import scala.util.{Failure, Success, Try} +import org.eclipse.jetty.server.Server import org.eclipse.jetty.servlet.ServletContextHandler import org.scalatest.FunSuite import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ import org.apache.spark.LocalSparkContext._ -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} class UISuite extends 
FunSuite { @@ -71,77 +71,106 @@ class UISuite extends FunSuite { } test("jetty selects different port under contention") { - val server = new ServerSocket(0) - val startPort = server.getLocalPort - val serverInfo1 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) - val serverInfo2 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), new SparkConf) - // Allow some wiggle room in case ports on the machine are under contention - val boundPort1 = serverInfo1.boundPort - val boundPort2 = serverInfo2.boundPort - assert(boundPort1 != startPort) - assert(boundPort2 != startPort) - assert(boundPort1 != boundPort2) - serverInfo1.server.stop() - serverInfo2.server.stop() - server.close() + var server: ServerSocket = null + var serverInfo1: ServerInfo = null + var serverInfo2: ServerInfo = null + val conf = new SparkConf + val securityManager = new SecurityManager(conf) + try { + server = new ServerSocket(0) + val startPort = server.getLocalPort + serverInfo1 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, securityManager, Seq[ServletContextHandler](), conf) + serverInfo2 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, securityManager, Seq[ServletContextHandler](), conf) + // Allow some wiggle room in case ports on the machine are under contention + val boundPort1 = serverInfo1.boundPort + val boundPort2 = serverInfo2.boundPort + assert(boundPort1 != startPort) + assert(boundPort2 != startPort) + assert(boundPort1 != boundPort2) + } finally { + UISuite.stopServer(serverInfo1.server) + UISuite.stopServer(serverInfo2.server) + UISuite.closeSocket(server) + } } test("jetty with https selects different port under contention") { - val server = new ServerSocket(0) - val startPort = server.getLocalPort - - val sparkConf = new SparkConf() - .set("spark.ui.https.enabled", "true") - .set("spark.ui.https.keyStore", "./src/test/resources/spark.keystore") - .set("spark.ui.https.keyStorePassword", "123456") - .set("spark.ui.https.keyPassword", "123456") - val serverInfo1 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server1") - val serverInfo2 = JettyUtils.startJettyServer( - "0.0.0.0", startPort, Seq[ServletContextHandler](), sparkConf, "server2") - // Allow some wiggle room in case ports on the machine are under contention - val boundPort1 = serverInfo1.boundPort - val boundPort2 = serverInfo2.boundPort - assert(boundPort1 != startPort) - assert(boundPort2 != startPort) - assert(boundPort1 != boundPort2) - serverInfo1.server.stop() - serverInfo2.server.stop() - server.close() + var server: ServerSocket = null + var serverInfo1: ServerInfo = null + var serverInfo2: ServerInfo = null + try { + server = new ServerSocket(0) + val startPort = server.getLocalPort + + val sparkConf = new SparkConf() + .set("spark.ssl.ui.enabled", "true") + .set("spark.ssl.ui.keyStore", "./src/test/resources/spark.keystore") + .set("spark.ssl.ui.keyStorePassword", "123456") + .set("spark.ssl.ui.keyPassword", "123456") + val securityManager = new SecurityManager(sparkConf) + serverInfo1 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, securityManager, Seq[ServletContextHandler](), sparkConf, "server1") + serverInfo2 = JettyUtils.startJettyServer( + "0.0.0.0", startPort, securityManager, Seq[ServletContextHandler](), sparkConf, "server2") + // Allow some wiggle room in case ports on the machine are under contention + val boundPort1 = serverInfo1.boundPort + val boundPort2 = 
serverInfo2.boundPort + assert(boundPort1 != startPort) + assert(boundPort2 != startPort) + assert(boundPort1 != boundPort2) + } finally { + UISuite.stopServer(serverInfo1.server) + UISuite.stopServer(serverInfo2.server) + UISuite.closeSocket(server) + } } test("jetty binds to port 0 correctly") { - val serverInfo = JettyUtils.startJettyServer( - "0.0.0.0", 0, Seq[ServletContextHandler](), new SparkConf) - val server = serverInfo.server - val boundPort = serverInfo.boundPort - assert(server.getState === "STARTED") - assert(boundPort != 0) - Try { new ServerSocket(boundPort) } match { - case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) - case Failure(e) => + var socket: ServerSocket = null + var serverInfo: ServerInfo = null + val conf = new SparkConf + val securityManager = new SecurityManager(conf) + try { + serverInfo = JettyUtils.startJettyServer( + "0.0.0.0", 0, securityManager, Seq[ServletContextHandler](), conf) + val server = serverInfo.server + val boundPort = serverInfo.boundPort + assert(server.getState === "STARTED") + assert(boundPort != 0) + intercept[BindException] { + socket = new ServerSocket(boundPort) + } + } finally { + UISuite.stopServer(serverInfo.server) + UISuite.closeSocket(socket) } } test("jetty with https binds to port 0 correctly") { - val sparkConf = new SparkConf() - .set("spark.ui.https.enabled", "false") - .set("spark.ui.https.keyStore", "./src/test/resources/spark.keystore") - .set("spark.ui.https.keyStorePassword", "123456") - .set("spark.ui.https.keyPassword", "123456") - val serverInfo = JettyUtils.startJettyServer( - "0.0.0.0", 0, Seq[ServletContextHandler](), sparkConf) - val server = serverInfo.server - val boundPort = serverInfo.boundPort - assert(server.getState === "STARTED") - assert(boundPort != 0) - Try { new ServerSocket(boundPort) } match { - case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort)) - case Failure(e) => + var socket: ServerSocket = null + var serverInfo: ServerInfo = null + try { + val sparkConf = new SparkConf() + .set("spark.ssl.ui.enabled", "false") + .set("spark.ssl.ui.keyStore", "./src/test/resources/spark.keystore") + .set("spark.ssl.ui.keyStorePassword", "123456") + .set("spark.ssl.ui.keyPassword", "123456") + val securityManager = new SecurityManager(sparkConf) + serverInfo = JettyUtils.startJettyServer( + "0.0.0.0", 0, securityManager, Seq[ServletContextHandler](), sparkConf) + val server = serverInfo.server + val boundPort = serverInfo.boundPort + assert(server.getState === "STARTED") + assert(boundPort != 0) + intercept[BindException] { + socket = new ServerSocket(boundPort) + } + } finally { + UISuite.stopServer(serverInfo.server) + UISuite.closeSocket(socket) } - serverInfo.server.stop() } test("verify appUIAddress contains the scheme") { @@ -162,3 +191,13 @@ class UISuite extends FunSuite { } } } + +object UISuite { + def stopServer(server: Server): Unit = { + if (server != null) server.stop + } + + def closeSocket(socket: ServerSocket): Unit = { + if (socket != null) socket.close + } +} diff --git a/docs/configuration.md b/docs/configuration.md index c3c7e2eee1fd..429e01a99733 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1365,6 +1365,13 @@ Apart from these, the following properties are also available, and may be useful A password to the key-store. + + spark.ssl.keyStoreType + JKS + + The type of the key-store. 
+ + spark.ssl.protocol None @@ -1374,6 +1381,13 @@ Apart from these, the following properties are also available, and may be useful page. + + spark.ssl.needAuth + false + + Set true if SSL needs client authentication. + + spark.ssl.trustStore None @@ -1389,6 +1403,13 @@ Apart from these, the following properties are also available, and may be useful A password to the trust-store. + + spark.ssl.trustStoreType + JKS + + The type of the trust-store. + + diff --git a/docs/security.md b/docs/security.md index 43b246032bad..00225c8ac788 100644 --- a/docs/security.md +++ b/docs/security.md @@ -4,107 +4,80 @@ displayTitle: Spark Security title: Security --- -Spark currently supports authentication via a shared secret. Authentication can be configured to be on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and are allowed to communicate. If the shared secret is not identical they will not be allowed to communicate. The shared secret is created as follows: - -* For Spark on [YARN](running-on-yarn.html) deployments, configuring `spark.authenticate` to `true` will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. -* For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. +Spark currently supports authentication via a shared secret. Authentication can be configured to be +on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark +communication protocols do authentication using the shared secret. This authentication is a basic +handshake to make sure both sides have the same shared secret and are allowed to communicate. If the +shared secret is not identical they will not be allowed to communicate. The shared secret is created +as follows: + +* For Spark on [YARN](running-on-yarn.html) deployments, configuring `spark.authenticate` to `true` +will automatically handle generating and distributing the shared secret. Each application will use +a unique shared secret. +* For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be +configured on each of the nodes. This secret will be used by all the Master/Workers and applications. ## Web UI -The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting and by using [Jetty https/SSL](http://www.eclipse.org/jetty/documentation/current/configuring-ssl.html) via the `spark.ui.https.enabled` setting. +The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) +via the `spark.ui.filters` setting and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via the `spark.ui.https.enabled` setting. ### Authentication -A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. 
Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. - -Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task. This is controlled by the configs `spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces. - -Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. is controlled by the config `spark.admin.acls`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications. - -### Encryption +A user may want to secure the UI if it has data that other users should not be allowed to see. The +javax servlet filter specified by the user can authenticate the user and then once the user is logged +in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. +The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that +the user who started the application always has view access to the UI. On YARN, the Spark UI uses +the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. -Spark use SSL(Secure Sockets Layer) to establish an encrypted link between UI server and browser client. The config `spark.ui.https.enabled` open switch for encryption, other configs of SSL encryption is as follows +Spark also supports modify ACLs to control who has access to modify a running Spark application. +This includes things like killing the application or a task. This is controlled by the configs +`spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order +to use the kill button on the web UI it might be necessary to add the users in the modify acls to +the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-<table class="table">
-<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
-<tr><td>spark.ui.https.enabled</td><td>false</td><td>Whether to enable https in web ui.</td></tr>
-<tr><td>spark.ui.https.keyStore</td><td>(none)</td><td>The file or URL of the SSL Key store.</td></tr>
-<tr><td>spark.ui.https.keyStorePassword</td><td>(none)</td><td>The password for the key store.</td></tr>
-<tr><td>spark.ui.https.keyPassword</td><td>(none)</td><td>The password for the specific key within the key store.</td></tr>
-<tr><td>spark.ui.https.keyStoreType</td><td>JKS</td><td>The type of the key store (default "JKS").</td></tr>
-<tr><td>spark.ui.https.needAuth</td><td>(none)</td><td>Set true if SSL needs client authentication.</td></tr>
-<tr><td>spark.ui.https.trustStore</td><td>(none)</td><td>The file name or URL of the trust store location.</td></tr>
-<tr><td>spark.ui.https.trustStorePassword</td><td>(none)</td><td>The password for the trust store.</td></tr>
-<tr><td>spark.ui.https.protocol</td><td>None</td>
-  <td>
-    A protocol name. The protocol must be supported by JVM. The reference list of protocols
-    one can find on this page.
-  </td>
-</tr>
-<tr><td>spark.ui.https.enabledAlgorithms</td><td>Empty</td>
-  <td>
-    A comma separated list of ciphers. The specified ciphers must be supported by JVM.
-    The reference list of protocols one can find on this page.
-    Note: If not set, it will use the default cipher suites of JVM.
-  </td>
-</tr>
-</table>
+
+Spark allows for a set of administrators to be specified in the acls who always have view and modify
+ permissions to all the applications. This is controlled by the config `spark.admin.acls`. This is useful
+ on a shared cluster where you might have administrators or support staff who help users debug applications.

## Event Logging

-If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.
+If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`)
+should be manually created and have the proper permissions set on it. If you want those log files secured,
+the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should
+be the super user who is running the history server and the group permissions should be restricted to
+super user group. This will allow all users to write to the directory but will prevent unprivileged
+users from removing or renaming a file unless they own the file or directory. The event log files will
+be created by Spark with permissions such that only the user and group have read and write access.

## Encryption

-Spark supports SSL for Akka and HTTP (for broadcast and file server) protocols. However SSL is not supported yet for WebUI and block transfer service.
+Spark supports SSL for Akka and HTTP protocols. However SSL is not supported yet for block transfer service.

-Connection encryption (SSL) configuration is organized hierarchically. The user can configure the default SSL settings which will be used for all the supported communication protocols unless they are overwritten by protocol-specific settings. This way the user can easily provide the common settings for all the protocols without disabling the ability to configure each one individually. The common SSL settings are at `spark.ssl` namespace in Spark configuration, while Akka SSL configuration is at `spark.ssl.akka` and HTTP for broadcast and file server SSL configuration is at `spark.ssl.fs`. The full breakdown can be found on the [configuration page](configuration.html).
+Connection encryption (SSL) configuration is organized hierarchically. The user can configure the
+default SSL settings which will be used for all the supported communication protocols unless they
+are overwritten by protocol-specific settings. This way the user can easily provide the common settings
+for all the protocols without disabling the ability to configure each one individually. The common SSL
+settings are at `spark.ssl` namespace in Spark configuration, while Akka SSL configuration is at
+`spark.ssl.akka`, HTTP for broadcast and file server SSL configuration is at `spark.ssl.fs`, and HTTP
+for web UI configuration is at `spark.ssl.ui`. The full breakdown can be found on the [configuration page](configuration.html).
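As an illustration of the hierarchy described above (a sketch with placeholder paths and passwords, not text from the patch): values under `spark.ssl.*` act as shared defaults, and the `spark.ssl.ui.*` namespace overrides them for the web UI only.

```scala
import org.apache.spark.SparkConf

// Sketch: spark.ssl.* gives defaults for every protocol; spark.ssl.ui.*
// overrides them for the web UI alone. Paths and passwords are placeholders.
val conf = new SparkConf()
  .set("spark.ssl.keyStore", "/path/to/default.keystore")   // shared default
  .set("spark.ssl.keyStorePassword", "storePassword")       // shared default
  .set("spark.ssl.ui.enabled", "true")                      // web UI switch
  .set("spark.ssl.ui.keyStore", "/path/to/ui.keystore")     // UI-only override
```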
SSL must be configured on each node and configured for each component involved in communication using the particular protocol. ### YARN mode -The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. +The key-store can be prepared on the client side and then distributed and used by the executors as +the part of the application. It is possible because the user is able to deploy files before the application +is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. +The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. ### Standalone mode -The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors. +The user needs to provide key-stores and configuration options for master and workers. They have to +be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` +environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors +to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished +by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided +by user on the client side, are not used by the executors. ### Preparing the key-stores Key-stores can be generated by `keytool` program. 
The reference documentation for this tool is From dfbe1d6f4cc2405067ec9f29c8ad6d9037304a89 Mon Sep 17 00:00:00 2001 From: WangTaoTheTonic Date: Tue, 5 May 2015 10:58:22 +0800 Subject: [PATCH 21/21] per Marcelo's comments --- .../scala/org/apache/spark/SSLOptions.scala | 20 ++++-- .../org/apache/spark/SecurityManager.scala | 2 +- .../org/apache/spark/ui/JettyUtils.scala | 2 +- docs/configuration.md | 2 +- docs/security.md | 71 +++++-------------- 5 files changed, 35 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 6ed2f5a49bfd..73313988f1c7 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -35,8 +35,11 @@ import org.eclipse.jetty.util.ssl.SslContextFactory * @param keyStore a path to the key-store file * @param keyStorePassword a password to access the key-store file * @param keyPassword a password to access the private key in the key-store + * @param keyStoreType the type of the key-store + * @param needClientAuth set true if SSL needs client authentication * @param trustStore a path to the trust-store file * @param trustStorePassword a password to access the trust-store file + * @param trustStoreType the type of the trust-store * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java * @param enabledAlgorithms a set of encryption algorithms to use */ @@ -46,7 +49,7 @@ private[spark] case class SSLOptions( keyStorePassword: Option[String] = None, keyPassword: Option[String] = None, keyStoreType: Option[String] = None, - needAuth: Boolean = false, + needClientAuth: Boolean = false, trustStore: Option[File] = None, trustStorePassword: Option[String] = None, trustStoreType: Option[String] = None, @@ -61,10 +64,14 @@ private[spark] case class SSLOptions( val sslContextFactory = new SslContextFactory() keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath)) - trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath)) keyStorePassword.foreach(sslContextFactory.setKeyStorePassword) - trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) keyPassword.foreach(sslContextFactory.setKeyManagerPassword) + keyStoreType.foreach(sslContextFactory.setKeyStoreType) + if (needClientAuth) { + trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath)) + trustStorePassword.foreach(sslContextFactory.setTrustStorePassword) + trustStoreType.foreach(sslContextFactory.setTrustStoreType) + } protocol.foreach(sslContextFactory.setProtocol) if (enabledAlgorithms.nonEmpty) { sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*) @@ -125,7 +132,7 @@ private[spark] object SSLOptions extends Logging { * $ - `[ns].keyStorePassword` - a password to the key-store file * $ - `[ns].keyPassword` - a password to the private key * $ - `[ns].keyStoreType` - the type of the key-store - * $ - `[ns].needAuth` - whether SSL needs client authentication + * $ - `[ns].needClientAuth` - whether SSL needs client authentication * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current * directory * $ - `[ns].trustStorePassword` - a password to the trust-store file @@ -160,7 +167,8 @@ private[spark] object SSLOptions extends Logging { val keyStoreType = conf.getOption(s"$ns.keyStoreType") .orElse(defaults.flatMap(_.keyStoreType)) - val needAuth = conf.getBoolean(s"$ns.needAuth", defaultValue = 
defaults.exists(_.needAuth)) + val needClientAuth = + conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth)) val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_)) .orElse(defaults.flatMap(_.trustStore)) @@ -185,7 +193,7 @@ private[spark] object SSLOptions extends Logging { keyStorePassword, keyPassword, keyStoreType, - needAuth, + needClientAuth, trustStore, trustStorePassword, trustStoreType, diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index bdea91c06cc7..54a2f2c80588 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -245,7 +245,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions") logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions") - logDebug(s"SSLConfiguration for Akka: $webUISSLOptions") + logDebug(s"SSLConfiguration for web UI: $webUISSLOptions") val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) { val trustStoreManagers = diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index b9d1fe8f818b..cf51a81f3637 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -35,7 +35,7 @@ import org.eclipse.jetty.server.ssl.SslSelectChannelConnector import org.json4s.JValue import org.json4s.jackson.JsonMethods.{pretty, render} -import org.apache.spark.{SSLOptions, Logging, SecurityManager, SparkConf} +import org.apache.spark.{Logging, SSLOptions, SecurityManager, SparkConf} import org.apache.spark.util.Utils /** diff --git a/docs/configuration.md b/docs/configuration.md index 429e01a99733..b8957caf87ac 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1382,7 +1382,7 @@ Apart from these, the following properties are also available, and may be useful - spark.ssl.needAuth + spark.ssl.needClientAuth false Set true if SSL needs client authentication. diff --git a/docs/security.md b/docs/security.md index 00225c8ac788..048dc799f37c 100644 --- a/docs/security.md +++ b/docs/security.md @@ -4,80 +4,45 @@ displayTitle: Spark Security title: Security --- -Spark currently supports authentication via a shared secret. Authentication can be configured to be -on via the `spark.authenticate` configuration parameter. This parameter controls whether the Spark -communication protocols do authentication using the shared secret. This authentication is a basic -handshake to make sure both sides have the same shared secret and are allowed to communicate. If the -shared secret is not identical they will not be allowed to communicate. The shared secret is created -as follows: - -* For Spark on [YARN](running-on-yarn.html) deployments, configuring `spark.authenticate` to `true` -will automatically handle generating and distributing the shared secret. Each application will use -a unique shared secret. -* For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be -configured on each of the nodes. This secret will be used by all the Master/Workers and applications. +Spark currently supports authentication via a shared secret. Authentication can be configured to be on via the `spark.authenticate` configuration parameter. 
This parameter controls whether the Spark communication protocols do authentication using the shared secret. This authentication is a basic handshake to make sure both sides have the same shared secret and are allowed to communicate. If the shared secret is not identical they will not be allowed to communicate. The shared secret is created as follows: + +* For Spark on [YARN](running-on-yarn.html) deployments, configuring `spark.authenticate` to `true` will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. +* For other types of Spark deployments, the Spark parameter `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. ## Web UI -The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) -via the `spark.ui.filters` setting and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via the `spark.ui.https.enabled` setting. +The Spark UI can be secured by using [javax servlet filters](http://docs.oracle.com/javaee/6/api/javax/servlet/Filter.html) via the `spark.ui.filters` setting +and by using [https/SSL](http://en.wikipedia.org/wiki/HTTPS) via the `spark.ui.https.enabled` setting. ### Authentication -A user may want to secure the UI if it has data that other users should not be allowed to see. The -javax servlet filter specified by the user can authenticate the user and then once the user is logged -in, Spark can compare that user versus the view ACLs to make sure they are authorized to view the UI. -The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that -the user who started the application always has view access to the UI. On YARN, the Spark UI uses -the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. - -Spark also supports modify ACLs to control who has access to modify a running Spark application. -This includes things like killing the application or a task. This is controlled by the configs -`spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order -to use the kill button on the web UI it might be necessary to add the users in the modify acls to -the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces. +A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user and then once the user is logged in, Spark can compare that user against the view ACLs to make sure they are authorized to view the UI. The configs `spark.acls.enable` and `spark.ui.view.acls` control the behavior of the ACLs. Note that the user who started the application always has view access to the UI. On YARN, the Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. +Spark also supports modify ACLs to control who has access to modify a running Spark application. This includes things like killing the application or a task.
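As an editorial illustration of the view and modify ACLs discussed in this paragraph (user names are hypothetical; the governing properties are named just below):

```scala
import org.apache.spark.SparkConf

// The user who started the application always has view access; carol appears
// in both lists so that the kill button on the web UI works for her.
val conf = new SparkConf()
  .set("spark.acls.enable", "true")
  .set("spark.ui.view.acls", "bob,carol")  // extra users allowed to view the UI
  .set("spark.modify.acls", "carol")       // users allowed to kill the app or its tasks
```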
Modify access is controlled by the configs `spark.acls.enable` and `spark.modify.acls`. Note that if you are authenticating the web UI, in order to use the kill button on the web UI it might be necessary to add the users in the modify acls to the view acls also. On YARN, the modify acls are passed in and control who has modify access via YARN interfaces. +Spark allows for a set of administrators to be specified in the acls who always have view and modify permissions to all the applications. This is controlled by the config `spark.admin.acls`. This is useful on a shared cluster where you might have administrators or support staff who help users debug applications. ## Event Logging -If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) -should be manually created and have the proper permissions set on it. If you want those log files secured, -the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should -be the super user who is running the history server and the group permissions should be restricted to -super user group. This will allow all users to write to the directory but will prevent unprivileged -users from removing or renaming a file unless they own the file or directory. The event log files will -be created by Spark with permissions such that only the user and group have read and write access. +If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to `drwxrwxrwxt` for that directory. The owner of the directory should be the super user who is running the history server and the group permissions should be restricted to the super user group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access. ## Encryption Spark supports SSL for Akka and HTTP protocols. However SSL is not supported yet for block transfer service. -Connection encryption (SSL) configuration is organized hierarchically. The user can configure the -default SSL settings which will be used for all the supported communication protocols unless they -are overwritten by protocol-specific settings. This way the user can easily provide the common settings -for all the protocols without disabling the ability to configure each one individually. The common SSL -settings are at `spark.ssl` namespace in Spark configuration, while Akka SSL configuration is at -`spark.ssl.akka`, HTTP for broadcast and file server SSL configuration is at `spark.ssl.fs`, and HTTP -for web UI configuration is at `spark.ssl.ui`. The full breakdown can be found on the [configuration page](configuration.html). +Connection encryption (SSL) configuration is organized hierarchically. The user can configure the default SSL settings +which will be used for all the supported communication protocols unless they are overwritten by protocol-specific settings. +This way the user can easily provide the common settings for all the protocols without disabling the ability to configure each one individually.
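To illustrate the hierarchy just described with hypothetical values (the per-protocol namespaces are spelled out just below): defaults under `spark.ssl` apply to every protocol unless a namespace such as `spark.ssl.ui` overrides them.

```scala
import org.apache.spark.SparkConf

// Hypothetical values: spark.ssl.* are the shared defaults,
// spark.ssl.ui.* overrides them for the web UI only.
val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/default.jks")
  .set("spark.ssl.keyStorePassword", "default-password")
  // Web UI override: its own key-store, plus client authentication,
  // which is why a trust-store is configured here as well.
  .set("spark.ssl.ui.keyStore", "/path/to/ui.jks")
  .set("spark.ssl.ui.needClientAuth", "true")
  .set("spark.ssl.ui.trustStore", "/path/to/truststore.jks")
  .set("spark.ssl.ui.trustStorePassword", "trust-password")
```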
+The common SSL settings are at the `spark.ssl` namespace in the Spark configuration, while Akka SSL configuration is at `spark.ssl.akka`, +HTTP for broadcast and file server SSL configuration is at `spark.ssl.fs`, and HTTP for web UI configuration is at `spark.ssl.ui`. +The full breakdown can be found on the [configuration page](configuration.html). SSL must be configured on each node and configured for each component involved in communication using the particular protocol. ### YARN mode -The key-store can be prepared on the client side and then distributed and used by the executors as -the part of the application. It is possible because the user is able to deploy files before the application -is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. -The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark. +The key-store can be prepared on the client side and then distributed and used by the executors as part of the application. This is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encrypting these files during transfer lies with YARN and has nothing to do with Spark. ### Standalone mode -The user needs to provide key-stores and configuration options for master and workers. They have to -be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` -environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors -to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished -by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided -by user on the client side, are not used by the executors. +The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. This can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by the user on the client side are not used by the executors. ### Preparing the key-stores Key-stores can be generated by `keytool` program. The reference documentation for this tool is
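Finally, a sketch of how the `SSLOptions.parse` helper from the diff above might consume such a configuration. The signature (a `SparkConf`, a namespace, and optional defaults) is inferred from the patch body, which reads keys via `conf.getOption(s"$ns....")` and falls back to a `defaults: Option[SSLOptions]`; note that `SSLOptions` is `private[spark]`, so code like this would live inside Spark itself.

```scala
import org.apache.spark.{SparkConf, SSLOptions}

val conf = new SparkConf()
  .set("spark.ssl.keyStore", "/path/to/default.jks")         // placeholder
  .set("spark.ssl.ui.needClientAuth", "true")
  .set("spark.ssl.ui.trustStore", "/path/to/truststore.jks") // placeholder

// Resolve the shared defaults first, then the web UI namespace; any key not
// set under spark.ssl.ui.* falls back to the defaults object.
val defaults  = SSLOptions.parse(conf, "spark.ssl")
val uiOptions = SSLOptions.parse(conf, "spark.ssl.ui", defaults = Some(defaults))
```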