diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 745836dfbefe..65e4367b33af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -226,12 +226,13 @@ private[spark] class SparkSubmit extends Logging {
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
     val sparkConf = args.toSparkConf()
+    if (sparkConf.contains("spark.local.connect")) sparkConf.remove("spark.remote")
     var childMainClass = ""
 
     // Set the cluster manager
     val clusterManager: Int = args.maybeMaster match {
       case Some(v) =>
-        assert(args.maybeRemote.isEmpty)
+        assert(args.maybeRemote.isEmpty || sparkConf.contains("spark.local.connect"))
         v match {
           case "yarn" => YARN
           case m if m.startsWith("spark") => STANDALONE
@@ -603,10 +604,15 @@
 
       // All cluster managers
       OptionAssigner(
-        if (args.maybeRemote.isDefined) args.maybeMaster.orNull else args.master,
+        // If remote is not set, set the master.
+        // In local remote mode, start the default master to start the server.
+        if (args.maybeRemote.isEmpty || sparkConf.contains("spark.local.connect")) args.master
+        else args.maybeMaster.orNull,
         ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
       OptionAssigner(
-        args.maybeRemote.orNull, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.remote"),
+        // In local remote mode, do not set remote.
+        if (sparkConf.contains("spark.local.connect")) null
+        else args.maybeRemote.orNull, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.remote"),
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         confKey = SUBMIT_DEPLOY_MODE.key),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 92c67d5156f9..016eeee63642 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -245,7 +245,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env)
     if (args.length == 0) {
       printUsageAndExit(-1)
     }
-    if (maybeRemote.isDefined && (maybeMaster.isDefined || deployMode != null)) {
+    if (!sparkProperties.contains("spark.local.connect") &&
+        maybeRemote.isDefined && (maybeMaster.isDefined || deployMode != null)) {
       error("Remote cannot be specified with master and/or deploy mode.")
     }
     if (primaryResource == null) {
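The two SparkSubmit hunks above change how the master and the remote interact: when the internal spark.local.connect flag is present, spark.remote is dropped from the conf and the regular spark.master path is used, so a local Spark Connect server can be bootstrapped through the normal submission flow. A rough sketch of that precedence in Python (illustrative only, not part of the patch; effective_master is a hypothetical helper):

    # Hypothetical sketch of the master/remote precedence added above; not Spark code.
    from typing import Mapping, Optional

    def effective_master(
        conf: Mapping[str, str], master: str, maybe_master: Optional[str], remote: Optional[str]
    ) -> Optional[str]:
        if remote is None or "spark.local.connect" in conf:
            # No remote, or local Connect mode: use the (default) master so the
            # regular cluster-manager path runs and the Connect server can start.
            return master
        # Plain Connect mode: the remote wins and spark.master is left unset.
        return maybe_master

Here master mirrors args.master (which falls back to a default) and maybe_master mirrors args.maybeMaster (which may be unset), matching the OptionAssigner expression in the second hunk.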
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index b25417688562..61624779027b 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -45,6 +45,10 @@ public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
   /** The Spark master. */
   public static final String SPARK_MASTER = "spark.master";
 
+  /** The Spark remote. */
+  public static final String SPARK_REMOTE = "spark.remote";
+  public static final String SPARK_LOCAL_REMOTE = "spark.local.connect";
+
   /** The Spark deploy mode. */
   public static final String DEPLOY_MODE = "spark.submit.deployMode";
 
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 055123d8a741..21fce6779f4d 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import static org.apache.spark.launcher.CommandBuilderUtils.*;
+import static org.apache.spark.launcher.CommandBuilderUtils.checkState;
 
 /**
  * Special command builder for handling a CLI invocation of SparkSubmit.
  */
@@ -349,9 +350,18 @@ private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
       // pass conf spark.pyspark.python to python by environment variable.
       env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
     }
-    if (remote != null) {
-      env.put("SPARK_REMOTE", remote);
+    String remoteStr = firstNonEmpty(remote, conf.getOrDefault(SparkLauncher.SPARK_REMOTE, null));
+    String masterStr = firstNonEmpty(master, conf.getOrDefault(SparkLauncher.SPARK_MASTER, null));
+    String deployStr = firstNonEmpty(
+        deployMode, conf.getOrDefault(SparkLauncher.DEPLOY_MODE, null));
+    if (!conf.containsKey(SparkLauncher.SPARK_LOCAL_REMOTE) &&
+        remoteStr != null && (masterStr != null || deployStr != null)) {
+      throw new IllegalStateException("Remote cannot be specified with master and/or deploy mode.");
+    }
+    if (remoteStr != null) {
+      env.put("SPARK_REMOTE", remoteStr);
     }
+
     if (!isEmpty(pyOpts)) {
       pyargs.addAll(parseOptionString(pyOpts));
     }
@@ -465,18 +475,12 @@ private class OptionParser extends SparkSubmitOptionParser {
     protected boolean handle(String opt, String value) {
       switch (opt) {
        case MASTER:
-          checkArgument(remote == null,
-            "Both master (%s) and remote (%s) cannot be set together.", master, remote);
          master = value;
          break;
        case REMOTE:
-          checkArgument(remote == null,
-            "Both master (%s) and remote (%s) cannot be set together.", master, remote);
          remote = value;
          break;
        case DEPLOY_MODE:
-          checkArgument(remote == null,
-            "Both deploy-mode (%s) and remote (%s) cannot be set together.", deployMode, remote);
          deployMode = value;
          break;
        case PROPERTIES_FILE:
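On the launcher side, the three per-option checkArgument calls are replaced by a single validation in buildPySparkShellCommand that also considers values coming from the properties file, and that is skipped when spark.local.connect is set. A rough Python transcription of that check (illustrative only; the real implementation is the Java code above, and validate_pyspark_shell is a made-up name):

    # Hypothetical transcription of the consolidated launcher-side check; not Spark code.
    def validate_pyspark_shell(conf, master=None, remote=None, deploy_mode=None):
        remote_str = remote or conf.get("spark.remote")
        master_str = master or conf.get("spark.master")
        deploy_str = deploy_mode or conf.get("spark.submit.deployMode")
        if "spark.local.connect" not in conf and remote_str and (master_str or deploy_str):
            raise ValueError("Remote cannot be specified with master and/or deploy mode.")
        return remote_str  # exported as SPARK_REMOTE when present

Deferring the check until the command is built means --master, --remote, and --deploy-mode can be parsed in any order and in any combination before the constraint is enforced.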
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index 0314f41bff66..82bde57beb34 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -428,8 +428,8 @@ def _start_connect_server(master: str) -> None:
         """
         Starts the Spark Connect server given the master.
 
-        At the high level, there are two cases. The first case is when you locally build
-        Apache Spark, and run ``SparkSession.builder.remote("local")``:
+        At the high level, there are two cases. The first case is the development case, e.g.,
+        you locally build Apache Spark, and run ``SparkSession.builder.remote("local")``:
 
         1. This method automatically finds the jars for Spark Connect (because the jars for
            Spark Connect are not bundled in the regular Apache Spark release).
@@ -437,14 +437,14 @@ def _start_connect_server(master: str) -> None:
         2. Temporarily remove all states for Spark Connect, for example, ``SPARK_REMOTE``
           environment variable.
 
-        2.1. If we're in PySpark application submission, e.g., ``bin/spark-submit app.py``
-        starts a JVM (without Spark Context) first, and adds the Spark Connect server jars
-        into the current class loader. Otherwise, Spark Context with ``spark.plugins``
-        cannot be initialized because the JVM is already running without the jars in
-        the class path before executing this Python process for driver side.
+        3. Starts a JVM (without Spark Context) first, and adds the Spark Connect server jars
+           into the current class loader. Otherwise, Spark Context with ``spark.plugins``
+           cannot be initialized because the JVM is already running without the jars in
+           the class path before executing this Python process for driver side (in case of
+           PySpark application submission).
 
-        3. Starts a regular Spark session that automatically starts a Spark Connect server
-        with JVM (if it is not up) via ``spark.plugins`` feature.
+        4. Starts a regular Spark session that automatically starts a Spark Connect server
+           via ``spark.plugins`` feature.
 
         The second case is when you use Apache Spark release:
 
@@ -463,78 +463,70 @@ def _start_connect_server(master: str) -> None:
         session = PySparkSession._instantiatedSession
         if session is None or session._sc._jsc is None:
             conf = SparkConf()
+            # No need to worry about existing configurations here because the
+            # Py4J gateway is not created yet, and the `conf` instance is empty.
+            # The configurations below are manipulated again right after the Py4J
+            # gateway is created, so that user-specified configurations take precedence.
+            conf.set("spark.master", master)
+            conf.set("spark.plugins", "org.apache.spark.sql.connect.SparkConnectPlugin")
+            conf.set("spark.local.connect", "1")
+
             # Check if we're using unreleased version that is in development.
             # Also checks SPARK_TESTING for RC versions.
             is_dev_mode = (
                 "dev" in LooseVersion(__version__).version or "SPARK_TESTING" in os.environ
             )
-            connect_jar = None
-
-            if is_dev_mode:
-                from pyspark.testing.utils import search_jar
-
-                # Note that, in production, spark.jars.packages configuration should be
-                # set by users. Here we're automatically searching the jars locally built.
-                connect_jar = search_jar(
-                    "connector/connect/server", "spark-connect-assembly-", "spark-connect"
-                )
-                if connect_jar is not None:
-                    origin_jars = conf.get("spark.jars")
-                    if origin_jars is not None:
-                        conf.set("spark.jars", f"{origin_jars},{connect_jar}")
-                    else:
-                        conf.set("spark.jars", connect_jar)
-                else:
-                    warnings.warn(
-                        "Attempted to automatically find the Spark Connect jars because "
-                        "'SPARK_TESTING' environment variable is set, or the current PySpark "
-                        f"version is dev version ({__version__}). However, the jar was not found. "
-                        "Manually locate the jars and specify them, e.g., 'spark.jars' "
-                        "configuration."
-                    )
-
-            conf.set("spark.master", master)
-
-            connect_plugin = "org.apache.spark.sql.connect.SparkConnectPlugin"
-            origin_plugin = conf.get("spark.plugins")
-            if origin_plugin is not None:
-                conf.set("spark.plugins", f"{origin_plugin},{connect_plugin}")
-            else:
-                conf.set("spark.plugins", connect_plugin)
-
             origin_remote = os.environ.get("SPARK_REMOTE", None)
-            origin_args = os.environ.get("PYSPARK_SUBMIT_ARGS", None)
             try:
                 if origin_remote is not None:
                     # So SparkSubmit thinks no remote is set in order to
                     # start the regular PySpark session.
                     del os.environ["SPARK_REMOTE"]
 
-                # PySpark shell launches Py4J server from Python.
-                # Remove "--remote" option specified, and use plain arguments.
-                # NOTE that this is not used in regular PySpark application
-                # submission because JVM at this point is already running.
-                os.environ["PYSPARK_SUBMIT_ARGS"] = '"--name" "PySparkShell" "pyspark-shell"'
-
-                if is_dev_mode and connect_jar is not None:
-                    # In the case of Python application submission, JVM is already up.
-                    # Therefore, we should manually manipulate the classpath in that case.
-                    # Otherwise, the jars are added but the driver would not be able to
-                    # find the server jars.
-                    with SparkContext._lock:
-                        if not SparkContext._gateway:
-                            SparkContext._gateway = launch_gateway(conf)
-                            SparkContext._jvm = SparkContext._gateway.jvm
+                connect_jar = None
+                if is_dev_mode:
+                    # Wrap in try-except because pyspark.testing does not exist in the
+                    # canonical release, so this import can fail in production.
+                    try:
+                        from pyspark.testing.utils import search_jar
+
+                        # Note that, in production, spark.jars.packages configuration should be
+                        # set by users. Here we're automatically searching the jars locally built.
+                        connect_jar = search_jar(
+                            "connector/connect/server", "spark-connect-assembly-", "spark-connect"
+                        )
+                        if connect_jar is None:
+                            warnings.warn(
+                                "Attempted to automatically find the Spark Connect jars because "
+                                "'SPARK_TESTING' environment variable is set, or the current "
+                                f"PySpark version is dev version ({__version__}). However, the jar"
+                                " was not found. Manually locate the jars and specify them, e.g., "
+                                "'spark.jars' configuration."
+                            )
+                    except ImportError:
+                        pass
+
+                # Note that the JVM is already up at this point in the case of Python
+                # application submission.
+                with SparkContext._lock:
+                    if not SparkContext._gateway:
+                        SparkContext._gateway = launch_gateway(conf)
+                        SparkContext._jvm = SparkContext._gateway.jvm
+                    if connect_jar is not None:
                         SparkContext._jvm.PythonSQLUtils.addJarToCurrentClassLoader(connect_jar)
 
+                # Now that the JVM is up, rebuild the conf so JVM-side defaults take precedence.
+                prev = conf
+                conf = SparkConf(_jvm=SparkContext._jvm)
+                conf.set("spark.master", master)
+                for k, v in prev.getAll():
+                    if not conf.contains(k):
+                        conf.set(k, v)
+
                 # The regular PySpark session is registered as an active session
                 # so would not be garbage-collected.
                 PySparkSession(SparkContext.getOrCreate(conf))
             finally:
-                if origin_args is not None:
-                    os.environ["PYSPARK_SUBMIT_ARGS"] = origin_args
-                else:
-                    del os.environ["PYSPARK_SUBMIT_ARGS"]
                 if origin_remote is not None:
                     os.environ["SPARK_REMOTE"] = origin_remote
                 else: