2 changes: 1 addition & 1 deletion LICENSE
@@ -264,7 +264,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
+(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
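Both launcher scripts above now point at the renamed Py4J source zip. As a quick, purely illustrative sanity check (not part of this change), one can confirm from a PySpark shell that the interpreter really resolves Py4J from the new archive; the exact path below is an assumption about a typical install:

import py4j
print(py4j.__file__)  # expected to resolve inside .../python/lib/py4j-0.9.1-src.zip
try:
    from py4j.version import __version__
    print(__version__)  # should report 0.9.1 once the new zip is on PYTHONPATH
except ImportError:
    pass  # the version module location may differ across Py4J releases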
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -350,7 +350,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
-<version>0.9</version>
+<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
+pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
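For reference, a rough Python sketch of the PYTHONPATH that sparkPythonPath assembles after this change; the SPARK_HOME value is an illustrative assumption, and the real code runs on the JVM using File.separator and File.pathSeparator:

import os

spark_home = os.environ.get("SPARK_HOME", "/opt/spark")  # hypothetical install location
python_path = os.pathsep.join([
    os.path.join(spark_home, "python", "lib", "pyspark.zip"),
    os.path.join(spark_home, "python", "lib", "py4j-0.9.1-src.zip"),
])
# e.g. /opt/spark/python/lib/pyspark.zip:/opt/spark/python/lib/py4j-0.9.1-src.zip
print(python_path)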
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.2
@@ -160,7 +160,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.3
@@ -151,7 +151,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.4
@@ -152,7 +152,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2.6
@@ -158,7 +158,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.jar
+py4j-0.9.1.jar
pyrolite-4.9.jar
quasiquotes_2.10-2.0.0-M8.jar
reflectasm-1.07-shaded.jar
2 changes: 1 addition & 1 deletion python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build

-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.1-src.zip)

# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
Binary file removed python/lib/py4j-0.9-src.zip
Binary file added python/lib/py4j-0.9.1-src.zip
89 changes: 3 additions & 86 deletions python/pyspark/streaming/context.py
@@ -19,7 +19,6 @@

import os
import sys
-from threading import RLock, Timer

from py4j.java_gateway import java_import, JavaObject

@@ -33,63 +32,6 @@
__all__ = ["StreamingContext"]


-class Py4jCallbackConnectionCleaner(object):
-
-"""
-A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
-It will scan all callback connections every 30 seconds and close the dead connections.
-"""
-
-def __init__(self, gateway):
-self._gateway = gateway
-self._stopped = False
-self._timer = None
-self._lock = RLock()
-
-def start(self):
-if self._stopped:
-return
-
-def clean_closed_connections():
-from py4j.java_gateway import quiet_close, quiet_shutdown
-
-callback_server = self._gateway._callback_server
-if callback_server:
-with callback_server.lock:
-try:
-closed_connections = []
-for connection in callback_server.connections:
-if not connection.isAlive():
-quiet_close(connection.input)
-quiet_shutdown(connection.socket)
-quiet_close(connection.socket)
-closed_connections.append(connection)
-
-for closed_connection in closed_connections:
-callback_server.connections.remove(closed_connection)
-except Exception:
-import traceback
-traceback.print_exc()
-
-self._start_timer(clean_closed_connections)
-
-self._start_timer(clean_closed_connections)
-
-def _start_timer(self, f):
-with self._lock:
-if not self._stopped:
-self._timer = Timer(30.0, f)
-self._timer.daemon = True
-self._timer.start()
-
-def stop(self):
-with self._lock:
-self._stopped = True
-if self._timer:
-self._timer.cancel()
-self._timer = None
-

class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext
@@ -105,9 +47,6 @@ class StreamingContext(object):
# Reference to a currently active StreamingContext
_activeContext = None

-# A cleaner to clean leak sockets of callback server every 30 seconds
-_py4j_cleaner = None

def __init__(self, sparkContext, batchDuration=None, jssc=None):
"""
Create a new StreamingContext.
@@ -155,34 +94,12 @@ def _ensure_initialized(cls):
# get the GatewayServer object in JVM by ID
jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
# update the port of CallbackClient with real port
-gw.jvm.PythonDStream.updatePythonGatewayPort(jgws, gw._python_proxy_port)
-_py4j_cleaner = Py4jCallbackConnectionCleaner(gw)
-_py4j_cleaner.start()
+jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)

# register serializer for TransformFunction
# it happens before creating SparkContext when loading from checkpointing
-if cls._transformerSerializer is None:
-transformer_serializer = TransformFunctionSerializer()
-transformer_serializer.init(
-SparkContext._active_spark_context, CloudPickleSerializer(), gw)
-# SPARK-12511 streaming driver with checkpointing unable to finalize leading to OOM
-# There is an issue that Py4J's PythonProxyHandler.finalize blocks forever.
-# (https://github.com/bartdag/py4j/pull/184)
-#
-# Py4j will create a PythonProxyHandler in Java for "transformer_serializer" when
-# calling "registerSerializer". If we call "registerSerializer" twice, the second
-# PythonProxyHandler will override the first one, then the first one will be GCed and
-# trigger "PythonProxyHandler.finalize". To avoid that, we should not call
-# "registerSerializer" more than once, so that "PythonProxyHandler" in Java side won't
-# be GCed.
-#
-# TODO Once Py4J fixes this issue, we should upgrade Py4j to the latest version.
-transformer_serializer.gateway.jvm.PythonDStream.registerSerializer(
-transformer_serializer)
-cls._transformerSerializer = transformer_serializer
-else:
-cls._transformerSerializer.init(
-SparkContext._active_spark_context, CloudPickleSerializer(), gw)
+cls._transformerSerializer = TransformFunctionSerializer(
+SparkContext._active_spark_context, CloudPickleSerializer(), gw)

@classmethod
def getOrCreate(cls, checkpointPath, setupFunc):
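Taken together, the streaming changes drop two workarounds that Py4J 0.9.1 appears to make unnecessary: the 30-second Py4jCallbackConnectionCleaner for leaked callback connections (SPARK-12617) and the reflection-based gateway port update, replaced by the public resetCallbackClient call. A minimal sketch of the resulting initialization flow, with names taken from the diff, the function name invented here, and the surrounding gateway/callback-server setup elided:

from py4j.java_gateway import JavaObject
from pyspark import SparkContext
from pyspark.serializers import CloudPickleSerializer
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.util import TransformFunctionSerializer

def ensure_streaming_gateway_initialized(gw):
    # Sketch only: gw is SparkContext._gateway and the callback server is already running.
    jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
    # Py4J 0.9.1 exposes resetCallbackClient, so the JVM-side callback client can be
    # repointed at the real Python proxy port without touching private fields.
    jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port)
    # The serializer now registers itself with the JVM in its constructor (see util.py below),
    # so re-initialization simply recreates it instead of guarding against double registration.
    StreamingContext._transformerSerializer = TransformFunctionSerializer(
        SparkContext._active_spark_context, CloudPickleSerializer(), gw)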
3 changes: 2 additions & 1 deletion python/pyspark/streaming/util.py
@@ -89,10 +89,11 @@ class TransformFunctionSerializer(object):
it uses this class to invoke Python, which returns the serialized function
as a byte array.
"""
-def init(self, ctx, serializer, gateway=None):
+def __init__(self, ctx, serializer, gateway=None):
self.ctx = ctx
self.serializer = serializer
self.gateway = gateway or self.ctx._gateway
+self.gateway.jvm.PythonDStream.registerSerializer(self)
self.failure = None

def dumps(self, id):
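With registration moved into the constructor, creating the serializer becomes a single step. A small hedged sketch of the new pattern; the helper name is invented here, and in practice this runs inside StreamingContext._ensure_initialized after it has imported org.apache.spark.streaming.api.python.* into the gateway JVM view:

from pyspark.serializers import CloudPickleSerializer
from pyspark.streaming.util import TransformFunctionSerializer

def register_transform_serializer(sc, gw):
    # One constructor call; __init__ already invokes
    # gw.jvm.PythonDStream.registerSerializer(ser), so the old create-then-init()
    # two-step pattern removed in context.py above is no longer needed.
    ser = TransformFunctionSerializer(sc, CloudPickleSerializer(), gw)
    return ser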
2 changes: 1 addition & 1 deletion sbin/spark-config.sh
@@ -27,4 +27,4 @@ fi
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:${PYTHONPATH}"
@@ -169,16 +169,6 @@ private[python] object PythonDStream {
PythonTransformFunctionSerializer.register(ser)
}

-/**
-* Update the port of callback client to `port`
-*/
-def updatePythonGatewayPort(gws: GatewayServer, port: Int): Unit = {
-val cl = gws.getCallbackClient
-val f = cl.getClass.getDeclaredField("port")
-f.setAccessible(true)
-f.setInt(cl, port)
-}

/**
* helper function for DStream.foreachRDD(),
* cannot be `foreachRDD`, it will confusing py4j
4 changes: 2 additions & 2 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1044,9 +1044,9 @@ private[spark] class Client(
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
require(pyArchivesFile.exists(),
"pyspark.zip not found; cannot run pyspark application in YARN mode.")
-val py4jFile = new File(pyLibPath, "py4j-0.9-src.zip")
+val py4jFile = new File(pyLibPath, "py4j-0.9.1-src.zip")
require(py4jFile.exists(),
"py4j-0.9-src.zip not found; cannot run pyspark application in YARN mode.")
"py4j-0.9.1-src.zip not found; cannot run pyspark application in YARN mode.")
Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
}
}
@@ -151,9 +151,9 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
// When running tests, let's not assume the user has built the assembly module, which also
// creates the pyspark archive. Instead, let's use PYSPARK_ARCHIVES_PATH to point at the
// needed locations.
-val sparkHome = sys.props("spark.test.home");
+val sparkHome = sys.props("spark.test.home")
val pythonPath = Seq(
s"$sparkHome/python/lib/py4j-0.9-src.zip",
s"$sparkHome/python/lib/py4j-0.9.1-src.zip",
s"$sparkHome/python")
val extraEnv = Map(
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),