1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -404,6 +404,7 @@ export("as.DataFrame",
"setCurrentDatabase",
"spark.lapply",
"spark.addFile",
"spark.addJar",
"spark.getSparkFilesRootDirectory",
"spark.getSparkFiles",
"sql",
26 changes: 26 additions & 0 deletions R/pkg/R/context.R
@@ -319,6 +319,32 @@ spark.addFile <- function(path, recursive = FALSE) {
invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)), recursive))
}


#' Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
Member: we don't really expose SparkContext in R actually

Member Author: In that case do we want to bother having this method for R?

#'
#' The \code{path} passed can be either a local file, a file in HDFS (or other Hadoop-supported
#' filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
#' If \code{addToCurrentClassLoader} is true, add the jar to the current threads' classloader. In
#' general adding to the current threads' class loader will impact all other application threads
#' unless they have explicitly changed their class loader.
Member: should

    add the jar to the current threads' classloader. In general adding to the current
    threads' class loader will impact all other application threads unless they have
    explicitly changed their class loader.

be reworded for R or Python? It should be the JVM. Threads and classloaders aren't familiar concepts to R users.

#'
#' @rdname spark.addJar
#' @param path The path of the jar to be added
#' @param addToCurrentClassLoader Whether to add the jar to the current driver classloader.
#' @export
#' @examples
#'\dontrun{
#' spark.addJar("/path/to/something.jar", TRUE)
#'}
#' @note spark.addJar since 2.2.0
spark.addJar <- function(path, addToCurrentClassLoader = FALSE) {
Contributor: why don't we want to add it to the driver classpath by default?

Member Author: Mostly for backwards compatibility.

Member: spark.addJar needs to be added to the NAMESPACE file, otherwise it won't be accessible. Tests run within the same namespace, so they can always access private methods.

Member Author: Done

normalizedPath <- suppressWarnings(normalizePath(path))
sc <- callJMethod(getSparkContext(), "sc")
invisible(callJMethod(sc, "addJar", normalizedPath, addToCurrentClassLoader))
}



Member: little nit: I guess we just need a single newline.

#' Get the root directory that contains files added through spark.addFile.
#'
#' @rdname spark.getSparkFilesRootDirectory
12 changes: 12 additions & 0 deletions R/pkg/tests/fulltests/test_context.R
@@ -167,6 +167,18 @@ test_that("spark.lapply should perform simple transforms", {
sparkR.session.stop()
})

test_that("add jar should work and allow usage of the jar on the driver node", {
sparkR.sparkContext()

destDir <- file.path(tempdir(), "testjar")
Member (@HyukjinKwon, Oct 5, 2017): I'd remove this tempdir() btw (I mean removing the actual directory after this test).

jarName <- callJStatic("org.apache.spark.TestUtils", "createDummyJar",
destDir, "sparkrTests", "DummyClassForAddJarTest")
Member: Ah, it looks like the problem is here. createDummyJar returns a URI string, for example:

    > normalizePath("file:/C:/a/b/c")
    [1] "C:\\Users\\IEUser\\workspace\\spark\\file:\\C:\\a\\b\\c"
    Warning message:
    In normalizePath(path.expand(path), winslash, mustWork) :
      path[1]="file:/C:/a/b/c": The filename, directory name, or volume label syntax
     is incorrect

This ends up with a weird path like "C:\\Users\\IEUser\\workspace\\spark\\file:\\C:\\a\\b\\c".

Member: little nit (align the continuation with the opening parenthesis):

    jarName <- callJStatic("org.apache.spark.TestUtils", "createDummyJar",
                           destDir, "sparkrTests", "DummyClassForAddJarTest")


spark.addJar(jarName, addToCurrentClassLoader = TRUE)
testClass <- newJObject("sparkrTests.DummyClassForAddJarTest")
expect_true(class(testClass) == "jobj")
})

test_that("add and get file to be downloaded with Spark job on every node", {
sparkR.sparkContext(master = sparkRTestMaster)
# Test add file.
33 changes: 31 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1801,9 +1801,23 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future.
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
def addJar(path: String): Unit = {
addJar(path, false)
}

/**
* Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future.
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
* @param addToCurrentClassLoader if true will add the jar to the current threads' classloader.
* In general adding to the current threads' class loader will
* impact all other application threads unless they have explicitly
* changed their class loader.
*/
@DeveloperApi
def addJar(path: String, addToCurrentClassLoader: Boolean) {
def addJarFile(file: File): String = {
try {
if (!file.exists()) {
@@ -1845,6 +1859,21 @@
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
postEnvironmentUpdate()
}

if (addToCurrentClassLoader) {
val currentCL = Utils.getContextOrSparkClassLoader
currentCL match {
case cl: MutableURLClassLoader =>
val uri = if (path.contains("\\")) {
// For local paths with backslashes on Windows, URI throws an exception
new File(path).toURI
} else {
new URI(path)
Member: Could we maybe just use Utils.resolveURI(path)? It looks like the logic is similar.
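A minimal sketch of what that suggestion might look like, not part of this PR; it assumes Utils.resolveURI keeps its current behaviour of falling back to a file: URI for bare local paths, which would also cover the Windows backslash case handled explicitly above:

    if (addToCurrentClassLoader) {
      Utils.getContextOrSparkClassLoader match {
        case cl: MutableURLClassLoader =>
          // resolveURI turns plain local paths into absolute file: URIs and passes
          // through URIs that already carry a scheme, so the explicit backslash
          // check above may no longer be needed
          cl.addURL(Utils.resolveURI(path).toURL)
        case other =>
          logWarning(s"Unsupported class loader $other will not update jars on the current thread")
      }
    }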

}
cl.addURL(uri.toURL)
case _ => logWarning(s"Unsupported cl $currentCL will not update jars thread cl")
Member: I'd say class loader instead of cl.

}
}
}
}
}
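For orientation, a hedged usage sketch of the new overload, assuming the calling thread's context class loader is a MutableURLClassLoader (as the test added below arranges); the jar path and class name here are illustrative only:

    sc.addJar("/tmp/extra-helpers.jar", addToCurrentClassLoader = true)
    // The jar's URL is now appended to the current thread's class loader, so the
    // driver can resolve classes packaged in it without restarting the application:
    val helperClass = Thread.currentThread().getContextClassLoader
      .loadClass("com.example.ExtraHelper")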
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -168,6 +168,27 @@ private[spark] object TestUtils {
createCompiledClass(className, destDir, sourceFile, classpathUrls)
}

/** Create a dummy compiled jar for a given package and class name. The jar will be placed in destDir. */
def createDummyJar(destDir: String, packageName: String, className: String): String = {
Contributor: I know this is moved code, but could we just use createJarWithClasses or do the R tests require these inner functions?

If we do keep this though I'd give it a clearer name and description because without reading the code, createDummyJar and createJarWithClasses don't seem all that different.

Member Author: Yeah, when I wrote this that didn't exist yet. Changing.

Member Author (@mariusvniekerk, Dec 23, 2016): The R tests do indeed verify that they can call the internal functions. I can revert that part of the changes.

In R\pkg\inst\tests\testthat\jarTest.R:

    helloTest <- SparkR:::callJStatic("sparkrtest.DummyClass",
                                      "helloWorld",
                                      "Dave")
    stopifnot(identical(helloTest, "Hello Dave"))

    basicFunction <- SparkR:::callJStatic("sparkrtest.DummyClass",
                                          "addStuff",
                                          2L,
                                          2L)
    stopifnot(basicFunction == 4L)

val srcDir = new File(destDir, packageName)
srcDir.mkdirs()
val excSource = new JavaSourceFromString(new File(srcDir, className).toURI.getPath,
s"""package $packageName;
|
|public class $className implements java.io.Serializable {
| public static String helloWorld(String arg) { return "Hello " + arg; }
| public static int addStuff(int arg1, int arg2) { return arg1 + arg2; }
|}
""".
stripMargin)
Member: (We could make this lined.)

Member (@HyukjinKwon, Oct 5, 2017): We could make this inlined.

val excFile = createCompiledClass(className, srcDir, excSource, Seq.empty)
val jarFile = new File(destDir,
s"$packageName-$className-%s.jar".format(System.currentTimeMillis()))
val jarURL = createJar(Seq(excFile), jarFile, directoryPrefix = Some(packageName))
jarURL.toString
}
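Since the return value matters for the callers in the tests above and below, a short hedged sketch (the destination directory here is illustrative): createDummyJar hands back the jar's location as a URI string rather than a plain filesystem path, which is why running the result through path helpers such as R's normalizePath misbehaves.

    val jarUri = TestUtils.createDummyJar("/tmp/test-jars", "sparkrtest", "DummyClass")
    // jarUri looks like "file:/tmp/test-jars/sparkrtest-DummyClass-1481234567890.jar"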


/**
* Run some code involving jobs submitted to the given context and assert that the jobs spilled.
*/
32 changes: 30 additions & 2 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark

import java.io.File
import java.net.{MalformedURLException, URI}
import java.net.{MalformedURLException, URI, URL}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

@@ -34,7 +34,7 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.Matchers._

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.{MutableURLClassLoader, ThreadUtils, Utils}


class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
@@ -309,6 +309,34 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
assert(sc.listJars().head.contains(tmpJar.getName))
}

Seq("local_mode", "non_local_mode").foreach { schedulingMode =>
val tempDir = Utils.createTempDir().toString
val master = schedulingMode match {
case "local_mode" => "local"
case "non_local_mode" => "local-cluster[1,1,1024]"
}
val packageName = s"scala_$schedulingMode"
val className = "DummyClass"
val jarURI = TestUtils.createDummyJar(tempDir, packageName, className)

// ensure we reset the classloader after the test completes
val originalClassLoader = Thread.currentThread.getContextClassLoader
try {
// load the exception from the jar
val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)

test(s"jar can be added and used driver side in $schedulingMode") {
sc = new SparkContext(master, "test")
Thread.currentThread().setContextClassLoader(loader)
sc.addJar(jarURI, addToCurrentClassLoader = true)
val cl = Utils.getContextOrSparkClassLoader
cl.loadClass(s"$packageName.$className")
}
} finally {
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}

test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
19 changes: 4 additions & 15 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy

import java.io._
import java.net.URI
import java.net.URL
import java.nio.charset.StandardCharsets

import scala.collection.mutable.ArrayBuffer
@@ -530,26 +531,14 @@
Seq(sparkHome, "R", "pkg", "tests", "fulltests", "jarTest.R").mkString(File.separator)
assert(new File(rScriptDir).exists)

val tempDir = Utils.createTempDir().toString
// compile a small jar containing a class that will be called from R code.
val tempDir = Utils.createTempDir()
val srcDir = new File(tempDir, "sparkrtest")
srcDir.mkdirs()
val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath,
"""package sparkrtest;
|
|public class DummyClass implements java.io.Serializable {
| public static String helloWorld(String arg) { return "Hello " + arg; }
| public static int addStuff(int arg1, int arg2) { return arg1 + arg2; }
|}
""".stripMargin)
val excFile = TestUtils.createCompiledClass("DummyClass", srcDir, excSource, Seq.empty)
val jarFile = new File(tempDir, "sparkRTestJar-%s.jar".format(System.currentTimeMillis()))
val jarURL = TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = Some("sparkrtest"))
val jarURL = TestUtils.createDummyJar(tempDir, "sparkrtest", "DummyClass")

val args = Seq(
"--name", "testApp",
"--master", "local",
"--jars", jarURL.toString,
"--jars", jarURL,
"--verbose",
"--conf", "spark.ui.enabled=false",
rScriptDir)
15 changes: 15 additions & 0 deletions python/pyspark/context.py
@@ -863,6 +863,21 @@ def addPyFile(self, path):
import importlib
importlib.invalidate_caches()

def addJar(self, path, addToCurrentClassLoader=False):
"""
Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
If addToCurrentClassLoader is true, add the jar to the current threads' classloader.
Member: little nit: addToCurrentClassLoader -> `addToCurrentClassLoader` and ads' cl -> ads' cl.

In general adding to the current threads' class loader will impact all other application
threads unless they have explicitly changed their class loader.

:param path: The path of the jar to be added
:param addToCurrentClassLoader: Whether to add the jar to the current driver classloader.
This defaults to False.
"""
self._jsc.sc().addJar(path, addToCurrentClassLoader)

def setCheckpointDir(self, dirName):
"""
Set the directory under which RDDs are going to be checkpointed. The
14 changes: 14 additions & 0 deletions python/pyspark/tests.py
@@ -35,6 +35,7 @@
import hashlib

from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaClass
try:
import xmlrunner
except ImportError:
@@ -435,6 +436,19 @@ def test_add_file_locally(self):
with open(download_path) as test_file:
self.assertEqual("Hello World!\n", test_file.readline())

def test_add_jar(self):
jvm = self.sc._jvm
# We shouldn't be able to load anything from the package before it is added
self.assertFalse(isinstance(jvm.pysparktests.DummyClass, JavaClass))
# Generate and compile the test jar
destDir = os.path.join(SPARK_HOME, "python/test_support/jar")
Member: I'd remove this directory too.

Contributor: Instead you want to use a temp directory?

jarName = jvm.org.apache.spark.TestUtils.createDummyJar(
destDir, "pysparktests", "DummyClass")
# Load the new jar
self.sc.addJar(jarName, True)
# Try and load the class
self.assertTrue(isinstance(jvm.pysparktests.DummyClass, JavaClass))

def test_add_file_recursively_locally(self):
path = os.path.join(SPARK_HOME, "python/test_support/hello")
self.sc.addFile(path, True)