diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index b3ca92aa9a1a..935ba83709c0 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -828,6 +828,17 @@ def only_element(iterable): return element +def _environments_compatible(submission, runtime): + # type: (str, str) -> bool + if submission == runtime: + return True + if 'rc' in submission and runtime in submission: + # TODO(https://github.com/apache/beam/issues/28084): Loosen + # the check for RCs until RC containers install the matching version. + return True + return False + + def _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor): # type: (beam_fn_api_pb2.ProcessBundleDescriptor) -> None @@ -836,7 +847,7 @@ def _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor): env = process_bundle_descriptor.environments[t.environment_id] for c in env.capabilities: if (c.startswith(environments.SDK_VERSION_CAPABILITY_PREFIX) and - c != runtime_sdk): + not _environments_compatible(c, runtime_sdk)): raise RuntimeError( "Pipeline construction environment and pipeline runtime " "environment are not compatible. If you use a custom " diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py index 673c1cea111a..292b8431063c 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py @@ -402,5 +402,25 @@ def test_can_sample_exceptions(self): data_sampler.stop() +class EnvironmentCompatibilityTest(unittest.TestCase): + def test_rc_environments_are_compatible_with_released_images(self): + # TODO(https://github.com/apache/beam/issues/28084): remove when + # resolved. + self.assertTrue( + bundle_processor._environments_compatible( + "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0rc1", + "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0")) + + def test_user_modified_sdks_need_to_be_installed_in_runtime_env(self): + self.assertFalse( + bundle_processor._environments_compatible( + "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0-custom", + "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0")) + self.assertTrue( + bundle_processor._environments_compatible( + "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0-custom", + "beam:version:sdk_base:apache/beam_python3.5_sdk:2.1.0-custom")) + + if __name__ == '__main__': unittest.main()