Skip to content

Commit

Permalink
Prevent Unpickler from intervening each unpickling by calling close.
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed May 3, 2019
1 parent 5c47924 commit d7312fb
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ private[spark] object SerDeUtil extends Logging {
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
// `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
// of `Unpickler`. This map is cleared when calling `Unpickler.close()`.
unpickle.close()
if (batched) {
obj match {
case array: Array[Any] => array.toSeq
Expand Down
3 changes: 1 addition & 2 deletions python/pyspark/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,11 @@
if sys.version < '3':
import cPickle as pickle
from itertools import izip as zip, imap as map
pickle_protocol = 2
else:
import pickle
basestring = unicode = str
xrange = range
pickle_protocol = 3
pickle_protocol = pickle.HIGHEST_PROTOCOL

from pyspark import cloudpickle
from pyspark.util import _exception_message
Expand Down

0 comments on commit d7312fb

Please sign in to comment.