From ffd475fb10b8d82c5c76ab80321e42cc4484ef2c Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Wed, 25 Apr 2018 16:45:25 +0200
Subject: [PATCH 1/8] [SPARK-22674][PYTHON] Removed the namedtuple pickling
 patch

This is a breaking change. Prior to this commit, PySpark patched
``collections.namedtuple`` to make namedtuple instances serializable
even if the namedtuple class had been defined outside of ``globals()``,
e.g.

    def do_something():
        Foo = namedtuple("Foo", ["foo"])
        sc.parallelize(range(1)).map(lambda _: Foo(42))

The patch changed the pickled representation of namedtuple instances to
include the structure of the namedtuple class, and recreated the class on
each unpickling. This behaviour caused hard-to-diagnose failures both in
user code using namedtuples and in third-party libraries relying on them.
See [1] and [2] for details.

With the patch gone, namedtuple classes used inside RDDs and datasets
have to be defined at module level so that the executors can import them
by name (see the note at the end of this series for an example).

[1]: https://superbobry.github.io/pyspark-silently-breaks-your-namedtuples.html
[2]: https://superbobry.github.io/tensorflowonspark-or-the-namedtuple-patch-strikes-again.html
---
 python/pyspark/serializers.py | 86 -----------------------------------
 python/pyspark/tests.py       | 13 ------
 2 files changed, 99 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ff9a612b77f61..e01a5522d6ca7 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -54,8 +54,6 @@ from itertools import chain, product
 import marshal
 import struct
-import types
-import collections
 import zlib
 import itertools
 
@@ -474,90 +472,6 @@ def dumps(self, obj):
         return obj
 
 
-# Hack namedtuple, make it picklable
-
-__cls = {}
-
-
-def _restore(name, fields, value):
-    """ Restore an object of namedtuple"""
-    k = (name, fields)
-    cls = __cls.get(k)
-    if cls is None:
-        cls = collections.namedtuple(name, fields)
-        __cls[k] = cls
-    return cls(*value)
-
-
-def _hack_namedtuple(cls):
-    """ Make class generated by namedtuple picklable """
-    name = cls.__name__
-    fields = cls._fields
-
-    def __reduce__(self):
-        return (_restore, (name, fields, tuple(self)))
-    cls.__reduce__ = __reduce__
-    cls._is_namedtuple_ = True
-    return cls
-
-
-def _hijack_namedtuple():
-    """ Hack namedtuple() to make it picklable """
-    # hijack only one time
-    if hasattr(collections.namedtuple, "__hijack"):
-        return
-
-    global _old_namedtuple  # or it will put in closure
-    global _old_namedtuple_kwdefaults  # or it will put in closure too
-
-    def _copy_func(f):
-        return types.FunctionType(f.__code__, f.__globals__, f.__name__,
-                                  f.__defaults__, f.__closure__)
-
-    def _kwdefaults(f):
-        # __kwdefaults__ contains the default values of keyword-only arguments which are
-        # introduced from Python 3. The possible cases for __kwdefaults__ in namedtuple
-        # are as below:
-        #
-        # - Does not exist in Python 2.
-        # - Returns None in <= Python 3.5.x.
-        # - Returns a dictionary containing the default values to the keys from Python 3.6.x
-        #   (See https://bugs.python.org/issue25628).
-        kargs = getattr(f, "__kwdefaults__", None)
-        if kargs is None:
-            return {}
-        else:
-            return kargs
-
-    _old_namedtuple = _copy_func(collections.namedtuple)
-    _old_namedtuple_kwdefaults = _kwdefaults(collections.namedtuple)
-
-    def namedtuple(*args, **kwargs):
-        for k, v in _old_namedtuple_kwdefaults.items():
-            kwargs[k] = kwargs.get(k, v)
-        cls = _old_namedtuple(*args, **kwargs)
-        return _hack_namedtuple(cls)
-
-    # replace namedtuple with the new one
-    collections.namedtuple.__globals__["_old_namedtuple_kwdefaults"] = _old_namedtuple_kwdefaults
-    collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
-    collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
-    collections.namedtuple.__code__ = namedtuple.__code__
-    collections.namedtuple.__hijack = 1
-
-    # hack the cls already generated by namedtuple.
-    # Those created in other modules can be pickled as normal,
-    # so only hack those in __main__ module
-    for n, o in sys.modules["__main__"].__dict__.items():
-        if (type(o) is type and o.__base__ is tuple
-                and hasattr(o, "_fields")
-                and "__reduce__" not in o.__dict__):
-            _hack_namedtuple(o)  # hack inplace
-
-
-_hijack_namedtuple()
-
-
 class PickleSerializer(FramedSerializer):
 
     """
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 050c2dd018360..3fb1cb2e50a63 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -236,19 +236,6 @@ def test_external_sort_in_rdd(self):
 
 class SerializationTestCase(unittest.TestCase):
 
-    def test_namedtuple(self):
-        from collections import namedtuple
-        from pickle import dumps, loads
-        P = namedtuple("P", "x y")
-        p1 = P(1, 3)
-        p2 = loads(dumps(p1, 2))
-        self.assertEqual(p1, p2)
-
-        from pyspark.cloudpickle import dumps
-        P2 = loads(dumps(P))
-        p3 = P2(1, 3)
-        self.assertEqual(p1, p3)
-
     def test_itemgetter(self):
         from operator import itemgetter
         ser = CloudPickleSerializer()

From 0dc139107f34dea9c83d9ce5a7891bcbb88ddb0e Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Thu, 26 Apr 2018 11:45:55 +0200
Subject: [PATCH 2/8] Fixed test_namedtuple_in_rdd

---
 python/pyspark/tests.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3fb1cb2e50a63..a80d5fa890b2d 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -889,6 +889,7 @@ def test_itemgetter(self):
 
     def test_namedtuple_in_rdd(self):
         from collections import namedtuple
+        global Person
         Person = namedtuple("Person", "id firstName lastName")
         jon = Person(1, "Jon", "Doe")
         jane = Person(2, "Jane", "Doe")

From 7f2ad870dc82e058a76403099e2fc98d9f62e899 Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Thu, 26 Apr 2018 12:43:47 +0200
Subject: [PATCH 3/8] Fixed test_infer_nested_schema

---
 python/pyspark/sql/tests.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b88a6551f8ae5..0c8d9108c6820 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1171,6 +1171,7 @@ def test_infer_nested_schema(self):
         self.assertEqual(Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), df.collect()[0])
 
         from collections import namedtuple
+        global CustomRow
         CustomRow = namedtuple('CustomRow', 'field1 field2')
         rdd = self.sc.parallelize([CustomRow(field1=1, field2="row1"),
                                    CustomRow(field1=2, field2="row2"),

From ac6c7bce70b1929d54162090a0cb5df0cf53041a Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Fri, 28 Sep 2018 13:58:38 +0200
Subject: [PATCH 4/8] Made FreqItemset picklable

---
 python/pyspark/mllib/fpm.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py
index de18dad1f675d..eb8f825e9d418 100644
--- a/python/pyspark/mllib/fpm.py
+++ b/python/pyspark/mllib/fpm.py
@@ -68,6 +68,12 @@ def load(cls, sc, path):
         return FPGrowthModel(wrapper)
 
 
+#: Represents an (items, freq) tuple.
+#:
+#: .. versionadded:: 1.4.0
+FreqItemset = namedtuple("FreqItemset", ["items", "freq"])
+
+
 class FPGrowth(object):
     """
     A Parallel FP-growth algorithm to mine frequent itemsets.
@@ -94,12 +100,9 @@ def train(cls, data, minSupport=0.3, numPartitions=-1):
         model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartitions))
         return FPGrowthModel(model)
 
-    class FreqItemset(namedtuple("FreqItemset", ["items", "freq"])):
-        """
-        Represents an (items, freq) tuple.
+    # Backward-compatible alias.
+    FreqItemset = FreqItemset
 
-        .. versionadded:: 1.4.0
-        """
 
 
 @inherit_doc

From 65dc2662287c64fdd26e7d46f5da5354e4e9eff6 Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Fri, 28 Sep 2018 15:24:42 +0200
Subject: [PATCH 5/8] Removed extra whitespace in pyspark.mllib.fpm

---
 python/pyspark/mllib/fpm.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py
index eb8f825e9d418..151b5a500830b 100644
--- a/python/pyspark/mllib/fpm.py
+++ b/python/pyspark/mllib/fpm.py
@@ -104,7 +104,6 @@ def train(cls, data, minSupport=0.3, numPartitions=-1):
     FreqItemset = FreqItemset
 
 
-
 @inherit_doc
 @ignore_unicode_prefix
 class PrefixSpanModel(JavaModelWrapper):

From 765362efb1f40fde3820704fee1628c90d6c0566 Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Fri, 28 Sep 2018 16:20:08 +0200
Subject: [PATCH 6/8] Removed CustomRow test case from SQLTests

Now that _hijack_namedtuple has been removed, only globally defined
importable namedtuples can be used inside RDDs and datasets.
---
 python/pyspark/sql/tests.py | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 0c8d9108c6820..1a77461cf494c 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1170,15 +1170,6 @@ def test_infer_nested_schema(self):
         df = self.spark.createDataFrame(nestedRdd2)
         self.assertEqual(Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), df.collect()[0])
 
-        from collections import namedtuple
-        global CustomRow
-        CustomRow = namedtuple('CustomRow', 'field1 field2')
-        rdd = self.sc.parallelize([CustomRow(field1=1, field2="row1"),
-                                   CustomRow(field1=2, field2="row2"),
-                                   CustomRow(field1=3, field2="row3")])
-        df = self.spark.createDataFrame(rdd)
-        self.assertEqual(Row(field1=1, field2=u'row1'), df.first())
-
     def test_create_dataframe_from_dict_respects_schema(self):
         df = self.spark.createDataFrame([{'a': 1}], ["b"])
         self.assertEqual(df.columns, ['b'])

From 9c690474986be63e41eed7aba59399c5a154b72e Mon Sep 17 00:00:00 2001
From: Sergei Lebedev
Date: Mon, 1 Oct 2018 11:36:17 +0200
Subject: [PATCH 7/8] Made FreqSequence picklable

---
 python/pyspark/mllib/fpm.py | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py
index 151b5a500830b..7cd247873f286 100644
--- a/python/pyspark/mllib/fpm.py
+++ b/python/pyspark/mllib/fpm.py
@@ -55,7 +55,7 @@ def freqItemsets(self):
         """
         Returns the frequent itemsets of this model.
""" - return self.call("getFreqItemsets").map(lambda x: (FPGrowth.FreqItemset(x[0], x[1]))) + return self.call("getFreqItemsets").map(lambda x: (FreqItemset(x[0], x[1]))) @classmethod @since("2.0.0") @@ -126,7 +126,13 @@ class PrefixSpanModel(JavaModelWrapper): @since("1.6.0") def freqSequences(self): """Gets frequent sequences""" - return self.call("getFreqSequences").map(lambda x: PrefixSpan.FreqSequence(x[0], x[1])) + return self.call("getFreqSequences").map(lambda x: FreqSequence(x[0], x[1])) + + +#: Represents a (sequence, freq) tuple. +#: +#: .. versionadded:: 1.6.0 +FreqSequence = namedtuple("FreqSequence", ["sequence", "freq"]) class PrefixSpan(object): @@ -169,12 +175,8 @@ def train(cls, data, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=320 data, minSupport, maxPatternLength, maxLocalProjDBSize) return PrefixSpanModel(model) - class FreqSequence(namedtuple("FreqSequence", ["sequence", "freq"])): - """ - Represents a (sequence, freq) tuple. - - .. versionadded:: 1.6.0 - """ + # Backward-compatible alias. + FreqSequence = FreqSequence def _test(): From 2addefb9ae2ec12f30e0e939f7782b521efccdd0 Mon Sep 17 00:00:00 2001 From: Sergei Lebedev Date: Mon, 1 Oct 2018 14:01:19 +0200 Subject: [PATCH 8/8] Made Assignment picklable --- python/pyspark/mllib/clustering.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index b1a8af6bcc094..4b9da60e29b5b 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -618,8 +618,7 @@ def assignments(self): """ Returns the cluster assignments of this model. """ - return self.call("getAssignments").map( - lambda x: (PowerIterationClustering.Assignment(*x))) + return self.call("getAssignments").map(lambda x: Assignment(*x)) @classmethod @since('1.5.0') @@ -633,6 +632,12 @@ def load(cls, sc, path): return PowerIterationClusteringModel(wrapper) +#: Represents an (id, cluster) tuple. +#: +#: .. versionadded:: 1.5.0 +Assignment = namedtuple("Assignment", ["id", "cluster"]) + + class PowerIterationClustering(object): """ Power Iteration Clustering (PIC), a scalable graph clustering algorithm @@ -671,12 +676,8 @@ def train(cls, rdd, k, maxIterations=100, initMode="random"): rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode) return PowerIterationClusteringModel(model) - class Assignment(namedtuple("Assignment", ["id", "cluster"])): - """ - Represents an (id, cluster) tuple. - - .. versionadded:: 1.5.0 - """ + # Backward-compatible alias. + Assignment = Assignment class StreamingKMeansModel(KMeansModel):