diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index be19179b00a4..efcec1134aec 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -56,6 +56,7 @@ private[spark] case class SSLOptions( trustStorePassword: Option[String] = None, trustStoreType: Option[String] = None, protocol: Option[String] = None, + port: Int = 0, enabledAlgorithms: Set[String] = Set.empty) extends Logging { @@ -147,6 +148,7 @@ private[spark] object SSLOptions extends Logging { * $ - `[ns].trustStorePassword` - a password to the trust-store file * $ - `[ns].trustStoreType` - the type of trust-store * $ - `[ns].protocol` - a protocol name supported by a particular Java version + * $ - `[ns].port` - a port number * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers * * For a list of protocols and ciphers supported by particular Java versions, you may go to @@ -191,6 +193,8 @@ private[spark] object SSLOptions extends Logging { val protocol = conf.getOption(s"$ns.protocol") .orElse(defaults.flatMap(_.protocol)) + val port = conf.getInt(s"$ns.port", defaultValue = defaults.map(_.port).getOrElse(0)) + val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms") .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet) .orElse(defaults.map(_.enabledAlgorithms)) @@ -207,6 +211,7 @@ private[spark] object SSLOptions extends Logging { trustStorePassword, trustStoreType, protocol, + port, enabledAlgorithms) } diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala index b30c980e95a9..8c9825e4a004 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala @@ -78,7 +78,7 @@ private[spark] abstract class RestSubmissionServer( * Map the servlets to their corresponding contexts and attach them to a server. * Return a 2-tuple of the started server and the bound port. */ - private def doStart(startPort: Int): (Server, Int) = { + private def doStart(startPort: Int, securePort: Int): (Server, Int) = { val threadPool = new QueuedThreadPool threadPool.setDaemon(true) val server = new Server(threadPool) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index dc70eb82d2b5..b31e02061288 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -76,7 +76,7 @@ private[spark] class NettyBlockTransferService( /** Creates and binds the TransportServer, possibly trying multiple ports. */ private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = { - def startService(port: Int): (TransportServer, Int) = { + def startService(port: Int, securePort: Int): (TransportServer, Int) = { val server = transportContext.createServer(bindAddress, port, bootstraps.asJava) (server, server.getPort) } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index e51649a1ecce..22478f975612 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -444,7 +444,7 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress, config.securityManager) if (!config.clientMode) { - val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => + val startNettyRpcEnv: (Int, Int) => (NettyRpcEnv, Int) = { (actualPort, securePort) => nettyEnv.startServer(config.bindAddress, actualPort) (nettyEnv, nettyEnv.address.port) } diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 35c3c8d00f99..d42c429a97a5 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -284,7 +284,7 @@ private[spark] object JettyUtils extends Logging { } // Bind to the given port, or throw a java.net.BindException if the port is occupied - def connect(currentPort: Int): (Server, Int) = { + def connect(currentPort: Int, securePort: Int = sslOptions.port): (Server, Int) = { val pool = new QueuedThreadPool if (serverName.nonEmpty) { pool.setName(serverName) @@ -307,15 +307,26 @@ private[spark] object JettyUtils extends Logging { connectors += httpConnector sslOptions.createJettySslContextFactory().foreach { factory => - // If the new port wraps around, do not try a privileged port. + + require(sslOptions.port == 0 || (1024 <= sslOptions.port && sslOptions.port < 65536), + "securePort should be between 1024 and 65535 (inclusive)," + + " or 0 for determined automatically.") + val securePort = if (currentPort != 0) { - (currentPort + 400 - 1024) % (65536 - 1024) + 1024 + if (sslOptions.port == 0) { + // If the new port wraps around, do not try a privileged port + (currentPort + 400 - 1024) % (65536 - 1024) + 1024 + } else { + // use sslOptions.port value as securePort + sslOptions.port + } } else { 0 } val scheme = "https" - // Create a connector on port securePort to listen for HTTPS requests + // Create a connector on port securePort to listen for HTTPS requests. + val connector = new ServerConnector(server, factory) connector.setPort(securePort) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index bfc609419ccd..2ff5063fe224 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2142,9 +2142,10 @@ private[spark] object Utils extends Logging { */ def startServiceOnPort[T]( startPort: Int, - startService: Int => (T, Int), + startService: (Int, Int) => (T, Int), conf: SparkConf, - serviceName: String = ""): (T, Int) = { + serviceName: String = "", + securePort: Int = 0): (T, Int) = { require(startPort == 0 || (1024 <= startPort && startPort < 65536), "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.") @@ -2160,14 +2161,15 @@ private[spark] object Utils extends Logging { ((startPort + offset - 1024) % (65536 - 1024)) + 1024 } try { - val (service, port) = startService(tryPort) + val (service, port) = startService(tryPort, securePort + offset) logInfo(s"Successfully started service$serviceString on port $port.") return (service, port) } catch { case e: Exception if isBindCollision(e) => if (offset >= maxRetries) { val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " + - s"$maxRetries retries (starting from $startPort)! Consider explicitly setting " + + s"$maxRetries retries (starting from $startPort and $securePort)! " + + s"Consider explicitly setting " + s"the appropriate port for the service$serviceString (for example spark.ui.port " + s"for SparkUI) to an available port or increasing spark.port.maxRetries." val exception = new BindException(exceptionMessage) diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala index 159b448e05b0..aee242eade23 100644 --- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala @@ -113,6 +113,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll { "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA") conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF") conf.set("spark.ssl.protocol", "SSLv3") + conf.set("spark.ssl.port", "18999") val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None) val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts)) @@ -128,6 +129,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll { assert(opts.keyStorePassword === Some("12345")) assert(opts.keyPassword === Some("password")) assert(opts.protocol === Some("SSLv3")) + assert(opts.port === 18999) assert(opts.enabledAlgorithms === Set("ABC", "DEF")) } diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala index 62fe0eaedfd2..58c11e59509a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala @@ -119,7 +119,7 @@ class PersistenceEngineSuite extends SparkFunSuite { private def findFreePort(conf: SparkConf): Int = { val candidatePort = RandomUtils.nextInt(1024, 65536) - Utils.startServiceOnPort(candidatePort, (trialPort: Int) => { + Utils.startServiceOnPort(candidatePort, (trialPort: Int, securePort: Int) => { val socket = new ServerSocket(trialPort) socket.close() (null, trialPort) diff --git a/docs/configuration.md b/docs/configuration.md index b07867d99aa9..2862f4e1ba34 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1663,6 +1663,16 @@ Apart from these, the following properties are also available, and may be useful page. + + spark.ssl.port + 0 + + Port number to listen on for SSL connections. + The SSL port should be between 1024 and 65535 (inclusive). + Default value of 0 means the port will be determined automatically. + The port can be specified for services individually, with properties like spark.ssl.YYY.port. + + spark.ssl.needClientAuth false