11 changes: 10 additions & 1 deletion bin/spark-class
@@ -82,13 +82,22 @@ if [ $(command -v "$JAR_CMD") ] ; then
fi
fi

LAUNCH_CLASSPATH="$SPARK_ASSEMBLY_JAR"

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="$SPARK_HOME/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

export _SPARK_ASSEMBLY="$SPARK_ASSEMBLY_JAR"

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <("$RUNNER" -cp "$SPARK_ASSEMBLY_JAR" org.apache.spark.launcher.Main "$@")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")

if [ "${CMD[0]}" = "usage" ]; then
"${CMD[@]}"
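As an aside, the NUL-separated protocol mentioned in the comment above can be illustrated with a short, hypothetical producer: each argument is written followed by a '\0' byte so the `while IFS= read -d ''` loop in bin/spark-class can recover arguments containing spaces, quotes, or newlines. This is only a sketch of the idea, not the actual org.apache.spark.launcher.Main implementation.

// Sketch of the producer side of the NUL-separated protocol (illustrative only).
import java.util.Arrays;
import java.util.List;

public class NulSeparatedArgsSketch {
  public static void main(String[] args) {
    // A hypothetical command the launcher might compute for the shell to exec.
    List<String> cmd = Arrays.asList("/usr/bin/java", "-Xmx1g", "org.example.Driver", "arg with spaces");
    for (String c : cmd) {
      System.out.print(c);    // the argument itself, unescaped
      System.out.print('\0'); // terminator consumed by `read -d ''` in bin/spark-class
    }
  }
}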
11 changes: 10 additions & 1 deletion bin/spark-class2.cmd
@@ -46,13 +46,22 @@ if "%SPARK_ASSEMBLY_JAR%"=="0" (
exit /b 1
)

set LAUNCH_CLASSPATH=%SPARK_ASSEMBLY_JAR%

rem Add the launcher build dir to the classpath if requested.
if not "x%SPARK_PREPEND_CLASSES%"=="x" (
set LAUNCH_CLASSPATH=%SPARK_HOME%\launcher\target\scala-%SPARK_SCALA_VERSION%\classes;%LAUNCH_CLASSPATH%
)

set _SPARK_ASSEMBLY=%SPARK_ASSEMBLY_JAR%

rem Figure out where java is.
set RUNNER=java
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java

rem The launcher library prints the command to be executed in a single line suitable for being
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %SPARK_ASSEMBLY_JAR% org.apache.spark.launcher.Main %*"') do (
for /f "tokens=*" %%i in ('cmd /C ""%RUNNER%" -cp %LAUNCH_CLASSPATH% org.apache.spark.launcher.Main %*"') do (
set SPARK_CMD=%%i
)
%SPARK_CMD%
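The batch interpreter has no convenient way to consume NUL-separated output, which is why, as the comment above notes, the launcher prints a single line on Windows that the `for /f` loop captures into SPARK_CMD. A minimal sketch of that single-line form, assuming naive double-quoting (the real launcher's quoting rules for cmd.exe are more involved):

// Sketch of the single-line output consumed by the batch script (illustrative only).
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

public class SingleLineCmdSketch {
  public static void main(String[] args) {
    List<String> cmd = Arrays.asList("java", "-Xmx1g", "org.example.Driver", "arg with spaces");
    StringJoiner line = new StringJoiner(" ");
    for (String c : cmd) {
      // Naive quoting: wrap arguments containing spaces in double quotes.
      line.add(c.contains(" ") ? "\"" + c + "\"" : c);
    }
    System.out.println(line); // captured by: for /f "tokens=*" %%i in ('...') do set SPARK_CMD=%%i
  }
}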
launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -186,12 +186,24 @@ List<String> buildClassPath(String appClassPath) throws IOException {
addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
}

- final String assembly = AbstractCommandBuilder.class.getProtectionDomain().getCodeSource().
-   getLocation().getPath();
+ // We can't rely on the ENV_SPARK_ASSEMBLY variable to be set. Certain situations, such as
+ // when running unit tests, or user code that embeds Spark and creates a SparkContext
+ // with a local or local-cluster master, will cause this code to be called from an
+ // environment where that env variable is not guaranteed to exist.
+ //
+ // For the testing case, we rely on the test code to set and propagate the test classpath
+ // appropriately.
+ //
+ // For the user code case, we fall back to looking for the Spark assembly under SPARK_HOME.
+ // That duplicates some of the code in the shell scripts that look for the assembly, though.
+ String assembly = getenv(ENV_SPARK_ASSEMBLY);
+ if (assembly == null && isEmpty(getenv("SPARK_TESTING"))) {
+   assembly = findAssembly();
+ }
Contributor:

Why do we need "&& isEmpty(getenv("SPARK_TESTING")" ?

Contributor Author:

This has already been pushed, but it's explained in the big comment right before the code.

Contributor:

if (assembly == null) findAssembly() ?

Contributor Author:

This code has already been pushed.

Contributor:

we can always push a hot fix

Contributor Author:

But there is nothing to fix.

We don't want to look for the assembly when tests are running because it may not exist. Tests do a lot of "new SparkContext()" with master = local-cluster[blah], and this check ensures those tests work even if the assembly is not there.

Contributor:

I don't think this check 'ensures' that those tests work, this check 'requires' that those tests work if the assembly is not there (more like an assert). I don't feel strongly for or against it, but it does seem unnecessary.

addToClassPath(cp, assembly);

// Datanucleus jars must be included on the classpath. Datanucleus jars do not work if only
// included in the uber jar as plugin.xml metadata is lost. Both sbt and maven will populate
// "lib_managed/jars/" with the datanucleus jars when Spark is built with Hive
File libdir;
if (new File(sparkHome, "RELEASE").isFile()) {
@@ -299,6 +311,30 @@ String getenv(String key) {
return firstNonEmpty(childEnv.get(key), System.getenv(key));
}

private String findAssembly() {
String sparkHome = getSparkHome();
File libdir;
if (new File(sparkHome, "RELEASE").isFile()) {
libdir = new File(sparkHome, "lib");
checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
libdir.getAbsolutePath());
} else {
libdir = new File(sparkHome, String.format("assembly/target/scala-%s", getScalaVersion()));
}

final Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar");
FileFilter filter = new FileFilter() {
@Override
public boolean accept(File file) {
return file.isFile() && re.matcher(file.getName()).matches();
}
};
File[] assemblies = libdir.listFiles(filter);
checkState(assemblies != null && assemblies.length > 0, "No assemblies found in '%s'.", libdir);
checkState(assemblies.length == 1, "Multiple assemblies found in '%s'.", libdir);
return assemblies[0].getAbsolutePath();
}

private String getConfDir() {
String confDir = getenv("SPARK_CONF_DIR");
return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");
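For a concrete sense of the findAssembly() fallback added above: the pattern spark-assembly.*hadoop.*\.jar accepts the assembly jar the build drops under lib/ (release layout) or assembly/target/scala-<version>/ (dev build) and rejects other jars in the same directory. A small, self-contained check against representative file names (the names below are made up for illustration):

// Quick illustration of which file names the findAssembly() pattern accepts.
import java.util.regex.Pattern;

public class AssemblyPatternCheck {
  public static void main(String[] args) {
    Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar");
    String[] names = {
      "spark-assembly-1.3.0-hadoop2.4.0.jar",  // matches: picked up as the assembly
      "spark-assembly-1.3.0.jar",              // no "hadoop" component: rejected
      "spark-examples-1.3.0-hadoop2.4.0.jar"   // different prefix: rejected
    };
    for (String name : names) {
      System.out.println(name + " -> " + re.matcher(name).matches());
    }
  }
}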
launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -30,6 +30,7 @@ class CommandBuilderUtils {
static final String DEFAULT_MEM = "512m";
static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf";
static final String ENV_SPARK_HOME = "SPARK_HOME";
static final String ENV_SPARK_ASSEMBLY = "_SPARK_ASSEMBLY";

/** Returns whether the given string is null or empty. */
static boolean isEmpty(String s) {
launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -98,7 +98,7 @@ public void testShellCliParser() throws Exception {
parser.NAME,
"appName");

- List<String> args = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+ List<String> args = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
}
@@ -110,7 +110,7 @@ public void testAlternateSyntaxParsing() throws Exception {
parser.MASTER + "=foo",
parser.DEPLOY_MODE + "=bar");

- List<String> cmd = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+ List<String> cmd = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
assertEquals("foo", findArgValue(cmd, parser.MASTER));
assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
@@ -153,7 +153,7 @@ private void testCmdBuilder(boolean isDriver) throws Exception {
String deployMode = isDriver ? "client" : "cluster";

SparkSubmitCommandBuilder launcher =
- new SparkSubmitCommandBuilder(Collections.<String>emptyList());
+ newCommandBuilder(Collections.<String>emptyList());
launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
System.getProperty("spark.test.home"));
launcher.master = "yarn";
@@ -273,10 +273,15 @@ private boolean findInStringList(String list, String sep, String needle) {
return contains(needle, list.split(sep));
}

- private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
+ private SparkSubmitCommandBuilder newCommandBuilder(List<String> args) {
SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
- return builder.buildCommand(env);
+ builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_ASSEMBLY, "dummy");
+ return builder;
}

+ private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
+ return newCommandBuilder(args).buildCommand(env);
+ }

}
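A minimal sketch of how the refactored helpers might be exercised from a test in this suite (the test body and argument values are hypothetical; parser, buildCommand, and the imported JUnit helpers come from the surrounding suite):

// Hypothetical test method relying on the helpers above; the dummy _SPARK_ASSEMBLY value set in
// newCommandBuilder() keeps buildCommand() from searching the filesystem for a real assembly jar.
@Test
public void testDriverCmdBuilderSketch() throws Exception {
  Map<String, String> env = new HashMap<String, String>();
  List<String> cmd = buildCommand(
    Arrays.asList(parser.MASTER, "local", parser.CLASS, "org.my.Class", "dummy.jar"),
    env);
  assertTrue(cmd.contains("org.apache.spark.deploy.SparkSubmit"));
}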