[SPARK-27629][PYSPARK] Prevent Unpickler from intervening each unpickling

In SPARK-27612, a correctness issue was reported: when protocol 4 is used to pickle Python objects, the unpickled objects were wrong. A temporary fix was proposed that avoided using the highest protocol.

It was found that Opcodes.MEMOIZE appears among the opcodes emitted by protocol 4, making it the prime suspect for this issue.

A deeper dive found that Opcodes.MEMOIZE stores objects in an internal map of the Unpickler object. We use a single Unpickler object to unpickle serialized Python bytes, so if the map is not cleared, stored objects interfere with the next round of unpickling.
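The role of Opcodes.MEMOIZE can be checked directly with the standard library's pickletools: the MEMOIZE opcode only exists from protocol 4 onward, while earlier protocols memoize with PUT/BINPUT. A minimal sketch (illustrative, not part of this patch):

```python
import pickle
import pickletools

def opcode_names(obj, protocol):
    """Return the opcode names in obj's pickle at the given protocol."""
    data = pickle.dumps(obj, protocol=protocol)
    return [op.name for op, _arg, _pos in pickletools.genops(data)]

# Protocol 4 memoizes the list with MEMOIZE; protocol 2 uses BINPUT instead.
print("MEMOIZE" in opcode_names([1, 2, 3, 4], 4))  # True
print("MEMOIZE" in opcode_names([1, 2, 3, 4], 2))  # False
```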

We have two options:

1. Continue to reuse the Unpickler, but call its close() after each unpickling.
2. Do not reuse the Unpickler; create a new Unpickler object for each unpickling.

This patch takes option 1.
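Why a stale memo corrupts the next round can be sketched with a toy model (purely illustrative; `ToyUnpickler` is hypothetical, not Pyrolite's implementation): MEMOIZE stores the top of the stack at index `len(memo)`, so leftover entries shift the indices that the next pickle's memo references expect.

```python
class ToyUnpickler:
    """Toy model of a memoizing unpickler (hypothetical, not Pyrolite's code)."""

    def __init__(self):
        self.memo = {}  # survives across runs unless explicitly cleared

    def run(self, ops):
        stack = []
        for op, arg in ops:
            if op == "PUSH":
                stack.append(arg)
            elif op == "MEMOIZE":
                # like pickle's MEMOIZE: store top of stack at index len(memo)
                self.memo[len(self.memo)] = stack[-1]
            elif op == "GET":
                stack.append(self.memo[arg])
        return stack[-1]

    def close(self):
        # option 1 in the commit message: clear the memo after each unpickling
        self.memo.clear()

u = ToyUnpickler()
u.run([("PUSH", "first"), ("MEMOIZE", None)])
# The second "pickle" expects its MEMOIZE to land at index 0, but the stale
# entry shifts it to index 1, so GET 0 resolves to the previous object:
print(u.run([("PUSH", "second"), ("MEMOIZE", None), ("GET", 0)]))  # first

u2 = ToyUnpickler()
u2.run([("PUSH", "first"), ("MEMOIZE", None)])
u2.close()  # memo cleared between pickles
print(u2.run([("PUSH", "second"), ("MEMOIZE", None), ("GET", 0)]))  # second
```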

This passes the test added in SPARK-27612 (apache#24519).

Closes apache#24521 from viirya/SPARK-27629.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
viirya authored and Willi Raschkowski committed Jun 5, 2020
1 parent 80c23bb commit 7999d1c
Showing 5 changed files with 24 additions and 4 deletions.
@@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging {
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
// `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
// of `Unpickler`. This map is cleared when calling `Unpickler.close()`.
unpickle.close()
if (batched) {
obj match {
case array: Array[Any] => array.toSeq
@@ -1348,6 +1348,10 @@ private[spark] abstract class SerDeBase {
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
// `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
// of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite
// doesn't clear it up, so we manually clear it.
unpickle.close()
if (batched) {
obj match {
case list: JArrayList[_] => list.asScala
7 changes: 3 additions & 4 deletions python/pyspark/serializers.py
@@ -61,13 +61,12 @@

if sys.version < '3':
import cPickle as pickle
protocol = 2
from itertools import izip as zip, imap as map
else:
import pickle
protocol = 3
basestring = unicode = str
xrange = range
pickle_protocol = pickle.HIGHEST_PROTOCOL

from pyspark import cloudpickle
from pyspark.util import _exception_message
@@ -671,7 +670,7 @@ class PickleSerializer(FramedSerializer):
"""

def dumps(self, obj):
return pickle.dumps(obj, protocol)
return pickle.dumps(obj, pickle_protocol)

if sys.version >= '3':
def loads(self, obj, encoding="bytes"):
@@ -685,7 +684,7 @@ class CloudPickleSerializer(PickleSerializer):

def dumps(self, obj):
try:
return cloudpickle.dumps(obj, 2)
return cloudpickle.dumps(obj, pickle_protocol)
except pickle.PickleError:
raise
except Exception as e:
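The serializers.py hunk above replaces the fixed protocol (2 on Python 2, 3 on Python 3) with pickle.HIGHEST_PROTOCOL, which is 4 on Python 3.4–3.7 and 5 on 3.8+, so the MEMOIZE path is now exercised. A quick sanity sketch of the new behaviour:

```python
import pickle

# Mirrors the serializers.py change: use the interpreter's highest protocol.
pickle_protocol = pickle.HIGHEST_PROTOCOL
print(pickle_protocol >= 4)  # True on Python 3.4+

# Objects still round-trip fine; the SPARK-27612 corruption was on the
# unpickling side (Pyrolite's stale memo), not in pickling itself.
data = [[1, 2, 3, 4]] * 100
print(pickle.loads(pickle.dumps(data, pickle_protocol)) == data)  # True
```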
10 changes: 10 additions & 0 deletions python/pyspark/sql/tests/test_serde.py
@@ -126,6 +126,16 @@ def test_BinaryType_serialization(self):
df = self.spark.createDataFrame(data, schema=schema)
df.collect()

def test_int_array_serialization(self):
# Note that this test seems dependent on parallelism.
# This issue arises because the internal object map in Pyrolite is not cleared
# after op code STOP. If we use protocol 4 to pickle Python objects, op code
# MEMOIZE will store objects in the map. We need to clear it up to make sure
# the next unpickling starts with a clean map.
data = self.spark.sparkContext.parallelize([[1, 2, 3, 4]] * 100, numSlices=12)
df = self.spark.createDataFrame(data, "array<integer>")
self.assertEqual(len(list(filter(lambda r: None in r.value, df.collect()))), 0)


if __name__ == "__main__":
import unittest
@@ -92,6 +92,10 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi

outputIterator.flatMap { pickedResult =>
val unpickledBatch = unpickle.loads(pickedResult)
// `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
// of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite
// doesn't clear it up, so we manually clear it.
unpickle.close()
unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
}.map { result =>
if (udfs.length == 1) {
