1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -409,6 +409,7 @@ export("as.DataFrame",
"setCurrentDatabase",
"spark.lapply",
"spark.addFile",
"spark.addJar",
"spark.getSparkFilesRootDirectory",
"spark.getSparkFiles",
"sql",
26 changes: 26 additions & 0 deletions R/pkg/R/context.R
@@ -319,6 +319,32 @@ spark.addFile <- function(path, recursive = FALSE) {
invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)), recursive))
}

#' Adds a JAR dependency for Spark tasks to be executed in the future.
#'
#' The \code{path} passed can be either a local file, a file in HDFS (or other Hadoop-supported
#' filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
Member:

Is local:/path referring to a Windows drive/path, or should the actual text local:/ be there?

Member Author:

I think it refers to the actual local:/:

case "local" => "file:" + uri.getPath

#' If \code{addToCurrentClassLoader} is true, add the jar to the current thread's class loader
#' in the backing JVM. In general, adding to the current thread's class loader will impact all
#' other application threads unless they have explicitly changed their class loader.
#'
#' Note: the \code{addToCurrentClassLoader} parameter is a developer API, which may change or be
#' removed in minor versions of Spark.
#'
#' @rdname spark.addJar
#' @param path The path of the jar to be added
#' @param addToCurrentClassLoader Whether to add the jar to the current driver class loader.
#' @export
#' @examples
#'\dontrun{
#' spark.addJar("/path/to/something.jar", TRUE)
#'}
#' @note spark.addJar since 2.3.0
spark.addJar <- function(path, addToCurrentClassLoader = FALSE) {
normalizedPath <- suppressWarnings(normalizePath(path))
Member Author:

Looks like we have a problem here with handling URIs and Windows paths, although most other cases should be fine:

> normalizePath("file:/C:/a/b/c")
[1] "C:\\Users\\IEUser\\workspace\\spark\\file:\\C:\\a\\b\\c"
Warning message:
In normalizePath(path.expand(path), winslash, mustWork) :
  path[1]="file:/C:/a/b/c": The filename, directory name, or volume label syntax
 is incorrect

This ends up with a weird path like "C:\\Users\\IEUser\\workspace\\spark\\file:\\C:\\a\\b\\c".

I am not sure how we should handle this, since the pattern normalizedPath <- suppressWarnings(normalizePath(path)) is quite common.

If that is fine, I would like to address this issue separately for the other APIs, for example, spark.addFile right above.

Member Author (@HyukjinKwon, Nov 3, 2017):

I avoided passing a URI here by passing the absolute path in the test for now, BTW.

Member:

Yea, normalizePath wouldn't handle a URL...
https://stat.ethz.ch/R-manual/R-devel/library/base/html/normalizePath.html

I think we should require absolute paths in their canonical form here and just pass them through (a sketch of this idea follows the function below).


sc <- callJMethod(getSparkContext(), "sc")
invisible(callJMethod(sc, "addJar", normalizedPath, addToCurrentClassLoader))
}
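
A minimal sketch (in Scala, illustrative only) of the pass-through idea from the thread above: leave anything that already looks like a URI untouched, and canonicalize only bare filesystem paths. canonicalizeForAddJar is a hypothetical helper, not code from this PR, and it treats a single-letter "scheme" as a Windows drive letter rather than a real URI scheme.

import java.io.File
import java.net.URI
import scala.util.Try

// Hypothetical helper (assumption, not part of this PR): pass URI-like paths through
// untouched and canonicalize only bare filesystem paths.
def canonicalizeForAddJar(path: String): String = {
  // A malformed URI (e.g. a Windows path with backslashes) yields None here.
  val scheme = Try(new URI(path)).toOption.flatMap(u => Option(u.getScheme))
  scheme match {
    case Some(s) if s.length > 1 => path                      // hdfs:, http:, local:, file:, ...
    case _ => new File(path).getCanonicalPath                 // plain or drive-letter path
  }
}

The PR itself keeps the existing normalizePath pattern; the author sidesteps the URI case by passing an absolute path in the test below.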

#' Get the root directory that contains files added through spark.addFile.
#'
#' @rdname spark.getSparkFilesRootDirectory
14 changes: 14 additions & 0 deletions R/pkg/tests/fulltests/test_context.R
@@ -168,6 +168,20 @@ test_that("spark.lapply should perform simple transforms", {
sparkR.session.stop()
})

test_that("add jar should work and allow usage of the jar on the driver node", {
sparkR.sparkContext()

destDir <- file.path(tempdir(), "testjar")
jarFile <- callJStatic("org.apache.spark.TestUtils", "createDummyJar",
destDir, "sparkrTests", "DummyClassForAddJarTest")
jarPath <- callJMethod(jarFile, "getAbsolutePath")

spark.addJar(jarPath, addToCurrentClassLoader = TRUE)
testClass <- newJObject("sparkrTests.DummyClassForAddJarTest")
expect_true(class(testClass) == "jobj")
unlink(destDir)
})

test_that("add and get file to be downloaded with Spark job on every node", {
sparkR.sparkContext(master = sparkRTestMaster)
# Test add file.
25 changes: 24 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1802,7 +1802,21 @@ class SparkContext(config: SparkConf) extends Logging {
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
def addJar(path: String): Unit = {
addJar(path, addToCurrentClassLoader = false)
}

/**
* Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future.
*
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
* an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
* @param addToCurrentClassLoader if true, adds the jar to the current thread's class loader.
* In general, adding to the current thread's class loader will impact all other application
* threads unless they have explicitly changed their class loader.
*/
@DeveloperApi
def addJar(path: String, addToCurrentClassLoader: Boolean): Unit = {
def addJarFile(file: File): String = {
try {
if (!file.exists()) {
@@ -1838,12 +1852,21 @@
case _ => path
}
}

if (key != null) {
val timestamp = System.currentTimeMillis
if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added JAR $path at $key with timestamp $timestamp")
postEnvironmentUpdate()
}

if (addToCurrentClassLoader) {
Utils.getContextOrSparkClassLoader match {
case cl: MutableURLClassLoader => cl.addURL(Utils.resolveURI(path).toURL)
Contributor:

I'm not sure whether it supports remote jars on HTTPS or Hadoop FileSystems. On the executor side we handle this explicitly by downloading jars to the local machine and adding them to the classpath, but here there doesn't seem to be such logic. I'm not sure how this URLClassLoader would communicate with Hadoop or HTTPS without certificates.

addJar just adds jars to the file server so that executors can fetch them from the driver and add them to their classpath; it does not affect the driver's classpath. If we support adding jars to the current driver class loader, how do we leverage these newly added jars?

Member Author:

Thanks @jerryshao. I will check through this concern within this weekend and get back.


case cl => logWarning(
s"Unsupported class loader $cl will not update jars in the thread class loader.")
}
}
}
}
}
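
A hedged sketch of the gap the thread above points at: cl.addURL only registers a URL with the class loader, so a remote jar (HTTP, HTTPS, HDFS, ...) would have to be downloaded to the driver's local filesystem first, the way executors already do it. localUrlFor below is a hypothetical helper, not code from this PR; an hdfs: path would need a Hadoop FileSystem rather than the plain JDK stream used here.

import java.io.File
import java.net.{URI, URL}
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical helper (assumption, not part of this PR): resolve a jar path to a URL
// that a driver-side URLClassLoader can actually use, fetching remote jars first.
def localUrlFor(path: String, tempDir: File): URL = {
  val uri = new URI(path)
  uri.getScheme match {
    case null | "file" | "local" =>
      // already on the driver's local filesystem
      new File(uri.getPath).toURI.toURL
    case _ =>
      // http(s)/ftp: fetch through the JDK; hdfs: would need a Hadoop FileSystem instead
      val target = new File(tempDir, new File(uri.getPath).getName)
      val in = uri.toURL.openStream()
      try Files.copy(in, target.toPath, StandardCopyOption.REPLACE_EXISTING)
      finally in.close()
      target.toURI.toURL
  }
}

As the hunk stands, the MutableURLClassLoader branch hands Utils.resolveURI(path).toURL straight to the loader, which is fine for local and file paths; how remote schemes should behave is exactly what the thread above asks.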
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -168,6 +168,27 @@ private[spark] object TestUtils {
createCompiledClass(className, destDir, sourceFile, classpathUrls)
}

/** Creates a dummy compiled jar for the given package and class name. The jar is placed in destDir. */
def createDummyJar(destDir: String, packageName: String, className: String): File = {
val srcDir = new File(destDir, packageName)
srcDir.mkdirs()
val excSource = new JavaSourceFromString(
new File(srcDir, className).toURI.getPath,
s"""
|package $packageName;
|
|public class $className implements java.io.Serializable {
| public static String helloWorld(String arg) { return "Hello " + arg; }
| public static int addStuff(int arg1, int arg2) { return arg1 + arg2; }
|}
""".stripMargin)
val excFile = createCompiledClass(className, srcDir, excSource, Seq.empty)
val jarFile = new File(destDir,
s"$packageName-$className-%s.jar".format(System.currentTimeMillis()))
createJar(Seq(excFile), jarFile, directoryPrefix = Some(packageName))
jarFile
}

/**
* Run some code involving jobs submitted to the given context and assert that the jobs spilled.
*/
32 changes: 30 additions & 2 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark

import java.io.File
import java.net.{MalformedURLException, URI}
import java.net.{MalformedURLException, URI, URL}
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

@@ -34,7 +34,7 @@ import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.{MutableURLClassLoader, ThreadUtils, Utils}


class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
@@ -309,6 +309,34 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
assert(sc.listJars().head.contains(tmpJar.getName))
}

Seq("local_mode", "non_local_mode").foreach { schedulingMode =>
val tempDir = Utils.createTempDir().toString
val master = schedulingMode match {
case "local_mode" => "local"
case "non_local_mode" => "local-cluster[1,1,1024]"
}
val packageName = s"scala_$schedulingMode"
val className = "DummyClass"
val jarURI = TestUtils.createDummyJar(tempDir, packageName, className).toURI

// ensure we reset the classloader after the test completes
val originalClassLoader = Thread.currentThread.getContextClassLoader
try {
// a fresh mutable class loader through which the dummy class will be loaded
val loader = new MutableURLClassLoader(new Array[URL](0), originalClassLoader)

test(s"jar can be added and used driver side in $schedulingMode") {
sc = new SparkContext(master, "test")
Thread.currentThread().setContextClassLoader(loader)
sc.addJar(jarURI.toString, addToCurrentClassLoader = true)
val cl = Utils.getContextOrSparkClassLoader
cl.loadClass(s"$packageName.$className")
}
} finally {
Thread.currentThread.setContextClassLoader(originalClassLoader)
}
}

test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
try {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
17 changes: 3 additions & 14 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy

import java.io._
import java.net.URI
import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.Files

@@ -549,21 +550,9 @@ class SparkSubmitSuite
Seq(sparkHome, "R", "pkg", "tests", "fulltests", "jarTest.R").mkString(File.separator)
assert(new File(rScriptDir).exists)

val tempDir = Utils.createTempDir().getAbsolutePath
// compile a small jar containing a class that will be called from R code.
val tempDir = Utils.createTempDir()
val srcDir = new File(tempDir, "sparkrtest")
srcDir.mkdirs()
val excSource = new JavaSourceFromString(new File(srcDir, "DummyClass").toURI.getPath,
"""package sparkrtest;
|
|public class DummyClass implements java.io.Serializable {
| public static String helloWorld(String arg) { return "Hello " + arg; }
| public static int addStuff(int arg1, int arg2) { return arg1 + arg2; }
|}
""".stripMargin)
val excFile = TestUtils.createCompiledClass("DummyClass", srcDir, excSource, Seq.empty)
val jarFile = new File(tempDir, "sparkRTestJar-%s.jar".format(System.currentTimeMillis()))
val jarURL = TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = Some("sparkrtest"))
val jarURL = TestUtils.createDummyJar(tempDir, "sparkrtest", "DummyClass").toURI.toURL

val args = Seq(
"--name", "testApp",
17 changes: 17 additions & 0 deletions python/pyspark/context.py
@@ -860,6 +860,23 @@ def addPyFile(self, path):
import importlib
importlib.invalidate_caches()

def addJar(self, path, addToCurrentClassLoader=False):
Contributor:

We should mention that adding a jar to the current class loader is a developer API and may change.


"""
Adds a JAR dependency for Spark tasks to be executed in the future.
The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
If `addToCurrentClassLoader` is true, add the jar to the current thread's class loader
in the backing JVM. In general, adding to the current thread's class loader will impact all
other application threads unless they have explicitly changed their class loader.
Member Author:

@holdenk and @felixcheung, here I just added the comments back. I thought it's a developer API and it might be fine to describe some JVM-related details, but please let me know if you feel we should take it out.

Contributor:

So we currently use .. note:: DeveloperApi to indicate it's a developer API (see ml/pipeline and friends for an example).

.. note:: The `addToCurrentClassLoader` parameter is a developer API, which may change or be
removed in minor versions of Spark.

:param path: The path of the jar to be added
:param addToCurrentClassLoader: Whether to add the jar to the current driver class loader.
"""
self._jsc.sc().addJar(path, addToCurrentClassLoader)

def setCheckpointDir(self, dirName):
"""
Set the directory under which RDDs are going to be checkpointed. The
17 changes: 17 additions & 0 deletions python/pyspark/tests.py
@@ -35,6 +35,7 @@
import hashlib

from py4j.protocol import Py4JJavaError
from py4j.java_gateway import JavaClass
try:
import xmlrunner
except ImportError:
@@ -435,6 +436,22 @@ def test_add_file_locally(self):
with open(download_path) as test_file:
self.assertEqual("Hello World!\n", test_file.readline())

def test_add_jar(self):
jvm = self.sc._jvm
# We shouldn't be able to load anything from the package before it is added
self.assertFalse(isinstance(jvm.pysparktests.DummyClass, JavaClass))
try:
# Generate and compile the test jar
destDir = tempfile.mkdtemp()
jarPath = jvm.org.apache.spark.TestUtils.createDummyJar(
destDir, "pysparktests", "DummyClass").getAbsolutePath()
# Load the new jar
self.sc.addJar(jarPath, True)
# Try and load the class
self.assertTrue(isinstance(jvm.pysparktests.DummyClass, JavaClass))
finally:
shutil.rmtree(destDir)

def test_add_file_recursively_locally(self):
path = os.path.join(SPARK_HOME, "python/test_support/hello")
self.sc.addFile(path, True)