From ec9b71e2f0e137056951934491883a52a0f22ba8 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Tue, 7 Feb 2023 19:23:23 +0900 Subject: [PATCH 1/3] Add a script to start and sto Spark Connect server --- .../connect/service/SparkConnectServer.scala | 36 ++++++++++++++++++ .../connect/service/SparkConnectService.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 16 +++++++- .../launcher/SparkSubmitCommandBuilder.java | 19 ++++++++-- sbin/start-connectserver.sh | 38 +++++++++++++++++++ sbin/stop-connectserver.sh | 26 +++++++++++++ 6 files changed, 131 insertions(+), 6 deletions(-) create mode 100644 connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala create mode 100755 sbin/start-connectserver.sh create mode 100755 sbin/stop-connectserver.sh diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala new file mode 100644 index 000000000000..b27744aafd43 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.service + +import org.apache.spark.sql.SparkSession + +/** + * The Spark Connect server + */ +object SparkConnectServer { + def main(args: Array[String]): Unit = { + // Set the active Spark Session, and starts SparkEnv instance (via Spark Context) + val session = SparkSession.builder.getOrCreate() + try { + SparkConnectService.start() + SparkConnectService.server.awaitTermination() + } finally { + session.stop() + } + } +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index b7bfceed4215..683dae9cf900 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -224,7 +224,7 @@ object SparkConnectService { // different or complex type easily. private type SessionCacheKey = (String, String) - private var server: Server = _ + private[connect] var server: Server = _ // For testing purpose, it's package level private. private[connect] lazy val localPort = { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index fa19c7918af2..d7443951e7f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -290,6 +290,8 @@ private[spark] class SparkSubmit extends Logging { error("Cluster deploy mode is not applicable to Spark SQL shell.") case (_, CLUSTER) if isThriftServer(args.mainClass) => error("Cluster deploy mode is not applicable to Spark Thrift server.") + case (_, CLUSTER) if isConnectServer(args.mainClass) => + error("Cluster deploy mode is not applicable to Spark Connect server.") case _ => } @@ -972,6 +974,10 @@ private[spark] class SparkSubmit extends Logging { if (childMainClass.contains("thriftserver")) { logInfo(s"Failed to load main class $childMainClass.") logInfo("You need to build Spark with -Phive and -Phive-thriftserver.") + } else if (childMainClass.contains("org.apache.spark.sql.connect")) { + logInfo(s"Failed to load main class $childMainClass.") + // TODO(SPARK-42375): Should point out the user-facing page here instead. + logInfo("You need to specify Spark Connect jars with --jars or --packages.") } throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS) case e: NoClassDefFoundError => @@ -1006,7 +1012,8 @@ private[spark] class SparkSubmit extends Logging { throw findCause(t) } finally { if (args.master.startsWith("k8s") && !isShell(args.primaryResource) && - !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass)) { + !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) && + !isConnectServer(args.mainClass)) { try { SparkContext.getActive.foreach(_.stop()) } catch { @@ -1130,6 +1137,13 @@ object SparkSubmit extends CommandLineUtils with Logging { mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2" } + /** + * Return whether the given main class represents a connect server. + */ + private def isConnectServer(mainClass: String): Boolean = { + mainClass == "org.apache.spark.sql.connect.service.SparkConnectServer" + } + /** * Return whether the given primary resource requires running python. */ diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index 21fce6779f4d..289eb31db31a 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -87,6 +87,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder { SparkLauncher.NO_RESOURCE); specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2", SparkLauncher.NO_RESOURCE); + specialClasses.put("org.apache.spark.sql.connect.service.SparkConnectServer", + SparkLauncher.NO_RESOURCE); } final List userArgs; @@ -267,8 +269,8 @@ private List buildSparkSubmitCommand(Map env) String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null; List cmd = buildJavaCommand(extraClassPath); - // Take Thrift Server as daemon - if (isThriftServer(mainClass)) { + // Take Thrift/Connect Server as daemon + if (isThriftServer(mainClass) || isConnectServer(mainClass)) { addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS")); } addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS")); @@ -288,9 +290,10 @@ private List buildSparkSubmitCommand(Map env) // - SPARK_DRIVER_MEMORY env variable // - SPARK_MEM env variable // - default value (1g) - // Take Thrift Server as daemon + // Take Thrift/Connect Server as daemon String tsMemory = - isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null; + isThriftServer(mainClass) || isConnectServer(mainClass) ? + System.getenv("SPARK_DAEMON_MEMORY") : null; String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY), System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM); cmd.add("-Xmx" + memory); @@ -423,6 +426,14 @@ private boolean isThriftServer(String mainClass) { mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2")); } + /** + * Return whether the given main class represents a connect server. + */ + private boolean isConnectServer(String mainClass) { + return (mainClass != null && + mainClass.equals("org.apache.spark.sql.connect.service.SparkConnectServer")); + } + private String findExamplesAppJar() { boolean isTesting = "1".equals(getenv("SPARK_TESTING")); if (isTesting) { diff --git a/sbin/start-connectserver.sh b/sbin/start-connectserver.sh new file mode 100755 index 000000000000..8ba37889f410 --- /dev/null +++ b/sbin/start-connectserver.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Shell script for starting the Spark Connect server +if [ -z "${SPARK_HOME}" ]; then + export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" +fi + +# NOTE: This exact class name is matched downstream by SparkSubmit. +# Any changes need to be reflected there. +CLASS="org.apache.spark.sql.connect.service.SparkConnectServer" + +if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then + echo "Usage: ./sbin/start-connectserver.sh [options]" + + "${SPARK_HOME}"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 + exit 1 +fi + +. "${SPARK_HOME}/bin/load-spark-env.sh" + +exec "${SPARK_HOME}/sbin"/spark-daemon.sh submit $CLASS 1 "$@" diff --git a/sbin/stop-connectserver.sh b/sbin/stop-connectserver.sh new file mode 100755 index 000000000000..7cf744072e35 --- /dev/null +++ b/sbin/stop-connectserver.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Stops the connect server on the machine this script is executed on. + +if [ -z "${SPARK_HOME}" ]; then + export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" +fi + +"${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.sql.connect.service.SparkConnectServer 1 From f76cf9951fe8988b2b337a54e7435e6cab83f735 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 8 Feb 2023 10:22:06 +0900 Subject: [PATCH 2/3] Address comments --- .../sql/connect/service/SparkConnectServer.scala | 13 +++++++++++-- sbin/start-connectserver.sh | 5 ++++- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala index b27744aafd43..df28df59fa2a 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectServer.scala @@ -17,17 +17,26 @@ package org.apache.spark.sql.connect.service +import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession /** * The Spark Connect server */ -object SparkConnectServer { +object SparkConnectServer extends Logging { def main(args: Array[String]): Unit = { // Set the active Spark Session, and starts SparkEnv instance (via Spark Context) + logInfo("Starting Spark session.") val session = SparkSession.builder.getOrCreate() try { - SparkConnectService.start() + try { + SparkConnectService.start() + logInfo("Spark Connect server started.") + } catch { + case e: Exception => + logError("Error starting Spark Connect server", e) + System.exit(-1) + } SparkConnectService.server.awaitTermination() } finally { session.stop() diff --git a/sbin/start-connectserver.sh b/sbin/start-connectserver.sh index 8ba37889f410..a995f95d535b 100755 --- a/sbin/start-connectserver.sh +++ b/sbin/start-connectserver.sh @@ -17,6 +17,9 @@ # limitations under the License. # +# Enter posix mode for bash +set -o posix + # Shell script for starting the Spark Connect server if [ -z "${SPARK_HOME}" ]; then export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" @@ -35,4 +38,4 @@ fi . "${SPARK_HOME}/bin/load-spark-env.sh" -exec "${SPARK_HOME}/sbin"/spark-daemon.sh submit $CLASS 1 "$@" +exec "${SPARK_HOME}"/sbin/spark-daemon.sh submit $CLASS 1 --name "Spark Connect server" "$@" From 8ba8e8b40ff701b62e437ab84a0d083e9f468572 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 8 Feb 2023 11:38:15 +0900 Subject: [PATCH 3/3] Rename scripts --- sbin/{start-connectserver.sh => start-connect-server.sh} | 2 +- sbin/{stop-connectserver.sh => stop-connect-server.sh} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename sbin/{start-connectserver.sh => start-connect-server.sh} (96%) rename sbin/{stop-connectserver.sh => stop-connect-server.sh} (100%) diff --git a/sbin/start-connectserver.sh b/sbin/start-connect-server.sh similarity index 96% rename from sbin/start-connectserver.sh rename to sbin/start-connect-server.sh index a995f95d535b..a347f43db8b1 100755 --- a/sbin/start-connectserver.sh +++ b/sbin/start-connect-server.sh @@ -30,7 +30,7 @@ fi CLASS="org.apache.spark.sql.connect.service.SparkConnectServer" if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then - echo "Usage: ./sbin/start-connectserver.sh [options]" + echo "Usage: ./sbin/start-connect-server.sh [options]" "${SPARK_HOME}"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2 exit 1 diff --git a/sbin/stop-connectserver.sh b/sbin/stop-connect-server.sh similarity index 100% rename from sbin/stop-connectserver.sh rename to sbin/stop-connect-server.sh