diff --git a/CHANGES.md b/CHANGES.md
index 06a90699380d..6c6061a8debd 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -33,6 +33,7 @@
 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Remote packages can now be downloaded from locations supported by apache_beam.io.filesystems. The files will be downloaded on Stager and uploaded to staging location. For more information, see [BEAM-11275](https://issues.apache.org/jira/browse/BEAM-11275)
+* Added support for cloudpickle as a pickling library for the Python SDK. To use cloudpickle, set the pipeline option --pickle_library=cloudpickle.
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 66997956474a..d1ceef70b6ed 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -80,7 +80,7 @@
 try:
   # Import dill from the pickler module to make sure our monkey-patching of dill
   # occurs.
-  from apache_beam.internal.pickler import dill
+  from apache_beam.internal.dill_pickler import dill
 except ImportError:
   # We fall back to using the stock dill library in tests that don't use the
   # full Python SDK.
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 5087b146290a..13a9f733a72d 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -679,17 +679,16 @@ def iterable_state_read(token, element_coder_impl):
         read_state=iterable_state_read,
         write_state=iterable_state_write,
         write_state_threshold=1)
-    context = pipeline_context.PipelineContext(
-        iterable_state_read=iterable_state_read,
-        iterable_state_write=iterable_state_write)
-    self.check_coder(
-        coder, [1, 2, 3], context=context, test_size_estimation=False)
+    # Note: do not use check_coder;
+    # see https://github.com/cloudpipe/cloudpickle/issues/452
+    self._observe(coder)
+    self.assertEqual([1, 2, 3], coder.decode(coder.encode([1, 2, 3])))
     # Ensure that state was actually used.
     self.assertNotEqual(state, {})
-    self.check_coder(
-        coders.TupleCoder((coder, coder)), ([1], [2, 3]),
-        context=context,
-        test_size_estimation=False)
+    tuple_coder = coders.TupleCoder((coder, coder))
+    self._observe(tuple_coder)
+    self.assertEqual(([1], [2, 3]),
+                     tuple_coder.decode(tuple_coder.encode(([1], [2, 3]))))
 
   def test_nullable_coder(self):
     self.check_coder(coders.NullableCoder(coders.VarIntCoder()), None, 2 * 64)
diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
new file mode 100644
index 000000000000..40719be2971c
--- /dev/null
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pickler for values, functions, and classes.
+
+For internal use only. No backwards compatibility guarantees.
+
+Uses the cloudpickle library to pickle data, functions, lambdas
+and classes.
+
+dump_session and load_session are no-ops.
+"""
+
+# pytype: skip-file
+
+import base64
+import bz2
+import io
+import threading
+import zlib
+
+import cloudpickle
+
+try:
+  from absl import flags
+except (ImportError, ModuleNotFoundError):
+  pass
+
+# Pickling, especially unpickling, causes broken module imports on Python 3
+# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884.
+_pickle_lock = threading.RLock()
+
+
+def dumps(o, enable_trace=True, use_zlib=False):
+  # type: (...) -> bytes
+
+  """For internal use only; no backwards-compatibility guarantees."""
+  with _pickle_lock:
+    with io.BytesIO() as file:
+      pickler = cloudpickle.CloudPickler(file)
+      try:
+        pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags
+      except NameError:
+        pass
+      pickler.dump(o)
+      s = file.getvalue()
+
+  # Compress as compactly as possible (compresslevel=9) to decrease peak memory
+  # usage (of multiple in-memory copies) and to avoid hitting protocol buffer
+  # limits.
+  # WARNING: Be cautious about compressor change since it can lead to pipeline
+  # representation change, and can break streaming job update compatibility on
+  # runners such as Dataflow.
+  if use_zlib:
+    c = zlib.compress(s, 9)
+  else:
+    c = bz2.compress(s, compresslevel=9)
+  del s  # Free up some possibly large and no-longer-needed memory.
+
+  return base64.b64encode(c)
+
+
+def loads(encoded, enable_trace=True, use_zlib=False):
+  """For internal use only; no backwards-compatibility guarantees."""
+
+  c = base64.b64decode(encoded)
+
+  if use_zlib:
+    s = zlib.decompress(c)
+  else:
+    s = bz2.decompress(c)
+
+  del c  # Free up some possibly large and no-longer-needed memory.
+
+  with _pickle_lock:
+    unpickled = cloudpickle.loads(s)
+    return unpickled
+
+
+def _pickle_absl_flags(obj):
+  return _create_absl_flags, tuple([])
+
+
+def _create_absl_flags():
+  return flags.FLAGS
+
+
+def dump_session(file_path):
+  # It is possible to dump_session with cloudpickle. However, since references
+  # are saved it should not be necessary. See https://s.apache.org/beam-picklers
+  pass
+
+
+def load_session(file_path):
+  # It is possible to load_session with cloudpickle. However, since references
+  # are saved it should not be necessary. See https://s.apache.org/beam-picklers
+  pass
diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
new file mode 100644
index 000000000000..3f8655d9536e
--- /dev/null
+++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the cloudpickle_pickler module."""
+
+# pytype: skip-file
+
+import sys
+import types
+import unittest
+
+from apache_beam.internal import module_test
+from apache_beam.internal.cloudpickle_pickler import dumps
+from apache_beam.internal.cloudpickle_pickler import loads
+
+
+class PicklerTest(unittest.TestCase):
+
+  NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType")
+
+  def test_basics(self):
+    self.assertEqual([1, 'a', (u'z', )], loads(dumps([1, 'a', (u'z', )])))
+    fun = lambda x: 'xyz-%s' % x
+    self.assertEqual('xyz-abc', loads(dumps(fun))('abc'))
+
+  def test_lambda_with_globals(self):
+    """Tests that the globals of a function are preserved."""
+
+    # The point of the test is that the lambda being called after unpickling
+    # relies on the re module being loaded.
+    self.assertEqual(['abc', 'def'],
+                     loads(dumps(
+                         module_test.get_lambda_with_globals()))('abc def'))
+
+  def test_lambda_with_main_globals(self):
+    self.assertEqual(unittest, loads(dumps(lambda: unittest))())
+
+  def test_lambda_with_closure(self):
+    """Tests that the closure of a function is preserved."""
+    self.assertEqual(
+        'closure: abc',
+        loads(dumps(module_test.get_lambda_with_closure('abc')))())
+
+  def test_class_object_pickled(self):
+    self.assertEqual(['abc', 'def'],
+                     loads(dumps(module_test.Xyz))().foo('abc def'))
+
+  def test_class_instance_pickled(self):
+    self.assertEqual(['abc', 'def'],
+                     loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
+
+  def test_pickling_preserves_closure_of_a_function(self):
+    self.assertEqual(
+        'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
+    self.assertEqual(
+        'Y:abc',
+        loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)
+
+  def test_pickle_dynamic_class(self):
+    self.assertEqual(
+        'Z:abc', loads(dumps(module_test.create_class('abc'))).get())
+
+  def test_generators(self):
+    with self.assertRaises(TypeError):
+      dumps((_ for _ in range(10)))
+
+  def test_recursive_class(self):
+    self.assertEqual(
+        'RecursiveClass:abc',
+        loads(dumps(module_test.RecursiveClass('abc').datum)))
+
+  def test_function_with_external_reference(self):
+    out_of_scope_var = 'expected_value'
+
+    def foo():
+      return out_of_scope_var
+
+    self.assertEqual('expected_value', loads(dumps(foo))())
+
+  @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'requires types.MappingProxyType')
+  def test_dump_and_load_mapping_proxy(self):
+    self.assertEqual(
+        'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc'])
+    self.assertEqual(
+        types.MappingProxyType, type(loads(dumps(types.MappingProxyType({})))))
+
+  # pylint: disable=exec-used
+  @unittest.skipIf(sys.version_info < (3, 7), 'Python 3.7 or above only')
+  def test_dataclass(self):
+    exec(
+        '''
+from apache_beam.internal.module_test import DataClass
+self.assertEqual(DataClass(datum='abc'), loads(dumps(DataClass(datum='abc'))))
+    ''')
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/apache_beam/internal/dill_pickler.py b/sdks/python/apache_beam/internal/dill_pickler.py
new file mode 100644
index 000000000000..f99bf2af59eb
--- /dev/null
+++ b/sdks/python/apache_beam/internal/dill_pickler.py
@@ -0,0 +1,329 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pickler for values, functions, and classes.
+
+For internal use only. No backwards compatibility guarantees.
+
+Pickles created by the pickling library contain non-ASCII characters, so
+we base64-encode the results so that we can put them in JSON objects.
+The pickler is used to embed FlatMap callable objects into the workflow JSON
+description.
+
+The pickler module should be used to pickle functions and modules; for values,
+the coders.*PickleCoder classes should be used instead.
+"""
+
+# pytype: skip-file
+
+import base64
+import bz2
+import logging
+import sys
+import threading
+import traceback
+import types
+import zlib
+from typing import Any
+from typing import Dict
+from typing import Tuple
+
+import dill
+
+settings = {'dill_byref': None}
+
+
+class _NoOpContextManager(object):
+  def __enter__(self):
+    pass
+
+  def __exit__(self, *unused_exc_info):
+    pass
+
+
+# Pickling, especially unpickling, causes broken module imports on Python 3
+# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884.
+_pickle_lock = threading.RLock()
+# Dill 0.2.8 renamed dill.dill to dill._dill:
+# https://github.com/uqfoundation/dill/commit/f0972ecc7a41d0b8acada6042d557068cac69baa
+# TODO: Remove this once Beam depends on dill >= 0.2.8
+if not getattr(dill, 'dill', None):
+  dill.dill = dill._dill
+  sys.modules['dill.dill'] = dill._dill
+
+# TODO: Remove once Dataflow has containers with a preinstalled dill >= 0.2.8
+if not getattr(dill, '_dill', None):
+  dill._dill = dill.dill
+  sys.modules['dill._dill'] = dill.dill
+
+
+def _is_nested_class(cls):
+  """Returns true if argument is a class object that appears to be nested."""
+  return (
+      isinstance(cls, type) and cls.__module__ is not None and
+      cls.__module__ != 'builtins' and
+      cls.__name__ not in sys.modules[cls.__module__].__dict__)
+
+
+def _find_containing_class(nested_class):
+  """Finds containing class of a nested class passed as argument."""
+
+  seen = set()
+
+  def _find_containing_class_inner(outer):
+    if outer in seen:
+      return None
+    seen.add(outer)
+    for k, v in outer.__dict__.items():
+      if v is nested_class:
+        return outer, k
+      elif isinstance(v, type) and hasattr(v, '__dict__'):
+        res = _find_containing_class_inner(v)
+        if res: return res
+
+  return _find_containing_class_inner(sys.modules[nested_class.__module__])
+
+
+def _nested_type_wrapper(fun):
+  """A wrapper for the standard pickler handler for class objects.
+
+  Args:
+    fun: Original pickler handler for type objects.
+
+  Returns:
+    A wrapper for type objects that handles nested classes.
+
+  The wrapper detects if an object being pickled is a nested class object.
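+  (A nested class here is one defined in the body of another class, e.g.
+  module_test.TopClass.NestedClass used by the tests in this change.)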
+  Only for such nested class objects will it save the containing class object,
+  so the nested structure is recreated during unpickling.
+  """
+  def wrapper(pickler, obj):
+    # When the nested class is defined in the __main__ module we do not have to
+    # do anything special because the pickler itself will save the constituent
+    # parts of the type (i.e., name, base classes, dictionary) and then
+    # recreate it during unpickling.
+    if _is_nested_class(obj) and obj.__module__ != '__main__':
+      containing_class_and_name = _find_containing_class(obj)
+      if containing_class_and_name is not None:
+        return pickler.save_reduce(getattr, containing_class_and_name, obj=obj)
+    try:
+      return fun(pickler, obj)
+    except dill.dill.PicklingError:
+      # pylint: disable=protected-access
+      return pickler.save_reduce(
+          dill.dill._create_type,
+          (
+              type(obj),
+              obj.__name__,
+              obj.__bases__,
+              dill.dill._dict_from_dictproxy(obj.__dict__)),
+          obj=obj)
+      # pylint: enable=protected-access
+
+  return wrapper
+
+
+# Monkey patch the standard pickler dispatch table entry for type objects.
+# Dill, for certain types, defers to the standard pickler (including type
+# objects). We wrap the standard handler using _nested_type_wrapper() because
+# for nested classes we want to pickle the actual enclosing class object so we
+# can recreate it during unpickling.
+# TODO(silviuc): Make sure we submit the fix upstream to GitHub dill project.
+dill.dill.Pickler.dispatch[type] = _nested_type_wrapper(
+    dill.dill.Pickler.dispatch[type])
+
+
+# Dill pickles generator objects without complaint, but unpickling produces
+# TypeError: object.__new__(generator) is not safe, use generator.__new__()
+# on some versions of Python.
+def _reject_generators(unused_pickler, unused_obj):
+  raise TypeError("can't (safely) pickle generator objects")
+
+
+dill.dill.Pickler.dispatch[types.GeneratorType] = _reject_generators
+
+# This if guards against dill not being fully initialized when generating docs.
+if 'save_module' in dir(dill.dill):
+
+  # Always pickle non-main modules by name.
+  old_save_module = dill.dill.save_module
+
+  @dill.dill.register(dill.dill.ModuleType)
+  def save_module(pickler, obj):
+    if dill.dill.is_dill(pickler) and obj is pickler._main:
+      return old_save_module(pickler, obj)
+    else:
+      dill.dill.log.info('M2: %s' % obj)
+      # pylint: disable=protected-access
+      pickler.save_reduce(dill.dill._import_module, (obj.__name__, ), obj=obj)
+      # pylint: enable=protected-access
+      dill.dill.log.info('# M2')
+
+  # Pickle module dictionaries (commonly found in lambda's globals)
+  # by referencing their module.
+  old_save_module_dict = dill.dill.save_module_dict
+  known_module_dicts = {
+  }  # type: Dict[int, Tuple[types.ModuleType, Dict[str, Any]]]
+
+  @dill.dill.register(dict)
+  def new_save_module_dict(pickler, obj):
+    obj_id = id(obj)
+    if not known_module_dicts or '__file__' in obj or '__package__' in obj:
+      if obj_id not in known_module_dicts:
+        # Trigger loading of lazily loaded modules (such as pytest vendored
+        # modules).
+        # This pass over sys.modules needs to iterate on a copy of sys.modules
+        # since lazy loading modifies the dictionary, hence the use of list().
+        for m in list(sys.modules.values()):
+          try:
+            _ = m.__dict__
+          except AttributeError:
+            pass
+
+        for m in list(sys.modules.values()):
+          try:
+            if (m and m.__name__ != '__main__' and
+                isinstance(m, dill.dill.ModuleType)):
+              d = m.__dict__
+              known_module_dicts[id(d)] = m, d
+          except AttributeError:
+            # Skip modules that do not have the __name__ attribute.
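+            # (sys.modules can contain objects that are not real modules).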
+            pass
+    if obj_id in known_module_dicts and dill.dill.is_dill(pickler):
+      m = known_module_dicts[obj_id][0]
+      try:
+        # pylint: disable=protected-access
+        dill.dill._import_module(m.__name__)
+        return pickler.save_reduce(
+            getattr, (known_module_dicts[obj_id][0], '__dict__'), obj=obj)
+      except (ImportError, AttributeError):
+        return old_save_module_dict(pickler, obj)
+    else:
+      return old_save_module_dict(pickler, obj)
+
+  dill.dill.save_module_dict = new_save_module_dict
+
+  def _nest_dill_logging():
+    """Prefix all dill logging with its depth in the callstack.
+
+    Useful for debugging pickling of deeply nested structures.
+    """
+    old_log_info = dill.dill.log.info
+
+    def new_log_info(msg, *args, **kwargs):
+      old_log_info(
+          ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg,
+          *args,
+          **kwargs)
+
+    dill.dill.log.info = new_log_info
+
+
+# Turn off verbose logging from the dill pickler.
+logging.getLogger('dill').setLevel(logging.WARN)
+
+
+def dumps(o, enable_trace=True, use_zlib=False):
+  # type: (...) -> bytes
+
+  """For internal use only; no backwards-compatibility guarantees."""
+  with _pickle_lock:
+    try:
+      s = dill.dumps(o, byref=settings['dill_byref'])
+    except Exception:  # pylint: disable=broad-except
+      if enable_trace:
+        dill.dill._trace(True)  # pylint: disable=protected-access
+        s = dill.dumps(o, byref=settings['dill_byref'])
+      else:
+        raise
+    finally:
+      dill.dill._trace(False)  # pylint: disable=protected-access
+
+  # Compress as compactly as possible (compresslevel=9) to decrease peak memory
+  # usage (of multiple in-memory copies) and to avoid hitting protocol buffer
+  # limits.
+  # WARNING: Be cautious about compressor change since it can lead to pipeline
+  # representation change, and can break streaming job update compatibility on
+  # runners such as Dataflow.
+  if use_zlib:
+    c = zlib.compress(s, 9)
+  else:
+    c = bz2.compress(s, compresslevel=9)
+  del s  # Free up some possibly large and no-longer-needed memory.
+
+  return base64.b64encode(c)
+
+
+def loads(encoded, enable_trace=True, use_zlib=False):
+  """For internal use only; no backwards-compatibility guarantees."""
+
+  c = base64.b64decode(encoded)
+
+  if use_zlib:
+    s = zlib.decompress(c)
+  else:
+    s = bz2.decompress(c)
+
+  del c  # Free up some possibly large and no-longer-needed memory.
+
+  with _pickle_lock:
+    try:
+      return dill.loads(s)
+    except Exception:  # pylint: disable=broad-except
+      if enable_trace:
+        dill.dill._trace(True)  # pylint: disable=protected-access
+        return dill.loads(s)
+      else:
+        raise
+    finally:
+      dill.dill._trace(False)  # pylint: disable=protected-access
+
+
+def dump_session(file_path):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Pickle the current python session to be used in the worker.
+
+  Note: Due to an inconsistency in the first dump of dill's dump_session, we
+  dump and load the session twice to get consistent results in the worker and
+  the running session. Check: https://github.com/uqfoundation/dill/issues/195
+  """
+  with _pickle_lock:
+    dill.dump_session(file_path)
+    dill.load_session(file_path)
+    return dill.dump_session(file_path)
+
+
+def load_session(file_path):
+  with _pickle_lock:
+    return dill.load_session(file_path)
+
+
+def override_pickler_hooks(extend=True):
+  """Extends the standard pickle library's dispatch with dill's hooks.
+
+  If extend is False, all hooks that dill overrides are removed.
+  If True, dill's hooks are injected into the pickle library's dispatch table.
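+
+  For example, pickler.set_library('cloudpickle') calls
+  override_pickler_hooks(False) so that cloudpickle runs against the stock,
+  unpatched pickle dispatch.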
+ """ + dill.extend(extend) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 9cecedd1b5c1..0c53c0cf249e 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -28,286 +28,50 @@ the coders.*PickleCoder classes should be used instead. """ -# pytype: skip-file +from apache_beam.internal import cloudpickle_pickler +from apache_beam.internal import dill_pickler -import base64 -import bz2 -import logging -import sys -import threading -import traceback -import types -import zlib -from typing import Any -from typing import Dict -from typing import Tuple +USE_CLOUDPICKLE = 'cloudpickle' +USE_DILL = 'dill' +DEFAULT_PICKLE_LIB = USE_DILL -import dill - -settings = {'dill_byref': None} - - -class _NoOpContextManager(object): - def __enter__(self): - pass - - def __exit__(self, *unused_exc_info): - pass - - -# Pickling, especially unpickling, causes broken module imports on Python 3 -# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884. -_pickle_lock = threading.RLock() -# Dill 0.28.0 renamed dill.dill to dill._dill: -# https://github.com/uqfoundation/dill/commit/f0972ecc7a41d0b8acada6042d557068cac69baa -# TODO: Remove this once Beam depends on dill >= 0.2.8 -if not getattr(dill, 'dill', None): - dill.dill = dill._dill - sys.modules['dill.dill'] = dill._dill - -# TODO: Remove once Dataflow has containers with a preinstalled dill >= 0.2.8 -if not getattr(dill, '_dill', None): - dill._dill = dill.dill - sys.modules['dill._dill'] = dill.dill - - -def _is_nested_class(cls): - """Returns true if argument is a class object that appears to be nested.""" - return ( - isinstance(cls, type) and cls.__module__ is not None and - cls.__module__ != 'builtins' and - cls.__name__ not in sys.modules[cls.__module__].__dict__) - - -def _find_containing_class(nested_class): - """Finds containing class of a nested class passed as argument.""" - - seen = set() - - def _find_containing_class_inner(outer): - if outer in seen: - return None - seen.add(outer) - for k, v in outer.__dict__.items(): - if v is nested_class: - return outer, k - elif isinstance(v, type) and hasattr(v, '__dict__'): - res = _find_containing_class_inner(v) - if res: return res - - return _find_containing_class_inner(sys.modules[nested_class.__module__]) - - -def _nested_type_wrapper(fun): - """A wrapper for the standard pickler handler for class objects. - - Args: - fun: Original pickler handler for type objects. - - Returns: - A wrapper for type objects that handles nested classes. - - The wrapper detects if an object being pickled is a nested class object. - For nested class object only it will save the containing class object so - the nested structure is recreated during unpickle. - """ - def wrapper(pickler, obj): - # When the nested class is defined in the __main__ module we do not have to - # do anything special because the pickler itself will save the constituent - # parts of the type (i.e., name, base classes, dictionary) and then - # recreate it during unpickling. 
- if _is_nested_class(obj) and obj.__module__ != '__main__': - containing_class_and_name = _find_containing_class(obj) - if containing_class_and_name is not None: - return pickler.save_reduce(getattr, containing_class_and_name, obj=obj) - try: - return fun(pickler, obj) - except dill.dill.PicklingError: - # pylint: disable=protected-access - return pickler.save_reduce( - dill.dill._create_type, - ( - type(obj), - obj.__name__, - obj.__bases__, - dill.dill._dict_from_dictproxy(obj.__dict__)), - obj=obj) - # pylint: enable=protected-access - - return wrapper - - -# Monkey patch the standard pickler dispatch table entry for type objects. -# Dill, for certain types, defers to the standard pickler (including type -# objects). We wrap the standard handler using type_wrapper() because -# for nested class we want to pickle the actual enclosing class object so we -# can recreate it during unpickling. -# TODO(silviuc): Make sure we submit the fix upstream to GitHub dill project. -dill.dill.Pickler.dispatch[type] = _nested_type_wrapper( - dill.dill.Pickler.dispatch[type]) - - -# Dill pickles generators objects without complaint, but unpickling produces -# TypeError: object.__new__(generator) is not safe, use generator.__new__() -# on some versions of Python. -def _reject_generators(unused_pickler, unused_obj): - raise TypeError("can't (safely) pickle generator objects") - - -dill.dill.Pickler.dispatch[types.GeneratorType] = _reject_generators - -# This if guards against dill not being full initialized when generating docs. -if 'save_module' in dir(dill.dill): - - # Always pickle non-main modules by name. - old_save_module = dill.dill.save_module - - @dill.dill.register(dill.dill.ModuleType) - def save_module(pickler, obj): - if dill.dill.is_dill(pickler) and obj is pickler._main: - return old_save_module(pickler, obj) - else: - dill.dill.log.info('M2: %s' % obj) - # pylint: disable=protected-access - pickler.save_reduce(dill.dill._import_module, (obj.__name__, ), obj=obj) - # pylint: enable=protected-access - dill.dill.log.info('# M2') - - # Pickle module dictionaries (commonly found in lambda's globals) - # by referencing their module. - old_save_module_dict = dill.dill.save_module_dict - known_module_dicts = { - } # type: Dict[int, Tuple[types.ModuleType, Dict[str, Any]]] - - @dill.dill.register(dict) - def new_save_module_dict(pickler, obj): - obj_id = id(obj) - if not known_module_dicts or '__file__' in obj or '__package__' in obj: - if obj_id not in known_module_dicts: - # Trigger loading of lazily loaded modules (such as pytest vendored - # modules). - # This pass over sys.modules needs to iterate on a copy of sys.modules - # since lazy loading modifies the dictionary, hence the use of list(). - for m in list(sys.modules.values()): - try: - _ = m.__dict__ - except AttributeError: - pass - - for m in list(sys.modules.values()): - try: - if (m and m.__name__ != '__main__' and - isinstance(m, dill.dill.ModuleType)): - d = m.__dict__ - known_module_dicts[id(d)] = m, d - except AttributeError: - # Skip modules that do not have the __name__ attribute. 
- pass - if obj_id in known_module_dicts and dill.dill.is_dill(pickler): - m = known_module_dicts[obj_id][0] - try: - # pylint: disable=protected-access - dill.dill._import_module(m.__name__) - return pickler.save_reduce( - getattr, (known_module_dicts[obj_id][0], '__dict__'), obj=obj) - except (ImportError, AttributeError): - return old_save_module_dict(pickler, obj) - else: - return old_save_module_dict(pickler, obj) - - dill.dill.save_module_dict = new_save_module_dict - - def _nest_dill_logging(): - """Prefix all dill logging with its depth in the callstack. - - Useful for debugging pickling of deeply nested structures. - """ - old_log_info = dill.dill.log.info - - def new_log_info(msg, *args, **kwargs): - old_log_info( - ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg, - *args, - **kwargs) - - dill.dill.log.info = new_log_info - - -# Turn off verbose logging from the dill pickler. -logging.getLogger('dill').setLevel(logging.WARN) +desired_pickle_lib = dill_pickler def dumps(o, enable_trace=True, use_zlib=False): # type: (...) -> bytes - """For internal use only; no backwards-compatibility guarantees.""" - with _pickle_lock: - try: - s = dill.dumps(o, byref=settings['dill_byref']) - except Exception: # pylint: disable=broad-except - if enable_trace: - dill.dill._trace(True) # pylint: disable=protected-access - s = dill.dumps(o, byref=settings['dill_byref']) - else: - raise - finally: - dill.dill._trace(False) # pylint: disable=protected-access - - # Compress as compactly as possible (compresslevel=9) to decrease peak memory - # usage (of multiple in-memory copies) and to avoid hitting protocol buffer - # limits. - # WARNING: Be cautious about compressor change since it can lead to pipeline - # representation change, and can break streaming job update compatibility on - # runners such as Dataflow. - if use_zlib: - c = zlib.compress(s, 9) - else: - c = bz2.compress(s, compresslevel=9) - del s # Free up some possibly large and no-longer-needed memory. - - return base64.b64encode(c) + return desired_pickle_lib.dumps( + o, enable_trace=enable_trace, use_zlib=use_zlib) def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" - c = base64.b64decode(encoded) - - if use_zlib: - s = zlib.decompress(c) - else: - s = bz2.decompress(c) - - del c # Free up some possibly large and no-longer-needed memory. - - with _pickle_lock: - try: - return dill.loads(s) - except Exception: # pylint: disable=broad-except - if enable_trace: - dill.dill._trace(True) # pylint: disable=protected-access - return dill.loads(s) - else: - raise - finally: - dill.dill._trace(False) # pylint: disable=protected-access + return desired_pickle_lib.loads( + encoded, enable_trace=enable_trace, use_zlib=use_zlib) def dump_session(file_path): """For internal use only; no backwards-compatibility guarantees. Pickle the current python session to be used in the worker. - - Note: Due to the inconsistency in the first dump of dill dump_session we - create and load the dump twice to have consistent results in the worker and - the running session. 
Check: https://github.com/uqfoundation/dill/issues/195 """ - with _pickle_lock: - dill.dump_session(file_path) - dill.load_session(file_path) - return dill.dump_session(file_path) + + return desired_pickle_lib.dump_session(file_path) def load_session(file_path): - with _pickle_lock: - return dill.load_session(file_path) + return desired_pickle_lib.load_session(file_path) + + +def set_library(selected_library=DEFAULT_PICKLE_LIB): + """ Sets pickle library that will be used. """ + global desired_pickle_lib + if selected_library == USE_DILL and desired_pickle_lib != dill_pickler: + desired_pickle_lib = dill_pickler + dill_pickler.override_pickler_hooks(True) + elif (selected_library == USE_CLOUDPICKLE and + desired_pickle_lib != cloudpickle_pickler): + desired_pickle_lib = cloudpickle_pickler + dill_pickler.override_pickler_hooks(False) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index c8f31e395a67..d15e8754555b 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1056,6 +1056,12 @@ def _add_argparse_args(cls, parser): 'currently an experimental flag and provides no stability. ' 'Multiple --beam_plugin options can be specified if more than ' 'one plugin is needed.')) + parser.add_argument( + '--pickle_library', + default='default', + help=( + 'Chooses which pickle library to use. Options are dill, ' + 'cloudpickle or default.')) parser.add_argument( '--save_main_session', default=False, diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 394781b15bf7..5510d36e1457 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -34,6 +34,7 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import ProfilingOptions +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.value_provider import RuntimeValueProvider from apache_beam.portability.api import endpoints_pb2 from apache_beam.runners.internal import names @@ -76,6 +77,7 @@ def create_harness(environment, dry_run=False): RuntimeValueProvider.set_runtime_options(pipeline_options_dict) sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict) filesystems.FileSystems.set_options(sdk_pipeline_options) + pickler.set_library(sdk_pipeline_options.view_as(SetupOptions).pickle_library) if 'SEMI_PERSISTENT_DIRECTORY' in environment: semi_persistent_directory = environment['SEMI_PERSISTENT_DIRECTORY'] diff --git a/sdks/python/container/py36/base_image_requirements.txt b/sdks/python/container/py36/base_image_requirements.txt index 15c2c7b5b28d..01ade0fe5794 100644 --- a/sdks/python/container/py36/base_image_requirements.txt +++ b/sdks/python/container/py36/base_image_requirements.txt @@ -33,6 +33,7 @@ certifi==2021.10.8 charset-normalizer==2.0.7 clang==5.0 click==8.0.3 +cloudpickle==2.0.0 crcmod==1.7 Cython==0.29.24 dataclasses==0.8 diff --git a/sdks/python/container/py37/base_image_requirements.txt b/sdks/python/container/py37/base_image_requirements.txt index d658b1568efb..119f8bcf4be2 100644 --- a/sdks/python/container/py37/base_image_requirements.txt +++ b/sdks/python/container/py37/base_image_requirements.txt @@ -32,6 +32,7 @@ cachetools==4.2.4 certifi==2021.10.8 
charset-normalizer==2.0.7 click==8.0.3 +cloudpickle==2.0.0 crcmod==1.7 Cython==0.29.24 deprecation==2.1.0 diff --git a/sdks/python/container/py38/base_image_requirements.txt b/sdks/python/container/py38/base_image_requirements.txt index 6a0f54884b2b..75af33c12166 100644 --- a/sdks/python/container/py38/base_image_requirements.txt +++ b/sdks/python/container/py38/base_image_requirements.txt @@ -31,6 +31,7 @@ cachetools==4.2.4 certifi==2021.10.8 charset-normalizer==2.0.7 click==8.0.3 +cloudpickle==2.0.0 crcmod==1.7 Cython==0.29.24 deprecation==2.1.0 diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 7f502bf1714e..f9ae895ba1ca 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -137,6 +137,7 @@ def get_version(): # server, therefore list of allowed versions is very narrow. # See: https://github.com/uqfoundation/dill/issues/341. 'dill>=0.3.1.1,<0.3.2', + 'cloudpickle>=2.0.0,<3', 'fastavro>=0.21.4,<2', 'grpcio>=1.29.0,<2', 'hdfs>=2.1.0,<3.0.0', @@ -258,7 +259,6 @@ def run(self): 'Python %s.%s. You may encounter bugs or missing features.' % (sys.version_info.major, sys.version_info.minor)) - if __name__ == '__main__': setuptools.setup( name=PACKAGE_NAME, @@ -303,10 +303,10 @@ def run(self): # BEAM-8840: Do NOT use tests_require or setup_requires. extras_require={ 'docs': [ - 'Sphinx>=1.5.2,<2.0', - # Pinning docutils as a workaround for Sphinx issue: - # https://github.com/sphinx-doc/sphinx/issues/9727 - 'docutils==0.17.1' + 'Sphinx>=1.5.2,<2.0', + # Pinning docutils as a workaround for Sphinx issue: + # https://github.com/sphinx-doc/sphinx/issues/9727 + 'docutils==0.17.1' ], 'test': REQUIRED_TEST_PACKAGES, 'gcp': GCP_REQUIREMENTS,
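
For illustration, a minimal, hypothetical sketch of opting a pipeline into
cloudpickle via the option introduced above. Only --pickle_library comes from
this change; the surrounding pipeline is ordinary Beam API usage and the
element values are made up.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # 'cloudpickle' makes the SDK harness call pickler.set_library() with
    # this value (see sdk_worker_main.py above), routing pickler.dumps/loads
    # through cloudpickle_pickler instead of the default dill_pickler.
    options = PipelineOptions(['--pickle_library=cloudpickle'])

    with beam.Pipeline(options=options) as p:
      _ = (
          p
          | beam.Create(['a', 'b', 'c'])
          # A lambda is exactly the kind of callable the pickler serializes.
          | beam.Map(lambda x: x.upper())
          | beam.Map(print))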