diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index b1a8af6bcc09..4b9da60e29b5 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -618,8 +618,7 @@ def assignments(self):
         """
         Returns the cluster assignments of this model.
         """
-        return self.call("getAssignments").map(
-            lambda x: (PowerIterationClustering.Assignment(*x)))
+        return self.call("getAssignments").map(lambda x: Assignment(*x))

     @classmethod
     @since('1.5.0')
@@ -633,6 +632,12 @@ def load(cls, sc, path):
         return PowerIterationClusteringModel(wrapper)


+#: Represents an (id, cluster) tuple.
+#:
+#: .. versionadded:: 1.5.0
+Assignment = namedtuple("Assignment", ["id", "cluster"])
+
+
 class PowerIterationClustering(object):
     """
     Power Iteration Clustering (PIC), a scalable graph clustering algorithm
@@ -671,12 +676,8 @@ def train(cls, rdd, k, maxIterations=100, initMode="random"):
                               rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
         return PowerIterationClusteringModel(model)

-    class Assignment(namedtuple("Assignment", ["id", "cluster"])):
-        """
-        Represents an (id, cluster) tuple.
-
-        .. versionadded:: 1.5.0
-        """
+    # Backward-compatible alias.
+    Assignment = Assignment


 class StreamingKMeansModel(KMeansModel):
diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py
index de18dad1f675..7cd247873f28 100644
--- a/python/pyspark/mllib/fpm.py
+++ b/python/pyspark/mllib/fpm.py
@@ -55,7 +55,7 @@ def freqItemsets(self):
         """
         Returns the frequent itemsets of this model.
         """
-        return self.call("getFreqItemsets").map(lambda x: (FPGrowth.FreqItemset(x[0], x[1])))
+        return self.call("getFreqItemsets").map(lambda x: FreqItemset(x[0], x[1]))

     @classmethod
     @since("2.0.0")
@@ -68,6 +68,12 @@ def load(cls, sc, path):
         return FPGrowthModel(wrapper)


+#: Represents an (items, freq) tuple.
+#:
+#: .. versionadded:: 1.4.0
+FreqItemset = namedtuple("FreqItemset", ["items", "freq"])
+
+
 class FPGrowth(object):
     """
     A Parallel FP-growth algorithm to mine frequent itemsets.
@@ -94,12 +100,8 @@ def train(cls, data, minSupport=0.3, numPartitions=-1):
         model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartitions))
         return FPGrowthModel(model)

-    class FreqItemset(namedtuple("FreqItemset", ["items", "freq"])):
-        """
-        Represents an (items, freq) tuple.
-
-        .. versionadded:: 1.4.0
-        """
+    # Backward-compatible alias.
+    FreqItemset = FreqItemset


 @inherit_doc
@@ -124,7 +126,13 @@ class PrefixSpanModel(JavaModelWrapper):
     @since("1.6.0")
     def freqSequences(self):
         """Gets frequent sequences"""
-        return self.call("getFreqSequences").map(lambda x: PrefixSpan.FreqSequence(x[0], x[1]))
+        return self.call("getFreqSequences").map(lambda x: FreqSequence(x[0], x[1]))
+
+
+#: Represents a (sequence, freq) tuple.
+#:
+#: .. versionadded:: 1.6.0
+FreqSequence = namedtuple("FreqSequence", ["sequence", "freq"])


 class PrefixSpan(object):
@@ -167,12 +175,8 @@ def train(cls, data, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=320
                               data, minSupport, maxPatternLength, maxLocalProjDBSize)
         return PrefixSpanModel(model)

-    class FreqSequence(namedtuple("FreqSequence", ["sequence", "freq"])):
-        """
-        Represents a (sequence, freq) tuple.
-
-        .. versionadded:: 1.6.0
-        """
+    # Backward-compatible alias.
+    FreqSequence = FreqSequence


 def _test():
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ff9a612b77f6..e01a5522d6ca 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -54,8 +54,6 @@
 from itertools import chain, product
 import marshal
 import struct
 import sys
-import types
-import collections
 import zlib
 import itertools
@@ -474,90 +472,6 @@ def dumps(self, obj):
         return obj


-# Hack namedtuple, make it picklable
-
-__cls = {}
-
-
-def _restore(name, fields, value):
-    """ Restore an object of namedtuple"""
-    k = (name, fields)
-    cls = __cls.get(k)
-    if cls is None:
-        cls = collections.namedtuple(name, fields)
-        __cls[k] = cls
-    return cls(*value)
-
-
-def _hack_namedtuple(cls):
-    """ Make class generated by namedtuple picklable """
-    name = cls.__name__
-    fields = cls._fields
-
-    def __reduce__(self):
-        return (_restore, (name, fields, tuple(self)))
-    cls.__reduce__ = __reduce__
-    cls._is_namedtuple_ = True
-    return cls
-
-
-def _hijack_namedtuple():
-    """ Hack namedtuple() to make it picklable """
-    # hijack only one time
-    if hasattr(collections.namedtuple, "__hijack"):
-        return
-
-    global _old_namedtuple  # or it will put in closure
-    global _old_namedtuple_kwdefaults  # or it will put in closure too
-
-    def _copy_func(f):
-        return types.FunctionType(f.__code__, f.__globals__, f.__name__,
-                                  f.__defaults__, f.__closure__)
-
-    def _kwdefaults(f):
-        # __kwdefaults__ contains the default values of keyword-only arguments which are
-        # introduced from Python 3. The possible cases for __kwdefaults__ in namedtuple
-        # are as below:
-        #
-        # - Does not exist in Python 2.
-        # - Returns None in <= Python 3.5.x.
-        # - Returns a dictionary containing the default values to the keys from Python 3.6.x
-        #   (See https://bugs.python.org/issue25628).
-        kargs = getattr(f, "__kwdefaults__", None)
-        if kargs is None:
-            return {}
-        else:
-            return kargs
-
-    _old_namedtuple = _copy_func(collections.namedtuple)
-    _old_namedtuple_kwdefaults = _kwdefaults(collections.namedtuple)
-
-    def namedtuple(*args, **kwargs):
-        for k, v in _old_namedtuple_kwdefaults.items():
-            kwargs[k] = kwargs.get(k, v)
-        cls = _old_namedtuple(*args, **kwargs)
-        return _hack_namedtuple(cls)
-
-    # replace namedtuple with the new one
-    collections.namedtuple.__globals__["_old_namedtuple_kwdefaults"] = _old_namedtuple_kwdefaults
-    collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
-    collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
-    collections.namedtuple.__code__ = namedtuple.__code__
-    collections.namedtuple.__hijack = 1
-
-    # hack the cls already generated by namedtuple.
-    # Those created in other modules can be pickled as normal,
-    # so only hack those in __main__ module
-    for n, o in sys.modules["__main__"].__dict__.items():
-        if (type(o) is type and o.__base__ is tuple
-                and hasattr(o, "_fields")
-                and "__reduce__" not in o.__dict__):
-            _hack_namedtuple(o)  # hack inplace
-
-
-_hijack_namedtuple()
-
-
 class PickleSerializer(FramedSerializer):

     """
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b88a6551f8ae..1a77461cf494 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1170,14 +1170,6 @@ def test_infer_nested_schema(self):
         df = self.spark.createDataFrame(nestedRdd2)
         self.assertEqual(Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), df.collect()[0])

-        from collections import namedtuple
-        CustomRow = namedtuple('CustomRow', 'field1 field2')
-        rdd = self.sc.parallelize([CustomRow(field1=1, field2="row1"),
-                                   CustomRow(field1=2, field2="row2"),
-                                   CustomRow(field1=3, field2="row3")])
-        df = self.spark.createDataFrame(rdd)
-        self.assertEqual(Row(field1=1, field2=u'row1'), df.first())
-
     def test_create_dataframe_from_dict_respects_schema(self):
         df = self.spark.createDataFrame([{'a': 1}], ["b"])
         self.assertEqual(df.columns, ['b'])
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 050c2dd01836..a80d5fa890b2 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -236,19 +236,6 @@ def test_external_sort_in_rdd(self):

 class SerializationTestCase(unittest.TestCase):

-    def test_namedtuple(self):
-        from collections import namedtuple
-        from pickle import dumps, loads
-        P = namedtuple("P", "x y")
-        p1 = P(1, 3)
-        p2 = loads(dumps(p1, 2))
-        self.assertEqual(p1, p2)
-
-        from pyspark.cloudpickle import dumps
-        P2 = loads(dumps(P))
-        p3 = P2(1, 3)
-        self.assertEqual(p1, p3)
-
     def test_itemgetter(self):
         from operator import itemgetter
         ser = CloudPickleSerializer()
@@ -902,6 +889,7 @@ def test_itemgetter(self):

     def test_namedtuple_in_rdd(self):
         from collections import namedtuple
+        global Person
         Person = namedtuple("Person", "id firstName lastName")
         jon = Person(1, "Jon", "Doe")
         jane = Person(2, "Jane", "Doe")
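Why the hack can go away: pickle serializes a namedtuple class by its
module-qualified name, so a class bound at module scope round-trips with no
__reduce__ patching, while one generated inside a function or class body is
not reachable under that name and fails. A minimal stdlib-only sketch of the
distinction (Point and make_local are illustrative names, not part of the
patch):

    import pickle
    from collections import namedtuple

    # Module scope: the pickle stream stores only a reference such as
    # "<module>.Point", and the class is looked up again on load.
    Point = namedtuple("Point", ["x", "y"])


    def make_local():
        # Function scope: the generated class is not reachable as an
        # attribute of its module, so plain pickle cannot handle instances.
        return namedtuple("Local", ["x"])(1)


    assert pickle.loads(pickle.dumps(Point(1, 2), protocol=2)) == Point(1, 2)

    try:
        pickle.dumps(make_local(), protocol=2)
    except pickle.PicklingError:
        pass  # expected: attribute lookup for "Local" fails

The same distinction drives the other changes above: test_namedtuple_in_rdd
now declares global Person so the class is importable from the test module on
the worker side, and the "Assignment = Assignment" style class attributes keep
the old spellings such as PowerIterationClustering.Assignment working for
existing callers.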