[SPARK-27629][PySpark] Prevent Unpickler from intervening each unpickling #24521

Closed · 3 commits
@@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging {
     val unpickle = new Unpickler
     iter.flatMap { row =>
       val obj = unpickle.loads(row)
+      // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in the internal
+      // map of `Unpickler`. This map is cleared when calling `Unpickler.close()`.
+      unpickle.close()
Member:

It looks like close() clears the memoized map, the UnpickleStack, and the input stream. Since the input stream is a ByteArrayStream, closing it is a no-op, so that's fine. And since each row is independent of the others, I don't see any reason the other two would store anything necessary, so I believe this will be fine.

Member:

I don't know anything about this, but it looks odd to close the object repeatedly. It may not cause a problem now. What's the downside to using a new object for each row, just performance?

Member:

Yeah, I missed "Since each row is independent of the others", so I thought we needed to manually control the memo. I had to look deeper :D. Looks fine. Yes, I guess it just needs an extra clear call for each row.

Member Author:

Using a new object for each row would also be OK. I didn't do that only because of a (possible) performance concern.

       if (batched) {
         obj match {
           case array: Array[Any] => array.toSeq
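Background note (not part of the diff): the behavioral difference between the old and new pickle protocols is visible with Python's pickletools. Protocol 2 memoizes with explicit BINPUT indices that restart at 0 in every payload, so reusing an unpickler merely overwrites stale memo entries. Protocol 4's MEMOIZE instead keys on the current size of the memo map, which is exactly what goes stale when the map is never cleared between rows. A minimal sketch:

import pickle
import pickletools

row = [1, 2, 3, 4]

# Protocol 2: the list is memoized via BINPUT 0, an explicit index that
# restarts at zero for every payload.
pickletools.dis(pickle.dumps(row, protocol=2))

# Protocol 4 (Python 3.4+): the list is memoized via MEMOIZE, whose implicit
# index is the current memo size; without a clear between payloads the
# indices drift and later memo lookups resolve to the wrong objects.
pickletools.dis(pickle.dumps(row, protocol=4))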
@@ -1347,6 +1347,10 @@ private[spark] abstract class SerDeBase {
     val unpickle = new Unpickler
     iter.flatMap { row =>
       val obj = unpickle.loads(row)
+      // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in the internal
+      // map of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite
+      // doesn't clear it up, so we manually clear it.
+      unpickle.close()
       if (batched) {
         obj match {
           case list: JArrayList[_] => list.asScala
python/pyspark/serializers.py (3 changes: 1 addition & 2 deletions)
@@ -62,12 +62,11 @@
 if sys.version < '3':
     import cPickle as pickle
     from itertools import izip as zip, imap as map
-    pickle_protocol = 2
 else:
     import pickle
     basestring = unicode = str
     xrange = range
-    pickle_protocol = 3
+pickle_protocol = pickle.HIGHEST_PROTOCOL
 
 from pyspark import cloudpickle
 from pyspark.util import _exception_message
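As a quick sanity check (a sketch, not part of the PR): on any Python 3.4+ interpreter, pickle.HIGHEST_PROTOCOL is at least 4 (5 on 3.8+), and pickling even a simple list at that protocol emits MEMOIZE opcodes, which is what starts populating Pyrolite's memo map on the JVM side.

import pickle
import pickletools

payload = pickle.dumps([1, 2, 3, 4], protocol=pickle.HIGHEST_PROTOCOL)
ops = {op.name for op, arg, pos in pickletools.genops(payload)}
# Prints the protocol in use, and True on Python 3.4+.
print(pickle.HIGHEST_PROTOCOL, "MEMOIZE" in ops)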
python/pyspark/sql/tests/test_serde.py (4 changes: 4 additions & 0 deletions)
@@ -128,6 +128,10 @@ def test_BinaryType_serialization(self):

     def test_int_array_serialization(self):
         # Note that this test seems dependent on parallelism.
+        # This issue happens because the internal object map in Pyrolite is not cleared
+        # after op code STOP. If we use protocol 4 to pickle Python objects, op code
+        # MEMOIZE will store objects in that map, so we need to clear it to make sure
+        # the next unpickling starts from a clean map.
         data = self.spark.sparkContext.parallelize([[1, 2, 3, 4]] * 100, numSlices=12)
         df = self.spark.createDataFrame(data, "array<integer>")
         self.assertEqual(len(list(filter(lambda r: None in r.value, df.collect()))), 0)
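Why the test depends on parallelism: with 100 rows split across 12 slices, each partition feeds several pickled batches through one long-lived Unpickler, so the memo only drifts once a partition sees its second batch. A toy model of the drift in plain Python (not Pyrolite; memoize and get are hypothetical stand-ins for the MEMOIZE and BINGET opcode handlers):

memo = {}

def memoize(obj):
    # MEMOIZE stores at the current memo size: an implicit, drifting index.
    memo[len(memo)] = obj

def get(i):
    # BINGET fetches by the index the pickler computed, which assumed the
    # memo was empty when its payload started.
    return memo.get(i)

# Batch 1: its first memoized object lands at index 0, and lookups match.
memoize("object from batch 1")
print(get(0))  # object from batch 1

# Batch 2, memo never cleared: the pickler wrote index 0 into the payload,
# but the object actually lands at index 1, so the lookup returns stale data.
memoize("object from batch 2")
print(get(0))  # object from batch 1, i.e. the wrong object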
@@ -92,6 +92,10 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi

     outputIterator.flatMap { pickedResult =>
       val unpickledBatch = unpickle.loads(pickedResult)
+      // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in the internal
+      // map of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite
+      // doesn't clear it up, so we manually clear it.
+      unpickle.close()
       unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
     }.map { result =>
       if (udfs.length == 1) {