diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 01e64b6972ae2..9462dfd950bab 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging { val unpickle = new Unpickler iter.flatMap { row => val obj = unpickle.loads(row) + // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map + // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. + unpickle.close() if (batched) { obj match { case array: Array[Any] => array.toSeq diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index db3f074ecfbac..1ce7a0f57fd2a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1348,6 +1348,10 @@ private[spark] abstract class SerDeBase { val unpickle = new Unpickler iter.flatMap { row => val obj = unpickle.loads(row) + // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map + // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite + // doesn't clear it up, so we manually clear it. 
+ unpickle.close() if (batched) { obj match { case list: JArrayList[_] => list.asScala diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 0423f2ab3169c..516ee7e7b3084 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -61,13 +61,12 @@ if sys.version < '3': import cPickle as pickle - protocol = 2 from itertools import izip as zip, imap as map else: import pickle - protocol = 3 basestring = unicode = str xrange = range +pickle_protocol = pickle.HIGHEST_PROTOCOL from pyspark import cloudpickle from pyspark.util import _exception_message @@ -671,7 +670,7 @@ class PickleSerializer(FramedSerializer): """ def dumps(self, obj): - return pickle.dumps(obj, protocol) + return pickle.dumps(obj, pickle_protocol) if sys.version >= '3': def loads(self, obj, encoding="bytes"): @@ -685,7 +684,7 @@ class CloudPickleSerializer(PickleSerializer): def dumps(self, obj): try: - return cloudpickle.dumps(obj, 2) + return cloudpickle.dumps(obj, pickle_protocol) except pickle.PickleError: raise except Exception as e: diff --git a/python/pyspark/sql/tests/test_serde.py b/python/pyspark/sql/tests/test_serde.py index 8707f46b6a25a..ed4b9a7755879 100644 --- a/python/pyspark/sql/tests/test_serde.py +++ b/python/pyspark/sql/tests/test_serde.py @@ -126,6 +126,16 @@ def test_BinaryType_serialization(self): df = self.spark.createDataFrame(data, schema=schema) df.collect() + def test_int_array_serialization(self): + # Note that this test seems dependent on parallelism. + # This issue arises because the internal object map in Pyrolite is not cleared after op code + # STOP. If we use protocol 4 to pickle Python objects, op code MEMOIZE will store + # objects in the map. We need to clear it up to make sure the next unpickling works on + # a clean map.
+ data = self.spark.sparkContext.parallelize([[1, 2, 3, 4]] * 100, numSlices=12) + df = self.spark.createDataFrame(data, "array<integer>") + self.assertEqual(len(list(filter(lambda r: None in r.value, df.collect()))), 0) + if __name__ == "__main__": import unittest diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index d3736d24e5019..eff709ef7f729 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -92,6 +92,10 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi outputIterator.flatMap { pickedResult => val unpickledBatch = unpickle.loads(pickedResult) + // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map + // of `Unpickler`. This map is cleared when calling `Unpickler.close()`. Pyrolite + // doesn't clear it up, so we manually clear it. + unpickle.close() unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala }.map { result => if (udfs.length == 1) {