diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler.py b/sdks/python/apache_beam/internal/cloudpickle_pickler.py index eebba178e7c3..4f3fae4f68df 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler.py @@ -156,17 +156,24 @@ def _dumps( enable_stable_code_identifier_pickling=False, config: cloudpickle.CloudPickleConfig = DEFAULT_CONFIG) -> bytes: + if enable_stable_code_identifier_pickling: + config = STABLE_CODE_IDENTIFIER_CONFIG + config_kwargs = config.__dict__.copy() + if enable_best_effort_determinism: # TODO: Add support once https://github.com/cloudpipe/cloudpickle/pull/563 # is merged in. - _LOGGER.warning( - 'Ignoring unsupported option: enable_best_effort_determinism. ' - 'This has only been implemented for dill.') + config_kwargs['filepath_interceptor'] = cloudpickle.get_relative_path + _LOGGER.info( + 'Option not fully supported:' + 'enable_best_effort_determinism is True: Applying file path ' + 'normalization for pickling.' + 'This has been fully implemented for dill.') + + final_config = cloudpickle.CloudPickleConfig(**config_kwargs) with _pickle_lock: with io.BytesIO() as file: - if enable_stable_code_identifier_pickling: - config = STABLE_CODE_IDENTIFIER_CONFIG - pickler = cloudpickle.CloudPickler(file, config=config) + pickler = cloudpickle.CloudPickler(file, config=final_config) try: pickler.dispatch_table[type(flags.FLAGS)] = _pickle_absl_flags except NameError: diff --git a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py index b63ebd6c7109..f2e0f57faab5 100644 --- a/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py +++ b/sdks/python/apache_beam/internal/cloudpickle_pickler_test.py @@ -25,8 +25,10 @@ from apache_beam.coders import proto2_coder_test_messages_pb2 from apache_beam.internal import module_test +from apache_beam.internal.cloudpickle import register_pickle_by_value from apache_beam.internal.cloudpickle_pickler import dumps from apache_beam.internal.cloudpickle_pickler import loads +from apache_beam.internal.test_data import module_for_path_test from apache_beam.utils import shared GLOBAL_DICT_REF = module_test.GLOBAL_DICT @@ -212,14 +214,30 @@ def test_dataclass(self): self.assertEqual(DataClass(datum='abc'), loads(dumps(DataClass(datum='abc')))) ''') - def test_best_effort_determinism_not_implemented(self): + def test_best_effort_determinism_is_partially_supported(self): with self.assertLogs('apache_beam.internal.cloudpickle_pickler', - "WARNING") as l: + "INFO") as l: dumps(123, enable_best_effort_determinism=True) self.assertIn( - 'Ignoring unsupported option: enable_best_effort_determinism', + 'Option not fully supported:' + 'enable_best_effort_determinism is True: Applying file path ' + 'normalization for pickling.' + 'This has been fully implemented for dill.', '\n'.join(l.output)) + def test_code_object_path_is_normalized(self): + """Tests that cloudpickle normalizes a function's co_filename.""" + func = module_for_path_test.func_for_path_test + + register_pickle_by_value(module_for_path_test) + unpickled_func = loads(dumps(func, enable_best_effort_determinism=True)) + + unpickled_filename = unpickled_func.__code__.co_filename + expected_normalized_filename = 'apache_beam/internal/test_data/' \ + 'module_for_path_test.py' + + self.assertEqual(unpickled_filename, expected_normalized_filename) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/internal/test_data/module_for_path_test.py b/sdks/python/apache_beam/internal/test_data/module_for_path_test.py new file mode 100644 index 000000000000..f31e7eb46dd3 --- /dev/null +++ b/sdks/python/apache_beam/internal/test_data/module_for_path_test.py @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A test module for source file path normalization.""" + + +def func_for_path_test(): + """A simple function for path normalization testing.""" + return "hello"