Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-21070][PYSPARK] Attempt to update cloudpickle again #18734

Closed

Conversation

holdenk
Copy link
Contributor

@holdenk holdenk commented Jul 26, 2017

What changes were proposed in this pull request?

Based on #18282 by @rgbkrk this PR attempts to update to the current released cloudpickle and minimize the difference between Spark cloudpickle and "stock" cloud pickle with the goal of eventually using the stock cloud pickle.

Some notable changes:

How was this patch tested?

Existing PySpark unit tests + the unit tests from the cloudpickle project on their own.

rgbkrk and others added 9 commits June 12, 2017 15:19
This brings in fixes and upgrades from the [cloudpickle](https://github.com/cloudpipe/cloudpickle) module, notably:

* Import submodules accessed by pickled functions (cloudpipe/cloudpickle#80)
* Support recursive functions inside closures (cloudpipe/cloudpickle#89, cloudpipe/cloudpickle#90)
* Fix ResourceWarnings and DeprecationWarnings (cloudpipe/cloudpickle#88)
* Assume modules with __file__ attribute are not dynamic (cloudpipe/cloudpickle#85)
* Make cloudpickle Python 3.6 compatible (cloudpipe/cloudpickle#72)
* Allow pickling of builtin methods (cloudpipe/cloudpickle#57)
* Add ability to pickle dynamically created modules (cloudpipe/cloudpickle#52)
* Support method descriptor (cloudpipe/cloudpickle#46)
* No more pickling of closed files, was broken on Python 3 (cloudpipe/cloudpickle#32)
…__transient__ support from PR#110"

This reverts commit 1cfd38f.
…ful cloudpickle fixes.

Revert "Revert "Copy over support work weakset, dynamic classess, and remove __transient__ support from PR#110""

This reverts commit cff6bfb.
@SparkQA
Copy link

SparkQA commented Jul 26, 2017

Test build #79947 has finished for PR 18734 at commit 09cf41e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@holdenk holdenk changed the title [WIP][SPARK-21070][PYSPARK] Attempt to update cloudpickle again [SPARK-21070][PYSPARK] Attempt to update cloudpickle again Jul 26, 2017
@holdenk
Copy link
Contributor Author

holdenk commented Jul 26, 2017

cc @rgbkrk since this is based on your original PR. + @HyukjinKwon

@rgbkrk
Copy link
Contributor

rgbkrk commented Jul 26, 2017

You are my hero, thank you.

@HyukjinKwon
Copy link
Member

I guess this should be fine in general if it simply ports the changes in cloudpickle. Will double check next week (to me, it takes a while to double check).

@rgbkrk, would you mind double checking this one and see if it looks good to you?

@rgbkrk
Copy link
Contributor

rgbkrk commented Jul 27, 2017

Yeah this is looking great, it also brings in the latest from cloudpickle that wasn't in my original patch.

Copy link
Member

@HyukjinKwon HyukjinKwon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk, LGTM with few questions.

What I checked was few custom changes after 04e44b3 assuming these were started from both cloudpipe/cloudpickle@c4f8851 and cloudpipe/cloudpickle@7aebb7e

I checked e044705, 04e44b3, e044705, 5520418, ee913e6, d489354, dbfc7aa, 20e6280 and 6297697 which I assume we added after that.

modname = pickle.whichmodule(obj, name)
except Exception:
modname = None
modname = pickle.whichmodule(obj, name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@holdenk, would it be safe to remove? It looks a custom fix we did to deal with https://issues.apache.org/jira/browse/SPARK-16077 and sounds we should keep this as https://bitbucket.org/gutworth/six/issues/63/importing-six-breaks-pickling describes Python 2.7.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good question, the underlying issue was marked resolved in 2014 and from looking at the commit it seems like it should actually be resolved. That being said its true some people might be on a system installed with an old verison of six so perhaps we should keep this workaround.


def inject_numpy(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume removing this is fine per cloudpipe/cloudpickle@1e91fa7.

d.pop('__doc__', None)
# handle property and staticmethod
dd = {}
for k, v in d.items():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume we are fine per the tests added in e044705.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets double check this part with @davies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gentle re-ping to @davies - do you have an opinion on this?

@HyukjinKwon
Copy link
Member

BTW, I also checked it passes tests with Python 3.6 in my local.

@holdenk
Copy link
Contributor Author

holdenk commented Aug 2, 2017

If we can reach agreement on this I'll see about trying to get our local workarounds upstreamed into cloudpickle.

@rgbkrk
Copy link
Contributor

rgbkrk commented Aug 3, 2017

Workarounds welcomed on cloudpickle. 😄

Copy link
Member

@BryanCutler BryanCutler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing this @holdenk , LGTM!

@rgbkrk
Copy link
Contributor

rgbkrk commented Aug 9, 2017

Just a note that we just shipped the fixes from @HyukjinKwon within cloudpickle (as v0.4.0), so we're at least roughly in sync now. 😄

@holdenk
Copy link
Contributor Author

holdenk commented Aug 9, 2017

huzzah! I'm in the middle of getting some code working for a talk tomorrow so I'll circle back on this on Friday. If @davies has any opinions though it would be great to hear them.

@HyukjinKwon
Copy link
Member

HyukjinKwon commented Aug 14, 2017

cc @ueshin who I believe is also appropriate to review this.

@ueshin
Copy link
Member

ueshin commented Aug 14, 2017

Thank you for cc-ing me.
I looked over this and it looks good to me except for a comment #18734 (comment).
I was not exactly sure if we could remove these.

@rgbkrk
Copy link
Contributor

rgbkrk commented Aug 14, 2017

I looked over this and it looks good to me except for a comment #18734 (comment). I was not exactly sure if we could remove these.

cloudpickle now has this workaround, so it can stay.

# if func is lambda, def'ed at prompt, is in main, or is nested, then
# we'll pickle the actual function object rather than simply saving a
# reference (as is done in default pickler), via save_function_tuple.
if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
#print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
if (islambda(obj)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just as a side note, it looks this PR includes cloudpipe/cloudpickle#51 too (SPARK-21753).

@SparkQA
Copy link

SparkQA commented Aug 18, 2017

Test build #80826 has finished for PR 18734 at commit 7ad278a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Aug 18, 2017

Test build #80828 has finished for PR 18734 at commit f986c25.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Aug 18, 2017

Test build #80842 has finished for PR 18734 at commit f986c25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

Let's go with this. I am quite sure it is worth for a go. We can almost sync it and can deduplucate review costs and etc.. Will go merging this one in few days with another double check if there is no objection here and it is not mergrd by anyone.

@holdenk
Copy link
Contributor Author

holdenk commented Aug 18, 2017

Sounds good @HyukjinKwon :) I think we should consider if we want to upstream the named tuple workaround as well.

@holdenk
Copy link
Contributor Author

holdenk commented Aug 18, 2017

But for now yes I think we are in a good position. If no one objects of course :)

@HyukjinKwon
Copy link
Member

Yea, it looks so. Named tuple one reminds me of the workaround we have for named tuple to make picklable -

def _hijack_namedtuple():
""" Hack namedtuple() to make it picklable """
# hijack only one time
if hasattr(collections.namedtuple, "__hijack"):
return
global _old_namedtuple # or it will put in closure
global _old_namedtuple_kwdefaults # or it will put in closure too
def _copy_func(f):
return types.FunctionType(f.__code__, f.__globals__, f.__name__,
f.__defaults__, f.__closure__)
def _kwdefaults(f):
# __kwdefaults__ contains the default values of keyword-only arguments which are
# introduced from Python 3. The possible cases for __kwdefaults__ in namedtuple
# are as below:
#
# - Does not exist in Python 2.
# - Returns None in <= Python 3.5.x.
# - Returns a dictionary containing the default values to the keys from Python 3.6.x
# (See https://bugs.python.org/issue25628).
kargs = getattr(f, "__kwdefaults__", None)
if kargs is None:
return {}
else:
return kargs
_old_namedtuple = _copy_func(collections.namedtuple)
_old_namedtuple_kwdefaults = _kwdefaults(collections.namedtuple)
def namedtuple(*args, **kwargs):
for k, v in _old_namedtuple_kwdefaults.items():
kwargs[k] = kwargs.get(k, v)
cls = _old_namedtuple(*args, **kwargs)
return _hack_namedtuple(cls)
# replace namedtuple with new one
collections.namedtuple.__globals__["_old_namedtuple_kwdefaults"] = _old_namedtuple_kwdefaults
collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
collections.namedtuple.__code__ = namedtuple.__code__
collections.namedtuple.__hijack = 1
# hack the cls already generated by namedtuple
# those created in other module can be pickled as normal,
# so only hack those in __main__ module
for n, o in sys.modules["__main__"].__dict__.items():
if (type(o) is type and o.__base__ is tuple
and hasattr(o, "_fields")
and "__reduce__" not in o.__dict__):
_hack_namedtuple(o) # hack inplace

Maybe, we could take a look and see if we could get rid of this or port this. Anyway, let me take a final look.

@HyukjinKwon
Copy link
Member

I am merging this because:

cloudpickle looks initially ported from cloudpipe/cloudpickle@7aebb7e and cloudpipe/cloudpickle@c4f8851 (-> 04e44b3), where I see both are identical.

After 04e44b3, we have diff - e044705, 5520418, ee913e6, d489354, dbfc7aa, 20e6280 and 6297697

[SPARK-9116] [SQL] [PYSPARK] support Python only UDT in main, e044705: I think this part is only what we are worried of. It looks supporting classmethod, staticmethod and property. We have a test:

class PythonOnlyUDT(UserDefinedType):
"""
User-defined type (UDT) for ExamplePoint.
"""
@classmethod
def sqlType(self):
return ArrayType(DoubleType(), False)
@classmethod
def module(cls):
return '__main__'
def serialize(self, obj):
return [obj.x, obj.y]
def deserialize(self, datum):
return PythonOnlyPoint(datum[0], datum[1])
@staticmethod
def foo():
pass
@property
def props(self):
return {}
class PythonOnlyPoint(ExamplePoint):
"""
An example class to demonstrate UDT in only Python
"""
__UDT__ = PythonOnlyUDT()

def test_udt(self):
from pyspark.sql.types import _parse_datatype_json_string, _infer_type, _make_type_verifier
from pyspark.sql.tests import ExamplePointUDT, ExamplePoint
def check_datatype(datatype):
pickled = pickle.loads(pickle.dumps(datatype))
assert datatype == pickled
scala_datatype = self.spark._jsparkSession.parseDataType(datatype.json())
python_datatype = _parse_datatype_json_string(scala_datatype.json())
assert datatype == python_datatype
check_datatype(ExamplePointUDT())
structtype_with_udt = StructType([StructField("label", DoubleType(), False),
StructField("point", ExamplePointUDT(), False)])
check_datatype(structtype_with_udt)
p = ExamplePoint(1.0, 2.0)
self.assertEqual(_infer_type(p), ExamplePointUDT())
_make_type_verifier(ExamplePointUDT())(ExamplePoint(1.0, 2.0))
self.assertRaises(ValueError, lambda: _make_type_verifier(ExamplePointUDT())([1.0, 2.0]))
check_datatype(PythonOnlyUDT())
structtype_with_udt = StructType([StructField("label", DoubleType(), False),
StructField("point", PythonOnlyUDT(), False)])
check_datatype(structtype_with_udt)
p = PythonOnlyPoint(1.0, 2.0)
self.assertEqual(_infer_type(p), PythonOnlyUDT())
_make_type_verifier(PythonOnlyUDT())(PythonOnlyPoint(1.0, 2.0))
self.assertRaises(
ValueError,
lambda: _make_type_verifier(PythonOnlyUDT())([1.0, 2.0]))

[SPARK-10542] [PYSPARK] fix serialize namedtuple, 5520418: We keep the changes:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L1090-L1095

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L433-L436

and the related test pass:

P = namedtuple("P", "x y")
p1 = P(1, 3)
p2 = loads(dumps(p1, 2))
self.assertEqual(p1, p2)
from pyspark.cloudpickle import dumps
P2 = loads(dumps(P))
p3 = P2(1, 3)
self.assertEqual(p1, p3)

[SPARK-13697] [PYSPARK] Fix the missing module name of TransformFunctionSerializer.loads, ee913e6: We keep this change:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L528

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L1022-L1029

and the related test pass:

def test_function_module_name(self):
ser = CloudPickleSerializer()
func = lambda x: x
func2 = ser.loads(ser.dumps(func))
self.assertEqual(func.__module__, func2.__module__)

We should probably port this one to cloudpipe/cloudpickle.

[SPARK-16077] [PYSPARK] catch the exception from pickle.whichmodule(), d489354: We keep this change:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L325-L330

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L620-L625

This patch even should be safer as I and @rgbkrk verified this with some tests:

cloudpipe/cloudpickle#112

[SPARK-17472] [PYSPARK] Better error message for serialization failures of large objects in Python, dbfc7aa: We keep this change:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L240-L249

Probably, we should port this change into cloudpipe/cloudpickle.

[SPARK-19019] [PYTHON] Fix hijacked collections.namedtuple and port cloudpickle changes for PySpark to work with Python 3.6.0, 20e6280

This change was ported from cloudpipe/cloudpickle. I tested our PySpark tests pass with Python 3.6.0 in my local manually - #18734 (comment)

[SPARK-19505][PYTHON] AttributeError on Exception.message in Python3, 6297697: We keep this change:

https://github.com/holdenk/spark/blob/f986c2591a9a0b6962862c5cdfc33a7d65be7eda/python/pyspark/cloudpickle.py#L240-L249

@HyukjinKwon
Copy link
Member

retest this please

@SparkQA
Copy link

SparkQA commented Aug 22, 2017

Test build #80950 has finished for PR 18734 at commit f986c25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

LGTM

I am going to credit this to @rgbkrk per (http://spark.apache.org/contributing.html)

In case several people contributed, prefer to assign to the more ‘junior’, non-committer contributor

I just double checked if the tests passes with Python 3.6.0, and if I could run pi example with pypy manually (SPARK-21753), with the current status.

@HyukjinKwon
Copy link
Member

Merged to master.

@asfgit asfgit closed this in 751f513 Aug 22, 2017
@ushine
Copy link

ushine commented Aug 22, 2017 via email

@HyukjinKwon
Copy link
Member

Oh, it was me cc'ing you mistakenly. Sorry for the noise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants