Reviewer notes: free text placed ahead of the first "diff --git" header; it is
not part of any hunk (presumably skipped by patch tools, like mail headers --
confirm for your tooling).

SerDeUtil.scala: the `Unpickler` is created once, outside `iter.flatMap`, and
reused for every row. The hunk adds `unpickle.close()` right after each
`unpickle.loads(row)`. Per the comment added in the hunk, this clears the
internal map that `Opcodes.MEMOIZE` (pickle protocol 4) fills.

serializers.py: the per-branch constants `pickle_protocol = 2` (branch taken
when `sys.version < '3'`) and `pickle_protocol = 3` (else branch) are removed.
`pickle_protocol` is now assigned once, after the if/else, from
`pickle.HIGHEST_PROTOCOL`.

NOTE(review): `pickle.HIGHEST_PROTOCOL` depends on the interpreter running this
module -- confirm that the JVM-side `Unpickler` accepts every protocol the
supported Python versions can select.

NOTE(review): the patch text was received collapsed onto one line; the
indentation inside the hunks below was restored by convention -- verify it
against the target files before applying.

diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 01e64b6972ae2..9462dfd950bab 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging {
       val unpickle = new Unpickler
       iter.flatMap { row =>
         val obj = unpickle.loads(row)
+        // `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
+        // of `Unpickler`. This map is cleared when calling `Unpickler.close()`.
+        unpickle.close()
         if (batched) {
           obj match {
             case array: Array[Any] => array.toSeq
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 531108738f6c9..6058e94d471e9 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -62,12 +62,11 @@
 if sys.version < '3':
     import cPickle as pickle
     from itertools import izip as zip, imap as map
-    pickle_protocol = 2
 else:
     import pickle
     basestring = unicode = str
     xrange = range
-    pickle_protocol = 3
+pickle_protocol = pickle.HIGHEST_PROTOCOL
 
 from pyspark import cloudpickle
 from pyspark.util import _exception_message