2 changes: 1 addition & 1 deletion LICENSE
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.2 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.2-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.2-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -314,7 +314,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.9.1</version>
<version>0.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.2-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.2
@@ -153,7 +153,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
py4j-0.9.1.jar
py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.3
@@ -144,7 +144,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
py4j-0.9.1.jar
py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.4
@@ -145,7 +145,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
py4j-0.9.1.jar
py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
@@ -151,7 +151,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
py4j-0.9.1.jar
py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.7
@@ -152,7 +152,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
py4j-0.9.1.jar
py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
@@ -615,7 +615,7 @@ object KafkaUtils {
/**
* This is a helper class that wraps the KafkaUtils.createStream() into more
* Python-friendly class and function so that it can be easily
* instantiated and called from Python's KafkaUtils (see SPARK-6027).
* instantiated and called from Python's KafkaUtils.
*
* The zero-arg constructor helps instantiate this class from the Class object
* classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
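
As background for the Python-side changes in this PR, here is a minimal sketch (not part of the diff) of the two ways PySpark can instantiate this helper, assuming an active SparkContext `sc` with the Kafka assembly on the classpath; both forms appear verbatim in the python/pyspark/streaming/kafka.py hunk further down.

    # Old pattern (Py4J 0.9.1): load the helper class through the current
    # thread's context class loader and call newInstance() on the Class object.
    helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
    helper = helperClass.newInstance()

    # New pattern (Py4J 0.9.2): call the zero-arg constructor directly through
    # the gateway's JVM view.
    helper = sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()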
2 changes: 1 addition & 1 deletion python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build

export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.1-src.zip)
export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.2-src.zip)

# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
Binary file not shown.
9 changes: 3 additions & 6 deletions python/pyspark/streaming/flume.py
@@ -111,12 +111,9 @@ def func(event):
@staticmethod
def _get_helper(sc):
try:
helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
.loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
return helperClass.newInstance()
except Py4JJavaError as e:
# TODO: use --jar once it also work on driver
if 'ClassNotFoundException' in str(e.java_exception):
return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
FlumeUtils._printErrorMsg(sc)
raise

Member:

Let's still keep this check. For other errors (e.g., the py4j java server is down), we should not call _printErrorMsg as it's confusing.

Contributor Author:

I made this change because the call now fails with a different set of exceptions (such as "attempting to call a package") and wanted to err on the side of over-displaying the warning message. Let me try to figure out a narrower exception pattern match.
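
As an aside (not part of this PR), a small sketch of the mechanism behind the narrower check that ended up in the diff, assuming an interactive PySpark session with SparkContext `sc`; the class name used here is deliberately bogus.

    # Py4J resolves dotted names under sc._jvm lazily; a name that cannot be
    # loaded as a class comes back as a JavaPackage object. Calling it raises
    # Python's standard "not callable" TypeError, which is exactly the message
    # the updated _get_helper matches before printing the warning.
    pkg = sc._jvm.org.apache.spark.streaming.flume.NoSuchHelper  # hypothetical missing class
    try:
        pkg()
    except TypeError as e:
        assert str(e) == "'JavaPackage' object is not callable"
    # Other failures (e.g., the Py4J Java server being down) surface as Py4J
    # errors rather than TypeError, so they are re-raised without the warning.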

10 changes: 3 additions & 7 deletions python/pyspark/streaming/kafka.py
@@ -192,13 +192,9 @@ def funcWithMessageHandler(m):
@staticmethod
def _get_helper(sc):
try:
# Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
return helperClass.newInstance()
except Py4JJavaError as e:
# TODO: use --jar once it also work on driver
if 'ClassNotFoundException' in str(e.java_exception):
return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
KafkaUtils._printErrorMsg(sc)
raise

14 changes: 6 additions & 8 deletions python/pyspark/streaming/kinesis.py
@@ -74,16 +74,14 @@ def createStream(ssc, kinesisAppName, streamName, endpointUrl, regionName,

try:
# Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
.loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
helper = helperClass.newInstance()
jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
regionName, initialPositionInStream, jduration, jlevel,
awsAccessKeyId, awsSecretKey)
except Py4JJavaError as e:
if 'ClassNotFoundException' in str(e.java_exception):
helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
KinesisUtils._printErrorMsg(ssc.sparkContext)
raise
jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
regionName, initialPositionInStream, jduration, jlevel,
awsAccessKeyId, awsSecretKey)
Member:

Same as the above comment

stream = DStream(jstream, ssc, NoOpSerializer())
return stream.map(lambda v: decoder(v))

13 changes: 5 additions & 8 deletions python/pyspark/streaming/mqtt.py
@@ -38,18 +38,15 @@ def createStream(ssc, brokerUrl, topic,
:param storageLevel: RDD storage level.
:return: A DStream object
"""
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)

try:
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
.loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
helper = helperClass.newInstance()
jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
except Py4JJavaError as e:
if 'ClassNotFoundException' in str(e.java_exception):
helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
except TypeError as e:
if str(e) == "'JavaPackage' object is not callable":
MQTTUtils._printErrorMsg(ssc.sparkContext)
raise

jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
Member:

Same as the above comment

return DStream(jstream, ssc, UTF8Deserializer())

@staticmethod
25 changes: 5 additions & 20 deletions python/pyspark/streaming/tests.py
@@ -1006,10 +1006,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):

def setUp(self):
super(KafkaStreamTests, self).setUp()

kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
.loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
self._kafkaTestUtils = self.ssc._jvm.org.apache.spark.streaming.kafka.KafkaTestUtils()
self._kafkaTestUtils.setup()

def tearDown(self):
@@ -1271,10 +1268,7 @@ class FlumeStreamTests(PySparkStreamingTestCase):

def setUp(self):
super(FlumeStreamTests, self).setUp()

utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
.loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
self._utils = utilsClz.newInstance()
self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()

def tearDown(self):
if self._utils is not None:
@@ -1339,10 +1333,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
maxAttempts = 5

def setUp(self):
utilsClz = \
self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
.loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
self._utils = utilsClz.newInstance()
self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()

def tearDown(self):
if self._utils is not None:
@@ -1419,10 +1410,7 @@ class MQTTStreamTests(PySparkStreamingTestCase):

def setUp(self):
super(MQTTStreamTests, self).setUp()

MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
.loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
self._MQTTTestUtils = self.ssc._jvm.org.apache.spark.streaming.mqtt.MQTTTestUtils()
self._MQTTTestUtils.setup()

def tearDown(self):
@@ -1498,10 +1486,7 @@ def test_kinesis_stream(self):

import random
kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
kinesisTestUtilsClz = \
self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
.loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
kinesisTestUtils = kinesisTestUtilsClz.newInstance()
kinesisTestUtils = self.ssc._jvm.org.apache.spark.streaming.kinesis.KinesisTestUtils()
try:
kinesisTestUtils.createStream()
aWSCredentials = kinesisTestUtils.getAWSCredentials()
2 changes: 1 addition & 1 deletion sbin/spark-config.sh
@@ -27,4 +27,4 @@ fi
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.2-src.zip:${PYTHONPATH}"
4 changes: 2 additions & 2 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1087,9 +1087,9 @@ private[spark] class Client(
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
require(pyArchivesFile.exists(),
"pyspark.zip not found; cannot run pyspark application in YARN mode.")
val py4jFile = new File(pyLibPath, "py4j-0.9.1-src.zip")
val py4jFile = new File(pyLibPath, "py4j-0.9.2-src.zip")
require(py4jFile.exists(),
"py4j-0.9.1-src.zip not found; cannot run pyspark application in YARN mode.")
"py4j-0.9.2-src.zip not found; cannot run pyspark application in YARN mode.")
Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
}
}
@@ -154,7 +154,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
// needed locations.
val sparkHome = sys.props("spark.test.home")
val pythonPath = Seq(
s"$sparkHome/python/lib/py4j-0.9.1-src.zip",
s"$sparkHome/python/lib/py4j-0.9.2-src.zip",
s"$sparkHome/python")
val extraEnv = Map(
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),