45 changes: 45 additions & 0 deletions …/org/apache/spark/sql/connect/service/SparkConnectServer.scala
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connect.service

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

/**
 * The Spark Connect server.
 */
object SparkConnectServer extends Logging {
  def main(args: Array[String]): Unit = {
    // Set the active Spark session, and start the SparkEnv instance (via the Spark context).
    logInfo("Starting Spark session.")
    val session = SparkSession.builder.getOrCreate()
    try {
      try {
        SparkConnectService.start()
        logInfo("Spark Connect server started.")
      } catch {
        case e: Exception =>
          logError("Error starting Spark Connect server", e)
          System.exit(-1)
      }
      SparkConnectService.server.awaitTermination()
    } finally {
      session.stop()
    }
  }
}
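Since SparkConnectServer is an ordinary JVM main class, it can also be launched in the foreground through spark-submit once the launcher registers it as needing no primary resource (see the SparkSubmitCommandBuilder change below). A minimal sketch; the --packages coordinate and the port setting are illustrative and should match your Spark and Scala versions:

# Launch the Connect server in the foreground via spark-submit.
# The Connect jars are not on the default classpath, hence --packages
# (the coordinate below is illustrative; pick the one for your build).
"${SPARK_HOME}/bin/spark-submit" \
  --class org.apache.spark.sql.connect.service.SparkConnectServer \
  --packages org.apache.spark:spark-connect_2.12:3.4.0 \
  --conf spark.connect.grpc.binding.port=15002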
2 changes: 1 addition & 1 deletion …/org/apache/spark/sql/connect/service/SparkConnectService.scala
@@ -224,7 +224,7 @@ object SparkConnectService {
  // different or complex type easily.
  private type SessionCacheKey = (String, String)

- private var server: Server = _
+ private[connect] var server: Server = _

  // For testing purpose, it's package level private.
  private[connect] lazy val localPort = {
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -290,6 +290,8 @@ private[spark] class SparkSubmit extends Logging {
error("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark Thrift server.")
case (_, CLUSTER) if isConnectServer(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark Connect server.")
case _ =>
}

@@ -972,6 +974,10 @@ private[spark] class SparkSubmit extends Logging {
if (childMainClass.contains("thriftserver")) {
logInfo(s"Failed to load main class $childMainClass.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
} else if (childMainClass.contains("org.apache.spark.sql.connect")) {
logInfo(s"Failed to load main class $childMainClass.")
// TODO(SPARK-42375): Should point out the user-facing page here instead.
logInfo("You need to specify Spark Connect jars with --jars or --packages.")
}
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
@@ -1006,7 +1012,8 @@ private[spark] class SparkSubmit extends Logging {
      throw findCause(t)
    } finally {
      if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&
-         !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass)) {
+         !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) &&
+         !isConnectServer(args.mainClass)) {
        try {
          SparkContext.getActive.foreach(_.stop())
        } catch {
@@ -1130,6 +1137,13 @@ object SparkSubmit extends CommandLineUtils with Logging {
mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
}

/**
* Return whether the given main class represents a connect server.
*/
private def isConnectServer(mainClass: String): Boolean = {
mainClass == "org.apache.spark.sql.connect.service.SparkConnectServer"
}

/**
* Return whether the given primary resource requires running python.
*/
19 changes: 15 additions & 4 deletions …/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -87,6 +87,8 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
        SparkLauncher.NO_RESOURCE);
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
        SparkLauncher.NO_RESOURCE);
+   specialClasses.put("org.apache.spark.sql.connect.service.SparkConnectServer",
+       SparkLauncher.NO_RESOURCE);
  }

  final List<String> userArgs;
@@ -267,8 +269,8 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env)
    String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

    List<String> cmd = buildJavaCommand(extraClassPath);
-   // Take Thrift Server as daemon
-   if (isThriftServer(mainClass)) {
+   // Take Thrift/Connect Server as daemon
+   if (isThriftServer(mainClass) || isConnectServer(mainClass)) {
      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
    }
    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
@@ -288,9 +290,10 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env)
    // - SPARK_DRIVER_MEMORY env variable
    // - SPARK_MEM env variable
    // - default value (1g)
-   // Take Thrift Server as daemon
+   // Take Thrift/Connect Server as daemon
    String tsMemory =
-     isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
+     isThriftServer(mainClass) || isConnectServer(mainClass) ?
+       System.getenv("SPARK_DAEMON_MEMORY") : null;
    String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
    cmd.add("-Xmx" + memory);
@@ -423,6 +426,14 @@ private boolean isThriftServer(String mainClass) {
mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"));
}

/**
* Return whether the given main class represents a connect server.
*/
private boolean isConnectServer(String mainClass) {
return (mainClass != null &&
mainClass.equals("org.apache.spark.sql.connect.service.SparkConnectServer"));
}

private String findExamplesAppJar() {
boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
if (isTesting) {
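The practical effect of the isConnectServer checks above is that daemon-level settings now reach the Connect server's driver JVM, with SPARK_DAEMON_MEMORY taking precedence over the usual driver-memory sources. A sketch, assuming the server is started with the script added below:

# Daemon-level settings now apply to the Connect server's JVM:
# SPARK_DAEMON_MEMORY wins over SPARK_DRIVER_MEMORY for the -Xmx value,
# and SPARK_DAEMON_JAVA_OPTS is appended to the java command line.
export SPARK_DAEMON_MEMORY=4g
export SPARK_DAEMON_JAVA_OPTS="-XX:+UseG1GC"
"${SPARK_HOME}/sbin/start-connect-server.sh"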
41 changes: 41 additions & 0 deletions sbin/start-connect-server.sh
@@ -0,0 +1,41 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Enter posix mode for bash
set -o posix

# Shell script for starting the Spark Connect server
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.sql.connect.service.SparkConnectServer"

if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/start-connect-server.sh [options]"

"${SPARK_HOME}"/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 1
fi

. "${SPARK_HOME}/bin/load-spark-env.sh"

exec "${SPARK_HOME}"/sbin/spark-daemon.sh submit $CLASS 1 --name "Spark Connect server" "$@"
26 changes: 26 additions & 0 deletions sbin/stop-connect-server.sh
@@ -0,0 +1,26 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Stops the connect server on the machine this script is executed on.

if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

"${SPARK_HOME}/sbin"/spark-daemon.sh stop org.apache.spark.sql.connect.service.SparkConnectServer 1