From 39a964ddf1206f0c7b37e5e592449a55627f5053 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 7 Sep 2021 15:09:38 -0400 Subject: [PATCH 01/45] wrapped pickler so that pickler is chosen --- .../internal/cloudPickle_pickler.py | 119 +++++++ .../apache_beam/internal/dill_pickler.py | 313 ++++++++++++++++++ sdks/python/apache_beam/internal/pickler.py | 276 +-------------- 3 files changed, 441 insertions(+), 267 deletions(-) create mode 100644 sdks/python/apache_beam/internal/cloudPickle_pickler.py create mode 100644 sdks/python/apache_beam/internal/dill_pickler.py diff --git a/sdks/python/apache_beam/internal/cloudPickle_pickler.py b/sdks/python/apache_beam/internal/cloudPickle_pickler.py new file mode 100644 index 000000000000..75aad3b9555b --- /dev/null +++ b/sdks/python/apache_beam/internal/cloudPickle_pickler.py @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Pickler for values, functions, and classes. + +For internal use only. No backwards compatibility guarantees. + +Pickles created by the pickling library contain non-ASCII characters, so +we base64-encode the results so that we can put them in a JSON objects. +The pickler is used to embed FlatMap callable objects into the workflow JSON +description. + +The pickler module should be used to pickle functions and modules; for values, +the coders.*PickleCoder classes should be used instead. +""" + +# pytype: skip-file + +import base64 +import bz2 +import logging +import sys +import threading +import traceback +import types +import zlib +from typing import Any +from typing import Dict +from typing import Tuple + +import cloudpickle + +# Pickling, especially unpickling, causes broken module imports on Python 3 +# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884. +_pickle_lock = threading.RLock() +import __main__ as _main_module + + +def dumps(o, enable_trace=True, use_zlib=False): + # type: (...) -> bytes + + """For internal use only; no backwards-compatibility guarantees.""" + with _pickle_lock: + try: + s = cloudpickle.dumps(o) + except Exception: # pylint: disable=broad-except + # TODO: decide what to do on exceptions. + print('TODO figure out what to do with cloudpickle exceptions.') + raise + + # Compress as compactly as possible (compresslevel=9) to decrease peak memory + # usage (of multiple in-memory copies) and to avoid hitting protocol buffer + # limits. + # WARNING: Be cautious about compressor change since it can lead to pipeline + # representation change, and can break streaming job update compatibility on + # runners such as Dataflow. + if use_zlib: + c = zlib.compress(s, 9) + else: + c = bz2.compress(s, compresslevel=9) + del s # Free up some possibly large and no-longer-needed memory. 
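  # Illustrative sketch of the intended round trip (assumes the loads() defined
  # further below and a picklable callable; not part of the committed patch):
  #
  #   encoded = dumps(lambda x: x + 1)   # bz2-compressed, base64-encoded bytes
  #   fn = loads(encoded)
  #   assert fn(1) == 2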
+ + return base64.b64encode(c) + + +def loads(encoded, enable_trace=True, use_zlib=False): + """For internal use only; no backwards-compatibility guarantees.""" + + c = base64.b64decode(encoded) + + if use_zlib: + s = zlib.decompress(c) + else: + s = bz2.decompress(c) + + del c # Free up some possibly large and no-longer-needed memory. + + with _pickle_lock: + try: + return cloudpickle.loads(s) + except Exception: # pylint: disable=broad-except + print('TODO figure out what to do with cloudpickle exceptions.') + raise + +def dump_session(file_path): + with _pickle_lock: + try: + module_dict = _main_module.__dict__.copy() + with open(file_path, 'wb') as file: + cloudpickle.dump(module_dict, file) + except Exception: + print('TODO figure out what to do with cloudpickle exceptions.') + raise + + + +def load_session(file_path): + with _pickle_lock: + try: + module_dict = _main_module.__dict__.copy() + with open(file_path, 'wb') as file: + cloudpickle.dump(module_dict, file) + except Exception: + print('TODO figure out what to do with cloudpickle exceptions.') + raise diff --git a/sdks/python/apache_beam/internal/dill_pickler.py b/sdks/python/apache_beam/internal/dill_pickler.py new file mode 100644 index 000000000000..9cecedd1b5c1 --- /dev/null +++ b/sdks/python/apache_beam/internal/dill_pickler.py @@ -0,0 +1,313 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Pickler for values, functions, and classes. + +For internal use only. No backwards compatibility guarantees. + +Pickles created by the pickling library contain non-ASCII characters, so +we base64-encode the results so that we can put them in a JSON objects. +The pickler is used to embed FlatMap callable objects into the workflow JSON +description. + +The pickler module should be used to pickle functions and modules; for values, +the coders.*PickleCoder classes should be used instead. +""" + +# pytype: skip-file + +import base64 +import bz2 +import logging +import sys +import threading +import traceback +import types +import zlib +from typing import Any +from typing import Dict +from typing import Tuple + +import dill + +settings = {'dill_byref': None} + + +class _NoOpContextManager(object): + def __enter__(self): + pass + + def __exit__(self, *unused_exc_info): + pass + + +# Pickling, especially unpickling, causes broken module imports on Python 3 +# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884. 
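# Every dumps()/loads() call below therefore serializes on this module-level
# lock, roughly:
#
#   with _pickle_lock:
#       s = dill.dumps(o, byref=settings['dill_byref'])
#
# A reentrant lock is used, presumably so that a pickling call that re-enters
# this module on the same thread does not deadlock on itself.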
+_pickle_lock = threading.RLock() +# Dill 0.28.0 renamed dill.dill to dill._dill: +# https://github.com/uqfoundation/dill/commit/f0972ecc7a41d0b8acada6042d557068cac69baa +# TODO: Remove this once Beam depends on dill >= 0.2.8 +if not getattr(dill, 'dill', None): + dill.dill = dill._dill + sys.modules['dill.dill'] = dill._dill + +# TODO: Remove once Dataflow has containers with a preinstalled dill >= 0.2.8 +if not getattr(dill, '_dill', None): + dill._dill = dill.dill + sys.modules['dill._dill'] = dill.dill + + +def _is_nested_class(cls): + """Returns true if argument is a class object that appears to be nested.""" + return ( + isinstance(cls, type) and cls.__module__ is not None and + cls.__module__ != 'builtins' and + cls.__name__ not in sys.modules[cls.__module__].__dict__) + + +def _find_containing_class(nested_class): + """Finds containing class of a nested class passed as argument.""" + + seen = set() + + def _find_containing_class_inner(outer): + if outer in seen: + return None + seen.add(outer) + for k, v in outer.__dict__.items(): + if v is nested_class: + return outer, k + elif isinstance(v, type) and hasattr(v, '__dict__'): + res = _find_containing_class_inner(v) + if res: return res + + return _find_containing_class_inner(sys.modules[nested_class.__module__]) + + +def _nested_type_wrapper(fun): + """A wrapper for the standard pickler handler for class objects. + + Args: + fun: Original pickler handler for type objects. + + Returns: + A wrapper for type objects that handles nested classes. + + The wrapper detects if an object being pickled is a nested class object. + For nested class object only it will save the containing class object so + the nested structure is recreated during unpickle. + """ + def wrapper(pickler, obj): + # When the nested class is defined in the __main__ module we do not have to + # do anything special because the pickler itself will save the constituent + # parts of the type (i.e., name, base classes, dictionary) and then + # recreate it during unpickling. + if _is_nested_class(obj) and obj.__module__ != '__main__': + containing_class_and_name = _find_containing_class(obj) + if containing_class_and_name is not None: + return pickler.save_reduce(getattr, containing_class_and_name, obj=obj) + try: + return fun(pickler, obj) + except dill.dill.PicklingError: + # pylint: disable=protected-access + return pickler.save_reduce( + dill.dill._create_type, + ( + type(obj), + obj.__name__, + obj.__bases__, + dill.dill._dict_from_dictproxy(obj.__dict__)), + obj=obj) + # pylint: enable=protected-access + + return wrapper + + +# Monkey patch the standard pickler dispatch table entry for type objects. +# Dill, for certain types, defers to the standard pickler (including type +# objects). We wrap the standard handler using type_wrapper() because +# for nested class we want to pickle the actual enclosing class object so we +# can recreate it during unpickling. +# TODO(silviuc): Make sure we submit the fix upstream to GitHub dill project. +dill.dill.Pickler.dispatch[type] = _nested_type_wrapper( + dill.dill.Pickler.dispatch[type]) + + +# Dill pickles generators objects without complaint, but unpickling produces +# TypeError: object.__new__(generator) is not safe, use generator.__new__() +# on some versions of Python. 
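# For example, once the handler below is registered in the dispatch table,
#
#   dumps(i for i in range(3))
#
# fails fast with TypeError("can't (safely) pickle generator objects")
# rather than producing a payload that only breaks at unpickling time.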
+def _reject_generators(unused_pickler, unused_obj): + raise TypeError("can't (safely) pickle generator objects") + + +dill.dill.Pickler.dispatch[types.GeneratorType] = _reject_generators + +# This if guards against dill not being full initialized when generating docs. +if 'save_module' in dir(dill.dill): + + # Always pickle non-main modules by name. + old_save_module = dill.dill.save_module + + @dill.dill.register(dill.dill.ModuleType) + def save_module(pickler, obj): + if dill.dill.is_dill(pickler) and obj is pickler._main: + return old_save_module(pickler, obj) + else: + dill.dill.log.info('M2: %s' % obj) + # pylint: disable=protected-access + pickler.save_reduce(dill.dill._import_module, (obj.__name__, ), obj=obj) + # pylint: enable=protected-access + dill.dill.log.info('# M2') + + # Pickle module dictionaries (commonly found in lambda's globals) + # by referencing their module. + old_save_module_dict = dill.dill.save_module_dict + known_module_dicts = { + } # type: Dict[int, Tuple[types.ModuleType, Dict[str, Any]]] + + @dill.dill.register(dict) + def new_save_module_dict(pickler, obj): + obj_id = id(obj) + if not known_module_dicts or '__file__' in obj or '__package__' in obj: + if obj_id not in known_module_dicts: + # Trigger loading of lazily loaded modules (such as pytest vendored + # modules). + # This pass over sys.modules needs to iterate on a copy of sys.modules + # since lazy loading modifies the dictionary, hence the use of list(). + for m in list(sys.modules.values()): + try: + _ = m.__dict__ + except AttributeError: + pass + + for m in list(sys.modules.values()): + try: + if (m and m.__name__ != '__main__' and + isinstance(m, dill.dill.ModuleType)): + d = m.__dict__ + known_module_dicts[id(d)] = m, d + except AttributeError: + # Skip modules that do not have the __name__ attribute. + pass + if obj_id in known_module_dicts and dill.dill.is_dill(pickler): + m = known_module_dicts[obj_id][0] + try: + # pylint: disable=protected-access + dill.dill._import_module(m.__name__) + return pickler.save_reduce( + getattr, (known_module_dicts[obj_id][0], '__dict__'), obj=obj) + except (ImportError, AttributeError): + return old_save_module_dict(pickler, obj) + else: + return old_save_module_dict(pickler, obj) + + dill.dill.save_module_dict = new_save_module_dict + + def _nest_dill_logging(): + """Prefix all dill logging with its depth in the callstack. + + Useful for debugging pickling of deeply nested structures. + """ + old_log_info = dill.dill.log.info + + def new_log_info(msg, *args, **kwargs): + old_log_info( + ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg, + *args, + **kwargs) + + dill.dill.log.info = new_log_info + + +# Turn off verbose logging from the dill pickler. +logging.getLogger('dill').setLevel(logging.WARN) + + +def dumps(o, enable_trace=True, use_zlib=False): + # type: (...) -> bytes + + """For internal use only; no backwards-compatibility guarantees.""" + with _pickle_lock: + try: + s = dill.dumps(o, byref=settings['dill_byref']) + except Exception: # pylint: disable=broad-except + if enable_trace: + dill.dill._trace(True) # pylint: disable=protected-access + s = dill.dumps(o, byref=settings['dill_byref']) + else: + raise + finally: + dill.dill._trace(False) # pylint: disable=protected-access + + # Compress as compactly as possible (compresslevel=9) to decrease peak memory + # usage (of multiple in-memory copies) and to avoid hitting protocol buffer + # limits. 
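  # Sketch of the resulting contract (an inference from the code, not stated
  # explicitly in the patch): the compressor chosen here must be mirrored by
  # the caller of loads(), e.g.
  #
  #   dumps(obj)                  # bz2 + base64; read back with loads(enc)
  #   dumps(obj, use_zlib=True)   # zlib + base64; read back with
  #                               # loads(enc, use_zlib=True)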
+ # WARNING: Be cautious about compressor change since it can lead to pipeline + # representation change, and can break streaming job update compatibility on + # runners such as Dataflow. + if use_zlib: + c = zlib.compress(s, 9) + else: + c = bz2.compress(s, compresslevel=9) + del s # Free up some possibly large and no-longer-needed memory. + + return base64.b64encode(c) + + +def loads(encoded, enable_trace=True, use_zlib=False): + """For internal use only; no backwards-compatibility guarantees.""" + + c = base64.b64decode(encoded) + + if use_zlib: + s = zlib.decompress(c) + else: + s = bz2.decompress(c) + + del c # Free up some possibly large and no-longer-needed memory. + + with _pickle_lock: + try: + return dill.loads(s) + except Exception: # pylint: disable=broad-except + if enable_trace: + dill.dill._trace(True) # pylint: disable=protected-access + return dill.loads(s) + else: + raise + finally: + dill.dill._trace(False) # pylint: disable=protected-access + + +def dump_session(file_path): + """For internal use only; no backwards-compatibility guarantees. + + Pickle the current python session to be used in the worker. + + Note: Due to the inconsistency in the first dump of dill dump_session we + create and load the dump twice to have consistent results in the worker and + the running session. Check: https://github.com/uqfoundation/dill/issues/195 + """ + with _pickle_lock: + dill.dump_session(file_path) + dill.load_session(file_path) + return dill.dump_session(file_path) + + +def load_session(file_path): + with _pickle_lock: + return dill.load_session(file_path) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 9cecedd1b5c1..2ae290f2b52a 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -28,286 +28,28 @@ the coders.*PickleCoder classes should be used instead. """ -# pytype: skip-file - -import base64 -import bz2 -import logging -import sys -import threading -import traceback -import types -import zlib -from typing import Any -from typing import Dict -from typing import Tuple - -import dill - -settings = {'dill_byref': None} - - -class _NoOpContextManager(object): - def __enter__(self): - pass - - def __exit__(self, *unused_exc_info): - pass - - -# Pickling, especially unpickling, causes broken module imports on Python 3 -# if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884. 
-_pickle_lock = threading.RLock() -# Dill 0.28.0 renamed dill.dill to dill._dill: -# https://github.com/uqfoundation/dill/commit/f0972ecc7a41d0b8acada6042d557068cac69baa -# TODO: Remove this once Beam depends on dill >= 0.2.8 -if not getattr(dill, 'dill', None): - dill.dill = dill._dill - sys.modules['dill.dill'] = dill._dill - -# TODO: Remove once Dataflow has containers with a preinstalled dill >= 0.2.8 -if not getattr(dill, '_dill', None): - dill._dill = dill.dill - sys.modules['dill._dill'] = dill.dill - - -def _is_nested_class(cls): - """Returns true if argument is a class object that appears to be nested.""" - return ( - isinstance(cls, type) and cls.__module__ is not None and - cls.__module__ != 'builtins' and - cls.__name__ not in sys.modules[cls.__module__].__dict__) - - -def _find_containing_class(nested_class): - """Finds containing class of a nested class passed as argument.""" - - seen = set() - - def _find_containing_class_inner(outer): - if outer in seen: - return None - seen.add(outer) - for k, v in outer.__dict__.items(): - if v is nested_class: - return outer, k - elif isinstance(v, type) and hasattr(v, '__dict__'): - res = _find_containing_class_inner(v) - if res: return res - - return _find_containing_class_inner(sys.modules[nested_class.__module__]) - - -def _nested_type_wrapper(fun): - """A wrapper for the standard pickler handler for class objects. - - Args: - fun: Original pickler handler for type objects. - - Returns: - A wrapper for type objects that handles nested classes. - - The wrapper detects if an object being pickled is a nested class object. - For nested class object only it will save the containing class object so - the nested structure is recreated during unpickle. - """ - def wrapper(pickler, obj): - # When the nested class is defined in the __main__ module we do not have to - # do anything special because the pickler itself will save the constituent - # parts of the type (i.e., name, base classes, dictionary) and then - # recreate it during unpickling. - if _is_nested_class(obj) and obj.__module__ != '__main__': - containing_class_and_name = _find_containing_class(obj) - if containing_class_and_name is not None: - return pickler.save_reduce(getattr, containing_class_and_name, obj=obj) - try: - return fun(pickler, obj) - except dill.dill.PicklingError: - # pylint: disable=protected-access - return pickler.save_reduce( - dill.dill._create_type, - ( - type(obj), - obj.__name__, - obj.__bases__, - dill.dill._dict_from_dictproxy(obj.__dict__)), - obj=obj) - # pylint: enable=protected-access - - return wrapper - - -# Monkey patch the standard pickler dispatch table entry for type objects. -# Dill, for certain types, defers to the standard pickler (including type -# objects). We wrap the standard handler using type_wrapper() because -# for nested class we want to pickle the actual enclosing class object so we -# can recreate it during unpickling. -# TODO(silviuc): Make sure we submit the fix upstream to GitHub dill project. -dill.dill.Pickler.dispatch[type] = _nested_type_wrapper( - dill.dill.Pickler.dispatch[type]) - - -# Dill pickles generators objects without complaint, but unpickling produces -# TypeError: object.__new__(generator) is not safe, use generator.__new__() -# on some versions of Python. 
-def _reject_generators(unused_pickler, unused_obj): - raise TypeError("can't (safely) pickle generator objects") - - -dill.dill.Pickler.dispatch[types.GeneratorType] = _reject_generators - -# This if guards against dill not being full initialized when generating docs. -if 'save_module' in dir(dill.dill): - - # Always pickle non-main modules by name. - old_save_module = dill.dill.save_module - - @dill.dill.register(dill.dill.ModuleType) - def save_module(pickler, obj): - if dill.dill.is_dill(pickler) and obj is pickler._main: - return old_save_module(pickler, obj) - else: - dill.dill.log.info('M2: %s' % obj) - # pylint: disable=protected-access - pickler.save_reduce(dill.dill._import_module, (obj.__name__, ), obj=obj) - # pylint: enable=protected-access - dill.dill.log.info('# M2') - - # Pickle module dictionaries (commonly found in lambda's globals) - # by referencing their module. - old_save_module_dict = dill.dill.save_module_dict - known_module_dicts = { - } # type: Dict[int, Tuple[types.ModuleType, Dict[str, Any]]] - - @dill.dill.register(dict) - def new_save_module_dict(pickler, obj): - obj_id = id(obj) - if not known_module_dicts or '__file__' in obj or '__package__' in obj: - if obj_id not in known_module_dicts: - # Trigger loading of lazily loaded modules (such as pytest vendored - # modules). - # This pass over sys.modules needs to iterate on a copy of sys.modules - # since lazy loading modifies the dictionary, hence the use of list(). - for m in list(sys.modules.values()): - try: - _ = m.__dict__ - except AttributeError: - pass - - for m in list(sys.modules.values()): - try: - if (m and m.__name__ != '__main__' and - isinstance(m, dill.dill.ModuleType)): - d = m.__dict__ - known_module_dicts[id(d)] = m, d - except AttributeError: - # Skip modules that do not have the __name__ attribute. - pass - if obj_id in known_module_dicts and dill.dill.is_dill(pickler): - m = known_module_dicts[obj_id][0] - try: - # pylint: disable=protected-access - dill.dill._import_module(m.__name__) - return pickler.save_reduce( - getattr, (known_module_dicts[obj_id][0], '__dict__'), obj=obj) - except (ImportError, AttributeError): - return old_save_module_dict(pickler, obj) - else: - return old_save_module_dict(pickler, obj) - - dill.dill.save_module_dict = new_save_module_dict - - def _nest_dill_logging(): - """Prefix all dill logging with its depth in the callstack. - - Useful for debugging pickling of deeply nested structures. - """ - old_log_info = dill.dill.log.info - - def new_log_info(msg, *args, **kwargs): - old_log_info( - ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg, - *args, - **kwargs) - - dill.dill.log.info = new_log_info - - -# Turn off verbose logging from the dill pickler. -logging.getLogger('dill').setLevel(logging.WARN) - +PICKLE_LIB = 'cloudPickle' +if PICKLE_LIB == 'dill': + from apache_beam.internal import dill_pickler as pickler_lib +elif PICKLE_LIB == 'cloudPickle': + from apache_beam.internal import cloudPickle_pickler as pickler_lib def dumps(o, enable_trace=True, use_zlib=False): # type: (...) 
-> bytes - """For internal use only; no backwards-compatibility guarantees.""" - with _pickle_lock: - try: - s = dill.dumps(o, byref=settings['dill_byref']) - except Exception: # pylint: disable=broad-except - if enable_trace: - dill.dill._trace(True) # pylint: disable=protected-access - s = dill.dumps(o, byref=settings['dill_byref']) - else: - raise - finally: - dill.dill._trace(False) # pylint: disable=protected-access - - # Compress as compactly as possible (compresslevel=9) to decrease peak memory - # usage (of multiple in-memory copies) and to avoid hitting protocol buffer - # limits. - # WARNING: Be cautious about compressor change since it can lead to pipeline - # representation change, and can break streaming job update compatibility on - # runners such as Dataflow. - if use_zlib: - c = zlib.compress(s, 9) - else: - c = bz2.compress(s, compresslevel=9) - del s # Free up some possibly large and no-longer-needed memory. - - return base64.b64encode(c) - + return pickler_lib.dumps(o, enable_trace=enable_trace, use_zlib=use_zlib) def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" - - c = base64.b64decode(encoded) - - if use_zlib: - s = zlib.decompress(c) - else: - s = bz2.decompress(c) - - del c # Free up some possibly large and no-longer-needed memory. - - with _pickle_lock: - try: - return dill.loads(s) - except Exception: # pylint: disable=broad-except - if enable_trace: - dill.dill._trace(True) # pylint: disable=protected-access - return dill.loads(s) - else: - raise - finally: - dill.dill._trace(False) # pylint: disable=protected-access - + return pickler_lib.loads(encoded, enable_trace=enable_trace, use_zlib=use_zlib) def dump_session(file_path): """For internal use only; no backwards-compatibility guarantees. Pickle the current python session to be used in the worker. - - Note: Due to the inconsistency in the first dump of dill dump_session we - create and load the dump twice to have consistent results in the worker and - the running session. 
Check: https://github.com/uqfoundation/dill/issues/195 """ - with _pickle_lock: - dill.dump_session(file_path) - dill.load_session(file_path) - return dill.dump_session(file_path) + return pickler_lib.dump_session(file_path) def load_session(file_path): - with _pickle_lock: - return dill.load_session(file_path) + return pickler_lib.load_session(file_path) From c1f8608fddce8a0cba2eb83eda8f838ca1b726cb Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 6 Oct 2021 10:11:57 -0400 Subject: [PATCH 02/45] changes to pickler --- .../internal/cloudPickle_pickler.py | 82 +++++++++++++------ 1 file changed, 55 insertions(+), 27 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudPickle_pickler.py b/sdks/python/apache_beam/internal/cloudPickle_pickler.py index 75aad3b9555b..80773b773bc6 100644 --- a/sdks/python/apache_beam/internal/cloudPickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudPickle_pickler.py @@ -32,15 +32,20 @@ import base64 import bz2 +import io import logging import sys import threading import traceback import types import zlib +from absl import flags from typing import Any from typing import Dict from typing import Tuple +from _thread import RLock as RLockType + +from absl import flags import cloudpickle @@ -55,12 +60,20 @@ def dumps(o, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" with _pickle_lock: - try: - s = cloudpickle.dumps(o) - except Exception: # pylint: disable=broad-except - # TODO: decide what to do on exceptions. - print('TODO figure out what to do with cloudpickle exceptions.') - raise + with io.BytesIO() as file: + try: + pickler = cloudpickle.CloudPickler(file) + pickler.dispatch_table[RLockType] = _pickle_rlock + pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags + pickler.dump(o) + s = file.getvalue() + except Exception as e: # pylint: disable=broad-except + # TODO: decide what to do on exceptions. + if enable_trace: + s = b'' + # return pickler.dump(o) + else: + raise # Compress as compactly as possible (compresslevel=9) to decrease peak memory # usage (of multiple in-memory copies) and to avoid hitting protocol buffer @@ -91,29 +104,44 @@ def loads(encoded, enable_trace=True, use_zlib=False): with _pickle_lock: try: - return cloudpickle.loads(s) - except Exception: # pylint: disable=broad-except - print('TODO figure out what to do with cloudpickle exceptions.') - raise + unpickled = cloudpickle.loads(s) + print('Unpickle Successful - ' + str(unpickled)) + return unpickled + except Exception as e: # pylint: disable=broad-except + if enable_trace: + print('TODO figure out what to do with cloudpickle exceptions. 
' + str(e)) + return cloudpickle.loads(s) + else: + raise + + +def _pickle_rlock(obj): + return _create_rlock, tuple([]) -def dump_session(file_path): - with _pickle_lock: - try: - module_dict = _main_module.__dict__.copy() - with open(file_path, 'wb') as file: - cloudpickle.dump(module_dict, file) - except Exception: - print('TODO figure out what to do with cloudpickle exceptions.') - raise +def _create_rlock(): + return RLockType() + +def _pickle_absl_flags(obj): + return _create_absl_flags, tuple([]) + +def _create_absl_flags(): + return flags.FLAGS + +def dump_session(file_path): + try: + module_dict = _main_module.__dict__.copy() + with open(file_path, 'wb') as file: + pickler = cloudpickle.CloudPickler(file) + pickler.dispatch_table[RLockType] = _pickle_rlock + pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags + pickler.dump(module_dict) + except Exception: + print('TODO figure out what to do with cloudpickle exceptions.') + raise def load_session(file_path): - with _pickle_lock: - try: - module_dict = _main_module.__dict__.copy() - with open(file_path, 'wb') as file: - cloudpickle.dump(module_dict, file) - except Exception: - print('TODO figure out what to do with cloudpickle exceptions.') - raise + with open(file_path,'rb') as file: + module = cloudpickle.load(file) + _main_module.__dict__.update(module) From 954a46f686aff0fca44e3e4bd913214c8ccb4e01 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 28 Oct 2021 19:04:42 -0400 Subject: [PATCH 03/45] Update unit tests and correctly add global setter --- .../apache_beam/coders/coders_test_common.py | 16 +++--- ...ckle_pickler.py => cloudpickle_pickler.py} | 57 +++++++------------ sdks/python/apache_beam/internal/pickler.py | 30 ++++++++-- 3 files changed, 52 insertions(+), 51 deletions(-) rename sdks/python/apache_beam/internal/{cloudPickle_pickler.py => cloudpickle_pickler.py} (72%) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index dbe24533b2ae..e6ea68680d8b 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -189,7 +189,7 @@ def check_coder(self, coder, *values, **kwargs): self.assertEqual( coder.get_impl().get_estimated_size_and_observables(v), (coder.get_impl().estimate_size(v), [])) - copy1 = pickler.loads(pickler.dumps(coder)) + copy1 = pickler.loads(pickler.dumps(coder)) copy2 = coders.Coder.from_runner_api(coder.to_runner_api(context), context) for v in values: self.assertEqual(v, copy1.decode(copy2.encode(v))) @@ -665,11 +665,13 @@ def test_state_backed_iterable_coder(self): state = {} def iterable_state_write(values, element_coder_impl): + global state token = b'state_token_%d' % len(state) state[token] = [element_coder_impl.encode(e) for e in values] return token def iterable_state_read(token, element_coder_impl): + global state return [element_coder_impl.decode(s) for s in state[token]] coder = coders.StateBackedIterableCoder( @@ -680,14 +682,14 @@ def iterable_state_read(token, element_coder_impl): context = pipeline_context.PipelineContext( iterable_state_read=iterable_state_read, iterable_state_write=iterable_state_write) - self.check_coder( - coder, [1, 2, 3], context=context, test_size_estimation=False) + # Note: do not use check_coder see https://github.com/cloudpipe/cloudpickle/issues/452 + self._observe(coder) + self.assertEqual([1,2,3], coder.decode(coder.encode([1,2,3]))) # Ensure that state was actually used. 
self.assertNotEqual(state, {}) - self.check_coder( - coders.TupleCoder((coder, coder)), ([1], [2, 3]), - context=context, - test_size_estimation=False) + tupleCoder = coders.TupleCoder((coder, coder)) + self._observe(tupleCoder) + self.assertEqual(([1], [2, 3]), tupleCoder.decode(tupleCoder.encode(([1], [2, 3])))) def test_nullable_coder(self): self.check_coder(coders.NullableCoder(coders.VarIntCoder()), None, 2 * 64) diff --git a/sdks/python/apache_beam/internal/cloudPickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py similarity index 72% rename from sdks/python/apache_beam/internal/cloudPickle_pickler.py rename to sdks/python/apache_beam/internal/cloudpickle_pickler.py index 80773b773bc6..f0f53d293356 100644 --- a/sdks/python/apache_beam/internal/cloudPickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -61,19 +61,12 @@ def dumps(o, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" with _pickle_lock: with io.BytesIO() as file: - try: - pickler = cloudpickle.CloudPickler(file) - pickler.dispatch_table[RLockType] = _pickle_rlock - pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags - pickler.dump(o) - s = file.getvalue() - except Exception as e: # pylint: disable=broad-except - # TODO: decide what to do on exceptions. - if enable_trace: - s = b'' - # return pickler.dump(o) - else: - raise + pickler = cloudpickle.CloudPickler(file) + pickler.dispatch_table[RLockType] = _pickle_rlock + pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags + pickler.dump(o) + s = file.getvalue() + # TODO(ryanthompson): See if echoing dill.enable_trace is useful. # Compress as compactly as possible (compresslevel=9) to decrease peak memory # usage (of multiple in-memory copies) and to avoid hitting protocol buffer @@ -103,16 +96,8 @@ def loads(encoded, enable_trace=True, use_zlib=False): del c # Free up some possibly large and no-longer-needed memory. with _pickle_lock: - try: - unpickled = cloudpickle.loads(s) - print('Unpickle Successful - ' + str(unpickled)) - return unpickled - except Exception as e: # pylint: disable=broad-except - if enable_trace: - print('TODO figure out what to do with cloudpickle exceptions. ' + str(e)) - return cloudpickle.loads(s) - else: - raise + unpickled = cloudpickle.loads(s) + return unpickled def _pickle_rlock(obj): @@ -122,26 +107,22 @@ def _pickle_rlock(obj): def _create_rlock(): return RLockType() + def _pickle_absl_flags(obj): - return _create_absl_flags, tuple([]) + return _create_absl_flags, tuple([]) + def _create_absl_flags(): - return flags.FLAGS + return flags.FLAGS + def dump_session(file_path): - try: - module_dict = _main_module.__dict__.copy() - with open(file_path, 'wb') as file: - pickler = cloudpickle.CloudPickler(file) - pickler.dispatch_table[RLockType] = _pickle_rlock - pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags - pickler.dump(module_dict) - except Exception: - print('TODO figure out what to do with cloudpickle exceptions.') - raise + # It is possible to dump session with cloudpickle. However, since references + # are saved it should not be necessary. See https://s.apache.org/beam-picklers + pass def load_session(file_path): - with open(file_path,'rb') as file: - module = cloudpickle.load(file) - _main_module.__dict__.update(module) + # It is possible to load_session with cloudpickle. However, since references + # are saved it should not be necessary. 
See https://s.apache.org/beam-picklers + pass \ No newline at end of file diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 2ae290f2b52a..d8bb66c0eae3 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -28,20 +28,27 @@ the coders.*PickleCoder classes should be used instead. """ -PICKLE_LIB = 'cloudPickle' -if PICKLE_LIB == 'dill': - from apache_beam.internal import dill_pickler as pickler_lib -elif PICKLE_LIB == 'cloudPickle': - from apache_beam.internal import cloudPickle_pickler as pickler_lib +from apache_beam.internal import cloudPickle_pickler +from apache_beam.internal import dill_pickler + +pickler_lib = dill_pickler + +USE_CLOUDPICKLE = 1 +USE_DILL = 2 + def dumps(o, enable_trace=True, use_zlib=False): # type: (...) -> bytes return pickler_lib.dumps(o, enable_trace=enable_trace, use_zlib=use_zlib) + def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" - return pickler_lib.loads(encoded, enable_trace=enable_trace, use_zlib=use_zlib) + + return pickler_lib.loads(encoded, enable_trace=enable_trace, + use_zlib=use_zlib) + def dump_session(file_path): """For internal use only; no backwards-compatibility guarantees. @@ -51,5 +58,16 @@ def dump_session(file_path): return pickler_lib.dump_session(file_path) + def load_session(file_path): return pickler_lib.load_session(file_path) + + +def change_pickle_lib(pickle_lib): + """ Changes pickling library. Users should prefer the default library.""" + global pickler_lib + if pickle_lib == USE_CLOUDPICKLE: + pickler_lib = dill_pickler + elif pickler_lib == USE_DILL: + pickler_lib = cloudPickle_pickler + From 9947720543b6a1f337bf51046cfc7057ec742773 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 28 Oct 2021 19:16:57 -0400 Subject: [PATCH 04/45] updated cloudpickle_pickler_test --- .../internal/cloudpickle_pickler_test.py | 108 ++++++++++++++++++ sdks/python/apache_beam/internal/pickler.py | 2 +- 2 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 sdks/python/apache_beam/internal/cloudpickle_pickler_test.py diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py new file mode 100644 index 000000000000..5df77905a539 --- /dev/null +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for the pickler module.""" + +# pytype: skip-file + +import sys +import types +import unittest + +from apache_beam.internal import module_test +from apache_beam.internal.cloudpickle_pickler import dumps +from apache_beam.internal.cloudpickle_pickler import loads + + +class PicklerTest(unittest.TestCase): + + NO_MAPPINGPROXYTYPE = not hasattr(types, "MappingProxyType") + + def test_basics(self): + self.assertEqual([1, 'a', (u'z', )], loads(dumps([1, 'a', (u'z', )]))) + fun = lambda x: 'xyz-%s' % x + self.assertEqual('xyz-abc', loads(dumps(fun))('abc')) + + def test_lambda_with_globals(self): + """Tests that the globals of a function are preserved.""" + + # The point of the test is that the lambda being called after unpickling + # relies on having the re module being loaded. + self.assertEqual(['abc', 'def'], + loads(dumps( + module_test.get_lambda_with_globals()))('abc def')) + + def test_lambda_with_main_globals(self): + self.assertEqual(unittest, loads(dumps(lambda: unittest))()) + + def test_lambda_with_closure(self): + """Tests that the closure of a function is preserved.""" + self.assertEqual( + 'closure: abc', + loads(dumps(module_test.get_lambda_with_closure('abc')))()) + + def test_class(self): + """Tests that a class object is pickled correctly.""" + self.assertEqual(['abc', 'def'], + loads(dumps(module_test.Xyz))().foo('abc def')) + + def test_object(self): + """Tests that a class instance is pickled correctly.""" + self.assertEqual(['abc', 'def'], + loads(dumps(module_test.XYZ_OBJECT)).foo('abc def')) + + def test_nested_class(self): + """Tests that a nested class object is pickled correctly.""" + self.assertEqual( + 'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum) + self.assertEqual( + 'Y:abc', + loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum) + + def test_dynamic_class(self): + """Tests that a nested class object is pickled correctly.""" + self.assertEqual( + 'Z:abc', loads(dumps(module_test.create_class('abc'))).get()) + + def test_generators(self): + with self.assertRaises(TypeError): + dumps((_ for _ in range(10))) + + def test_recursive_class(self): + self.assertEqual( + 'RecursiveClass:abc', + loads(dumps(module_test.RecursiveClass('abc').datum))) + + @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced') + def test_dump_and_load_mapping_proxy(self): + self.assertEqual( + 'def', loads(dumps(types.MappingProxyType({'abc': 'def'})))['abc']) + self.assertEqual( + types.MappingProxyType, type(loads(dumps(types.MappingProxyType({}))))) + + # pylint: disable=exec-used + @unittest.skipIf(sys.version_info < (3, 7), 'Python 3.7 or above only') + def test_dataclass(self): + exec( + ''' +from apache_beam.internal.module_test import DataClass +self.assertEqual(DataClass(datum='abc'), loads(dumps(DataClass(datum='abc')))) + ''') + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index d8bb66c0eae3..ca1c27fb029e 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -28,7 +28,7 @@ the coders.*PickleCoder classes should be used instead. 
""" -from apache_beam.internal import cloudPickle_pickler +from apache_beam.internal import cloudpickle_pickler from apache_beam.internal import dill_pickler pickler_lib = dill_pickler From dfa434a06faa90e3c4c94155eb4256e76c37ed8b Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 28 Oct 2021 19:36:07 -0400 Subject: [PATCH 05/45] added scope test --- .../python/apache_beam/internal/cloudpickle_pickler_test.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index 5df77905a539..77abdda15cc6 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -87,6 +87,12 @@ def test_recursive_class(self): 'RecursiveClass:abc', loads(dumps(module_test.RecursiveClass('abc').datum))) + def test_function_with_external_reference(self): + out_of_scope_var = 'expected_value' + def foo(): + return out_of_scope_var + self.assertEqual('expected_value', loads(dumps(foo))()) + @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced') def test_dump_and_load_mapping_proxy(self): self.assertEqual( From c8ff50caf4ded520ea82dae3881edc3afed8c8d2 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 29 Oct 2021 11:23:20 -0400 Subject: [PATCH 06/45] updated some comments --- sdks/python/apache_beam/coders/coders_test_common.py | 2 +- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 9 +++------ .../apache_beam/internal/cloudpickle_pickler_test.py | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index e6ea68680d8b..0ea79aafca08 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -189,7 +189,7 @@ def check_coder(self, coder, *values, **kwargs): self.assertEqual( coder.get_impl().get_estimated_size_and_observables(v), (coder.get_impl().estimate_size(v), [])) - copy1 = pickler.loads(pickler.dumps(coder)) + copy1 = pickler.loads(pickler.dumps(coder)) copy2 = coders.Coder.from_runner_api(coder.to_runner_api(context), context) for v in values: self.assertEqual(v, copy1.decode(copy2.encode(v))) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index f0f53d293356..fd7886832923 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -19,13 +19,10 @@ For internal use only. No backwards compatibility guarantees. -Pickles created by the pickling library contain non-ASCII characters, so -we base64-encode the results so that we can put them in a JSON objects. -The pickler is used to embed FlatMap callable objects into the workflow JSON -description. +Uses the cloudpickle library to pickle data, functions, lambdas +and classes. -The pickler module should be used to pickle functions and modules; for values, -the coders.*PickleCoder classes should be used instead. +dump_session and load_session are no ops. 
""" # pytype: skip-file diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index 77abdda15cc6..93d7143205b6 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -15,7 +15,7 @@ # limitations under the License. # -"""Unit tests for the pickler module.""" +"""Unit tests for the cloudpickle_pickler module.""" # pytype: skip-file From 54d1c1b8fe5f3d85e0d3ac8ddc71c7f4a0385043 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 29 Oct 2021 12:16:54 -0400 Subject: [PATCH 07/45] Allow absl library not to be present. --- .../python/apache_beam/internal/cloudpickle_pickler.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index fd7886832923..54243410c07b 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -42,7 +42,10 @@ from typing import Tuple from _thread import RLock as RLockType -from absl import flags +try: + from absl import flags +except ImportError: + pass import cloudpickle @@ -60,7 +63,10 @@ def dumps(o, enable_trace=True, use_zlib=False): with io.BytesIO() as file: pickler = cloudpickle.CloudPickler(file) pickler.dispatch_table[RLockType] = _pickle_rlock - pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags + try: + pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags + except NameError: + pass pickler.dump(o) s = file.getvalue() # TODO(ryanthompson): See if echoing dill.enable_trace is useful. From 36556fc5d0aab4b24472ac1a5b45122b02f65ed5 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 29 Oct 2021 13:32:50 -0400 Subject: [PATCH 08/45] allow multiple error types --- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 54243410c07b..7ffb1a963681 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -44,7 +44,7 @@ try: from absl import flags -except ImportError: +except (ImportError, ModuleNotFoundError): pass import cloudpickle From 5d7de3685961571225ff62b8d3fcd2f96917a3cc Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 29 Oct 2021 14:54:26 -0400 Subject: [PATCH 09/45] removed absl flags import --- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 7ffb1a963681..8c4d71f48147 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -36,7 +36,6 @@ import traceback import types import zlib -from absl import flags from typing import Any from typing import Dict from typing import Tuple From 9d5587b04dc8ea8228b4b860527468717094342d Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Fri, 29 Oct 2021 15:29:39 -0400 Subject: [PATCH 10/45] add cloudpickle dependency --- sdks/python/container/base_image_requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/container/base_image_requirements.txt 
b/sdks/python/container/base_image_requirements.txt index b98d530b389a..6eb6d5e2d540 100644 --- a/sdks/python/container/base_image_requirements.txt +++ b/sdks/python/container/base_image_requirements.txt @@ -26,6 +26,7 @@ # TODO(AVRO-2429): Upgrade to >= 1.9.0 only after resolved avro-python3==1.8.2 fastavro==1.0.0.post1 +cloudpickle=2.0.0 crcmod==1.7 dill==0.3.1.1 future==0.18.2 From 2d91d600ff53b81e0336d4fe575e5510960122f4 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 4 Nov 2021 12:30:10 -0400 Subject: [PATCH 11/45] applied valentyn comments. remove dill dependencies --- .../internal/cloudpickle_pickler_test.py | 12 +++----- .../apache_beam/internal/dill_pickler.py | 8 ++++++ sdks/python/apache_beam/internal/pickler.py | 28 +++++++++++-------- .../container/base_image_requirements.txt | 2 +- sdks/python/setup.py | 1 + 5 files changed, 31 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index 93d7143205b6..647883ee49a1 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -55,26 +55,22 @@ def test_lambda_with_closure(self): 'closure: abc', loads(dumps(module_test.get_lambda_with_closure('abc')))()) - def test_class(self): - """Tests that a class object is pickled correctly.""" + def test_class_object_pickled(self): self.assertEqual(['abc', 'def'], loads(dumps(module_test.Xyz))().foo('abc def')) - def test_object(self): - """Tests that a class instance is pickled correctly.""" + def test_class_instance_pickled(self): self.assertEqual(['abc', 'def'], loads(dumps(module_test.XYZ_OBJECT)).foo('abc def')) - def test_nested_class(self): - """Tests that a nested class object is pickled correctly.""" + def test_pickling_preserves_closure_of_a_function(self): self.assertEqual( 'X:abc', loads(dumps(module_test.TopClass.NestedClass('abc'))).datum) self.assertEqual( 'Y:abc', loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum) - def test_dynamic_class(self): - """Tests that a nested class object is pickled correctly.""" + def test_pickle_dynamic_class(self): self.assertEqual( 'Z:abc', loads(dumps(module_test.create_class('abc'))).get()) diff --git a/sdks/python/apache_beam/internal/dill_pickler.py b/sdks/python/apache_beam/internal/dill_pickler.py index 9cecedd1b5c1..feaa8a11157d 100644 --- a/sdks/python/apache_beam/internal/dill_pickler.py +++ b/sdks/python/apache_beam/internal/dill_pickler.py @@ -311,3 +311,11 @@ def dump_session(file_path): def load_session(file_path): with _pickle_lock: return dill.load_session(file_path) + +def override_pickler_hooks(extend=True): + """ Extends the dill library hooks into that of the standard pickler library. + + If false all hooks that dill overrides will be removed. + If true dill hooks will be injected into the pickler library dispatch_table. 
+ """ + dill.extend(extend) \ No newline at end of file diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index ca1c27fb029e..ec4e9e119b7a 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -31,23 +31,25 @@ from apache_beam.internal import cloudpickle_pickler from apache_beam.internal import dill_pickler -pickler_lib = dill_pickler USE_CLOUDPICKLE = 1 USE_DILL = 2 +DEFAULT_PICKLE_LIB = USE_DILL +desired_pickle_lib = None +change_pickle_lib(DEFAULT_PICKLE_LIB) def dumps(o, enable_trace=True, use_zlib=False): # type: (...) -> bytes - return pickler_lib.dumps(o, enable_trace=enable_trace, use_zlib=use_zlib) + return desired_pickle_lib.dumps(o, enable_trace=enable_trace, use_zlib=use_zlib) def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" - return pickler_lib.loads(encoded, enable_trace=enable_trace, - use_zlib=use_zlib) + return desired_pickle_lib.loads(encoded, enable_trace=enable_trace, + use_zlib=use_zlib) def dump_session(file_path): @@ -56,18 +58,22 @@ def dump_session(file_path): Pickle the current python session to be used in the worker. """ - return pickler_lib.dump_session(file_path) + return desired_pickle_lib.dump_session(file_path) def load_session(file_path): - return pickler_lib.load_session(file_path) + return desired_pickle_lib.load_session(file_path) -def change_pickle_lib(pickle_lib): +def set_pickle_lib(pickle_lib): """ Changes pickling library. Users should prefer the default library.""" - global pickler_lib + global desired_pickle_lib if pickle_lib == USE_CLOUDPICKLE: - pickler_lib = dill_pickler - elif pickler_lib == USE_DILL: - pickler_lib = cloudPickle_pickler + # Dill will override hooks in the dispatch table of the standard pickler. + # Those hooks overrides will cause cloudpickle to fail. + dill_pickler.override_pickler_hooks(False) + desired_pickle_lib = cloudPickle_pickler + elif desired_pickle_lib == USE_DILL: + desired_pickle_lib = dill_pickler + dill_pickler.override_pickler_hooks(True) diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt index 6eb6d5e2d540..d6d5326469c3 100644 --- a/sdks/python/container/base_image_requirements.txt +++ b/sdks/python/container/base_image_requirements.txt @@ -26,7 +26,7 @@ # TODO(AVRO-2429): Upgrade to >= 1.9.0 only after resolved avro-python3==1.8.2 fastavro==1.0.0.post1 -cloudpickle=2.0.0 +cloudpickle==2.0.0 crcmod==1.7 dill==0.3.1.1 future==0.18.2 diff --git a/sdks/python/setup.py b/sdks/python/setup.py index d8ffe495b0aa..368d615d73c7 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -139,6 +139,7 @@ def get_version(): # server, therefore list of allowed versions is very narrow. # See: https://github.com/uqfoundation/dill/issues/341. 
'dill>=0.3.1.1,<0.3.2', + 'cloudpickle>=2.0.0,<2.0.0', 'fastavro>=0.21.4,<2', 'future>=0.18.2,<1.0.0', 'grpcio>=1.29.0,<2', From eaa17ab69d15eda57cfa8be54b5dd3aabdf99003 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 4 Nov 2021 12:39:26 -0400 Subject: [PATCH 12/45] linted file --- sdks/python/apache_beam/coders/coders_test_common.py | 5 +++-- .../apache_beam/internal/cloudpickle_pickler_test.py | 2 ++ sdks/python/apache_beam/internal/dill_pickler.py | 3 ++- sdks/python/apache_beam/internal/pickler.py | 10 +++++----- sdks/python/setup.py | 9 ++++----- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 0ea79aafca08..95576867c498 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -684,12 +684,13 @@ def iterable_state_read(token, element_coder_impl): iterable_state_write=iterable_state_write) # Note: do not use check_coder see https://github.com/cloudpipe/cloudpickle/issues/452 self._observe(coder) - self.assertEqual([1,2,3], coder.decode(coder.encode([1,2,3]))) + self.assertEqual([1, 2, 3], coder.decode(coder.encode([1, 2, 3]))) # Ensure that state was actually used. self.assertNotEqual(state, {}) tupleCoder = coders.TupleCoder((coder, coder)) self._observe(tupleCoder) - self.assertEqual(([1], [2, 3]), tupleCoder.decode(tupleCoder.encode(([1], [2, 3])))) + self.assertEqual(([1], [2, 3]), + tupleCoder.decode(tupleCoder.encode(([1], [2, 3])))) def test_nullable_coder(self): self.check_coder(coders.NullableCoder(coders.VarIntCoder()), None, 2 * 64) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index 647883ee49a1..3f8655d9536e 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -85,8 +85,10 @@ def test_recursive_class(self): def test_function_with_external_reference(self): out_of_scope_var = 'expected_value' + def foo(): return out_of_scope_var + self.assertEqual('expected_value', loads(dumps(foo))()) @unittest.skipIf(NO_MAPPINGPROXYTYPE, 'test if MappingProxyType introduced') diff --git a/sdks/python/apache_beam/internal/dill_pickler.py b/sdks/python/apache_beam/internal/dill_pickler.py index feaa8a11157d..f99bf2af59eb 100644 --- a/sdks/python/apache_beam/internal/dill_pickler.py +++ b/sdks/python/apache_beam/internal/dill_pickler.py @@ -312,10 +312,11 @@ def load_session(file_path): with _pickle_lock: return dill.load_session(file_path) + def override_pickler_hooks(extend=True): """ Extends the dill library hooks into that of the standard pickler library. If false all hooks that dill overrides will be removed. If true dill hooks will be injected into the pickler library dispatch_table. 
""" - dill.extend(extend) \ No newline at end of file + dill.extend(extend) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index ec4e9e119b7a..22e1a3893979 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -31,7 +31,6 @@ from apache_beam.internal import cloudpickle_pickler from apache_beam.internal import dill_pickler - USE_CLOUDPICKLE = 1 USE_DILL = 2 DEFAULT_PICKLE_LIB = USE_DILL @@ -39,17 +38,19 @@ desired_pickle_lib = None change_pickle_lib(DEFAULT_PICKLE_LIB) + def dumps(o, enable_trace=True, use_zlib=False): # type: (...) -> bytes - return desired_pickle_lib.dumps(o, enable_trace=enable_trace, use_zlib=use_zlib) + return desired_pickle_lib.dumps( + o, enable_trace=enable_trace, use_zlib=use_zlib) def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" - return desired_pickle_lib.loads(encoded, enable_trace=enable_trace, - use_zlib=use_zlib) + return desired_pickle_lib.loads( + encoded, enable_trace=enable_trace, use_zlib=use_zlib) def dump_session(file_path): @@ -76,4 +77,3 @@ def set_pickle_lib(pickle_lib): elif desired_pickle_lib == USE_DILL: desired_pickle_lib = dill_pickler dill_pickler.override_pickler_hooks(True) - diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 368d615d73c7..5b6940e584eb 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -261,7 +261,6 @@ def run(self): 'Python %s.%s. You may encounter bugs or missing features.' % (sys.version_info.major, sys.version_info.minor)) - if __name__ == '__main__': setuptools.setup( name=PACKAGE_NAME, @@ -305,10 +304,10 @@ def run(self): # BEAM-8840: Do NOT use tests_require or setup_requires. extras_require={ 'docs': [ - 'Sphinx>=1.5.2,<2.0', - # Pinning docutils as a workaround for Sphinx issue: - # https://github.com/sphinx-doc/sphinx/issues/9727 - 'docutils==0.17.1' + 'Sphinx>=1.5.2,<2.0', + # Pinning docutils as a workaround for Sphinx issue: + # https://github.com/sphinx-doc/sphinx/issues/9727 + 'docutils==0.17.1' ], 'test': REQUIRED_TEST_PACKAGES, 'gcp': GCP_REQUIREMENTS, From 9eed7118fb89ca87bade18456e9d4896104524b0 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 4 Nov 2021 13:58:35 -0400 Subject: [PATCH 13/45] change cloudpickle max requirement --- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 5b6940e584eb..dfcf746b0e0c 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -139,7 +139,7 @@ def get_version(): # server, therefore list of allowed versions is very narrow. # See: https://github.com/uqfoundation/dill/issues/341. 
'dill>=0.3.1.1,<0.3.2', - 'cloudpickle>=2.0.0,<2.0.0', + 'cloudpickle>=2.0.0,<3', 'fastavro>=0.21.4,<2', 'future>=0.18.2,<1.0.0', 'grpcio>=1.29.0,<2', From 62c7d0dba44f3de8ee51dd0d305655ec7d190d6b Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 4 Nov 2021 14:05:24 -0400 Subject: [PATCH 14/45] fixed pickle lib typo --- sdks/python/apache_beam/internal/pickler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 22e1a3893979..68b269159a02 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -36,7 +36,7 @@ DEFAULT_PICKLE_LIB = USE_DILL desired_pickle_lib = None -change_pickle_lib(DEFAULT_PICKLE_LIB) +set_pickle_lib(DEFAULT_PICKLE_LIB) def dumps(o, enable_trace=True, use_zlib=False): From c5a9df06bb4c23e97b794c60e57efaf905a13e2e Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Thu, 4 Nov 2021 14:10:19 -0400 Subject: [PATCH 15/45] fixed pickle lib typo 2 --- sdks/python/apache_beam/internal/pickler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 68b269159a02..df74a2e2705f 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -36,7 +36,6 @@ DEFAULT_PICKLE_LIB = USE_DILL desired_pickle_lib = None -set_pickle_lib(DEFAULT_PICKLE_LIB) def dumps(o, enable_trace=True, use_zlib=False): @@ -77,3 +76,6 @@ def set_pickle_lib(pickle_lib): elif desired_pickle_lib == USE_DILL: desired_pickle_lib = dill_pickler dill_pickler.override_pickler_hooks(True) + +if not desired_pickle_lib: + set_pickle_lib(DEFAULT_PICKLE_LIB) From 771235b482e4dba48f154a6413f4348cb8cc1b6e Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 8 Nov 2021 12:32:39 -0500 Subject: [PATCH 16/45] sets the default pickler --- sdks/python/apache_beam/internal/pickler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index df74a2e2705f..e43820727873 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -36,7 +36,7 @@ DEFAULT_PICKLE_LIB = USE_DILL desired_pickle_lib = None - +set_pickle_lib(DEFAULT_PICKLE_LIB) def dumps(o, enable_trace=True, use_zlib=False): # type: (...) -> bytes From 64b74f4f607165bd4e2f15896e2b5082111cc84b Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 8 Nov 2021 12:35:35 -0500 Subject: [PATCH 17/45] revert last change --- sdks/python/apache_beam/internal/pickler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index e43820727873..674cc1cdf57b 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -36,7 +36,6 @@ DEFAULT_PICKLE_LIB = USE_DILL desired_pickle_lib = None -set_pickle_lib(DEFAULT_PICKLE_LIB) def dumps(o, enable_trace=True, use_zlib=False): # type: (...) 
-> bytes From 4344529b5ead03689d14bb0167978e225fb8b002 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 8 Nov 2021 14:49:19 -0500 Subject: [PATCH 18/45] fixed pickle lib typo --- .../apache_beam/examples/complete/autocomplete_test.py | 3 +-- sdks/python/apache_beam/internal/pickler.py | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index 2048742ef62b..7b62ad2e1e21 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -21,9 +21,8 @@ import unittest -import pytest - import apache_beam as beam +import pytest from apache_beam.examples.complete import autocomplete from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.test_utils import compute_hash diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 674cc1cdf57b..d32f016a7a8d 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -64,15 +64,15 @@ def load_session(file_path): return desired_pickle_lib.load_session(file_path) -def set_pickle_lib(pickle_lib): +def set_pickle_lib(pickle_lib_enum): """ Changes pickling library. Users should prefer the default library.""" global desired_pickle_lib - if pickle_lib == USE_CLOUDPICKLE: + if pickle_lib_enum == USE_CLOUDPICKLE: # Dill will override hooks in the dispatch table of the standard pickler. # Those hooks overrides will cause cloudpickle to fail. - dill_pickler.override_pickler_hooks(False) desired_pickle_lib = cloudPickle_pickler - elif desired_pickle_lib == USE_DILL: + dill_pickler.override_pickler_hooks(False) + elif pickle_lib_enum == USE_DILL: desired_pickle_lib = dill_pickler dill_pickler.override_pickler_hooks(True) From d842061689e12b1842112246b7d139012fe85ecd Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 15 Nov 2021 17:49:32 -0500 Subject: [PATCH 19/45] fixed typo --- sdks/python/apache_beam/examples/wordcount.py | 4 ++-- sdks/python/apache_beam/internal/pickler.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index b59baa61a469..f121a9600f86 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -90,5 +90,5 @@ def format_result(word, count): if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - run() + logging.getLogger().setLevel(logging.DEBUG) + run(save_main_session = False) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index d32f016a7a8d..d776a78db1ba 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -70,7 +70,7 @@ def set_pickle_lib(pickle_lib_enum): if pickle_lib_enum == USE_CLOUDPICKLE: # Dill will override hooks in the dispatch table of the standard pickler. # Those hooks overrides will cause cloudpickle to fail. 
- desired_pickle_lib = cloudPickle_pickler + desired_pickle_lib = cloudpickle_pickler dill_pickler.override_pickler_hooks(False) elif pickle_lib_enum == USE_DILL: desired_pickle_lib = dill_pickler From b8d249e7cc30a7617f9461a7cca829f02249da53 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 15 Nov 2021 17:50:52 -0500 Subject: [PATCH 20/45] revert wordcount --- sdks/python/apache_beam/examples/wordcount.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index f121a9600f86..b59baa61a469 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -90,5 +90,5 @@ def format_result(word, count): if __name__ == '__main__': - logging.getLogger().setLevel(logging.DEBUG) - run(save_main_session = False) + logging.getLogger().setLevel(logging.INFO) + run() From ea2cd96f8c20f645d3a9c558eeb26fa716ee2cac Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 15 Nov 2021 18:52:11 -0500 Subject: [PATCH 21/45] added pipeline options --- sdks/python/apache_beam/internal/pickler.py | 13 +++++++++++++ sdks/python/apache_beam/options/pipeline_options.py | 6 ++++++ .../apache_beam/runners/worker/sdk_worker_main.py | 1 + 3 files changed, 20 insertions(+) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index d776a78db1ba..0719b6d1d62e 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -30,6 +30,7 @@ from apache_beam.internal import cloudpickle_pickler from apache_beam.internal import dill_pickler +from apache_beam.options import SetupOptions USE_CLOUDPICKLE = 1 USE_DILL = 2 @@ -76,5 +77,17 @@ def set_pickle_lib(pickle_lib_enum): desired_pickle_lib = dill_pickler dill_pickler.override_pickler_hooks(True) + +def set_pipeline_options(pipeline_options): + """ Sets pickle library based on pipeline settings. """ + selected_library = pipeline_options.view_as(SetupOptions).set_pickle_library + if selected_library == 'default': + set_pickle_lib(DEFAULT_PICKLE_LIB) + elif selected_library == 'dill': + set_pickle_lib(USE_DILL) + elif selected_library == 'cloudpickle': + set_pickle_lib(USE_CLOUDPICKLE) + + if not desired_pickle_lib: set_pickle_lib(DEFAULT_PICKLE_LIB) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index c8f31e395a67..47d52c0ec572 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1056,6 +1056,12 @@ def _add_argparse_args(cls, parser): 'currently an experimental flag and provides no stability. ' 'Multiple --beam_plugin options can be specified if more than ' 'one plugin is needed.')) + parser.add_argument( + '--set_pickle_library', + default='default', + help=( + 'Chooses which pickle library to use. 
Options are dill, cloudpickle ' + 'or default.')) parser.add_argument( '--save_main_session', default=False, diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 394781b15bf7..60c7e00c3872 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -76,6 +76,7 @@ def create_harness(environment, dry_run=False): RuntimeValueProvider.set_runtime_options(pipeline_options_dict) sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict) filesystems.FileSystems.set_options(sdk_pipeline_options) + pickler.set_pipeline_options(sdk_pipeline_options) if 'SEMI_PERSISTENT_DIRECTORY' in environment: semi_persistent_directory = environment['SEMI_PERSISTENT_DIRECTORY'] From 87f39f054df0eee9064eafe6472a256f23644b23 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 16 Nov 2021 12:21:41 -0500 Subject: [PATCH 22/45] added pickler setting to worker --- sdks/python/apache_beam/internal/pickler.py | 5 ++--- sdks/python/apache_beam/runners/worker/sdk_worker_main.py | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 0719b6d1d62e..1506187e1b4b 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -30,7 +30,6 @@ from apache_beam.internal import cloudpickle_pickler from apache_beam.internal import dill_pickler -from apache_beam.options import SetupOptions USE_CLOUDPICKLE = 1 USE_DILL = 2 @@ -38,6 +37,7 @@ desired_pickle_lib = None + def dumps(o, enable_trace=True, use_zlib=False): # type: (...) -> bytes @@ -78,9 +78,8 @@ def set_pickle_lib(pickle_lib_enum): dill_pickler.override_pickler_hooks(True) -def set_pipeline_options(pipeline_options): +def set_pipeline_options(pickler_option): """ Sets pickle library based on pipeline settings. 
""" - selected_library = pipeline_options.view_as(SetupOptions).set_pickle_library if selected_library == 'default': set_pickle_lib(DEFAULT_PICKLE_LIB) elif selected_library == 'dill': diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 60c7e00c3872..733bb65a13ec 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -76,7 +76,8 @@ def create_harness(environment, dry_run=False): RuntimeValueProvider.set_runtime_options(pipeline_options_dict) sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict) filesystems.FileSystems.set_options(sdk_pipeline_options) - pickler.set_pipeline_options(sdk_pipeline_options) + pickler.set_pipeline_options( + sdk_pipeline_options.view_as(SetupOptions).set_pickle_library) if 'SEMI_PERSISTENT_DIRECTORY' in environment: semi_persistent_directory = environment['SEMI_PERSISTENT_DIRECTORY'] From 7339c29c47d272db145f86aba4661b1e0a2854e3 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 16 Nov 2021 13:54:19 -0500 Subject: [PATCH 23/45] added setup options --- sdks/python/apache_beam/runners/worker/sdk_worker_main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 733bb65a13ec..8879e3b7813d 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -34,6 +34,7 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import ProfilingOptions +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.value_provider import RuntimeValueProvider from apache_beam.portability.api import endpoints_pb2 from apache_beam.runners.internal import names From 2eea347662bc628a38099b838cbb98438c0ca957 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 17 Nov 2021 14:30:56 -0500 Subject: [PATCH 24/45] Upgraded arguement names and simplify pickle changing interface in pickler.py --- sdks/python/apache_beam/internal/pickler.py | 30 ++++++------------- .../apache_beam/options/pipeline_options.py | 2 +- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 1506187e1b4b..3ed4f32ffedb 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -31,8 +31,8 @@ from apache_beam.internal import cloudpickle_pickler from apache_beam.internal import dill_pickler -USE_CLOUDPICKLE = 1 -USE_DILL = 2 +USE_CLOUDPICKLE = 'cloudpickle' +USE_DILL = 'dill' DEFAULT_PICKLE_LIB = USE_DILL desired_pickle_lib = None @@ -65,28 +65,16 @@ def load_session(file_path): return desired_pickle_lib.load_session(file_path) -def set_pickle_lib(pickle_lib_enum): - """ Changes pickling library. Users should prefer the default library.""" +def set_library(selected_library = DEFAULT_PICKLE_LIB): + """ Sets pickle library that will be used. """ global desired_pickle_lib - if pickle_lib_enum == USE_CLOUDPICKLE: - # Dill will override hooks in the dispatch table of the standard pickler. - # Those hooks overrides will cause cloudpickle to fail. 
- desired_pickle_lib = cloudpickle_pickler - dill_pickler.override_pickler_hooks(False) - elif pickle_lib_enum == USE_DILL: + if selected_library == USE_DILL and desired_pickle_lib != dill_pickler: desired_pickle_lib = dill_pickler dill_pickler.override_pickler_hooks(True) - - -def set_pipeline_options(pickler_option): - """ Sets pickle library based on pipeline settings. """ - if selected_library == 'default': - set_pickle_lib(DEFAULT_PICKLE_LIB) - elif selected_library == 'dill': - set_pickle_lib(USE_DILL) - elif selected_library == 'cloudpickle': - set_pickle_lib(USE_CLOUDPICKLE) + elif selected_library == USE_CLOUDPICKLE and desired_pickle_lib != cloudpickle_pickler: + desired_pickle_lib = cloudpickle_pickler + dill_pickler.override_pickler_hooks(False) if not desired_pickle_lib: - set_pickle_lib(DEFAULT_PICKLE_LIB) + set_library() diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 47d52c0ec572..6f2c8369e6ba 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1057,7 +1057,7 @@ def _add_argparse_args(cls, parser): 'Multiple --beam_plugin options can be specified if more than ' 'one plugin is needed.')) parser.add_argument( - '--set_pickle_library', + '--pickle_library', default='default', help=( 'Chooses which pickle library to use. Options are dill, cloudpickle ' From 7cb995f8951d7cae8534d16dcb53e57271912f43 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 17 Nov 2021 15:43:52 -0500 Subject: [PATCH 25/45] updated function name and param name in sdk_worker_main --- sdks/python/apache_beam/runners/worker/sdk_worker_main.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py index 8879e3b7813d..5510d36e1457 100644 --- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py +++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py @@ -77,8 +77,7 @@ def create_harness(environment, dry_run=False): RuntimeValueProvider.set_runtime_options(pipeline_options_dict) sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict) filesystems.FileSystems.set_options(sdk_pipeline_options) - pickler.set_pipeline_options( - sdk_pipeline_options.view_as(SetupOptions).set_pickle_library) + pickler.set_library(sdk_pipeline_options.view_as(SetupOptions).pickle_library) if 'SEMI_PERSISTENT_DIRECTORY' in environment: semi_persistent_directory = environment['SEMI_PERSISTENT_DIRECTORY'] From a2012cb6cb934c4a37ad7368db41a678b83da5ad Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 23 Nov 2021 14:00:03 -0500 Subject: [PATCH 26/45] updated base_image_requirements --- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 2 +- sdks/python/container/py36/base_image_requirements.txt | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 8c4d71f48147..c740f48b33f3 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -127,4 +127,4 @@ def dump_session(file_path): def load_session(file_path): # It is possible to load_session with cloudpickle. However, since references # are saved it should not be necessary. 
See https://s.apache.org/beam-picklers - pass \ No newline at end of file + pass diff --git a/sdks/python/container/py36/base_image_requirements.txt b/sdks/python/container/py36/base_image_requirements.txt index 15c2c7b5b28d..b51fad8dffc7 100644 --- a/sdks/python/container/py36/base_image_requirements.txt +++ b/sdks/python/container/py36/base_image_requirements.txt @@ -33,6 +33,7 @@ certifi==2021.10.8 charset-normalizer==2.0.7 clang==5.0 click==8.0.3 +cloudpickle==2.0.0 crcmod==1.7 Cython==0.29.24 dataclasses==0.8 @@ -68,7 +69,7 @@ google-cloud-videointelligence==1.16.1 google-cloud-vision==1.0.0 google-crc32c==1.3.0 google-pasta==0.2.0 -google-python-cloud-debugger==2.18 +google-python-cloud-debugger==2.8 google-resumable-media==2.1.0 googleapis-common-protos==1.53.0 greenlet==1.1.2 @@ -84,7 +85,7 @@ importlib-metadata==4.8.2 joblib==1.1.0 keras==2.7.0 Keras-Preprocessing==1.1.2 -libcst==0.3.21 +libcst==0.3.22 Markdown==3.3.6 mmh3==3.0.0 mock==2.0.0 From b078766af3719118952a6a3ec6e80307b4009663 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 23 Nov 2021 16:18:32 -0500 Subject: [PATCH 27/45] linted to remove space in default arg' --- sdks/python/apache_beam/internal/pickler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 3ed4f32ffedb..1860b546031c 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -65,7 +65,7 @@ def load_session(file_path): return desired_pickle_lib.load_session(file_path) -def set_library(selected_library = DEFAULT_PICKLE_LIB): +def set_library(selected_library=DEFAULT_PICKLE_LIB): """ Sets pickle library that will be used. """ global desired_pickle_lib if selected_library == USE_DILL and desired_pickle_lib != dill_pickler: From db0c27e814789f24f073abd70a4c6114f78ed390 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 23 Nov 2021 17:23:37 -0500 Subject: [PATCH 28/45] Added cloudpickle to requirements --- sdks/python/container/py37/base_image_requirements.txt | 1 + sdks/python/container/py38/base_image_requirements.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/sdks/python/container/py37/base_image_requirements.txt b/sdks/python/container/py37/base_image_requirements.txt index d658b1568efb..119f8bcf4be2 100644 --- a/sdks/python/container/py37/base_image_requirements.txt +++ b/sdks/python/container/py37/base_image_requirements.txt @@ -32,6 +32,7 @@ cachetools==4.2.4 certifi==2021.10.8 charset-normalizer==2.0.7 click==8.0.3 +cloudpickle==2.0.0 crcmod==1.7 Cython==0.29.24 deprecation==2.1.0 diff --git a/sdks/python/container/py38/base_image_requirements.txt b/sdks/python/container/py38/base_image_requirements.txt index 6a0f54884b2b..75af33c12166 100644 --- a/sdks/python/container/py38/base_image_requirements.txt +++ b/sdks/python/container/py38/base_image_requirements.txt @@ -31,6 +31,7 @@ cachetools==4.2.4 certifi==2021.10.8 charset-normalizer==2.0.7 click==8.0.3 +cloudpickle==2.0.0 crcmod==1.7 Cython==0.29.24 deprecation==2.1.0 From adb7ae4164820a4e387b6964f8bfa88e54a75f48 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 23 Nov 2021 17:49:22 -0500 Subject: [PATCH 29/45] linted --- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 8 -------- sdks/python/apache_beam/internal/pickler.py | 3 ++- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py 
index c740f48b33f3..d13e7d9adaa5 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -30,15 +30,8 @@ import base64 import bz2 import io -import logging -import sys import threading -import traceback -import types import zlib -from typing import Any -from typing import Dict -from typing import Tuple from _thread import RLock as RLockType try: @@ -51,7 +44,6 @@ # Pickling, especially unpickling, causes broken module imports on Python 3 # if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884. _pickle_lock = threading.RLock() -import __main__ as _main_module def dumps(o, enable_trace=True, use_zlib=False): diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 1860b546031c..55dfe3612439 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -71,7 +71,8 @@ def set_library(selected_library=DEFAULT_PICKLE_LIB): if selected_library == USE_DILL and desired_pickle_lib != dill_pickler: desired_pickle_lib = dill_pickler dill_pickler.override_pickler_hooks(True) - elif selected_library == USE_CLOUDPICKLE and desired_pickle_lib != cloudpickle_pickler: + elif (selected_library == USE_CLOUDPICKLE and + desired_pickle_lib != cloudpickle_pickler): desired_pickle_lib = cloudpickle_pickler dill_pickler.override_pickler_hooks(False) From 3400ef41bea892dc0480d78ed336ced6065de59f Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 23 Nov 2021 18:02:12 -0500 Subject: [PATCH 30/45] change dill reference in coders --- sdks/python/apache_beam/coders/coders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 66997956474a..d1ceef70b6ed 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -80,7 +80,7 @@ try: # Import dill from the pickler module to make sure our monkey-patching of dill # occurs. - from apache_beam.internal.pickler import dill + from apache_beam.internal.dill_pickler import dill except ImportError: # We fall back to using the stock dill library in tests that don't use the # full Python SDK. 
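By this point in the series the selection plumbing is complete: SetupOptions gains a --pickle_library flag (dill, cloudpickle, or default) and pickler.set_library() swaps the backing module while turning dill's dispatch-table hooks on or off. A minimal usage sketch, assuming a Beam build that carries these patches is installed; the wiring mirrors what sdk_worker_main.py does in this series, everything else is illustrative:

from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

# Read the new flag off SetupOptions and hand it to the pickler facade,
# the same way the SDK worker harness does after these patches.
opts = PipelineOptions(['--pickle_library=cloudpickle'])
pickler.set_library(opts.view_as(SetupOptions).pickle_library)

Passing 'default' (or leaving the flag unset) matches neither branch of set_library and simply keeps the current backend, which starts out as dill.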
From e848b7ac4328267dc9c4e3a91f12ee7ba61f00a4 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 24 Nov 2021 12:25:43 -0500 Subject: [PATCH 31/45] only import lock if it can be imported otherwise ignore --- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index d13e7d9adaa5..ec32fbaaa670 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -32,7 +32,10 @@ import io import threading import zlib -from _thread import RLock as RLockType +try: + from _thread import RLock as RLockType +except (ImportError, ModuleNotFoundError): + pass try: from absl import flags @@ -53,14 +56,13 @@ def dumps(o, enable_trace=True, use_zlib=False): with _pickle_lock: with io.BytesIO() as file: pickler = cloudpickle.CloudPickler(file) - pickler.dispatch_table[RLockType] = _pickle_rlock try: + pickler.dispatch_table[RLockType] = _pickle_rlock pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags except NameError: pass pickler.dump(o) s = file.getvalue() - # TODO(ryanthompson): See if echoing dill.enable_trace is useful. # Compress as compactly as possible (compresslevel=9) to decrease peak memory # usage (of multiple in-memory copies) and to avoid hitting protocol buffer From dd56672957bb1a9da0d12a2212dd4daca42e1049 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 29 Nov 2021 10:37:09 -0500 Subject: [PATCH 32/45] linted tests removed unused variables changed line size --- sdks/python/apache_beam/coders/coders_test_common.py | 8 ++------ sdks/python/apache_beam/options/pipeline_options.py | 4 ++-- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 80d29c0a972d..13a9f733a72d 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -667,13 +667,11 @@ def test_state_backed_iterable_coder(self): state = {} def iterable_state_write(values, element_coder_impl): - global state token = b'state_token_%d' % len(state) state[token] = [element_coder_impl.encode(e) for e in values] return token def iterable_state_read(token, element_coder_impl): - global state return [element_coder_impl.decode(s) for s in state[token]] coder = coders.StateBackedIterableCoder( @@ -681,10 +679,8 @@ def iterable_state_read(token, element_coder_impl): read_state=iterable_state_read, write_state=iterable_state_write, write_state_threshold=1) - context = pipeline_context.PipelineContext( - iterable_state_read=iterable_state_read, - iterable_state_write=iterable_state_write) - # Note: do not use check_coder see https://github.com/cloudpipe/cloudpickle/issues/452 + # Note: do not use check_coder + # see https://github.com/cloudpipe/cloudpickle/issues/452 self._observe(coder) self.assertEqual([1, 2, 3], coder.decode(coder.encode([1, 2, 3]))) # Ensure that state was actually used. 
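A note on the coders_test_common.py hunk above: iterable_state_write and iterable_state_read now capture state through ordinary closure scoping instead of global declarations, and the coder is exercised with encode/decode directly rather than check_coder, which would also push those closures through the pickler (see the cloudpickle issue linked in the test comment). As a generic illustration, plain cloudpickle and nothing Beam-specific, a closure captured this way is serialized by value:

import cloudpickle

def make_reader():
    state = {'token0': [1, 2, 3]}

    def read(token):
        # `state` comes from the enclosing scope, not a module global.
        return state[token]

    return read

reader = make_reader()
restored = cloudpickle.loads(cloudpickle.dumps(reader))
print(restored('token0'))  # [1, 2, 3]; the captured dict travels with the function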
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 6f2c8369e6ba..d15e8754555b 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1060,8 +1060,8 @@ def _add_argparse_args(cls, parser): '--pickle_library', default='default', help=( - 'Chooses which pickle library to use. Options are dill, cloudpickle ' - 'or default.')) + 'Chooses which pickle library to use. Options are dill, ' + 'cloudpickle or default.')) parser.add_argument( '--save_main_session', default=False, From 7fb9f53200be0691b35ce89bef40c21e9d2e1d7e Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 29 Nov 2021 11:58:54 -0500 Subject: [PATCH 33/45] moved imports, reverted file that wasnt changed --- sdks/python/apache_beam/examples/complete/autocomplete_test.py | 3 ++- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index 7b62ad2e1e21..2048742ef62b 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -21,8 +21,9 @@ import unittest -import apache_beam as beam import pytest + +import apache_beam as beam from apache_beam.examples.complete import autocomplete from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.test_utils import compute_hash diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index ec32fbaaa670..ada9c984fb1d 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -29,6 +29,7 @@ import base64 import bz2 +import cloudpickle import io import threading import zlib @@ -42,8 +43,6 @@ except (ImportError, ModuleNotFoundError): pass -import cloudpickle - # Pickling, especially unpickling, causes broken module imports on Python 3 # if executed concurrently, see: BEAM-8651, http://bugs.python.org/issue38884. 
_pickle_lock = threading.RLock() From 9f0888778565501c00cf6a80538859aa835895b9 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 29 Nov 2021 12:22:49 -0500 Subject: [PATCH 34/45] changed import order --- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index ada9c984fb1d..7eb39494fd0b 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -29,10 +29,12 @@ import base64 import bz2 -import cloudpickle import io import threading import zlib + +import cloudpickle + try: from _thread import RLock as RLockType except (ImportError, ModuleNotFoundError): From 032c631bacae93f2d3a7ec1486c3aa681bc38e83 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 29 Nov 2021 15:43:57 -0500 Subject: [PATCH 35/45] trying a small fix --- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 2 +- sdks/python/apache_beam/internal/pickler.py | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 7eb39494fd0b..ddb7384294d4 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -37,7 +37,7 @@ try: from _thread import RLock as RLockType -except (ImportError, ModuleNotFoundError): +except: pass try: diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 55dfe3612439..0c53c0cf249e 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -35,7 +35,7 @@ USE_DILL = 'dill' DEFAULT_PICKLE_LIB = USE_DILL -desired_pickle_lib = None +desired_pickle_lib = dill_pickler def dumps(o, enable_trace=True, use_zlib=False): @@ -75,7 +75,3 @@ def set_library(selected_library=DEFAULT_PICKLE_LIB): desired_pickle_lib != cloudpickle_pickler): desired_pickle_lib = cloudpickle_pickler dill_pickler.override_pickler_hooks(False) - - -if not desired_pickle_lib: - set_library() From 92e3072307a0d8b34e21d6468c9f5ebf7968634e Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Mon, 29 Nov 2021 17:21:59 -0500 Subject: [PATCH 36/45] removed rlock pickling --- .../apache_beam/internal/cloudpickle_pickler.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index ddb7384294d4..3f3a76f30258 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -35,11 +35,6 @@ import cloudpickle -try: - from _thread import RLock as RLockType -except: - pass - try: from absl import flags except (ImportError, ModuleNotFoundError): @@ -58,7 +53,6 @@ def dumps(o, enable_trace=True, use_zlib=False): with io.BytesIO() as file: pickler = cloudpickle.CloudPickler(file) try: - pickler.dispatch_table[RLockType] = _pickle_rlock pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags except NameError: pass @@ -97,14 +91,6 @@ def loads(encoded, enable_trace=True, use_zlib=False): return unpickled -def _pickle_rlock(obj): - return _create_rlock, tuple([]) - - -def _create_rlock(): - return RLockType() - - def _pickle_absl_flags(obj): return _create_absl_flags, tuple([]) From 
94a9c9600542f51247e4532309567f305183c264 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Tue, 30 Nov 2021 09:56:47 -0500 Subject: [PATCH 37/45] added to change file --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 06a90699380d..70d91e4b1b3c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -33,6 +33,7 @@ * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). * Remote packages can now be downloaded from locations supported by apache_beam.io.filesystems. The files will be downloaded on Stager and uploaded to staging location. For more information, see [BEAM-11275](https://issues.apache.org/jira/browse/BEAM-11275) +* Added support for cloudpickle library to replace dill library. ## Breaking Changes From 4ec71c33311c78e3521481620e34e86f592c64ee Mon Sep 17 00:00:00 2001 From: tvalentyn Date: Tue, 30 Nov 2021 15:50:31 -0800 Subject: [PATCH 38/45] Minor wording suggestion. --- sdks/python/apache_beam/internal/cloudpickle_pickler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index 3f3a76f30258..40719be2971c 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -22,7 +22,7 @@ Uses the cloudpickle library to pickle data, functions, lambdas and classes. -dump_session and load_session are no ops. +dump_session and load_session are no-ops. """ # pytype: skip-file From 165c8d01256188a741895b760bc4066c796f21aa Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 1 Dec 2021 10:03:04 -0500 Subject: [PATCH 39/45] merged --- sdks/python/apache_beam/internal/dill_pickler.py | 1 - sdks/python/apache_beam/internal/pickler.py | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/internal/dill_pickler.py b/sdks/python/apache_beam/internal/dill_pickler.py index f99bf2af59eb..2c51db5b76f0 100644 --- a/sdks/python/apache_beam/internal/dill_pickler.py +++ b/sdks/python/apache_beam/internal/dill_pickler.py @@ -312,7 +312,6 @@ def load_session(file_path): with _pickle_lock: return dill.load_session(file_path) - def override_pickler_hooks(extend=True): """ Extends the dill library hooks into that of the standard pickler library. diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 0c53c0cf249e..4476fc8bf9bd 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -37,7 +37,6 @@ desired_pickle_lib = dill_pickler - def dumps(o, enable_trace=True, use_zlib=False): # type: (...) -> bytes @@ -48,8 +47,8 @@ def dumps(o, enable_trace=True, use_zlib=False): def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" - return desired_pickle_lib.loads( - encoded, enable_trace=enable_trace, use_zlib=use_zlib) + return desired_pickle_lib.loads(encoded, enable_trace=enable_trace, + use_zlib=use_zlib) def dump_session(file_path): From f7b2682fb1373dc87df3d19e86e41838021866ce Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 1 Dec 2021 10:13:22 -0500 Subject: [PATCH 40/45] Update CHANGES.md added valentines suggestions. 
Co-authored-by: tvalentyn --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 70d91e4b1b3c..09882901e239 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -33,7 +33,7 @@ * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). * Remote packages can now be downloaded from locations supported by apache_beam.io.filesystems. The files will be downloaded on Stager and uploaded to staging location. For more information, see [BEAM-11275](https://issues.apache.org/jira/browse/BEAM-11275) -* Added support for cloudpickle library to replace dill library. +* Added support for cloudpickle as a pickling library for Python SDK. ## Breaking Changes From 31aba4898669841b6a3d33eb1f1ef71ed3ca4a37 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 1 Dec 2021 10:18:08 -0500 Subject: [PATCH 41/45] merged change.md changes --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 09882901e239..6c6061a8debd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -33,7 +33,7 @@ * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). * Remote packages can now be downloaded from locations supported by apache_beam.io.filesystems. The files will be downloaded on Stager and uploaded to staging location. For more information, see [BEAM-11275](https://issues.apache.org/jira/browse/BEAM-11275) -* Added support for cloudpickle as a pickling library for Python SDK. +* Added support for cloudpickle as a pickling library for Python SDK. To use cloudpickle set pipeline options, --pickler_lib=cloudpickle. ## Breaking Changes From f20ce43b948734d9a00a95cb7609efe4b4d54db2 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 1 Dec 2021 10:52:08 -0500 Subject: [PATCH 42/45] merged --- .../container/base_image_requirements.txt | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 sdks/python/container/base_image_requirements.txt diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt new file mode 100644 index 000000000000..d6d5326469c3 --- /dev/null +++ b/sdks/python/container/base_image_requirements.txt @@ -0,0 +1,91 @@ +############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +############################################################################### + +# These are packages needed by the Apache Beam Python SDK. Their versions need +# to be compatible with the requirements in sdks/python/setup.py. +# Specifying the versions manually helps to resolve dependency conflicts +# with other packages installed in the container. 
+# Any SDK dependencies not listed here will be installed when SDK is installed +# into the container. + +# TODO(AVRO-2429): Upgrade to >= 1.9.0 only after resolved +avro-python3==1.8.2 +fastavro==1.0.0.post1 +cloudpickle==2.0.0 +crcmod==1.7 +dill==0.3.1.1 +future==0.18.2 +grpcio==1.40.0 +hdfs==2.5.8 +httplib2==0.19.1 +oauth2client==4.1.3 +protobuf==3.17.3 +pyarrow==3.0.0 +pydot==1.4.1 +pymongo==3.10.1 +pytz==2020.1 +pyyaml==5.4 +typing-extensions==3.7.4.3 + +# GCP extra features +google-auth==1.31.0 +google-api-core==1.22.2 +google-apitools==0.5.31 +google-cloud-pubsub==1.0.2 +google-cloud-bigquery==1.26.1 +google-cloud-bigtable==1.0.0 +google-cloud-core==1.4.1 +google-cloud-datastore==1.15.3 +google-cloud-dlp==0.13.0 +google-cloud-language==1.3.0 +google-cloud-profiler==3.0.4 +google-cloud-recommendations-ai==0.2.0 +google-cloud-spanner==1.13.0 +google-cloud-videointelligence==1.13.0 +google-cloud-vision==0.42.0 +google-python-cloud-debugger == 2.15 +grpcio-gcp==0.2.2 + +## These are additional optional packages likely to be used by customers. +beautifulsoup4 == 4.9.1 +bs4 == 0.0.1 +cython==0.29.21 +cachetools == 3.1.1 +dataclasses == 0.8 ; python_version=="3.6" +guppy3==3.0.10 +mmh3==2.5.1 +orjson==3.6.1 +python-dateutil == 2.8.1 +requests == 2.24.0 +freezegun == 0.3.15 +pillow == 7.2.0 +python-snappy == 0.5.4 +numpy==1.19.5 +scipy==1.4.1 +scikit-learn==0.24.1 +pandas==1.1.5 ; python_version<"3.7" +pandas==1.3.3 ; python_version>="3.7" +protorpc==0.12.0 +python-gflags==3.1.2 +tensorflow==2.6.0 +nltk==3.5.0 + +# Packages needed for testing. +tenacity>=5.0.2 +pyhamcrest<2.0,>=1.9 +pytest==4.6.11 From 4f617b9eb11399bb660d8fbb7d4bdfda2a4d346e Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 1 Dec 2021 10:49:18 -0500 Subject: [PATCH 43/45] linted again --- sdks/python/apache_beam/internal/dill_pickler.py | 1 + sdks/python/apache_beam/internal/pickler.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/internal/dill_pickler.py b/sdks/python/apache_beam/internal/dill_pickler.py index 2c51db5b76f0..f99bf2af59eb 100644 --- a/sdks/python/apache_beam/internal/dill_pickler.py +++ b/sdks/python/apache_beam/internal/dill_pickler.py @@ -312,6 +312,7 @@ def load_session(file_path): with _pickle_lock: return dill.load_session(file_path) + def override_pickler_hooks(extend=True): """ Extends the dill library hooks into that of the standard pickler library. diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 4476fc8bf9bd..0c53c0cf249e 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -37,6 +37,7 @@ desired_pickle_lib = dill_pickler + def dumps(o, enable_trace=True, use_zlib=False): # type: (...) 
-> bytes @@ -47,8 +48,8 @@ def dumps(o, enable_trace=True, use_zlib=False): def loads(encoded, enable_trace=True, use_zlib=False): """For internal use only; no backwards-compatibility guarantees.""" - return desired_pickle_lib.loads(encoded, enable_trace=enable_trace, - use_zlib=use_zlib) + return desired_pickle_lib.loads( + encoded, enable_trace=enable_trace, use_zlib=use_zlib) def dump_session(file_path): From 4e0a330998fc147c66ce71798b1d83d0eebefdff Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 1 Dec 2021 11:37:22 -0500 Subject: [PATCH 44/45] removed file that is also removed in head, not sure why git keeps bringing it back --- .../container/base_image_requirements.txt | 91 ------------------- 1 file changed, 91 deletions(-) delete mode 100644 sdks/python/container/base_image_requirements.txt diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt deleted file mode 100644 index d6d5326469c3..000000000000 --- a/sdks/python/container/base_image_requirements.txt +++ /dev/null @@ -1,91 +0,0 @@ -############################################################################### -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -############################################################################### - -# These are packages needed by the Apache Beam Python SDK. Their versions need -# to be compatible with the requirements in sdks/python/setup.py. -# Specifying the versions manually helps to resolve dependency conflicts -# with other packages installed in the container. -# Any SDK dependencies not listed here will be installed when SDK is installed -# into the container. - -# TODO(AVRO-2429): Upgrade to >= 1.9.0 only after resolved -avro-python3==1.8.2 -fastavro==1.0.0.post1 -cloudpickle==2.0.0 -crcmod==1.7 -dill==0.3.1.1 -future==0.18.2 -grpcio==1.40.0 -hdfs==2.5.8 -httplib2==0.19.1 -oauth2client==4.1.3 -protobuf==3.17.3 -pyarrow==3.0.0 -pydot==1.4.1 -pymongo==3.10.1 -pytz==2020.1 -pyyaml==5.4 -typing-extensions==3.7.4.3 - -# GCP extra features -google-auth==1.31.0 -google-api-core==1.22.2 -google-apitools==0.5.31 -google-cloud-pubsub==1.0.2 -google-cloud-bigquery==1.26.1 -google-cloud-bigtable==1.0.0 -google-cloud-core==1.4.1 -google-cloud-datastore==1.15.3 -google-cloud-dlp==0.13.0 -google-cloud-language==1.3.0 -google-cloud-profiler==3.0.4 -google-cloud-recommendations-ai==0.2.0 -google-cloud-spanner==1.13.0 -google-cloud-videointelligence==1.13.0 -google-cloud-vision==0.42.0 -google-python-cloud-debugger == 2.15 -grpcio-gcp==0.2.2 - -## These are additional optional packages likely to be used by customers. 
-beautifulsoup4 == 4.9.1 -bs4 == 0.0.1 -cython==0.29.21 -cachetools == 3.1.1 -dataclasses == 0.8 ; python_version=="3.6" -guppy3==3.0.10 -mmh3==2.5.1 -orjson==3.6.1 -python-dateutil == 2.8.1 -requests == 2.24.0 -freezegun == 0.3.15 -pillow == 7.2.0 -python-snappy == 0.5.4 -numpy==1.19.5 -scipy==1.4.1 -scikit-learn==0.24.1 -pandas==1.1.5 ; python_version<"3.7" -pandas==1.3.3 ; python_version>="3.7" -protorpc==0.12.0 -python-gflags==3.1.2 -tensorflow==2.6.0 -nltk==3.5.0 - -# Packages needed for testing. -tenacity>=5.0.2 -pyhamcrest<2.0,>=1.9 -pytest==4.6.11 From 232ecac79607fcbc5179cc44208a41a7e403ef91 Mon Sep 17 00:00:00 2001 From: Ryan Thompson Date: Wed, 1 Dec 2021 11:40:51 -0500 Subject: [PATCH 45/45] removed changes that shouldnt be relevant to this pr --- sdks/python/container/py36/base_image_requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/container/py36/base_image_requirements.txt b/sdks/python/container/py36/base_image_requirements.txt index b51fad8dffc7..01ade0fe5794 100644 --- a/sdks/python/container/py36/base_image_requirements.txt +++ b/sdks/python/container/py36/base_image_requirements.txt @@ -69,7 +69,7 @@ google-cloud-videointelligence==1.16.1 google-cloud-vision==1.0.0 google-crc32c==1.3.0 google-pasta==0.2.0 -google-python-cloud-debugger==2.8 +google-python-cloud-debugger==2.18 google-resumable-media==2.1.0 googleapis-common-protos==1.53.0 greenlet==1.1.2 @@ -85,7 +85,7 @@ importlib-metadata==4.8.2 joblib==1.1.0 keras==2.7.0 Keras-Preprocessing==1.1.2 -libcst==0.3.22 +libcst==0.3.21 Markdown==3.3.6 mmh3==3.0.0 mock==2.0.0
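A closing sanity check for the series as a whole: whichever backend set_library() selects, the facade keeps matching dumps(o, enable_trace=True, use_zlib=False) and loads(encoded, enable_trace=True, use_zlib=False) signatures, so callers should see identical round-trip behavior. A rough smoke-test sketch under that assumption (it needs a Beam build containing these patches, and flipping libraries mid-process like this is something only tests should do):

from apache_beam.internal import pickler

def add_one(x):
    return x + 1

for lib in ('dill', 'cloudpickle'):
    pickler.set_library(lib)
    payload = pickler.dumps(add_one)
    assert pickler.loads(payload)(41) == 42
    # The use_zlib toggle has to agree between dumps and loads.
    z = pickler.dumps(add_one, use_zlib=True)
    assert pickler.loads(z, use_zlib=True)(41) == 42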