12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -226,12 +226,13 @@ private[spark] class SparkSubmit extends Logging {
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = args.toSparkConf()
if (sparkConf.contains("spark.local.connect")) sparkConf.remove("spark.remote")
var childMainClass = ""

// Set the cluster manager
val clusterManager: Int = args.maybeMaster match {
case Some(v) =>
assert(args.maybeRemote.isEmpty)
assert(args.maybeRemote.isEmpty || sparkConf.contains("spark.local.connect"))
v match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
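
For orientation, here is a minimal Python sketch (illustrative only, not the Scala above) of how the cluster manager is derived from --master and why the assertion now tolerates --remote when spark.local.connect is set; the function name and constants are hypothetical:

def cluster_manager(maybe_master, maybe_remote, local_connect):
    # Hypothetical stand-ins for SparkSubmit's cluster manager identifiers.
    YARN, STANDALONE, OTHER = "yarn", "standalone", "other"
    if maybe_master is None:
        return OTHER  # branches for a missing --master are outside the visible diff
    # --remote may only accompany --master when spark.local.connect is set,
    # i.e. in the local Spark Connect mode started from the Python side.
    assert maybe_remote is None or local_connect
    if maybe_master == "yarn":
        return YARN
    if maybe_master.startswith("spark"):
        return STANDALONE
    return OTHER
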
@@ -603,10 +604,15 @@

// All cluster managers
OptionAssigner(
if (args.maybeRemote.isDefined) args.maybeMaster.orNull else args.master,
// If remote is not set, set the master.
// In local remote mode, start the default master to start the server.
if (args.maybeRemote.isEmpty || sparkConf.contains("spark.local.connect")) args.master
else args.maybeMaster.orNull,
ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
OptionAssigner(
args.maybeRemote.orNull, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.remote"),
// In local remote mode, do not set remote.
if (sparkConf.contains("spark.local.connect")) null
else args.maybeRemote.orNull, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.remote"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = SUBMIT_DEPLOY_MODE.key),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
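
To make the two assigners above easier to follow, here is an illustrative Python model of which configuration keys end up set; the function and parameter names are hypothetical, and only the branches visible in this diff are modeled:

def resolve_connect_conf(master, maybe_master, maybe_remote, local_connect):
    conf = {}
    # spark.master: used when no remote is given, or in local remote mode,
    # where a (default) master is still needed to start the Connect server.
    spark_master = master if (maybe_remote is None or local_connect) else maybe_master
    if spark_master is not None:
        conf["spark.master"] = spark_master
    # spark.remote: never forwarded in local remote mode.
    if maybe_remote is not None and not local_connect:
        conf["spark.remote"] = maybe_remote
    return conf

For instance, with local_connect=True and a remote value present, the sketch emits only a spark.master entry and drops spark.remote, matching the comments in the diff.
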
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -245,7 +245,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
if (args.length == 0) {
printUsageAndExit(-1)
}
if (maybeRemote.isDefined && (maybeMaster.isDefined || deployMode != null)) {
if (!sparkProperties.contains("spark.local.connect") &&
maybeRemote.isDefined && (maybeMaster.isDefined || deployMode != null)) {
error("Remote cannot be specified with master and/or deploy mode.")
}
if (primaryResource == null) {
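
The rule can also be restated as a small standalone check (an illustrative Python sketch, not the Scala above; the function name is hypothetical and spark_properties stands in for the parsed --conf entries):

def check_remote_args(maybe_remote, maybe_master, deploy_mode, spark_properties):
    # --remote conflicts with --master / --deploy-mode unless the properties
    # carry spark.local.connect, i.e. local Spark Connect mode.
    if ("spark.local.connect" not in spark_properties
            and maybe_remote is not None
            and (maybe_master is not None or deploy_mode is not None)):
        raise ValueError("Remote cannot be specified with master and/or deploy mode.")
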
launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -45,6 +45,10 @@ public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
/** The Spark master. */
public static final String SPARK_MASTER = "spark.master";

/** The Spark remote. */
public static final String SPARK_REMOTE = "spark.remote";
public static final String SPARK_LOCAL_REMOTE = "spark.local.connect";

/** The Spark deploy mode. */
public static final String DEPLOY_MODE = "spark.submit.deployMode";

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -22,6 +22,7 @@
import java.util.*;

import static org.apache.spark.launcher.CommandBuilderUtils.*;
import static org.apache.spark.launcher.CommandBuilderUtils.checkState;

/**
* Special command builder for handling a CLI invocation of SparkSubmit.
@@ -349,9 +350,18 @@ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IO
// pass conf spark.pyspark.python to python by environment variable.
env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
}
if (remote != null) {
env.put("SPARK_REMOTE", remote);
String remoteStr = firstNonEmpty(remote, conf.getOrDefault(SparkLauncher.SPARK_REMOTE, null));
String masterStr = firstNonEmpty(master, conf.getOrDefault(SparkLauncher.SPARK_MASTER, null));
String deployStr = firstNonEmpty(
deployMode, conf.getOrDefault(SparkLauncher.DEPLOY_MODE, null));
if (!conf.containsKey(SparkLauncher.SPARK_LOCAL_REMOTE) &&
remoteStr != null && (masterStr != null || deployStr != null)) {
throw new IllegalStateException("Remote cannot be specified with master and/or deploy mode.");
}
if (remoteStr != null) {
env.put("SPARK_REMOTE", remoteStr);
}

if (!isEmpty(pyOpts)) {
pyargs.addAll(parseOptionString(pyOpts));
}
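
The precedence used here, CLI option first and then the corresponding configuration entry, can be sketched in Python as follows (illustrative only; cli, conf, and env are plain dicts standing in for the parsed options, the --conf entries, and the process environment, and the function names are hypothetical):

def first_non_empty(*values):
    # Analogue of CommandBuilderUtils.firstNonEmpty, for this sketch only.
    return next((v for v in values if v), None)

def resolve_pyspark_shell_env(cli, conf, env):
    remote = first_non_empty(cli.get("remote"), conf.get("spark.remote"))
    master = first_non_empty(cli.get("master"), conf.get("spark.master"))
    deploy = first_non_empty(cli.get("deploy-mode"), conf.get("spark.submit.deployMode"))
    # Same validation as in SparkSubmitArguments, skipped in local Connect mode.
    if "spark.local.connect" not in conf and remote and (master or deploy):
        raise RuntimeError("Remote cannot be specified with master and/or deploy mode.")
    if remote:
        env["SPARK_REMOTE"] = remote
    return env
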
@@ -465,18 +475,12 @@ private class OptionParser extends SparkSubmitOptionParser {
protected boolean handle(String opt, String value) {
switch (opt) {
case MASTER:
checkArgument(remote == null,
"Both master (%s) and remote (%s) cannot be set together.", master, remote);
master = value;
break;
case REMOTE:
checkArgument(remote == null,
"Both master (%s) and remote (%s) cannot be set together.", master, remote);
remote = value;
break;
case DEPLOY_MODE:
checkArgument(remote == null,
"Both deploy-mode (%s) and remote (%s) cannot be set together.", deployMode, remote);
deployMode = value;
break;
case PROPERTIES_FILE:
118 changes: 55 additions & 63 deletions python/pyspark/sql/connect/session.py
@@ -428,23 +428,23 @@ def _start_connect_server(master: str) -> None:
"""
Starts the Spark Connect server given the master.

At the high level, there are two cases. The first case is when you locally build
Apache Spark, and run ``SparkSession.builder.remote("local")``:
At a high level, there are two cases. The first case is the development case, e.g.,
you locally build Apache Spark and run ``SparkSession.builder.remote("local")``:

1. This method automatically finds the jars for Spark Connect (because the jars for
Spark Connect are not bundled in the regular Apache Spark release).

2. Temporarily remove all state related to Spark Connect, for example, the
``SPARK_REMOTE`` environment variable.

2.1. If we're in PySpark application submission, e.g., ``bin/spark-submit app.py``
starts a JVM (without Spark Context) first, and adds the Spark Connect server jars
into the current class loader. Otherwise, Spark Context with ``spark.plugins``
cannot be initialized because the JVM is already running without the jars in
the class path before executing this Python process for driver side.
3. Starts a JVM (without Spark Context) first, and adds the Spark Connect server jars
into the current class loader. Otherwise, Spark Context with ``spark.plugins``
cannot be initialized because the JVM is already running without the jars in
the class path before executing this Python process for the driver side (in the case of
PySpark application submission).

3. Starts a regular Spark session that automatically starts a Spark Connect server
with JVM (if it is not up) via ``spark.plugins`` feature.
4. Starts a regular Spark session that automatically starts a Spark Connect server
via the ``spark.plugins`` feature.

The second case is when you use Apache Spark release:

@@ -463,78 +463,70 @@ def _start_connect_server(master: str) -> None:
session = PySparkSession._instantiatedSession
if session is None or session._sc._jsc is None:
conf = SparkConf()
# We do not need to worry about the existing configurations because the
# Py4J gateway is not created yet, and the `conf` instance is empty here.
# The configurations below are manually adjusted later, right after the
# Py4J gateway creation, so that the user-specified configuration takes precedence.
conf.set("spark.master", master)
conf.set("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
conf.set("spark.local.connect", "1")

# Check if we're using an unreleased version that is in development.
# Also checks SPARK_TESTING for RC versions.
is_dev_mode = (
"dev" in LooseVersion(__version__).version or "SPARK_TESTING" in os.environ
)
connect_jar = None

if is_dev_mode:
from pyspark.testing.utils import search_jar

# Note that, in production, spark.jars.packages configuration should be
# set by users. Here we're automatically searching the jars locally built.
connect_jar = search_jar(
"connector/connect/server", "spark-connect-assembly-", "spark-connect"
)
if connect_jar is not None:
origin_jars = conf.get("spark.jars")
if origin_jars is not None:
conf.set("spark.jars", f"{origin_jars},{connect_jar}")
else:
conf.set("spark.jars", connect_jar)
else:
warnings.warn(
"Attempted to automatically find the Spark Connect jars because "
"'SPARK_TESTING' environment variable is set, or the current PySpark "
f"version is dev version ({__version__}). However, the jar was not found. "
"Manually locate the jars and specify them, e.g., 'spark.jars' "
"configuration."
)

conf.set("spark.master", master)

connect_plugin = "org.apache.spark.sql.connect.SparkConnectPlugin"
origin_plugin = conf.get("spark.plugins")
if origin_plugin is not None:
conf.set("spark.plugins", f"{origin_plugin},{connect_plugin}")
else:
conf.set("spark.plugins", connect_plugin)

origin_remote = os.environ.get("SPARK_REMOTE", None)
origin_args = os.environ.get("PYSPARK_SUBMIT_ARGS", None)
try:
if origin_remote is not None:
# So SparkSubmit thinks no remote is set in order to
# start the regular PySpark session.
del os.environ["SPARK_REMOTE"]

# PySpark shell launches Py4J server from Python.
# Remove "--remote" option specified, and use plain arguments.
# NOTE that this is not used in regular PySpark application
# submission because JVM at this point is already running.
os.environ["PYSPARK_SUBMIT_ARGS"] = '"--name" "PySparkShell" "pyspark-shell"'

if is_dev_mode and connect_jar is not None:
# In the case of Python application submission, JVM is already up.
# Therefore, we should manually manipulate the classpath in that case.
# Otherwise, the jars are added but the driver would not be able to
# find the server jars.
with SparkContext._lock:
if not SparkContext._gateway:
SparkContext._gateway = launch_gateway(conf)
SparkContext._jvm = SparkContext._gateway.jvm
connect_jar = None
if is_dev_mode:
# Guard with try/except because pyspark.testing may not be available in
# production; it does not exist in the canonical release.
try:
from pyspark.testing.utils import search_jar

# Note that, in production, spark.jars.packages configuration should be
# set by users. Here we're automatically searching the jars locally built.
connect_jar = search_jar(
"connector/connect/server", "spark-connect-assembly-", "spark-connect"
)
if connect_jar is None:
warnings.warn(
"Attempted to automatically find the Spark Connect jars because "
"'SPARK_TESTING' environment variable is set, or the current "
f"PySpark version is dev version ({__version__}). However, the jar"
" was not found. Manually locate the jars and specify them, e.g., "
"'spark.jars' configuration."
)
except ImportError:
pass

# Note that the JVM is already up at this point in the case of Python
# application submission.
with SparkContext._lock:
if not SparkContext._gateway:
SparkContext._gateway = launch_gateway(conf)
SparkContext._jvm = SparkContext._gateway.jvm
if connect_jar is not None:
SparkContext._jvm.PythonSQLUtils.addJarToCurrentClassLoader(connect_jar)

# Now that the JVM is up, re-create the conf so the JVM-side defaults are respected.
prev = conf
conf = SparkConf(_jvm=SparkContext._jvm)
conf.set("spark.master", master)
for k, v in prev.getAll():
if not conf.contains(k):
conf.set(k, v)

# The regular PySpark session is registered as an active session
# so it will not be garbage-collected.
PySparkSession(SparkContext.getOrCreate(conf))
finally:
if origin_args is not None:
os.environ["PYSPARK_SUBMIT_ARGS"] = origin_args
else:
del os.environ["PYSPARK_SUBMIT_ARGS"]
if origin_remote is not None:
os.environ["SPARK_REMOTE"] = origin_remote
else:
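
Putting this together, the development entry point that _start_connect_server supports is used from PySpark roughly like this (a minimal usage sketch; remote("local") is the entry point named in the docstring above, and the remaining calls are standard PySpark session API):

from pyspark.sql import SparkSession

# Starts a local JVM with the Spark Connect plugin (if one is not already up)
# and then connects to it over Spark Connect.
spark = SparkSession.builder.remote("local").getOrCreate()
spark.range(5).show()
spark.stop()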