diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 98d582884e86..18ab0f091aaf 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -23,6 +23,7 @@ import platform import unittest +import mock import pytest import apache_beam as beam @@ -680,6 +681,21 @@ def test_track_pcoll_unbounded_flatten(self): self.assertIs(pcoll2_unbounded.is_bounded, False) self.assertIs(merged.is_bounded, False) + def test_incompatible_submission_and_runtime_envs_fail_pipeline(self): + with mock.patch( + 'apache_beam.transforms.environments.sdk_base_version_capability' + ) as base_version: + base_version.side_effect = [ + f"beam:version:sdk_base:apache/beam_python3.5_sdk:2.{i}.0" + for i in range(100) + ] + with self.assertRaisesRegex( + RuntimeError, + 'Pipeline construction environment and pipeline runtime ' + 'environment are not compatible.'): + with TestPipeline() as p: + _ = p | Create([None]) + class DoFnTest(unittest.TestCase): def test_element(self): diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index cea5062b582f..d1bf50bdfcee 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -67,6 +67,7 @@ from apache_beam.runners.worker import statesampler from apache_beam.transforms import TimeDomain from apache_beam.transforms import core +from apache_beam.transforms import environments from apache_beam.transforms import sideinputs from apache_beam.transforms import userstate from apache_beam.transforms import window @@ -823,6 +824,27 @@ def only_element(iterable): return element +def _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor): + # type: (beam_fn_api_pb2.ProcessBundleDescriptor) -> None + + runtime_sdk = environments.sdk_base_version_capability() + for t in process_bundle_descriptor.transforms.values(): + env = process_bundle_descriptor.environments[t.environment_id] + for c in env.capabilities: + if (c.startswith(environments.SDK_VERSION_CAPABILITY_PREFIX) and + c != runtime_sdk): + raise RuntimeError( + "Pipeline construction environment and pipeline runtime " + "environment are not compatible. If you use a custom " + "container image, check that the Python interpreter minor version " + "and the Apache Beam version in your image match the versions " + "used at pipeline construction time. " + f"Submission environment: {c}. " + f"Runtime environment: {runtime_sdk}.") + + # TODO: Consider warning on mismatches in versions of installed packages. + + class BundleProcessor(object): """ A class for processing bundles of elements. """ @@ -846,6 +868,7 @@ def __init__(self, self.data_channel_factory = data_channel_factory self.current_instruction_id = None # type: Optional[str] + _verify_descriptor_created_in_a_compatible_env(process_bundle_descriptor) # There is no guarantee that the runner only set # timer_api_service_descriptor when having timers. So this field cannot be # used as an indicator of timers. diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index 769f3d9331b5..825ddef994fd 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -90,8 +90,8 @@ def looks_like_json(s): APACHE_BEAM_DOCKER_IMAGE_PREFIX = 'apache/beam' - APACHE_BEAM_JAVA_CONTAINER_NAME_PREFIX = 'beam_java' +SDK_VERSION_CAPABILITY_PREFIX = 'beam:version:sdk_base:' def is_apache_beam_container(container_image): @@ -777,6 +777,11 @@ def python_sdk_docker_capabilities(): return python_sdk_capabilities() + [common_urns.protocols.SIBLING_WORKERS.urn] +def sdk_base_version_capability(): + return ( + SDK_VERSION_CAPABILITY_PREFIX + DockerEnvironment.default_docker_image()) + + def _python_sdk_capabilities_iter(): # type: () -> Iterator[str] for urn_spec in common_urns.coders.__dict__.values(): @@ -786,7 +791,7 @@ def _python_sdk_capabilities_iter(): yield common_urns.protocols.HARNESS_MONITORING_INFOS.urn yield common_urns.protocols.WORKER_STATUS.urn yield python_urns.PACKED_COMBINE_FN - yield 'beam:version:sdk_base:' + DockerEnvironment.default_docker_image() + yield sdk_base_version_capability() yield common_urns.sdf_components.TRUNCATE_SIZED_RESTRICTION.urn yield common_urns.primitives.TO_STRING.urn